当前位置: 首页 > news >正文

Kafka 运维实战基本操作含命令与最佳实践

1. 基础概览与工具入口

  • Kafka 发行包的所有 CLI 工具均在 bin/ 目录下。
  • 任何工具不带参数运行都会显示所有可用选项。
  • 本文命令默认:--bootstrap-server localhost:9092;生产请替换为你的控制面或内网 VIP。

2. 主题管理(创建 / 修改 / 删除 / 命名限制)

2.1 创建主题

bin/kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic my_topic_name \--partitions 20 --replication-factor 3 \--config x=y
  • replication-factor(副本因子):建议 2~3,可在不停机重启 Broker 的同时保证数据可读。
  • partitions(分区数):决定并行度数据/负载可分布的最大 Broker 数(不含副本)。
  • 配置覆盖:命令行 --config 会覆盖 Broker 的默认主题级配置(如保留时长等)。

2.2 主题命名长度限制

  • 分区目录命名规则:<topic>-<partitionId>;通常文件夹名最长 255 字符。
  • 假设分区数不超 100,000(5 位),则主题名 ≤ 249 字符(留出 - 与 5 位分区号)。

2.3 修改主题

  • 增加分区(仅增不减):

    bin/kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic my_topic_name --partitions 40
    

    ⚠️ 若你基于 hash(key) % partitions语义分片,新增分区不会重分布历史数据,可能影响消费者假设。Kafka 不会自动搬旧数据。

  • 新增配置

    bin/kafka-configs.sh --bootstrap-server localhost:9092 \--entity-type topics --entity-name my_topic_name \--alter --add-config x=y
    
  • 删除配置

    bin/kafka-configs.sh --bootstrap-server localhost:9092 \--entity-type topics --entity-name my_topic_name \--alter --delete-config x
    

2.4 删除主题

bin/kafka-topics.sh --bootstrap-server localhost:9092 \--delete --topic my_topic_name

❌ 目前 Kafka 不支持减少分区数
🔁 调整副本因子请参考第 6.4 节「提升副本因子」。

3. 消费组可观测与管理(Consumer Groups & Share Groups)

3.1 快速查看消费位点与 Lag

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group my-group

输出包含:CURRENT-OFFSET / LOG-END-OFFSET / LAG / CONSUMER-ID / HOST / CLIENT-ID

3.2 列举各类“组”

bin/kafka-groups.sh --bootstrap-server localhost:9092 --list
# 输出示例
# GROUP               TYPE      PROTOCOL
# my-consumer-group   Consumer  consumer
# my-share-group      Share     share

3.3 管理消费组(列出 / 描述 / 删除 / 重置位点)

  • 列出所有消费组

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    
  • 描述组(默认含 offsets)

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group my-group
    
  • 查看成员

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group my-group --members
    
  • 查看成员 + 分配详情

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group my-group --members --verbose
    
  • 查看组状态

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group my-group --state
    
  • 删除组(仅当无活动成员)

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--delete --group my-group --group my-other-group
    

🔐 使用 consumer 协议的组需要对组订阅到的所有主题具备 DESCRIBE 权限;classic 协议不要求。

