当前位置: 首页 > news >正文

RocketMQ如何保证全链路消息不丢失?

哪些场景会有丢消息的可能:网络+缓存 网络不稳定容易丢数据;缓存断电就丢失了数据。

这两个原因落地到具体的场景:

1.生产者发送消息到 MQ,消息丢失
2.消息写入 MQ,消息丢失
    ① 消息写入内存后非正常关机;
    ② 消息写入磁盘后,磁盘坏了;
    ③ master 数据备份到 slave 由于网络原因导致备份失败;
3.消息者消费 MQ ,消息丢失

在这里插入图片描述

一、保证生产者发送消息到MQ,消息不丢失

【同步发送+多次重试/事务消息机制】

方案一:同步发送+多次重试

DefaultMQProducer发送消息的三种方式:
   1、单向发送:消息发出去就不管了   很容易产生消息的丢失
   2、同步发送:同步等待Broker响应  生产者发送消息给服务端之后,会等待服务端的响应,服务端响应成功了会给客户端一个响应。
      如果客户端没有收到响应,就会给服务端再次发送消息,通过重试的机制来保证消息一定会发送到服务端。这种机制涉及到重试队列,生产者会将消息放入到一个单独的重试队列,重试指定次数后就将消息扔到死信队列中另外去处理。  因为生产者等待服务端响应,就干不了其他事情,效率比较低
   3、异步发送:异步处理Broker通知  生产者会注册一个监听器,生产者往服务端发送消息之后就干其他事情去了,然后服务端处理完成之后就会往生产者发送一个回调,此时监听器来处理,通过这种方式来保证消息的安全性。如果发送失败或者超时,监听器来负责重试发送。 整体性能和安全性得到一个好的综合。
   4、重试队列与死信队列保证消息安全

方案二:RocketMQ提供的事务消息机制
在这里插入图片描述

RocketMQ设计比较独特的机制。可以保证消息的事务性。生产者处理完本地事务和消费者处理完本地事务,这两件事需要保证原子性(分布式事务)。
但是上面这种分布式事务很难,需要通知来通知去,就换了一种方式。保证生产者处理本地事务和往MQ发送消息,保证这两件事情的原子性。
        在消费者一端,只要消息发送成功了,这个消息就可以通过重试的机制,往消费者推送,哪怕消费者处理失败了,最终会通过多次重试保证消费者事务成功。
        通过保证事务的一半从而保证整体业务逻辑的事务性。        

事务消息机制

如果发送一个事务消息,生产者首先会发送一半消息,rocketMQ会针对半消息给一个回复,这个半消息也是一个消息,但是这个消息目前不会被下游消费者消费,这一步主要是下一步之前判断服务是否正常,来做个验证。接下来生产者执行自己本地事务,然后给RocketMQ服务端一个状态,这个状态有三种:
    1、一种是commit状态,表示本地事务成功了,消息就可以直接提交,提交之后向下游服务推送;
    2、如果本地事务执行失败了,就可以向RocketMQ服务端提交一个rollback状态,RocketMQ就会将消息丢弃掉;
    3、如果本地事务执行时间比较长,就会返回一个unknown状态,发送给RocketMQ服务端,那么RocketMQ就会等一段时间之后重新向生产者发起一个响应,来确定生产者的本地事务有没有完成,生产者就会检查本地事务的状态,同时也给RocketMQ一个返回状态, commit就提交,rollback就丢弃消息。
RocketMQ向客户端确定状态,默认重试15次,如果超过15次还没有确认成功,则会将消息rollback丢弃。

二、如何保证消息写入broker不丢失?

1、保证消息能正常从缓存写入磁盘

配置同步刷盘策略

在MQ的master节点上写的消息优先写入缓存,然后再将缓存中的消息定期写到操作系统的磁盘上,这时候就会有一个时间差,消息写到缓存和写到磁盘有个时间差,如果中间出现了非正常关机,直接拔电源断电的这种,就会出现内存中的消息还没有写到磁盘中的现象,那重启内存中的数据就丢了。
在这里插入图片描述

(1)、PageCache是什么?什么是刷盘策略?缓存不安全,可不可以不用PageCache缓存?

我们应用能控制的是应用中的缓存(操作系统中称为用户态),容易造成数据丢失的是内核态的PageCache缓存,内核态是操作系统保留的一部分内存操作,只有操作系统内部可以调用,上游的应用程序操作不了。

