当前位置: 首页 > news >正文

Java定时任务1_定时任务实现方式以及原理

背景

在 Java 应用开发中,定时任务是一种常见的需求,它可以帮助我们自动化执行周期性的工作,如试卷拉取、检测超时任务退回、数据备份、定时通知等。Java 提供了多种方式来实现定时任务,如使用 Timer、ScheduledExecutorService 以及 Spring 框架中的 @Scheduled 注解等,或者更重一些的xxl-job等

实现一个任务调度引擎系统 - 你樊不樊 - 博客园

定时任务原理方案综述 | 京东云技术团队_定时任务_京东科技开发者_InfoQ写作社区

Timer和TimerTask原理

Java定时器之Timer原理解析

Java在1.3版本引入了Timer工具类,它是一个古老的定时器,搭配TimerTask和TaskQueue一起使用。从Java5开始在并发包中引入了另一个定时器ScheduledThreadPoolExecutor,它对Timer做了很多改进并提供了更多的工具,可以认为是对Timer的取代。可选用的定时任务执行的方式

  • 一次性任务
  • 固定延迟执行(Fixed Delay):固定结束等待延时时间,当前任务执行一定是上个任务执行完 + delayTime。
  • 固定周期执行(Fixed Rate):固定频率

注意:chedule的间隔是相对于上一次任务结束的时间,任务在每次执行完成之后,等待固定的时间延迟再执行下一次任务。例如,如果你希望每次执行任务后等待 5 秒,这种方式更合适。而scheduleAtFixedRate是相对于上一次任务开始的时间,任务以固定的频率执行。无论任务本身需要多长时间,总是试图在精确的时间点上执行新任务。例如,每隔 5 秒执行一次,即使上一次任务还没有完全结束,也会尽量启动下一次执行。这在需要确保某个行为的严格周期性时非常有用。

Timer和TimerTask实现主要是依赖两部分

  • TaskQueue:按照时间顺序的优先队列,内部使用平衡二叉堆来存储任务,时间是每个定时任务下一次的执行时间的时间戳
  • TimerThread:内部无限循环的单线程,负责找到下一个该执行的定时任务执行然后更新任务下次执行时间。

注意的点

  • Timer中的TimerThread是单线程模式,因此导致所有定时任务不能同时执行,可能会出现延迟
  • TimerThread中并没有处理好任务的异常,因此每个TimerTask的实现必须自己try catch防止异常抛出,导致Timer整体失效。

ScheduledExecutorService原理

三分钟弄清楚定时任务ScheduledExecutorService实现

ScheduledExecutorService 是 Java 5 中引入的 java.util.concurrent 包的一部分,用于替代传统的基于线程的任务调度方法,例如 Timer 和 TimerTask。它提供了线程池机制,专门用于执行延迟或定时执行的任务。通过这种方式,我们可以避免单线程调度中遇到的各种问题,例如线程意外终止、无故延迟等。

内部是一个带有阻塞队列DelayWordQueue的线程池,该延时间队列的offer方法也简单,就是把任务加到queue数组中,如果数组为null则直接加进去,并唤醒take哪里的线程,如果不为null则比较数组中任务的time,越近的越在前面,如果新加进来的任务被设置到了数组的第一个,则唤醒take那里阻塞的线程。

线程任务为ScheduledFutureTask类型,它继承了FutureTask,扩展了一些定时任务需要的属性,比如下次执行时间、每次任务执行间隔,在任务完成后,调用setNextRunTime();reExecutePeriodic(outerTask);两步方法,第一步是计算下次任务执行时间,第二个是把任务放到队列中;

注意的点

  • 线程池管理:线程池可以提供更多的线程,减少线程创建销毁的开销。
  • 更好的错误恢复:线程之间发生异常不影响。
  • 任务阻塞:如果使用单线程 or 线程池线程被任务占满,其他本该执行的任务可能被阻塞影响执行,尽量使用非阻塞任务,避免在定时任务中执行阻塞操作,例如网络请求或数据库查询。可以将这些操作拆分为更小的单元,或者使用异步方式处理。

Spring定时任务@Scheduled原理

通过源码理解Spring中@Scheduled的实现原理并且实现调度任务动态装载 - throwable - 博客园

相当于Spring框架又在ScheduledExecutorService上面包了一层,加了一些任务调度器、任务Task等封装概念。

