当前位置: 首页 > news >正文

如何利用Redis实现延迟队列?

延迟队列概念解析

延迟队列(Delay Queue)是一种特殊的消息队列,核心特性是允许消息在指定的延迟时间后被消费者处理,而非立即消费。它解决了传统队列(FIFO)无法处理“定时任务”或“超时任务”的问题,常见于需要异步延迟处理的场景(如订单超时取消、定时提醒、重试机制等)。


核心要素

  1. 延迟时间:消息入队时需指定“延迟时长”或“绝对执行时间”(如“5分钟后处理”或“2024-08-01 10:00执行”)。
  2. 任务存储:需可靠存储未到期的任务(避免宕机丢失),支持快速查询到期任务。
  3. 触发机制:能高效检测并提取已到期的任务(时间精度需满足业务需求)。
  4. 处理逻辑:消费者对到期任务进行业务处理(如调用接口、更新数据库)。

与普通队列的区别

特性普通队列(FIFO)延迟队列
消费时机消息入队后立即可被消费消息需等待指定延迟时间后才被消费
排序规则按入队顺序(先进先出)按到期时间排序(时间早的优先)
核心目标解耦、异步、削峰填谷解决“定时/超时”类异步任务需求

典型应用场景

  • 订单超时取消:用户下单后未支付,15分钟后自动取消订单。
  • 重试机制:接口调用失败后,延迟30秒重试(避免立即重试加重系统负担)。
  • 定时通知:活动开始前30分钟,向用户推送提醒消息。
  • 缓存预热:每日凌晨3点触发缓存数据加载任务。

关键设计挑战

  1. 延迟精度:需平衡性能与时间精度(如Redis轮询间隔过短会增加QPS,过长可能导致任务延迟处理)。
  2. 持久化:避免因服务宕机导致未到期任务丢失(如Redis通过RDB/AOF持久化,RabbitMQ通过消息持久化)。
  3. 分布式支持:多消费者场景下需避免任务重复消费(如Redis使用Lua脚本原子化取任务)。
  4. 内存/存储限制:单机方案(如JDK DelayQueue)受内存限制,需评估任务量上限。

一、Redis 实现延迟队列(Java 代码)

Redis 延迟队列通常利用 有序集合(ZSET) 存储任务,任务的执行时间作为 score,通过轮询或阻塞方式获取到期任务。以下是核心实现:

1. 依赖引入(Maven)
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.4.3</version>
</dependency>
2. 生产者(任务入队)
import redis.clients.jedis.Jedis;
import java.util.UUID;public class RedisDelayQueueProducer {private final Jedis jedis;private final String queueKey = "delay_queue";public RedisDelayQueueProducer(Jedis jedis) {this.jedis = jedis;}// 添加延迟任务(score 为执行时间戳)public void addTask(String taskData, long executeTime) {String taskId = UUID.randomUUID().toString();jedis.zadd(queueKey, executeTime, taskId + ":" + taskData);}
}
3. 消费者(任务出队)

使用 Lua 脚本原子化获取并删除到期任务(避免多消费者竞态条件):

import redis.clients.jedis.Jedis;
import java.util.Arrays;
import java.util.List;public class RedisDelayQueueConsumer {private final Jedis jedis;private final String queueKey = "delay_queue";// Lua 脚本:获取并删除 score <= 当前时间的任务(最多取 10 个)// 使用Lua保证删除时间和任务的原子性private final String luaScript = "" +"local tasks = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10)\n" +"if #tasks > 0 then\n" +"    redis.call('zrem', KEYS[1], unpack(tasks))\n" +"end\n" +"return tasks";public RedisDelayQueueConsumer(Jedis jedis) {this.jedis = jedis;}public List<String> pollExpiredTasks() {long currentTime = System.currentTimeMillis();return jedis.eval(luaScript, Arrays.asList(queueKey), Arrays.asList(String.valueOf(currentTime)));}
}

