当前位置: 首页 > news >正文

【Note】《Kafka: The Definitive Guide》第三章: Kafka 生产者深入解析:如何高效写入 Kafka 消息队列

《Kafka: The Definitive Guide》第三章: Kafka 生产者深入解析:如何高效写入 Kafka 消息队列

Kafka 已经成为现代分布式系统中不可或缺的核心组件,尤其是在微服务、事件驱动架构与实时流处理领域。作为 Kafka 使用的第一步,生产者(Producer) 负责将消息写入 Kafka,这个过程背后有哪些关键机制?如何实现高可靠、高性能的写入?


什么是 Kafka Producer?

Kafka Producer 是 Kafka 客户端的一种,用于将消息以 Topic 的形式发送到 Kafka Broker 中的某个分区(Partition)
它是 Kafka 架构中数据输入的入口,也是连接上游系统与 Kafka 流平台的桥梁。


核心组成:写入一条消息发生了什么?

在你写入一条 Kafka 消息的背后,其实包含以下几个步骤:

1. 构造 ProducerRecord

这是 Kafka 中的基本写入单元。

new ProducerRecord<>("topic", "key", "value");
  • topic: 目标主题
  • key: 决定分区(可选)
  • value: 消息体

2. 消息序列化(Serialization)

Kafka 只接受字节数据,因此 keyvalue 需要先被序列化。Kafka 使用 Serializer 接口实现,比如最常用的:

key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer

你也可以自定义 JSON、Avro、Protobuf 等序列化方式,增强结构化能力与跨语言支持。

3. 分区策略(Partitioner)

Kafka Producer 需要决定消息该写入哪个分区:

情况分区策略
指定 Key使用 Key 的哈希结果 mod 分区数
未指定 Key使用轮询方式(Round-Robin)均匀分配
自定义 Partitioner可根据业务逻辑自行决定分区

配置 Kafka Producer:这些参数你不能不懂

Kafka Producer 是高度可配置的,以下是常用配置项与建议:

参数默认值含义
bootstrap.serversKafka Broker 地址列表
acks1写入确认级别(见下)
retries0重试次数(发送失败)
batch.size16384单批消息最大大小(字节)
linger.ms0批消息等待时间(毫秒)
compression.typenone支持 gzip/snappy/lz4/zstd
buffer.memory33554432Producer 缓存总容量(字节)

acks 参数详解(可靠性核心)

含义适用场景
0不等待 Broker 响应极端低延迟需求
1等 Leader 副本确认兼顾性能和基本可靠性
all(或 -1等所有副本(ISR)确认强一致性,金融/交易等场景推荐

异步 vs 同步发送消息

Kafka Producer 支持两种发送方式:

1. 异步发送(推荐)

producer.send(record, (metadata, exception) -> {if (exception != null) {// 失败处理} else {// 成功记录}
});
  • 非阻塞;
  • 通过回调函数处理发送结果;
  • 吞吐高、延迟低。

2. 同步发送

producer.send(record).get();
  • 阻塞调用;
  • 等待 Kafka 确认再继续;
  • 适合控制节奏或事务要求强的场景。

性能优化策略

为了提升 Kafka Producer 的吞吐性能,可采取以下几种方式:

批量发送

使用 batch.size + linger.ms 控制:

  • batch.size 设置为 32KB~128KB;
  • linger.ms 设置为 5~100ms,可适当延迟发送以合并更多消息。

压缩

compression.type = lz4
  • lz4 压缩率高,CPU 消耗低;
  • 可显著减少带宽与磁盘使用,适合高并发写入场景。

控制重试机制

retries = 5
retry.backoff.ms = 100
  • 自动容错短暂的网络中断;
  • 建议配合幂等性开启,避免重复写入。

幂等性 & 事务支持(高级特性)

幂等性(Idempotence)

enable.idempotence = true
  • Kafka 会为 Producer 分配唯一 ID;
  • 同一条消息不会被重复写入;
  • 开启后自动启用强一致性机制,建议默认开启。

事务性 Producer(Exactly-once)

transactional.id = "my-transactional-producer"
  • 支持将多个写入操作视为一个事务提交;
  • 保证多个 topic 的写入原子性;
  • 常见于 Kafka Streams 或写入 Kafka + DB 的一致性控制。

总结:写好 Kafka Producer 的六个关键点

关键点建议
ProducerRecord明确 Topic、Key、Value,决定分区行为
序列化根据数据格式选择合适 Serializer
分区策略默认即可,自定义需慎重设计
写入可靠性acks=all + enable.idempotence=true 实现强一致性
性能优化利用批量、压缩、异步发送提升性能
事务控制需要 Exactly-once 可使用事务性 Producer

附:Kafka Producer 最小可用示例

Java版本

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key" + i, "value" + i);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.printf("Sent to partition %d with offset %d%n", metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});
}producer.close();

C++版本

#include <iostream>
#include <string>
#include <csignal>
#include <librdkafka/rdkafkacpp.h>bool run = true;
void stop(int sig) { run = false; }class EventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {std::signal(SIGINT, stop);std::string brokers = "localhost:9092";std::string topic = "demo-topic";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);conf->set("bootstrap.servers", brokers, nullptr);conf->set("dr_cb", nullptr, nullptr); // delivery report callback 可选EventCb event_cb;conf->set("event_cb", &event_cb, nullptr);std::string errstr;RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "Producer creation failed: " << errstr << std::endl;return 1;}while (run) {std::string payload = "Hello Kafka from C++";RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,RdKafka::Producer::RK_MSG_COPY,const_cast<char *>(payload.c_str()), payload.size(),nullptr, nullptr);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;}producer->poll(0); // 触发回调std::this_thread::sleep_for(std::chrono::milliseconds(500));}producer->flush(1000);delete producer;delete conf;return 0;
}

http://www.dtcms.com/a/268175.html

相关文章:

  • HarmonyOS学习6 --- 数据存储
  • windows系统安装mongoDB且创建集合植入初始化数据
  • vue事件处理-按键修饰符
  • 闲庭信步使用图像验证平台加速FPGA的开发:第一课——由测试平台到验证平台
  • CSS06:字体样式
  • 数据结构---链表结构体、指针深入理解(三)
  • Petalinux工程如何离线编译
  • C++ 中左值和右值
  • 论文评价指标之(n-gram、BLEU、MRR、ANLS)
  • python库 maya 库的各种案例的使用详解(人性化的日期时间处理)
  • 使用Python将PDF转换成word、PPT
  • SSL 终结(SSL Termination)深度解析:从原理到实践的全维度指南
  • 电商系统二次开发找谁做?ZKmall开源商城前后端分离技术更易升级迭代
  • leetcode 每日一题 1865. 找出和为指定值的下标对
  • python学习打卡:DAY 21 常见的降维算法
  • 红宝书学习笔记
  • 多级缓存如何应用
  • YOLO目标检测数据集类别:分类与应用
  • Oracle使用SQL一次性向表中插入多行数据
  • NLP之文本纠错开源大模型:兼看语音大模型总结
  • 李宏毅genai笔记:推理
  • Maven引入第三方JAR包实战指南
  • 支持向量机(SVM)在肝脏CT/MRI图像分类(肝癌检测)中的应用及实现
  • Python11中创建虚拟环境、安装 TensorFlow
  • AI编程:打造炫酷多语倒计时器
  • 【Elasticsearch】自定义评分检索
  • 评论区实现 前端Vue
  • 【openp2p】 学习4: 纳秒级别的时间同步算法及demo
  • 数学建模的一般步骤
  • FastAPI+React19开发ERP系统实战第04期