Spring 源码硬核解析系列专题(扩展篇):Spring Batch 的恢复机制源码解析
在第九期中,我们深入探讨了 Spring Batch 的批处理流程,剖析了 Job 和 Step 的执行机制。在企业级应用中,批处理任务可能因异常(如数据库故障、网络中断)失败,如何从失败点恢复并继续执行,是 Spring Batch 的关键特性之一。本篇将聚焦 Spring Batch 的恢复机制,深入源码分析其实现原理,并补充相关图示。
1. 恢复机制的核心概念
Spring Batch 的恢复机制依赖以下组件:
- JobRepository:持久化 Job 和 Step 的执行状态。
- JobExecution:记录 Job 的运行信息(如状态、失败原因)。
- StepExecution:记录 Step 的运行信息(如已处理条目数)。
- Restartability:支持从失败点重启。
恢复的核心在于利用持久化状态,定位失败位置并跳过已完成的数据。
2. 恢复机制的配置
一个支持恢复的 Spring Batch 配置:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
return jobBuilderFactory.get("recoverableJob")
.start(step)
.build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory, DataSource dataSource) {
return stepBuilderFactory.get("recoverableStep")
.<String, String>chunk(10)
.reader(reader(dataSource))
.processor(processor())
.writer(writer())
.faultTolerant()
.skip(IllegalArgumentException.class)
.skipLimit(5)
.build();
}
@Bean
public ItemReader<String> reader(DataSource dataSource) {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT name FROM items");
reader.setRowMapper((rs, rowNum) -> rs.getString("name"));
return reader;
}
@Bean
public ItemProcessor<String, String> processor() {
return item -> {
if ("error".equals(item)) throw new IllegalArgumentException("Simulated error");
return "Processed: " + item;
};
}
@Bean
public ItemWriter<String> writer() {
return items -> items.forEach(System.out::println);
}
}
faultTolerant()
:启用容错。skip()
:跳过指定异常。skipLimit()
:设置跳过次数上限。
3. JobRepository 的作用
JobRepository
使用数据库(如 BATCH_JOB_EXECUTION
和 BATCH_STEP_EXECUTION
表)持久化状态:
public interface JobRepository {
JobExecution createJobExecution(String jobName, JobParameters jobParameters);
void update(JobExecution jobExecution);
void update(StepExecution stepExecution)