在Spring中使用定时任务时通过@EnableScheduling注解引入了SchedulingConfiguration配置类,该配置类注册了ScheduledAnnotationBeanPostProcessor的bean,类型为BeanDefinition.ROLE_INFRASTRUCTURE,该bean负责组装任务调度器、注册Task任务以及执行Task任务。

  • 该bean在初始化之后,会执行postProcessAfterInitialization方法,会扫描加了@Scheduled的类、方法,然后解析封装为不同的Task实例,缓存到ScheduledTaskRegistrar注册器中。
  • 该bean在所有单例初始化之后,会执行afterSingletonsInstantiated()方法,在所有单例初始化完成之后回调触发,在此方法中设置了ScheduledTaskRegistrar中的任务调度器(TaskScheduler或者ScheduledExecutorService类型)实例
  • 注册器ScheduledTaskRegistrar#afterPropertiesSet()方法添加所有缓存的Task实例到任务调度器中,后后续由任务调度器定时任务执行。

任务调度器

调度器一般是封装了一些schdule、scheduleAtFixedRate、scheduleWithFixedDelay等方法的接口

Scheduling模块支持的调度器接口

  • TaskScheduler类型
  • ScheduledExecutorService类型

而ScheduledExecutorService其实是JDK并发包java.util.concurrent的接口,后面经过适配器模式会被转为ConcurrentTaskScheduler,因此只需要分析TaskScheduler类型的执行器

  • ThreadPoolTaskScheduler:基于线程池实现的任务执行器,这个是最常用的实现,底层依赖于ScheduledThreadPoolExecutor实现。
  • ConcurrentTaskScheduler:TaskScheduler接口和ScheduledExecutorService接口的适配器,如果自定义一个ScheduledThreadPoolExecutor类型的Bean,那么任务执行器就会适配为ConcurrentTaskScheduler。
  • DefaultManagedTaskScheduler:JDK7引入的JSR-236的支持,可以通过JNDI配置此调度执行器,一般很少用到,底层也是依赖于ScheduledThreadPoolExecutor实现。

三种任务调度器底层都依赖ScheduledThreadPoolExecutor线程池。

定时任务

主要分为三种定时任务类型,可以通过@Scheduled注解的属性指定

  • CronTask:支持定义Corn表达式的定时任务,不支持延迟执行
  • FixedDelayTask:距离上次执行结果延迟执行
  • FixedRateTask:固定频率执行

解析和缓存

@Scheduled注解的解析缓存集中在postProcessAfterInitialization()方法,然后调用ScheduledMethodRunnable#processScheduled处理标注了@Scheduled注解的方法,这个方法十分长,不过逻辑并不复杂,它只做了四件事

  1. 解析@Scheduled中的initialDelay、initialDelayString属性,适用于FixedDelayTask或者FixedRateTask的延迟执行
  2. 优先解析@Scheduled中的cron属性,封装为CronTask,通过ScheduledTaskRegistrar进行缓存
  3. 解析@Scheduled中的fixedDelay、fixedDelayString属性,封装为FixedDelayTask,通过ScheduledTaskRegistrar进行缓存
  4. 解析@Scheduled中的fixedRate、fixedRateString属性,封装为FixedRateTask,通过ScheduledTaskRegistrar进行缓存

使用@Scheduled注解注解的方法会被解析成为ScheduledTask类型,解析成功的所有任务实例存放在ScheduledAnnotationBeanPostProcessor的一个映射scheduledTasks中:

// 宿主Bean实例 -> 解析完成的任务实例Set
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);

另外就是在ScheduledMethodRunnable#run方法中利用反射调用的加了@Scheduled注解的方法。

注册激活

在ScheduledAnnotationBeanPostProcessor见互斥方法afterSingletonsInstantiated()和onApplicationEvent(),两者中一定只有一个方法能够调用finishRegistration(),然后finishRegistration方法又调用ScheduledTaskRegistrar#afterPropertiesSet完成任务调度器的激活,具体的就是scheduleTasks方法,

  • 将三种类型的定时任务备份到Set中
  • 将三种类型的定时任务调用this.taskScheduler.scheduleXxx方法激活,最后任务调度器TaskScheduler负责调度执行定时任务,底层使用的是ScheduledExecutorService等相关。

拓展调度任务动态装载

如果想从外部的数据源像MySQL、JSON文件等加载定义的定时任务,然后再注册给Spring的定时任务调度器,不想使用已经支持的XML配置加载,可以借助SchedulingConfigurer可以拓展的钩子接口,在定时任务注册激活之前,依靠特定方法拿到ScheduledTaskRegistrar实例,然后注册自己从JSON、MySQL中加载定义的定时任务实现动态装载。

在抽象类AbstractSchedulingConfigurer中实现了SchedulingConfigurer接口,然后暴露了一个List loadTaskProperties()方法,可以创建外部资源类继承这个抽象类,然后重写在该方法中将外部定义的定时任务JSON配置加载为ScheduleTaskProperties,然后构造返回List,这样就可以被夹在了。

xxl-job原理

8000字 + 25图探秘Xxl-Job核心架构原理-腾讯云开发者社区-腾讯云

分布式任务调度平台XXL-JOB

