当前位置: 首页 > news >正文

kafka安装与参数配置

文章目录

  • 安装
    • 先安装zookeeper
    • 安装kafka
      • **列出所有主题,验证创建成功**
      • **测试步骤 4:启动消费者(Consumer)接收消息**
    • 此时消费者会等待,不会立即输出内容。
      • **启动生产者(Producer)发送消息**
      • **删除测试主题(可选)**
  • 配置详解
    • 生产者配置文件
      • 🔹 核心配置(Core Configuration)
      • 🔹 可靠性与重试(Reliability & Retry)
      • 🔹 批量与性能优化(Batching & Performance)
      • 🔹 连接与网络(Connection & Networking)
      • 🔹 分区与路由(Partitioning)
      • 🔹 幂等性与事务(Idempotence & Transactions)
      • 🔹 安全配置(Security)
      • 🔹 监控与指标(Metrics & Monitoring)
      • 🔹 其他(Miscellaneous)
      • ✅ 总结:该配置的特点
      • 📌 建议(根据场景)
    • 消费者配置详解核心配置项
      • 基本连接配置
      • 主题相关配置
      • 消费组配置
      • 提交配置
      • 拉取配置
      • 序列化配置
      • 心跳和会话配置
      • 网络和安全配置

安装

先安装zookeeper

apache-zookeeper-3.8.4

暂时无法在飞书文档外展示此内容

安装kafka

kafka_2.13-3.8.0

暂时无法在飞书文档外展示此内容

  1. 上传解压
  2. 修改配置

./config/server.properties

broker.id=0
listeners=PLAINTEXT://your.server.ip.address:9092
zookeeper.connect=localhost:2181
log.dirs=/opt/kafka/data

启动脚本

nohup /home/kafka/kafka_2.13-3.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.8.0/config/server.properties > /home/kafka/kafka_2.13-3.8.0/logs/kafka-server.log 2>&1 &
keyvalue
安装目录/home/kafka/kafka_2.13-3.8.0
配置文件/home/kafka/kafka_2.13-3.8.0/config/server.properties
启动sh /home/kafka/start.sh

测试创建一个测试主题(Topic)

./kafka-topics.sh --create \--topic test-topic \--bootstrap-server 192.168.1.106:9092 \--partitions 1 \--replication-factor 1

列出所有主题,验证创建成功

./kafka-topics.sh --list --bootstrap-server 192.168.1.106:9092

测试步骤 4:启动消费者(Consumer)接收消息

打开一个新的终端窗口或 tab,启动一个控制台消费者,用于接收消息。

./kafka-console-consumer.sh --bootstrap-server 192.168.1.106:9092 \--topic test-topic \--from-beginning
  • --from-beginning:表示从该主题的最开始读取消息(即使之前已有消息)。
  • 此时消费者会等待,不会立即输出内容。

启动生产者(Producer)发送消息

回到原来的终端,启动一个控制台生产者。

./kafka-console-producer.sh --bootstrap-server 192.168.1.106:9092 \--topic test-topic

输入一些测试消息,每输入一行按回车发送:

Hello Kafka!
This is a test message.
Kafka is working!
^C  # 按 Ctrl+C 退出

删除测试主题(可选)

测试完成后,可以删除主题(需确保 delete.topic.enable=trueserver.properties 中已启用,默认在新版本中为 true):

./kafka-topics.sh --delete \--topic test-topic \--bootstrap-server 192.168.1.106:9092

配置详解

生产者配置文件

ProducerConfig values: acks = -1batch.size = 16384bootstrap.servers = [192.168.1.106:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = trueinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metadata.max.idle.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 3retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.connect.timeout.ms = nullsasl.login.read.timeout.ms = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.login.retry.backoff.max.ms = 10000sasl.login.retry.backoff.ms = 100sasl.mechanism = GSSAPIsasl.oauthbearer.clock.skew.seconds = 30sasl.oauthbearer.expected.audience = nullsasl.oauthbearer.expected.issuer = nullsasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100sasl.oauthbearer.jwks.endpoint.url = nullsasl.oauthbearer.scope.claim.name = scopesasl.oauthbearer.sub.claim.name = subsasl.oauthbearer.token.endpoint.url = nullsecurity.protocol = PLAINTEXTsecurity.providers = nullsend.buffer.bytes = 131072socket.connection.setup.timeout.max.ms = 30000socket.connection.setup.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2]ssl.endpoint.identification.algorithm = httpsssl.engine.factory.class = nullssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.certificate.chain = nullssl.keystore.key = nullssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSv1.2ssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.certificates = nullssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer

