【RabbitMQ的应用】
RabbitMQ的应用
提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
RabbitMQ的应用
- RabbitMQ的应用
- 一、RabbitMQ的核心概念
- 1.Produce和Consumer
- 2.Connection和Channel
- 3.Virtual host
- 4.Queue
- 5.Exchange
- 6.RabbitMQ工作流程
- 二、RabbitMQ的工作模式
- 1. Simple(普通模式)
- 2. WorK Queue(工作队列模式)
- 3. Publish/Subscribe(发布/订阅模式)
- 交换机的特性
- RoutingKey(路由键)
- Binding Key(绑定键)
- 4.路由模式
- 5.通配符模式
- 6. RPC
- 7. 发布确认
- 1.概念介绍
- 2.流程:
- 3.核心参数
- 4.确认应答机制
- 1.策略一:普通确认模式(Simple Confirm)
- 2.策略二:批量确认模式(Batch Confirm)
- 3.异步确认模式(Correlated Confirm)
一、RabbitMQ的核心概念
这是RabbitMQ 的Web 管理控制台

如上图,界面上的导航栏共分6部分,这6部分分别是什么意思呢,我们先看看RabbitMQ的工作流程。

如上图所示,RabbitMQ是⼀个消息中间件,也是⼀个生产者消费者模型.它负责接收,存储并转发消息。现在我将这个组件分成5部分分别描述,更方便理解。
1.Produce和Consumer
Producer(生产者)
生产者是 RabbitMQ Server 的客户端,负责向 RabbitMQ 发送消息。
Consumer(消费者)
消费者也是 RabbitMQ Server 的客户端,负责从 RabbitMQ 接收消息。
Broker
其实就是 RabbitMQ Server,主要负责接收、路由和转发消息。
核心流程补充
- 生产者 (Producer) 创建消息,然后发布到 RabbitMQ 中。在实际应用中,消息通常是带有一定业务逻辑结构的数据(如 JSON 字符串),可带有标签,RabbitMQ 会根据标签进行路由,把消息发送给感兴趣的消费者 (Consumer)。
- 消费者连接到 RabbitMQ 服务器,即可消费消息。消费过程中,标签会被处理,消费者只会收到消息,无需知道消息的生产者是谁。
- 对于 RabbitMQ 来说,一个 Broker 可简单看作一个 RabbitMQ 服务节点(或服务实例),大多数情况下也可理解为一台 RabbitMQ 服务器。

2.Connection和Channel
Connection(连接)
连接是客户端和 RabbitMQ 服务器之间的一个 TCP 连接。这个连接是建立消息传递的基础,负责传输客户端和服务器之间的所有数据和控制信息。
Channel(通道,信道)
Channel 是在 Connection 之上的一个抽象层。在 RabbitMQ 中,一个 TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接。消息的发送和接收都是基于 Channel 的。
通道的主要作用是将消息的读写操作复用到同一个 TCP 连接上,这样可以减少建立和关闭连接的开销,提高性能。

3.Virtual host
Virtual host(虚拟主机)
虚拟主机是一个虚拟概念,为消息队列提供了一种逻辑上的隔离机制。对于 RabbitMQ 而言,一个 BrokerServer 上可以存在多个 Virtual Host。当多个不同的用户使用同一个 RabbitMQ Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在自己的 vhost 中创建 exchange、queue 等。
4.Queue
Queue:队列,是RabbitMQ的内部对象,用于存储消息.

多个消费者,可以订阅同⼀个队列