轻量级分布式任务调度平台(XXL-JOB介绍、原理、工作流程、XXL-JOB环境搭建集成springboot) - 你樊不樊 - 博客园

2.1.0版本前核心调度模块都是基于quartz框架,2.1.0版本开始自研调度组件,移除quartz依赖 ,使用时间轮调度。(RPC的底层变化, 2.0.1 使用的是Jetty服务的RPC, 2.0.2 使用的Nettty服务的RPC)

设计思想:任务和触发执行逻辑分离,这样更加灵活,不同的任务可以被不同的触发逻辑执行。

特点

  • 调度中心依赖数据库,任务持久化在数据库。
  • 调度中心支持集群,集群之间通过数据库进行数据共享。

执行器注册

当前启动的服务实例可以被当作是一个执行器,服务配置XxlJob的方式是配置一个XxlJobSpringExecutor类型的bean放入spring容器,配置了一些host、port、logPath等信息。

执行器中可能有很多加了@XxlJob注解的方法,解析的时机在相关执行器注册类XxlJobSpringExecutor中,实现了Spring的SmartInitializingSingleton接口,在Bean初始化过程中,会调用afterSingletonsInstantiated方法,该方法是xxl-job执行器初始化入口,会进行一系列初始化。

public void afterSingletonsInstantiated() {
    // 1、初始化JobHandler
    this.initJobHandlerMethodRepository(applicationContext);

    // 2、刷新注册GlueJobHandler、ScriptJobHandler
    GlueFactory.refreshInstance(1);

    try {
        // 3、启动http服务
        super.start();
    } catch (Exception var2) {
        throw new RuntimeException(var2);
    }
}

JobHandler是一个定时任务的抽象,对应不同的定义方式有

  • MethodJobHandler:定时任务的抽象,利用反射调用加了@XxlJob注解的方法。
  • GlueJobHandler
  • ScriptJobHandler

拿着初始化的MethodJobHandler解析完成之后(其他两种都是在admin页面通过IDE创建才会被解析),会放入执行器XxlJobSpringExecutor的一个map缓存,key为XxlJob注解的value属性,value就为对应的JobHandler(也就是对应定时任务方法所在的bean)

执行器创建HTTP服务器

执行器在注册JobHandler同时,还会启动一个基于Netty的HTTP服务,用于和调度中心通信。

具体的逻辑在父类的XxlJobExecutor,启动HTTP服务相关代码在EmbedServer类

public void start() throws Exception {
    XxlJobFileAppender.initLogPath(this.logPath);
    this.initAdminBizList(this.adminAddresses, this.accessToken);
    JobLogFileCleanThread.getInstance().start((long)this.logRetentionDays);
    TriggerCallbackThread.getInstance().start();
    // 初始化HTTP服务器并启动
    this.initEmbedServer(this.address, this.ip, this.port, this.appname, this.accessToken);
}

基于Netty的HTTP服务器从配置、启动是由一个守护线程完成的

// 一、netty服务器配置

// 创建事件循环组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

// 配置服务器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel channel) {
            channel.pipeline()
                // 空闲检测
                .addLast(new IdleStateHandler(0, 0, 90, TimeUnit.SECONDS))
                // HTTP 编解码
                .addLast(new HttpServerCodec())
                // HTTP 消息聚合
                .addLast(new HttpObjectAggregator(5242880))
                // 业务处理器
                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
        }
    })
    .childOption(ChannelOption.SO_KEEPALIVE, true);