提供的是一份 Kafka 生产者(Producer)的完整配置信息,这些配置决定了生产者如何将消息发送到 Kafka 集群。下面将逐项解释这些配置的含义、作用以及推荐使用场景。

🔹 核心配置(Core Configuration)

配置项解释
bootstrap.servers[192.168.1.106:9092]指定 Kafka 集群的入口地址,生产者通过它来连接和获取集群元数据。即使只写一个 broker 地址,Kafka 也会自动发现整个集群。
key.serializerStringSerializer将消息的 key 序列化为字节数组,此处使用字符串序列化器。
value.serializerStringSerializer将消息的 value 序列化为字节数组。
client.idproducer-1标识该生产者的逻辑名称,用于服务器端日志、监控和配额管理。

🔹 可靠性与重试(Reliability & Retry)

配置项解释
acks-1(等同于 all)控制消息写入副本的确认机制:
• acks=1:leader 写入即确认
• acks=0:不等待确认(可能丢消息)
• acks=-1 或 all:等待 ISR 中所有副本都写入才确认,最强一致性。
retries3发送失败后重试次数(如网络问题、leader 切换等)。
retry.backoff.ms100两次重试之间的等待时间(毫秒),避免频繁重试造成雪崩。
enable.idempotenceTRUE开启幂等性生产者,确保单分区内的消息不重复、不丢失、不乱序(需配合 acks=all 和 max.in.flight.requests.per.connection <= 5)。
这是实现“恰好一次语义”(exactly-once)的基础。

🔹 批量与性能优化(Batching & Performance)

配置项解释
batch.size16384(16KB)每个分区的批量发送缓冲区大小。当一个分区的消息积累到这个大小时,会触发发送。注意:不是必须填满才发。
linger.ms0消息在 batch 中等待更多消息加入的延迟时间。设为 0 表示立即发送;设为 5~100 可提升吞吐,但增加延迟。
buffer.memory33554432(32MB)生产者本地用于缓冲待发送消息的总内存大小。超过此值,send() 会阻塞或抛出异常(取决于 max.block.ms)。
max.request.size1048576(1MB)单个请求最大大小(包括所有消息总和),不能超过 broker 的 message.max.bytes。

🔹 连接与网络(Connection & Networking)

配置项解释
connections.max.idle.ms540000(9分钟)连接空闲多久后关闭。
request.timeout.ms30000(30秒)等待 broker 响应请求的最大时间,超时则重试。
delivery.timeout.ms120000(120秒)从 send() 调用开始到确认发送成功或失败的总超时时间,涵盖重试、缓存、请求等全过程。
max.block.ms60000(60秒)当缓冲区满或元数据不可用时,send() 方法最多阻塞的时间。超过则抛出 TimeoutException。
metadata.max.age.ms300000(5分钟)强制刷新元数据(如 topic 分区变化)的周期。
reconnect.backoff.ms / .max.ms50 / 1000连接失败后重试的初始和最大退避时间,避免频繁重连。
send.buffer.bytes / receive.buffer.bytes131072 / 32768TCP 发送和接收缓冲区大小,设为 -1 表示使用系统默认值。

🔹 分区与路由(Partitioning)

配置项解释
partitioner.classDefaultPartitioner分区策略类:
• 有 key:按 key 的 hash 值分配分区(保证相同 key 到同一分区)
• 无 key:轮询或粘性分区(Sticky Partitioning)
可自定义实现。

🔹 幂等性与事务(Idempotence & Transactions)

配置项解释
enable.idempotenceTRUE已解释如上,开启后 Kafka 会为每条消息分配序列号,防止重复。
max.in.flight.requests.per.connection5每个连接最多允许多少个未确认的请求。若开启幂等性,必须 ≤5,否则可能破坏顺序性。
transactional.idnull若启用事务(跨分区原子写入),需设置此 ID。当前未启用。
transaction.timeout.ms60000(60秒)事务最大超时时间,由 broker 控制。

🔹 安全配置(Security)

配置项解释
security.protocolPLAINTEXT使用明文传输,无加密或认证。
生产环境应使用 SSL 或 SASL_SSL。
sasl.mechanismGSSAPISASL 认证机制,GSSAPI 通常用于 Kerberos 认证。
但当前 security.protocol=PLAINTEXT,所以 SASL 不生效。
SSL 相关配置多项当前未启用 SSL 加密(因 security.protocol=PLAINTEXT),这些配置仅在使用 SSL 或 SASL_SSL 时生效。

