当前位置: 首页 > news >正文

Kafka数据生产和发送

目录

前言

创建要发送数据

创建生产者对象

生产者主线程

数据生产者

数据收集器

数据发送器

应答处理级别

数据重试机制

幂等性

事务


前言

前两篇文章中主要介绍了Kafka的基本使用,这篇文章主要介绍Kafka数据的生产和发送的相关细节介绍

创建要发送数据

当我们把主题创建好之后,就可以进行数据生产和发送了,首先在Kafka中,需要先创建要发送的数据,我们称为消息或记录,在发送数据之前,需要将数据封装为指定的数据模型。

相关属性必须在构建数据模型时指定,其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。value就是我们需要真正传递的数据了,而Key可以用于数据的分区定位。

创建生产者对象

据配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者消费者模式。

生产者主线程

数据生产者

生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者。下面为内部的数据转换处理

拦截器

生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等

public class ProducerTestInterceptor {public static void main(String[] args) {//  配置属性集合Map<String, Object> configMap = new HashMap<>();//  配置属性:Kafka服务器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//  配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//        配置value拦截器configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ValueInterceptorTest.class.getName());//  创建Kafka生产者对象,建立Kafka连接//      构造对象时,需要传递配置参数KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);//  准备数据,定义泛型//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 生产(发送)数据producer.send(record);}//  关闭生产者连接producer.close();}}
//生产者发送数据时的自定义拦截器
public class ValueInterceptorTest implements ProducerInterceptor<String,String> {@Override
//    发送数据的时候,会调用此方法public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {return new ProducerRecord<String,String>(producerRecord.topic(),producerRecord.key(),producerRecord.value() + producerRecord.value());}@Override
//   数据发送完成后,服务器返回响应,就会调用此方法public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Override
//    生产者对象关闭的时候,就会调用此方法public void close() {}@Override
//    创建生产者对象的时候调用public void configure(Map<String, ?> map) {}
}

分区器

Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时,也可以指定数据存储的分区编号。

指定分区传递数据是没有任何问题的。Kafka会进行基本简单的校验,比如是否为空,是否小于0之类的,但是你的分区是否存在就无法判断了,所以需要从Kafka中获取集群元数据信息,此时会因为长时间获取不到元数据信息而出现超时异常。所以如果不能确定分区编号范围的情况,不指定分区还是一个不错的选择。

(1)    如果指定了分区,直接使用

(2)    如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用

(3)    如果指定了数据Key,且使用Key选择分区的场合,采用murmur2非加密散列算法(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号

(4)如果未指定数据Key,或不使用Key选择分区,那么Kafka会采用优化后的粘性分区策略进行分区选择:

public class MyKafkaPartition {public static void main(String[] args) {//  配置属性集合Map<String, Object> configMap = new HashMap<>();//  配置属性:Kafka服务器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//  配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.ACKS_CONFIG,"-1");  // -1同步 0异步  1:leader节点保存数据之后,就返回发送成功的回调方法
//        配置自定义分区器configMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyKafkaPartitioner.class.getName());//  创建Kafka生产者对象,建立Kafka连接//      构造对象时,需要传递配置参数KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);//  准备数据,定义泛型//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 生产(发送)数据  异步发送producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("数据发送成功");}});System.out.println("消息发生发送成功");}//  关闭生产者连接producer.close();}}
// 自定义分区器  实现 Partitioner接口 重写方法
public class MyKafkaPartitioner implements Partitioner {@Override
//    该方法为计算具体分区的方法 这里我们直接返回0分区public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

生产者主线程总结:

如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。

因为发送的数据为KV数据,所以需要根据配置信息中的序列化对象对数据中Key和Value分别进行序列化处理。

计算数据所发送的分区位置。

将数据追加到数据收集器中。

数据收集器

用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。

默认情况下,一个发送批次的数据容量为16K,这个可以通过参数batch.size进行改善

批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次

如果当前批次能容纳数据,那么直接将数据追加到批次中即可,如果不能容纳数据,那么会产生新的批次放入到当前分区的批次队列中,这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送

数据发送器

是一个线程对象,用于从收集器对象中获取数据,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中

因为数据真正发送的地方是Broker节点,不是分区。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。

将组合后的<节点,List<批次>>的数据封装成客户端请求(请求键为:Produce)发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。

Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据

经过上述的处理,数据将成功到达Broker中。

应答处理级别

在Kafka中数据发送到broker中,应答处理级别主要有下面几种:

ACKS= -1   同步发送

这种应答处理的方式是kafka的默认应答处理的级别。

这个发送的方式为发送之后,需要等待所有的副本都同步之后,才会返回发送数据成功的回调,这里的副本不是分区中的所有副本,而是同步数据列表中的所有副本

ACKS = 0  异步发送

这个发送方式为,只要发送者线程将数据交给网络发送的客户端,不在关心网络发送成功与否,将直接返回发送数据成功的回调

ACKS = 1

这个方式均衡了上述两种方式的优点和缺点,具体为当数据成功通过网络发送到了Broker的leader节点上,并且成功保存到了磁盘上的时候,就会返回发送数据成功的回调,具体配置可以在属性集合中进行配置

configMap.put(ProducerConfig.ACKS_CONFIG,"0");// -1 同步 0 异步 1 leader节点保存数据之后,就返回发送成功的回调方法

数据重试机制

当生产者的Broker在发送数据的时候,通过网络成功的将数据发送到了Broker的leader节点,此时leader节点在将数据写入磁盘的时候,出现了故障,比如当前Broker的leader节点重启或者挂掉了,此时数据是发送失败的,因为节点故障,导致无法返回ack(应答处理机制),此时生产者的Broker就会等待,当超过超时时间时(默认30s),就会将从缓冲区中拿到当前发送失败的数据,进行重试,重试的次数为int类型的最大值。

上述数据重试机制存在的问题:

数据重复

就是假如此时生产者通过网络将数据成功的发送到了Broker的leader节点,leader节点也成功的将数据保存在磁盘上,但是当返回应答的时候当前Broker的leader节点出现了故障,导致无法返回应答,此时生产者的Broker由于没有拿到应答,也会认为是数据发送失败,就会进行重试,当Broker的leader节点恢复正常之后,数据也会通过重试机制发送到Broker的leader节点上,此时leader节点会将数据再次保存一次到磁盘上,此时就会出现数据重复问题。

数据乱序

生产者的Broker同时发送了三个数据,(最大可以同时处理5个数据:在途请求缓冲区),将数据发送到了Broker 的leader节点上的时候,此时第一个数据应答失败,后面两个数据应答成功,此时生成者的Broker就会将第一个数据进行重试机制发送,当重试发送成功之后,此时第一个数据就会在前两个数据的后面,此时数据就会出现乱序问题。

幂等性

为了解决上述数据重试的问题,Kakfa引入幂等性进行处理,具体如下:

所谓的幂等性,就是生产同样的一条数据,无论向Kafka发送多少次,kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据。

通过幂等性就能解决上述的两个问题。

幂等性要求:

ack应答机制为 -1,需要开启重试,需要在途请求缓冲区的数量必须为5

默认幂等性是不起作用的,所以如果想要使用幂等性操作,只需要在生产者对象的配置中开启幂等性配置即可。

Map<String, Object> configMap = new HashMap<>();
//  配置属性:Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//  配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
configMap.put(ProducerConfig.ACKS_CONFIG,"-1");  // -1 表示所有ISR(副本)节点都收到消息才确认
configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); // 幂等性  要求我们的ack应答机制为 -1,同时需要开启重试,同时需要在途请求缓冲区的数量必须为5
configMap.put(ProducerConfig.RETRIES_CONFIG,3);  // 重试次数 默认为int 类型的最大值
configMap.put(ProducerConfig.BATCH_SIZE_CONFIG,5); // 批次大小 在途请求缓冲区
configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000); // 请求超时时间

Kakfa实现幂等性

给消息添加标记:生产者ID + 数据顺序号(这里的数据顺序号只能针对一个分区起作用,也就是幂等性操作只能对同一个分区起作用)

kafka会记录每一个分区的生产者的生产状态。

采用队列的方式缓存最近的5个数据,队列中的数据按照数据顺序号进行升序排序,如果Borker当前新的请求数据在缓存的5个旧的中存在相同的(根据生产者id和数据顺序号进行判断是否相同),那么说明有重复,当前数据不做任何处理。

如果没有重复,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果是,说明是连续的,顺序没乱,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。

如果不是连续的,说明数据的顺序已经乱了,将通过生产者Broker进行重试,重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后。再进行重试。

事务

上述幂等性只能保证单分区的数据不会重复和乱序,不能处理多分区情况下的数据重复和乱序,对于这个问题,Kafka采用事务的方式进行处理。

kafka采用事务的方式解决跨会话的幂等性。基本的原理就是通过事务功能管理生产者ID,保证事务开启后,生产者对象总能获取一致的生产者ID。

public static void main(String[] args) {//  配置属性集合Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.ACKS_CONFIG,"-1");  // -1 表示所有ISR(副本)节点都收到消息才确认configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); // 幂等性  configMap.put(ProducerConfig.RETRIES_CONFIG,3);  // 重试次数 configMap.put(ProducerConfig.BATCH_SIZE_CONFIG,5); // 批次大小 configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000); // 请求超时时间configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my_tx_id"); // 事务idKafkaProducer<String, String> producer = new KafkaProducer<>(configMap);producer.initTransactions(); // 初始化事务try {producer.beginTransaction(); // 开启事务for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);producer.send(record);}producer.commitTransaction(); // 提交事务} catch (Exception e) {e.printStackTrace();producer.abortTransaction(); // 终止事务}finally {producer.close();//  关闭生产者连接}
}

为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator负责实施的。TransactionCoodinator 会将事务状态持久化到该主题中。

事务基本的实现思路就是通过配置的事务ID,将生产者ID进行绑定,然后存储在Kafka专门管理事务的内部主题__transaction_state中,而内部主题的操作是由事务协调器(TransactionCoodinator)对象完成的,这个协调器对象有点类似于咱们数据发送时的那个副本Leader。

事务的提交流程

Kafka中的事务是分布式事务,所以采用的也是二阶段提交

第一个阶段提交事务协调器会告诉生产者事务已经提交了,所以也称之预提交操作,事务协调器会修改事务为预提交状态

第二个阶段提交事务协调器会向分区Leader节点中发送数据标记,通知Broker事务已经提交,然后事务协调器会修改事务为完成提交状态

特殊情况下,事务已经提交成功,但还是读取不到数据,那是因为当前提交成功只是一阶段提交成功,事务协调器会继续向各个Partition发送marker信息,此操作会无限重试,直至成功。但是不同的Broker可能无法全部同时接收到marker信息,此时有的Broker上的数据还是无法访问,这也是正常的,因为kafka的事务不能保证强一致性,只能保证最终数据的一致性,无法保证中间的数据是一致的。不过对于常规的场景这里已经够用了,事务协调器会不遗余力的重试,直至成功。

http://www.dtcms.com/a/319440.html

相关文章:

  • RuoYi OpenAPI集成从单体到微服务改造全过程记录
  • 高速公路安装定向广播的优势
  • centos VMware ESXi 扩容
  • 为什么任务顺序会影响效率?如何实现自定义顺序?
  • Python 基础详解:数据类型(Data Types)—— 程序的“数据基石”
  • Fiddler 安装配置教程
  • 认识汇编:解码计算机思维的底层语言(第一章)
  • 【YOLO学习笔记】YOLOv8详解解读
  • WEB开发-第二十七天(PHP篇)
  • 【Unity Plugins】使用ULipSync插件实现人物唇形模拟
  • 基于Spring Cloud Stream与Kafka的事件驱动微服务架构设计与实战指南
  • 【Python】基于Python自动化邮件发送系统:从配置到实现的完整指南
  • 【YOLOv8改进 - C2f融合】C2f融合SFS-Conv(空间 - 频率选择卷积)提升特征多样性,同时减少参数和计算量
  • 如何在 VS Code 中进行 `cherry-pick`
  • 使用Python验证常见的50个正则表达式
  • react接口防抖处理
  • [网格图DP]3363. 最多可收集的水果数目
  • 视频二维码如何助力博物馆打造智慧讲解体验
  • 数据库事务总结
  • 升级g++编译器
  • RK3568项目(十二)--linux驱动开发之基础通讯接口(上)
  • 时序数据库的功能与应用价值
  • RPC 解析
  • Renesas Electronics RZ/V2N 评估套件
  • 从密钥生成到功能限制:Electron 中 secure-electron-license-keys 的完整集成方案
  • Spring Cloud系列—LoadBalance负载均衡
  • 5分钟了解OpenCV
  • 用 Enigma Virtual Box 把 Qt 程序压成单文件 EXE——从编译、收集依赖到一键封包
  • 大数据spark、hasdoop 深度学习、机器学习算法的音乐平台用户情感分析系统设计与实现
  • 多线程 future.get()的线程阻塞是什么意思?