I played with Akka Perseverance and wrote the following program to test my understanding. The problem is that every time I run this program, I get different results. The correct answer is 49995000, but I do not always understand this. I cleared the log directory between each run, but that does not make any difference. Can anyone understand what is going on? The program simply sums all the numbers from 1 to n (where n is 9999 in the code below).
The correct answer is: (n * (n + 1)) / 2. For n = 9999, this is 49995000.
EDIT: It seems to work more consistently with JDK 8 than with JDK 7. Should I use only JDK 8?
package io.github.ourkid.akka.aggregator.guaranteed import akka.actor.Actor import akka.actor.ActorPath import akka.actor.ActorSystem import akka.actor.Props import akka.actor.actorRef2Scala import akka.persistence.AtLeastOnceDelivery import akka.persistence.PersistentActor case class ExternalRequest(updateAmount : Int) case class CountCommand(deliveryId : Long, updateAmount : Int) case class Confirm(deliveryId : Long) sealed trait Evt case class CountEvent(updateAmount : Int) extends Evt case class ConfirmEvent(deliveryId : Long) extends Evt class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery { override def persistenceId = "persistent-actor-ref-1" override def receiveCommand : Receive = { case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState) case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState) } override def receiveRecover : Receive = { case evt : Evt => updateState(evt) } def updateState(evt:Evt) = evt match { case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount)) case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId) } } class FactorialActor extends Actor { var count = 0 def receive = { case CountCommand(deliveryId : Long, updateAmount:Int) => { count = count + updateAmount sender() ! Confirm(deliveryId) } case "print" => println(count) } } object GuaranteedDeliveryTest extends App { val system = ActorSystem() val factorial = system.actorOf(Props[FactorialActor]) val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path)) import system.dispatcher system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" } for (i <- 1 to 9999) delActor ! ExternalRequest(i) }
SBT file
name := "akka_aggregator" organization := "io.github.ourkid" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.4" scalacOptions ++= Seq("-unchecked", "-deprecation") resolvers ++= Seq( "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" ) val Akka = "2.3.7" val Spray = "1.3.2" libraryDependencies ++= Seq( // Core Akka "com.typesafe.akka" %% "akka-actor" % Akka, "com.typesafe.akka" %% "akka-cluster" % Akka, "com.typesafe.akka" %% "akka-persistence-experimental" % Akka, "org.iq80.leveldb" % "leveldb" % "0.7", "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", // For future REST API "io.spray" %% "spray-httpx" % Spray, "io.spray" %% "spray-can" % Spray, "io.spray" %% "spray-routing" % Spray, "org.typelevel" %% "scodec-core" % "1.3.0", // CSV reader "net.sf.opencsv" % "opencsv" % "2.3", // Logging "com.typesafe.akka" %% "akka-slf4j" % Akka, "ch.qos.logback" % "logback-classic" % "1.0.13", // Testing "org.scalatest" %% "scalatest" % "2.2.1" % "test", "com.typesafe.akka" %% "akka-testkit" % Akka % "test", "io.spray" %% "spray-testkit" % Spray % "test", "org.scalacheck" %% "scalacheck" % "1.11.6" % "test" ) fork := true mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor")
application.conf file
The correct conclusion:
18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 0 18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG asSerialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 3974790 24064453 18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 49995000 49995000 49995000 49995000
Incorrect start:
17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started 0 17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG asSerialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 3727815 22167811 17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 49995000 51084018 51084018 52316760 52316760 52316760 52316760 52316760
Another incorrect start:
17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 0 17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG asSerialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 2982903 17710176 49347145 17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 51704199 51704199 55107844 55107844 55107844 55107844