分布式专题——21 Kafka客户端消息流转流程
1 Kafka 的 Java 客户端
-
引入依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.8.0</version> </dependency>
1.1 消息发送者主流程
-
前置操作:发送消息前需提前创建 Topic,可使用命令:指定分区数(
--partitions 3
)和备份因子(--replication - factor 2
)bin/kafka - topics.sh --bootstrap - server worker1:9092 --create --topic Topic --partitions 3 --replication - factor 2
-
接下来可以使用 Kafka 提供的
Producer
类快速发送消息:public class MyProducer {// 定义Kafka集群的地址和端口private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";// 定义要发送消息的主题名称private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 配置Kafka生产者的属性参数Properties props = new Properties();// 设置Kafka集群的服务器地址列表,生产者会从这里获取集群元数据信息props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 设置消息key的序列化器,这里使用String类型的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 设置消息value的序列化器,同样使用String类型的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者实例,传入配置属性Producer<String,String> producer = new KafkaProducer<>(props);// 创建计数器,用于等待所有异步发送完成,初始值为5(因为有5条消息)CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {// 构建要发送的消息对象// 创建ProducerRecord,包含主题名、消息key、消息valueProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);// 使用三种不同的方式发送消息// 方式1: 单向发送(fire-and-forget)- 不关心发送结果,性能最高但可靠性最低producer.send(record);System.out.println("message "+i+" sended");// 方式2: 同步发送 - 调用get()方法会阻塞当前线程,直到收到服务端响应RecordMetadata recordMetadata = producer.send(record).get();// 从返回的RecordMetadata中获取消息的详细信息String topic = recordMetadata.topic(); // 消息所在主题int partition = recordMetadata.partition(); // 消息所在分区long offset = recordMetadata.offset(); // 消息在分区中的偏移量String message = recordMetadata.toString(); // 元数据的字符串表示System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);// 方式3: 异步发送 - 发送后立即返回,通过回调函数处理服务端响应producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {// 回调函数:当服务端返回响应时会被调用if(null != e){// 如果异常不为空,说明发送失败System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{// 发送成功,打印消息元数据信息String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}// 计数器减1,表示一条消息的异步发送处理完成latch.countDown();}});}// 等待所有异步发送操作完成(计数器变为0)latch.await();// 关闭生产者,释放资源producer.close();} }
-
Producer 发送消息的整体步骤:
- 设置 Producer 核心属性:
Producer
可选属性由ProducerConfig
类管理,如指定集群地址(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
)等必选属性,且ProducerConfig
对重要属性都有 DOC 文档说明; - 构建消息:Kafka 消息为 Key-Value 结构,Key 和 Value 可为任意对象,Key 用于
Partition
分区,业务上更关心 Value; - 使用 Producer 发送消息:常用单向发送、同步发送和异步发送三种方式。
- 设置 Producer 核心属性:
1.2 消息消费者主流程
-
使用Kafka提供的
Consumer
类来快速消费消息;public class MyConsumer {// 定义Kafka集群的地址和端口private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";// 定义要消费的消息主题名称private static final String TOPIC = "disTopic";public static void main(String[] args) {// 配置Kafka消费者的属性参数Properties props = new Properties();// 设置Kafka集群的服务器地址列表props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 设置消费者组ID,同一个组内的消费者共同消费主题的消息(实现负载均衡)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 设置消息key的反序列化器,与生产者端的序列化器对应props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置消息value的反序列化器,与生产者端的序列化器对应props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 创建Kafka消费者实例Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅要消费的主题(可以订阅多个主题)consumer.subscribe(Arrays.asList(TOPIC));// 持续消费消息的循环while (true) {// 从Kafka拉取消息// 使用poll方法拉取消息,设置超时时间为100毫秒// 如果100毫秒内没有消息,会返回空的消息集合ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));// 处理拉取到的消息// 遍历本次拉取到的所有消息记录for (ConsumerRecord<String, String> record : records) {// 打印每条消息的详细信息:偏移量、key、valueSystem.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}// 提交当前消费的offset(位移),确保消息不会重复消费// 同步提交:阻塞当前线程,直到offset提交成功后才继续下一轮消费// 可靠性高,但会影响吞吐量consumer.commitSync(); // 异步提交:非阻塞方式,提交请求发送后立即返回,继续下一轮消费// 吞吐量高,但可能在提交失败时导致消息重复消费// consumer.commitAsync(); }// 注意:实际应用中需要在适当的时候调用consumer.close()来关闭消费者} }
-
Consumer的整体步骤:
-
设置Consumer核心属性:可选属性由
ConsumerConfig
类管理,该类中大部分重要属性都有对应的DOC属性描述,其中BOOTSTRAP_SERVERS_CONFIG
是必须设置的属性; -
拉取消息:Kafka采用
Pull
模式,消费者主动从Broker上拉取一批感兴趣的消息; -
处理消息与提交位点:消费者拉取完消息后,交由业务自行处理,同时需要向Broker提交偏移量
offset
,否则Broker会重复推送消息;
-
-
Kafka 的客户端基本按照设置生产者/消费者属性、生产/拉取消息、处理消息这三大步骤运行,在具体使用中,给生产者和消费者设定合适的属性会极大影响客户端程序的执行方式,可参考Kafka官方配置文档(Apache Kafka)了解更多配置。
2 客户端工作机制
2.1 消费者分组消费机制
-
在Consumer中,需指定
GROUP_ID_CONFIG
(即group.id
)属性,它是标识消费者所属消费者组的唯一字符串;public static final String GROUP_ID_CONFIG = "group.id"; public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
一个用于标识该消费者所属消费者组的唯一字符串。如果消费者使用通过
subscribe(topic)
实现的组管理功能,或者使用基于 Kafka 的偏移量管理策略,那么这个属性是必需的。- 若消费者要使用组管理功能(如
subscribe(topic)
)或Kafka提供的offset管理策略,必须配置该属性; - 还有一个
GROUP_INSTANCE_ID_CONFIG
参数,可给组成员设置固定的instanceId
,用于减少Kafka不必要的rebalance
(再平衡);
- 若消费者要使用组管理功能(如
-
分组消费机制的过程:
-
消息推送规则:
- 生产者往Topic发消息时,会尽量均匀发送到Topic的各个Partition;
- 消息会被推送给所有订阅该Topic的消费者组,且每个消费者组(ConsumerGroup)中只推送一份,即同一消费者组内多个消费者实例共同消费一个消息副本,不同消费者组会重复消费消息副本;
-
Offset偏移量:
- Offset表示每个消费者组在每个Partition中已消费处理的进度;
- 可通过
./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test
查看消费者组的Offset记录情况,其中CURRENT-OFFSET
是当前组消费的消息偏移量,LOG-END-OFFSET
是分区最新的消息偏移量,LAG
是未消费的消息数;
-
Offset提交:
-
消费者处理完消息后需主动向Kafka的Broker提交Offset,提交后Broker更新消费进度,表明该消息已被该消费者组处理;
-
若未提交,Broker会认为消息未处理,重新推送给同组其他消费者实例;
-
提交方式有业务端主动调用
commitAsync
或commitSync
方法,也可配置ENABLE_AUTO_COMMIT_CONFIG
(即enable.auto.commit
)属性开启自动提交,开启后消费者的offset会在后台定期提交;public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"; private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";
如果为 true,消费者的偏移量将在后台定期提交。
-
-
-
可以看到,Offset 是 Kafka 进行消息推送控制的关键之处。这里需要思考几个问题:
-
Partition与Consumer实例的消费限制:
- 问题:若一个Partition对应多个Consumer实例,每个实例会提交同一Partition的不同Offset,Broker无法判断该听谁的,所以一个Partition最多同时被一个Consumer消费;
- Offset按Group、Partition分开记录。例如有四个Partition的Topic,同一消费者组最多配置四个消费者实例;
-
Offset数据安全性:
- 问题:Offset保存在Broker端但由消费者主导推进,不够安全;
- Consumer中提供
AUTO_OFFSET_RESET_CONFIG
参数,指定服务端Offset不存在时的处理方式,可选earliest
(自动设为当前最早的offset)、latest
(自动设为当前最晚的offset)、none
(找不到对应offset就抛异常)等,作为服务端兜底保障;
-
Offset提交方式的优缺点与优化思路:
- 问题:
- 异步提交时消费者的效率高,但消息处理失败却提交了Offset会导致消息丢失;
- 同步提交可以保证消息不丢失(业务处理失败可不提交Offset),但消费者处理消息慢,且Broker若等不及消费者提交会认为处理失败,推送给同组其他消费者导致消息重复;
- 这类问题根源是Offset反映的消息处理进度与业务处理进度不同步,可将Offset从Broker端抽取到第三方存储(如Redis)自行管理,从而开发人员可以根据业务处理进度推进Offset更新。
- 问题:
-
2.2 生产者拦截器机制
-
生产者拦截器机制允许客户端在生产者将消息发送到Kafka集群之前,对消息进行拦截,甚至可以修改消息内容;
-
相关配置参数:
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "+ "Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "+ "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
要用作拦截器的类的列表。实现
org.apache.kafka.clients.producer.ProducerInterceptor
接口,允许你在生产者接收到的记录被发布到 Kafka 集群之前,对这些记录进行拦截(甚至可能修改)。默认情况下,没有拦截器;- 涉及到
Producer
中的INTERCEPTOR_CLASSES_CONFIG
参数,它的作用是指定要用作拦截器的类列表; org.apache.kafka.clients.producer.ProducerInterceptor
接口支持在生产者将记录发布到Kafka集群之前,对生产者接收到的记录进行拦截(甚至可能修改),默认情况下没有拦截器;
- 涉及到
-
我们可以定义自己的拦截器实现类:
public class MyInterceptor implements ProducerInterceptor {// 发送消息时触发,方法内可以对ProducerRecord进行操作@Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {System.out.println("prudocerRecord : " + producerRecord.toString());return producerRecord;}// 收到服务端响应时触发@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {System.out.println("acknowledgement recordMetadata:"+recordMetadata.toString());}// 连接关闭时触发@Overridepublic void close() {System.out.println("producer closed");}// 整理配置项@Overridepublic void configure(Map<String, ?> map) {System.out.println("=====config start======");for (Map.Entry<String, ?> entry : map.entrySet()) {System.out.println("entry.key:"+entry.getKey()+" === entry.value: "+entry.getValue());}System.out.println("=====config end======");} }
-
接下来在生产者中指定上面自定义的拦截器类(如果有多个拦截器类,用逗号隔开):
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
-
使用场景:
- 拦截器机制使用得比较少,主要用在一些统一添加时间等类似的业务场景;
- 比如,用Kafka传递一些POJO(普通Java对象),就可以用拦截器统一添加时间属性;
- 因为通常Kafka传递的是
String
类型消息,而POJO类型消息也可以通过序列化机制在Kafka中传递,这就可以使用消息序列化机制。
2.3 消息序列化机制
2.3.1 生产者端的序列化配置
-
在
1.1 消息发送者主流程
的简单示例里,Producer指定了KEY_SERIALIZER_CLASS_CONFIG
和VALUE_SERIALIZER_CLASS_CONFIG
这两个属性,在ProducerConfig
中都有配套的说明属性;public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
实现了
org.apache.kafka.common.serialization.Serializer
接口的键(key)的序列化类。实现了
org.apache.kafka.common.serialization.Serializer
接口的值(value)的序列化类。- 通过这两个参数,能指定消息生产者如何将消息的key和value序列化成二进制数据;
-
key的作用:key是用于进行分区的可选项,Kafka通过key来判断消息要分发到哪个Partition;
- 若没有填写key,Kafka会自动选择Partition;
- 若填写了key,会通过声明的Serializer序列化接口,将key转换成一个
byte[]
数组,然后对key进行hash来选择Partition。这样能保证key相同的消息会分配到相同的Partition中;
-
value的作用:value是业务上比较关心的消息。Kafka需要将Value对象通过Serializer序列化接口,转换成
byte[]
数组,这样才能较好地在网络上传输Value信息,以及将Value信息落盘到操作系统的文件当中。
2.3.2 消费者端的反序列化配置
-
因为生产者要对消息进行序列化,所以消费者拉取消息时,自然需要进行反序列化。在Consumer中,有对应的两个反序列化配置:
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
实现了
org.apache.kafka.common.serialization.Deserializer
接口的键(key)的反序列化类。实现了
org.apache.kafka.common.serialization.Deserializer
接口的值(value)的反序列化类。
2.3.3 自定义消息格式的序列化实现
-
Kafka对于常用的一些基础数据类型,已经提供了对应的序列化和反序列化实现。但如果需要使用一些自定义的消息格式,比如自己定制的POJO(普通Java对象),就需要定制具体的实现类;
-
在进行自定义序列化机制时,需要考虑如何用二进制来描述业务数据。例如对于一个通常的POJO类型,可将其属性拆分为两种类型:
-
定长的基础类型:像Integer、Long、Double等,这些基础类型转化成二进制数组都是定长的。这类属性可以直接转成序列化数组,在反序列化时,只要按照定长去读取二进制数据就可以反序列化了;
-
不定长的浮动类型:如String,或者基于String的JSON类型等。这类属性转化成二进制数组,长度是不一定的。通常的处理方式是先往二进制数组中写入一个定长的数据的长度数据(用Integer或者Long类型),然后再继续写入数据本身。这样,反序列化时,就可以先读取一个定长的长度,再按照这个长度去读取对应长度的二进制数据,从而读取到数据的完整二进制内容;
-
2.3.4 序列化机制的重要性与拓展
- 高效的序列化实现能够极大提升分布式系统的网络传输以及数据落盘的能力;
- 例如对于一个User对象,既可以使用JSON字符串这种简单粗暴的序列化方式,也可以选择按照各个字段进行组合序列化的方式,后者占用空间更小,序列化速度也更快;
- Kafka在文件落盘时,也设计了非常高效的数据序列化实现,这是Kafka高效运行的一大支撑;
- 在很多其他业务场景中,也需要提供更高效的序列化实现;
- 比如使用MapReduce框架时,需要自定义数据的序列化方式;
- 使用Netty框架进行网络调用时,为了防止粘包,也需要定制数据的序列化机制;
- 在这些场景下,进行序列化的基础思想和这里介绍的是一样的;
- 另外,如果能进一步设计出更简短高效的数据压缩算法,对二进制数据进行压缩,也能进一步提高数据传输的效率,这体现了算法在其中的直接作用。
2.4 消息分区路由机制
-
了解前面两个机制后,可以自然地想到一个问题:消息如何进行路由?
-
Producer会根据消息的key选择Partition,具体过程是怎样的呢?
-
一个消费者组会共同消费一个Topic下的多个Partition中的同一套消息副本,那Consumer节点是不是可以决定自己消费哪些Partition的消息呢?
2.4.1 生产者端:通过Partitioner确定消息分区
-
生产者(Producer)会根据消息的
key
选择要发送到的Partition,具体是通过指定一个Partitioner
来实现消息分配:public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +"<ul>" +"<li>If not set, the default partitioning logic is used. " +"This strategy will try sticking to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +"<ul>" +"<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +"<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +"</ul>" +"</li>" +"<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +"each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +"until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +"Please check KAFKA-9965 for more detail." +"</li>" +"</ul>" +"<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";
-
在Producer中,Kafka是通过一个
Partitioner
接口的具体实现(PARTITIONER_CLASS_CONFIG
,即partitioner.class
),来决定一个消息如何根据Key分配到对应的Partition上的; -
若未设置
Partitioner
实现类,使用默认分区逻辑:- 若既未指定分区也无
key
,会选择“粘性分区”,当该分区的batch.size
(默认16KB)满,或等待时间超过linger.ms
(默认0毫秒)时,才切换分区; - 若未指定分区但有
key
,基于key
的哈希值选择分区,保证相同key
的消息进入同一分区;
- 若既未指定分区也无
-
在3.2.0版本及之前,Kafka提供
RoundRobinPartitioner
、DefaultPartitioner
、UniformStickyPartitioner
三种默认实现类:- 后两种已过期,被默认实现机制替换;
- 其中
RoundRobinPartitioner
是轮询发送,不考虑消息大小和Broker性能差异,使用较少; - 默认的
Sticky
策略会为生产者分配一个分区后尽可能持续使用,直到该分区batch.size
满或等待超时(即上面讲解的默认分区逻辑);
-
我们可以自定义实现一个Partitioner:
- 在
org.apache.kafka.clients.producer.Partitioner
接口中,核心是要实现partition
方法,可根据相关信息选择分区(如用key
对分区数取模等); - Topic下所有Partition信息都在cluster参数中,可通过
cluster.partitionsForTopic(topic)
获取。
- 在
2.4.2 消费者端:通过分区分配策略决定消费的Partition
-
消费者组(Consumer Group)共同消费一个Topic下的多个Partition中的同一套消息副本,Consumer节点可通过指定
PARTITION_ASSIGNMENT_STRATEGY
(即partition.assignment.strategy
)决定自己消费哪些Partition的消息:public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG ="partition.assignment.strategy"; private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +"ordered by preference, of supported partition assignment strategies that the client will use to distribute " +"partition ownership amongst consumer instances when group management is used. Available options are:" +"<ul>" +"<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.</li>" +"<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +"<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +"maximally balanced while preserving as many existing partition assignments as possible.</li>" +"<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +"logic, but allows for cooperative rebalancing.</li>" +"</ul>" +"<p>The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +"but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.</p>" +"<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +"interface allows you to plug in a custom assignment strategy.</p>";
-
在Consumer中,通过
PARTITION_ASSIGNMENT_STRATEGY_CONFIG
指定分区分配策略类列表,用于在使用组管理时,在多个Consumer实例间分配分区所有权; -
上面的代码中可以看到Kafka默认提供了三种消费者的分区分配策略:
- range策略:按Topic逐个分配。例如一个Topic有10个Partition(0 - 9),一个消费者组有3个Consumer(consumer1 - 3)。那么会将分区0 - 3分给一个Consumer,4 - 6分给一个Consumer,7 - 9分给一个Consumer;
- round-robin策略:轮询分配。比如0、3、6、9分区给consumer1,1、4、7分区给consumer2,2、5、8分区给consumer3;
- sticky策略:粘性策略,有两个原则:
- 开始分区时尽量保持分配均匀(比如按range策略分配,这一步随机);
- 分区分配尽可能与上一次保持一致。若某个Consumer宕机,会保持其他Consumer原有分区分配,将宕机Consumer的分区尽量平均分配给剩余Consumer,保证Consumer数据稳定性;
- 默认分配器是
[RangeAssignor, CooperativeStickyAssignor]
;
-
实现
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
接口,或继承AbstractPartitionAssignor
抽象类,可以自定义消费者订阅方式; -
官方默认提供的生产者端默认分区器以及消费者端的
RangeAssignor + CooperativeStickyAssignor
分配策略,在大部分场景下都是高效算法,深入理解这些算法,有助于深入理解MQ场景及横向对比其他MQ产品。
2.5 生产者消息缓存机制
-
Kafka生产者为避免高并发请求给服务端造成过大压力,不是逐条发送消息到服务端,而是通过高速缓存,集中消息后批量发送,这是高并发处理中常用的机制;
-
Kafka的消息缓存机制涉及
KafkaProducer
中的两个关键组件:accumulator
(记录累加器,即RecordAccumulator
)和sender
(数据发送线程);// 记录累加器 int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); this.accumulator = new RecordAccumulator(logContext,batchSize,this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs, partitionerConfig,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));// 数据发送线程 this.sender = newSender(logContext, kafkaClient, this.metadata);
2.5.1 RecordAccumulator
-
**
RecordAccumulator
**是Kafka生产者的消息累加器,KafkaProducer
要发送的消息会先在RecordAccumulator
中缓存,之后再分批发送给Kafka Broker; -
在
RecordAccumulator
中:- 针对每一个
Partition
,会维护一个Deque
双端队列,这些队列与Kafka服务端Topic
下的Partition
基本对应,每个Deque
里会放入若干个ProducerBatch
数据; KafkaProducer
每次发送的消息会根据key
分配到对应的Deque
队列中,每个消息保存在队列中的某一个ProducerBatch
里,消息分发规则由Partitioner
组件完成;
- 针对每一个
-
这里主要涉及到两个参数:
// RecordAccumulator缓冲区大小
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "+ "sent faster than they can be delivered to the server the producer will block for <code>" + MAX_BLOCK_MS_CONFIG + "</code> after which it will throw an exception."+ "<p>"+ "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "+ "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "+ "compression is enabled) as well as for maintaining in-flight requests.";// 缓冲区每一个batch的大小
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent"+ " to the same partition. This helps performance on both the client and the server. This configuration controls the "+ "default batch size in bytes. "+ "<p>"+ "No attempt will be made to batch records larger than this size. "+ "<p>"+ "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "+ "<p>"+ "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "+ "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "+ "buffer of the specified batch size in anticipation of additional records."+ "<p>"+ "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "+ "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. "+ "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "+ "batch size is under this <code>batch.size</code> setting.";
-
BUFFER_MEMORY_CONFIG
(buffer.memory
):RecordAccumulator
缓冲区大小,即生产者可用于缓存等待发送到服务器的记录的总字节数;- 若记录发送速度快于传递到服务器的速度,生产者会阻塞
MAX_BLOCK_MS_CONFIG
(默认60秒)时间,之后会抛出异常; - 该设置大致对应生产者将使用的总内存,但不是硬限制,因为生产者使用的内存并非都用于缓存,还有部分用于压缩(若启用压缩)以及维护进行中的请求;
- 若记录发送速度快于传递到服务器的速度,生产者会阻塞
-
BATCH_SIZE_CONFIG
(batch.size
):缓冲区中每个batch
的大小;- 生产者会尝试将发往同一分区的多条记录批量处理成更少的请求,这对客户端和服务器的性能都有帮助;
- 该配置控制默认的批大小(以字节为单位),不会尝试批量处理大于此大小的记录;
- 发送到Broker的请求将包含多个批,每个有数据可发送的分区对应一个批;
- 批大小过小会降低批处理的频率,可能减少吞吐量(批大小为0会完全禁用批处理);
- 批大小过大可能会更浪费内存,因为会提前分配指定批大小的缓冲区以等待额外记录;
- 如果为某个分区累积的字节数少于此数值,会等待
linger.ms
时间以等待更多记录出现;
2.5.2 Sender
线程的工作机制
-
sender
是KafkaProducer
中用于发送消息的单独线程,每个KafkaProducer
对象对应一个sender
线程,负责将RecordAccumulator
中的消息发送给Kafka; -
消息获取与发送时机:
Sender
不是一次性发送RecordAccumulator
中缓存的所有消息,而是每次只拿一部分;- 它会获取
RecordAccumulator
中缓存内容达到BATCH_SIZE_CONFIG
大小的ProducerBatch
消息; - 若消息少,
ProducerBatch
中的消息大小长期达不到BATCH_SIZE_CONFIG
,Sender
最多等待LINGER_MS_CONFIG
时长(默认值为0),然后读取ProducerBatch
中的消息;
-
消息缓存与发送:
Sender
读取消息后,会以Broker
为键,缓存到对应的队列中,这些队列中的消息称为InflightRequest
;- 之后这些
InflightRequest
会逐一发送到Kafka对应的Broker
中,直到收到Broker
的响应才从队列中移除; - 这些队列最多缓存
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
(默认值为5)个请求; - 生产者缓存机制的主要目的是打包消息,减少网络IO频率,所以在
Sender
的InflightRequest
队列中,消息是一批一起往Broker
发送的,这意味着一批消息没有固定的先后顺序;
-
涉及到的主要参数如下:
// 生产者会将在请求传输间隙到达的任何记录组合成一个批量请求 public static final String LINGER_MS_CONFIG = "linger.ms"; private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "+ "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "+ "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "+ "of artificial delay—that is, rather than immediately sending out a record, the producer will wait for up to "+ "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "+ "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "+ "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this "+ "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "+ "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "+ "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";// 客户端在阻塞前,在单个连接上发送的未确认请求的最大数量 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."+ " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"+ " message reordering after a failed send due to retries (i.e., if retries are enabled); "+ " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."+ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";
LINGER_MS_CONFIG
(linger.ms
):生产者会将在请求传输间隙到达的任何记录组合成一个批量请求;- 通常这只在负载下发生(记录到达速度快于发送速度),但在某些情况下,客户端可能希望即使在中等负载下也减少请求数量;
- 该设置通过添加少量人为延迟来实现:不是立即发送记录,而是最多等待指定延迟,以允许其他记录被发送,从而将发送批量处理。这类似于TCP中的Nagle算法;
- 该设置给出了批处理延迟的上限:一旦为某个分区获取到
BATCH_SIZE_CONFIG
值的记录,就会立即发送,不管此设置;如果为某个分区累积的字节数少于此数值,会等待指定时间以等待更多记录出现。默认值为0(无延迟),设置该值(如5ms)会减少发送的请求数量,但在无负载时会给记录增加最多5ms的延迟;
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
:客户端在阻塞前,在单个连接上发送的未确认请求的最大数量;- 注意,如果此配置大于1且
enable.idempotence
设置为false,在发送失败重试时(若启用重试)存在消息重新排序的风险;如果禁用重试或enable.idempotence
设置为true,顺序会被保留; - 此外,启用幂等性要求此配置的值小于或等于
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
。如果存在冲突配置且未明确启用幂等性,幂等性会被禁用;
- 注意,如果此配置大于1且
-
IO请求与响应:
Sender
会通过Selector
组件完成与Kafka的IO请求,并接收Kafka的响应。当topic
的某个partition
的批已满或新建了一个批时,会唤醒sender
线程(如下面代码中this.sender.wakeup()
所示);// org.apache.kafka.clients.producer.KafkaProducer#doSend if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",record.topic(), appendCallbacks.getPartition());this.sender.wakeup(); }
2.5.3 缓存机制的优化意义
- Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制;
- 合理优化相关参数,对Kafka集群性能提升非常重要;
- 例如,若消息体大,应考虑加大
batch.size
,尽量提升批缓存效率; - 若
Producer
要发送的消息非常多,需考虑加大total.memory
参数,尽量避免缓存不够造成阻塞; - 若发现生产者发送消息慢,可考虑提升
max.in.flight.requests.per.connection
参数,加大消息发送的吞吐量。
- 例如,若消息体大,应考虑加大
2.6 发送应答机制
-
在Producer将消息发送到Broker后,需要确定消息是否成功发送到Broker,这涉及到
Producer
端的ACKS_CONFIG
(即acks
)属性;public static final String ACKS_CONFIG = "acks"; private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "+ " durability of records that are sent. The following settings are allowed: "+ " <ul>"+ " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"+ " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"+ " made that the server has received the record in this case, and the <code>retries</code> configuration will not"+ " take effect (as the client won't generally know of any failures). The offset given back for each record will"+ " always be set to <code>-1</code>."+ " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"+ " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"+ " acknowledging the record but before the followers have replicated it then the record will be lost."+ " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."+ "</ul>"+ "<p>"+ "Note that enabling idempotence requires this config value to be 'all'."+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
-
acks
参数控制生产者要求 Leader 在认为请求完成前已收到的确认数,影响发送记录的持久性,有以下取值:-
acks=0
:生产者不等待Broker端的任何确认,只要将记录添加到套接字缓冲区就认为发送成功。无法保证服务器已收到记录,retries
配置也不会生效(客户端通常不知道任何失败),每条记录返回的偏移量总是设为-1
。此时吞吐量最高,但数据安全性最低; -
acks=1
:Leader 会将记录写入自己的本地日志,但不等待所有 Follower 的完全确认就响应。若 Leader 在确认记录后但 Follower 复制记录前立即失败,记录会丢失。这是一种相对中和的策略,Leader Partition完成自己的消息写入后就向生产者返回结果; -
acks=all
(或acks=-1
):Leader 会等待所有同步副本(in-sync replicas)确认记录。只要至少有一个同步副本存活,就能保证记录不会丢失,这是最强的可用保证,与acks=1
设置等效。启用幂等性要求此配置值为all
;
-
-
生产环境中的选择:
-
acks=0
可靠性太差,很少使用; -
acks=1
一般用于传输日志等允许个别数据丢失的场景,使用范围最广; -
acks=-1
(即acks=all
)一般用于传输敏感数据,比如与钱相关的数据;
-
-
当
ack
设置为all
或者-1
时,Kafka并非强制要求所有Partition
都写入数据后才响应。在Kafka的Broker服务端有min.insync.replicas
配置参数,控制Leader Partition
在完成多少个Partition
的消息写入后,向Producer
返回响应,该参数可在broker.conf
文件中配置:min.insync.replicas When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.Type: int Default: 1 Valid Values: [1,...] Importance: high Update Mode: cluster-wide
- 当生产者将
acks
设为"all"
(或"-1"
)时,min.insync.replicas
指定了必须确认写入的最小副本数,若无法满足此最小值,生产者会抛出异常(NotEnoughReplicas
或NotEnoughReplicasAfterAppend
); - 例如,创建一个复制因子为3的Topic,将
min.insync.replicas
设为2,并以acks
为"all"
生产,若大多数副本未收到写入,生产者会抛出异常;
- 当生产者将
-
注意:
acks
设置成all
或者-1
,能够有效提高消息的安全性,但应答机制只是保证Broker可以给Producer
一个比较靠谱的响应,并不代表就保证了消息不丢失。Producer
拿到响应后如何进行后续处理,Kafka是不参与的。
2.7 生产者消息幂等性
-
上节中对于
acks
属性的说明,会看到另外一个单词:idempotence,这个单词的意思是幂等性。那幂等性是什么意思呢? -
当
Producer
的acks
设置成1或-1时,Producer
每次发送消息都需要获取Broker
端返回的RecordMetadata
,这个过程涉及两次跨网络请求;- 网络不可靠,在高并发场景下,若第一步发送消息请求成功,但第二步获取
RecordMetadata
的请求无返回,Producer
会认为消息发送失败并发起重试(重试次数由ProducerConfig.RETRIES_CONFIG
控制,默认是Integer.MAX
); - 此时
Producer
可能重复发送多条消息到Broker
,这就需要Kafka
保证无论Producer
向Broker
发送多少次重复数据,Broker
端都只保留一条消息,这就是生产者的幂等性问题;
- 网络不可靠,在高并发场景下,若第一步发送消息请求成功,但第二步获取
-
先来看Kafka中对于幂等性属性的介绍:
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence"; public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer " + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE+ " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"+ ACKS_CONFIG + "</code> must be 'all'. "+ "<p>"+ "Idempotence is enabled by default if no conflicting configurations are set. "+ "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "+ "If idempotence is explicitly enabled and conflicting configurations are set, a <code>ConfigException</code> is thrown.";private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;// 客户端在阻塞前,在单个连接上发送的未确认请求的最大数量(2.5.2 Sender线程的工作机制中也有提到) public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."+ " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
-
ENABLE_IDEMPOTENCE_CONFIG
(enable.idempotence
):- 当设置为
true
时,生产者会确保每条消息在流中恰好有一个副本;若为false
,生产者因Broker
故障等原因重试时,可能会在流中写入重试消息的重复副本; - 启用幂等性要求
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
小于或等于MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
(值为5),RETRIES_CONFIG
大于0,且ACKS_CONFIG
必须为all
; - 若没有冲突配置,幂等性默认启用;若有冲突配置且未显式启用幂等性,则幂等性禁用;若显式启用幂等性但有冲突配置,会抛出
ConfigException
;
- 当设置为
-
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
:客户端在阻塞前,单个连接上未确认请求的最大数量;- 若此配置大于1且
enable.idempotence
为false
,发送失败重试时(若启用重试)存在消息重新排序的风险; - 若禁用重试或
enable.idempotence
为true
,顺序会被保留; - 启用幂等性要求此配置值小于或等于
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
;
- 若此配置大于1且
-
-
看一下分布式数据传递的语义:
-
at-least-once(至少一次):保证数据不丢失,但不保证数据不重复。例如往银行存100块,发送存钱消息到MQ,一次发送失败就重试几次直到成功;
-
at-most-once(最多一次):保证数据不重复,但不保证数据不丢失。例如往银行存100块,不管消息发送多少次,银行最多只记录一次;
-
exactly-once(精确一次):保证数据既不重复也不丢失,需要非常精密的设计;
-
-
Kafka 为保证消息发送的
Exactly-once
语义,增加了以下概念:-
PID:每个新的
Producer
在初始化过程中会被分配一个唯一的PID
,该PID
对用户不可见; -
Sequence Number:对于每个
PID
,Producer
针对Partition
会维护一个从0开始单调递增的sequenceNumber
。当Producer
往同一个Partition
发送消息时,这个Sequence Number
会加1,并随消息一起发往Broker
; -
Broker端的序列号维护:
Broker
端会针对每个<PID, Partition>
维护一个序列号(SN
)。只有当对应的SequenceNumber = SN + 1
时,Broker
才会接收消息,同时将SN
更新为SN + 1
;若SequenceNumber
过小,说明消息已写入,无需重复写入;若SequenceNumber
过大,说明中间可能有数据丢失,会对生产者抛出OutOfOrderSequenceException
;
-
-
这样,Kafka在开启幂等性控制后,
Broker
端会保证每条消息在一次发送过程中最多持久化一条,保证at-most-once
语义;再结合将生产者的acks
参数设置成1或-1,保证at-least-once
语义,整体上就保证了Exactly-once
语义; -
给
Producer
打开幂等性后,不管Producer
往同一个Partition
发送多少条消息,都可通过幂等机制保证消息的Exactly-only
语义,但这并不意味着消息就绝对安全,还需结合其他机制和场景综合考量。
2.8 生产者数据压缩机制
-
生产者端数据压缩
-
生产者往Broker发送消息时,对消息进行压缩,能降低Producer到Broker的网络数据传输压力,同时减少Broker的数据存储压力;
-
配置参数:
/** <code>compression.type</code> */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid "+ " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>. "+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
- Kafka生产者支持四种压缩算法,分别是
none
(无压缩,默认)、gzip
、snappy
、lz4
、zstd
; - 其中
zstd
算法数据压缩比最高,但吞吐量不高;lz4
在吞吐量方面优势明显;
- Kafka生产者支持四种压缩算法,分别是
-
注意:压缩消息会增加CPU消耗,若CPU资源紧张,不建议进行压缩;
-
-
Broker端数据压缩配置
-
配置文件:在Broker端的
broker.conf
文件中可配置压缩算法;compression.type Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.Type: string Default: producer Valid Values: [uncompressed, zstd, lz4, snappy, gzip, producer] Server Default Property: compression.type Importance: medium
-
默认行为:正常情况下,Broker从Producer端接收到消息后不会对其进行任何修改;
-
异常情况:若Broker端和Producer端指定了不同的压缩算法,会产生很多异常表现;
-
compression.type
配置说明:该配置指定给定主题的最终压缩类型,接受标准压缩编码(gzip
、snappy
、lz4
、zstd
),还接受uncompressed
(等效于无压缩)和producer
(保留生产者设置的原始压缩编码)。其类型为string
,默认值是producer
,有效取值包括uncompressed
、zstd
、lz4
、snappy
、gzip
、producer
,重要性为medium
;
-
-
消费者端解压
-
若开启了消息压缩,消费者端需要进行解压。在Kafka中,消息从Producer到Broker再到Consumer会一直携带消息的压缩方式,当Consumer读取到消息集合时,能知道消息使用的压缩算法,从而自行解压;
-
版本匹配:要注意应用中使用的Kafka客户端版本和Kafka服务端版本是否匹配,否则可能出现解压问题。
-
2.9 生产者消息事务
-
生产者消息幂等性机制能解决单生产者消息写入单分区的幂等性问题,但当生产者一次发送多条消息,且这些消息可能写入多个分布在不同Broker上的Partition时,幂等性机制无法保证所有消息的幂等性。此时需要事务机制,保证一批消息同时成功(保持幂等性)或同时失败(便于生产者整体重试,避免消息重复);
-
Kafka引入了消息事务机制,涉及
Producer
中的几个API:// 初始化事务 void initTransactions(); // 开启事务 void beginTransaction() throws ProducerFencedException; // 提交事务 void commitTransaction() throws ProducerFencedException; // 放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;
-
可以做一个这样的测试:先启动一个订阅了
disTopic
这个Topic的消费者,启动下面这个生产者,进行测试public class TransactionErrorDemo {private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 配置 Kafka 相关属性(如集群地址、事务 ID、序列化类等)Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"111");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 初始化生产者后开启事务,循环发送消息Producer<String,String> producer = new KafkaProducer<>(props);producer.initTransactions();producer.beginTransaction();for(int i = 0; i < 5; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);producer.send(record);// 当发送到第 3 条消息时,主动放弃事务(调用abortTransaction()),此时之前发送的消息会一起回滚if(i == 3){System.out.println("error");producer.abortTransaction();}}System.out.println("message sended");try {Thread.sleep(10000);} catch (Exception e) {e.printStackTrace();} // producer.commitTransaction();producer.close();} }
-
Kafka 的事务消息会做两件事情:
TransactionId
与PID
的对应关系:一个TransactionId
只会对应一个PID
。若当前一个Producer
的事务未提交,而另一个新的Producer
使用相同的TransactionId
,旧的生产者会立即失效,无法继续发送消息;- 跨会话事务对齐:如果某个
Producer
实例异常宕机,事务未被正常提交,新的TransactionId
相同的Producer
实例会对旧的事务进行补齐,保证旧事务要么提交,要么终止,使新的Producer
实例能以正常状态开始工作;
-
所以,如果一个Producer需要发送多条消息,通常比较安全的发送方式是这样的:
public class TransactionProducer {// 同样配置相关属性private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"111");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 初始化并开启事务Producer<String,String> producer = new KafkaProducer<>(props);producer.initTransactions();producer.beginTransaction();// 发送多条消息try{for(int i = 0; i < 5; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);producer.send(record);}// 提交事务producer.commitTransaction();}catch (ProducerFencedException e){ // 若捕获到ProducerFencedException异常,就放弃事务,最后在finally块中关闭生产者producer.abortTransaction();}finally {producer.close();}} }
-
事务ID参数可以任意起名,但建议包含一定的业务唯一性,以便于管理和识别;
-
生产者的事务消息机制保证了
Producer
发送消息的安全性,但不保证已经提交的消息一定能被所有消费者消费。
3 客户端流程总结
-
生产者(Producer)流程
-
主线程操作
- 首先通过
new Properties
配置生产者相关属性,再创建ProducerRecord
(待发送的消息记录),然后调用send(ProducerRecord)
方法发送消息; - 发送前,消息会依次经过
Interceptors
(拦截器,可对消息进行拦截甚至修改)、Serializer
(序列化器,将消息序列化为字节数组)、partitioner
(分区器,决定消息发送到哪个分区)处理;
- 首先通过
-
消息缓存与发送
- 处理后的消息会被放入
RecordAccumulator
(记录累加器)中,RecordAccumulator
针对每个分区维护一个Deque
(双端队列),队列里存放着消息的字节数组形式; Sender
线程负责从RecordAccumulator
中获取消息,通过NetworkClient
进行网络通信。NetworkClient
维护着InflightBatches
(飞行中的批次),其中包含发往各个Broker
(如Broker1
)的InflightRequest
(飞行中的请求),最后通过Selector
组件完成与Kafka集群的IO请求,将消息发送到Kafka集群的Leader Partition
(由Broker1
等承载),同时等待acks
(发送应答,确认消息是否成功发送);
- 处理后的消息会被放入
-
-
Kafka集群:Kafka集群由多个
Broker
组成,每个Broker
存储着Topic
的分区数据,分为Leader Partition
(处理客户端读写请求)和Follower Partition
(同步Leader Partition
的数据,保证高可用); -
消费者(Consumer)流程
-
消费者组与订阅:消费者属于
Consumer Group
(如Consumer Group1
),通过subscribe
方法订阅Topic
; -
消息拉取与处理:消费者通过
poll
方法从Kafka集群的Leader Partition
拉取消息,拉取到的消息先经过Deserializer
(反序列化器,将字节数组反序列化为原始消息格式),然后进行消息处理; -
偏移量提交:消息处理完成后,消费者会向Kafka集群提交
commit
(偏移量,记录消费进度,确保消息不被重复消费或漏消费);
-
-
不需要强行记忆所有客户端属性,可通过
ProducerConfig
、ConsumerConfig
及其父类CommonClientConfig
来理解,大部分属性都有简明解释。关键是要建立消息流转模型,从高可用、高并发角度理解Kafka客户端设计,再逐步填充具体参数。
4 SpringBoot集成Kafka
-
在SpringBoot项目中,引入Maven依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>
-
在
application.properties
中配置 Kafka 相关参数,例:###########【Kafka集群】########### spring.kafka.bootstrap-servers=worker1:9092,worker2:9093,worker3:9093 ###########【初始化生产者配置】########### # 重试次数 spring.kafka.producer.retries=0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) spring.kafka.producer.acks=1 # 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0 # 生产端缓冲区大小 spring.kafka.producer.buffer-memory = 33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer ###########【初始化消费者配置】########### # 默认的消费组ID spring.kafka.consumer.properties.group.id=defaultConsumerGroup # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto-commit-interval=1000 # 当Kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; spring.kafka.consumer.auto-offset-reset=latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) spring.kafka.consumer.properties.session.timeout.ms=120000 # 消费请求超时时间 spring.kafka.consumer.properties.request.timeout.ms=180000 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
-
应用中使用框架注入的 KafkaTemplate 发送消息,例:
@RestController public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;// 发送消息@GetMapping("/kafka/normal/{message}")public void sendMessage1(@PathVariable("message") String normalMessage) {kafkaTemplate.send("topic1", normalMessage);} }
-
使用
@KafkaListener
注解声明消息消费者,例:@Component public class KafkaConsumer {// 消费监听@KafkaListener(topics = {"topic1"})public void onMessage1(ConsumerRecord<?, ?> record){// 消费的哪个topic、partition的消息,打印出消息内容System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());} }