使用DelayQueue 分布式延时队列,干掉定时任务!

DelayQueue 介绍
概述
DelayQueue 是 Java 并发包 (java.util.concurrent) 中的一个无界阻塞队列,用于存放实现了 Delayed 接口的元素。只有当元素的延迟时间到期时,才能从队列中取出该元素。
核心特性
- 基本特点
无界队列:理论上可以无限添加元素
线程安全:内部使用 ReentrantLock 保证线程安全
阻塞操作:当队列为空或头部元素未到期时,获取操作会阻塞
元素排序:按到期时间排序,最早到期的元素在队列头部
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** 延时消息实体*/
public class DelayMessage<T> implements Delayed {private final T data; // 消息数据private final long expireTime; // 过期时间戳public DelayMessage(T data, long delay, TimeUnit unit) {this.data = data;this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(this.expireTime, ((DelayMessage<?>) o).expireTime);}public T getData() {return data;}public long getExpireTime() {return expireTime;}
}
- 分布式延时队列管理器
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;/*** 分布式延时队列管理器*/
public class DistributedDelayQueue<T> {private final DelayQueue<DelayMessage<T>> delayQueue;private final Consumer<T> messageHandler;private final String queueName;private volatile boolean running;public DistributedDelayQueue(String queueName, Consumer<T> messageHandler) {this.delayQueue = new DelayQueue<>();this.messageHandler = messageHandler;this.queueName = queueName;this.running = false;}/*** 添加延时消息*/public void put(T data, long delay, TimeUnit unit) {DelayMessage<T> message = new DelayMessage<>(data, delay, unit);delayQueue.put(message);System.out.println("添加延时消息: " + data + ", 延时: " + delay + " " + unit);// 这里可以添加Redis持久化逻辑persistToRedis(message);}/*** 启动消费线程*/public void start() {if (running) {return;}running = true;// 启动消费线程Executors.newSingleThreadExecutor().submit(() -> {System.out.println("延时队列 [" + queueName + "] 启动成功");while (running) {try {DelayMessage<T> message = delayQueue.take();T data = message.getData();// 处理消息messageHandler.accept(data);// 从Redis中删除已处理的消息removeFromRedis(message);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;} catch (Exception e) {e.printStackTrace();}}});// 启动Redis恢复线程(分布式环境下使用)startRedisRecovery();}/*** 停止队列*/public void stop() {running = false;System.out.println("延时队列 [" + queueName + "] 已停止");}/*** 持久化到Redis(分布式支持)*/private void persistToRedis(DelayMessage<T> message) {try {// 这里使用Redis的ZSET结构存储,score为过期时间戳String redisKey = "delay_queue:" + queueName;// RedisTemplate.zadd(redisKey, message.getExpireTime(), serialize(message));System.out.println("消息已持久化到Redis: " + message.getData());} catch (Exception e) {e.printStackTrace();}}/*** 从Redis删除消息*/private void removeFromRedis(DelayMessage<T> message) {try {String redisKey = "delay_queue:" + queueName;// RedisTemplate.zrem(redisKey, serialize(message));} catch (Exception e) {e.printStackTrace();}}/*** 从Redis恢复消息(应用重启时)*/private void startRedisRecovery() {Executors.newSingleThreadExecutor().submit(() -> {while (running) {try {recoverFromRedis();TimeUnit.SECONDS.sleep(30); // 30秒恢复一次} catch (InterruptedException e) {Thread.currentThread().interrupt();break;} catch (Exception e) {e.printStackTrace();}}});}/*** 从Redis恢复未处理的消息*/private void recoverFromRedis() {try {String redisKey = "delay_queue:" + queueName;long now = System.currentTimeMillis();// 获取所有已过期的消息// Set<ZSetOperations.TypedTuple<String>> expiredMessages = // RedisTemplate.zrangeByScoreWithScores(redisKey, 0, now);// for (ZSetOperations.TypedTuple<String> tuple : expiredMessages) {// DelayMessage<T> message = deserialize(tuple.getValue());// delayQueue.put(message);// System.out.println("从Redis恢复消息: " + message.getData());// }} catch (Exception e) {e.printStackTrace();}}/*** 获取队列大小*/public int size() {return delayQueue.size();}/*** 判断队列是否为空*/public boolean isEmpty() {return delayQueue.isEmpty();}
}
- 业务消息实体
/*** 订单超时消息*/
public class OrderTimeoutMessage {private String orderId;private Long createTime;private String userId;public OrderTimeoutMessage(String orderId, Long createTime, String userId) {this.orderId = orderId;this.createTime = createTime;this.userId = userId;}// getter和setter方法public String getOrderId() { return orderId; }public void setOrderId(String orderId) { this.orderId = orderId; }public Long getCreateTime() { return createTime; }public void setCreateTime(Long createTime) { this.createTime = createTime; }public String getUserId() { return userId; }public void setUserId(String userId) { this.userId = userId; }@Overridepublic String toString() {return "OrderTimeoutMessage{" +"orderId='" + orderId + '\'' +", createTime=" + createTime +", userId='" + userId + '\'' +'}';}
}/*** 会话超时消息*/
public class SessionTimeoutMessage {private String sessionId;private String userId;private Long loginTime;public SessionTimeoutMessage(String sessionId, String userId, Long loginTime) {this.sessionId = sessionId;this.userId = userId;this.loginTime = loginTime;}// getter和setter方法public String getSessionId() { return sessionId; }public void setSessionId(String sessionId) { this.sessionId = sessionId; }public String getUserId() { return userId; }public void setUserId(String userId) { this.userId = userId; }public Long getLoginTime() { return loginTime; }public void setLoginTime(Long loginTime) { this.loginTime = loginTime; }@Overridepublic String toString() {return "SessionTimeoutMessage{" +"sessionId='" + sessionId + '\'' +", userId='" + userId + '\'' +", loginTime=" + loginTime +'}';}
}
- 使用示例和测试类
import java.util.concurrent.TimeUnit;/*** 延时队列使用示例*/
public class DelayQueueDemo {public static void main(String[] args) throws InterruptedException {// 1. 创建订单超时队列DistributedDelayQueue<OrderTimeoutMessage> orderTimeoutQueue = new DistributedDelayQueue<>("order_timeout", DelayQueueDemo::handleOrderTimeout);// 2. 创建会话超时队列DistributedDelayQueue<SessionTimeoutMessage> sessionTimeoutQueue = new DistributedDelayQueue<>("session_timeout", DelayQueueDemo::handleSessionTimeout);// 3. 启动队列orderTimeoutQueue.start();sessionTimeoutQueue.start();// 4. 添加测试消息// 订单15秒后超时orderTimeoutQueue.put(new OrderTimeoutMessage("ORDER_001", System.currentTimeMillis(), "USER_001"),15, TimeUnit.SECONDS);// 订单30秒后超时orderTimeoutQueue.put(new OrderTimeoutMessage("ORDER_002", System.currentTimeMillis(), "USER_002"),30, TimeUnit.SECONDS);// 会话5分钟后超时sessionTimeoutQueue.put(new SessionTimeoutMessage("SESSION_001", "USER_001", System.currentTimeMillis()),5, TimeUnit.MINUTES);System.out.println("已添加测试消息,等待处理...");// 5. 模拟运行一段时间Thread.sleep(40 * 1000);// 6. 停止队列orderTimeoutQueue.stop();sessionTimeoutQueue.stop();}/*** 处理订单超时*/private static void handleOrderTimeout(OrderTimeoutMessage message) {System.out.println("【订单超时处理】" + message.getOrderId() + ", 用户: " + message.getUserId() + ", 创建时间: " + message.getCreateTime());// 实际业务逻辑// 1. 更新订单状态为超时// 2. 释放库存// 3. 发送通知等}/*** 处理会话超时*/private static void handleSessionTimeout(SessionTimeoutMessage message) {System.out.println("【会话超时处理】" + message.getSessionId() + ", 用户: " + message.getUserId() + ", 登录时间: " + message.getLoginTime());// 实际业务逻辑// 1. 清理会话信息// 2. 记录日志// 3. 通知用户重新登录}
}
- Spring Boot 集成配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;@Configuration
public class DelayQueueConfig {@Beanpublic DistributedDelayQueue<OrderTimeoutMessage> orderTimeoutQueue() {return new DistributedDelayQueue<>("order_timeout", this::processOrderTimeout);}@Beanpublic DistributedDelayQueue<SessionTimeoutMessage> sessionTimeoutQueue() {return new DistributedDelayQueue<>("session_timeout", this::processSessionTimeout);}private void processOrderTimeout(OrderTimeoutMessage message) {// Spring环境下的业务处理// 可以注入Service进行处理}private void processSessionTimeout(SessionTimeoutMessage message) {// Spring环境下的业务处理}
}
- 高级特性 - 支持消息重试
/*** 支持重试的延时消息*/
public class RetryDelayMessage<T> extends DelayMessage<T> {private int retryCount;private final int maxRetryCount;public RetryDelayMessage(T data, long delay, TimeUnit unit, int maxRetryCount) {super(data, delay, unit);this.retryCount = 0;this.maxRetryCount = maxRetryCount;}public boolean canRetry() {return retryCount < maxRetryCount;}public void incrementRetry() {retryCount++;}public int getRetryCount() {return retryCount;}
}
核心优势
替代定时任务:不再需要轮询数据库,减少系统压力
精确延时:消息在精确的时间点被处理
分布式支持:通过Redis持久化,支持多实例部署
高性能:基于内存的DelayQueue,处理速度快
可扩展:支持多种业务场景,易于扩展
使用场景
订单超时取消
会话超时管理
延时通知推送
任务调度执行
缓存过期处理
这个方案可以有效替代传统的定时任务轮询方式,提供更高效、更精确的延时处理能力。
