Redisson - 实现延迟队列
Redisson 延迟队列
Redisson 是基于 Redis 的一款功能强大的 Java 客户端。它提供了诸如分布式锁、限流器、阻塞队列、延迟队列等高可用、高并发组件。
其中,RDelayedQueue 是对 Redis 数据结构的高阶封装,能让你将消息延迟一定时间后再进入消费队列。
延迟队列的组成
Redisson 延迟队列由两个 Redis 队列组成:
RDelayedQueue:延迟元素容器,暂存在 Redis 的 zset 中(按时间排序)
RBlockingQueue(目标队列):当延迟时间到达后,Redisson 会自动将元素移动到这个队列,供消费者消费。
Redisson 内部使用 定时轮询线程 来扫描延迟数据并迁移至目标队列。
原理
Redisson 的 RDelayedQueue 设计巧妙,它并非一个单一的 Redis 数据结构,而是结合了 Redis 的 ZSET (有序集合) 和 List (列表,具体实现为 RBlockingQueue) 来实现的
> 生产者 -- (元素, 延迟时间) --> RDelayedQueue (API)
> |
> | (Redisson 内部)
> V
> +--------------------------------------------------+
> | ZSET (有序集合) |
> | Key: delayed_queue_name_zset |
> | Member: task_id_or_payload |
> | Score: execution_timestamp |
> +--------------------------------------------------+
> ^
> | (Eviction Scheduler 定期扫描)
> | (到期任务)
> V
> +--------------------------------------------------+
> | RBlockingQueue (目标队列,基于 Redis List) |
> | Key: destination_queue_name |
> | Value: task_payload |
> +--------------------------------------------------+
> ^
> |
> | (消费者 take()/poll())
> V 消费者 <---------------------------
Redisson 延迟队列的优势
分布式特性:基于 Redis,天然支持分布式环境,多个生产者和消费者实例可以共享同一个延迟队列系统。
高性能与持久化:依赖 Redis 的高性能特性。如果 Redis 配置了持久化 (AOF/RDB),延迟任务的元数据也能得到持久化保障。
易用性:Redisson 提供了简洁易懂的 API,屏蔽了底层 ZSET 和 List 的复杂操作。
精确度较高:任务的到期判断依赖 Redis 服务器的时间,通常比较准确。
支持任务取消:可以通过 RDelayedQueue.remove(object) 方法从延迟队列中移除尚未到期的任务(前提是任务对象能被正确 equals 比较)。
Redisson队列实践
添加依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.24.3</version>
</dependency>
Redisson 配置 (application.yml):
spring:
application:name: redisson-delayed-order-app# Redisson 配置 (如果使用 redisson-spring-boot-starter,它会尝试自动配置)# 你也可以提供一个 redisson.yaml 文件或在 Java Config 中配置 RedissonClient Bean# 例如,指定单个 Redis 服务器:redis: # Spring Boot 2.x 使用 spring.redis, Spring Boot 3.x 使用 spring.data.redis# Redisson starter 会尝试使用这些配置,但更推荐使用 Redisson 自身的配置方式# address: redis://127.0.0.1:6379 # Redisson 推荐的格式host: 127.0.0.1port: 6379# database: 0# password:# Redisson 自己的配置方式 (更灵活,可以放在 redisson.yaml 中)
# redisson:
# singleServerConfig:
# address: "redis://127.0.0.1:6379"
# database: 0
# # codec: org.redisson.codec.JsonJacksonCodec # 推荐使用 Jackson 序列化
定义延迟任务处理器(消费者)
@Component
public class OrderCloseConsumer implements InitializingBean {private static final String QUEUE_NAME = "order-close-queue";@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate OrderService orderService;@Overridepublic void afterPropertiesSet() {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(QUEUE_NAME);RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);// 启动消费线程Executors.newSingleThreadExecutor().submit(() -> {while (true) {try {String orderSn = blockingQueue.take(); // 阻塞等待orderService.closeOrder(orderSn); // 业务处理log.info("订单超时关闭成功:{}", orderSn);} catch (Exception e) {log.error("延迟关单处理异常", e);}}});}
}
在下单时设置延迟任务
public void createOrder(String orderSn) {// 1. 业务入库逻辑...// 2. 加入延迟队列RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("order-close-queue");RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(orderSn, 30, TimeUnit.MINUTES); // 30分钟后执行log.info("订单加入延迟关闭队列:{}", orderSn);
}
关键注意事项
1、序列化 (Codec):
Redisson 默认使用 MarshallingCodec,它要求任务对象实现 java.io.Serializable。
推荐配置 RedissonClient 使用 org.redisson.codec.JsonJacksonCodec,这样任务对象无需实现 Serializable,更灵活且跨语言友好。在 RedissonClient Bean 配置中设置:config.setCodec(new JsonJacksonCodec());
2、幂等性: 消费者的处理逻辑(如 processOrderTimeout)必须是幂等的。即使同一个任务被重复消费(如消费者处理中断后任务重投,或 remove 失败后任务仍到期),最终结果也应一致。通过检查订单当前状态是保证幂等性的关键。
3、任务取消的 equals() 和 hashCode(): RDelayedQueue.remove(object) 依赖于任务对象的 equals() 和 hashCode() 方法来查找并删除。确保任务载体类 (如 OrderTimeoutTask) 正确实现了这两个方法,通常基于任务的唯一标识(如订单号)。
4、消费者线程管理: 使用 InitializingBean 和 DisposableBean 来管理消费者线程的生命周期。消费者逻辑应在单独的线程中执行,避免阻塞主线程。可以使用 Executors 创建线程池来并发处理任务。
5、错误处理与重试: 消费者循环中应有完善的 try-catch 块,处理单个任务失败的情况,避免整个消费者线程挂掉。可以考虑引入失败任务记录、告警或简单的延时重试(但要注意避免无限重试)。
6、Redis 持久化: 为保证系统重启后延迟任务不丢失(ZSET 中的元数据),Redis 服务器应配置 RDB 或 AOF 持久化。
7、消费者并发与伸缩:
可以启动多个消费者实例(不同JVM进程),它们会从同一个目标 RBlockingQueue 中竞争获取任务,实现负载均衡。
在单个消费者实例内部,也可以使用线程池来并发处理从队列中获取的任务。
8、监控: 关注 Redis 中 ZSET 和 List 的长度、Redisson 调度线程的健康状况、任务处理的成功率和耗时等指标,以便及时发现和处理问题。
9、Redisson 版本: 确保使用较新的稳定版 Redisson,因为早期版本可能在延迟队列的某些边缘场景下存在问题。
总结
Redisson 的 RDelayedQueue 通过巧妙地结合 Redis ZSET 和 List,提供了一个强大、易用且高效的分布式延迟队列解决方案。它非常适合如订单超时关闭、延时通知、定时任务等场景。在实践中,务必注意序列化、幂等性、任务取消的正确实现以及消费者端的健壮性设计,才能充分发挥其优势,构建稳定可靠的分布式应用。