Waiting for the Future List

I have a method that returns a List futures

 List<Future<O>> futures = getFutures(); 

Now I want to wait until all futures are completed successfully or any of the tasks whose output will be returned by the future throws an exception. Even if one task throws an exception, it makes no sense to wait for other futures.

A simple approach would be

 wait() { For(Future f : futures) { try { f.get(); } catch(Exception e) { //TODO catch specific exception // this future threw exception , means somone could not do its task return; } } } 

But the problem here is that, for example, the 4th future throws an exception, then I will wait for the first 3 futures to be available.

How to solve this? Will back help snaps anyway? I can't use Future isDone because the java document says

 boolean isDone() Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true. 
+103
java multithreading future
Oct 13 '13 at 17:42 on
source share
10 answers

You can use the CompletionService to receive futures as soon as they are ready, and if one of them throws an exception, cancel the processing. Something like that:

 Executor executor = Executors.newFixedThreadPool(4); CompletionService<SomeResult> completionService = new ExecutorCompletionService<SomeResult>(executor); //4 tasks for(int i = 0; i < 4; i++) { completionService.submit(new Callable<SomeResult>() { public SomeResult call() { ... return result; } }); } int received = 0; boolean errors = false; while(received < 4 && !errors) { Future<SomeResult> resultFuture = completionService.take(); //blocks if none available try { SomeResult result = resultFuture.get(); received ++; ... // do something with the result } catch(Exception e) { //log errors = true; } } 

I think that you can improve to cancel tasks that are still in progress if one of them throws an error.

EDIT: I found a more complete example here: http://blog.teamlazerbeez.com/2009/04/29/java-completionservice/

+106
Oct 13 '13 at 18:00
source share

If you are using Java 8, then you can make it easier with CompletableFuture and CompletableFuture.allOf , which applies the callback only after all the provided CompletableFutures are completed.

 // Waits for *all* futures to complete and returns a list of results. // If *any* future completes exceptionally then the resulting future will also complete exceptionally. public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) { CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]); return CompletableFuture.allOf(cfs) .thenApply(ignored -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); } 
+82
Mar 28 '16 at 11:55
source share

You can use ExecutorCompletionService . The documentation even has an example for your specific use case:

Suppose that you would like to use the first nonzero result of a set of tasks, ignoring all exceptions that occur, and discarding all other tasks when the first is ready:

 void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) { } } } finally { for (Future<Result> f : futures) f.cancel(true); } if (result != null) use(result); } 

It is important to note that ecs.take () will receive the first completed task, not just the first one submitted. So you should get them in order of completion (or throwing an exception).

+15
Oct 13 '13 at 18:45
source share

Use CompletableFuture in Java 8

  // Kick of multiple, asynchronous lookups CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1"); CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2"); CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3"); // Wait until they are all done CompletableFuture.allOf(page1,page2,page3).join(); logger.info("--> " + page1.get()); 
+4
Jun 24 '18 at 4:03
source share

