分布式专题——13 RabbitMQ之高级功能
1 队列的三种类型
- RabbitMQ 可选择三种队列类型:Classic 经典队列、Quorum 仲裁队列、Stream 流式队列。Quorum 仲裁队列在 3.8.x 版本推出,Stream 流式队列在 3.9.x 版本推出,这两种新队列是为应对现代业务场景,解决经典队列因消息大量积累影响收发性能的问题,能极大提升消息堆积性能。
1.1 Classic 经典队列
-
在创建 Classic 队列时,可设置以下关键属性:
-
Virtual host:用于指定队列所在的虚拟主机;
-
Type:队列类型,此处选 Classic;
-
Name:队列名称;
-
Durability:有 Durable 和 Transient 两个选项
- Durable 表示队列会将消息保存到硬盘,消息安全性高,但因涉及更多 I/O 操作,生产和消费消息的性能比 Transient 低;
- Transient 则是消息不持久化到硬盘,性能相对高,但消息安全性稍差;
-
Auto delete:若选择 Yes,队列将在至少一个消费者连接后,且所有消费者都断开连接后删除自己;选择 No 则不会自动删除;
-
Arguments:包含众多参数,如“Auto expire”(自动过期)、“Message TTL”(消息生存时间)、“Overflow behaviour”(溢出行为)、“Single active consumer”(单活跃消费者)、“Dead letter exchange”(死信交换机)、“Dead letter routing key”(死信路由键)、“Max length”(最大长度)、“Max length bytes”(最大字节长度)、“Maximum priority”(最大优先级)、“Version”(版本)、“Master locator”(主定位器)等,可点击问号逐步了解各参数含义;
-
-
在 RabbitMQ 中,Classic 队列是传统的队列结构,消息以 FIFO(先进先出)的方式存入队列,Consumer 从队列中取出消息后,该消息就会从队列中删除,若消息需要重新投递,就需要再次入队;
-
持久化实现:RabbitMQ 目前提供两个版本的持久化实现
- 一种方式是将数据文件整体写入和读取,方式简单,但如果消息有积压,对服务端的压力会比较大;
- 另一种方式是只读取一部分索引,数据在需要的时候再加载到内存当中,也就是懒队列,这种方式在消息积压时,对服务端的压力会相对小一点;
-
适用场景:主要用在数据量比较小,并且生产消息和消费消息的速度比较稳定的业务场景,比如内部系统之间的服务调用;
-
局限性:经典队列依赖各个 Broker 自己进行管理,在分布式场景下,管理效率不高。而且不适合积累太多消息,若队列中积累的消息太多,会严重影响客户端生产消息以及消费消息的性能。
1.2 Quorum 仲裁队列
-
Quorum 仲裁队列是 RabbitMQ 从 3.8.0 版本引入的新队列类型,也是官方比较推荐的队列类型。相比 Classic 经典队列,在分布式环境下对消息的可靠性保障更高,官方文档表示未来会用它代替传统 Classic 队列;
- Quorum Queues | RabbitMQ;
-
Quorum 仲裁队列基于 Raft 一致性协议实现,是一种新型分布式消息队列,实现了持久化、多备份的 FIFO 队列,专为 RabbitMQ 集群设计;
- 队列中的消息需要集群中多半节点同意确认后,才会写入队列,能保证消息在集群内部不丢失;
- 不过,它是以牺牲很多高级队列特性为代价,来进一步保证分布式环境下消息的高可靠性;
-
Quorum 仲裁队列是在 Classic 经典队列的基础上做减法,二者功能对比:
-
Non - durable queues(非持久化内存队列):Classic 支持,Quorum 不支持;
-
Exclusivity(独占队列,队列只能由声明队列的 Connection 连接使用,Connection 断开后自动删除):Classic 支持,Quorum 不支持;
-
Per message persistence(每条消息持久化):Classic 是每条消息都可持久化,Quorum 是始终持久化;
-
Membership changes(成员变更):Classic 自动进行,Quorum 手动进行;
-
Message TTL(消息生存时间):Classic 支持,Quorum 从 3.10 版本开始支持;
-
Queue TTL(队列生存时间):Classic 支持,Quorum 部分支持(队列重新声明时租约不会续期);
-
Queue length limits(队列长度限制):Classic 支持,Quorum 支持,但
x-overflow: reject-publish-dlx
除外; -
Lazy behaviour(懒队列行为):Classic 支持,Quorum 从 3.10 版本开始始终支持;
-
Message priority(消息优先级):Classic 支持,Quorum 不支持;
-
Consumer priority(消费者优先级):Classic 支持,Quorum 支持;
-
Dead letter exchanges(死信交换机):Classic 支持,Quorum 支持;
-
Adheres to policies(遵循策略):Classic 支持,Quorum 支持(需看策略支持情况);
-
Poison message handling(有毒消息处理,处理一直不能被消费者正常消费的消息):Classic 不支持,Quorum 支持。Quorum 队列会跟踪消息的失败投递尝试次数,记录在
x-delivery-count
头部参数中,可通过设置 Delivery limit 参数定制有毒消息删除策略,当重复投递次数超过该参数阈值时,RabbitMQ 会删除这些消息,若配置了死信队列,消息会进入死信队列; -
Global QoS Prefetch(全局服务质量预取):Classic 支持,Quorum 不支持;
-
-
数据安全性:Quorum 仲裁队列主要针对网络分区、通信失败等复杂网络情况,可提升数据安全性,通常建议配合 Publisher Confirms 机制使用。RabbitMQ 能保证经生产者确认过的消息在集群内是安全的,未经生产者确认的消息则无法保证安全;
-
适用场景:适合队列长期存在,且对容错、数据安全要求比低延迟、不持久等高级队列特性要求更严格的场景,例如电商系统的订单,引入 MQ 后处理速度可慢,但订单不能丢失;
-
不适合的场景:
- 临时使用的队列,如 transient 临时队列、exclusive 独占队列,或经常修改和删除的队列;
- 对消息低延迟要求高的场景,因为一致性算法会影响消息延迟;
- 对数据安全性要求不高的场景,Quorum 队列需要消费者或生产者手动确认;
- 队列消息积压严重的场景,Quorum 队列会将所有消息始终保存在内存中,直到达到内存使用极限,这种情况 Stream 流式队列是更好的选择。
1.3 Stream 流式队列
-
Stream 队列是 RabbitMQ 自 3.9.0 版本开始引入的新数据队列类型,消息会持久化到磁盘且具备分布式备份,更适合消费者多、读消息频繁的场景;
-
Stream 队列核心是以 append-only(只添加)的日志来记录消息,消息以 append-only 方式持久化到日志文件中,通过调整每个消费者的消费进度 offset,实现消息的多次分发。同时,有相关属性用于定义日志文件的大小以及保存时间;
-
Stream 与 Classic 队列功能对比:
-
Stream 队列提供了其他 RabbitMQ 队列类型不太好实现的四个特点:
-
大规模分发(large fan - outs):以往队列类型向多个订阅者发相同消息,需为每个消费者绑定专用队列,消费者数量大时性能低下。而 Stream 队列允许任意数量消费者使用同一个队列的消息,消除绑定多个队列的需求;
-
消息回溯(Replay/Time - travelling):RabbitMQ 原有队列类型中,消费者处理完消息后会从队列删除,无法重新读取已消费消息。Stream 队列允许用户在日志的任意连接点开始重新读取数据;
-
高吞吐性能(Throughput Performance):Stream 队列设计以性能为主要目标,对消息传递吞吐量的提升非常明显;
-
大日志(Large logs):RabbitMQ 原有队列在消息积累过多时性能下降明显,Stream 队列设计目标是用最小内存开销高效存储大量数据,可轻松在队列中积累百万级别的消息;
-
-
Stream 队列借鉴了其他 MQ 产品优点,在保证消息可靠性基础上,着力提高队列的消息吞吐量和转发性能,解决了 RabbitMQ 长期以来消息积累过多时性能下降明显的问题,也体现出 RabbitMQ 从更专注企业级内部使用,向更复杂互联网环境靠拢的趋势,随着版本推进,对 RabbitMQ 的认识也需不断更新。
1.4 三种队列的使用
- 这几种不同类型的队列,本质上都是存储消息的数据结构。Classic 队列的编程模型此前已有详细分析,Quorum 队列和 Stream 队列的使用方式与 Classic 队列大同小异。
1.4.1 Quorum 队列的使用
-
Quorum 队列与 Classic 队列使用方式相近,主要差别在声明队列时;
-
声明 Quorum 队列需在
arguments
中传入参数x-queue-type
,参数值设为quorum
; -
同时,Quorum 队列的消息必须持久化,所以
durable
参数必须设为true
,若设为false
会报错;exclusive
参数也必须设为false
,且这些声明在 Producer 和 Consumer 中要保持一致;Map<String, Object> params = new HashMap<>(); params.put("x-queue-type", "quorum"); // 声明Quorum队列的方式就是添加一个x-queue-type参数,指定为quorum。默认是classic channel.queueDeclare(QUEUE_NAME, true, false, false, params);
1.4.2 Stream 队列的使用
-
队列声明:
- Stream 队列使用稍复杂,声明时
x-queue-type
参数要设为stream
; - 还可添加可选参数,如
x-max-length-bytes
(日志文件最大字节数)、x-stream-max-segment-size-bytes
(每个日志文件最大大小),防止 Stream 日志无限制累计; durable
参数必须声明为true
,exclusive
参数必须声明为false
;
Map<String, Object> params = new HashMap<>(); params.put("x-queue-type", "stream"); params.put("x-max-length-bytes", 20_000_000_000L); // maximum stream size: 20 GB params.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB channel.queueDeclare(QUEUE_NAME, true, false, false, params);
- Stream 队列使用稍复杂,声明时
-
消费步骤:
-
channel
必须设置basicQos
属性,与 Spring 框架集成时,channel
对象可在@RabbitListener
声明的消费者方法中直接引用,Spring 框架会注入; -
正确声明 Stream 队列,在
Queue
对象中传入声明所需参数; -
消费时要指定
offset
,与 Spring 框架集成时,可通过注入Channel
对象,使用原生 API 传入offset
属性。例如用原生 API 创建 Stream 类型的Consumer
时,需添加参数x-stream-offset
,表示从队列哪个位置开始消费;x-stream-offset
可选值有:-
first
:从日志队列中第一个可消费的消息开始消费 -
last
:消费消息日志中最后一个消息 -
next
:相当于不指定offset
,消费不到消息 -
Offset
:一个数字型的偏移量 -
Timestamp
:一个代表时间的Date
类型变量,表示从这个时间点开始消费,例如一小时前:Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
-
Map<String, Object> consumeParam = new HashMap<>(); consumeParam.put("x-stream-offset", "last"); channel.basicConsume(QUEUE_NAME, false, consumeParam, myconsumer);
-
-
与 SpringBoot 集成问题及解决方式:
- 在 Consumer 中必须传入
x-stream-offset
参数,目前在与 SpringBoot 集成时,Stream 队列暂时无法正常消费; - SpringBoot 框架集成 RabbitMQ 后,为简化编程模型,隐藏了
Channel
、connection
等关键对象,无法直接接入这些对象的注入过程; - 若要使用 Stream 队列,有两种方式:
- 一是使用原生 API,在 SpringBoot 框架下自行封装;
- 二是使用 RabbitMQ 的 Stream 插件(Stream Plugin | RabbitMQ),在服务端通过 Stream 插件打开 TCP 连接接口,并配合单独提供的 Stream 客户端使用,但这种方式对应用端影响大,且未提供与 SpringBoot 框架的集成,需自行完善,选择使用的企业较少;
- 在 Consumer 中必须传入
-
Stream 客户端支持情况:
- 官方支持的 Stream 客户端有:RabbitMQ Java Stream Client、RabbitMQ Golang Stream Client、RabbitMQ .NET Stream Client、RabbitMQ Rust Stream Client、RabbitMQ Python Stream Client (stream);
- 还有部分暂未得到官方完全支持的客户端,如 RabbitMQ Python Stream Client (rbfly)、RabbitMQ NodeJS Stream Client 等。
1.4.3 企业使用现状
- 在企业中,目前用得最多的还是 Classic 经典队列;
- 从 RabbitMQ 官网可知,RabbitMQ 目前主推 Quorum 队列,甚至有传言未来会用 Quorum 队列全面替代 Classic 经典队列;
- 而 Stream 队列虽经历了几个版本的完善修复,但目前还不太稳定,企业用得还比较少。
2 用 Quorum 队列代替懒队列
-
从 3.6.x 版本到 3.12.x 版本,RabbitMQ 针对 Classic Queue 提供了
lazy-mode
优化配置,即懒队列-
懒队列会尽可能早地将消息内容保存到硬盘中,只有在用户请求消息时,才临时从硬盘加载到 RAM 内存当中。
-
出现原因:默认情况下,RabbitMQ 接收到消息时,会先保存到内存以便使用,同时把消息写到硬盘。但消息写入硬盘的过程会阻塞队列,即便 RabbitMQ 对写入硬盘速度做了很多算法优化,在长队列场景下表现仍不理想,所以推出了懒队列;
-
优点:能减少内存使用,加快消费速度,适合消息量大且长期有堆积的队列;
-
缺点:以大量消耗集群的网络及磁盘 IO 为代价;
-
-
在 3.13 版本,官方建议使用 Quorum 队列代替懒队列,且明确建议从 3.11 版本往后的版本都用 Quorum 队列替代懒队列。
3 死信队列
3.1 简介
-
死信队列是 RabbitMQ 中非常重要的特性,可简单理解为 RabbitMQ 对于未能正常消费的消息进行补救的机制。死信队列本身也是一个普通的队列,同样可以在队列上声明消费者,继续对消息进行消费处理;
-
在 RabbitMQ 中,死信队列主要涉及以下几个参数:
-
x-dead-letter-exchange
:指定对应的死信交换机; -
x-dead-letter-routing-key
:指定死信交换机的 routing - key;- 之后死信交换机就可以像普通交换机一样,通过 RoutingKey 将消息转发到对应的死信队列中;
-
x-message-ttl
:消息过期时间); -
durable
:设置为true
,表示死信队列需要持久化,这是必须的。
-
3.2 何时会产生死信?
-
有以下三种情况,RabbitMQ 会将一个正常消息转成死信:
-
消息被消费者确认拒绝。即消费者把
requeue
参数设置为false
,并且在消费后,向 RabbitMQ 返回拒绝,可通过channel.basicReject
或者channel.basicNack
方法实现; -
消息达到预设的 TTL(Time-To-Live,最长存活时间)时限还一直没有被消费。消息在队列中保存时间超过这个 TTL,即会被认为死亡,死亡的消息会被丢入死信队列,如果没有配置死信队列的话,RabbitMQ 会保证死了的消息不会再次被投递,并且在未来版本中,会主动删除掉这些死掉的消息;
-
消息由于队列已经达到最长长度限制而被丢掉;
-
-
设置 TTL 有两种方式:
-
策略配置方式:可通过 Web 管理平台配置,或者使用指令配置,示例指令为
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
- 60000 为毫秒单位;
-
声明队列时指定:同样可以在 Web 管理平台配置,也可以在代码中配置,代码示例如下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 60000); channel.queueDeclare("myqueue", false, false, false, args);
-
3.3 死信队列的配置方式
-
RabbitMQ 有两种声明死信队列的方式,一种是针对单个单独队列指定对应的死信队列,另一种是以策略的方式进行批量死信队列的配置,针对多个队列,可使用策略方式配置统一的死信队列;
-
可通过指令进行配置,示例指令为:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
-
针对队列单独指定死信队列主要是设置之前提到的三个属性(
x-dead-letter-exchange
、x-dead-letter-routing-key
、x-message-ttl
)。代码示例如下:channel.exchangeDeclare("some.exchange.name", "direct"); Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "some.exchange.name"); channel.queueDeclare("myqueue", false, false, false, args);
-
这些参数也可以在 RabbitMQ 的管理页面进行配置,例如配置策略时:
-
另外,在对队列进行配置时,只有 Classic 经典队列和 Quorum 仲裁队列才能配置死信队列,Stream 流式队列不支持配置死信队列。
3.4 x-dead-letter-routing-key
参数
- 死信转移到死信队列时,原本的 Routing key 会保存下来,但如果配置了
x-dead-letter-routing-key
参数(该参数用于指定死信交换机的 routing - key),Routing key 就会被替换为该配置的值; - 另外,死信在转移到死信队列的过程中,没有经过消息发送者确认,所以无法保证消息的安全性。
3.5 如何确定一个消息是不是死信?
- 消息被作为死信转移到死信队列后,会在 Header 中增加一些信息。在官网的详细介绍里,能看到很多内容,比如时间、原因(
rejected
、expired
、maxlen
)、队列等,而且 Header 中还会加上第一次成为死信的三个属性,这三个属性在后续传递过程中都不会更改,分别是:x-first-death-reason
x-first-death-queue
x-first-death-exchange
3.6 基于死信队列实现延迟队列
-
从前面的配置过程可以看出,死信交换机或者死信队列,是在交换机或者队列之间建立一种死信对应关系,死信队列可以像正常队列一样被消费,和普通队列一样具有 FIFO(先进先出)的特性,对死信队列的消费逻辑通常是对这些失效消息进行一些业务上的补偿;
-
RabbitMQ 中不存在延迟队列的功能,通常如果要用到延迟队列,会采用 TTL + 死信队列的方式来处理。此外,RabbitMQ 提供了
rabbitmq_delayed_message_exchange
插件,可实现延迟队列功能,但该插件没有集成到官方的发布包中,需要单独下载,这里暂不讨论。
4 消息分片存储插件
4.1 插件的作用背景
-
RabbitMQ 的客户端 TPS(Transactions Per Second,每秒事务数)和 Kafka、RocketMQ 相比有较大差距。那么在消费者处理能力有限的前提下,如何提升消费者的消费速度呢?RabbitMQ 提供的 Sharding 插件给出了一种思路;
-
提到 Sharding,很容易联想到数据库的分库分表。对于数据库分库分表,分库能减少数据库的 IO 性能压力,而要真正解决单表数据量过大的问题,就需要分表;
-
对于 RabbitMQ 来说,针对单个队列,要增加吞吐量,最有效的办法是增加消费者个数以及提高消息处理速度,但这无疑需要投入更大的资源。而 RabbitMQ 的 Sharding 插件提供了一种方案,它将一个队列中的消息分散存储到不同的节点上,并且提供多个节点的负载均衡策略,从而实现对等的读与写功能,以此在消费者处理能力有限的情况下提升消费进度。
4.2 使用步骤
4.2.1 启用 Sharding 插件
-
当前 RabbitMQ 运行版本已包含 Sharding 插件,使用时只需安装启用,执行指令:
rabbitmq-plugins enable rabbitmq_sharding
4.2.2 配置 Sharding 策略
-
启用插件后需配置策略,且必须增加分片参数(如
shards-per-node
)。可通过管理页面配置:
4.2.3 新增带 Sharding 的 Exchange 交换机
-
安装 Sharding 插件后,创建 Exchange 时会多出
x-modulus-hash
类型。通过该类型可创建分片交换机,用于后续消息的分片分发;
4.2.4 往分片交换机上发送消息
-
编写生产者代码,在 RabbitMQ 上声明一个
x-modulus-hash
类型的交换机(如sharding_exchange
),并向其发送三千条消息;public class ShardingProducer {// 定义交换机的名称,用于消息路由private static final String EXCHANGE_NAME = "sharding_exchange";public static void main(String[] args) throws Exception{// 创建RabbitMQ连接工厂实例ConnectionFactory factory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址factory.setHost("192.168.65.112");// 设置RabbitMQ服务器的端口号factory.setPort(5672);// 设置连接RabbitMQ的用户名factory.setUsername("admin");// 设置连接RabbitMQ的密码factory.setPassword("admin");// 设置要连接的虚拟主机factory.setVirtualHost("/mirror");// 创建与RabbitMQ服务器的连接Connection connection = factory.newConnection();// 创建通道,这是大多数API方法所在的地方Channel channel = connection.createChannel();// 声明一个名为"sharding_exchange"的交换机,使用"x-modulus-hash"类型// "x-modulus-hash"是一种自定义交换机类型,用于实现分片/分区功能// 它通过计算消息路由键的哈希值模数来决定将消息路由到哪个队列channel.exchangeDeclare(EXCHANGE_NAME, "x-modulus-hash");// 循环发送3000条消息for(int i = 0 ; i < 3000 ; i ++){String message = "Sharding message "+i;// 发布消息到指定的交换机// 使用String.valueOf(i)作为路由键,这将用于计算消息应该被路由到哪个分片channel.basicPublish(EXCHANGE_NAME, String.valueOf(i), null, message.getBytes());}// 关闭通道channel.close();// 关闭连接connection.close();} }
-
发送后,交换机的绑定情况会显示绑定到多个分片队列(如下图):
-
且消息会被平均分配到这些队列中:
4.2.5 消费分片交换机上的消息
-
当
sharding_exchange
交换机上的消息已平均分配到三个分片队列后,需要考虑如何消费这些消息。这些分片队列的命名有特定格式:sharding:{exchangename}-{node}-{shardingindex}
,虽然可以针对每个队列单独声明消费者,但这样只能消费到零散消息,不符合分片业务场景要消费完整数据副本的需求; -
Sharding 插件提供了伪队列消费方式,可声明一个名为
exchangename
(下面代码中命名为sharding_exchange
)的伪队列,像消费普通队列一样消费这一系列分片队列。之所以称为伪队列,是因为这个名为exchangename
的队列实际上是并不存在的;public class ShardingConsumer {// 定义队列名称,与交换机的名称相同,表明这个队列绑定到了同名的交换机public static final String QUEUENAME = "sharding_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 创建RabbitMQ连接工厂实例ConnectionFactory factory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址factory.setHost("192.168.65.112");// 设置RabbitMQ服务器的端口号factory.setPort(5672);// 设置连接RabbitMQ的用户名factory.setUsername("admin");// 设置连接RabbitMQ的密码factory.setPassword("admin");// 设置要连接的虚拟主机factory.setVirtualHost("/mirror");// 创建与RabbitMQ服务器的连接Connection connection = factory.newConnection();// 创建通道,这是大多数API方法所在的地方Channel channel = connection.createChannel();// 声明一个队列,使用与交换机相同的名称// 参数说明:// - 队列名称:sharding_exchange// - 是否持久化:false(服务器重启后队列不存在)// - 是否独占:false(多个消费者可以同时使用这个队列)// - 是否自动删除:false(当没有消费者时,队列不会被自动删除)// - 其他参数:nullchannel.queueDeclare(QUEUENAME, false, false, false, null);// 创建消费者实例,重写handleDelivery方法来处理接收到的消息Consumer myconsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {System.out.println("========================");// 获取消息的路由键String routingKey = envelope.getRoutingKey();System.out.println("routingKey >" + routingKey);// 获取消息的内容类型String contentType = properties.getContentType();System.out.println("contentType >" + contentType);// 获取消息的交付标签(唯一标识符)long deliveryTag = envelope.getDeliveryTag();System.out.println("deliveryTag >" + deliveryTag);// 获取并打印消息内容System.out.println("content:" + new String(body, "UTF-8"));// 注意:这里没有调用basicAck方法进行消息确认// 这意味着消息不会被标记为已处理,服务器会继续重发这条消息// 这通常不是期望的行为,在实际应用中应该根据处理结果进行确认或拒绝}};// 启动三个消费者实例来同时消费同一个队列// 这是为了利用分片机制实现负载均衡,每个消费者会处理不同分片的消息// basicConsume方法的第二个参数为true表示自动确认消息(但上面的处理逻辑中并没有实际确认)String consumeerFlag1 = channel.basicConsume(QUEUENAME, true, myconsumer);System.out.println("c1:"+consumeerFlag1);String consumeerFlag2 = channel.basicConsume(QUEUENAME, true, myconsumer);System.out.println("c2:"+consumeerFlag2);String consumeerFlag3 = channel.basicConsume(QUEUENAME, true, myconsumer);System.out.println("c3:"+consumeerFlag3);// 注意:这里没有关闭连接和通道,消费者会持续运行并接收消息} }
-
调用
channel.basicConsume
方法注册消费者,由于有三个分片队列需要消费,所以注册了三次。Sharding
插件的实现原理是将basicConsume
方法绑定到分片队列中消息最少的一个队列上,从而实现对分片队列消息的消费。consumerTag
是消费者的标签,用于标识不同的消费者。
4.3 注意事项
- 消息分发与顺序:Producer 发送消息时,只需指定虚拟 Exchange,无法确定消息最终发往哪个分片队列。Sharding 插件虽尽量按轮询方式均匀存储消息,但不能保证绝对均匀。而且消息分片过程不考虑消息顺序,会让 RabbitMQ 原本就不严格的消息顺序问题更严重,所以该插件适合对消息延迟要求不严格、对消费顺序无要求的场景;
- 消费队列影响:Sharding 插件消费伪队列消息时,会从消费者最少的分片队列中选择。若分片队列中已有很多其他消息,消费伪队列消息时会受这些不均匀数据的影响,因此使用该插件时,这些碎片队列尽量不要单独使用。