重置位点(一次仅支持一个组)
  • 支持范围:--all-topics / --topic(或 --from-file

  • 执行模式:默认预览、--execute 执行、--export 导出 CSV

  • 场景举例:--to-earliest / --to-latest / --to-datetime 'YYYY-MM-DDThh:mm:ss.sss' / --shift-by n / --to-offset x / --by-duration 'PnDTnHnMnS' / --to-current

  • 示例(重置到最新):

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--reset-offsets --group my-group --topic topic1 --to-latest
    

3.4 管理 Share Groups(预览)

Kafka 4.1 起提供 Share Groups 预览(默认关闭),需用 kafka-features.shshare.version=1 启用;详见发行说明。

  • 列出

    bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list
    
  • 描述(起始位点 / 成员 / 状态等)

    bin/kafka-share-groups.sh --bootstrap-server localhost:9092 \--describe --group my-share-group
    bin/kafka-share-groups.sh --bootstrap-server localhost:9092 \--describe --group my-share-group --members
    bin/kafka-share-groups.sh --bootstrap-server localhost:9092 \--describe --group my-share-group --state
    
  • 删除主题在 share 组中的位点

    bin/kafka-share-groups.sh --bootstrap-server localhost:9092 \--delete-offsets --group my-share-group --topic topic1
    
  • 删除 share 组(仅无活动成员)

    bin/kafka-share-groups.sh --bootstrap-server localhost:9092 \--delete --group my-share-group
    

🔐 管理端同样需要对组内使用的所有主题具备 DESCRIBE 权限。
👥 Share 允许多个成员共享同一分区,与传统 consumer group 的“每分区一个成员”不同。

4. 集群维护:优雅下线、Leader 平衡、机架感知

4.1 优雅下线(Graceful Shutdown)

  • 作用:在计划重启

    1. 先将日志落盘,避免重启后的日志恢复;
    2. 受控迁移该 Broker 所领导的分区到其他副本,将不可用时长压到毫秒级。
  • 开关:

    controlled.shutdown.enable=true
    

✅ 成功前提:该 Broker 上的所有分区都存在其他存活副本(副本因子 > 1 且至少一副本在线)。

4.2 Leader 平衡(Preferred Leader)

  • Kafka 维护首选副本(副本列表中越靠前越“首选”)。默认会尝试把 Leader 恢复到首选:

    auto.leader.rebalance.enable=true
    
  • 若关闭自动平衡,可手动触发:

    bin/kafka-leader-election.sh --bootstrap-server localhost:9092 \--election-type preferred --all-topic-partitions
    

4.3 机架感知(Rack Awareness)

  • 为 Broker 标注机架:

    broker.rack=my-rack-id
    
  • 创建/修改/重分配时,Kafka 会尽量让副本跨越 min(#racks, replication-factor) 个机架;算法保证每台 Broker 承担近似相同数量的 Leader

  • 建议:每个机架的 Broker 数量尽量一致,否则少数机架会背更多副本,增加存储与复制开销。

5. 扩容与迁移:分区重分配全流程

新增 Broker 后不会自动承载旧分区,需分区重分配;过程由你发起,但复制与切换自动完成。

5.1 工具模式(互斥三选一)

  • --generate:给定主题列表目标 Broker 列表,生成候选方案
  • --execute:执行给定 JSON 方案
  • --verify:校验上一次 --execute 的进度/结果(完成/失败/进行中)

5.2 将若干主题“整体”迁到新机器

  1. 准备主题列表:
{"topics": [{ "topic": "foo1" }, { "topic": "foo2" }],"version": 1
}
  1. 生成候选方案并保存“当前分配(用于回滚)”与“建议分配(用于执行)”
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--topics-to-move-json-file topics-to-move.json \--broker-list "5,6" --generate
  1. 执行与校验
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--reassignment-json-file expand-cluster-reassignment.json --executebin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--reassignment-json-file expand-cluster-reassignment.json --verify

5.3 精细化迁移(手工编写分配)

{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}
]}
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--reassignment-json-file custom-reassignment.json --executebin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--reassignment-json-file custom-reassignment.json --verify

5.4 下线 Broker(Decommission)

  • 目前无自动“一键下线”方案生成器;需自行列举该 Broker 上所有分区副本,并均衡地迁移到其他 Broker。
  • 规划要点:避免把大量副本迁到同一台目标机;必要时分批分波次执行。

6. 提升副本因子(线上无感扩容可靠性)

手工指定更多副本到新的 Broker,即可在线提升副本因子

{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--reassignment-json-file increase-replication-factor.json --executebin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--reassignment-json-file increase-replication-factor.json --verifybin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
# 观察 ReplicationFactor 与 ISR

7. 迁移限速与进度监控(Throttle & Lag)

7.1 在执行重分配时设置复制带宽上限

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute \--reassignment-json-file bigger-cluster.json \--throttle 50000000 \--replica-alter-log-dirs-throttle 100000000
  • 运行中可追加执行调大限速:

    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--additional --execute --reassignment-json-file bigger-cluster.json \--throttle 700000000
    
  • 完成后请及时移除限速--verify 会帮你清理):

    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--verify --reassignment-json-file bigger-cluster.json
    

