I'm trying to get fault-tolerant behavior out of Akka Actors. I'm working on some code that depends on the Actors in the system being available for a long stretch of processing. I'm finding that my processing stops after a couple of hours (it should take about 10 hours) and not much is happening. I believe my Actors are not recovering from exceptions.
What do I need to do to get the Actors restarted one-for-one, permanently? I expect this can be done based on this documentation: http://akka.io/docs/akka/1.1.3/scala/fault-tolerance
I am working with Akka 1.1.3 and Scala 2.9.
    import akka.actor.Actor
    import akka.actor.Actor._
    import akka.actor.ActorRef
    import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
    import akka.dispatch.Dispatchers
    import akka.routing.CyclicIterator
    import akka.routing.LoadBalancer
    import akka.config.Supervision._

    object TestActor {
      val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                       .setCorePoolSize(100)
                       .setMaxPoolSize(100)
                       .build
    }

    class TestActor(val name: Integer) extends Actor {
      self.lifeCycle = Permanent
      self.dispatcher = TestActor.dispatcher

      def receive = {
        case num: Integer => {
          if( num % 2 == 0 )
            throw new Exception("This is a simulated failure")
          println("Actor: " + name + " Received: " + num)
          //Thread.sleep(100)
        }
      }

      override def postStop(){
        println("TestActor post Stop ")
      }

      //callback method for restart handling
      override def preRestart(reason: Throwable){
        println("TestActor "+ name + " restaring after shutdown because of " + reason)
      }

      //callback method for restart handling
      override def postRestart(reason: Throwable){
        println("Restaring TestActor "+name+"after shutdown because of " + reason)
      }
    }

    trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
      val testActors: List[ActorRef]
      val seq = new CyclicIterator[ActorRef](testActors)
    }

    trait TestActorManager extends Actor {
      self.lifeCycle = Permanent
      self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
      val testActors: List[ActorRef]
      override def preStart = testActors foreach { self.startLink(_) }
      override def postStop = { System.out.println("postStop") }
    }

    object FaultTest {
      def main(args : Array[String]) : Unit = {
        println("starting FaultTest.main()")
        val numOfActors = 5
        val supervisor = actorOf(
          new TestActorManager with CyclicLoadBalancing {
            val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
          }
        )
        supervisor.start();

        println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length)

        val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

        (1 until 200 toList) foreach { testActor ! _ }
      }
    }
This code sets up 5 Actors behind a LoadBalancer that just print out the Integers sent to them, except that they throw exceptions on even numbers to simulate faults. The Integers 1 to 199 are sent to these Actors. I expect the odd numbers to be printed, but everything seems to shut down after a couple of faults on even numbers. Running this code with sbt results in this output:
    [info] Running FaultTest
    starting FaultTest.main()
    Loading config [akka.conf] from the application classpath.
    Number of Actors: 5
    Actor: 2 Received: 1
    Actor: 2 Received: 9
    Actor: 1 Received: 3
    Actor: 3 Received: 7
    [info] == run ==
    [success] Successful.
    [info]
    [info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM
I think what is happening is that the 5 Actors start up, the first 5 even numbers put them out of business, and they are not being restarted.
How can this code be changed so that Actors recover from an exception?
I would expect this to print all of the odd numbers from 1 to 200. I think each Actor would fail on even numbers but be restarted with its mailbox intact after the exception. I expect to see the println output from preRestart and postRestart. What needs to be configured in this code sample to make that happen?
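To put numbers on that expectation, here is a small plain-Scala sketch (no Akka involved) of what I think the run should produce. The object name `ExpectedOutput` is made up for illustration; the only thing taken from my test code is the range `1 until 200`, which is 1 to 199.

```scala
// Plain Scala, no Akka: what I expect the test run to produce.
// ExpectedOutput is a hypothetical helper, not part of my real code.
object ExpectedOutput {
  // The same messages that FaultTest sends: 1 until 200, i.e. 1 to 199.
  val sent: List[Int] = (1 until 200).toList

  // Even numbers throw in receive; odd numbers should be printed.
  val (failing, printed) = sent.partition(_ % 2 == 0)

  def main(args: Array[String]): Unit = {
    println("messages I expect to see printed: " + printed.size)
    println("failures I expect to trigger a restart: " + failing.size)
  }
}
```

So I would expect 100 "Received" lines and 99 restarts, whereas the actual run prints only 4 "Received" lines.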
Here are some additional assumptions about Akka and Actors that may be behind my misunderstanding. I assume that an Actor can be configured with a Supervisor or a faultHandler so that it is restarted and continues to be available when an exception is thrown during receive. I assume that the message that was being processed is lost if the Actor throws an exception during receive. I assume that preRestart() and postRestart() are called on the Actor that throws the exception.
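To make these assumptions concrete, here is a plain-Scala model of the behavior I am assuming. It does not use Akka at all: `ModelActor` and `RestartModel` are made-up names, and this is only a sketch of my mental model, not a description of how Akka actually implements supervision.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an actor; NOT an Akka class.
class ModelActor(val name: Int, log: mutable.Buffer[String]) {
  def receive(num: Int): Unit = {
    if (num % 2 == 0) throw new Exception("This is a simulated failure")
    val line = "Actor: " + name + " Received: " + num
    log += line
  }

  def preRestart(reason: Throwable): Unit = {
    val line = "preRestart: " + reason.getMessage
    log += line
  }

  def postRestart(reason: Throwable): Unit = {
    val line = "postRestart: " + reason.getMessage
    log += line
  }
}

object RestartModel {
  // Feeds every message to the actor. On an exception the failing message
  // is dropped, the instance is replaced, and the rest of the mailbox is
  // still processed (this is the assumed behavior, not verified).
  def run(messages: Seq[Int]): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    val mailbox = mutable.Queue.empty[Int]
    mailbox ++= messages
    var actor = new ModelActor(1, log)
    while (mailbox.nonEmpty) {
      val msg = mailbox.dequeue()
      try actor.receive(msg)
      catch {
        case e: Exception =>
          actor.preRestart(e)                      // called on the failed instance
          actor = new ModelActor(actor.name, log)  // fresh instance, same mailbox
          actor.postRestart(e)                     // called on the new instance
      }
    }
    log.toList
  }

  def main(args: Array[String]): Unit =
    run(1 until 20).foreach(println)
}
```

Under this model every odd number is logged, and each even number produces one preRestart and one postRestart entry. That is the pattern I am hoping to see from the real Actors.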
The sample code represents what I am trying to do and is based on the question "Why is my actor manager shortened to Akka?"
**Another code example**
Here is another code sample that is simpler. I start a single Actor that throws exceptions on even numbers. There is no load balancing or anything else in the way. I print out information about the Actor, and I keep the program running for about a minute after the messages have been sent so I can monitor what is going on.
I expect this to print the odd numbers, but it looks like the Actor just sits there with messages in its mailbox.
Do I have the OneForOneStrategy set up wrong? Do I need to link the Actor to something? Is this kind of configuration fundamentally misguided? Does the dispatcher need to be set up for fault tolerance in some way? Can I start threads in the dispatcher?
    import akka.actor.Actor
    import akka.actor.Actor._
    import akka.actor.ActorRef
    import akka.actor.ActorRegistry
    import akka.config.Supervision._

    class SingleActor(val name: Integer) extends Actor {
      self.lifeCycle = Permanent
      self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)

      def receive = {
        case num: Integer => {
          if( num % 2 == 0 )
            throw new Exception("This is a simulated failure, where does this get logged?")
          println("Actor: " + name + " Received: " + num)
        }
      }

      override def postStop(){
        println("TestActor post Stop ")
      }

      override def preRestart(reason: Throwable){
        println("TestActor "+ name + " restaring after shutdown because of " + reason)
      }

      override def postRestart(reason: Throwable){
        println("Restaring TestActor "+name+"after shutdown because of " + reason)
      }
    }

    object TestSingleActor{
      def main(args : Array[String]) : Unit = {
        println("starting TestSingleActor.main()")

        val testActor = Actor.actorOf( new SingleActor(1) ).start()

        println("number of actors: " + registry.actors.size)
        printAllActorsInfo

        (1 until 20 toList) foreach { testActor ! _ }

        for( i <- 1 until 120 ){
          Thread.sleep(500)
          printAllActorsInfo
        }
      }

      def printAllActorsInfo() ={
        registry.actors.foreach( (a) =>
          println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
            .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
      }
    }
I get output like:
    [info] Running TestSingleActor
    starting TestSingleActor.main()
    Loading config [akka.conf] from the application classpath.
    number of actors: 1
    Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false
    Actor: 1 Received: 1
    Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
    ... 117 more of these lines repeated ...
    Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
    [info] == run ==
    [success] Successful.
    [info]
    [info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM