当前位置: 首页 > news >正文

Kafka 实现从网络层到日志与位点的“全景拆解”

1 网络层:NIO、多路复用与 sendfile

Kafka 的网络层是一个相当直接的 NIO 服务器。为高效利用零拷贝,Kafka 让 TransferableRecords 接口提供 writeTo 方法,使基于文件的 message set 可以走更高效的 transferTo 路径,而不是进程内缓冲写。

  • 线程模型1 个 acceptor 线程 + N 个 processor 线程;每个 processor 维护固定数量的连接。
  • 协议目标:保持尽量简单,便于未来用其他语言实现客户端。
  • sendfile 路径:通过 transferTo内核页缓存里的数据直接写入套接字,减少用户态/内核态拷贝。

这套模型在实践中已广泛验证:实现简单、性能稳定。

2 消息抽象:Record、Key/Value 与 RecordBatch

一条消息(Record)由三部分构成:

  1. 变长 Header(头)
  2. 变长不透明 key(字节数组)
  3. 变长不透明 value(字节数组)

Kafka 不限定 key/value 的具体序列化格式——这使其能适配 JSON/Avro/ProtoBuf/自定义二进制等多种序列化方案。上层应用通常会统一一种序列化规范。

RecordBatch对消息的迭代器,并提供面向 NIO Channel 的批量读写专用方法——这是 Kafka 达到高吞吐的关键。

3 消息格式(Message Format):一切皆批

Kafka 的消息总是以批(batch)形式写入。批里包含 1 条或多条 Record(退化情形仅 1 条)。RecordBatch 与 Record 都有自己的头/体格式。

3.1 RecordBatch:磁盘格式与压缩、事务、CRC

磁盘上的 RecordBatch 格式:

baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (当前 magic 值为 2)
crc: uint32
attributes: int16bit 0~2:0: 无压缩1: gzip2: snappy3: lz44: zstdbit 3: timestampTypebit 4: isTransactional (0 表示非事务批)bit 5: isControlBatch (0 表示非控制批)bit 6: hasDeleteHorizonMs (0 表示 baseTimestamp 不是压缩删除边界)bit 7~15: 未使用
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
recordsCount: int32
records: [Record]
  • 压缩:启用压缩时,压缩后的记录数据紧跟在 recordsCount 后写入。
  • CRC 覆盖范围:从 attributes 到批末尾(即 CRC 之后的所有字节)。
  • CRC 位置:在 magic 之后,因此客户端必须先解析 magic 再决定如何解释 batchLengthmagic 之间的字节。
  • CRC 计算CRC-32C(Castagnoli)
  • 为何 partitionLeaderEpoch 不计入 CRC:避免 broker 为每个收到的批分配该字段时重复计算 CRC
清理(Compaction)与生产者幂等状态
  • 必须保留批的首/尾 offset 与 sequence,以便日志重放时恢复生产者状态。

    • 若不保留最后序列号,分区主副本切换后,生产者可能遇到 OutOfSequence
    • broker 通过首/尾序列号重复检查
  • 因此可能出现空批(批内记录都被清理,但为保留生产者最后序列号而保留批的元信息)。

  • 注意:baseTimestamp 在压缩时不保留,若第一条记录被清理,它会变化。

删除地平线(Delete Horizon)与特殊记录
  • 当批内存在空 payload记录或事务中止标记时,清理也可能修改 baseTimestamp

    • 将其设为这些记录应被删除的时间,
    • 并将 attributes 中的 delete horizon 位 置位。
3.1.1 控制批(Control Batches)

控制批只含一条控制记录(control record),它不会传递给应用,而是用于帮助消费者过滤已中止事务的消息

  • 控制记录 key schema:

    version: int16 (当前为 0)
    type: int16 (0 表示 abort 标记;1 表示 commit)
    
  • 控制记录 value:随类型而变,对客户端不透明

3.2 Record:变长编码与 Header

磁盘上的 Record 格式:

length: varint
attributes: int8bit 0~7: 未使用
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLength: varint
value: byte[]
headersCount: varint
Headers => [Header]
3.2.1 Record Header
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
  • varint 编码Protobuf 相同(headers 的数量也使用 varint)。

3.3 旧消息格式(Old Message Format)

Kafka 0.11 之前,消息通过 message set 进行传输与存储。详情见 Old Message Format

4 日志(Log):文件布局、读写路径与删除策略

