Akka Scheduling Patterns

Consider the classic "Word Count" program. It counts the number of words in all files in a directory. The master actor receives a directory and splits the work among worker actors (each worker handles one file). Here is the pseudocode:

    class WordCountWorker extends Actor {
      def receive = {
        case FileToCount(fileName: String) =>
          val count = countWords(fileName)
          sender ! WordCount(fileName, count)
      }
    }

    class WordCountMaster extends Actor {
      def receive = {
        case StartCounting(docRoot) =>
          // sending each file to a worker
          val workers = createWorkers()
          val fileNames = scanFiles(docRoot)
          sendToWorkers(fileNames, workers)
        case WordCount(fileName, count) =>
          // aggregating results
          ...
      }
    }

But I want to run the Word Count program on a schedule (for example, every minute), providing a different directory to scan each time.

And Akka provides a convenient way to schedule the sending of messages:

    system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster, StartCounting(directoryName))

But the problem with this scheduler starts when a new tick fires while the previous message has not yet been processed (for example, I sent a message to scan some large directory, and the next tick sends a message to scan another directory while processing of the first directory is not finished yet). As a result, my WordCountMaster will receive WordCount messages from workers that are processing different directories, with no way to tell them apart.

As a workaround, instead of scheduling a message send, I can schedule the execution of a block of code that creates a new WordCountMaster each time. That is, one directory = one WordCountMaster. But I believe this is inefficient, and I need to take care of providing unique names for each WordCountMaster to avoid an InvalidActorNameException.
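To make the workaround concrete, here is an Akka-style sketch (not self-contained; it assumes the classic Akka 2.x actor API and a hypothetical nextDirectory() helper) of scheduling a code block that spawns a fresh, uniquely named master per tick:

```scala
import scala.concurrent.duration._
import system.dispatcher // execution context for the scheduler

system.scheduler.schedule(0.seconds, 1.minute) {
  // one directory = one WordCountMaster, uniquely named per tick
  val master = system.actorOf(
    Props[WordCountMaster],
    s"wordCountMaster-${System.nanoTime()}")
  master ! StartCounting(nextDirectory()) // nextDirectory() is assumed
}
```

Each tick gets its own master, so worker replies can never be attributed to the wrong directory; the cost is one extra (lightweight) actor per tick.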

So my question is: should I create a new WordCountMaster on each tick, as described in the previous paragraph? Or are there better ideas/patterns for redesigning this program to support scheduling?


Some updates: when creating one master actor per directory, I ran into some problems:

  1. The problem with actor naming:

InvalidActorNameException: actor name [WordCountMaster] is not unique!

and

InvalidActorNameException: actor name [WordCountWorker] is not unique!

I can avoid this simply by not providing an actor name. But in that case my actors get auto-generated names like $a, $b, etc., which does not work for me.

  2. The problem with the configuration:

I want to externalize the configuration of my routers to application.conf. That is, I want to provide the same configuration for each WordCountWorker router. But since I do not control the names of the actors, I cannot use the configuration below, because I do not know the actor name:

    /wordCountWorker {
      router = smallest-mailbox-pool
      nr-of-instances = 5
      dispatcher = word-counter-dispatcher
    }
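When the deployment path cannot be known in advance, one alternative (a sketch assuming the Akka 2.x routing API; the dispatcher id is the one from the question) is to build the pool in code rather than in application.conf, so the parent's auto-generated name no longer matters:

```scala
import akka.actor.Props
import akka.routing.SmallestMailboxPool

// Equivalent of the config block above, defined programmatically:
// a 5-instance smallest-mailbox pool whose routees run on the
// "word-counter-dispatcher" configured in application.conf.
val workers = context.actorOf(
  SmallestMailboxPool(5).props(
    Props[WordCountWorker].withDispatcher("word-counter-dispatcher")),
  "wordCountWorker")
```

Only the dispatcher itself then needs to live in application.conf; router type and size travel with the code.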
+5
4 answers

I'm not an Akka expert, but I don't think the one-actor-per-aggregation approach is inefficient. You need to keep the parallel aggregations separate somehow. You can either give each aggregation an identifier and keep them apart by that identifier inside a single master actor, or you can lean on Akka's actor naming and lifecycle and delegate each counting round to an actor that lives only for that one aggregation.

To me, one actor per aggregation seems more elegant.
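The other option, keeping parallel aggregations apart by identifier inside a single master, boils down to a per-job state map. A minimal, actor-free sketch of that bookkeeping (the names WordCount, JobState, and the jobId field are illustrative, not from the question):

```scala
// Every worker reply carries a job identifier (here, the directory name),
// so one master can aggregate several directories at once without mixing them.
case class WordCount(jobId: String, fileName: String, words: Int)

case class JobState(pendingFiles: Int, total: Int) {
  def completed: Boolean = pendingFiles == 0
}

object Aggregation {
  // Fold one worker result into the per-job state map.
  def update(jobs: Map[String, JobState], wc: WordCount): Map[String, JobState] = {
    val st = jobs(wc.jobId)
    jobs.updated(wc.jobId, JobState(st.pendingFiles - 1, st.total + wc.words))
  }
}
```

Inside an actor, `jobs` would be a var updated on each WordCount message, and a completed job would trigger the final reply for that directory.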

Also note that Akka ships an implementation of the Aggregator pattern, as described here

+4

You could use become/unbecome in your actor. If your worker starts scanning a large directory, use become to switch to a behavior that ignores further messages (or replies that it cannot process them); after the directory has been scanned, send back the word count and unbecome to the standard behavior.
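A sketch of that become/unbecome flow (not self-contained; assumes the classic Akka 2.x actor API, and the message names are the hypothetical ones from the question):

```scala
class WordCountMaster extends Actor {
  def receive: Receive = idle

  def idle: Receive = {
    case StartCounting(dir) =>
      // ... fan the directory's files out to workers ...
      // push the busy behavior; discardOld = false keeps idle on the stack
      context.become(busy, discardOld = false)
  }

  def busy: Receive = {
    case WordCount(fileName, count) =>
      // ... aggregate; once the last file is counted, report the total
      // for this directory and pop back to the idle behavior:
      context.unbecome()
    case StartCounting(_) =>
      // dropped while busy (or stash() it, with the Stash trait mixed in)
  }
}
```

This serializes directories through one master; if overlapping scans are a requirement, the identifier or one-master-per-directory approaches fit better.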

+2

Personally, I would not use actors to solve this aggregation problem at all, but here is an approach anyway.

I don't think a single master can reasonably handle word counts for multiple directories at the same time, as you suggest. Instead, you should have a top-level master that supervises the counting masters. So you end up with three actor classes:

  • FileCounter: receives a file to read and simply processes it. When done, it sends the result back to the sender.
  • CounterSupervisor: keeps track of which FileCounter has completed its task and sends the aggregated result back to the WordCountForker.
  • WordCountForker: keeps track of which supervisors are busy and, if they all are, creates a new CounterSupervisor to handle the request.

The file counter should be the easiest to write.

    class FileCounter() extends Actor with ActorLogging {
      import context.dispatcher

      override def preStart = {
        log.info("FileCounter Actor initialized")
      }

      def receive = {
        case CountFile(file) =>
          log.info("Counting file: " + file.getAbsolutePath)
          FileIO.readFile(file).foreach { data =>
            val words = data
              .split("\n")
              .map { _.split(" ").length }
              .sum
            context.parent ! FileCount(words)
          }
      }
    }

And now the actor that supervises the file counters.

    class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging {
      var total = 0
      var files: Array[File] = _
      var pendingActors = 0

      override def preStart = {
        for (i <- 1 to actorPool)
          context.actorOf(FileCounter.props(), name = s"counter$i")
      }

      def receive = {
        case CountDirectory(base) =>
          log.info("Now counting starting from directory: " + base.getAbsolutePath)
          total = 0
          files = FileIO.getAllFiles(base)
          pendingActors = 0
          for (i <- 1 to actorPool if (i < files.length)) {
            pendingActors += 1
            context.child(s"counter$i").get ! CountFile(files.head)
            files = files.tail
          }
        case FileCount(count) =>
          total += count
          pendingActors -= 1
          if (files.length > 0) {
            sender() ! CountFile(files.head)
            files = files.tail
            pendingActors += 1
          } else if (pendingActors == 0) {
            context.parent ! WordCountTotal(total)
          }
      }
    }

And then the actor that supervises the supervisors.

    class WordCountForker(counterActors: Int) extends Actor with ActorLogging {
      var busyActors: List[(ActorRef, ActorRef)] = Nil
      var idleActors: List[ActorRef] = _

      override def preStart = {
        val first = context.actorOf(CounterSupervisor.props(counterActors))
        idleActors = List(first)
        log.info(s"Initialized first supervisor with $counterActors file counters.")
      }

      def receive = {
        case msg @ CountDirectory(dir) =>
          log.info("Count directory received")
          val counter = idleActors match {
            case Nil =>
              context.actorOf(CounterSupervisor.props(counterActors))
            case head :: rest =>
              idleActors = rest
              head
          }
          counter ! msg
          busyActors = (counter, sender()) :: busyActors
        case msg @ WordCountTotal(n) =>
          val path = sender().path.toString()
          val index = busyActors.indexWhere { _._1.path.toString == path }
          val (counter, replyTo) = busyActors(index)
          replyTo ! msg
          idleActors = counter :: idleActors
          busyActors = busyActors.patch(index, Nil, 1)
      }
    }

I left some parts out of the answer to keep it as concise as possible; if you want to see the rest of the code, I posted a Gist.

As for your concerns about efficiency: this solution avoids spawning one subsystem per directory, but will still spawn more than one when necessary.

+2

First, to the naming problem: just name your actors dynamically and uniquely, something like this:
WorkerActor + "-" + fileName ... or ... MasterActor + "-" + directoryName
Or am I missing something?
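A small, self-contained sketch of such a naming helper (the helper and its sanitization rule are my own suggestion, not from the question; note that Akka actor names may not contain characters like '/', so a raw path must be sanitized, and a counter keeps repeated scans of the same directory unique):

```scala
import java.util.concurrent.atomic.AtomicLong

object ActorNames {
  private val tick = new AtomicLong(0)

  // Derive a unique, human-readable actor name from the directory path:
  // replace characters that are illegal in actor names, then append a
  // monotonically increasing counter so rescans never collide.
  def masterName(directory: String): String = {
    val safe = directory.replaceAll("[^a-zA-Z0-9-]", "-")
    s"WordCountMaster-$safe-${tick.incrementAndGet()}"
  }
}
```

The master would then be created as `system.actorOf(Props[WordCountMaster], ActorNames.masterName(dir))`, sidestepping the InvalidActorNameException while keeping names readable.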

Secondly, why scheduling? Wouldn't it be more logical to start work on the next directory when the first one is completed? If scheduling is a requirement, I see many different solutions to your problem, and I will try to sketch some of them:

1.
Three-level hierarchy:
MasterActor → DirectoryActor → WorkerActor
Create a new directory actor for each new directory and a new worker for each file.

2.
Two-level hierarchy:
MasterActor → WorkerActor
You create a new worker for each file.
Two options for determining the results:
a) Give work to the workers with ask and sum the results using futures
b) Include a job identifier in the messages (e.g. the directory name)

3.
Two-level hierarchy with load balancing:
Same as option 2, but instead of creating a new worker per file you have a fixed number of workers behind either a balancing dispatcher or a smallest-mailbox router.

4.
One-level hierarchy with futures:
The master actor has no children; it does the work and aggregates the results with futures alone.
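Option 4 can be sketched with the standard library alone. To keep the example self-contained it counts in-memory strings instead of reading files (the object and function names are illustrative):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureWordCount {
  // Count whitespace-separated words in one "document".
  def countWords(text: String): Int =
    text.split("\\s+").count(_.nonEmpty)

  // Count every document on the execution context in parallel
  // and sum the per-document totals.
  def countAll(docs: List[String]): Future[Int] =
    Future.traverse(docs)(d => Future(countWords(d))).map(_.sum)
}
```

In the real program each Future would read one file; the master (or plain caller) just awaits or pipes the summed result, with no child actors to name or supervise.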

I also recommend reading about the Akka Aggregator pattern, as suggested by Gregor Ryman in his answer.

+1
