Kafka生产者详解(下):数据去重(幂等性)与数据有序
数据可靠性
acks的不同情况:
- ack = 0,生产者发送的数据不需要等待数据落盘就应答。有可能导致数据在发送到leader,但leader故障时丢失
- ack = 1,leader收到数据并且数据落盘后才应答。有可能数据没有从leader同步到follower时leader挂掉,也会导致数据的丢失。
- ack = -1,生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。但follower可能存在情况迟迟无法同步消息,解决方法: Leader维护了一个动态的in-sync replica set(ISR),意为和 Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。
可靠性总结:
- acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
- acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
- acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
但ack = -1时会出现数据重复的场景。在数据发送到leader并同步到follower时,leader在准备返回ack时挂了,导致某个follower成为新的leader。这时系统认为消息发送失败了,再将同一个消息发送到新的leader上,这就会导致消息重复。
数据去重
数据传递语义
- 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 最多一次(At Most Once)= ACK级别设置为0
总结: At Least Once可以保证数据不丢失,但是不能保证数据不重复; At Most Once可以保证数据不重复,但是不能保证数据不丢失。
- 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。 Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。
其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。幂等性只能保证单分区单会话内不重复。

事务
开启事务,必须开启幂等性。
Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。有 了 transactional.id,即使客户端挂掉了, 它重启后也能继续处理未完成的事务。

流程详解:
1. 请求 Producer ID
Kafka Producer 首先向 Kafka 集群的 事务协调器(Transaction Coordinator) 发送请求,申请一个 Producer ID(PID)。
2. 返回 Producer ID
事务协调器为 Producer 分配并返回一个唯一的 PID,Producer 保存该 ID 用于后续事务操作。
3. 发送消息到 TopicA
Producer 向目标 Topic(如 TopicA)的 Leader 分区(Partition0)发送业务消息。
此时消息处于 “未提交” 状态,消费者默认不会消费这些消息。
4. 发送 Commit 请求
当业务消息发送完成后,Producer 向事务协调器发送 Commit 请求,明确要求提交当前事务。
5. 持久化 Commit 请求
事务协调器收到 Commit 请求后,将该事务的 “提交意图” 持久化到 特殊主题 __transaction_state 中。
__transaction_state 是 Kafka 内部的特殊主题,默认有 50 个分区,用于存储所有事务的状态信息。
分区路由规则:根据 transactional.id 的哈希值 % 50 计算,确定事务属于哪个分区,该分区的 Leader 所在 Broker 即为对应的事务协调器节点。
6. 返回成功
事务协调器向 Producer 返回 “Commit 请求已接收” 的成功响应。
7. 后台发送 Commit 请求
事务协调器在后台向 TopicA 的 Leader 分区发送事务提交指令,告知其可以将之前的未提交消息标记为 “可消费”。
8. 返回成功
TopicA 的 Leader 分区收到提交指令后,向事务协调器返回成功响应。
9. 持久化事务成功信息
事务协调器将 “事务已成功提交” 的最终状态持久化到 __transaction_state 中,完成整个事务的闭环。
API使用:
public class CustomProducerTransactions {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置事务 id(必须),事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 发送消息kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));}// int i = 1 / 0; // 模拟异常,触发事务回滚// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务(回滚)kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}数据有序
1):kafka在1.x版本之前保证数据单分区有序,条件如下: max.in.flight.requests.per.connection=1(Broker中最多缓冲的请求,默认每个Broker最多缓存五个。设为一说明只要发送完当前消息才能拉取下一个)。
2):kafka在1.x及以后版本保证数据单分区有序,条件如下:
- 开启幂等性 max.in.flight.requests.per.connection需要设置小于等于5。
- 未开启幂等性 max.in.flight.requests.per.connection需要设置为1。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的。
从总体来看,Kafka可以做到局部有序,总体无序,即每个分区内的消息是有序的,但分区之间消息的总和是无序的。
