【RabbitMq】七种工作模式
目录
RabbitMQ 核心概念快速回顾
RabbitMQ 的七种工作模式详解
1. 简单模式 (Simple Mode)
2. 工作队列模式 (Work Queue Mode)
3. 发布/订阅模式 (Publish/Subscribe Mode)
4. 路由模式 (Routing Mode)
5. 通配符模式 (Topics Mode)
6. RPC 模式 (RPC Mode)
7. 发布确认模式 (Publisher Confirms Mode)
总结与选择指南
RabbitMQ 核心概念快速回顾
在深入工作模式前,先了解几个贯穿所有模式的基础概念:
-
Producer(生产者):发送消息的应用程序。
-
Consumer(消费者):接收并处理消息的应用程序。
-
Broker(服务器):指 RabbitMQ 服务本身,负责接收、存储和转发消息。
-
Connection(连接):生产者/消费者与 Broker 之间的 TCP 长连接。
-
Channel(信道):在 Connection 上建立的轻量级逻辑连接,用于执行具体的 AMQP 指令。多个 Channel 可复用同一个 TCP 连接,减少开销。
-
Virtual Host(虚拟主机):类似于命名空间,用于在单个 Broker 内进行资源逻辑隔离。
-
Exchange(交换机):消息的路由中心,负责接收生产者发送的消息,并根据类型和规则将消息分发到一个或多个队列。本身不存储消息。
-
Queue(队列):存储消息的缓冲区,消费者从中获取消息。
-
Binding(绑定):连接 Exchange 和 Queue 的规则,定义了消息如何从交换机路由到队列。
-
Routing Key(路由键):生产者发送消息时指定的一个关键字,Exchange 根据它来决定消息的路由路径。
-
Binding Key(绑定键):在 Binding 过程中,队列与交换机关联时指定的关键字,用于和 Routing Key 进行匹配。
RabbitMQ 的七种工作模式详解
1. 简单模式 (Simple Mode)
-
模式简介:最简单的点对点通信模型。一个生产者直接向一个队列发送消息,一个消费者从该队列消费消息。此模式通常使用 RabbitMQ 默认的匿名交换机(名称为空字符串
""
)。 -
工作原理图:
-
代码示例(Java):
-
生产者:
import com.rabbitmq.client.*;public class SimpleProducer {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列(持久化、非独占、不自动删除)channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "Hello, Simple Mode!";// 使用默认交换机,路由键为队列名channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}} }
-
消费者:
import com.rabbitmq.client.*;public class SimpleConsumer {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });} }
-
-
应用场景:简单的任务通知,如发送单条短信或邮件,只需要一个消费者处理的场景。
2. 工作队列模式 (Work Queue Mode)
-
模式简介:一个生产者,一个队列,但对应多个消费者。队列中的消息会被平均地分配给各个消费者(默认轮询方式),每个消息只能被一个消费者处理。
-
核心概念:消息预取 (QoS):通过
channel.basicQos(prefetchCount)
方法限制消费者一次最多预取的消息数量,实现公平分发,防止处理慢的消费者积压消息。 -
工作原理图:
-
代码示例(Java):
-
生产者(发送多条消息):
// ... (连接工厂设置同简单模式) try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare("work_queue", true, false, false, null);for (int i = 0; i < 10; i++) {String message = "Task " + i;channel.basicPublish("", "work_queue", null, message.getBytes());} }
-
消费者(需启动多个实例,并设置 QoS):
// ... (连接工厂设置同简单模式) Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queue", true, false, false, null); // 设置公平分发,一次只预取一条消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 模拟耗时任务try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手动确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} }; // 关闭自动确认,改为手动确认 channel.basicConsume("work_queue", false, deliverCallback, consumerTag -> { });
-
-
应用场景:任务分发和负载均衡,例如处理大量订单,由多个工作进程并行处理。
3. 发布/订阅模式 (Publish/Subscribe Mode)
-
模式简介:引入 Fanout 类型的交换机。生产者将消息发送到交换机,交换机会将消息广播到所有与之绑定的队列。每个队列都有一个消费者,因此一条消息会被多个消费者同时收到。
-
核心概念:Fanout Exchange:不处理任何 Routing Key,会将收到的消息广播到所有绑定到它的队列。
-
工作原理图:
-
代码示例(Java):
-
生产者(声明 Fanout 交换机并发送消息):
// ... (连接工厂设置) try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个Fanout类型的交换机channel.exchangeDeclare("logs_exchange", BuiltinExchangeType.FANOUT);String message = "Broadcast Message!";// 发送消息到交换机,路由键在Fanout模式下可为空channel.basicPublish("logs_exchange", "", null, message.getBytes()); }
-
消费者(每个消费者创建临时队列并绑定到交换机):
// ... (连接工厂设置) Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个临时队列(非持久化、独占、自动删除) String queueName = channel.queueDeclare().getQueue(); // 将该队列绑定到指定的Fanout交换机,路由键为空 channel.queueBind(queueName, "logs_exchange", ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
-
-
应用场景:系统广播、事件通知。例如,用户注册后需同时发送邮件、初始化资料、写日志等。
4. 路由模式 (Routing Mode)
-
模式简介:使用 Direct 类型的交换机。队列在绑定时会指定一个 Binding Key。生产者发送消息时指定 Routing Key。交换机只会将消息路由给那些 Binding Key 与 Routing Key 完全匹配 的队列。
-
核心概念:Direct Exchange:基于 Routing Key 的精确匹配进行路由。
-
工作原理图:
-
代码示例(Java,使用 Spring AMQP 示例):
-
配置类(声明 Direct Exchange、队列和绑定):
@Configuration public class RabbitConfig {@Beanpublic DirectExchange directExchange() {return new DirectExchange("exchange.direct");}@Beanpublic Queue queueA() {return new Queue("queue.direct.a");}@Beanpublic Queue queueB() {return new Queue("queue.direct.b");}@Beanpublic Binding bindingA(DirectExchange directExchange, Queue queueA) {// 将队列A绑定到交换机,Binding Key为"error"return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB(DirectExchange directExchange, Queue queueB) {// 将队列B绑定到交换机,Binding Key为"info"return BindingBuilder.bind(queueB).to(directExchange).with("info");} }
-
生产者(发送消息时指定 Routing Key):
@Service public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendErrorMsg() {String message = "An error occurred!";// 发送到Direct Exchange,指定Routing Key为"error"rabbitTemplate.convertAndSend("exchange.direct", "error", message);}public void sendInfoMsg() {String message = "This is an info message.";// 发送到Direct Exchange,指定Routing Key为"info"rabbitTemplate.convertAndSend("exchange.direct", "info", message);} }
-
消费者(监听特定队列):
@Service public class MessageReceiver {@RabbitListener(queues = "queue.direct.a")public void receiveErrorMsg(String message) {System.out.println("Error Queue Received: " + message);}@RabbitListener(queues = "queue.direct.b")public void receiveInfoMsg(String message) {System.out.println("Info Queue Received: " + message);} }
-
-
应用场景:有选择地接收消息。例如日志系统,将 error、warning、info 日志分别路由到不同队列处理。
5. 通配符模式 (Topics Mode)
-
模式简介:路由模式的增强版,使用 Topic 类型的交换机。Binding Key 和 Routing Key 都是包含多部分的字符串(用点
.
分隔)。支持两种通配符进行模糊匹配:*
(匹配一个单词)和#
(匹配零个或多个单词)。 -
核心概念:Topic Exchange:基于通配符匹配的路由机制,非常灵活。
-
工作原理图:
-
代码示例(Java):
-
生产者(发送带有复杂 Routing Key 的消息):
// ... (连接工厂设置) try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个Topic类型的交换机channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);String routingKey = "quick.orange.rabbit";String message = "A quick orange rabbit message.";channel.basicPublish("topic_logs", routingKey, null, message.getBytes()); }
-
消费者(使用通配符绑定模式):
// ... (连接工厂设置) Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); // 绑定使用通配符,接收所有中间单词为orange的消息 channel.queueBind(queueName, "topic_logs", "*.orange.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
-
-
应用场景:需要高度灵活性的消息路由。例如新闻推送,用户可订阅如
sports.*
(所有体育新闻)、*.europe.weather
(欧洲天气)等主题。
6. RPC 模式 (RPC Mode)
-
模式简介:利用 RabbitMQ 实现类似远程过程调用。客户端发送请求消息,并在消息属性中设置一个回调队列 (
replyTo
) 和唯一标识 (correlationId
)。服务端处理请求后,将响应发送到回调队列,客户端通过correlationId
匹配响应。 -
核心概念:回调队列 (Reply-To Queue) 和 关联ID (Correlation ID):用于匹配请求和响应。
-
工作原理图:
-
代码示例(Java,关键步骤概览):
-
客户端:
// 生成唯一的correlationId和回调队列名 String corrId = java.util.UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); // 设置消息属性 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); // 发送请求到请求队列 channel.basicPublish("", "rpc_queue", props, message.getBytes()); // 监听回调队列,等待响应
-
服务端:
// 从请求队列消费消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 处理请求...String response = processRequest(new String(delivery.getBody()));// 获取客户端指定的回调队列名String replyTo = delivery.getProperties().getReplyTo();// 发送响应到回调队列,并设置相同的correlationIdAMQP.BasicProperties respProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();channel.basicPublish("", replyTo, respProps, response.getBytes()); }; channel.basicConsume("rpc_queue", true, deliverCallback, consumerTag -> { });
-
-
应用场景:需要等待服务端处理结果的后端交互,如计算密集型任务。
7. 发布确认模式 (Publisher Confirms Mode)
-
模式简介:一种可靠性投递机制,而非新的通信拓扑。生产者通过将信道设置为 Confirm 模式,可以确保消息成功发送到 RabbitMQ 服务器。服务器会异步返回一个确认 (ACK) 或未确认 (NACK) 回执。
-
核心概念:发布者确认 (Publisher Confirm):RabbitMQ 提供的消息可靠投递机制。
-
工作原理图:
-
代码示例(Java,异步确认):
// 将通道设置为Confirm模式 channel.confirmSelect(); // 添加异步确认监听器 channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 消息成功到达BrokerSystem.out.println("Message with tag " + deliveryTag + " ACK'd");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 消息可能未到达Broker,需要重发System.out.println("Message with tag " + deliveryTag + " NACK'd");} }); // 发送消息 String message = "Important message!"; channel.basicPublish("", "confirmed_queue", null, message.getBytes());
-
应用场景:对消息可靠性要求极高的场景,如金融交易、订单支付等。
总结与选择指南
下表总结了七种模式的核心特点,助您按需选择:
工作模式 | 核心组件 | 路由规则 | 典型应用场景 |
---|---|---|---|
简单模式 | 默认交换机 | 队列名精确匹配 | 单任务通知 |
工作队列模式 | 默认交换机 | 轮询分发 | 任务分发、负载均衡 |
发布/订阅模式 | Fanout Exchange | 广播 | 事件广播、系统通知 |
路由模式 | Direct Exchange | Routing Key 精确匹配 | 有选择的消息路由(如日志分级) |
通配符模式 | Topic Exchange | Routing Key 通配符匹配 | 灵活的主题订阅(如新闻推送) |
RPC模式 | 两个队列 | 请求/响应配对 | 需要同步结果的远程调用 |
发布确认模式 | Confirm机制 | 可靠性确认 | 确保消息可靠投递 |
建议:
-
消息持久化:对于重要消息,将队列 (
durable=true
) 和消息 (deliveryMode=2
) 都设置为持久化。 -
手动ACK:在消费者端使用手动确认,确保消息被成功处理后再从队列中移除。
-
QoS预取:在工作队列模式中,使用
channel.basicQos(1)
实现公平分发。 -
连接复用:使用连接池和每个线程独立的 Channel 来优化性能。