当前位置: 首页 > news >正文

西充县住房和城乡规划建设局网站外贸公司没网站 怎么做业务

西充县住房和城乡规划建设局网站,外贸公司没网站 怎么做业务,太原建筑公司网站,广州海珠区赤岗 新港网站建设公司1 kafka消息压缩 kafka关于消息压缩的定义(来源于官网): 此为 Kafka 中端到端的块压缩功能。如果启用,数据将由 producer 压缩,以压缩格式写入服务器,并由 consumer 解压缩。压缩将提高 consumer 的吞吐量…

1 kafka消息压缩

kafka关于消息压缩的定义(来源于官网):

        此为 Kafka 中端到端的块压缩功能。如果启用,数据将由 producer 压缩,以压缩格式写入服务器,并由 consumer 解压缩。压缩将提高 consumer 的吞吐量,但需付出一定的解压成本。

        压缩就是用时间换空间,其基本理念是基于重复,将重复的片段编码为字典,字典的 key 为重复片段,value 为更短的代码,比如序列号,然后将原始内容中的片段用代码表示,达到缩短内容的效果,压缩后的内容则由字典和代码序列两部分组成。解压时根据字典和代码序列可无损地还原为原始内容。通常来讲,重复越多,压缩效果越好。比如 JSON 是 Kafka 消息中常用的序列化格式,单条消息内可能并没有多少重复片段,但如果是批量消息,则会有大量重复的字段名,批量中消息越多,则重复越多,这也是为什么 Kafka 更偏向块压缩,而不是单条消息压缩。

2 kafka的消息压缩类型对比

        目前 Kafka 共支持四种主要的压缩类型:Gzip、Snappy、Lz4 和 Zstd。关于这几种压缩的特性。

压缩类型压缩率CPU 使用率压缩速度带宽使用率
GzipHighestHighestSlowestLowest
SnappyMediumModerateModerateMedium
Lz4LowLowestFastestHighest
ZstdMediumModerateModerateMedium

        从上表可知,Snappy 在 CPU 使用率、压缩比、压缩速度和网络带宽使用率之间实现良好的平衡,我们最终也是采用的该类型进行压缩试点。这里值得一提的是,Zstd 是 Facebook 于 2016 年开源的新压缩算法,压缩率和压缩性能都不错,具有与 Snappy(Google 杰作)相似的特性,直到 Kafka 的 2.1.0 版本才引入支持。

3 何时需要压缩

        压缩是需要额外的 CPU 代价的,并且会带来一定的消息分发延迟,因而在压缩前要慎重考虑是否有必要。

  • 压缩带来的磁盘空间和带宽节省远大于额外的 CPU 代价,这样的压缩是值得的。
  • 数据量足够大且具重复性。消息压缩是批量的,低频的数据流可能都无法填满一个批量,会影响压缩比。数据重复性越高,往往压缩效果越好,例如 JSON、XML 等结构化数据;但若数据不具重复性,例如文本都是唯一的 md5 或 UUID 之类,违背了压缩的重复性前提,压缩效果可能不会理想。
  • 系统对消息分发的延迟没有严苛要求,可容忍轻微的延迟增长。

4 如何开启压缩

        Kafka 通过配置属性 compression.type 控制是否压缩。该属性在 producer 端和 broker 端各自都有一份,也就是说,我们可以选择在 producer 或 broker 端开启压缩,对应的应用场景各有不同。目前没有尝试在broker段开启压缩。

4.1 在broker端开启解压缩

        Broker 端的 compression.type 属性默认值为 producer,即直接继承 producer 端所发来消息的压缩方式,无论消息采用何种压缩或者不压缩,broker 都原样存储。、

4.1.1 broker 和 topic 两个级别

        在 broker 端的压缩配置分为两个级别:全局的 broker 级别 和 局部的 topic 级别。顾名思义,如果配置的是 broker 级别,则对于该 Kafka 集群中所有的 topic 都是生效的。但如果 topic 级别配置了自己的压缩类型,则会覆盖 broker 全局的配置,以 topic 自己配置的为准。

broker级别:要配置 broker 级别的压缩类型,可通过 configs 命令修改   compression.type  配置项取值。此处要使修改生效,是否需要重启 broker 取决于 Kafak 的版本,在 1.1.0 之前,任何配置项的改动都需要重启 broker 才生效,而从 1.1.0 版本开始,Kafka 引入了动态 broker 参数,将配置项分为三类:read-onlyper-broker 和 cluster-wide,第一类跟原来一样需重启才生效,而后面两类都是动态生效的,只是影响范围不同,关于 Kafka 动态参数,以后单开博文介绍。从 官网 可以看到,compression.type 是属于 cluster-wide 的,如果是 1.1.0 及之后的版本,则无需重启 broker。

topic级别:topic 的配置分为两部分,一部分是 topic 特有的,如 partitions 等,另一部分则是默认采用 broker 配置,但也可以覆盖。如果要定义 topic 级别的压缩,可以在 topic 创建时通过 --config 选项覆盖配置项 compression.type 的取值,命令如下:

sh bin/kafka-topics.sh \
--create \
--topic my-topic \
--replication-factor 1 \
--partitions 1 \
--config compression.type=snappy

也可以通过 configs 命令修改 topic 的 compression.type 取值,命令如下:

bin/kafka-configs.sh \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config compression.type=snappy

4.2 在 Producer 端压缩

        跟 broker 端一样,producer 端的压缩配置属性依然是 compression.type,只不过默认值和可选值有所不同。默认值为 none,表示不压缩。直接在代码层面更改 producer 的 config。但需要注意的是,改完 config 之后,需要重启 producer 端的应用程序,压缩才会生效。

代码示例如下:

public class KafkaProducerTest {public static void main(String[] args) {String brokerList = "127.0.0.1:9092";Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"2097245");properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);String topic = "mytestTopic1";int sizeInMb = 2; // 设置字符串大小为2MBint sizeInBytes = sizeInMb * 1024 * 1024; // 转换为字节数StringBuilder largeString = new StringBuilder(sizeInBytes);largeString.append(":");for (int i = 0; i < sizeInBytes; i++) {largeString.append("A"); // 使用大写字母"A"来构建字符串}String msg = largeString.toString();try {for (int i = 0; i < 100; i++) {String msg1 = i+msg;producer.send(new ProducerRecord<>(topic, msg1));Thread.sleep(500);}}catch (Exception e){e.printStackTrace();}}
}

上面示例特意制造了一个大字符串作为消息,测试压缩,需要注意的是,配置压缩的时候同时也需要配置消息的最大值。即:max.request.size。

5 解压缩

可能发生解压的地方依然是两处:consumer 端和 broker 端。

consumer端:consumer 端发生解压的唯一条件就是从 broker 端拉取到的消息是带压缩的。此时,consumer 会根据 recordBatch 中 compressionType 来对消息进行解压。

broker端:broker 端是否发生解压取决于 producer 发过来的批量消息 recordBatch 是否是压缩的:如果 producer 开启了压缩,则会发生解压,否则不会。原因简单说下,在 broker 端持久化消息前,会对消息做各种验证,此时必然会迭代 recordBatch,而在迭代的过程中,会直接采用 recordBatch 上的 compressionType 对消息字节流进行处理,是否解压取决于 compressionType 是否是压缩类型。关于这点,可以在   LogValidator 的 validateMessagesAndAssignOffsets 方法实现中可以看到,在 convertAndAssignOffsetsNonCompressedassignOffsetsNonCompressed  和 validateMessagesAndAssignOffsetsCompressed 三个不同的分支中,都会看到 records.batches.forEach {...} 的身影,而在后面的源码分析中会发现,在 recordBatch 的迭代器逻辑中,直接采用的 compressionType 的解压逻辑对消息字节流读取的。也就是说,如果 recordBatch 是压缩的 ,只要对其进行了迭代访问,则会自动触发解压逻辑。

通俗一点讲:producer端配置了压缩,consumer自动解压缩

http://www.dtcms.com/a/497481.html

相关文章:

  • 百度联盟做网站赚钱创意做网站公司
  • 会计公司网站模板下载网站建设构架
  • 可以做问卷挣钱的网站平度好的建设网站
  • 网站建设遇到哪些问题门户网站方案
  • 对战平台网站怎么建设网站内链是什么 怎么做
  • 建设网站为什么要虚拟主机黑龙江门户网站建设
  • 关于旅游的网站建设论文网站设计app
  • 百度seo网站排名广州十大网站建设
  • 网站开发与电子商务哈尔滨网站开发建设公司
  • pc站转换手机网站品牌建设全过程
  • 给自己的公司做网站怎么做好wordpress服务器出错
  • 网站数据库特点wordpress邀请 返佣
  • 苏州吴中区建设局网站怎样登录韵网网站
  • 电子商务网站的建设和维护论文活动策划ppt
  • wordpress网站被黑了广州学网站开发
  • 凡科建站官网网站模板手机网页制作app
  • 专门做布料的网站深圳龙华 网站建设
  • 天津建设工程合同备案网站最简单的网站建设
  • 企业网站的基本形式不包括网页设计作业怎么打包
  • 桐城建设规划局网站硬件开发和软件开发
  • 主流门户网站有哪些设计君
  • 自己怎样优化网站外贸展示网站多少钱
  • 番禺网站建设公司有哪些制作网站网页域名的公司
  • 萍乡做网站芜湖建设网站
  • 网站框架设计好后怎么做查询工程建设项目的网站
  • 长春商城网站制作网站怎么找
  • 做弹幕视频效果的网站wordpress 搜索排名
  • 北斗B3四阵元抗干扰天线的实现
  • 网站设计美工排版编辑做计算机模拟ie题模拟网站打不开
  • 九江建网站wordpress管理密码忘记