🔹 监控与指标(Metrics & Monitoring)

配置项解释
metric.reporters[]自定义指标上报器(如 Prometheus),当前无。
metrics.sample.window.ms30000(30秒)统计窗口大小。
metrics.num.samples2保留的样本数。
metrics.recording.levelINFO记录的指标级别。

🔹 其他(Miscellaneous)

配置项解释
interceptor.classes[]生产者拦截器,可在消息发送前后插入逻辑(如日志、监控)。
client.dns.lookupuse_all_dns_ipsDNS 解析策略,支持多 IP 地址负载均衡。
socket.connection.setup.timeout.ms10000建立 TCP 连接的超时时间。

✅ 总结:该配置的特点

  • 高可靠性:acks=all + retries=3 + enable.idempotence=true → 保证消息不丢、不重复。
  • 低延迟:linger.ms=0 → 消息立即发送,不等待批量。
  • 中等吞吐:batch.size=16KB 较小,适合低延迟场景;若追求高吞吐,可适当调大 batch.sizelinger.ms
  • 无安全加密:security.protocol=PLAINTEXT → 仅适用于内网或测试环境。
  • 非事务性:transactional.id=null → 不支持跨分区原子操作。

📌 建议(根据场景)

场景建议调整
高吞吐增大 batch.size(如 64KB~1MB),设置 linger.ms=5~20
更强安全改用 security.protocol=SASL_SSL,配置 SSL 证书或 Kerberos
事务支持设置 transactional.id=your-txn-id,并启用事务 API
更低延迟保持当前配置即可

消费者配置详解核心配置项

