Multithreading with a Spring Batch file reader

In a Spring Batch job, I'm trying to read a CSV file and want to assign each line to a separate thread and process it. I tried to achieve this using a TaskExecutor, but every thread picks up the same row at a time. I also tried to implement this with a Partitioner, and the same thing happens there. See my XML configuration below.

Step description

    <step id="Step2">
        <!-- Multi-threaded step: chunks are executed concurrently on taskExecutor threads.
             NOTE(review): the "reader" bean below is a singleton FlatFileItemReader shared
             by all threads; presumably this is why every thread sees the same row - confirm
             against the Spring Batch thread-safety documentation. -->
        <tasklet task-executor="taskExecutor">
            <chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
            </chunk>
        </tasklet> 
    </step>

              <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
                  <!-- input file to read -->
                  <property name="resource" value="file:cvs/user.csv" />
                  <property name="lineMapper">
                      <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                          <!-- split each line into named fields -->
                          <property name="lineTokenizer">
                              <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                                  <property name="names" value="userid,customerId,ssoId,flag1,flag2" />
                              </bean>
                          </property>
                          <!-- map each field set onto a fresh prototype 'user' bean -->
                          <property name="fieldSetMapper">
                              <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                                  <property name="prototypeBeanName" value="user" />
                              </bean>
                          </property>
                      </bean>
                  </property>
              </bean>

      <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
          <!-- at most 4 concurrent threads -->
          <property name="concurrencyLimit" value="4"/>
      </bean> <!-- fix: closing tag was missing; the fragment was not well-formed -->
I tried with different types of task executors, but they all behave the same. How can I assign each row to a separate thread?

+4
source share
3 answers

The FlatFileItemReader is not thread-safe. Instead of sharing one reader, split the large CSV into several smaller CSV files and use a MultiResourcePartitioner to process each file in its own thread. Do it in 2 steps: the first step splits the file (e.g. into 10 smaller files), and the second, partitioned step processes them in parallel. That way each thread works on its own file, and no two threads can pick up the same row.

Example:

<batch:job id="csvsplitandprocess">
    <!-- Step 1: split the large CSV into smaller files (single-threaded) -->
    <batch:step id="step1" next="step2master">
        <batch:tasklet>
            <batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
    <!-- Step 2 (master): one partition per small file, executed on taskExecutor -->
    <batch:step id="step2master">
        <!-- fix: use the batch: prefix consistently; unprefixed <partition>/<handler>
             resolve against the default (beans) namespace and fail to validate -->
        <batch:partition step="step2" partitioner="partitioner">
            <batch:handler grid-size="10" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>

<!-- Slave step: executed once per partition (one partition per small file),
     each partition on its own taskExecutor thread -->
<batch:step id="step2">
    <batch:tasklet>
        <batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100">
        </batch:chunk>
    </batch:tasklet>
</batch:step>


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <!-- fixed-size pool: 10 worker threads -->
    <property name="corePoolSize" value="10" />
    <property name="maxPoolSize" value="10" />
</bean>

<bean id="partitioner"
      class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
    <!-- one partition per extracted file -->
    <property name="resources" value="file:cvs/extracted/*.csv" />
</bean>

Alternatively, you could write a custom thread-safe reader (for example, one that synchronizes the read call), but splitting the file and partitioning is the simpler and more scalable approach.

+6

Your problem is that every thread shares the same stateful reader instance, so they all pull the same row.

The solution: use a master/slave configuration with a Partitioner (a partitioned step).

With this approach:

  • the master step builds one execution context per partition (each with its own id range)
  • each slave step runs on its own thread and reads only its assigned range
  • no two threads process the same rows.

For example, here is my partitioner:

com.test.partitioner.RangePartitioner:

/**
 * Builds one ExecutionContext per partition, each covering a contiguous id
 * range of {@code range} ids ([fromId, toId]) plus a human-readable thread name.
 * Reads the {@code gridSize} field of the enclosing class; logs each partition.
 *
 * @return map of partition name ("partition1"..N) to its execution context
 */
