Spring Boot 整合 RabbitMQ :四大核心模式解析
Spring Boot 整合 RabbitMQ 大幅简化了开发流程,核心是通过
spring-boot-starter-amqp依赖封装底层细节,通过RabbitTemplate和@RabbitListener实现消息收发。本文讲解的四大模式覆盖了多数业务场景:
- 工作队列:多消费者负载均衡;
- 发布订阅:消息广播;
- 路由:精确匹配筛选;
- 通配符:灵活多维度筛选。
一、整合前置准备:依赖与基础配置
无论哪种模式,整合的前置步骤一致,核心是引入依赖并配置 RabbitMQ 连接信息。
1.1 引入 Maven 依赖
在 Spring Boot 项目的 pom.xml 中,添加 RabbitMQ 核心依赖与 Web 依赖(用于接口测试):

<!-- RabbitMQ 整合依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Web 依赖:用于编写接口发送消息 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 测试依赖(可选) -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency>
spring-boot-starter-amqp 内置了 RabbitMQ 客户端(默认 Lettuce),并封装了 RabbitTemplate(消息发送模板)、@RabbitListener(消息监听注解)等核心组件,无需额外引入客户端依赖。
1.2 配置 RabbitMQ 连接信息
在 src/main/resources/application.yml 中,配置 RabbitMQ 服务器地址、端口、虚拟主机等信息(与文档中一致,使用云服务器映射地址):
spring:rabbitmq:host: 110.41.51.65 # 服务器IP(文档中示例地址)port: 5672 # 端口(文档中自定义为15673,默认5672)username: study # 用户名(文档中创建的专用用户)password: study # 密码(与用户名对应)virtual-host: bite # 虚拟主机(文档中创建的隔离空间,默认/)# 可选:通过addresses简化配置(二选一即可)# addresses: amqp://study:study@110.41.51.65:15673/bite
配置后,Spring Boot 会自动创建 ConnectionFactory、RabbitTemplate 等 Bean,无需手动初始化。
二、模式一:工作队列模式(Work Queues)
工作队列模式通过多消费者竞争同一队列的消息,实现任务负载均衡,适用于集群环境下的异步任务处理(如 12306 短信通知)。
2.1 核心原理
- 生产者发送多条消息到队列;
- 多个消费者监听同一队列,RabbitMQ 采用“轮询”策略分配消息,每条消息仅被一个消费者处理;
- 无需交换机,使用 RabbitMQ 内置的默认交换机(空字符串)。
2.2 代码实现
步骤1:声明队列(配置类)
通过 @Configuration 类声明持久化队列,确保服务重启后队列不丢失:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;// 常量类:存储队列名称(文档中推荐统一管理)
class Constants {public static final String WORK_QUEUE = "work_queue";
}@Configuration
public class RabbitMQConfig {// 声明工作队列(durable=true:持久化)@Bean("workQueue")public Queue workQueue() {return QueueBuilder.durable(Constants.WORK_QUEUE).build();}
}
步骤2:编写生产者(接口发送消息)
通过 RabbitTemplate 发送消息,用 HTTP 接口触发(方便测试):
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/producer")
public class ProducerController {// 注入Spring自动创建的RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;// 工作队列模式:发送10条消息@RequestMapping("/work")public String sendWorkMessage() {for (int i = 0; i < 10; i++) {String msg = "Work Message " + i;// 发送消息:默认交换机(空字符串),路由键=队列名rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);}return "工作队列消息发送成功!";}
}
步骤3:编写消费者(监听队列)
通过 @RabbitListener 注解声明消费者,多个消费者监听同一队列:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class WorkListener {// 消费者1:监听work_queue队列@RabbitListener(queues = Constants.WORK_QUEUE)public void listenWorkQueue1(Message message) {String msg = new String(message.getBody());System.out.println("消费者1接收:" + msg);}// 消费者2:监听同一队列(竞争消息)@RabbitListener(queues = Constants.WORK_QUEUE)public void listenWorkQueue2(Message message) {String msg = new String(message.getBody());System.out.println("消费者2接收:" + msg);}
}
@RabbitListener 是 Spring AMQP 的核心注解,支持直接指定队列名,无需手动绑定。
2.3 运行验证
- 启动项目:Spring Boot 会自动创建
work_queue队列; - 发送消息:访问接口
http://127.0.0.1:8080/producer/work,返回“工作队列消息发送成功!”; - 观察日志:消费者1接收“0、2、4、6、8”,消费者2接收“1、3、5、7、9”,符合轮询分配规则。
三、模式二:发布订阅模式(Publish/Subscribe)
发布订阅模式通过
fanout类型交换机,将消息广播到所有绑定的队列,适用于多系统同步接收消息(如气象局推送天气预报)。
3.1 核心原理
- 生产者发送消息到
fanout交换机; - 交换机将消息复制到所有绑定的队列;
- 每个队列的消费者都能接收完整消息,实现“一条消息多端消费”;
fanout交换机忽略路由键(Routing Key),绑定键(Binding Key)可设为空。
3.2 代码实现
步骤1:声明交换机、队列与绑定关系
在 RabbitMQConfig 中添加交换机、队列声明,以及两者的绑定:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;class Constants {// 发布订阅模式:交换机+队列名称public static final String FANOUT_EXCHANGE = "fanout_exchange";public static final String FANOUT_QUEUE1 = "fanout_queue1";public static final String FANOUT_QUEUE2 = "fanout_queue2";
}@Configuration
public class RabbitMQConfig {// 1. 声明fanout交换机(durable=true:持久化)@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return new FanoutExchange(Constants.FANOUT_EXCHANGE, true, false);}// 2. 声明两个队列@Bean("fanoutQueue1")public Queue fanoutQueue1() {return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2() {return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}// 3. 绑定队列1到交换机@Beanpublic Binding bindFanoutQueue1(@Qualifier("fanoutExchange") FanoutExchange exchange,@Qualifier("fanoutQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange); // 绑定键为空}// 4. 绑定队列2到交换机@Beanpublic Binding bindFanoutQueue2(@Qualifier("fanoutExchange") FanoutExchange exchange,@Qualifier("fanoutQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
}
步骤2:编写生产者(发送广播消息)
在 ProducerController 中添加接口,发送消息到 fanout 交换机:
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发布订阅模式:发送广播消息@RequestMapping("/fanout")public String sendFanoutMessage() {String msg = "Hello Publish/Subscribe!";// 发送消息到fanout交换机,路由键为空(fanout忽略路由键)rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);return "广播消息发送成功!";}
}
步骤3:编写消费者(监听不同队列)
创建两个消费者,分别监听 fanout_queue1 和 fanout_queue2:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutListener {// 消费者1:监听fanout_queue1@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void listenFanoutQueue1(String msg) {System.out.println("消费者1(fanout_queue1)接收:" + msg);}// 消费者2:监听fanout_queue2@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void listenFanoutQueue2(String msg) {System.out.println("消费者2(fanout_queue2)接收:" + msg);}
}
3.3 运行验证
- 发送消息:访问
http://127.0.0.1:8080/producer/fanout; - 观察日志:两个消费者均打印
Hello Publish/Subscribe!,消息被成功广播; - 管理界面验证:进入 RabbitMQ 管理界面(
http://110.41.51.65:15672),切换到bite虚拟主机,可看到fanout_exchange已绑定两个队列。
四、模式三:路由模式(Routing)
路由模式通过
direct类型交换机,按“路由键(Routing Key)完全匹配”筛选消息,适用于按类型分发消息(如日志系统分级别处理)。
4.1 核心原理
- 生产者发送消息时指定
Routing Key; - 交换机类型为
direct,仅将消息路由到“绑定键(Binding Key与Routing Key完全匹配)”的队列; - 一个队列可绑定多个
Binding Key(如队列绑定black和green,可接收两种路由键的消息)。
4.2 代码实现
步骤1:声明交换机、队列与绑定关系
在 RabbitMQConfig 中添加 direct 交换机、队列及绑定(指定绑定键):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;class Constants {// 路由模式:交换机+队列名称public static final String DIRECT_EXCHANGE = "direct_exchange";public static final String DIRECT_QUEUE1 = "direct_queue1"; // 绑定orangepublic static final String DIRECT_QUEUE2 = "direct_queue2"; // 绑定black/green
}@Configuration
public class RabbitMQConfig {// 1. 声明direct交换机@Bean("directExchange")public DirectExchange directExchange() {return new DirectExchange(Constants.DIRECT_EXCHANGE, true, false);}// 2. 声明两个队列@Bean("directQueue1")public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}// 3. 绑定队列1到交换机(绑定键=orange)@Beanpublic Binding bindDirectQueue1(@Qualifier("directExchange") DirectExchange exchange,@Qualifier("directQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("orange");}// 4. 绑定队列2到交换机(绑定键=black)@Beanpublic Binding bindDirectQueue2(@Qualifier("directExchange") DirectExchange exchange,@Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("black");}// 5. 绑定队列2到交换机(绑定键=green)@Beanpublic Binding bindDirectQueue3(@Qualifier("directExchange") DirectExchange exchange,@Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("green");}
}
步骤2:编写生产者(指定Routing Key)
在 ProducerController 中添加接口,支持动态传入 Routing Key:
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 路由模式:按Routing Key发送消息@RequestMapping("/direct")public String sendDirectMessage(String routingKey) {String msg = "Hello Routing! Key: " + routingKey;// 发送消息到direct交换机,指定Routing KeyrabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, msg);return "路由消息发送成功!Key:" + routingKey;}
}
步骤3:编写消费者(监听不同队列)
创建两个消费者,分别监听 direct_queue1 和 direct_queue2:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DirectListener {// 消费者1:监听direct_queue1(仅接收orange消息)@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void listenDirectQueue1(String msg) {System.out.println("消费者1(direct_queue1)接收:" + msg);}// 消费者2:监听direct_queue2(接收black/green消息)@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void listenDirectQueue2(String msg) {System.out.println("消费者2(direct_queue2)接收:" + msg);}
}
4.3 运行验证
- 发送 orange 消息:访问
http://127.0.0.1:8080/producer/direct?routingKey=orange,消费者1打印Hello Routing! Key: orange; - 发送 black 消息:访问
http://127.0.0.1:8080/producer/direct?routingKey=black,消费者2打印Hello Routing! Key: black; - 发送 green 消息:访问
http://127.0.0.1:8080/producer/direct?routingKey=green,消费者2打印Hello Routing! Key: green。
五、模式四:通配符模式(Topics)
通配符模式是路由模式的扩展,通过
topic类型交换机支持“通配符匹配”,适用于复杂的多维度消息筛选(如电商系统按“业务+操作+级别”路由)。
5.1 核心原理
Routing Key和Binding Key为“点分隔的单词”(如order.pay.error);- 支持两种通配符:
*:匹配一个单词(如*.error匹配order.error,不匹配order.pay.error);#:匹配零个或多个单词(如#.info匹配info、order.pay.info);
- 交换机类型为
topic,按通配符规则路由消息。
5.2 代码实现
步骤1:声明交换机、队列与绑定关系
在 RabbitMQConfig 中添加 topic 交换机、队列及通配符绑定:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;class Constants {// 通配符模式:交换机+队列名称public static final String TOPIC_EXCHANGE = "topic_exchange";public static final String TOPIC_QUEUE1 = "topic_queue1"; // 绑定*.errorpublic static final String TOPIC_QUEUE2 = "topic_queue2"; // 绑定#.info/*.error
}@Configuration
public class RabbitMQConfig {// 1. 声明topic交换机@Bean("topicExchange")public TopicExchange topicExchange() {return new TopicExchange(Constants.TOPIC_EXCHANGE, true, false);}// 2. 声明两个队列@Bean("topicQueue1")public Queue topicQueue1() {return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2() {return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}// 3. 绑定队列1到交换机(绑定键=*.error)@Beanpublic Binding bindTopicQueue1(@Qualifier("topicExchange") TopicExchange exchange,@Qualifier("topicQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.error");}// 4. 绑定队列2到交换机(绑定键=#.info)@Beanpublic Binding bindTopicQueue2(@Qualifier("topicExchange") TopicExchange exchange,@Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("#.info");}// 5. 绑定队列2到交换机(绑定键=*.error)@Beanpublic Binding bindTopicQueue3(@Qualifier("topicExchange") TopicExchange exchange,@Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.error");}
}
步骤2:编写生产者(动态传入Routing Key)
在 ProducerController 中添加接口,支持复杂格式的 Routing Key:
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 通配符模式:发送带复杂Routing Key的消息@RequestMapping("/topics")public String sendTopicsMessage(String routingKey) {String msg = "Hello Topics! Key: " + routingKey;// 发送消息到topic交换机,指定Routing KeyrabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, msg);return "通配符消息发送成功!Key:" + routingKey;}
}
步骤3:编写消费者(监听不同队列)
创建两个消费者,分别监听 topic_queue1 和 topic_queue2:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class TopicListener {// 消费者1:监听topic_queue1(仅接收*.error消息)@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void listenTopicQueue1(String msg) {System.out.println("消费者1(topic_queue1)接收:" + msg);}// 消费者2:监听topic_queue2(接收#.info/*.error消息)@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void listenTopicQueue2(String msg) {System.out.println("消费者2(topic_queue2)接收:" + msg);}
}
5.3 运行验证
- 发送 order.error 消息:访问
http://127.0.0.1:8080/producer/topics?routingKey=order.error,消费者1和2均接收消息; - 发送 order.pay.info 消息:访问
http://127.0.0.1:8080/producer/topics?routingKey=order.pay.info,仅消费者2接收消息; - 发送 pay.error 消息:访问
http://127.0.0.1:8080/producer/topics?routingKey=pay.error,消费者1和2均接收消息。
六、整合核心组件与注意事项
6.1 核心组件总结
| 组件 | 作用 | 关键特性 |
|---|---|---|
RabbitTemplate | 消息发送模板 | 自动管理连接/通道,支持消息序列化 |
@RabbitListener | 消息监听注解 | 可加在类/方法上,支持多参数类型(String、Message等) |
QueueBuilder | 队列构建工具 | 支持链式配置持久化、自动删除等属性 |
BindingBuilder | 绑定构建工具 | 支持交换机与队列的灵活绑定(指定绑定键) |
6.2 注意事项
- 队列/交换机持久化:生产环境需设置
durable=true,避免 RabbitMQ 重启后组件丢失; - 消息序列化:若发送对象消息,需配置
Jackson2JsonMessageConverter(文档中推荐 JSON 序列化); - 消费者启动顺序:建议先启动消费者再发送消息,避免消息因无消费者监听而丢失(或配置队列持久化);
- 虚拟主机隔离:不同业务应使用独立虚拟主机(如
order-vhost、log-vhost),通过配置virtual-host实现隔离。
