当前位置: 首页 > news >正文

Kafka客户端参数(一)

本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。

文章目录

  • 概述
  • 一、消费者分组消费机制
  • 二、生产者拦截器机制
  • 三、消息序列化机制
  • 四、消息分区路由机制


概述

  在Kafka的客户端中,无论是消费者还是生产者,都有很多的参数,无论是通过Properties去put,还是整合Spring Boot在配置文件中配置,都需要对重要的参数进行理解。

一、消费者分组消费机制

  首先看一下源码中的解释:
在这里插入图片描述
  GROUP_ID_CONFIG参数,代表着标识此消费者所属的消费者组的唯一字符串。如果消费者使用subscribe(topic)或基于kafka的offset管理策略来使用组管理功能,则需要此属性。该参数的含义:

  • 每个消费者组在 Kafka 集群中必须有唯一的 group.id。不同组的消费者相互独立,互不干扰。例如,若两个消费者实例使用相同的 group.id,它们会被视为同一组的成员,共同消费主题的分区;而不同 group.id 的消费者组可以各自完整地消费同一主题的所有消息,实现类似 “广播” 的效果。
  • 它也是offset管理的核心参数。Kafka服务端通过 group.id 在内部主题 __consumer_offsets 中存储每个分区的Offset。消费者重启或故障恢复时,会根据 group.id 读取最后提交的 Offset,确保从断点继续消费,避免消息丢失或重复。

  在上一篇中有提到过,为了减少Kafka服务器偏移量管理的复杂度,故将偏移量的提交操作,交给客户端自己去处理。但是因为客户端的不确定性,所以服务器还是保有了兜底方案,即自动提交,需要在消费者进行设置enable.auto.commit=true。设置了该参数,消费者定期自动提交 Offset,依赖 group.id 定位存储位置。但是若未配置 group.id,自动提交会因无法定位消费者组而报错

二、生产者拦截器机制

  如果希望生产者在将消息发送到Kafka之前 ,能对消息进行一些特殊处理,也就是实现拦截器的效果,可以使用配置INTERCEPTOR_CLASSES_CONFIG
在这里插入图片描述
  自定义一个类,实现ProducerInterceptor接口:

public class MyProducerInterceptor implements ProducerInterceptor {/*** 发送消息时触发** @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.* @return*/@Overridepublic ProducerRecord onSend(ProducerRecord record) {return record;}/*** 收到服务器响应触发*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}/*** 连接关闭时触发*/@Overridepublic void close() {}/*** 配置项处理** @param configs*/@Overridepublic void configure(Map<String, ?> configs) {System.out.println("配置项处理");for (Map.Entry<String, ?> stringEntry : configs.entrySet()) {String key = stringEntry.getKey();Object value = stringEntry.getValue();System.out.println(key + ":" + value);}}
}

  在生产者的配置中指定。或者配置文件中指定

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.ragdollcat.mykafkademo.producer.MyProducerInterceptor");

三、消息序列化机制

  无论是对象,还是基本类型,在客户端到服务端之间的流转,都是通过二进制流的方式。所以对于消息需要进行序列化(生产者)和反序列化(消费者)的处理。
  生产者的序列化参数,实现Serializer接口可以自定义序列化机制。
在这里插入图片描述
  消费者的反序列化参数,实现Deserializer接口可以自定义反序列化机制。
在这里插入图片描述
  序列化和反序列化参数,都有keyvalue两种:

  • key的作用是决定消息的分区分配(Hash (key) → 分区),是可选的,如果为 null,此时消息随机分配Partition。
  • value是存储实际业务数据的值,序列化和反序列化的格式必须一模一样,不能序列化使用JDK自带的,反序列化使用JSON。
    在这里插入图片描述
      对于String类型,还可以用getBytes方法转换为byte数组,如果是java bean 就需要自己定义序列化反序列化的策略:
      假设我有一个实体类,具有以下三个属性:
public class Order {private Long orderId;private String fundName;private int total;}

  生产者端自定义序列化:

public class MySerializer implements Serializer<Order> {/*** Convert {@code data} into a byte array.** @param topic topic associated with data* @param data  typed data* @return serialized bytes*/@Overridepublic byte[] serialize(String topic, Order data) {//效率更高的自定义序列化byte[] nameBytes = data.getFundName().getBytes(StandardCharsets.UTF_8);//id: long 8byte + 4: int name的长度 +  name:string 不定长 + sex: int 4byteint cap = 8 + 4 + nameBytes.length + 4;ByteBuffer byteBuffer = ByteBuffer.allocate(cap);byteBuffer.putLong(data.getOrderId());byteBuffer.putInt(nameBytes.length);byteBuffer.put(nameBytes);byteBuffer.putInt(data.getTotal());return byteBuffer.array();}
}

