当前位置: 首页 > news >正文

【学习笔记】kafka权威指南——第3章 kafka生产者—向kafka写入数据

第3章 kafka生产者—向kafka写入数据

3.1 生产者概览

各生产者的需求是不同的,包括延迟、吞吐量、是否允许丢失与重复等。生产者发送的是一个ProducerRecord对象,该对象包含目标主题和发送内容,键与分区可选。
ProducerRecord–>序列化器–>分区器(已指定不修改,未指定根据键分区)
然后该消息被添加到一个记录批次里,有独立的线程负责将记录批次发送到相应的broker,发送失败会重试,重试失败会抛出异常

3.2 创建kafka生产者

3个必选属性:
1>bootstrap.servers
生产者指定broker的地址清单,不必包含所有的,生产者会从给定的broker里查找到其他的broker信息,不过应至少提供两个broker信息,以防止其中一个宕机
2>key.serializer与value.serializer
broker期望收到字节数组,而生产者的key与value一般为Java对象,此时可指定实现了serializer接口的对象作key与value
创建生产者:设置配置参数及指定K,V类型

发送消息的三种方式:
1>发送并忘记:发送消息且不关心是否到达
2>同步发送:使用send()发送,返回Future对象,可调用get()方法等待发送结果的返回
3>异步发送:send()方法指定回调函数,再返回响应时调用,可以调用多个send()而不关心回调函数是否执行

3.3 发送消息到kafka

生产者的send方法将ProducerRecord作为参数发送,消息先进入缓冲区,再由单独线程发送,send方法可以有返回值,也有异常。

3.3.1 同步发送消息

send().get()方法可以得到RecordMetadata对象,可以获得偏移量等信息

3.3.2 异步发送消息

使用回调函数,DemoProducerCallback

3.4 生产者的配置

1> acks:必须由多少分区副本收到消息
0:不等待 1:首领收到即可 all:全副本收到才返回
2>buffer.memory:生产者内存缓冲区大小,当生产消息速度大于发送服务端速度,则阻塞或抛异常
3>compression.type:消息压缩方式,默认不压缩
snappy:性能好,gzip:占用cpu多,但压缩比高
4>reties:临时性错误重试次数
retry.backoff.ms:重试每次间隔,默认为100ms
5>batch.size:多个消息要发到同一个分区时,会放在一个批次缓冲发送。并不是满了才发
6>linger.ms:等待消息加入批次的时间,增加每条消息发送延迟,增大吞吐量
7>client.id:任意字符串,消息来源
8>max.in.flight.requests.pre.connection:在收到服务器响应之前可以发送多少消息,1表示顺序发送
9>timeout.ms:分区副本等待响应时间
request.timeout.ms:等待响应返回时间
metadata.fetch.timeout.ms:生产者获取元数据时间
10>max.block,ms:生产者发送缓冲区已满,或没有可用的元数据时阻塞的时间,到时后抛出异常
11>max.request.size:生产者发送一批次消息的最大值,broker对可接受消息大小也有限制配置
12>receive.buffer.byte与send.buffer.byte:
TCP接收和发送数据包缓冲区大小,为-1时使用操作系统默认值。跨数据中心时一般表现为高延迟,低带宽,可以适当增加以上两值。
【kafka分区消息在retries为0时不重试,与max.in.flight.request.per.connection为1,服务器响应前只发送一条消息,两者满足其1,分区消息才是严格顺序的】

3.5 序列化器

3.5.1 自定义序列化器

实现serializer接口
实现将Key/Value对象映射成byte数组的方法

3.5.2 使用Auro序列化

Key/Value.serializer指定kafkaAuroSerializer
【schema.registry.url指定schema地址可以使用schema的参数生成对象,而不用new对象】

3.6 分区

键可以作为值的附加消息,但一般情况下用户来做分区。
【键相同的消息会被写入同一个分区】
【默认的分区策略在key为null时轮询,在key有值时使用hash分区,同一个键一定在同一个分区中,但如果分区数改变了,那么同一个键不一定在之前的分区中】(这边如果发送消息时指定了分区,同一个key应该也可能发送到不同的分区中)
自定义分区:实现Partitioner接口,实现partition方法

http://www.dtcms.com/a/445606.html

相关文章:

  • 广州微信网站建设价格WordPress显示403
  • 机器学习16:自监督式学习(Self-Supervised Learning)②
  • MySQL+keepalived主主复制
  • 深入理解操作系统中的线程
  • 栈:每日温度
  • 从普通用户到AI专家:掌握“专家指南模板”,获取可复现、深度且精确的AI专业反馈
  • Photoshop调色
  • Google Jules Tools —— 开发者的新助手
  • 做方案的网站同城58找房子租房信息
  • 【LeetCode hot100|Week5】链表2
  • 刘家窑网站建设公司附近哪有学编程的地方
  • MP偏振相机在工业视觉检测中的应用
  • 安全初级(二)HTTP
  • 数组算法精讲:从入门到实战
  • 对文件的输入和输出
  • CSS3 过渡
  • 手机网站代码asp做网站策划书
  • macOS sequoia 15.7.1 源码安装node14,并加入nvm管理教程
  • LabVIEW利用DataSocket读取OPC 服务器数据
  • 第十章:外观模式 - 复杂系统的简化大师
  • 【数据结构】顺序栈的基本操作
  • 哈尔滨网站开发企业网站一直维护意味着什么
  • 第4集:配置管理的艺术:环境变量、多环境配置与安全实践
  • soular入门到实战(2) - 如何统一管理TikLab帐号体系
  • C语言进阶知识--指针(3)
  • M-LLM Based Video Frame Selection for Efficient Video Understanding论文阅读
  • 福州建设高端网站wordpress中控制图片标签
  • Prometheus 05-01: 告警规则与Alertmanager配置
  • 【Linux】Mysql的基本文件组成和配置
  • 简单易用!NAS+Leantime,开源轻量级项目管理,高效协作一键开启