当前位置: 首页 > news >正文

Kafka生产者事务机制原理

【博客】
Kafka生产者事务机制原理


一、为什么要引入事务?

在使用 Kafka 的早期版本时,开发者经常会遇到两种场景:

  1. 跨会话重复消息
    Producer 重启后,之前的重试逻辑会导致同一条消息被再次发送,消费者需要做幂等处理。

  2. 跨分区原子性缺失
    一批消息要同时写入多个 Topic / 多个 Partition,如果某一条失败,前面成功的消息无法回滚,业务数据出现“中间状态”。

Kafka 在 0.11.0.0 引入的事务(Transactions)正是为了解决“恰好一次(Exactly-Once)语义”的痛点,同时兼顾跨会话幂等性、跨分区原子性和 consume-process-produce 模式的一致性。


二、事务的四大核心目标

目标说明
原子性一组消息要么全部成功,要么全部失败。
跨会话幂等Producer 重启后,仍能识别并去重“上一次未完成的事务”。
一致性consume-process-produce 模式下,消费位点与下游发送结果保持一致。
隔离性事务未提交的消息对消费者不可见,防止脏读。

三、事务 API 速查表

Kafka Producer 端只提供了 5 个与事务相关的方法,掌握它们就能完成 90% 的编程需求:

方法作用
initTransactions()向 Coordinator 注册全局唯一 transactional.id,做初始化。
beginTransaction()显式开启一个事务。
sendOffsetsToTransaction()把消费者 offset 作为事务的一部分提交,用于 consume-process-produce 模式。
commitTransaction()全部成功,两阶段提交中的“真正提交”。
abortTransaction()出现异常,回滚当前事务。

Spring Boot 用户可以用 @TransactionalkafkaTemplate.executeInTransaction() 进行声明式/编程式事务,原理一致。


四、事务运行流程(两阶段提交 2PC)

Kafka 没有照搬传统 XA 的复杂协议,而是基于内部 Topic 实现了一个轻量级 2PC。

1. 组件角色

角色说明
Producer业务进程,负责发送消息。
Transaction Coordinator一个 Broker 内的模块,充当 2PC 的协调者。
__transaction_state内部 Topic,持久化事务状态(Ongoing → Prepare → Commit/Abort)。
目标 Topic-Partition最终存放业务数据。

2. Kafka事务机制原理

源码位置:org.apache.kafka.clients.producer.internals.TransactionManager,画出Kafka 事务 2PC 全景

