当前位置: 首页 > news >正文

第5篇、 Kafka 数据可靠性与容错机制

在分布式消息队列系统中,数据可靠性容错能力 是核心指标。Kafka 作为高吞吐、可扩展的流式处理平台,依靠副本复制、Leader 选举和 ISR 机制,保证了在节点故障时消息依然能够可靠传输与消费。

📚 目录

理论基础

  • 一、数据复制机制与 ISR

    • 核心概念介绍
      • 1. 分区副本(Partition Replica)
      • 2. Leader 与 Follower
      • 3. ISR(In-Sync Replica)
    • 工作机制
    • 数据流向图
  • 二、Leader 选举流程

    • 核心概念介绍
      • 1. Controller(控制器)
      • 2. Leader 选举触发条件
      • 3. 选举规则与流程
    • 选举优势
  • 三、Broker 宕机后的恢复

    • 故障恢复流程图
  • 四、min.insync.replicas 配置与数据丢失风险

    • 核心概念介绍
      • 1. min.insync.replicas 参数
      • 2. Producer acks 参数
      • 3. 数据丢失风险分析
    • 配置建议
    • 性能与可靠性权衡
    • ISR 状态变化图

实践案例

  • 五、案例演示:关停一个 Broker

    • 1. 环境准备
    • 2. 启动 Producer & Consumer
    • 3. 模拟故障
    • 4. 观察现象
    • 5. Broker 恢复
  • 六、总结

代码实战

  • 七、动手实战:可靠性配置 + 故障注入(含代码)
    • 1) 环境准备
    • 2) 创建 Topic(3 副本 + 要求至少 2 个副本同步)
    • 3) 代码实现与运行
      • 生产者代码(可靠性配置)
      • 消费者代码(可靠性监控)
      • 运行命令
    • 4) 故障注入与现象观察
    • 5) 小结与实践建议

可视化工具

  • 八、交互式可视化:实时查看 ISR 与发送/消费统计
    • 1) 指标服务代码(Flask)
    • 2) 可视化页面代码
    • 3) 建议的演示流程

本文将从以下几个方面展开:

  • 数据复制机制与 ISR(In-Sync Replica)
  • Leader 选举流程
  • Broker 宕机后的恢复过程
  • min.insync.replicas 配置与数据丢失风险
  • 案例演示:关停一个 Broker,观察副本切换与消费情况

重要术语解释

在深入讨论之前,先了解几个关键术语:

LEO(Log End Offset):日志结束偏移量,表示分区中最后一条消息的偏移量位置。

HW(High Watermark):高水位线,表示消费者可见的最大偏移量,HW ≤ LEO。

Replication Factor:副本因子,指定每个分区需要多少个副本。

Controller:Kafka 集群的控制器,负责管理分区和副本的分配。

ZooKeeper:Kafka 的元数据存储和协调服务(Kafka 2.8+ 版本开始支持 KRaft 模式,不再依赖 ZooKeeper)。


一、数据复制机制与 ISR

核心概念介绍

1. 分区副本(Partition Replica)

定义:Kafka 中每个分区都有多个副本,分布在不同的 Broker 上,用于提供数据冗余和容错能力。

特点

  • 每个分区有一个 Leader 和若干个 Follower
  • Leader 负责处理所有读写请求
  • Follower 只负责从 Leader 拉取数据并同步
  • 副本数量由 replication-factor 参数控制
2. Leader 与 Follower

Leader

  • 处理所有客户端的读写请求
  • 维护分区的元数据信息
  • 负责向 Follower 推送数据变更

Follower

  • 被动接收 Leader 的数据同步
  • 不处理客户端请求
  • 在 Leader 故障时可能被选为新的 Leader
3. ISR(In-Sync Replica)

定义:ISR 是"同步副本集合",包含所有与 Leader 保持同步状态的副本。

同步条件

  • Follower 的 LEO(Log End Offset)与 Leader 的 HW(High Watermark)差距在阈值内
  • 默认阈值由 replica.lag.time.max.ms 控制(通常为 10 秒)

ISR 的作用

  • 数据一致性保证:只有 ISR 中的副本才被认为是"安全"的
  • Leader 选举:新 Leader 只能从 ISR 中选择
  • 写入确认:Producer 的 acks=all 需要等待 ISR 中所有副本确认

工作机制

  • Leader 写入 → ISR 跟进:只有 ISR 副本完成写入,消息才被确认
  • Follower 滞后处理:若某个 Follower 落后过多,会被移出 ISR,避免影响整体可用性
  • 动态调整:ISR 会根据副本的同步状态动态调整

这样,Kafka 保证了即使部分 Broker 宕机,只要 ISR 中至少有一个副本存活,数据就不会丢失。

数据流向图

