当前位置: 首页 > news >正文

RocketMQ如何处理消息堆积

mq出现消息堆积本质上是消费速度跟不上生产速度导致的结果。处理消息堆积需要一个系统性的方法,从快速定位问题根源到实施相应的解决方案。

紧急诊断与定位瓶颈

当监控系统告警出现消息堆积时,不要急于重启或扩容,先搞清楚问题出在哪里。

  1. 确认堆积情况

    • 通过 RocketMQ Console 或 Admin Tool 查看 堆积量(Behind)、堆积消息数(Diff)等指标。确认是单个 Topic 还是多个 Topic 出现问题,是全部 Consumer Group 还是特定 Group。

    • 命令:./mqadmin consumerProgress -n <nameserver_addr> -g <consumer_group>

  2. 分析消费端状态

    • 消费线程是否卡住?:检查消费者应用的日志是否有大量错误,如数据库连接超时、HTTP 调用失败、死锁或长时间的 GC 停顿。这是最常见的原因

    • 消费逻辑是否变慢?:是否引入了新的、耗时的业务逻辑?是否在处理某一种特定消息时效率极低?

    • 检查系统资源:CPU、内存、磁盘 I/O 或网络带宽是否已达瓶颈?特别是消费者应用所在的机器。

  3. 分析生产端状态

    • 是否突然有流量洪峰?:例如大促、定时任务集中触发等,导致生产者发送消息的速率远超平时。

    • 是否在“重放”消息?:是否将大量历史消息重新发送到队列中?

  4. 分析 Broker 状态

    • Broker 的 CPU、IO 负载是否正常?写入消息的性能是否下降?

  • 单个Topic堆积 -> 问题很大概率出在这个Topic相关的特定业务链路上(消费端或生产端)。

  • 多个Topic同时堆积 -> 问题很大概率出在公共底层资源或组件上。

因此从四大方面分别进行分析:发送端(流量激增,过多重复消息),队列端(单或多topic同时堆积),broker处理机端(cpu,io负载),消费端(消费报错,新增耗时业务,系统资源占用情况)。

实施应急处理方案(短期止血)

排查完问题之后就需要立即解决当前消息堆积的问题

  1. 扩容消费者(最直接有效)

    • 增加消费者实例数:这是应对流量洪峰最快捷的方式。通过增加 Pod(K8s)、容器或虚拟机来水平扩展消费者应用。

    • 增加单个消费者的并行度

      • 调整 consumeThreadMin 和 consumeThreadMax:增加消费者线程池的大小。

      • 调整 pullBatchSize:增加每次从 Broker 拉取的消息数量,减少网络交互次数。

  2. 优化消费逻辑

    • 简化或绕过:如果可能,临时将复杂的消费逻辑(如写数据库、调用外部API)替换为更简单的逻辑(如只解析消息并落盘到本地或更快的存储中),事后再进行补偿处理。

    • 避免阻塞:检查消费代码中是否有不必要的同步等待、锁竞争或慢速的 IO 操作,将其异步化或优化。

  3. 服务降级

    • 如果消息不是100%关键,可以考虑只处理核心业务消息,非核心消息先跳过或记录日志后稍后处理。

根本原因分析与长期优化(治本)

1. 批量消费示例

RocketMQ支持批量消费,你可以在消费者中一次获取多条消息进行处理。

public class BatchMessageListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {// 批量处理消息for (MessageExt msg : msgs) {String body = new String(msg.getBody(), StandardCharsets.UTF_8);// 处理每条消息,这里可以是你的业务逻辑processMessage(body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {// 处理失败,稍后重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}private void processMessage(String message) {// 你的业务逻辑System.out.println("Processing: " + message);}
}

在初始化消费者时,你需要设置批量消费的大小(通过设置Consumer的pullBatchSize属性,但注意,实际拉取的消息数还会受到Topic、队列等因素的影响)。另外,也可以通过在监听器中处理多条消息(如上所示)来实现批量处理。

2. 异步处理示例

将耗时的业务操作异步化,避免阻塞消费线程。

public class AsyncMessageListener implements MessageListenerConcurrently {// 创建一个独立的线程池用于异步处理private final ExecutorService asyncProcessingPool = Executors.newFixedThreadPool(10);@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 将每条消息提交给线程池异步处理for (MessageExt msg : msgs) {asyncProcessingPool.submit(() -> {try {String body = new String(msg.getBody(), StandardCharsets.UTF_8);// 耗时操作,如调用第三方API、复杂计算等timeConsumingProcess(body);} catch (Exception e) {// 处理异常,可以根据业务需要记录日志并决定是否重试// 注意:异步处理中如果失败,消息已经被确认消费了,所以需要额外的重试机制(如将失败消息再发送到另一个队列)}});}// 注意:这里立即返回成功,消息会被确认消费成功。如果异步处理失败,消息不会重试。// 因此,这种模式适用于对消息丢失不敏感的场景,或者有其他补偿机制。return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}private void timeConsumingProcess(String message) {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Processed: " + message);}
}

注意:异步处理的风险在于,如果异步任务失败,消息已经被确认为消费成功,因此不会重试。所以这种方法通常需要配合其他机制(如死信队列、人工干预)使用,或者用于允许少量丢失的场景。

3. 优化数据库操作示例

优化数据库操作,例如使用批量插入、索引等。

假设我们消费消息后需要将数据插入数据库,我们可以使用批量插入来优化。

4. 保证幂等性示例

以消费端幂等为例,使用Redis来记录已经处理过的消息(假设每条消息有唯一ID)。

