当前位置: 首页 > news >正文

RabbitMQ 高级特性

准备工作

1. 创建 Spring 项目

2. 引入依赖

3.修改配置文件

RabbitMQ官网  AMQP 0-9-1 Protocol Extensions | RabbitMQ

消息确认

消息确认机制

生产者发送消息,到达消费者后,可能会有以下情况:

    1.消息处理成功

    2.消息处理异常

RabbitMQ 向消费者发送消息之后,会把消息删除掉,如果是第二种情况,就会造成消息丢失

为了确保消费者已经成功接收到消息,RabbitMQ 提供了 消息确认机制 (Message acknowledgement)

消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种:

  • 自动确认

当 autoAck 为 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存或者磁盘中删除,不管消费者是否真正消费到这些消息

  • 手动确认

当 autoAck 为 false 时, RabbitMQ 会等待消费者显示的调用 basicAck 命令,回复确认信号后才从内存或者磁盘中删除消息

当 autoAck 设为 false 时,对于 RabbitMQ 服务端而言,队列中的消息分成了两部分:

1. 等待投递给消费者的消息

2. 已经投递给消费者,但是还没有收到消费者确认信号的消息

如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者(可能还是原来那个消费者)

从 RabbitMQ 的管理平台上,也可以看到队列中的 Ready 和 Unacked 状态的消息数

手动确认方法

基于 RabbitMQ SDK 

消费者在收到消息后,可以选择确认,也可以选择直接拒绝或者跳过

消费者客户端可以调用与其对应的 channel 的相关方法:

  • 肯定确认 Channel.basicAck(long deliveryTag, boolean mutiple)

RabbitMQ 已经知道该消息并且处理成功的消息,可以丢弃了

  • deliveryTag    消息的唯一标识

是一个单调递增的64位的长整型值. deliveryTag 是每个 channel 独立维护的,在每个 channel 上都是唯一的.当消费者 ack 一条消息时,必须使用在对应的 channel 上确认

  • multiple    是否批量确认

在某些情况下,为减少网络流量,可以对一系列的 deliveryTag 进行批量确认

值为 true 时,会一次性 ack 所有小于或者等于 deliveryTag 的消息

值为 false 时,只确认当前制定 deliveryTag 的消息

deliveryTag 确保了消息传递的可靠性和顺序性

  • 否定确认 Channel.basicReject(long deliveryTag, boolean requeue)

RabbitMQ 在 2.0.0 版本引入 Baisc.Reject 这个命令,消费者客户端可以调用这个方法来告诉 RabbitMQ 拒绝这个消息

  • requeue    表示拒绝后,消息的处理方式

值为 true, RabbitMQ 会重新把这条消息存入队列,以便可以发送给下一个订阅的消费者

值为 false, RabbitMQ 会把消息从队列中移除,不会把这个消息发给新的消费者

  • 否定确认 Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

Basic.Reject 命令,一次只能拒绝一条消息

Basic.Nack 命令,可以批量的拒绝消息

  • multiple

值为 true, 拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息

基于 Spring Boot

基于 SpringBoot 的确认机制,使用方式和使用 RabbitMQ Java Client 库有一定的差异

Sprin-AMQP 对消息确认机制提供了三种策略:

  •  AcknowledgeMode.NONE

消息一旦被投递给消费者,不管消费者是否成功处理了消息, RabbitMQ 都会自动确认消息,从 queue 中移除消息(如果消费者处理消息失败,消息可能会丢失)

  •  AcknowledgeMode.AUTO (默认)

消费者在处理消息成功时会自动确认消息,吐过处理过程抛出异常,则不会确认消息

  •  AcknowledgeMode.MANUAL

手动确认模式下,消费者必须在成功处理消息后,显示调用 basicAck 方法来确认消息

如果消息未被确认,RabbitMQ 会认为消息未被成功处理,并且会在消费者可用时重新投递该消息

这种模式提高了消息的可用性,即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理

代码演示

基于 RabbitMQ SDK 的代码已经在前面演示过了,此处只演示基于 Spring Boot 的方式

配置  (acknowledge-mode 在配置不同模式时记得修改)

 声明  队列,交换机,绑定

生产者发送消息

消费者逻辑

 AcknowledgeMode.NONE    AcknowledgeMode.AUTO

AcknowledgeMode.MANUAL

测试

 AcknowledgeMode.NONE

消费者无论是否正常处理, MQ 都删除相应消息

AcknowledgeMode.AUTO

消费者正常处理  消息自动确认

消费者处理异常  消息不停重试

AcknowledgeMode.MANUAL

消费者正常处理  消息自动确认

消费者处理异常  消息不停重试 (requeue 设为 true, 是否重新入队)

持久化

默认情况下,RabbitMQ 退出或者由于某种原因崩溃时,会忽略队列和消息持久化

队列持久化

队列的持久化是通过在声明队列时将 durable 参数置为 true实现的
如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没有了, 消息也无处可存了)
队列的持久化能保证该队列本身的元数据不会因异常情况而丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化

    //默认是非持久化@Bean("presQueue")public Queue presQueue(){return QueueBuilder.durable(Constants.PRES_QUEUE).build();}    //非持久化@Bean("presQueue")public Queue presQueue(){return QueueBuilder.nonDurable(Constants.PRES_QUEUE).build();}

交换机持久化

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机, 交换机会自动建立,相当于一直存在
如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失, 对一个长期使用的交换器来说,建议将其置为持久化的

    @Bean("presExchange")public DirectExchange presExchange(){return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).build();}//声明参数 durable 为 false@Bean("presExchange")public DirectExchange presExchange(){return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).durable(false).build();}

消息持久化

RabbitMQ SDK 实现

消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.PERSISTENT

public enum MessageDeliveryMode {
        NON_PERSISTENT, // ⾮持久化
        PERSISTENT; // 持久化
}

设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在. 如果只设置队列持久化, 重启之后消息会丢失. 如果只设置消息的持久化, 重启之后队列消失, 继而消息也丢失. 所以单单设置消息持久化而不设置队列的持久化显得毫无意义

// ⾮持久化信息
channel.basicPublish( "" , QUEUE_NAME , null ,msg.getBytes());
// 持久化信息
channel.basicPublish( "" , QUEUE_NAME, MessageProperties. PERSISTENT_TEXT_PLAIN ,msg.getBytes());
MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties ( "text/plain" ,
                                           null ,
                                           null ,
                                           2 , //deliveryMode
                                           0 , null , null , null ,
                                           null , null , null , null ,
                                           null , null );

使用 RabbitTemplate 发送消息

// 要发送的消息内容
String message = "This is a persistent message" ;
// 创建⼀个 Message 对象,设置为持久化
Message message  = new Message (message.getBytes(), new MessageProperties ());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode. PERSIS
TENT );
// 使⽤ RabbitTemplate 发送消息
rabbitTemplate.convertAndSend(Constant. ACK_EXCHANGE_NAME , "ack" , message);
RabbitMQ默认情况下会将消息视为持久化的,除非队列被声明为非持久化,或者消息在发送时被标记为非持久化
将所有的消息都设置为持久化, 会严重影响RabbitMQ的性能(随机). 写入磁盘的速度比写入内
存的速度慢得不只一点点. 对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的
吞吐量. 在选择是否要将消息持久化时, 需要在可靠性和吐吞量之间做⼀个权衡

代码演示

队列,交换机,绑定  声明

上述代码在测试时要停止服务

当服务器停止时,服务还未停止  ->  服务会不时向服务器发送通信,判断是否能连上服务器

连接失败,发生异常;连接成功,建立连接(发现 队列,交换机 缺失,重新创建) 

将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗? 答案是否定的
1. 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之后, 还没来得及处理就宕机了, 这样也算数据丢失. 这种情况很好解决, 将autoAck参数设置为false, 并进行手动确认
2. 在持久化的消息正确存入RabbitMQ之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理, 可能仅仅保存到操作系统缓存之中而不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失

