当前位置: 首页 > wzjs >正文

网站备案办理头条今日头条新闻

网站备案办理,头条今日头条新闻,十大免费网站免费下载软件,为什么要用模板建站?欢迎来到啾啾的博客🐱。 记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。 有很多很多不足的地方,欢迎评论交流,感谢您的阅读和评论😄。 目录 1 引言2 消息ProducerRecord2.1 分区器 1 引言 …

欢迎来到啾啾的博客🐱。
记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。
有很多很多不足的地方,欢迎评论交流,感谢您的阅读和评论😄。

目录

  • 1 引言
  • 2 消息ProducerRecord
    • 2.1 分区器

1 引言

在之前的Kafka篇章中,我们已经了解到Kafka Producer内部会有一个缓冲区,生产者通过批量发送消息的方式提升总体的吞吐。

批量发送在很多场景中都很常见,Kafka是如何实现的?
![[Kafka源码-批量消息-1.png]]
首先,我们需要理解消息。

2 消息ProducerRecord

Kafka消息的封装如下:

![[Kafka源码-批量消息-2.png]]

2.1 分区器

Kafka Producer可以指定消息发送到哪个Topic,也可以指定到哪个Partition。没有指定Partition时,Kafka Producer会使用分区器 Partitioner来决定消息应该到哪个partition。

可以看一下分区器 Partitioner 的方法
![[Kafka源码-批量消息-3.png]]

  • 参数如下:
    ProducerRecord<K, V> record:要发送的 Kafka 消息记录,包含主题、键、值、分区等信息。
    byte[] serializedKey:消息键的序列化字节数组。
    byte[] serializedValue:消息值的序列化字节数组。
    Cluster cluster:Kafka 集群的元数据信息,包含主题、分区等信息。

其中serializedKey和serializedValue是分别用了同一个序列化类的不同的序列化对象来做的:
![[Kafka源码-批量消息-5.png]]

![[Kafka源码-批量消息-4.png]]

有意思的是序列化方法传入的topic与headers信息在Kafka的默认序列化中是没有被使用的。
![[Kafka源码-批量消息-6.png]]

很显然,这两个参数topic和headers是为了序列化的扩展性预留的设计。
使用topic参数,可以为不同的topic准备不同的序列化策略,比如加密。

import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;public class TopicAwareSerializer implements Serializer<String> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// 配置逻辑}@Overridepublic byte[] serialize(String topic, String data) {if ("sensitive-topic".equals(topic)) {// 对敏感主题的数据进行特殊处理String encryptedData = "encrypted:" + data;return encryptedData.getBytes(StandardCharsets.UTF_8);}return data.getBytes(StandardCharsets.UTF_8);}@Overridepublic void close() {// 关闭逻辑}
}

使用headers参数,比如编码格式、版本号等,也可以做一些定制化操作。

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;public class HeaderAwareSerializer implements Serializer<String> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// 配置逻辑}@Overridepublic byte[] serialize(String topic, Headers headers, String data) {Charset charset = StandardCharsets.UTF_8;if (headers.lastHeader("charset") != null) {String charsetName = new String(headers.lastHeader("charset").value(), StandardCharsets.UTF_8);charset = Charset.forName(charsetName);}return data.getBytes(charset);}@Overridepublic byte[] serialize(String topic, String data) {return serialize(topic, null, data);}@Overridepublic void close() {// 关闭逻辑}
}

需要注意的是,key|value.serializer都必须被设置为实现了org.apache.kafka.common.serialization.Serializer接口的类。

没有指定分区partition时,分区器partitioner.partition方法有三种实现。Kafka 在 Producer 中默认(不配置分区器)使用的是 DefaultPartitioner。你可以在创建 KafkaProducer 时通过配置 partitioner.class 属性来指定使用的分区器,若不指定,就会使用默认的 DefaultPartitioner。
![[Kafka源码-批量消息-7.png]]

  • DefaultPartitioner
    用途:这是 Kafka 默认的分区器。当消息有键(key)时,会使用键的哈希值对分区数取模来确定分区;当消息没有键时,会使用粘性分区(Sticky Partitioning)策略,在一段时间内将消息发送到同一个分区,以提高批量发送效率。
    使用场景:大多数常规业务场景,无需特殊分区策略时使用。
    ![[Kafka源码-批量消息-8.png]]

  • RoundRobinPartitioner
    用途:采用轮询的方式依次将消息发送到各个分区,确保消息均匀分布在所有分区上。
    使用场景:需要消息均匀分布,且对消息顺序没有严格要求的场景。
    ![[Kafka源码-批量消息-9.png]]

  • UniformStickyPartitioner
    用途:随机选择一个分区,并在一段时间内将消息都发送到该分区,以此减少请求数量,提高吞吐量。
    使用场景:对吞吐量要求较高,且对消息顺序没有严格要求的场景。

所以要指定好Partition。

http://www.dtcms.com/wzjs/87300.html

相关文章:

  • 建设银行网站维修图片百度推广托管
  • 开网站建设个人如何做seo推广
  • 中国最大的网络公司排名南京seo优化公司
  • 紫金建设公司官网整站优化外包服务
  • 官网搭建 杭州seo视频网页入口网站推广
  • 网站建设项目确认书百度品牌专区
  • 网站开发技术现状百度推广培训机构
  • 怎么知道网站有没有做301重定向怎么把自己的网站发布到网上
  • 我做彩票网站开发彩票网站搭建百度站长资源平台
  • 中山智能设备网站建设最近的新闻热点
  • 给公司网站设计一元手游平台app
  • 做问卷比较好的网站手机创建网站免费注册
  • 好看的 网站后台模板搜索网站关键词
  • 佛山网站搭建最让顾客心动的促销活动
  • wordpress怎么用Redis快速seo关键词优化方案
  • 小米发布会ppt模板太原seo外包服务
  • 涿州建设局网签网站济南网络优化网址
  • 做民宿要给网站多少钱搜索引擎平台排名
  • 网站的制作流程百度竞价排名费用
  • 新乡网站优化公司百度账号登录
  • 网站开发一般多少钱百度信息流广告怎么收费
  • 做网页的软件做网站安庆seo
  • 网站开发思路b站推广入口在哪
  • 中国网站设计公司百度图片搜索引擎入口
  • 婚恋网站建设方案网络推广最好的网站有哪些
  • 学做网站用到哪些知识爱站网关键词挖掘工具熊猫
  • 成都网站建站推广中山排名推广
  • 网站建设代码流程百度指数分析工具
  • 企业网站如何建立经典模板网站建设
  • 网站建设服务费合同模板百度下载免费安装到桌面