Using Java multithreading, what is the most efficient way to coordinate finding the best result?

To be clear, the approach described below works, and works quite well. We want to push throughput even higher, which is why I am looking into this.

The goal is to speed up a scoring algorithm that returns the best result from a set of tasks. I run the scoring tasks with an ExecutorService. Each task checks whether its score beats the current best and, if so, updates the best score inside a synchronized block. To give a sense of scale: each task takes a fraction of a millisecond, but there are thousands of them, so finding the best takes several hundred milliseconds. I run this scoring algorithm several hundred times per minute. As a result, about 30 out of every 60 seconds are spent in it.

When my thread pool has 8 threads (on a machine with 24 virtual cores), tasks take 0.3 ms each. With 20 threads (same machine, 24 virtual cores), tasks take 0.6 ms each. I suspect that adding threads to the ExecutorService pool degrades performance because of the synchronization on the best score (more threads contending for the lock).

I have searched quite a lot but cannot find a satisfactory alternative (in fact, I cannot find any alternatives at all). I am considering collecting all the scores and either keeping them in sorted order or sorting them after all tasks complete, but I'm not sure that would be any improvement.

Does anyone have thoughts on another, more efficient way to collect the best score?

Here's the current methodology:

final double[] bestScore = { Double.MAX_VALUE };
// for each item in the collection {
    tasks.add(Executors.callable(new Runnable() {
        public void run() {
            double score = 0; // ... do the scoring for the task
            if (score < bestScore[0]) {
                synchronized(bestScore) {
                    if (score < bestScore[0]) { // check again after we have the lock
                        bestScore[0] = score;
                        ...
                        // also save off other task identifiers in a similar fashion
                    }
                }
            }
        }
    }));
} // end of loop creating scoring tasks

List<Future<Object>> futures = executorService.invokeAll(tasks /*...timeout params here*/);
... // handle cancelled tasks 

// now use the best scoring task that was saved off when it was found.

, , ExecutorService. , . Callable, ( ) . , .

Alternatively, a DoubleAccumulator can keep the running minimum without an explicit lock, for example:

final DoubleAccumulator lowest = new DoubleAccumulator(Math::min, Double.POSITIVE_INFINITY);
/* Loop, creating all the tasks... */
for ( ... ) {
  tasks.add(Executors.callable(new Runnable() {
    public void run()
    {
      double score = 0; /* Compute a real score here. */
      lowest.accumulate(score);
    }
  }));
}
/* Invoke all the tasks, when successful... */
double lowestScore = lowest.get();

, - AtomicReference, , , , .
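The AtomicReference option mentioned above could look roughly like the sketch below, for the case where the winning task's identity has to be kept together with its score. The names `BestResultTracker`, `Best`, `offer`, and the integer task id are assumptions made for illustration, not part of the original code.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

public class BestResultTracker {

    /** Immutable pair of a score and the id of the task that produced it (hypothetical shape). */
    static final class Best {
        final double score;
        final int taskId;

        Best(double score, int taskId) {
            this.score = score;
            this.taskId = taskId;
        }
    }

    private final AtomicReference<Best> best =
            new AtomicReference<>(new Best(Double.POSITIVE_INFINITY, -1));

    /** Lock-free update: the candidate replaces the current best only if its score is strictly lower. */
    void offer(double score, int taskId) {
        // Cheap read first, so most tasks never allocate or attempt a compare-and-set.
        if (score >= best.get().score) {
            return;
        }
        Best candidate = new Best(score, taskId);
        // The function receives (current value, candidate) and may be retried under contention.
        best.accumulateAndGet(candidate, (cur, cand) -> cand.score < cur.score ? cand : cur);
    }

    Best get() {
        return best.get();
    }

    public static void main(String[] args) {
        BestResultTracker tracker = new BestResultTracker();
        // Stand-in scoring: the score is the distance from 500, so task 500 has the lowest score.
        IntStream.range(0, 1000).parallel()
                 .forEach(id -> tracker.offer(Math.abs(id - 500), id));
        Best b = tracker.get();
        System.out.println(b.taskId + " " + b.score);
    }
}
```

Because score and identifier live in one immutable object, a reader never sees a score paired with the wrong task, which is harder to guarantee with two separately updated fields.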

- " ", , , fork-join, Stream.

, , , .
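As a rough illustration of the fork-join / Stream route mentioned above, a parallel stream can run the scoring jobs on the common fork-join pool and reduce them to a minimum with no shared mutable state. This is only a sketch: the class name `StreamBestScore` and the use of `DoubleSupplier` as the job type are assumptions, and the scoring lambda is a placeholder.

```java
import java.util.List;
import java.util.OptionalDouble;
import java.util.function.DoubleSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StreamBestScore {

    /** Runs every scoring job via a parallel stream and returns the lowest score, if any. */
    static OptionalDouble bestScore(List<DoubleSupplier> jobs) {
        return jobs.parallelStream()
                   .mapToDouble(DoubleSupplier::getAsDouble)
                   .min(); // partial minima are combined by the stream framework
    }

    public static void main(String[] args) {
        // Placeholder jobs; a real job would compute the score for one item.
        List<DoubleSupplier> jobs = IntStream.range(0, 100_000)
                .mapToObj(i -> (DoubleSupplier) () -> Math.pow(Math.sin(i), 2))
                .collect(Collectors.toList());
        System.out.println(bestScore(jobs).getAsDouble());
    }
}
```

The stream splits the list into ranges, each worker reduces its own range, and only the per-range results are merged, so there is nothing for the threads to contend on.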


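Suppose you have 10 threads. Instead of having every task update one shared best score, split the work into batches. With 10 000 tasks, for example, give each of the 10 threads 1000 of them.

Each thread then finds the max of its own 1000 scores. Once all 10 threads have finished, take the max of those 10 results.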
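The batching idea above (for example 10 000 tasks split into 10 batches of 1000) could be sketched with a plain ExecutorService as follows. The sketch takes the minimum, since lower is better in the question's code. `BatchedBestScore`, the `DoubleSupplier` job type, and the stand-in scoring lambda are assumptions for illustration, and `batches` is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.DoubleSupplier;

public class BatchedBestScore {

    /** Splits the jobs into at most `batches` chunks, reduces each chunk locally, then merges. */
    static double bestScore(List<DoubleSupplier> jobs, int batches, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        int chunk = Math.max(1, (jobs.size() + batches - 1) / batches); // ceiling division
        List<Callable<Double>> tasks = new ArrayList<>();
        for (int from = 0; from < jobs.size(); from += chunk) {
            List<DoubleSupplier> slice = jobs.subList(from, Math.min(from + chunk, jobs.size()));
            tasks.add(() -> {
                double localBest = Double.POSITIVE_INFINITY; // private to this task, no locking
                for (DoubleSupplier job : slice) {
                    localBest = Math.min(localBest, job.getAsDouble());
                }
                return localBest;
            });
        }
        double best = Double.POSITIVE_INFINITY;
        for (Future<Double> f : pool.invokeAll(tasks)) { // invokeAll waits for every batch
            best = Math.min(best, f.get());
        }
        return best;
    }

    public static void main(String[] args) throws Exception {
        List<DoubleSupplier> jobs = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            final int id = i;
            jobs.add(() -> Math.abs(id - 1234)); // stand-in score
        }
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(bestScore(jobs, 10, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With this layout the executor schedules 10 tasks instead of 10 000, so the per-task queueing overhead shrinks along with the lock contention.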


. bestScore, ? double? , ?

You could implement Callable so that each task returns its score, like this:

public class ScoreComputer implements Callable<Double> {
    @Override
    public Double call() throws Exception {
        double score = 0;
        //Compute and return score here.
        return score;
    }
}

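Submit a ScoreComputer for each item and keep the Future<Double> that each submission returns. Then read the scores from the futures and pick the best.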

public static void main(String[] args) throws ExecutionException, InterruptedException {
    double bestScore = Double.MAX_VALUE;
    List<Future<Double>> futures = new ArrayList<>();
    //For each item in collection create a task and set it to run.
    ExecutorService service = Executors.newCachedThreadPool();
    futures.add(service.submit(new ScoreComputer()));

    List<Double> scores = new ArrayList<>();

    for(Future<Double> future : futures) {
        scores.add(future.get()); // blocks until that task has finished
    }
    service.shutdown(); // let the JVM exit once the pool is idle

    Double bestScoreInTasks = Collections.min(scores);
    // Lower is better, so keep the task result only if it beats the current best.
    if(bestScoreInTasks < bestScore) {
        bestScore = bestScoreInTasks;
    }
    System.out.println(bestScore);
}

, . , IMO . Java Doc:

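newCachedThreadPool(): Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.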


, , EricF, , . , Runnable (, , , ).

-, :

ScoreCalculatorOriginal.java:

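import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ScoreCalculatorOriginal {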

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ArrayList<Callable<Object>> tasks = new ArrayList<>();
        final double[] bestScore = { Double.MAX_VALUE };
        for(int i = 0; i < 100000; i++) {
            tasks.add(Executors.callable(() -> {
                Random random = new Random();
                double score = Math.pow(Math.sin(random.nextDouble()), 2) * Math.pow(Math.cos(random.nextDouble()), 2);
                if (score < bestScore[0]) {
                    synchronized (bestScore) {
                        if (score < bestScore[0]) {
                            bestScore[0] = score;
                        }
                    }
                }
            }));
        }

        long start = System.nanoTime();
        List<Future<Object>> futures = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
                .invokeAll(tasks);
        for(Future<Object> future : futures) {
            future.get();
        }
        long end = System.nanoTime();
        System.out.printf("Calculation took %.3f ms%n", (end - start) / 1e6);
    }
}

With 8 threads:

103.358 ms

With 4 threads:

104.351 ms

With 1 thread:

102.918 ms

The elapsed time barely changes across the three runs.

:

ScoreCalculatorFast.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ScoreCalculatorFast {

    public static void main(String[] args) throws InterruptedException {
        ScoreExecutor executor = new ScoreExecutor(Runtime.getRuntime().availableProcessors());
        List<ScoreExecutor.ScoreJob> jobs = new ArrayList<>();
        for(int i = 0; i < 100000; i++) {
            jobs.add(() -> {
                Random random = new Random();
                return Math.pow(Math.sin(random.nextDouble()), 2) * Math.pow(Math.cos(random.nextDouble()), 2);
            });
        }
        long start = System.nanoTime();
        executor.getBestScore(jobs);
        long end = System.nanoTime();
        System.out.printf("Calculation took %.3f ms%n", (end - start) / 1e6);
    }
}

With 8 threads:

19.624 ms

With 4 threads:

24.275 ms

With 1 thread:

41.357 ms

, . , , , . , , .

:

ScoreExecutor.java:

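import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;

public class ScoreExecutor {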

    /**
     * A job that calculates a score
     */
    public static interface ScoreJob {

        /**
         * Calculate the score
         * @return the calculated score
         */
        double calculateScore();
    }

    // This is the threads that do all the work
    final ArrayList<ScoreThread> threads;

    ScoreExecutor(int numThreads) {
        // Create the threads
        threads = new ArrayList<>();
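        for(int i = 0; i < numThreads; i++) {
            ScoreThread thread = new ScoreThread();
            // Daemon threads let the JVM exit; the workers otherwise loop forever
            thread.setDaemon(true);
            threads.add(thread);
        }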
        // Start them
        for(ScoreThread thread : threads) {
            thread.start();
        }
    }

    /**
     * Execute a collection of ScoreJobs and return the best score among them.
     * @param jobs The jobs to execute
     * @return The best score from the scores calculated by the jobs
     * @throws InterruptedException
     */
    public double getBestScore(Collection<ScoreJob> jobs) throws InterruptedException {
        ArrayList<ScoreJob> jobList = new ArrayList<>(jobs);
        // Start all threads
        int chunkSize = jobList.size() / threads.size();
        for(int i = 0; i < threads.size() - 1; i++) {
            threads.get(i).startJobs(jobList.subList(i * chunkSize, (i+1) * chunkSize));
        }
        // Start the last thread
        int lastIndex = threads.size() - 1;
        threads.get(lastIndex).startJobs(jobList.subList(lastIndex * chunkSize, jobList.size()));

        // Get the best score from each thread
        LinkedList<Double> threadScores = new LinkedList<>();
        for(ScoreThread thread : threads) {
            threadScores.add(thread.getBestScore());
        }
        // Calculate the best score
        double bestScore = Double.MAX_VALUE;
        for(Double score : threadScores) {
            if(score < bestScore) {
                bestScore = score;
            }
        }
        return bestScore;
    }

    /**
     * Worker thread
     */
    private class ScoreThread extends Thread {

        // If we're currently running a score calculation
        private volatile boolean run;

        // The current best score
        private volatile double bestScore;

        // Latch for synchronisation with the executor
        private CountDownLatch latch;

        // The list of jobs to execute
        private final LinkedList<ScoreJob> scoreJobs = new LinkedList<>();

        private void startJobs(Collection<ScoreJob> jobs) {
            synchronized (this) {
                if(!run) {
                    // Start the thread
                    scoreJobs.addAll(jobs);
                    latch = new CountDownLatch(1);
                    run = true;
                    this.notifyAll();
                } else {
                    throw new IllegalStateException("This thread is already running jobs");
                }
            }
        }

        /**
         * Get the best score at the end of the calculation.
         * Waits until all jobs are finished and then returns
         * this thread best score.
         * @return This threads best score
         * @throws InterruptedException
         */
        private double getBestScore() throws InterruptedException {
            // Wait for completion and return
            latch.await();
            return bestScore;
        }

        @Override
        public void run() {
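            // 'run' starts out false (field default). Resetting it here could
            // wipe out a startJobs() call made before this thread got going.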
            try {
                // External loop, run forever so we can run multiple jobs
                while (true) {
                    // Wait for a job to be started
                    synchronized (this) {
                        while (!run) {
                            wait();
                        }
                    }
                    // This threads best score
                    double bestScore = Double.MAX_VALUE;
                    ScoreJob job; // The current job
                    // Get a job
                    while((job = scoreJobs.poll()) != null) {
                        // Calculate the score
                        double score = job.calculateScore();
                        // Update the best score
                        if(score < bestScore) {
                            bestScore = score;
                        }
                    }
                    // We're done: publish the best score, then clear the run flag
                    // before releasing the latch, so that a caller woken by the
                    // latch can safely start the next batch of jobs
                    this.bestScore = bestScore;
                    run = false;
                    latch.countDown();
                }
            } catch(InterruptedException e) {
                e.printStackTrace();
                return;
            }
        }
    }
}

, , . , .