// 二、启动
try {
    // 启动服务器
    ChannelFuture future = bootstrap.bind(port).sync();
    // 注册服务
    startRegistry(appname, address);
    // 等待服务器关闭
    future.channel().closeFuture().sync();
} catch (InterruptedException e) {
    // 异常处理
} finally {
    // 优雅关闭
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

另外就是注册一个线程池bizThreadPool、ExecutorBiz用于处理来自调度中心的HTTP请求,通过EmbedHttpServerHandler将executorBiz动作抽象传给HTTP服务器,然后将bizThreadPool作为处理各种任务的线程池。

ExecutorBiz他有两个实现类,一个是执行器接受HTTP请求的ExecutorBizImpl,一个是调度中心发送HTTP请求的ExecutorBizClient。

public interface ExecutorBiz {
    ReturnT<String> beat();

    ReturnT<String> idleBeat(IdleBeatParam var1);

    ReturnT<String> run(TriggerParam var1);

    ReturnT<String> kill(KillParam var1);

    ReturnT<LogResult> log(LogParam var1);
}

执行器注册到调度中心

执行器启动的时候除了开启一个守护线程启动HTTP服务器,开启一个守护线程将执行器信息注册到调度中心,相关类是ExecutorRegistryThread,会将执行器的appname、host、port等信息注册到调度中心。

一个服务实例可以看作是一个执行器,多服务实例就是多执行器,调度中心都会保存每个执行的信息,便于后续HTTP通信。

任务调度以及触发

在admin调度中心服务启动的时候,会加载XxlJobAdminConfig类的afterPropertiesSet方法,在此方法内部执行init方法

// XxlJobAdminConfig#afterPropertiesSet
private XxlJobScheduler xxlJobScheduler;

@Override
public void afterPropertiesSet() throws Exception {
    adminConfig = this;

    xxlJobScheduler = new XxlJobScheduler();
    xxlJobScheduler.init();
}

init方法内部会初始化各种组件,关于任务触发的主要是 JobTriggerPoolHelper、JobScheduleHelper类

// XxlJobScheduler#init()
public void init() throws Exception {
    // init i18n
    initI18n();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin registry monitor run
    JobRegistryHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run ( depend on JobTriggerPoolHelper )
    JobCompleteHelper.getInstance().start();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule  ( depend on JobTriggerPoolHelper )
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}
  • JobScheduleHelper 类是一个用于任务调度的辅助类,它负责扫描任务,计算任务的触发时间,并将任务添加到时间环中,以便在合适的时间触发任务。启动了两个线程scheduleThread 和 ringThread
    • scheduleThread 线程负责扫描任务,计算任务的触发时间,并将任务添加到时间环中,务扫描和处理 ,scheduleThread 线程会定期扫描任务,根据任务的触发时间和当前时间的关系,进行不同的处理:
      • 如果任务触发时间已经过期超过 5 秒,则根据任务的错过策略进行处理,并更新任务的下次触发时间。
      • 如果任务触发时间已经过期但不超过 5 秒,则直接触发任务,并更新任务的下次触发时间。
      • 如果任务触发时间在未来,则将任务添加到时间环中,并更新任务的下次触发时间。
    • ringThread 线程负责从时间环中取出任务,调用JobTriggerPoolHelper#trigger(jobId)并触发任务。
  • JobTriggerPoolHelper 类是一个用于管理任务触发线程池的辅助类,它提供了快速和慢速两个线程池,根据任务的执行情况(是否超时)来选择合适的线程池执行任务触发操作。快速线程池用于执行正常的任务触发,慢速线程池用于执行那些在一分钟内超时次数超过 10 次的任务(单指向执行器发送HTTP以及收到响应的时间,因为执行器执行任务是异步的)。

任务调度

调度任务如何保证不被重复调度?

调度(更新数据库、放入时间轮)之前,使用一张表的 for 实现了分布式锁,这样在多个调度中心实例的时候,也能保证只有一个调度中心在调度,调度完成之后,关闭连接从而释放锁。

scheduleThread线程主要做任务调度,主要逻辑

  • 线程刚开启的时候休眠,保证线程在整点秒开始执行任务
  • 快速预估下可调度的任务数量,将快速线程池和慢速线程池的最大线程数量相加,得到总的最大线程数量 * 20,每个任务触发大概需要50ms,1秒大概可以触发20个任务。
  • 使用数据表xxl_job_lock实现一个写锁"select * from xxl_job_lock where lock_name = ‘schedule_lock’ for update",避免多个调度器同时执行调度任务,保证调度任务的唯一性,另外就是时间轮是一个安全的ConcurrentHashMap
  • 查询xxl_job_info表中trigger_status=1的任务,并且trigger_next_time小于等于当前时间 + 5s的任务。

// schedule thread
scheduleThread = new Thread(new Runnable() {
    @Override
    public void run() {

        // 1、线程刚开启的时候休眠,保证线程在整点秒开始执行任务
        try {
            TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
        } catch (Throwable e) {
            if (!scheduleThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }

        // 2、快速预估下可调度的任务数量
        // 将快速线程池和慢速线程池的最大线程数量相加,得到总的最大线程数量 * 20,每个任务触发大概需要50ms,1秒大概可以触发20个任务。
        // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
        int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
        while (!scheduleThreadToStop) {
            // Scan Job
            long start = System.currentTimeMillis();
            Connection conn = null;
            Boolean connAutoCommit = null;
            PreparedStatement preparedStatement = null;
            boolean preReadSuc = true;
            try {
                conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                connAutoCommit = conn.getAutoCommit();
                conn.setAutoCommit(false);
                // 3、使用数据表xxl_job_lock实现一个表锁,避免多个调度器同时执行任务,保证任务的唯一性。
                preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                preparedStatement.execute();

                long nowTime = System.currentTimeMillis();
                // 4、查询xxl_job_info表中trigger_status=1的任务,并且trigger_next_time小于等于当前时间 + 5s的任务。
                List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);

                // 省略,后面说
                // tx stop
            } catch (Throwable e) {
                
            } finally {
                // 省略代码
                // 关闭资源

            }

        }

        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
    }
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();

将任务调度进时间轮

逐个处理任务,根据任务的下一次触发时间,将任务推送到不同的时间轮上。

  • **任务延迟的时间 > 5s:**如果任务的下一次触发时间 大于当前时间 + 5s,根据admin面板设置的调度策略决定放弃本次调度还是立即调度,并且更新任务的下一次触发时间。
  • **任务延迟的时间 <= 5s:**如果任务的下一次触发时间 小于等于当前时间 + 5s,直接调度,然后更新任务的下一次触发时间,然后在加入时间轮,等待触发,加入时间轮的计算方法
    • 取当前任务下一次触发时间秒数的模 60,得到一个 0 到 59 之间的整数,表示任务应该被放入时间轮的哪个秒槽中。
    • 以key为秒槽值,value为当前任务的jobId放入时间轮,是一个ConcurrentHashMap。
    • 然后刷新下一次调度时间。
  • **任务还没延迟:**如果任务的下一次触发时间 小于等于 当前时间,将任务推送到时间轮上,并且更新任务的下一次触发时间。
if (scheduleList!=null && scheduleList.size()>0) {
    for (XxlJobInfo jobInfo: scheduleList) {
        // 5、逐个处理任务,根据任务的下一次触发时间,将任务推送到不同的时间轮上。
        // 5.1、如果任务的下一次触发时间 小于等于当前时间 + 5s,立即触发,将任务推送到时间轮上,并且更新任务的下一次触发时间。
        // 5.2、如果任务的下一次触发时间 大于当前时间 + 5s,根据策略决定是否触发,将任务推送到时间轮上,并且不更新任务的下一次触发时间。
        // 5.3、如果任务的下一次触发时间 小于等于 当前时间,将任务推送到时间轮上,并且更新任务的下一次触发时间。
        // time-ring jump
        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
            // 获取调度策略
            MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
            if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                // FIRE_ONCE_NOW 》 trigger
                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
            }
            // 2、fresh next
            refreshNextValidTime(jobInfo, new Date());

        } else if (nowTime > jobInfo.getTriggerNextTime()) {
            // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
            // 1、trigger
            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
            // 2、fresh next
            refreshNextValidTime(jobInfo, new Date());
            // next-trigger-time in 5s, pre-read again
            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                // 1、make ring second
                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                // 2、push time ring
                pushTimeRing(ringSecond, jobInfo.getId());
                // 3、fresh next
                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
            }

        } else {
            // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
            // 1、make ring second
            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
            // 2、push time ring
            pushTimeRing(ringSecond, jobInfo.getId());
            // 3、fresh next
            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
        }
    }

    // 3、update trigger info
    for (XxlJobInfo jobInfo: scheduleList) {
        XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
    }

} else {
    preReadSuc = false;
}