1. RabbitMQ的仲裁队列, 如果主节点(master)在此特殊时间内挂掉, 可以自动切换到从节点(slave),这样有效地保证了高可用性, 除非整个集群都挂掉(此方法也不能保证100%可靠, 但是配置了仲裁队列要比没有配置仲裁队列的可靠性要高很多, 实际生产环境中的关键业务队列⼀般都会设置仲裁队列)
2. 在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储至RabbitMQ中

发送方确认

在使用 RabbitMQ 的时候, 可以通过消息持久化来解决因为服务器的异常崩溃而导致的消息丢失, 但是还有一个问题, 当消息的生产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(比如RabbitMQ重启, 那么RabbitMQ重启期间生产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ提供了两种解决方案:
a. 通过事务机制实现
b. 通过发送方确认(publisher confirm) 机制实现
事务机制比较消耗性能, 在实际⼯作中使用也不多, 主要介绍confirm机制来实现发送方的确认
RabbitMQ提供了两个方式来控制消息的可靠性投递
1. confirm确认模式
2. return退回模式

confirm 确认模式

Producer 在发送消息的时候, 对发送端设置一个ConfirmCallback的监听, 无论消息是否到达Exchange, 这个监听都会被执行, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false

1.配置

2.设置确认回调逻辑并发送消息

此处要注意 RabbitTemplate 在配置完文件名和连接后,Spring 会自动创建一个 rabbitTemplate 实例(单例)

在类中注入后,使用的一直是同一个实例,如果对这一实例进行参数的设置,会影响所用的所有使用该实例的方法

此处采用的方法是在创建一个类,构造一个预想的 RabbitTemplate,再注入到类中(和构造方方法类似,如果主动创建一个 RabbitTemplate, Spring就不会自动创建了)

回调逻辑:

无论消息确认成功还是失败, 都会调用ConfirmCallback的confirm方法. 如果消息成功发送到Broker, ack为true
每个 RabbitTemplate 只能设置一次回调方法(Return 也是只能是只一次)
correlationData: 发送消息时的附加信息 , 通常用于在确认回调中识别特定的消
ack: 交换机是否收到消息 , 收到为 true, 未收到为 false
ause: 当消息确认失败时 , 这个字符串参数将提供失败的原因 . 这个原因可以用于调 试和错误处理 . 成功时 , cause null

生产者逻辑

启动服务,发送请求

消息成功发送到交换机,ack 为true

此时修改生产者代码,将交换机改错

消息发送失败,未找到指定的交换机,ack 为 false

此时将 routingKey 改错


此时显示的依然是成功收到消息,但是 rongtingKey 明显不符合要求

注意: confirm 是在交换机获取到消息后就返回 ack 为 true,不管该消息是否能找到目标队列

即只要发送到交换机就算成功  -> 此时就需要 return 退回模式

return 退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置一个返回回调方法, 对消息进行处理

配置文件如 confirm 确认模式

设置回调

使用RabbitTemplate的setMandatory方法设置消息的mandatory属性为true(默认为false). 这个属性的作用是告诉RabbitMQ, 如果⼀条消息无法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此时 ReturnCallback 就会被触发

发送消息(路由错误)

public class ReturnedMessage {
        //返回的消息对象,包含了消息体和消息属性
        private final Message message;
        //由 Broker 提供的回复码 , 表示 消息无法路由的原因 . 通常是⼀个数字代码,每个数字代表不同 的含义
        private final int  replyCode;
        //⼀个⽂本字符串 , 提供了无法路由消息的额外信息或错误描述 .
        private final String replyText;
        //消息被发送到的交换机名称
        private final String exchange;
        //消息的路由键,即发送消息时指定的键
        private final String routingKey;
}

常见问题

 如何保证RabbitMQ消息的可靠传输

1. 生产者将消息发送到 RabbitMQ失败
a. 可能原因: 网络问题等
b. 解决办法: 参考[发送方确认-confirm确认模式]
2. 消息在交换机中无法路由到指定队列:
a. 可能原因: 代码或者配置层面错误, 导致消息路由失败
b. 解决办法: 参考[发送方确认-return模式]
3. 消息队列自身数据丢失
a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失
b. 解决办法: 参考[持久性]. 开启 RabbitMQ持久化, 就是消息写入之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会自动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的方式提⾼提高可靠性)
4. 消费者异常, 导致消息丢失
a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题
b. 解决办法: 参考[消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是自动应答的, 可以开启手动确认, 当消费者确认消费成功后才会删除消息, 从而避免消息丢失. 除此之外, 也可以配置[重试机制], 当消息消费异常时, 通过消息重试确保消息的可靠性

重试机制

在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送
但如果是程序逻辑引起的错误, 那么多次重试也是没有用的, 可以设置重试次数

RabbitMQ SDK 提供的方式:

1.自动确认: 消息到达消费者,消息就删除

2.手动确认:消息处理成功后,需要进行 ack

SpringAMQP 提供的方式:

public enum AcknowledgeMode{

        NONE,

        MANUAL,

        AUTO;

}

配置重试机制:

配置 auto

1.不配置重试机制

不断尝试发送消息,队列中的消息为 Unacked ,deliveryTag 依次加一

2.配置重试机制

消息重发 5 次后报错,并且队列中的消息丢失,deliveryTag 不变

配置 manual

1.不配置重试机制

消息不断入队,消息为 Uacked,deliveryTag 依次加一

2.配置消息重试机制

消息不断入队,消息为 Unacked,deliveryTag依次加一,重试机制不生效

可以看到, 手动确认模式时, 重试次数的限制不会像在自动确认模式下那样直接生效, 因为是否重试以及何时重试更多地取决于应用程序的逻辑和消费者的实现
自动确认模式下, RabbitMQ 会在消息被投递给消费者后自动确认消息. 如果消费者处理消息时抛出异常,RabbitMQ 根据配置的重试参数自动将消息重新入队, 从而实现重试. 重试次数和重试间隔等参数可以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执行这些重试策略
手动确认模式下, 消费者需要显式地对消息进行确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新入队. 重试的控制权在于应用程序本身, 而不是RabbitMQ的内部机制. 应用程序可以通过自己的逻辑和利用RabbitMQ的高级特性来实现有效的重试策略

使用重试机制时需要注意:
1. 自动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失了
2. 手动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是unacked的状态, 导致消息积压

TTL

TTL (Time to Live 过期时间) RabbitMQ 可以对 队列和消息 设置 TTL

当消息到达存活时间之后,还没有被消费,就会被自动清除

目前有两种方法可以设置消息的TTL
一是设置队列的TTL, 队列中所有消息都有相同的过期时间. 二是对消息本身进行单独设置, 每条消息的TTL可以不同. 如果两种方法一起使用, 则消息的TTL以两者之间较小的那个数值为准

设置消息的 TTL

针对每条消息设置 TTL 的方法是在发送消息的方法中加入 expiration 的属性参数,单位为毫秒

如果不设置TTL,则表示此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃

设置队列的 TTL

设置队列TTL的方法是在创建队列时, 加入  x-message-ttl 参数实现的, 单位是毫秒

设置队列TTL属性的方法, ⼀旦消息过期, 就会从队列中删除
设置消息TTL的方法, 即使消息过期, 也不会马上从队列中删除, 而是在即将投递到消费者之前进行判定的
为什么这两种方法处理的方式不⼀样?
设置队列TTL, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可
设置消息TTL, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进行删除即可

死信队列

死信(dead message) 简单理解就是因为种种原因, 无法被消费的信息, 就是死信
当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(Dead Letter Queue,简称DLQ)

消息变成死信一般是由于以下几种原因:

1.消息被拒绝(Basic.Reject / Basic.Nack),并设置 requeue 为 false (拒绝重新入队)

2.消息过期

3.队列达到最大长度

代码演示

队列,交换机,绑定声明

一下就针对上述三种导致死信的原因进行演示:

1.消息被拒绝

2.消息过期

删除刚才创建的 正常队列 ,注掉正常队列的监听类,在 正常队列 上设置 TTL

发送消息,等待10秒

3.队列达到最长长度

删除刚才创建的 正常队列,注掉 正常队列 的监听类

常见问题

1. 死信队列的概念
死信(Dead Letter)是消息队列中的一种特殊消息, 它指的是那些无法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列用于存储这些死信消息
2. 死信的来源
1) 消息过期: 消息在队列中存活的时间超过了设定的TTL
2) 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信
3) 队列满了: 当队列达到最大长度, 无法再容纳新的消息时, 新来的消息会被处理为死信.
3. 死信队列的应用场景
对于RabbitMQ来说, 死信队列是一个非常有用的特性. 它可以处理异常情况下,消息不能够被消费者正确消费而被置⼊死信队列中的情况, 应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进而可以改善和优化系统.
比如: 用户支付订单之后, 支付系统会给订单系统返回当前订单的支付状态
为了保证支付信息不丢失, 需要使用到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由
订单系统的其他消费者来监听这个队列, 并对数据进行处理(比如发送工单等,进行人工确认)
场景的应用场景还有:
• 消息重试:将死信消息重新发送到原队列或另一个队列进行重试处理
• 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占用系统资源
• 日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位

