Acca tenacity with confirmed delivery yields inconsistent results

I played with Akka Perseverance and wrote the following program to test my understanding. The problem is that every time I run this program, I get different results. The correct answer is 49995000, but I do not always understand this. I cleared the log directory between each run, but that does not make any difference. Can anyone understand what is going on? The program simply sums all the numbers from 1 to n (where n is 9999 in the code below).

The correct answer is: (n * (n + 1)) / 2. For n = 9999, this is 49995000.

EDIT: It seems to work more consistently with JDK 8 than with JDK 7. Should I use only JDK 8?

package io.github.ourkid.akka.aggregator.guaranteed import akka.actor.Actor import akka.actor.ActorPath import akka.actor.ActorSystem import akka.actor.Props import akka.actor.actorRef2Scala import akka.persistence.AtLeastOnceDelivery import akka.persistence.PersistentActor case class ExternalRequest(updateAmount : Int) case class CountCommand(deliveryId : Long, updateAmount : Int) case class Confirm(deliveryId : Long) sealed trait Evt case class CountEvent(updateAmount : Int) extends Evt case class ConfirmEvent(deliveryId : Long) extends Evt class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery { override def persistenceId = "persistent-actor-ref-1" override def receiveCommand : Receive = { case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState) case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState) } override def receiveRecover : Receive = { case evt : Evt => updateState(evt) } def updateState(evt:Evt) = evt match { case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount)) case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId) } } class FactorialActor extends Actor { var count = 0 def receive = { case CountCommand(deliveryId : Long, updateAmount:Int) => { count = count + updateAmount sender() ! Confirm(deliveryId) } case "print" => println(count) } } object GuaranteedDeliveryTest extends App { val system = ActorSystem() val factorial = system.actorOf(Props[FactorialActor]) val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path)) import system.dispatcher system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" } for (i <- 1 to 9999) delActor ! ExternalRequest(i) } 

SBT file

 name := "akka_aggregator" organization := "io.github.ourkid" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.4" scalacOptions ++= Seq("-unchecked", "-deprecation") resolvers ++= Seq( "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" ) val Akka = "2.3.7" val Spray = "1.3.2" libraryDependencies ++= Seq( // Core Akka "com.typesafe.akka" %% "akka-actor" % Akka, "com.typesafe.akka" %% "akka-cluster" % Akka, "com.typesafe.akka" %% "akka-persistence-experimental" % Akka, "org.iq80.leveldb" % "leveldb" % "0.7", "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", // For future REST API "io.spray" %% "spray-httpx" % Spray, "io.spray" %% "spray-can" % Spray, "io.spray" %% "spray-routing" % Spray, "org.typelevel" %% "scodec-core" % "1.3.0", // CSV reader "net.sf.opencsv" % "opencsv" % "2.3", // Logging "com.typesafe.akka" %% "akka-slf4j" % Akka, "ch.qos.logback" % "logback-classic" % "1.0.13", // Testing "org.scalatest" %% "scalatest" % "2.2.1" % "test", "com.typesafe.akka" %% "akka-testkit" % Akka % "test", "io.spray" %% "spray-testkit" % Spray % "test", "org.scalacheck" %% "scalacheck" % "1.11.6" % "test" ) fork := true mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor") 

