RabbitMq消费延迟衰减重试实现思路
文章目录
- 1.Mq消息父类
- 2.MQ延迟消息的延迟时间衰减发送等级枚举
- 3.消费者姿势
- 4.RabbitMQ 最大超时时长可以设置多久?
- ✅ 一、消息 TTL(Time-To-Live)——最常问的“延迟时长”
- 🔹 最大值
- ⚠️ 超出限制的行为
- ✅ 正确示例(Java)
- ✅ 二、Consumer Timeout(消费者超时)
- 🔹 默认值
- 🔹 可配置范围
- 🔧 配置方式
- 方法 1:永久生效(需重启)
- 方法 2:动态修改(无需重启)
- ✅ 三、连接与操作超时(客户端侧)
- ✅ 四、延时插件(rabbitmq-delayed-message-exchange)的限制
- 🚫 重要提醒:RabbitMQ 不是定时任务系统!
- 原因:
- 替代方案(长延迟):
- ✅ 总结:RabbitMQ 超时最大值一览
- 5.总结
1.Mq消息父类
/*** Mq消息父类*/
@Data
public class MqMsgDto<T> {/*** 延迟时间等级编号*/protected Integer level;/*** 消息*/protected T msg;}
2.MQ延迟消息的延迟时间衰减发送等级枚举
/*** MQ延迟消息的延迟时间衰减发送等级枚举* 以下有20个延迟时间等级,参考RocketMQ的那18个等级来设计的*/
@Getter
@AllArgsConstructor
public enum DelayTimeLevelEnum {TL1(1, 1000, "1秒"),TL2(2, 5 * 1000, "5秒"),TL3(3, 10 * 1000, "10秒"),TL4(4, 30 * 1000, "30秒"),TL5(5, 60 * 1000, "1分钟"),TL6(6, 2 * 60 * 1000, "2分钟"),TL7(7, 3 * 60 * 1000, "3分钟"),TL8(8, 4 * 60 * 1000, "4分钟"),TL9(9, 5 * 60 * 1000, "5分钟"),TL10(10, 6 * 60 * 1000, "6分钟"),TL11(11, 7 * 60 * 1000, "5分钟"),TL12(12, 8 * 60 * 1000, "8分钟"),TL13(13, 9 * 60 * 1000, "9分钟"),TL14(14, 10 * 60 * 1000, "10分钟"),TL15(15, 20 * 60 * 1000, "20分钟"),TL16(16, 30 * 60 * 1000, "30分钟"),TL17(17, 60 * 60 * 1000, "1小时"),TL18(18, 2 * 60 * 60 * 1000, "2小时"),TL19(19, 24 * 60 * 60 * 1000, "1天(24小时)"),TL20(20, 2 * 24 * 60 * 60 * 1000, "2天(48小时)");/*** 等级编号*/private Integer level;/*** 延迟时间(单位s)*/private Integer time;/*** 说明*/private String desc;/*** 根据延迟等级查找对应枚举类** @param level* @return*/public static DelayTimeLevelEnum getByLevel(Integer level) {return Arrays.stream(values()).filter(b -> Objects.nonNull(level) && b.getLevel() == level).findAny().orElse(DelayTimeLevelEnum.TL4);}}
3.消费者姿势
@Slf4j
@Component
@RefreshScope
public class mqConsumer {/*** 延迟插件实现延迟队列监听队列消息** @param message* @param channel* @throws IOException*/@RabbitHandler@RabbitListener(queues = "xxxxxx")public void consumer(Message message, Channel channel) throws IOException {String msg = new String(message.getBody(), "UTF-8");try {log.info("consumer=====>msg:{}", msg);//处理业务逻辑MqMsgDto<实体类型> mqMsgDto = JSON.parseObject(msg, new TypeReference<MqMsgDto<实体类型>>() {});log.info("consumer.mqMsgDto:{}", JSON.toJSONString(mqMsgDto));int i = mqMsgDto.getLevel().intValue();mqMsgDto.setLevel(++i);if (i > maxRetryNum) {log.info("level大于{}次不发衰减消息!", maxRetryNum);//TODO 超过最大重试次数可以发钉钉消息提醒人工处理(暂时不实现)} else {//TODO 处理失败则重新将上面的mqMsgDto往队列中发送,就可以达到衰减重试的效果//TODO 处理成功则确认消息即可}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error("消费异常:{}", e.getMessage());channel.basicNack(message.getMessageProperties().getDeliveryTag(), // deliveryTagfalse, // multiple = false:只拒绝当前消息false // requeue = false:不重试,走死信队列(没有死信队列则消息直接被丢弃));log.error("消费异常消息msg:{}", msg);}//channel.basicNack(); 不ack//channel.basicReject(); 拒绝}}
4.RabbitMQ 最大超时时长可以设置多久?
RabbitMQ 的“最大超时时长”需根据具体场景区分,主要包括以下三类超时设置。每种都有其理论上限和实际限制:
✅ 一、消息 TTL(Time-To-Live)——最常问的“延迟时长”
这是实现延迟队列的核心参数,分为 Per-Message TTL 和 Queue TTL。
🔹 最大值
- 单位:毫秒(ms)
- 理论最大值:
2^32 - 1 = 4,294,967,295 毫秒 - 换算:≈ 49.71 天
📌 来源:RabbitMQ 使用 32 位无符号整数存储 TTL(单位 ms),因此上限为
UINT32_MAX。
⚠️ 超出限制的行为
- 如果设置 TTL > 4,294,967,295 ms:
- RabbitMQ 不会报错;
- 但会溢出为负数或小正数;
- 结果:消息立即过期,被投递到死信交换器(DLX)或丢弃。
✅ 正确示例(Java)
// 设置 30 天 TTL(安全范围内)
long ttl = 30L * 24 * 60 * 60 * 1000; // = 2,592,000,000 ms < 4,294,967,295
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration(String.valueOf(ttl)).build();
channel.basicPublish("", "delay.queue", props, message.getBytes());
💡 建议:生产环境延迟不要超过 30 天,避免接近极限值。
✅ 二、Consumer Timeout(消费者超时)
在手动 ACK 模式下,RabbitMQ 会监控消费者是否长时间未 ACK。
🔹 默认值
- 30 分钟(1800000 毫秒)
🔹 可配置范围
- 最小:1 毫秒(不推荐)
- 最大:理论上可设为
Integer.MAX_VALUE(约 24.8 天),但不建议超过几天
🔧 配置方式
方法 1:永久生效(需重启)
# /etc/rabbitmq/rabbitmq.conf
consumer_timeout = 86400000 # 24 小时(单位:毫秒)
方法 2:动态修改(无需重启)
# 查看当前值
rabbitmqctl eval 'application:get_env(rabbit, consumer_timeout).'# 设置为 12 小时
rabbitmqctl eval 'application:set_env(rabbit, consumer_timeout, 43200000).'
⚠️ 注意:该超时针对 Channel 级别,若消费者处理单条消息超过此时间且未 ACK,RabbitMQ 会:
- 关闭 Channel;
- 将未 ACK 消息重新入队(requeue);
- 可能导致消息重复消费。
✅ 三、连接与操作超时(客户端侧)
这些由客户端库控制,非 RabbitMQ 服务端限制。
| 超时类型 | 默认值(以 Java 客户端为例) | 最大值 |
|---|---|---|
connectionTimeout | 60,000 ms(60 秒) | Integer.MAX_VALUE(≈24.8 天) |
handshakeTimeout | 10,000 ms | 同上 |
networkRecoveryInterval | 5,000 ms | 同上 |
💡 实际中设为几秒到几分钟即可,设太大无意义(网络故障应快速失败)。
✅ 四、延时插件(rabbitmq-delayed-message-exchange)的限制
如果你使用官方推荐的 延迟插件:
- 同样受 32 位整数限制;
- 最大延迟 ≈ 49.71 天;
- 行为更可靠(基于定时器而非 TTL + DLQ)。
🚫 重要提醒:RabbitMQ 不是定时任务系统!
尽管技术上支持近 50 天 的延迟,但官方明确建议:
❗ “Use delayed messages for short delays (seconds to minutes), not hours or days.”
(延迟消息适用于秒级~分钟级,而非小时或天级)
原因:
- 长期堆积消息占用磁盘;
- Broker 重启可能导致消息丢失(若未持久化);
- 无法动态取消/修改延迟任务;
- 监控和运维困难。
替代方案(长延迟):
| 延迟时长 | 推荐方案 |
|---|---|
| < 5 分钟 | RabbitMQ TTL 或延迟插件 ✅ |
| 5 分钟 ~ 24 小时 | Redis ZSet + 时间轮 ⏱️ |
| > 24 小时 | 数据库 + 定时调度(Quartz / XXL-JOB)📁 |
✅ 总结:RabbitMQ 超时最大值一览
| 超时类型 | 最大值 | 单位 | 是否推荐用到极限 |
|---|---|---|---|
| 消息 TTL | 4,294,967,295 | 毫秒(≈49.71 天) | ❌ 不推荐(建议 ≤30 天) |
| Consumer Timeout | Integer.MAX_VALUE(≈24.8 天) | 毫秒 | ❌ 不推荐(建议 ≤几小时) |
| 客户端连接超时 | Integer.MAX_VALUE | 毫秒 | ❌ 无意义(设几秒即可) |
💡 最佳实践:
- 短延迟(秒~分钟)→ RabbitMQ;
- 长延迟(小时~天)→ 专用调度系统。
📌 RabbitMQ 不是为长延迟设计的!官方文档明确建议:
“Use delayed messages for short delays (seconds to minutes), not hours.”
5.总结
希望我的分享对你有所启发和帮助,请一键三连,么么哒!