┌────────────────────────────────────────────────────────────┐
│                     Kafka 事务 2PC 全景                    │
├───────────────┐   ┌──────────────────┐   ┌──────────────────┐
│   Producer    │   │Transaction       │   │   Brokers        │
│(transaction.id│──▶│Coordinator(TC)   │◀──┤(Data partitions)│
└───────────────┘   └──────────────────┘   └──────────────────┘│                      │                     ││ 1. initTransactions  │                     ││--------------------->2. 写 __transaction_state│                      │   topic 记录 BEGIN│ 3. send()            │                     ││-----------------------------4. 写消息到目标分区│                      │                     ││ 5. commitTransaction │                     ││--------------------->6. 写 PREPARE_COMMIT│                      │                     ││                      │ 7. 给各分区写 COMMIT 标记│                      │◀────────────────────┘│                      │ 8. 写 __transaction_state│                      │   记录 COMMITTED
  1. 初始化阶段
    initTransactions() → 找到 Coordinator → 注册 transactional.id,幂等 Producer 自动开启(enable.idempotence=true)。

  2. 开始事务
    beginTransaction() 仅在客户端打一个标记,不会立即与 Broker 交互。

  3. 发送消息
    调用 producer.send(),消息并未直接写入目标分区,而是暂存客户端的 RecordAccumulator,并标记为事务消息。

  4. 预提交(Prepare)
    客户端 flush 或 commitTransaction() 时,Coordinator 收到 EndTxn(Prepare),把事务状态写入 __transaction_state,并向所有涉及的 Topic-Partition 写入 事务控制消息(Control Batch)。

  5. 正式提交(Commit)
    Coordinator 收到所有 Partition 的 ACK 后,写入 __transaction_state 的 Commit 标记,并向各 Partition Leader 发送 COMMIT Marker
    消费者只有在看到 COMMIT Marker 后,才能看到这批消息。至此事务对外可见。

  6. 异常回滚(Abort)
    任何一步失败,Producer 捕获异常后调用 abortTransaction(),流程同上,只是把标记改成 ABORT,消息对消费者永久不可见。

__transaction_state 状态有ongoingprepare committed,和对应操作的具体图示:

Producer            Transaction Coordinator            日志 & 分区|                       |                               ||--- init(t.id) ------>|--- 记录事务ID ---------------->||                       |                               ||--- begin() --------->|--- 状态=ongoing -------------->||                       |                               ||--- send 消息 -------->|--- 写入未提交数据 ------------>||                       |                               ||--- commit() -------->|--- 状态=prepare -------------->||                       |--- 写 commit marker ---------->||                       |--- 状态=committed ------------>|

生产者、Transactions Coordinator的相互作用图示:
在这里插入图片描述
A:生产者通过initTransactions API向Coordinator 注册事务ID
B:Transactions Coordinator 记录事务日志
C:生产者把消息写入分区
D:分区和Coordinator的交互。(当事务完成以后,消息的状态应该是已提交,消费者才可以消费)


五、代码实现

原生 API

// 1. 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
props.put("transactional.id", "order-tx-" + UUID.randomUUID());
props.put("enable.idempotence", "true");        // 自动开启幂等
props.put("isolation.level", "read_committed"); // 消费者只读已提交KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);// 2. 初始化
producer.initTransactions();while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (records.isEmpty()) continue;producer.beginTransaction();try {for (ConsumerRecord<String, String> r : records) {String newVal = transform(r.value());   // 业务逻辑producer.send(new ProducerRecord<>("target-topic", r.key(), newVal));}// 把消费位点也放进事务producer.sendOffsetsToTransaction(offsets(records), new ConsumerGroupMetadata("myGroup"));producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();}
}

Spring Boot(声明式)

@Component
public class OrderListener {@Autowiredprivate KafkaTemplate<String, String> template;@KafkaListener(topics = "order-in")public void listen(ConsumerRecord<String, String> record,Acknowledgment ack) {template.executeInTransaction(t -> {try {// 1. 业务String newVal = processOrder(record.value());// 2. 写下游t.send("order-out", record.key(), newVal);// 3. 提交 offsett.sendOffsetsToTransaction(Map.of(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)),new ConsumerGroupMetadata("order-group"));return null;} catch (Exception e) {throw new KafkaException("事务失败", e); // 触发回滚}});}private String processOrder(String json) {// 业务逻辑return json.toUpperCase();}
}

六、小结

Kafka 事务 = 幂等 Producer + 两阶段提交 + 内部 Topic 日志。
掌握 init → begin → commit/abort 三步曲,即可获得消息层面的 ACID。
🚀 下一步:把本地数据库事务与 Kafka 事务组合,实现真正的 端到端 Exactly-Once。用思维导图总结本博客内容:

在这里插入图片描述

http://www.dtcms.com/a/320743.html

相关文章:

  • Java集合中的链表
  • 解耦主库负载,赋能数据流转:MySQL Binlog Server 核心指南
  • Web 图像捕获革命:ImageCapture API 全面解析与实战指南
  • mt6897 scp a+g sh5201 porting记录
  • 数据结构:哈希表、排序和查找
  • 光子精密3D工业相机的应用与优势解析
  • CS231n2017 Assignment3 PyTorch部分
  • 代理模式在C++中的实现及面向对象设计原则的满足
  • 利用哥斯拉(Godzilla)进行文件上传漏洞渗透实战分析
  • ​「解决方案」Linux 无法在 NTFS 硬盘上创建文件/文件夹的问题
  • C++多态与虚函数的原理解析
  • MySQL的触发器:
  • 虹科技术分享 | LIN总线译码功能与LIN控制交流发电机(二)
  • 灌区信息化智能管理系统解决方案
  • 计算机视觉CS231n学习(5)
  • AI开发平台行业全景分析与战略方向建议
  • C++归并排序
  • 使用 Python GUI 工具创建安全的密码短语
  • tmi8150b在VM=3.3v电压下,如何提高转速,记录
  • 高性能 Vue 应用运行时策略
  • 仓颉编程语言的match表达式
  • 《算法导论》第 12 章 - 二叉搜索树
  • 【量子计算】量子计算驱动AI跃迁:2025年算法革命的曙光
  • conda pip uv与pixi
  • SpringCloud(4)-多机部署,负载均衡-LoadBalance
  • ASP.NET三层架构成绩管理系统源码
  • HBase的异步WAL性能优化:RingBuffer的奥秘
  • 深度虚值期权合约有什么特点?
  • InfoNCE 损失
  • 企微消息机器人推送配置-windows+python