当前位置: 首页 > news >正文

RocketMQ延迟消息是如何实现的?

RocketMQ的延迟消息实现机制非常巧妙,其核心是通过多级时间轮 + 定时任务 + 消息重投递来实现的。以下是详细实现原理:


⏰ 一、延迟消息的核心设计

  1. 预设延迟级别(非任意时间)
    RocketMQ不支持任意时间延迟,而是预设了18个固定延迟级别(1-18),每个级别对应固定延迟时间:

    // 源码中的延迟级别定义 (MessageStoreConfig类)
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
  2. 延迟消息处理流程

    设置delayLevel
    到达延迟时间
    生产者发送延迟消息
    Broker接收
    存入SCHEDULE_TOPIC队列
    定时任务扫描
    重投递到真实Topic
    消费者消费

🔧 二、Broker端实现细节

1. 特殊主题存储
  • 所有延迟消息先存入内部主题:SCHEDULE_TOPIC_XXXX
  • 该主题包含 18个队列,每个队列对应一个延迟级别
  • 消息结构包含关键元数据:
    class Message {private String topic;      // 原始主题(如ORDER_TOPIC)private int delayLevel;   // 延迟级别(3=10秒)private long storeTimestamp; // 存储时间戳// ...其他字段
    }
    
2. 时间轮调度器(核心)
public class ScheduleMessageService extends ConfigManager {// 延迟级别对应的Timerprivate final ConcurrentMap<Integer, Timer> timerTable = new ConcurrentHashMap<>(32);// 延迟级别对应的处理队列private final ConcurrentMap<Integer, Long> offsetTable =new ConcurrentHashMap<>(32);
}
  • 每个延迟级别独立Timer:为18个级别分别创建定时器
  • 时间轮算法:使用HashedWheelTimer高效管理延迟任务
3. 消息重投递过程

当延迟时间到达时:

  1. SCHEDULE_TOPIC_XXXX的对应队列拉取消息
  2. 清除消息的delayLevel属性
  3. 将消息写入原始目标Topic
  4. 消费者此时可正常消费

⚡ 三、源码级执行流程

  1. 消息接收(Broker端):

    // DefaultMessageStore.putMessage()
    if (msg.getDelayTimeLevel() > 0) {// 修改Topic为SCHEDULE_TOPIC_XXXXtopic = ScheduleMessageService.SCHEDULE_TOPIC;// 计算目标队列:queueId = delayLevel - 1queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    }
    
  2. 定时扫描(每秒执行):

    // ScheduleMessageService.executeOnTimeup()
    for (int level = 1; level <= 18; level++) {long delayTimeMillis = computeDeliverTimestamp(level, storeTimestamp);if (now >= delayTimeMillis) {// 触发重投递deliverDelayedMessage(level);}
    }
    
  3. 重投递关键操作

    MessageExt msgExt = scheduleMessageIterator.next();
    // 恢复原始Topic/Queue
    MessageExtBrokerInner msgInner = rebuildMessage(msgExt);
    // 存入CommitLog(真实Topic)
    PutMessageResult result = defaultMessageStore.putMessage(msgInner);
    

📊 四、延迟级别与时间映射

延迟级别实际延迟时间对应队列ID
11秒queue0
25秒queue1
310秒queue2
430秒queue3
51分钟queue4
182小时queue17

⚠️ 五、使用注意事项

  1. 不支持任意时间延迟
    只能选择预设的18个级别(可通过修改配置扩展级别)
  2. 最大延迟时间限制
    默认最大2小时,修改需调整配置并重启Broker
  3. 精度误差
    实际延迟可能有1-2秒误差(受扫描周期影响)
  4. 资源消耗
    高并发延迟消息会显著增加Broker的CPU负载

🔄 六、生产环境优化建议

  1. 调整扫描频率(平衡精度与CPU)
    # broker.conf
    flushDelayOffsetInterval=1000  # 默认1秒,可调大到3秒
    
  2. 扩展延迟级别
    修改messageDelayLevel配置增加自定义级别:
    messageDelayLevel=1s 5s 10s 30s 1m 2m 5m 10m 30m 1h 2h 6h 12h
    
  3. 监控关键指标
    • ScheduleMessageService_* 开头的指标
    • 延迟队列积压情况(通过Admin CLI查看)

通过这种设计,RocketMQ在保证高性能的同时实现了海量延迟消息的支持。实际测试中,单Broker可处理百万级延迟消息,平均延迟误差控制在秒级以内。

http://www.dtcms.com/a/266490.html

相关文章:

  • 深度学习基础1
  • 基于Android的财务记账App
  • 【wps】 excel 删除重复项
  • AI 应用于进攻性安全
  • linux_git的使用
  • MySQL 8.0:窗口函数
  • 【Unity开发】Unity实现对模型移动、缩放、旋转操作的功能
  • 基于Docker构建OrangePi5 SDK环境
  • 408第三季part2 - 计算机网络 - 计算机网络基本概念
  • 闲庭信步使用SV搭建图像测试平台:第二十九课——绘制正弦波的图片
  • Android 实现底部弹窗
  • Datasophon的Ranger安装时数据库踩坑及问题解决
  • NeighborGeo:基于邻居的IP地理定位(三)
  • NeighborGeo:基于邻居的IP地理定位(二)
  • 【WEB】Polar靶场 6-10题 详细笔记
  • Jenkins-Email Extension 插件插件
  • 前端开发-前置知识
  • Android WebView 性能优化指南
  • Vue2中的keep-alive:组件状态缓存与性能优化实战指南
  • Android发展历程
  • Android 安装使用教程
  • [论文精读]StruQ: Defending Against Prompt Injection with Structured Queries
  • 前端捕获异常的全面场景及方法
  • 算法题目记录
  • OTC机器人焊机节气设备
  • EPLAN 电气制图:建立自己的部件库,添加部件-加SQL Server安装教程(三)上
  • web3钱包的运作原理
  • Type-C PD快充协议智能芯片S312L详解
  • GO 语言学习 之 结构体
  • mysql索引:索引应该选择哪种数据结构 B+树 MySQL中的页 页主体 页目录 索引分类