Performing processor related tasks with Scala members?

Suppose I need to complete several tasks related to CPU. For example, if I have 4 processors, I would probably create a pool of fixed-size threads from 4-5 worker threads waiting for a queue, and put tasks in a queue. In Java, I can use java.util.concurrent (possibly ThreadPoolExecutor ) to implement this mechanism.

How would you implement it with Scala members?

+7
source share
2 answers

All participants are mainly threads that are executed by the scheduler under the hood. The scheduler creates a pool of threads to execute actors roughly associated with your number of cores. This means that you can simply create an actor for each task that you need to complete and leave the rest to Scala:

 for(i <- 1 to 20) { actor { print(i); Thread.sleep(1000); } } 

The disadvantage here depends on the number of tasks, the cost of creating a thread for each task can be quite expensive, since threads in Java are not so cheap.

An easy way to create a limited pool of active participants and then distribute tasks with them through messaging would look something like this:

 import scala.actors.Actor._ val numWorkers = 4 val pool = (1 to numWorkers).map { i => actor { loop { react { case x: String => println(x) } } } } for(i <- 1 to 20) { val r = (new util.Random).nextInt(numWorkers) pool(r) ! "task "+i } 

The reason we want to create multiple participants is because one actor processes only one message (i.e. a task) at a time to get parallelism for your tasks, which are necessary to create several.

Note: the default scheduler becomes especially important when it comes to tasks related to I / O, as you will definitely want to change the size of the thread pool in this case. Two good blog posts that describe this in detail: Explore the schedule of Scala Actors and Scala actors of a thread pool pool .

With that said, Akka is an Actor framework that provides tools for more complex workflows with Actors, and that is what I will use in any real application. This is where load balancing is done (not random):

 import akka.actor.Actor import Actor._ import akka.routing.{LoadBalancer, CyclicIterator} class TaskHandler extends Actor { def receive = { case t: Task => // some computationally expensive thing t.execute case _ => println("default case is required in Akka...") } } class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer { val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start()) val seq = new CyclicIterator(workerPool) } val router = actorOf(new TaskRouter(4)).start() for(i <- 1 to 20) { router ! Task(..) } 

You may have different types of load balancing (CyclicIterator is a circular distribution), so you can check the documents here for more information.

+9
source

Well, you usually don't. Part of the appeal of using actors is that they process such details for you.

If, however, you insist on this, you need to override the protected scheduler method in the Actor class to return the corresponding IScheduler . See also scala.actors.scheduler package , and Actor comments for character regarding planners.

+2
source

All Articles