当前位置: 首页 > news >正文

kafka高可用数据不丢失不重复分区内有序性

系列文章目录

文章目录

  • 系列文章目录
  • 一、acks的确认机制
  • 二、重试机制+幂等性
  • 三、总结
  • 四、kafka事务如何保证精准一次持久化?
    • 1、kafka事务基本流程


一、acks的确认机制

2个重要的参数: acks和min.insync.replicas
将acks=all和min.insync.replicas结合起来,就能保证数据端到端的高可用。其中acks是producer生产者端配置参数,min.insync.replicas是broker端配置参数

acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的
kafka版本<3.0, acks=1; >3.0, acks=all或者-1

acks=0, 生产者在发送消息时就认为已经成功写入,不需要等待来自broker端服务器的响应. 只管发,发送即忘,吞吐量高,所有网络带宽都用来发送消息,但是可靠性低

acks=1, 需要等待leader 响应。

acks=-1, 表示所有副本都写入消息,客户端才会收到成功响应

二、重试机制+幂等性

重新发送会造成数据重复,数据乱序。

幂等生产者可以保证发送数据不丢失,分区内数据不重复以及分区内数据的顺序性。

Broker端怎么知道一条消息是不是重复的呢?可以给这条消息加上唯一键标识,唯一键应该选择什么粒度呢?kafka的设计是在分区的维度上设计唯一键,让每个分区的leader判断数据是否重复。选择设计的唯一键是producer+topicPartition维度。

如何保证消息的顺序性呢?生产者在发送消息时会同时发送一个序列号,序列号以topicPartition为基础,从0开始,并随着每条消息的产生而递增。在broker端通过序列号判断消息是否顺序发送。所以,kafka要做到幂等性,就要遵循

  1. PID: 每个生产者都要有一个Id
  2. sequence number: 生产者发送消息时会发送一个序列号,序列号以topicPartition为基础,从0开始
  3. 新消息:beoler新收到的消息的序列号刚好比给给定生产者在本地存储的系列号大1
  4. 重复消息:broker新收到的消息的序列号小于等于生产者保存的最大序列号
  5. 失序:Broker新收到的消息序列号-本地最大序列号的差大于1. 意味着有消息丢失了

总结一下,如何解决重复:假设第一批发送到broker,但是由于网络抖动,producer没有收到响应,于是再次发送相同的数据,broker收到数据后会检查seq, 如果baseSeq小于第一批里面的lastSeq,说明重复发送了,那么就会丢弃当前这批数据,给生产者一个ack。
如何解决乱序?由于inFlightRequest可以并行发送多个请求。最大可以发送5个请求,比如下面这组数据,
123,789,456, 乱序
123,456,789,有序

Broker 端对序列号的校验,Kafka 幂等性不能保证请求之间的顺序,但可以确保每条消息的 序列号是严格递增的,从而保证 写入日志的顺序 是正确的。

broker端落盘数据示例
每一个producerbatch会存储第一条消息的序列号,以及最后一条的序列号
baseOffset: 5522708 lastOffset: 5522745 count:38 baseSequence: 239704 lastSequence: 239741 ...|offset: 5522708 CreateTime: xxx  sequence: 239704...
|offset: 5522709 CreateTime: xxx  sequence: 239705...
|offset: 5522710 CreateTime: xxx  sequence: 239706...
|offset: 5522711 CreateTime: xxx  sequence: 239707...

三、总结

幂等性生产者可以避免分区内消息重复,保持消息的顺序性,即使飞行队列中有多个请求也不例外。借助producerBatch中的序列号,broker可以拒绝任何序列号不等于最后一个序列号+1的消息。 但是幂等性只能保证单个绘画内消息不丢不重复不乱序, 如果producer端重启了,就不能保证了。

幂等性不能跨多个topicPartition,只能保证producer在单个topicPartition内的幂等性。当涉及多个topicPartition时,这些状态信息并没有同步

如果需要实现跨会话,跨多个topicPartition的幂等性,需要使用kafka的事务性。

故事1
leader收到三条消息,还没回ack,follower还没来得及同步完,leader就挂了
在这里插入图片描述
broker现在选出新的leader。producer再次发送三条消息,leader会检查sequence number,也就是说已经收到的message1不会重复消费,follower也会同步最新的message2和message3,所以能保证同一个会话中数据重复问题
在这里插入图片描述

故事2
生产者宕机,当producer 重启,pid发生变化,消息的key<pid, partition, sequenceNumber>会发生变化,所以producer再次发送三条消息,broker认为是新的数据,导致同一个分区跨会话的消息重复

