当前位置: 首页 > news >正文

RabbitMq消费延迟衰减重试实现思路

文章目录

  • 1.Mq消息父类
  • 2.MQ延迟消息的延迟时间衰减发送等级枚举
  • 3.消费者姿势
  • 4.RabbitMQ 最大超时时长可以设置多久?
    • ✅ 一、消息 TTL(Time-To-Live)——最常问的“延迟时长”
      • 🔹 最大值
      • ⚠️ 超出限制的行为
      • ✅ 正确示例(Java)
    • ✅ 二、Consumer Timeout(消费者超时)
      • 🔹 默认值
      • 🔹 可配置范围
      • 🔧 配置方式
        • 方法 1:永久生效(需重启)
        • 方法 2:动态修改(无需重启)
    • ✅ 三、连接与操作超时(客户端侧)
    • ✅ 四、延时插件(rabbitmq-delayed-message-exchange)的限制
    • 🚫 重要提醒:RabbitMQ 不是定时任务系统!
      • 原因:
      • 替代方案(长延迟):
    • ✅ 总结:RabbitMQ 超时最大值一览
  • 5.总结

1.Mq消息父类

/*** Mq消息父类*/
@Data
public class MqMsgDto<T> {/*** 延迟时间等级编号*/protected Integer level;/*** 消息*/protected T msg;}

2.MQ延迟消息的延迟时间衰减发送等级枚举

/*** MQ延迟消息的延迟时间衰减发送等级枚举* 以下有20个延迟时间等级,参考RocketMQ的那18个等级来设计的*/
@Getter
@AllArgsConstructor
public enum DelayTimeLevelEnum {TL1(1, 1000, "1秒"),TL2(2, 5 * 1000, "5秒"),TL3(3, 10 * 1000, "10秒"),TL4(4, 30 * 1000, "30秒"),TL5(5, 60 * 1000, "1分钟"),TL6(6, 2 * 60 * 1000, "2分钟"),TL7(7, 3 * 60 * 1000, "3分钟"),TL8(8, 4 * 60 * 1000, "4分钟"),TL9(9, 5 * 60 * 1000, "5分钟"),TL10(10, 6 * 60 * 1000, "6分钟"),TL11(11, 7 * 60 * 1000, "5分钟"),TL12(12, 8 * 60 * 1000, "8分钟"),TL13(13, 9 * 60 * 1000, "9分钟"),TL14(14, 10 * 60 * 1000, "10分钟"),TL15(15, 20 * 60 * 1000, "20分钟"),TL16(16, 30 * 60 * 1000, "30分钟"),TL17(17, 60 * 60 * 1000, "1小时"),TL18(18, 2 * 60 * 60 * 1000, "2小时"),TL19(19, 24 * 60 * 60 * 1000, "1天(24小时)"),TL20(20, 2 * 24 * 60 * 60 * 1000, "2天(48小时)");/*** 等级编号*/private Integer level;/*** 延迟时间(单位s)*/private Integer time;/*** 说明*/private String desc;/*** 根据延迟等级查找对应枚举类** @param level* @return*/public static DelayTimeLevelEnum getByLevel(Integer level) {return Arrays.stream(values()).filter(b -> Objects.nonNull(level) && b.getLevel() == level).findAny().orElse(DelayTimeLevelEnum.TL4);}}

3.消费者姿势

@Slf4j
@Component
@RefreshScope
public class mqConsumer {/*** 延迟插件实现延迟队列监听队列消息** @param message* @param channel* @throws IOException*/@RabbitHandler@RabbitListener(queues = "xxxxxx")public void consumer(Message message, Channel channel) throws IOException {String msg = new String(message.getBody(), "UTF-8");try {log.info("consumer=====>msg:{}", msg);//处理业务逻辑MqMsgDto<实体类型> mqMsgDto = JSON.parseObject(msg, new TypeReference<MqMsgDto<实体类型>>() {});log.info("consumer.mqMsgDto:{}", JSON.toJSONString(mqMsgDto));int i = mqMsgDto.getLevel().intValue();mqMsgDto.setLevel(++i);if (i > maxRetryNum) {log.info("level大于{}次不发衰减消息!", maxRetryNum);//TODO 超过最大重试次数可以发钉钉消息提醒人工处理(暂时不实现)} else {//TODO 处理失败则重新将上面的mqMsgDto往队列中发送,就可以达到衰减重试的效果//TODO 处理成功则确认消息即可}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error("消费异常:{}", e.getMessage());channel.basicNack(message.getMessageProperties().getDeliveryTag(), // deliveryTagfalse,  // multiple = false:只拒绝当前消息false   // requeue = false:不重试,走死信队列(没有死信队列则消息直接被丢弃));log.error("消费异常消息msg:{}", msg);}//channel.basicNack(); 不ack//channel.basicReject(); 拒绝}}

4.RabbitMQ 最大超时时长可以设置多久?

RabbitMQ 的“最大超时时长”需根据具体场景区分,主要包括以下三类超时设置。每种都有其理论上限实际限制


✅ 一、消息 TTL(Time-To-Live)——最常问的“延迟时长”

这是实现延迟队列的核心参数,分为 Per-Message TTLQueue TTL

🔹 最大值

  • 单位:毫秒(ms)
  • 理论最大值2^32 - 1 = 4,294,967,295 毫秒
  • 换算:≈ 49.71 天

📌 来源:RabbitMQ 使用 32 位无符号整数存储 TTL(单位 ms),因此上限为 UINT32_MAX

⚠️ 超出限制的行为

  • 如果设置 TTL > 4,294,967,295 ms:
    • RabbitMQ 不会报错
    • 但会溢出为负数或小正数
    • 结果:消息立即过期,被投递到死信交换器(DLX)或丢弃。

✅ 正确示例(Java)

// 设置 30 天 TTL(安全范围内)
long ttl = 30L * 24 * 60 * 60 * 1000; // = 2,592,000,000 ms < 4,294,967,295
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration(String.valueOf(ttl)).build();
channel.basicPublish("", "delay.queue", props, message.getBytes());

💡 建议:生产环境延迟不要超过 30 天,避免接近极限值。


✅ 二、Consumer Timeout(消费者超时)

手动 ACK 模式下,RabbitMQ 会监控消费者是否长时间未 ACK。

🔹 默认值

  • 30 分钟(1800000 毫秒)

🔹 可配置范围

  • 最小:1 毫秒(不推荐)
  • 最大:理论上可设为 Integer.MAX_VALUE(约 24.8 天),但不建议超过几天

🔧 配置方式

方法 1:永久生效(需重启)
# /etc/rabbitmq/rabbitmq.conf
consumer_timeout = 86400000  # 24 小时(单位:毫秒)
方法 2:动态修改(无需重启)
# 查看当前值
rabbitmqctl eval 'application:get_env(rabbit, consumer_timeout).'# 设置为 12 小时
rabbitmqctl eval 'application:set_env(rabbit, consumer_timeout, 43200000).'

⚠️ 注意:该超时针对 Channel 级别,若消费者处理单条消息超过此时间且未 ACK,RabbitMQ 会:

  • 关闭 Channel;
  • 将未 ACK 消息重新入队(requeue);
  • 可能导致消息重复消费。

✅ 三、连接与操作超时(客户端侧)

这些由客户端库控制,非 RabbitMQ 服务端限制。

超时类型默认值(以 Java 客户端为例)最大值
connectionTimeout60,000 ms(60 秒)Integer.MAX_VALUE(≈24.8 天)
handshakeTimeout10,000 ms同上
networkRecoveryInterval5,000 ms同上

💡 实际中设为几秒到几分钟即可,设太大无意义(网络故障应快速失败)。


✅ 四、延时插件(rabbitmq-delayed-message-exchange)的限制

如果你使用官方推荐的 延迟插件:

  • 同样受 32 位整数限制
  • 最大延迟 ≈ 49.71 天
  • 行为更可靠(基于定时器而非 TTL + DLQ)。

🚫 重要提醒:RabbitMQ 不是定时任务系统!

尽管技术上支持近 50 天 的延迟,但官方明确建议:

“Use delayed messages for short delays (seconds to minutes), not hours or days.”
(延迟消息适用于秒级~分钟级,而非小时或天级)

原因:

  • 长期堆积消息占用磁盘;
  • Broker 重启可能导致消息丢失(若未持久化);
  • 无法动态取消/修改延迟任务;
  • 监控和运维困难。

替代方案(长延迟):

延迟时长推荐方案
< 5 分钟RabbitMQ TTL 或延迟插件 ✅
5 分钟 ~ 24 小时Redis ZSet + 时间轮 ⏱️
> 24 小时数据库 + 定时调度(Quartz / XXL-JOB)📁

✅ 总结:RabbitMQ 超时最大值一览

超时类型最大值单位是否推荐用到极限
消息 TTL4,294,967,295毫秒(≈49.71 天)❌ 不推荐(建议 ≤30 天)
Consumer TimeoutInteger.MAX_VALUE(≈24.8 天)毫秒❌ 不推荐(建议 ≤几小时)
客户端连接超时Integer.MAX_VALUE毫秒❌ 无意义(设几秒即可)

💡 最佳实践

  • 短延迟(秒~分钟)→ RabbitMQ;
  • 长延迟(小时~天)→ 专用调度系统。

📌 RabbitMQ 不是为长延迟设计的!官方文档明确建议:

“Use delayed messages for short delays (seconds to minutes), not hours.”

5.总结

希望我的分享对你有所启发和帮助,请一键三连,么么哒!

http://www.dtcms.com/a/605799.html

相关文章:

  • 欧拉22.03系统安装RabbitMQ-3.6.10
  • C语言编译器的选择与优化技巧 | 如何选择适合的C语言编译器以提高代码性能
  • 高通移动:编译成功后,用Unpacking Tool打包,再烧录
  • 文档切片(Document Chunking)
  • AEC-Q100 stress实验详解#5——PTC(功率温度循环)
  • MacOS彻底清除docker及image
  • 【3ds Max动画】烟花:超级喷射粒子,荧光粒子效果
  • 做网站的内容资源广告装饰 技术支持 东莞网站建设
  • 脑机接口核心产业链研发实力:翔宇医疗、三博脑科、汉威科技、科大讯飞、创新医疗,5家龙头公司研发实力深度数据
  • AI驱动与人才争夺战:互联网行业步入新一轮扩张期
  • Java-171 Neo4j 备份与恢复 + 预热与执行计划实战
  • 《信息存储与管理》完整复习手册
  • 西门子1500PLC(模拟器)与Matlab经由Modbus通信联合PID仿真
  • 【LeetCode】110. 平衡二叉树
  • LeetCode 423 - 从英文中重建数字
  • 建设部信息中心网站提供模板网站制作多少钱
  • 徐州集团网站建设关键词排名霸屏代做
  • 将现有git项目推送到gitcode的方法
  • 鸿蒙PC生态三方软件移植:开发环境搭建及三方库移植指南
  • F280049C学习笔记之SDFM
  • Linux内存管理深度解析:从首次访问缺页处理到NUMA策略的完整架构
  • 北京网站设计与制作品牌网站建设策划书
  • Java 9+ 模块化系统(Jigsaw)实战:从 Jar 地狱到模块解耦的架构升级
  • Claude Code 深度解析:架构、工作原理与常见误解
  • 珠海市企业网站制作品牌仿简书wordpress博客主题
  • 文化传媒 网站设计成都网站建设:
  • Python实用指南:python + pyqt
  • SSM基于J2EE的山西旅游网站的设计与实现iiqmx(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 如何通过 WebSocket 接入期货实时行情接口
  • 开源 Objective-C IOS 应用开发(六)Objective-C 和 C语言