How to prevent actor starvation in the presence of other long-term participants?

Used by Scala 2.8 Actors. I have a long job that can be parallelized. It consists of about 650,000 units of work. I divide it into 2600 different individual subtasks, and for each of them I create a new actor:

actor {
  val range = (0L to total by limit)
  val latch = new CountDownLatch(range.length)
  range.foreach { offset =>
    actor {
      doExpensiveStuff(offset,limit)
      latch.countDown
    }
  }
  latch.await
}

This works pretty well, but generally requires 2 + h to complete. The problem is that at the same time, any other actors that I create to perform common tasks seem to be headed by 2,600 actors who are also patiently waiting for their time to work in the stream, but have been waiting longer than any new participants who come .

How can I avoid this starvation?

Initial thoughts:

  • 2600 , . , , , .
  • 2600 , . , , 8 ? , , , .

UPDATE

, . , ThreadPool , ThreadPool . :

import testing._
import java.util.concurrent._
import actors.Futures._

val count = 100000
val poolSize = 4
val numRuns = 100

val ActorTest = new Benchmark {
  def run = {
    (1 to count).map(i => future {
      i * i
    }).foreach(_())
  }
}

val ThreadPoolTest = new Benchmark {
  def run = {
    val queue = new LinkedBlockingQueue[Runnable]
    val pool = new ThreadPoolExecutor(
          poolSize, poolSize, 1, TimeUnit.SECONDS, queue)
    val latch = new CountDownLatch(count)
    (1 to count).map(i => pool.execute(new Runnable {
      override def run = {
        i * i
        latch.countDown
      }
    }))
    latch.await
  }
}

List(ActorTest,ThreadPoolTest).map { b =>
  b.runBenchmark(numRuns).sum.toDouble / numRuns
}

// List[Double] = List(545.45, 44.35)

Future ActorTest, , . , Actor 10 . , ThreadPoolExecutor , Actor .

, , , , . ThreadPools , , .

+5
3

, , , fork/join ( 4, ). , .

  • , , ( ResizableThreadPoolScheduler, parallelism, )
  • ( )
  • @DaGGeRRz, Akka, (, ).

Actor :

( , actors.corePoolSize JVM). scheduler Actor , ResizableThreadPoolScheduler, , , . actors.enableForkJoin JVM false, a ResizableThreadPoolScheduler .

: scala -lang.

+6

, , , .

Future, , ? , Fork parallelism ( ) :

import actors.Futures._
def mkFuture(i : Int) = future {
  doExpensiveStuff(i, limit)
}
val fs = (1 to range by limit).map(mkFuture)
awaitAll(timeout, fs) //wait on the work all finishing

, parallelism, , , (, IO).

+4

, , scala .

+3
source

All Articles