According to the Spring Batch documentation, reloading a job is supported out of the box, but I can't get it to start from where it was on the left. for example, if my step has processed 10 records, it should start with record 11 with processing whenever I restart it. In practice, this does not happen. He reads from the beginning and processes everything.
Does anyone have a simple Java configuration configuration that reads a delimited file and writes the contents to a db table that can be restarted from the moment it stops?
@Configuration public class BatchConfiguration { @Value("${spring-batch.databaseType}") private String databaseType; @Value("${spring-batch.databaseSchema}") private String schemaName; @Bean public JobBuilderFactory jobBuilderFactory(final JobRepository jobRepository) { return new JobBuilderFactory(jobRepository); } @Bean public StepBuilderFactory stepBuilderFactory(final JobRepository jobRepository, final PlatformTransactionManager transactionManager) { return new StepBuilderFactory(jobRepository, transactionManager); } @Bean public JobRepository jobRepository(final DataSource dataSource, final PlatformTransactionManager transactionManager) { final JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean(); bean.setDatabaseType(databaseType); bean.setDataSource(dataSource); if (StringUtils.isNotBlank(schemaName)) { bean.setTablePrefix(schemaName); } bean.setTransactionManager(transactionManager); try { bean.afterPropertiesSet(); return bean.getObject(); } catch (final Exception e) { throw new BatchConfigurationException("Invalid batch job repository configuration.", e); } } @Bean public JobLauncher jobLauncher(final JobRepository jobRepository) { final SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository); return jobLauncher; } } @Configuration @EnableScheduling @ComponentScan("com.some.package") public class BatchJobConfiguration { @Resource private JobBuilderFactory jobBuilderFactory; @Resource private StepBuilderFactory stepBuilderFactory; @Value("${savings-transaction.file}") private String savingsTransactionFile; @Value("${savings-balance.file}") private String savingsBalanceFile; @Value("${processed-directory}") private String processedDirectory; private static final Integer IMPORT_CHUNKSIZE = 10; @Bean @DependsOn("stepBuilderFactory") public Step savingsTransactionStep(final PlatformTransactionManager transactionManager, @Qualifier("savingsTransactionItemReader") final ItemReader<SavingsTransactionItem> savingsTransactionItemReader, @Qualifier("savingsTransactionProcessor") final ItemProcessor<SavingsTransactionItem, SavingsTransaction> processor, @Qualifier("savingsTransactionItemWriter") final ItemWriter<SavingsTransaction> savingsTransactionItemWriter, @Qualifier("savingsTransactionStepListener") final SavingsTransactionStepListener listener) { return stepBuilderFactory.get("savingsTransactionStep") .transactionManager(transactionManager) .<SavingsTransactionItem, SavingsTransaction> chunk(IMPORT_CHUNKSIZE) .reader(savingsTransactionItemReader) .processor(processor) .writer(savingsTransactionItemWriter) .listener(listener) .build(); } @Bean public Step savingsTransactionCleanUpStep(final PlatformTransactionManager transactionManager, final JobRepository jobRepository) { final TaskletStep taskletStep = new TaskletStep("savingsTransactionCleanUpStep"); final FileMovingTasklet tasklet = new FileMovingTasklet(); tasklet.setFileNamePattern(savingsTransactionFile); tasklet.setProcessedDirectory(processedDirectory); taskletStep.setTasklet(tasklet); taskletStep.setTransactionManager(transactionManager); taskletStep.setJobRepository(jobRepository); try { taskletStep.afterPropertiesSet(); } catch (final Exception e) { throw new BatchConfigurationException("Failed to configure tasklet!", e); } return taskletStep; } @Bean @DependsOn("jobBuilderFactory") public Job job(final Step savingsTransactionStep, final Step savingsTransactionCleanUpStep) { return jobBuilderFactory.get("job") .incrementer(new RunIdIncrementer()) .start(savingsTransactionStep) .next(savingsTransactionCleanUpStep) .on("FINISHED") .end() .build() .build(); } }
Unit test, which restarts the task
final Date now = new Date(); jobMananger.processRegistrations(now); final List<SavingsBalance> savingsBalances = savingsBalanceDao.findAll(); assertEquals(9, savingsBalances.size()); FileUtils.moveFile(new File("target/AEA001_20160610.dat"), new File("target/AEA001_20160610_invalid.dat")); FileUtils.moveFile(new File("target/AEA001_20160610_valid.dat"), new File("target/AEA001_20160610.dat")); jobMananger.processRegistrations(now); final List<SavingsBalance> savingsBalances2 = savingsBalanceDao.findAll(); System.out.println(savingsBalances2.size()); int found = 0; for (final SavingsBalance savingsBalance : savingsBalances2) { final String id = savingsBalance.getId(); if ("12345".equals(id)) { found++; } } assertEquals("Invalid number of found balances!", 1, found);
Job Manager Implementation
public class JobManager { @Resource private JobLauncher jobLauncher; @Resource private Job job; @Transactional(propagation = Propagation.NOT_SUPPORTED) public void processRegistrations(final Date date) { try { final Map<String, JobParameter> parameters = new HashMap<>(); parameters.put("START_DATE", new JobParameter(date)); final JobParameters jobParameters = new JobParameters(parameters); final JobExecution execution = jobLauncher.run(job, jobParameters); LOG.info("Exit Status : " + execution.getStatus()); } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) { LOG.error("Failed to process registrations.", e); } } }