Implementing task orchestration

I am having trouble finding an elegant functional style for a program that has to orchestrate several tasks. Here is what I want to achieve.

I have three classes whose methods I want to orchestrate (simplified for brevity):

    class TaskA {
        public ResultA call() { return new ResultA(); }
    }

    class TaskB {
        public ResultB call(ResultA a) { return new ResultB(); }
    }

    class TaskC {
        public ResultC call(List<ResultB> resultBs) { return new ResultC(); }
    }

I need to execute TaskA 'n' times in parallel, and for each execution of TaskA I need to execute TaskB with the result of the corresponding TaskA, so TaskB also runs 'n' times. Finally, I need to execute TaskC once, using the results of all the TaskB calls.

One way to achieve this is to create a Callable that wraps the calls to TaskA and TaskB, and then, in my main thread, collect a List of Futures of ResultB to pass to TaskC:

    class TaskATaskBCallable implements Callable<ResultB> {
        private TaskA taskA ...;
        private TaskB taskB ...;

        public ResultB call() {
            return taskB.call(taskA.call());
        }
    }

And in my main thread:

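    private ResultC orchestrate() throws InterruptedException, ExecutionException {
        ExecutorService service = ...;
        List<Callable<ResultB>> callables = ...;
        // Submit every callable before waiting on any of them.
        List<Future<ResultB>> futures = callables.stream()
                .map(callable -> service.submit(callable))
                .collect(Collectors.toList());
        // Future.get() throws checked exceptions, so it cannot be passed as Future::get to map().
        List<ResultB> resultBs = new ArrayList<>();
        for (Future<ResultB> future : futures) {
            resultBs.add(future.get());
        }
        return taskC.call(resultBs);
    }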

The only thing I don't like about this solution is TaskATaskBCallable. It is probably unnecessary glue between TaskA and TaskB. Moreover, if I need to chain another task with TaskA and TaskB, I have to change TaskATaskBCallable, and possibly its name as well. I feel I could get rid of it by making smarter use of Java's concurrency library classes, such as CompletableFuture or Phaser.

Any pointers?

+5
2 answers

I found one way to do this using CompletableFuture :

    private ResultC orchestrate() {
        ExecutorService service = ...;
        int taskCount = ...;
        List<CompletableFuture<ResultB>> resultBFutures = IntStream.rangeClosed(1, taskCount)
                .mapToObj((i) -> CompletableFuture.supplyAsync(() -> new TaskA().call(), service))
                .map(resultAFuture -> resultAFuture.thenApplyAsync(resultA -> new TaskB().call(resultA), service))
                .collect(Collectors.toList());
        return new TaskC().call(
                CompletableFuture.allOf(resultBFutures.toArray(new CompletableFuture[resultBFutures.size()]))
                        .thenApply(v -> resultBFutures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList()))
                        .join());
    }
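The question also worries about having to edit a wrapper class whenever another task joins the chain. With the CompletableFuture approach, that should amount to one more thenApplyAsync stage. The sketch below illustrates this under some assumptions: taskA, taskB, extraTask and taskC are hypothetical stand-ins working on plain integers (not the question's classes), the pool size of 4 is arbitrary, and the executor is shut down at the end so its threads do not keep the JVM alive.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ChainedStagesSketch {

    // Hypothetical stand-ins for TaskA, TaskB, an additional task, and TaskC.
    static int taskA(int i) { return i; }
    static int taskB(int a) { return a * 2; }
    static int extraTask(int b) { return b + 1; }
    static int taskC(List<Integer> results) {
        return results.stream().mapToInt(Integer::intValue).sum();
    }

    static int orchestrate(int taskCount) {
        ExecutorService service = Executors.newFixedThreadPool(4); // arbitrary pool size
        try {
            // Fan out: build every chain first so all of them are scheduled before any join.
            List<CompletableFuture<Integer>> futures = IntStream.range(0, taskCount)
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> taskA(i), service))
                    .map(f -> f.thenApplyAsync(ChainedStagesSketch::taskB, service))
                    // Adding a task is one more stage; no wrapper class has to change.
                    .map(f -> f.thenApplyAsync(ChainedStagesSketch::extraTask, service))
                    .collect(Collectors.toList());

            // Fan in: join() blocks until each chain is done; list order is preserved.
            return taskC(futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()));
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(orchestrate(3)); // (0*2+1) + (1*2+1) + (2*2+1) = 9
    }
}
```

Note that join() rethrows a failure from any stage as an unchecked CompletionException, so a real orchestrate() would probably want to handle that around the fan-in step.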
0

I think that CompletableFuture will be the most elegant:

    int taskCount = 100;
    List<ResultB> resultBs = IntStream.range(0, taskCount)
            .mapToObj(i -> new TaskA())
            .map(taskA -> CompletableFuture.supplyAsync(taskA::call))
            .map(completableFutureA -> completableFutureA.thenApplyAsync(new TaskB()::call))
            .collect(Collectors.toList()) // collect, in order to kick off the async tasks
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    return new TaskC().call(resultBs);
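The intermediate collect in the snippet above matters because streams are lazy: in a single sequential pipeline each element is created and then joined before the next one is even started, so the tasks would effectively run one after another. A small sketch to make that visible, assuming a hypothetical helper startedBeforeFirstJoin that counts how many futures exist when the first join() runs (it uses the common pool, as the answer does, and expects taskCount > 0):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class LazyJoinSketch {

    // Returns how many futures had been created at the moment the first join() was called.
    static int startedBeforeFirstJoin(int taskCount, boolean collectFirst) {
        AtomicInteger started = new AtomicInteger();
        AtomicInteger seenAtFirstJoin = new AtomicInteger(-1);

        Stream<CompletableFuture<Integer>> futures = IntStream.range(0, taskCount)
                .mapToObj(i -> {
                    started.incrementAndGet(); // runs on the calling thread
                    return CompletableFuture.supplyAsync(() -> i);
                });

        if (collectFirst) {
            // Materialize the list so every future is created before any join.
            futures = futures.collect(Collectors.toList()).stream();
        }

        futures.map(f -> {
                    seenAtFirstJoin.compareAndSet(-1, started.get()); // record only once
                    return f.join();
                })
                .collect(Collectors.toList());

        return seenAtFirstJoin.get();
    }

    public static void main(String[] args) {
        System.out.println(startedBeforeFirstJoin(5, false)); // 1: one task at a time
        System.out.println(startedBeforeFirstJoin(5, true));  // 5: all started up front
    }
}
```

This only demonstrates the ordering for a sequential stream; it says nothing about how many tasks the pool actually runs at once.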
-1
