I have a piece of work that is easy to parallelize, and I want to use Java threads to split it up across my quad-core machine. It is a genetic algorithm applied to the traveling salesman problem. That doesn't sound easy to parallelize, but the first loop is trivially so. The second part, the actual evolution, may or may not be, but I want to know whether my slowdown comes from the way I implement threads or from the algorithm itself.
Also, if anyone has better ideas on how to implement what I'm trying to do, I'd greatly appreciate them.
In main(), I have this:

```java
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(numThreads * numIter);
ThreadPoolExecutor tpool = new ThreadPoolExecutor(numThreads, numThreads, 10, TimeUnit.SECONDS, queue);
barrier = new CyclicBarrier(numThreads);
k.init(tpool);
```
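The queue passed to a ThreadPoolExecutor holds tasks that were submitted while every worker was busy. Once a bounded ArrayBlockingQueue is full and the pool is already at maximumPoolSize, execute() throws RejectedExecutionException under the default AbortPolicy. A minimal sketch that shows this with a one-thread pool whose only worker is deliberately kept busy; QueueRejectionDemo and countRejected are hypothetical names, not part of the post:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueRejectionDemo {
    // Submits `submissions` blocking tasks to a one-thread pool whose queue holds
    // `capacity` tasks, and returns how many execute() calls were rejected.
    static int countRejected(int capacity, int submissions) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(capacity));
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the worker until the loop is over
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // worker busy and queue full: default AbortPolicy throws
                }
            }
        } finally {
            release.countDown(); // let the running and queued tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 task runs, 2 wait in the queue, the other 2 are rejected
        System.out.println(countRejected(2, 5)); // prints 2
    }
}
```

The queue size only matters while workers are occupied, so a task that blocks its worker for a long time (for example, waiting at a barrier that has not tripped) lets submissions pile up. A LinkedBlockingQueue is unbounded, so execute() never rejects for lack of space; that is the queue Executors.newFixedThreadPool(n) uses.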
I have a loop that runs inside init() and looks like this:

```java
for (int i = 0; i < numCities; i++) {
    x[i] = rand.nextInt(width);
    y[i] = rand.nextInt(height);
}
```
I changed it to this:

```java
int errorCities = 0, stepCities = 0;
stepCities = numCities / numThreads;
errorCities = numCities - stepCities * numThreads;

// Split up work, assign to threads
for (int i = 1; i <= numThreads; i++) {
    int startCities = (i - 1) * stepCities;
    int endCities = startCities + stepCities;

    // This is a bit messy... the last thread also takes the leftover cities
    if (i == numThreads)
        endCities += errorCities;

    tpool.execute(new citySetupThread(startCities, endCities));
}
```
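The same split can be pulled out into a small helper, which makes it easy to check that the ranges cover [0, numCities) exactly once with no overlap and that only the final chunk absorbs the remainder. A minimal sketch; RangeSplit and splitRanges are hypothetical names, and the helper uses zero-based chunk indices instead of the 1-based i in the loop above:

```java
import java.util.Arrays;

class RangeSplit {
    // Returns numThreads half-open ranges {start, end} covering [0, n) exactly once.
    // Every range has n / numThreads elements; the last one also absorbs n % numThreads.
    static int[][] splitRanges(int n, int numThreads) {
        int step = n / numThreads;
        int remainder = n - step * numThreads;
        int[][] ranges = new int[numThreads][2];
        for (int i = 0; i < numThreads; i++) {
            int start = i * step;
            int end = start + step;
            if (i == numThreads - 1) {
                end += remainder; // only the final chunk gets the leftover cities
            }
            ranges[i][0] = start;
            ranges[i][1] = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 cities on 4 threads
        System.out.println(Arrays.deepToString(splitRanges(10, 4)));
        // prints [[0, 2], [2, 4], [4, 6], [6, 10]]
    }
}
```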
And here is the citySetupThread class:

```java
public class citySetupThread implements Runnable {
    int start, end;

    public citySetupThread(int s, int e) {
        start = s;
        end = e;
    }

    public void run() {
        for (int j = start; j < end; j++) {
            x[j] = ThreadLocalRandom.current().nextInt(0, width);
            y[j] = ThreadLocalRandom.current().nextInt(0, height);
        }

        try {
            barrier.await();
        } catch (InterruptedException ie) {
            return;
        } catch (BrokenBarrierException bbe) {
            return;
        }
    }
}
```
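If the point of a barrier here is just to know that every chunk has finished, ExecutorService.invokeAll can do that from the calling thread: it blocks until all submitted Callables are done, and Future.get() makes their writes visible to the caller. A minimal sketch, assuming the coordinates live in plain int arrays and an ExecutorService such as tpool is passed in; CitySetupSketch and setupCities are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

class CitySetupSketch {
    // Fills x[] and y[] with random coordinates using numThreads chunks on `pool`,
    // and returns only after every chunk has finished.
    static void setupCities(ExecutorService pool, int[] x, int[] y,
                            int width, int height, int numThreads) {
        int n = x.length;
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            // Integer division spreads the remainder; the chunks cover [0, n) exactly once.
            int start = i * n / numThreads;
            int end = (i + 1) * n / numThreads;
            tasks.add(() -> {
                ThreadLocalRandom rnd = ThreadLocalRandom.current(); // per-thread generator
                for (int j = start; j < end; j++) {
                    x[j] = rnd.nextInt(0, width);
                    y[j] = rnd.nextInt(0, height);
                }
                return null;
            });
        }
        try {
            // invokeAll blocks until every task is done; get() rethrows a task's exception.
            for (Future<Void> f : pool.invokeAll(tasks)) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int[] x = new int[1000];
        int[] y = new int[1000];
        setupCities(pool, x, y, 800, 600, 4);
        pool.shutdown();
        System.out.println(x[999] + ", " + y[999]); // last city, random coordinates
    }
}
```

With this shape the tasks need no barrier at all, so the pool's threads are free again as soon as their chunk is written.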
The code above runs only once in the program, so it served as a kind of test case for my thread constructs (this is my first experience with Java threads). I implemented the same approach in a truly critical section, namely the evolution part of the genetic algorithm, whose class is as follows:
```java
public class evolveThread implements Runnable {
    int start, end;

    public evolveThread(int s, int e) {
        start = s;
        end = e;
    }

    public void run() {
        // ...
```
This class is used in the evolve() function, which init() calls as follows:

```java
for (int p = 0; p < numIter; p++)
    evolve(p, tpool);
```
Yes, I know this is not a great design, but for other reasons I'm stuck with it. Here are the relevant parts of evolve():
```java
// Threaded inner loop
int startEvolve = popSize - 1,
    endEvolve = (popSize - 1) - (popSize - 1) / numThreads;

// Split up work, assign to threads
for (int i = 0; i < numThreads; i++) {
    endEvolve = (popSize - 1) - (popSize - 1) * (i + 1) / numThreads + 1;
    tpool.execute(new evolveThread(startEvolve, endEvolve));
    startEvolve = endEvolve;
}

// Wait for our comrades
try {
    barrier.await();
} catch (InterruptedException ie) {
    return;
} catch (BrokenBarrierException bbe) {
    return;
}

population[1].crossover(population[0], population[1]);
population[1].mutate(numCities);
population[0].mutate(numCities);

// Pick out the strongest
Arrays.sort(population, population[0]);
current = population[0];
generation++;
```
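Tracing the startEvolve/endEvolve arithmetic by hand is error-prone, so here is a small sketch that reproduces only that computation and prints the (start, end) pair each evolveThread would receive. It does not model run(), whose body isn't shown; EvolveSplitTrace and boundaries are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

class EvolveSplitTrace {
    // Reproduces the posted boundary arithmetic and returns each task's {start, end} pair.
    static List<int[]> boundaries(int popSize, int numThreads) {
        List<int[]> out = new ArrayList<>();
        int startEvolve = popSize - 1;
        for (int i = 0; i < numThreads; i++) {
            int endEvolve = (popSize - 1) - (popSize - 1) * (i + 1) / numThreads + 1;
            out.add(new int[] {startEvolve, endEvolve});
            startEvolve = endEvolve; // the next task starts where this one ends
        }
        return out;
    }

    public static void main(String[] args) {
        for (int[] b : boundaries(9, 4)) {
            System.out.println(b[0] + " -> " + b[1]);
        }
        // prints 8 -> 7, 7 -> 5, 5 -> 3, 3 -> 1 (one pair per line)
    }
}
```

If run() counts down and stops before end, then in this example the four tasks cover indices 8 down to 2, leaving 0 and 1 to the code after the barrier, and the first task gets one element fewer than the others. Whether that matches the intent depends on run(). The initial value given to endEvolve is overwritten on the first iteration, so it can be dropped.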
I really want to know the following:
What role does the queue play? Am I right to create a queue big enough for all the jobs I expect to be executed across all threads in the pool? If the size is not large enough, I get a RejectedExecutionException. I just chose numThreads * numIter, because that is how many jobs there would be (for the actual evolution method I mentioned earlier). It's weird, though... I shouldn't have to do this if the barrier's .await() worked, which brings me to...
Am I using barrier.await() correctly? I currently call it in two places: inside the run() method of the Runnable and after the for loop that submits all the tasks. I would think only one is needed, but I get errors if I remove either one.
I suspect thread contention, since it is the only explanation I can come up with for an absurd slowdown that scales with the input parameters. I want to know whether this is related to the way I implement the thread pool and barriers. If not, I will have to look into the crossover() and mutate() methods.
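One detail worth checking: a CyclicBarrier created with n parties trips as soon as n threads have called await(). If evolveThread.run() ends with barrier.await() the way citySetupThread.run() does, then in evolve() the numThreads tasks plus the thread running evolve() make numThreads + 1 callers for a barrier that main() creates with numThreads parties, so one caller is left waiting for the next trip, and which one depends on timing. A minimal sketch of the per-generation pattern with the calling thread counted as a party, assuming a pool with at least numThreads workers; BarrierPartiesSketch, runRounds, and the counter standing in for real work are hypothetical:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierPartiesSketch {
    // Runs `rounds` generations of numThreads tasks. The calling thread also waits
    // at the barrier, so the barrier is created with numThreads + 1 parties.
    static int runRounds(int numThreads, int rounds) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); // workers + caller
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int p = 0; p < rounds; p++) {
                for (int i = 0; i < numThreads; i++) {
                    pool.execute(() -> {
                        completed.incrementAndGet(); // stand-in for one chunk of evolve work
                        try {
                            barrier.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } catch (BrokenBarrierException e) {
                            // another party failed; nothing left to do in this task
                        }
                    });
                }
                // Returns only after all numThreads tasks of this round have arrived;
                // the barrier then resets itself for the next round.
                barrier.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(4, 100)); // prints 400
    }
}
```

The pool must have at least numThreads threads: otherwise tasks waiting at the barrier can occupy every worker while the rest of the round sits in the queue, and the round never completes. A task stuck waiting across rounds would also keep a worker busy, which could plausibly contribute to both a slowdown and a filling queue, though the post doesn't show enough of run() to be sure.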