Need to process multiple files in parallel in Spring Integration

I have an SFTP directory and reading files and sending files for further processing in ServiceActivator. At any time, I need to process them in parallel using a handler.

Here is my DSL stream for SPring integration.

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory()) .temporaryFileSuffix("COPY") .localDirectory(directory) .deleteRemoteFiles(false) .preserveTimestamp(true) .remoteDirectory("remoteDir")) .patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5))) .handle("mybean", "myMethod") .handle(Files.outboundAdapter(new File("success"))) .deleteSourceFiles(true) .autoCreateDirectory(true)) .get(); 

Update: here is my ThreadPoolExecutor:

 @Bean(name = "executor") public Executor getExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(4); executor.setMaxPoolSize(4); executor.setQueueCapacity(20); executor.initialize(); return executor; } 
+3
java spring spring-integration dsl spring-integration-sftp
Mar 11 '16 at 23:03
source share
1 answer

Sftp.inboundAdapter() ( SftpInboundFileSynchronizingMessageSource ) in any case returns deleted files one at a time. First of all, it synchronizes them with the local directory and only after that polling them to process messages as a File payload.

To process them in parallel, this would be enough to add taskExecutor to the definition of e.poller() , and all those maxMessagesPerPoll(5) will be distributed across different threads.

+1
Mar 11 '16 at 23:28
source share



All Articles