5.Exchange
交换机是消息到达 broker 的第一站,负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 中。
交换机起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息。
6.RabbitMQ工作流程
1.Producer(生产者)创建消息
消息包含业务数据(如 JSON 字符串),可附带属性(如路由键 Routing Key、消息优先级等)。
2.建立通信链路
生产者连接到 RabbitMQ Broker,建立 TCP 连接(Connection),并在连接上开启一个信道(Channel)(所有消息操作都通过 Channel 执行,减少 TCP 连接开销);同时指定要访问的虚拟主机(Virtual Host),实现资源隔离。
3.声明交换机(Exchange)
生产者通过 Channel 声明一个交换机(需指定类型,如 Direct/Topic 等),用于后续消息路由。若交换机不存在,RabbitMQ 会自动创建;若已存在,需确保声明参数(类型、持久化等)与现有一致,否则会报错。
4.声明队列(Queue)
生产者通过 Channel 声明一个队列(需指定名称、是否持久化等属性),用于存储消息。同样,若队列不存在则创建,参数需与现有一致。
5.绑定交换机与队列(Binding)
生产者通过 Channel 将交换机与队列绑定,并指定 “绑定键(Binding Key)”—— 这是交换机路由消息的核心规则(如 Direct 交换机需 Routing Key 与 Binding Key 完全匹配)。(此步骤为关键遗漏项,没有绑定,交换机无法将消息路由到队列)
6.发送消息到 Broker
生产者通过 Channel 将消息发送到指定交换机,并指定消息的 “路由键(Routing Key)”。
Broker 路由与存储消息RabbitMQ Broker 接收消息后,交换机根据自身类型、Binding Key 与消息的 Routing Key 的匹配规则,将消息路由到对应的队列:
- 若匹配成功,消息被存入队列(可配置持久化到磁盘,避免服务重启丢失);
- 若匹配失败,消息的处理方式取决于生产者配置:
若设置了mandatory=true,消息会被退回给生产者;
若设置了备份交换机(Alternate Exchange),消息会路由到备份交换机;
否则,消息会被直接丢弃。
二、RabbitMQ的工作模式
RabbitMQ 一共提供了七种工作模式,进行消息传递。不同模式适用于不同的业务场景,核心区别在于消息的路由方式、交换机(Exchange)的类型以及生产者与消费者的交互方式。以下是常见的 6 种工作模式:

1. Simple(普通模式)

1. 核心角色定义
- P(生产者):即要发送消息的程序。
- C(消费者):即消息的接收者。
- Queue(消息队列):图中黄色背景部分,类似一个邮箱,可缓存消息;生产者向其中投递消息,消费者从其中取出消息。
2. 模式特点
该模式为点对点(Point-to-Point)模式,核心特点是:一个生产者(P)对应一个消费者(C),消息只能被消费一次。
2. WorK Queue(工作队列模式)

-
角色:一个生产者P,多个消费者C1、C2。
-
消息分配:在存在多个消息的情况下,Work Queue会将消息分派给不同的消费者,每个消费者会接收到不同的消息。
-
特点:消息不会重复,会分配给不同的消费者。假设若有10条消息,C1,C2共同消费这10条记录。
-
适用场景:集群环境中做异步处理。
-
示例:12306短信通知服务,订票成功后,订单消息会发送到RabbitMQ,短信服务从RabbitMQ中获取订单信息并发送通知(在多个短信服务实例之间进行任务分配)。
3. Publish/Subscribe(发布/订阅模式)