具体调度进时间轮的代码


private void pushTimeRing(int ringSecond, int jobId){
    // push async ring
    List<Integer> ringItemData = ringData.get(ringSecond);
    if (ringItemData == null) {
        ringItemData = new ArrayList<Integer>();
        ringData.put(ringSecond, ringItemData);
    }
    // 哈希链
    ringItemData.add(jobId);

    logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
}

将时间轮任务触发

ringThread 线程每秒执行一次,从时间轮中获取当前秒和上一秒需要执行的任务 ID 列表,然后通过 JobTriggerPoolHelper 触发这些任务,进而在XxlJobTrigger中被时机触发。

需要注意的是为避免处理耗时过长跨越刻度,向前校验一个刻度,也就是每次检查当前秒、上一秒需要触发的任务,避免检查 & 触发的时候长了漏掉了某一秒应该检查的触发任务

ringThread = new Thread(new Runnable() {
    @Override
    public void run() {
        // 只要 ringThreadToStop 为 false,线程就会持续运行
        while (!ringThreadToStop) {
            // 让线程休眠到下一秒的开始,确保操作按秒对齐
            try {
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
            } catch (Throwable e) {
                // 如果线程未被要求停止,记录错误日志
                if (!ringThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                // 用于存储当前秒和上一秒需要执行的任务 ID
                List<Integer> ringItemData = new ArrayList<>();
                // 获取当前秒数
                int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                // 为避免处理耗时过长跨越刻度,向前校验一个刻度
                for (int i = 0; i < 2; i++) {
                    // 从时间轮中移除当前秒和上一秒对应的任务 ID 列表
                    // (nowSecond + 60 - i) % 60 是为了计算当前秒和上一秒对应的时间轮索引。 + 60 是为了避免出现负数, % 60 是确保索引值在 0 到 59 之间。当 i 为 0 时,得到的是当前秒的索引;当 i 为 1 时,得到的是上一秒的索引。
                    List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
                    if (tmpData != null) {
                        // 将任务 ID 列表添加到 ringItemData 中
                        ringItemData.addAll(tmpData);
                    }
                }
                // 记录当前秒和对应的任务 ID 列表
                logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
                // 如果有任务需要执行
                if (ringItemData.size() > 0) {
                    // 遍历任务 ID 列表,触发每个任务
                    for (int jobId : ringItemData) {
                        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                    }
                    // 清空任务 ID 列表
                    ringItemData.clear();
                }
            } catch (Throwable e) {
                // 如果线程未被要求停止,记录错误日志
                if (!ringThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                }
            }
        }
        // 线程停止时记录日志
        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
    }
});
// 设置线程为守护线程
ringThread.setDaemon(true);
// 设置线程名称
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
// 启动线程
ringThread.start();

任务触发的逻辑被委托在JobTriggerPoolHelper#trigger()方法,具体的逻辑

  1. 根据任务历史在当前的一分钟内触发超时情况(单指调度器触发这个动作时间,不包括执行器的执行时间)选择fastTriggerPool、slowTriggerPool线程池
  2. 根据当前分钟数是否变化清空超时计数map
  3. 判断本次触发的任务是否超时500ms,超时的话更新计数map
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}

