I gather from your comment to Roland Kuhn that your job can be considered recursive, at least in blocks. If that is not the case, I don't think there is a clean solution to your problem, and you will have to deal with complex pattern matching blocks.
If my assumption is correct, I would schedule the computation asynchronously and leave the actor free to respond to other messages. The key point is to use the monadic capabilities of Future and to keep the receive block simple. You will have to handle three messages (StartComputation, ChangeState, GetState).
You end up with the following receive:

    def receive = {
      case StartComputation(myData) => expensiveStuff(myData)
      // Fixed: the case arrow must be `=>`, not `=`
      case ChangeState(newState)    => this.state = newState
      case GetState                 => sender ! this.state
    }
You can then take advantage of the map method on Future by defining your own recursive map:

    def mapRecursive[A](f: Future[A], handler: A => A, exitConditions: A => Boolean): Future[A] = {
      f.flatMap { a =>
        if (exitConditions(a))
          f
        else {
          // Each step runs in its own future
          val newFuture = f.flatMap { a => Future(handler(a)) }
          mapRecursive(newFuture, handler, exitConditions)
        }
      }
    }
Once you have this tool, everything else is easier. Consider the following example:

    def main(args: Array[String]): Unit = {
      val baseFuture: Future[Int] = Promise.successful(64)
      val newFuture: Future[Int] = mapRecursive(baseFuture,
        (a: Int) => {
          val result = a / 2
          println("Additional step done: the current a is " + result)
          result
        },
        (a: Int) => (a <= 1))
      val one = Await.result(newFuture, Duration.Inf)
      println("Computation finished, result = " + one)
    }
Its output is:

    Additional step done: the current a is 32
    Additional step done: the current a is 16
    Additional step done: the current a is 8
    Additional step done: the current a is 4
    Additional step done: the current a is 2
    Additional step done: the current a is 1
    Computation finished, result = 1
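If you want to try this outside an actor system, here is a minimal self-contained sketch of the same idea. It assumes the standard scala.concurrent API and the global execution context, which may not be the Future implementation your project uses. The names MapRecursiveSketch and halveUntilOne are made up for the illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical wrapper object, only here to make the sketch runnable.
object MapRecursiveSketch {
  // Applies `handler` in a fresh Future per step until `exitCondition` holds.
  def mapRecursive[A](f: Future[A], handler: A => A, exitCondition: A => Boolean): Future[A] =
    f.flatMap { a =>
      if (exitCondition(a)) Future.successful(a)
      else mapRecursive(Future(handler(a)), handler, exitCondition)
    }

  // Halves `start` repeatedly until the value is <= 1 and blocks for the result.
  def halveUntilOne(start: Int): Int =
    Await.result(mapRecursive[Int](Future.successful(start), _ / 2, _ <= 1), Duration.Inf)

  def main(args: Array[String]): Unit =
    println("Computation finished, result = " + halveUntilOne(64))
}
```

Blocking with Await is only for the demo. Inside an actor you would let the future complete on its own and send messages back, as in the receive block above.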
You can do the same thing inside the expensiveStuff method:

    def expensiveStuff(myData: MyData): Future[MyData] = {
      val firstResult = Promise.successful(myData)
      val handler: MyData => MyData = (myData) => {
        val result = myData.copy(myData.value / 2)
        // Publish each intermediate result back to the actor
        self ! ChangeState(result)
        result
      }
      val exitCondition: MyData => Boolean = (myData: MyData) => myData.value == 1
      mapRecursive(firstResult, handler, exitCondition)
    }
EDIT - MORE DETAILS
If you do not want to block the Actor, which processes the messages from its mailbox synchronously and in a thread-safe manner, the only thing you can do is run the computation on a different thread. That is exactly what a high-performance non-blocking receive amounts to.
However, you were right to say that the approach I proposed pays a high performance penalty. Each step runs in a different future, which may not be necessary at all. You can therefore recurse the handler to get either single-threaded or multi-threaded execution. There is no magic formula after all:
- If you want to schedule asynchronously and minimize the cost, all the work should be done by a single thread.
- However, this can prevent other work from starting, because if all the threads in the thread pool are taken, futures will queue up. You can therefore split the operation into several futures so that, even at full usage, new work can be scheduled before the old work has completed.

    def recurseFuture[A](entryFuture: Future[A], handler: A => A, exitCondition: A => Boolean,
                         maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
      def recurse(a: A, handler: A => A, exitCondition: A => Boolean,
                  maxNestedRecursion: Long, currentStep: Long): Future[A] = {
        if (exitCondition(a))
          Promise.successful(a)
        else if (currentStep == maxNestedRecursion)
          // Depth limit reached: continue in a new future and reset the step counter
          Promise.successful(handler(a)).flatMap(a => recurse(a, handler, exitCondition, maxNestedRecursion, 0))
        else {
          recurse(handler(a), handler, exitCondition, maxNestedRecursion, currentStep + 1)
        }
      }
      entryFuture.flatMap { a => recurse(a, handler, exitCondition, maxNestedRecursion, 0) }
    }
I enhanced my handler method for testing purposes so that it also prints the thread name:

    val handler: Int => Int = (a: Int) => {
      val result = a / 2
      println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
      result
    }
Approach 1: Recurse the handler on itself so that everything runs on the same thread.

    println("Starting strategy with all the steps on the same thread")
    val deepestRecursion: Future[Int] = recurseFuture(baseFuture, handler, exitCondition)
    Await.result(deepestRecursion, Duration.Inf)
    println("Completed strategy with all the steps on the same thread")
    println("")
Approach 2: Recurse the handler on itself for a limited depth.

    println("Starting strategy with the steps grouped by three")
    val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture, handler, exitCondition, 3)
    val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture, handler, exitCondition, 4)
    Await.result(threeStepsInSameFuture, Duration.Inf)
    Await.result(threeStepsInSameFuture2, Duration.Inf)
    println("Completed strategy with all the steps grouped by three")
    executorService.shutdown()
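To make the trade-off between the two approaches easier to see, here is a rough, self-contained sketch that counts how many times the work is handed over to a new future for a given batch size. It is not the recurseFuture above: it assumes the standard scala.concurrent API with the global execution context, uses a loop instead of nested recursion inside each batch, and the names BatchedRecursionSketch, runBatched and batchSize are invented for the example.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical helper object, only here to make the sketch runnable.
object BatchedRecursionSketch {
  // Runs up to `batchSize` handler steps synchronously, then hops to a new Future.
  // Returns the final value together with the number of asynchronous hops taken.
  def runBatched[A](start: A, handler: A => A, exitCondition: A => Boolean, batchSize: Int): (A, Int) = {
    require(batchSize > 0, "batchSize must be positive")
    val hops = new AtomicInteger(0)

    def batch(a: A): Future[A] = {
      var current = a
      var steps = 0
      // Do as many steps as the batch allows on the current thread
      while (!exitCondition(current) && steps < batchSize) {
        current = handler(current)
        steps += 1
      }
      if (exitCondition(current)) Future.successful(current)
      else {
        // Batch exhausted: give other queued work a chance before continuing
        hops.incrementAndGet()
        Future(current).flatMap(batch)
      }
    }

    val result = Await.result(Future(start).flatMap(batch), Duration.Inf)
    (result, hops.get)
  }
}
```

With a start value of 64 and the halving handler, a batch size of 1 needs five hops, a batch size of 3 needs one, and a large batch size needs none. Fewer hops means less scheduling overhead, but each batch holds on to its thread for longer.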