当前位置: 首页 > news >正文

spring-batch深入了解

原文链接:深入了解 Spring 之 Spring Batch 框架

文章目录

  • 一. 概述
  • 二. 概念及原理
    • 1. JobLauncher
    • 2. Job
      • 2.1 JobExecution
      • 2.2 JobExecutionListener
    • 3. Step
      • 3.1 StepHandler
      • 3.2 Step 接口及实现类
        • 3.2.1 StepExecution
        • 3.2.2 AbstractStep 的子类
      • 3.3 StepListener
    • 4. TaskletStep
      • 4.1 RepeatOperations
      • 4.2 Tasklet
    • 5. Flow
    • 6. JobRepository 及 Entity
    • 7. JobOperator
    • 8. 总结
  • 三. 构建
    • 3.1 Job 构建
    • 3.2 Step 构建
    • 3.3 JobRepository 构建
    • 3.4 总结
  • 四. 总结

一. 概述

spring batch 是 spring 提供的一个数据处理框架,其功能包括记录/跟踪,事务管理,作业统计,作业重启,跳过和资源管理等。它还提供了更高级的技术服务和功能,通过优化和分区技术实现极高容量和高性能的批处理作业。

首先会对其框架所涉及到概念进行讲解,接着对其框架大体原理进行解读。

二. 概念及原理

1. JobLauncher

该接口是启动任务的主要入口,其入参是 Job 实例,以及 Job 对应的参数信息。其实现类为 SImpleJobLauncher 类,其里面有两个关键的属性成员:

  • jobRepository 保存任务或者检索任务的信息;

  • taskExecutor 任务执行器,主要是同步执行还是异步执行任务;

流程图如下:

每个 Step 的运行状态都有哪些,可以查看 BatchStatus 枚举类,后面有对其状态进行介绍。

2. Job

我们执行的任务,该任务就是 Job 接口下的实现类,其类图如下:

AbstractJob 的属性介绍:

  • restartable 是否允许重跑

  • name 任务名

  • listener 监听器

  • jobParametersIncrementer 获取下一个 JobParameters 对象,其实质是对 JobParameters 上添加一个自增或者随机的 KV 对象;

  • jobParametersValidator 是对 JobParameters 校验;

  • stepHandler 执行 Step 的适配器。

下面的子类是根据 Step 的组成规则不一样。

  • SimpleJob 是按照“顺序式”执行 Step。

  • FlowJob 按照“流式”执行 Step,其具体执行过程由 Flow 接口的子类来实现。

一个 Job 的执行过程如下:

SimpleJob 子类的 doExecute 方法

protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {  StepExecution stepExecution = null;for (Step step : steps) {//遍历集合中的step,通过StepHandler去执行StepstepExecution = handleStep(step, execution);if (stepExecution.getStatus() != BatchStatus.COMPLETED) {break;}}if (stepExecution != null) {//设置执行完状态以及退出状态execution.upgradeStatus(stepExecution.getStatus());execution.setExitStatus(stepExecution.getExitStatus());}
}

FlowJob 子类的 doExecute 方法

protected void doExecute(final JobExecution execution) throws JobExecutionException {try {JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),new SimpleStepHandler(getJobRepository()), execution);//通过Flow执行executor.updateJobExecutionStatus(flow.start(executor).getStatus());}catch (FlowExecutionException e) {if (e.getCause() instanceof JobExecutionException) {throw (JobExecutionException) e.getCause();}throw new JobExecutionException("Flow execution ended unexpectedly", e);}
}

2.1 JobExecution

