XXL-JOB job execution fails with "The access token is wrong": analysis, fix, and a source-code walkthrough of how it works
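Table of Contents
- 1. Cause
- 2. Fix
- 3. Source code
- 3.1 Where the client check lives
- 3.2 Where the server check lives
- 3.3 The right way to configure it
- 4. How XXL-JOB works
- 5. The time wheel
- 6. Summary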
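1. Cause
The XXL-JOB scheduling log of one project showed that on several days this month the jobs had not run, and every one of them had failed with the error "The access token is wrong". Because the jobs never executed on those days, the reward data for those days was stuck, and the only remedy was for me to trigger the missed jobs manually; only after they ran was the data back to normal.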
2. Fix
Comparing the XXL-JOB configuration of two services showed that the problematic service had the XXL-JOB accessToken parameter set in Nacos:
xxl.job.accessToken=default_token
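After comparing the XXL-JOB settings of the two services, I removed the xxl.job.accessToken parameter from the problematic service and restarted it, which should resolve the problem.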
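3. Source code
3.1 Where the client check lives
On the client (executor) side, the check is in the process method of EmbedServer (inside its nested handler class EmbedHttpServerHandler):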
com.xxl.job.core.server.EmbedServer
3.2 Where the server check lives
On the admin side, the check is in the api method of the JobApiController class:
com.xxl.job.admin.controller.JobApiController#api
The XxlJobRemotingUtil class defines a constant, XXL_JOB_ACCESS_TOKEN:
public static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN";
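XxlJobRemotingUtil.postBody is a shared helper: essentially every interface between client and admin goes through it (the original post listed these interfaces in a screenshot).
postBody contains the code below, which puts the accessToken into an HTTP request header. The header XXL-JOB-ACCESS-TOKEN carries the access token agreed between admin and client, and the two sides must use the same value. Note that the header is only sent when the sender's token is non-blank.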
if (accessToken != null && accessToken.trim().length() > 0) {
    connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken);
}
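To make the failure mode concrete, here is a minimal, self-contained sketch of the two halves of the check: the sender only adds the XXL-JOB-ACCESS-TOKEN header when its own token is non-blank (as in postBody above), and the receiver only enforces the check when its own token is non-blank (as in EmbedServer's process method). The names AccessTokenCheck, headersFor and accept are hypothetical, and the HTTP layer is replaced by a plain header map.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the XXL-JOB access-token handshake.
 * headersFor mirrors the sender side (XxlJobRemotingUtil.postBody),
 * accept mirrors the receiver side (EmbedServer / JobApiController).
 */
final class AccessTokenCheck {
    static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN";

    private AccessTokenCheck() {
    }

    /** Sender: the header is only added when the sender's own token is non-blank. */
    static Map<String, String> headersFor(String senderToken) {
        Map<String, String> headers = new HashMap<>();
        if (senderToken != null && senderToken.trim().length() > 0) {
            headers.put(XXL_JOB_ACCESS_TOKEN, senderToken);
        }
        return headers;
    }

    /** Receiver: rejects only when its own token is non-blank and differs from the header. */
    static boolean accept(String receiverToken, Map<String, String> requestHeaders) {
        String tokenReq = requestHeaders.get(XXL_JOB_ACCESS_TOKEN); // null if header absent
        boolean rejected = receiverToken != null
                && receiverToken.trim().length() > 0
                && !receiverToken.equals(tokenReq);
        return !rejected;
    }
}
```

With admin's token left empty and the executor's set to default_token, headersFor("") produces no header, so accept("default_token", ...) returns false: that is the "The access token is wrong." reply from section 1. Calls in the opposite direction (registry, callback) still pass in this configuration, because an admin with an empty token skips the check.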
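3.3 The right way to configure it
According to the official XXL-JOB documentation
https://www.xuxueli.com/xxl-job/
the token has to be configured when admin is deployed: in the admin source code (shown as a screenshot in the original post), this accessToken property is read when admin starts, so it must be set there.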
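The parameter as given in the latest official documentation is shown below. It differs from the one in the screenshot because we use version 2.2.0, while the official documentation tracks the latest version.
### Scheduling center communication TOKEN [optional]: enabled when non-empty;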
xxl.job.admin.accessToken=default_token
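Our production XXL-JOB admin is deployed as a cluster, and the cluster apparently does not set this parameter, while the client does. If you do configure it, admin and client must use exactly the same value. Since everything here runs on the internal network, the client does not need the access token, so we simply removed the parameter from the client service's Nacos configuration and restarted it.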
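4. How XXL-JOB works
After admin is deployed and the core dependency is added to a project, the service is configured with the admin connection details. Once Spring Boot has loaded this configuration, the executor starts a Netty server for communicating with admin. The entry point is the afterSingletonsInstantiated method of XxlJobSpringExecutor, which Spring calls once all singleton beans have been instantiated: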
@Override
public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
The key is the super.start() method:
public void start() throws Exception {

    // init logpath (initialize the log storage path)
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client (create the AdminBiz clients used to call admin)
    initAdminBizList(adminAddresses, accessToken);

    // init JobLogFileCleanThread (thread that cleans up expired log files)
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread (thread that reports execution results back to admin)
    TriggerCallbackThread.getInstance().start();

    // init executor-server (start the embedded Netty server)
    initEmbedServer(address, ip, port, appname, accessToken);
}
EmbedServer's start method:
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {

        @Override
        public void run() {

            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });

            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle (heartbeat / idle handling)
                                        .addLast(new HttpServerCodec())  // HTTP encoding/decoding
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL (aggregates fragmented HTTP messages)
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));  // handles the HTTP calls between admin and client
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry (register this executor with admin through the AdminBiz clients)
                startRegistry(appname, address);

                // wait until stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
The EmbedHttpServerHandler class:
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

    private ExecutorBiz executorBiz;
    private String accessToken;
    private ThreadPoolExecutor bizThreadPool;

    public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
        this.executorBiz = executorBiz;
        this.accessToken = accessToken;
        this.bizThreadPool = bizThreadPool;
    }

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

        // request parse
        //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
        String requestData = msg.content().toString(CharsetUtil.UTF_8);
        String uri = msg.uri();
        HttpMethod httpMethod = msg.method();
        boolean keepAlive = HttpUtil.isKeepAlive(msg);
        String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

        // invoke
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                // do invoke
                Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

                // to json
                String responseJson = GsonTool.toJson(responseObj);

                // write response
                writeResponse(ctx, keepAlive, responseJson);
            }
        });
    }

    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

        // valid
        if (HttpMethod.POST != httpMethod) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri == null || uri.trim().length() == 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // services mapping
        try {
            if ("/beat".equals(uri)) {
                return executorBiz.beat();
            } else if ("/idleBeat".equals(uri)) {
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            } else if ("/run".equals(uri)) {
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            } else if ("/kill".equals(uri)) {
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            } else if ("/log".equals(uri)) {
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }

    /**
     * write response
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        // write response
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close();      // beat 3N, close if idle
            logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
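The services mapping in process is an if/else chain over the URI, checked after the method, URI and token validation. As a design alternative (not how XXL-JOB 2.2.0 does it), the same validation order and routing can be expressed as a lookup table. UriDispatcher and its string-returning handlers are hypothetical stand-ins for ExecutorBiz and ReturnT.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical table-driven variant of EmbedHttpServerHandler.process.
 * Handlers take the raw request body and return a result string.
 */
final class UriDispatcher {
    private final Map<String, Function<String, String>> routes = new HashMap<>();
    private final String accessToken;

    UriDispatcher(String accessToken) {
        this.accessToken = accessToken;
    }

    /** Registers a handler for an exact URI such as "/run"; returns this for chaining. */
    UriDispatcher route(String uri, Function<String, String> handler) {
        routes.put(uri, handler);
        return this;
    }

    String process(String httpMethod, String uri, String requestData, String accessTokenReq) {
        // same validation order as the original: method, uri, then token
        if (!"POST".equals(httpMethod)) {
            return "invalid request, HttpMethod not support.";
        }
        if (uri == null || uri.trim().length() == 0) {
            return "invalid request, uri-mapping empty.";
        }
        if (accessToken != null && accessToken.trim().length() > 0 && !accessToken.equals(accessTokenReq)) {
            return "The access token is wrong.";
        }
        // table lookup replaces the if/else chain
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "invalid request, uri-mapping(" + uri + ") not found.";
        }
        try {
            return handler.apply(requestData);
        } catch (RuntimeException e) {
            return "request error:" + e.getMessage();
        }
    }
}
```

A table keeps each endpoint in one place, which helps when endpoints are added; the trade-off is that per-endpoint deserialization (the GsonTool.fromJson calls) moves into the individual handlers.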
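5. The time wheel
On the admin side, the most interesting part is that scheduling is implemented with a time wheel. The entry point is the afterPropertiesSet method of XxlJobAdminConfig: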
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        // initialize the scheduler instance
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

    @Override
    public void destroy() throws Exception {
        // destroy the scheduler instance
        xxlJobScheduler.destroy();
    }

    // ---------------------- XxlJobScheduler ----------------------

    // conf
    @Value("${xxl.job.i18n}")
    private String i18n;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${spring.mail.username}")
    private String emailUserName;

    @Value("${xxl.job.triggerpool.fast.max}")
    private int triggerPoolFastMax;

    @Value("${xxl.job.triggerpool.slow.max}")
    private int triggerPoolSlowMax;

    @Value("${xxl.job.logretentiondays}")
    private int logretentiondays;

    // ...... (remaining fields and getters omitted in the original post)
}
XxlJobScheduler's init and destroy methods:
public void init() throws Exception {
    // init i18n
    initI18n();

    // admin registry monitor run (monitors executor registrations)
    JobRegistryMonitorHelper.getInstance().start();

    // admin fail-monitor run (monitors failed jobs and sends alert emails)
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run (monitors jobs that lost contact with their executor)
    JobLosedMonitorHelper.getInstance().start();

    // admin trigger pool start (starts the trigger thread pools)
    JobTriggerPoolHelper.toStart();

    // admin log report start (log report statistics and log cleanup)
    JobLogReportHelper.getInstance().start();

    // start-schedule (starts the scheduling threads)
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}

public void destroy() throws Exception {

    // stop-schedule
    JobScheduleHelper.getInstance().toStop();

    // admin log report stop
    JobLogReportHelper.getInstance().toStop();

    // admin trigger pool stop
    JobTriggerPoolHelper.toStop();

    // admin lose-monitor stop
    JobLosedMonitorHelper.getInstance().toStop();

    // admin fail-monitor stop
    JobFailMonitorHelper.getInstance().toStop();

    // admin registry stop
    JobRegistryMonitorHelper.getInstance().toStop();
}
JobScheduleHelper starts background threads that schedule jobs with a time wheel:
public class JobScheduleHelper {
    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

    private static JobScheduleHelper instance = new JobScheduleHelper();
    public static JobScheduleHelper getInstance() {
        return instance;
    }

    public static final long PRE_READ_MS = 5000;    // pre read

    private Thread scheduleThread;
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    public void start() {

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);

                        preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
                        preparedStatement.execute();

                        // tx start

                        // 1. pre read
                        long nowTime = System.currentTimeMillis();
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList != null && scheduleList.size() > 0) {
                            // 2. push time-ring
                            for (XxlJobInfo jobInfo : scheduleList) {

                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1 trigger-expire > 5s: pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2 trigger-expire < 5s: direct-trigger && make next-trigger-time

                                    // 1. trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());

                                    // 2. fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1. make ring second
                                        int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                        // 2. push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3. fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                    }

                                } else {
                                    // 2.3 trigger-pre-read: time-ring trigger && make next-trigger-time

                                    // 1. make ring second
                                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                    // 2. push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3. fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            }

                            // 3. update trigger info
                            for (XxlJobInfo jobInfo : scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop

                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis() - start;

                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                while (!ringThreadToStop) {

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // in case processing took too long and skipped a tick, also check the previous tick
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId : ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }

                    // next second, align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

    private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException {
        Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(fromTime);
        if (nextValidTime != null) {
            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
            jobInfo.setTriggerNextTime(nextValidTime.getTime());
        } else {
            jobInfo.setTriggerStatus(0);
            jobInfo.setTriggerLastTime(0);
            jobInfo.setTriggerNextTime(0);
        }
    }

    private void pushTimeRing(int ringSecond, int jobId) {
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData));
    }

    public void toStop() {

        // 1. stop schedule
        scheduleThreadToStop = true;
        try {
            TimeUnit.SECONDS.sleep(1);  // wait
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (scheduleThread.getState() != Thread.State.TERMINATED) {
            // interrupt and wait
            scheduleThread.interrupt();
            try {
                scheduleThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        // if has ring data
        boolean hasRingData = false;
        if (!ringData.isEmpty()) {
            for (int second : ringData.keySet()) {
                List<Integer> tmpData = ringData.get(second);
                if (tmpData != null && tmpData.size() > 0) {
                    hasRingData = true;
                    break;
                }
            }
        }
        if (hasRingData) {
            try {
                TimeUnit.SECONDS.sleep(8);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        // stop ring (wait job-in-memory stop)
        ringThreadToStop = true;
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (ringThread.getState() != Thread.State.TERMINATED) {
            // interrupt and wait
            ringThread.interrupt();
            try {
                ringThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
    }

}
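This class runs two threads. scheduleThread takes a row lock on xxl_job_lock (select ... for update, so only one admin node in the cluster scans at a time), pre-reads the jobs due within the next 5 seconds (PRE_READ_MS), and either skips, triggers directly, or pushes each job into the time ring. ringThread wakes up once per second, takes the job IDs from the current slot (and the previous one) of the 60-slot ring, and hands them to JobTriggerPoolHelper for triggering.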
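Stripped of the database lock, cron parsing and thread pools, the time-wheel logic above comes down to two pieces: classifying each pre-read job by how its next trigger time compares with now (the three branches 2.1/2.2/2.3, with PRE_READ_MS = 5000), and a 60-slot map keyed by second-of-minute that the ring thread drains once per tick, including the previous slot. The sketch below models just that. SecondTimeRing and Action are hypothetical names, and unlike the real code the sketch is meant to be driven from a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Simplified, single-threaded sketch of JobScheduleHelper's one-second time ring.
 * No database, cron or trigger pool; names are hypothetical.
 */
final class SecondTimeRing {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE_SKIP, TRIGGER_NOW, PUSH_RING }

    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    /** The three branches of the scheduleThread loop for one pre-read job. */
    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE_SKIP;   // more than 5 s late: skip, only refresh the next time
        } else if (nowMs > triggerNextTimeMs) {
            return Action.TRIGGER_NOW;    // late by at most 5 s: trigger immediately
        }
        return Action.PUSH_RING;          // due within the pre-read window: park it in the ring
    }

    /** Slot index = second-of-minute of the next trigger time. */
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }

    /** Equivalent of pushTimeRing: append the job id to its slot. */
    void push(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    /** One ringThread tick: drain the current slot and the previous one. */
    List<Integer> tick(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

For example, a job whose next trigger time is 125000 ms falls into slot 5, and a tick at second 0 also collects whatever is still sitting in slot 59.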
JobTriggerPoolHelper is the class responsible for triggering (dispatching) jobs:
/**
 * job trigger thread pool helper
 *
 * @author xuxueli 2018-07-03 21:08:07
 */
public class JobTriggerPoolHelper {
    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);


    // ---------------------- trigger pool ----------------------

    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start() {
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }


    public void stop() {
        //triggerPool.shutdown();
        fastTriggerPool.shutdownNow();
        slowTriggerPool.shutdownNow();
        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
    }


    // job timeout count
    private volatile long minTim = System.currentTimeMillis() / 60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();


    /**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis() / 60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis() - start;
                    if (cost > 500) {       // job-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }

                }

            }
        });
    }



    // ---------------------- helper ----------------------

    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();

    public static void toStart() {
        helper.start();
    }
    public static void toStop() {
        helper.stop();
    }

    /**
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     *          >=0: use this param
     *          <0: use param from job info config
     * @param executorShardingParam
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }

}
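The pool choice in addTrigger can be isolated as follows. This sketch keeps the thresholds from the code above (a trigger costing more than 500 ms counts as a timeout; more than 10 timeouts for one job within the current minute moves that job to the slow pool) but replaces the two ThreadPoolExecutors with the strings "fast" and "slow" and takes the clock as a parameter so the logic is testable. TriggerPoolSelector is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical extraction of JobTriggerPoolHelper's fast/slow pool selection.
 */
final class TriggerPoolSelector {
    static final long TIMEOUT_THRESHOLD_MS = 500;
    static final int MAX_TIMEOUTS_PER_MINUTE = 10;

    private volatile long currentMinute;
    private final ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

    TriggerPoolSelector(long nowMs) {
        this.currentMinute = nowMs / 60000;
    }

    /** Same decision as the "choose thread pool" block: "slow" after more than 10 timeouts. */
    String choosePool(int jobId) {
        AtomicInteger count = jobTimeoutCountMap.get(jobId);
        return (count != null && count.get() > MAX_TIMEOUTS_PER_MINUTE) ? "slow" : "fast";
    }

    /** Same bookkeeping as the finally block: reset each minute, count slow triggers. */
    void recordCost(int jobId, long costMs, long nowMs) {
        long minuteNow = nowMs / 60000;
        if (currentMinute != minuteNow) {   // a new minute started: forget old counts
            currentMinute = minuteNow;
            jobTimeoutCountMap.clear();
        }
        if (costMs > TIMEOUT_THRESHOLD_MS) {
            AtomicInteger prev = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
            if (prev != null) {
                prev.incrementAndGet();
            }
        }
    }
}
```

Because the counters are cleared at every minute boundary, a job that was demoted to the slow pool returns to the fast pool in the next minute unless it keeps timing out.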
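This class holds a fast and a slow thread pool for dispatching triggers. They differ in maximum pool size (xxl.job.triggerpool.fast.max vs. xxl.job.triggerpool.slow.max) and queue capacity (1000 vs. 2000). Every trigger goes to the fast pool by default; a job whose trigger took longer than 500 ms more than 10 times within the current minute is routed to the slow pool, so slow jobs cannot hold up the others. The method that actually dispatches the job is XxlJobTrigger.trigger: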
/**
 * xxl-job trigger
 * Created by xuxueli on 17/7/13.
 */
public class XxlJobTrigger {
    private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);

    /**
     * trigger job
     *
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     *          >=0: use this param
     *          <0: use param from job info config
     * @param executorShardingParam
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     * @param addressList
     *          null: use executor addressList
     *          not null: cover
     */
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {

        // load data
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }
        if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
        int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

        // cover addressList
        if (addressList != null && addressList.trim().length() > 0) {
            group.setAddressType(1);
            group.setAddressList(addressList.trim());
        }

        // sharding param
        int[] shardingParam = null;
        if (executorShardingParam != null) {
            String[] shardingArr = executorShardingParam.split("/");
            if (shardingArr.length == 2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                shardingParam = new int[2];
                shardingParam[0] = Integer.valueOf(shardingArr[0]);
                shardingParam[1] = Integer.valueOf(shardingArr[1]);
            }
        }
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
                && shardingParam == null) {
            for (int i = 0; i < group.getRegistryList().size(); i++) {
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } else {
            if (shardingParam == null) {
                shardingParam = new int[]{0, 1};
            }
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        }

    }

    private static boolean isNumeric(String str) {
        try {
            int result = Integer.valueOf(str);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /**
     * @param group                     job group, registry list may be empty
     * @param jobInfo
     * @param finalFailRetryCount
     * @param triggerType
     * @param index                     sharding index
     * @param total                     sharding index
     */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {

        // param
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;

        // 1. save log-id
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2. init trigger-param
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3. init address
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4. trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

        // 5. collection trigger info
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append((group.getAddressType() == 0) ? I18nUtil.getString("jobgroup_field_addressType_0") : I18nUtil.getString("jobgroup_field_addressType_1"));
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
        if (shardingParam != null) {
            triggerMsgSb.append("(" + shardingParam + ")");
        }
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_run") + "<<<<<<<<<<< </span><br>")
                .append((routeAddressResult != null && routeAddressResult.getMsg() != null) ? routeAddressResult.getMsg() + "<br><br>" : "")
                .append(triggerResult.getMsg() != null ? triggerResult.getMsg() : "");

        // 6. save log trigger-info
        jobLog.setExecutorAddress(address);
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setExecutorShardingParam(shardingParam);
        jobLog.setExecutorFailRetryCount(finalFailRetryCount);
        //jobLog.setTriggerTime();
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

    /**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
        ReturnT<String> runResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

}
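In XxlJobTrigger.trigger, the job is dispatched according to its configured executor route strategy; the available route strategies were listed in a screenshot in the original post.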
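One route strategy gets special treatment in the XxlJobTrigger code above: for SHARDING_BROADCAST (when no manual sharding parameter is given and the executor list is non-empty), trigger calls processTrigger once per registered executor with index i and total n, and processTrigger falls back to the first address if the index is out of range. A manual sharding parameter must have the form "index/total". The sketch below reproduces only that arithmetic; ShardingPlan and its string output are hypothetical, whereas the real code builds a TriggerParam and calls runExecutor.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the SHARDING_BROADCAST fan-out in XxlJobTrigger.
 */
final class ShardingPlan {
    private ShardingPlan() {
    }

    /** processTrigger's address choice: the index-th executor, else the first one. */
    static String addressFor(List<String> registryList, int index) {
        return index < registryList.size() ? registryList.get(index) : registryList.get(0);
    }

    /** One "address index/total" entry per registered executor. */
    static List<String> broadcast(List<String> registryList) {
        List<String> plan = new ArrayList<>();
        int total = registryList.size();
        for (int index = 0; index < total; index++) {
            plan.add(addressFor(registryList, index) + " " + index + "/" + total);
        }
        return plan;
    }

    /** Parses a manual "index/total" parameter like trigger() does; null when malformed. */
    static int[] parseShardingParam(String executorShardingParam) {
        if (executorShardingParam == null) {
            return null;
        }
        String[] parts = executorShardingParam.split("/");
        if (parts.length != 2) {
            return null;
        }
        try {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

Each executor then receives its own index and total (setBroadcastIndex / setBroadcastTotal in the TriggerParam), which the job handler can use to process only its share of the data.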
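XxlJobTrigger.processTrigger assembles the triggerParam for the run and persists some data, such as the log ID. When the client receives the run request, it uses these parameters to report the execution log back to admin, which stores it; a background thread on admin deletes logs once they exceed the retention period. runExecutor also copies the executor's reply (for example "The access token is wrong.") into the trigger log, which is where the error in section 1 showed up. The key code in XxlJobTrigger.runExecutor is: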
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
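executorBiz.run calls ExecutorBizClient.run, which sends an HTTP request to the client. On startup, the client registers itself with admin (its appname and address), and admin stores the registration. The jobs themselves are configured in admin, referring to the JobHandler names that the client declares with the XXL-JOB annotation, and admin dispatches each job when it falls due according to that configuration. The client in turn reports execution information such as job logs and job status; admin receives and stores it and refreshes its execution reports. If a job fails and an alarm email address is configured for it, an alert email is sent (this is done by admin's failure-monitor thread). On the client side, a Netty server listens on the configured port and talks to admin over HTTP.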
public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
The ExecutorBizClient class, through which admin calls the client:
/**
 * admin api test
 *
 * @author xuxueli 2017-07-28 22:14:52
 */
public class ExecutorBizClient implements ExecutorBiz {

    public ExecutorBizClient() {
    }
    public ExecutorBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> beat() {
        return XxlJobRemotingUtil.postBody(addressUrl + "beat", accessToken, timeout, null, String.class);
    }

    @Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "idleBeat", accessToken, timeout, idleBeatParam, String.class);
    }

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

    @Override
    public ReturnT<String> kill(KillParam killParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);
    }

    @Override
    public ReturnT<LogResult> log(LogParam logParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
    }

}
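Every ExecutorBizClient method is a POST of a JSON body to "<executor address>/<uri>" with a timeout of 3 (seconds), plus the XXL-JOB-ACCESS-TOKEN header when a token is configured. The sketch below builds the equivalent /run request with java.net.http (Java 11+) to show its shape without sending it. It is an illustration under assumptions: XxlJobRemotingUtil itself uses HttpURLConnection, the Content-Type value is assumed, and RunRequestSketch is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

/**
 * Hypothetical illustration of the request ExecutorBizClient.run produces.
 * Builds the request only; nothing is sent over the network.
 */
final class RunRequestSketch {
    private RunRequestSketch() {
    }

    /** Same normalization as the ExecutorBizClient constructor: ensure a trailing "/". */
    static String normalize(String addressUrl) {
        return addressUrl.endsWith("/") ? addressUrl : addressUrl + "/";
    }

    static HttpRequest buildRun(String addressUrl, String accessToken, String triggerParamJson) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(normalize(addressUrl) + "run"))
                .timeout(Duration.ofSeconds(3))                          // mirrors timeout = 3
                .header("Content-Type", "application/json;charset=UTF-8") // assumed content type
                .POST(HttpRequest.BodyPublishers.ofString(triggerParamJson));
        // like postBody: only send the token header when a token is configured
        if (accessToken != null && accessToken.trim().length() > 0) {
            builder.header("XXL-JOB-ACCESS-TOKEN", accessToken);
        }
        return builder.build();
    }
}
```

Seen this way, the admin-side token is simply a header on every outgoing call, which is why an admin with no token sends requests that a token-protected executor rejects.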
Reporting from the client to admin is mainly done by the AdminBizClient class:
/**
 * admin api test
 *
 * @author xuxueli 2017-07-28 22:14:52
 */
public class AdminBizClient implements AdminBiz {

    public AdminBizClient() {
    }
    public AdminBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }

}
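On the admin side, registry and registryRemove feed a table of live executor addresses per appname, and entries that stop sending heartbeats eventually expire. The in-memory sketch below illustrates that idea. The 30 s heartbeat and 90 s expiry (three missed beats) are assumptions modeled on XXL-JOB's RegistryConfig constants; the real admin persists registrations in the database and prunes stale ones from JobRegistryMonitorHelper, and ExecutorRegistry is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory model of admin's executor registry with heartbeat expiry.
 */
final class ExecutorRegistry {
    static final long BEAT_TIMEOUT_MS = 30_000;               // assumed heartbeat interval
    static final long DEAD_TIMEOUT_MS = BEAT_TIMEOUT_MS * 3;   // assumed expiry: 3 missed beats

    // appname -> (address -> last heartbeat time in ms)
    private final Map<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    /** registry(): insert the address or refresh its heartbeat time. */
    void registry(String appname, String address, long nowMs) {
        registry.computeIfAbsent(appname, k -> new ConcurrentHashMap<>()).put(address, nowMs);
    }

    /** registryRemove(): sent by an executor that shuts down cleanly. */
    void registryRemove(String appname, String address) {
        Map<String, Long> addresses = registry.get(appname);
        if (addresses != null) {
            addresses.remove(address);
        }
    }

    /** Addresses whose last heartbeat is within DEAD_TIMEOUT_MS, sorted for deterministic output. */
    List<String> aliveAddresses(String appname, long nowMs) {
        Map<String, Long> addresses = registry.getOrDefault(appname, Collections.emptyMap());
        List<String> alive = new ArrayList<>();
        for (Map.Entry<String, Long> e : addresses.entrySet()) {
            if (nowMs - e.getValue() <= DEAD_TIMEOUT_MS) {
                alive.add(e.getKey());
            }
        }
        Collections.sort(alive);
        return alive;
    }
}
```

Note that registry calls go from client to admin and are checked against admin's token, so in the section 1 setup (token only on the executor) registration keeps working even while every run request is rejected.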
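One look at a diagram of the client core makes it all click (diagram in the original post).
And here is one for admin (diagram in the original post).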
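6. Summary
I pulled and read the XXL-JOB source code quite a while ago and meant to write an analysis, but never got the chance until this problem came up, so I took the opportunity to dig in again. This open-source project is really impressive: it contains many designs and much code worth learning from, and it follows the standard structure of well-built middleware, so reading its source can open up new territory. XXL-JOB is also very pleasant to use and runs smoothly in production. With the source walkthrough above plus the detailed official documentation, you should be able to understand XXL-JOB beyond the surface, down to its principles and source code. XXL is an open-source community, and XXL-JOB is only one of its many projects: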
https://www.xuxueli.com/page/projects.html
I hope this write-up gives you some inspiration and helps. If it did, a like, bookmark or follow is appreciated!