当前位置: 首页 > news >正文

RabbitMQ 幂等性, 顺序性 和 消息积压

目录

1. 幂等性

1.1 简介

1.2 幂等性的划分

1.3 可能会引发消息重复传输的场景

1.4 如何解决消息重复接收

1.4.1 全局唯一 ID 

1.4.2 业务逻辑判断

2. 顺序性

2.1 简介

2.2 会打破顺序性的情况

2.3 如何保证消息的顺序性

3. 消息积压

3.1 简介

3.2 消息积压产生的原因

3.3 如何解决消息积压


1. 幂等性

1.1 简介

幂等性是指对一个系统进行重复调用 (相同的参数), 不论请求多少次, 这些请求对系统的影响都是相同的效果.

比如, 数据库中的 seletc 操作, 不同时间两次查询结果可能不同, 但是这个操作是复合幂等性的. 幂等性指的是对资源的影响, 而不是返回结果. 查询操作对数据资源本身不会产生影响, 之所以结果不同, 可能是因为两次查询之间有其他的操作对资源进行了修改.

比如, i++ 这个操作, 就是非幂等性的. 如果调用方没有控制好逻辑, 一次流程重复调用好几次, 结果就会不同.

日常生活中, 要是我们的支付不遵循幂等性, 那还不完蛋. 所以要保持幂等性

1.2 幂等性的划分

一般消息中间件的消息传输保障分为三个层级 : 

  • At most once : 消息可能会丢失, 但绝不会重复传输 (最多一次)
  • At least once : 消息绝对不会丢失, 但可能重复传输 (最少一次)
  • Exactil once : 每条消息肯定会被传输一次且仅传输一次 (恰好一次)

RabbitMQ 支持最少一次和最多一次. 对于恰好一次, RabbitMQ 目前做不到, 而且很多消息中间价都做不到这一点. 

在业务使用中, 对于可靠性要求较高的场景, 建议使用最少一次, 以防消息丢失. 最多一次会因为消息发送过程中, 网络问题, 消费出现异常等种种原因, 导致消息丢失. 但是最少一次, 就会使得消费端可能收到重复的消息, 也会造成对同一条消息的多次处理. 对于一些比较重要的业务而言, 重复处理相同的消息, 就会造成严重事故. 例如 : 当用户对一个订单付款之后, 因为网络问题, 付款成功的结果未返回给订单系统, 当用户再次点击付款时, 如果系统未做幂等性处理, 那就会造成两次扣款.

1.3 可能会引发消息重复传输的场景

  • 当生产者给 MQ 发送消息时, MQ 已经收到消息并且返回了 ack, 但由于网络等原因, 生产者没有收到 ack, 那么此时生产者就会再次向 MQ 发送该条消息, 就造成了 MQ 收到了两条重复的消息.

  • 当 MQ 给消费者发送消息时, 消费者已经成功处理了消息并返回了 ack, 但由于网络等原因, MQ没有收到 ack, 于是 MQ 就会再次给消费者发送该条消息, 就造成了消费者收到两条重复的消息.

1.4 如何解决消息重复接收

1.4.1 全局唯一 ID 
  • 首先, 为每条消息分配一个唯一标识符, 例如雪花算法, UUID等, 只要能保证唯一性即可.
  • 其次, 消费者收到消息后, 先用唯一标识符判断该消息是否已经消费过, 如果消费过就直接放弃.
  • 最后, 如果没有消费过, 消费者就开始消费信息, 业务处理成功之后, 把唯一标识符保存起来.

        可以使用 Redis 的原子性操作 setnx 来保证幂等性, 将唯一标识符作为 key 放到 Redis 中. 消费消息之前, 先 setnx id 1. 如果返回值为 1 (这里成功设置表示我之前没有设置过), 表示之前没有消费过, 正常消费. 返回值为 0 (表示我们已经消费过了), 则表示这条消息之前已消费过, 直接抛弃.

这样消费者就可以根据 Redis 中是否存有接收到的消息的 ID 来判断该消息是否存储过.

