RocketMQ 生产消费消息消息解析与重试机制详解
博主介绍:✌全网粉丝5W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌
博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端分离项目,可以在左边的分类专栏找到更多项目。《Uniapp项目案例》有几个有uniapp教程,企业实战开发。《微服务实战》专栏是本人的实战经验总结,《Spring家族及微服务系列》专注Spring、SpringMVC、SpringBoot、SpringCloud系列、Nacos等源码解读、热门面试题、架构设计等。除此之外还有不少文章等你来细细品味,更多惊喜等着你哦
🍅uniapp微信小程序🍅面试题软考题免费使用,还可以使用微信支付,扫码加群。由于维护成本问题得不到解决,可能将停止线上维护。
🍅文末获取联系🍅精彩专栏推荐订阅👇🏻👇🏻 不然下次找不到哟
Java项目案例《100套》
https://blog.csdn.net/qq_57756904/category_12173599.html
uniapp小程序《100套》https://blog.csdn.net/qq_57756904/category_12173599.html
有需求代码永远写不完,而方法才是破解之道,抖音有实战视频课程,某马某千等培训都是2万左右,甚至广东有本科院校单单一年就得3万4年就12万学费,而且还没有包括吃饭的钱。所以很划算了。另外博客左侧有源码阅读专栏,对于求职有很大帮助,当然对于工作也是有指导意义等。在大城市求职,你面试来回一趟多多少少都在12块左右,而且一般不会一次性就通过,还得面试几家。而如果你对源码以及微服务等有深度认识,这无疑给你的面试添砖加瓦更上一层楼。
最后再送一句:最好是学会了,而不是学废了!!
2
一、消息体解析方案
1. JSON 消息解析(推荐)
@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "order-group")
public class OrderConsumer implements RocketMQListener<MessageExt> {private final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic void onMessage(MessageExt message) {try {// 1. 获取消息体字节数组byte[] body = message.getBody();// 2. 解析JSON消息Order order = objectMapper.readValue(body, Order.class);// 3. 处理业务逻辑processOrder(order);} catch (JsonProcessingException e) {log.error("消息解析失败[msgId={}]", message.getMsgId(), e);throw new RuntimeException("消息格式错误", e);}}
}
2. 通用消息解析工具类
public class MessageParser {public static <T> T parse(MessageExt message, Class<T> valueType) {try {return new ObjectMapper().readValue(message.getBody(), valueType);} catch (IOException e) {throw new MessageParseException("消息解析失败", e);}}public static String parseToString(MessageExt message) {return new String(message.getBody(), StandardCharsets.UTF_8);}
}// 使用示例
Order order = MessageParser.parse(message, Order.class);
二、重试次数设置方案
1. 全局重试次数配置
# application.yml
rocketmq:consumer:max-reconsume-times: 3 # 默认最大重试次数delay-level-when-next-consume: 2 # 重试延迟级别
2. 消费者级别重试设置
@RocketMQMessageListener(topic = "PAYMENT_TOPIC",consumerGroup = "payment-group",maxReconsumeTimes = 5, // 覆盖全局设置delayLevelWhenNextConsume = 3 // 重试延迟级别
)
public class PaymentConsumer implements RocketMQListener<MessageExt> {// ...
}
3. 动态获取重试次数
@Override
public void onMessage(MessageExt message) {// 获取当前重试次数(从0开始)int reconsumeTimes = message.getReconsumeTimes();if (reconsumeTimes > getMaxRetryTimes()) {log.warn("达到最大重试次数[msgId={}]", message.getMsgId());saveToDeadLetterQueue(message);return;}// 业务处理...
}private int getMaxRetryTimes() {// 可以从配置中心动态获取return 3;
}
三、重试策略完整实现
1. 带重试监控的消费者
@RocketMQMessageListener(topic = "INVENTORY_TOPIC",consumerGroup = "inventory-group",maxReconsumeTimes = 3
)
public class InventoryConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {try {InventoryUpdate update = parseMessage(message);// 根据重试次数采用不同策略switch (message.getReconsumeTimes()) {case 0: // 第一次尝试inventoryService.fastUpdate(update);break;case 1: // 第二次尝试inventoryService.safeUpdate(update);break;default: // 最后一次尝试inventoryService.forceUpdate(update);}} catch (Exception e) {log.error("库存更新失败[retry={}/{}]", message.getReconsumeTimes(), message.getMaxReconsumeTimes(), e);// 最后一次重试特殊处理if (message.getReconsumeTimes() >= message.getMaxReconsumeTimes() - 1) {emergencyHandler.handle(message);}throw new RuntimeException(e);}}
}
2. 重试间隔配置
RocketMQ 使用延迟级别控制重试间隔:
延迟级别 | 延迟时间 | 对应描述 |
---|---|---|
1 | 1s | 延迟1秒 |
2 | 5s | 延迟5秒 |
3 | 10s | 延迟10秒 |
... | ... | ... |
16 | 2h | 延迟2小时 |
设置方式:
// 通过注解设置
@RocketMQMessageListener(delayLevelWhenNextConsume = 3)// 通过上下文动态设置(需要实现RocketMQReplyListener)
context.setDelayLevelWhenNextConsume(calculateDelayLevel());
四、测试方案设计
1. 消费者测试类示例
@SpringBootTest
class OrderConsumerTest {@Autowiredprivate OrderConsumer orderConsumer;@Testvoid testMessageProcessing() {// 1. 构建测试消息MessageExt message = new MessageExt();message.setBody(("{\"orderId\":\"123\",\"amount\":100}").getBytes());// 2. 模拟首次消费orderConsumer.onMessage(message);// 3. 模拟重试场景message.setReconsumeTimes(1);orderConsumer.onMessage(message);// 验证业务结果...}@Testvoid testMaxRetry() {MessageExt message = new MessageExt();message.setBody(("{\"orderId\":\"456\",\"amount\":200}").getBytes());message.setReconsumeTimes(3); // 超过最大重试assertThrows(RuntimeException.class, () -> {orderConsumer.onMessage(message);});// 验证死信队列处理}
}
2. 集成测试配置
@TestConfiguration
public class TestRocketMQConfig {@Bean@Primarypublic RocketMQTemplate mockRocketTemplate() {RocketMQTemplate template = mock(RocketMQTemplate.class);when(template.getProducer()).thenReturn(mock(DefaultMQProducer.class));return template;}@Bean@Primarypublic DeadLetterHandler mockDeadLetterHandler() {return mock(DeadLetterHandler.class);}
}
五、生产环境建议
-
重试策略原则:
-
非幂等操作:最大重试次数设为1
-
普通业务:3-5次重试
-
关键业务:8-16次重试(配合告警)
-
-
消息解析最佳实践:
// 添加消息版本号 @Data public class OrderMessage {private String version = "v1";private String orderId;private BigDecimal amount; }// 兼容性解析 public OrderMessage parseCompatible(MessageExt message) {String json = new String(message.getBody());if (json.contains("\"version\":\"v1\"")) {return objectMapper.readValue(json, OrderMessage.class);} else {// 旧版本处理逻辑} }
-
监控关键指标:
-
消息平均重试次数
-
死信队列堆积量
-
消息解析失败率
-
通过以上方案,您可以:
-
正确解析各种格式的消息体
-
灵活设置消息重试次数和间隔
-
实现健壮的消息处理逻辑
-
方便地进行单元测试和集成测试
3