public Map<String, ExecutionContext> partition() {

    Map<String, ExecutionContext> partitions = new HashMap<String, ExecutionContext>();

    final int range = 1;      // ids covered by each partition
    int lowerBound = 1;       // first id of the current partition
    int upperBound = range;   // last id of the current partition

    for (int partitionNumber = 1; partitionNumber <= gridSize; partitionNumber++) {
        ExecutionContext context = new ExecutionContext();

        log.debug("\nStarting : Thread" + partitionNumber);
        log.debug("fromId : " + lowerBound);
        log.debug("toId : " + upperBound);

        context.putInt("fromId", lowerBound);
        context.putInt("toId", upperBound);

        // give each thread a name, thread 1,2,3
        context.putString("name", "Thread" + partitionNumber);

        partitions.put("partition" + partitionNumber, context);

        // advance to the next contiguous id range
        lowerBound = upperBound + 1;
        upperBound += range;
    }

    return partitions;
}

→ Output:

Starting : Thread1 fromId : 1 toId : 1

Starting : Thread2 fromId : 2 toId : 2

Starting : Thread3 fromId : 3 toId : 3

Starting : Thread4 fromId : 4 toId : 4

Starting : Thread5 fromId : 5 toId : 5

Starting : Thread6 fromId : 6 toId : 6

Starting : Thread7 fromId : 7 toId : 7

Starting : Thread8 fromId : 8 toId : 8

Starting : Thread9 fromId : 9 toId : 9

Starting : Thread10 fromId : 10 toId : 10

Job configuration:

... xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> (tail of the &lt;beans&gt; root element; the opening tag was lost in formatting, and the broken "spring -batch-2.2.xsd" URL is fixed here)

<import resource="../config/context.xml" />
<import resource="../config/database.xml" />

<!-- Prototype domain object: a fresh Mouvement instance per mapped line -->
<bean id="mouvement" class="com.test.model.Mouvement" scope="prototype" />

<!-- Step-scoped processor: receives its partition's name ("Thread1"..N,
     set by RangePartitioner) via the step execution context -->
<bean id="itemProcessor" class="com.test.processor.CustomItemProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>
<bean id="xmlItemWriter" class="com.test.writer.ItemWriter" />

<batch:job id="mouvementImport" xmlns:batch="http://www.springframework.org/schema/batch">
    <batch:listeners>
        <batch:listener ref="myAppJobExecutionListener" />
    </batch:listeners>

    <!-- Master step: rangePartitioner creates one execution context per id range;
         each partition runs the "slave" step on its own taskExecutor thread -->
    <batch:step id="masterStep">
        <batch:partition step="slave" partitioner="rangePartitioner">
            <batch:handler grid-size="10" task-executor="taskExecutor" />
        </batch:partition>
    </batch:step>
</batch:job>

<!-- Builds the fromId/toId execution contexts shown above -->
<bean id="rangePartitioner" class="com.test.partitioner.RangePartitioner" />

<!-- NOTE(review): SimpleAsyncTaskExecutor is unbounded by default (no
     concurrencyLimit set here); consider ThreadPoolTaskExecutor for a
     bounded pool - confirm against the Spring javadoc -->
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<!-- Slave step: executed once per partition created by rangePartitioner -->
<batch:step id="slave">
    <batch:tasklet>
        <batch:listeners>
            <batch:listener ref="stepExecutionListener" />
        </batch:listeners>

        <batch:chunk reader="mouvementReader" writer="xmlItemWriter" processor="itemProcessor" commit-interval="1">
        </batch:chunk>

    </batch:tasklet>
</batch:step>



<!-- Step-scoped listener: injects the step execution context into each slave run -->
<bean id="stepExecutionListener" class="com.test.listener.step.StepExecutionListenerCtxInjecter" scope="step" />

