当前位置: 首页 > news >正文

【源码剖析】4-生产者-KafkaProducer分析

消息发送流程

上一章节kafka了解了KafkaProducer的基本使用,本节开始深入分析KafkaProducer的原理和实现,KafkaProducer的发送流程如下:

  1. ProducerInterceptors对消息进行拦截
  2. Serializer对消息的key和value进行序列化
  3. Parititioner为消息选择合适的Partition
  4. RecordAccumulator收集消息,实现批量发送
  5. Sender从RecordAccumulator获取消息
  6. 构造ClientRequest
  7. 将ClientReuquest交给NetworkClient,准备发送
  8. NetworkClient将请求放入KafkaChannel的缓存
  9. 执行网络IO,发送请求
  10. 收到响应,调用ClientRequest的回调函数
  11. 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数

消息发送过程中涉及两个线程的操作:

  • 主线程将业务数据进行封装,然后调用send方法将消息放入RecordAccumulator中暂存
  • Sender线程将消息构成请求,并最终执行网络IO请求

Send方法的主要步骤如下:

  1. 调用ProducerInterceptors.onSend方法,通过ProducerInterceptor对消息进行拦截或者修改
  2. 调用waitOnMetaData方法获取Kafka集群的信息,底层会唤醒Send线程更新metadata中保存的Kafka集群元数据
  3. 调用serialize对key和value进行序列化
  4. 调用partitioon方法为消息选择合适的分区
  5. 调用RecordAccumulator.append方法,将消息追加到RecordAccumulator中
  6. 唤醒Sender线程,由sender线程将RecordAccumulator中缓存的消息发送出去

ProducerInterceptors&ProducerInterceptor

Producerinterceptors是一个Producerinteceptor集合,其onSend方法、onAcknowledgement方法、onSendError方法,实际上是循环调用其封装的ProducerInterceptor集合的对应方法。ProducerInterceptor可以在发送前对消息进行拦截,也可以先于用于的callback,对ack响应进行预处理。

Kafka集群元数据

在前面Kafka消息发送的示例中,只指定了topic的名称,并未明确指定分区编号,但是在发送消息时,需要知道topic的分区数量,经过路由后确定目标分区,之后通过服务器地址、端口等信息才能建立链接,将消息发送到Kafka中,因此,在Kafka中维护了集群元数据:在topic中有几个分区,每个分区的Leader副本分配在哪个节点上,Follower副本分配在哪个节点上,哪些副本在ISR集合中以及这些节点的网络地址、端口。

在Kafka中,使用Node、TopicPartition、PartitionInfo这三个类封装了Kafka相关的元数据,并使用Cluster将其进行进一步封装:

  • Node表示一个节点,Node记录这个节点的host、ip、port等信息

    private static final Node NO_NODE = new Node(-1, "", -1);
    private final int id;
    private final String idString;
    private final String host;
    private final int port;
    private final String rack;
    
  • TopicPartition表示某Topic的一个分区,其中Topic字段是Topic的名称,Partition字段则代表此分区在Topic中的分区编号

    private int hash = 0;
    private final int partition;
    private final String topic;
    
  • PartitionInfo表示一个分区的详细信息

    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    
  • 这三个类在Cluster中可以完整的表示出KafkaProducer需要的元数据集群

    // kafka集群中的节点信息
    private final List<Node> nodes;// 记录了TopicPartition和PartitionInfo的映射关系
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;// 记录了Topic名称和PartitionInfo的映射关系
    private final Map<String, List<PartitionInfo>> partitionsByTopic;// Topic和PartitionInfo的映射关系,必须包括Leader副本
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;// 记录node和PartitionInfo的映射关系,可以按照节点ID查询其上分布的全部分区详情
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;// brokerId与node之间的对应关系,方便按照brokerId进行索引
    private final Map<Integer, Node> nodesById;
    

Node、Cluster、TopicPartition、PartitionInfo所有字段都是private final修饰的,且只提供了查询方法,保证了其线程安全性。

MetaData中封装了Cluster对象,并保存Cluster数据的最后更新时间、版本号、是否需要更新等信息

// 两次更新请求最小时间间隔
private final long refreshBackoffMs;//元数据多久更新一次
private final long metadataExpireMs;// version表示Kakfa的版本号
private int version;//上次更新时间
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;// 记录Kafka集群的元数据
private Cluster cluster;
private boolean needUpdate;// 记录了当前已知的所有Topic,在Cluster字段中记录Topic的最新元数据
private final Map<String, Long> topics;//
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
private boolean needMetadataForAllTopics;
private final boolean topicExpiryEnabled;

metadata方法主要是操纵上述几个字段,requestUpdate将needUpdate设为true,此时就会更新元数据信息,awaitUpdate通过版本号来确定是否同步完成,未完成则阻塞。

