Spring 源码硬核解析系列专题(扩展篇):Spring Batch 回滚机制源码解析
在第九期及“Spring Batch 恢复机制”扩展篇中,我们深入探讨了 Spring Batch 的批处理流程和故障恢复能力。在批处理中,当异常发生时,不仅需要跳过或重启,还可能需要回滚已执行的操作,以确保数据一致性。本篇将聚焦 Spring Batch 的回滚机制,深入源码分析其实现原理,并补充相关图示。
1. 回滚机制的核心概念
Spring Batch 的回滚机制依赖事务管理,确保每个 chunk(数据块)的处理要么全部成功,要么全部回滚。核心组件包括:
- TransactionManager:控制事务的开启、提交和回滚。
- ChunkContext:记录 chunk 处理的状态。
- StepExecution:跟踪事务执行的上下文。
- FaultTolerantStep:支持事务回滚的步骤。
回滚通常在 chunk 级别发生,与 Spring 的事务管理(第四期)深度集成。
2. 回滚机制的配置
一个支持回滚的 Spring Batch 配置:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
return jobBuilderFactory.get("rollbackJob")
.start(step)
.build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory, DataSource dataSource, PlatformTransactionManager txManager) {
return stepBuilderFactory.get("rollbackStep")
.<String, String>chunk(10)
.reader(reader(dataSource))
.processor(processor())
.writer(writer(dataSource))
.transactionManager(txManager)
.faultTolerant()
.build();
}
@Bean
public ItemReader<String> reader(DataSource dataSource) {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT name FROM items");
reader.setRowMapper((rs, rowNum) -> rs.getString("name"));
return reader;
}
@Bean
public ItemProcessor<String, String> processor() {
return item -> {
if ("error".equals(item)) throw new RuntimeException("Simulated error");
return "Processed: " + item;
};
}
@Bean
public ItemWriter