延迟队列

RabbitMQ本身没有直接支持延迟队列的的功能, 但是可以通过前面所介绍的TTL+死信队列的方式组合模拟出延迟队列的功能.
假设一个应用中需要将每条消息都设置为10秒的延迟, 生产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并非是 normal_queue 这个队列, 而是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存入 dlx_queue 这个队列中,消费者就恰巧消费到了延迟10秒的这条消息

TTL+ 死信队列 实现

TTL的实现有两种方式:

1.设置队列的过期时间

2.设置消息的过期时间

所以 基于 TTL + 死信队列 实现的延迟队列,也有两种形式:

1.设置队列TTL +死信队列

设置队列 TTL

向正常队列发送消息

监听死信队列里面的消息

2.设置消息TTL + 死信队列

第二条消息设置的是延迟 10 s,最后却和设置延迟 20 s 的消息几乎同时返回

消息过期之后, 不一定会被马上丢弃. 因为RabbitMQ只会检查队首消息是否过期, 如果过期则丢到死信队列. 此时就会造成一个问题, 如果第一个消息的延时时间很长, 第二个消息的延时时间很短, 那第二个消息并不会优先得到执行

所以在考虑使用TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是一致的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每一种不同延迟时间的消息建立单独的消息队列

延迟队列插件

RabbitMQ 官方也提供了一个延迟的插件来实现延迟的功能    Scheduling Messages with RabbitMQ | RabbitMQ

