当前位置: 首页 > wzjs >正文

网站外链接自己可以怎么做优化大师官方

网站外链接自己可以怎么做,优化大师官方,戴瑞企业网站建设需求,怎么修改网站首页logo目录标题 六、步骤对象 Step6.1 步骤介绍6.2 简单Tasklet6.3 chunk居于块Tasklet**ChunkTasklet 泛型** 6.4 步骤监听器6.5 多步骤执行6.6 步骤控制6.6.1 条件分支控制-使用默认返回状态6.6.2 条件分支控制-使用自定义状态值 6.7 步骤状态6.8 流式步骤 六、步骤对象 Step 前面…

目录标题

  • 六、步骤对象 Step
    • 6.1 步骤介绍
    • 6.2 简单Tasklet
    • 6.3 chunk居于块Tasklet
      • **ChunkTasklet 泛型**
    • 6.4 步骤监听器
    • 6.5 多步骤执行
    • 6.6 步骤控制
      • 6.6.1 条件分支控制-使用默认返回状态
      • 6.6.2 条件分支控制-使用自定义状态值
    • 6.7 步骤状态
    • 6.8 流式步骤

六、步骤对象 Step

前面一章节讲完了作业JOB的相关介绍,本章节重点讲解步骤。

6.1 步骤介绍

在这里插入图片描述

一般认为步骤是一个独立功能组件,因为它包含了一个工作单元需要的所有内容,比如:输入模块,输出模块,数据处理模块等。这种设计好处在哪?给开发者带来更自由的操作空间。

目前Spring Batch 支持2种步骤处理模式:
在这里插入图片描述

  • 简单具于Tasklet 处理模式

    这种模式相对简单,前面讲的都是居于这个模式批处理

    @Bean
    public Tasklet tasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("Hello SpringBatch....");return RepeatStatus.FINISHED;}};
    }
    

    只需要实现Tasklet接口,就可以构建一个step代码块。循环执行step逻辑,直到tasklet.execute方法返回RepeatStatus.FINISHED

  • 居于块(chunk)的处理模式

    居于块的步骤一般包含2个或者3个组件:1>ItemReader 2>ItemProcessor(可选) 3>ItemWriter 。 当用上这些组件之后,Spring Batch 会按块处理数据。

6.2 简单Tasklet

学到这,我们写过很多简单Tasklet模式步骤,但是都没有深入了解过,这节就细致分析一下具有Tasklet 步骤使用。

先看下Tasklet源码

