当前位置: 首页 > news >正文

【kafka系列】Exactly Once语义

目录

1. Exactly-Once语义的定义

2. Kafka实现Exactly-Once的机制

3. 端到端Exactly-Once示例

场景描述

3.1 生产者配置与代码

3.2 消费者配置与代码

4. 异常场景与Exactly-Once保障

场景1:生产者发送消息后宕机

场景2:消费者处理消息后宕机

场景3:Broker宕机

5. 关键实现细节

6. 总结


1. Exactly-Once语义的定义

Exactly-Once(精确一次)语义指:消息从生产到消费的整个生命周期中,每条消息被严格处理且仅处理一次。即使在生产者重试、Broker故障或消费者重启等场景下,也能避免数据重复或丢失。


2. Kafka实现Exactly-Once的机制

Kafka通过以下三部分实现Exactly-Once:

  1. 幂等性生产者(Idempotent Producer)
    • 确保单分区内消息不重复(通过PIDSequence Number)。
  1. 事务(Transactions)
    • 跨分区的原子性写入(通过两阶段提交和事务协调器)。
  1. 消费者端去重(Consumer Deduplication)
    • 结合事务和外部存储(如数据库)实现端到端精确一次。

3. 端到端Exactly-Once示例

场景描述

一个订单处理系统:

  • 生产者:发送订单支付消息到Topic orders
  • 消费者:消费消息,扣减用户账户余额,并将结果写入数据库。
    要求:订单支付消息必须被精确处理一次(避免重复扣款)。

3.1 生产者配置与代码
// 生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "all");
props.put("enable.idempotence", "true"); // 开启幂等性
props.put("transactional.id", "order-producer"); // 必须设置事务ID

// 初始化事务
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // 发送订单消息到Topic orders
    producer.send(new ProducerRecord<>("orders", "order-1001", "支付100元"));

    // 其他操作(如写入其他Topic)
    producer.send(new ProducerRecord<>("audit_log", "order-1001", "已处理"));

    producer.commitTransaction(); // 提交事务
} catch (Exception e) {
    producer.abortTransaction(); // 中止事务
    throw e;
}

3.2 消费者配置与代码
// 消费者配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "order-group");
props.put("isolation.level", "read_committed"); // 仅消费已提交的事务消息

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 1. 检查订单是否已处理(数据库去重)
        if (!isOrderProcessed(record.key())) {
            // 2. 扣减账户余额
            deductBalance(record.key(), record.value());
            // 3. 记录处理状态到数据库(原子操作)
            markOrderAsProcessed(record.key());
        }
        // 4. 手动提交Offset(事务性提交)
        consumer.commitSync();
    }
}

4. 异常场景与Exactly-Once保障

场景1:生产者发送消息后宕机
  • 问题:生产者未提交事务,消息未标记为已提交。
  • 结果:消费者不会读取到该消息,事务协调器自动回滚。
场景2:消费者处理消息后宕机
  • 问题:消费者已扣款但未提交Offset。
  • 结果:消费者重启后从上次提交的Offset重新拉取消息,但数据库已记录处理状态,通过isOrderProcessed()检查避免重复扣款。
场景3:Broker宕机
  • 问题:事务日志和消息日志通过副本机制持久化,新Leader继续处理事务。

5. 关键实现细节

  1. 生产者端
    • 事务ID(transactional.id)唯一标识生产者,协调器通过它恢复事务状态。
    • 两阶段提交确保所有消息原子性写入。
  1. 消费者端
    • isolation.level=read_committed:跳过未提交的事务消息。
    • 外部去重:依赖数据库唯一键或幂等操作(如INSERT IGNORE)。
  1. 端到端保障
    • 生产者事务 + 消费者外部去重 = 完整的Exactly-Once语义。

6. 总结

通过以下组合实现Exactly-Once:

  • 生产者幂等性:避免单分区消息重复。
  • 跨分区事务:确保多消息原子性写入。
  • 消费者去重:依赖外部存储或业务逻辑幂等性。

正确配置后,Kafka可支持金融支付、实时对账等对数据一致性要求极高的场景。

相关文章:

  • DeepSeek进阶开发与应用2:DeepSeek中的自定义层与复杂模型构建
  • 【AI】Docker中快速部署Ollama并安装DeepSeek-R1模型: 一步步指南
  • SpringBoot教程(三十二) SpringBoot集成Skywalking链路跟踪
  • 如何优雅地使用全局标志位
  • servlet中的ServletContext
  • 【D2】神经网络初步学习
  • dfs深度优先搜索—邻接矩阵 + 邻接矩阵-递归版 + 邻接表
  • 基于Flask的茶叶销售数据可视化分析系统设计与实现
  • 一. vue2和vue3的Proxy底层源码详细拆解
  • Kepware的OPC UA配置深入介绍
  • C++ 中将类的定义和实现都放在头文件中的优缺点分析
  • 【20250215】二叉树:94.二叉树的中序遍历
  • 深入理解Elasticsearch集群与分片:原理及配置方案
  • 【硬件设计细节】缓冲驱动器使用注意事项
  • Springboot项目:使用MockMvc测试get和post接口(含单个和多个请求参数场景)
  • Git 本地项目上传 GitHub 全指南(SSH Token 两种上传方式详细讲解)
  • 代码随想录刷题攻略---动态规划---子序列问题1---子序列
  • 计算机视觉+Numpy和OpenCV入门
  • Plaid | 数据库切换历程:从 AWS Aurora MySQL 到 TiDB 的迁移之旅
  • ⚡️《静电刺客的猎杀手册:芯片世界里的“千伏惊魂“》⚡️
  • 内塔尼亚胡:以军将在未来几天“全力进入”加沙
  • 持续8年仍难终了的纠纷:败诉方因拒执罪被立案,胜诉方银行账户遭冻结
  • 从普通人经历中发现历史,王笛解读《线索与痕迹》
  • 印称印巴军事行动总指挥同意将局势降级
  • 时隔近4年再出征!长三丙成功发射通信技术试验卫星十九号
  • 5月12日-14日,上海小升初民办初中进行网上报名