  消费者端定义反序列化:

public class MyDeserializer implements Deserializer<Order> {/*** Deserialize a record value from a byte array into a value or object.** @param topic topic associated with the data* @param data  serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.* @return deserialized typed data; may be null*/@Overridepublic Order deserialize(String topic, byte[] data) {//自定义序列化ByteBuffer dataBuffer = ByteBuffer.allocate(data.length);dataBuffer.put(data);dataBuffer.flip();long id = dataBuffer.getLong();int fundName = dataBuffer.getInt();String name = new String(dataBuffer.get(data, 12, fundName).array(), StandardCharsets.UTF_8).trim();int total = dataBuffer.getInt();return new Order(id, name, total);}
}

  最后还要在消费者和生产者方进行注册(原有key的序列化不需要改动):

//消费者
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ragdollcat.mykafkademo.consumer.MyDeserializer");
//生产者
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.ragdollcat.mykafkademo.producer.MySerializer");

  也可以使用JsonSerializer序列化机制,更加简单。但是如果追求极致的性能,还是建议自定义序列化和反序列化机制,因为JSON序列化成的字符串,有{}和""符号,对于传输性能也是有损耗的。

四、消息分区路由机制

  在消息序列化机制中,有提到过,生产者消费者,key的作用是决定消息的分区分配(Hash (key) → 分区),是可选的,如果为 null,此时消息随机分配Partition,那么是如何指定的呢?
  在生产者端有一个参数,可以通过自定义类实现Partitioner接口,指定生产者将消息发送到哪一个Partitioner:
在这里插入图片描述
  Partitioner有三个默认的实现类:
在这里插入图片描述
  生产者一般使用的最多的是RoundRobinPartitioner策略,即负载均衡。
  在消费者端,也可以指定从分区读取的策略:
在这里插入图片描述
  可选项有:

  • RangeAssignor:按Topic分配分区,也是默认的方式。比如一个Topic有10个Partition,一个消费者组下有10个消费者。则0~3给第一个消费者,4 ~ 6给第二个消费者,7 ~9给第三个消费者。
  • RoundRobinAssignor:以负载均衡的方式分区,如0,3,6给第一个消费者,1,4,7给第二个消费者,2,5,8给第三个消费者。
  • StickyAssignor:粘性分区策略,为了解决以上两种分区策略的痛点,RangeAssignor可能存在的问题是,5 个分区分给 2 个消费者,可能一个分 3 个、一个分 2 个。如果是RoundRobinAssignor策略,在新增或者下线一个消费者的情况下,所有的分区都会换消费者,也就是再平衡。StickyAssignor的核心目标是,在发生再平衡时,尽可能不去修改原有的分配,以最小的改动达成目的。
http://www.dtcms.com/a/617992.html

相关文章:

  • 用 Rust 从零开发一个隐写工具
  • 建设营销型网站的优势顺的品牌网站建设
  • 团雾、结冰、大风——高速公路的“隐形杀手”:智慧气象预警如何为您的路网安全保驾护航
  • PC 端常用 UI 组件库
  • 工业制品网站建设建程网是干嘛的
  • CS144 知识笔记二
  • 化妆品网站源码asp个人网站做淘宝客商城
  • 实战:使用 Python(Requests/Scrapy)接入京东商品详情 API 并解析数据结构
  • python学生成绩登记系统软件测试报告
  • GitHub Actions 和 GitLab CI/CD
  • 【Linux日新月异(七)】CentOS 7磁盘资源管理深度指南:从分区到性能调优
  • 企业网站pv是什么佛山推广优化公司
  • 机器学习常见问题之numpy维度问题
  • Redis 原理与实验
  • 网站开发职责与要求软件开发专业好吗
  • 【Linux驱动开发】Linux 设备驱动中的总线机制
  • 电压基准芯片详解:从原理到选型,附 TLV431 应用解析
  • 住房和城乡建设部网站监理工程师网站发送邮件功能
  • 开发第一个python程序
  • obet(Oracle Block Editor Tool)第二版发布
  • 【gas优化】2.11 Calldata 替换 Memory
  • 深度学习周报(11.10~11.16)
  • 阿里云建站论坛网站区块链网站建设方案
  • 李宏毅NLP-14-NLP任务
  • 惠普LaserJet Pro MFP M126a如何打印自检页
  • 南京大学cpp复习——面向对象第一部分(构造函数,拷贝构造函数,析构函数,移动构造函数,友元)
  • Stream 流核心速查表
  • 网站建设设计服务公司优化大师绿色版
  • STM32通信协议学习--I2C通信(了解)
  • 【技术选型】Go后台框架选型