当前位置: 首页 > news >正文

SpringBoot集成Quzrtz实现定时任务

一 定时任务介绍

自律是很多人都想拥有的一种能力,或者说素质,但是理想往往很美好,现实却是无比残酷的。在现实生活中,我们很难做到自律,或者说做到持续自律。例如,我们经常会做各种学习计划、储蓄计划或减肥计划等,但无一例外地被各种“意外”打破。这往往使得我们非常沮丧,甚至开始怀疑人生。

但是有一个“家伙”在自律方面做得格外出色。它只要制订了计划就会严格地执行,而且无论一个任务重复多少遍都不厌其烦,简直自律到“令人发指”,它就是定时任务。

1.1 什么时候需要定时任务

哪些业务场景适合使用定时任务呢?简单概括一下就是:at sometime to do something.凡是在某一时刻需要做某件事情时,都可以考虑使用定时任务(非实时性需求)。
定时任务常见业务场景

  • 银行月底汇总账单
  • 电信公司月底结算话费
  • 订单在30分钟内未支付会自动取消(延时任务)
  • 商品详情、文章的缓存定时更新
  • 定时同步跨库的数据库表数据

1.2 java中的定时任务

1.2.1 单机环境

  • Timer:来自JDK,从JDK 1.3开始引入。JDK自带,不需要引入外部依赖,简单易用,但是功能相对单一。
  • ScheduledExecutorService:同样来自JDK,比Timer晚一些,从JDK 1.5开始引入,它的引入弥补了Timer的一些缺陷。
  • Spring Task:来自Spring,Spring环境中单机定时任务的不二之选。

1.2.2 分布式环境

  • Quartz:一个完全由 Java 编写的开源作业调度框架,分布式定时任务的基石,功能丰富且强大,既能与简单的单体应用结合,又能支撑起复杂的分布式系统。
  • ElasticJob:来自当当网,最开始是基于Quartz开发的,后来改用ZooKeeper来实现分布式协调。它具有完整的定时任务处理流程,很多国内公司都在使用(目前登记在册的有80多家),并且支持云开发。
  • XXL-JOB:来自大众点评,同样是基于Quartz开发的,后来改用自研的调度组件。它是一个轻量级的分布式任务调度平台,简单易用,很多国内公司都在使用(目前登记在册的有400多家)。
  • PowerJob:号称“全新一代分布式调度与计算框架”,采用无锁化设计,支持多种报警通知方式(如WebHook、邮件、钉钉及自定义)。它比较重量级,适合做公司公共的任务调度中间件。

二 Quartz介绍

2.1 核心概念

  • Job:是一个接口,表示一个工作,要具体执行的内容,任务的核心逻辑。该接口只有一个excute方法
  • JobDetail:对Job进一步封装,一个具体的可执行的调度程序。包含了任务的调度方案和策略。JobDetail既然是通用任务,用于接受任务,所以我们需要定义一个自己的任务类(例如叫做QuartzDetailJob),这个任务类需要实现 Job接口,这个任务类QuartzDetailJob,要执行具体的任务,具体的任务,一般都是我们写的自己的一些方法
  • Trigger:触发器,调度参数的配置,配置什么时候去调定时任务。主要用来指定Job的触发规则,分为SimpleTrigger和CronTrigger(这里使用的是常用的CronTrigger)
  • Scheduler:调度容器(调度中心,任务交给它就行了),一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。。用来维护Job的生命周期(创建、删除、暂停、调度等)

2.1 SpringBoot单机版-整合Quartz代码实战

数据库表结构官网已经提供,我们可以直接访问Quartz对应的官网下载,找到对应的版本,然后将其下载!目前最新的稳定版是2.3.0,我们就下载这个版本

在这里插入图片描述
下载完成之后将其解压,在文件中搜索sql,在里面选择适合当前环境的数据库脚本文件,然后将其初始化到数据库中即可!我这里使用的是mysql,所以使用tables_mysql_innodb.sql这个脚本
在这里插入图片描述
把里边的sql语句,在mysql库里执行即可。共涉及到11张表,每张表的含义如下
在这里插入图片描述
其中,QRTZ_LOCKS 就是 Quartz 集群实现同步机制的行锁表

引入依赖

<!--定时任务-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!--druid 数据连接池-->
<dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.17</version>
</dependency>

新建quartz.properties 配置文件(数据库信息换为自己的数据库即可)

