单体架构实现延时任务
一、编写延时任务类
这里使用的是单例模式。
@Slf4j
public class ScheduledThreadPool {/*** 单例实例*/private static volatile ScheduledThreadPool instance;private final ScheduledExecutorService scheduledThreadPool;/*** 私有构造函数,防止外部实例化* @param poolSize 线程池大小*/private ScheduledThreadPool(int poolSize) {this.scheduledThreadPool = Executors.newScheduledThreadPool(poolSize);}/*** 获取单例实例* @param poolSize 线程池大小* @return ScheduledThreadPool 实例*/public static ScheduledThreadPool getInstance(int poolSize) {if (instance == null) {synchronized (ScheduledThreadPool.class) {if (instance == null) {instance = new ScheduledThreadPool(poolSize);}}}return instance;}/*** 提交任务到定时线程池,延迟指定时间后执行* @param command 要执行的任务* @param delay 延迟时间* @param timeUnit 延迟时间的单位*/public void execute(Runnable command, long delay, TimeUnit timeUnit) {try {if (scheduledThreadPool.isShutdown()) {log.error("线程池已关闭,无法提交任务");return;}// 将任务提交到定时线程池中,延迟指定时间后执行scheduledThreadPool.schedule(command, delay, timeUnit);log.debug("任务已提交,将在 {} {} 后执行", delay, timeUnit);} catch (RejectedExecutionException ex) {// 如果任务提交失败,线程池已关闭或饱和,则记录错误信息log.error("任务提交被拒绝: {}", ex.getMessage(), ex);} catch (Exception ex) {// 处理其他异常log.error("提交任务时发生未知异常: {}", ex.getMessage(), ex);}}/*** 关闭线程池*/public void shutdown() {try {scheduledThreadPool.shutdown();if (!scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS)) {scheduledThreadPool.shutdownNow();if (!scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS)) {log.error("线程池未能正常关闭");}}} catch (InterruptedException ex) {scheduledThreadPool.shutdownNow();Thread.currentThread().interrupt();}}}
二、使用Demo
这里直接使用了Controller接口实现了延时任务的Demo
@RestController
@ApiOperation("延迟任务Demo")
@RequestMapping("/delayTask")
@Slf4j
public class DelayTaskController {@PostMapping("/demo")public void delayTaskDemo(Long delayTime) {// 得到延时任务类对象ScheduledThreadPool threadPool = ScheduledThreadPool.getInstance(3);// 匿名内部类实现要延时执行的逻辑Runnable task = () -> System.out.println("任务执行");threadPool.execute(task, delayTime, TimeUnit.SECONDS);// 在 JVM 关闭时,自动调用线程池的 shutdown 方法,确保线程池能够正确关闭,避免资源泄漏,与延迟线程是否执行完毕无关Runtime.getRuntime().addShutdownHook(new Thread(threadPool::shutdown));}
}
三、答疑解惑
1、volatile修饰的作用
- 保证可见性 :在多线程环境下,每个线程都有自己的工作内存,变量的值会被缓存到工作内存中。当一个线程修改了变量的值,其他线程可能无法立即看到最新的值。使用 volatile 关键字修饰的变量,当一个线程修改了它的值,会立即将新值刷新到主内存中,其他线程在读取该变量时,会直接从主内存中获取最新的值,从而保证了变量在多线程之间的可见性。
- 禁止指令重排序 :编译器和处理器为了提高性能,可能会对代码的执行顺序进行优化,即指令重排序。在单例模式中,如果没有使用 volatile 关键字,在对象实例化的过程中,可能会出现指令重排序的情况,导致其他线程获取到一个未完全初始化的对象。使用 volatile 关键字可以禁止指令重排序,确保对象在实例化完成后才会被其他线程访问。
在代码中, private static volatile ScheduledThreadPool instance; 使用 volatile 关键字保证了 instance 变量在多线程环境下的可见性,并且避免了指令重排序带来的问题。
2、getInstance方法的设计
- 双重检查 :两次instance == null判断确保只有一个线程会创建实例。
- 不会返回null :由于 instance 变量使用了 volatile 关键字修饰,保证了对象在实例化完成后才会被其他线程访问,避免了返回未完全初始化的对象。
3、shutdown方法的设计
- scheduledThreadPool.shutdown() :调用 shutdown() 方法会平缓地关闭线程池,线程池不再接受新的任务,但会继续执行已经提交的任务。
- scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS) :调用 awaitTermination() 方法会阻塞当前线程,等待线程池中的任务执行完毕,最多等待 60 秒。如果在 60 秒内所有任务都执行完毕,方法返回 true ;否则返回 false 。
- scheduledThreadPool.shutdownNow() :如果在 60 秒内线程池中的任务没有执行完毕,调用 shutdownNow() 方法会尝试强制关闭线程池,它会尝试中断正在执行的任务,并返回等待执行的任务列表。
- 再次调用 awaitTermination(60, TimeUnit.SECONDS) :再次调用 awaitTermination() 方法,等待最多 60 秒,确保线程池中的任务已经全部停止。如果仍然没有关闭,记录错误日志。
- 处理 InterruptedException :如果在等待线程池关闭的过程中,当前线程被中断,会捕获 InterruptedException 异常,调用 shutdownNow() 方法强制关闭线程池,并重新设置中断标志位,以便上层代码可以处理中断。
4、钩子函数关闭线程池
Runtime.getRuntime().addShutdownHook(new Thread(threadPool::shutdown));这段逻辑会向JVM注册一个关闭钩子函数。整行代码的作用是在 JVM 关闭时,自动启动一个线程来调用 threadPool 的 shutdown 方法,确保线程池能够在程序退出前安全关闭。这样可以避免因为程序突然退出而导致线程池中的任务没有正常完成,或者资源没有正确释放的问题。与延迟线程是否执行完毕无关。