当前位置: 首页 > news >正文

【kafka系列】生产者

目录

发送流程

1. 流程逻辑分析

阶段一:主线程处理

阶段二:Sender 线程异步发送

核心设计思想

2. 流程

关键点总结

重要参数

一、核心必填参数

二、可靠性相关参数

三、性能优化参数

四、高级配置

五、安全性配置(可选)

六、错误处理与监控

典型配置示例

关键注意事项


发送流程

  • 序列化与分区:消息通过Partitioner选择目标分区(默认轮询或哈希),序列化后加入RecordAccumulator缓冲区。
  • 批次合并Sender线程将同一分区的消息合并为ProducerBatch,减少网络请求(源码见Sender.run()方法)。
  • 发送至Broker:通过NetworkClient异步发送,Broker的LogAppendTime处理写入请求。
  • ACK机制:根据acks配置(0/1/all)等待Broker确认,通过Metadata类更新分区元数据

1. 流程逻辑分析

Kafka 生产者发送消息的核心流程分为 主线程处理Sender 线程异步发送 两个阶段,具体步骤如下:


阶段一:主线程处理
  1. 创建 ProducerRecord
    • 用户调用 producer.send(ProducerRecord),指定 Topic、Key、Value 和可选的分区或时间戳。
  1. 选择分区(Partition)
    • 若未指定分区,根据以下规则选择:
      • 有 Key:对 Key 哈希取模(hash(key) % 分区数),确保相同 Key 的消息进入同一分区。
      • 无 Key:默认使用粘性分区策略(Sticky Partitioning,Kafka 2.4+),在批次填满或超时前发送到同一分区,提升性能。
  1. 序列化(Serialize)
    • 使用配置的 key.serializervalue.serializer 对 Key 和 Value 序列化(如 StringSerializerByteArraySerializer)。
  1. 追加到缓冲区(RecordAccumulator)
    • 将消息按 Topic-Partition 分组,存入 RecordAccumulator 的批次(Batch)中。
    • 批次策略
      • batch.size:批次大小阈值(默认 16KB),达到阈值立即发送。
      • linger.ms:批次等待时间(默认 0ms),超时后发送未满批次。

阶段二:Sender 线程异步发送
  1. Sender 线程拉取批次
    • Sender 线程定期检查缓冲区,将满足条件的批次(已满或超时)封装为 ProducerRequest
  1. 构建请求并发送到 Broker
    • 根据分区的 Leader 副本所在 Broker,将请求发送到对应的节点。
    • 关键配置
      • acks:控制消息持久化确认级别:
        • 0:不等待确认(可能丢失数据)。
        • 1:等待 Leader 确认(默认)。
        • all:等待所有 ISR 副本确认(最高可靠性)。
      • max.in.flight.requests.per.connection:控制单个 Broker 的未确认请求数(默认 5)。
  1. 处理 Broker 响应
    • 成功:触发用户设置的 Callback 回调,并释放批次内存。
    • 失败
      • 可重试错误(如网络抖动、Leader 切换):根据 retries(默认 0)和 retry.backoff.ms(默认 100ms)重试。
      • 不可重试错误(如消息过大):直接触发回调并抛出异常。

核心设计思想
  • 异步批处理:通过缓冲区合并小消息,减少网络 I/O 次数。
  • 零拷贝优化:使用 sendfile 系统调用提升网络传输效率。
  • 高可靠性:通过重试机制和 acks=all 确保消息不丢失。

2. 流程


关键点总结

  1. 分区选择:优先使用 Key 哈希或粘性分区策略,保证消息顺序性和吞吐量。
  2. 批次优化:通过 batch.sizelinger.ms 平衡延迟与吞吐。
  3. 可靠性保障:通过 acksretries 配置确保消息持久化。
  4. 异步处理:主线程与 Sender 线程解耦,避免阻塞用户逻辑。

重要参数

以下是 Kafka 生产者(Producer)在日常开发中的 常见配置参数 及其作用,按功能分类整理成表格:


一、核心必填参数

参数名

默认值

说明

bootstrap.servers

Kafka 集群地址列表(逗号分隔,如 host1:9092,host2:9092

)。

key.serializer