Consumer 客户端
ISR 管理
Kafka 集群
Broker 1 (Leader)
Broker 2 (Follower)
Broker 3 (Follower)
Producer 客户端
1. 发送消息
acks=all
2. 写入本地日志
3. 拉取数据
4. 写入本地日志
3. 拉取数据
4. 写入本地日志
5. 更新 HW
6. 报告 LEO
6. 报告 LEO
7. 检查同步状态
8. 确认写入成功
9. 可见消息
Consumer
ISR: Leader, Follower1, Follower2
High Watermark
Follower Broker 2
Follower Log 2
Follower Broker 1
Follower Log 1
Local Log
Producer
Leader Broker

数据流向说明

  1. Producer 发送:Producer 向 Leader 发送消息,设置 acks=all
  2. Leader 写入:Leader 将消息写入本地日志
  3. Follower 拉取:Follower 从 Leader 拉取新消息
  4. Follower 写入:Follower 将消息写入各自的本地日志
  5. 更新 HW:Leader 根据 Follower 的 LEO 更新 HW
  6. 报告 LEO:Follower 向 Leader 报告自己的 LEO
  7. ISR 检查:Leader 检查所有副本的同步状态
  8. 确认成功:当 ISR 中所有副本都同步后,向 Producer 确认
  9. Consumer 消费:Consumer 只能看到 HW 以下的消息

二、Leader 选举流程

核心概念介绍

1. Controller(控制器)

定义:Controller 是 Kafka 集群中的一个特殊 Broker,负责管理整个集群的元数据和协调工作。

职责

  • 监控集群中所有 Broker 的状态
  • 管理分区的 Leader 和 Follower 分配
  • 处理 Broker 的加入和离开事件
  • 协调 Leader 选举过程

选举机制

  • 集群启动时,所有 Broker 竞争成为 Controller
  • 使用 ZooKeeper 的临时节点机制确保只有一个 Controller
  • Controller 故障时,其他 Broker 会重新选举新的 Controller
2. Leader 选举触发条件

自动触发

  • Leader 所在的 Broker 宕机
  • Leader 所在的 Broker 网络分区
  • Leader 副本数据损坏

手动触发

  • 管理员主动触发分区重新分配
  • 集群扩容或缩容
3. 选举规则与流程

选举规则

  1. 优先从 ISR 中选择:新 Leader 必须来自 ISR 集合
  2. 选择第一个可用副本:在 ISR 中选择第一个可用的副本作为新 Leader
  3. 避免脏读:确保新 Leader 的数据与原 Leader 一致

选举流程

  1. 检测故障:Controller 检测到 Leader 失效
  2. 更新 ISR:从 ISR 中移除失效的副本
  3. 选择新 Leader:从剩余 ISR 中选择一个作为新 Leader
  4. 更新元数据:通知所有 Broker 更新分区元数据
  5. 客户端感知:Producer 和 Consumer 自动感知 Leader 变化

选举优势

👉 数据一致性保证:基于 ISR 的 Leader 选举确保新 Leader 上的数据与原 Leader 一致,避免数据丢失

👉 快速故障恢复:选举过程通常在几秒内完成,最小化服务中断时间

👉 自动容错:无需人工干预,系统自动处理故障和恢复


三、Broker 宕机后的恢复

Broker 故障是分布式系统的常见情况。Kafka 的恢复过程大致如下:

  1. 故障发生:Leader Broker 宕机,分区不可写。

  2. Leader 切换:Controller 从 ISR 中选出新的 Leader。

  3. 客户端感知

    • Producer 会自动更新元数据,继续向新 Leader 写入。
    • Consumer 自动订阅新的 Leader 进行消费。
  4. Broker 恢复上线

    • 重新加入集群。
    • 从新的 Leader 拉取缺失的数据,追上进度后重新加入 ISR。

这样,Kafka 在 Broker 失效和恢复时,能够保证 高可用 + 数据不丢失

故障恢复流程图

Producer Leader Broker Follower Broker 1 Follower Broker 2 Controller Consumer 正常状态:3个Broker都在线 发送消息 同步数据 同步数据 确认同步 确认同步 写入成功确认 故障发生:Leader Broker宕机 连接断开 连接断开 连接断开 Controller检测故障并选举新Leader 检测到Leader失效 检测到Leader失效 选举为新Leader 更新元数据 客户端自动感知Leader变化 重新连接新Leader 重新连接新Leader 继续处理请求 继续提供数据 原Leader恢复 重新加入集群 从新Leader同步数据 追上进度后重新加入ISR Producer Leader Broker Follower Broker 1 Follower Broker 2 Controller Consumer