插件下载地址    Releases · rabbitmq/rabbitmq-delayed-message-exchange
根据自己的 RabbitMQ 版本选择相应版本的插件,下载后上传到服务器

插件上传目录参考   installing Additional Plugins | RabbitMQ


/usr/lib/rabbitmq/plugins  是一个附加目录,RabbitMQ 本身不会在此安装任何内容,如果没有这个路径,可以自己创建

#下载文件并解压

#进入指定目录  /usr/lib/rabbitmq/plugins  ,将解压后的文件拖拽到界面上传

#查看插件列表

rabbitmq-plugins list

#启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#启动服务

service rabbitmq-server restart

#验证插件

基于插件实现延迟队列

添加 delayed 属性,表示声明一个延迟类型交换机

设置延迟时间 (注意与上面的区分,上面是 队列的过期时间 setExpiration())  参数类型为 long

可以看到消息按照预定的过去时间返回了

二者对比

1. 基于死信实现的延迟队列
a. 优点: 1) 灵活不需要额外的插件支持
b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
2. 基于插件实现的延迟队列
a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
b. 缺点: 1) 需要依赖特定的插件, 有运维工作 2) 只适用特定版本

事务

RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也支持事务机

Spring AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原子性的, 要么全部成功, 要么全部失败

#未实现事务