我们往操作系统写的每一个消息,比如ppt写完点击保存,实际上就是保存在PageCache缓存中,操作系统会在某个时间段将缓存写到磁盘中。正常关机需要等一段时间,实际上就是浪费在缓存同步。叫做刷盘操作。	

PageCache是操作系统的缓存,你接触不到的,应用程序唯一能做的就是操作系统提供了一个接口叫做刷盘的操作,应用系统可以调用刷盘操作申请一次刷盘。rocketmq提供了一种配置,【可以配置同步刷盘和异步刷盘】。

(2)、RocketMQ如何配置同步刷盘与异步刷盘?

刷盘策略可以通过配置文件broker.conf进行设置。这个配置文件位于RocketMQ安装目录的conf文件夹下
1、同步刷盘(SYNC_FLUSH):在broker.conf中添加或修改以下属性:
flushDiskType=SYNC_FLUSH
2、异步刷盘(ASYNC_FLUSH):确保broker.conf中的相关属性如下所示:
flushDisk/XMLSchema=ASYNC_FLUSH
请注意,上面提到的flushDiskType是决定是否启用同步或异步刷盘的关键配置项。正确设置这个值后,需要重启Broker服务以使配置生效。

(3)、RocketMQ如何实现同步刷盘和异步刷盘?

同步刷盘消息可以更快的写入到磁盘,但是性能比较低,因为写入磁盘是一个很慢的操作;如果每次来个消息就调用刷盘操作写入磁盘,那么如果大量消息过来,频繁调用刷盘,操作系统就会受不了。rocketmq是通过定时每10ms调用一次刷盘策略。10ms时间很短,目的不是保证消息不丢失,而是减少消息丢失的可能性。

异步刷盘是将消息攒一批,再往磁盘里写。 实际上没那么简单。可以选择是不是打开堆外内存(Java应用管理的一部分内存叫做Java堆内存,堆内存是由JC管理的,Java还可以访问更多的操作系统外的内存,称为直接内存,直接内存因为没有JC的介入,所以效率更高);堆外内存缺点是内存没有使用好,不会被释放一直占着,可能会造成内存泄漏,造成OOM。

​ 如果异步刷盘没有使用到直接内存,消息就在Java堆内存中存储一段时间,然后执行一次刷盘操作,刷盘间隔时间长一点,可以配置;

​ 如果使用直接内存,rocketmq只管往直接内存中写消息,写完之后就不管了,操作系统接收,操作系统决定什么时候往磁盘里面写,操作系统通过脏页的机制来往磁盘里写。PageCache是通过一页一页的Page加载到缓存中,如果对Page做了修改,操作系统就会给该Page打个标记,标记为脏页,如果脏页的数据占了内存的阈值了,就进行一次刷盘操作,自己内部刷盘往磁盘写数据。这种方式好处是使用内存效率更高,没有JC参与。

2、Broker出现故障时,保证消息不丢失

搭建Dledger集群模式

【同步同步的主节点,保证信息的安全性;Dledger集群在大部分场景下,安全性已经足够,极端情况可能性非常小,实际用的时候可以忽略】

master不会只配置单独的一个节点,万一master服务器坏了,磁盘就坏了,写入磁盘也没用,通常会配置一个slave,给master节点消息做个备份,master磁盘坏了,slave中还有数据可以提供服务,但是这个过程是跨网络的,有可能就会造成消息丢失,备份不成功。
在这里插入图片描述

rocketmq有两种集群机制:

一种是指定角色的,指定节点是master还是slave,对于master有两种方式:异步同步的主节点(消息发送给master之后,master先给生产者一个响应成功的消息,异步发起一个线程将数据同步给slave,性能比较高,容易产生数据丢失)、同步同步的主节点(生产者将消息发送给master,master立即将消息同步给slave,slave同步完成后,才会给生产者响应成功的信息,性能比较低);master挂了之后,不会主动状态切换,slave只管保存数据。

另一种是Dledger集群,通过两阶段的形式去做同步,生产者将消息写入leader之后,leader先将消息写入到log日志中,在日志中写完之后就会给生产者一个响应,接下来会往其他节点发起一个消息的同步(随着心跳),如果有多数节点完成了消息同步,那么这个消息就会最终写到磁盘里去,将消息记为committed状态。