故障恢复阶段说明

  1. 正常状态:3个Broker都在线,数据正常同步
  2. 故障检测:Controller检测到Leader Broker宕机
  3. Leader选举:从ISR中选择新的Leader
  4. 元数据更新:通知所有Broker更新分区元数据
  5. 客户端重连:Producer和Consumer自动连接到新Leader
  6. 服务恢复:新Leader开始处理读写请求
  7. 原Leader恢复:原Leader重新加入集群并同步数据

四、min.insync.replicas 配置与数据丢失风险

核心概念介绍

1. min.insync.replicas 参数

定义min.insync.replicas 是 Topic 级别的配置参数,指定 Producer 使用 acks=all 时,至少需要多少个副本确认写入才算成功。

作用机制

  • 当 ISR 中的副本数量 < min.insync.replicas 时,acks=all 的写入会被拒绝
  • 这是一个"安全阀",防止在副本数量不足时冒险写入
  • 默认值为 1,生产环境建议设置为 2 或更高
2. Producer acks 参数

acks=0

  • Producer 不等待任何确认
  • 最高性能,但可能丢失数据
  • 适用于对数据丢失容忍度高的场景

acks=1

  • 等待 Leader 确认写入
  • 平衡性能和可靠性
  • 在 Leader 故障时可能丢失数据

acks=all

  • 等待 ISR 中所有副本确认
  • 最高可靠性,但性能较低
  • 需要配合 min.insync.replicas 使用
3. 数据丢失风险分析

风险场景

  1. ISR 副本不足

    场景:3 副本 Topic,min.insync.replicas=2
    情况:2 个 Broker 宕机,ISR 只剩 1 个副本
    结果:acks=all 写入被拒绝,避免数据丢失
    
  2. 配置不当

    场景:min.insync.replicas=1
    情况:Leader 写入后立即宕机,Follower 未同步
    结果:数据丢失
    
  3. 网络分区

    场景:Leader 与 Follower 网络隔离
    情况:Leader 继续写入,Follower 无法同步
    结果:可能导致数据不一致
    

配置建议

生产环境最佳实践

# Topic 配置
replication-factor=3
min.insync.replicas=2# Producer 配置
acks=all
retries=10
enable.idempotence=true

配置说明

  • 副本数 ≥3:提供足够的容错能力
  • min.insync.replicas ≥2:确保至少 2 个副本同步
  • Producer 使用 acks=all:等待所有 ISR 副本确认
  • 启用幂等性:避免重复消息

性能与可靠性权衡

可靠性优先

  • min.insync.replicas=2acks=all
  • 适合金融、支付等对数据一致性要求极高的场景

性能优先

  • min.insync.replicas=1acks=1
  • 适合日志收集、监控数据等对丢失容忍度高的场景

平衡配置

  • min.insync.replicas=2acks=all
  • 在大多数场景下提供最佳的性能与可靠性平衡

ISR 状态变化图

集群启动
Leader宕机
Follower1宕机
Follower2宕机
Controller检测
移除故障副本
新Leader开始工作
移除故障副本
ISR仍有2个副本
移除故障副本
ISR仍有2个副本
原Leader重新上线
Follower1重新上线
Follower2重新上线
从新Leader拉取数据
从Leader拉取数据
从Leader拉取数据
追上进度
恢复完整ISR
正常状态
Broker1故障
Broker2故障
Broker3故障
选举新Leader
更新ISR
服务恢复
继续服务
Broker1恢复
Broker2恢复
Broker3恢复
数据同步
重新加入ISR
ISR: [Leader, Follower1, Follower2]
状态: 3个副本同步
写入: acks=all 成功
ISR: [Leader, Follower2]
状态: 2个副本同步
写入: acks=all 成功
ISR: [新Leader, Follower2]
状态: 2个副本同步
写入: acks=all 成功

ISR 状态说明

  • 正常状态:3个副本都在ISR中,数据完全同步
  • 单点故障:1个Follower故障,ISR减少但服务继续
  • Leader故障:需要选举新Leader,短暂服务中断
  • 多点故障:如果ISR副本数 < min.insync.replicas,写入被拒绝
  • 故障恢复:故障Broker重新上线后需要同步数据才能重新加入ISR

五、案例演示:关停一个 Broker

我们可以通过一个小实验观察 Kafka 的容错性。

1. 环境准备

  • 3 个 Broker(broker1, broker2, broker3)
  • 主题 demo_topic,副本数 = 3

2. 启动 Producer & Consumer

# Producer
kafka-console-producer.sh --broker-list localhost:9092 --topic demo_topic# Consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo_topic --from-beginning

3. 模拟故障

# 停掉一个 Broker,例如 broker1
systemctl stop kafka@broker1

4. 观察现象

  • Controller 会自动触发 Leader 重选
  • Producer 仍然可以继续写入消息(可能有短暂抖动)。
  • Consumer 会自动切换到新 Leader 继续消费。