以主题 my-topic2 个分区为例,磁盘上会有两个目录my-topic-0my-topic-1。每个目录包含数据文件,文件由**“日志条目”**顺序组成:

  • 条目以 **4 字节整数 N(消息长度)**开头,后接 N 字节的消息内容
  • 64 位 offset该分区自始至今的消息字节流”中本条消息起始的字节偏移,在分区内唯一且单调递增。
  • 文件命名:为其包含的第一条消息的 offset。第一个文件名是 00000000000000000000.log;后续文件名与上一个相差约 S 字节(S 为配置的最大日志文件大小)。

Kafka 对 record batch 的二进制格式做了版本化与标准接口治理,以便生产者、broker 与客户端之间在需要时零拷贝传递而无需转换

为何直接用 offset 作为消息 ID?

最初方案:生产者生成 GUID,broker 维护 GUID→offset 映射。但这会引入复杂且持久化的随机访问索引;同时消费者本就按分区维护位点,全局唯一的价值有限。
最终选择:每分区单调计数器 → 直接使用 offset。offset 在分区内唯一且单调,且对消费者 API 是实现细节,因此采用更高效的 offset 方案

写入(Writes)

  • 顺序追加最后一个日志文件

  • 文件达到阈值(如 1GB)后滚动新文件。

  • 两个耐久性参数:

    • M:每写入 M 条消息后,强制让 OS 刷盘
    • S:每经过 S 秒强制刷盘。
  • 崩溃容限:最坏情况下丢失不超过 M 条或 S 秒的数据

读取(Reads)

  • 客户端给定64 位逻辑 offset最大返回字节数 S,得到一个迭代器遍历返回缓冲区中的消息。

  • 若遇异常大消息,可倍增缓冲区重试直到读全。

  • 可设置最大消息大小最大缓冲区,限制过大的消息与必要的读取上限。

  • 定位流程:

    1. 找到包含该 offset 的段文件
    2. 计算段内偏移
    3. 从该偏移读取。
    • 段定位使用对内存中段范围的二分查找变体
  • 从“现在”开始消费:可直接获取最新写入位置。

  • 若消费者落后超出保留范围,读取不存在的 offset 会得到 OutOfRangeException——应用可选择重置失败

返回给消费者的结果格式:

  • MessageSetSend(单分区 fetch 结果)

    total length     : 4 bytes
    error code       : 2 bytes
    message 1        : x bytes
    ...
    message n        : x bytes
    
  • MultiMessageSetSend(multiFetch 结果)

    total length       : 4 bytes
    error code         : 2 bytes
    messageSetSend 1
    ...
    messageSetSend n
    

读取缓冲区的末尾常常是部分消息,但因前缀带长度,可轻易检测与修复(下次拉取时续上)。

删除(Deletes)

  • 以日志段为单位删除

  • 两种删除策略:

    1. 时间:看记录的时间戳,段内最大时间戳决定该段的保留时间(记录顺序无关)。
    2. 大小(默认关闭):当开启且总大小超限时,不断删除最老段直至回到限制内。
  • 并行开启时,任一策略触发即可删除。

  • 段列表使用写时复制(copy-on-write),使删除操作不阻塞读;二分查找永远在一致快照上进行。

启动恢复与一致性保证(Guarantees)

  • 参数 M 限制最多未刷盘的消息数。

  • 启动时会运行日志恢复:遍历最新段的所有消息,逐条校验:

    • size + offset < 文件长度,且
    • payload 的 CRC32 与存储的一致。
  • 若发现损坏,则将日志截断到最后一个有效 offset

需要处理两种损坏:

  1. 截断:未写入的数据块因崩溃丢失;
  2. 伪增:由于 inode 与数据块写入次序无保证,可能出现文件大小增长但数据块尚未写入而崩溃,导致“凭空多出垃圾块”。

CRC 能识别第二种情况,避免将垃圾块当作真实消息(但未写入的消息仍会丢失)。

5 分布式:消费组位点(Offsets)的提交与获取

组协调者与发现流程

消费者会跟踪每个分区已消费的最大 offset,并可提交 offset以便重启恢复。Kafka 将某消费组的所有 offsets 存在其**组协调者(group coordinator)**所在 broker 上。

  • 组到协调者是按组名映射的;
  • 消费者可向任意 broker 发送 FindCoordinatorRequest,从 FindCoordinatorResponse 获取协调者地址
  • 协调者变更时,消费者需重新发现

offset 提交既可自动也可手动

Offset 提交:写入 __consumer_offsets(紧凑主题)

  • 协调者收到 OffsetCommitRequest 后,会把请求追加到特殊紧凑主题 __consumer_offsets
  • 只有当该主题的所有副本都成功写入后,才返回提交成功
  • 若在可配置超时内复制未完成,提交失败,消费者应退避重试
  • 由于只需保留每个分区最近一次提交,brokers 会定期压缩该主题。
  • 协调者还会把 offsets 缓存到内存表以加速OffsetFetch