<!-- Job-level lifecycle listener registered on the job above -->
<bean id="myAppJobExecutionListener" class="com.test.listener.job.MyAppJobExecutionListener" />

<!-- Base flat-file reader used as the delegate of mouvementReader below.
     NOTE(review): the resource is fixed, so each partition presumably re-reads
     the same file and MouvementItemReader filters by id range - confirm in
     com.test.reader.MouvementItemReader -->
<bean id="mouvementReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">

    <property name="resource" value="classpath:XXXXX/XXXXXXXX.csv" />

    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <!-- pipe-delimited lines, split into the named fields below -->
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value="|" />
                    <property name="names"
                        value="id,numen,prenom,grade,anneeScolaire,academieOrigin,academieArrivee,codeUsi,specialiteEmploiType,natureSupport,dateEffet,modaliteAffectation" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.test.mapper.MouvementFieldSetMapper" />
            </property>
        </bean>
    </property>

</bean>

<!-- Alternative (disabled): a file-per-partition reader driven by
     stepExecutionContext[fileName], as MultiResourcePartitioner would set -->
<!--    <bean id="itemReader" scope="step" autowire-candidate="false" parent="mouvementReaderParent">-->
<!--        <property name="resource" value="#{stepExecutionContext[fileName]}" />-->
<!--    </bean>-->

<!-- Partition-aware reader: wraps the parent reader and receives its id range
     (fromId/toId) from the step execution context populated by RangePartitioner -->
<bean id="mouvementReader" class="com.test.reader.MouvementItemReader" scope="step">
    <property name="delegate" ref="mouvementReaderParent" />
    <property name="parameterValues">
        <map>
            <entry key="fromId" value="#{stepExecutionContext[fromId]}" />
            <entry key="toId" value="#{stepExecutionContext[toId]}" />
        </map>
    </property>
</bean>

<!-- Alternative (disabled): StAX XML writer that would use reportMarshaller below -->
<!--    <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">-->
<!--        <property name="resource" value="file:xml/outputs/Mouvements.xml" />-->
<!--        <property name="marshaller" ref="reportMarshaller" />-->
<!--        <property name="rootTagName" value="Mouvement" />-->
<!--    </bean>-->

<!-- JAXB marshaller for Mouvement; only referenced by the disabled writer above -->
<bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.test.model.Mouvement</value>
        </list>
    </property>
</bean>

TODO: improve the partitioner so the ranges are computed from the actual input size (instead of hard-coded values), doing that calculation in Java.

Hope this helps.

+1

I used a Partitioner for a similar problem; here is the relevant part of my job configuration.

<batch:job id="transformJob">
    <batch:step id="deleteDir" next="cleanDB">
        <batch:tasklet ref="fileDeletingTasklet" />
    </batch:step>
    <batch:step id="cleanDB" next="split">
        <batch:tasklet ref="countThreadTasklet" />
    </batch:step>
    <!-- Split the big CSV into chunks; chunk size taken from the job execution
         context (presumably set by countThreadTasklet - confirm) -->
    <batch:step id="split" next="partitionerMasterImporter">
        <batch:tasklet>
            <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" commit-interval="#{jobExecutionContext['chunk.count']}" />
        </batch:tasklet>
    </batch:step>
    <batch:step id="partitionerMasterImporter" next="partitionerMasterExporter">
        <!-- fix: batch: prefix added so <partition>/<handler> resolve in the
             batch namespace, consistent with the surrounding elements -->
        <batch:partition step="importChunked" partitioner="filePartitioner">
            <batch:handler grid-size="10" task-executor="taskExecutor" />
        </batch:partition>
    </batch:step>
    <!-- NOTE(review): the original snippet is truncated here; the
         partitionerMasterExporter step and the job's closing tag are omitted -->

The complete project is available (on GitHub).

.

0
source

All Articles