方案缺点(消费者去消费这条消息只有轮询去消费,会导致大量线程空转,特别是高峰期,不太推荐使用):
由于 Redis ZSET 不支持原生的阻塞命令(如 BLPOP ),实际中需通过以下方式模拟阻塞:

  • 短轮询+休眠 :轮询间隔设置为较小值(如100ms),减少延迟但增加 Redis 压力。
  • 事件触发 :结合 Redis 的 PUBLISH/SUBSCRIBE 机制,生产者在添加任务时发布事件,消费者订阅事件后立即触发轮询(减少无效轮询)。

二、其他延迟队列实现方案(Java)

方案 1:JDK DelayQueue(单机版)

基于 java.util.concurrent.DelayQueue,任务需实现 Delayed 接口。

代码实现
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;// 延迟任务类
class DelayTask implements Delayed {private final String taskId;private final String data;private final long expireTime; // 绝对时间戳(毫秒)public DelayTask(String taskId, String data, long delayMs) {this.taskId = taskId;this.data = data;this.expireTime = System.currentTimeMillis() + delayMs;}// 剩余延迟时间@Overridepublic long getDelay(TimeUnit unit) {long diff = expireTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}// 按到期时间排序@Overridepublic int compareTo(Delayed o) {return Long.compare(this.expireTime, ((DelayTask) o).expireTime);}
}// 生产者与消费者
public class JdkDelayQueueDemo {private static final DelayQueue<DelayTask> queue = new DelayQueue<>();public static void main(String[] args) {// 生产者:添加延迟 5 秒的任务new Thread(() -> {queue.put(new DelayTask("task1", "data1", 5000));}).start();// 消费者:阻塞获取到期任务new Thread(() -> {while (true) {try {DelayTask task = queue.take();System.out.println("处理任务:" + task.taskId + ", 数据:" + task.data);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}).start();}
}

方案缺点:轮询不推荐

方案 2:RabbitMQ 死信队列(分布式)

通过设置消息 TTL(过期时间),过期后消息转发到死信队列(DLX),消费者监听死信队列。

代码实现(需 RabbitMQ 环境)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class RabbitMqDelayQueueDemo {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 1. 配置死信队列(DLX)channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null);channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "dead_letter_key");// 2. 配置普通队列(设置 TTL 和 DLX)Map<String, Object> normalQueueArgs = new HashMap<>();normalQueueArgs.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);normalQueueArgs.put("x-dead-letter-routing-key", "dead_letter_key");normalQueueArgs.put("x-message-ttl", 5000); // 消息 5 秒后过期channel.queueDeclare("normal_queue", true, false, false, normalQueueArgs);channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueBind("normal_queue", NORMAL_EXCHANGE, "normal_key");// 3. 生产者发送消息到普通队列String message = "延迟任务数据";channel.basicPublish(NORMAL_EXCHANGE, "normal_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// 4. 消费者监听死信队列(处理延迟任务)channel.basicConsume(DEAD_LETTER_QUEUE, false, (consumerTag, delivery) -> {String msg = new String(delivery.getBody());System.out.println("处理延迟任务:" + msg);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}, consumerTag -> {});}
}

方案特点:使用死信队列机制实现延迟队列,如果有RabbitMQ 推荐使用。


方案 3:RocketMq实现(推荐)
一、核心概念

RocketMQ 的延迟时间并非任意值,而是通过「延迟级别」控制(由 Broker 配置决定)。默认延迟级别对应的时间为:

1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

对应级别为 1~18(级别 0 表示不延迟)。

二、实现步骤
1. 生产者发送延迟消息

在发送消息时,通过 setDelayTimeLevel(int level) 方法设置延迟级别。

示例代码(Java)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class DelayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();// 创建消息并设置延迟级别(例如级别 3,对应 10s 延迟)Message msg = new Message("DelayTopic",  // 主题"TagA",        // 标签"Hello RocketMQ".getBytes("UTF-8")  // 消息内容);msg.setDelayTimeLevel(3);  // 设置延迟级别为 3(10秒)// 发送消息producer.send(msg);producer.shutdown();}
}
2. 消费者消费消息

