Use FutureTask for concurrency

I have a service like:

    class DemoService {
        Result process(Input in) {
            filter1(in);
            if (filter2(in)) return...
            filter3(in);
            filter4(in);
            filter5(in);
            return ...
        }
    }

Now I want to make it faster. I found that some filters can run at the same time, while others must wait for earlier filters to complete. For example:

    filter1--
             |---filter3--
    filter2--             |---filter5
              ---filter4--

which means:

1. filter1 and filter2 can run simultaneously, and so can filter3 and filter4.

2. filter3 and filter4 must wait for filter2 to complete.

One more thing:

If filter2 returns true, the process method returns immediately and skips the remaining filters.

My current solution uses FutureTask:

    // do filter work at FutureTask
    for (Filter filter : filters) {
        FutureTask<RiskResult> futureTask =
                new FutureTask<RiskResult>(new CallableFilter(filter, context));
        executorService.execute(futureTask);
    }

    // when all FutureTask are submitted, wait for result
    for (Filter filter : filters) {
        if (filter.isReturnNeeded()) {
            FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
            riskResult = futureTask.get();
            if (canReturn(filter, riskResult)) {
                returnOk = true;
                return riskResult;
            }
        }
    }

My CallableFilter:

    public class CallableFilter implements Callable<RiskResult> {
        private final Filter filter;
        private final Context context;

        // matches the new CallableFilter(filter, context) call used when submitting
        public CallableFilter(Filter filter, Context context) {
            this.filter = filter;
            this.context = context;
        }

        @Override
        public RiskResult call() throws Exception {
            List<Filter> dependencies = filter.getDependentFilters();
            if (dependencies != null && !dependencies.isEmpty()) {
                // wait for its dependency filters to finish
                for (Filter d : dependencies) {
                    FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                    futureTask.get();
                }
            }
            // do its own work
            return filter.execute(context);
        }
    }

I want to know:

1. Is it a good idea to use FutureTask in this case? Is there a better solution?

2. What is the overhead of thread context switching?

Thanks!

java multithreading concurrency futuretask
2 answers

In Java 8, you can use CompletableFuture to chain your filters one after another. Use the thenApply and thenCompose families of methods to add new asynchronous filters to the CompletableFuture; they run after the previous step completes. thenCombine combines two independent CompletableFutures once both have completed. Use allOf to wait for more than two CompletableFuture objects (it returns a CompletableFuture<Void>, so the individual results are read from the original futures).
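A minimal sketch of how the filter graph from the question could be wired this way, assuming Java 8+. The class FilterGraphSketch, the filter helper, and the log list are hypothetical stand-ins for the real filters. filter3 and filter4 start only after both filter1 and filter2 have finished, and filter5 waits for filter3 and filter4.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FilterGraphSketch {

    // Hypothetical stand-in for a real filter: records its name when it runs.
    static String filter(String name, List<String> log) {
        log.add(name);
        return name;
    }

    // Runs the five filters in dependency order and returns the order in which they ran.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // filter1 and filter2 have no dependencies, so they start right away
            CompletableFuture<String> f1 =
                    CompletableFuture.supplyAsync(() -> filter("filter1", log), pool);
            CompletableFuture<String> f2 =
                    CompletableFuture.supplyAsync(() -> filter("filter2", log), pool);

            // filter3 and filter4 each wait for both filter1 and filter2
            CompletableFuture<String> f3 =
                    f1.thenCombineAsync(f2, (a, b) -> filter("filter3", log), pool);
            CompletableFuture<String> f4 =
                    f1.thenCombineAsync(f2, (a, b) -> filter("filter4", log), pool);

            // filter5 waits for filter3 and filter4
            CompletableFuture<String> f5 = CompletableFuture.allOf(f3, f4)
                    .thenApplyAsync(ignored -> filter("filter5", log), pool);

            f5.join(); // block until the whole graph has finished
            return log;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The relative order of filter1/filter2 and of filter3/filter4 can vary between runs; only the dependency order is fixed.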

If you cannot use Java 8, Guava's ListenableFuture can do the same; see the Guava wiki page ListenableFutureExplained. With Guava, you can wait for several independent filters to complete using Futures.allAsList, which also returns a ListenableFuture.

In both approaches, the idea is that after you declare your future actions, their dependencies on each other, and the threads they run on, you get back a single Future object that encapsulates your final result.

EDIT: An early return can be implemented by explicitly completing a CompletableFuture with the complete() method, or by using Guava's SettableFuture (which implements ListenableFuture).
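The early return could be sketched like this, assuming Java 8+. The class EarlyReturnSketch, the process signature, and the string results are hypothetical; the boolean parameter stands in for the real outcome of filter2. The caller waits on a separate result future, which is completed either inside filter2 (short circuit) or by the final filter5 stage.

```java
import java.util.concurrent.CompletableFuture;

class EarlyReturnSketch {

    // Hypothetical process(): filter2Hit stands in for the real outcome of filter2.
    static String process(boolean filter2Hit) {
        // The future handed to the caller; the first complete() call wins.
        CompletableFuture<String> result = new CompletableFuture<>();

        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> { /* filter1 work */ });

        CompletableFuture<Boolean> f2 = CompletableFuture.supplyAsync(() -> {
            if (filter2Hit) {
                // short circuit: publish the result before any later filter can run
                result.complete("returned early by filter2");
            }
            return filter2Hit;
        });

        // filter3 and filter4 wait for filter1 and filter2 and skip their work after a hit
        CompletableFuture<Boolean> ready = f1.thenCombine(f2, (ignored, hit) -> !hit);
        CompletableFuture<Void> f3 =
                ready.thenAcceptAsync(go -> { if (go) { /* filter3 work */ } });
        CompletableFuture<Void> f4 =
                ready.thenAcceptAsync(go -> { if (go) { /* filter4 work */ } });

        // filter5 runs last; complete() has no effect if the result was already set
        CompletableFuture.allOf(f3, f4)
                .thenRun(() -> result.complete("completed by filter5"));

        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(process(true));  // prints "returned early by filter2"
        System.out.println(process(false)); // prints "completed by filter5"
    }
}
```

Completing the result inside filter2 itself, rather than in a dependent stage, means the early value is set before any downstream stage can start, so the outcome does not depend on scheduling.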


You can use ForkJoinPool for parallelization, which is explicitly designed for this kind of parallel computation:

(...) Method join() and its variants are appropriate for use only when completion dependencies are acyclic; that is, the parallel computation can be described as a directed acyclic graph (DAG) (...)

(see ForkJoinTask)

The advantage of ForkJoinPool is that each task can spawn new tasks and wait for them to complete without actually blocking the executing thread (which could otherwise lead to a deadlock if more tasks are waiting for others to complete than there are threads available).
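A small sketch of that property, assuming a pool with a single worker thread. The class JoinWithoutBlockingSketch and its Sum task are hypothetical and unrelated to the filters. Each task forks a subtask and joins it; with a single-threaded ExecutorService and Future.get() the same shape would deadlock, whereas here the worker runs the pending subtask itself while joining.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinWithoutBlockingSketch {

    // Hypothetical task: sums 1..n by forking a subtask for 1..n-1 and joining it.
    static class Sum extends RecursiveTask<Integer> {
        private final int n;

        Sum(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n == 0) {
                return 0;
            }
            Sum rest = new Sum(n - 1);
            rest.fork();            // schedule the dependency
            return n + rest.join(); // the worker can run the pending subtask while joining
        }
    }

    static int sumWithSingleWorker(int n) {
        ForkJoinPool pool = new ForkJoinPool(1); // only one worker thread
        try {
            return pool.invoke(new Sum(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithSingleWorker(10)); // prints 55
    }
}
```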

The filter example below should work as it stands, although it still has some limitations:

  • It ignores the results of the filters.
  • It does not finish early if filter2 returns true.
  • Exception handling is not implemented.

The main idea of this code: each filter is represented as a Node, which may depend on other nodes (= filters that must run before this filter can run). The nodes it depends on are forked as parallel tasks.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    public class Node<V> extends RecursiveTask<V> {
        private static final short VISITED = 1;

        private final Callable<V> callable;
        private final Set<Node<V>> dependencies = new HashSet<>();

        @SafeVarargs
        public Node(Callable<V> callable, Node<V>... dependencies) {
            this.callable = callable;
            this.dependencies.addAll(Arrays.asList(dependencies));
        }

        public Set<Node<V>> getDependencies() {
            return this.dependencies;
        }

        @Override
        protected V compute() {
            try {
                // resolve dependencies first
                for (Node<V> node : dependencies) {
                    if (node.tryMarkVisited()) {
                        node.fork(); // start node
                    }
                }

                // wait for ALL nodes to complete
                for (Node<V> node : dependencies) {
                    node.join();
                }

                return callable.call();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return null;
        }

        public boolean tryMarkVisited() {
            return compareAndSetForkJoinTaskTag((short) 0, VISITED);
        }
    }

Usage example:

    public static void main(String[] args) {
        Node<Void> filter1 = new Node<>(filter("filter1"));
        Node<Void> filter2 = new Node<>(filter("filter2"));
        Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
        Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
        Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
        Node<Void> root = new Node<>(() -> null, filter5);

        ForkJoinPool.commonPool().invoke(root);
    }

    public static Callable<Void> filter(String name) {
        return () -> {
            System.out.println(Thread.currentThread().getName() + ": start " + name);
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + ": end " + name);
            return null;
        };
    }