#调度配置
#调度器实例名称
org.quartz.scheduler.instanceName=SsmScheduler
#调度器实例编号自动生成
org.quartz.scheduler.instanceId=AUTO
#是否在Quartz执行一个job前使用UserTransaction
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false#线程池配置
#线程池的实现类
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
#线程池中的线程数量
org.quartz.threadPool.threadCount=10
#线程优先级
org.quartz.threadPool.threadPriority=5
#配置是否启动自动加载数据库内的定时任务,默认true
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
#是否设置为守护线程,设置后任务将不会执行
#org.quartz.threadPool.makeThreadsDaemons=true#持久化方式配置
#JobDataMaps是否都为String类型
org.quartz.jobStore.useProperties=true
#数据表的前缀,默认QRTZ_
org.quartz.jobStore.tablePrefix=QRTZ_
#最大能忍受的触发超时时间
org.quartz.jobStore.misfireThreshold=60000
#是否以集群方式运行
org.quartz.jobStore.isClustered=true
#调度实例失效的检查时间间隔,单位毫秒
org.quartz.jobStore.clusterCheckinInterval=2000
#数据保存方式为数据库持久化
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#数据库别名 随便取
org.quartz.jobStore.dataSource=qzDS#数据库连接池,将其设置为druid
org.quartz.dataSource.qzDS.connectionProvider.class=com.ts.hjbz.quartz.DruidConnectionProvider
#数据库引擎
org.quartz.dataSource.qzDS.driver=com.mysql.jdbc.Driver
#数据库连接
org.quartz.dataSource.qzDS.URL=jdbc:mysql://192.168.119.128:3306/hjbz?serverTimezone=GMT%2B8&characterEncoding=utf-8
#数据库用户
org.quartz.dataSource.qzDS.user=root
#数据库密码
org.quartz.dataSource.qzDS.password=123456
#允许最大连接
org.quartz.dataSource.qzDS.maxConnection=5
#验证查询sql,可以不设置
org.quartz.dataSource.qzDS.validationQuery=select 0 from dual

