springboot rabbitmq 消息队列入门与实战
Spring Boot3 RabbitMq 项目地址
https://gitee.com/supervol/loong-springboot-study
(记得给个start,感谢)
RabbitMq 概述
RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)的开源消息中间件,核心优势在于解耦、削峰、异步通信;而 Spring Boot 3 作为主流的 Java 开发框架,通过 spring-boot-starter-amqp
starter 简化了 RabbitMQ 的整合流程。本文将从基础概念、环境搭建、核心功能、高级特性到最佳实践,全面讲解 Spring Boot 3 与 RabbitMQ 的整合方案。
RabbitMq 核心
1. RabbitMQ 组件
组件 | 作用说明 |
---|---|
Broker | RabbitMQ 服务器实例,负责接收、存储、转发消息 |
Exchange | 交换机,接收生产者发送的消息,根据路由规则将消息路由到绑定的队列 |
Queue | 消息队列,存储待消费的消息,支持持久化、限流、死信等特性 |
Binding | 交换机与队列的绑定关系,包含 “路由键(Routing Key)” 用于匹配消息 |
Routing Key | 消息的 “地址标识”,交换机通过 Routing Key 决定消息路由到哪个队列 |
Virtual Host | 虚拟主机,实现多租户隔离(不同应用使用不同 Virtual Host,避免资源冲突) |
Connection | 客户端与 Broker 的 TCP 连接,重量级资源,一般复用 |
Channel | 基于 Connection 的轻量级通信通道, RabbitMQ 推荐通过 Channel 操作消息(减少 TCP 连接开销) |
交换机(Exchange)的 4 种核心类型,交换机是 RabbitMQ 消息路由的核心,不同类型对应不同的路由策略:
- Direct Exchange(直连交换机):精确匹配 Routing Key(消息的 Routing Key 与 Binding 的 Routing Key 完全一致才路由),适用于点对点通信(如订单支付通知)。
- Topic Exchange(主题交换机):模糊匹配 Routing Key(支持
*
匹配单个单词、#
匹配多个单词,单词间用.
分隔),适用于发布订阅 + 多条件过滤(如日志按 “服务名。级别” 路由)。 - Fanout Exchange(扇出交换机):忽略 Routing Key,将消息广播到所有绑定的队列,适用于广播通信(如系统通知、缓存清理)。
- Headers Exchange(头交换机):不依赖 Routing Key,通过匹配消息头(Headers)的键值对路由,适用于复杂属性匹配(较少用,灵活但性能略低)。
2. Spring AMQP 核心组件
Spring Boot 3 整合 RabbitMQ 依赖 Spring AMQP(版本与 Spring Boot 3 强绑定,如 Spring Boot 3.2 对应 Spring AMQP 3.2+),核心组件如下:
- RabbitTemplate:封装了 RabbitMQ 的消息发送逻辑,支持同步 / 异步发送、消息回调、消息转换器等。
- AmqpAdmin:用于声明交换机、队列、绑定关系(支持编程式声明,也可通过注解声明)。
- @RabbitListener:注解式消费者,标注在方法上即可监听指定队列,支持批量消费、手动确认等。
- MessageListenerContainer:消费者容器,负责管理消费者生命周期(如并发消费、消息重试、异常处理),Spring Boot 会自动配置默认容器。
RabbitMq 示例
1. 前提条件
Spring Boot 3 对依赖版本有严格要求,避免版本冲突:
组件 | 最低版本要求 | 推荐版本 |
---|---|---|
JDK | JDK 17+ | JDK 17/21 |
RabbitMQ | 3.9+ | 3.12+ |
Spring Boot | 3.0+ | 3.2.x(稳定版) |
Spring AMQP | 3.0+(随 Spring Boot 自动引入) | 3.2.x |
2. 代码位置
请参考项目地址中 springboot-mq/springboot-rabbitmq 模块代码。
RabbitMq 高级
基础整合仅满足简单通信,实际项目需解决消息丢失、重复消费、延迟消息等问题,本节讲解核心高级特性。
1. 消息可靠性保障
RabbitMQ 消息丢失可能发生在三个环节:生产者→Broker、Broker 存储、Broker→消费者,需针对性防护。
环节 | 防护措施 |
---|---|
生产者→Broker | 开启生产者确认(publisher-confirm-type: correlated )+ 回调重试 |
Broker 存储 | 交换机 / 队列持久化(durable=true )+ 消息持久化(deliveryMode=PERSISTENT ) |
Broker→消费者 | 手动确认(acknowledge-mode: manual )+ 消费失败转发死信队列 |
(1)消息持久化配置
在声明交换机和队列时,需设置 durable=true
;发送消息时,需设置 deliveryMode=PERSISTENT
:
// 1. 声明持久化交换机
DirectExchange durableExchange = new DirectExchange("durable-exchange", true, false);// 2. 声明持久化队列
Queue durableQueue = new Queue("durable-queue", true, false, false);// 3. 发送持久化消息(通过 RabbitTemplate 设置消息属性)
rabbitTemplate.convertAndSend("durable-exchange","durable-routing-key","持久化消息",message -> {// 设置消息持久化(DeliveryMode.PERSISTENT)message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;},new CorrelationData(UUID.randomUUID().toString())
);
2. 死信队列
死信是指无法被正常消费的消息(如消费失败、消息过期、队列满),死信队列用于存储这些消息,避免丢失或阻塞正常队列。
(1)死信产生条件
- 消息被消费者拒绝(
basicReject
/basicNack
,且requeue=false
)。 - 消息过期(队列设置
x-message-ttl
或消息单独设置expiration
)。 - 队列达到最大长度(
x-max-length
),无法存储新消息。
(2)死信队列配置示例
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterQueueConfig {// 1. 死信交换机(普通 Direct 交换机)@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dlx-exchange", true, false);}// 2. 死信队列(存储死信消息)@Beanpublic Queue deadLetterQueue() {return new Queue("dlx-queue", true, false, false);}// 3. 绑定死信交换机与死信队列@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dlx-routing-key"); // 死信路由键}// 4. 普通队列(设置死信属性,将死信转发到死信交换机)@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").withArgument("x-dead-letter-exchange", "dlx-exchange") // 死信交换机.withArgument("x-dead-letter-routing-key", "dlx-routing-key") // 死信路由键.withArgument("x-message-ttl", 10000) // 消息过期时间(10秒).build();}// 5. 绑定普通队列与普通交换机@Beanpublic Binding normalBinding(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal-routing-key");}// 6. 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal-exchange", true, false);}
}
测试:发送消息到 normal-queue
,若 10 秒内未被消费,消息会自动转为死信,进入 dlx-queue
。
3. 延迟队列
延迟队列用于 “消息延迟指定时间后再消费”(如订单超时未支付自动取消、定时任务),RabbitMQ 无原生延迟队列,需通过以下两种方式实现:
(1)基于死信队列 + TTL
利用 “消息过期后转为死信” 的特性,设置队列的 x-message-ttl
,死信队列即为延迟队列。
缺陷:队列中所有消息的延迟时间固定,无法灵活设置不同延迟时间。
(2)基于 RabbitMQ 延迟插件
RabbitMQ 提供 rabbitmq_delayed_message_exchange
插件,支持自定义消息延迟时间,灵活性更高。
步骤 1:安装延迟插件
- 下载
rabbitmq_delayed_message_exchange插件,并放到指定位置
。 - 安装插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
。 - 验证:访问管理界面,在
Exchanges
的Type
下拉框中可看到x-delayed-message
。 - 注意,本文不讨论和涉及rabbitmq及其插件安装和配置,请自行搜索。
步骤 2:配置延迟交换机与队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayedQueueConfig {// 1. 声明延迟交换机(类型为 x-delayed-message)@Beanpublic CustomExchange delayedExchange() {// 参数:名称、类型、持久化、自动删除、附加参数(指定延迟交换机的路由类型)return new CustomExchange("delayed-exchange","x-delayed-message",true,false,Map.of("x-delayed-type", "direct") // 延迟交换机的底层路由类型(如 direct));}// 2. 声明延迟队列@Beanpublic Queue delayedQueue() {return new Queue("delayed-queue", true, false, false);}// 3. 绑定延迟交换机与队列@Beanpublic Binding delayedBinding() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed-routing-key").noargs();}
}
步骤 3:发送延迟消息
// 发送延迟消息(设置延迟时间,单位:毫秒)
public void sendDelayedMessage(Object message, long delayMs) {rabbitTemplate.convertAndSend("delayed-exchange","delayed-routing-key",message,msg -> {// 设置延迟时间msg.getMessageProperties().setDelay((int) delayMs);// 消息持久化msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},new CorrelationData(UUID.randomUUID().toString()));
}// 调用:延迟 5 秒后消费
sendDelayedMessage("延迟 5 秒的消息", 5000);
4. 消息幂等性
重复消费:同一消息被消费者多次处理(如消费者确认前宕机,Broker 重新投递)。需保证 “重复消费不影响业务正确性”(即幂等)。
(1) 解决方案:唯一 ID + 去重存储
- 生成唯一消息 ID:生产者发送消息时,设置
messageId
(如 UUID)。 - 消费前检查去重:消费者接收消息后,先查询存储(Redis / 数据库)中是否存在该
messageId
,若存在则跳过,若不存在则处理业务并记录messageId
。
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;@Service
public class IdempotentConsumerService {@Resourceprivate RedisTemplate<String, String> redisTemplate;// 幂等消费逻辑public void consumeIdempotentMessage(Object message, Message amqpMessage, Channel channel) throws IOException {String messageId = amqpMessage.getMessageProperties().getMessageId();String redisKey = "rabbitmq:message:id:" + messageId;try {// 1. Redis 分布式锁:避免并发重复处理(setIfAbsent 原子操作)Boolean isFirstConsume = redisTemplate.opsForValue().setIfAbsent(redisKey,"CONSUMED",24, // 过期时间(根据业务调整,避免 Redis 堆积)TimeUnit.HOURS);if (Boolean.FALSE.equals(isFirstConsume)) {// 2. 非首次消费:直接确认消息System.out.printf("消息已重复消费,ID=%s%n", messageId);channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);return;}// 3. 首次消费:处理业务逻辑System.out.printf("幂等消费消息:ID=%s,内容=%s%n", messageId, message);// 4. 处理完成:确认消息channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 5. 消费失败:拒绝消息(不重回队列)channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, false);System.err.printf("幂等消费失败:ID=%s,原因=%s%n", messageId, e.getMessage());}}
}
5. 监控与运维
1. RabbitMQ Management UI
RabbitMQ 管理界面是最基础的监控工具,关键监控指标:
- Exchanges:交换机是否正常,绑定数、消息入站 / 出站速率。
- Queues:队列长度(
Ready
数,若持续增长需扩容消费者)、消息消费速率(Consumers
数、Acknowledged
数)。 - Connections/Channels:连接数、信道数是否超出阈值(避免资源耗尽)。
- Admin:用户权限、虚拟主机配置是否正确。
2. Spring Boot Actuator 监控
通过 Spring Boot Actuator 暴露 RabbitMQ metrics,结合 Prometheus + Grafana 可实现可视化监控。
<!-- Spring Boot Actuator -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency><!-- 可选:Prometheus 适配(用于对接 Grafana) -->
<dependency><groupId>io.micrometer</groupId><artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
配置暴露监控端点
management:endpoints:web:exposure:include: health,info,metrics,prometheus # 暴露的端点metrics:export:prometheus:enabled: true # 启用 Prometheus 导出endpoint:health:show-details: always # 显示健康详情
查看监控数据
- 访问
http://localhost:8080/actuator/health
:查看 RabbitMQ 连接健康状态(rabbitmq
节点为UP
表示正常)。 - 访问
http://localhost:8080/actuator/metrics/rabbitmq.messages.sent
:查看消息发送总数。 - 访问
http://localhost:8080/actuator/prometheus
:获取 Prometheus 格式的 metrics,用于 Grafana 可视化。
RabbitMq 总结
-
组件设计规范:
- 交换机 / 队列命名:按 “业务模块 - 类型 - 用途” 命名(如
order-direct-exchange
、order-pay-queue
)。 - 虚拟主机隔离:不同环境(开发 / 测试 / 生产)或不同应用使用独立 Virtual Host。
- 交换机 / 队列命名:按 “业务模块 - 类型 - 用途” 命名(如
-
性能优化:
- 连接池配置:使用
CachingConnectionFactory
缓存信道(默认缓存 25 个),避免频繁创建信道。 - 消息体大小:单个消息不超过 1MB(大消息建议存储到 MinIO/OSS,消息中携带文件地址)。
- 并发控制:消费者并发数(
concurrency
)根据 CPU 核心数调整(如 2-4 倍核心数),避免过度并发导致资源竞争。
- 连接池配置:使用
-
可靠性优先:
- 必开特性:生产者确认、手动确认、消息持久化、死信队列。
- 避免滥用自动确认:仅在 “消费逻辑无副作用” 场景使用
acknowledge-mode: auto
。
-
问题排查:
- 日志配置:开启 RabbitMQ DEBUG 日志(
logging.level.org.springframework.amqp=DEBUG
),便于追踪消息流转。 - 死信监控:定期检查死信队列,分析死信原因(如消费异常、消息过期)。
- 日志配置:开启 RabbitMQ DEBUG 日志(