在此之前,介绍一下交换机基本概念。
Exchange(交换机,简称X)
作用:生产者将消息发送到Exchange,由交换机将消息按`一定规则路由到一个或多个队列中(上图中生产者将消息直接投递到队列中,实际上这在RabbitMQ中不会发生)。
交换机类型及路由策略
RabbitMQ交换机有四种核心类型,不同类型对应不同路由规则:
- Fanout:广播模式,将消息转发给所有绑定到该交换机的队列(对应Publish/Subscribe模式)。
- Direct:定向模式,将消息转发给符合指定routing key的队列(对应Routing模式)。
- Topic:通配符模式,将消息转发给符合routing pattern(路由模式)的队列(对应Topics模式)。
- headers:不依赖路由键匹配,而是根据消息内容中的headers属性匹配。该类型性能较差且不实用,实际中很少使用。
(注:AMQP协议中还有System和自定义两种类型,此处不展开描述。)
交换机的特性
Exchange(交换机)仅负责转发消息,不具备存储消息的能力。因此:
- 若没有任何队列与交换机绑定,或没有符合路由规则的队列,消息会直接丢失。
RoutingKey(路由键)
生产者将消息发送给交换机时,指定的一个字符串,用于告知交换机如何处理该消息(是路由的“指引标识”)。
Binding Key(绑定键)
RabbitMQ中通过Binding(绑定)操作将交换机与队列关联,绑定时常指定一个Binding Key。通过它,RabbitMQ能明确如何将消息从交换机路由到对应的队列(是路由的“匹配规则”)。

如下图:在发送消息时,若设置了Bingdin Key为black,消息就会被路由到C2

下图为发布订阅模式:

- 角色:一个生产者 P,多个消费者 C1、C2,X 代表交换机。
核心逻辑:交换机将消息复制多份,转发到多个不同的队列,每个队列对应一个消费者,最终每个消费者接收相同的消息。 - 适用场景:消息需要被多个消费者同时接收的场景,如实时通知、广播消息。
- 示例:中国气象局发布 “天气预报” 消息到交换机,新浪、百度、搜狐、网易等门户网站通过队列绑定该交换机,自动获取气象局推送的气象数据。
4.路由模式

- 本质:发布订阅模式的变种,在其基础上增加了路由key(RoutingKey)。
- 核心区别:
发布订阅模式无条件将所有消息分发给所有消费者;
路由模式中,Exchange会根据RoutingKey的规则,筛选消息后发送给对应的消费者队列。 - 适用场景:需要根据特定规则分发消息的场景。
5.通配符模式

- 本质:路由模式的升级版,在RoutingKey基础上增加了通配符功能,使消息匹配更灵活。
- 核心原理:与Routing模式基本相同——生产者将消息发送给交换机,交换机根据RoutingKey将消息转发给匹配的队列。
- 与Routing模式的区别:
匹配方式不同:Routing模式是“相等匹配”(RoutingKey与BindingKey完全一致);Topics模式是“通配符匹配”(类似正则表达式方式定义RoutingKey模式)。 - 适用场景:需要灵活匹配和过滤消息的场景。
6. RPC

1.客户端发送消息到一个指定的队列,并在消息属性中设置replyTo字段,该字段指定了一个回调队列,用于接收服务端的响应。
2.服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列。
3.客户端在回调队列上等待响应消息,一旦收到响应,会检查消息的correlationId属性,确保它是所期望的响应。
7. 发布确认
1.概念介绍

作为消息中间件,可能会面临以下消息丢失问题:
1.生产者因网络抖动或自身程序故障,导致消息未成功送到到broker。
2.生产者成功将消息传送给broker,但因自身因为重启或节点故障等原因,消息丢失。
3.broker成功将消息推送给消费者,但当消费者处理时发生异常,消息实际未处理但已被 “确认消费”,从而丢失。
上述原因中,发布确认机制可以确保问题1.

- 核心原理:Publisher Confirms 模式是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制。在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已被服务器接收并处理。
- 工作流程:生产者将 Channel 设置为 confirm 模式(通过调用channel.confirmSelect()完成)后,发布的每一条消息都会获得一个唯一的 ID,生产者可将这些序列号与消息关联,以便跟踪消息状态。
当消息被 RabbitMQ 服务器接收并处理后,服务器会异步向生产者发送一个确认(ACK)(包含消息的唯一 ID),表明消息已送达。 - 核心作用与适用场景:通过 Publisher Confirms 模式,生产者可确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失问题。适用于对数据安全性要求较高的场景,如金融交易、订单处理。
发布确认属于 RabbitMQ 的七大工作模式之一。
2.流程:
生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(deliveryTag投递标签)从 1 开始,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认消息给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写⼊磁盘之后发出。

3.核心参数
①deliveryTag(投递标签):
deliveryTag 是 RabbitMQ 给生产者通过某个 Channel(通道)发送的每一条消息分配的唯一递增整数标识,相当于消息在该通道内的 “快递单号”。
作用:Broker 向生产者返回确认 / 未确认响应时,会携带该 deliveryTag,生产者通过它能精准识别 “哪一条消息” 投递成功 / 失败。
范围:deliveryTag 与 Channel 强绑定—— 同一 Channel 内,消息的 deliveryTag 从 1 开始递增;不同 Channel 之间的 deliveryTag 相互独立(即使同一连接下,Channel A 的 deliveryTag=5 和 Channel B 的 deliveryTag=5 是两条完全不同的消息)。
②. multiple(批量标识):
作用:标识当前确认是 “单条确认” 还是 “批量确认”;
解读:
multiple = true:当前 deliveryTag 及之前所有未确认的消息均已成功 / 失败(如 deliveryTag=10 代表 1-10 条消息批量确认);
multiple = false:仅当前 deliveryTag 对应的单条消息被确认 / 失败。
4.确认应答机制
发布确认机制下,有三种确认应答机制
1.策略一:普通确认模式(Simple Confirm)
- 策略说明
定义:生产者发送 单条消息后,同步阻塞等待 Broker 确认响应,直到收到 “确认(Ack)” 或 “未确认(Nack)” 信号后再发送下一条;
核心特点:
优点:可靠性极高,每条消息的投递状态独立可追溯,失败可精准重试;
缺点:性能最低,同步阻塞导致吞吐量受限(频繁网络交互);
关键 API:channel.waitForConfirms()(阻塞等待确认,返回 true 表示确认成功);
适用场景:金融交易、核心指令等 “零丢失” 场景,对吞吐量要求低。
private static void publishingMessageIndividually() throws Exception{try(Connection connection=createConnection()) {//1.开启信道Channel channel=connection.createChannel();//2.设置信道为confirm模式channel.confirmSelect();//3.声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);long start =System.currentTimeMillis();//4.发送消息,并等待确认for (int i = 0; i <MESSAGE_COUNT ; i++) {String msg="hello publisher confirm "+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes(StandardCharsets.UTF_8));//等待确认channel.waitForConfirms(5000);}long end=System.currentTimeMillis();System.out.printf("单独确认策略,消息条数: %d ,耗时: %d ms \n",MESSAGE_COUNT,end-start);}}
2.策略二:批量确认模式(Batch Confirm)
- 策略说明
定义:生产者 批量发送 N 条消息后,一次性同步等待 Broker 确认,Broker 返回 “整批确认” 或 “整批未确认”;
核心特点:
优点:性能优于普通模式,减少网络交互次数(批量确认一次等价于 N 次单条确认);
缺点:可靠性中等,批量中一条失败无法定位具体消息,需整批重试;
关键设计:通过计数器控制批次大小(如 10 条 / 批),达到阈值后调用 waitForConfirms();
适用场景:非核心业务(如通知推送、日志同步),可接受小范围重试,对吞吐量有一定要求。
private static void publishingMessageInBatches() throws IOException, TimeoutException, InterruptedException {try (Connection connection=createConnection()){//1.开启信道Channel channel=connection.createChannel();//2.设置信道为confirm模式channel.confirmSelect();//3.声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null);//4.发送消息,并进行确认int batchSize=100;int outstandingMessageCount=0;long start=System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg="hello publisher confirm "+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2,null,msg.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;if(outstandingMessageCount==batchSize){channel.waitForConfirms(5000);outstandingMessageCount=0;}}if(outstandingMessageCount>0){channel.waitForConfirms(5000);}long end=System.currentTimeMillis();System.out.printf("批量确认策略,消息条数: %d ,耗时: %d ms \n",MESSAGE_COUNT,end-start);}}
3.异步确认模式(Correlated Confirm)
1. 策略说明
定义:生产者发送消息后 不阻塞,继续执行其他逻辑;Broker 处理完消息后,通过 回调函数异步通知 生产者确认结果;
核心特点:
优点:性能最佳(非阻塞),可靠性高(可精准定位单条 / 批量消息),是生产环境首选;
缺点:实现略复杂,需处理回调函数的线程安全;
适用场景:电商订单、物流通知等绝大多数生产场景,兼顾高吞吐量与高可靠性。
private static void handingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {try (Connection connection = createConnection()) {//1.开启信道Channel channel = connection.createChannel();//2.设置信道为confirm模式channel.confirmSelect();//3.声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4.监听confirm//集合中存储的是未确认的消息IDSortedSet<Long> confirmSeqNo= Collections.synchronizedSortedSet(new TreeSet<>());long start=System.currentTimeMillis();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple){//headSet(n)方法返回当前集合中小于n的集合,这里表示清除集合confirmSeqNo.headSet(deliveryTag+1).clear();}else{//只清除一个confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if(multiple){//headSet(n)方法返回当前集合中小于n的集合,这里表示清除集合confirmSeqNo.headSet(deliveryTag+1).clear();}else{//只清除一个confirmSeqNo.remove(deliveryTag);}//业务根据实际场景进行处理,比如重发,此处代码省略}});//5.发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg="hello publisher confirms "+i;//获取发送的deliverTaglong seqNo=channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes(StandardCharsets.UTF_8));confirmSeqNo.add(seqNo);}while(!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end=System.currentTimeMillis();System.out.printf("异步确认策略,消息条数: %d ,耗时: %d ms \n",MESSAGE_COUNT,end-start);}}
注意事项
1.异步确认需使用线程安全集合(如 ConcurrentHashMap)存储 deliveryTag 映射,避免并发异常;
2.批量确认需控制批次大小(建议 100-1000 条),避免批次过大导致失败时重试范围扩大;
3.三种策略均需配合 队列 / 消息持久化,才能实现 “消息到达 Broker 磁盘” 的端到端可靠(仅确认机制保证到达交换机)。