If you are using Java 8 and don’t want to manipulate CompletableFuture s, I wrote a tool to extract the results for List<Future<T>> using streaming. The key is that you are forbidden to map(Future::get) as it throws.

 public final class Futures { private Futures() {} public static <E> Collector<Future<E>, Collection<E>, List<E>> present() { return new FutureCollector<>(); } private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>> { private final List<Throwable> exceptions = new LinkedList<>(); @Override public Supplier<Collection<T>> supplier() { return LinkedList::new; } @Override public BiConsumer<Collection<T>, Future<T>> accumulator() { return (r, f) -> { try { r.add(f.get()); } catch (InterruptedException e) {} catch (ExecutionException e) { exceptions.add(e.getCause()); } }; } @Override public BinaryOperator<Collection<T>> combiner() { return (l1, l2) -> { l1.addAll(l2); return l1; }; } @Override public Function<Collection<T>, List<T>> finisher() { return l -> { List<T> ret = new ArrayList<>(l); if (!exceptions.isEmpty()) throw new AggregateException(exceptions, ret); return ret; }; } @Override public Set<java.util.stream.Collector.Characteristics> characteristics() { return java.util.Collections.emptySet(); } } 

This requires an AggregateException that works like C #

 public class AggregateException extends RuntimeException { /** * */ private static final long serialVersionUID = -4477649337710077094L; private final List<Throwable> causes; private List<?> successfulElements; public AggregateException(List<Throwable> causes, List<?> l) { this.causes = causes; successfulElements = l; } public AggregateException(List<Throwable> causes) { this.causes = causes; } @Override public synchronized Throwable getCause() { return this; } public List<Throwable> getCauses() { return causes; } public List<?> getSuccessfulElements() { return successfulElements; } public void setSuccessfulElements(List<?> successfulElements) { this.successfulElements = successfulElements; } } 

This component acts just like C # Task.WaitAll . I am working on an option that does the same thing as CompletableFuture.allOf (equivalent to Task.WhenAll )

The reason I did this is because I use Spring ListenableFuture and don’t want to port to CompletableFuture , despite the fact that this is a more standard way

+1
Mar 07 '17 at 14:07 on
source share

Perhaps this would help (nothing would replace the raw thread, yes!) I suggest running every Future guy with a dedicated thread (they go in parallel), and then sometime one of the errors received, it simply signals the manager ( Handler class).

 class Handler{ //... private Thread thisThread; private boolean failed=false; private Thread[] trds; public void waitFor(){ thisThread=Thread.currentThread(); List<Future<Object>> futures = getFutures(); trds=new Thread[futures.size()]; for (int i = 0; i < trds.length; i++) { RunTask rt=new RunTask(futures.get(i), this); trds[i]=new Thread(rt); } synchronized (this) { for(Thread tx:trds){ tx.start(); } } for(Thread tx:trds){ try {tx.join(); } catch (InterruptedException e) { System.out.println("Job failed!");break; } }if(!failed){System.out.println("Job Done");} } private List<Future<Object>> getFutures() { return null; } public synchronized void cancelOther(){if(failed){return;} failed=true; for(Thread tx:trds){ tx.stop();//Deprecated but works here like a boss }thisThread.interrupt(); } //... } class RunTask implements Runnable{ private Future f;private Handler h; public RunTask(Future f,Handler h){this.f=f;this.h=h;} public void run(){ try{ f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation) }catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();} catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");} } } 

I have to say that the code above will be an error (not tested), but I hope I can explain this solution. Please, try.

0
Oct 13 '13 at 19:25
source share

CompletionService will take your Callables using the.submit () method, and you can get calculated futures using the.take () method.

Remember that the ExecutorService ends when you call the .shutdown () method. You can also call this method only when you have saved the link to the executor’s service, so be sure to save it.

Sample code. For a fixed number of work items to be processed in parallel:

 ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService<YourCallableImplementor> completionService = new ExecutorCompletionService<YourCallableImplementor>(service); ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>(); for (String computeMe : elementsToCompute) { futures.add(completionService.submit(new YourCallableImplementor(computeMe))); } //now retrieve the futures after computation (auto wait for it) int received = 0; while(received < elementsToCompute.size()) { Future<YourCallableImplementor> resultFuture = completionService.take(); YourCallableImplementor result = resultFuture.get(); received ++; } //important: shutdown your ExecutorService service.shutdown(); 

Sample code. For the dynamic number of work items to be processed in parallel:

 public void runIt(){ ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service); ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>(); //Initial workload is 8 threads for (int i = 0; i < 9; i++) { futures.add(completionService.submit(write.new CallableImplementor())); } boolean finished = false; while (!finished) { try { Future<CallableImplementor> resultFuture; resultFuture = completionService.take(); CallableImplementor result = resultFuture.get(); finished = doSomethingWith(result.getResult()); result.setResult(null); result = null; resultFuture = null; //After work package has been finished create new work package and add it to futures futures.add(completionService.submit(write.new CallableImplementor())); } catch (InterruptedException | ExecutionException e) { //handle interrupted and assert correct thread / work packet count } } //important: shutdown your ExecutorService service.shutdown(); } public class CallableImplementor implements Callable{ boolean result; @Override public CallableImplementor call() throws Exception { //business logic goes here return this; } public boolean getResult() { return result; } public void setResult(boolean result) { this.result = result; } } 
0
Nov 02 '15 at 21:40
source share
  /** * execute suppliers as future tasks then wait / join for getting results * @param functors a supplier(s) to execute * @return a list of results */ private List getResultsInFuture(Supplier<?>... functors) { CompletableFuture[] futures = stream(functors) .map(CompletableFuture::supplyAsync) .collect(Collectors.toList()) .toArray(new CompletableFuture[functors.length]); CompletableFuture.allOf(futures).join(); return stream(futures).map(a-> { try { return a.get(); } catch (InterruptedException | ExecutionException e) { //logger.error("an error occurred during runtime execution a function",e); return null; } }).collect(Collectors.toList()); }; 
0
Jul 25 '18 at 14:56
source share

I have a utility class that contains these:

 @FunctionalInterface public interface CheckedSupplier<X> { X get() throws Throwable; } public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) { return () -> { try { return supplier.get(); } catch (final Throwable checkedException) { throw new IllegalStateException(checkedException); } }; } 

Once you do this using static imports, you can just wait for all futures, like this:

 futures.stream().forEach(future -> uncheckedSupplier(future::get).get()); 

You can also collect all of their results, like this:

 List<MyResultType> results = futures.stream() .map(future -> uncheckedSupplier(future::get).get()) .collect(Collectors.toList()); 

Just reviewing my old post and notice that you had one more grief:

But the problem here is that if, for example, the 4th future throws an exception, then I will unnecessarily wait for the first 3 futures to be available.

In this case, a simple solution is to do this in parallel:

 futures.stream().parallel() .forEach(future -> uncheckedSupplier(future::get).get()); 

Thus, the first exception, although it does not stop the future, will break the forEach statement, as in the sequential example, but since everyone expects in parallel, you do not have to wait for the completion of the first 3.

0
Jan 08 '19 at 18:49
source share

In case you want to combine the CompletableFutures list, you can do this:

 List<CompletableFuture<Void>> futures = new ArrayList<>(); // ... Add futures to this ArrayList of CompletableFutures // CompletableFuture.allOf() method demand a variadic arguments // You can use this syntax to pass a List instead CompletableFuture<Void> allFutures = CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()])); // Wait for all individual CompletableFuture to complete // All individual CompletableFutures are executed in parallel allFutures.get(); 

For more information on Future & CompletableFuture, useful links:
1. The Future: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https://www.callicoder.com/java-8-completablefuture-tutorial/

0
Jun 24 '19 at 12:37
source share



All Articles