当前位置: 首页 > news >正文

延迟任务方案-DelayQueue

延迟任务方案-DelayQueue

文章内容参考天机学堂项目

1.场景介绍

视频播放时需要记忆当前播放进度,用户退出再进入该视频时,恢复之前的进度。这时我们可以设计一个延时任务。前端每20秒向后端传输当前播放进度,后端进行延迟任务,不直接将数据存入数据库。等待超过一个周期后(20s)对比传输过来的进度。如果一样再更新数据库,减少数据库压力。

2.延迟队列方案

DelayQueueRedissonMQ时间轮
原理JDK自带延迟队列,基于阻塞队列实现。基于Redis数据结构模拟JDK的DelayQueue实现利用MQ的特性。例如RabbitMQ的死信队列时间轮算法
优点不依赖第三方服务分布式系统下可用不占用JVM内存分布式系统下可以不占用JVM内存不依赖第三方服务性能优异
缺点占用JVM内存只能单机使用依赖第三方服务依赖第三方服务只能单机使用

以上四种方案都可以解决问题,不过本例中我使用DelayQueue方案。因为这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。

但缺点也很明显,就是需要占用JVM内存,在数据量非常大的情况下可能会有问题。但考虑到任务存储时间比较短(只有20秒),因此也可以接收。

如果你们的数据量非常大,DelayQueue不能满足业务需求,大家也可以替换为其它延迟队列方式,例如Redisson、MQ等

3.DelayQueue的原理

首先来看一下DelayQueue的源码:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();// ... 略
}

可以看到DelayQueue实现了BlockingQueue接口,是一个阻塞队列。队列就是容器,用来存储东西的。DelayQueue叫做延迟队列,其中存储的就是延迟执行的任务

我们可以看到DelayQueue的泛型定义:

DelayQueue<E extends Delayed>

这说明存入DelayQueue内部的元素必须是Delayed类型,这其实就是一个延迟任务的规范接口。来看一下:

public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);
}

从源码中可以看出,Delayed类型必须具备两个方法:

  • getDelay():获取延迟任务的剩余延迟时间
  • compareTo(T t):比较两个延迟任务的延迟时间,判断执行顺序

可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。

将来每一次提交播放记录,就可以将播放记录保存在这样的一个Delayed类型的延迟任务里并设定20秒的延迟时间。然后交给DelayQueue队列。DelayQueue会调用compareTo方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行。

4.DelayQueue的用法

首先定义一个Delayed类型的延迟任务类,要能保持任务数据。

