当前位置: 首页 > news >正文

RabittMQ-高级特性2-应用问题

文章目录

  • 前言
  • 延迟队列介绍
  • ttl+死信队列存在问题
  • 延迟队列插件安装
  • 延迟插件使用
  • 事务
  • 消息分发概念介绍
    • 限流
    • 非公平分发(负载均衡)
  • 限流
  • 负载均衡
  • RabbitMQ应用问题-幂等性保障
  • 顺序性保障介绍1
  • 顺序性保障介绍2
  • 消息积压
  • 总结


前言

延迟队列介绍

延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费

延迟队列的使⽤场景有很多, ⽐如:

  1. 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
  2. ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
  3. ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等

RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.
假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将
发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并⾮是 normal_queue 这个队列, ⽽
是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息

就是消费ttl到了的死信队列的信息
在这里插入图片描述
这个就是死信队列就是一个延迟10s的延迟队列

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

这样就成功了

ttl+死信队列存在问题

问题就是消息的ttl,先发送一个20s的,再来10s的,那么10s就不会立马过期,第一个过期了,第二个才会过期,到达消费端的时候,才会去判定过没过期
如果是队列的ttl就没事了

在这里插入图片描述
所以这样的话,过期时间都变成一样的了

如果先10s后20s就没事了
在这里插入图片描述

延迟队列插件安装

官网安装

在这里插入图片描述

点击就可以跳转了

在这里插入图片描述
点击版本,下载ez
然后要把插件放在linux下载的rabbitmq的文件下
在这里插入图片描述
第一个就是ubuntu的

我们把插件放在这个目录下面就可以了
两个目录放一个就可以了
/usr/lib/rabbitmq/plugins 是⼀个附加⽬录, RabbitMQ包本⾝不会在此安装任何内容, 如果
没有这个路径, 可以⾃⼰进⾏创建
在这里插入图片描述
然后把ez包弄在这个目录下

#查看插件列表
rabbitmq-plugins list
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 禁用插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
#重启服务
service rabbitmq-server restart

在这里插入图片描述
可以看到就放进来了
可以看到版本对不上,我们换一个版本3.9.27

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

这个时候我们的交换机就多了一个类型了
就是延迟队列类型

延迟插件使用

在这里插入图片描述
我们只需要在声明交换机的时候多声明.delayed()这个参数就可以了
在这里插入图片描述
然后就是生产者,就不是说明过期时间了,而是说明延迟时间setDelayLong

在这里插入图片描述
然后再写消费者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
这个消息就不会乱序了

在这里插入图片描述

在这里插入图片描述
这个延迟队列的原理是

把消息暂停在交换机上,到了时间后才会到达队列,不信的话,就把消费者注释一下就知道了

事务

RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. Spring
AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

这种发送消息的话,就只会发送成功一个消息,就是第一个消息,这个就是没有采用事务的方式

要启动事务的话,就要对template进行单独的配置,不然每个请求都有事务了
还是自己构建一个template
在这里插入图片描述
这样我们就创建了一个有事务的template了,开启setChannelTransacted即可

在这里插入图片描述

在这里插入图片描述
然后还要在方法的上面加上注解Transactional,这样就开启事务了

这样两条消息都不会发送成功了

但是这样还是不行的,还要做一点
我们还要创建一个rabbitmq的事务管理器,加入bean
在这里插入图片描述
这样就可以完成事务了
只有这三个条件同时满足的时候,那么这两个才不会同时发送
如果三个条件少了一个,那么都不是rabbitmq的事务,都会发送成功一个

1.不加 @Transactional , 会发现消息1发送成功
2. 添加 @Transactional , 消息1和消息2全部发送失败

而且值得注意的就是rabbitmq的事务与手动确认模式和发布确认模式是矛盾的,意思是不能同时存在和使用
在这里插入图片描述

所以最好改为自动确认

        acknowledge-mode: auto  # 使用自动确认(配合事务)publisher-confirm-type: none  # 关闭发布确认(使用事务保证)

就是这样
那么就有效果了

消息分发概念介绍

RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表⾥的⼀个消费者. 这种⽅式⾮常适合扩展, 如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可.默认情况下, RabbitMQ是以轮询的⽅法进⾏分发的, ⽽不管消费者是否已经消费并已经确认了消息. 这种⽅式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, ⽽某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进⽽应⽤整体的吞吐量下降.
如何处理呢? 我们可以使⽤前⾯章节讲到的channel.basicQos(int prefetchCount) ⽅法, 来限制当前信道上的消费者所能保持的最⼤未确认消息的数量
⽐如: 消费端调⽤了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送⼀条消息计数+1, 消费⼀条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息.类似TCP/IP中的"滑动窗⼝".prefetchCount 设置为0时表⽰没有上限.basicQos 对拉模式的消费⽆效(后⾯再讲)
在这里插入图片描述

消息分发的常⻅应⽤场景有如下:

  1. 限流
  2. ⾮公平分发

限流

订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满⾜需求
但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, ⽆疑会把订单系统压垮

RabbitMQ提供了限流机制, 可以控制消费端⼀次只拉取N个请求
通过设置prefetchCount参数, 同时也必须要设置消息应答⽅式为⼿动应答
prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量, 以此来实现流控制和负载均衡

非公平分发(负载均衡)

我们也可以⽤此配置,来实现"负载均衡"
如下图所⽰, 在有两个消费者的情况下,⼀个消费者处理任务⾮常快, 另⼀个⾮常慢,就会造成⼀个消费者会⼀直很忙, ⽽另⼀个消费者很闲. 这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量

我们可以使⽤设置prefetch=1 的⽅式, 告诉 RabbitMQ ⼀次只给⼀个消费者⼀条消息, 也就是说, 在处理并确认前⼀条消息之前, 不要向该消费者发送新消息. 相反, 它会将它分派给下⼀个不忙的消费者

限流

配置prefetch参数, 设置应答⽅式为⼿动应答

channel.basicQos(int prefetchCount)这个是sdk提供的方法

在这里插入图片描述
这个就表示每一个消费者,未确认就只能为五个

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

我们这个消费者一直没有确认的话,那么就会有15条消息没有发过来
在这里插入图片描述
在这里插入图片描述

这个的意思就是队列中有15条消息,消费者有五条未确认的消息,总共20条消息
剩下的15条消息就不会给消费者了

在这里插入图片描述

在这里插入图片描述
没有限流的话,20条消息就全部一下子去给消费者了
限流就是限制消费者最多获得的资源数

负载均衡

在这里插入图片描述
设置为1,意思就是干完一个才能干下一个—》这样就不会挤压了
设置为2的意思就是一次给你两个任务,干完了才能领取下一次的两个任务

然后现在改为两个消费者

在这里插入图片描述

一个快点,立马确认,一个慢点,一直不确认

在这里插入图片描述

就变成这样了

在这里插入图片描述
再改一下

在这里插入图片描述

这样就是负载均衡了
看得出来111每处理一个222就要处理两个
那个tag是每个信道的tag,左边的test编号才是资源的编号
每个信道的tag是互不干扰的

RabbitMQ应用问题-幂等性保障

幂等性是数学和计算机科学中某些运算的性质, 它们可以被多次应⽤, ⽽不会改变初始应⽤的结果.
应⽤程序的幂等性介绍
在应⽤程序中, 幂等性就是指对⼀个系统进⾏重复调⽤(相同参数), 不论请求多少次, 这些请求对系统的影响都是相同的效果.
⽐如数据库的 select 操作. 不同时间两次查询的结果可能不同, 但是这个操作是符合幂等性的. 幂等
性指的是对资源的影响, ⽽不是返回结果. 查询操作对数据资源本⾝不会产⽣影响, 之所以结果不同, 可能是因为两次查询之间有其他操作对资源进⾏了修改.
⽐如 i++ 这个操作, 就是⾮幂等性的. 如果调⽤⽅没有控制好逻辑, ⼀次流程重复调⽤好⼏次, 结果
就会不同
MQ的幂等性介绍
对于MQ⽽⾔, 幂等性是指同⼀条消息, 多次消费, 对系统的影响是相同的.
⼀般消息中间件的消息传输保障分为三个层级.

  1. At most once:最多⼀次. 消息可能会丢失,但绝不会重复传输.
  2. At least once:最少⼀次. 消息绝不会丢失,但可能会重复传输.–》要保证幂等性
  3. Exactly once:恰好⼀次. 每条消息肯定会被传输⼀次且仅传输⼀次.
    RabbitMQ⽀持"最多⼀次"和"最少⼀次".
    对于"恰好⼀次", ⽬前RabbitMQ还做不到, 不仅是RabbitMQ, ⽬前市⾯上主流的消息中间件, 都做不到这⼀点.