从 SimpleLauncher 流程图中以及 Job 的流程图中,一直都有 JobExecution 影子存在;JobExecution 是记录 Job 执行过程中的信息;JobExecution 类含有的关键属性如下:

  • JobParameters 记录 Job 开始运行时传递过来的参数

  • stepExecutions 记录 Job 中每个 Step 的运行过程中的信息

  • status 记录 Job 运行的状态

    • COMPLETED -0-完成状态

    • STARTING -1-开始中状态,创建 JobExecution 以及 StepExecution 的默认状态

    • STARTED -2-已开始状态

    • STOPPING -3-停止中状态

    • STOPPED -4-已停止状态,主要是 JobInterruptedException 异常

    • FAILED -5-失败状态

    • ABANDONED -6-中断状态

    • UNKNOWN -7-未知状态,主要是系统异常,都归为未知异常

  • startTime 任务开始时间

  • createTime JobExecution 的创建时间

  • endTime 任务结束时间

  • lastUpdate 上一次更新时间

  • exitStatus 任务退出的状态,batch 提供了默认的几个状态

    • EXECUTING -1-执行中状态

    • COMPLETED -2-完成状态

    • NOOP -3-任务未做任何处理状态,

    • STOPPED -4-停止状态,主要是 JobInterruptedException 中断异常,使其停止

    • NO_SUCH_JOB , 没有对应的任务异常,主要是对于 NoSuchJobException 异常

    • FAILED -5-失败状态

    • UNKNOWN -6-未知状态,是 JobExecution 的默认状态,主要是系统异常,都归为未知异常。

  • executionContext 任务执行过程的上下文,主要用于参数的传递。

  • failureExceptions 保存任务执行异常堆栈信息

2.2 JobExecutionListener

Job 执行过程中提供了监听器,暴露的两个接口:

public interface JobExecutionListener {void beforeJob(JobExecution jobExecution);void afterJob(JobExecution jobExecution);
}

其实质并不是监听器,可以理解为前后置处理器,通过对 JobExecution 的修改,从而影响对 Job 在执行过程中的行为;

3. Step

在讲解 Step 之前,需要对其 StepHandler 接口进行解读,因为 Job 并不是直接调用 Step 的实现类的,而是通过 StepHandler 的实现类去调用 Step 的实现类的;换句话说,StepHandler 就是一个 Job 与 Step 之间的桥梁,充当适配器。

3.1 StepHandler

StepHandler 接口的实现类,主要是 SimpleStepHandler。其入口是 handleStep 方法,查阅其代码,大体得出其流程图如下:

整体而言的话,主要分为一下场景:

  • 如果上一个任务对应的 Step 执行成功,

3.2 Step 接口及实现类

Step 接口的抽象类 AbstractStep 是主要的 Step 执行的主要入口,接着里面会调用对应子类所有的特性方法 doExecute 方法;具体流程图如下:

上面的流程图,重点关注的状态以及退出状态的设置,

3.2.1 StepExecution

  • jobExecution 该 Step 所归属的 Job 对应的执行实例

  • stepName 名称

  • status 运行状态

  • COMPLETED -0-完成状态

  • STARTING -1-开始中状态,创建 JobExecution 以及 StepExecution 的默认状态

  • STARTED -2-已开始状态

  • STOPPING -3-停止中状态

  • STOPPED -4-已停止状态,主要是 JobInterruptedException 异常

  • FAILED -5-失败状态

  • ABANDONED -6-中断状态

  • UNKNOWN -7-未知状态,主要是系统异常,都归为未知异常

  • readCount 读次数的统计

  • writeCount 写次数的统计

  • commitCount 提交次数的统计

  • rollbackCount 回滚次数的统计

  • readSkipCount 跳过读次数的统计

  • processSkipCount 跳过处理次数的统计

  • writeSkipCount 跳过写入次数的统计

  • startTime 创建 StepExecution 的时间

  • endTime 结束时间

  • lastUpdated 上一次更新时间

  • executionContext step 执行上下文

  • exitStatus 退出状态

  • EXECUTING -1-执行中状态

  • COMPLETED -2-完成状态

  • NOOP -3-任务未做任何处理状态,

  • STOPPED -4-停止状态,主要是 JobInterruptedException 中断异常,使其停止

  • FAILED -5-失败状态

  • UNKNOWN -6-未知状态,是 JobExecution 的默认状态,主要是系统异常,都归为未知异常。

  • terminateOnly 是否中断标志

  • filterCount 过滤次数的统计

  • failureExceptions 异常堆栈信息

3.2.2 AbstractStep 的子类

类图如下:

  • PartitionStep

