当前位置: 首页 > news >正文

Springboot 集成 SpringBatch 批处理组件

Springboot 集成 SpringBatch 批处理组件

  • 1.Spring Batch 简介
  • 2.批处理工具架构和示例
    • 2.1 批处理任务持久化控制
    • 2.2 实现一个读取器
    • 2.3 定义批处理JOB
    • 2.4 数据实体
    • 2.5 接口类
    • 2.6 H2 配置
    • 2.7 H2 数据库脚本
  • 3.测试

1.Spring Batch 简介

Spring Batch 是 Spring 生态系统中的企业级批处理框架,专门设计用于处理大规模数据作业。它提供了批处理应用所需的核心功能,解决了传统批处理应用开发中的重复性问题,使开发人员能够专注于业务逻辑而非基础设施。

  • 核心价值与定位

    问题解决:自动化处理周期性的、数据密集型的任务(如报表生成、数据迁移、对账结算)
    典型场景:
    每月财务报表生成
    银行日终批量交易处理
    电商平台每日用户行为分析
    百万级数据迁移(如旧系统到新系统)

2.批处理工具架构和示例

核心接口:
ItemReader:读取数据
ItemProcessor:处理数据
ItemWriter:写入数据

项目结构

在这里插入图片描述

依赖包

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + Spring Batch demo. Versions of the Spring Boot
     starters are managed by the imported spring-boot-dependencies BOM. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringBatcher</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-boot.version>3.5.3</spring-boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Import the Spring Boot BOM instead of inheriting from the starter parent. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>4.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope> <!-- usually only needed as a runtime dependency -->
        </dependency>
    </dependencies>
</project>

启动类

package org.example;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the Spring Batch demo application.
 *
 * <p>NOTE(review): with Spring Boot 3.x, declaring {@code @EnableBatchProcessing} is documented
 * to make Boot's own batch auto-configuration back off. Confirm this is intended before relying
 * on {@code spring.batch.*} properties such as schema initialization.
 */
@EnableBatchProcessing
@SpringBootApplication
public class BatchApp {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(BatchApp.class, args);
    }
}

2.1 批处理任务持久化控制

示例代码基于 H2 存储

package org.example.config;

import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;

/**
 * Declares the {@link JobRepository} used to persist batch job metadata in the H2 database.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-20 PM 4:21
 */
@Configuration
public class BatchJobConfig {

    /**
     * Builds a JDBC-backed job repository on top of the application data source.
     *
     * @param dataSource data source holding the Spring Batch metadata tables
     * @return the initialized job repository
     * @throws Exception if the factory bean fails to initialize or create the repository
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource) throws Exception {
        // The repository gets its own transaction manager bound to the same data source.
        DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource);

        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setDatabaseType("H2");
        factory.setTransactionManager(txManager);

        // The factory is created by hand (not as a bean), so initialization is triggered explicitly.
        factory.afterPropertiesSet();
        return factory.getObject();
    }
}

2.2 实现一个读取器

以 Excel 文件读取为例

package org.example.job.common;

import com.alibaba.excel.EasyExcel;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;

import java.io.File;
import java.util.List;

/**
 * {@link ItemReader} that loads every row of an Excel sheet into memory with EasyExcel and then
 * hands the rows out one at a time. Intended for small and medium sized files.
 *
 * <p>Instances keep a mutable cursor and are not synchronized.
 *
 * @param <T> row type the sheet is mapped to
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:16
 */
public class EasyExcelItemReader<T> implements ItemReader<T>, InitializingBean {

    private final Class<T> clazz;
    private final String filePath;
    private List<T> cacheList;
    private int index = 0;

    /**
     * @param clazz    class each row is mapped to
     * @param filePath path of the Excel file to read
     */
    public EasyExcelItemReader(Class<T> clazz, String filePath) {
        this.clazz = clazz;
        this.filePath = filePath;
    }

    /**
     * Eagerly loads the whole sheet when the reader is managed as a Spring bean.
     *
     * @throws RuntimeException if the file cannot be read
     */
    @Override
    public void afterPropertiesSet() {
        loadRows();
    }