注册 Quartz 任务工厂

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;/*** @Author:sgw* @Date:2023/9/1* @Description: 注册 Quartz 任务工厂*/
@Component
public class QuartzJobFactory extends AdaptableJobFactory {@Autowiredprivate AutowireCapableBeanFactory capableBeanFactory;@Overrideprotected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {//调用父类的方法Object jobInstance = super.createJobInstance(bundle);//进行注入capableBeanFactory.autowireBean(jobInstance);return jobInstance;}
}

注册调度工厂

import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;import java.io.IOException;/*** @Author:sgw* @Date:2023/9/1* @Description:注册调度工厂*/
@Configuration
public class QuartzConfig {@Autowiredprivate QuartzJobFactory jobFactory;@Beanpublic SchedulerFactoryBean schedulerFactoryBean() throws IOException {//获取配置属性PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));//在quartz.properties中的属性被读取并注入后再初始化对象propertiesFactoryBean.afterPropertiesSet();//创建SchedulerFactoryBeanSchedulerFactoryBean factory = new SchedulerFactoryBean();factory.setQuartzProperties(propertiesFactoryBean.getObject());factory.setJobFactory(jobFactory);//支持在JOB实例中注入其他的业务对象factory.setApplicationContextSchedulerContextKey("applicationContextKey");factory.setWaitForJobsToCompleteOnShutdown(true);//这样当spring关闭时,会等待所有已经启动的quartz job结束后spring才能完全shutdown。factory.setOverwriteExistingJobs(false);//是否覆盖己存在的Jobfactory.setStartupDelay(10);//QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动return factory;}/*** 通过SchedulerFactoryBean获取Scheduler的实例* @return* @throws IOException* @throws SchedulerException*/@Bean(name = "scheduler")public Scheduler scheduler() throws IOException, SchedulerException {Scheduler scheduler = schedulerFactoryBean().getScheduler();return scheduler;}
}

重新设置 Quartz 数据连接池
默认 Quartz 的数据连接池是 c3p0,由于性能不太稳定,不推荐使用,因此我们将其改成driud数据连接池,配置如下:

import com.alibaba.druid.pool.DruidDataSource;
import org.quartz.SchedulerException;
import org.quartz.utils.ConnectionProvider;import java.sql.Connection;
import java.sql.SQLException;/*** @Author:sgw* @Date:2023/9/1* @Description: 重新设置 Quartz 数据连接池。默认 Quartz 的数据连接池是 c3p0,由于性能不太稳定,不推荐使用,因此我们将其改成driud数据连接池*/
public class DruidConnectionProvider implements ConnectionProvider {/*** 常量配置,与quartz.properties文件的key保持一致(去掉前缀),同时提供set方法,Quartz框架自动注入值。* @return* @throws SQLException*///JDBC驱动public String driver;//JDBC连接串public String URL;//数据库用户名public String user;//数据库用户密码public String password;//数据库最大连接数public int maxConnection;//数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。public String validationQuery;private boolean validateOnCheckout;private int idleConnectionValidationSeconds;public String maxCachedStatementsPerConnection;private String discardIdleConnectionsSeconds;public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;//Druid连接池private DruidDataSource datasource;@Overridepublic Connection getConnection() throws SQLException {return datasource.getConnection();}@Overridepublic void shutdown() throws SQLException {datasource.close();}@Overridepublic void initialize() throws SQLException {if (this.URL == null) {throw new SQLException("DBPool could not be created: DB URL cannot be null");}if (this.driver == null) {throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");}if (this.maxConnection < 0) {throw new SQLException("DBPool maxConnectins could not be created: Max connections must be greater than zero!");}datasource = new DruidDataSource();try{datasource.setDriverClassName(this.driver);} catch (Exception e) {try {throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e);} catch (SchedulerException e1) {}}datasource.setUrl(this.URL);datasource.setUsername(this.user);datasource.setPassword(this.password);datasource.setMaxActive(this.maxConnection);datasource.setMinIdle(1);datasource.setMaxWait(0);datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CONNECTIONS);if (this.validationQuery != null) {datasource.setValidationQuery(this.validationQuery);if(!this.validateOnCheckout)datasource.setTestOnReturn(true);elsedatasource.setTestOnBorrow(true);datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);}}public String getDriver() {return driver;}public void setDriver(String driver) {this.driver = driver;}public String getURL() {return URL;}public void setURL(String URL) {this.URL = URL;}public String getUser() {return user;}public void setUser(String user) {this.user = user;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public int getMaxConnection() {return maxConnection;}public void setMaxConnection(int maxConnection) {this.maxConnection = maxConnection;}public String getValidationQuery() {return validationQuery;}public void setValidationQuery(String validationQuery) {this.validationQuery = validationQuery;}public boolean isValidateOnCheckout() {return validateOnCheckout;}public void setValidateOnCheckout(boolean validateOnCheckout) {this.validateOnCheckout = validateOnCheckout;}public int getIdleConnectionValidationSeconds() {return idleConnectionValidationSeconds;}public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;}public DruidDataSource getDatasource() {return datasource;}public void setDatasource(DruidDataSource datasource) {this.datasource = datasource;}public String getDiscardIdleConnectionsSeconds() {return discardIdleConnectionsSeconds;}public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) {this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds;}
}

创建完成之后,还需要在quartz.properties配置文件中设置一下即可!

#数据库连接池,将其设置为druid
org.quartz.dataSource.qzDS.connectionProvider.class=com.ts.hjbz.quartz.DruidConnectionProvider

编写 Job 具体任务类(不同的任务,需要定义不同的job类)

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Author:sgw* @Date:2023/9/1* @Description: 具体要执行的的job*/
public class TfCommandJob implements Job {private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);@Overridepublic void execute(JobExecutionContext context) {try {System.out.println("开始执行:"+context.getScheduler().getSchedulerInstanceId() + "--" + new SimpleDateFormat("YYYY-MM-dd HH:mm:ss").format(new Date()));} catch (SchedulerException e) {log.error("任务执行失败",e);}}
}

编写 Quartz 服务层接口

import java.util.Map;public interface QuartzJobService {/*** 添加任务可以传参数* @param clazzName* @param jobName* @param groupName* @param cronExp* @param param*/void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param);/*** 暂停任务* @param jobName* @param groupName*/void pauseJob(String jobName, String groupName);/*** 恢复任务* @param jobName* @param groupName*/void resumeJob(String jobName, String groupName);/*** 立即运行一次定时任务* @param jobName* @param groupName*/void runOnce(String jobName, String groupName);/*** 更新任务* @param jobName* @param groupName* @param cronExp* @param param*/void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param);/*** 删除任务* @param jobName* @param groupName*/void deleteJob(String jobName, String groupName);/*** 启动所有任务*/void startAllJobs();/*** 暂停所有任务*/void pauseAllJobs();/*** 恢复所有任务*/void resumeAllJobs();/*** 关闭所有任务*/void shutdownAllJobs();
}

