When should I use CompletionService over ExecutorService?

I just came across CompletionService in this blog post. However, the post does not really demonstrate the benefits of CompletionService over a standard ExecutorService: the same code can be written using either. So, when is it useful to use a CompletionService?

Can you give a short code sample to make it crystal clear? For example, this code sample just shows a case where CompletionService is not required (it is equivalent to ExecutorService):

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    // CompletionService<Long> taskCompletionService =
    //         new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
            taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
+70
java multithreading concurrency
Feb 06 2018-11-11T00:
10 answers

With ExecutorService , after you submit tasks to run, you need to manually write code to efficiently receive the results of completed tasks.
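To make "manually write code" concrete, here is a minimal sketch of what collecting results in completion order can look like with a plain ExecutorService. The class name `ManualPollingSketch`, the helpers `collectInCompletionOrder` and `sleepThenReturn`, and the sleep durations are all made up for illustration; this is one possible approach, not the only one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManualPollingSketch {

    // Hypothetical helper: returns results in the order the tasks finished,
    // using nothing but Future.isDone() polling.
    static List<Long> collectInCompletionOrder(ExecutorService executor,
                                               List<Callable<Long>> tasks)
            throws InterruptedException, ExecutionException {
        List<Future<Long>> pending = new ArrayList<>();
        for (Callable<Long> task : tasks) {
            pending.add(executor.submit(task));
        }
        List<Long> results = new ArrayList<>();
        while (!pending.isEmpty()) {
            Iterator<Future<Long>> it = pending.iterator();
            while (it.hasNext()) {
                Future<Long> f = it.next();
                if (f.isDone()) {
                    results.add(f.get()); // does not block: the task is already done
                    it.remove();
                }
            }
            Thread.sleep(10); // back off so the loop does not burn a core
        }
        return results;
    }

    // Hypothetical task: sleeps for the given time, then returns it.
    static Callable<Long> sleepThenReturn(long millis) {
        return () -> {
            Thread.sleep(millis);
            return millis;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            tasks.add(sleepThenReturn(300));
            tasks.add(sleepThenReturn(100));
            tasks.add(sleepThenReturn(200));
            System.out.println(collectInCompletionOrder(executor, tasks));
        } finally {
            executor.shutdown();
        }
    }
}
```

The polling loop, the bookkeeping list and the back-off sleep are exactly the parts a CompletionService takes off your hands.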

With CompletionService this is pretty much automated. The difference is not very obvious in the code you posted, because you submit only one task. However, imagine that you have a list of tasks to submit. In the example below, several tasks are submitted to the CompletionService. Then, instead of trying to figure out which task has completed (to get its result), the code simply asks the CompletionService instance to return the results as they become available.

    import java.util.concurrent.*;

    public class CompletionServiceTest {

        class CalcResult {
            long result;

            CalcResult(long l) {
                result = l;
            }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName;
            long input1;
            int input2;

            CallableTask(String name, long v1, int v2) {
                taskName = name;
                input1 = v1;
                input2 = v2;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for (int i = 0; i < input2; i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1);
            }
        }

        public void test() {
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService =
                    new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i = 0; i < submittedTasks; i++) {
                taskCompletionService.submit(new CallableTask(
                        String.valueOf(i), (i * 10), ((i * 10) + 10)));
                System.out.println("Task " + i + " submitted");
            }
            for (int tasksHandled = 0; tasksHandled < submittedTasks; tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    // take() blocks until at least one task has completed and its
                    // result is available, but we don't have to worry about which one
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task available in queue. Trying to get()");
                    // process the result here by calling result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + tasksHandled
                            + " Completed - results obtained : " + l.result);
                } catch (InterruptedException e) {
                    // This thread was interrupted while waiting in take()
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // The task itself threw an exception
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
            taskExecutor.shutdown(); // let the pool threads exit once everything is handled
        }
    }
+88
Apr 07 2018-11-11T00:

Omitting a lot of details:

  • ExecutorService = Incoming Queue + Worker Threads
  • CompletionService = Incoming Queue + Worker Threads + Output Queue
+141
Feb 08

I think the Javadoc best answers the question of when a CompletionService is useful in a way that an ExecutorService is not.

A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.

In principle, this interface allows a program to have producers that create and submit tasks (and even examine the results of those submissions) without knowing about any other consumers of the results of those tasks. Meanwhile, consumers that know about the CompletionService can poll for or take results without knowing about the producers submitting the tasks.
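As a rough illustration of that decoupling, the sketch below splits the two roles into separate methods that share nothing except the CompletionService. The class `ProducerConsumerSketch`, the methods `produce` and `consume`, and the `"result-N"` strings are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ProducerConsumerSketch {

    // Producer side: only submits work and never looks at a Future.
    static void produce(CompletionService<String> service, int count) {
        for (int i = 0; i < count; i++) {
            final int id = i;
            service.submit(() -> "result-" + id);
        }
    }

    // Consumer side: only takes finished results and knows nothing about the producer.
    static List<String> consume(CompletionService<String> service, int expected)
            throws InterruptedException, ExecutionException {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < expected; i++) {
            results.add(service.take().get()); // take() blocks until some task has finished
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(executor);
            produce(service, 4);
            System.out.println(consume(service, 4));
        } finally {
            executor.shutdown();
        }
    }
}
```

One thing the sketch glosses over: the consumer still needs some way to know how many results to expect, since a CompletionService does not report how many tasks are outstanding.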

For the record, I could be wrong because it is rather late, but I am fairly certain that the sample code in that blog post causes a memory leak. Without an active consumer taking results out of the ExecutorCompletionService's internal queue, I am not sure how the blogger expected that queue to drain.

+10
Feb 06 2018-11-11T00:

Basically, you use a CompletionService if you want to run several tasks in parallel and then work with their results in the order in which they complete. So, if I submit 5 tasks, the CompletionService will give me the first one that finishes. An example with only one task gives no additional value over an Executor, apart from the ability to submit a Callable.
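A small sketch of that ordering, assuming tasks that simply sleep for a given time and then return it. `CompletionOrderSketch`, `inCompletionOrder` and the delay values are illustrative names and numbers, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderSketch {

    // Submits one sleeping task per delay and returns the delays
    // in the order in which the tasks actually finished.
    static List<Integer> inCompletionOrder(int... delaysMillis)
            throws InterruptedException, ExecutionException {
        // One thread per task so that all tasks start at roughly the same time.
        ExecutorService executor = Executors.newFixedThreadPool(delaysMillis.length);
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(executor);
            for (int delay : delaysMillis) {
                service.submit(() -> {
                    Thread.sleep(delay);
                    return delay;
                });
            }
            List<Integer> finished = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                finished.add(service.take().get()); // next task to complete, whichever it is
            }
            return finished;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Submitted as 300, 100, 200; the shortest sleep should come back first.
        System.out.println(inCompletionOrder(300, 100, 200));
    }
}
```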

+9
Feb 06 2018-11-21T00:

First of all, if we do not want to waste CPU time, we should not use

    while (!future.isDone()) {
        // Do some work...
    }

Instead, we should use

    service.shutdown();
    service.awaitTermination(14, TimeUnit.DAYS);

The bad thing about this code is that it shuts down the ExecutorService. If we want to keep working with it (for example, because tasks recursively create further tasks), we have two alternatives: invokeAll or CompletionService.

invokeAll waits until all tasks have completed. CompletionService lets us take or poll the results one by one.
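For comparison, here is a minimal sketch of the invokeAll alternative. It blocks until every task in the batch is done and leaves the executor usable for the next batch. `InvokeAllSketch`, `runAll` and `delayed` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Runs one batch and returns the results in submission order.
    static List<String> runAll(ExecutorService executor, List<Callable<String>> tasks)
            throws InterruptedException, ExecutionException {
        // invokeAll returns only after every task in the batch has completed.
        List<Future<String>> futures = executor.invokeAll(tasks);
        List<String> results = new ArrayList<>();
        for (Future<String> f : futures) {
            results.add(f.get()); // already done, so this does not wait
        }
        return results;
    }

    // Hypothetical task: sleeps, then returns its name.
    static Callable<String> delayed(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Two batches on the same executor: no shutdown is needed in between.
            System.out.println(runAll(executor, Arrays.asList(delayed("a", 200), delayed("b", 50))));
            System.out.println(runAll(executor, Arrays.asList(delayed("c", 50), delayed("d", 10))));
        } finally {
            executor.shutdown();
        }
    }
}
```

The trade-off is that nothing from a batch can be processed until its slowest task has finished, which is the gap a CompletionService fills.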

And of course a recursive example:

    // Task, Result, Tasks, DoTask and THREAD_NUMBER are placeholders that are not defined here
    ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
    ExecutorCompletionService<Result> completionService =
            new ExecutorCompletionService<Result>(executorService);

    while (Tasks.size() > 0) {
        for (final Task task : Tasks) {
            completionService.submit(new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return DoTask(task);
                }
            });
        }
        try {
            int taskNum = Tasks.size();
            Tasks.clear();
            // Each finished task may produce a follow-up task for the next round
            for (int i = 0; i < taskNum; ++i) {
                Result result = completionService.take().get();
                if (result != null)
                    Tasks.add(result.toTask());
            }
        } catch (InterruptedException e) {
            // error :(
        } catch (ExecutionException e) {
            // error :(
        }
    }
+4
Feb 03 2018-12-12T00:

See for yourself at runtime: try implementing both solutions (ExecutorService and CompletionService) and you will see how they behave, which makes it clearer when to use one or the other. There is an example here if you want: http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html

+1
Jan 15 '13 at 10:04

Let's say you have 5 long-running tasks (Callable tasks) and you have submitted them to an executor service. Now imagine that you do not want to wait for all 5 tasks to complete; instead, you want to process each task's result as soon as that task completes. This can be done either by writing polling logic over the Future objects or by using this API.
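One way this can look in practice is a loop around `poll` with a timeout, which leaves room for other work while no result is ready. This is only a sketch: `PollWithTimeoutSketch`, `sumAsTheyFinish`, the 50 ms timeout and the task bodies are assumptions made for the example.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollWithTimeoutSketch {

    // Adds up task results as they become available, without waiting for all of them first.
    static int sumAsTheyFinish(CompletionService<Integer> service, int submitted)
            throws InterruptedException, ExecutionException {
        int sum = 0;
        int handled = 0;
        while (handled < submitted) {
            // poll returns null if no task has finished within the timeout
            Future<Integer> done = service.poll(50, TimeUnit.MILLISECONDS);
            if (done == null) {
                // Nothing has finished yet: this is where other work could go.
                continue;
            }
            sum += done.get();
            handled++;
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(executor);
            for (int i = 1; i <= 5; i++) {
                final int value = i;
                service.submit(() -> {
                    Thread.sleep(100L * (6 - value)); // smaller values finish later
                    return value;
                });
            }
            System.out.println(sumAsTheyFinish(service, 5));
        } finally {
            executor.shutdown();
        }
    }
}
```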

+1
Jul 31 '15 at 11:42

If the task producer is not interested in the results, and processing the results of the asynchronous tasks run by the executor service is the responsibility of another component, then you should use CompletionService. It helps you separate the task result processor from the task producer. See the example at http://www.zoftino.com/java-concurrency-executors-framework-tutorial

0
Jun 11 '18 at 15:01

There is another advantage to using a completion service: performance.

When you call future.get(), the calling thread may spin-wait.

From java.util.concurrent.CompletableFuture:

    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }

With long-running tasks, waiting on each future this way can hurt performance. (The spin shown above is brief and specific to CompletableFuture; the futures returned by a standard ExecutorService are FutureTask instances.)

With a completion service, when a task completes its result is placed on a queue, and you can poll that queue at a lower cost.

The completion service achieves this by wrapping each task in a future with a done() hook.

From java.util.concurrent.ExecutorCompletionService:

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
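The same idea can be reproduced outside the JDK class. The sketch below is not the library's implementation, just a simplified illustration of the done() hook: a FutureTask subclass that adds itself to a queue when it finishes. `DoneHookSketch` and `queueOnDone` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class DoneHookSketch {

    // Wraps a task so that its future lands on the queue as soon as it is finished.
    static <V> FutureTask<V> queueOnDone(Callable<V> task, BlockingQueue<Future<V>> queue) {
        return new FutureTask<V>(task) {
            @Override
            protected void done() {
                // FutureTask calls done() once the task has completed, failed or been cancelled
                queue.add(this);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Future<String>> completed = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            executor.execute(queueOnDone(() -> {
                Thread.sleep(200);
                return "slow";
            }, completed));
            executor.execute(queueOnDone(() -> "fast", completed));

            // The consumer blocks on the queue instead of watching individual futures.
            System.out.println(completed.take().get());
            System.out.println(completed.take().get());
        } finally {
            executor.shutdown();
        }
    }
}
```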
0
Oct 25 '18 at 2:41
    package com.barcap.test.test00;

    import java.util.concurrent.*;

    /**
     * Created by Sony on 25-04-2019.
     */
    public class ExecutorCompletest00 {

        public static void main(String[] args) {
            ExecutorService exc = Executors.newFixedThreadPool(10);
            ExecutorCompletionService<Integer> executorCompletionService =
                    new ExecutorCompletionService<>(exc);

            for (int i = 1; i < 10; i++) {
                Task00 task00 = new Task00(i);
                executorCompletionService.submit(task00);
            }
            for (int i = 1; i < 20; i++) {
                try {
                    // take() returns the future of whichever task finished first
                    Future<Integer> future = executorCompletionService.take();
                    Integer inttest = future.get();
                    System.out.println(" the result of completion service is " + inttest);
                    break; // only the first available result is needed
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }

=================================================== =====

    package com.barcap.test.test00;

    import java.util.*;
    import java.util.concurrent.*;

    /**
     * Created by Sony on 25-04-2019.
     */
    public class ExecutorServ00 {

        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(9);
            List<Future<Integer>> futList = new ArrayList<>();

            for (int i = 1; i < 10; i++) {
                Future<Integer> result = executorService.submit(new Task00(i));
                futList.add(result);
            }
            for (Future<Integer> futureEach : futList) {
                try {
                    // get() blocks on the first submitted task, even if others finish sooner
                    Integer inm = futureEach.get();
                    System.out.println("the result of future executorservice is " + inm);
                    break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }

=================================================== ==========

    package com.barcap.test.test00;

    import java.util.concurrent.*;

    /**
     * Created by Sony on 25-04-2019.
     */
    public class Task00 implements Callable<Integer> {

        int i;

        public Task00(int i) {
            this.i = i;
        }

        @Override
        public Integer call() throws Exception {
            System.out.println(" the current thread is " + Thread.currentThread().getName()
                    + " the result should be " + i);
            // Smaller i sleeps longer, so task 1 is the slowest and task 9 the fastest
            int sleepforsec = 100000 / i;
            Thread.sleep(sleepforsec);
            System.out.println(" the task completed for " + Thread.currentThread().getName()
                    + " the result should be " + i);
            return i;
        }
    }

=================================================== =====================

Difference in the logs. For the ExecutorCompletionService:

     the current thread is pool-1-thread-1 the result should be 1
     the current thread is pool-1-thread-2 the result should be 2
     the current thread is pool-1-thread-3 the result should be 3
     the current thread is pool-1-thread-4 the result should be 4
     the current thread is pool-1-thread-6 the result should be 6
     the current thread is pool-1-thread-5 the result should be 5
     the current thread is pool-1-thread-7 the result should be 7
     the current thread is pool-1-thread-9 the result should be 9
     the current thread is pool-1-thread-8 the result should be 8
     the task completed for pool-1-thread-9 the result should be 9
     the result of completion service is 9
     the task completed for pool-1-thread-8 the result should be 8
     the task completed for pool-1-thread-7 the result should be 7
     the task completed for pool-1-thread-6 the result should be 6
     the task completed for pool-1-thread-5 the result should be 5
     the task completed for pool-1-thread-4 the result should be 4
     the task completed for pool-1-thread-3 the result should be 3
     the task completed for pool-1-thread-2 the result should be 2

For the ExecutorService:

     the current thread is pool-1-thread-1 the result should be 1
     the current thread is pool-1-thread-3 the result should be 3
     the current thread is pool-1-thread-2 the result should be 2
     the current thread is pool-1-thread-5 the result should be 5
     the current thread is pool-1-thread-4 the result should be 4
     the current thread is pool-1-thread-6 the result should be 6
     the current thread is pool-1-thread-7 the result should be 7
     the current thread is pool-1-thread-8 the result should be 8
     the current thread is pool-1-thread-9 the result should be 9
     the task completed for pool-1-thread-9 the result should be 9
     the task completed for pool-1-thread-8 the result should be 8
     the task completed for pool-1-thread-7 the result should be 7
     the task completed for pool-1-thread-6 the result should be 6
     the task completed for pool-1-thread-5 the result should be 5
     the task completed for pool-1-thread-4 the result should be 4
     the task completed for pool-1-thread-3 the result should be 3
     the task completed for pool-1-thread-2 the result should be 2
     the task completed for pool-1-thread-1 the result should be 1
    the result of future executorservice is 1

=================================================== =====

With ExecutorService, the futures are read in submission order, so the first result (from task 1, the slowest) only becomes available after all the other tasks have completed.

With ExecutorCompletionService, whichever result becomes available first is returned.
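The "take the first result that is ready" behaviour can be packaged as a small helper that also cancels the tasks that are no longer needed. This is a sketch under assumptions: `FirstResultSketch`, `firstResult`, `sleepThenReturn` and the delays are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FirstResultSketch {

    // Returns the result of whichever task finishes first and cancels the others.
    static <T> T firstResult(ExecutorService executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> service = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(service.submit(task));
            }
            return service.take().get();
        } finally {
            for (Future<T> f : futures) {
                f.cancel(true); // has no effect on the task that already completed
            }
        }
    }

    // Hypothetical task: sleeps for the given time, then returns it.
    static Callable<Integer> sleepThenReturn(int millis) {
        return () -> {
            Thread.sleep(millis);
            return millis;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = Arrays.asList(
                    sleepThenReturn(500), sleepThenReturn(50), sleepThenReturn(300));
            System.out.println(firstResult(executor, tasks));
        } finally {
            executor.shutdown();
        }
    }
}
```

Cancelling with `cancel(true)` interrupts the sleeping tasks, so the pool threads are freed instead of running work whose result nobody will read.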

0
Apr 25 '19 at 16:09