是有关将任务 Step 进行拆分处理的实例,具体交由其属性成员进行处理,该类相当于是一个门面对象。

看到 PartitionStep 中包含了三个关键的属性成员,具体工作是交给其成员完成的;

  • stepExecutionSplitter 将当前 Step 拆分多个 Step,其原理是基于当前 StepExecution 对象,创建多个子 StepExecution 对象;
  • partitionHandler 是将 stepExecutionSplitter 拆分后的子 StepExecution 集合进行执行,我们可以如下类图,在 TaskExecutionParitionHandler 中包含了一个 Step 属性,意味着多次执行属性 step 对象的动作的。

  • stepExecutionAggregator 是将 partitionHandler 执行完的结果进行归集;

在 spring-batch 中提供了默认的实现类,但我们可以拓展,可以有效的提供批数据的处理效率。例如,采用远程分片,如通过接口形式,消息中间件形式等方式。具体的做法是实现 PartitionHandler 接口。

  • TaskletStep

执行 Tasklet 接口的任务 Step。在我们常规开发中,主要是使用的就是该类;在 spring batch 框架,TaskletStep 是重点,而且也是较为复杂的;后面抽出来进行解读;我们先看一下其属性:

  • stepOperations 重复操作对象,意味着,Tasklet 不是一次性处理所有数据,而是分批次进行处理的;

  • chunkListener 监听器,处理过程中所触发的事件

  • interruptionPolicy 中断策略, 每次循环,都要去检测是否已经触发了中断;

  • stream 流操作, 主要是执行过程中打开,更新、关闭流对应的句柄;

  • transactionManager 事务管理器

  • transactionAttribute 事务属性

  • tasklet 执行数据处理的对象;

  • DecisionStep

是决策路由的任务 step,得配合 Flow 才能起到效果;在 Flow 中是根据 step 的退出状态 ExitStatus 来进行路由的;该类得实现 Decider 接口。

  • JobStep

执行子 Job 的任务 Step

  • FlowStep

执行 Flow 的任务 Step

在我们代码开发中,主要是围绕着 TaskletStep 以及 PartitionStep 进行数据处理的;

3.3 StepListener

类图如下:

在 AbstractStep 执行过程,主要触发的 StepExecutionListener

其余的监听器,都是 Tasklet 接口的子类所触发的;后面对 Tasklet 接口解读时,进行说明;

4. TaskletStep

我们先看 TaskletStep 的处理逻辑;

protected void doExecute(StepExecution stepExecution) throws Exception {stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());//流更新操作stream.update(stepExecution.getExecutionContext());getJobRepository().updateExecutionContext(stepExecution);//创建信号锁final Semaphore semaphore = createSemaphore();//对当前数据处理任务进行拆分数据stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {@Overridepublic RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)throws Exception {      StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();      // 中断检测interruptionPolicy.checkInterrupted(stepExecution);      RepeatStatus result;try {//ChunkTransactionCallback 是TaskletStep内部类,在执行ChunkTransactionCallback时,//具体是交由Tasklet实例去进行数据处理;result = new TransactionTemplate(transactionManager, transactionAttribute).execute(new ChunkTransactionCallback(chunkContext, semaphore));}catch (UncheckedTransactionException e) {// Allow checked exceptions to be thrown inside callbackthrow (Exception) e.getCause();}chunkListener.afterChunk(chunkContext);// 中断检测interruptionPolicy.checkInterrupted(stepExecution);      return result == null ? RepeatStatus.FINISHED : result;}    });
}

4.1 RepeatOperations

类图如上,既然 RepeatOperations 是重复操作实例,该实现类就需要有一种机制,检测是否已经完成;我们看到 RepeatTemplate 类中有 CompletionPolicy 属性,是检测数据处理是否完成;如果在循环处理中,其中有一个循环出现了问题,需要如何处理该异常,是停止数据处理,向上抛出异常;还是简单的打印异常,继续下一次循环处理;

我们看到 TaskExecutionRepeatTemplate 类中有 TaskEsecutor 属性,意味可以采用异步的形式进行数据处理;

