How to guarantee sequential processing across forks in Akka

We create a chain of actors for each (small) incoming group of messages to guarantee that they are processed sequentially and pipelined (groups are distinguished by a common identifier). The problem is that our chain has forks, for example A1 -> (A2 -> A3 | A4 -> A5), and we must guarantee that there are no races between messages passing through A2 -> A3 and those passing through A4 -> A5. The currently implemented solution is to block the A1 actor until the current message is fully processed (in one of the sub-chains):

    def receive = { // pseudocode
      case x =>
        ...
        val f = A2orA4 ? msg
        Await.result(f, timeout) // blocks this actor's thread until the sub-chain replies
    }

As a result, the number of threads in the application grows in direct proportion to the number of messages in flight, regardless of whether those messages are actively being processed or just waiting asynchronously for a response from an external service. This has worked for about two years with a fork-join (or any other dynamic) pool, but of course it cannot work with a fixed pool, and it severely degrades performance under high load. It also affects GC, since each blocked fork actor keeps the now-redundant state of the previous message alive.

Even with backpressure, it creates N times more threads than there are received messages (since there are N consecutive forks in the flow), which is still bad: processing a single message takes a long time but very little CPU, so we should be able to process as many messages as memory allows. The first solution I came up with was to linearize the chain as A1 -> A2 -> A3 -> A4 -> A5. Is there a better one?

multithreading scala future akka
Jan 20 '15 at 12:05
1 answer

A simpler solution is to keep the future for the last received message in the actor's state and chain each new request onto that previous future:

    def receive = process(Future.successful(new Ack)) // already-completed future

    def process(prevAck: Future[Ack]): Receive = { // pseudocode
      case x =>
        ...
        context become process(prevAck.flatMap(_ => A2orA4 ? msg))
    }

This builds a chain of futures without any blocking. The chain becomes eligible for garbage collection as its futures complete (except for the last one).
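To make the ordering guarantee concrete, here is a minimal sketch of the same chaining idea using only scala.concurrent, without Akka. The names askSubChain and Sequencer are hypothetical stand-ins for `A2orA4 ? msg` and for the actor's state, and the sleep only simulates a slow external service. It is an illustration under those assumptions, not a drop-in replacement for the actor.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ChainedFutures {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for `A2orA4 ? msg`: completes after a simulated delay
  // and records the message in a shared log at completion time.
  def askSubChain(msg: Int, log: ListBuffer[Int]): Future[Unit] =
    Future {
      Thread.sleep(if (msg % 2 == 0) 50 else 5) // even messages are slower
      log.synchronized { log += msg }
      ()
    }

  // Plays the role of the actor state. Like an actor's receive,
  // submit is assumed to be called from one thread at a time.
  final class Sequencer(log: ListBuffer[Int]) {
    private var prevAck: Future[Unit] = Future.successful(())

    // The next request starts only after the previous one has completed;
    // the caller itself never blocks.
    def submit(msg: Int): Unit =
      prevAck = prevAck.flatMap(_ => askSubChain(msg, log))

    def last: Future[Unit] = prevAck
  }

  def run(): List[Int] = {
    val log = ListBuffer.empty[Int]
    val sequencer = new Sequencer(log)
    (1 to 6).foreach(sequencer.submit)
    Await.result(sequencer.last, 5.seconds) // blocking here only to inspect the result
    log.synchronized { log.toList }
  }
}
```

Even though the even-numbered messages take longer, the log comes back in submission order, because each request is issued only inside the flatMap of the previous one. One caveat: a failed future (for example an ask timeout) short-circuits every flatMap after it, so in practice you would probably want to recover on prevAck before chaining the next request.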

Jan 20 '15 at 12:05