application.conf file

 ########################################## # Akka Persistence Reference Config File # ########################################## akka { # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs # to STDOUT) loggers = ["akka.event.slf4j.Slf4jLogger"] # Log level used by the configured loggers (see "loggers") as soon # as they have been started; before that, see "stdout-loglevel" # Options: OFF, ERROR, WARNING, INFO, DEBUG loglevel = "DEBUG" # Log level for the very basic logger activated during ActorSystem startup. # This logger prints the log messages to stdout (System.out). # Options: OFF, ERROR, WARNING, INFO, DEBUG stdout-loglevel = "INFO" # Filter of log events that is used by the LoggingAdapter before # publishing log events to the eventStream. logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" # Protobuf serialization for persistent messages actor { serializers { akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" akka-persistence-message = "akka.persistence.serialization.MessageSerializer" } serialization-bindings { "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot "akka.persistence.serialization.Message" = akka-persistence-message } } persistence { journal { # Maximum size of a persistent message batch written to the journal. max-message-batch-size = 200 # Maximum size of a deletion batch written to the journal. max-deletion-batch-size = 10000 # Path to the journal plugin to be used plugin = "akka.persistence.journal.leveldb" # In-memory journal plugin. inmem { # Class name of the plugin. class = "akka.persistence.journal.inmem.InmemJournal" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.actor.default-dispatcher" } # LevelDB journal plugin. leveldb { # Class name of the plugin. class = "akka.persistence.journal.leveldb.LeveldbJournal" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" # Storage location of LevelDB files. dir = "journal" # Use fsync on write fsync = on # Verify checksum on read. checksum = off # Native LevelDB (via JNI) or LevelDB Java port native = on # native = off } # Shared LevelDB journal plugin (for testing only). leveldb-shared { # Class name of the plugin. class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.actor.default-dispatcher" # timeout for async journal operations timeout = 10s store { # Dispatcher for shared store actor. store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Storage location of LevelDB files. dir = "journal" # Use fsync on write fsync = on # Verify checksum on read. checksum = off # Native LevelDB (via JNI) or LevelDB Java port native = on } } } snapshot-store { # Path to the snapshot store plugin to be used plugin = "akka.persistence.snapshot-store.local" # Local filesystem snapshot store plugin. local { # Class name of the plugin. class = "akka.persistence.snapshot.local.LocalSnapshotStore" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for streaming snapshot IO. stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" # Storage location of snapshot files. dir = "snapshots" } } view { # Automated incremental view update. auto-update = on # Interval between incremental updates auto-update-interval = 5s # Maximum number of messages to replay per incremental view update. Set to # -1 for no upper limit. auto-update-replay-max = -1 } at-least-once-delivery { # Interval between redelivery attempts redeliver-interval = 5s # Maximum number of unconfirmed messages that will be sent in one redelivery burst redelivery-burst-limit = 10000 # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` # message will be sent to the actor. warn-after-number-of-unconfirmed-attempts = 5 # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is # allowed to hold in memory. max-unconfirmed-messages = 100000 } dispatchers { default-plugin-dispatcher { type = PinnedDispatcher executor = "thread-pool-executor" } default-replay-dispatcher { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 2 parallelism-max = 8 } } default-stream-dispatcher { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 2 parallelism-max = 8 } } } } } 

The correct conclusion:

 18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 0 18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG asSerialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 3974790 24064453 18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 49995000 49995000 49995000 49995000 

Incorrect start:

 17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started 0 17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG asSerialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 3727815 22167811 17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 49995000 51084018 51084018 52316760 52316760 52316760 52316760 52316760 

Another incorrect start:

 17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 0 17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG asSerialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 2982903 17710176 49347145 17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG asSerialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 51704199 51704199 55107844 55107844 55107844 55107844 
+7
scala akka akka-persistence
source share
1 answer

You are using AtLeastOnceDelivery semantics. As said here :

Note. Delivery "At least once" implies that the original order for sending a message is not always saved, and the recipient may receive a duplicate of the Message. This means that the semantics do not match the semantics of the normal ActorRef send operation:

it is not once that the message delivery order for the same sender-receiver pair is not preserved due to possible retries after the accident and restarting the target messages are still delivered - the embodiment of a new actor. This semantics is similar to what ActorPath represents (see Actor's Life Cycle), so you need to supply a path, not a link when delivering messages. messages are sent to the path with the choice of actor.

Thus, some numbers can be obtained more than once. You can simply ignore the duplicate numbers inside the FactorialActor or not use this semantics.

+10
source share

All Articles