当前位置: 首页 > news >正文

RabbitMQ消息堆积问题排查:concurrentConsumers 配置的坑与解决方案

一、问题背景

在分布式系统中,消息队列常用于异步处理和解耦。某业务场景下,用户完成支付后,支付结果通过MQ通知业务系统更新状态。但用户反馈支付成功后,需要等待好几分钟才能在业务系统中看到状态更新,体验较差。

二、问题现象

业务流程如下:

  1. 用户支付成功
  2. 支付渠道将支付结果发送到MQ队列
  3. 业务服务消费MQ中的支付结果
  4. 消费成功后更新业务单据状态
  5. 向相关系统发送支付结果通知

监控发现,从消息生产到消费完成,平均耗时约2分钟,无法满足业务实时性要求。

三、问题排查

1. 队列堆积检查

使用RabbitMQ管理命令检查队列状态:
rabbitmqctl list_queues name messages messages_unacknowledged

监控数据显示:

16:54:00 queue_async_notify      2
....
16:59:30 queue_async_notify      0

明显看到消息在队列中堆积,直到16:58:30才被消费完。

2. 消费线程分析

检查业务日志发现,所有消息都由同一个线程处理:

3. 代码配置检查

查看RabbitMQ配置类发现关键问题:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter msgConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(msgConverter);factory.setMaxConcurrentConsumers(10); // 只设置了最大并发数return factory;
}

4. 根本原因分析

• 业务处理平均耗时20秒,属于IO密集型操作

• 配置中只设置了maxConcurrentConsumers,未设置concurrentConsumers

• 由于消息量不大,未触发动态扩容机制

• 实际并发消费者数保持默认值1,导致消息串行处理

• 单线程处理速度跟不上消息生产速度,造成堆积

四、解决方案

方案一:固定并发消费者数(推荐)

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

// 设置固定并发消费者数
factory.setConcurrentConsumers(5);
return factory;

}

方案二:动态扩容配置

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

// 动态扩容配置
factory.setConcurrentConsumers(1);           // 初始并发数
factory.setMaxConcurrentConsumers(5);        // 最大并发数
factory.setPrefetchCount(1);                // 预取消息数
factory.setConsecutiveActiveTrigger(3);      // 连续活跃触发扩容
factory.setConsecutiveIdleTrigger(3);       // 连续空闲触发缩容
factory.setReceiveTimeout(1000L);            // 接收超时时间
return factory;

}

五、优化效果

配置调整后:
• 消息处理耗时从5分钟降低到20秒内

• 系统吞吐量提升5倍

• 用户感知的支付状态更新延迟大幅减少

六、经验总结

1. 理解配置参数:

• concurrentConsumers:初始并发消费者数

• maxConcurrentConsumers:最大并发消费者数

• 两者需要配合使用,只设置最大值不会生效

2. 根据业务特点配置:

• IO密集型操作:适当增加并发数

• CPU密集型操作:谨慎增加并发数

• 消息量稳定:使用固定并发数

• 消息量波动大:使用动态扩容

通过这次排查,我们认识到正确配置RabbitMQ消费者并发数的重要性,以及如何根据业务特性进行针对性优化。

http://www.dtcms.com/a/363478.html

相关文章:

  • 网络共享协议
  • 探索JavaScript机器学习:几款流行的库推荐
  • 服务器数据恢复—OceanStor存储数据丢失原来这样恢复
  • linux 命令 awk的常见用法
  • 【LeetCode】3025. 人员站位的方案数 I(康复-T2)
  • 【ComfyUI】SDXL Refiner 提示进一步提升生成图像的质量
  • 族 20 魅族 note16 meizu M20 MEIZU NOTE16 解锁BL bootloader
  • Linux电脑怎样投屏到客厅的大电视?支持远程投屏吗?
  • 构建高性能企业级搜索?Amazon CloudSearch全攻略:从核心概念到落地实践
  • 第 2 讲:Kafka Topic 与 Partition 基础
  • 心路历程-vim编辑器
  • 详解kafka streams(二)
  • Kafka 架构详解
  • 数据结构_队列(C语言实现)
  • 图论简介与图神经网络(Dijkstra算法,图卷积网络GCN实战)
  • 只需几条命令,本地体验微软最新长文本语音合成 VibeVoice(支持中文)
  • 电子电气架构 --- 当前企业EEA现状(上)
  • 2025牛客暑期多校训练营4(FBDGI)
  • MacOS - 记录MacOS发烫的好几天 - 幕后黑手竟然是
  • KVM 虚拟化基础与实操
  • 新质生产力的中枢神经:人工智能+时代的实时视频架构
  • Ubuntu 用户和用户组
  • rocketmq console dashboard 2.0控制台
  • 均匀分布直线阵的常规波束形成方位谱和波束图
  • MySQL中binlog、redolog与undolog的不同之处解析
  • 存算一体前沿技术——无需比较器即可高效排序,性能提升高达百倍
  • 轻型载货汽车变速器设计cad+设计说明书
  • 广东某地非金属矿山自动化监测服务项目
  • “转”若惊鸿,电磁“通”——耐达讯自动化RS485转Profinet点亮能源新章
  • Understanding the Flap T in American English