消费者无需特殊配置,正常订阅主题即可,Broker 会在延迟时间到达后投递消息。

示例代码(Java)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class DelayConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("DelayTopic", "*");  // 订阅主题consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("收到消息:%s,延迟时间:%ds%n", new String(msg.getBody()), msg.getStoreTimestamp() - msg.getBornTimestamp() / 1000);  // 计算实际延迟时间(毫秒转秒)}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消费者启动");}
}
三、注意事项
  1. 延迟级别限制:Broker 默认仅支持 18 个延迟级别,如需自定义延迟时间,需修改 Broker 配置文件(broker.conf)中的 messageDelayLevel 参数(格式:1s 5s 10s ...)。
  2. 消息时效性:延迟消息的存储和投递依赖 Broker 稳定性,需确保 Broker 有足够资源处理延迟队列。
  3. 版本兼容性:RocketMQ 4.2.0 及以上版本支持延迟消息,低版本需升级。

方案特点:原生api支持延迟队列,推荐此方案,实现简单易配置。


三、方案对比

方案优势劣势内聚耦合扩展性
Redis 延迟队列支持分布式、持久化(RDB/AOF)、高性能(O(logN) 插入/查询)需要维护 Redis 集群;需处理网络抖动(如 Lua 脚本原子性)低(依赖 Redis)高(可通过集群扩展)
JDK DelayQueue无额外依赖、实现简单、单机性能高单机限制(无法分布式)、无持久化(宕机任务丢失)、任务数受内存限制高(纯 JDK)低(仅单机)
RabbitMQ 死信队列天然分布式、支持持久化、消息可靠(ACK 机制)依赖 RabbitMQ 集群;配置复杂(需设置 TTL/DLX);延迟精度受 TTL 限制中(依赖 MQ)中(需扩展 MQ 集群)

总结

  • Redis:适合需要分布式、高吞吐的延迟任务(如订单超时取消)。
  • JDK DelayQueue:适合单机、小规模、对延迟精度要求不高的场景(如本地缓存清理)。
  • RabbitMQ:适合需要严格消息可靠、已集成 MQ 的分布式系统(如电商促销活动通知)。

相关文章:

  • Windows系统部署MongoDB数据库图文教程
  • String的一些固定程序函数
  • Cadence Allegro安装教程及指导
  • vector(c++)
  • img.dims() <= 2 in function ‘cv::matchTemplate报错
  • OpenAI新发布Codex的全面解析
  • OpenCV 光流估计:从原理到实战
  • JS逆向-某易云音乐下载器
  • OpenCL C++ 常见属性与函数
  • VASP+机器学习快速收敛AIMD
  • 慢速降落字母html
  • 微机电子拉伸试验机
  • 内容安全:使用开源框架Caffe实现上传图片进行敏感内容识别
  • 环形缓冲区 ring buffer 概述
  • 自定义库模块增加自定义许可操作详细方法
  • 通义千问-langchain使用构建(三)
  • 毛泽东(井冈山)词三篇
  • Buildroot 移植MiniGUI: 编写简单示例(基于君正X2000)
  • SAP学习笔记 - 开发豆知识01 - CDS SDK命令出乱码 (cds init CAP-Test03 --add java)
  • 如何在 Windows 10 或 11 中安装 PowerShellGet 模块?
  • 被围观的“英之园”,谁建了潮汕天价违建?
  • 多少Moreless:向世界展示现代中式家具的生活美学
  • 首次带人形机器人走科技节红毯,傅利叶顾捷:机器人行业没包袱,很多事都能从零开始
  • “多规合一”改革7年成效如何?自然资源部总规划师亮成绩单
  • 江苏省委组织部副部长高颜已任南京市委常委、组织部部长
  • 租车订单时隔7年从花呗免密扣费?“GoFun出行”引质疑