在业务使⽤中, 对于可靠性要求⽐较⾼的场景, 建议使⽤"最少⼀次", 以防⽌消息丢失. “最多⼀次” 会因为消息发送过程中, ⽹络问题, 消费出现异常等种种原因, 导致消息丢失.
以下场景可能会导致消息发送重复(包含但不限于)
• 发送时消息重复: 当⼀条消息已被成功发送到服务端并完成持久化, 此时出现了⽹络闪断或者客⼾
端宕机, 导致服务端对客⼾端应答失败. 如果此时Producer意识到消息发送失败并尝试再次发送消
息, Consumer后续会收到两条内容相同并且Message ID也相同的消息.
• 投递时消息重复: 消息消费的场景下, 消息已投递到Consumer并完成业务处理, 当客⼾端给服务端
反馈应答的时候⽹络闪断. 为了保证消息⾄少被消费⼀次, 云消息队列 RabbitMQ 版的服务端将在
⽹络恢复后再次尝试投递之前已被处理过的消息, Consumer后续会收到两条内容相同并且
Message ID也相同的消息

但是"最少⼀次", 就会造成⼀个问题, 消费端会收到重复的消息, 也会造成对同⼀条消息进⾏多次处理. ⼀些不重要的业务还好⼀点, 对于重要的业务, 如果不对重复的消息进⾏处理, 会造成严重事故.
⽐如: 当⽤⼾对⼀个订单付款之后, 因为⽹络问题, 付款成功的结果未返回给订单系统, 当⽤⼾再次点击付款时, 如果系统未做幂等性处理, 那就会造成两次扣款

MQ消费者的幂等性的解决⽅法, ⼀般有以下⼏种:
全局唯⼀ID

  1. 为每条消息分配⼀个唯⼀标识符, ⽐如UUID或者MQ消息中的唯⼀ID,但是⼀定要保证唯⼀性.
  2. 消费者收到消息后, 先⽤该id判断该消息是否已经消费过, 如果已经消费过, 则放弃处理.
  3. 如果未消费过, 消费者开始消费消息, 业务处理成功后, 把唯⼀ID保存起来(数据库或Redis等)
    可以使⽤Redis 的原⼦性操作setnx来保证幂等性, 将唯⼀ID作为key放到redis中 (SETNX
    messageID 1) . 返回1, 说明之前没有消费过, 正常消费. 返回0, 说明这条消息之前已消费过, 抛
    弃.
    SETNX = set if not exsts

业务逻辑判断
在业务逻辑层⾯实现消息处理的幂等性.
例如: 通过检查数据库中是否已存在相关数据记录, 或者使⽤乐观锁机制来避免更新已被其他事务更改的数据, 再或者在处理消息之前, 先检查相关业务的状态, 确保消息对应的操作尚未执⾏, 然后才进⾏处理, 具体根据业务场景来处理

顺序性保障介绍1

消息的顺序性是指消费者消费的消息和⽣产者发送消息的顺序是⼀致的.
⽐如⽣产者发送的消息分别是msg1, msg2, msg3, 那么消费者也是按照msg1, msg2, msg3的顺序进⾏消费的.
很多业务场景下, 消息的消费是不⽤保证顺序的, ⽐如使⽤MQ实现订单超时的处理. 但有些业务场景, 可能存在多个消息顺序处理的情况. ⽐如⽤⼾信息修改, 对同⼀个⽤⼾的同⼀个资料进⾏修改, 需要保证消息的顺序

