当前位置: 首页 > news >正文

05 kafka 如何存储较大数据记录

前言

此问题是最近碰到的一个问题 

最近的一个项目, 单条记录的大小 超过了 1M, 然后 使用 kafka 存储数据出现了各种问题 

一些问题很容易发现问题, 但是一些问题 很隐蔽 

这里 特此记录一下, 篇幅不会很长 

 

 

测试用例 

测试用例如下, 问题主要分为两个地方, 其一是客户端这边的限制, 其二是服务器那边的限制 

其中 服务器那边的限制 出现的问题较为隐蔽 

package com.hx.test14;/*** Test10KafkaProducer** @author Jerry.X.He* @version 1.0* @date 2023-03-12 10:15*/
public class Test10KafkaProducer {// Test06KafkaProducerpublic static void main(String[] args) throws Exception {Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.0.116:9092");properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        properties.put("max.request.size", 10485760);String topic = "test20230312";String path = "/Users/jerry/Jobs/14_lzxm/xx/realdata.json";String content = Tools.getContent(path);KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);long start = System.currentTimeMillis();for (int i = 1; i <= 10; i++) {Future<RecordMetadata> respFuture = kafkaProducer.send(new ProducerRecord<>(topic, content));System.out.println("message" + i);}long spent = System.currentTimeMillis() - start;System.out.println(" spent " + spent + " ms ");kafkaProducer.close();//}}

 

 

客户端这边的限制

客户端这边报错如下 

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1880363 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

 

主要是客户端这边 本地对于消息大小的校验 使用的配置是 max.request.size 

然后 默认值是 1M, 我们这里没有手动配置, 因此是 默认值 1M 

但是实际消息大小在 1.7M 左右, 因此 抛出了异常 

 

 

解决方式, ProducerConfig 增加 max.request.size 的配置 

 

 

服务器那边的限制

客户端这边生产线消息之后, 同步获取服务器的反馈, 得到信息如下 

但是 服务器那边 没有报错消息, 因此 较为隐蔽 

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)at com.hx.test14.Test10KafkaProducer.main(Test10KafkaProducer.java:41)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

 

服务器这边处理如下, 有一个 消息大小的限制 

读取的配置是服务器的 message.max.bytes 的配置

 

调整了了这个配置之后, 服务器这边的校验就过了 

 

 

客户端这边拿到存储的记录如下 

 

 

问了一下 chatgpt 

和本文的梳理出来的东西一致, 但是 开销却比 人类 大脑快多了 

 

 

 

 

 

http://www.dtcms.com/a/585423.html

相关文章:

  • 使用Unity ASE插件设置数值不会生效的问题
  • 【ZeroRange WebRTC】WebRTC 信令安全:实现原理与应用(深入指南)
  • 关于Flutter与Qt for python 的一些技术、开源、商用等问题
  • 国外免费建站网站不用下载设计师培训心得
  • 深入解析 LeetCode 1572:矩阵对角线元素的和 —— 从问题本质到高效实现
  • Android Input ANR分析
  • Dify 添加 Google cloud 模型供应商
  • 大语言模型提示词技巧总结
  • 高职无人机应用技术专业职业发展指南
  • 网站流量软件银行门户网站建设
  • MySQL的CASE WHEN函数介绍
  • 逻辑方程结构图语言的机器实现(草稿)
  • 计算机组成原理---中央处理器
  • 合肥建设学校官方网站网站栏目规划图
  • windows11配置wsl安装ubuntu20.04
  • Go语言中的函数
  • 建瓯市建设局网站国内团购网站做的最好的是
  • XMSRC4392_VC1:4通道192KHz ASRC及768KHz SSRC音频采样率转换器产品介绍
  • 来宾绍兴seo网站托管方案手机怎么弄微信公众号
  • C 标准库 - <ctype.h>
  • Xshell效率实战:SSH管理秘籍(二)
  • 克隆整个macOS系统到新磁盘
  • 详解【限流算法】:令牌桶、漏桶、计算器算法及Java实现
  • Spring Cloud Config
  • 河南卫生基层系统网站建设企业资质查询系统官网
  • 临沂网站改版购买商标去哪个网站
  • 模块化并行清洗工装:实现规模化清洗的增效方案
  • Vue项目实战《尚医通》,首页医院组件的搭建,笔记09
  • 《新概念英语青少年版》Unit1-4知识点
  • ParameterizedType