(Thread pools in Java) Why does increasing the number of threads slow down a simple loop?

I have some work that can be easily parallelized, and I want to use Java threads to split it across my quad-core machine. It is a genetic algorithm applied to the traveling salesman problem. The algorithm as a whole does not look trivially parallelizable, but the first loop certainly is. The second part, the actual evolution, may or may not be; I want to know whether the slowdown is due to the way I implemented the threading, or due to the algorithm itself.

Also, if anyone has any better ideas on how I should implement what I'm trying to do, that would be greatly appreciated.

In main(), I have this:

    final ArrayBlockingQueue<Runnable> queue =
            new ArrayBlockingQueue<Runnable>(numThreads * numIter);
    ThreadPoolExecutor tpool = new ThreadPoolExecutor(numThreads, numThreads,
            10, TimeUnit.SECONDS, queue);
    barrier = new CyclicBarrier(numThreads);
    k.init(tpool);

I have a loop that runs inside init() and looks like this:

    for (int i = 0; i < numCities; i++) {
        x[i] = rand.nextInt(width);
        y[i] = rand.nextInt(height);
    }

which I changed to this:

    int errorCities = 0, stepCities = 0;
    stepCities = numCities / numThreads;
    errorCities = numCities - stepCities * numThreads;

    // Split up work, assign to threads
    for (int i = 1; i <= numThreads; i++) {
        int startCities = (i - 1) * stepCities;
        int endCities = startCities + stepCities;
        // This is a bit messy...
        if (i <= numThreads)
            endCities += errorCities;
        tpool.execute(new citySetupThread(startCities, endCities));
    }

And here is the citySetupThread class:

    public class citySetupThread implements Runnable {
        int start, end;

        public citySetupThread(int s, int e) {
            start = s;
            end = e;
        }

        public void run() {
            for (int j = start; j < end; j++) {
                x[j] = ThreadLocalRandom.current().nextInt(0, width);
                y[j] = ThreadLocalRandom.current().nextInt(0, height);
            }
            try {
                barrier.await();
            } catch (InterruptedException ie) {
                return;
            } catch (BrokenBarrierException bbe) {
                return;
            }
        }
    }

The above code runs once in the program, so it was a kind of test case for my threading constructs (this is my first experience with Java threads). I implemented the same pattern in a genuinely critical section, namely the evolution part of the genetic algorithm, whose class looks like this:

    public class evolveThread implements Runnable {
        int start, end;

        public evolveThread(int s, int e) {
            start = s;
            end = e;
        }

        public void run() {
            // Get midpoint
            int n = population.length / 2, m;
            for (m = start; m > end; m--) {
                int i, j;
                i = ThreadLocalRandom.current().nextInt(0, n);
                do {
                    j = ThreadLocalRandom.current().nextInt(0, n);
                } while (i == j);
                population[m].crossover(population[i], population[j]);
                population[m].mutate(numCities);
            }
            try {
                barrier.await();
            } catch (InterruptedException ie) {
                return;
            } catch (BrokenBarrierException bbe) {
                return;
            }
        }
    }

This runs in the evolve() function, which is called from init() as follows:

    for (int p = 0; p < numIter; p++)
        evolve(p, tpool);

Yes, I know this is not a very good design, but for other reasons I am stuck with it. The relevant parts inside evolve() are shown here:

    // Threaded inner loop
    int startEvolve = popSize - 1,
        endEvolve = (popSize - 1) - (popSize - 1) / numThreads;

    // Split up work, assign to threads
    for (int i = 0; i < numThreads; i++) {
        endEvolve = (popSize - 1) - (popSize - 1) * (i + 1) / numThreads + 1;
        tpool.execute(new evolveThread(startEvolve, endEvolve));
        startEvolve = endEvolve;
    }

    // Wait for our comrades
    try {
        barrier.await();
    } catch (InterruptedException ie) {
        return;
    } catch (BrokenBarrierException bbe) {
        return;
    }

    population[1].crossover(population[0], population[1]);
    population[1].mutate(numCities);
    population[0].mutate(numCities);

    // Pick out the strongest
    Arrays.sort(population, population[0]);
    current = population[0];
    generation++;

I really want to know the following:

  • What role does the queue play? Am I right to create a queue big enough for all the jobs I expect to be executed across all threads in the pool? If the size is not large enough, I get a RejectedExecutionException. I just settled on numThreads * numIter because that is how many jobs there will be (for the actual evolve() method mentioned above). It's strange, though: I wouldn't have needed to do this if barrier.await() were working, which forces me ...

  • Am I using barrier.await() correctly? I currently call it in two places: inside the run() method of the Runnable, and after the for loop that submits all the tasks. I would have thought only one was needed, but I get errors if I remove either one.

  • I suspect thread contention, since that is the only explanation I can come up with for the absurd slowdown (which scales with the input parameters). I want to know whether it is related to the way I implemented the thread pool and barriers. If not, I will have to look into the crossover() and mutate() methods.

2 answers

First, I think you may have a mistake in how you are using the CyclicBarrier. You currently initialize it with the number of worker threads as the number of parties. However, you have one additional party: the main thread. So I think you need:

    barrier = new CyclicBarrier(numThreads + 1);

I think that should work, but I personally find using a barrier for this strange.
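For reference, here is a minimal runnable sketch of that arrangement, simplified from your setup (the empty lambda stands in for a chunk of real work): numThreads worker parties plus one party for the main thread.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BarrierSketch {
    public static void main(String[] args) throws Exception {
        int numThreads = 4;
        // numThreads workers + 1 for the main thread
        CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.execute(() -> {
                // ... do one chunk of work ...
                try {
                    barrier.await(); // worker reports it is done
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        barrier.await(); // main thread waits for all workers
        System.out.println("all workers finished");
        pool.shutdown();
    }
}
```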

When using the thread-pool worker model, I find it easier to use a Semaphore or Java's Future model.

With a semaphore:

    class MyRunnable implements Runnable {
        private final Semaphore sem;

        public MyRunnable(Semaphore sem) {
            this.sem = sem;
        }

        public void run() {
            // do work

            // signal completion
            sem.release();
        }
    }

Then in your main thread:

    Semaphore sem = new Semaphore(0);
    for (int i = 0; i < numJobs; ++i) {
        threadPool.execute(new MyRunnable(sem));
    }
    sem.acquire(numJobs);

This effectively does the same thing as the barrier, but I find it easier to think of the work tasks as "signaling" that they are done rather than "synchronizing" with the main thread again.
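Put together, a self-contained runnable sketch of this pattern (with placeholder work and arbitrary job/thread counts) might look like:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    static class MyRunnable implements Runnable {
        private final Semaphore sem;

        MyRunnable(Semaphore sem) {
            this.sem = sem;
        }

        public void run() {
            // do work (placeholder)

            // signal completion
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int numJobs = 8;
        Semaphore sem = new Semaphore(0); // starts with zero permits
        ExecutorService threadPool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < numJobs; ++i) {
            threadPool.execute(new MyRunnable(sem));
        }
        sem.acquire(numJobs); // blocks until all numJobs releases have happened
        System.out.println("all jobs done");
        threadPool.shutdown();
    }
}
```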

For example, if you look at the sample code in the CyclicBarrier JavaDoc, the call to barrier.await() is inside a loop in the worker. So it is really synchronizing several long-running worker threads with each other, and the main thread is not a party to the barrier. Calling barrier.await() once at the end of a worker is closer to a completion signal.


As the number of tasks increases, the overhead of submitting each task adds up. That means you want to minimize the number of tasks, ideally down to the number of processors you have. For some workloads, double the number of processors can be better if the chunks of work are uneven.

By the way: you do not need a barrier in each task; you can wait for each task to complete by calling get() on its Future.
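A minimal sketch of both points, assuming the work can be cut into one chunk per core (the summing loop is a placeholder for real work, and the names here are illustrative): submit one Callable per processor and wait with get(), with no barrier at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {
    public static void main(String[] args) throws Exception {
        // One task per core keeps per-task overhead low
        int nThreads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);

        int n = 1_000_000;
        List<Future<Long>> futures = new ArrayList<>();
        for (int t = 0; t < nThreads; t++) {
            final int start = t * n / nThreads;
            final int end = (t + 1) * n / nThreads;
            futures.add(pool.submit(() -> {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += i; // placeholder work on this chunk
                }
                return sum;
            }));
        }

        long total = 0;
        for (Future<Long> f : futures) {
            total += f.get(); // blocks until that task completes
        }
        System.out.println(total); // sum of 0..n-1 = 499999500000
        pool.shutdown();
    }
}
```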