我们查看其关键的代码,流程图如下:

上面的流程图比较粗糙,大体罗列了核心的逻辑,细节的需要具体查看代码;

具体完成策略的,具体查看其实现类;这里不在罗列;

4.2 Tasklet

Tasklet 接口的主要的实现类为 ChunkOrientedTasklet,其类图如下:

其关键的属性:

  • chunkProcessor 对数据进行处理的接口,其实现类中包含的主要的接口为 ItemProcessor 和 ItemWriter。

  • ItemProcessor 是对 ItemReader 获取的数据进行处理

  • ItemWriter 是对 ItemProcessor 处理后的数据进行写入操作

  • ChunkProvider 提供数据的接口,其实现类中包含的主要的接口为 ItemReader。

  • ItemReader 是提供数据的接口

  • chunkListener 对 Chunk 进行监听;

流程图如下:

上面只是粗糙的罗列的主要的核心流程图,该流程图并未包含 FaultTolerantChunkProcessor 类的逻辑;

  • FaultTolerantChunkProcessor 主要是容错性处理,如执行失败时进行重试,重试依然失败,则执行对应回调处理;目前没有对其进行过多详细解读;有需要,可以重点去查阅该代码;

ItemReader、ItemProcessor 和 ItemWriter 这三个接口,提供了更加灵活的设置。如可以通过远程的形式去获取,处理,写入等操作;

在 batch 框架中,提供了很多有关 ItemReader,ItemProcessor 和 ItemWriter 接口的实现类;可以有效的减少开发人员的工作量;我们具体罗列我们常用的实现类。主要是文件解析,以及数据库操作等;

  • FlatFileItemReader 基于文件解析的,按照行进行解析的,解析规则交给 LineMapper 进行处理;

  • AbstractPagingItemReader 基于分页形式进行获取数据,其实现类大多数都是数据库查询;下面罗列了 JDBC 层的分页查询;

当然还有很多 ItemReader 的实现类,例如消息间的 JmsItemReader,KafkaItemReader 等

ItemWriter 也对应有文件以及数据库层面上的写入操作,类图如下:

5. Flow

相关类图如下:

Flow 接口的实现类为 SimpleFlow,主要的组成接口有 StateTransition 连接起来。而 StateTransition 中的 State 接口,其有很多实现类,类图如下:

从上类图来看,意味着 Flow 可以将 Step 和子 Flow 组合成链路起来,以及决策 DecisionState 来决定将要执行的下一个 State。我们具体看一下“链表“是如何工作的?大体的流程图如下:

简单来讲,其是通过 FlowExecutionStatus 来决定执行哪一个节点的,在 batch 中提供了默认的状态;

  • COMPLETED 完成状态

  • STOPPED 停止状态

  • FAILED 失败状态

  • UNKNOWN 位置状态

但是我们依然可以拓展其状态,来决定匹配到下一个节点的执行判断;

6. JobRepository 及 Entity

在 batch 中会记录各个 Job 以及 Step 的执行情况,其相对应的提供的主要的接口为 JobRepository,其类图如下:

在 SimpleJobRepository 包含里四个 dao 操作,其都是以接口的形式暴露出来;具体交由开发人员指定,是以内存形式操作,还是数据库形式操作;这里还有一些逻辑,这里不在进行解读,具体查看其代码;

每个 Job 以及 Step 在执行中都会创建一个执行实例 Execution 对象,来记录其过程信息,类图如下:

7. JobOperator

上面罗列了有关 Job 的运行过程的大体逻辑;并没有对其进行管理;

在 spring-batch 框架提供了这样的一个接口 JobOperator,可以对其进行管理;

我们重点关注其中几个方法,方法如下:

public interface JobOperator {//开启任务执行Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException;//重新启动任务Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,NoSuchJobException, JobRestartException, JobParametersInvalidException;//停止任务的执行boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException;//中断任务的执行JobExecution abandon(long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException;
}

8. 总结

上面大体罗列了 batch 中所涉及的概念以及对应的类图,粗略的流程图;

