I am using Spring Batch remote partitioning for a batch process. I run jobs using Spring Batch Admin.
I have set the consumer concurrency of the inbound gateway on the slave side to 10, but the maximum number of partitions running concurrently is 8.
I want to be able to increase the consumer concurrency to 15 later.
Below is my configuration:
<task:executor id="taskExecutor" pool-size="50" />

<rabbit:template id="computeAmqpTemplate"
    connection-factory="rabbitConnectionFactory"
    routing-key="computeQueue"
    reply-timeout="${compute.partition.timeout}">
</rabbit:template>

<int:channel id="computeOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="computeInboundStagingChannel" />

<amqp:outbound-gateway request-channel="computeOutboundChannel"
    reply-channel="computeInboundStagingChannel"
    amqp-template="computeAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />

<beans:bean id="computeMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="computeOutboundChannel"
    p:receiveTimeout="${compute.partition.timeout}" />

<beans:bean id="computePartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="computeStep"
    p:gridSize="${compute.grid.size}"
    p:messagingOperations-ref="computeMessagingTemplate" />

<int:aggregator ref="computePartitionHandler"
    send-partial-result-on-expiry="true"
    send-timeout="${compute.step.timeout}"
    input-channel="computeInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    request-channel="computeInboundChannel"
    reply-channel="computeOutboundStagingChannel"
    queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />

<int:channel id="computeInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="computeInboundChannel"
    output-channel="computeOutboundStagingChannel" />

<int:channel id="computeOutboundStagingChannel" />

<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />

<beans:bean id="computeFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}"
    p:lineMapper-ref="stLineMapper"
    scope="step" />

<beans:bean id="computeItemWriter"
    class="com.st.batch.foundation.writers.ComputeItemWriter"
    p:symfony-ref="symfonyStepScoped"
    p:timeout="${compute.item.timeout}"
    p:batchId="#{jobParameters[batch_id]}"
    scope="step" />

<step id="computeStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="computeFileItemReader" writer="computeItemWriter" commit-interval="${compute.commit.interval}" />
    </tasklet>
</step>

<flow id="computeFlow">
    <step id="computeStep.master">
        <partition partitioner="computePartitioner" handler="computePartitionHandler" />
    </step>
</flow>

<job id="computeJob" restartable="true">
    <flow id="computeJob.computeFlow" parent="computeFlow" />
</job>

compute.grid.size = 112
compute.consumer.concurrency = 10

Input files are split into 112 equal parts, so compute.grid.size = 112 = the total number of partitions. Number of servers = 4.
There are 2 problems:
i) Although I set the consumer concurrency to 10, the maximum number of partition threads actually running is 8.
ii) Some servers are slower than others, and some are faster, so I want the partitions to be distributed fairly: when the faster servers finish their work, the remaining messages in the queue should go to them. The work should not be distributed in round-robin fashion.
I know that RabbitMQ has a prefetch count setting, and that the ack mode affects how work is distributed. In Spring Integration the prefetch count defaults to 1 and the ack mode defaults to AUTO. Even so, some servers keep running a large number of partitions while other servers have finished and sit idle for a long time. Ideally, no server should sit idle.
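For reference, this is roughly how I understand fair dispatch could be made explicit on the inbound gateway itself (a sketch only; the prefetch-count and acknowledge-mode attributes are from the spring-integration-amqp namespace, and the values shown are the defaults described above):

```xml
<!-- Sketch: with prefetch-count="1" and AUTO acks, a consumer should only
     receive a new partition message after acknowledging the previous one,
     so faster servers naturally pull more work from the queue. -->
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    prefetch-count="1"
    acknowledge-mode="AUTO"
    request-channel="computeInboundChannel"
    reply-channel="computeOutboundStagingChannel"
    queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory" />
```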
Update:
Another thing I am observing right now: for steps executed in parallel using a split (not distributed via remote partitioning), at most 8 also run in parallel. This looks like a thread pool limit, but as you can see, taskExecutor has its pool size set to 50.
Is there anything in spring-batch / spring-batch-admin that limits the number of steps that can run concurrently?
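For context, the parallel (non-partitioned) steps in question are wired with a split that references the same executor, roughly like this (a simplified sketch with placeholder step names, not my exact job definition):

```xml
<!-- Sketch: a split runs its flows concurrently on the referenced executor,
     so with pool-size="50" I would expect more than 8 to run in parallel. -->
<job id="parallelJob">
    <split id="parallelSplit" task-executor="taskExecutor" next="finalStep">
        <flow><step id="stepA" parent="someStep" /></flow>
        <flow><step id="stepB" parent="someStep" /></flow>
    </split>
    <step id="finalStep" parent="someStep" />
</job>
```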
Second update:
Also, if 8 or more threads are running in the parallel processing, Spring Batch Admin does not load. It just hangs. If I reduce the concurrency, Spring Batch Admin loads fine. I even tested this with concurrency set to 4 on one server and 8 on another: Spring Batch Admin does not load when I use the URL of the server running 8 threads, but it works on the server running 4 threads.
Spring Batch Admin has the jobLauncher configuration below:
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
The pool size here is 6; does that have anything to do with the problem?
Or is there anything in Tomcat 7 that limits the number of running threads to 8?
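For what it is worth, the only Tomcat-side thread limit I am aware of is the connector's maxThreads in conf/server.xml, and I have not lowered it (a sketch of where it would be set; 200 is Tomcat 7's documented default, and the connector settings shown are placeholders for my actual ones):

```xml
<!-- conf/server.xml: the HTTP connector's request-processing thread pool.
     maxThreads defaults to 200, far above the 8-thread ceiling I am seeing. -->
<Connector port="8080" protocol="HTTP/1.1"
    connectionTimeout="20000"
    maxThreads="200"
    redirectPort="8443" />
```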