5. Broker 恢复

systemctl start kafka@broker1

恢复后,broker1 会 从当前 Leader 拉取缺失的数据,追上 ISR 进度,再次加入副本集合。


六、总结

Kafka 的数据可靠性与容错机制可以总结为:

  • 副本机制:保证分区多副本存储,避免单点故障。
  • ISR 集合:确保只有数据一致的副本才参与写入确认。
  • Leader 选举:在 Broker 宕机时快速切换,保证可用性。
  • min.insync.replicas:合理配置可在可靠性与性能间找到平衡。

通过本篇案例,我们看到 Kafka 即便在 Broker 故障时,也能保证 高可用与数据可靠性,这正是它被广泛应用于大规模流式处理场景的原因。


要不要我帮你画一张 Kafka 副本与 ISR 切换流程图(类似上一篇你用过的图示)?这样博客会更直观。


七、动手实战:可靠性配置 + 故障注入(含代码)

本节提供可运行的示例与故障注入步骤,帮助你直观看到 acks=allmin.insync.replicas 与 ISR 的影响。

1) 环境准备

python3 -m venv .venv && source .venv/bin/activate
pip install kafka-python

2) 创建 Topic(3 副本 + 要求至少 2 个副本同步)

kafka-topics.sh --create \--topic demo-reliability \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 3 \--config min.insync.replicas=2

验证与观察 ISR:

kafka-topics.sh --describe --topic demo-reliability --bootstrap-server localhost:9092

3) 代码实现与运行

生产者代码(可靠性配置)
# lesson_five/producer_reliability.py
import argparse
import json
import sys
import time
from typing import Optionalfrom kafka import KafkaProducer
from kafka.errors import KafkaErrordef parse_args() -> argparse.Namespace:parser = argparse.ArgumentParser(description="Kafka reliability demo producer")parser.add_argument("--bootstrap", default="localhost:9092", help="Kafka bootstrap servers")parser.add_argument("--topic", default="demo-reliability", help="Target topic")parser.add_argument("--count", type=int, default=50, help="Number of messages to send")parser.add_argument("--acks",choices=["all", "1"],default="all",help="Producer acks setting (all or 1)",)parser.add_argument("--sleep-ms",type=int,default=50,help="Sleep between sends in milliseconds (for readability)",)return parser.parse_args()def create_producer(bootstrap_servers: str, acks: str) -> KafkaProducer:# kafka-python uses acks as int or 'all'; map string accordinglyacks_opt: Optional[object]acks_opt = "all" if acks == "all" else 1return KafkaProducer(bootstrap_servers=bootstrap_servers,acks=acks_opt,retries=10,  # retry on transient errorslinger_ms=5,value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),key_serializer=lambda v: v.encode("utf-8") if v is not None else None,)def main() -> int:args = parse_args()producer = create_producer(args.bootstrap, args.acks)print(f"Producer starting: bootstrap={args.bootstrap}, topic={args.topic}, count={args.count}, acks={args.acks}")errors = 0for i in range(args.count):key = f"user-{i % 3}"value = {"index": i, "ts": int(time.time() * 1000), "key": key}future = producer.send(args.topic, key=key, value=value)try:metadata = future.get(timeout=10)print(f"SENT ok: i={i}, partition={metadata.partition}, offset={metadata.offset}, acks={args.acks}")except KafkaError as e:errors += 1print(f"SENT error: i={i}, error={repr(e)}", file=sys.stderr)if args.sleep_ms > 0:time.sleep(args.sleep_ms / 1000.0)producer.flush()print(f"Done. total={args.count}, errors={errors}, acks={args.acks}")producer.close()return 0 if errors == 0 else 1if __name__ == "__main__":raise SystemExit(main())
消费者代码(可靠性监控)
# lesson_five/consumer_reliability.py
import argparse
from typing import Listfrom kafka import KafkaConsumer, TopicPartitiondef parse_args() -> argparse.Namespace:parser = argparse.ArgumentParser(description="Kafka reliability demo consumer")parser.add_argument("--bootstrap", default="localhost:9092", help="Kafka bootstrap servers")parser.add_argument("--topic", default="demo-reliability", help="Topic to consume")parser.add_argument("--group", default="reliability-group", help="Consumer group id")parser.add_argument("--auto-offset-reset", default="earliest", choices=["earliest", "latest"], help="Auto offset reset policy")parser.add_argument("--enable-auto-commit", action="store_true", help="Enable auto commit (default false)")parser.add_argument("--max-records", type=int, default=50, help="Print up to N messages then exit (0 means infinite)")return parser.parse_args()def main() -> int:args = parse_args()consumer = KafkaConsumer(args.topic,bootstrap_servers=args.bootstrap,group_id=args.group,enable_auto_commit=args.enable_auto_commit,auto_offset_reset=args.auto_offset_reset,value_deserializer=lambda v: v.decode("utf-8", "ignore"),)print(f"Consumer starting: bootstrap={args.bootstrap}, topic={args.topic}, group={args.group}, auto_commit={args.enable_auto_commit}")# Print initial assignment if availableconsumer.poll(timeout_ms=200)assignment: List[TopicPartition] = list(consumer.assignment())if assignment:parts = ", ".join([f"{tp.topic}-{tp.partition}" for tp in assignment])print(f"Assigned partitions: {parts}")seen = 0for message in consumer:print(f"RECV partition={message.partition}, offset={message.offset}, key={message.key}, value={message.value}")if not args.enable_auto_commit:consumer.commit()if args.max_records and args.max_records > 0:seen += 1if seen >= args.max_records:breakconsumer.close()print("Consumer closed")return 0if __name__ == "__main__":raise SystemExit(main())
运行命令
# 启动消费者(建议先启动,便于观察早期消息)
python lesson_five/consumer_reliability.py --topic demo-reliability# 另起终端,启动生产者(默认 acks=all,发送 50 条)
python lesson_five/producer_reliability.py --topic demo-reliability --count 50 --acks all

