当前位置: 首页 > news >正文

如何保证RocketMQ消息不丢失

目录

前言

1.哪些环节可能丢消息?

2.保证producer到MQ消息不丢失

3.保证MQ内部消息不丢失

3.1.利用同步刷盘数据可以不丢失

3.2.同步刷盘的缺点

4.保证MQ到consumer消息不丢失


前言

这是作者RocketMQ系列文章中的一篇,一切操作和API都默认基于Spring Boot+RocketMQ,详细的各种代码示例详见:

Spring Boot集成RocketMQ_springboot集成rocketmq-CSDN博客

1.哪些环节可能丢消息?

回忆一下RocketMQ的生产消费过程:

  1. producer生产message发给MQ,
  2. MQ收到消息后将message写入commit log中完成持久化
  3. MQ将message推给consumer(假设是常用的推模式)

上面的环节有丢消息可能的是:

  1. producer发送消息给MQ的路上,消息丢了,MQ没有收到消息,或者MQ挂了,或者网络波动等等原因都有可能,反正就是producer发了,MQ没有收到消息。
  2. message到了MQ内存中,还没有写入commit log中还没有落磁盘,MQ挂了,消息会丢。
  3. MQ推送消息给consumer的路上,message丢了,consumer没有收到

保证消息不丢失,其实就是保证这三步消息不会丢失。接下来分开解决。

2.保证producer到MQ消息不丢失

producer端去校验MQ返回的ACK,要是收到失败的ACK就重新投递消息:

producer在api层面有三种发送方式:

  1. 同步发送:producer同步等待,Broker 返回写入结果(明确 ACK),失败自动重试
  2. 异步发送:通过回调函数接收 Broker 的 ACK/失败结果,需要自定义补偿逻辑
  3. Oneway发送:不等待 Broker 响应(无 ACK)

以下是Sping Boot集成RocketMQ异步发送的代码示例:

public class AsyncProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;
​public void sendAsync() {Message<String> message = MessageBuilder.withPayload("异步消息内容").setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 延迟级别.build();
​rocketMQTemplate.asyncSend("ASYNC_TOPIC", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送成功: " + sendResult.getMsgId());}
​@Overridepublic void onException(Throwable e) {System.err.println("异步发送失败: " + e.getMessage());}});}
}

3.保证MQ内部消息不丢失

3.1.利用同步刷盘数据可以不丢失

producer将message发送到consumer中,写入commit log内,commit log在内存里,message在commit log中还没有落磁盘,MQ断电挂了,消息会丢。MQ支持两种刷盘方式:

  1. 同步刷盘,message落磁盘后向producer返回成功的ACK
  2. 异步刷盘,message写入内存后向producer返回成功的ACK

很明显,只要将刷盘方式配置为同步刷盘,MQ内message就一定不会丢失。在broker的配置文件中可以配置刷盘方式:

# 同步刷盘配置
flushDiskType = SYNC_FLUSH

# 异步刷盘配置(默认)
flushDiskType = ASYNC_FLUSH

3.2.同步刷盘的缺点

MQ的刷盘要稍微展开聊一下,才能彻底明白同步刷盘和异步刷盘的不同之处:

数据的读写,即数据IO,向网络上进行读写是网络IO,向磁盘上读写是磁盘IO。操作系统底层IO就是向内存中某块具体的地方进行读写,磁盘的存储和内存地址间存在一种映射关系。

RocketMQ收到message后将消息从内存中写入硬盘中自然也是上面说的这种机制,展开来说就是,commit log是个逻辑概念,是message的一个集合,其整体是存在磁盘上的。操作系统利用内存映射(MappedByteBuffer)将磁盘上的commit映射到内存中的 PageCache,向PageCache中进行读写。

同步刷盘会进行系统调用,强制将到达PageCache的数据强行刷入磁盘中,这就会存在一个问题,每次message来都要触发一次IO,很明显这个效率和吞吐量会是很低的。所以在工业界用的都是异步刷盘。肯定这里会有个疑惑:

异步刷盘数据丢了怎么办?

只能这样说,概率太低太低了,无数前面的工程经验显示用异步刷盘不会有什么问题。

4.保证MQ到consumer消息不丢失

consumer一侧,消息丢失的可能性有两种:

  1. consumer自身就没收到,在传输过程中就丢了或者consumer挂了
  2. consumer收到了,然后用这条消息去走下面的业务流程的时候消息丢失了,比如落库,还没落库断电了,那么对MQ来说message consumer是收到了的,但是实际整个业务上来说message约等于是没有收到。

情况一不需要我们去处理,因为consumer和MQ之间的ACK是默认打开的,要是MQ没收到consumer的成功ACK,那么它会间隔一段时间进行消息的重新投递,默认重发16次,具体重试次数可在配置文件中配置:

超过重试次数,message会被MQ投入死信队列,进入死信队列的消息可由人工去手动处理。

情况二就要我们在consumer端编码的时候注意了,在消费message的时候既要考虑到给MQ返回ACK,保证消息抵达consumer,还要考虑到message真的在业务层面被消费到了,所以将自动ack改为手动ACK,与此同时要根据自己的业务有所思考。以下是作者曾经做的一个订单系统的消费者端如何进行消息的可靠消费的代码示例:

相关文章:

  • 《广度优先搜索》题集
  • 钉钉告警集成部署指南
  • ROS move base 简易调试
  • 在postgresql中,group by时取第一个值
  • AIGC 基础篇 Python基础 04 for循环与while循环
  • 华为仓颉语言初识:并发编程之同步机制(下)
  • 华为0528笔试
  • 基于流形迁移学习的快速动态多目标进化算法(MMTL-MOEA/D)求解FDA1-FDA5和dMOP1-dMOP3,提供完整MATLAB代码
  • C++中的跳转语句
  • 大模型中Function Call的定义与核心功能
  • MMaDA: Multimodal Large Diffusion Language Models
  • AWS Lambda Python + AWS Secrets Manager + AWS Aurora Mysql
  • jupyterhub的浅浅使用-重点在解决无法登录
  • 第四章 RAG 知识库基础
  • visual studio 2022更改主题为深色
  • 深度学习:概念、特点和发展史
  • Python实例题:Python计算实变函数
  • linux libusb使用libusb_claim_interface失败(-6,Resource busy)解决方案
  • 【javascript】泡泡龙游戏中反弹和查找匹配算法
  • 第十三章 RTC 实时时钟
  • 网站后台制作这么做/seo网站怎么搭建
  • 沙田镇网站建设/百度推广引流
  • 读书网站怎么做/百度识图找原图
  • 扁平化网站模板下载/培训机构加盟
  • wordpress标签随机调用/google优化推广
  • 大网站制作公司/seo优化的主要内容