Key 的序列化类(如 org.apache.kafka.common.serialization.StringSerializer

)。

value.serializer

Value 的序列化类(同上)。


二、可靠性相关参数

参数名

默认值

说明

acks

1

消息持久化确认机制:

0:不等待确认(可能丢失数据)。 1:等待 Leader 确认(默认)。all:等待所有 ISR 副本确认(最高可靠性)。

retries

0

发送失败后的重试次数(建议设为 Integer.MAX_VALUE

配合 delivery.timeout.ms

)。

enable.idempotence

false

是否启用幂等性(true时保证消息不重复,需配合 acks=all

retries>0)。

max.in.flight.requests.per.connection

5

单个 Broker 的未确认请求数。若启用幂等性,建议设为 1

以保证顺序。


三、性能优化参数

参数名

默认值

说明

linger.ms

0

消息在缓冲区等待时间(毫秒),增大可提升吞吐量(但增加延迟)。

batch.size

16384

(16KB)

单个批次的大小阈值,达到阈值后立即发送。

buffer.memory

33554432

(32MB)

生产者缓冲区的总内存大小。

compression.type

none

消息压缩算法(gzip

snappy

lz4

zstd

),减少网络带宽占用。


四、高级配置

参数名

默认值

说明

request.timeout.ms

30000

(30秒)

生产者等待 Broker 响应的超时时间。

max.block.ms

60000

(60秒)

生产者缓冲区满或元数据不可用时的阻塞时间(超时抛异常)。

partitioner.class

默认轮询/哈希策略

自定义分区策略(实现 Partitioner

接口)。


五、安全性配置(可选)

参数名

默认值

说明

security.protocol

PLAINTEXT

安全协议(如 SSL

SASL_SSL

)。

ssl.keystore.location

SSL 证书路径(客户端认证时需配置)。

sasl.mechanism

SASL 认证机制(如 PLAIN

SCRAM-SHA-256

)。


六、错误处理与监控

参数名

默认值

说明

interceptor.classes

生产者拦截器(实现 ProducerInterceptor

接口),用于监控或修改消息。

metrics.sample.window.ms

30000

(30秒)

性能指标采样窗口时间。


典型配置示例

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");

关键注意事项

  1. 可靠性 vs 性能
    • acks=allenable.idempotence=true 提高可靠性,但可能降低吞吐量。
    • 增大 batch.sizelinger.ms 可提升吞吐量,但增加延迟。
  1. 幂等性限制
    • 需 Kafka 0.11+ 版本支持,且 max.in.flight.requests=1(或 Kafka 2.0+ 允许 5)。
  1. 监控与调优
    • 通过 metrics 和拦截器监控生产者性能,动态调整参数

相关文章:

  • 第十二篇:黑客帝国终章——电子技术思维导图与三电技术进化论
  • 线程安全的集合类
  • 每日十题八股-补充材料-2025年2月15日
  • Bob the Canadian
  • 安装Homebrew时提示Warning: /opt/homebrew/bin is not in your PATH.
  • UNET改进62:添加HFERB模块|提取高频信息的高频增强残差块
  • 基于Deepseek自动生成单元测试的Idea插件
  • 5分钟掌握LM Studio本地部署DeepSeek R1
  • Rust包管理
  • 计算机软件毕业设计选题指南:热门方向与创新思路
  • 【java】方法的值传递
  • 数据守护者:备份文件的重要性及自动化备份实践
  • 《深度Q网络优化:突破高维连续状态空间的束缚》
  • 代码随想录算法营Day39 | 416. 分割等和子集
  • 小火车理论
  • 高血压危险因素分析(项目分享)
  • 计算机基础-内存分配
  • Swift - 引用计数
  • 数电基础总结
  • STM32的HAL库开发---内存保护(MPU)
  • 杨建全已任天津市委副秘书长、市委市政府信访办主任
  • 工人日报:应对“职场肥胖”,健康与减重同受关注
  • 六省会共建交通枢纽集群,中部离经济“第五极”有多远?
  • 新片|《碟中谍8:最终清算》定档5月30日
  • 沧州低空经济起飞:飞行汽车开启千亿赛道,通用机场布局文旅体验
  • 上海市重大工程一季度开局良好,崇明线等按既定计划加快建设