Offset 获取:来自缓存的“最后提交值”

  • 协调者收到 OffsetFetch 请求时,直接从缓存返回最后提交的 offsets。

  • 若协调者刚启动刚成为 __consumer_offsets 某分区的 leader,需要从该分区加载到缓存;

    • 此时会返回 CoordinatorLoadInProgressException,消费者应退避后重试 OffsetFetchRequest

6.把“格式细节”转化为工程可用的能力

本文以“完整、不省略”为原则,覆盖了 Kafka Implementation 章节的关键实现:

  • 网络层如何通过 NIO + sendfile 撑起高吞吐;
  • Record/RecordBatch精确二进制格式与事务/压缩/控制批的边界;
  • 日志文件布局、顺序+二分分段删除启动恢复校验
  • 消费组位点如何通过协调者与 __consumer_offsets 主题实现可靠提交与高效获取

文章转载自:

http://0ot4W3dE.bnmfq.cn
http://hU9SdAGc.bnmfq.cn
http://sPXrrUiN.bnmfq.cn
http://fTVPM4Tz.bnmfq.cn
http://UoOAolzN.bnmfq.cn
http://oYRUFnsx.bnmfq.cn
http://yo6fYdL3.bnmfq.cn
http://7MkQZFCr.bnmfq.cn
http://NnZk3xnK.bnmfq.cn
http://5bd3tSDs.bnmfq.cn
http://9ANJoByK.bnmfq.cn
http://aNL94Os4.bnmfq.cn
http://4t8JF1M4.bnmfq.cn
http://s5hK2cMu.bnmfq.cn
http://m5HgGXii.bnmfq.cn
http://Ob5SHlQc.bnmfq.cn
http://WasT37aV.bnmfq.cn
http://mBtkPowB.bnmfq.cn
http://Y3OepWpR.bnmfq.cn
http://X5LQeSNs.bnmfq.cn
http://sWNMWVPP.bnmfq.cn
http://SCfkjASG.bnmfq.cn
http://vRZFbKjC.bnmfq.cn
http://Ho55BOCr.bnmfq.cn
http://JE4bOdjI.bnmfq.cn
http://h10DfXdg.bnmfq.cn
http://X92poguo.bnmfq.cn
http://uXTmaeZv.bnmfq.cn
http://5eRGWreA.bnmfq.cn
http://GLjRZ492.bnmfq.cn
http://www.dtcms.com/a/383005.html

相关文章:

  • Python入门教程之赋值运算符
  • 机器学习系统设计:从需求分析到模型部署的完整项目流程
  • SpringMVC架构解析:从入门到精通(1)
  • Why Language Models Hallucinate 论文翻译
  • 从 WPF 到 Avalonia 的迁移系列实战篇5:Trigger、MultiTrigger、DataTrigger 的迁移
  • easyExcel动态应用案例
  • 目标计数论文阅读(2)Learning To Count Everything
  • 贪心算法应用:速率单调调度(RMS)问题详解
  • 【传奇开心果系列】基于Flet框架实现的用窗口管理器动态集中管理多窗口自定义组件模板特色和实现原理深度分析
  • [Android] 汉语大辞典3.2
  • 《嵌入式硬件(八):基于IMX6ULL的点灯操作》
  • css的基本知识
  • AOP 切面日志详细
  • 软件工程实践二:Spring Boot 知识回顾
  • 从美光暂停报价看存储市场博弈,2026年冲突加剧!
  • Bean.
  • Kafka 入门指南:从 0 到 1 构建你的 Kafka 知识基础入门体系
  • 从qwen3-next学习大模型前沿架构
  • 【Linux】深入Linux多线程架构与高性能编程
  • Python爬虫-爬取拉勾网招聘数据
  • Python|Pyppeteer解决Pyppeteer启动后,页面一直显示加载中,并显示转圈卡死的问题(37)
  • C++_STL和数据结构《1》_STL、STL_迭代器、c++中的模版、STL_vecto、列表初始化、三个算法、链表
  • 【计算机网络 | 第16篇】DNS域名工作原理
  • C++算法题中的输入输出形式(I/O)
  • 【算法详解】:编程中的“无限”可能,驾驭超大数的艺术—高精度算法
  • Linux基础开发工具(gcc/g++,yum,vim,make/makefile)
  • NLP:Transformer之多头注意力(特别分享4)
  • arm芯片的功能优化方案
  • 【C++】动态数组vector的使用
  • 软件工程实践三:RESTful API 设计原则