故事3
生产者宕机,当producer 重启,pid发生变化,消息的key<pid, partition, sequenceNumber>会发生变化,所以producer再次发送三条消息,假设此时不往protition 0发送,而是向partition1 发送。对于重新启动的生产者来说,会按照下面公式计算路由分区。重启后生产者线程编号!=旧生产者的线程编号,可能导致重启后的生产者将数据发到partition 1。 当然也可能发到partition 0. 于是造成了分区间数据重复。

int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();

public int nextPartition(String topic, Cluster cluster, int prevPartition) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);Integer oldPart = indexCache.get(topic);Integer newPart = oldPart;// Check that the current sticky partition for the topic is either not set or that the partition that // triggered the new batch matches the sticky partition that needs to be changed.if (oldPart == null || oldPart == prevPartition) {List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.isEmpty()) {int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = random % partitions.size();} else if (availablePartitions.size() == 1) {newPart = availablePartitions.get(0).partition();} else {while (newPart == null || newPart.equals(oldPart)) {int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = availablePartitions.get(random % availablePartitions.size()).partition();}}// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.if (oldPart == null) {indexCache.putIfAbsent(topic, newPart);} else {indexCache.replace(topic, prevPartition, newPart);}return indexCache.get(topic);}return indexCache.get(topic);}

四、kafka事务如何保证精准一次持久化?

Kafka 事务(Transactions)机制是为了解决 跨多个分区的消息写入的一致性问题,特别是在使用 幂等生产者(Idempotent Producer) 无法满足需求的场景下。比如:

多个消息需要一起写入不同分区
消息必须要么全部成功写入,要么全部失败(ACID 特性)

1、kafka事务基本流程

Producer 发起事务

使用 beginTransaction() 开始一个事务。
Producer 写入消息到多个分区

每条消息都会携带一个 producer_id 和 sequence_number(类似幂等机制)
消息被写入对应的分区日志中
Producer 提交事务

调用 commitTransaction() 提交事务
或调用 abortTransaction() 中止事务
事务协调器记录事务状态

将事务状态保存到一个特殊的分区(如 __transaction_state 分区)
这个分区由 Kafka 自动维护,用于记录所有事务的状态
Broker 确认事务

Broker 收到提交请求后,会检查事务是否已成功完成
如果是提交,则将这些消息标记为“已提交”
如果是中止,则丢弃这些消息

开启事务: producer将partition信息提交给TC,TC将<transactionId, TopicPartition>的对应关系写到_transaction_state中

producer.send: 往分区中写入数据

准备提交事务: 具体做法就是往分区对应的目录中的最新log文件中写入一条记录,标识之前的消息是否写入成功。

TC修改事务状态

在这里插入图片描述

http://www.dtcms.com/a/393947.html

相关文章:

  • KRaft 运维从静态到动态 Controller
  • 自动语音识别--Zipformer ASR模型
  • 计算机视觉与深度学习 | 图像去雾算法综述:原理、公式与代码实现
  • MySQL sql语言简介和DDL语句介绍
  • [数据结构] 二叉树
  • 4+10+N,华为坤灵“求解”中小企业智能化
  • ECharts 四川省地图渲染与交互效果实现
  • Zynq开发实践(SDK之自定义IP3 - 软件IP联调)
  • VMware虚拟机中CentOS的network配置好后ping不通问题解决方法
  • 传输层————TCP
  • [已更新]2025华为杯B题数学建模研赛B题研究生数学建模思路代码文章成品:无线通信系统链路速率建模
  • 机器学习相关内容
  • 【win11】自动登录,开机进入桌面
  • 关系型数据库系统概述:MySQL与PostgreSQL
  • python编程练习(Day8)
  • 【Linux命令从入门到精通系列指南】apt 命令详解:Debian/Ubuntu 系统包管理的现代利器
  • xtuoj 7的倍数
  • 【开题答辩全过程】以 java牙科门诊管理系统为例,包含答辩的问题和答案
  • 【论文速递】2025年第19周(May-04-10)(Robotics/Embodied AI/LLM)
  • 鸿蒙 - 验证码功能
  • 大数据毕业设计选题推荐-基于大数据的汽车之家数据分析系统-Hadoop-Spark-数据可视化-BigData
  • Bioconductor 项目为高通量生物数据分析提供了大量强大的工具 Bioconductor规范,核心是一系列设计精良、标准化的数据对象
  • 还有新援?利物浦即将启动预签协议,锁定英格兰新星
  • Audacity音频软件介绍和使用
  • SpringBoot配置优化:Tomcat+数据库+缓存+日志全场景教程
  • 《数据库系统概论》——陈红、卢卫-1-数据库系统概述
  • VLA-Adapter:一种适用于微型 VLA 的有效范式
  • JVM内存模型深度剖析与优化
  • 固定收益理论(六)波动率曲面、曲线及其构建模型
  • Zotero使用学习笔记