当前位置: 首页 > news >正文

RabbitMQ 七种工作模式全解析

abbitMQ 作为主流消息中间件,提供了七种经典工作模式,覆盖从简单消息传递到复杂路由、RPC 通信等各类场景。

一、核心前置概念

在学习模式前,需先明确三个关键组件的关系:

  • Exchange(交换机):接收生产者消息,按规则路由到队列,有四种类型(fanout/direct/topic/headers),本文重点覆盖前三种;
  • Routing Key(路由键):生产者发送消息时指定的“标签”,交换机通过它匹配队列;
  • Binding(绑定):将交换机与队列关联,需指定Binding Key(与Routing Key匹配规则决定消息流向)。

所有模式的核心差异在于交换机类型路由规则,以下逐一解析。

二、七种工作模式详解

1. 简单模式(Simple):点对点通信

核心原理

最简单的“生产者-队列-消费者”模型:一个生产者发送消息到队列,一个消费者从队列接收消息,消息仅被消费一次,无交换机参与(默认使用 RabbitMQ 内置的空字符串交换机)。

模式图

简单模式

关键特点
  • 无交换机,直接使用默认交换机;
  • 一对一通信,一个消息仅被一个消费者处理;
  • 队列需提前声明,确保消息有存储载体。
适用场景

简单的异步通信场景,如单节点任务通知(如用户注册后发送欢迎邮件)。

实战代码
生产者(发送消息)
public class SimpleProducer {// 队列名称private static final String SIMPLE_QUEUE = "simple_queue";public static void main(String[] args) throws Exception {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("110.41.51.65"); // 服务器IPfactory.setPort(15673);         // 端口(文档中自定义为15673)factory.setVirtualHost("bite");  // 虚拟主机factory.setUsername("study");    // 用户名factory.setPassword("study");    // 密码// 2. 建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明队列(持久化、非独占、非自动删除)channel.queueDeclare(SIMPLE_QUEUE, true, false, false, null);// 4. 发送消息(默认交换机,路由键=队列名)String msg = "Hello Simple Mode!";channel.basicPublish("", SIMPLE_QUEUE, null, msg.getBytes());System.out.println("消息发送成功:" + msg);}}
}
消费者(接收消息)
public class SimpleConsumer {private static final String SIMPLE_QUEUE = "simple_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同生产者...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列(与生产者一致,确保队列存在)channel.queueDeclare(SIMPLE_QUEUE, true, false, false, null);// 消费消息(自动确认)DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(SIMPLE_QUEUE, true, consumer);// 保持监听(消费者需持续运行)System.out.println("消费者已启动,等待消息...");Thread.currentThread().join();}}
}

2. 工作队列模式(Work Queues):任务分发

核心原理

简单模式的扩展:一个生产者发送多条消息到队列,多个消费者竞争消费,每条消息仅被一个消费者处理(RabbitMQ 默认采用“轮询”策略分配消息)。

模式图

工作队列模式

关键特点
  • 无交换机,使用默认交换机;
  • 多消费者竞争队列消息,实现任务负载均衡;
  • 适合处理“耗时任务”,避免单消费者积压。
适用场景

集群环境下的异步任务处理,如 12306 订票成功后,多台短信服务器竞争发送通知。

实战代码
生产者(发送10条消息)
public class WorkProducer {private static final String WORK_QUEUE = "work_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(WORK_QUEUE, true, false, false, null);// 发送10条消息for (int i = 0; i < 10; i++) {String msg = "Work Message " + i;channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());System.out.println("发送消息:" + msg);}}}
}
消费者(两个消费者代码一致,仅类名不同)
public class WorkConsumer1 {private static final String WORK_QUEUE = "work_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(WORK_QUEUE, true, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("Consumer1 接收:" + new String(body));}};channel.basicConsume(WORK_QUEUE, true, consumer);System.out.println("Consumer1 启动...");Thread.currentThread().join();}}
}
运行结果
  • 先启动两个消费者,再启动生产者;
  • 消费者1接收“0、2、4、6、8”,消费者2接收“1、3、5、7、9”,轮询分配。

3. 发布订阅模式(Publish/Subscribe):广播消息

核心原理

引入fanout类型交换机(广播交换机):生产者发送消息到交换机,交换机会将消息复制到所有绑定的队列,每个队列的消费者都能接收完整消息。

模式图

发布订阅模式

关键特点
  • 交换机类型为fanout,路由键(Routing Key)无效(可设为空);
  • 消息被广播到所有绑定队列,多个消费者接收相同消息;
  • 交换机无存储能力,若无队列绑定,消息会丢失。