public class IdempotentMessageListener implements MessageListenerConcurrently {private final RedisTemplate<String, String> redisTemplate;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String messageId = msg.getMsgId(); // 注意:生产者发送重复消息时MsgId不同,所以要用业务唯一IDString body = new String(msg.getBody(), StandardCharsets.UTF_8);MyData data = parseMessage(body);// 使用业务唯一ID来判断重复,比如订单号String businessId = data.getOrderId();// 尝试在Redis中设置key,如果已存在则设置失败Boolean success = redisTemplate.opsForValue().setIfAbsent(businessId, "processed", 10, TimeUnit.MINUTES);if (Boolean.TRUE.equals(success)) {// 第一次处理,执行业务逻辑processData(data);} else {// 已经处理过,直接跳过System.out.println("Duplicate message, skipped: " + businessId);}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}private void processData(MyData data) {// 处理业务}
}

注意:上述示例中使用消息的业务唯一ID(如订单ID)作为Redis的key,并设置过期时间(例如10分钟),这样在一定时间后自动清除,避免Redis无限增长。

如果使用数据库实现幂等,可以通过唯一键约束来避免重复插入。

这些示例提供了实现这些优化策略的基本思路,实际应用中需要根据具体业务场景进行调整。

5. 合理设计消息与 Topic

  • 消息过滤:使用 Tag 或 SQL92 属性过滤器,让消费者只订阅它真正需要的消息,避免处理无关数据。

  • 拆分 Topic:如果一个大 Topic 中包含多种业务类型的消息,且消费速度不一致,可以考虑将其拆分成多个 Topic,由不同的 Consumer Group 消费,避免慢业务阻塞快业务。

6. 监控与告警常态化

  • 建立完善的监控:不仅监控堆积量,还要监控消费 TPS、生产 TPS、消费耗时、成功/失败率等。

  • 设置合理告警阈值:在堆积量达到一个风险水平(如积压超过 10W)时就提前告警,而不是等到系统快崩溃了再告警。

7.预留资源缓冲

  • 预估大促或活动流量,提前对消费者应用进行扩容。

  • 保证系统有一定的资源冗余,以应对突发流量。


文章转载自:

http://2S9Goj9B.pdxqk.cn
http://MrCItkDr.pdxqk.cn
http://kPejCLEM.pdxqk.cn
http://PmdLKeJS.pdxqk.cn
http://eJkOO3tA.pdxqk.cn
http://qRAF5Z4G.pdxqk.cn
http://6NIRUv4W.pdxqk.cn
http://BdR1tNdE.pdxqk.cn
http://v0huS9B4.pdxqk.cn
http://2gs2zMCs.pdxqk.cn
http://yCkBIkfC.pdxqk.cn
http://PsJCRwqi.pdxqk.cn
http://0Tny8NRS.pdxqk.cn
http://0Ul4sumC.pdxqk.cn
http://EAPrXZsC.pdxqk.cn
http://GSYmjYvF.pdxqk.cn
http://lHZrsBUa.pdxqk.cn
http://MXMQ1bsq.pdxqk.cn
http://IQAPxHHF.pdxqk.cn
http://6Rtlvb3J.pdxqk.cn
http://fBrcAaWE.pdxqk.cn
http://wSzqCbqr.pdxqk.cn
http://eKe2M4qg.pdxqk.cn
http://7IwstxxD.pdxqk.cn
http://D3YQN2H2.pdxqk.cn
http://HJFgwVqr.pdxqk.cn
http://PnkPmcdF.pdxqk.cn
http://csDaN35r.pdxqk.cn
http://WJHcbSuo.pdxqk.cn
http://dPtGcfYW.pdxqk.cn
http://www.dtcms.com/a/370964.html

相关文章:

  • Maimo-AI驱动的行业研究工作平台
  • 「数据获取」《中国服务业统计与服务业发展(2014)》
  • C++:深入剖析vector
  • Arazzo AI监考Agent API实战教程
  • *和->的区别
  • MySQL中有哪些锁
  • 什么是云手机?
  • 基于FPGA的电梯控制系统设计(论文+源码)
  • Vllm-0.10.1:vllm bench serve参数说明
  • 华为悦盒EC6108V9/EC6108V9U/EC6108V9C_MV100(pub普通版/CA高安版)卡刷和强刷固件包
  • JVM中常见的GC垃圾收集器
  • Rsyslog日志采集
  • 代理连接性能优化:提升网络效率的关键技术与实践
  • NV308NV309美光固态闪存NW388NW504
  • C++中的栈
  • 手撕C++ list容器:从节点到完整双向链表实现
  • [Windows] AdGuard.v7.21.5089.0 中文直装电脑版
  • Skia如何渲染 Lottie 动画
  • 打工人日报#20250906
  • 基于GOA与BP神经网络分类模型的特征选择方法研究(Python实现)
  • 【完整源码+数据集+部署教程】苹果实例分割检测系统源码和数据集:改进yolo11-AggregatedAtt
  • [Upscayl图像增强] 多种AI处理模型 | 内置模型与自定义模型
  • RK3568 Trust
  • ECharts Gallery:Apache官方数据可视化模板库,助你快速制作交互图表并实现深度定制
  • 【LeetCode热题100道笔记】二叉搜索树中第 K 小的元素
  • HMI(人机界面)
  • 懒加载的概念
  • panther X2 armbian24 安装宝塔(bt)面板注意事项
  • 少儿配音教育:广州声与色在线科技有限公司打造趣味课程,助力青少年语言能力提升
  • 零基础学习数据采集与监视控制系统SCADA