当前位置: 首页 > news >正文

RocketMQ 生产消费消息消息解析与重试机制详解

 

博主介绍:✌全网粉丝5W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌

博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端分离项目,可以在左边的分类专栏找到更多项目。《Uniapp项目案例》有几个有uniapp教程,企业实战开发。《微服务实战》专栏是本人的实战经验总结,《Spring家族及微服务系列》专注Spring、SpringMVC、SpringBoot、SpringCloud系列、Nacos等源码解读、热门面试题、架构设计等。除此之外还有不少文章等你来细细品味,更多惊喜等着你哦

🍅uniapp微信小程序🍅面试题软考题免费使用,还可以使用微信支付,扫码加群。由于维护成本问题得不到解决,可能将停止线上维护。

🍅文末获取联系🍅精彩专栏推荐订阅👇🏻👇🏻 不然下次找不到哟

Java项目案例《100套》
https://blog.csdn.net/qq_57756904/category_12173599.html
uniapp小程序《100套》

https://blog.csdn.net/qq_57756904/category_12173599.html

有需求代码永远写不完,而方法才是破解之道,抖音有实战视频课程,某马某千等培训都是2万左右,甚至广东有本科院校单单一年就得3万4年就12万学费,而且还没有包括吃饭的钱。所以很划算了。另外博客左侧有源码阅读专栏,对于求职有很大帮助,当然对于工作也是有指导意义等。在大城市求职,你面试来回一趟多多少少都在12块左右,而且一般不会一次性就通过,还得面试几家。而如果你对源码以及微服务等有深度认识,这无疑给你的面试添砖加瓦更上一层楼。

最后再送一句:最好是学会了,而不是学废了!!

2

一、消息体解析方案

1. JSON 消息解析(推荐)

@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "order-group")
public class OrderConsumer implements RocketMQListener<MessageExt> {private final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic void onMessage(MessageExt message) {try {// 1. 获取消息体字节数组byte[] body = message.getBody();// 2. 解析JSON消息Order order = objectMapper.readValue(body, Order.class);// 3. 处理业务逻辑processOrder(order);} catch (JsonProcessingException e) {log.error("消息解析失败[msgId={}]", message.getMsgId(), e);throw new RuntimeException("消息格式错误", e);}}
}

2. 通用消息解析工具类

public class MessageParser {public static <T> T parse(MessageExt message, Class<T> valueType) {try {return new ObjectMapper().readValue(message.getBody(), valueType);} catch (IOException e) {throw new MessageParseException("消息解析失败", e);}}public static String parseToString(MessageExt message) {return new String(message.getBody(), StandardCharsets.UTF_8);}
}// 使用示例
Order order = MessageParser.parse(message, Order.class);

二、重试次数设置方案

1. 全局重试次数配置

# application.yml
rocketmq:consumer:max-reconsume-times: 3  # 默认最大重试次数delay-level-when-next-consume: 2  # 重试延迟级别

2. 消费者级别重试设置

@RocketMQMessageListener(topic = "PAYMENT_TOPIC",consumerGroup = "payment-group",maxReconsumeTimes = 5,  // 覆盖全局设置delayLevelWhenNextConsume = 3  // 重试延迟级别
)
public class PaymentConsumer implements RocketMQListener<MessageExt> {// ...
}

3. 动态获取重试次数

@Override
public void onMessage(MessageExt message) {// 获取当前重试次数(从0开始)int reconsumeTimes = message.getReconsumeTimes();if (reconsumeTimes > getMaxRetryTimes()) {log.warn("达到最大重试次数[msgId={}]", message.getMsgId());saveToDeadLetterQueue(message);return;}// 业务处理...
}private int getMaxRetryTimes() {// 可以从配置中心动态获取return 3; 
}

三、重试策略完整实现

1. 带重试监控的消费者

@RocketMQMessageListener(topic = "INVENTORY_TOPIC",consumerGroup = "inventory-group",maxReconsumeTimes = 3
)
public class InventoryConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {try {InventoryUpdate update = parseMessage(message);// 根据重试次数采用不同策略switch (message.getReconsumeTimes()) {case 0: // 第一次尝试inventoryService.fastUpdate(update);break;case 1: // 第二次尝试inventoryService.safeUpdate(update);break;default: // 最后一次尝试inventoryService.forceUpdate(update);}} catch (Exception e) {log.error("库存更新失败[retry={}/{}]", message.getReconsumeTimes(), message.getMaxReconsumeTimes(), e);// 最后一次重试特殊处理if (message.getReconsumeTimes() >= message.getMaxReconsumeTimes() - 1) {emergencyHandler.handle(message);}throw new RuntimeException(e);}}
}

2. 重试间隔配置

RocketMQ 使用延迟级别控制重试间隔:

延迟级别延迟时间对应描述
11s延迟1秒
25s延迟5秒
310s延迟10秒
.........
162h延迟2小时

设置方式:

// 通过注解设置
@RocketMQMessageListener(delayLevelWhenNextConsume = 3)// 通过上下文动态设置(需要实现RocketMQReplyListener)
context.setDelayLevelWhenNextConsume(calculateDelayLevel());

四、测试方案设计

1. 消费者测试类示例

@SpringBootTest
class OrderConsumerTest {@Autowiredprivate OrderConsumer orderConsumer;@Testvoid testMessageProcessing() {// 1. 构建测试消息MessageExt message = new MessageExt();message.setBody(("{\"orderId\":\"123\",\"amount\":100}").getBytes());// 2. 模拟首次消费orderConsumer.onMessage(message);// 3. 模拟重试场景message.setReconsumeTimes(1);orderConsumer.onMessage(message);// 验证业务结果...}@Testvoid testMaxRetry() {MessageExt message = new MessageExt();message.setBody(("{\"orderId\":\"456\",\"amount\":200}").getBytes());message.setReconsumeTimes(3); // 超过最大重试assertThrows(RuntimeException.class, () -> {orderConsumer.onMessage(message);});// 验证死信队列处理}
}

2. 集成测试配置

@TestConfiguration
public class TestRocketMQConfig {@Bean@Primarypublic RocketMQTemplate mockRocketTemplate() {RocketMQTemplate template = mock(RocketMQTemplate.class);when(template.getProducer()).thenReturn(mock(DefaultMQProducer.class));return template;}@Bean@Primarypublic DeadLetterHandler mockDeadLetterHandler() {return mock(DeadLetterHandler.class);}
}

五、生产环境建议