public interface Tasklet {@NullableRepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

Tasklet 接口有且仅有一个方法:execute,

参数有2个:

StepContribution:步骤信息对象,用于保存当前步骤执行情况信息,核心用法:设置步骤结果状态contribution.setExitStatus(ExitStatus status)

contribution.setExitStatus(ExitStatus.COMPLETED);

ChunkContext:chuck上下文,跟之前学的StepContext JobContext一样,区别是它用于记录chunk块执行场景。通过它可以获取前面2个对象。

返回值1个:

RepeatStatus:当前步骤状态, 它是枚举类,有2个值,一个表示execute方法可以循环执行,一个表示已经执行结束。

public enum RepeatStatus {/*** 当前步骤依然可以执行,如果步骤返回该值,会一直循环执行*/CONTINUABLE(true), /*** 当前步骤结束,可以为成功也可以表示不成,仅代表当前step执行结束了*/FINISHED(false);
}    

在这里插入图片描述

需求:练习上面RepeatStatus状态

@SpringBootApplication
@EnableBatchProcessing
public class SimpleTaskletJob {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Tasklet tasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("------>" + System.currentTimeMillis());//return RepeatStatus.CONTINUABLE;  //循环执行return RepeatStatus.FINISHED;}};}@Beanpublic Step step1(){return stepBuilderFactory.get("step1").tasklet(tasklet()).build();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("step-simple-tasklet-job").start(step1()).build();}public static void main(String[] args) {SpringApplication.run(SimpleTaskletJob.class, args);}}

6.3 chunk居于块Tasklet

居于块的Tasklet相对简单Tasklet来说,多了3个模块:ItemReader( 读模块), ItemProcessor(处理模块),ItemWriter(写模块), 跟它们名字一样, 一个负责数据读, 一个负责数据加工,一个负责数据写。

结构图:

*在这里插入图片描述

时序图:

*在这里插入图片描述
*
在这里插入图片描述

需求:简单演示chunk Tasklet使用

ItemReader ItemProcessor ItemWriter 都接口,直接使用匿名内部类方式方便创建

package com.langfeiyes.batch._08_step_chunk_tasklet;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.*;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import java.util.Arrays;
import java.util.List;@SpringBootApplication
@EnableBatchProcessing
public class ChunkTaskletJob {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic ItemReader itemReader(){return new ItemReader() {@Overridepublic Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {System.out.println("-------------read------------");return "read-ret";}};}@Beanpublic ItemProcessor itemProcessor(){return new ItemProcessor() {@Overridepublic Object process(Object item) throws Exception {System.out.println("-------------process------------>" + item);return "process-ret->" + item;}};}@Beanpublic ItemWriter itemWriter(){return new ItemWriter() {@Overridepublic void write(List items) throws Exception {System.out.println(items);}};}@Beanpublic Step step1(){return stepBuilderFactory.get("step1").chunk(3)  //设置块的size为3次.reader(itemReader()).processor(itemProcessor()).writer(itemWriter()).build();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("step-chunk-tasklet-job").start(step1()).incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(ChunkTaskletJob.class, args);}
}

执行完了之后结果

-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret
-------------process------------>read-ret
-------------process------------>read-ret
[process-ret->read-ret, process-ret->read-ret, process-ret->read-ret]
-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret
-------------process------------>read-ret
-------------process------------>read-ret
[process-ret->read-ret, process-ret->read-ret, process-ret->read-ret]
-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret
-------------process------------>read-ret
-------------process------------>read-ret
[process-ret->read-ret, process-ret->read-ret, process-ret->read-ret]
....

观察上面打印结果,得出2个得出。

1>程序一直在循环打印,先循环打印3次reader, 再循环打印3次processor,最后一次性输出3个值。

2>死循环重复上面步骤

问题来了,为啥会出现这种效果,该怎么改进?

其实这个是ChunkTasklet 执行特点,ItemReader会一直循环读,直到返回null,才停止。而processor也是一样,itemReader读多少次,它处理多少次, itemWriter 一次性输出当前次输入的所有数据。

我们改进一下上面案例,要求只读3次, 只需要改动itemReader方法就行

int timer = 3;
@Bean
public ItemReader itemReader(){return new ItemReader() {@Overridepublic Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {if(timer > 0){System.out.println("-------------read------------");return  "read-ret-" + timer--;}else{return null;}}};
}

结果不在死循环了

-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret-3
-------------process------------>read-ret-2
-------------process------------>read-ret-1
[process-ret->read-ret-3, process-ret->read-ret-2, process-ret->read-ret-1]

思考一个问题, 如果将timer改为 10,而 .chunk(3) 不变结果会怎样?

-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret-10
-------------process------------>read-ret-9
-------------process------------>read-ret-8
[process-ret->read-ret-10, process-ret->read-ret-9, process-ret->read-ret-8]
-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret-7
-------------process------------>read-ret-6
-------------process------------>read-ret-5
[process-ret->read-ret-7, process-ret->read-ret-6, process-ret->read-ret-5]
-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret-4
-------------process------------>read-ret-3
-------------process------------>read-ret-2
[process-ret->read-ret-4, process-ret->read-ret-3, process-ret->read-ret-2]
-------------read------------
-------------process------------>read-ret-1
[process-ret->read-ret-1]

找出规律了嘛?

当chunkSize = 3 表示 reader 先读3次,提交给processor处理3次,最后由writer输出3个值

timer =10, 表示数据有10条,一个批次(趟)只能处理3条数据,需要4个批次(趟)来处理。

是不是有批处理味道出来

结论:chunkSize 表示: 一趟需要ItemReader读多少次,ItemProcessor要处理多少次。

ChunkTasklet 泛型

上面案例默认的是使用Object类型读、写、处理数据,如果明确了Item的数据类型,可以明确指定具体操作泛型。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.*;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import java.util.List;//开启 spring batch 注解--可以让spring容器创建springbatch操作相关类对象
@EnableBatchProcessing
//springboot 项目,启动注解, 保证当前为为启动类
@SpringBootApplication
public class ChunkTaskletJob {//作业启动器@Autowiredprivate JobLauncher jobLauncher;//job构造工厂---用于构建job对象@Autowiredprivate JobBuilderFactory jobBuilderFactory;//step 构造工厂--用于构造step对象@Autowiredprivate StepBuilderFactory stepBuilderFactory;int timer = 10;//读操作@Beanpublic ItemReader<String> itemReader(){return new ItemReader<String>() {@Overridepublic String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {if(timer > 0){System.out.println("-------------read------------");return "read-ret-->" + timer--;}else{return null;}}};}//处理操作@Beanpublic ItemProcessor<String, String> itemProcessor(){return new ItemProcessor<String, String>() {@Overridepublic String process(String item) throws Exception {System.out.println("-------------process------------>" + item);return "process-ret->" + item;}};}//写操作@Beanpublic ItemWriter<String> itemWriter(){return new ItemWriter<String>() {@Overridepublic void write(List<? extends String> items) throws Exception {System.out.println(items);}};}//构造一个step对象--chunk@Beanpublic Step step1(){//tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口return stepBuilderFactory.get("step1").<String, String>chunk(3)  //暂时为3.reader(itemReader()).processor(itemProcessor()).writer(itemWriter()).build();}@Beanpublic  Job job(){return jobBuilderFactory.get("chunk-tasklet-job").start(step1()).incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(ChunkTaskletJob.class, args);}}

6.4 步骤监听器

前面我们讲了作业的监听器,步骤也有监听器,也是执行步骤执行前监听,步骤执行后监听。

步骤监听器有2个分别是:StepExecutionListener ChunkListener 意义很明显,就是step前后,chunk块执行前后监听。

先看下StepExecutionListener接口

public interface StepExecutionListener extends StepListener {void beforeStep(StepExecution stepExecution);@NullableExitStatus afterStep(StepExecution stepExecution);
}

需求:演示StepExecutionListener 用法

自定义监听接口

public class MyStepListener implements StepExecutionListener {@Overridepublic void beforeStep(StepExecution stepExecution) {System.out.println("-----------beforeStep--------->");}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {System.out.println("-----------afterStep--------->");return stepExecution.getExitStatus();  //不改动返回状态}
}
package com.langfeiyes.batch._09_step_listener;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@EnableBatchProcessing
public class StepListenerJob {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Tasklet tasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("------>" + System.currentTimeMillis());return RepeatStatus.FINISHED;}};}@Beanpublic MyStepListener stepListener(){return new MyStepListener();}@Beanpublic Step step1(){return stepBuilderFactory.get("step1").tasklet(tasklet()).listener(stepListener())  .build();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("step-listener-job1").start(step1()).incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(StepListenerJob.class, args);}}

在step1方法中,加入:.listener(stepListener()) 即可

同理ChunkListener 操作跟上面一样

public interface ChunkListener extends StepListener {static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";void beforeChunk(ChunkContext context);void afterChunk(ChunkContext context);void afterChunkError(ChunkContext context);
}

唯一的区别是多了一个afterChunkError 方法,表示当chunk执行失败后回调。

6.5 多步骤执行

到目前为止,我们演示的案例基本上都是一个作业, 一个步骤,那如果有多个步骤会怎样?Spring Batch 支持多步骤执行,以应对复杂业务需要多步骤配合执行的场景。

需求:定义2个步骤,然后依次执行

package com.langfeiyes.batch._10_step_multi;import com.langfeiyes.batch._09_step_listener.MyChunkListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@EnableBatchProcessing
public class MultiStepJob {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Tasklet tasklet1(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------tasklet1---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet tasklet2(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------tasklet2---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Step step1(){return stepBuilderFactory.get("step1").tasklet(tasklet1()).build();}@Beanpublic Step step2(){return stepBuilderFactory.get("step2").tasklet(tasklet2()).build();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("step-multi-job1").start(step1()).next(step2()) //job 使用next 执行下一步骤.incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(MultiStepJob.class, args);}
}

定义2个tasklet: tasklet1 tasklet2, 定义2个step: step1 step2 修改 job方法,从.start(step1()) 然后执行到 .next(step2())

Spring Batch 使用next 执行下一步步骤,如果还有第三个step,再加一个next(step3)即可

6.6 步骤控制

上面多个步骤操作,先执行step1 然后是step2,如果有step3, step4,那执行顺序也是从step1到step4。此时爱思考的小伙伴肯定会想,步骤的执行能不能进行条件控制呢?比如:step1执行结束根据业务条件选择执行step2或者执行step3,亦或者直接结束呢?答案是yes:设置步骤执行条件即可

Spring Batch 使用 start next on from to end 不同的api 改变步骤执行顺序。
在这里插入图片描述

6.6.1 条件分支控制-使用默认返回状态

需求:作业执行firstStep步骤,如果处理成功执行sucessStep,如果处理失败执行failStep

package com.langfeiyes.batch._11_step_condition;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@EnableBatchProcessing
public class ConditionStepJob {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Tasklet firstTasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------firstTasklet---------------");return RepeatStatus.FINISHED;//throw new RuntimeException("测试fail结果");}};}@Beanpublic Tasklet successTasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------successTasklet---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet failTasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------failTasklet---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Step firstStep(){return stepBuilderFactory.get("step1").tasklet(firstTasklet()).build();}@Beanpublic Step successStep(){return stepBuilderFactory.get("successStep").tasklet(successTasklet()).build();}@Beanpublic Step failStep(){return stepBuilderFactory.get("failStep").tasklet(failTasklet()).build();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("condition-multi-job").start(firstStep()).on("FAILED").to(failStep()).from(firstStep()).on("*").to(successStep()).end().incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(ConditionStepJob.class, args);}
}

观察给出的案例,job方法以 .start(firstStep()) 开始作业,执行完成之后, 使用onfrom 2个方法实现流程转向。

.on(“FAILED”).to(failStep()) 表示当**firstStep()**返回FAILED时执行。

.from(firstStep()).on(“*”).to(successStep()) 另外一个分支,表示当**firstStep()**返回 * 时执行。

上面逻辑有点像 if / else 语法

if("FAILED".equals(firstStep())){failStep();
}else{successStep();
}

几个注意点:

1> on 方法表示条件, 上一个步骤返回值,匹配指定的字符串,满足后执行后续 to 步骤

2> * 为通配符,表示能匹配任意返回值

3> from 表示从某个步骤开始进行条件判断

4> 分支判断结束,流程以end方法结束,表示if/else逻辑结束

5> on 方法中字符串取值于 ExitStatus 类常量,当然也可以自定义。

6.6.2 条件分支控制-使用自定义状态值

前面也说了,on条件的值取值于ExitStatus 类常量,具体值有:UNKNOWN,EXECUTING,COMPLETED,NOOP,FAILED,STOPPED等,如果此时我想自定义返回值呢,是否可行?答案还是yes:Spring Batch 提供JobExecutionDecider 接口实现状态值定制。

需求:先执行firstStep,如果返回值为A,执行stepA, 返回值为B,执行stepB, 其他执行defaultStep

分析:先定义一个决策器,随机决定返回A / B / C

public class MyStatusDecider implements JobExecutionDecider {@Overridepublic FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {long ret = new Random().nextInt(3);if(ret == 0){return new FlowExecutionStatus("A");}else if(ret == 1){return new FlowExecutionStatus("B");}else{return new FlowExecutionStatus("C");}}
}
package com.langfeiyes.batch._11_step_condition_decider;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@EnableBatchProcessing
public class CustomizeStatusStepJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Tasklet taskletFirst(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------taskletFirst---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet taskletA(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------taskletA---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet taskletB(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------taskletB---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet taskletDefault(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("--------------taskletDefault---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Step firstStep(){return stepBuilderFactory.get("firstStep").tasklet(taskletFirst()).build();}@Beanpublic Step stepA(){return stepBuilderFactory.get("stepA").tasklet(taskletA()).build();}@Beanpublic Step stepB(){return stepBuilderFactory.get("stepB").tasklet(taskletB()).build();}@Beanpublic Step defaultStep(){return stepBuilderFactory.get("defaultStep").tasklet(taskletDefault()).build();}//决策器@Beanpublic MyStatusDecider statusDecider(){return new MyStatusDecider();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("customize-step-job").start(firstStep()).next(statusDecider()) .from(statusDecider()).on("A").to(stepA()).from(statusDecider()).on("B").to(stepB()).from(statusDecider()).on("*").to(defaultStep()).end().incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(CustomizeStepJob.class, args);}}

反复执行,会返回打印的值有

--------------taskletA---------------
--------------taskletB---------------
--------------taskletDefault---------------    

它们随机切换,为啥能做到这样?注意,并不是firstStep() 执行返回值为A/B/C控制流程跳转,而是由后面**.next(statusDecider())** 决策器。

6.7 步骤状态

Spring Batch 使用ExitStatus 类表示步骤、块、作业执行状态,大体上有以下几种:

public class ExitStatus implements Serializable, Comparable<ExitStatus> {//未知状态public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");//执行中public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");//执行完成public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");//无效执行public static final ExitStatus NOOP = new ExitStatus("NOOP");//执行失败public static final ExitStatus FAILED = new ExitStatus("FAILED");//执行中断public static final ExitStatus STOPPED = new ExitStatus("STOPPED");...
}    

一般来说,作业启动之后,这些状态皆为流程自行控制。顺利结束返回:COMPLETED, 异常结束返回:FAILED,无效执行返回:NOOP, 这是肯定有小伙伴说,能不能编程控制呢?答案是可以的。

Spring Batch 提供 3个方法决定作业流程走向:

end():作业流程直接成功结束,返回状态为:COMPLETED

fail():作业流程直接失败结束,返回状态为:FAILED

stopAndRestart(step) :作业流程中断结束,返回状态:STOPPED 再次启动时,从step位置开始执行 (注意:前提是参数与Job Name一样)

**需求:当步骤firstStep执行抛出异常时,通过end, fail,stopAndRestart改变步骤执行状态 **

package com.langfeiyes.batch._12_step_status;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;//开启 spring batch 注解--可以让spring容器创建springbatch操作相关类对象
@EnableBatchProcessing
//springboot 项目,启动注解, 保证当前为为启动类
@SpringBootApplication
public class StatusStepJob {//作业启动器@Autowiredprivate JobLauncher jobLauncher;//job构造工厂---用于构建job对象@Autowiredprivate JobBuilderFactory jobBuilderFactory;//step 构造工厂--用于构造step对象@Autowiredprivate StepBuilderFactory stepBuilderFactory;//构造一个step对象执行的任务(逻辑对象)@Beanpublic Tasklet firstTasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("----------------firstTasklet---------------");throw new RuntimeException("假装失败了");//return RepeatStatus.FINISHED;  //执行完了}};}@Beanpublic Tasklet successTasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("----------------successTasklet--------------");return RepeatStatus.FINISHED;  //执行完了}};}@Beanpublic Tasklet failTasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("----------------failTasklet---------------");return RepeatStatus.FINISHED;  //执行完了}};}//构造一个step对象@Beanpublic Step firstStep(){//tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口return stepBuilderFactory.get("firstStep").tasklet(firstTasklet()).build();}//构造一个step对象@Beanpublic Step successStep(){//tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口return stepBuilderFactory.get("successStep").tasklet(successTasklet()).build();}//构造一个step对象@Beanpublic Step failStep(){//tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口return stepBuilderFactory.get("failStep").tasklet(failTasklet()).build();}//如果firstStep 执行成功:下一步执行successStep 否则是failStep@Beanpublic  Job job(){return jobBuilderFactory.get("status-step-job").start(firstStep())//表示将当前本应该是失败结束的步骤直接转成正常结束--COMPLETED//.on("FAILED").end()  //表示将当前本应该是失败结束的步骤直接转成失败结束:FAILED//.on("FAILED").fail()  //表示将当前本应该是失败结束的步骤直接转成停止结束:STOPPED   里面参数表示后续要重启时, 从successStep位置开始.on("FAILED").stopAndRestart(successStep()).from(firstStep()).on("*").to(successStep()).end().incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(StatusStepJob.class, args);}}

6.8 流式步骤

FlowStep 流式步骤,也可以理解为步骤集合,由多个子步骤组成。作业执行时,将它当做一个普通步骤执行。一般用于较为复杂的业务,比如:一个业务逻辑需要拆分成按顺序执行的子步骤。

在这里插入图片描述

需求:先后执行stepA,stepB,stepC, 其中stepB中包含stepB1, stepB2,stepB3。

package com.langfeiyes.batch._13_flow_step;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@EnableBatchProcessing
public class FlowStepJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Tasklet taskletA(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("------------stepA--taskletA---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet taskletB1(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("------------stepB--taskletB1---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet taskletB2(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("------------stepB--taskletB2---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet taskletB3(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("------------stepB--taskletB3---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet taskletC(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("------------stepC--taskletC---------------");return RepeatStatus.FINISHED;}};}@Beanpublic Step stepA(){return stepBuilderFactory.get("stepA").tasklet(taskletA()).build();}@Beanpublic Step stepB1(){return stepBuilderFactory.get("stepB1").tasklet(taskletB1()).build();}@Beanpublic Step stepB2(){return stepBuilderFactory.get("stepB2").tasklet(taskletB2()).build();}@Beanpublic Step stepB3(){return stepBuilderFactory.get("stepB3").tasklet(taskletB3()).build();}@Beanpublic Flow flowB(){return new FlowBuilder<Flow>("flowB").start(stepB1()).next(stepB2()).next(stepB3()).build();}@Beanpublic Step stepB(){return stepBuilderFactory.get("stepB").flow(flowB()).build();}@Beanpublic Step stepC(){return stepBuilderFactory.get("stepC").tasklet(taskletC()).build();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("flow-step-job").start(stepA()).next(stepB()).next(stepC()).incrementer(new RunIdIncrementer()).build();}public static void main(String[] args) {SpringApplication.run(FlowStepJob.class, args);}}

此时的flowB()就是一个FlowStep,包含了stepB1, stepB2, stepB3 3个子step,他们全部执行完后, stepB才能算执行完成。下面执行结果也验证了这点。

2022-12-03 14:54:16.644  INFO 19116 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepA]
------------stepA--taskletA---------------
2022-12-03 14:54:16.699  INFO 19116 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [stepA] executed in 55ms
2022-12-03 14:54:16.738  INFO 19116 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepB]
2022-12-03 14:54:16.788  INFO 19116 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepB1]
------------stepB--taskletB1---------------
2022-12-03 14:54:16.844  INFO 19116 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [stepB1] executed in 56ms
2022-12-03 14:54:16.922  INFO 19116 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepB2]
------------stepB--taskletB2---------------
2022-12-03 14:54:16.952  INFO 19116 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [stepB2] executed in 30ms
2022-12-03 14:54:16.996  INFO 19116 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepB3]
------------stepB--taskletB3---------------
2022-12-03 14:54:17.032  INFO 19116 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [stepB3] executed in 36ms
2022-12-03 14:54:17.057  INFO 19116 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [stepB] executed in 318ms
2022-12-03 14:54:17.165  INFO 19116 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepC]
------------stepC--taskletC---------------
2022-12-03 14:54:17.215  INFO 19116 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [stepC] executed in 50ms

使用FlowStep的好处在于,在处理复杂额批处理逻辑中,flowStep可以单独实现一个子步骤流程,为批处理提供更高的灵活性。


文章转载自:

http://CnTniH9e.ynLpy.cn
http://1cOQoj2q.ynLpy.cn
http://I1q6xGdB.ynLpy.cn
http://JLGOVCJd.ynLpy.cn
http://1ugnlHhv.ynLpy.cn
http://2HJ9uUkG.ynLpy.cn
http://ZsbALh7C.ynLpy.cn
http://qmmShUoh.ynLpy.cn
http://lRftH6dV.ynLpy.cn
http://M8eMVSGE.ynLpy.cn
http://YOgztREu.ynLpy.cn
http://bcL82uqT.ynLpy.cn
http://BDB05Jt5.ynLpy.cn
http://bJTIXHZn.ynLpy.cn
http://hiY6NlvR.ynLpy.cn
http://Xzv6ZkM4.ynLpy.cn
http://GlfmRHm0.ynLpy.cn
http://EL4wGrC7.ynLpy.cn
http://7DBGHgMP.ynLpy.cn
http://bXnTgg5Y.ynLpy.cn
http://PIJ8Qzyi.ynLpy.cn
http://l7Y4acub.ynLpy.cn
http://WQHB64Em.ynLpy.cn
http://wBL0Xn3w.ynLpy.cn
http://OKy1iuSr.ynLpy.cn
http://WnwNOkCt.ynLpy.cn
http://wwmw4YVB.ynLpy.cn
http://NaRndEUo.ynLpy.cn
http://bCiVQre7.ynLpy.cn
http://pIBoJrzr.ynLpy.cn
http://www.dtcms.com/wzjs/605496.html

相关文章:

  • 郴州网站建设软件定制开发制作阿里云可以做几个网站
  • 辽宁建设厅官方网站html5加入wordpress
  • 福州企业网站建站模板山东电力建设网站
  • 怎么做新网站才能被百度收录湖州公司网站建设
  • 织梦dedecms资讯文章类网站模板企业邮箱地址格式
  • 企业网站开发使用方法window wordpress搭建
  • 二级域名做网站域名wordpress自动存图
  • 网站建设类合同范本家电企业网站模板
  • 重庆綦江网站制作公司哪家专业项目管理软件 开源
  • 网站权重传递网站建设客户沟通
  • 东莞公司网站广告联盟网站建设
  • 纯静态网站索引怎么做网站建设能解决哪些问题
  • 推荐几个做网页设计的网站大兴企业官网网站建设
  • 岳阳网站建设与设计中国品牌网站
  • dedecms 网站安装搜狗推广登陆
  • 做网站的竞品分析网站备案链接直接查看
  • 网站开发组合网站如何进行内外营销运营
  • 网站优化qq群优秀网站建设官网
  • 建设银行个人网站个人客户最常用的规划网站
  • 人力招聘网站建设的简要任务执行书电影网站怎么做推广
  • 利津网站建设wordpress 本机安装
  • 1网站免费建站免备案域名解析
  • 成都最专业做网站的关键词查网址
  • 移动端网站制作模板石家庄网站建设哪家专业
  • jsp网站开发面试题手机访问不了自己做的网站吗
  • 毛概课程网站建设seo网站推广策略
  • 可以做动感影集的网站寻找集团网站建设
  • 网站后台 生成所有页面网站开发要用到什么
  • 做网站还需要买空间吗wordpress下载的主题怎么用
  • 紫川网站建设企业网站功能对比分析