当前位置: 首页 > news >正文

Kafka 中的幂等机制

Kafka 中的 幂等性(Idempotence) 是生产者端的重要机制,旨在确保即使在网络抖动、重试、Broker 重启等情况下,同一条消息不会被重复写入到 Topic 中。这是实现可靠消息传递、避免重复消费的关键手段之一。

✅ 什么是幂等性?

简单说:无论一个操作执行多少次,结果都是一样的。

在 Kafka 中,幂等性意味着:

相同的消息,即使发送多次,也只会被写入一次,且不会重复出现在日志中。

✅ Kafka 幂等性的作用场景

生产者可能会因为以下情况 重试发送 消息:

  • 网络超时,未收到 Broker 的 ack;
  • Kafka Broker 重启;
  • 客户端主动重试(retries > 0);
  • Leader 重新选举。

这些重试可能会导致:同一条消息写入多次,从而带来“重复消费”的问题。

Kafka 的幂等性功能可以自动解决这个问题,不用你在应用层手动做去重。

✅ 如何开启幂等性?

从 Kafka 0.11 版本开始支持幂等性。

✔ 开启方式

Kafka 2.0 版本之后,幂等性可以通过如下方式开启:

Properties props = new Properties();
props.put("enable.idempotence", "true");  // ✅ 显式开启
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

注意

  • acks=all 是开启幂等性的默认要求。
  • Kafka 2.5+ 中,enable.idempotence 默认就是 true。

✅ Kafka 是如何实现幂等性的?

Kafka 利用了以下几个机制:

1. Producer ID(PID)

  • 每个生产者初始化时,Kafka 分配一个唯一的 PID(Producer ID)。
  • Kafka 会记住这个 PID 发给哪个 Partition 了哪些消息。

2. Sequence Number(序列号)

  • Kafka 给每个消息分配一个自增的 Sequence Number每个 Partition 单独维护
  • Broker 在每个 Partition 中,记录下最近收到的 PID 和对应的序号。

✨ Kafka 判断是否是重复消息的规则:

如果某个 PID + Partition 下,收到一条消息,其 Sequence Number 是重复的或小于上一次的,说明是重试的重复消息,Kafka 会自动丢弃它

✅ 幂等性 vs 事务,有什么区别?

特性幂等性(Idempotence)事务(Transaction)
作用避免消息重复写入保证多条消息的原子提交
粒度单条消息一组消息
范围单个 partition、单个 producer多 partition、消费者偏移、多个 Topic
是否有回滚❌ 无✅ 有
消费者是否感知❌ 不感知✅ read_committed 下感知

可以理解为:

幂等性是事务的基础。Kafka 启用事务时,会自动启用幂等性,但单独开启幂等性不等于开启事务。

✅ 使用幂等性的推荐配置

enable.idempotence=true      ✅ 开启幂等性
acks=all                     ✅ 所有副本都要确认
retries=Integer.MAX_VALUE    ✅ 无限重试,确保最终写入成功
max.in.flight.requests.per.connection=1(旧版本)
                             ✅ 限制同时请求数,确保顺序(Kafka 2.4+ 可放宽为5)

⚠️ 若你设置 max.in.flight.requests.per.connection > 1,在旧版本 Kafka(<2.4)中可能会造成乱序+重复写入,不再幂等

✅ 总结一句话

Kafka 幂等性 = 在网络失败或客户端重试时,确保消息只被写入一次,自动去重,避免重复消费问题。

它是 实现可靠消息系统的第一步,在开启事务或处理金融、支付等关键数据时非常重要。

相关文章:

  • SQLI打靶
  • 【嵌入式学习6】多任务版TCP服务器
  • 玄机-第六章-哥斯拉4.0流量分析的测试报告
  • 盛水最多的容器
  • Kafka负载均衡挑战解决
  • Jupyter Notebook不能自动打开默认浏览器怎么办?
  • IDEA快速入门
  • Airflow集成Lark机器人
  • 深入理解PCA降维:原理、实现与应用
  • 【Introduction to Reinforcement Learning】翻译解读2
  • Spring Boot 3.x 集成 MongoDB 的 默认配置项及默认值,以及 常用需要修改的配置项 的详细说明
  • nacos集群启动问题
  • CAS号:288574-78-7,Zinpyr-1可用作PET传感器
  • 【数据分享】2014-2025年全国监测站点的逐时空气质量数据(15个指标\Excel\Shp格式)
  • (PROFINET 转 EtherCAT)EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关
  • Linux终止进程(kill process)的一些玩法
  • Jetpack Compose 基础组件学习2.0
  • SVT-AV1学习-svt_aom_get_sg_filter_level,svt_av1_selfguided_restoration_c
  • 算法与数据结构线性表之栈和队列
  • MongoDB及Yapi迁移数据
  • 外包软件公司/合肥seo网络营销推广
  • 小程序模板网 凡平台/seo技巧与技术
  • 网店客服外包一般多少钱/上海seo推广公司