延迟任务方案-DelayQueue
延迟任务方案-DelayQueue
文章内容参考天机学堂项目
1.场景介绍
视频播放时需要记忆当前播放进度,用户退出再进入该视频时,恢复之前的进度。这时我们可以设计一个延时任务。前端每20秒向后端传输当前播放进度,后端进行延迟任务,不直接将数据存入数据库。等待超过一个周期后(20s)对比传输过来的进度。如果一样再更新数据库,减少数据库压力。
2.延迟队列方案
DelayQueue | Redisson | MQ | 时间轮 | |
---|---|---|---|---|
原理 | JDK自带延迟队列,基于阻塞队列实现。 | 基于Redis数据结构模拟JDK的DelayQueue实现 | 利用MQ的特性。例如RabbitMQ的死信队列 | 时间轮算法 |
优点 | 不依赖第三方服务 | 分布式系统下可用不占用JVM内存 | 分布式系统下可以不占用JVM内存 | 不依赖第三方服务性能优异 |
缺点 | 占用JVM内存只能单机使用 | 依赖第三方服务 | 依赖第三方服务 | 只能单机使用 |
以上四种方案都可以解决问题,不过本例中我使用DelayQueue方案。因为这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。
但缺点也很明显,就是需要占用JVM内存,在数据量非常大的情况下可能会有问题。但考虑到任务存储时间比较短(只有20秒),因此也可以接收。
如果你们的数据量非常大,DelayQueue不能满足业务需求,大家也可以替换为其它延迟队列方式,例如Redisson、MQ等
3.DelayQueue的原理
首先来看一下DelayQueue的源码:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();// ... 略
}
可以看到DelayQueue实现了BlockingQueue接口,是一个阻塞队列。队列就是容器,用来存储东西的。DelayQueue叫做延迟队列,其中存储的就是延迟执行的任务。
我们可以看到DelayQueue的泛型定义:
DelayQueue<E extends Delayed>
这说明存入DelayQueue
内部的元素必须是Delayed
类型,这其实就是一个延迟任务的规范接口。来看一下:
public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);
}
从源码中可以看出,Delayed类型必须具备两个方法:
getDelay()
:获取延迟任务的剩余延迟时间compareTo(T t)
:比较两个延迟任务的延迟时间,判断执行顺序
可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。
将来每一次提交播放记录,就可以将播放记录保存在这样的一个Delayed
类型的延迟任务里并设定20秒的延迟时间。然后交给DelayQueue
队列。DelayQueue
会调用compareTo
方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行。
4.DelayQueue的用法
首先定义一个Delayed类型的延迟任务类,要能保持任务数据。
package delayQueue;import java.time.Duration;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author 未若君雅裁* @version 1.0* @since 2025 8月 03*/public class DelayQueueTask<T> implements Delayed {// 数据private T data;// 截止时间private long deadlineNanos;/*** 构造函数,用于创建一个延迟任务。** @param data 任务携带的数据* @param delayTime 延迟时间,使用Duration表示*/public DelayQueueTask(T data, Duration delayTime) {this.data = data;// 计算截止时间戳(纳秒),当前时间加上延迟时间this.deadlineNanos = System.nanoTime() + delayTime.toNanos();}@Overridepublic long getDelay(TimeUnit unit) {// 返回剩余延迟时间// 剩余延迟时间 = 截止时间 - 当前时间return unit.convert(Math.max(0, deadlineNanos - System.nanoTime()), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) {// 比较两个延迟任务的剩余到期时间,用于在延迟队列中的排序// 当前任务的剩余延迟时间减去比较对象的剩余延迟时间long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);if (l > 0){// 当前任务的延迟时间更长,优先级更低,排在后面return 1;} else if (l < 0 ) {// 当前任务的延迟时间更短,优先级更高,排在前面return -1;} else {// 延迟时间相同,优先级相同return 0;}}public T getData() {return data;}public void setData(T data) {this.data = data;}public long getDeadlineNanos() {return deadlineNanos;}public void setDeadlineNanos(long deadlineNanos) {this.deadlineNanos = deadlineNanos;}}
接下来就可以创建延迟任务,交给延迟队列保存,最后,补上执行任务的代码:
package delayQueue;import java.time.Duration;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class DelayQueueTaskTest {public static void main(String[] args) throws InterruptedException {// 1.初始化延迟队列DelayQueue<DelayQueueTask<String>> queue = new DelayQueue<>();// 2.向队列中添加延迟执行的任务System.out.println("开始初始化延迟任务。。。。");queue.add(new DelayQueueTask<>("延迟任务3", Duration.ofSeconds(3)));queue.add(new DelayQueueTask<>("延迟任务1", Duration.ofSeconds(1)));queue.add(new DelayQueueTask<>("延迟任务2", Duration.ofSeconds(2)));queue.add(new DelayQueueTask<>("延迟任务4", Duration.ofSeconds(4)));queue.add(new DelayQueueTask<>("延迟任务5", Duration.ofSeconds(5)));// 3.尝试执行任务new Thread(() -> {while (true) {DelayQueueTask<String> task = null;try {task = queue.take();} catch (InterruptedException e) {throw new RuntimeException(e);}System.err.println("开始执行延迟任务:" + task.getData());}}).start();new Thread(() -> {AtomicInteger i = new AtomicInteger(6);// 使用 ScheduledExecutorService 定时添加任务ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {queue.add(new DelayQueueTask<>("延迟任务" + i, Duration.ofSeconds(1)));System.out.println("向延迟队列中添加延迟任务:" + i);i.getAndIncrement();}, 0, 1, TimeUnit.SECONDS);}).start();}
}
5.DelayQueue 主要方法一览表
方法分类 | 方法签名 | 功能描述 | 阻塞行为 | 返回值/异常 |
---|---|---|---|---|
插入元素 | boolean add(E e) | 插入元素到队列尾部 | 不阻塞 | 成功返回 true,失败抛出异常 |
void put(E e) | 插入元素到队列 | 不阻塞 | 无返回值 | |
boolean offer(E e) | 插入元素到队列 | 不阻塞 | 成功返回 true | |
boolean offer(E e, long timeout, TimeUnit unit) | 插入元素到队列,支持超时 | 不阻塞 | 成功返回 true | |
获取并移除元素 | E take() | 获取并移除队列头部元素 | 阻塞(无到期元素时) | 返回队列头部元素 |
E poll() | 获取并移除队列头部元素 | 不阻塞 | 有到期元素返回该元素,否则返回 null | |
E poll(long timeout, TimeUnit unit) | 获取并移除队列头部元素,支持超时 | 阻塞(指定时间内无到期元素) | 有到期元素返回该元素,超时返回 null | |
查看元素(不移除) | E peek() | 查看队列头部元素但不移除 | 不阻塞 | 有到期元素返回该元素,否则返回 null |
队列信息 | int size() | 获取队列元素数量 | 不阻塞 | 返回元素数量 |
boolean isEmpty() | 判断队列是否为空 | 不阻塞 | 空队列返回 true | |
批量操作 | int drainTo(Collection<? super E> c) | 移除所有可用元素并添加到指定集合 | 不阻塞 | 返回移除元素数量 |
int drainTo(Collection<? super E> c, int maxElements) | 移除指定数量的可用元素并添加到指定集合 | 不阻塞 | 返回移除元素数量 | |
元素操作 | boolean remove(Object o) | 从队列中移除指定元素的第一个匹配项 | 不阻塞 | 存在并成功移除返回 true |
boolean contains(Object o) | 判断队列是否包含指定元素 | 不阻塞 | 包含返回 true | |
转换操作 | Object[] toArray() | 返回包含队列所有元素的数组 | 不阻塞 | 返回元素数组 |
<T> T[] toArray(T[] a) | 返回包含队列所有元素的指定类型数组 | 不阻塞 | 返回指定类型的元素数组 | |
清空操作 | void clear() | 移除队列所有元素 | 不阻塞 | 无返回值 |
特殊说明
-
延迟特性:DelayQueue 中的元素只有在延迟期满后才能被取出,未到期的元素即使使用
poll()
也会返回 null。 -
排序特性:队列中的元素按过期时间排序,过期时间最短的元素位于队列头部。
-
无界队列:DelayQueue 是无界队列,插入操作不会因队列满而阻塞。
-
线程安全:DelayQueue 是线程安全的,可以在多线程环境中安全使用。
-
元素要求:所有插入 DelayQueue 的元素都必须实现 Delayed 接口。