适用场景

需多系统同步接收消息的场景,如气象局发布天气预报,新浪、百度等平台同时获取数据。

实战代码
生产者(创建交换机并绑定队列)
public class FanoutProducer {private static final String FANOUT_EXCHANGE = "fanout_exchange";private static final String FANOUT_QUEUE1 = "fanout_queue1";private static final String FANOUT_QUEUE2 = "fanout_queue2";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明fanout交换机(持久化)channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 2. 声明两个队列channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);// 3. 绑定队列到交换机(路由键为空)channel.queueBind(FANOUT_QUEUE1, FANOUT_EXCHANGE, "");channel.queueBind(FANOUT_QUEUE2, FANOUT_EXCHANGE, "");// 4. 发送消息String msg = "Hello Publish/Subscribe!";channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());System.out.println("广播消息发送成功:" + msg);}}
}
消费者(两个消费者分别监听不同队列)
// 消费者1:监听fanout_queue1
public class FanoutConsumer1 {private static final String FANOUT_QUEUE1 = "fanout_queue1";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("Consumer1 接收:" + new String(body));}};channel.basicConsume(FANOUT_QUEUE1, true, consumer);System.out.println("Consumer1 启动...");Thread.currentThread().join();}}
}
运行结果

两个消费者均接收消息:Hello Publish/Subscribe!

4. 路由模式(Routing):定向筛选

核心原理

使用direct类型交换机(定向交换机):生产者发送消息时指定Routing Key,交换机仅将消息路由到Binding Key 与 Routing Key 完全匹配的队列。

模式图

路由模式

关键特点
  • 交换机类型为direct,路由键需精确匹配;
  • 一个队列可绑定多个Binding Key(如 Queue2 绑定 black 和 green);
  • 实现“按标签筛选消息”,仅符合条件的消费者接收。
适用场景

需按消息类型筛选的场景,如日志系统:error日志发送到告警队列,info/debug日志发送到普通日志队列。

实战代码
生产者(指定不同Routing Key)
public class DirectProducer {private static final String DIRECT_EXCHANGE = "direct_exchange";private static final String DIRECT_QUEUE1 = "direct_queue1"; // 绑定orangeprivate static final String DIRECT_QUEUE2 = "direct_queue2"; // 绑定black/greenpublic static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明direct交换机channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);// 2. 声明队列并绑定channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);channel.queueBind(DIRECT_QUEUE1, DIRECT_EXCHANGE, "orange");channel.queueBind(DIRECT_QUEUE2, DIRECT_EXCHANGE, "black");channel.queueBind(DIRECT_QUEUE2, DIRECT_EXCHANGE, "green");// 3. 发送不同Routing Key的消息channel.basicPublish(DIRECT_EXCHANGE, "orange", null, "Orange Message".getBytes());channel.basicPublish(DIRECT_EXCHANGE, "black", null, "Black Message".getBytes());channel.basicPublish(DIRECT_EXCHANGE, "green", null, "Green Message".getBytes());System.out.println("路由消息发送成功");}}
}
消费者(分别监听不同队列)
// 消费者1:监听direct_queue1(仅接收orange消息)
public class DirectConsumer1 {private static final String DIRECT_QUEUE1 = "direct_queue1";public static void main(String[] args) throws Exception {// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {System.out.println("Consumer1 接收:" + new String(body));}};channel.basicConsume(DIRECT_QUEUE1, true, consumer);}}
}
运行结果
  • Consumer1 接收:Orange Message
  • Consumer2 接收:Black MessageGreen Message

5. 通配符模式(Topics):灵活匹配

核心原理

direct模式的扩展,使用topic类型交换机(通配符交换机):Routing KeyBinding Key为“点分隔的单词”(如order.pay.error),支持通配符匹配:

  • *:匹配一个单词;
  • #:匹配零个或多个单词。
模式图

通配符模式

关键特点
  • 交换机类型为topic,路由键支持通配符,灵活性极高;
  • 适合复杂的多维度消息筛选,如按“业务+操作+级别”路由。
适用场景

需灵活匹配消息的场景,如电商系统:order.pay.success路由到支付通知队列,user.login.error路由到告警队列。

实战代码
生产者(发送不同格式的Routing Key)
public class TopicProducer {private static final String TOPIC_EXCHANGE = "topic_exchange";private static final String TOPIC_QUEUE1 = "topic_queue1"; // 绑定*.errorprivate static final String TOPIC_QUEUE2 = "topic_queue2"; // 绑定#.info/*.errorpublic static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明topic交换机channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);// 2. 声明队列并绑定channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);channel.queueBind(TOPIC_QUEUE1, TOPIC_EXCHANGE, "*.error");channel.queueBind(TOPIC_QUEUE2, TOPIC_EXCHANGE, "#.info");channel.queueBind(TOPIC_QUEUE2, TOPIC_EXCHANGE, "*.error");// 3. 发送消息channel.basicPublish(TOPIC_EXCHANGE, "order.error", null, "Order Error".getBytes());channel.basicPublish(TOPIC_EXCHANGE, "order.pay.info", null, "Order Pay Info".getBytes());channel.basicPublish(TOPIC_EXCHANGE, "pay.error", null, "Pay Error".getBytes());System.out.println("通配符消息发送成功");}}
}
运行结果
  • Consumer1(Queue1)接收:Order ErrorPay Error
  • Consumer2(Queue2)接收:Order ErrorOrder Pay InfoPay Error

6. RPC模式(Remote Procedure Call):远程调用

核心原理

基于 RabbitMQ 实现“请求-响应”通信:客户端发送请求消息到队列,服务端处理后将响应发送到客户端指定的“回调队列”,客户端通过correlationId(请求唯一标识)匹配请求与响应。

模式图

RPC模式

关键流程
  1. 客户端声明临时回调队列,生成correlationId
  2. 客户端发送请求消息,消息属性包含replyTo(回调队列名)和correlationId
  3. 服务端监听请求队列,处理后将响应发送到replyTo指定的回调队列;
  4. 客户端监听回调队列,通过correlationId确认响应归属。
适用场景

需要同步获取远程服务结果的场景,如分布式系统中“订单系统调用库存系统查询库存”。

实战代码
客户端(发送请求+接收响应)
public class RPCClient {private static final String RPC_QUEUE = "rpc_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 声明请求队列channel.queueDeclare(RPC_QUEUE, true, false, false, null);// 2. 声明临时回调队列(RabbitMQ自动生成队列名)String replyQueue = channel.queueDeclare().getQueue();// 3. 生成请求唯一标识String correlationId = UUID.randomUUID().toString();// 4. 发送请求消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo(replyQueue).build();String requestMsg = "查询库存";channel.basicPublish("", RPC_QUEUE, props, requestMsg.getBytes());// 5. 监听回调队列,获取响应BlockingQueue<String> responseQueue = new ArrayBlockingQueue<>(1);channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {if (properties.getCorrelationId().equals(correlationId)) {responseQueue.offer(new String(body));}}});// 6. 打印响应String response = responseQueue.take();System.out.println("客户端收到响应:" + response);}}
}
服务端(处理请求+发送响应)
public class RPCServer {private static final String RPC_QUEUE = "rpc_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE, true, false, false, null);// 设置每次仅处理1条消息(避免消息积压)channel.basicQos(1);// 监听请求队列channel.basicConsume(RPC_QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {try {// 1. 处理请求String request = new String(body);System.out.println("服务端收到请求:" + request);String response = "库存充足(响应:" + request + ")";// 2. 发送响应到回调队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes());// 3. 手动确认消息(确保处理完成)channel.basicAck(envelope.getDeliveryTag(), false);} catch (IOException e) {e.printStackTrace();}}});System.out.println("RPC服务端启动,等待请求...");Thread.currentThread().join();}}
}
运行结果
  • 服务端:服务端收到请求:查询库存
  • 客户端:客户端收到响应:库存充足(响应:查询库存)

7. 发布确认模式(Publisher Confirms):消息可靠性

核心原理

RabbitMQ 提供的消息可靠性保障机制:生产者将信道设为confirm模式后,所有发送的消息会被分配唯一deliveryTag,RabbitMQ 处理完消息(如写入磁盘、路由到队列)后,会向生产者发送ACK(确认)或NACK(失败),确保消息不丢失。

三种确认策略
策略核心逻辑优点缺点
单独确认发送一条消息后,等待waitForConfirms简单,失败可定位到单条串行阻塞,性能差
批量确认发送一批消息后,批量等待确认性能优于单独确认失败无法定位单条,需重发批量
异步确认注册回调,后台处理ACK/NACK性能最优,异步非阻塞实现复杂,需维护消息序号集合

发布确认模式

适用场景

对数据安全性要求极高的场景,如金融交易、订单支付(消息丢失会导致业务异常)。

实战代码(异步确认,性能最优)
public class PublisherConfirm {private static final int MESSAGE_COUNT = 500; // 发送500条消息private static final String CONFIRM_QUEUE = "confirm_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 配置同前...try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 1. 开启confirm模式channel.confirmSelect();// 2. 声明队列(持久化,确保消息不丢失)channel.queueDeclare(CONFIRM_QUEUE, true, false, true, null);// 3. 维护未确认消息的序号(有序集合,线程安全)SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<>());// 4. 注册确认回调channel.addConfirmListener(new ConfirmListener() {// 消息确认成功(ACK)@Overridepublic void handleAck(long deliveryTag, boolean multiple) {if (multiple) {// 批量确认:移除小于等于当前deliveryTag的所有序号unconfirmedSet.headSet(deliveryTag + 1).clear();} else {// 单条确认:移除当前序号unconfirmedSet.remove(deliveryTag);}System.out.println("ACK - deliveryTag: " + deliveryTag + ", multiple: " + multiple);}// 消息确认失败(NACK)@Overridepublic void handleNack(long deliveryTag, boolean multiple) {// 失败逻辑:如重发消息System.err.println("NACK - deliveryTag: " + deliveryTag + ", multiple: " + multiple);if (multiple) {unconfirmedSet.headSet(deliveryTag + 1).clear();} else {unconfirmedSet.remove(deliveryTag);}}});// 5. 发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {// 获取下一条消息的序号long nextSeq = channel.getNextPublishSeqNo();unconfirmedSet.add(nextSeq);// 发送消息channel.basicPublish("", CONFIRM_QUEUE, null, ("Confirm Message " + i).getBytes());}// 6. 等待所有消息确认while (!unconfirmedSet.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.println("500条消息异步确认完成,耗时:" + (end - start) + "ms");}}
}
运行结果
  • 控制台输出多条ACK日志;
  • 最终打印:500条消息异步确认完成,耗时:107ms(异步策略性能最优)。

三、七种模式对比与选型建议

模式交换机类型核心特点适用场景
简单模式默认一对一,无路由简单异步通知(如邮件发送)
工作队列模式默认多消费者竞争,负载均衡集群任务处理(如短信群发)
发布订阅模式fanout广播消息,所有消费者接收相同内容多系统同步数据(如天气预报)
路由模式direct精确路由,按Routing Key完全匹配按类型筛选消息(如日志分级)
通配符模式topic通配符路由,灵活匹配多维度消息复杂筛选(如电商业务消息)
RPC模式默认请求-响应,同步获取远程结果分布式远程调用(如库存查询)
发布确认模式任意消息可靠性保障,ACK/NACK确认金融、订单等核心业务
http://www.dtcms.com/a/513245.html

相关文章:

  • 【记录】Unity|Unity从安装到打开一个Github项目(以我的世界(仿)为例)
  • 网站域名管理中心阿里云轻量级服务器搭建wordpress
  • 网站企业备案代理国外psd免费下载网站
  • 网站内链wordpress插件中国电力建设协会网站
  • 浙江建设信息港网站笔记转wordpress
  • 互动网站设计与制作做企业网站那家好
  • 免费浏览网站推广常熟做网站公司排名
  • 网站如何做跳转wordpress 启用插件代码
  • 设计商城网站陕西建设监理证书查询网站
  • AI学习日记——PyTorch深度学习快速入门:神经网络构建与训练实战
  • 建材网站织梦房产网站源码
  • 如何创建一个自己的公众号哪里有网站推广优化
  • (六)React事件处理基础内容解析
  • 品牌网站建设需要哪些规划个人做百度云下载网站吗
  • linux_缓冲区及简单libc库【Ubuntu】
  • 北京通州网站建设新手如何做淘宝运营
  • 申请中网可信网站基层建设 官方网站
  • 杭州网站网络 科技公司交互做的好的中国网站
  • Java中String类
  • 呼伦贝尔网站制作网站续费能自己续费吗
  • 怎么去掉wordpress加载动画南京做网站优化
  • Godot4.x的整体架构图解析-源码阅读
  • 广州短视频网站开发服装网站设计
  • 网站自动弹窗代码郑州做网页的公司
  • 做膜结构那个网站好网站切换
  • 怎么用自助网站宁波seo怎么推广
  • 网站排名alexa常用网站有哪些
  • 【DeepSeek新开源】DeepSeek-OCR如何用“视觉压缩”革新长文本处理
  • 反向代理应用:frp
  • SetConsoleCursorPosition函数的用法