RabbitMQ 七种工作模式全解析
abbitMQ 作为主流消息中间件,提供了七种经典工作模式,覆盖从简单消息传递到复杂路由、RPC 通信等各类场景。
一、核心前置概念
在学习模式前,需先明确三个关键组件的关系:
- Exchange(交换机):接收生产者消息,按规则路由到队列,有四种类型(
fanout
/direct
/topic
/headers
),本文重点覆盖前三种; - Routing Key(路由键):生产者发送消息时指定的“标签”,交换机通过它匹配队列;
- Binding(绑定):将交换机与队列关联,需指定
Binding Key
(与Routing Key
匹配规则决定消息流向)。
所有模式的核心差异在于交换机类型和路由规则,以下逐一解析。
二、七种工作模式详解
1. 简单模式(Simple):点对点通信
核心原理
最简单的“生产者-队列-消费者”模型:一个生产者发送消息到队列,一个消费者从队列接收消息,消息仅被消费一次,无交换机参与(默认使用 RabbitMQ 内置的空字符串交换机)。
模式图
关键特点
- 无交换机,直接使用默认交换机;
- 一对一通信,一个消息仅被一个消费者处理;
- 队列需提前声明,确保消息有存储载体。
适用场景
简单的异步通信场景,如单节点任务通知(如用户注册后发送欢迎邮件)。
实战代码
生产者(发送消息)
public class SimpleProducer {// 队列名称private static final String SIMPLE_QUEUE = "simple_queue";public static void main(String[] args) throws Exception {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("110.41.51.65"); // 服务器IPfactory.setPort(15673); // 端口(文档中自定义为15673)factory.setVirtualHost("bite"); // 虚拟主机factory.setUsername("study"); // 用户名factory.setPassword("study"); // 密码// 2. 建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明队列(持久化、非独占、非自动删除)channel.queueDeclare(SIMPLE_QUEUE, true, false, false, null);// 4. 发送消息(默认交换机,路由键=队列名)String msg = "Hello Simple Mode!";channel.basicPublish("", SIMPLE_QUEUE, null, msg.getBytes());System.out.println("消息发送成功:" + msg);}}
}
消费者(接收消息)
public class SimpleConsumer {private static final String SIMPLE_QUEUE = "simple_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同生产者...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列(与生产者一致,确保队列存在)channel.queueDeclare(SIMPLE_QUEUE, true, false, false, null);// 消费消息(自动确认)DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(SIMPLE_QUEUE, true, consumer);// 保持监听(消费者需持续运行)System.out.println("消费者已启动,等待消息...");Thread.currentThread().join();}}
}
2. 工作队列模式(Work Queues):任务分发
核心原理
简单模式的扩展:一个生产者发送多条消息到队列,多个消费者竞争消费,每条消息仅被一个消费者处理(RabbitMQ 默认采用“轮询”策略分配消息)。
模式图
关键特点
- 无交换机,使用默认交换机;
- 多消费者竞争队列消息,实现任务负载均衡;
- 适合处理“耗时任务”,避免单消费者积压。
适用场景
集群环境下的异步任务处理,如 12306 订票成功后,多台短信服务器竞争发送通知。
实战代码
生产者(发送10条消息)
public class WorkProducer {private static final String WORK_QUEUE = "work_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(WORK_QUEUE, true, false, false, null);// 发送10条消息for (int i = 0; i < 10; i++) {String msg = "Work Message " + i;channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());System.out.println("发送消息:" + msg);}}}
}
消费者(两个消费者代码一致,仅类名不同)
public class WorkConsumer1 {private static final String WORK_QUEUE = "work_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(WORK_QUEUE, true, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("Consumer1 接收:" + new String(body));}};channel.basicConsume(WORK_QUEUE, true, consumer);System.out.println("Consumer1 启动...");Thread.currentThread().join();}}
}
运行结果
- 先启动两个消费者,再启动生产者;
- 消费者1接收“0、2、4、6、8”,消费者2接收“1、3、5、7、9”,轮询分配。
3. 发布订阅模式(Publish/Subscribe):广播消息
核心原理
引入fanout
类型交换机(广播交换机):生产者发送消息到交换机,交换机会将消息复制到所有绑定的队列,每个队列的消费者都能接收完整消息。
模式图
关键特点
- 交换机类型为
fanout
,路由键(Routing Key
)无效(可设为空); - 消息被广播到所有绑定队列,多个消费者接收相同消息;
- 交换机无存储能力,若无队列绑定,消息会丢失。
适用场景
需多系统同步接收消息的场景,如气象局发布天气预报,新浪、百度等平台同时获取数据。
实战代码
生产者(创建交换机并绑定队列)
public class FanoutProducer {private static final String FANOUT_EXCHANGE = "fanout_exchange";private static final String FANOUT_QUEUE1 = "fanout_queue1";private static final String FANOUT_QUEUE2 = "fanout_queue2";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明fanout交换机(持久化)channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 2. 声明两个队列channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);// 3. 绑定队列到交换机(路由键为空)channel.queueBind(FANOUT_QUEUE1, FANOUT_EXCHANGE, "");channel.queueBind(FANOUT_QUEUE2, FANOUT_EXCHANGE, "");// 4. 发送消息String msg = "Hello Publish/Subscribe!";channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());System.out.println("广播消息发送成功:" + msg);}}
}
消费者(两个消费者分别监听不同队列)
// 消费者1:监听fanout_queue1
public class FanoutConsumer1 {private static final String FANOUT_QUEUE1 = "fanout_queue1";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("Consumer1 接收:" + new String(body));}};channel.basicConsume(FANOUT_QUEUE1, true, consumer);System.out.println("Consumer1 启动...");Thread.currentThread().join();}}
}
运行结果
两个消费者均接收消息:Hello Publish/Subscribe!
。
4. 路由模式(Routing):定向筛选
核心原理
使用direct
类型交换机(定向交换机):生产者发送消息时指定Routing Key
,交换机仅将消息路由到Binding Key 与 Routing Key 完全匹配的队列。
模式图
关键特点
- 交换机类型为
direct
,路由键需精确匹配; - 一个队列可绑定多个
Binding Key
(如 Queue2 绑定 black 和 green); - 实现“按标签筛选消息”,仅符合条件的消费者接收。
适用场景
需按消息类型筛选的场景,如日志系统:error
日志发送到告警队列,info
/debug
日志发送到普通日志队列。
实战代码
生产者(指定不同Routing Key)
public class DirectProducer {private static final String DIRECT_EXCHANGE = "direct_exchange";private static final String DIRECT_QUEUE1 = "direct_queue1"; // 绑定orangeprivate static final String DIRECT_QUEUE2 = "direct_queue2"; // 绑定black/greenpublic static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明direct交换机channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);// 2. 声明队列并绑定channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);channel.queueBind(DIRECT_QUEUE1, DIRECT_EXCHANGE, "orange");channel.queueBind(DIRECT_QUEUE2, DIRECT_EXCHANGE, "black");channel.queueBind(DIRECT_QUEUE2, DIRECT_EXCHANGE, "green");// 3. 发送不同Routing Key的消息channel.basicPublish(DIRECT_EXCHANGE, "orange", null, "Orange Message".getBytes());channel.basicPublish(DIRECT_EXCHANGE, "black", null, "Black Message".getBytes());channel.basicPublish(DIRECT_EXCHANGE, "green", null, "Green Message".getBytes());System.out.println("路由消息发送成功");}}
}
消费者(分别监听不同队列)
// 消费者1:监听direct_queue1(仅接收orange消息)
public class DirectConsumer1 {private static final String DIRECT_QUEUE1 = "direct_queue1";public static void main(String[] args) throws Exception {// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("Consumer1 接收:" + new String(body));}};channel.basicConsume(DIRECT_QUEUE1, true, consumer);}}
}
运行结果
- Consumer1 接收:
Orange Message
; - Consumer2 接收:
Black Message
、Green Message
。
5. 通配符模式(Topics):灵活匹配
核心原理
direct
模式的扩展,使用topic
类型交换机(通配符交换机):Routing Key
和Binding Key
为“点分隔的单词”(如order.pay.error
),支持通配符匹配:
*
:匹配一个单词;#
:匹配零个或多个单词。
模式图
关键特点
- 交换机类型为
topic
,路由键支持通配符,灵活性极高; - 适合复杂的多维度消息筛选,如按“业务+操作+级别”路由。
适用场景
需灵活匹配消息的场景,如电商系统:order.pay.success
路由到支付通知队列,user.login.error
路由到告警队列。
实战代码
生产者(发送不同格式的Routing Key)
public class TopicProducer {private static final String TOPIC_EXCHANGE = "topic_exchange";private static final String TOPIC_QUEUE1 = "topic_queue1"; // 绑定*.errorprivate static final String TOPIC_QUEUE2 = "topic_queue2"; // 绑定#.info/*.errorpublic static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明topic交换机channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);// 2. 声明队列并绑定channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);channel.queueBind(TOPIC_QUEUE1, TOPIC_EXCHANGE, "*.error");channel.queueBind(TOPIC_QUEUE2, TOPIC_EXCHANGE, "#.info");channel.queueBind(TOPIC_QUEUE2, TOPIC_EXCHANGE, "*.error");// 3. 发送消息channel.basicPublish(TOPIC_EXCHANGE, "order.error", null, "Order Error".getBytes());channel.basicPublish(TOPIC_EXCHANGE, "order.pay.info", null, "Order Pay Info".getBytes());channel.basicPublish(TOPIC_EXCHANGE, "pay.error", null, "Pay Error".getBytes());System.out.println("通配符消息发送成功");}}
}
运行结果
- Consumer1(Queue1)接收:
Order Error
、Pay Error
; - Consumer2(Queue2)接收:
Order Error
、Order Pay Info
、Pay Error
。
6. RPC模式(Remote Procedure Call):远程调用
核心原理
基于 RabbitMQ 实现“请求-响应”通信:客户端发送请求消息到队列,服务端处理后将响应发送到客户端指定的“回调队列”,客户端通过correlationId
(请求唯一标识)匹配请求与响应。
模式图
关键流程
- 客户端声明临时回调队列,生成
correlationId
; - 客户端发送请求消息,消息属性包含
replyTo
(回调队列名)和correlationId
; - 服务端监听请求队列,处理后将响应发送到
replyTo
指定的回调队列; - 客户端监听回调队列,通过
correlationId
确认响应归属。
适用场景
需要同步获取远程服务结果的场景,如分布式系统中“订单系统调用库存系统查询库存”。
实战代码
客户端(发送请求+接收响应)
public class RPCClient {private static final String RPC_QUEUE = "rpc_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明请求队列channel.queueDeclare(RPC_QUEUE, true, false, false, null);// 2. 声明临时回调队列(RabbitMQ自动生成队列名)String replyQueue = channel.queueDeclare().getQueue();// 3. 生成请求唯一标识String correlationId = UUID.randomUUID().toString();// 4. 发送请求消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo(replyQueue).build();String requestMsg = "查询库存";channel.basicPublish("", RPC_QUEUE, props, requestMsg.getBytes());// 5. 监听回调队列,获取响应BlockingQueue<String> responseQueue = new ArrayBlockingQueue<>(1);channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {if (properties.getCorrelationId().equals(correlationId)) {responseQueue.offer(new String(body));}}});// 6. 打印响应String response = responseQueue.take();System.out.println("客户端收到响应:" + response);}}
}
服务端(处理请求+发送响应)
public class RPCServer {private static final String RPC_QUEUE = "rpc_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE, true, false, false, null);// 设置每次仅处理1条消息(避免消息积压)channel.basicQos(1);// 监听请求队列channel.basicConsume(RPC_QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {try {// 1. 处理请求String request = new String(body);System.out.println("服务端收到请求:" + request);String response = "库存充足(响应:" + request + ")";// 2. 发送响应到回调队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes());// 3. 手动确认消息(确保处理完成)channel.basicAck(envelope.getDeliveryTag(), false);} catch (IOException e) {e.printStackTrace();}}});System.out.println("RPC服务端启动,等待请求...");Thread.currentThread().join();}}
}
运行结果
- 服务端:
服务端收到请求:查询库存
; - 客户端:
客户端收到响应:库存充足(响应:查询库存)
。
7. 发布确认模式(Publisher Confirms):消息可靠性
核心原理
RabbitMQ 提供的消息可靠性保障机制:生产者将信道设为confirm
模式后,所有发送的消息会被分配唯一deliveryTag
,RabbitMQ 处理完消息(如写入磁盘、路由到队列)后,会向生产者发送ACK
(确认)或NACK
(失败),确保消息不丢失。
三种确认策略
策略 | 核心逻辑 | 优点 | 缺点 |
---|---|---|---|
单独确认 | 发送一条消息后,等待waitForConfirms | 简单,失败可定位到单条 | 串行阻塞,性能差 |
批量确认 | 发送一批消息后,批量等待确认 | 性能优于单独确认 | 失败无法定位单条,需重发批量 |
异步确认 | 注册回调,后台处理ACK/NACK | 性能最优,异步非阻塞 | 实现复杂,需维护消息序号集合 |
适用场景
对数据安全性要求极高的场景,如金融交易、订单支付(消息丢失会导致业务异常)。
实战代码(异步确认,性能最优)
public class PublisherConfirm {private static final int MESSAGE_COUNT = 500; // 发送500条消息private static final String CONFIRM_QUEUE = "confirm_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 开启confirm模式channel.confirmSelect();// 2. 声明队列(持久化,确保消息不丢失)channel.queueDeclare(CONFIRM_QUEUE, true, false, true, null);// 3. 维护未确认消息的序号(有序集合,线程安全)SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<>());// 4. 注册确认回调channel.addConfirmListener(new ConfirmListener() {// 消息确认成功(ACK)@Overridepublic void handleAck(long deliveryTag, boolean multiple) {if (multiple) {// 批量确认:移除小于等于当前deliveryTag的所有序号unconfirmedSet.headSet(deliveryTag + 1).clear();} else {// 单条确认:移除当前序号unconfirmedSet.remove(deliveryTag);}System.out.println("ACK - deliveryTag: " + deliveryTag + ", multiple: " + multiple);}// 消息确认失败(NACK)@Overridepublic void handleNack(long deliveryTag, boolean multiple) {// 失败逻辑:如重发消息System.err.println("NACK - deliveryTag: " + deliveryTag + ", multiple: " + multiple);if (multiple) {unconfirmedSet.headSet(deliveryTag + 1).clear();} else {unconfirmedSet.remove(deliveryTag);}}});// 5. 发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {// 获取下一条消息的序号long nextSeq = channel.getNextPublishSeqNo();unconfirmedSet.add(nextSeq);// 发送消息channel.basicPublish("", CONFIRM_QUEUE, null, ("Confirm Message " + i).getBytes());}// 6. 等待所有消息确认while (!unconfirmedSet.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.println("500条消息异步确认完成,耗时:" + (end - start) + "ms");}}
}
运行结果
- 控制台输出多条
ACK
日志; - 最终打印:
500条消息异步确认完成,耗时:107ms
(异步策略性能最优)。
三、七种模式对比与选型建议
模式 | 交换机类型 | 核心特点 | 适用场景 |
---|---|---|---|
简单模式 | 默认 | 一对一,无路由 | 简单异步通知(如邮件发送) |
工作队列模式 | 默认 | 多消费者竞争,负载均衡 | 集群任务处理(如短信群发) |
发布订阅模式 | fanout | 广播消息,所有消费者接收相同内容 | 多系统同步数据(如天气预报) |
路由模式 | direct | 精确路由,按Routing Key完全匹配 | 按类型筛选消息(如日志分级) |
通配符模式 | topic | 通配符路由,灵活匹配多维度消息 | 复杂筛选(如电商业务消息) |
RPC模式 | 默认 | 请求-响应,同步获取远程结果 | 分布式远程调用(如库存查询) |
发布确认模式 | 任意 | 消息可靠性保障,ACK/NACK确认 | 金融、订单等核心业务 |