package delayQueue;import java.time.Duration;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author 未若君雅裁* @version 1.0* @since 2025  8月  03*/public class DelayQueueTask<T> implements Delayed {// 数据private T data;// 截止时间private long deadlineNanos;/*** 构造函数,用于创建一个延迟任务。** @param data      任务携带的数据* @param delayTime 延迟时间,使用Duration表示*/public DelayQueueTask(T data, Duration delayTime) {this.data = data;// 计算截止时间戳(纳秒),当前时间加上延迟时间this.deadlineNanos = System.nanoTime() + delayTime.toNanos();}@Overridepublic long getDelay(TimeUnit unit) {// 返回剩余延迟时间// 剩余延迟时间 = 截止时间 - 当前时间return unit.convert(Math.max(0, deadlineNanos - System.nanoTime()), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) {// 比较两个延迟任务的剩余到期时间,用于在延迟队列中的排序// 当前任务的剩余延迟时间减去比较对象的剩余延迟时间long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);if (l > 0){// 当前任务的延迟时间更长,优先级更低,排在后面return 1;} else if (l < 0 ) {// 当前任务的延迟时间更短,优先级更高,排在前面return -1;} else {// 延迟时间相同,优先级相同return 0;}}public T getData() {return data;}public void setData(T data) {this.data = data;}public long getDeadlineNanos() {return deadlineNanos;}public void setDeadlineNanos(long deadlineNanos) {this.deadlineNanos = deadlineNanos;}}

接下来就可以创建延迟任务,交给延迟队列保存,最后,补上执行任务的代码:

package delayQueue;import java.time.Duration;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class DelayQueueTaskTest {public static void main(String[] args) throws InterruptedException {// 1.初始化延迟队列DelayQueue<DelayQueueTask<String>> queue = new DelayQueue<>();// 2.向队列中添加延迟执行的任务System.out.println("开始初始化延迟任务。。。。");queue.add(new DelayQueueTask<>("延迟任务3", Duration.ofSeconds(3)));queue.add(new DelayQueueTask<>("延迟任务1", Duration.ofSeconds(1)));queue.add(new DelayQueueTask<>("延迟任务2", Duration.ofSeconds(2)));queue.add(new DelayQueueTask<>("延迟任务4", Duration.ofSeconds(4)));queue.add(new DelayQueueTask<>("延迟任务5", Duration.ofSeconds(5)));// 3.尝试执行任务new Thread(() -> {while (true) {DelayQueueTask<String> task = null;try {task = queue.take();} catch (InterruptedException e) {throw new RuntimeException(e);}System.err.println("开始执行延迟任务:" + task.getData());}}).start();new Thread(() -> {AtomicInteger i = new AtomicInteger(6);// 使用 ScheduledExecutorService 定时添加任务ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {queue.add(new DelayQueueTask<>("延迟任务" + i, Duration.ofSeconds(1)));System.out.println("向延迟队列中添加延迟任务:" + i);i.getAndIncrement();}, 0, 1, TimeUnit.SECONDS);}).start();}
}

5.DelayQueue 主要方法一览表

方法分类方法签名功能描述阻塞行为返回值/异常
插入元素boolean add(E e)插入元素到队列尾部不阻塞成功返回 true,失败抛出异常
void put(E e)插入元素到队列不阻塞无返回值
boolean offer(E e)插入元素到队列不阻塞成功返回 true
boolean offer(E e, long timeout, TimeUnit unit)插入元素到队列,支持超时不阻塞成功返回 true
获取并移除元素E take()获取并移除队列头部元素阻塞(无到期元素时)返回队列头部元素
E poll()获取并移除队列头部元素不阻塞有到期元素返回该元素,否则返回 null
E poll(long timeout, TimeUnit unit)获取并移除队列头部元素,支持超时阻塞(指定时间内无到期元素)有到期元素返回该元素,超时返回 null
查看元素(不移除)E peek()查看队列头部元素但不移除不阻塞有到期元素返回该元素,否则返回 null
队列信息int size()获取队列元素数量不阻塞返回元素数量
boolean isEmpty()判断队列是否为空不阻塞空队列返回 true
批量操作int drainTo(Collection<? super E> c)移除所有可用元素并添加到指定集合不阻塞返回移除元素数量
int drainTo(Collection<? super E> c, int maxElements)移除指定数量的可用元素并添加到指定集合不阻塞返回移除元素数量
元素操作boolean remove(Object o)从队列中移除指定元素的第一个匹配项不阻塞存在并成功移除返回 true
boolean contains(Object o)判断队列是否包含指定元素不阻塞包含返回 true
转换操作Object[] toArray()返回包含队列所有元素的数组不阻塞返回元素数组
<T> T[] toArray(T[] a)返回包含队列所有元素的指定类型数组不阻塞返回指定类型的元素数组
清空操作void clear()移除队列所有元素不阻塞无返回值

特殊说明

  1. 延迟特性:DelayQueue 中的元素只有在延迟期满后才能被取出,未到期的元素即使使用 poll() 也会返回 null。

  2. 排序特性:队列中的元素按过期时间排序,过期时间最短的元素位于队列头部。

  3. 无界队列:DelayQueue 是无界队列,插入操作不会因队列满而阻塞。

  4. 线程安全:DelayQueue 是线程安全的,可以在多线程环境中安全使用。

  5. 元素要求:所有插入 DelayQueue 的元素都必须实现 Delayed 接口。

http://www.dtcms.com/a/313449.html

相关文章:

  • SpringBoot 2.x 升 3.x 避坑指南:企业级项目的实战问题与解决方案
  • Celery-分布式任务队列
  • MySQL深度理解-MySQL锁机制
  • 数据结构学习(day01)
  • 第八章:进入Redis的SET的核心
  • Android系统模块编译调试与Ninja使用指南
  • 【数据分享】各省粮食外贸依存度、粮食波动率等粮食相关数据合集(2011-2022)(获取方式看文末)
  • 【MATLAB】(六)多项式的创建与四则运算
  • python的高校奖助学金系统
  • 23 Active Directory攻击与防护策略解析
  • 编译旧版本的electron内核
  • SpringBoot之整合MyBatisPlus
  • Nvidia Orin DK 刷机CUDA TensorRT+硬盘扩容+ROS+Realsense+OpenCV+Ollama+Yolo11 一站式解决方案
  • 从“配置地狱”到“云端乐园”——Nacos 如何成为分布式微服务配置中心的“定海神针”
  • 数组和指针的关系
  • 操作系统——读者写者问题
  • KNX协议介绍
  • Nvidia Orin + RealSense D435i 与3D地图实现导航
  • Ubuntu系统VScode实现opencv(c++)视频的处理与保存
  • [硬件电路-129]:模拟电路 - 继电器的工作原理、关键指标、常用芯片与管脚定义
  • SpringAI的使用
  • Socket编程——TCP协议
  • 从一到无穷大 #51:突破阿姆达尔定律:COZ因果剖析与串行优化八法
  • Java学习第一百零一部分——网关(Gateway)
  • java测试题(ssm框架)
  • 02.Redis 安装
  • MPLS 静态LSP
  • TV电视版软件集合分享
  • 深入理解Java并发编程:原理、实战与最佳实践
  • Redis 7 中的 Set 和 Zset 使用