生产者参数说明

  • --acks:可选 all1(默认 all
  • --count:发送条数(默认 50)
  • --bootstrap:Kafka 地址(默认 localhost:9092
  • --sleep-ms:发送间隔毫秒数(默认 50ms)

4) 故障注入与现象观察

下述步骤可在本地多 Broker 环境(3 节点)或容器环境中完成。

  1. 先保持 3 个 Broker 正常,确认 ISR 列表包含 3 个副本,生产者以 acks=all 发送,应全部成功。
  2. 停掉其中 1 个 Broker(示例命令二选一):
# systemd(按你的服务名替换)
sudo systemctl stop kafka@broker1# 或 Docker Compose(按你的服务名替换)
docker compose stop kafka-1
  1. 再次 describe,此时 ISR 可能为 2。继续以 acks=all 发送,仍可成功(偶有短暂抖动)。
  2. 再停掉第 2 个 Broker,使 ISR=1。此时:
    • acks=all 发送将失败,常见错误为 NotEnoughReplicasNotEnoughReplicasAfterAppend
    • 若切换为 acks=1 发送,可能成功返回,但在随后 Leader 故障时存在数据丢失风险(不建议生产使用)。
  3. 依次启动被关停的 Broker:
sudo systemctl start kafka@broker1
# 或
docker compose start kafka-1
  1. 等待副本追上进度重新加入 ISR,再次 describe 可见 ISR 恢复。生产与消费恢复稳定。

5) 小结与实践建议

  • 生产环境建议:replication-factor >= 3min.insync.replicas >= 2、生产者 acks=all
  • 在 ISR 数量不足时,acks=all 会拒绝写入而非“冒险”成功,这是避免数据丢失的关键机制。
  • Python 的 kafka-python 不提供生产端幂等/事务能力,如需更强语义(EOS),可选用 confluent-kafka-python 并开启 enable.idempotence 与事务 API(需要 Kafka 端到端配合)。

进阶练习:把生产者 --acks 切换为 1all,在不同 ISR 场景下对比成功率与风险;同时观察消费者端分区与位移的变化。


八、交互式可视化:实时查看 ISR 与发送/消费统计

为了更直观地观察副本 ISR 的变化与发送/消费成效,提供了一个轻量级的可视化页面与本地指标服务:

1) 指标服务代码(Flask)