Dledger是基于RAFT协议来实现的,RAFT不能保证消息不丢失(产生消息丢失的可能性非常小,这个协议用来保证数据一致的,不能保证每一个写到leader的消息都能够保存下来);生产者往leader发很多消息,这个消息首先会记录到log日志中,这个日志还是uncommited状态,这个时候还没有同步到其他节点,服务就挂了,这时就会使用RAFT协议根据心跳重新选取一个日志最新的作为leader,然后所有的消息以leader为准,这个时候老节点没有提交的消息,在服务重启之后会主动丢弃没有提交的消息,以新的leader为准。

有什么办法不丢失消息:zookeper的ZAB协议,就可以保证没有提交的消息不丢失,zookeper在节点新上任的时候就会收集所有foller节点的状态去生成一个初始化的镜像,未提交的消息就会收集到镜像中去,哪怕leader上没有也会将消息恢复出来。但是可用性是有问题,节点发生选举这段时间是不可用的,还有一个是有可能发生脑裂。RAFT协议就很好的解决了脑裂的问题。

三、保证consumer端数据不丢失

同步处理消息,再返回消费成功的响应

消费者从服务端拉一些消息过来,跨网络的,也可能产生消息丢失
server端往consumer端推送数据的时候,消费端需要向server端发送一个响应,表示数据处理完成了,服务端才会停止数据的推送,如果消费失败了,server端会重新投递消息。所以通常情况,不需要考虑consumer端的数据丢失。而是需要考虑消息幂等问题。
有可能服务端将消息推送给consumer,consumer没有给服务端响应,可能由于网络原因造成响应丢失,服务端将消息往另一个consumer推送,会造成消息重复。

也不是绝对的消息不丢失,如果消费端使用异步方式处理业务逻辑,直接返回成功,但是异步处理失败容易造成消息丢失。【在任何时候都需要考虑并发的重要性】一定不要在消费端用异步的方式处理。

四、如果broker和NameSever全都挂了怎么办?

增加临时的降级缓存
在这里插入图片描述

rocketmq挂了,做一个简单的备份就可以了,简单的写个缓存,生产者往一个降级的缓存中去写,可以是redis,也可以是内存中的集合都可以。往缓存里写消息,生产者就可以做其他事情去了,接着另起一个线程,不断尝试将降级缓存中的消息往rocketmq中去写,rocketmq重启了之后,这些消息就能尽快写入rocketmq,下游服务就能正常了。
在这里插入图片描述
在这里插入图片描述

总结

1、生产者将消息发送到mq,消息不丢失
        同步发送+多次尝试 - 降低吞吐量
        事务消息机制 - 多次网络请求
2Broker收到消息后,消息不丢失
        设置同步刷盘 - IO负担 (通过配置broker.conf文件,开启同步刷盘  flushDiskType = SYNC_FLUSH)
        搭建Dledger - 不断地RPC心跳,网络负担
3、消费者消费消息不丢失
        同步处理消息,再返回消费成功的响应
4、整个MQ集群挂了,保证消息零丢失
        增加临时的降级缓存

相关文章:

  • docker容器制作和上传
  • Maven插件学习(二)——测试插件maven-surefire-pluigin
  • Linux的一些常见指令
  • 如何查看window电脑的GPU信息
  • docker部署onlyoffice(windows版)
  • Android系统的安全问题 - Android的启动时验证
  • WebGPU 全面解析:下一代 Web 图形与计算 API 的崛起
  • Pytorch学习笔记(六)Learn the Basics - Automatic Differentiation
  • 常见邮件协议
  • 自然语言处理(14:处理时序数据的层的实现)
  • 数学-算法
  • java对象模型
  • Unity游戏开发如何优化移动端的延迟渲染管线?
  • 【NLP 43、文本生成任务】
  • 使用HTML5和CSS3实现3D旋转相册效果
  • LeetCode热题100精讲——Top4:移动零【双指针】
  • SpringBoot底层原理
  • AndroidStudio 下载
  • 大疆上云api直播功能如何实现
  • Linux文件目录管理指令详解(上篇)
  • 魅力网络营销公司/网站搜索优化技巧
  • 网站开发前途/邯郸seo优化公司
  • 做类似起点的网站/微信小程序免费制作平台
  • 网站服务器是网站的空间吗/青岛百度推广优化
  • 合肥包河区最新消息/宁波seo网页怎么优化
  • 好的手机网站推荐/html简单网页代码