Spring Boot 与 RabbitMQ 集成示例
文章目录
- 一、核心前提(环境与依赖)
- 1. 环境准备
- 2. 引入核心依赖
- 二、核心配置(连接与基础设置)
- 三、核心组件声明(交换机、队列、绑定)
- 1. 注解式声明(简洁高效)
- 2. 配置文件声明(适用于简单场景)
- 四、消息收发实现
- 1. 生产者(发送消息)
- 2. 消费者(接收消息)
- 五、关键特性与进阶配置
- 1. 消息持久化
- 2. 死信队列(处理失败 / 过期消息)
- 3. 消息限流(避免消费者过载)
- 六、测试验证
- 1. 编写测试类
- 2. 启动验证
- 七、常见问题排查
Spring Boot 集成 RabbitMQ 的核心是通过
spring-boot-starter-amqp实现自动配置,快速完成消息收发,适用于解耦、异步通信等场景。
一、核心前提(环境与依赖)
1. 环境准备
-
安装 RabbitMQ 服务(本地 / 服务器),启动后默认端口 5672,管理界面端口 15672。
-
确保 Spring Boot 版本与 AMQP 启动器兼容(推荐 Spring Boot 2.x/3.x)。
2. 引入核心依赖
Maven 项目在pom.xml中添加依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
Gradle 项目对应添加:implementation 'org.springframework.boot:spring-boot-starter-amqp'
二、核心配置(连接与基础设置)
在application.yml(或application.properties)中配置 RabbitMQ 连接信息:
spring:rabbitmq:host: 127.0.0.1 # MQ服务地址(远程填服务器IP)port: 5672 # 默认通信端口username: guest # 默认账号(生产环境需自定义并授权)password: guest # 默认密码virtual-host: / # 虚拟主机(用于环境隔离)# 可选配置:消息确认、重试机制publisher-confirm-type: correlated # 生产者确认机制publisher-returns: true # 消息路由失败回调listener:simple:acknowledge-mode: auto # 消费者确认模式(auto/manual/none)retry:enabled: true # 开启消费重试max-attempts: 3 # 最大重试次数
三、核心组件声明(交换机、队列、绑定)
需明确交换机(路由消息)、队列(存储消息)、绑定(关联两者),支持两种声明方式:
1. 注解式声明(简洁高效)
通过@Queue、@Exchange、@Binding注解直接绑定:
import org.springframework.amqp.core.\*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMqConfig {// 队列名称public static final String QUEUE_NAME = "demo_queue";// 交换机名称public static final String EXCHANGE_NAME = "demo_exchange";// 路由键public static final String ROUTING_KEY = "demo.routing.key";// 声明队列(durable=true:持久化,重启MQ不丢失)@Beanpublic Queue demoQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}// 声明交换机(Direct类型:精确路由)@Beanpublic DirectExchange demoExchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();}// 绑定队列与交换机(指定路由键)@Beanpublic Binding demoBinding(Queue demoQueue, DirectExchange demoExchange) {return BindingBuilder.bind(demoQueue).to(demoExchange).with(ROUTING_KEY);}}
- 交换机类型:Direct(精确路由)、Topic(模糊路由)、Fanout(广播)、Headers(头匹配)。
2. 配置文件声明(适用于简单场景)
直接在application.yml中声明基础队列(无需代码):
spring:rabbitmq:# 其他配置...template:exchange: demo_exchangerouting-key: demo_routing_keylistener:simple:queues: demo_queue
四、消息收发实现
1. 生产者(发送消息)
使用RabbitTemplate发送消息,支持字符串、对象(自动序列化):
import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;@Componentpublic class MessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;// 发送字符串消息public void sendStringMessage(String content) {rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, // 交换机名称RabbitMqConfig.ROUTING_KEY, // 路由键content // 消息内容);System.out.println("生产者发送消息:" + content);}// 发送对象消息(需确保对象可序列化)public void sendObjectMessage(User user) {rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,RabbitMqConfig.ROUTING_KEY,user);System.out.println("生产者发送对象消息:" + user);}// 静态内部类示例(可独立定义)public static class User implements java.io.Serializable {private String id;private String name;// getter/setter/toString}}
2. 消费者(接收消息)
使用@RabbitListener注解监听队列,自动消费消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class MessageConsumer {// 监听指定队列@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)public void receiveStringMessage(String content) {System.out.println("消费者接收字符串消息:" + content);// 业务处理逻辑...}// 监听队列并接收对象消息@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)public void receiveObjectMessage(MessageProducer.User user) {System.out.println("消费者接收对象消息:" + user);// 业务处理逻辑...}}
- 若需手动确认消息(
acknowledge-mode: manual),可通过Channel对象手动 ack:
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class ManualAckConsumer {@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)public void receiveMessage(String content, Channel channel, Message message) throws Exception {try {System.out.println("手动确认模式 - 接收消息:" + content);// 业务处理成功后,手动确认消息(第二个参数false:不批量确认)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 业务处理失败,拒绝消息并重回队列(或死信队列)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
五、关键特性与进阶配置
1. 消息持久化
-
队列持久化:
QueueBuilder.durable(true)(默认 true)。 -
交换机持久化:
ExchangeBuilder.durable(true)(默认 true)。 -
消息持久化:发送时指定
MessageProperties.PERSISTENT_TEXT_PLAIN:
rabbitTemplate.convertAndSend(exchange, routingKey, content, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});
2. 死信队列(处理失败 / 过期消息)
通过队列参数配置死信交换机和路由键,失败消息自动路由到死信队列:
@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable("dead_letter_queue").build();}@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange("dead_letter_exchange").durable(true).build();}@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead_letter_routing_key");}// 普通队列绑定死信配置@Beanpublic Queue demoQueue() {return QueueBuilder.durable(QUEUE_NAME).withArgument("x-dead-letter-exchange", "dead_letter_exchange") // 死信交换机.withArgument("x-dead-letter-routing-key", "dead_letter_routing_key") // 死信路由键.withArgument("x-message-ttl", 60000) // 消息过期时间(60秒).build();}
3. 消息限流(避免消费者过载)
在application.yml中配置消费者每次拉取消息数量:
spring:rabbitmq:listener:simple:prefetch: 10 # 每次最多拉取10条消息,处理完再拉取
六、测试验证
1. 编写测试类
import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTestpublic class RabbitMqTest {@Resourceprivate MessageProducer messageProducer;@Testpublic void testSendMessage() {// 测试发送字符串消息messageProducer.sendStringMessage("Hello Spring Boot + RabbitMQ!");// 测试发送对象消息MessageProducer.User user = new MessageProducer.User();user.setId("1001");user.setName("测试用户");messageProducer.sendObjectMessage(user);}}
2. 启动验证
-
启动 RabbitMQ 服务,访问
http://localhost:15672(管理界面),可查看队列、交换机、消息数量。 -
启动 Spring Boot 应用,执行测试方法,控制台会打印生产者发送日志和消费者接收日志。
七、常见问题排查
-
连接失败:检查 RabbitMQ 服务是否启动、地址 / 端口 / 账号密码是否正确、防火墙是否开放 5672 端口。
-
消息发送成功但消费者未接收:检查交换机与队列是否绑定、路由键是否匹配、消费者是否被 @Component 扫描到。
-
对象消息序列化失败:确保对象实现
Serializable接口,或配置 JSON 序列化(需引入jackson-databind依赖)。