对应的实现类QuartzJobServiceImpl如下:

import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Map;/*** @Author:sgw* @Date:2023/9/1* @Description:*/
@Service
public class QuartzJobServiceImpl implements QuartzJobService {private static final Logger log = LoggerFactory.getLogger(QuartzJobServiceImpl.class);@Autowiredprivate Scheduler scheduler;@Overridepublic void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) {try {// 启动调度器,默认初始化的时候已经启动
//            scheduler.start();//构建job信息Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(clazzName);JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build();//表达式调度构建器(即任务执行的时间)CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);//按新的cronExpression表达式构建一个新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build();//获得JobDataMap,写入数据if (param != null) {trigger.getJobDataMap().putAll(param);}scheduler.scheduleJob(jobDetail, trigger);} catch (Exception e) {log.error("创建任务失败", e);}}@Overridepublic void pauseJob(String jobName, String groupName) {try {scheduler.pauseJob(JobKey.jobKey(jobName, groupName));} catch (SchedulerException e) {log.error("暂停任务失败", e);}}@Overridepublic void resumeJob(String jobName, String groupName) {try {scheduler.resumeJob(JobKey.jobKey(jobName, groupName));} catch (SchedulerException e) {log.error("恢复任务失败", e);}}@Overridepublic void runOnce(String jobName, String groupName) {try {scheduler.triggerJob(JobKey.jobKey(jobName, groupName));} catch (SchedulerException e) {log.error("立即运行一次定时任务失败", e);}}@Overridepublic void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param) {try {TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName);CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);if (cronExp != null) {// 表达式调度构建器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);// 按新的cronExpression表达式重新构建triggertrigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();}//修改mapif (param != null) {trigger.getJobDataMap().putAll(param);}// 按新的trigger重新设置job执行scheduler.rescheduleJob(triggerKey, trigger);} catch (Exception e) {log.error("更新任务失败", e);}}@Overridepublic void deleteJob(String jobName, String groupName) {try {//暂停、移除、删除scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, groupName));scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, groupName));scheduler.deleteJob(JobKey.jobKey(jobName, groupName));} catch (Exception e) {log.error("删除任务失败", e);}}@Overridepublic void startAllJobs() {try {scheduler.start();} catch (Exception e) {log.error("开启所有的任务失败", e);}}@Overridepublic void pauseAllJobs() {try {scheduler.pauseAll();} catch (Exception e) {log.error("暂停所有任务失败", e);}}@Overridepublic void resumeAllJobs() {try {scheduler.resumeAll();} catch (Exception e) {log.error("恢复所有任务失败", e);}}@Overridepublic void shutdownAllJobs() {try {if (!scheduler.isShutdown()) {// 需谨慎操作关闭scheduler容器// scheduler生命周期结束,无法再 start() 启动schedulerscheduler.shutdown(true);}} catch (Exception e) {log.error("关闭所有的任务失败", e);}}
}

创建一个请求参数实体类

import lombok.Data;import java.io.Serializable;
import java.util.Map;/*** @Author:sgw* @Date:2023/9/1* @Description:*/
@Data
public class QuartzConfigDTO implements Serializable {private static final long serialVersionUID = 1L;/*** 任务名称*/private String jobName;/*** 任务所属组*/private String groupName;/*** 任务执行类*/private String jobClass;/*** 任务调度时间表达式*/private String cronExpression;/*** 附加参数*/private Map<String, Object> param;
}

编写 contoller 服务

import com.ts.hjbz.quartz.QuartzConfigDTO;
import com.ts.hjbz.quartz.QuartzJobService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
/*** @Author:sgw* @Date:2023/9/1* @Description: 定时任务入口类*/
@RestController
@RequestMapping("/job")
public class QuartzController {private static final Logger log = LoggerFactory.getLogger(QuartzController.class);@Autowiredprivate QuartzJobService quartzJobService;/*** 添加新任务* @param configDTO* @return*/@RequestMapping("/addJob")public Object addJob(@RequestBody QuartzConfigDTO configDTO) {quartzJobService.addJob(configDTO.getJobClass(), configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());return HttpStatus.OK;}/*** 暂停任务* @param configDTO* @return*/@RequestMapping("/pauseJob")public Object pauseJob(@RequestBody QuartzConfigDTO configDTO) {quartzJobService.pauseJob(configDTO.getJobName(), configDTO.getGroupName());return HttpStatus.OK;}/*** 恢复任务* @param configDTO* @return*/@RequestMapping("/resumeJob")public Object resumeJob(@RequestBody QuartzConfigDTO configDTO) {quartzJobService.resumeJob(configDTO.getJobName(), configDTO.getGroupName());return HttpStatus.OK;}/*** 立即运行一次定时任务* @param configDTO* @return*/@RequestMapping("/runOnce")public Object runOnce(@RequestBody QuartzConfigDTO configDTO) {quartzJobService.runOnce(configDTO.getJobName(), configDTO.getGroupName());return HttpStatus.OK;}/*** 更新任务* @param configDTO* @return*/@RequestMapping("/updateJob")public Object updateJob(@RequestBody QuartzConfigDTO configDTO) {quartzJobService.updateJob(configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());return HttpStatus.OK;}/*** 删除任务* @param configDTO* @return*/@RequestMapping("/deleteJob")public Object deleteJob(@RequestBody QuartzConfigDTO configDTO) {quartzJobService.deleteJob(configDTO.getJobName(), configDTO.getGroupName());return HttpStatus.OK;}/*** 启动所有任务* @return*/@RequestMapping("/startAllJobs")public Object startAllJobs() {quartzJobService.startAllJobs();return HttpStatus.OK;}/*** 暂停所有任务* @return*/@RequestMapping("/pauseAllJobs")public Object pauseAllJobs() {quartzJobService.pauseAllJobs();return HttpStatus.OK;}/*** 恢复所有任务* @return*/@RequestMapping("/resumeAllJobs")public Object resumeAllJobs() {quartzJobService.resumeAllJobs();return HttpStatus.OK;}/*** 关闭所有任务* @return*/@RequestMapping("/shutdownAllJobs")public Object shutdownAllJobs() {quartzJobService.shutdownAllJobs();return HttpStatus.OK;}
}

使用postman进行测试,新增一个定时任务,每隔五秒执行一次
在这里插入图片描述
具体参数如下

{"jobName":"测试任务1","groupName":"组1","cronExpression":"0/5 * * * * ? ","jobClass":"com.ts.hjbz.quartz.TfCommandJob","param":{"hello":"hello啊"}
}

其中cronExpression,直接上网查询在线cron表达式转换即可,如https://www.bejson.com/othertools/cron/
在这里插入图片描述
上图是配置每5秒执行一次,生成的cron表达式就是我们postman里需要的cronExpression参数。执行postman的调用后,可以看到控制台每隔5秒打印一次,如下

在这里插入图片描述

并且服务重启后,这个定时任务依然存在,依然会每隔5秒执行一次。

注册监听器(选用)

如果你想在 SpringBoot 里面集成 Quartz 的监听器,操作也很简单

创建任务调度监听器

@Component
public class SimpleSchedulerListener extends SchedulerListenerSupport {@Overridepublic void jobScheduled(Trigger trigger) {System.out.println("任务被部署时被执行");}@Overridepublic void jobUnscheduled(TriggerKey triggerKey) {System.out.println("任务被卸载时被执行");}@Overridepublic void triggerFinalized(Trigger trigger) {System.out.println("任务完成了它的使命,光荣退休时被执行");}@Overridepublic void triggerPaused(TriggerKey triggerKey) {System.out.println(triggerKey + "(一个触发器)被暂停时被执行");}@Overridepublic void triggersPaused(String triggerGroup) {System.out.println(triggerGroup + "所在组的全部触发器被停止时被执行");}@Overridepublic void triggerResumed(TriggerKey triggerKey) {System.out.println(triggerKey + "(一个触发器)被恢复时被执行");}@Overridepublic void triggersResumed(String triggerGroup) {System.out.println(triggerGroup + "所在组的全部触发器被回复时被执行");}@Overridepublic void jobAdded(JobDetail jobDetail) {System.out.println("一个JobDetail被动态添加进来");}@Overridepublic void jobDeleted(JobKey jobKey) {System.out.println(jobKey + "被删除时被执行");}@Overridepublic void jobPaused(JobKey jobKey) {System.out.println(jobKey + "被暂停时被执行");}@Overridepublic void jobsPaused(String jobGroup) {System.out.println(jobGroup + "(一组任务)被暂停时被执行");}@Overridepublic void jobResumed(JobKey jobKey) {System.out.println(jobKey + "被恢复时被执行");}@Overridepublic void jobsResumed(String jobGroup) {System.out.println(jobGroup + "(一组任务)被恢复时被执行");}@Overridepublic void schedulerError(String msg, SchedulerException cause) {System.out.println("出现异常" + msg + "时被执行");cause.printStackTrace();}@Overridepublic void schedulerInStandbyMode() {System.out.println("scheduler被设为standBy等候模式时被执行");}@Overridepublic void schedulerStarted() {System.out.println("scheduler启动时被执行");}@Overridepublic void schedulerStarting() {System.out.println("scheduler正在启动时被执行");}@Overridepublic void schedulerShutdown() {System.out.println("scheduler关闭时被执行");}@Overridepublic void schedulerShuttingdown() {System.out.println("scheduler正在关闭时被执行");}@Overridepublic void schedulingDataCleared() {System.out.println("scheduler中所有数据包括jobs, triggers和calendars都被清空时被执行");}
}

创建任务触发监听器

@Component
public class SimpleTriggerListener extends TriggerListenerSupport {/*** Trigger监听器的名称* @return*/@Overridepublic String getName() {return "mySimpleTriggerListener";}/*** Trigger被激发 它关联的job即将被运行* @param trigger* @param context*/@Overridepublic void triggerFired(Trigger trigger, JobExecutionContext context) {System.out.println("myTriggerListener.triggerFired()");}/*** Trigger被激发 它关联的job即将被运行, TriggerListener 给了一个选择去否决 Job 的执行,如果返回TRUE 那么任务job会被终止* @param trigger* @param context* @return*/@Overridepublic boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {System.out.println("myTriggerListener.vetoJobExecution()");return false;}/*** 当Trigger错过被激发时执行,比如当前时间有很多触发器都需要执行,但是线程池中的有效线程都在工作,* 那么有的触发器就有可能超时,错过这一轮的触发。* @param trigger*/@Overridepublic void triggerMisfired(Trigger trigger) {System.out.println("myTriggerListener.triggerMisfired()");}/*** 任务完成时触发* @param trigger* @param context* @param triggerInstructionCode*/@Overridepublic void triggerComplete(Trigger trigger, JobExecutionContext context, Trigger.CompletedExecutionInstruction triggerInstructionCode) {System.out.println("myTriggerListener.triggerComplete()");}
}

创建任务执行监听器

@Component
public class SimpleJobListener extends JobListenerSupport {/*** job监听器名称* @return*/@Overridepublic String getName() {return "mySimpleJobListener";}/*** 任务被调度前* @param context*/@Overridepublic void jobToBeExecuted(JobExecutionContext context) {System.out.println("simpleJobListener监听器,准备执行:"+context.getJobDetail().getKey());}/*** 任务调度被拒了* @param context*/@Overridepublic void jobExecutionVetoed(JobExecutionContext context) {System.out.println("simpleJobListener监听器,取消执行:"+context.getJobDetail().getKey());}/*** 任务被调度后* @param context* @param jobException*/@Overridepublic void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {System.out.println("simpleJobListener监听器,执行结束:"+context.getJobDetail().getKey());}
}

最后,在QuartzConfig中将监听器注册到Scheduler

@Autowired
private SimpleSchedulerListener simpleSchedulerListener;@Autowired
private SimpleJobListener simpleJobListener;@Autowired
private SimpleTriggerListener simpleTriggerListener;@Bean(name = "scheduler")
public Scheduler scheduler() throws IOException, SchedulerException {Scheduler scheduler = schedulerFactoryBean().getScheduler();//全局添加监听器//添加SchedulerListener监听器scheduler.getListenerManager().addSchedulerListener(simpleSchedulerListener);// 添加JobListener, 支持带条件匹配监听器scheduler.getListenerManager().addJobListener(simpleJobListener, KeyMatcher.keyEquals(JobKey.jobKey("myJob", "myGroup")));// 添加triggerListener,设置全局监听scheduler.getListenerManager().addTriggerListener(simpleTriggerListener, EverythingMatcher.allTriggers());return scheduler;
}

采用项目数据源(选用)
在上面的 Quartz 数据源配置中,我们使用了自定义的数据源,目的是和项目中的数据源实现解耦,当然有的同学不想单独建库,想和项目中数据源保持一致,配置也很简单!
quartz.properties配置文件中,去掉org.quartz.jobStore.dataSource配置,即

#注释掉quartz的数据源配置
#org.quartz.jobStore.dataSource=qzDS

在QuartzConfig配置类中加入dataSource数据源,并将其注入到quartz中

@Autowired
private DataSource dataSource;@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {//...SchedulerFactoryBean factory = new SchedulerFactoryBean();factory.setQuartzProperties(propertiesFactoryBean.getObject());//使用数据源,自定义数据源factory.setDataSource(dataSource);//...return factory;
}

2.2 SpringBoot集群版-整合Quartz代码实战

Quartz 提供了极为广用的特性,如任务持久化、集群部署和分布式调度任务等等,正因如此,基于 Quartz 任务调度功能在系统开发中应用极为广泛!

在集群环境下,Quartz 集群中的每个节点是一个独立的 Quartz 应用,没有负责集中管理的节点,而是通过数据库表来感知另一个应用,利用数据库锁的方式来实现集群环境下进行并发控制,每个任务当前运行的有效节点有且只有一个!

特别需要注意的是:分布式部署时需要保证各个节点的系统时间一致!

在实际的部署中,项目都是集群进行部署,因此为了和正式环境一致,我们再新建两个相同的项目来测试一下在集群环境下 quartz 是否可以实现分布式调度,保证任何一个定时任务只有一台机器在运行?理论上,我们只需要将刚刚新建好的项目,重新复制一份,然后修改一下端口号就可以实现本地测试!

因为curd服务只需要一个,因此我们在新的服务里,不需要再编写QuartzJobService等增、删、改服务,仅仅保持QuartzConfig、DruidConnectionProvider、QuartzJobFactory、TfCommandJob、quartz.properties类和配置都是相同的就可以了!

依次启动服务quartz-001、quartz-002、quartz-003,看看效果如何

第一个启动的服务quartz-001会优先加载数据库中已经配置好的定时任务,其他两个服务quartz-002、quartz-003都没有主动调度服务;

当我们主动关闭quartz-001时,quartz-002服务主动接收任务调度

当我们主动关闭quartz-002,同样quartz-003服务主动接收任务调度

http://www.dtcms.com/a/304603.html

相关文章:

