kafka安装与参数配置
文章目录
- 安装
- 先安装zookeeper
- 安装kafka
- **列出所有主题,验证创建成功**
- **测试步骤 4:启动消费者(Consumer)接收消息**
- 此时消费者会等待,不会立即输出内容。
- **启动生产者(Producer)发送消息**
- **删除测试主题(可选)**
- 配置详解
- 生产者配置文件
- 🔹 核心配置(Core Configuration)
- 🔹 可靠性与重试(Reliability & Retry)
- 🔹 批量与性能优化(Batching & Performance)
- 🔹 连接与网络(Connection & Networking)
- 🔹 分区与路由(Partitioning)
- 🔹 幂等性与事务(Idempotence & Transactions)
- 🔹 安全配置(Security)
- 🔹 监控与指标(Metrics & Monitoring)
- 🔹 其他(Miscellaneous)
- ✅ 总结:该配置的特点
- 📌 建议(根据场景)
- 消费者配置详解核心配置项
- 基本连接配置
- 主题相关配置
- 消费组配置
- 提交配置
- 拉取配置
- 序列化配置
- 心跳和会话配置
- 网络和安全配置
安装
先安装zookeeper
apache-zookeeper-3.8.4
暂时无法在飞书文档外展示此内容
安装kafka
kafka_2.13-3.8.0
暂时无法在飞书文档外展示此内容
- 上传解压
- 修改配置
./config/server.properties
broker.id=0
listeners=PLAINTEXT://your.server.ip.address:9092
zookeeper.connect=localhost:2181
log.dirs=/opt/kafka/data
启动脚本
nohup /home/kafka/kafka_2.13-3.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.8.0/config/server.properties > /home/kafka/kafka_2.13-3.8.0/logs/kafka-server.log 2>&1 &
key | value |
---|---|
安装目录 | /home/kafka/kafka_2.13-3.8.0 |
配置文件 | /home/kafka/kafka_2.13-3.8.0/config/server.properties |
启动 | sh /home/kafka/start.sh |
测试创建一个测试主题(Topic)
./kafka-topics.sh --create \--topic test-topic \--bootstrap-server 192.168.1.106:9092 \--partitions 1 \--replication-factor 1
列出所有主题,验证创建成功
./kafka-topics.sh --list --bootstrap-server 192.168.1.106:9092
测试步骤 4:启动消费者(Consumer)接收消息
打开一个新的终端窗口或 tab,启动一个控制台消费者,用于接收消息。
./kafka-console-consumer.sh --bootstrap-server 192.168.1.106:9092 \--topic test-topic \--from-beginning
启动生产者(Producer)发送消息
回到原来的终端,启动一个控制台生产者。
./kafka-console-producer.sh --bootstrap-server 192.168.1.106:9092 \--topic test-topic
输入一些测试消息,每输入一行按回车发送:
Hello Kafka!
This is a test message.
Kafka is working!
^C # 按 Ctrl+C 退出
删除测试主题(可选)
测试完成后,可以删除主题(需确保 delete.topic.enable=true
在 server.properties
中已启用,默认在新版本中为 true
):
./kafka-topics.sh --delete \--topic test-topic \--bootstrap-server 192.168.1.106:9092
配置详解
生产者配置文件
ProducerConfig values: acks = -1batch.size = 16384bootstrap.servers = [192.168.1.106:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = trueinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metadata.max.idle.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 3retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.connect.timeout.ms = nullsasl.login.read.timeout.ms = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.login.retry.backoff.max.ms = 10000sasl.login.retry.backoff.ms = 100sasl.mechanism = GSSAPIsasl.oauthbearer.clock.skew.seconds = 30sasl.oauthbearer.expected.audience = nullsasl.oauthbearer.expected.issuer = nullsasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100sasl.oauthbearer.jwks.endpoint.url = nullsasl.oauthbearer.scope.claim.name = scopesasl.oauthbearer.sub.claim.name = subsasl.oauthbearer.token.endpoint.url = nullsecurity.protocol = PLAINTEXTsecurity.providers = nullsend.buffer.bytes = 131072socket.connection.setup.timeout.max.ms = 30000socket.connection.setup.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2]ssl.endpoint.identification.algorithm = httpsssl.engine.factory.class = nullssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.certificate.chain = nullssl.keystore.key = nullssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSv1.2ssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.certificates = nullssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer
提供的是一份 Kafka 生产者(Producer)的完整配置信息,这些配置决定了生产者如何将消息发送到 Kafka 集群。下面将逐项解释这些配置的含义、作用以及推荐使用场景。
🔹 核心配置(Core Configuration)
配置项 | 值 | 解释 |
---|---|---|
bootstrap.servers | [192.168.1.106:9092] | 指定 Kafka 集群的入口地址,生产者通过它来连接和获取集群元数据。即使只写一个 broker 地址,Kafka 也会自动发现整个集群。 |
key.serializer | StringSerializer | 将消息的 key 序列化为字节数组,此处使用字符串序列化器。 |
value.serializer | StringSerializer | 将消息的 value 序列化为字节数组。 |
client.id | producer-1 | 标识该生产者的逻辑名称,用于服务器端日志、监控和配额管理。 |
🔹 可靠性与重试(Reliability & Retry)
配置项 | 值 | 解释 |
---|---|---|
acks | -1(等同于 all) | 控制消息写入副本的确认机制: • acks=1:leader 写入即确认 • acks=0:不等待确认(可能丢消息) • acks=-1 或 all:等待 ISR 中所有副本都写入才确认,最强一致性。 |
retries | 3 | 发送失败后重试次数(如网络问题、leader 切换等)。 |
retry.backoff.ms | 100 | 两次重试之间的等待时间(毫秒),避免频繁重试造成雪崩。 |
enable.idempotence | TRUE | 开启幂等性生产者,确保单分区内的消息不重复、不丢失、不乱序(需配合 acks=all 和 max.in.flight.requests.per.connection <= 5)。 这是实现“恰好一次语义”(exactly-once)的基础。 |
🔹 批量与性能优化(Batching & Performance)
配置项 | 值 | 解释 |
---|---|---|
batch.size | 16384(16KB) | 每个分区的批量发送缓冲区大小。当一个分区的消息积累到这个大小时,会触发发送。注意:不是必须填满才发。 |
linger.ms | 0 | 消息在 batch 中等待更多消息加入的延迟时间。设为 0 表示立即发送;设为 5~100 可提升吞吐,但增加延迟。 |
buffer.memory | 33554432(32MB) | 生产者本地用于缓冲待发送消息的总内存大小。超过此值,send() 会阻塞或抛出异常(取决于 max.block.ms)。 |
max.request.size | 1048576(1MB) | 单个请求最大大小(包括所有消息总和),不能超过 broker 的 message.max.bytes。 |
🔹 连接与网络(Connection & Networking)
配置项 | 值 | 解释 |
---|---|---|
connections.max.idle.ms | 540000(9分钟) | 连接空闲多久后关闭。 |
request.timeout.ms | 30000(30秒) | 等待 broker 响应请求的最大时间,超时则重试。 |
delivery.timeout.ms | 120000(120秒) | 从 send() 调用开始到确认发送成功或失败的总超时时间,涵盖重试、缓存、请求等全过程。 |
max.block.ms | 60000(60秒) | 当缓冲区满或元数据不可用时,send() 方法最多阻塞的时间。超过则抛出 TimeoutException。 |
metadata.max.age.ms | 300000(5分钟) | 强制刷新元数据(如 topic 分区变化)的周期。 |
reconnect.backoff.ms / .max.ms | 50 / 1000 | 连接失败后重试的初始和最大退避时间,避免频繁重连。 |
send.buffer.bytes / receive.buffer.bytes | 131072 / 32768 | TCP 发送和接收缓冲区大小,设为 -1 表示使用系统默认值。 |
🔹 分区与路由(Partitioning)
配置项 | 值 | 解释 |
---|---|---|
partitioner.class | DefaultPartitioner | 分区策略类: • 有 key:按 key 的 hash 值分配分区(保证相同 key 到同一分区) • 无 key:轮询或粘性分区(Sticky Partitioning) 可自定义实现。 |
🔹 幂等性与事务(Idempotence & Transactions)
配置项 | 值 | 解释 |
---|---|---|
enable.idempotence | TRUE | 已解释如上,开启后 Kafka 会为每条消息分配序列号,防止重复。 |
max.in.flight.requests.per.connection | 5 | 每个连接最多允许多少个未确认的请求。若开启幂等性,必须 ≤5,否则可能破坏顺序性。 |
transactional.id | null | 若启用事务(跨分区原子写入),需设置此 ID。当前未启用。 |
transaction.timeout.ms | 60000(60秒) | 事务最大超时时间,由 broker 控制。 |
🔹 安全配置(Security)
配置项 | 值 | 解释 |
---|---|---|
security.protocol | PLAINTEXT | 使用明文传输,无加密或认证。 生产环境应使用 SSL 或 SASL_SSL。 |
sasl.mechanism | GSSAPI | SASL 认证机制,GSSAPI 通常用于 Kerberos 认证。 但当前 security.protocol=PLAINTEXT,所以 SASL 不生效。 |
SSL 相关配置 | 多项 | 当前未启用 SSL 加密(因 security.protocol=PLAINTEXT),这些配置仅在使用 SSL 或 SASL_SSL 时生效。 |
🔹 监控与指标(Metrics & Monitoring)
配置项 | 值 | 解释 |
---|---|---|
metric.reporters | [] | 自定义指标上报器(如 Prometheus),当前无。 |
metrics.sample.window.ms | 30000(30秒) | 统计窗口大小。 |
metrics.num.samples | 2 | 保留的样本数。 |
metrics.recording.level | INFO | 记录的指标级别。 |
🔹 其他(Miscellaneous)
配置项 | 值 | 解释 |
---|---|---|
interceptor.classes | [] | 生产者拦截器,可在消息发送前后插入逻辑(如日志、监控)。 |
client.dns.lookup | use_all_dns_ips | DNS 解析策略,支持多 IP 地址负载均衡。 |
socket.connection.setup.timeout.ms | 10000 | 建立 TCP 连接的超时时间。 |
✅ 总结:该配置的特点
- 高可靠性:
acks=all
+retries=3
+enable.idempotence=true
→ 保证消息不丢、不重复。 - 低延迟:
linger.ms=0
→ 消息立即发送,不等待批量。 - 中等吞吐:
batch.size=16KB
较小,适合低延迟场景;若追求高吞吐,可适当调大batch.size
和linger.ms
。 - 无安全加密:
security.protocol=PLAINTEXT
→ 仅适用于内网或测试环境。 - 非事务性:
transactional.id=null
→ 不支持跨分区原子操作。
📌 建议(根据场景)
场景 | 建议调整 |
---|---|
高吞吐 | 增大 batch.size(如 64KB~1MB),设置 linger.ms=5~20 |
更强安全 | 改用 security.protocol=SASL_SSL,配置 SSL 证书或 Kerberos |
事务支持 | 设置 transactional.id=your-txn-id,并启用事务 API |
更低延迟 | 保持当前配置即可 |
消费者配置详解核心配置项
2025-08-07 10:10:11.537 INFO 20840 --- [pool-3-thread-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = -1batch.size = 16384bootstrap.servers = [192.168.1.106:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-2compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = trueinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metadata.max.idle.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 3retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.connect.timeout.ms = nullsasl.login.read.timeout.ms = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.login.retry.backoff.max.ms = 10000sasl.login.retry.backoff.ms = 100sasl.mechanism = GSSAPIsasl.oauthbearer.clock.skew.seconds = 30sasl.oauthbearer.expected.audience = nullsasl.oauthbearer.expected.issuer = nullsasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100sasl.oauthbearer.jwks.endpoint.url = nullsasl.oauthbearer.scope.claim.name = scopesasl.oauthbearer.sub.claim.name = subsasl.oauthbearer.token.endpoint.url = nullsecurity.protocol = PLAINTEXTsecurity.providers = nullsend.buffer.bytes = 131072socket.connection.setup.timeout.max.ms = 30000socket.connection.setup.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2]ssl.endpoint.identification.algorithm = httpsssl.engine.factory.class = nullssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.certificate.chain = nullssl.keystore.key = nullssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSv1.2ssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.certificates = nullssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer
bootstrap.servers = [192.168.1.106:9092]:Kafka 集群地址,消费者将连接到这个地址
client.id = consumer-user-service-group-2:消费者客户端标识符,用于标识不同的消费者实例
consumer: group-id:
user-service-group
client.id = consumer-user-service-group-2:消费者客户端标识符,用于标识不同的消费者实例
为什么实际id多了个 -2?
Kafka Client ID 自动扩展机制,当您在配置中设置 client.id 时,Kafka 客户端库通常会在运行时自动扩展这个 ID,以确保在同一个 JVM 或同一台机器上运行的多个消费者实例具有唯一的标识符
基于实例计数的扩展:
Kafka 客户端可能会在基础 client.id 后面添加数字后缀
例如:如果配置了 consumer-user-service-group,实际可能变成 consumer-user-service-group-1、consumer-user-service-group-2 等
group相同,client.id不同会重复消费吗?
这是一个关于 Kafka 消费者组和消息消费机制的重要问题。让我详细解释一下:
相同 group.id,不同 client.id 是否会重复消费
简短答案
不会重复消费同一份消息。在同一个消费者组(group.id 相同)中,即使 client.id 不同,Kafka 也会确保每条消息只被组内的一个消费者实例消费。
allow.auto.create.topics = true:允许自动创建主题,当消费者订阅不存在的主题时会自动创建
auto.offset.reset = latest:当没有初始偏移量或服务器上不再存在偏移量时的重置策略,设置为 latest 表示从最新的消息开始消费
相关配置
- earliest:与
latest
相反,设置为earliest
表示当没有初始偏移量或当前偏移量不再可用时,消费者将从最早的消息开始消费。这对于希望重新处理所有历史消息的场景非常有用。- none:如果设置为
none
,则消费者不会自动重置偏移量。这意味着如果没有找到先前提交的偏移量,消费者会报错。这要求用户必须明确知道如何处理这种情况。
group.id = user-service-group:消费者组 ID,标识消费者属于哪个消费组
同一个组,同一个主题下的消息,组内consumer共同消费,不会重复消费,提供消费者吞吐量
partition.assignment.strategy = [RangeAssignor, CooperativeStickyAssignor]:分区分配策略,决定如何将分区分配给消费者
enable.auto.commit = false:禁用自动提交偏移量,需要手动提交以确保消息处理的可靠性
auto.commit.interval.ms = 5000:自动提交偏移量的时间间隔(毫秒),但因为启用了手动提交,此配置不生效
fetch.min.bytes = 1:消费者从服务器获取记录的最小字节数
fetch.max.bytes = 52428800:服务器为每个分区返回的最大数据量(50MB)
fetch.max.wait.ms = 500:如果没有足够的数据满足 fetch.min.bytes,服务器在响应前等待的最长时间
max.partition.fetch.bytes = 1048576:服务器从每个分区返回的最大数据量(1MB)
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer:键的反序列化器
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer:值的反序列化器
heartbeat.interval.ms = 3000:消费者协调者心跳发送频率(3秒)
session.timeout.ms = 45000:消费者会话超时时间(45秒),超过此时间未收到心跳将触发重新平衡
max.poll.interval.ms = 300000:两次 poll 调用之间的最大时间间隔(5分钟)
security.protocol = PLAINTEXT:安全协议,使用明文传输
receive.buffer.bytes = 65536:TCP 接收缓冲区大小
send.buffer.bytes = 131072:TCP 发送缓冲区大小