    /**
     * Returns the next cached row, or {@code null} once all rows have been handed out.
     * After returning {@code null} the cursor is rewound so the same instance can serve
     * another run.
     *
     * @return the next row, or {@code null} at end of data
     * @throws RuntimeException if the rows had not been loaded yet and the file cannot be read
     */
    @Override
    public T read() {
        // When the reader is created with `new` outside a Spring container, afterPropertiesSet()
        // is never invoked; load on first use instead of failing with a NullPointerException.
        if (cacheList == null) {
            loadRows();
        }
        if (index < cacheList.size()) {
            return cacheList.get(index++);
        }
        // Rewind the read position
        index = 0;
        return null;
    }

    /** Reads all rows of the sheet into {@link #cacheList} in a single pass. */
    private void loadRows() {
        try {
            cacheList = EasyExcel.read(new File(filePath))
                    .head(clazz)
                    .sheet()
                    .headRowNumber(1) // skip the header row
                    .doReadSync();
        } catch (Exception e) {
            throw new RuntimeException("read excel failed ", e);
        }
    }
}

2.3 定义批处理JOB

package org.example.job;import org.example.entity.User;
import org.example.job.common.EasyExcelItemReader;
import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;/*** @Author zhx && moon* @Since 21* @Date 2025-06-24 PM 2:08*/
@Component
public class SVCJob {/*** Excel 读取* @return*/@Bean("easyExcelItemReader")public EasyExcelItemReader<User> easyExcelItemReader() {return new EasyExcelItemReader<>(User.class, "C:\\Users\\Administrator\\Desktop\\Test.xlsx");}/*** 数据处理器 对读取的数据进行加工* @return*/@Bean("getNameProcessors")public ItemProcessor<User, String> getNameProcessors() {return item -> {return item.getName();};}/*** 配置写入器(保持不变)* @return*/@Bean("nameWriter")public ItemWriter<String> nameWriter() {return items -> {for (String item : items) {System.out.println("User Name: " + item);}};}/*** 配置批处理步骤(使用新版API)* @param jobRepository* @param transactionManager* @param reader* @param processor* @param writer* @return*/@Bean("easyExcelStep")public Step easyExcelStep(JobRepository jobRepository,PlatformTransactionManager transactionManager,@Qualifier("easyExcelItemReader") EasyExcelItemReader<User> reader,@Qualifier("getNameProcessors") ItemProcessor<User, String> processor,@Qualifier("nameWriter") ItemWriter<String> writer) {return new StepBuilder("easyExcelStep", jobRepository).<User, String>chunk(100, transactionManager).reader(reader).processor(processor).writer(writer).faultTolerant().skipLimit(1).skip(IllegalArgumentException.class).listener(new StepExecutionListener() {@Overridepublic void beforeStep(StepExecution stepExecution) {System.out.println("start to processor data ...");}}).build();}/*** 配置批处理作业* @param jobRepository* @param importStep* @return*/@Bean("easyExcelImportJobs")public Job customerImportJob(JobRepository jobRepository, @Qualifier("easyExcelStep") Step importStep) {return new JobBuilder("easyExcelImportJobs", jobRepository).incrementer(new RunIdIncrementer()).start(importStep).listener(new JobExecutionListener() {@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println("Job Finished!State: " + jobExecution.getStatus());}}).build();}}

2.4 数据实体

package org.example.entity;

import com.alibaba.excel.annotation.ExcelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * One row of the imported Excel sheet. Each field is bound to a column by its header text.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:20
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

    /** Value of the column headed "姓名". */
    @ExcelProperty("姓名")
    private String name;

    /** Value of the column headed "编号". */
    @ExcelProperty("编号")
    private String employeeId;

    /** Value of the column headed "年龄". */
    @ExcelProperty("年龄")
    private Integer age;
}

2.5 接口类

package org.example.controller;

import jakarta.annotation.Resource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint for triggering the Excel import job on demand.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-23 PM 4:40
 */
@RestController
@RequestMapping("/job")
public class JobManage {