# lesson_five/metrics_server.py
import json
import threading
import time
from collections import deque, defaultdict
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional, Tuplefrom flask import Flask, jsonify, request
from kafka import KafkaAdminClient
from kafka.errors import KafkaErrorapp = Flask(__name__)@dataclass
class Counters:sent_ok: int = 0sent_err: int = 0recv: int = 0metrics_lock = threading.Lock()
metrics: Dict[str, Counters] = defaultdict(Counters)
recent_errors: Deque[Tuple[float, str]] = deque(maxlen=200)def record_error(msg: str) -> None:with metrics_lock:recent_errors.append((time.time(), msg))def inc(topic: str, key: str, value: int = 1) -> None:with metrics_lock:c = metrics[topic]if key == "sent_ok":c.sent_ok += valueelif key == "sent_err":c.sent_err += valueelif key == "recv":c.recv += value@app.route("/api/metrics")
def api_metrics():with metrics_lock:data = {t: {"sent_ok": c.sent_ok, "sent_err": c.sent_err, "recv": c.recv} for t, c in metrics.items()}errs = list(recent_errors)return jsonify({"metrics": data, "errors": errs, "ts": int(time.time() * 1000)})def describe_topic(bootstrap: str, topic: str) -> Optional[dict]:try:admin = KafkaAdminClient(bootstrap_servers=bootstrap, client_id="lesson5-metrics")# kafka-python AdminClient doesn't expose describe directly; use _client for metadatamd = admin._client.clustermd.request_update()# wait a bit for metadatadeadline = time.time() + 5while time.time() < deadline and not md.topics():time.sleep(0.1)if topic not in md.topics():return Noneparts = md.partitions_for_topic(topic) or []partitions = []for p in parts:leader = md.leader_for_partition(topic, p)replicas = md.replicas_for_partition(topic, p) or []isr = md.in_sync_replicas_for_partition(topic, p) or []partitions.append({"partition": p,"leader": getattr(leader, "id", leader),"replicas": [getattr(n, "id", n) for n in replicas],"isr": [getattr(n, "id", n) for n in isr],})brokers = [getattr(b, "nodeId", getattr(b, "id", None)) for b in md.brokers()]return {"topic": topic, "brokers": brokers, "partitions": partitions}except Exception as e:record_error(f"describe_topic error: {e}")return None@app.route("/api/isr")
def api_isr():bootstrap = request.args.get("bootstrap", "localhost:9092")topic = request.args.get("topic", "demo-reliability")data = describe_topic(bootstrap, topic)if not data:return jsonify({"ok": False, "error": "topic not found or metadata unavailable"}), 404return jsonify({"ok": True, "data": data, "ts": int(time.time() * 1000)})@app.route("/api/mark", methods=["POST"])
def api_mark():try:body = request.get_json(force=True) or {}except Exception:body = {}topic = body.get("topic", "demo-reliability")kind = body.get("kind", "sent_ok")  # sent_ok | sent_err | recvvalue = int(body.get("value", 1))inc(topic, kind, value)return jsonify({"ok": True})@app.route("/")
def root():return jsonify({"ok": True, "endpoints": ["/api/metrics", "/api/isr?topic=...", "/api/mark"]})def run(host: str = "127.0.0.1", port: int = 5005):app.run(host=host, port=port, debug=False)if __name__ == "__main__":run()

启动服务

python lesson_five/metrics_server.py

API 接口说明

  • GET /api/isr?bootstrap=...&topic=...:查询 Topic 的分区 Leader/Replicas/ISR
  • GET /api/metrics:返回发送/接收累计计数与错误列表
  • POST /api/mark:可手动上报一条计数(示例页面不需要手动调用)

2) 可视化页面代码

