How can you restart a failed Spring Batch job and let it pick up where it left off?

According to the Spring Batch documentation, restarting a job is supported out of the box, but I can't get it to resume from where it left off. For example, if my step has processed 10 records, it should continue with record 11 when I restart it. In practice, this does not happen: it reads from the beginning and processes everything again.

Does anyone have a simple Java configuration that reads a delimited file and writes the contents to a database table, and that can be restarted from the point where it stopped?

    @Configuration
    public class BatchConfiguration {

        @Value("${spring-batch.databaseType}")
        private String databaseType;

        @Value("${spring-batch.databaseSchema}")
        private String schemaName;

        @Bean
        public JobBuilderFactory jobBuilderFactory(final JobRepository jobRepository) {
            return new JobBuilderFactory(jobRepository);
        }

        @Bean
        public StepBuilderFactory stepBuilderFactory(final JobRepository jobRepository,
                final PlatformTransactionManager transactionManager) {
            return new StepBuilderFactory(jobRepository, transactionManager);
        }

        @Bean
        public JobRepository jobRepository(final DataSource dataSource,
                final PlatformTransactionManager transactionManager) {
            final JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
            bean.setDatabaseType(databaseType);
            bean.setDataSource(dataSource);
            if (StringUtils.isNotBlank(schemaName)) {
                bean.setTablePrefix(schemaName);
            }
            bean.setTransactionManager(transactionManager);
            try {
                bean.afterPropertiesSet();
                return bean.getObject();
            } catch (final Exception e) {
                throw new BatchConfigurationException("Invalid batch job repository configuration.", e);
            }
        }

        @Bean
        public JobLauncher jobLauncher(final JobRepository jobRepository) {
            final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(jobRepository);
            return jobLauncher;
        }
    }

    @Configuration
    @EnableScheduling
    @ComponentScan("com.some.package")
    public class BatchJobConfiguration {

        @Resource
        private JobBuilderFactory jobBuilderFactory;

        @Resource
        private StepBuilderFactory stepBuilderFactory;

        @Value("${savings-transaction.file}")
        private String savingsTransactionFile;

        @Value("${savings-balance.file}")
        private String savingsBalanceFile;

        @Value("${processed-directory}")
        private String processedDirectory;

        private static final Integer IMPORT_CHUNKSIZE = 10;

        @Bean
        @DependsOn("stepBuilderFactory")
        public Step savingsTransactionStep(final PlatformTransactionManager transactionManager,
                @Qualifier("savingsTransactionItemReader") final ItemReader<SavingsTransactionItem> savingsTransactionItemReader,
                @Qualifier("savingsTransactionProcessor") final ItemProcessor<SavingsTransactionItem, SavingsTransaction> processor,
                @Qualifier("savingsTransactionItemWriter") final ItemWriter<SavingsTransaction> savingsTransactionItemWriter,
                @Qualifier("savingsTransactionStepListener") final SavingsTransactionStepListener listener) {
            return stepBuilderFactory.get("savingsTransactionStep")
                    .transactionManager(transactionManager)
                    .<SavingsTransactionItem, SavingsTransaction> chunk(IMPORT_CHUNKSIZE)
                    .reader(savingsTransactionItemReader)
                    .processor(processor)
                    .writer(savingsTransactionItemWriter)
                    .listener(listener)
                    .build();
        }

        @Bean
        public Step savingsTransactionCleanUpStep(final PlatformTransactionManager transactionManager,
                final JobRepository jobRepository) {
            final TaskletStep taskletStep = new TaskletStep("savingsTransactionCleanUpStep");
            final FileMovingTasklet tasklet = new FileMovingTasklet();
            tasklet.setFileNamePattern(savingsTransactionFile);
            tasklet.setProcessedDirectory(processedDirectory);
            taskletStep.setTasklet(tasklet);
            taskletStep.setTransactionManager(transactionManager);
            taskletStep.setJobRepository(jobRepository);
            try {
                taskletStep.afterPropertiesSet();
            } catch (final Exception e) {
                throw new BatchConfigurationException("Failed to configure tasklet!", e);
            }
            return taskletStep;
        }

        @Bean
        @DependsOn("jobBuilderFactory")
        public Job job(final Step savingsTransactionStep, final Step savingsTransactionCleanUpStep) {
            return jobBuilderFactory.get("job")
                    .incrementer(new RunIdIncrementer())
                    .start(savingsTransactionStep)
                    .next(savingsTransactionCleanUpStep)
                    .on("FINISHED")
                    .end()
                    .build()
                    .build();
        }
    }

Unit test that restarts the job

    final Date now = new Date();
    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances = savingsBalanceDao.findAll();
    assertEquals(9, savingsBalances.size());

    FileUtils.moveFile(new File("target/AEA001_20160610.dat"), new File("target/AEA001_20160610_invalid.dat"));
    FileUtils.moveFile(new File("target/AEA001_20160610_valid.dat"), new File("target/AEA001_20160610.dat"));

    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances2 = savingsBalanceDao.findAll();
    System.out.println(savingsBalances2.size());

    int found = 0;
    for (final SavingsBalance savingsBalance : savingsBalances2) {
        final String id = savingsBalance.getId();
        if ("12345".equals(id)) {
            found++;
        }
    }
    assertEquals("Invalid number of found balances!", 1, found);

Job Manager Implementation

    public class JobManager {

        @Resource
        private JobLauncher jobLauncher;

        @Resource
        private Job job;

        @Transactional(propagation = Propagation.NOT_SUPPORTED)
        public void processRegistrations(final Date date) {
            try {
                final Map<String, JobParameter> parameters = new HashMap<>();
                parameters.put("START_DATE", new JobParameter(date));
                final JobParameters jobParameters = new JobParameters(parameters);
                final JobExecution execution = jobLauncher.run(job, jobParameters);
                LOG.info("Exit Status : " + execution.getStatus());
            } catch (JobExecutionAlreadyRunningException | JobRestartException
                    | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                LOG.error("Failed to process registrations.", e);
            }
        }
    }
+5
3 answers

It seems you need to configure the following beans to restart your job.

    @Bean
    public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
            final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
        final SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobLauncher(jobLauncher);
        jobOperator.setJobRepository(jobRepository);
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobExplorer(jobExplorer);
        return jobOperator;
    }

    @Bean
    public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
        final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
        bean.setDataSource(dataSource);
        bean.setTablePrefix("BATCH_");
        bean.setJdbcOperations(new JdbcTemplate(dataSource));
        bean.afterPropertiesSet();
        return bean.getObject();
    }

Then you need to get the ID of the failed job execution from the batch tables so that you can restart that particular execution using the jobOperator.

    final Long restartId = jobOperator.restart(id);
    final JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
+3

You launch your job with new JobParameters, so Spring Batch does not resume the failed run but starts a new job instance.
If you want to resume the job, you must remove the incrementer from the Job bean configuration.
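
To illustrate the point about parameters: Spring Batch identifies a job instance by the job name plus its identifying parameters, so launching again with the same parameters after a failure counts as a restart, while changed parameters (for example an incremented `run.id`) create a fresh instance. The class below is only a toy model of that rule in plain Java. It is not Spring Batch API, and `JobInstanceModel`, `Outcome`, and `launch` are hypothetical names chosen for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (NOT Spring Batch code) of how a launch is matched to a job instance. */
public class JobInstanceModel {

    enum Outcome { NEW_INSTANCE, RESTART_OF_FAILED_INSTANCE, REJECTED_ALREADY_COMPLETE }

    // Last known result per instance key: true = completed, false = failed.
    private final Map<String, Boolean> completedByKey = new HashMap<>();

    /** An instance is identified by the job name plus its parameters (sorted for stability). */
    static String key(String jobName, Map<String, String> params) {
        return jobName + new TreeMap<>(params);
    }

    /** Simulates one launch; 'succeeds' says whether this simulated run completes. */
    Outcome launch(String jobName, Map<String, String> params, boolean succeeds) {
        String key = key(jobName, params);
        Boolean completed = completedByKey.get(key);
        if (Boolean.TRUE.equals(completed)) {
            // A completed instance cannot be run again with the same parameters.
            return Outcome.REJECTED_ALREADY_COMPLETE;
        }
        completedByKey.put(key, succeeds);
        return completed == null ? Outcome.NEW_INSTANCE : Outcome.RESTART_OF_FAILED_INSTANCE;
    }

    public static void main(String[] args) {
        JobInstanceModel repo = new JobInstanceModel();
        Map<String, String> sameDate = Map.of("START_DATE", "2016-06-10");
        System.out.println(repo.launch("job", sameDate, false)); // first run fails
        System.out.println(repo.launch("job", sameDate, true));  // same parameters: restart
        System.out.println(repo.launch("job", sameDate, true));  // already complete
        // Any changed parameter produces a different key, hence a new instance.
        System.out.println(repo.launch("job", Map.of("START_DATE", "2016-06-10", "run.id", "2"), true));
    }
}
```

In this model, only the second launch resumes earlier work; the last one starts from scratch because its parameters differ.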

0

Inside the JobManager class, use the JobOperator.restart() method instead of JobLauncher.

Your job does not restart from the last failed step because you use JobLauncher to launch a new job, which therefore starts again from the first step.
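
The difference between a restart and a fresh launch can be sketched with a toy model in plain Java. Spring Batch saves step state when a chunk commits, so a restart resumes after the last committed chunk, whereas a new launch begins with an empty context and reads from the start. The code below is not Spring Batch API: `CheckpointedReaderModel`, `run`, and the `"position"` key are hypothetical names, and the chunk size of 10 simply mirrors IMPORT_CHUNKSIZE from the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (NOT Spring Batch code) of chunk processing with a saved checkpoint. */
public class CheckpointedReaderModel {

    /**
     * Writes items to 'sink' chunk by chunk and records the committed position in 'context'.
     * Throws when it reaches index 'failAt' (pass -1 for a run without failure).
     */
    static void run(List<String> items, Map<String, Integer> context,
                    int chunkSize, int failAt, List<String> sink) {
        int position = context.getOrDefault("position", 0); // 0 when the context is new
        while (position < items.size()) {
            List<String> chunk = new ArrayList<>();
            int end = Math.min(position + chunkSize, items.size());
            for (int i = position; i < end; i++) {
                if (i == failAt) {
                    // The current chunk is discarded; earlier commits stay in place.
                    throw new IllegalStateException("bad record at index " + i);
                }
                chunk.add(items.get(i));
            }
            sink.addAll(chunk);               // "commit" the chunk
            position = end;
            context.put("position", position); // checkpoint saved together with the commit
        }
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            items.add("record-" + i);
        }
        Map<String, Integer> context = new HashMap<>();
        List<String> table = new ArrayList<>();
        try {
            run(items, context, 10, 14, table); // fails inside the second chunk
        } catch (IllegalStateException e) {
            System.out.println("failed, committed so far: " + table.size());
        }
        run(items, context, 10, -1, table);     // restart: reuses the saved context
        System.out.println("rows after restart: " + table.size());
    }
}
```

Passing a new, empty map instead of the saved `context` on the second call models a fresh launch: all 25 records would be read and written again, which matches the behaviour described in the question.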

Make sure the "restartable" property is set to true (the default value is true).

Here is some sample code.

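    public boolean resumeWorkflow(long executionId) throws WorkflowResumeServiceException {
        JobOperator jobOperator = (JobOperator) ApplicationContextProvider.getApplicationContext().getBean("jobOperator");
        try {
            LOGGER.info("SUMMARY AFTER RESTART:" + jobOperator.getSummary(executionId));
            jobOperator.restart(executionId);
            return true;
        } catch (Exception e) {
            // Assumption: WorkflowResumeServiceException offers a (String, Throwable) constructor.
            throw new WorkflowResumeServiceException("Failed to restart execution " + executionId, e);
        }
    }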

You need to get the jobExecutionId of the failed job and pass it to the method above.

Please note that a job that has finished with the COMPLETED status cannot be restarted.

You can also read this post: "Restarting a task".

0