    @Autowired
    private JobLauncher jobLauncher;

    /** The job registered under the bean name {@code easyExcelImportJobs}. */
    @Resource(name = "easyExcelImportJobs")
    Job job;

    /**
     * Launches the import job. A fresh {@code uniqueId} parameter taken from
     * {@link System#nanoTime()} is attached to every launch.
     *
     * @throws RuntimeException wrapping any exception raised while building the parameters
     *                          or launching the job
     */
    @GetMapping("/start")
    public void start() {
        try {
            JobParametersBuilder builder = new JobParametersBuilder();
            builder.addLong("uniqueId", System.nanoTime());
            JobParameters launchParams = builder.toJobParameters();
            jobLauncher.run(job, launchParams);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2.6 H2 配置

# Data source, H2 web console and Spring Batch settings for the demo.
spring:
  datasource:
    # File-based H2 database. Alternatives that were tried:
    #   jdbc:h2:tcp://localhost/mem:springbatchdb;DB_CLOSE_DELAY=-1
    #   jdbc:h2:mem:springbatchdb
    url: jdbc:h2:file:Z:/IdeaProjects/SpringBatcher/SpringBatcher/springbatchdb
    driver-class-name: org.h2.Driver
    username: sa
    password: sa
  h2:
    console:
      enabled: true
      path: /h2/db-console
      settings:
        # NOTE(review): this allows remote hosts to open the H2 console; keep it to local demos.
        web-allow-others: true
  batch:
    jdbc:
      # NOTE(review): this property is handled by Spring Boot's batch auto-configuration;
      # confirm it still takes effect when @EnableBatchProcessing is declared.
      initialize-schema: always

2.7 H2 数据库脚本

-- Autogenerated: do not edit this file
-- Spring Batch metadata tables and sequences for H2.

-- One row per logical job (job name + hash of its identifying parameters).
CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

-- One row per attempt to run a job instance.
CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP(9) NOT NULL,
	START_TIME TIMESTAMP(9) DEFAULT NULL ,
	END_TIME TIMESTAMP(9) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP(9),
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

-- Parameters passed to each job execution.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	PARAMETER_NAME VARCHAR(100) NOT NULL ,
	PARAMETER_TYPE VARCHAR(100) NOT NULL ,
	PARAMETER_VALUE VARCHAR(2500) ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

-- One row per step run inside a job execution, including its counters.
CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP(9) NOT NULL,
	START_TIME TIMESTAMP(9) DEFAULT NULL ,
	END_TIME TIMESTAMP(9) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP(9),
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

-- Serialized execution context of each step execution.
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT LONGVARCHAR ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

-- Serialized execution context of each job execution.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT LONGVARCHAR ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;

3.测试

启动服务

在这里插入图片描述

测试 H2 连接

在这里插入图片描述

测试数据

在这里插入图片描述

触发 JOB

在这里插入图片描述

JOB 执行记录

在这里插入图片描述

相关文章:

  • 软件著作权人的权利
  • 【系统分析师】高分论文:论软件开发模型及应用
  • GitHub vs GitLab 全面对比报告(2025版)
  • 大模型小模型选型手册:开源闭源、国内国外全方位对比
  • Vulkan 学习(18)---- 使用 ValidationLayer
  • Function Calling与MCP的区别
  • python训练day44 预训练模型
  • Windows系统安装鸿蒙模拟器
  • 原型设计Axure RP网盘资源下载与安装教程共享
  • wpf的Binding之UpdateSourceTrigger
  • Go语言--语法基础6--基本数据类型--数组类型(2)
  • MATLAB GUI界面设计 第七章——高级应用
  • 领域驱动设计(DDD)【26】之CQRS模式初探
  • 大语言模型训练阶段
  • Tang Prime 20K板OV2640例程
  • 30套精品论文答辩开题报告PPT模版
  • (八)聚类
  • node js入门,包含express,npm管理
  • 【3.3】Pod详解——容器探针部署第一个pod
  • Python 可迭代的对象、迭代器 和生成器(另一个示例:等差数列生成器)