1.4.2 业务逻辑判断
  • 可以使用数据库来保证幂等性, 通过检查数据库是否已经存在相关数据记录.
  • 也可以使用锁机制来保证幂等性, 通过使用乐观锁机制来避免更新已经被其他事务更改的数据,
  • 还可以使用相关业务状态来保证幂等性, 在消费者处理消息之前, 先检查相关的业务状, 确保消息对应的操作尚未执行, 然后才进行处理, 具体根据业务场景来做.

2. 顺序性

2.1 简介

消息的顺序性是指消费者消费的消息和生产者发送消息的顺序是一致的. 例如生产者发送的消息依次是msg1, msg2, msg3, 那么消费者消费消息的顺序也必须按照msg1, msg2, msg3 的顺序进行.

比如, 用户需要更改他的个人信息, 这个时候就要保证顺序性, 因为用户可能会修改资料很多次, 但是我们要按照顺序执行, 最后保留的是他最后一次修改的内容. 不然就会和用户所预期的内容不一致.

2.2 会打破顺序性的情况

在没有网络故障, 消息丢失的情况下, 只有一个生产者与一个消费者时, RabbitMQ 可以保证消息的顺序性, 当出现下列情况时, RabbitMQ 就无法保证消息的顺序性.

  • 多个生产者 : 如果有多个生产者同时发送消息, 无法确定消息到达 Broker 的前后顺序, 也就无法验证消息的顺序性.
  • 多个消费者 : 当队列配置了多个消费者时, 消息可能会被不同的消费者并行处理, 然而消费者处理消息的速度是不同的, 从而导致消息处理的顺序性无法保证. 
  • 网络波动异常 :  在消息传递的过程中, 如果出现网络波动或异常, 可能会导致 ack (消息确认) 丢失, 从而使得消息被重新入队和重新消费, 造成顺序性问题.
  • 消息重试 : 如果消费者在处理消息后未能及时发送确认, 或者确认消息在传输过程中丢失, 那么 MQ 可能会认为消息未被成功消费而进行重试, 这也可能导致消息处理的顺序性问题.
  • 消息路由问题 : 复杂的路由场景中, 消息可能会根据路由键被发送到不同的队列, 然而队里消费的速度不同, 从而无法保证全局的顺序性.
  • 死信队列 : 消息因为某些原因进入死信队列, 死信队列被消费时, 无法保证消息的顺序和生产者发送消息的顺序一致.

2.3 如何保证消息的顺序性

消息顺序性保障分为 : 局部顺序性保障和全局顺序性保障.

  • 局部顺序性保障 : 在单个队列内部保障消息的顺序
  • 全局顺序性保障 : 在多个队列或多个消费者之间保障消息的顺序.

实现顺序性的方法 : 

  • 单队列单消费者 : 使用单个队列, 并由单个消费者进行处理, 同一个队列中的消息是先进先出的
  • 分区消费 : 将一个队列分割成多个分区, 每个分区由一个消费者处理, 以此保障每个分区的顺序性. (Spring-Cloud-Stream)
  • 消息确认机制 : 使用手动确认机制, 当消费者消费完一条消息后, 向服务器发送 ack, 服务器接收到消息后才会继续发送下一条消息.
  • 业务逻辑控制 : 在消息中添加 id 属性, 即使消费者接收到的消息不是按照顺序发送的, 也可以根据 id 进行排序.

3. 消息积压

3.1 简介

在消息队列中, 待处理的消息数量超过了消费者的处理能力, 导致消息在队列中不断堆积的现象.

3.2 消息积压产生的原因

  • 消息产生过快 : 在高流量或者高负载的情况下, 生产者以极高的速率发送消息
  • 消费者处理能力不足 : 
  1. 消费端业务逻辑复杂, 耗时长
  2. 消费端代码性能低
  3. 系统资源限制, 如 CPU, 内存, 磁盘...
  4. 异常处理不当, 消费者在处理消息时出现异常, 导致消息无法被正确处理和确认
  • 网络问题 : 网络不稳定或延迟, 导致消费者发送的 ack 无法被服务器接收或丢失, 若设置了消息重新入队列, 就会造成消息重新发送, 也会造成消息积压
  • RabbitMQ 服务器的配置偏低

