Incremental processing in Akka actors

I have actors that need to do very long-running, computationally expensive work, but the computation itself can be done incrementally. So although the full computation takes hours, the intermediate results are actually extremely useful, and I would like to be able to answer any requests for them. This is pseudocode for what I want to do:

    var intermediateResult = ...
    loop {
      while (mailbox.isEmpty && computationNotFinished)
        intermediateResult = computationStep(intermediateResult)
      receive {
        case GetCurrentResult => sender ! intermediateResult
        ...other messages...
      }
    }
4 answers

I gather from your comment to Roland Kuhn that your work can be considered recursive, at least in blocks. If that is not the case, I don't think there is a clean solution to your problem, and you will have to deal with complex pattern-matching blocks.

If my assumption is correct, I would schedule the computation asynchronously and leave the actor free to respond to other messages. The key points are using the monadic capabilities of Future and keeping the receive block simple. You will have to handle three messages (StartComputation, ChangeState, GetState).

You end up with the following receive:

    def receive = {
      case StartComputation(myData) => expensiveStuff(myData)
      case ChangeState(newstate)    => this.state = newstate
      case GetState                 => sender ! this.state
    }

You can then build on Future's map method by defining your own recursive map:

    def mapRecursive[A](f: Future[A], handler: A => A, exitConditions: A => Boolean): Future[A] = {
      f.flatMap { a =>
        if (exitConditions(a))
          f
        else {
          val newFuture = f.flatMap { a => Future(handler(a)) }
          mapRecursive(newFuture, handler, exitConditions)
        }
      }
    }

Once you have this tool, everything else is easier. Consider the following example:

    def main(args: Array[String]) {
      val baseFuture: Future[Int] = Promise.successful(64)
      val newFuture: Future[Int] = mapRecursive(baseFuture,
        (a: Int) => {
          val result = a / 2
          println("Additional step done: the current a is " + result)
          result
        },
        (a: Int) => (a <= 1))
      val one = Await.result(newFuture, Duration.Inf)
      println("Computation finished, result = " + one)
    }

Its output:

    Additional step done: the current a is 32
    Additional step done: the current a is 16
    Additional step done: the current a is 8
    Additional step done: the current a is 4
    Additional step done: the current a is 2
    Additional step done: the current a is 1
    Computation finished, result = 1

As you can see, you can do the same inside the expensiveStuff method:

    def expensiveStuff(myData: MyData): Future[MyData] = {
      val firstResult = Promise.successful(myData)
      val handler: MyData => MyData = (myData) => {
        val result = myData.copy(myData.value / 2)
        self ! ChangeState(result)
        result
      }
      val exitCondition: MyData => Boolean = (myData: MyData) => myData.value == 1
      mapRecursive(firstResult, handler, exitCondition)
    }

EDIT - MORE DETAILS

If you do not want to block the actor, which processes the messages from its mailbox in a thread-safe and synchronous manner, the only thing you can do is run the computation on a different thread. That is exactly what a high-performance non-blocking receive amounts to.

However, you were right in saying that the approach I propose pays a high performance penalty. Every step runs in a different future, which may not be necessary at all. You can therefore make the handler recurse so that execution is single-threaded or multi-threaded. After all, there is no magic formula:

  • If you want to schedule asynchronously and minimize the cost, all the work should be done by a single thread.
  • However, this can starve other work: if all the threads in the thread pool are taken, new futures are queued. So you may want to split the operation into several futures, so that even at full usage some new work can be scheduled before the old work completes.

    def recurseFuture[A](entryFuture: Future[A], handler: A => A, exitCondition: A => Boolean,
                         maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
      def recurse(a: A, handler: A => A, exitCondition: A => Boolean,
                  maxNestedRecursion: Long, currentStep: Long): Future[A] = {
        if (exitCondition(a))
          Promise.successful(a)
        else if (currentStep == maxNestedRecursion)
          Promise.successful(handler(a)).flatMap(a => recurse(a, handler, exitCondition, maxNestedRecursion, 0))
        else {
          recurse(handler(a), handler, exitCondition, maxNestedRecursion, currentStep + 1)
        }
      }
      entryFuture.flatMap { a => recurse(a, handler, exitCondition, maxNestedRecursion, 0) }
    }

I extended my handler method for testing purposes so that it also prints the thread it runs on:

    val handler: Int => Int = (a: Int) => {
      val result = a / 2
      println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
      result
    }

Approach 1: Recurse the handler on itself so that all the steps run on the same thread.

    println("Starting strategy with all the steps on the same thread")
    val deepestRecursion: Future[Int] = recurseFuture(baseFuture, handler, exitCondition)
    Await.result(deepestRecursion, Duration.Inf)
    println("Completed strategy with all the steps on the same thread")
    println("")

Approach 2: Recurse the handler on itself only up to a limited depth.

    println("Starting strategy with the steps grouped by three")
    val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture, handler, exitCondition, 3)
    val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture, handler, exitCondition, 4)
    Await.result(threeStepsInSameFuture, Duration.Inf)
    Await.result(threeStepsInSameFuture2, Duration.Inf)
    println("Completed strategy with all the steps grouped by three")
    executorService.shutdown()

The best way to do this is very close to what you are already doing:

    case class Continue(todo: ToDo)

    class Worker extends Actor {
      var state: IntermediateState = _

      def receive = {
        case Work(x) =>
          val (next, todo) = calc(state, x)
          state = next
          self ! Continue(todo)
        case Continue(todo) if todo.isEmpty => // done
        case Continue(todo) =>
          val (next, rest) = calc(state, todo)
          state = next
          self ! Continue(rest)
      }

      // one computation step: returns the new state and the remaining work
      def calc(state: IntermediateState, todo: ToDo): (IntermediateState, ToDo) = ???
    }

EDIT: more background

When an actor sends messages to itself, Akka's internal processing basically runs them inside a while loop. The number of messages processed in one go is determined by the dispatcher's throughput setting (5 by default); after that many messages the thread is returned to the pool and the continuation is handed to the dispatcher as a new task. So the solution above has two tunables:

  • process several steps per message (if the processing steps are REALLY small)
  • raise the throughput setting, which increases throughput and decreases fairness.

The original problem seems to have hundreds of such actors running, presumably on common hardware that does not have hundreds of CPUs, so the throughput setting should probably be chosen so that each batch takes no longer than roughly 10 ms.
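The first tunable, several steps per message, can also be bounded by time rather than by a fixed step count. The following is only a minimal sketch using the Scala standard library, not part of the answer's code: `BatchedSteps`, `runBatch`, and its parameters are hypothetical names, and the caller is assumed to pick the budget (for example about 10 ms).

```scala
import scala.annotation.tailrec

// Hypothetical helper, not an Akka API: runs as many steps as fit into a time budget.
object BatchedSteps {
  /** Applies `step` until `done` holds or `budgetNanos` has elapsed.
    * At least one step is executed per call (unless already done), so every batch makes progress.
    * Returns the new state and the number of steps executed. */
  def runBatch[S](initial: S, budgetNanos: Long)(step: S => S, done: S => Boolean): (S, Int) = {
    val deadline = System.nanoTime() + budgetNanos

    @tailrec
    def loop(state: S, count: Int): (S, Int) =
      if (done(state) || (count > 0 && System.nanoTime() - deadline >= 0)) (state, count)
      else loop(step(state), count + 1)

    loop(initial, 0)
  }
}
```

In a worker like the one above, each Continue message could call `runBatch` once, store the returned state, and send itself another Continue if the work is not done; that keeps the mailbox responsive between batches.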

Performance assessment

Let's play a little with Fibonacci:

    Welcome to Scala version 2.10.0-RC1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_07).
    Type in expressions to have them evaluated.
    Type :help for more information.

    scala> def fib(x1: BigInt, x2: BigInt, steps: Int): (BigInt, BigInt) = if(steps>0) fib(x2, x1+x2, steps-1) else (x1, x2)
    fib: (x1: BigInt, x2: BigInt, steps: Int)(BigInt, BigInt)

    scala> def time(code: =>Unit) { val start = System.currentTimeMillis; code; println("code took " + (System.currentTimeMillis - start) + "ms") }
    time: (code: => Unit)Unit

    scala> time(fib(1, 1, 1000))
    code took 1ms

    scala> time(fib(1, 1, 1000))
    code took 1ms

    scala> time(fib(1, 1, 10000))
    code took 5ms

    scala> time(fib(1, 1, 100000))
    code took 455ms

    scala> time(fib(1, 1, 1000000))
    code took 17172ms

This means that in a presumably well-optimized loop, fib_100000 takes about half a second. Now let's play a little with actors:

    scala> case class Cont(steps: Int, batch: Int)
    defined class Cont

    scala> val me = inbox()
    me: akka.actor.ActorDSL.Inbox = akka.actor.dsl.Inbox$Inbox@32c0fe13

    scala> val a = actor(new Act {
      var s: (BigInt, BigInt) = _
      become {
        case Cont(x, y) if y < 0 => s = (1, 1); self ! Cont(x, -y)
        case Cont(x, y) if x > 0 => s = fib(s._1, s._2, y); self ! Cont(x - 1, y)
        case _: Cont => me.receiver ! s
      }
    })
    a: akka.actor.ActorRef = Actor[akka://repl/user/$c]

    scala> time{a ! Cont(1000, -1); me.receive(10 seconds)}
    code took 4ms

    scala> time{a ! Cont(10000, -1); me.receive(10 seconds)}
    code took 27ms

    scala> time{a ! Cont(100000, -1); me.receive(10 seconds)}
    code took 632ms

    scala> time{a ! Cont(1000000, -1); me.receive(30 seconds)}
    code took 17936ms

This is already interesting: given a sufficiently long time per step (with huge BigInts behind the scenes in the last line), the actors add very little overhead. Now let's see what happens when the smaller calculations are done in a more batched way:

    scala> time{a ! Cont(10000, -10); me.receive(30 seconds)}
    code took 462ms

This is pretty close to the result for the direct variant above.

Conclusion

Sending messages to yourself is NOT expensive for almost all applications; just keep each actual processing step somewhat longer than a few hundred nanoseconds.


You should not use actors for long-running computations, because these block the threads that are supposed to run the actors' code.

I would try a design that uses a separate Thread/ThreadPool for the computation and AtomicReferences to store and query the intermediate results, along the lines of the following pseudocode:

    val cancelled = new AtomicBoolean(false)
    val intermediateResult = new AtomicReference[IntermediateResult]()

    object WorkerThread extends Thread {
      override def run {
        while(!cancelled.get) {
          intermediateResult.set(computationStep(intermediateResult.get))
        }
      }
    }

    loop {
      react {
        case StartComputation => WorkerThread.start()
        case CancelComputation => cancelled.set(true)
        case GetCurrentResult => sender ! intermediateResult.get
      }
    }
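The same idea can be written as a small self-contained class without any actor library. This is a hedged sketch under my own assumptions, not the answerer's code: `BackgroundComputation`, `step`, and `finished` are hypothetical names, and the computation is assumed to be expressible as a pure step function on the intermediate result.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical wrapper: runs `step` on its own thread and publishes every intermediate result.
final class BackgroundComputation[R](initial: R, step: R => R, finished: R => Boolean) {
  private val cancelled = new AtomicBoolean(false)
  private val latest = new AtomicReference[R](initial)

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var current = initial
      while (!cancelled.get && !finished(current)) {
        current = step(current)
        latest.set(current) // publish, so readers never wait for the computation
      }
    }
  })
  worker.setDaemon(true) // do not keep the JVM alive just for this computation

  def start(): Unit = worker.start()
  def cancel(): Unit = cancelled.set(true)
  def currentResult: R = latest.get // lock-free read of the latest published value
  def join(): Unit = worker.join()
}
```

An actor holding such an object could answer GetCurrentResult with `currentResult` immediately, since the read never blocks on the worker thread.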

This is a classic concurrency problem. You want several routines/actors (or whatever you want to call them). The code is mostly correct Go, with obnoxiously long variable names for context. The first routine handles the queries and the intermediate results:

    func serveIntermediateResults(
        computationChannel chan *IntermediateResult,
        queryChannel chan chan<- *IntermediateResult) {
        var latestIntermediateResult *IntermediateResult // initial result
        for {
            select {
            // an update arrives
            case newResult, notClosed := <-computationChannel:
                if !notClosed {
                    // the computation has finished, stop checking
                    computationChannel = nil
                } else {
                    // assign to the outer variable instead of shadowing it with :=
                    latestIntermediateResult = newResult
                }
            // a query arrived
            case queryResponseChannel, notClosed := <-queryChannel:
                if !notClosed {
                    // no more queries, so we're done
                    return
                }
                // respond with the latest result
                queryResponseChannel <- latestIntermediateResult
            }
        }
    }

In your long computation, you publish the intermediate result wherever appropriate:

    func longComputation(intermediateResultChannel chan *IntermediateResult) {
        for notFinished {
            // lots of stuff
            intermediateResultChannel <- currentResult
        }
        close(intermediateResultChannel)
    }

Finally, to ask for the current result, you have a wrapper that makes this convenient:

    func getCurrentResult() *IntermediateResult {
        responseChannel := make(chan *IntermediateResult)
        // queryChannel was given to the intermediate result server routine earlier
        queryChannel <- responseChannel
        return <-responseChannel
    }

Source: https://habr.com/ru/post/927525/

