SpringBatch使用介绍
对于需要进行批处理的系统来说,SpringBatch是一把利器,可以轻松的实现大批量数据的批处理。
以下结合官方文档,介绍基础的概念
SpringBatch官方文档:https://springdoc.cn/spring-batch/index.html
一、介绍
使用场景
- 一般来说,一个典型的批处理程序:
- 从数据库、文件或队列中读取大量的记录。
- 以某种方式处理数据。
- 以修改后的形式写回数据。
Spring Batch将这种基本的批量迭代自动化,提供了将类似事务作为一个集合进行处理的能力,通常是在离线环境下,无需任何用户互动。批量作业是大多数IT项目的一部分,而Spring Batch是唯一提供强大的、企业级解决方案的开源框架。
业务场景
Spring Batch支持以下业务场景:
- 定期提交批处理。
- 并发批处理:对job进行并行处理。
- 分阶段、企业消息驱动的处理。
- 大规模并行批处理。
- 故障后手动或预定重启。
- 从属 step 的顺序处理(扩展到工作流驱动的批处理)。
- 部分处理:跳过记录(例如,在回滚时)。
- 整批事务,适用于小批量或现有存储程序或脚本的情况。
技术目标
Spring Batch有以下技术目标:
- 让批处理开发人员使用Spring编程模型: 专注于业务逻辑,让框架来处理基础设施。
- 在基础设施、批处理执行环境和批处理应用之间提供明确的关注点分离。
- 提供共同的、核心的执行服务,作为所有项目都可以实现的接口。
- 提供简单和默认的核心执行接口的实现,可以 “开箱即用”。
- 通过在所有层中使用Spring框架,使配置、定制和扩展服务变得容易。
- 所有现有的核心服务都应该易于替换或扩展,而不会对基础设施层造成任何影响。
- 提供一个简单的部署模型,架构JAR与应用程序完全分离,通过使用Maven构建。
二、基础概念
在SpringBatch中,一个 Job 有一至多个 step,每个 step 正好有一个 ItemReader,一个 ItemProcessor,和一个 ItemWriter。一个 Job 需要被启动(用 JobLauncher),关于当前运行进程的元数据需要被存储(在 JobRepository)。
Job
在 Spring Batch 中,一个 Job 是一个实体,它封装了整个批处理过程。然而,Job 只是一个 Step 实例的容器。它将逻辑上属于一个流程的多个 step 结合在一起,并允许配置所有 step 的全局属性,如重新启动的能力。job 配置包含:
- job 的名称
- Step 实例的定义和排序
- job 是否可以重新启动
如下是一个示例:
@Bean
public Job footballJob(JobRepository jobRepository) {return new JobBuilder("footballJob", jobRepository).start(playerLoad()).next(gameLoad()).next(playerSummarization()).build();
}
JobInstance
JobInstance 指的是一个逻辑job运行的概念。每个 JobInstance 可以有多个执行(JobExecution),并且在一个给定的时间只能运行一个 JobInstance(它对应于一个特定的 Job 和 JobParameters 标识)。JobInstance 的定义与要加载的数据完全没有关系。完全由 ItemReader 的实现来决定数据的加载方式。然而,使用相同的 JobInstance 决定了是否使用以前执行的 “state”(ExecutionContext)。使用一个新的 JobInstance 意味着 “从头开始”,而使用一个现有的实例通常意味着 “从你停止的地方开始”。
一个 Job 定义了什么是 job 以及如何执行工作,而 JobInstance 是一个纯粹的组织对象,将执行工作分组,主要是为了实现正确的重启语义。
JobParameters
一个 JobParameters 对象持有一组用于批处理job的参数。它们可以用于识别,甚至可以作为运行期间的参考数据,如下图所示:
JobInstance = Job + 指定的 JobParameters
JobExecution
一个 JobExecution 指的是运行一个 Job 的单一尝试的技术概念。一个 execution 可能以失败或成功告终,但对应于一个特定执行的 JobInstance 并不被认为是完整的,除非 execution 成功完成。JobExecution 是运行过程中实际发生的事情的主要存储机制,它包含了许多必须被控制和持久化的属性,如下表所示:
ExecutionContext
一个 ExecutionContext 代表了一个键/值对的集合,这些键/值对被持久化并由框架控制,为开发者提供了一个存储持久化状态的地方,这些持久化状态的 scope 是 StepExecution 对象或 JobExecution 对象。(对于那些熟悉Quartz的人来说,它与 JobDataMap 非常相似。)最好的使用例子是方便重新启动。以平面文件输入为例,在处理个别行时,框架会定期在提交点持久化 ExecutionContext。这样做可以让 ItemReader 存储其状态,以防在运行过程中发生致命错误,甚至停电。
注意,每个 JobExecution 至少有一个 ExecutionContext,每个 StepExecution 也有一个
Step
Step 是一个 domain 对象,它封装了批处理 Job 的一个独立、连续的阶段。因此,每个 Job 完全由一个或多个步骤(step)组成。一个 Step 包含定义和控制实际批处理的所有必要信息。这必然是一个模糊的描述,因为任何给定的 Step 的内容都由编写 Job 的开发人员决定。一个 Step 可以是简单的,也可以是复杂的,正如开发者所希望的那样。一个简单的 Step 可能从文件中加载数据到数据库中,只需要很少或没有代码(取决于使用的实现)。一个更复杂的 Step 可能有复杂的业务规则,作为处理的一部分被应用。与 Job 一样,一个 Step 有一个单独的 StepExecution,与一个独特的 JobExecution 相关,如下图所示:
Step一般是由一组分别实现了ItemReader、ItemWriter、ItemWriter接口的类组成
ItemReader
ItemReader 是一个抽象概念,它代表了对一个 Step 的输入的检索,一次一个 item。当 ItemReader 用完了它能提供的 item 时,它会通过返回 null 来表示这一点。你可以在Reader 和 Writer 中找到更多关于 ItemReader 接口及其各种实现的细节。
ItemWriter
ItemWriter 是一个抽象概念,它代表了一个 Step 的输出,每次都是一批或一大批的item。一般来说,ItemWriter 不知道它接下来应该收到的输入,只知道在其当前调用中传递的 item。你可以在 Reader 和 Writer 中找到更多关于 ItemWriter 接口及其各种实现的细节。
ItemProcessor
ItemProcessor 是一个抽象,它代表一个 item 的业务处理。当 ItemReader 读取一个item,而 ItemWriter 写入一个 item 时,ItemProcessor 提供了一个访问点来转换或应用其他业务处理。如果在处理该 item 时,确定该项目是无效的,返回 null 表示该 item 不应该被写出。你可以在 Reader 和 Writer 中找到更多关于 ItemProcessor 接口的细节。
StepExecution
一个 StepExecution 表示执行一个 Step 的单一尝试。每次运行一个 Step 都会创建一个新的 StepExecution,与 JobExecution 类似。然而,如果一个 Step 因为之前的 Step 失败而无法执行,则不会为其持续执行。一个 StepExecution 只有在其 Step 实际启动时才被创建。
Step 的执行由 StepExecution 类的对象表示。每个执行都包含对其相应 step 和 JobExecution 的引用,以及与事务相关的数据,如提交和回滚计数以及开始和结束时间。此外,每个 step 的执行都包含一个 ExecutionContext,它包含了开发者需要跨批处理运行的任何数据,例如重新启动所需的统计数据或状态信息。下表列出了 StepExecution 的属性:
JobRepository
JobRepository 是前面提到的所有 stereotype 的持久化机制。它为 JobLauncher、Job 和 Step 的实现提供 CRUD 操作。当一个 Job 第一次被启动时,一个 JobExecution 会从 repository 中获得。另外,在执行过程中,StepExecution 和 JobExecution 的实现通过传递给 repository 而被保存。
当使用 Java 配置时,@EnableBatchProcessing 注解提供了一个 JobRepository 作为自动配置的组件之一。
JobLauncher
JobLauncher 代表了一个简单的接口,用于用一组给定的 JobParameters 启动一个 Job,如下面的例子所示:
public interface JobLauncher {public JobExecution run(Job job, JobParameters jobParameters)throws JobExecutionAlreadyRunningException, JobRestartException,JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}