当前位置: 首页 > news >正文

基于 Spring Batch 和 XXL-Job 的批处理任务实现

springbatch springinteger批处理作业

1 添加依赖

<dependency><groupId>org.springframework.batch</groupId><artifactId>spring - batch - integration</artifactId><version>你的 Spring Batch 版本</version>
</dependency>
<!-- 通常还需搭配 Spring Batch 核心、Spring Integration 相关基础依赖,比如 -->
<dependency><groupId>org.springframework.batch</groupId><artifactId>spring - batch - core</artifactId><version>你的 Spring Batch 版本</version>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring - integration - core</artifactId><version>你的 Spring Integration 版本</version>
</dependency>

spring - batch - integration 依赖:它能帮你将 Spring Batch(批处理框架 )与 Spring Integration(企业集成模式框架 )结合,实现批处理作业和外部系统(如消息队列 )交互。

2 核心概念

spring - batch - integration 主要用于:
(1)触发批处理作业:通过 Spring Integration 的消息(如从消息队列获取消息 )
(2)触发 Spring Batch 作业执行,实现事件驱动的批处理。
作业结果交互:把 Spring Batch 作业执行结果(成功、失败、输出数据等 )通过 Spring Integration 发送到其他系统(如发消息通知监控平台 )。

关键组件和流程:
(1)JobLaunchingGateway:作为网关,接收消息(比如消息里包含作业参数 ),触发 Spring Batch 作业启动。
(2)JobExecutionEvent:作业执行过程中产生的事件(开始、结束、失败等 ),可通过 Spring Integration 通道传递,用于监听和后续处理。
(3)消息通道(Channel ):Spring Integration 里传递消息的通道,连接不同组件,像把触发作业的消息传到 JobLaunchingGateway,把作业事件消息传到监听器。

3 代码示例