  • 【目标检测】小样本度量学习
  • 记录一个TI DSP编译器的Bug
  • CentOS安装ffmpeg并转码视频为mp4
  • 预过滤环境光贴图制作教程:第四阶段 - Lambert 无权重预过滤(Stage 3)
  • 预过滤环境光贴图制作教程:第一步 - HDR 转立方体贴图
  • Android Compose 自定义组件完全指南
  • 对College数据进行多模型预测(R语言)
  • 《React与Vue构建TODO应用的深层逻辑》
  • 【lucene】SegmentCoreReaders
  • linux_前台,后台进程
  • LeetCode热题100——155. 最小栈
  • (LeetCode 面试经典 150 题) 150. 逆波兰表达式求值 (栈)
  • 电脑主机显示的python版本是3.8.6,但是我在控制面板没有找到,想删除不知道怎么操作怎么办
  • 《 java 随想录》| LeetCode链表高频考题
  • 【LeetCode】大厂面试算法真题回忆(111)--身高排序
  • 鱼皮项目简易版 RPC 框架开发(五)
  • 2.oracle保姆级安装教程
  • 逐渐走进Ai世界~
  • Django模型开发:模型字段、元数据与继承全方位讲解
  • Unity_SRP Batcher
  • 【WRF-Chem 实例1】namelist.input 详解- 模拟CO2
  • 基于AI代码疫苗技术的开源软件供应链安全治理
  • C# _列表(List<T>)_ 字典(Dictionary<TKey, TValue>)
  • 【dropdown组件填坑指南】—怎么实现下拉框的位置计算
  • 【机器学习深度学习】为什么需要分布式训练?
  • 从硬编码到自主智能体:营销AI的20年技术演进与未来展望
  • 前端开发为什么没有后端开发那么清除业务
  • sqLite 数据库 (2):
  • 摔倒识别误报率↓79%:陌讯动态时序融合算法实战解析
  • System V IPC机制:进程通信的统一设计