⼀些资料显⽰RabbitMQ的消息能够保障顺序性, 这是不严谨的. 在不考虑消息丢失, ⽹络故障等异常的情况下, 如果只有⼀个消费者, 最好也只有⼀个⽣产者的情况下, 是可以保证消息的顺序性**. 如果有多个⽣产者同时发送消息, ⽆法确定消息到达RabbitMQ Broker的前后顺序, 也就⽆法验证消息的顺序性**.哪些情况可能会打破RabbitMQ的顺序性呢? 下⾯介绍⼏种常⻅的场景:一个生产者

  1. 多个消费者: 当队列配置了多个消费者时, 消息可能会被不同的消费者并⾏处理, 从⽽导致消息处理
    的顺序性⽆法保证.
  2. ⽹络波动或异常: 在消息传递过程中, 如果出现⽹络波动或异常, 可能会导致消息确认(ACK) 丢失, 从⽽使得消息被重新⼊队和重新消费, 造成顺序性问题.
  3. 消息重试:如果消费者在处理消息后未能及时发送确认, 或者确认消息在传输过程中丢失, 那么MQ可能会认为消息未被成功消费⽽进⾏重试, 这也可能导致消息处理的顺序性问题.
  4. 消息路由问题: 在复杂的路由场景中, 消息可能会根据路由键被发送到不同的队列, 从⽽⽆法保证全
    局的顺序性.
  5. 死信队列: 消息因为某些原因(如消费端拒绝消息)被放⼊死信队列, 死信队列被消费时, ⽆法保证消息的顺序和⽣产者发送消息的顺序⼀致
    包括但不仅限于以上⼏种情形会使RabbitMQ消息错序, 如果要保证消息的顺序性, 需要业务⽅使⽤
    RabbitMQ之后做进⼀步的处理

顺序性保障介绍2

消息顺序性保障分为: 局部顺序性保证和全局顺序性保证.
局部顺序性通常指的是在单个队列内部保证消息的顺序. 全局顺序性是指在多个队列或多个消费者之间保证消息的顺序.
在实际应⽤中, 全局顺序性很难实现, 可以考虑使⽤业务逻辑来保证顺序性, ⽐如在消息中嵌⼊序列号,并在消费端进⾏排序处理. 相对⽽⾔, 局部顺序性更常⻅, 也更容易实现.
RabbitMQ作为⼀个分布式消息队列, 主要优化的是吞吐量和可⽤性, ⽽不是严格的顺序性保证. 如果业务场景确实需要严格的消息顺序, 可能需要在应⽤层⾯进⾏额外的设计和实现.
接下来说⼀下消息的顺序性保证的常⻅策略.

  1. 单队列单消费者
    最简单的⽅法是使⽤单个队列, 并由单个消费者进⾏处理. 同⼀个队列中的消息是先进先出的, 这是
    RabbitMQ来帮助我们保证的

  2. 分区消费
    单个消费者的吞吐太低了, 当需要多个消费者以提⾼处理速度时, 可以使⽤分区消费. 把⼀个队列分割成多个分区, 每个分区由⼀个消费者处理, 以此来保持每个分区内消息的顺序性.
    ⽐如⽤⼾修改资料后, 发送⼀条⽤⼾资料消息. 消费者在处理时, 需要保证消息发送的先后顺序
    但这种场合并不需要保证全局顺序. 只需要保证同⼀个⽤⼾的消息顺序消费就可以. 这时候就可以采
    ⽤把消费按照⼀定的规则, 分为多个区, 每个分区由⼀个消费者处理
    RabbitMQ本⾝并不⽀持分区消费, 需要业务逻辑去实现, 或者借助spring-cloud-stream来实现
    参考: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html
    在这里插入图片描述

  3. 消息确认机制
    使⽤⼿动消息确认机制, 消费者在处理完⼀条消息后, 显式地发送确认, 这样RabbitMQ才会移除并继续发送下⼀条消息.

  4. 业务逻辑控制
    在某些情况下, 即使消息乱序到达, 也可以在业务逻辑层⾯实现顺序控制. ⽐如通过在消息中嵌⼊序列
    号, 并在消费时根据这些信息来处理
    RabbitMQ本⾝并不保证全局的严格顺序性, 特别是在分布式系统中. 在实际应⽤开发中, 根据具体的业务需求, 可能需要结合多种策略来实现所需要的顺序保证

消息积压

