【源码剖析】4-生产者-KafkaProducer分析
消息发送流程
上一章节kafka了解了KafkaProducer的基本使用,本节开始深入分析KafkaProducer的原理和实现,KafkaProducer的发送流程如下:
- ProducerInterceptors对消息进行拦截
- Serializer对消息的key和value进行序列化
- Parititioner为消息选择合适的Partition
- RecordAccumulator收集消息,实现批量发送
- Sender从RecordAccumulator获取消息
- 构造ClientRequest
- 将ClientReuquest交给NetworkClient,准备发送
- NetworkClient将请求放入KafkaChannel的缓存
- 执行网络IO,发送请求
- 收到响应,调用ClientRequest的回调函数
- 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数
消息发送过程中涉及两个线程的操作:
- 主线程将业务数据进行封装,然后调用send方法将消息放入RecordAccumulator中暂存
- Sender线程将消息构成请求,并最终执行网络IO请求
Send方法的主要步骤如下:
- 调用ProducerInterceptors.onSend方法,通过ProducerInterceptor对消息进行拦截或者修改
- 调用waitOnMetaData方法获取Kafka集群的信息,底层会唤醒Send线程更新metadata中保存的Kafka集群元数据
- 调用serialize对key和value进行序列化
- 调用partitioon方法为消息选择合适的分区
- 调用RecordAccumulator.append方法,将消息追加到RecordAccumulator中
- 唤醒Sender线程,由sender线程将RecordAccumulator中缓存的消息发送出去
ProducerInterceptors&ProducerInterceptor
Producerinterceptors是一个Producerinteceptor集合,其onSend方法、onAcknowledgement方法、onSendError方法,实际上是循环调用其封装的ProducerInterceptor集合的对应方法。ProducerInterceptor可以在发送前对消息进行拦截,也可以先于用于的callback,对ack响应进行预处理。
Kafka集群元数据
在前面Kafka消息发送的示例中,只指定了topic的名称,并未明确指定分区编号,但是在发送消息时,需要知道topic的分区数量,经过路由后确定目标分区,之后通过服务器地址、端口等信息才能建立链接,将消息发送到Kafka中,因此,在Kafka中维护了集群元数据:在topic中有几个分区,每个分区的Leader副本分配在哪个节点上,Follower副本分配在哪个节点上,哪些副本在ISR集合中以及这些节点的网络地址、端口。
在Kafka中,使用Node、TopicPartition、PartitionInfo这三个类封装了Kafka相关的元数据,并使用Cluster将其进行进一步封装:
-
Node表示一个节点,Node记录这个节点的host、ip、port等信息
private static final Node NO_NODE = new Node(-1, "", -1); private final int id; private final String idString; private final String host; private final int port; private final String rack;
-
TopicPartition表示某Topic的一个分区,其中Topic字段是Topic的名称,Partition字段则代表此分区在Topic中的分区编号
private int hash = 0; private final int partition; private final String topic;
-
PartitionInfo表示一个分区的详细信息
private final String topic; private final int partition; private final Node leader; private final Node[] replicas; private final Node[] inSyncReplicas;
-
这三个类在Cluster中可以完整的表示出KafkaProducer需要的元数据集群
// kafka集群中的节点信息 private final List<Node> nodes;// 记录了TopicPartition和PartitionInfo的映射关系 private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;// 记录了Topic名称和PartitionInfo的映射关系 private final Map<String, List<PartitionInfo>> partitionsByTopic;// Topic和PartitionInfo的映射关系,必须包括Leader副本 private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;// 记录node和PartitionInfo的映射关系,可以按照节点ID查询其上分布的全部分区详情 private final Map<Integer, List<PartitionInfo>> partitionsByNode;// brokerId与node之间的对应关系,方便按照brokerId进行索引 private final Map<Integer, Node> nodesById;
Node、Cluster、TopicPartition、PartitionInfo所有字段都是private final修饰的,且只提供了查询方法,保证了其线程安全性。
MetaData中封装了Cluster对象,并保存Cluster数据的最后更新时间、版本号、是否需要更新等信息
// 两次更新请求最小时间间隔
private final long refreshBackoffMs;//元数据多久更新一次
private final long metadataExpireMs;// version表示Kakfa的版本号
private int version;//上次更新时间
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;// 记录Kafka集群的元数据
private Cluster cluster;
private boolean needUpdate;// 记录了当前已知的所有Topic,在Cluster字段中记录Topic的最新元数据
private final Map<String, Long> topics;//
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
private boolean needMetadataForAllTopics;
private final boolean topicExpiryEnabled;
metadata方法主要是操纵上述几个字段,requestUpdate将needUpdate设为true,此时就会更新元数据信息,awaitUpdate通过版本号来确定是否同步完成,未完成则阻塞。
public synchronized int requestUpdate() {this.needUpdate = true;return this.version;
}public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {......long begin = System.currentTimeMillis();long remainingWaitMs = maxWaitMs;while (this.version <= lastVersion) {if (remainingWaitMs != 0)wait(remainingWaitMs);long elapsed = System.currentTimeMillis() - begin;if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");remainingWaitMs = maxWaitMs - elapsed;}
}
Serializer&Deserializer
客户端发送的消息都是key和value的字节数据,Serializer&Deserializer提供了将发送数据转化为字节数组的功能。
- configure方法在序列化执行之前进行配置
- serializer进行序列化
- close进行关闭
Partitioner
KafkaProducer的下一步是选择分区,有的场景中由业务控制消息追加到哪个分区,有的时候业务不关心追加到哪个分区,此时会通过Partitioner的Partition()方法选择Partition。Kafka提供了Partitioner的一个默认实现DefaultPartitioner,其实现了Configurable接口。
在创建KafkaProducer时传入的key、value配置项会保存到AbstractConfig的originals字段中,其核心方法为getConfiguredInstance方法主要是通过反射机制实现originals方法中指定的类,Partitioner就是通过AbstractConfig类的getConfiguredInstance方法获取的,通过getConfiguredInstance还会返回其他配置实例。
设计Configurable接口的目的是统一反射后的初始化过程,对外提供统一的初始化接口。生成的配置类只需要通过configure方法就可以实现初始化过程。
如果没有指定分区情况下:
- 没有消息key的话消息会根据counter对分区数取模确定分区编号
- 有消息key的话消息会根据key进行hash,然后对分区数取模确定分区编号