Kafka 实现从网络层到日志与位点的“全景拆解”
1 网络层:NIO、多路复用与 sendfile
Kafka 的网络层是一个相当直接的 NIO 服务器。为高效利用零拷贝,Kafka 让 TransferableRecords
接口提供 writeTo
方法,使基于文件的 message set 可以走更高效的 transferTo
路径,而不是进程内缓冲写。
- 线程模型:1 个 acceptor 线程 + N 个 processor 线程;每个 processor 维护固定数量的连接。
- 协议目标:保持尽量简单,便于未来用其他语言实现客户端。
- sendfile 路径:通过
transferTo
将内核页缓存里的数据直接写入套接字,减少用户态/内核态拷贝。
这套模型在实践中已广泛验证:实现简单、性能稳定。
2 消息抽象:Record、Key/Value 与 RecordBatch
一条消息(Record)由三部分构成:
- 变长 Header(头)
- 变长不透明 key(字节数组)
- 变长不透明 value(字节数组)
Kafka 不限定 key/value 的具体序列化格式——这使其能适配 JSON/Avro/ProtoBuf/自定义二进制等多种序列化方案。上层应用通常会统一一种序列化规范。
RecordBatch
是对消息的迭代器,并提供面向 NIO Channel 的批量读写专用方法——这是 Kafka 达到高吞吐的关键。
3 消息格式(Message Format):一切皆批
Kafka 的消息总是以批(batch)形式写入。批里包含 1 条或多条 Record(退化情形仅 1 条)。RecordBatch 与 Record 都有自己的头/体格式。
3.1 RecordBatch:磁盘格式与压缩、事务、CRC
磁盘上的 RecordBatch 格式:
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (当前 magic 值为 2)
crc: uint32
attributes: int16bit 0~2:0: 无压缩1: gzip2: snappy3: lz44: zstdbit 3: timestampTypebit 4: isTransactional (0 表示非事务批)bit 5: isControlBatch (0 表示非控制批)bit 6: hasDeleteHorizonMs (0 表示 baseTimestamp 不是压缩删除边界)bit 7~15: 未使用
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
recordsCount: int32
records: [Record]
- 压缩:启用压缩时,压缩后的记录数据紧跟在
recordsCount
后写入。 - CRC 覆盖范围:从
attributes
到批末尾(即 CRC 之后的所有字节)。 - CRC 位置:在
magic
之后,因此客户端必须先解析magic
再决定如何解释batchLength
与magic
之间的字节。 - CRC 计算:CRC-32C(Castagnoli)。
- 为何
partitionLeaderEpoch
不计入 CRC:避免 broker 为每个收到的批分配该字段时重复计算 CRC。
清理(Compaction)与生产者幂等状态
-
必须保留批的首/尾 offset 与 sequence,以便日志重放时恢复生产者状态。
- 若不保留最后序列号,分区主副本切换后,生产者可能遇到
OutOfSequence
。 - broker 通过首/尾序列号做重复检查。
- 若不保留最后序列号,分区主副本切换后,生产者可能遇到
-
因此可能出现空批(批内记录都被清理,但为保留生产者最后序列号而保留批的元信息)。
-
注意:
baseTimestamp
在压缩时不保留,若第一条记录被清理,它会变化。
删除地平线(Delete Horizon)与特殊记录
-
当批内存在空 payload记录或事务中止标记时,清理也可能修改
baseTimestamp
:- 将其设为这些记录应被删除的时间,
- 并将 attributes 中的 delete horizon 位 置位。
3.1.1 控制批(Control Batches)
控制批只含一条控制记录(control record),它不会传递给应用,而是用于帮助消费者过滤已中止事务的消息。
-
控制记录 key schema:
version: int16 (当前为 0) type: int16 (0 表示 abort 标记;1 表示 commit)
-
控制记录 value:随类型而变,对客户端不透明。
3.2 Record:变长编码与 Header
磁盘上的 Record 格式:
length: varint
attributes: int8bit 0~7: 未使用
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLength: varint
value: byte[]
headersCount: varint
Headers => [Header]
3.2.1 Record Header
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
- varint 编码与 Protobuf 相同(headers 的数量也使用 varint)。
3.3 旧消息格式(Old Message Format)
在 Kafka 0.11 之前,消息通过 message set 进行传输与存储。详情见 Old Message Format。
4 日志(Log):文件布局、读写路径与删除策略
以主题 my-topic
、2 个分区为例,磁盘上会有两个目录:my-topic-0
与 my-topic-1
。每个目录包含数据文件,文件由**“日志条目”**顺序组成:
- 条目以 **4 字节整数 N(消息长度)**开头,后接 N 字节的消息内容。
- 64 位 offset 是该分区“自始至今的消息字节流”中本条消息起始的字节偏移,在分区内唯一且单调递增。
- 文件命名:为其包含的第一条消息的 offset。第一个文件名是
00000000000000000000.log
;后续文件名与上一个相差约 S 字节(S 为配置的最大日志文件大小)。
Kafka 对 record batch 的二进制格式做了版本化与标准接口治理,以便生产者、broker 与客户端之间在需要时零拷贝传递而无需转换。
为何直接用 offset 作为消息 ID?
最初方案:生产者生成 GUID,broker 维护 GUID→offset 映射。但这会引入复杂且持久化的随机访问索引;同时消费者本就按分区维护位点,全局唯一的价值有限。
最终选择:每分区单调计数器 → 直接使用 offset。offset 在分区内唯一且单调,且对消费者 API 是实现细节,因此采用更高效的 offset 方案。
写入(Writes)
-
顺序追加到最后一个日志文件。
-
文件达到阈值(如 1GB)后滚动新文件。
-
两个耐久性参数:
- M:每写入 M 条消息后,强制让 OS 刷盘;
- S:每经过 S 秒强制刷盘。
-
崩溃容限:最坏情况下丢失不超过 M 条或 S 秒的数据。
读取(Reads)
-
客户端给定64 位逻辑 offset与最大返回字节数 S,得到一个迭代器遍历返回缓冲区中的消息。
-
若遇异常大消息,可倍增缓冲区重试直到读全。
-
可设置最大消息大小与最大缓冲区,限制过大的消息与必要的读取上限。
-
定位流程:
- 找到包含该 offset 的段文件;
- 计算段内偏移;
- 从该偏移读取。
- 段定位使用对内存中段范围的二分查找变体。
-
从“现在”开始消费:可直接获取最新写入位置。
-
若消费者落后超出保留范围,读取不存在的 offset 会得到 OutOfRangeException——应用可选择重置或失败。
返回给消费者的结果格式:
-
MessageSetSend
(单分区 fetch 结果)total length : 4 bytes error code : 2 bytes message 1 : x bytes ... message n : x bytes
-
MultiMessageSetSend
(multiFetch 结果)total length : 4 bytes error code : 2 bytes messageSetSend 1 ... messageSetSend n
读取缓冲区的末尾常常是部分消息,但因前缀带长度,可轻易检测与修复(下次拉取时续上)。
删除(Deletes)
-
以日志段为单位删除。
-
两种删除策略:
- 时间:看记录的时间戳,段内最大时间戳决定该段的保留时间(记录顺序无关)。
- 大小(默认关闭):当开启且总大小超限时,不断删除最老段直至回到限制内。
-
并行开启时,任一策略触发即可删除。
-
段列表使用写时复制(copy-on-write),使删除操作不阻塞读;二分查找永远在一致快照上进行。
启动恢复与一致性保证(Guarantees)
-
参数 M 限制最多未刷盘的消息数。
-
启动时会运行日志恢复:遍历最新段的所有消息,逐条校验:
size + offset < 文件长度
,且- payload 的 CRC32 与存储的一致。
-
若发现损坏,则将日志截断到最后一个有效 offset。
需要处理两种损坏:
- 截断:未写入的数据块因崩溃丢失;
- 伪增:由于 inode 与数据块写入次序无保证,可能出现文件大小增长但数据块尚未写入而崩溃,导致“凭空多出垃圾块”。
CRC 能识别第二种情况,避免将垃圾块当作真实消息(但未写入的消息仍会丢失)。
5 分布式:消费组位点(Offsets)的提交与获取
组协调者与发现流程
消费者会跟踪每个分区已消费的最大 offset,并可提交 offset以便重启恢复。Kafka 将某消费组的所有 offsets 存在其**组协调者(group coordinator)**所在 broker 上。
- 组到协调者是按组名映射的;
- 消费者可向任意 broker 发送
FindCoordinatorRequest
,从FindCoordinatorResponse
获取协调者地址; - 协调者变更时,消费者需重新发现。
offset 提交既可自动也可手动。
Offset 提交:写入 __consumer_offsets
(紧凑主题)
- 协调者收到
OffsetCommitRequest
后,会把请求追加到特殊紧凑主题__consumer_offsets
。 - 只有当该主题的所有副本都成功写入后,才返回提交成功。
- 若在可配置超时内复制未完成,提交失败,消费者应退避重试。
- 由于只需保留每个分区最近一次提交,brokers 会定期压缩该主题。
- 协调者还会把 offsets 缓存到内存表以加速OffsetFetch。
Offset 获取:来自缓存的“最后提交值”
-
协调者收到 OffsetFetch 请求时,直接从缓存返回最后提交的 offsets。
-
若协调者刚启动或刚成为
__consumer_offsets
某分区的 leader,需要从该分区加载到缓存;- 此时会返回
CoordinatorLoadInProgressException
,消费者应退避后重试OffsetFetchRequest
。
- 此时会返回
6.把“格式细节”转化为工程可用的能力
本文以“完整、不省略”为原则,覆盖了 Kafka Implementation 章节的关键实现:
- 网络层如何通过 NIO + sendfile 撑起高吞吐;
- Record/RecordBatch 的精确二进制格式与事务/压缩/控制批的边界;
- 日志文件布局、顺序写+二分读、分段删除与启动恢复校验;
- 消费组位点如何通过协调者与
__consumer_offsets
主题实现可靠提交与高效获取。