// 委托
/**
 * add trigger
 */
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            } finally {

                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // ob-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
        @Override
        public String toString() {
            return "Job Runnable, jobId:"+jobId;
        }
    });
}

真是的任务触发在XxlJobTrigger.trigger()方法,主要的逻辑

  • 加载任务信息和任务组信息 :通过 jobId 从数据库中加载 XxlJobInfo 任务信息,如果任务信息为空,则记录警告日志并返回。如果传入的 executorParam 不为空,则更新任务的执行参数。确定最终的失败重试次数 finalFailRetryCount 。加载任务所属的 XxlJobGroup 任务组信息。
  • 覆盖执行器地址列表 :如果传入的 addressList 不为空且长度大于 0,则更新任务组的地址类型和地址列表。
  • 处理分片参数 :如果传入的 executorShardingParam 不为空,则尝试解析分片参数。如果解析成功,则将其转换为整数数组 shardingParam 。
  • 根据路由策略和分片参数执行任务触发 :如果路由策略为 SHARDING_BROADCAST ,且任务组的注册列表不为空,且没有分片参数,则对任务组的每个注册地址调用 processTrigger 方法。否则,设置默认的分片参数 [0, 1] ,并调用 processTrigger 方法。

然后进入processTrigger方法

  1. 初始化参数 :
    • 确定任务的阻塞策略 blockStrategy 和路由策略 executorRouteStrategyEnum 。
    • 根据路由策略生成分片参数 shardingParam 。
  2. 保存日志 ID :
    • 创建一个 XxlJobLog 对象,设置任务组、任务 ID 和触发时间,并保存到数据库中。
  3. 初始化触发参数 :
    • 创建一个 TriggerParam 对象,设置任务 ID、执行器处理程序、执行参数等信息。
  4. 初始化执行器地址 :
    • 根据路由策略和任务组的注册列表确定执行器的地址。
    • 如果路由策略为 SHARDING_BROADCAST ,则根据分片索引选择地址;否则,使用路由策略的路由器选择地址。
  5. 触发远程执行器 :
    • 如果执行器地址不为空,则调用 runExecutor 方法触发远程执行器;否则,返回失败结果。
  6. 收集触发信息 :
    • 收集触发类型、管理地址、执行器注册类型、路由策略、阻塞策略、超时时间、失败重试次数等信息,并生成触发信息字符串。
  7. 保存日志触发信息 :
    • 更新 XxlJobLog 对象的执行器地址、执行器处理程序、执行参数等信息,并将触发结果和触发信息保存到数据库中。

最终就是在第5步发送HTTP请求给各个执行器实例,就来到了executorBiz,实际调用的就是executorBizClient的run方法发送触发请求。

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}


// TriggerParam 实体参数
private int jobId;

private String executorHandler;
private String executorParams;
private String executorBlockStrategy;
private int executorTimeout;

private long logId;
private long logDateTime;

private String glueType;
private String glueSource;
private long glueUpdatetime;

private int broadcastIndex;
private int broadcastTotal;

触发成功之后,会在xxl_job_logs更新日志,执行器执行任务也是异步的,执行过程结果可以查询xxl_job_logs表以及在代码中使用回调API向调度中心发送执行结果。

@Component
public class MyJobHandler {

    @XxlJob("myJobHandler")
    public void execute() {
        try {
            // 任务执行逻辑
            XxlJobHelper.log("任务开始执行");
            // 模拟任务执行
            Thread.sleep(5000);
            XxlJobHelper.log("任务执行完成");

            // 设置执行结果
            XxlJobHelper.handleSuccess("任务执行成功");
        } catch (Exception e) {
            XxlJobHelper.log("任务执行失败: " + e.getMessage());
            XxlJobHelper.handleFail("任务执行失败");
        }
    }
}
为什么调度之后触发使用(快、慢)线程池异步的来做呢?
  • 触发任务需要发送HTTP请求,过程比较耗费时间,如果调度线程来做,会影响调度线程。
  • 另外一个优化就是使用快、慢线程池,使用快慢线程池就是避免那种频繁触发并且每次触发时间还很长的任务阻塞其它任务的触发的情况发生
