Trying to get an idea of how to think from the point of view of actors, not flows. I am a little puzzled by the following use case:
Consider a system in which there is a manufacturing process that creates work (for example, by reading data from a file) and a number of work processes that consume work (for example, by analyzing data and writing it to the database). The rates at which they are produced and consumed can vary, and the system must remain resistant to this. For example, if workers cannot keep up with the times, the manufacturer must detect this and ultimately slow down or wait.
This is pretty easy to implement using threads:
val producer:Iterator[Work] = createProducer() val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE) val workers = (0 until NUM_WORKERS) map { i => new Thread() { override def run() = { while (true) { try { // take next unit of work, waiting if necessary val work = queue.take() process(work) } catch { case e:InterruptedException => return } } } } } // start the workers workers.foreach(_.start()) while (producer.hasNext) { val work = producer.next() // add new unit of work, waiting if necessary queue.put(work) } while (!queue.isEmpty) { // wait until queue is drained queue.wait() } // stop the workers workers.foreach(_.interrupt())
There is nothing wrong with this model, and I have successfully used it before. This example is probably too verbose, since using Executor or CompletionService would be appropriate for this task. But I like the abstraction of the actor, and in many cases it’s easier to reason. Is there a way to rewrite this example with the help of participants, especially to make sure that the buffer is full (for example, full mailboxes, discarded messages, etc.)?
scala concurrency actor refactoring
toluju
source share