RabbitMq消息阻塞,立即解决方案
如果目前你的 RabbitMQ 消费者 被卡住不再消费消息,且消息已经到达消费者绑定队列,但Spring Cloud Stream 没有继续触发 @StreamListener
的方法执行。这类问题一般是因为消费者线程阻塞或消息被 RabbitMQ 拒绝投递。我们可以按照下面的步骤紧急处理:
✅ 立即解决方案(临时 + 快速恢复消费)
✅ Step 1. 重启消费者服务
这一步是最快恢复消费的方式:
-
Spring Cloud Stream 的默认 ack 模式是自动 ack,只要没有抛出异常,MQ 就认为消费成功。
-
如果你遇到卡死,有可能是:
- 前一条消息处理还没结束(线程阻塞)
- 或者死循环/超时没释放线程
-
重启服务可释放线程/消费挂起状态
# 假设是 docker 容器/服务
docker restart your-consumer-service
# 或者直接 kill 掉 Java 服务再启动
观察 RabbitMQ 管控台,看是否:
- 队列消息数逐渐减少
- 消费速率恢复为 > 0
- 控制台中连接数和 channel 数重新建立
🔍 临时观察细节(可排查 root cause)
✅ RabbitMQ 控制台重点检查 3 项:
项目 | 正常情况 | 异常表现 |
---|---|---|
Ready | 有新消息就增加,消费中就减少 | 持续增长说明消息没人处理 |
Unacked | 一般在 0 ~ prefetch 区间内 | 长期不变说明卡住未 ack |
Consumer Utilization | 1.0 | 小于 1.0 说明消费者阻塞或卡死 |
打开 RabbitMQ 中该 queue 的 “Consumer” 详情页,能看到具体的 Consumer 状态。
🔧 推荐补救优化(防止再次发生)
✅ 修改 application.yml
设置消费者并发 + prefetch:
spring:cloud:stream:bindings:executeAiBaseTask-in-0:destination: ai-paas-executeAiBaseTask.ai-platformrabbit:bindings:executeAiBaseTask-in-0:consumer:concurrency: 5 # 启动多个消费线程prefetch: 10 # 提前拉取多个消息处理acknowledge-mode: AUTO # 默认可以先用 AUTO,后面再换 MANUAL
✅ 改造处理逻辑为异步线程池提交:
@StreamListener(ExecuteAiBaseTaskBinding.INPUT)
public void executeAiBaseTask(String apiCode) {taskExecutor.submit(() -> handleAiTask(apiCode));
}
将你的整套复杂处理逻辑挪到 handleAiTask(apiCode)
中。
🛡️ 如果你急着排查是“哪条消息卡住了”
你可以尝试用 RabbitMQ 控制台 Web UI:
-
在对应 Queue 上点击“Get messages”
-
获取队列中某些消息的 payload
-
手动判断是否是某条任务数据异常
- 比如
rightPop
之后无法解析 JSON、字段缺失、Redis 无响应等
- 比如
总结
步骤 | 目的 | 操作 |
---|---|---|
🔧 Step 1 | 快速恢复消费 | 重启消费服务 |
🔍 Step 2 | 定位卡住原因 | RabbitMQ 控制台查看 Unacked 与队列状态 |
✅ Step 3 | 防止再挂死 | 增加并发、改异步处理、调高 prefetch |
🛠 Step 4 | 结构性优化 | 异步线程池 + MANUAL ack 模式(推荐) |