7.2 验证与手工检查 throttle 配置

  • Broker 级(限速值):

    bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
    # 关注:
    # leader.replication.throttled.rate
    # follower.replication.throttled.rate
    # replica.alter.log.dirs.io.max.bytes.per.second
    
  • Topic 级(被限速的副本集合):

    bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
    # 关注:
    # leader.replication.throttled.replicas
    # follower.replication.throttled.replicas
    
  • 必要时可用 --alter 手动修改。

7.3 安全使用要点

  1. 及时清理:重分配完成务必移除 throttle,避免影响正常复制。

  2. 确保前进:若 max(BytesInPerSec) > throttle,复制可能追不上写入,Lag 不降;监控

    kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=...,topic=...,partition=...
    

    若无下降,调大限速。

8. 配额管理(Quotas:按用户 / client-id / 组合)

8.1 设置覆盖

  • 指定 (user=user1, client-id=clientA)

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \--entity-type users --entity-name user1 \--entity-type clients --entity-name clientA
    
  • 仅用户:

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \--entity-type users --entity-name user1
    
  • 仅 client-id:

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \--entity-type clients --entity-name clientA
    

8.2 设置默认(--entity-default

  • 用户下的默认 client-id

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \--entity-type users --entity-name user1 \--entity-type clients --entity-default
    
  • 默认用户

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \--entity-type users --entity-default
    
  • 默认 client-id

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \--entity-type clients --entity-default
    

8.3 查询

# 指定 (user, client-id)
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe \--entity-type users --entity-name user1 \--entity-type clients --entity-name clientA# 指定 user / 指定 client-id / 默认实体 / 全量列表
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-default
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type clients --entity-default
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users

9. 跨集群镜像与地理多活(Mirroring & Geo-Replication)

  • Kafka 支持跨集群 / 跨机房 / 跨地域的数据流;可结合企业网络与可用区设计进行多活或异地容灾。
  • 设计建议:配合主题白名单/正则 + 排除规则SASL/SCRAM + TLS 等安全策略与观测指标。

10. 常见风险清单(必读)

  • 不可减分区:设计初期就评估分区增长策略;语义分片要考虑“新增分区不重分布历史数据”。
  • 🧯 下线前提:优雅下线需确保每个分区有其他存活副本(RF>1)。
  • ⚖️ Leader 不均衡:重启后 Broker 会先当 Follower;需要开启自动平衡或手动 preferred election。
  • 🛰️ Rack 规划:尽量让每个机架的 Broker 数均衡,否则复制与存储压力会偏斜。
  • 🚦 Throttle 清理:重分配完成要立刻移除限速;Lag 不降要检查 BytesInPerSec 与 throttle。
  • 🔐 权限:consumer 协议的组“describe”需要对所有订阅主题具备 DESCRIBE;权限缺失会导致组状态/位点不可见。
  • 🧩 回滚准备:执行重分配前务必保存“当前分配 JSON”,失败可快速回滚。

11. 命令速查表(按场景)

目标命令
创建主题kafka-topics.sh --create --topic <t> --partitions N --replication-factor R
增分区kafka-topics.sh --alter --topic <t> --partitions N
增/删配置kafka-configs.sh --alter --entity-type topics --entity-name <t> --add-config/--delete-config
删主题kafka-topics.sh --delete --topic <t>
描述消费组kafka-consumer-groups.sh --describe --group <g>
列组成员... --members [--verbose]
组状态... --state
删除组... --delete --group <g> [--group <g2>]
重置位点... --reset-offsets --group <g> --topic <t> --to-xxx
Preferred 选主kafka-leader-election.sh --election-type preferred --all-topic-partitions
生成重分配kafka-reassign-partitions.sh --generate --topics-to-move-json-file ... --broker-list "..."
执行/校验重分配... --execute / --verify --reassignment-json-file ...
迁移限速... --throttle <B/s> --replica-alter-log-dirs-throttle <B/s>
查看 throttle(Broker/Topic)kafka-configs.sh --describe --entity-type brokers/topics
配额设置(user/client-id)kafka-configs.sh --alter --entity-type users/clients ... --add-config producer_byte_rate=...,consumer_byte_rate=...,request_percentage=...

12. 总结与实践建议

  • 以“可观测 → 变更 → 校验 → 回滚”闭环组织运维动作:任何重分配与限速都要有当前状态快照回滚 JSON
  • 把扩容当作“复制 + 切换”的受控流水线:先复制到新副本、进入 ISR,再切 Leader 与删除旧副本。
  • 把消费组当作“位点与分配”的可观测对象:任何“延迟大”的投诉,都应先 --describeLAG 与分配是否倾斜。
  • 前置容量规划:分区与副本因子是“上限设计”,后续只能增加(分区)或扩副本,不能“减分区”。

文章转载自:

http://tGFF5TU5.yznsx.cn
http://vMsJQ1h0.yznsx.cn
http://eLywG5Bz.yznsx.cn
http://DuD9xGSr.yznsx.cn
http://L8ITUaOY.yznsx.cn
http://lCMpkqq5.yznsx.cn
http://81Vfs0Ly.yznsx.cn
http://AP7LmwGx.yznsx.cn
http://ycM2QlGV.yznsx.cn
http://SKohB6Xz.yznsx.cn
http://JuWJkOgK.yznsx.cn
http://KE3I8iL1.yznsx.cn
http://UakKITzE.yznsx.cn
http://ozZEVVak.yznsx.cn
http://hZQk1Rn1.yznsx.cn
http://iueIHHm9.yznsx.cn
http://QUvIgOdX.yznsx.cn
http://LvpZ1DtZ.yznsx.cn
http://4OODHKwT.yznsx.cn
http://w5cd6gYN.yznsx.cn
http://5LlmqJiA.yznsx.cn
http://IeFuhIrY.yznsx.cn
http://ZumGfgYm.yznsx.cn
http://NpFJf7pw.yznsx.cn
http://pcUVhAKl.yznsx.cn
http://3BjJcgKc.yznsx.cn
http://RE55Tv2M.yznsx.cn
http://MmwqLB7I.yznsx.cn
http://iuxjKE0I.yznsx.cn
http://rPGH5OaQ.yznsx.cn
http://www.dtcms.com/a/383474.html

相关文章:

  • CAS理解
  • Linux动静态库开发基础:静态库与动态库的编译构建、链接使用及问题排查
  • 深度学习的定义
  • 数据库造神计划第七天---增删改查(CRUD)(3)
  • 【WitSystem】FastAPI目录架构最佳实践
  • Python的re模块
  • 条件扩散过程(附录H)
  • selenium web自动化测试
  • docker compose 部署dify
  • 接口协议全解析:从HTTP到gRPC,如何选择适合你的通信方案?
  • 单例模式重新学习
  • 【系列文章】Linux中的并发与竞争[04]-信号量
  • Linux入门(二)
  • Transformer 面试题及详细答案120道(41-50)-- 训练与优化
  • UDP-Server(3)chat聊天室
  • 【不背八股】12.十大排序算法
  • 华清远见25072班网络编程学习day5
  • 【CMake】List
  • Linux系统中查找某个动态库例如.so文件是哪个软件安装的
  • c++ unqiue指针
  • ​Go语言实战案例 — 工具开发篇:编写一个进程监控工具​
  • Roo Code 的检查点功能
  • 【go/gopls/mcp】官方gopls内置mcp server使用
  • 【无标题】神经网络算法初探
  • Genspark AI 浏览器
  • Linux内核IPsec接收机制剖析:XFRM框架与xfrm4_input.c的深度解读
  • Linux 系统下的流量控制工具之tc命令案例解析
  • 数据库造神计划第五天---增删改查(CRUD)(1)
  • 深入理解Java虚拟机:JVM高级特性与最佳实践(第3版)第九章知识点问答(10题)
  • AI表征了西方的有界,AI+体现了东方的无界