当前位置: 首页 > news >正文

Spring 源码硬核解析系列专题(扩展篇):Spring Batch 的恢复机制源码解析

在第九期中,我们深入探讨了 Spring Batch 的批处理流程,剖析了 Job 和 Step 的执行机制。在企业级应用中,批处理任务可能因异常(如数据库故障、网络中断)失败,如何从失败点恢复并继续执行,是 Spring Batch 的关键特性之一。本篇将聚焦 Spring Batch 的恢复机制,深入源码分析其实现原理,并补充相关图示。

1. 恢复机制的核心概念

Spring Batch 的恢复机制依赖以下组件:

  • JobRepository:持久化 Job 和 Step 的执行状态。
  • JobExecution:记录 Job 的运行信息(如状态、失败原因)。
  • StepExecution:记录 Step 的运行信息(如已处理条目数)。
  • Restartability:支持从失败点重启。

恢复的核心在于利用持久化状态,定位失败位置并跳过已完成的数据。

2. 恢复机制的配置

一个支持恢复的 Spring Batch 配置:

@Configuration
@EnableBatchProcessing
public class BatchConfig {
   
    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
   
        return jobBuilderFactory.get("recoverableJob")
                .start(step)
                .build();
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory, DataSource dataSource) {
   
        return stepBuilderFactory.get("recoverableStep")
                .<String, String>chunk(10)
                .reader(reader(dataSource))
                .processor(processor())
                .writer(writer())
                .faultTolerant()
                .skip(IllegalArgumentException.class)
                .skipLimit(5)
                .build();
    }

    @Bean
    public ItemReader<String> reader(DataSource dataSource) {
   
        JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT name FROM items");
        reader.setRowMapper((rs, rowNum) -> rs.getString("name"));
        return reader;
    }

    @Bean
    public ItemProcessor<String, String> processor() {
   
        return item -> {
   
            if ("error".equals(item)) throw new IllegalArgumentException("Simulated error");
            return "Processed: " + item;
        };
    }

    @Bean
    public ItemWriter<String> writer() {
   
        return items -> items.forEach(System.out::println);
    }
}
  • faultTolerant():启用容错。
  • skip():跳过指定异常。
  • skipLimit():设置跳过次数上限。

3. JobRepository 的作用

JobRepository 使用数据库(如 BATCH_JOB_EXECUTIONBATCH_STEP_EXECUTION 表)持久化状态:

public interface JobRepository {
   
    JobExecution createJobExecution(String jobName, JobParameters jobParameters);
    void update(JobExecution jobExecution);
    void update(StepExecution stepExecution)

相关文章:

  • LeetCode hot 100—合并两个有序链表
  • 【Spark+Hive】基于Spark大数据技术小红书舆情分析可视化预测系统(完整系统源码+数据库+开发笔记+详细部署教程+虚拟机分布式启动教程)✅
  • Kotlin函数式编程与Lambda表达式
  • Docker安装Redpandata-console控制台
  • 华硕电脑开启电池保养模式的方法
  • 立体仓WMS同MES制造的协同
  • 2020最新Java面试题
  • opencv 模板匹配方法汇总
  • c语言笔记 数组篇
  • vue videojs使用canvas截取视频画面
  • Linux命令常用的有哪些?
  • nlp第十节——LLM相关
  • 013-2 订单支付超时自动取消订单(rocketmq jpa)
  • 迷你世界脚本玩家接口:Player
  • 蓝桥杯2025模拟三(01字符串)
  • Python Tornado 框架面试题及参考答案
  • 【leetcode hot 100 76】最小覆盖子串
  • TypeScript系列01-类型系统全解析
  • 【中值滤波器(Median Filter)详解】
  • go routine 并发和同步
  • 网站 vps/百分百营销软件官网
  • 网站建设代理公司/seo搜索引擎优化人员
  • 桂林漓江学院/搜索引擎简称seo
  • wordpress 数据 拆分/排名优化方法
  • 青岛网站设计建议i青岛博采网络/网站seo优化报告
  • 高端定制网站建设/海南seo顾问服务