2025-08-07 10:10:11.537  INFO 20840 --- [pool-3-thread-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: acks = -1batch.size = 16384bootstrap.servers = [192.168.1.106:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-2compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = trueinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metadata.max.idle.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 3retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.connect.timeout.ms = nullsasl.login.read.timeout.ms = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.login.retry.backoff.max.ms = 10000sasl.login.retry.backoff.ms = 100sasl.mechanism = GSSAPIsasl.oauthbearer.clock.skew.seconds = 30sasl.oauthbearer.expected.audience = nullsasl.oauthbearer.expected.issuer = nullsasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100sasl.oauthbearer.jwks.endpoint.url = nullsasl.oauthbearer.scope.claim.name = scopesasl.oauthbearer.sub.claim.name = subsasl.oauthbearer.token.endpoint.url = nullsecurity.protocol = PLAINTEXTsecurity.providers = nullsend.buffer.bytes = 131072socket.connection.setup.timeout.max.ms = 30000socket.connection.setup.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2]ssl.endpoint.identification.algorithm = httpsssl.engine.factory.class = nullssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.certificate.chain = nullssl.keystore.key = nullssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSv1.2ssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.certificates = nullssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer
  1. 基本连接配置

bootstrap.servers = [192.168.1.106:9092]:Kafka 集群地址,消费者将连接到这个地址

client.id = consumer-user-service-group-2:消费者客户端标识符,用于标识不同的消费者实例

consumer: group-id:

user-service-group

client.id = consumer-user-service-group-2:消费者客户端标识符,用于标识不同的消费者实例

为什么实际id多了个 -2?

Kafka Client ID 自动扩展机制,当您在配置中设置 client.id 时,Kafka 客户端库通常会在运行时自动扩展这个 ID,以确保在同一个 JVM 或同一台机器上运行的多个消费者实例具有唯一的标识符

基于实例计数的扩展:

Kafka 客户端可能会在基础 client.id 后面添加数字后缀

例如:如果配置了 consumer-user-service-group,实际可能变成 consumer-user-service-group-1、consumer-user-service-group-2 等

group相同,client.id不同会重复消费吗?

这是一个关于 Kafka 消费者组和消息消费机制的重要问题。让我详细解释一下:

相同 group.id,不同 client.id 是否会重复消费

简短答案

不会重复消费同一份消息。在同一个消费者组(group.id 相同)中,即使 client.id 不同,Kafka 也会确保每条消息只被组内的一个消费者实例消费。

  1. 主题相关配置

allow.auto.create.topics = true:允许自动创建主题,当消费者订阅不存在的主题时会自动创建

auto.offset.reset = latest:当没有初始偏移量或服务器上不再存在偏移量时的重置策略,设置为 latest 表示从最新的消息开始消费

相关配置

  1. earliest:与latest相反,设置为earliest表示当没有初始偏移量或当前偏移量不再可用时,消费者将从最早的消息开始消费。这对于希望重新处理所有历史消息的场景非常有用。
  2. none:如果设置为none,则消费者不会自动重置偏移量。这意味着如果没有找到先前提交的偏移量,消费者会报错。这要求用户必须明确知道如何处理这种情况。
  1. 消费组配置

group.id = user-service-group:消费者组 ID,标识消费者属于哪个消费组

同一个组,同一个主题下的消息,组内consumer共同消费,不会重复消费,提供消费者吞吐量

partition.assignment.strategy = [RangeAssignor, CooperativeStickyAssignor]:分区分配策略,决定如何将分区分配给消费者

  1. 提交配置

enable.auto.commit = false:禁用自动提交偏移量,需要手动提交以确保消息处理的可靠性

auto.commit.interval.ms = 5000:自动提交偏移量的时间间隔(毫秒),但因为启用了手动提交,此配置不生效

  1. 拉取配置

fetch.min.bytes = 1:消费者从服务器获取记录的最小字节数

fetch.max.bytes = 52428800:服务器为每个分区返回的最大数据量(50MB)

fetch.max.wait.ms = 500:如果没有足够的数据满足 fetch.min.bytes,服务器在响应前等待的最长时间

max.partition.fetch.bytes = 1048576:服务器从每个分区返回的最大数据量(1MB)

  1. 序列化配置

key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer:键的反序列化器

value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer:值的反序列化器

  1. 心跳和会话配置

heartbeat.interval.ms = 3000:消费者协调者心跳发送频率(3秒)

session.timeout.ms = 45000:消费者会话超时时间(45秒),超过此时间未收到心跳将触发重新平衡

max.poll.interval.ms = 300000:两次 poll 调用之间的最大时间间隔(5分钟)

  1. 网络和安全配置

security.protocol = PLAINTEXT:安全协议,使用明文传输

receive.buffer.bytes = 65536:TCP 接收缓冲区大小

send.buffer.bytes = 131072:TCP 发送缓冲区大小

http://www.dtcms.com/a/319852.html

相关文章:

  • MPC-in-the-Head 转换入门指南
  • 抖音、快手、视频号等多平台视频解析下载 + 磁力嗅探下载、视频加工(提取音频 / 压缩等)
  • 【性能测试】---测试工具篇(jmeter)
  • Java垃圾回收(GC)探析
  • 图像理解、计算机视觉相关名词解释
  • 最新教程 | CentOS 7 内网环境 Nginx + ECharts 页面离线部署手册(RPM 安装方式)
  • yolo目标检测技术:基础概念(一)
  • Vscode Data Wrangler 数据查看和处理工具
  • Docker容器技术详解
  • 施易德智慧门店管理系统:零售品牌出海的高效引擎
  • mysql 索引失效分析
  • Cesium粒子系统模拟风场动态效果
  • 国内使用 npm 时配置镜像源
  • 网络安全等级保护(等保)2.0 概述
  • 树莓派下载安装miniconda(linux版小anaconda)
  • 【奔跑吧!Linux 内核(第二版)】第6章:简单的字符设备驱动(一)
  • 解决 Nginx 反代中 proxy_ssl_name 环境变量失效问题:网页能打开但登录失败
  • 3深度学习Pytorch-神经网络--全连接神经网络、数据准备(构建数据类Dataset、TensorDataset 和数据加载器DataLoader)
  • TCP 如何保证可靠性
  • Linux openssl、openssh 升级 保留旧版本
  • 【插件式微服务架构系统分享】之 解耦至上:gateway 网关与APISIX 网关的不同分工
  • React 为什么要自定义 Hooks?
  • 一文解读“Performance面板”前端性能优化工具基础用法!
  • 顺序表——C语言
  • FPGA学习笔记——VGA静态字符的显示(寄存器)
  • SOMGAN:利用自组织映射提高生成对抗网络的模式探索能力
  • 国内PCB批量厂家推荐
  • Linux 文件IO与标准IO的区别解析
  • wordpress安装环境推荐php8.0+mysql5.7
  • Linux Docker 新手入门:一文学会配置镜像加速器