如何选择触发的执行器实例?

除了分片广播,其余的具体的算法实现都是通过ExecutorRouter的实现类来实现的

  • ExecutorRouteFirst :选择执行器列表中的第一个执行器地址。
  • ExecutorRouteLast :选择执行器列表中的最后一个执行器地址。
  • ExecutorRouteRound :通过轮询的方式选择执行器地址。
  • ExecutorRouteRandom :随机选择执行器地址。
  • ExecutorRouteLFU :选择使用频率最低的执行器地址。
  • ExecutorRouteLRU :选择最近最久未使用的执行器地址。
  • ExecutorRouteFailover :通过心跳检测,选择第一个可用的执行器地址。
  • ExecutorRouteBusyover :通过空闲检测,选择第一个空闲的执行器地址。
  • ExecutorRouteConsistentHash:通过一致性Hash算法 一致性哈希介绍
  • 分片广播:XxlJob给每个执行器分配一个编号,从0开始递增,然后向所有执行器触发任务,告诉每个执行器自己的编号和总共执行器的数据,加快执行效率。我们可以通过XxlJobHelper#getShardIndex获取到编号,XxlJobHelper#getShardTotal获取到执行器的总数据量。举个例子,比如你现在需要处理30w条数据,有3个执行器,此时使用分片广播,那么此时可将任务分成3分,每份10w条数据,执行器根据自己的编号选择对应的那份10w数据处理

执行器执行任务

任务执行

当执行器接收到调度中心的触发HTTP请求时,会把请求交给ExecutorBiz的来处理,具体的实现类是ExecutorBizImpl的run方法,主要的逻辑

  1. 检查拿到之前JobThread、JobHandler类,在XxlJobExecutor类中有个map按照key=jobId缓存着JobThread。
  2. 判断执行器方法的类型glueType,针对不同的任务类型,执行不同的逻辑:
  • BEAN 类型 :
    • 加载新的任务处理器。
    • 如果旧的任务线程存在且其处理器与新处理器不同,则标记需要移除旧线程。
    • 如果任务处理器为 null ,则使用新处理器;若新处理器也为 null ,则返回失败结果。
  • GLUE_GROOVY 类型 :
    • 检查旧的任务线程是否需要更新,如果需要则标记移除。
    • 如果任务处理器为 null ,则通过 GlueFactory 加载新的实例并创建 GlueJobHandler 。
  • 脚本类型 :
    • 检查旧的任务线程是否需要更新,如果需要则标记移除。
    • 如果任务处理器为 null ,则创建 ScriptJobHandler 。
  • 无效类型 :如果任务类型无效,则返回失败结果。
  1. 根据任务执行策略来选择丢弃、替换等当前执行,如果任务线程存在,则根据 triggerParam 中的 executorBlockStrategy 匹配阻塞策略。针对不同的阻塞策略,执行不同的逻辑:
  • DISCARD_LATER :如果任务线程正在运行或有触发队列,则丢弃该任务并返回失败结果。
  • COVER_EARLY :如果任务线程正在运行或有触发队列,则标记移除旧线程并将任务线程置为 null 。
  • 其他策略 :直接将任务触发信息推送到队列中。
  1. 将待执行的任务参数加入队列

最终在JobThread中,通过JobHandler处理执行任务,具体的执行逻辑

  1. 初始化任务处理器,调用任务处理器 handler 的 init 方法进行初始化操作。
  2. 任务循环处理:
  • 循环条件 : while(!toStop) 表示只要 toStop 标志为 false ,线程就会持续运行。
  • 任务获取 :使用 triggerQueue.poll(3L, TimeUnit.SECONDS) 从任务队列中获取任务,最多等待 3 秒。避免线程长时间阻塞在队列获取操作上。
  • 任务执行 :
    • 如果获取到任务,将 running 标志置为 true ,并初始化任务上下文。
    • 如果任务设置了执行超时时间,使用 FutureTask 启动一个新线程执行任务,并在指定时间内等待结果。如果超时,记录超时日志并处理超时情况。使用 FutureTask 和 get 方法设置任务执行的超时时间,避免任务长时间占用资源,提高系统的稳定性和响应性能。
    • 如果未设置超时时间,直接执行任务。
  • 结果验证 :验证任务执行结果的 handleCode ,如果小于等于 0 则处理失败情况;如果结果消息过长,进行截断处理。
  • 空闲处理 :如果连续 30 次未获取到任务且任务队列为空,调用 XxlJobExecutor.removeJobThread 方法移除当前任务线程。当线程连续空闲超过一定次数且任务队列为空时,移除当前任务线程,释放系统资源。
  • 异常处理 :捕获任务执行过程中的异常,记录异常信息并处理失败情况。
  • 结果回调 :无论任务执行成功还是失败,都通过 TriggerCallbackThread.pushCallBack 方法将结果回调给调度中心。
  1. 处理队列中剩余的任务:当 toStop 标志为 true 时,线程停止运行前,处理任务队列中剩余的任务,并将其标记为失败,回调给调度中心。
  2. 销毁任务处理器:调用任务处理器 handler 的 destroy 方法进行销毁操作。如果销毁过程中出现异常,记录错误日志。
