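Source Code Analysis
1. Source Code Structure
The source tree contains three sub-modules (xxl-job-admin, xxl-job-core and xxl-job-executor-samples) alongside two supporting directories:
1.1 Source Directory Overview
- /doc: documentation
- /db: DDL scripts for the "scheduling database"
- /xxl-job-admin: the scheduling center (job scheduling management platform) source code
- /xxl-job-core: shared JAR dependency (the scheduling core)
- /xxl-job-executor-samples: executor sample projects (you can develop directly on top of them, or convert an existing project into an executor project)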
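1.2 Scheduling Database Configuration
XXL-JOB's scheduling module is built on a self-developed scheduling component and supports cluster deployment. The scheduling database tables are:
- xxl_job_lock: scheduling lock table;
- xxl_job_group: executor table, maintains executor information;
- xxl_job_info: scheduling extension table, stores extended information about XXL-JOB jobs, such as job group, job name, machine address, executor, execution parameters and alarm emails (core);
- xxl_job_log: scheduling log table, stores the trigger history of XXL-JOB jobs, such as trigger result, execution result, trigger parameters, trigger machine and executor;
- xxl_job_log_report: scheduling log report table, stores aggregated reports of XXL-JOB scheduling logs, used by the report page of the scheduling center;
- xxl_job_logglue: GLUE log table, stores the GLUE update history to support GLUE version rollback;
- xxl_job_registry: executor registry table, maintains the addresses of online executors and scheduling center machines (core);
- xxl_job_user: system user table.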
```sql
CREATE TABLE `xxl_job_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
  `job_desc` varchar(255) NOT NULL,
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `author` varchar(64) DEFAULT NULL COMMENT 'author',
  `alarm_email` varchar(255) DEFAULT NULL COMMENT 'alarm email',
  `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT 'schedule type',
  `schedule_conf` varchar(128) DEFAULT NULL COMMENT 'schedule config; meaning depends on the schedule type',
  `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT 'misfire strategy',
  `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT 'executor route strategy',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT 'executor job handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT 'executor job parameters',
  `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT 'block strategy',
  `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT 'job execution timeout, in seconds',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'failure retry count',
  `glue_type` varchar(50) NOT NULL COMMENT 'GLUE type',
  `glue_source` mediumtext COMMENT 'GLUE source code',
  `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE remark',
  `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE update time',
  `child_jobid` varchar(255) DEFAULT NULL COMMENT 'child job IDs, comma-separated',
  `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'trigger status: 0-stopped, 1-running',
  `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'last trigger time',
  `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'next trigger time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
2. xxl-job-admin Core Source Analysis
2.1 Package Structure
2.2 Scheduling Entry Point: XxlJobAdminConfig
```java
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;

    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        // initialize the scheduler
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

    @Override
    public void destroy() throws Exception {
        // destroy the scheduler
        xxlJobScheduler.destroy();
    }
}
```
```java
public class XxlJobScheduler {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);

    public void init() throws Exception {
        // init i18n
        initI18n();

        // each of the following steps starts its own background thread(s)

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run: accepts registrations from distributed executors
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run: monitors failed jobs
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper ): job completion monitor, handles status callbacks from distributed executors
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

    public void destroy() throws Exception {
        // stop the threads in the reverse order of start

        // stop-schedule
        JobScheduleHelper.getInstance().toStop();

        // admin log report stop
        JobLogReportHelper.getInstance().toStop();

        // admin lose-monitor stop
        JobCompleteHelper.getInstance().toStop();

        // admin fail-monitor stop
        JobFailMonitorHelper.getInstance().toStop();

        // admin registry stop
        JobRegistryHelper.getInstance().toStop();

        // admin trigger pool stop
        JobTriggerPoolHelper.toStop();
    }
}
```
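To summarize, initialization does the following; together these steps cover essentially everything XXL-JOB does:
1. Initialize i18n
2. Start the trigger thread pools (executor service)
3. Start the thread that accepts registrations from distributed executors (registry service)
4. Start the job failure monitor thread
5. Start the job completion monitor thread, which handles status callbacks from distributed executors (callback service)
6. Start the log report thread (log service)
7. Start the job scheduling thread (scheduler)
Compare this with XXL-JOB's fully asynchronous, decoupled design: apart from i18n, every step above starts its own thread, which is exactly what "asynchronous" means here. Looking back at the XXL-JOB architecture diagram, the seven steps map one-to-one onto its components; the only exception is the task threads, which run inside the distributed business applications acting as executors.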
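The start order in `XxlJobScheduler.init()` and the reverse order in `destroy()` follow a common lifecycle pattern: components that depend on others (the scheduler depends on the trigger pool) start last and stop first. Below is a minimal sketch of that pattern; the `Component` interface, `Lifecycle` and `LifecycleDemo` are hypothetical names for illustration, not XXL-JOB types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical component abstraction (not an XXL-JOB type).
interface Component {
    String name();
    void start();
    void stop();
}

// Starts components in registration order and stops them in reverse order,
// mirroring how XxlJobScheduler.init()/destroy() are arranged.
class Lifecycle {
    private final List<Component> components = new ArrayList<>();
    private final Deque<Component> started = new ArrayDeque<>();

    Lifecycle register(Component c) {
        components.add(c);
        return this;
    }

    void startAll() {
        for (Component c : components) {
            c.start();
            started.push(c); // the most recently started component ends up on top
        }
    }

    void stopAll() {
        while (!started.isEmpty()) {
            started.pop().stop(); // stop the most recently started component first
        }
    }
}

class LifecycleDemo {
    // Records start/stop events so the ordering can be inspected.
    static final List<String> events = new ArrayList<>();

    static Component named(String name) {
        return new Component() {
            public String name() { return name; }
            public void start() { events.add("start " + name); }
            public void stop() { events.add("stop " + name); }
        };
    }

    public static void main(String[] args) {
        Lifecycle lifecycle = new Lifecycle()
                .register(named("triggerPool"))
                .register(named("registry"))
                .register(named("schedule"));
        lifecycle.startAll();
        lifecycle.stopAll();
        // [start triggerPool, start registry, start schedule, stop schedule, stop registry, stop triggerPool]
        System.out.println(events);
    }
}
```

Using a stack of started components also means that if a later `start()` throws, only the components that actually started would be stopped, which is a slightly stricter guarantee than a hard-coded reverse list.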
2.3 Scheduling Core: JobScheduleHelper
```java
public class JobScheduleHelper {
    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

    private static JobScheduleHelper instance = new JobScheduleHelper();
    public static JobScheduleHelper getInstance(){
        return instance;
    }

    public static final long PRE_READ_MS = 5000;    // pre read

    private Thread scheduleThread;
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);

                        preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
                        preparedStatement.execute();

                        // tx start

                        // 1. pre read
                        long nowTime = System.currentTimeMillis();
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2. push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1 trigger-expire > 5s: pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1. misfire match
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW > trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2. fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2 trigger-expire < 5s: direct-trigger && make next-trigger-time

                                    // 1. trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2. fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1. make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2. push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3. fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                    }

                                } else {
                                    // 2.3 trigger-pre-read: time-ring trigger && make next-trigger-time

                                    // 1. make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2. push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3. fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            }

                            // 3. update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop

                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;

                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // in case processing overran a tick, also check the previous tick
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

    // ... refreshNextValidTime(), pushTimeRing() and toStop() are omitted from this excerpt
}
```
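The three branches of the pre-read loop can be isolated into a pure function. The sketch below, assuming the same 5,000 ms `PRE_READ_MS` window, classifies a job by comparing `now` with its next trigger time, and shows how a 60-slot ring keyed by `(triggerTimeMs / 1000) % 60` is filled and drained, including the one-slot look-back the ring thread uses. `PreReadPlanner`, `Action` and `TimeRing` are hypothetical names; the real code additionally applies the misfire strategy and refreshes the next trigger time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PreReadPlanner {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    // Same branch order as the schedule thread.
    static Action classify(long nowMs, long nextTriggerMs) {
        if (nowMs > nextTriggerMs + PRE_READ_MS) {
            return Action.MISFIRE;       // overdue by more than 5 s: apply the misfire strategy
        } else if (nowMs > nextTriggerMs) {
            return Action.TRIGGER_NOW;   // overdue by less than 5 s: trigger immediately
        } else {
            return Action.PUSH_TO_RING;  // due within the pre-read window: wait on the ring
        }
    }

    // Ring slot = second-of-minute of the trigger time.
    static int ringSecond(long triggerMs) {
        return (int) ((triggerMs / 1000) % 60);
    }

    // Milliseconds to sleep so the next wake-up lands on a whole second.
    static long sleepToNextSecond(long nowMs) {
        return 1000 - nowMs % 1000;
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L;
        System.out.println(classify(now, now - 6_000)); // MISFIRE
        System.out.println(classify(now, now - 1_000)); // TRIGGER_NOW
        System.out.println(classify(now, now + 3_000)); // PUSH_TO_RING
        TimeRing ring = new TimeRing();
        ring.push(ringSecond(now + 3_000), 42);
        System.out.println(ring.drain(ringSecond(now + 3_000))); // [42]
    }
}

// Assumes a single pushing thread, as with the one schedule thread.
class TimeRing {
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    void push(int second, int jobId) {
        ringData.computeIfAbsent(second, k -> new ArrayList<>()).add(jobId);
    }

    // Removes and returns the jobs in the current slot and the previous one,
    // in case processing overran a tick.
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Removing the slot (rather than reading it) is what prevents the look-back from firing the same job twice.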
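2.4 Triggering
JobTriggerPoolHelper does two main things:
1. It creates two trigger thread pools, a fast one and a slow one; a job whose trigger took longer than 500 ms more than 10 times within the current minute is dispatched to the slow pool;
2. It asynchronously hands the trigger action over to XxlJobTrigger for execution.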
```java
public class JobTriggerPoolHelper {
    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);

    // ---------------------- trigger pool ----------------------

    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start(){
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }

    public void stop() {
        //triggerPool.shutdown();
        fastTriggerPool.shutdownNow();
        slowTriggerPool.shutdownNow();
        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
    }

    // job timeout count
    private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

    /**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // job-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }
                }
            }
        });
    }
}
```
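The fast/slow selection is a per-minute counter: every trigger that costs more than 500 ms increments the job's counter, the whole map is cleared when the wall-clock minute changes, and a job whose counter exceeds 10 is routed to the slow pool. A minimal sketch of just that bookkeeping, with the clock passed in explicitly so it can be tested; `SlowJobTracker` is a hypothetical name and it uses `computeIfAbsent` in place of the source's `putIfAbsent`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class SlowJobTracker {
    static final long SLOW_THRESHOLD_MS = 500;
    static final int MAX_SLOW_PER_MINUTE = 10;

    private volatile long currentMinute;
    private final ConcurrentMap<Integer, AtomicInteger> slowCounts = new ConcurrentHashMap<>();

    SlowJobTracker(long nowMs) {
        this.currentMinute = nowMs / 60_000;
    }

    // Should the next trigger of this job go to the slow pool?
    boolean useSlowPool(int jobId) {
        AtomicInteger count = slowCounts.get(jobId);
        return count != null && count.get() > MAX_SLOW_PER_MINUTE;
    }

    // Called after each trigger, like the finally block in addTrigger.
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {
            currentMinute = minute;
            slowCounts.clear(); // a new minute starts with clean counters
        }
        if (costMs > SLOW_THRESHOLD_MS) {
            slowCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    public static void main(String[] args) {
        long t0 = 120_000; // start of minute 2
        SlowJobTracker tracker = new SlowJobTracker(t0);
        for (int i = 0; i < 11; i++) {
            tracker.record(42, 800, t0 + i);
        }
        System.out.println(tracker.useSlowPool(42)); // true: 11 slow triggers > 10
        tracker.record(42, 800, t0 + 60_000);        // next minute: counters reset
        System.out.println(tracker.useSlowPool(42)); // false: count is back to 1
    }
}
```

Because the counters reset every minute, a job that recovers is moved back to the fast pool at the next minute boundary without any explicit bookkeeping.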
XxlJobTrigger
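It does three main things:
1. `trigger` initializes the shards, depending on whether the route strategy is sharding broadcast:
- Sharding broadcast: trigger once per registered address, passing the shard index and total each time;
- Ordinary routing: initialize a single shard (0/1) and trigger once.
2. `processTrigger` performs the trigger:
- It assembles the trigger parameters, including the block strategy, executor parameters, shard index/total, log id and executor address.
The executor address is resolved through the route strategy:
- In sharding mode, the address is picked by shard index;
- Otherwise, the route strategy selects it.
3. It hands off to the executor client, ExecutorBizClient, which issues the remote call.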
```java
public class XxlJobTrigger {
    private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);

    /**
     * trigger job
     *
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     *          >=0: use this param
     *          <0: use param from job info config
     * @param executorShardingParam
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     * @param addressList
     *          null: use executor addressList
     *          not null: cover
     */
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {

        // load data
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }
        if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
        int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

        // cover addressList
        if (addressList!=null && addressList.trim().length()>0) {
            group.setAddressType(1);
            group.setAddressList(addressList.trim());
        }

        // sharding param
        int[] shardingParam = null;
        if (executorShardingParam!=null){
            String[] shardingArr = executorShardingParam.split("/");
            if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                shardingParam = new int[2];
                shardingParam[0] = Integer.valueOf(shardingArr[0]);
                shardingParam[1] = Integer.valueOf(shardingArr[1]);
            }
        }
        // sharding broadcast: trigger once per registered address, passing the shard index
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                && shardingParam==null) {
            for (int i = 0; i < group.getRegistryList().size(); i++) {
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        }
        // ordinary routing: initialize a single shard
        else {
            if (shardingParam == null) {
                shardingParam = new int[]{0, 1};
            }
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        }
    }

    private static boolean isNumeric(String str){
        try {
            int result = Integer.valueOf(str);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /**
     * @param group                     job group, registry list may be empty
     * @param jobInfo
     * @param finalFailRetryCount
     * @param triggerType
     * @param index                     sharding index
     * @param total                     sharding total
     */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // param
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1. save log-id
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2. init trigger-param
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3. init address: in sharding mode pick the address by shard index; otherwise use the route strategy
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4. trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

        // 5. collection trigger info
        StringBuffer triggerMsgSb = new StringBuffer();
        // ..... omitted

        // 6. save log trigger-info
        jobLog.setExecutorAddress(address);
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setExecutorShardingParam(shardingParam);
        jobLog.setExecutorFailRetryCount(finalFailRetryCount);
        //jobLog.setTriggerTime();
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }
}
```
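For SHARDING_BROADCAST, `trigger` fans out one call per registered address and passes `index/total`; a valid manually supplied `executorShardingParam` such as `"1/3"` suppresses the fan-out. The sketch below reproduces that planning step and the broadcast-mode address choice as pure functions; `ShardingPlanner` is a hypothetical name, and the non-broadcast address (picked by the route strategy in the real code) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ShardingPlanner {
    // Parses "index/total"; returns null when the format is invalid, like XxlJobTrigger.
    static int[] parseShardingParam(String param) {
        if (param == null) {
            return null;
        }
        String[] parts = param.split("/");
        if (parts.length != 2) {
            return null;
        }
        try {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Returns the (index, total) pairs that would each become one processTrigger call.
    static List<int[]> plan(boolean broadcast, int registrySize, String shardingParam) {
        int[] parsed = parseShardingParam(shardingParam);
        List<int[]> shards = new ArrayList<>();
        if (broadcast && registrySize > 0 && parsed == null) {
            for (int i = 0; i < registrySize; i++) {
                shards.add(new int[]{i, registrySize}); // one shard per online executor
            }
        } else {
            shards.add(parsed != null ? parsed : new int[]{0, 1}); // single shard
        }
        return shards;
    }

    // Broadcast-mode address choice in processTrigger: out-of-range index falls back to the first address.
    static String broadcastAddress(int index, List<String> registry) {
        return index < registry.size() ? registry.get(index) : registry.get(0);
    }

    public static void main(String[] args) {
        // hypothetical executor addresses
        List<String> registry = Arrays.asList("http://10.0.0.1:9999/", "http://10.0.0.2:9999/");
        for (int[] s : plan(true, registry.size(), null)) {
            System.out.println(s[0] + "/" + s[1] + " -> " + broadcastAddress(s[0], registry));
        }
    }
}
```

The executor sees the same `index/total` pair through `broadcastIndex`/`broadcastTotal`, so each instance can process its own slice of the data.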
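3. xxl-job-core: Task Thread Core Source Analysis
3.1 RPC Service Provided by the Task Executor
`XxlJobExecutor.start()` does five things; we focus first on the last one, initializing the executor server.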
```java
public void start() throws Exception {

    // init logpath: initialize the log directory
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client: clients for the scheduling center, used for callbacks
    initAdminBizList(adminAddresses, accessToken);

    // init JobLogFileCleanThread: start the log file cleanup thread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread: start the trigger callback thread
    TriggerCallbackThread.getInstance().start();

    // init executor-server: start the embedded executor server
    initEmbedServer(address, ip, port, appname, accessToken);
}
```
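The code below initializes the executor server. It uses Netty to implement an NIO server and acts as the RPC service provider. Netty's NIO internals are out of scope here (see Netty itself if you are interested); we focus on the core request handler, EmbedHttpServerHandler, which uses the `ThreadPoolExecutor bizThreadPool` to process tasks asynchronously.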
```java
// ---------------------- executor-server (rpc provider) ----------------------
private EmbedServer embedServer = null;

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address: default use address to registry, otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}
```
```java
public class EmbedServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);

    private ExecutorBiz executorBiz;
    private Thread thread;

    public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // param
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                            }
                        },
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                            }
                        });

                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                            .addLast(new HttpServerCodec())
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(port).sync();

                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                    // start registry
                    startRegistry(appname, address);

                    // wait until stop
                    future.channel().closeFuture().sync();

                } catch (InterruptedException e) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                } finally {
                    // stop
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }

    // ... remaining members omitted from this excerpt
}
```
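`bizThreadPool` is a bounded `ThreadPoolExecutor` whose rejection handler throws instead of blocking, so an overloaded executor fails the request quickly rather than stalling Netty's I/O threads. The sketch below shrinks the pool to one thread (core and max both 1) and a queue of one so the rejection is deterministic; these sizes and the `BizPoolDemo` name are illustrative assumptions. Note that under the JDK's `ThreadPoolExecutor` rules, the original combination of a core size of 0 with a bounded `LinkedBlockingQueue` means threads beyond the first are only created once the 2000-slot queue is full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BizPoolDemo {
    // Same construction style as EmbedServer's bizThreadPool, with tiny sizes.
    static ThreadPoolExecutor newBizPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                r -> new Thread(r, "biz-pool-" + r.hashCode()),
                (r, executor) -> {
                    // fail fast instead of blocking the caller
                    throw new RuntimeException("bizThreadPool is EXHAUSTED!");
                });
    }

    // Submits `tasks` blocking tasks to a 1-thread/1-slot pool; returns how many were rejected.
    static int countRejected(int tasks) {
        ThreadPoolExecutor pool = newBizPool(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the only worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RuntimeException e) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // task 1 runs on the worker, task 2 waits in the queue, tasks 3..5 are rejected
        System.out.println(countRejected(5)); // 3
    }
}
```

Since `channelRead0` calls `bizThreadPool.execute(...)` directly, the exception thrown by the handler surfaces on the Netty side rather than being silently queued.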
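3.2 Handling Task Scheduling Requests: EmbedHttpServerHandler
1. The content of the FullHttpRequest carries the custom request parameters;
2. The business handling is submitted to the bizThreadPool for asynchronous processing.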
```java
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

    // request parse
    //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
    String requestData = msg.content().toString(CharsetUtil.UTF_8);
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

            // to json
            String responseJson = GsonTool.toJson(responseObj);

            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}
}
```
3. Business handling: the `process` method routes by URI to different methods. We focus first on task execution, "/run", which calls ExecutorBizImpl.run(triggerParam).
```java
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri == null || uri.trim().length() == 0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken != null
            && accessToken.trim().length() > 0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run":
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
```
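`process` is a plain validate-then-dispatch router keyed by URI. The sketch below keeps the same three checks (POST only, non-empty URI, token match when a token is configured) and maps each URI to a handler function; `MiniRouter`, the plain-string responses and the `route(...)` registration are hypothetical simplifications standing in for `ExecutorBiz` and `ReturnT`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class MiniRouter {
    private final String accessToken;
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    MiniRouter(String accessToken) {
        this.accessToken = accessToken;
    }

    MiniRouter route(String uri, Function<String, String> handler) {
        routes.put(uri, handler);
        return this;
    }

    String process(String method, String uri, String body, String tokenReq) {
        if (!"POST".equals(method)) {
            return "FAIL: invalid request, HttpMethod not support.";
        }
        if (uri == null || uri.trim().isEmpty()) {
            return "FAIL: invalid request, uri-mapping empty.";
        }
        // the token is only enforced when the executor has one configured
        if (accessToken != null && !accessToken.trim().isEmpty() && !accessToken.equals(tokenReq)) {
            return "FAIL: The access token is wrong.";
        }
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "FAIL: invalid request, uri-mapping(" + uri + ") not found.";
        }
        try {
            return handler.apply(body);
        } catch (RuntimeException e) {
            return "FAIL: request error:" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        MiniRouter router = new MiniRouter("secret")
                .route("/beat", body -> "SUCCESS")
                .route("/run", body -> "SUCCESS: queued " + body);
        System.out.println(router.process("POST", "/beat", "", "secret")); // SUCCESS
        System.out.println(router.process("GET", "/beat", "", "secret"));  // FAIL: invalid request, HttpMethod not support.
        System.out.println(router.process("POST", "/run", "{}", "wrong")); // FAIL: The access token is wrong.
    }
}
```

Because any exception is converted into a failure response, a faulty handler never breaks the connection; the caller always receives a well-formed reply.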
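ExecutorBizImpl.run(triggerParam):
1. Get the JobHandler
- Bean mode: handler information is collected from the @XxlJob annotation and stored in `ConcurrentMap<String, IJobHandler> jobHandlerRepository`, keyed by the @XxlJob value; the IJobHandler is a MethodJobHandler.
- GLUE_GROOVY mode: a new GlueJobHandler is created.
- Script mode: a new ScriptJobHandler is created.
2. Get the JobThread, stored in `ConcurrentMap<Integer, JobThread> jobThreadRepository`, keyed by job id.
How the block strategy ExecutorBlockStrategyEnum is implemented (it only applies when a JobThread already exists and is running or has queued triggers):
- DISCARD_LATER: discard the new trigger and return a failure immediately;
- COVER_EARLY: cover with the newest trigger; the JobThread is rebuilt, dropping the earlier unexecuted triggers, and the current trigger runs instead;
- SERIAL_EXECUTION: serial execution; no special handling, the new trigger simply waits in the queue behind the earlier ones.
3. The trigger parameters are pushed into the JobThread's trigger queue, `LinkedBlockingQueue<TriggerParam> triggerQueue`, and executed asynchronously by the JobThread's loop.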
```java
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old: jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid: jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                    && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
```
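Stripped of handler loading, the block-strategy step reduces to a three-way decision that is only consulted when a JobThread already exists for the job (otherwise a new thread is registered and the trigger is queued). A minimal sketch, assuming a hypothetical `Decision` result type; the strategy constants match `ExecutorBlockStrategyEnum`, and, as in the source, an unrecognized (null) strategy falls through to plain queuing.

```java
class BlockStrategyDemo {
    enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    enum Decision { REJECT, REPLACE_THREAD, ENQUEUE }

    // busy corresponds to jobThread.isRunningOrHasQueue()
    static Decision decide(BlockStrategy strategy, boolean busy) {
        if (strategy == BlockStrategy.DISCARD_LATER && busy) {
            return Decision.REJECT;          // drop the new trigger and report a failure
        }
        if (strategy == BlockStrategy.COVER_EARLY && busy) {
            return Decision.REPLACE_THREAD;  // register a new JobThread in place of the old one
        }
        return Decision.ENQUEUE;             // serial execution: wait in triggerQueue
    }

    public static void main(String[] args) {
        for (BlockStrategy s : BlockStrategy.values()) {
            System.out.println(s + ": busy -> " + decide(s, true) + ", idle -> " + decide(s, false));
        }
    }
}
```

When the existing thread is idle, every strategy behaves the same way, which is why the source only checks `isRunningOrHasQueue()` inside the DISCARD_LATER and COVER_EARLY branches.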
3.3 Task Execution
JobThread extends Thread; its core member variables are:
```java
private int jobId;                                       // job id
private IJobHandler handler;                             // job handler
private LinkedBlockingQueue<TriggerParam> triggerQueue;  // trigger queue
private Set<Long> triggerLogIdSet;                       // avoid repeat trigger for the same TRIGGER_LOG_ID
```
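1. `pushTriggerQueue`: during the request handling described in 3.2 above, ExecutorBizImpl adds the trigger parameters to `triggerQueue` through this method.
2. The core `run` method:
On startup, handler.init() is called.
Loop:
- Take a trigger from triggerQueue and run it via handler.execute();
- If the trigger specifies an executorTimeout, execute runs in a separate thread, and FutureTask.get(long timeout, TimeUnit unit) stops waiting once the timeout expires (the worker thread is then interrupted);
- Otherwise, execute is called directly;
- If no trigger is obtained from triggerQueue for more than 30 consecutive polls (each poll waits up to 3 seconds) and the queue is still empty, the JobThread for this job id is removed and the thread exits;
- Finally, the execution result is reported through the TriggerCallbackThread callback thread.
After the loop exits:
- triggerQueue is checked, and any triggers still queued are reported through TriggerCallbackThread as failed (killed);
- Finally, handler.destroy() is called.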
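The core ideas of `JobThread` (deduplicating triggers by log id, polling with a timeout so the stop flag is re-checked, counting idle polls, and enforcing a timeout with `FutureTask.get(timeout, unit)`) fit into a small sketch. `MiniJobThread`, its 50 ms poll interval, the configurable idle limit and the `Callable<String>` job body are hypothetical simplifications; the real class also sets up `XxlJobContext`, writes job logs and pushes callbacks.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified JobThread: a trigger is represented only by its log id.
class MiniJobThread extends Thread {
    private static final long POLL_MS = 50; // the real JobThread polls for 3 s

    private final LinkedBlockingQueue<Long> triggerQueue = new LinkedBlockingQueue<>();
    private final Set<Long> triggerLogIdSet = Collections.synchronizedSet(new HashSet<>());
    final List<String> results = new CopyOnWriteArrayList<>();

    private final long timeoutMs;
    private final int maxIdle;
    private final Callable<String> job;
    private volatile boolean toStop = false;
    private int idleTimes = 0;

    MiniJobThread(long timeoutMs, int maxIdle, Callable<String> job) {
        this.timeoutMs = timeoutMs;
        this.maxIdle = maxIdle;
        this.job = job;
        setDaemon(true);
    }

    // Rejects a log id that is already queued, like pushTriggerQueue.
    boolean push(long logId) {
        if (!triggerLogIdSet.add(logId)) {
            return false;
        }
        triggerQueue.add(logId);
        return true;
    }

    void requestStop() {
        toStop = true; // shared flag, re-checked after every poll
    }

    @Override
    public void run() {
        while (!toStop) {
            try {
                // poll(timeout) instead of take(), so toStop is re-checked regularly
                Long logId = triggerQueue.poll(POLL_MS, TimeUnit.MILLISECONDS);
                if (logId == null) {
                    // the real JobThread removes itself from jobThreadRepository here
                    if (++idleTimes > maxIdle && triggerQueue.isEmpty()) {
                        break;
                    }
                    continue;
                }
                idleTimes = 0;
                triggerLogIdSet.remove(logId);
                results.add(logId + ":" + executeWithTimeout());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    // Runs the job in its own thread and stops waiting after timeoutMs.
    private String executeWithTimeout() throws InterruptedException {
        FutureTask<String> task = new FutureTask<>(job);
        Thread worker = new Thread(task);
        worker.start();
        try {
            return task.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "error";
        } finally {
            worker.interrupt(); // the job only stops if it honours interruption
        }
    }

    // Waits for the thread to exit (it stops itself after maxIdle empty polls).
    boolean awaitExit(long ms) {
        try {
            join(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !isAlive();
    }

    public static void main(String[] args) {
        MiniJobThread t = new MiniJobThread(200, 3, () -> "ok");
        t.push(1L);
        System.out.println(t.push(1L)); // false: log id 1 is still queued
        t.start();
        t.awaitExit(5000);
        System.out.println(t.results);  // [1:ok]
    }
}
```

As the source's own comment on `toStop` points out, `interrupt()` alone cannot stop a running thread, which is why both the loop flag and the interrupt of the worker thread are needed.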
```java
package com.xxl.job.core.thread;

import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;

/**
 * handler thread
 * @author xuxueli 2016-1-16 19:52:47
 */
public class JobThread extends Thread{
    private static Logger logger = LoggerFactory.getLogger(JobThread.class);

    private int jobId;
    private IJobHandler handler;
    private LinkedBlockingQueue<TriggerParam> triggerQueue;
    private Set<Long> triggerLogIdSet;  // avoid repeat trigger for the same TRIGGER_LOG_ID

    private volatile boolean toStop = false;
    private String stopReason;

    private boolean running = false;    // if running job
    private int idleTimes = 0;          // idel times

    public JobThread(int jobId, IJobHandler handler) {
        this.jobId = jobId;
        this.handler = handler;
        this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
        this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());

        // assign job thread name
        this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());
    }

    public IJobHandler getHandler() {
        return handler;
    }

    /**
     * new trigger to queue
     *
     * @param triggerParam
     * @return
     */
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
        // avoid repeat
        if (triggerLogIdSet.contains(triggerParam.getLogId())) {
            logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
            return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
        }

        triggerLogIdSet.add(triggerParam.getLogId());
        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }

    /**
     * kill job thread
     *
     * @param stopReason
     */
    public void toStop(String stopReason) {
        /**
         * Thread.interrupt only breaks a thread out of a blocking state (wait, join, sleep)
         * by throwing InterruptedException at the blocking point; it does not terminate
         * the running thread itself. So to shut this thread down completely,
         * a shared (volatile) flag has to be used instead.
         */
        this.toStop = true;
        this.stopReason = stopReason;
    }

    /**
     * is running job
     * @return
     */
    public boolean isRunningOrHasQueue() {
        return running || triggerQueue.size()>0;
    }

    @Override
    public void run() {

        // init
        try {
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // execute
        while(!toStop){
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            try {
                // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // log filename, like "logPath/yyyy-MM-dd/9999.log"
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                    XxlJobContext xxlJobContext = new XxlJobContext(
                            triggerParam.getJobId(),
                            triggerParam.getExecutorParams(),
                            logFileName,
                            triggerParam.getBroadcastIndex(),
                            triggerParam.getBroadcastTotal());

                    // init job context
                    XxlJobContext.setXxlJobContext(xxlJobContext);

                    // execute
                    XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                    if (triggerParam.getExecutorTimeout() > 0) {
                        // limit timeout
                        Thread futureThread = null;
                        try {
                            FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                @Override
                                public Boolean call() throws Exception {

                                    // init job context
                                    XxlJobContext.setXxlJobContext(xxlJobContext);

                                    handler.execute();
                                    return true;
                                }
                            });
                            futureThread = new Thread(futureTask);
                            futureThread.start();

                            Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {

                            XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                            XxlJobHelper.log(e);

                            // handle result
                            XxlJobHelper.handleTimeout("job execute timeout ");
                        } finally {
                            futureThread.interrupt();
                        }
                    } else {
                        // just execute
                        handler.execute();
                    }

                    // valid execute handle data
                    if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                        XxlJobHelper.handleFail("job handle result lost.");
                    } else {
                        String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                        tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                                ?tempHandleMsg.substring(0, 50000).concat("...")
                                :tempHandleMsg;
                        XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                    }
                    XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                            + XxlJobContext.getXxlJobContext().getHandleCode()
                            + ", handleMsg = "
                            + XxlJobContext.getXxlJobContext().getHandleMsg()
                    );

                } else {
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                }

                // handle result
                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();

                XxlJobHelper.handleFail(errorMsg);

                XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
            } finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // commonm
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                triggerParam.getLogId(),
                                triggerParam.getLogDateTime(),
                                XxlJobContext.getXxlJobContext().getHandleCode(),
                                XxlJobContext.getXxlJobContext().getHandleMsg() )
                        );
                    } else {
                        // is killed
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                triggerParam.getLogId(),
                                triggerParam.getLogDateTime(),
                                XxlJobContext.HANDLE_CODE_FAIL,
                                stopReason + " [job running, killed]" )
                        );
                    }
                }
            }
        }

        // callback trigger request in queue
        while(triggerQueue !=null && triggerQueue.size()>0){
            TriggerParam triggerParam = triggerQueue.poll();
            if (triggerParam!=null) {
                // is killed
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_CODE_FAIL,
                        stopReason + " [job not executed, in the job queue, killed.]")
                );
            }
        }

        // destroy
        try {
            handler.destroy();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }
}
```
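To make the control flow of JobThread easier to follow, here is a heavily simplified sketch of the same pattern: a per-job worker that rejects duplicate log IDs, polls its queue with a timeout so that a volatile stop flag is re-checked regularly, and reports anything still queued as killed when it stops. MiniJobThread, push() and the callbacks list are hypothetical names for illustration only; the job handler, log context, timeout handling and the real TriggerCallbackThread are all left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical simplified worker mirroring JobThread's structure (not XXL-JOB code).
class MiniJobThread extends Thread {
    private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    // Log IDs currently waiting in the queue, used to reject duplicate triggers.
    private final Set<Long> queuedIds = Collections.synchronizedSet(new HashSet<>());
    // Reported outcomes, standing in for TriggerCallbackThread.pushCallBack.
    final List<String> callbacks = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean toStop = false;

    /** Returns false if this log ID is already waiting in the queue. */
    boolean push(long logId) {
        if (!queuedIds.add(logId)) {
            return false;
        }
        queue.add(logId);
        return true;
    }

    void toStop() {
        toStop = true;
    }

    @Override
    public void run() {
        while (!toStop) {
            try {
                // poll with a timeout instead of take(), so toStop is re-checked periodically
                Long logId = queue.poll(100, TimeUnit.MILLISECONDS);
                if (logId != null) {
                    queuedIds.remove(logId);
                    callbacks.add(logId + ":done");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        // like JobThread: anything still queued is reported as killed
        Long pending;
        while ((pending = queue.poll()) != null) {
            callbacks.add(pending + ":killed");
        }
    }
}
```

Calling run() directly after toStop() skips the polling loop and exercises only the drain step, which makes the shutdown behaviour easy to check without timing assumptions.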
3.4 Task result callback - TriggerCallbackThread
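1. Callback results are stored in a queue: LinkedBlockingQueue<HandleCallbackParam> callBackQueue.
2. start(): starts the triggerCallbackThread, which takes callback entries from callBackQueue and reports them to the scheduling center by trying each AdminBiz client in turn; as soon as one admin node accepts the report, it stops trying the rest.
3. If a report fails, the failure is logged.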
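The reporting step can be sketched as follows, loosely modelled on the description above. CallbackAdmin, MiniCallbackReporter, reportOnce() and failureLog are hypothetical names; callback entries are plain strings instead of HandleCallbackParam, and the background thread that keeps calling reportOnce() is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the admin-side callback API (AdminBiz in XXL-JOB).
interface CallbackAdmin {
    boolean callback(List<String> results) throws Exception;
}

class MiniCallbackReporter {
    private final LinkedBlockingQueue<String> callBackQueue = new LinkedBlockingQueue<>();
    private final List<CallbackAdmin> admins;
    final List<String> failureLog = new ArrayList<>();

    MiniCallbackReporter(List<CallbackAdmin> admins) {
        this.admins = admins;
    }

    void pushCallBack(String result) {
        callBackQueue.add(result);
    }

    /** Drains the queue once and reports the batch; returns true if some admin accepted it. */
    boolean reportOnce() {
        List<String> batch = new ArrayList<>();
        callBackQueue.drainTo(batch);
        if (batch.isEmpty()) {
            return true;
        }
        for (CallbackAdmin admin : admins) {
            try {
                if (admin.callback(batch)) {
                    return true; // the first admin that accepts wins; the rest are skipped
                }
            } catch (Exception e) {
                // this admin node is unreachable: fall through to the next one
            }
        }
        // no admin accepted the batch: record the failure
        failureLog.add("callback failed for " + batch);
        return false;
    }
}
```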
3.5 Registration and heartbeat detection
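1. Automatic monitoring: at the scheduling entry point described in section 2.2, the admin calls JobRegistryHelper.start(), which starts a registry monitor thread, registryMonitorThread. Every BEAT_TIMEOUT (30s) it checks the auto-registered entries in the xxl_job_registry table and treats an entry as lost when its update_time is older than RegistryConfig.DEAD_TIMEOUT (3 × BEAT_TIMEOUT = 90s in the XXL-JOB source). Lost entries are removed from xxl_job_registry, and the registered address list of each auto-registered executor group in xxl_job_group is rebuilt from the remaining entries.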
```java
public class JobRegistryHelper {
    private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);

    private static JobRegistryHelper instance = new JobRegistryHelper();
    public static JobRegistryHelper getInstance(){
        return instance;
    }

    private ThreadPoolExecutor registryOrRemoveThreadPool = null;
    private Thread registryMonitorThread;
    private volatile boolean toStop = false;

    public void start(){

        // for registry or remove
        registryOrRemoveThreadPool = new ThreadPoolExecutor(
                2,
                10,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        r.run();
                        logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                    }
                });

        // for monitor
        registryMonitorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // auto registry group
                        List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                        if (groupList!=null && !groupList.isEmpty()) {

                            // remove dead address (admin/executor)
                            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (ids!=null && ids.size()>0) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                            }

                            // fresh online address (admin/executor)
                            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (list != null) {
                                for (XxlJobRegistry item: list) {
                                    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                        String appname = item.getRegistryKey();
                                        List<String> registryList = appAddressMap.get(appname);
                                        if (registryList == null) {
                                            registryList = new ArrayList<String>();
                                        }

                                        if (!registryList.contains(item.getRegistryValue())) {
                                            registryList.add(item.getRegistryValue());
                                        }
                                        appAddressMap.put(appname, registryList);
                                    }
                                }
                            }

                            // fresh group address
                            for (XxlJobGroup group: groupList) {
                                List<String> registryList = appAddressMap.get(group.getAppname());
                                String addressListStr = null;
                                if (registryList!=null && !registryList.isEmpty()) {
                                    Collections.sort(registryList);
                                    StringBuilder addressListSB = new StringBuilder();
                                    for (String item:registryList) {
                                        addressListSB.append(item).append(",");
                                    }
                                    addressListStr = addressListSB.toString();
                                    addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                                }
                                group.setAddressList(addressListStr);
                                group.setUpdateTime(new Date());

                                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
            }
        });
        registryMonitorThread.setDaemon(true);
        registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
        registryMonitorThread.start();
    }
}
```
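The core of each monitor pass (drop entries whose heartbeat is too old, then rebuild a sorted, comma-separated address list per appname) can be sketched without a database. RegistryRow and MiniRegistryMonitor are hypothetical; all rows are assumed to be executor registrations, and timestamps are plain epoch milliseconds rather than the update_time column.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical row type standing in for XxlJobRegistry (not the real class).
class RegistryRow {
    final String appname;
    final String address;
    final long updateTimeMillis;

    RegistryRow(String appname, String address, long updateTimeMillis) {
        this.appname = appname;
        this.address = address;
        this.updateTimeMillis = updateTimeMillis;
    }
}

class MiniRegistryMonitor {
    /**
     * Returns appname -> comma-separated address list, built only from rows whose
     * last heartbeat is within deadTimeoutSeconds of nowMillis.
     */
    static Map<String, String> freshAddressLists(List<RegistryRow> rows, long nowMillis, int deadTimeoutSeconds) {
        Map<String, TreeSet<String>> live = new TreeMap<>();
        long cutoff = nowMillis - deadTimeoutSeconds * 1000L;
        for (RegistryRow row : rows) {
            if (row.updateTimeMillis < cutoff) {
                continue; // dead entry: the real monitor deletes it from xxl_job_registry
            }
            // TreeSet both de-duplicates and sorts, like contains() + Collections.sort()
            live.computeIfAbsent(row.appname, k -> new TreeSet<>()).add(row.address);
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : live.entrySet()) {
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }
}
```

Unlike the real monitor, which sets addressList to null for an auto-registered group with no live executors, this sketch simply leaves such an appname out of the result.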
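2. Active reporting: the registryOrRemoveThreadPool handles registration and deregistration requests sent by executors. Under the hood this simply updates, inserts into, or deletes from the xxl_job_registry table.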
```java
// ---------------------- helper ----------------------

public ReturnT<String> registry(RegistryParam registryParam) {

    // valid
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}

public ReturnT<String> registryRemove(RegistryParam registryParam) {

    // valid
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
            if (ret > 0) {
                // fresh
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}

private void freshGroupRegistryInfo(RegistryParam registryParam){
    // Under consideration, prevent affecting core tables
}
```
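The update-then-insert (upsert) logic that registry() hands to the thread pool can be shown with an in-memory table. MiniRegistryTable and its methods are hypothetical stand-ins named after the DAO calls above; there is no thread pool and no database in this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the xxl_job_registry table: each row is keyed by
// (registryGroup, registryKey, registryValue) and stores its update time.
class MiniRegistryTable {
    private final Map<String, Long> rows = new ConcurrentHashMap<>();

    private static String rowId(String group, String key, String value) {
        return group + "|" + key + "|" + value;
    }

    /** Like registryUpdate: refreshes the update time, returns the number of rows touched. */
    int registryUpdate(String group, String key, String value, long now) {
        return rows.computeIfPresent(rowId(group, key, value), (id, old) -> now) == null ? 0 : 1;
    }

    /** Like registrySave: inserts a new row. */
    void registrySave(String group, String key, String value, long now) {
        rows.put(rowId(group, key, value), now);
    }

    /** Like registryDelete: returns the number of rows removed. */
    int registryDelete(String group, String key, String value) {
        return rows.remove(rowId(group, key, value)) == null ? 0 : 1;
    }

    /** The update-then-insert pattern used by JobRegistryHelper.registry(). */
    void registry(String group, String key, String value, long now) {
        if (registryUpdate(group, key, value, now) < 1) {
            registrySave(group, key, value, now);
        }
    }

    Long updateTime(String group, String key, String value) {
        return rows.get(rowId(group, key, value));
    }

    int size() {
        return rows.size();
    }
}
```

Update-then-insert is not atomic: two concurrent first-time registrations for the same key could both reach registrySave. In this sketch that is harmless because put() simply overwrites the row.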
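3. When the executor reports
In section 3.1 (the executor's RPC service), when looking at executor initialization we focused on the core message handler, EmbedHttpServerHandler. Besides that, EmbedServer also registers the executor through ExecutorRegistryThread once the server has started, and deregisters it when the server shuts down: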
```java
public class EmbedServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);

    private ExecutorBiz executorBiz;
    private Thread thread;

    public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // param
                // ... omitted

                // start registry
                startRegistry(appname, address);

                // ... omitted
            }
        });
        // ... omitted
        thread.start();
    }

    // ---------------------- registry ----------------------

    public void startRegistry(final String appname, final String address) {
        // start registry
        ExecutorRegistryThread.getInstance().start(appname, address);
    }

    public void stopRegistry() {
        // stop registry
        ExecutorRegistryThread.getInstance().toStop();
    }
}
```
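ExecutorRegistryThread starts a registryThread that re-registers the executor every BEAT_TIMEOUT (30s) through a remote registry call on the AdminBizClient mentioned earlier, stopping at the first admin node that accepts the registration. When the thread is stopped, it performs a registry-remove call before exiting.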
```java
public class ExecutorRegistryThread {
    private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);

    private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
    public static ExecutorRegistryThread getInstance(){
        return instance;
    }

    private Thread registryThread;
    private volatile boolean toStop = false;

    public void start(final String appname, final String address){

        // valid
        if (appname==null || appname.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
            return;
        }
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
            return;
        }

        registryThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        if (!toStop) {
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                        }
                    }
                }

                // registry remove
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                            }
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
            }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();
    }

    public void toStop() {
        toStop = true;

        // interrupt and wait
        if (registryThread != null) {
            registryThread.interrupt();
            try {
                registryThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
```
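A stripped-down version of the heartbeat loop shows why toStop() both sets a volatile flag and interrupts the thread: the flag ends the loop, while the interrupt cuts the sleep short so join() returns promptly. It assumes a hypothetical MiniAdminClient interface in place of AdminBiz and a configurable beat interval instead of RegistryConfig.BEAT_TIMEOUT; MiniHeartbeat is likewise an illustrative name.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical client interface standing in for AdminBiz.registry / registryRemove.
interface MiniAdminClient {
    boolean registry(String appname, String address) throws Exception;
    boolean registryRemove(String appname, String address) throws Exception;
}

class MiniHeartbeat {
    private final List<MiniAdminClient> admins;
    private final long beatMillis;
    private volatile boolean toStop = false;
    private Thread thread;

    MiniHeartbeat(List<MiniAdminClient> admins, long beatMillis) {
        this.admins = admins;
        this.beatMillis = beatMillis;
    }

    void start(String appname, String address) {
        thread = new Thread(() -> {
            while (!toStop) {
                for (MiniAdminClient admin : admins) {
                    try {
                        if (admin.registry(appname, address)) {
                            break; // one successful admin is enough for this beat
                        }
                    } catch (Exception e) {
                        // unreachable admin: try the next one
                    }
                }
                try {
                    if (!toStop) {
                        TimeUnit.MILLISECONDS.sleep(beatMillis);
                    }
                } catch (InterruptedException e) {
                    // toStop() interrupts the sleep; the loop condition then ends the thread
                }
            }
            // deregister once on the way out, again stopping at the first success
            for (MiniAdminClient admin : admins) {
                try {
                    if (admin.registryRemove(appname, address)) {
                        break;
                    }
                } catch (Exception e) {
                    // ignore and try the next admin
                }
            }
        }, "mini-registry-thread");
        thread.setDaemon(true);
        thread.start();
    }

    void toStop() {
        toStop = true;
        if (thread != null) {
            thread.interrupt();
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```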
4. Routing strategy analysis
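ExecutorRouteFailover: failover; sends a beat heartbeat to each executor in order and routes the trigger to the first one that responds as online.
ExecutorRouteBusyover: busy-over; sends an idleBeat to each executor in order and routes the trigger to the first one that is idle. An executor counts as busy for the job when its JobThread is running or its triggerQueue is non-empty (JobThread.isRunningOrHasQueue()).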
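Both strategies come down to "probe each address in order and take the first one that passes". The sketch below assumes a hypothetical MiniExecutorProbe interface in place of the real ExecutorBiz beat()/idleBeat() calls, and returns null when no executor qualifies; the real routers return a ReturnT result instead.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical probe standing in for ExecutorBiz.beat() / idleBeat(...).
interface MiniExecutorProbe {
    boolean beat(String address);                // is the executor reachable?
    boolean idleBeat(String address, int jobId); // is this job idle there (not running, empty queue)?
}

final class MiniRouters {
    private MiniRouters() {
    }

    /** Returns the first address that passes the check, or null if none does. */
    private static String firstMatching(List<String> addresses, Predicate<String> check) {
        for (String address : addresses) {
            try {
                if (check.test(address)) {
                    return address;
                }
            } catch (RuntimeException e) {
                // treat a failed probe as a negative answer and move on
            }
        }
        return null;
    }

    /** FAILOVER: the first executor whose heartbeat succeeds. */
    static String failover(List<String> addresses, MiniExecutorProbe probe) {
        return firstMatching(addresses, probe::beat);
    }

    /** BUSYOVER: the first executor on which this job is currently idle. */
    static String busyover(List<String> addresses, MiniExecutorProbe probe, int jobId) {
        return firstMatching(addresses, address -> probe.idleBeat(address, jobId));
    }
}
```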
The complete set of strategies is defined in ExecutorRouteStrategyEnum:
```java
public enum ExecutorRouteStrategyEnum {

    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);

    // ... fields, constructor and getters omitted
}
```