  1. 重试策略原则

    • 非幂等操作:最大重试次数设为1

    • 普通业务:3-5次重试

    • 关键业务:8-16次重试(配合告警)

  2. 消息解析最佳实践

    // 添加消息版本号
    @Data
    public class OrderMessage {private String version = "v1";private String orderId;private BigDecimal amount;
    }// 兼容性解析
    public OrderMessage parseCompatible(MessageExt message) {String json = new String(message.getBody());if (json.contains("\"version\":\"v1\"")) {return objectMapper.readValue(json, OrderMessage.class);} else {// 旧版本处理逻辑}
    }

  3. 监控关键指标

    • 消息平均重试次数

    • 死信队列堆积量

    • 消息解析失败率

通过以上方案,您可以:

  • 正确解析各种格式的消息体

  • 灵活设置消息重试次数和间隔

  • 实现健壮的消息处理逻辑

  • 方便地进行单元测试和集成测试

3

相关文章:

  • [GHCTF 2025]ret2libc1(NSSCTF)
  • 云蝠语音智能体——电话面试中的智能助手
  • 搭配前端食用
  • 【小程序】手机号快速验证组件如何使用对公转账方式
  • 一文详解RTMP协议
  • 每日一练,冲进国赛!全国青少年信息素养大赛-图形化编程—省赛真题——小鸡吃东西
  • 服务器为什么会产生垃圾文件
  • 【摄影测量与遥感】卫星姿态角解析:Roll/Pitch/Yaw与Φ/Ω/Κ的对应关系
  • NIST提出新型安全指标:识别潜在被利用漏洞
  • 图解深度学习 - 人工智能、机器学习和深度学习
  • SVN被锁定解决svn is already locked
  • 怎么判断一个Android APP使用了Qt 这个跨端框架
  • Javase易混点专项复习01_this关键字
  • 2.2.1 05年T1复习
  • 重读《人件》Peopleware -(12-2)Ⅱ 办公环境 Ⅴ 大脑时间与身体时间(下)
  • 生成式 AI:解锁人类创造力的智能引擎
  • SIWARD希华差分振荡器产品(TKD)SPXO有源振荡器
  • 清华大学:基于生成模型的上肢外骨骼机器人助力个性化中风康复
  • 【算法】: 前缀和算法(利用o(1)的时间复杂度快速求区间和)
  • 对于geoserver发布数据后的开发应用
  • 织梦手机网站怎么仿制/2021小学生新闻摘抄
  • 网站算信息化建设/企业网站搜索优化网络推广
  • 做网站怎么建立文件夹/手机免费发布信息平台
  • 动易cms下载/重庆seo关键词优化服务
  • 黄色网站模板/外包网站
  • 南山做网站多少钱/最新推广赚钱的app