发送消息,发现错误

两条消息,报错前的一条消息被发送到队列

#实现事务

添加 Transactional 注解

由于要设置 rabbitTemplate ,所以重新创建一个,再注入到类里面(原因见 消息确认 段)

声明一个 事务管理器

上述三个画红线的部分都要实现,否则不能实现

消息分发

RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表里的一个消费者. 这种方法非常适合扩展, 如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可
默认情况下, RabbitMQ是以轮询的方法进行分发的, 而不管消费者是否已经消费并已经确认了消息. 这种方式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, 而某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进而应用整体的吞吐量下降
如何处理呢? 我们可以使用 RabbitMQ SDK 讲到的channel.basicQos(int prefetchCount) 方法, 来限制当前信道上的消费者所能保持的最大确认消息的数量
比如: 消费端调用了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送一条消息计数+1, 消费一条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息
类似TCP/IP中的"滑动窗口"

prefetchCount 设置为0时表示上限
basicQos 对拉模式的消费无效

限流

订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满足需求
但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, 无疑会把订单系统压垮

RabbitMQ提供了限流机制, 可以控制消费端一次只拉取N个请求
通过设置prefetchCount参数, 同时也必须要设置消息应答方式为手动应答
prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量, 以此来实现流控制和负载均衡

设置 prrefetchCount 参数,开启手动应答

发送 20 条消息

队列在接收 4 条消息后,没有进行确认应答,达到 prefetchCount 的最大值,不再接收消息

负载均衡

我们也可以用此配置,来实现"负载均衡"
如下图所示, 在有两个消费者的情况下,一个消费者处理任务非常快, 另一个非常慢,就会造成一个消费者会一直很忙, 而另一个消费者很闲. 这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量

可以使用设置prefetch=1 的方式, 告诉 RabbitMQ 一次只给一个消费者一条消息, 也就是说, 在处理并确认前一条消息之前, 不要向该消费者发送新消息. 相反, 它会将它分派给下一个不忙的消费者

开启两个监听器,在获取消息后使用 Thread.sleep() 进行休眠,模拟消息处理的过程

以上也可以大致看出,不同队列处理消息的频率,类似于是 负载均衡效果`

deliveryTag 有重复是因为两个消费者使用的是不同的Channel, 每个 Channel 上的deliveryTag 是独立计数的

相关文章:

  • Unity 模拟高度尺系统开发详解——实现拖动、范围限制、碰撞吸附与本地坐标轴选择
  • C语言基础(08)【循环结构】
  • PCB设计教程【强化篇】——USB拓展坞原理图设计
  • 生成式AI模型学习笔记
  • Fastapi 学习使用
  • 告别压降损耗与反向电流困扰:汽车电子电源防反接方案全面解析与理想二极管应用
  • 【Unity笔记】Unity WASD+QE 控制角色移动与转向(含 Shift 加速)实现教程
  • 【Python进阶】CPython
  • 分析XSSstrike源码
  • 关联子串 - 华为OD统一考试(JavaScript题解)
  • 姜老师MBTI课程:4条轴线的总结
  • ssh连接断开,保持任务后台执行——tmux
  • Java 中 Redis 过期策略深度解析(含拓展-redis内存淘汰策略列举)
  • spring boot项目中的一些常用提示信息
  • C++17新特性 Lambda表达式
  • 第十四篇:MySQL 运维中的故障场景还原与排查实战技巧
  • NLP基础:从词嵌入到预训练模型应用
  • token
  • 进程间通信(消息队列)
  • C++学习打卡
  • 手机网站建设价格是多少/网络营销企业培训
  • wordpress搬家后打不开网页/seo基础入门免费教程
  • 织梦手机网站模板/肇庆网站推广排名
  • 线下推广平台有哪些/优化外包服务公司
  • 青岛网站维护/东莞好的网站国外站建设价格
  • 宁夏考试教育网站/企业站seo外包