<!-- lesson_five/visualization.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8" /><meta name="viewport" content="width=device-width, initial-scale=1.0" /><title>Kafka ISR 与可靠性可视化</title><link rel="preconnect" href="https://cdn.jsdelivr.net" /><script src="https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js"></script><style>:root {--bg: #0f172a;--panel: #111827;--muted: #9ca3af;--text: #e5e7eb;--accent: #60a5fa;--good: #34d399;--warn: #f59e0b;--bad: #ef4444;--border: #374151;}html, body {margin: 0;padding: 0;background: var(--bg);color: var(--text);font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial;}.container { max-width: 1200px; margin: 0 auto; padding: 24px; }h1, h2 { font-weight: 600; letter-spacing: 0.2px; }h1 { font-size: 24px; margin: 0 0 12px; }h2 { font-size: 18px; margin: 24px 0 12px; }.panel { background: var(--panel); border: 1px solid var(--border); border-radius: 10px; padding: 16px; margin-bottom: 16px; }.row { display: flex; gap: 12px; flex-wrap: wrap; align-items: center; }label { color: var(--muted); }input, select { background: #0b1220; color: var(--text); border: 1px solid var(--border); border-radius: 8px; padding: 8px 10px; }input[type="text"]{ min-width: 220px; }button { background: #1f2937; color: var(--text); border: 1px solid var(--border); border-radius: 8px; padding: 8px 12px; cursor: pointer; }button:hover { border-color: var(--accent); }.grid { display: grid; grid-template-columns: 1fr; gap: 16px; }@media (min-width: 980px) { .grid { grid-template-columns: 1.1fr 0.9fr; } }.diagram { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; overflow: auto; }.logs { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; height: 200px; overflow: auto; font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; font-size: 12px; line-height: 1.5; }.stats { display: grid; grid-template-columns: repeat(3, 1fr); gap: 12px; }.stat { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; text-align: center; }.stat h3 { margin: 0 0 6px; font-size: 12px; color: var(--muted); font-weight: 500; }.stat .v { font-size: 20px; font-weight: 700; }</style>
</head>
<body><div class="container"><h1>Kafka ISR 与可靠性可视化</h1><div class="panel"><div class="row"><label>Bootstrap</label><input id="bootstrap" type="text" value="localhost:9092" /><label>Topic</label><input id="topic" type="text" value="demo-reliability" /><label>Metrics API</label><input id="api" type="text" value="http://127.0.0.1:5005" /><button id="refresh">刷新 ISR</button></div></div><div class="grid"><div class="panel"><h2>分区与 ISR</h2><div id="isr" class="diagram"></div></div><div class="panel"><h2>发送/接收统计</h2><div class="stats"><div class="stat"><h3>发送成功</h3><div id="sent_ok" class="v">0</div></div><div class="stat"><h3>发送失败</h3><div id="sent_err" class="v">0</div></div><div class="stat"><h3>消费条数</h3><div id="recv" class="v">0</div></div></div></div></div><div class="panel"><h2>近期错误</h2><div id="logs" class="logs"></div></div></div><script>(function ensureMermaid(cb){function ok(){ try{ mermaid.initialize({ startOnLoad: false, theme: 'dark', securityLevel: 'loose' }); cb(); }catch(e){ console.error(e); } }if (window.mermaid) return ok();const s = document.createElement('script');s.src = 'https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js';s.onload = ok; s.onerror = ok; document.head.appendChild(s);})(function(){ /* ready */ });const isrDiv = document.getElementById('isr');const logsDiv = document.getElementById('logs');const sentOkEl = document.getElementById('sent_ok');const sentErrEl = document.getElementById('sent_err');const recvEl = document.getElementById('recv');const bootstrapInput = document.getElementById('bootstrap');const topicInput = document.getElementById('topic');const apiInput = document.getElementById('api');const refreshBtn = document.getElementById('refresh');function log(line){const t = new Date().toLocaleTimeString();logsDiv.innerText += `[${t}] ${line}\n`;logsDiv.scrollTop = logsDiv.scrollHeight;}async function renderMermaid(targetEl, def) {try {const id = `m-${Math.random().toString(36).slice(2)}`;const out = await mermaid.render(id, def);targetEl.innerHTML = out.svg || '';} catch (e) {targetEl.innerHTML = `<pre style="white-space:pre-wrap;color:#fca5a5">渲染失败: ${String(e)}</pre>`;}}function buildIsrDiagram(data){// data: {brokers:number[], partitions:[{partition, leader, replicas[], isr[]}]}]let def = 'graph TD\n';def += '  subgraph "Topic ' + data.topic + '"\n';for (const p of data.partitions){const isr = p.isr.join(',');const leader = p.leader;const label = `P${p.partition} (L:${leader} | ISR:[${isr}])`;def += `    P${p.partition}["${label}"]\n`;}def += '  end\n';for (const b of data.brokers){def += `  B${b}((Broker ${b}))\n`;}for (const p of data.partitions){def += `  P${p.partition} --> B${p.leader}\n`;for (const r of p.replicas){ def += `  P${p.partition} -.replica.-> B${r}\n`; }}return def;}async function fetchIsr(){const api = apiInput.value.replace(/\/$/, '');const topic = encodeURIComponent(topicInput.value);const bootstrap = encodeURIComponent(bootstrapInput.value);try{const res = await fetch(`${api}/api/isr?topic=${topic}&bootstrap=${bootstrap}`);if (!res.ok) throw new Error('isr http ' + res.status);const j = await res.json();if (!j.ok) throw new Error('isr api not ok');const def = buildIsrDiagram(j.data);await renderMermaid(isrDiv, def);log('刷新 ISR 成功');}catch(e){ log('刷新 ISR 失败: ' + e); }}async function fetchMetrics(){const api = apiInput.value.replace(/\/$/, '');try{const res = await fetch(`${api}/api/metrics`);if (!res.ok) throw new Error('metrics http ' + res.status);const j = await res.json();const t = topicInput.value;const m = (j.metrics && j.metrics[t]) || {sent_ok:0, sent_err:0, recv:0};sentOkEl.innerText = m.sent_ok;sentErrEl.innerText = m.sent_err;recvEl.innerText = m.recv;// errors listconst errs = j.errors || [];if (errs.length){logsDiv.innerText = '';for (const [ts, line] of errs){const tline = new Date(ts*1000).toLocaleTimeString();logsDiv.innerText += `[${tline}] ${line}\n`;}}}catch(e){ /* ignore transient */ }}refreshBtn.addEventListener('click', fetchIsr);setInterval(fetchMetrics, 1000);setInterval(fetchIsr, 5000);setTimeout(fetchIsr, 300);</script>
</body>
</html>

使用说明

  • 填写 Kafka BootstrapTopicMetrics API 地址(默认 localhost:9092http://127.0.0.1:5005
  • 点击"刷新 ISR"或等待定时刷新,页面会展示每个分区的 Leader 与 ISR 列表,并通过 Mermaid 图连线到 Broker
  • 右侧看板实时展示:发送成功、发送失败、消费条数

统计的来源:

  • producer_reliability.pyconsumer_reliability.py 可在业务处理处通过 HTTP 调用 /api/mark 上报计数;
  • 本示例为了最少侵入,先使用“外部观察 + 手动触发”模式,推荐你按需将上报逻辑嵌入到生产/消费流程中。

3) 建议的演示流程

  1. 启动指标服务与可视化页面。
  2. 启动消费者:
python lesson_five/consumer_reliability.py --topic demo-reliability
  1. 启动生产者(默认 acks=all):
python lesson_five/producer_reliability.py --topic demo-reliability --count 50 --acks all
  1. 在多 Broker 环境下,依照“七、动手实战”的故障注入步骤逐一停/启 Broker:
    • 页面左侧 ISR 图会随 describe 结果变化(每 5s 刷新一次);
    • 发送失败计数会在 ISR < min.insync.replicasacks=all 场景上升;
    • 重新启动 Broker 后,ISR 追上恢复,可再次观察指标变化。

你也可以将生产者与消费者脚本中对成功/失败、消费计数的位置,调用 POST /api/mark 进行自动上报,使统计更精确。
image


文章转载自:

http://BhFenPoJ.chzbq.cn
http://VPwuNOpG.chzbq.cn
http://tEVtqJ9m.chzbq.cn
http://zqdBjLex.chzbq.cn
http://cm7E6zMM.chzbq.cn
http://IDpT1EYi.chzbq.cn
http://QXf4pc1A.chzbq.cn
http://XsyTf0RY.chzbq.cn
http://5ugULqZq.chzbq.cn
http://dpVoN64a.chzbq.cn
http://bgdRKuAr.chzbq.cn
http://3m4SILB8.chzbq.cn
http://mzHwfOrs.chzbq.cn
http://CdWUis0Q.chzbq.cn
http://ZZNKjSrp.chzbq.cn
http://ryBgxYJ4.chzbq.cn
http://6MpZzOzt.chzbq.cn
http://VpXs4KWB.chzbq.cn
http://A5hnRu6c.chzbq.cn
http://aspGQcxv.chzbq.cn
http://xHWIyxRk.chzbq.cn
http://ejRSBwBi.chzbq.cn
http://jrT2aqyn.chzbq.cn
http://moBdzZCA.chzbq.cn
http://e1Cs6JEx.chzbq.cn
http://ZOB8Y6DK.chzbq.cn
http://HE10Jomc.chzbq.cn
http://7Dnum94h.chzbq.cn
http://AEyUx9xv.chzbq.cn
http://qWomH3KI.chzbq.cn
http://www.dtcms.com/a/376474.html

相关文章:

  • EasyExcel部署Docker缺少字体报错
  • CentOS Steam 9安装 Redis
  • 将GitHub远程仓库修改为ssh
  • 什么是测试
  • 在pycharm终端安装torch
  • P1141 01迷宫
  • 大模型中的位置编码详解
  • 【华为OD】贪吃的猴子
  • 【CS32L015C8T6】下载Hex文件配置及异常现象解决方法
  • PySpark EDA 完整案例介绍,附代码(三)
  • 强化学习 Reinforcement Learing
  • 数据库物理外键与逻辑外键全解析
  • 分布式专题——8 京东热点缓存探测系统JDhotkey架构剖析
  • 计算机系统性能、架构设计、调度策略论文分类体系参考
  • Mujoco学习记录
  • [react] react-router-dom是啥?
  • uniapp,vue2 置顶功能实现,默认右边半隐藏,点击一次移出来,点击二次置顶,一段时间不操隐藏
  • 佩京VR重走长征路模拟系统
  • HTML详解
  • ai生成文章,流式传输(uniapp,微信小程序)
  • JVM 内存参数设置详解!
  • 医院高值耗材智能化管理路径分析(下)
  • 上市公司人工智能水平指数 1993-2024
  • AI/AR智能眼镜步入全球破圈增长期,五大科技大厂入局加剧生态市场角逐
  • FastGPT源码解析 Agent 智能体插件实现代码分析
  • 【Fastjson】Fastjson2 在不同 Modules 模块包下,@JSONField name映射无法反序列化的 BUG 及解决
  • [特殊字符] 从助手到引擎:基于 GPT 的战略协作系统演示
  • SSE 模仿 GPT 响应
  • ThingsKit物联网平台 v2.0.0 发布|前端UI重构、底层架构升级
  • 面向对象数据分析实战编程题:销售数据统计与可视化(Python)