How to handle parallel and synchronous in spring integration?

Is it possible in spring integration to maintain synchronization of channels (to receive confirmation after sending a message), but to process more messages at the same time (process in parallel) without creating its own code with threads (to execute and send an employee in ExecutorService) and expect from them? I would like to upload files, I thought ftp, but loading more at the same time, without creating my own threads in the code. I need to know when all files are downloaded (so I want it to be in sync). Is this possible with the spring integration configuration like?

+4
java spring multithreading spring-integration ftp
Sep 09 '14 at 9:35
source share
2 answers

Well, it looks like you need a stream, for example:

  • <gateway> to send files to the channel and wait for some result as confirmation

  • <splitter> in ExecutorChannel to process each file in parallel

  • <int-ftp:outbound-gateway> to download each file

  • <aggregator> for correlating and grouping results <int-ftp:outbound-gateway>

  • <aggregator> should send its result to the <gateway> that is awaiting at this time.

Let me know if something is unclear.

UPDATE

How to do it in Spring Integrating Java DSL in any examples?

Something like that:

 @Configuration @EnableIntegration @IntegrationComponentScan public class Configuration { @Bean public IntegrationFlow uploadFiles() { return f -> f.split() .handle(Ftp.outboundGateway(this.ftpSessionFactory, AbstractRemoteFileOutboundGateway.Command.PUT, "'remoteDirectory'")) .aggregate(); } } @MessagingGateway(defaultRequestChannel = "uploadFiles.input") interface FtpUploadGateway { List<String> upload(List<File> filesToUpload); } 
+1
Sep 09 '14 at 10:27
source share

This is very possible in Spring using @Async task @Async .

First, create a service that will perform the task asynchronously. Here's the @Async annotation, the @Async method that will be scanned and tagged by Spring for asynchronous execution.

 import java.util.concurrent.Future; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; @Service public class AsyncTask { @Async public Future<Result> performTask(String someArgument) { // put the business logic here and collect the result below Result result = new Result(); // this is some custom bean holding your result return new AsyncResult<Result>(result); } } 

Next, create a component (optional - maybe from any other existing service) that will call the aforementioned service.

 import java.util.concurrent.Future; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class AsyncClass { @Autowired private AsyncTask asyncTask; public void doAsyncOperation() throws Exception { List<Future<Result>> futures = new ArrayList<Future<Result>>(); for (int i = 1; i < 10; i++) { // Simulate multiple calls Future<Result > future = doAsync(String.valueOf(i)); futures.add(future); } for (Future<Result > future : futures) { // fetch the result Result result = future.get(); // process the result } } private Future<Result> doAsync(final String someArgument) { // this will immediately return with a placeholder Future object which // can be used later to fetch the result Future<Result> future = asyncTask.performAsync(someArgument); return future; } } 

The xml sample configuration needed to enable async is shown below (for annotation-based use @EnableAsync)

 <task:annotation-driven executor="myExecutor" /> <task:executor id="myExecutor" pool-size="30" rejection-policy="CALLER_RUNS"/> 

For detailed documentation see here

+1
Sep 09 '14 at 10:26
source share



All Articles