3.1 包含以下几个核心部分

  • job定义:ptbTBJob() 定义了一个完整的批处理任务
  • Step 步骤:step01() 定义了任务的具体处理步骤,包括读取、处理、写入
  • 数据读取:multiResourceItemReader() 读取数据(通常是文件)
  • 数据写入:zjtbWriter() 处理并写入数据
  • 任务监听:PtbTBJobListener 监控任务执行前后的状态
  • 调度触发:通过 XXL-Job 定时调用这个批处理任务
    @Configuration@Slf4j@EnableBatchProcessingpublic class PtbTBBatchConfig {@Beanpublic Job ptbTBJob() {// 通过jobBuilderFactory构建一个Job,get方法参数为Job的namereturn jobBuilderFactory.get("ptbTBJob").incrementer(new RunIdIncrementer()).start(step01()).listener(ptbTBJobListener()).build();}@Beanpublic Step step01() {return stepBuilderFactory.get("step01").<PrpZjtbFirstdata, PrpZjtbFirstdata>chunk(5000).reader(multiResourceItemReader(""))
//                .processor(itemProcessor1())   这里注释掉了 processor,可能在 writer 中直接处理.writer(zjtbWriter()).build();}//  创建监听@Beanpublic PtbTBJobListener ptbTBJobListener() {return new PtbTBJobListener();}}

调度触发:通过 XXL-Job 定时调用这个批处理任务:

  • 监听类
@Slf4j
public class PtbTBJobListener implements JobExecutionListener {   @Overridepublic void beforeJob(JobExecution jobExecution) { String date = jobExecution.getJobParameters().getString("date");//  判断文件是否存在// SFTP文件下载流程开始}@Overridepublic void afterJob(JobExecution jobExecution) {String date = jobExecution.getJobParameters().getString("date");}}

@Slf4j
@Component
public class XxlJobFileReportHandler {  // 引入声明的job的实例
@Autowired@Qualifier("ptbTBJob")private Job ptbTBJob;/*** zjTbFileReportHandlerTiming 定时 T-1 拉取**日结文件,并入表*/@XxlJob("zjTbFileReportHandlerTiming")public void zjTbFileReportHandlerTiming() {StopWatch stopWatch = new StopWatch();stopWatch.start("Finished executing " + ZFTB_LOG_FLAG_TIMING + "zjTbFileReportHandlerTiming " + LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD) + " took ");List<RulProjectConfig> rulProjectConfigListFailed = new ArrayList<>();try {log.info(ZFTB_LOG_FLAG_TIMING + "zjTbFileReportHandlerTiming===>日期:{}", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD));String date = LocalDate.now().minusDays(1).format(DATE_FORMATTER_YYYYMMDD);//拉取日结文件fileReportJob("zjTbFileReportHandlerTiming",date,ptbTBJob);log.info(ZFTB_LOG_FLAG_TIMING + "zjTbFileReportHandlerTiming结束执行===>日期:{}, 异常项目信息集合为:{}", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), rulProjectConfigListFailed.stream().map(RulProjectConfig::toLogString).collect(Collectors.toList()));} catch (Exception e) {log.error(ZFTB_LOG_FLAG_TIMING + "zjTbFileReportHandlerTiming执行异常===>日期:{}", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), e);XxlJobHelper.log(ZFTB_LOG_FLAG_TIMING + "zjTbFileReportHandlerTiming执行异常===>日期:{}, 异常为:{}", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), e.getMessage());XxlJobHelper.handleFail(ZFTB_LOG_FLAG_TIMING + "zjTbFileReportHandlerTiming执行异常===>异常为: " + e.getMessage());} finally {stopWatch.stop();log.info("{}{}ms", stopWatch.getLastTaskName(), stopWatch.getLastTaskTimeMillis());}}/*** 日结文件处理** @param handler       handler名称* @param date                       日期*/private void fileReportJob(String handler,String date,Job job) throws Exception {//拉取日结文件JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
//               .getNextJobParameters(job)//生产需要注掉 因为自动生成的不可靠
//                .addString("fileName", name).addString("date", date)		// 日期能和业务时间关联起来 方便查询.toJobParameters();//启动jobJobExecution jobExecution = launcher.run(job, jobParameters);log.info("执行状态"+jobExecution.getExitStatus());log.info(handler+"执行完成===>日期:{}, 执行状态{}, 参数日期{}", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), jobExecution.getExitStatus(), date);}}
  • 参数传递:将日期参数date传入 Job

  • 生产环境中,通常需要手动构建JobParameters,确保参数可控且与业务关联,例如:
    .addString(“fileName”, name)
    .addString(“date”, date)
    .toJobParameters();

  • 自动生成实例id代码

 // 2. 调用getNextJobParameters,传入目标作业,自动生成唯一参数JobParameters jobParameters = parametersBuilder.getNextJobParameters(myBatchJob);// 3. 启动作业(使用自动生成的参数)jobLauncher.run(myBatchJob, jobParameters);

3.2 任务启动:

  • 先执行PtbTBJobListener的beforeJob()方法
  • 执行step01步骤:
  • 读取数据:multiResourceItemReader读取文件
  • 处理数据:这里注释掉了 processor,可能在 writer 中直接处理
  • 写入数据:zjtbWriter批量写入,每批 5000 条
  • 执行PtbTBJobListener的afterJob()方法

3.3 执行step01步骤 读取数据

@Slf4j
@EnableBatchProcessing
public class PtbTBBatchConfig {   @Bean@StepScopepublic ExtendedMultiResourceItemReader<PrpZjtbFirstdata> multiResourceItemReader(@Value("#{jobParameters['date']}") String date) {log.info("date->" + date);ExtendedMultiResourceItemReader<PrpZjtbFirstdata> resourceItemReader = new ExtendedMultiResourceItemReader<>();//测试
ExtendedMultiResourceItemReader<PrpZjtbFirstdata> resourceItemReader = new ExtendedMultiResourceItemReader<>();return resourceItemReader;/** 
*  写操作  里面也进行了处理
*
*/
@Bean@StepScopepublic ItemWriter<PrpZjtbFirstdata> zjtbWriter() {return new ItemWriter<PrpZjtbFirstdata>() {@Overridepublic void write(List<? extends PrpZjtbFirstdata> items) throws Exception {    log.info("重复数量:" + firstErrorCount);//重复数量+入库数量总数Long size = firstErrorCount + Long.parseLong(String.valueOf(i));log.info("成功入库数量:" + i);log.info("总数量:" + size);}}
}
}
}

4 思考Spring Batch 与 XXL-Job 结合使用,而不是直接在 XXL-Job 的逻辑层编写所有代码

4.1 区别

XXL-Job:专注于任务调度和触发
Spring Batch:专注于批处理逻辑的实现

  • 直接编写的问题:
    如果直接在 XXL-Job 中编写批处理逻辑,会导致调度框架和业务逻辑耦合代码会变得难以维护和扩展
  • 实际应用场景:
    当批处理逻辑变得复杂时(如需要重试、跳过错误、分区处理等)
    当需要监控和追踪批处理任务的执行状态时
    强大的批处理功能支持
4.1.1 Spring Batch 提供的关键功能:
  • 重试机制:自动处理临时错误
@Bean
public Step step() {return stepBuilderFactory.get("step").<Input, Output>chunk(1000).faultTolerant().retryLimit(3).retry(Exception.class).reader(reader()).writer(writer()).build();
}
``
  • 跳过策略:跳过特定错误继续处理
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
  • 分区处理:并行处理大量数据
@Bean
public Step partitionStep() {return stepBuilderFactory.get("partitionStep").partitioner("workerStep", partitioner()).step(workerStep()).gridSize(10).build();
}
  • 流程控制:复杂的作业流程定义
@Bean
public Job job() {return jobBuilderFactory.get("job").start(step1()).on("COMPLETED").to(step2()).from(step2()).on("*").to(step3()).end().build();
}
  • 完善的监控和错误处理
    Spring Batch 提供的监控功能:
    JobRepository:记录作业执行历史
@Autowired
private JobExplorer jobExplorer;public void listJobExecutions() {List<JobExecution> executions = jobExplorer.findJobExecutions(jobInstance);// 分析执行历史
}
  • 执行上下文:在步骤间共享数据

ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
stepExecutionContext.put(“key”, value);

  • 错误处理:灵活的错误处理策略
    java
    @Bean
    public Step errorHandlingStep() {
    return stepBuilderFactory.get(“errorHandlingStep”)
    .<Input, Output>chunk(1000)
    .faultTolerant()
    .skipPolicy(new CustomSkipPolicy())
    .listener(new StepExecutionListener() {
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
    // 自定义错误处理逻辑
    return null;
    }
    })
    .reader(reader())
    .writer(writer())
    .build();
    }
http://www.dtcms.com/a/291900.html

相关文章:

  • linux c语言进阶 - 进程,通信方式
  • PHICOMM(斐讯)N1盒子 - Armbian25.05(Debian 12)刷入U盘/EMMC
  • Unity之C# 脚本与Unity Visual Scripting 交互
  • Java 网络编程详解:从基础到实战,彻底掌握 TCP/UDP、Socket、HTTP 网络通信
  • 【数据可视化-70】奶茶店销量数据可视化:打造炫酷黑金风格的可视化大屏
  • Vue + WebSocket 实时数据可视化实战:多源融合与模拟数据双模式设计
  • AI创作系列第22篇:前端缓存与更新机制重构 - 表情包系统的全面升级
  • 贪心算法Day4学习心得
  • 当直播间告别“真人时代”:AI数字人重构商业新秩序
  • haproxy七层代理新手入门详解
  • 零事故网站重构:11步标准化流程与风险管理指南
  • 第13天 | openGauss逻辑结构:表管理1
  • zabbix“专家坐诊”第295期问答
  • SPI的收发(W25Q64外部flash 和 内部flsah)
  • 小米视觉算法面试30问全景精解
  • Android常用的adb和logcat命令
  • 【bug】ubuntu20.04 orin nx Temporary failure resolving ‘ports.ubuntu.com‘
  • 【测试开发】---Bug篇
  • kafka主题管理详解 - kafka-topics.sh
  • Claude Code Kimi K2 环境配置指南 (Windows/macOS/Ubuntu)
  • 热点leetCode题
  • AI助力临床医学科研创新与效率双提升丨临床医学日常工作、论文高效撰写与项目申报、数据分析与可视化、机器学习建模等
  • Vercel AI SDK 3.0 学习入门指南
  • Java设计模式揭秘:深入理解模板方法模式
  • 一个简单实用的 WinForm 通用开发框架
  • 替代Oracle?金仓数据库用「敢替力」重新定义国产数据库
  • Pygame开源--谷歌小恐龙游戏(附彩蛋)
  • Custom SRP - Draw Calls
  • 从零构建智能对话助手:LangGraph + ReAct 实现具备记忆功能的 AI 智能体
  • Spring Boot 整合 Redis 实现发布/订阅(含ACK机制 - 事件驱动方案)