How can Akka flows materialize continuously?

I am using Akka Streams in Scala to poll AWS SQS using AWS Java SDK . I created an ActorPublisher that deactivates messages in a two second interval:

class SQSSubscriber(name: String) extends ActorPublisher[Message] { implicit val materializer = ActorMaterializer() val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue") val client = new AmazonSQSClient() client.setRegion(RegionUtils.getRegion("us-east-1")) val url = client.getQueueUrl(name).getQueueUrl val MaxBufferSize = 100 var buf = Vector.empty[Message] override def receive: Receive = { case "dequeue" => val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList messages.foreach(self ! _) case message: Message if buf.size == MaxBufferSize => log.error("The buffer is full") case message: Message => if (buf.isEmpty && totalDemand > 0) onNext(message) else { buf :+= message deliverBuf() } case Request(_) => deliverBuf() case Cancel => context.stop(self) } @tailrec final def deliverBuf(): Unit = if (totalDemand > 0) { if (totalDemand <= Int.MaxValue) { val (use, keep) = buf.splitAt(totalDemand.toInt) buf = keep use foreach onNext } else { val (use, keep) = buf.splitAt(Int.MaxValue) buf = keep use foreach onNext deliverBuf() } } } 

In my application, I also try to start a thread with an interval of 2 seconds:

 val system = ActorSystem("system") val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name")) val flow = Flow[Message] .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem } .to(Sink.ignore) system.scheduler.schedule(0 seconds, 2 seconds) { flow.runWith(sqsSource)(ActorMaterializer()(system)) } 

However, when I run my application, I get java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds] and subsequent dead letter messages caused by ActorMaterializer .

Is there a recommended approach for continuous materialization of Akka flow?

+5
source share
1 answer

I do not think you need to create a new ActorPublisher every 2 seconds. This seems redundant and wasteful to the memory. In addition, I do not think that an actor-publisher is needed. From what I can say about the code, your implementation will constantly increase the number of threads requesting the same data. Each Message from the client will be handled by N different akka threads, and, even worse, N will grow over time.

Iterator for infinite loop request

You can get the same behavior from your ActorPublisher using scala Iterator . You can create an Iterator that continuously queries the client:

 //setup the client val client = { val sqsClient = new AmazonSQSClient() sqsClient setRegion (RegionUtils getRegion "us-east-1") sqsClient } val url = client.getQueueUrl(name).getQueueUrl //single query def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable { client receiveMessage (new ReceiveMessageRequest(url).getMessages) } def messageListIteartor : Iterator[Iterable[Message]] = Iterator continually messageListStream //messages one-at-a-time "on demand", no timer pushing you around def messageIterator() : Iterator[Message] = messageListIterator flatMap identity 

This implementation only asks the client when all previous messages have been destroyed, and therefore really reactive . There is no need to track a fixed size buffer. Your solution needs a buffer because message creation (via a timer) is canceled due to message consumption (via println). In my implementation, creation and consumption are closely related through backpressure.

Akka Stream Source

Then you can use this Iterator generator function to feed akka stream. A source:

 def messageSource : Source[Message, _] = Source fromIterator messageIterator 

Flow shaping

And finally, you can use this source to execute println (as a note: your flow value is actually Sink with Flow + Sink = Sink ). Using the flow value from the question:

 messageSource runWith flow 

One stream akka Stream processes all messages.

+7
source

All Articles