为什么需要更换任务处理器以及线程呢?

加载新的任务处理器并更换线程主要是为了确保任务能够根据最新的配置和代码逻辑正确执行,同时保证任务的灵活性和可维护性。

举个例子任务类型变更 :任务的类型可能会发生变化,例如从 BEAN 类型变为 GLUE_GROOVY 类型,或者脚本类型发生更新。

更换线程:旧线程状态不一致 :当任务的处理器或代码发生变化时,旧的线程可能仍然在使用旧的处理器或代码逻辑。为了确保任务使用最新的配置和代码执行,需要终止旧的线程并创建新的线程。

任务执行结果回调

在finally中无论执行是否成功都需要回调结果给调度中心,回调的动作是向TriggerCallbackThread类的队列callBackQueue中加元素,然后该线程会从队列中取元素,发送HTTP请求回调给调度中心。

// 在finally中回调结果给调度中心
finally {
    if(triggerParam != null) {
        // callback handler info
        if (!toStop) {
            // commonm
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.getXxlJobContext().getHandleCode(),
                    XxlJobContext.getXxlJobContext().getHandleMsg() )
            );
        } else {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [job running, killed]" )
            );
        }
    }
}
}
private void doCallback(List<HandleCallbackParam> callbackParamList){
        boolean callbackRet = false;
        // callback, will retry if error
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
                if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                    callbackRet = true;
                    break;
                } else {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
                }
            } catch (Throwable e) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
            }
        }
        if (!callbackRet) {
            appendFailCallbackFile(callbackParamList);
        }
    }

// 委托AdminBizImpl进行HTTP请求
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    return JobCompleteHelper.getInstance().callback(callbackParamList);
}

// 最终是JobCompleteHelper
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
		// valid log item
		XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
		if (log == null) {
			return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
		}
		if (log.getHandleCode() > 0) {
			return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
		}

		// handle msg
		StringBuffer handleMsg = new StringBuffer();
		if (log.getHandleMsg()!=null) {
			handleMsg.append(log.getHandleMsg()).append("<br>");
		}
		if (handleCallbackParam.getHandleMsg() != null) {
			handleMsg.append(handleCallbackParam.getHandleMsg());
		}

		// success, save log
		log.setHandleTime(new Date());
		log.setHandleCode(handleCallbackParam.getHandleCode());
		log.setHandleMsg(handleMsg.toString());
		XxlJobCompleter.updateHandleInfoAndFinish(log);

		return ReturnT.SUCCESS;
	}

相关文章:

  • 基于JSP和SQL的CD销售管理系统(源码+lw+部署文档+讲解),源码可白嫖!
  • ubuntu ollama+dify实践
  • 基金交易系统的流程
  • 国产主流数据库存储类型简析
  • 接口自动化测试实战(超详细的)
  • 小程序主包方法迁移到分包-调用策略
  • Python区块链应用开发从入门到精通
  • Word 小黑第19套
  • redis 配置
  • mingw工具源码编译
  • SAP BC 记一次 DBCO 链接ORACLE DBCC 连接测试突然失败的问题
  • tomcat配置应用
  • 【区块链+ 医疗健康】基于区块链的医院诊所信息系统 | FISCO BCOS 应用案例
  • 整合记录-持续
  • 监控易东莞运维项目:it监控+机房动环监控+资产管理+配置管理
  • 滑动窗口[判断子集是否满足条件] 力扣:209 ▎2962 ▎3306
  • ArrayList底层结构和源码分析笔记
  • docker3-容器与镜像命令
  • 【算法】动态规划
  • oracle11.2.0.4 RAC 保姆级静默安装(二) DB数据库软件
  • 为学校网站做网站推广策划书/什么是网络营销策划
  • 服装饰品网站建设/进入百度
  • 做外贸接私单的网站/最近新闻摘抄50字
  • 二手商品网站制作/广告营销平台
  • 新开传奇网站新开网/成功营销十大经典案例
  • 免费做动态图片的网站/百度竞价怎么开户