当前位置: 首页 > news >正文

中间件-MQ常见问题

MQ常见问题

  • 消息丢失
      • 消息会在哪些环节丢失
      • 应对机制
  • 消息的顺序性
  • 消息幂等
  • 消息积压的处理

消息丢失

消息会在哪些环节丢失

  1. 网络传输环节:生产者发送消息到broker,broker中master同步消息给slave,consumer消费消息,这3个环节都是跨网络传输,可能造成消息丢失。
  2. 缓存信息刷盘环节:MQ存盘时都会先写⼊操作系统的缓存pagecache中,然后再由操作系统异步的将消息写⼊硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写⼊硬盘的消息就会丢失。

应对机制

  1. 生产者发送消息环节:消息确认机制。使用同步发送机制,可以收到broker端的发送成功确认。该方法消息安全但是效率低。
  2. 消息同步环节:普通master-slave集群下,master挂掉后slave不会主动切换未master,等master再次启动后,消息不会丢失,但这种方式牺牲了可用性。高可用Dledger集群,消息要写入半数以上节点才会通知客户端写成功,消息安全性比较高,可以认为不会丢失消息。但是在脑裂情况下,也有可能会丢失消息。
  3. 刷盘环节:RocketMQ的Broker提供了⼀个很明确的配置项flushDiskType,可以选择刷盘模式。有两个可选项,SYNC_FLUSH同步刷盘和ASYNC_FLUSH异步刷盘。所谓同步刷盘,是指broker每往⽇志⽂件中写⼊⼀条消息,就调⽤⼀次刷盘操作,其实也是以10毫秒的间隔去调⽤刷盘操作。⽽异步刷盘,则是指broker每隔⼀个固定的时间,才去调⽤⼀次刷盘操作。异步刷盘性能更稳定,但是会有丢消息的可能。⽽同步刷盘的消息安全性就更⾼,但是操作系统的IO压⼒就会⾮常⼤。从理论上来说,也还是会有⾮正常断电造成消息丢失的可能,甚⾄严格意义上来说,任何应⽤程序都不可能完全保证断电消息不丢失。
  4. 消费环节:消费状态确认机制。也就是消费者处理完消息后,需要给Broker⼀个响应,
    表示消息被正常处理了。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息,还是Consumer处理完消息后没有给出相应,Broker都会认为消息没有处理成功。之后,Broker就会向Consumer重复投递这些没有处理成功的消息。RocketMQ是把消费失败的消息方式重试队列里面重新推送。如果Consumer给Broker返回了消费成功,用异步的方式去处理消息,这种情况可能会丢失消息。
  5. 如果MQ服务全挂掉了,想继续保持业务运行,又想不丢失消息,通常的做法是设计⼀个降级缓存。Producer往MQ发消息失败了,就往降级缓存中写,然后,依然正常去进⾏后续的业务。此时,再启动⼀个线程,不断尝试将降级缓存中的数据往MQ中发送。这样,⾄少当MQ服务恢复过来后,这些消息可以尽快进⼊到MQ中,继续往下游Conusmer推送,⽽不⾄于造成消息丢失。
    在这里插入图片描述

消息的顺序性

通常讨论MQ的消息顺序性,其实是在强调局部有序,⽽不是全局有序。比如某个业务流程的消息有序。
RocketMQ的顺序消费机制:Producer通过设置MessageQueueSelector将⼀组有序的消息写⼊到同⼀个MessageQueue中。Consumer使用Orderly的消费方式每个线程集中从⼀个MessageQueue中拿取消息。

消息幂等

  1. 消息的重复发送:Producer发送消息时,如果采⽤发送者确认的机制,那么Producer发送消息会等待Broker的响应。如果没有收到Broker的响应,Producer就会发起重试。但是,Producer没有收到Broker的响应,也有可能是Broker已经正常处理完了消息,只不过发给Producer的响应请求丢失了。这时候Producer再次发起消息重试,就有可能造成消息重复。RocketMQ的处理⽅式,是会在发送消息时,给每条消息分配⼀个唯⼀的ID。重试时broker可以根据msgId判断是否已经处理。
  2. 消息的重复消费:RocketMQ是通过消费者的响应机制来推进offset的,如果consumer从broker上获取了消息,正常处理之后,他要往broker返回⼀个响应,但是如果⽹络出现波动,consumer从broker上拿取到了消息,但是等到他向broker发响应时,发⽣⽹络波动,这个响应丢失了,那么就会造成消息的重复消费。因为broker没有收到响应,就会向这个Consumer所在的Group重复投递消息。Comsumer端可以使用msgId进行唯一性控制,或者更严格的可以使⽤message的key属性写入业务的唯一属性来控制。

消息积压的处理

产⽣消息积压的根本原因还是Consumer处理消息的效率低于消息产生的速度。所以处理消息积压第一个可以优化Consumer处理消息的效率,第二个可以增加Consumer实例的个数。但是增加Consumer实例个数是有上限的。RocketMQ中一个Topic下的MessageQueue只能由一个消费者绑定,因此如果消费者实例数最多增加到等于MessageQueue的个数。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有MessageQueue去消费的,因此也就没有⽤了。
如果要快速处理积压的消息,可以创建⼀个新的Topic,配置⾜够多的MessageQueue。
然后把Consumer实例的Topic转向新的Topic,并紧急上线⼀组新的消费者,只负责消费旧Topic中的消息,并转存到新的Topic中。这个速度明显会⽐普通Consumer处理业务逻辑要快很多。然后在新的Topic上,就可以通过添加消费者个数来提⾼消费速度了。之后再根据情况考虑是否要恢复成正常情况。

相关文章:

  • 基于AH1101芯片的5V升18.6V LED恒流背光供电方案设计
  • 从代码学习深度学习 - 实战 Kaggle 比赛:图像分类 (CIFAR-10 PyTorch版)
  • electron进程通信
  • constexpr 关键字的意义(入门)
  • 怎样用 esProc 实现连续区间的差集运算
  • 什么是 NB-IoT ?窄带IoT 应用
  • 【SPIN】用Promela验证顺序程序:从断言到SPIN实战(SPIN学习系列--2)
  • 华为Watch的ECG功能技术分析
  • 解决 Ubuntu 22.04 安装后启动卡死问题
  • recvfrom和sendto函数中地址参数的作用
  • C++算法(22):二维数组参数传递,从内存模型到高效实践
  • 原生微信小程序 textarea组件placeholder无法换行的问题解决办法
  • postgresql主从+repmgr+keepalive安装
  • 如何在 IntelliJ IDEA 中配置并调用虚拟机 HDFS
  • uniapp微信小程序一键授权登录
  • AI数字人融合VR全景:开启未来营销与交互新篇章
  • 无人机动力系统全解析:核心组件、工作原理与实用指南
  • shell脚本练习(6):备份MySQL数据库表
  • MH22D3开发高级UI应用,适配arm2d驱动
  • 高效管理多后端服务:Nginx 配置与实践指南
  • 高途一季度净利润同比增长1108%: “与吴彦祖一起学英语”短时间内就实现了盈利
  • 泉州围头湾一港区项目炸礁被指影响中华白海豚,官方:已叫停重新评估
  • 俄媒:俄乌伊斯坦布尔谈判将于北京时间今天17时30分开始
  • 美国将与阿联酋合作建立海外最大的人工智能数据中心
  • 贵州省委军民融合发展委员会办公室副主任李刚接受审查调查
  • 透视社会组织创新实践中的花开岭现象:与乡村发展的融合共进