public synchronized int requestUpdate() {this.needUpdate = true;return this.version;
}public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {......long begin = System.currentTimeMillis();long remainingWaitMs = maxWaitMs;while (this.version <= lastVersion) {if (remainingWaitMs != 0)wait(remainingWaitMs);long elapsed = System.currentTimeMillis() - begin;if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");remainingWaitMs = maxWaitMs - elapsed;}
}

Serializer&Deserializer

客户端发送的消息都是key和value的字节数据,Serializer&Deserializer提供了将发送数据转化为字节数组的功能。

  • configure方法在序列化执行之前进行配置
  • serializer进行序列化
  • close进行关闭

Partitioner

KafkaProducer的下一步是选择分区,有的场景中由业务控制消息追加到哪个分区,有的时候业务不关心追加到哪个分区,此时会通过Partitioner的Partition()方法选择Partition。Kafka提供了Partitioner的一个默认实现DefaultPartitioner,其实现了Configurable接口。

在创建KafkaProducer时传入的key、value配置项会保存到AbstractConfig的originals字段中,其核心方法为getConfiguredInstance方法主要是通过反射机制实现originals方法中指定的类,Partitioner就是通过AbstractConfig类的getConfiguredInstance方法获取的,通过getConfiguredInstance还会返回其他配置实例。

设计Configurable接口的目的是统一反射后的初始化过程,对外提供统一的初始化接口。生成的配置类只需要通过configure方法就可以实现初始化过程。

如果没有指定分区情况下:

  • 没有消息key的话消息会根据counter对分区数取模确定分区编号
  • 有消息key的话消息会根据key进行hash,然后对分区数取模确定分区编号

文章转载自:

http://kODz3UcJ.mkyxp.cn
http://Ycwov0L3.mkyxp.cn
http://Ir5x9P7E.mkyxp.cn
http://W9Wd9d7t.mkyxp.cn
http://WfDj2lPJ.mkyxp.cn
http://OBcajQyo.mkyxp.cn
http://UIXW6jjv.mkyxp.cn
http://HJRzJSFF.mkyxp.cn
http://fRV4EpYp.mkyxp.cn
http://eJslKXNB.mkyxp.cn
http://z5vh5whc.mkyxp.cn
http://kXLrrw2H.mkyxp.cn
http://wWgWwQhs.mkyxp.cn
http://eg5VsMnC.mkyxp.cn
http://pzpPMbXT.mkyxp.cn
http://3hRd6lzk.mkyxp.cn
http://w82LFKHC.mkyxp.cn
http://gKrYdA5E.mkyxp.cn
http://qOp7c6Ot.mkyxp.cn
http://XklnOYXw.mkyxp.cn
http://Fli2fJQk.mkyxp.cn
http://m8Vin3oS.mkyxp.cn
http://UaPdCjCO.mkyxp.cn
http://ywWqCVQL.mkyxp.cn
http://363OWfWD.mkyxp.cn
http://xd072Y99.mkyxp.cn
http://LBWdQ1ES.mkyxp.cn
http://XVyvu2xf.mkyxp.cn
http://HfZLCZa5.mkyxp.cn
http://E8XMlO8T.mkyxp.cn
http://www.dtcms.com/a/378091.html

相关文章:

  • 事务方案选型全景图:金融与电商场景的实战差异与落地指南
  • 基于LSTM与3秒级Tick数据的金融时间序列预测实现
  • 第3节-使用表格数据-主键
  • 【C++练习】14.C++统计字符串中字母、数字、空格和其他字符的个数
  • ES6笔记5
  • 协议_https协议
  • 深入 Linux 文件系统:从数据存储到万物皆文件
  • 第十四届蓝桥杯青少组C++选拔赛[2023.1.15]第二部分编程题(1 、求十位数字)
  • CSS 属性概述
  • Ascend310B重构驱动run包
  • 碎片化采购是座金矿:数字化正重构电子元器件分销的价值链
  • 如何配置capacitor 打包的ios app固定竖屏展示?
  • 解锁Roo Code的强大功能:深入理解上下文提及(Context Mentions)
  • BilldDesk:基于Vue3+WebRTC+Nodejs+Electron的开源远程桌面控制
  • 上网管理行为-ISP路由部署
  • 立体校正(Stereo Rectification)的原理
  • 经营帮会员经营:全方位助力企业高效发展,解锁商业新可能
  • 无人机飞控系统原理深度解析
  • 预测赢家-区间dp
  • 2025年- H123-Lc69. x的平方根(技巧)--Java版
  • Visual Studio 2026 震撼发布!AI 智能编程时代正式来临
  • 2023年EAAI SCI1区TOP,基于差分进化的自适应圆柱矢量粒子群优化无人机路径规划,深度解析+性能实测
  • 强化学习框架Verl运行在单块Tesla P40 GPU配置策略及避坑指南
  • HTML 完整教程与实践
  • 前端开发易错易忽略的 HTML 的 lang 属性
  • html中css的四种定位方式
  • GCC 对 C 语言的扩展
  • 基于STM32的智能语音识别饮水机系统设计
  • 基于ubuntu-base制作Linux可启动镜像
  • 速通ACM省铜第一天 赋源码(The Cunning Seller (hard version))