3.3 如何解决消息积压

  • 提高消费者效率 : 
  1. 加消费者实例数量, 比如新增机器.
  2. 优化业务逻辑,  比如使用多线程来处理业务.
  3. 设置 prefetchCount, 当一个消费者阻塞时, 消息转发到其他未阻塞的消费者
  4. 消息发生异常时, 设置合适的重试策略, 或者转入到死信队列
  • 限制生产者效率 : 
  1. 流量控制 : 在消息生产者中实现流量控制逻辑, 根据消费者处理能力动态调整发送效率.
  2. 限流 : 使用限流工具, 为消息发送效率设置一个上限.
  3. 设置过期时间 : 如果消息过期未被消费, 可以配置死信队列, 以避免消息丢失, 同时减少对主队列的压力.
  • 资源与配置优化 : 比如升级 RabbitMQ 服务器的硬件, 调整 RabbitMQ 的配置参数等.

文章转载自:

http://YRGSqWR9.ksjnL.cn
http://LdCOfTK6.ksjnL.cn
http://aYeCESlC.ksjnL.cn
http://Z8Tq5x9F.ksjnL.cn
http://YElTnay2.ksjnL.cn
http://cge5ZPMm.ksjnL.cn
http://5IaIKjR8.ksjnL.cn
http://av2Y2K3J.ksjnL.cn
http://2cit5lr3.ksjnL.cn
http://KKd3Xtyv.ksjnL.cn
http://NMDL7fzz.ksjnL.cn
http://ABEUaUOI.ksjnL.cn
http://DgvONx4p.ksjnL.cn
http://jqLm7Iy2.ksjnL.cn
http://u4T1IdWM.ksjnL.cn
http://ZJEnTlQ4.ksjnL.cn
http://IQnqIfLF.ksjnL.cn
http://YUv1G2PI.ksjnL.cn
http://u1oPM0E3.ksjnL.cn
http://hI457xCQ.ksjnL.cn
http://CIxQXG5h.ksjnL.cn
http://ldrEIAZ7.ksjnL.cn
http://GBR74gSa.ksjnL.cn
http://cFH2OyIC.ksjnL.cn
http://V1BimECF.ksjnL.cn
http://rIOtMowR.ksjnL.cn
http://ostC2IdX.ksjnL.cn
http://wgQt99tq.ksjnL.cn
http://t1HnkACj.ksjnL.cn
http://j0RiEyAo.ksjnL.cn
http://www.dtcms.com/a/374098.html

相关文章:

  • 单片机按键示例功能
  • Enable FIPS in ubuntu (by quqi99)
  • OpenAI的开源王牌:gpt-oss上手指南与深度解析
  • 使用nvidia-ml-py监控与管理GPU资源
  • 鹧鸪云光储流程系统全新升级:视频指引与分阶段模块使用指南
  • qx-13 开发数据服务总线
  • GD32入门到实战44--LVGL使用外部SRAM
  • 硬件驱动芯片——I.MX6ULL芯片(1)
  • MV190E0M-N10 工业广视角液晶模组技术白皮书
  • AI+预测3D新模型百十个定位预测+胆码预测+去和尾2025年9月8日第173弹
  • 机器视觉的手机柔性屏贴合应用
  • 【PyTorch】图像二分类-部署
  • 纵向循环缓慢滚动图片
  • 项目日记 -日志系统 -明确目标、规划模块并完成项目文档
  • 【C++上岸】C++常见面试题目--网络篇(第二十二期)
  • 数据治理系列(一):数据治理的整体框架与发展趋势
  • 【LeetCode 每日一题】1504. 统计全 1 子矩形
  • FastGPT源码解析 Agent知识库文本资料处理详解和代码分析
  • php 实现 导入excel 带图片导入
  • JP4-7-MyLesson后台前端(五)
  • 【系统分析师】第17章-关键技术:嵌入式系统分析与设计(核心总结)
  • Centos9安装rocketmq
  • Docker | 一种使用 docker-compose 命令将 YAML 定义的配置文件导入到 Docker 的方法
  • 编译器构造:模拟器,汇编与反汇编
  • 自由学习记录(96)
  • Cy5-Tyramide, Cyanine 5 Tyramide;1431148-26-3
  • JMeter接口测试全流程解析
  • ARM处理器的小常识
  • Go语言极速入门与精要指南从零到精通的系统化学习路径
  • RK3576 android14 usb_audio_policy_configuration.xml解析