I do not think you need to create a new ActorPublisher every 2 seconds; that is redundant and wasteful of memory. In fact, I do not think an ActorPublisher is needed at all. From what I can see in the code, your implementation will continually increase the number of threads requesting the same data: each Message from the client will be handled by N different Akka threads and, even worse, N will grow over time.
An Iterator for an infinite request loop
You can get the same behavior as your ActorPublisher using a Scala Iterator. You can create an Iterator that continuously queries the client:
import scala.collection.JavaConversions.iterableAsScalaIterable

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages: Iterable[Message] =
  iterableAsScalaIterable { (client receiveMessage new ReceiveMessageRequest(url)).getMessages }

def messageListIterator: Iterator[Iterable[Message]] =
  Iterator continually queryClientForMessages

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator(): Iterator[Message] = messageListIterator flatMap identity
This implementation only queries the client once all previously fetched messages have been consumed, and is therefore truly reactive. There is no need to track a fixed-size buffer. Your solution needs a buffer because message production (driven by a timer) is decoupled from message consumption (via println). In my implementation, production and consumption are tightly coupled through backpressure.
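To make the pull-driven behaviour concrete, here is a minimal, self-contained sketch in plain Scala. The `fakeQuery` stub stands in for the SQS call (the names here are illustrative, not from the AWS SDK): the underlying query runs only when the previously fetched batch has been fully consumed.

```scala
// Stub standing in for client.receiveMessage(...); counts how often it is called.
var batchesFetched = 0

def fakeQuery(): Iterable[String] = {
  batchesFetched += 1
  Seq(s"msg-$batchesFetched-a", s"msg-$batchesFetched-b") // a batch of two messages
}

// Same shape as messageIterator(): an endless, lazily evaluated message stream.
val messages: Iterator[String] =
  Iterator.continually(fakeQuery()).flatMap(identity)

// Consuming four messages triggers exactly two queries - demand drives supply.
messages.take(4).foreach(println)
println(s"batches fetched: $batchesFetched")
```

Because `Iterator.continually` takes its argument by name, `fakeQuery()` is re-evaluated only when the iterator is advanced, which is exactly the on-demand behaviour described above.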
Akka Stream Source
You can then use this Iterator-generating function to feed an Akka Stream Source:
def messageSource: Source[Message, _] = Source fromIterator messageIterator
Flow shaping
And finally, you can use this Source to run your println (as a note: your flow value is actually a Sink, since Flow + Sink = Sink). Using the flow value from the question:
messageSource runWith flow
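The note above that Flow + Sink = Sink can be illustrated with a simplified, hypothetical model in plain Scala (these function types are my own sketch, not the real Akka API): a Flow transforms elements, a Sink consumes them, so attaching a Flow to a Sink yields another consumer.

```scala
// Hypothetical simplification: stream stages modeled as plain functions.
type SimpleFlow[A, B] = Iterator[A] => Iterator[B]
type SimpleSink[A]    = Iterator[A] => Unit

// Attaching a Flow to a Sink yields a Sink (Flow + Sink = Sink).
def attach[A, B](flow: SimpleFlow[A, B], sink: SimpleSink[B]): SimpleSink[A] =
  in => sink(flow(in))

val toUpper: SimpleFlow[String, String] = _.map(_.toUpperCase)

val collected = scala.collection.mutable.Buffer.empty[String]
val collectAll: SimpleSink[String] = _.foreach(collected += _)

// The combined stage still just consumes a stream of inputs.
val combined: SimpleSink[String] = attach(toUpper, collectAll)
combined(Iterator("a", "b"))
println(collected.mkString(", ")) // A, B
```

This is why the `flow` value from the question can be used directly as the right-hand side of `runWith`: once a Sink is attached downstream, the whole thing behaves as a Sink.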
A single Akka Stream now processes all of the messages.