原因分析
消息积压是指在消息队列(如RabbitMQ)中, 待处理的消息数量超过了消费者处理能⼒, 导致消息在队列中不断堆积的现象.
通常有以下⼏种原因:

  1. 消息⽣产过快: 在⾼流量或者⾼负载的情况下, ⽣产者以极⾼的速率发送消息, 超过了消费者的处理
    能⼒.
  2. 消费者处理能⼒不⾜: 消费者处理处理消息的速度跟不上消息⽣产的速度, 也会导致消息在队列中积压.
    可能原因有:
  1. 消费端业务逻辑复杂, 耗时⻓
  2. 消费端代码性能低
  3. 系统资源限制, 如CPU、内存、磁盘I/O等也会限制消费者处理消息的效率.
  4. 异常处理处理不当. 消费者在处理消息时出现异常, 导致消息⽆法被正确处理和确认.
  1. ⽹络问题: 因为⽹络延迟或不稳定, 消费者⽆法及时接收或确认消息, 最终导致消息积压
  2. RabbitMQ 服务器配置偏低
    消息积压可能会导致系统性能下降, 影响⽤⼾体验, 甚⾄导致系统崩溃. 因此, 及时发现消息积压并解决对于维护系统稳定性⾄关重要

解决⽅案
遇到消息积压时, ⾸先要分析消息积压造成的原因. 根据原因来调整策略.
主要从以下⼏个⽅⾯来解决:

  1. 提⾼消费者效率
    a. 增加消费者实例数量, ⽐如新增机器
    b. 优化业务逻辑, ⽐如使⽤多线程来处理业务
    c. 设置prefetchCount, 当⼀个消费者阻塞时, 消息转发到其他未阻塞的消费者.
    d. 消息发⽣异常时, 设置合适的重试策略, 或者转⼊到死信队列
  2. 限制⽣产者速率. ⽐如流量控制, 限流算法等.
    a. 流量控制: 在消息⽣产者中实现流量控制逻辑, 根据消费者处理能⼒动态调整发送速率
    b. 限流: 使⽤限流⼯具, 为消息发送速率设置⼀个上限
    c. 设置过期时间. 如果消息过期未消费, 可以配置死信队列, 以避免消息丢失, 并减少对主队列的压
  3. 资源与配置优化. ⽐如升级RabbitMQ服务器的硬件, 调整RabbitMQ的配置参数等

上述这些策略选择时, 需要综合考虑业务需求和系统的实际承载能⼒

总结

相关文章:

  • 深度学习基础--目标检测常见算法简介(R-CNN、Fast R-CNN、Faster R-CNN、Mask R-CNN、SSD、YOLO)
  • 【论文解读】| ACL2024 | LANDeRMT:基于语言感知神经元路由的大模型机器翻译微调框架
  • Kafka单机版安装部署
  • Docker下Gogs设置Webhook推送Spug,踩坑记录与解决方案
  • 机器学习第三讲:监督学习 → 带答案的学习册,如预测房价时需要历史价格数据
  • 广东省省考备考(第六天5.9)—言语:逻辑填空(每日一练)
  • 初识C++:入门基础(二)
  • 香港科技大学(广州)新开设智能制造理学硕士学位项目2025年9月入学机会
  • 可信数据空间:标准体系建设指南及空间能力要求
  • 【论文阅读】基于客户端数据子空间主角度的聚类联邦学习分布相似性高效识别
  • Spring AI 系列——使用大模型对文本内容分类归纳并标签化输出
  • python---kafka常规使用
  • nginx的学习笔记
  • shopping mall(document)
  • PPT插入图像自带透明
  • 革新锅炉厂智能控制——Ethernet IP转CANopen协议网关的工业互联新方案
  • gd32 编译环境
  • Java 自动下载 Chromium
  • 嵌入式学习笔记 - 关于单片机的位数
  • Spring生态全景解析:Spring、Spring MVC、SpringBoot与Spring Cloud的关系
  • 警方通报男子地铁上拍视频致乘客恐慌受伤:列车运行一度延误,已行拘
  • 高盛上调A股未来12个月目标点位,沪深300指数潜在回报15%
  • 全国铁路五一假期累计发送1.51亿人次,多项运输指标创历史新高
  • 42岁退役军人高武生命最后时刻:在水中托举近20分钟救出落水孩童
  • 短剧剧组在贵州拍戏突遇极端天气,演员背部、手臂被冰雹砸伤
  • 长和获准出售巴拿马运河港口以外的港口?外交部:该报道没有依据