可能看着有点被绕晕了,但只要围绕 Job 的状态以及 Step 状态进行查看其过程,但我梳理了很久依然无法确认其状态间切换的准确逻辑,有点复杂;但我们可以不用管其状态间切换,因为依然有异常抛出,其会记录堆栈信息,我们去查看其堆栈信息即可;

Step 之间是如何传参的,这个需要重点指出;每个 Step 执行都会创建 StepExecution 对象,该对象中的 ExecutionContext 是只会在 Step 内部使用,并不会传递给下一个 Step;但 StepExecution 对象中包含 JobExecution 对象,我们可以对 JobExecution 对象中的 ExecutionContext 对象进行设置,这样子可以到下一个 StepExecution 可以拿到想要的参数;

另外完成策略接口 CompletionPolicy,可以具体查阅对应实现类,以及在其他类中使用的对应的实现类;相信会很快把握其逻辑信息;

三. 构建

上面罗列了关键的类,那么具体是如何创建这些类的;

3.1 Job 构建

Job 构建通过 JobBuilderHelper 的子类以及 FlowBuilder 进行构建;类图如下;

3.2 Step 构建

Step 构建通过 StepBuilderHelper 的子类进行构建,类图如下:

配置 JobRepository 通过 AbstractJobRepositoryFactoryBean 抽象中子类进行构建,类图就不展示;

3.3 JobRepository 构建

相关属性成员的介绍,待后续进行补充;

3.4 总结

相信,通过上述的构建的相关类结构进行展示,相信对整个 spring-batch 框架的使用以及如何构建有大概的了解;

四. 总结

本人使用其框架进行跑批的开发经验并没有太多,基本上都是比较粗糙的使用,很多细节并没有太多了解。之所以对其进行整理,一方面是为了后续开发基于 spring-batch 项目提供较好的开发指引。另外一方面,可以进一步对其原理进一步了解。

上面并没过多细节的进行讨论,毕竟整篇是对 Spring-batch 框架的关键类以及大体的运行的过程进行粗糙介绍;但通过本篇的大体了解,相信对开发有一定的指引,高效的构建 spring-batch 工程。

http://www.dtcms.com/a/419466.html

相关文章:

  • QML学习笔记(十六)QML的信号参数
  • 百度C++实习生面试题深度解析(上篇)
  • 网站培训视频建设银行淮安招聘网站
  • Http与WebSocket网络通信协议的对比
  • Docker 部署微服务项目详细步骤
  • 后端_本地缓存:零抖动的极速缓冲层
  • 【笔试强训Day02】1. ⽜⽜的快递(模拟)
  • 建设网站找哪家简洁页面心情网站
  • 广州站有高铁吗域名不变 网站改版
  • 防爬虫逆向日志爆炸,精简追踪不崩浏览器控制台
  • DevExpress WinForms v25.2新功能预览 - 即将升级富文本编辑器控件功能
  • C#知识学习-015(修饰符_4)
  • 有做分期海淘的网站吗苏州建设网站找网络公司
  • 全缓冲和行缓冲
  • 【C++语法】C++11——右值引用
  • 如何给网站做app平台网站建设 厦门
  • 鞍山网站建设工作室北京交易中心网站
  • 现代C++ Lambda表达式:最佳实践、深入理解和资源推荐
  • CUDA 内核中计算全局线程索引的标准方法
  • 刚做网站做多用户还是单用户企业建站的作用是什么
  • CUDA 13.0深度解析:统一ARM生态、UVM增强与GPU共享的革命
  • 佛山外贸网站建设咨询php网站上做微信支付功能
  • 公司网站中新闻中心怎样做优化网页设计图片怎么居中
  • 网站运营经验山东查询网站备案
  • 巴塘网站建设网站开发的论文课题
  • GuavaCache
  • 免费空间如何放网站搜索引擎优化培训免费咨询
  • LeetCode 53 最大子数字和(动态规划)
  • 如何为100Tops机器人“退烧”?世强芯片热管理方案,释放100%算力!
  • 【NodeJS】使用 NVM 安装 Node.js 22 并配置国内镜像加速