
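XXL-JOB Source Code Analysis

Source code walkthrough

1. Source structure

The repository contains three submodules (xxl-job-admin, xxl-job-core and xxl-job-executor-samples) alongside the doc and db directories:

1.1 Source directories

- /doc : documentation

- /db : table creation scripts for the "scheduling database"

- /xxl-job-admin : the admin (scheduling center, the job scheduling management platform), project source

- /xxl-job-core : shared Jar dependency (scheduling core)

- /xxl-job-executor-samples : executor sample projects (you can develop directly on top of a sample, or adapt an existing project into an executor project)

1.2 Scheduling database configuration

The XXL-JOB scheduling module is built on a self-developed scheduling component and supports clustered deployment. The scheduling database tables are:

  - xxl_job_lock: job scheduling lock table;

  - xxl_job_group: executor info table, maintains job executor information;

  - xxl_job_info: schedule extension info table. Stores the extended information of XXL-JOB scheduled jobs, such as job group, job name, machine address, executor, execution parameters and alarm email; (core)

  - xxl_job_log: schedule log table. Stores the history of XXL-JOB job scheduling, such as trigger result, execution result, trigger parameters, scheduling machine and executor;

  - xxl_job_log_report: schedule log report table. Stores the aggregated report of XXL-JOB schedule logs, used by the report page of the admin;

  - xxl_job_logglue: job GLUE log table. Stores the GLUE update history and supports rolling back to earlier GLUE versions;

  - xxl_job_registry: executor registry table, maintains the addresses of online executors and admin machines; (core)

  - xxl_job_user: system user table;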

CREATE TABLE `xxl_job_info` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
    `job_desc` varchar(255) NOT NULL,
    `add_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    `author` varchar(64) DEFAULT NULL COMMENT 'author',
    `alarm_email` varchar(255) DEFAULT NULL COMMENT 'alarm email',
    `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT 'schedule type',
    `schedule_conf` varchar(128) DEFAULT NULL COMMENT 'schedule config; meaning depends on the schedule type',
    `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT 'misfire strategy',
    `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT 'executor route strategy',
    `executor_handler` varchar(255) DEFAULT NULL COMMENT 'executor job handler',
    `executor_param` varchar(512) DEFAULT NULL COMMENT 'executor job parameters',
    `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT 'block strategy',
    `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT 'job execution timeout, in seconds',
    `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'fail retry count',
    `glue_type` varchar(50) NOT NULL COMMENT 'GLUE type',
    `glue_source` mediumtext COMMENT 'GLUE source code',
    `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE remark',
    `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE update time',
    `child_jobid` varchar(255) DEFAULT NULL COMMENT 'child job IDs, comma-separated',
    `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'trigger status: 0-stopped, 1-running',
    `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'last trigger time',
    `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'next trigger time',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2. xxl-job-admin core source analysis

2.1 Package structure

2.2 Scheduling entry point: XxlJobAdminConfig

public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

 

    private static XxlJobAdminConfig adminConfig = null;

    public static XxlJobAdminConfig getAdminConfig() {

        return adminConfig;

    }

 

 

    // ---------------------- XxlJobScheduler ----------------------

 

    private XxlJobScheduler xxlJobScheduler;

 

    @Override

    public void afterPropertiesSet() throws Exception {

        adminConfig = this;

 

        // initialize the scheduler

        xxlJobScheduler = new XxlJobScheduler();

        xxlJobScheduler.init();

    }

 

    @Override

    public void destroy() throws Exception {

        // destroy the scheduler

        xxlJobScheduler.destroy();

    }

}
public class XxlJobScheduler  {

    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);

 

 

    public void init() throws Exception {

        // init i18n (internationalization)

        initI18n();

 

        // start the worker threads below, one after another

        

        // admin trigger pool start: trigger thread pools

        JobTriggerPoolHelper.toStart();

 

        // admin registry monitor run: accept registrations from distributed executors

        JobRegistryHelper.getInstance().start();

 

        // admin fail-monitor run: job failure monitor

        JobFailMonitorHelper.getInstance().start();

 

        // admin lose-monitor run ( depend on JobTriggerPoolHelper ): job completion monitor, handles status callbacks from distributed executors

        JobCompleteHelper.getInstance().start();

 

        // admin log report start: log reporting

        JobLogReportHelper.getInstance().start();

 

        // start-schedule ( depend on JobTriggerPoolHelper ): begin scheduling

        JobScheduleHelper.getInstance().start();

 

        logger.info(">>>>>>>>> init xxl-job admin success.");

    }

 

    

    public void destroy() throws Exception {

 

        // mirror of init(): stop the threads in reverse order

        

        // stop-schedule

        JobScheduleHelper.getInstance().toStop();

 

        // admin log report stop

        JobLogReportHelper.getInstance().toStop();

 

        // admin lose-monitor stop

        JobCompleteHelper.getInstance().toStop();

 

        // admin fail-monitor stop

        JobFailMonitorHelper.getInstance().toStop();

 

        // admin registry stop

        JobRegistryHelper.getInstance().toStop();

 

        // admin trigger pool stop

        JobTriggerPoolHelper.toStop();

 

    }

}

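To summarize, initialization does the following, which is essentially everything the XXL-JOB admin does:

1. Initialize i18n.

2. Start the trigger thread pools (executor service).

3. Start the thread that accepts registrations from distributed executors (registry service).

4. Start the job failure monitor thread.

5. Start the job completion monitor thread, which handles job status callbacks from distributed executors (callback service).

6. Start the log report thread (log service).

7. Start the job scheduling thread (scheduler).

This list is a good way to understand the fully asynchronous, decoupled design: apart from the i18n step, every item is a dedicated thread or thread pool, which is what "asynchronous" means here. Looking back at the XXL-JOB architecture diagram, the seven steps map one-to-one onto its components, except for the job thread, which runs inside the distributed business applications that host the executors.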
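The start-in-order, stop-in-reverse lifecycle of XxlJobScheduler can be illustrated with a small sketch. This is not XXL-JOB code: the `LifecycleSketch` class, the `Helper` interface and the `recording` factory are hypothetical names used only to show the pattern, with a stack remembering the start order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: start components in order, stop them in reverse. */
final class LifecycleSketch {

    /** Hypothetical stand-in for helpers such as JobRegistryHelper. */
    interface Helper {
        void start();
        void toStop();
    }

    // Stack of started helpers; the most recently started one is on top.
    private final Deque<Helper> started = new ArrayDeque<>();

    /** Starts a helper and remembers it so it can be stopped later. */
    void start(Helper helper) {
        helper.start();
        started.push(helper);
    }

    /** Stops helpers in the reverse of their start order. */
    void destroy() {
        while (!started.isEmpty()) {
            started.pop().toStop();
        }
    }

    /** Builds a helper that records its lifecycle events into the given log. */
    static Helper recording(final String name, final List<String> log) {
        return new Helper() {
            @Override public void start()  { log.add("start " + name); }
            @Override public void toStop() { log.add("stop " + name); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        LifecycleSketch scheduler = new LifecycleSketch();
        scheduler.start(recording("triggerPool", log));
        scheduler.start(recording("registry", log));
        scheduler.start(recording("schedule", log));
        scheduler.destroy();
        System.out.println(log);
    }
}
```

The reverse order matters because later components depend on earlier ones: the scheduling thread pushes work into the trigger pool, so it has to stop first.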

2.3 Scheduling core: JobScheduleHelper

public class JobScheduleHelper {

    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

 

    private static JobScheduleHelper instance = new JobScheduleHelper();

    public static JobScheduleHelper getInstance(){

        return instance;

    }

 

    public static final long PRE_READ_MS = 5000;    // pre read

 

    private Thread scheduleThread;

    private Thread ringThread;

    private volatile boolean scheduleThreadToStop = false;

    private volatile boolean ringThreadToStop = false;

    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

 

    public void start(){

 

        // schedule thread

        scheduleThread = new Thread(new Runnable() {

            @Override

            public void run() {

 

                try {

                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );

                } catch (InterruptedException e) {

                    if (!scheduleThreadToStop) {

                        logger.error(e.getMessage(), e);

                    }

                }

                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

 

                // pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)

                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

 

                while (!scheduleThreadToStop) {

 

                    // Scan Job

                    long start = System.currentTimeMillis();

 

                    Connection conn = null;

                    Boolean connAutoCommit = null;

                    PreparedStatement preparedStatement = null;

 

                    boolean preReadSuc = true;

                    try {

 

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();

                        connAutoCommit = conn.getAutoCommit();

                        conn.setAutoCommit(false);

 

                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );

                        preparedStatement.execute();

 

                        // tx start

 

                        // 1、pre read

                        long nowTime = System.currentTimeMillis();

                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);

                        if (scheduleList!=null && scheduleList.size()>0) {

                            // 2、push time-ring

                            for (XxlJobInfo jobInfo: scheduleList) {

 

                                // time-ring jump

                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {

                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time

                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

 

                                    // 1、misfire match

                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);

                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {

                                        // FIRE_ONCE_NOW > trigger

                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);

                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    }

 

                                    // 2、fresh next

                                    refreshNextValidTime(jobInfo, new Date());

 

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {

                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

 

                                    // 1、trigger

                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);

                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

 

                                    // 2、fresh next

                                    refreshNextValidTime(jobInfo, new Date());

 

                                    // next-trigger-time in 5s, pre-read again

                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

 

                                        // 1、make ring second

                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

 

                                        // 2、push time ring

                                        pushTimeRing(ringSecond, jobInfo.getId());

 

                                        // 3、fresh next

                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

 

                                    }

 

                                } else {

                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

 

                                    // 1、make ring second

                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

 

                                    // 2、push time ring

                                    pushTimeRing(ringSecond, jobInfo.getId());

 

                                    // 3、fresh next

                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

 

                                }

 

                            }

 

                            // 3、update trigger info

                            for (XxlJobInfo jobInfo: scheduleList) {

                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);

                            }

 

                        } else {

                            preReadSuc = false;

                        }

 

                        // tx stop

 

 

                    } catch (Exception e) {

                        if (!scheduleThreadToStop) {

                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);

                        }

                    } finally {

 

                        // commit

                        if (conn != null) {

                            try {

                                conn.commit();

                            } catch (SQLException e) {

                                if (!scheduleThreadToStop) {

                                    logger.error(e.getMessage(), e);

                                }

                            }

                            try {

                                conn.setAutoCommit(connAutoCommit);

                            } catch (SQLException e) {

                                if (!scheduleThreadToStop) {

                                    logger.error(e.getMessage(), e);

                                }

                            }

                            try {

                                conn.close();

                            } catch (SQLException e) {

                                if (!scheduleThreadToStop) {

                                    logger.error(e.getMessage(), e);

                                }

                            }

                        }

 

                        // close PreparedStatement

                        if (null != preparedStatement) {

                            try {

                                preparedStatement.close();

                            } catch (SQLException e) {

                                if (!scheduleThreadToStop) {

                                    logger.error(e.getMessage(), e);

                                }

                            }

                        }

                    }

                    long cost = System.currentTimeMillis()-start;

 

 

                    // Wait seconds, align second

                    if (cost < 1000) {  // scan-overtime, not wait

                        try {

                            // pre-read period: success > scan each second; fail > skip this period;

                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);

                        } catch (InterruptedException e) {

                            if (!scheduleThreadToStop) {

                                logger.error(e.getMessage(), e);

                            }

                        }

                    }

 

                }

 

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");

            }

        });

        scheduleThread.setDaemon(true);

        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");

        scheduleThread.start();

 

 

        // ring thread

        ringThread = new Thread(new Runnable() {

            @Override

            public void run() {

 

                while (!ringThreadToStop) {

 

                    // align second

                    try {

                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);

                    } catch (InterruptedException e) {

                        if (!ringThreadToStop) {

                            logger.error(e.getMessage(), e);

                        }

                    }

 

                    try {

                        // second data

                        List<Integer> ringItemData = new ArrayList<>();

                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // also check the previous tick, in case processing took too long and skipped past it

                        for (int i = 0; i < 2; i++) {

                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );

                            if (tmpData != null) {

                                ringItemData.addAll(tmpData);

                            }

                        }

 

                        // ring trigger

                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );

                        if (ringItemData.size() > 0) {

                            // do trigger

                            for (int jobId: ringItemData) {

                                // do trigger

                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);

                            }

                            // clear

                            ringItemData.clear();

                        }

                    } catch (Exception e) {

                        if (!ringThreadToStop) {

                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);

                        }

                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");

            }

        });

        ringThread.setDaemon(true);

        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");

        ringThread.start();

    }

}
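The two ideas in the listing above, the three-way decision made by the schedule thread and the 60-slot time ring drained by the ring thread, can be isolated in a small sketch. It assumes the same 5000 ms pre-read window and the same `(time / 1000) % 60` slot formula as the listing; the class `TimeRingSketch`, the `Action` enum and the method names are hypothetical, and no database or threads are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the schedule decision and the time ring. */
final class TimeRingSketch {

    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_RING }

    /** Mirrors the three branches of the schedule thread. */
    static Action classify(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            return Action.MISFIRE;      // more than 5 s late: apply the misfire strategy
        } else if (nowTime > triggerNextTime) {
            return Action.TRIGGER_NOW;  // less than 5 s late: trigger directly
        }
        return Action.PUSH_RING;        // due within the pre-read window: park in the ring
    }

    /** Maps a trigger time in epoch milliseconds to a slot in [0, 59]. */
    static int ringSecond(long triggerNextTime) {
        return (int) ((triggerNextTime / 1000) % 60);
    }

    // slot (second of the minute) -> job ids due in that second
    private final Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();

    /** Adds a job id to the given slot. */
    void push(int ringSecond, int jobId) {
        ring.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    /** Removes and returns the jobs of the current slot and of the previous one. */
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ring.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        System.out.println(classify(now, 4_000L));
        System.out.println(classify(now, 8_000L));
        System.out.println(classify(now, 12_000L));

        TimeRingSketch ring = new TimeRingSketch();
        ring.push(ringSecond(12_000L), 42);
        System.out.println(ring.drain(12));
    }
}
```

Because the schedule thread only pre-reads jobs due within the next 5 seconds, a slot never has to hold entries for more than one upcoming minute, which is why 60 slots keyed by second are enough.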

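2.4 Trigger

JobTriggerPoolHelper does two main things:

1. start() creates two trigger thread pools, fast and slow. A job whose trigger takes longer than 500 ms more than 10 times within the same minute is routed to the slow pool.

2. addTrigger() asynchronously hands the trigger action to XxlJobTrigger for execution.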

public class JobTriggerPoolHelper {

    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);

 

 

    // ---------------------- trigger pool ----------------------

 

    // fast/slow thread pool

    private ThreadPoolExecutor fastTriggerPool = null;

    private ThreadPoolExecutor slowTriggerPool = null;

 

    public void start(){

        fastTriggerPool = new ThreadPoolExecutor(

                10,

                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),

                60L,

                TimeUnit.SECONDS,

                new LinkedBlockingQueue<Runnable>(1000),

                new ThreadFactory() {

                    @Override

                    public Thread newThread(Runnable r) {

                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());

                    }

                });

 

        slowTriggerPool = new ThreadPoolExecutor(

                10,

                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),

                60L,

                TimeUnit.SECONDS,

                new LinkedBlockingQueue<Runnable>(2000),

                new ThreadFactory() {

                    @Override

                    public Thread newThread(Runnable r) {

                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());

                    }

                });

    }

 

 

    public void stop() {

        //triggerPool.shutdown();

        fastTriggerPool.shutdownNow();

        slowTriggerPool.shutdownNow();

        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");

    }

 

 

    // job timeout count

    private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min

    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

 

 

    /**

     * add trigger

     */

    public void addTrigger(final int jobId,

                           final TriggerTypeEnum triggerType,

                           final int failRetryCount,

                           final String executorShardingParam,

                           final String executorParam,

                           final String addressList) {

 

        // choose thread pool

        ThreadPoolExecutor triggerPool_ = fastTriggerPool;

        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);

        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min

            triggerPool_ = slowTriggerPool;

        }

 

        // trigger

        triggerPool_.execute(new Runnable() {

            @Override

            public void run() {

 

                long start = System.currentTimeMillis();

 

                try {

                    // do trigger

                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

                } catch (Exception e) {

                    logger.error(e.getMessage(), e);

                } finally {

 

                    // check timeout-count-map

                    long minTim_now = System.currentTimeMillis()/60000;

                    if (minTim != minTim_now) {

                        minTim = minTim_now;

                        jobTimeoutCountMap.clear();

                    }

 

                    // incr timeout-count-map

                    long cost = System.currentTimeMillis()-start;

                    if (cost > 500) {       // job-timeout threshold 500ms

                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));

                        if (timeoutCount != null) {

                            timeoutCount.incrementAndGet();

                        }

                    }

 

                }

 

            }

        });

    }

 

}
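The fast/slow pool selection in the listing above comes down to a per-minute counter of slow triggers per job. The sketch below isolates that bookkeeping under the same thresholds as the listing (cost above 500 ms, count above 10, reset when the minute changes). The class `SlowJobTracker` and its methods are hypothetical, and the clock is passed in as a parameter so the behavior can be checked without waiting.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the slow-job bookkeeping behind pool selection. */
final class SlowJobTracker {

    static final long SLOW_THRESHOLD_MS = 500; // a trigger slower than this counts as slow
    static final int SLOW_COUNT_LIMIT = 10;    // more than this many per minute means "slow job"

    private volatile long currentMinute;
    private final ConcurrentMap<Integer, AtomicInteger> slowCounts = new ConcurrentHashMap<>();

    SlowJobTracker(long nowMs) {
        this.currentMinute = nowMs / 60_000;
    }

    /** Records the cost of one trigger; counters reset whenever the minute changes. */
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {
            currentMinute = minute;
            slowCounts.clear();
        }
        if (costMs > SLOW_THRESHOLD_MS) {
            slowCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    /** True when the job has exceeded the slow-trigger limit in the current minute. */
    boolean useSlowPool(int jobId) {
        AtomicInteger count = slowCounts.get(jobId);
        return count != null && count.get() > SLOW_COUNT_LIMIT;
    }

    public static void main(String[] args) {
        SlowJobTracker tracker = new SlowJobTracker(0L);
        for (int i = 0; i < 11; i++) {
            tracker.record(1, 600L, 1_000L * i); // 11 slow triggers within the first minute
        }
        System.out.println("slow pool for job 1: " + tracker.useSlowPool(1));
        tracker.record(1, 10L, 60_000L);         // the next minute starts, counters reset
        System.out.println("slow pool for job 1: " + tracker.useSlowPool(1));
    }
}
```

The point of the split is isolation: a job that repeatedly triggers slowly only occupies threads in the slow pool, so it cannot starve fast jobs of trigger threads.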

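XxlJobTrigger

It does three main things:

1. trigger: initializes the shards, depending on whether the route strategy is sharding broadcast

        - sharding broadcast: triggers once per registered address, passing each one its shard index

        - ordinary routing: initializes a single shard and triggers once

2. processTrigger: performs the trigger

        -  assembles the trigger parameters, including block strategy, job parameters, shard index and total, log id and executor address

        The executor address is chosen as follows:

            - sharding mode picks the address by shard index;

            - every other mode applies the configured route strategy

3. Hands the request to the executor client ExecutorBizClient, which issues the remote call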

public class XxlJobTrigger {

    private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);

 

    /**

     * trigger job

     *

     * @param jobId

     * @param triggerType

     * @param failRetryCount

     * >=0: use this param

     * <0: use param from job info config

     * @param executorShardingParam

     * @param executorParam

     *          null: use job param

     *          not null: cover job param

     * @param addressList

     *          null: use executor addressList

     *          not null: cover

     */

    public static void trigger(int jobId,

                               TriggerTypeEnum triggerType,

                               int failRetryCount,

                               String executorShardingParam,

                               String executorParam,

                               String addressList) {

 

        // load data

        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);

        if (jobInfo == null) {

            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);

            return;

        }

        if (executorParam != null) {

            jobInfo.setExecutorParam(executorParam);

        }

        int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();

        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

 

        // cover addressList

        if (addressList!=null && addressList.trim().length()>0) {

            group.setAddressType(1);

            group.setAddressList(addressList.trim());

        }

 

        // sharding param

        int[] shardingParam = null;

        if (executorShardingParam!=null){

            String[] shardingArr = executorShardingParam.split("/");

            if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {

                shardingParam = new int[2];

                shardingParam[0] = Integer.valueOf(shardingArr[0]);

                shardingParam[1] = Integer.valueOf(shardingArr[1]);

            }

        }

        // sharding broadcast: trigger once per registered address, passing the shard index and total

        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)

                && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()

                && shardingParam==null) {

            for (int i = 0; i < group.getRegistryList().size(); i++) {

                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());

            }

        }

        // ordinary routing: initialize a single shard

        else {

            if (shardingParam == null) {

                shardingParam = new int[]{0, 1};

            }

            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);

        }

 

    }

 

    private static boolean isNumeric(String str){

        try {

            int result = Integer.valueOf(str);

            return true;

        } catch (NumberFormatException e) {

            return false;

        }

    }

 

    /**

     * @param group                     job group, registry list may be empty

     * @param jobInfo

     * @param finalFailRetryCount

     * @param triggerType

     * @param index                     sharding index

     * @param total                     sharding total

     */

    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

 

        // param

        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy

        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy

        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

 

        // 1、save log-id

        XxlJobLog jobLog = new XxlJobLog();

        jobLog.setJobGroup(jobInfo.getJobGroup());

        jobLog.setJobId(jobInfo.getId());

        jobLog.setTriggerTime(new Date());

        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

 

        // 2、init trigger-param

        TriggerParam triggerParam = new TriggerParam();

        triggerParam.setJobId(jobInfo.getId());

        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());

        triggerParam.setExecutorParams(jobInfo.getExecutorParam());

        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());

        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());

        triggerParam.setLogId(jobLog.getId());

        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());

        triggerParam.setGlueType(jobInfo.getGlueType());

        triggerParam.setGlueSource(jobInfo.getGlueSource());

        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());

        triggerParam.setBroadcastIndex(index);

        triggerParam.setBroadcastTotal(total);

 

        // 3、init address: sharding mode picks the address by shard index; every other mode applies the route strategy

        String address = null;

        ReturnT<String> routeAddressResult = null;

        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {

            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {

                if (index < group.getRegistryList().size()) {

                    address = group.getRegistryList().get(index);

                } else {

                    address = group.getRegistryList().get(0);

                }

            } else {

                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {

                    address = routeAddressResult.getContent();

                }

            }

        } else {

            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));

        }

 

        // 4、trigger remote executor: issue the remote call

        ReturnT<String> triggerResult = null;

        if (address != null) {

            triggerResult = runExecutor(triggerParam, address);

        } else {

            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);

        }

 

        // 5、collection trigger info

        StringBuffer triggerMsgSb = new StringBuffer();

        // ... omitted

 

        // 6、save log trigger-info

        jobLog.setExecutorAddress(address);

        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());

        jobLog.setExecutorParam(jobInfo.getExecutorParam());

        jobLog.setExecutorShardingParam(shardingParam);

        jobLog.setExecutorFailRetryCount(finalFailRetryCount);

        //jobLog.setTriggerTime();

        jobLog.setTriggerCode(triggerResult.getCode());

        jobLog.setTriggerMsg(triggerMsgSb.toString());

        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

 

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

    }

}
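The sharding logic of trigger and processTrigger can be condensed into a sketch that only computes which "index/total" pair goes to which address. It assumes the same rules as the listing above: broadcast without an explicit sharding parameter fans out over the whole registry, anything else dispatches once. The class `ShardingSketch`, its methods and the output format are hypothetical, and taking the first address stands in for whatever route strategy is configured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of shard fan-out and address selection. */
final class ShardingSketch {

    /** Parses "index/total"; returns null when the parameter is absent or malformed. */
    static int[] parse(String param) {
        if (param == null) {
            return null;
        }
        String[] parts = param.split("/");
        if (parts.length != 2) {
            return null;
        }
        try {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Returns one "index/total -> address" entry per dispatch that would be made. */
    static List<String> plan(boolean broadcast, List<String> registry, String shardingParam) {
        int[] sharding = parse(shardingParam);
        List<String> dispatches = new ArrayList<>();
        if (broadcast && !registry.isEmpty() && sharding == null) {
            // broadcast: one dispatch per registered executor
            for (int i = 0; i < registry.size(); i++) {
                dispatches.add(describe(true, registry, i, registry.size()));
            }
        } else {
            // ordinary routing, or a broadcast call that targets one explicit shard
            if (sharding == null) {
                sharding = new int[]{0, 1};
            }
            dispatches.add(describe(broadcast, registry, sharding[0], sharding[1]));
        }
        return dispatches;
    }

    private static String describe(boolean broadcast, List<String> registry, int index, int total) {
        String address = null;
        if (!registry.isEmpty()) {
            if (broadcast) {
                // pick by shard index, falling back to the first address when out of range
                address = (index >= 0 && index < registry.size()) ? registry.get(index) : registry.get(0);
            } else {
                address = registry.get(0); // stand-in for the configured route strategy
            }
        }
        return index + "/" + total + " -> " + address;
    }

    public static void main(String[] args) {
        List<String> registry = Arrays.asList("http://10.0.0.1:9999/", "http://10.0.0.2:9999/");
        System.out.println(plan(true, registry, null));
        System.out.println(plan(false, registry, null));
        System.out.println(plan(true, registry, "1/2"));
    }
}
```

The explicit "index/total" case covers a broadcast call that targets a single known shard, such as a retry of one shard, rather than a fresh fan-out.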

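3. xxl-job-core: job thread core source analysis

3.1 RPC service provided by the job executor

XxlJobExecutor.start does five things. We focus on the last one first: initializing the executor server.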


   

 public void start() throws Exception {

 

        // init logpath: initialize the log path

        XxlJobFileAppender.initLogPath(logPath);

 

        // init invoker, admin-client: clients to the admin (scheduling center), used for callbacks

        initAdminBizList(adminAddresses, accessToken);

 

 

        // init JobLogFileCleanThread: start the log file cleanup thread

        JobLogFileCleanThread.getInstance().start(logRetentionDays);

 

        // init TriggerCallbackThread: start the trigger callback thread

        TriggerCallbackThread.getInstance().start();

 

        // init executor-server: initialize the executor server

        initEmbedServer(address, ip, port, appname, accessToken);

    }

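Initializing the executor server: the code below uses Netty to implement an NIO server that acts as the RPC provider. Netty's NIO internals are out of scope here; readers who are interested can look at Netty itself. The part to focus on is the core request handler, EmbedHttpServerHandler, which uses the ThreadPoolExecutor bizThreadPool to process requests asynchronously.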
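One detail of such a business pool is worth seeing in isolation. With a core size of 0 and a bounded LinkedBlockingQueue, ThreadPoolExecutor creates additional threads only once the queue is full, and invokes the rejection handler only when both the queue and the maximum thread count are exhausted. The sketch below uses tiny, hypothetical sizes instead of the 200 threads and 2000 queue slots in the listing that follows; the class `BizPoolSketch` and its methods are illustrative names, not XXL-JOB code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a bounded business pool that rejects by throwing. */
final class BizPoolSketch {

    static ThreadPoolExecutor newBizPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                r -> new Thread(r, "sketch-bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RuntimeException("sketch bizThreadPool is EXHAUSTED!");
                });
    }

    /** Submits tasks that block until released and returns how many were rejected. */
    static int countRejections(int maxThreads, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = newBizPool(maxThreads, queueCapacity);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch firstStarted = new CountDownLatch(1);
        Runnable blocking = () -> {
            firstStarted.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(blocking);
                } catch (RuntimeException e) {
                    rejected++; // thrown by the rejection handler on the submitting thread
                }
                if (i == 0) {
                    // wait until a worker has taken the first task off the queue
                    firstStarted.await(5, TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 thread + 1 queue slot: the third blocking task cannot be accepted
        System.out.println("rejected: " + countRejections(1, 1, 3));
        // 2 threads + 1 queue slot: the second thread is created only when the queue is full
        System.out.println("rejected: " + countRejections(2, 1, 3));
    }
}
```

Throwing from the rejection handler surfaces overload to the submitting thread immediately, so an exhausted pool shows up as a failed request rather than as silently dropped work.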


 

// ---------------------- executor-server (rpc provider) ----------------------

    private EmbedServer embedServer = null;

 

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

 

        // fill ip port

        port = port>0?port: NetUtil.findAvailablePort(9999);

        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

 

        // generate address

        if (address==null || address.trim().length()==0) {

            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null

            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);

        }

 

        // accessToken

        if (accessToken==null || accessToken.trim().length()==0) {

            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");

        }

 

        // start

        embedServer = new EmbedServer();

        embedServer.start(address, port, appname, accessToken);

    }
public class EmbedServer {

    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);

 

    private ExecutorBiz executorBiz;

    private Thread thread;

 

    public void start(final String address, final int port, final String appname, final String accessToken) {

        executorBiz = new ExecutorBizImpl();

        thread = new Thread(new Runnable() {

            @Override

            public void run() {

                // param

                EventLoopGroup bossGroup = new NioEventLoopGroup();

                EventLoopGroup workerGroup = new NioEventLoopGroup();

                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(

                        0,

                        200,

                        60L,

                        TimeUnit.SECONDS,

                        new LinkedBlockingQueue<Runnable>(2000),

                        new ThreadFactory() {

                            @Override

                            public Thread newThread(Runnable r) {

                                return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());

                            }

                        },

                        new RejectedExecutionHandler() {

                            @Override

                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");

                            }

                        });

                try {

                    // start server

                    ServerBootstrap bootstrap = new ServerBootstrap();

                    bootstrap.group(bossGroup, workerGroup)

                            .channel(NioServerSocketChannel.class)

                            .childHandler(new ChannelInitializer<SocketChannel>() {

                                @Override

                                public void initChannel(SocketChannel channel) throws Exception {

                                    channel.pipeline()

                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle

                                            .addLast(new HttpServerCodec())

                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL

                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));

                                }

                            })

                            .childOption(ChannelOption.SO_KEEPALIVE, true);

 

                    // bind

                    ChannelFuture future = bootstrap.bind(port).sync();

 

                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

 

                    // start registry

                    startRegistry(appname, address);

 

                    // wait util stop

                    future.channel().closeFuture().sync();

 

                } catch (InterruptedException e) {

                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");

                } catch (Exception e) {

                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);

                } finally {

                    // stop

                    try {

                        workerGroup.shutdownGracefully();

                        bossGroup.shutdownGracefully();

                    } catch (Exception e) {

                        logger.error(e.getMessage(), e);

                    }

                }

            }

        });

        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave

        thread.start();

    }
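
The bizThreadPool above has zero core threads, a maximum of 200 threads and a bounded queue of 2000 entries, and its rejection handler throws once both are full. The following is a minimal sketch of that saturation behaviour on a scaled-down pool (2 threads, queue of 2). The class name, the sizes and the use of RejectedExecutionException instead of the plain RuntimeException in the listing are assumptions made for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of XXL-JOB.
class BizPoolSaturationDemo {

    /** Submits five blocking tasks to a pool that can hold four and returns how many were rejected. */
    static int countRejected() {
        // Same shape as bizThreadPool, scaled down: core 0, max 2 threads, queue capacity 2.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2),
                (r, executor) -> {
                    throw new RejectedExecutionException("demo pool is EXHAUSTED");
                });

        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            firstStarted.countDown();
            try {
                release.await();                 // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int rejected = 0;
        try {
            pool.execute(blocking);              // queued, then picked up by the first worker
            firstStarted.await();                // wait so the following sequence is deterministic
            for (int i = 0; i < 4; i++) {
                try {
                    // two tasks fill the queue, one starts the second worker, the last is rejected
                    pool.execute(blocking);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejected());
    }
}
```

With a bounded queue, ThreadPoolExecutor only grows beyond the first worker once the queue is full, so in the real pool the 2000-entry queue fills before additional threads are created.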

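3.2 Handling trigger requests: EmbedHttpServerHandler

1. The content of the FullHttpRequest carries the custom request parameters.

2. The business processing is submitted to the bizThreadPool thread pool and handled asynchronously.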

@Override

        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

            // request parse

            //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);

            String requestData = msg.content().toString(CharsetUtil.UTF_8);

            String uri = msg.uri();

            HttpMethod httpMethod = msg.method();

            boolean keepAlive = HttpUtil.isKeepAlive(msg);

            String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

 

            // invoke

            bizThreadPool.execute(new Runnable() {

                @Override

                public void run() {

                    // do invoke

                    Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

 

                    // to json

                    String responseJson = GsonTool.toJson(responseObj);

 

                    // write response

                    writeResponse(ctx, keepAlive, responseJson);

                }

            });

        }

}

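3. Business processing happens in the process method, which routes on the uri to different methods. Here we focus on task execution, "/run", which calls ExecutorBizImpl.run(triggerParam).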

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

            // valid

            if (HttpMethod.POST != httpMethod) {

                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");

            }

            if (uri == null || uri.trim().length() == 0) {

                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");

            }

            if (accessToken != null

                    && accessToken.trim().length() > 0

                    && !accessToken.equals(accessTokenReq)) {

                return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");

            }

 

            // services mapping

            try {

                switch (uri) {

                    case "/beat":

                        return executorBiz.beat();

                    case "/idleBeat":

                        IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);

                        return executorBiz.idleBeat(idleBeatParam);

                    case "/run":

                        TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);

                        return executorBiz.run(triggerParam);

                    case "/kill":

                        KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);

                        return executorBiz.kill(killParam);

                    case "/log":

                        LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);

                        return executorBiz.log(logParam);

                    default:

                        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");

                }

            } catch (Exception e) {

                logger.error(e.getMessage(), e);

                return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));

            }

        }
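
One consequence of the token check in process is easy to miss: a request is refused only when the executor itself has a non-blank accessToken configured. A minimal sketch of that condition follows; the class and method names are hypothetical and only mirror the boolean logic of the listing above.

```java
// Hypothetical demo class, not part of XXL-JOB.
class AccessTokenCheckDemo {

    /**
     * Mirrors the condition in process(): a request is refused only when a
     * non-blank token is configured and the request token differs from it.
     */
    static boolean isAuthorized(String configuredToken, String requestToken) {
        boolean tokenConfigured = configuredToken != null && configuredToken.trim().length() > 0;
        return !tokenConfigured || configuredToken.equals(requestToken);
    }

    public static void main(String[] args) {
        System.out.println(isAuthorized("secret", "secret")); // matching token
        System.out.println(isAuthorized("secret", null));     // missing token
        System.out.println(isAuthorized("", "anything"));     // no token configured
    }
}
```

This is why the executor logs a warning at startup when accessToken is empty: with no token configured, every POST request passes the check.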

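ExecutorBizImpl.run(triggerParam):

1. Obtain the JobHandler

Bean mode: handlers are discovered through the @XxlJob annotation and stored in ConcurrentMap<String, IJobHandler> jobHandlerRepository, keyed by the value of @XxlJob; the IJobHandler is a MethodJobHandler.

GLUE_GROOVY mode: a new GlueJobHandler is created.

Script mode: a new ScriptJobHandler is created.

2. Obtain the JobThread, which is stored in ConcurrentMap<Integer, JobThread> jobThreadRepository and keyed by job id.

The block strategies of ExecutorBlockStrategyEnum are implemented as follows:

DISCARD_LATER: if the JobThread is running or has queued triggers, the new trigger is discarded and a failure is returned immediately.

COVER_EARLY: the newest trigger overrides earlier ones. The JobThread is rebuilt so that earlier unfinished tasks are dropped and the newly submitted task runs.

SERIAL_EXECUTION: serial execution. No special handling is needed; the trigger is simply appended to the queue.

3. The trigger parameters are put into the JobThread's trigger queue, LinkedBlockingQueue<TriggerParam> triggerQueue, and the JobThread executes them asynchronously in a loop.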

public ReturnT<String> run(TriggerParam triggerParam) {

        // load old:jobHandler + jobThread

        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());

        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;

        String removeOldReason = null;

 

        // valid:jobHandler + jobThread

        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());

        if (GlueTypeEnum.BEAN == glueTypeEnum) {

 

            // new jobhandler

            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

 

            // valid old jobThread

            if (jobThread!=null && jobHandler != newJobHandler) {

                // change handler, need kill old thread

                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

 

                jobThread = null;

                jobHandler = null;

            }

 

            // valid handler

            if (jobHandler == null) {

                jobHandler = newJobHandler;

                if (jobHandler == null) {

                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");

                }

            }

 

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

 

            // valid old jobThread

            if (jobThread != null &&

                    !(jobThread.getHandler() instanceof GlueJobHandler

                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {

                // change handler or gluesource updated, need kill old thread

                removeOldReason = "change job source or glue type, and terminate the old job thread.";

 

                jobThread = null;

                jobHandler = null;

            }

 

            // valid handler

            if (jobHandler == null) {

                try {

                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());

                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());

                } catch (Exception e) {

                    logger.error(e.getMessage(), e);

                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());

                }

            }

        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

 

            // valid old jobThread

            if (jobThread != null &&

                    !(jobThread.getHandler() instanceof ScriptJobHandler

                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {

                // change script or gluesource updated, need kill old thread

                removeOldReason = "change job source or glue type, and terminate the old job thread.";

 

                jobThread = null;

                jobHandler = null;

            }

 

            // valid handler

            if (jobHandler == null) {

                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));

            }

        } else {

            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");

        }

 

        // executor block strategy

        if (jobThread != null) {

            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);

            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {

                // discard when running

                if (jobThread.isRunningOrHasQueue()) {

                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());

                }

            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {

                // kill running jobThread

                if (jobThread.isRunningOrHasQueue()) {

                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

 

                    jobThread = null;

                }

            } else {

                // just queue trigger

            }

        }

 

        // replace thread (new or exists invalid)

        if (jobThread == null) {

            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);

        }

 

        // push data to queue

        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

        return pushResult;

    }
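
The block-strategy branch at the end of run can be condensed into a small decision function. This is a sketch under the assumption that a JobThread already exists for the job; the enum Action and the method decide are hypothetical names introduced for illustration, not part of XXL-JOB.

```java
// Hypothetical demo class, not part of XXL-JOB.
class BlockStrategyDemo {

    enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    enum Action { ENQUEUE, REJECT, REPLACE_THREAD_THEN_ENQUEUE }

    /**
     * @param strategy   block strategy carried by the trigger
     * @param threadBusy result of jobThread.isRunningOrHasQueue()
     */
    static Action decide(BlockStrategy strategy, boolean threadBusy) {
        if (!threadBusy) {
            return Action.ENQUEUE;                         // idle thread: every strategy just queues
        }
        switch (strategy) {
            case DISCARD_LATER:
                return Action.REJECT;                      // keep the running job, drop the new trigger
            case COVER_EARLY:
                return Action.REPLACE_THREAD_THEN_ENQUEUE; // kill the old thread, run the new trigger
            default:
                return Action.ENQUEUE;                     // SERIAL_EXECUTION: wait in the queue
        }
    }

    public static void main(String[] args) {
        for (BlockStrategy s : BlockStrategy.values()) {
            System.out.println(s + " busy -> " + decide(s, true));
        }
    }
}
```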

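3.3 Task execution

JobThread extends Thread. Its core member variables are:

private int jobId;  // job id

private IJobHandler handler;  // job handler

private LinkedBlockingQueue<TriggerParam> triggerQueue;  // trigger queue

private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID

1. pushTriggerQueue method: during the trigger request handling described in 3.2 above, ExecutorBizImpl adds the trigger parameters to the trigger queue triggerQueue.

2. The core run method:

On startup, handler.init() performs initialization.

Loop start

- Take a trigger from triggerQueue and run it through handler.execute().

- If the trigger carries a timeout, executorTimeout, execute runs asynchronously on a separate thread and FutureTask.get(long timeout, TimeUnit unit) stops waiting once the timeout elapses.

- Otherwise, execute is called directly.

- If no trigger is obtained from triggerQueue for more than 30 consecutive iterations and triggerQueue is empty, the JobThread registered for the job id is removed, which makes the thread exit.

- Finally, the execution result is pushed through the TriggerCallbackThread callback thread.

Loop exit

triggerQueue is checked; for every trigger still queued, a result (killed) is pushed through the TriggerCallbackThread callback thread.

Finally, handler.destroy() performs cleanup.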
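
The timeout branch described above can be sketched in isolation: the job runs on a helper thread and the caller waits with FutureTask.get(timeout, unit). The class name, the method runWithTimeout, the string results and the millisecond unit are assumptions made for this sketch (the listing below uses seconds).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of XXL-JOB.
class TimeoutExecutionDemo {

    /** Runs the job on a helper thread and returns "ok", "timeout" or "error". */
    static String runWithTimeout(Callable<Boolean> job, long timeoutMillis) {
        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(job);
        Thread futureThread = new Thread(futureTask, "demo-job");
        futureThread.setDaemon(true);
        futureThread.start();
        try {
            futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS); // wait at most timeoutMillis
            return "ok";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "error";
        } catch (ExecutionException e) {
            return "error";                                       // the job itself threw
        } finally {
            // Only a request: the job stops early only if it reacts to interruption.
            futureThread.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> true, 1000));
        System.out.println(runWithTimeout(() -> { Thread.sleep(5000); return true; }, 100));
    }
}
```

Note that interrupt() does not forcibly terminate the helper thread. A job that ignores interruption keeps running after the timeout has been reported.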

package com.xxl.job.core.thread;

 

import com.xxl.job.core.biz.model.HandleCallbackParam;

import com.xxl.job.core.biz.model.ReturnT;

import com.xxl.job.core.biz.model.TriggerParam;

import com.xxl.job.core.context.XxlJobContext;

import com.xxl.job.core.context.XxlJobHelper;

import com.xxl.job.core.executor.XxlJobExecutor;

import com.xxl.job.core.handler.IJobHandler;

import com.xxl.job.core.log.XxlJobFileAppender;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import java.io.PrintWriter;

import java.io.StringWriter;

import java.util.Collections;

import java.util.Date;

import java.util.HashSet;

import java.util.Set;

import java.util.concurrent.*;

 

 

/**

 * handler thread

 * @author xuxueli 2016-1-16 19:52:47

 */

public class JobThread extends Thread{

private static Logger logger = LoggerFactory.getLogger(JobThread.class);

 

private int jobId;

private IJobHandler handler;

private LinkedBlockingQueue<TriggerParam> triggerQueue;

private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID

 

private volatile boolean toStop = false;

private String stopReason;

 

    private boolean running = false;    // if running job

private int idleTimes = 0; // idel times

 

 

public JobThread(int jobId, IJobHandler handler) {

this.jobId = jobId;

this.handler = handler;

this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();

this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());

 

// assign job thread name

this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());

}

public IJobHandler getHandler() {

return handler;

}

 

    /**

     * new trigger to queue

     *

     * @param triggerParam

     * @return

     */

public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {

// avoid repeat

if (triggerLogIdSet.contains(triggerParam.getLogId())) {

logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());

return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());

}

 

triggerLogIdSet.add(triggerParam.getLogId());

triggerQueue.add(triggerParam);

        return ReturnT.SUCCESS;

}

 

    /**

     * kill job thread

     *

     * @param stopReason

     */

public void toStop(String stopReason) {

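/**

 * Thread.interrupt only ends a thread's blocking state (wait, join, sleep):

 * an InterruptedException is thrown at the blocking point, but the running thread itself is not terminated.

 * So note that fully stopping this thread has to be done through a shared variable.

 */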

this.toStop = true;

this.stopReason = stopReason;

}

 

    /**

     * is running job

     * @return

     */

    public boolean isRunningOrHasQueue() {

        return running || triggerQueue.size()>0;

    }

 

    @Override

public void run() {

 

     // init

     try {

handler.init();

} catch (Throwable e) {

     logger.error(e.getMessage(), e);

}

 

// execute

while(!toStop){

running = false;

idleTimes++;

 

            TriggerParam triggerParam = null;

            try {

// to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead

triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);

if (triggerParam!=null) {

running = true;

idleTimes = 0;

triggerLogIdSet.remove(triggerParam.getLogId());

 

// log filename, like "logPath/yyyy-MM-dd/9999.log"

String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());

XxlJobContext xxlJobContext = new XxlJobContext(

triggerParam.getJobId(),

triggerParam.getExecutorParams(),

logFileName,

triggerParam.getBroadcastIndex(),

triggerParam.getBroadcastTotal());

 

// init job context

XxlJobContext.setXxlJobContext(xxlJobContext);

 

// execute

XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

 

if (triggerParam.getExecutorTimeout() > 0) {

// limit timeout

Thread futureThread = null;

try {

FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {

@Override

public Boolean call() throws Exception {

 

// init job context

XxlJobContext.setXxlJobContext(xxlJobContext);

 

handler.execute();

return true;

}

});

futureThread = new Thread(futureTask);

futureThread.start();

 

Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);

} catch (TimeoutException e) {

 

XxlJobHelper.log("<br>----------- xxl-job job execute timeout");

XxlJobHelper.log(e);

 

// handle result

XxlJobHelper.handleTimeout("job execute timeout ");

} finally {

futureThread.interrupt();

}

} else {

// just execute

handler.execute();

}

 

// valid execute handle data

if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {

XxlJobHelper.handleFail("job handle result lost.");

} else {

String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();

tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)

?tempHandleMsg.substring(0, 50000).concat("...")

:tempHandleMsg;

XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);

}

XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="

+ XxlJobContext.getXxlJobContext().getHandleCode()

+ ", handleMsg = "

+ XxlJobContext.getXxlJobContext().getHandleMsg()

);

 

} else {

if (idleTimes > 30) {

if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost

XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");

}

}

}

} catch (Throwable e) {

if (toStop) {

XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);

}

 

// handle result

StringWriter stringWriter = new StringWriter();

e.printStackTrace(new PrintWriter(stringWriter));

String errorMsg = stringWriter.toString();

 

XxlJobHelper.handleFail(errorMsg);

 

XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");

} finally {

                if(triggerParam != null) {

                    // callback handler info

                    if (!toStop) {

                        // commonm

                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(

                         triggerParam.getLogId(),

triggerParam.getLogDateTime(),

XxlJobContext.getXxlJobContext().getHandleCode(),

XxlJobContext.getXxlJobContext().getHandleMsg() )

);

                    } else {

                        // is killed

                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(

                         triggerParam.getLogId(),

triggerParam.getLogDateTime(),

XxlJobContext.HANDLE_CODE_FAIL,

stopReason + " [job running, killed]" )

);

                    }

                }

            }

        }

 

// callback trigger request in queue

while(triggerQueue !=null && triggerQueue.size()>0){

TriggerParam triggerParam = triggerQueue.poll();

if (triggerParam!=null) {

// is killed

TriggerCallbackThread.pushCallBack(new HandleCallbackParam(

triggerParam.getLogId(),

triggerParam.getLogDateTime(),

XxlJobContext.HANDLE_CODE_FAIL,

stopReason + " [job not executed, in the job queue, killed.]")

);

}

}

 

// destroy

try {

handler.destroy();

} catch (Throwable e) {

logger.error(e.getMessage(), e);

}

 

logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());

}

}
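
Stripped of logging and callbacks, the run loop of JobThread is a poll-with-timeout loop that resets an idle counter on every trigger and stops itself after too many empty polls. The following sketch shows only that skeleton. The class name, the short poll interval and the idle limit of 3 are assumptions chosen so that the demo finishes quickly (the listing uses a 3 second poll and a limit of 30).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of XXL-JOB.
class IdlePollingWorkerDemo extends Thread {

    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<String>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<String>());
    private final long pollMillis;
    private final int idleLimit;
    private volatile boolean toStop = false;   // shared flag, checked on every iteration

    IdlePollingWorkerDemo(long pollMillis, int idleLimit) {
        this.pollMillis = pollMillis;
        this.idleLimit = idleLimit;
    }

    void push(String trigger) {
        triggerQueue.add(trigger);
    }

    @Override
    public void run() {
        int idleTimes = 0;
        while (!toStop) {
            try {
                // poll(timeout) instead of take(), so the toStop flag is re-checked regularly
                String trigger = triggerQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (trigger != null) {
                    idleTimes = 0;
                    handled.add(trigger);      // stands in for handler.execute()
                } else if (++idleTimes > idleLimit && triggerQueue.isEmpty()) {
                    toStop = true;             // idle for too long: let the thread exit
                }
            } catch (InterruptedException e) {
                // woken up early; the loop condition decides whether to continue
            }
        }
    }

    /** Queues two triggers, runs the worker until it idles out and reports what happened. */
    static String runDemo() {
        IdlePollingWorkerDemo worker = new IdlePollingWorkerDemo(10, 3);
        worker.push("a");
        worker.push("b");
        worker.start();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.join(",", worker.handled) + (worker.isAlive() ? " (still running)" : " (exited)");
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```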

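3.4 Task result callback - TriggerCallbackThread

1. Callback results are stored in LinkedBlockingQueue<HandleCallbackParam> callBackQueue.

2. start method: starts the triggerCallbackThread thread, which takes callback entries from callBackQueue and reports them by iterating over the AdminBiz (scheduling center) clients, stopping as soon as one node accepts the report.

3. If reporting fails, the failure is logged.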
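
The reporting step can be sketched as "try each admin node in order and stop at the first success". In the sketch below the admin clients are modelled as plain predicates, and the batching step (take one entry, then drain the rest of the queue) is an assumption of this example rather than something stated in the text; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical demo class, not part of XXL-JOB.
class CallbackReportDemo {

    /** Blocks for one result, then drains whatever else is already queued into the same batch. */
    static List<String> takeBatch(LinkedBlockingQueue<String> callBackQueue) throws InterruptedException {
        List<String> batch = new ArrayList<String>();
        batch.add(callBackQueue.take());
        callBackQueue.drainTo(batch);
        return batch;
    }

    /** Tries each admin in order; returns the index of the first one that accepts, or -1. */
    static int report(List<Predicate<List<String>>> admins, List<String> batch) {
        for (int i = 0; i < admins.size(); i++) {
            try {
                if (admins.get(i).test(batch)) {
                    return i;                  // one successful node is enough
                }
            } catch (RuntimeException e) {
                // treat an exception like a failed report and try the next node
            }
        }
        return -1;                             // every node failed: the caller would log this
    }

    static String runDemo() {
        LinkedBlockingQueue<String> callBackQueue = new LinkedBlockingQueue<String>();
        callBackQueue.add("r1");
        callBackQueue.add("r2");
        callBackQueue.add("r3");

        List<Predicate<List<String>>> admins = new ArrayList<Predicate<List<String>>>();
        admins.add(b -> false);                // first node refuses
        admins.add(b -> true);                 // second node accepts
        admins.add(b -> true);                 // never asked

        try {
            List<String> batch = takeBatch(callBackQueue);
            return "batch=" + batch + ", acceptedBy=" + report(admins, batch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```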

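3.5 Registration and heartbeat detection

1. Automatic monitoring: at the scheduling entry point described in 2.2, the admin calls JobRegistryHelper.start(), which starts a registry monitor thread, registryMonitorThread. It loops once every BEAT_TIMEOUT (30s), checks the automatically registered rows in the xxl_job_registry table, and treats a row as lost when its update_time is older than RegistryConfig.DEAD_TIMEOUT (3 * BEAT_TIMEOUT, i.e. 90s, in the upstream RegistryConfig). Lost rows are removed from xxl_job_registry, and the aggregated address list in the xxl_job_group table is then rewritten.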

public class JobRegistryHelper {

private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);

 

private static JobRegistryHelper instance = new JobRegistryHelper();

public static JobRegistryHelper getInstance(){

return instance;

}

 

private ThreadPoolExecutor registryOrRemoveThreadPool = null;

private Thread registryMonitorThread;

private volatile boolean toStop = false;

 

public void start(){

 

// for registry or remove

registryOrRemoveThreadPool = new ThreadPoolExecutor(

2,

10,

30L,

TimeUnit.SECONDS,

new LinkedBlockingQueue<Runnable>(2000),

new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());

}

},

new RejectedExecutionHandler() {

@Override

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

r.run();

logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");

}

});

 

// for monitor

registryMonitorThread = new Thread(new Runnable() {

@Override

public void run() {

while (!toStop) {

try {

// auto registry group

List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);

if (groupList!=null && !groupList.isEmpty()) {

 

// remove dead address (admin/executor)

List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());

if (ids!=null && ids.size()>0) {

XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);

}

 

// fresh online address (admin/executor)

HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();

List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());

if (list != null) {

for (XxlJobRegistry item: list) {

if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {

String appname = item.getRegistryKey();

List<String> registryList = appAddressMap.get(appname);

if (registryList == null) {

registryList = new ArrayList<String>();

}

 

if (!registryList.contains(item.getRegistryValue())) {

registryList.add(item.getRegistryValue());

}

appAddressMap.put(appname, registryList);

}

}

}

 

// fresh group address

for (XxlJobGroup group: groupList) {

List<String> registryList = appAddressMap.get(group.getAppname());

String addressListStr = null;

if (registryList!=null && !registryList.isEmpty()) {

Collections.sort(registryList);

StringBuilder addressListSB = new StringBuilder();

for (String item:registryList) {

addressListSB.append(item).append(",");

}

addressListStr = addressListSB.toString();

addressListStr = addressListStr.substring(0, addressListStr.length()-1);

}

group.setAddressList(addressListStr);

group.setUpdateTime(new Date());

 

XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);

}

}

} catch (Exception e) {

if (!toStop) {

logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);

}

}

try {

TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

} catch (InterruptedException e) {

if (!toStop) {

logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);

}

}

}

logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");

}

});

registryMonitorThread.setDaemon(true);

registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");

registryMonitorThread.start();

}

}
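
The aggregation done by the monitor thread (drop dead rows, group live addresses by appname, sort, join with commas) can be sketched without a database. RegistryRow, liveAddressLists and the sample addresses are hypothetical, and the 90 second timeout is an assumed value for DEAD_TIMEOUT. Unlike the listing, this sketch simply omits groups that have no live address instead of writing null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class, not part of XXL-JOB.
class RegistryRefreshDemo {

    /** One row of a simplified registry table. */
    static class RegistryRow {
        final String appname;
        final String address;
        final long updateTimeMillis;

        RegistryRow(String appname, String address, long updateTimeMillis) {
            this.appname = appname;
            this.address = address;
            this.updateTimeMillis = updateTimeMillis;
        }
    }

    /** Keeps rows whose heartbeat is recent enough and joins their addresses per appname. */
    static Map<String, String> liveAddressLists(List<RegistryRow> registry, long nowMillis, long deadTimeoutMillis) {
        Map<String, List<String>> byApp = new TreeMap<String, List<String>>();
        for (RegistryRow row : registry) {
            if (nowMillis - row.updateTimeMillis > deadTimeoutMillis) {
                continue;                                  // heartbeat too old: treated as dead
            }
            List<String> addresses = byApp.computeIfAbsent(row.appname, k -> new ArrayList<String>());
            if (!addresses.contains(row.address)) {
                addresses.add(row.address);
            }
        }
        Map<String, String> result = new TreeMap<String, String>();
        for (Map.Entry<String, List<String>> entry : byApp.entrySet()) {
            Collections.sort(entry.getValue());            // stable order, as in the listing
            result.put(entry.getKey(), String.join(",", entry.getValue()));
        }
        return result;
    }

    static Map<String, String> runDemo() {
        long now = 1_000_000L;
        long deadTimeoutMillis = 90_000L;                  // assumed DEAD_TIMEOUT of 90 seconds
        List<RegistryRow> registry = new ArrayList<RegistryRow>();
        registry.add(new RegistryRow("app-a", "http://10.0.0.2:9999/", now - 10_000L));
        registry.add(new RegistryRow("app-a", "http://10.0.0.1:9999/", now - 20_000L));
        registry.add(new RegistryRow("app-a", "http://10.0.0.3:9999/", now - 120_000L)); // dead
        registry.add(new RegistryRow("app-b", "http://10.0.0.4:9999/", now - 200_000L)); // dead
        return liveAddressLists(registry, now, deadTimeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```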

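2. Active reporting: a thread pool for registration and removal, registryOrRemoveThreadPool, is started so that executors can register or deregister themselves. In effect this just updates, inserts, or deletes rows in the xxl_job_registry table.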

// ---------------------- helper ----------------------

 

public ReturnT<String> registry(RegistryParam registryParam) {

 

// valid

if (!StringUtils.hasText(registryParam.getRegistryGroup())

|| !StringUtils.hasText(registryParam.getRegistryKey())

|| !StringUtils.hasText(registryParam.getRegistryValue())) {

return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");

}

 

// async execute

registryOrRemoveThreadPool.execute(new Runnable() {

@Override

public void run() {

int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

if (ret < 1) {

XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

 

// fresh

freshGroupRegistryInfo(registryParam);

}

}

});

 

return ReturnT.SUCCESS;

}

 

public ReturnT<String> registryRemove(RegistryParam registryParam) {

 

// valid

if (!StringUtils.hasText(registryParam.getRegistryGroup())

|| !StringUtils.hasText(registryParam.getRegistryKey())

|| !StringUtils.hasText(registryParam.getRegistryValue())) {

return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");

}

 

// async execute

registryOrRemoveThreadPool.execute(new Runnable() {

@Override

public void run() {

int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());

if (ret > 0) {

// fresh

freshGroupRegistryInfo(registryParam);

}

}

});

 

return ReturnT.SUCCESS;

}

 

private void freshGroupRegistryInfo(RegistryParam registryParam){

// Under consideration, prevent affecting core tables

}

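3. When the executor reports

In [3.1 RPC service provided by the task executor], where the executor service is initialized, we looked at the core message handler EmbedHttpServerHandler. In addition, after the server starts and when it is stopped, registration and removal are performed through ExecutorRegistryThread.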

public class EmbedServer {

    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);

 

    private ExecutorBiz executorBiz;

    private Thread thread;

 

    public void start(final String address, final int port, final String appname, final String accessToken) {

        executorBiz = new ExecutorBizImpl();

        thread = new Thread(new Runnable() {

            @Override

            public void run() {

                // param

 

// ... omitted

 

             // start registry

             startRegistry(appname, address);

// ... omitted

     }

 

    // ---------------------- registry ----------------------

 

    public void startRegistry(final String appname, final String address) {

        // start registry

        ExecutorRegistryThread.getInstance().start(appname, address);

    }

 

    public void stopRegistry() {

        // stop registry

        ExecutorRegistryThread.getInstance().toStop();

    }

}

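ExecutorRegistryThread: starts the registryThread thread, which reports a registration once every BEAT_TIMEOUT (30s) by making a remote registry call through the AdminBizClient mentioned above. After the thread is told to stop, it performs the registry removal.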

public class ExecutorRegistryThread {

    private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);

 

    private static ExecutorRegistryThread instance = new ExecutorRegistryThread();

    public static ExecutorRegistryThread getInstance(){

        return instance;

    }

 

    private Thread registryThread;

    private volatile boolean toStop = false;

    public void start(final String appname, final String address){

 

        // valid

        if (appname==null || appname.trim().length()==0) {

            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");

            return;

        }

        if (XxlJobExecutor.getAdminBizList() == null) {

            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");

            return;

        }

 

        registryThread = new Thread(new Runnable() {

            @Override

            public void run() {

 

                // registry

                while (!toStop) {

                    try {

                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);

                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {

                            try {

                                ReturnT<String> registryResult = adminBiz.registry(registryParam);

                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {

                                    registryResult = ReturnT.SUCCESS;

                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});

                                    break;

                                } else {

                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});

                                }

                            } catch (Exception e) {

                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);

                            }

 

                        }

                    } catch (Exception e) {

                        if (!toStop) {

                            logger.error(e.getMessage(), e);

                        }

 

                    }

 

                    try {

                        if (!toStop) {

                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

                        }

                    } catch (InterruptedException e) {

                        if (!toStop) {

                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());

                        }

                    }

                }

 

                // registry remove

                try {

                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);

                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {

                        try {

                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);

                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {

                                registryResult = ReturnT.SUCCESS;

                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});

                                break;

                            } else {

                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});

                            }

                        } catch (Exception e) {

                            if (!toStop) {

                                logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);

                            }

 

                        }

 

                    }

                } catch (Exception e) {

                    if (!toStop) {

                        logger.error(e.getMessage(), e);

                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");

 

            }

        });

        registryThread.setDaemon(true);

        registryThread.setName("xxl-job, executor ExecutorRegistryThread");

        registryThread.start();

    }

 

    public void toStop() {

        toStop = true;

 

        // interrupt and wait

        if (registryThread != null) {

            registryThread.interrupt();

            try {

                registryThread.join();

            } catch (InterruptedException e) {

                logger.error(e.getMessage(), e);

            }

        }

 

    }

 

}
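
The start and toStop pair above follows a common pattern: a volatile flag plus interrupt() wakes the thread from its sleep, and join() waits until the final deregistration has run. A minimal sketch of that life cycle follows; the class name, the recorded event strings and the latch used to make the demo deterministic are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of XXL-JOB.
class HeartbeatThreadDemo {

    private final List<String> events = Collections.synchronizedList(new ArrayList<String>());
    private final CountDownLatch firstBeat = new CountDownLatch(1);
    private Thread registryThread;
    private volatile boolean toStop = false;

    void start(final long beatMillis) {
        registryThread = new Thread(() -> {
            while (!toStop) {
                events.add("registry");                    // stands in for adminBiz.registry(...)
                firstBeat.countDown();
                try {
                    if (!toStop) {
                        TimeUnit.MILLISECONDS.sleep(beatMillis);
                    }
                } catch (InterruptedException e) {
                    // woken up by toStop(); the loop condition ends the loop
                }
            }
            events.add("registryRemove");                  // stands in for adminBiz.registryRemove(...)
        });
        registryThread.setDaemon(true);
        registryThread.start();
    }

    void toStop() {
        toStop = true;
        if (registryThread != null) {
            registryThread.interrupt();                    // cut the sleep short
            try {
                registryThread.join();                     // wait for the removal to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static String runDemo() {
        HeartbeatThreadDemo demo = new HeartbeatThreadDemo();
        demo.start(60_000L);                               // long beat: only the interrupt can end the sleep
        try {
            demo.firstBeat.await();                        // make sure one registration happened
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        demo.toStop();
        return demo.events.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```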

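4. Routing strategies

ExecutorRouteFailover: failover. It uses the beat heartbeat check to find an executor that is online.

ExecutorRouteBusyover: busy-over. It uses idleBeat to check whether an executor is idle (an executor counts as busy when its JobThread is running or its triggerQueue is non-empty).

The other strategies are listed below: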

public enum ExecutorRouteStrategyEnum {

 

    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),

    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),

    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),

    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),

    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),

    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),

    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),

    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),

    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),

    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);

}
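
FAILOVER and BUSYOVER share one idea: walk the address list in order and pick the first executor that passes a probe (beat for FAILOVER, idleBeat for BUSYOVER). The sketch below models the probe as a Predicate; the class, the method routeFirstPassing and the sample addresses are hypothetical and do not reproduce the real router classes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical demo class, not part of XXL-JOB.
class ProbeRoutingDemo {

    /** Returns the first address whose probe succeeds, or null when none does. */
    static String routeFirstPassing(List<String> addressList, Predicate<String> probe) {
        for (String address : addressList) {
            if (probe.test(address)) {
                return address;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> addresses = Arrays.asList("http://10.0.0.1:9999/", "http://10.0.0.2:9999/", "http://10.0.0.3:9999/");
        Set<String> offline = new HashSet<String>(Arrays.asList("http://10.0.0.1:9999/"));
        Set<String> busy = new HashSet<String>(Arrays.asList("http://10.0.0.2:9999/"));

        // FAILOVER-like probe: the executor must answer the heartbeat.
        System.out.println(routeFirstPassing(addresses, a -> !offline.contains(a)));
        // BUSYOVER-like probe: the executor must be online and idle.
        System.out.println(routeFirstPassing(addresses, a -> !offline.contains(a) && !busy.contains(a)));
    }
}
```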
