第5篇、 Kafka 数据可靠性与容错机制
在分布式消息队列系统中,数据可靠性 与 容错能力 是核心指标。Kafka 作为高吞吐、可扩展的流式处理平台,依靠副本复制、Leader 选举和 ISR 机制,保证了在节点故障时消息依然能够可靠传输与消费。
📚 目录
理论基础
-
一、数据复制机制与 ISR
- 核心概念介绍
- 1. 分区副本(Partition Replica)
- 2. Leader 与 Follower
- 3. ISR(In-Sync Replica)
- 工作机制
- 数据流向图
- 核心概念介绍
-
二、Leader 选举流程
- 核心概念介绍
- 1. Controller(控制器)
- 2. Leader 选举触发条件
- 3. 选举规则与流程
- 选举优势
- 核心概念介绍
-
三、Broker 宕机后的恢复
- 故障恢复流程图
-
四、
min.insync.replicas
配置与数据丢失风险- 核心概念介绍
- 1. min.insync.replicas 参数
- 2. Producer acks 参数
- 3. 数据丢失风险分析
- 配置建议
- 性能与可靠性权衡
- ISR 状态变化图
- 核心概念介绍
实践案例
-
五、案例演示:关停一个 Broker
- 1. 环境准备
- 2. 启动 Producer & Consumer
- 3. 模拟故障
- 4. 观察现象
- 5. Broker 恢复
-
六、总结
代码实战
- 七、动手实战:可靠性配置 + 故障注入(含代码)
- 1) 环境准备
- 2) 创建 Topic(3 副本 + 要求至少 2 个副本同步)
- 3) 代码实现与运行
- 生产者代码(可靠性配置)
- 消费者代码(可靠性监控)
- 运行命令
- 4) 故障注入与现象观察
- 5) 小结与实践建议
可视化工具
- 八、交互式可视化:实时查看 ISR 与发送/消费统计
- 1) 指标服务代码(Flask)
- 2) 可视化页面代码
- 3) 建议的演示流程
本文将从以下几个方面展开:
- 数据复制机制与 ISR(In-Sync Replica)
- Leader 选举流程
- Broker 宕机后的恢复过程
min.insync.replicas
配置与数据丢失风险- 案例演示:关停一个 Broker,观察副本切换与消费情况
重要术语解释
在深入讨论之前,先了解几个关键术语:
LEO(Log End Offset):日志结束偏移量,表示分区中最后一条消息的偏移量位置。
HW(High Watermark):高水位线,表示消费者可见的最大偏移量,HW ≤ LEO。
Replication Factor:副本因子,指定每个分区需要多少个副本。
Controller:Kafka 集群的控制器,负责管理分区和副本的分配。
ZooKeeper:Kafka 的元数据存储和协调服务(Kafka 2.8+ 版本开始支持 KRaft 模式,不再依赖 ZooKeeper)。
一、数据复制机制与 ISR
核心概念介绍
1. 分区副本(Partition Replica)
定义:Kafka 中每个分区都有多个副本,分布在不同的 Broker 上,用于提供数据冗余和容错能力。
特点:
- 每个分区有一个 Leader 和若干个 Follower
- Leader 负责处理所有读写请求
- Follower 只负责从 Leader 拉取数据并同步
- 副本数量由
replication-factor
参数控制
2. Leader 与 Follower
Leader:
- 处理所有客户端的读写请求
- 维护分区的元数据信息
- 负责向 Follower 推送数据变更
Follower:
- 被动接收 Leader 的数据同步
- 不处理客户端请求
- 在 Leader 故障时可能被选为新的 Leader
3. ISR(In-Sync Replica)
定义:ISR 是"同步副本集合",包含所有与 Leader 保持同步状态的副本。
同步条件:
- Follower 的 LEO(Log End Offset)与 Leader 的 HW(High Watermark)差距在阈值内
- 默认阈值由
replica.lag.time.max.ms
控制(通常为 10 秒)
ISR 的作用:
- 数据一致性保证:只有 ISR 中的副本才被认为是"安全"的
- Leader 选举:新 Leader 只能从 ISR 中选择
- 写入确认:Producer 的
acks=all
需要等待 ISR 中所有副本确认
工作机制
- Leader 写入 → ISR 跟进:只有 ISR 副本完成写入,消息才被确认
- Follower 滞后处理:若某个 Follower 落后过多,会被移出 ISR,避免影响整体可用性
- 动态调整:ISR 会根据副本的同步状态动态调整
这样,Kafka 保证了即使部分 Broker 宕机,只要 ISR 中至少有一个副本存活,数据就不会丢失。
数据流向图
数据流向说明:
- Producer 发送:Producer 向 Leader 发送消息,设置
acks=all
- Leader 写入:Leader 将消息写入本地日志
- Follower 拉取:Follower 从 Leader 拉取新消息
- Follower 写入:Follower 将消息写入各自的本地日志
- 更新 HW:Leader 根据 Follower 的 LEO 更新 HW
- 报告 LEO:Follower 向 Leader 报告自己的 LEO
- ISR 检查:Leader 检查所有副本的同步状态
- 确认成功:当 ISR 中所有副本都同步后,向 Producer 确认
- Consumer 消费:Consumer 只能看到 HW 以下的消息
二、Leader 选举流程
核心概念介绍
1. Controller(控制器)
定义:Controller 是 Kafka 集群中的一个特殊 Broker,负责管理整个集群的元数据和协调工作。
职责:
- 监控集群中所有 Broker 的状态
- 管理分区的 Leader 和 Follower 分配
- 处理 Broker 的加入和离开事件
- 协调 Leader 选举过程
选举机制:
- 集群启动时,所有 Broker 竞争成为 Controller
- 使用 ZooKeeper 的临时节点机制确保只有一个 Controller
- Controller 故障时,其他 Broker 会重新选举新的 Controller
2. Leader 选举触发条件
自动触发:
- Leader 所在的 Broker 宕机
- Leader 所在的 Broker 网络分区
- Leader 副本数据损坏
手动触发:
- 管理员主动触发分区重新分配
- 集群扩容或缩容
3. 选举规则与流程
选举规则:
- 优先从 ISR 中选择:新 Leader 必须来自 ISR 集合
- 选择第一个可用副本:在 ISR 中选择第一个可用的副本作为新 Leader
- 避免脏读:确保新 Leader 的数据与原 Leader 一致
选举流程:
- 检测故障:Controller 检测到 Leader 失效
- 更新 ISR:从 ISR 中移除失效的副本
- 选择新 Leader:从剩余 ISR 中选择一个作为新 Leader
- 更新元数据:通知所有 Broker 更新分区元数据
- 客户端感知:Producer 和 Consumer 自动感知 Leader 变化
选举优势
👉 数据一致性保证:基于 ISR 的 Leader 选举确保新 Leader 上的数据与原 Leader 一致,避免数据丢失
👉 快速故障恢复:选举过程通常在几秒内完成,最小化服务中断时间
👉 自动容错:无需人工干预,系统自动处理故障和恢复
三、Broker 宕机后的恢复
Broker 故障是分布式系统的常见情况。Kafka 的恢复过程大致如下:
-
故障发生:Leader Broker 宕机,分区不可写。
-
Leader 切换:Controller 从 ISR 中选出新的 Leader。
-
客户端感知:
- Producer 会自动更新元数据,继续向新 Leader 写入。
- Consumer 自动订阅新的 Leader 进行消费。
-
Broker 恢复上线:
- 重新加入集群。
- 从新的 Leader 拉取缺失的数据,追上进度后重新加入 ISR。
这样,Kafka 在 Broker 失效和恢复时,能够保证 高可用 + 数据不丢失。
故障恢复流程图
故障恢复阶段说明:
- 正常状态:3个Broker都在线,数据正常同步
- 故障检测:Controller检测到Leader Broker宕机
- Leader选举:从ISR中选择新的Leader
- 元数据更新:通知所有Broker更新分区元数据
- 客户端重连:Producer和Consumer自动连接到新Leader
- 服务恢复:新Leader开始处理读写请求
- 原Leader恢复:原Leader重新加入集群并同步数据
四、min.insync.replicas
配置与数据丢失风险
核心概念介绍
1. min.insync.replicas 参数
定义:min.insync.replicas
是 Topic 级别的配置参数,指定 Producer 使用 acks=all
时,至少需要多少个副本确认写入才算成功。
作用机制:
- 当 ISR 中的副本数量 <
min.insync.replicas
时,acks=all
的写入会被拒绝 - 这是一个"安全阀",防止在副本数量不足时冒险写入
- 默认值为 1,生产环境建议设置为 2 或更高
2. Producer acks 参数
acks=0:
- Producer 不等待任何确认
- 最高性能,但可能丢失数据
- 适用于对数据丢失容忍度高的场景
acks=1:
- 等待 Leader 确认写入
- 平衡性能和可靠性
- 在 Leader 故障时可能丢失数据
acks=all:
- 等待 ISR 中所有副本确认
- 最高可靠性,但性能较低
- 需要配合
min.insync.replicas
使用
3. 数据丢失风险分析
风险场景:
-
ISR 副本不足:
场景:3 副本 Topic,min.insync.replicas=2 情况:2 个 Broker 宕机,ISR 只剩 1 个副本 结果:acks=all 写入被拒绝,避免数据丢失
-
配置不当:
场景:min.insync.replicas=1 情况:Leader 写入后立即宕机,Follower 未同步 结果:数据丢失
-
网络分区:
场景:Leader 与 Follower 网络隔离 情况:Leader 继续写入,Follower 无法同步 结果:可能导致数据不一致
配置建议
生产环境最佳实践:
# Topic 配置
replication-factor=3
min.insync.replicas=2# Producer 配置
acks=all
retries=10
enable.idempotence=true
配置说明:
- 副本数 ≥3:提供足够的容错能力
- min.insync.replicas ≥2:确保至少 2 个副本同步
- Producer 使用
acks=all
:等待所有 ISR 副本确认 - 启用幂等性:避免重复消息
性能与可靠性权衡
可靠性优先:
min.insync.replicas=2
,acks=all
- 适合金融、支付等对数据一致性要求极高的场景
性能优先:
min.insync.replicas=1
,acks=1
- 适合日志收集、监控数据等对丢失容忍度高的场景
平衡配置:
min.insync.replicas=2
,acks=all
- 在大多数场景下提供最佳的性能与可靠性平衡
ISR 状态变化图
ISR 状态说明:
- 正常状态:3个副本都在ISR中,数据完全同步
- 单点故障:1个Follower故障,ISR减少但服务继续
- Leader故障:需要选举新Leader,短暂服务中断
- 多点故障:如果ISR副本数 < min.insync.replicas,写入被拒绝
- 故障恢复:故障Broker重新上线后需要同步数据才能重新加入ISR
五、案例演示:关停一个 Broker
我们可以通过一个小实验观察 Kafka 的容错性。
1. 环境准备
- 3 个 Broker(broker1, broker2, broker3)
- 主题
demo_topic
,副本数 = 3
2. 启动 Producer & Consumer
# Producer
kafka-console-producer.sh --broker-list localhost:9092 --topic demo_topic# Consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo_topic --from-beginning
3. 模拟故障
# 停掉一个 Broker,例如 broker1
systemctl stop kafka@broker1
4. 观察现象
- Controller 会自动触发 Leader 重选。
- Producer 仍然可以继续写入消息(可能有短暂抖动)。
- Consumer 会自动切换到新 Leader 继续消费。
5. Broker 恢复
systemctl start kafka@broker1
恢复后,broker1 会 从当前 Leader 拉取缺失的数据,追上 ISR 进度,再次加入副本集合。
六、总结
Kafka 的数据可靠性与容错机制可以总结为:
- 副本机制:保证分区多副本存储,避免单点故障。
- ISR 集合:确保只有数据一致的副本才参与写入确认。
- Leader 选举:在 Broker 宕机时快速切换,保证可用性。
- min.insync.replicas:合理配置可在可靠性与性能间找到平衡。
通过本篇案例,我们看到 Kafka 即便在 Broker 故障时,也能保证 高可用与数据可靠性,这正是它被广泛应用于大规模流式处理场景的原因。
要不要我帮你画一张 Kafka 副本与 ISR 切换流程图(类似上一篇你用过的图示)?这样博客会更直观。
七、动手实战:可靠性配置 + 故障注入(含代码)
本节提供可运行的示例与故障注入步骤,帮助你直观看到 acks=all
、min.insync.replicas
与 ISR 的影响。
1) 环境准备
python3 -m venv .venv && source .venv/bin/activate
pip install kafka-python
2) 创建 Topic(3 副本 + 要求至少 2 个副本同步)
kafka-topics.sh --create \--topic demo-reliability \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 3 \--config min.insync.replicas=2
验证与观察 ISR:
kafka-topics.sh --describe --topic demo-reliability --bootstrap-server localhost:9092
3) 代码实现与运行
生产者代码(可靠性配置)
# lesson_five/producer_reliability.py
import argparse
import json
import sys
import time
from typing import Optionalfrom kafka import KafkaProducer
from kafka.errors import KafkaErrordef parse_args() -> argparse.Namespace:parser = argparse.ArgumentParser(description="Kafka reliability demo producer")parser.add_argument("--bootstrap", default="localhost:9092", help="Kafka bootstrap servers")parser.add_argument("--topic", default="demo-reliability", help="Target topic")parser.add_argument("--count", type=int, default=50, help="Number of messages to send")parser.add_argument("--acks",choices=["all", "1"],default="all",help="Producer acks setting (all or 1)",)parser.add_argument("--sleep-ms",type=int,default=50,help="Sleep between sends in milliseconds (for readability)",)return parser.parse_args()def create_producer(bootstrap_servers: str, acks: str) -> KafkaProducer:# kafka-python uses acks as int or 'all'; map string accordinglyacks_opt: Optional[object]acks_opt = "all" if acks == "all" else 1return KafkaProducer(bootstrap_servers=bootstrap_servers,acks=acks_opt,retries=10, # retry on transient errorslinger_ms=5,value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),key_serializer=lambda v: v.encode("utf-8") if v is not None else None,)def main() -> int:args = parse_args()producer = create_producer(args.bootstrap, args.acks)print(f"Producer starting: bootstrap={args.bootstrap}, topic={args.topic}, count={args.count}, acks={args.acks}")errors = 0for i in range(args.count):key = f"user-{i % 3}"value = {"index": i, "ts": int(time.time() * 1000), "key": key}future = producer.send(args.topic, key=key, value=value)try:metadata = future.get(timeout=10)print(f"SENT ok: i={i}, partition={metadata.partition}, offset={metadata.offset}, acks={args.acks}")except KafkaError as e:errors += 1print(f"SENT error: i={i}, error={repr(e)}", file=sys.stderr)if args.sleep_ms > 0:time.sleep(args.sleep_ms / 1000.0)producer.flush()print(f"Done. total={args.count}, errors={errors}, acks={args.acks}")producer.close()return 0 if errors == 0 else 1if __name__ == "__main__":raise SystemExit(main())
消费者代码(可靠性监控)
# lesson_five/consumer_reliability.py
import argparse
from typing import Listfrom kafka import KafkaConsumer, TopicPartitiondef parse_args() -> argparse.Namespace:parser = argparse.ArgumentParser(description="Kafka reliability demo consumer")parser.add_argument("--bootstrap", default="localhost:9092", help="Kafka bootstrap servers")parser.add_argument("--topic", default="demo-reliability", help="Topic to consume")parser.add_argument("--group", default="reliability-group", help="Consumer group id")parser.add_argument("--auto-offset-reset", default="earliest", choices=["earliest", "latest"], help="Auto offset reset policy")parser.add_argument("--enable-auto-commit", action="store_true", help="Enable auto commit (default false)")parser.add_argument("--max-records", type=int, default=50, help="Print up to N messages then exit (0 means infinite)")return parser.parse_args()def main() -> int:args = parse_args()consumer = KafkaConsumer(args.topic,bootstrap_servers=args.bootstrap,group_id=args.group,enable_auto_commit=args.enable_auto_commit,auto_offset_reset=args.auto_offset_reset,value_deserializer=lambda v: v.decode("utf-8", "ignore"),)print(f"Consumer starting: bootstrap={args.bootstrap}, topic={args.topic}, group={args.group}, auto_commit={args.enable_auto_commit}")# Print initial assignment if availableconsumer.poll(timeout_ms=200)assignment: List[TopicPartition] = list(consumer.assignment())if assignment:parts = ", ".join([f"{tp.topic}-{tp.partition}" for tp in assignment])print(f"Assigned partitions: {parts}")seen = 0for message in consumer:print(f"RECV partition={message.partition}, offset={message.offset}, key={message.key}, value={message.value}")if not args.enable_auto_commit:consumer.commit()if args.max_records and args.max_records > 0:seen += 1if seen >= args.max_records:breakconsumer.close()print("Consumer closed")return 0if __name__ == "__main__":raise SystemExit(main())
运行命令
# 启动消费者(建议先启动,便于观察早期消息)
python lesson_five/consumer_reliability.py --topic demo-reliability# 另起终端,启动生产者(默认 acks=all,发送 50 条)
python lesson_five/producer_reliability.py --topic demo-reliability --count 50 --acks all
生产者参数说明:
--acks
:可选all
或1
(默认all
)--count
:发送条数(默认 50)--bootstrap
:Kafka 地址(默认localhost:9092
)--sleep-ms
:发送间隔毫秒数(默认 50ms)
4) 故障注入与现象观察
下述步骤可在本地多 Broker 环境(3 节点)或容器环境中完成。
- 先保持 3 个 Broker 正常,确认
ISR
列表包含 3 个副本,生产者以acks=all
发送,应全部成功。 - 停掉其中 1 个 Broker(示例命令二选一):
# systemd(按你的服务名替换)
sudo systemctl stop kafka@broker1# 或 Docker Compose(按你的服务名替换)
docker compose stop kafka-1
- 再次
describe
,此时 ISR 可能为 2。继续以acks=all
发送,仍可成功(偶有短暂抖动)。 - 再停掉第 2 个 Broker,使 ISR=1。此时:
- 以
acks=all
发送将失败,常见错误为NotEnoughReplicas
或NotEnoughReplicasAfterAppend
; - 若切换为
acks=1
发送,可能成功返回,但在随后 Leader 故障时存在数据丢失风险(不建议生产使用)。
- 以
- 依次启动被关停的 Broker:
sudo systemctl start kafka@broker1
# 或
docker compose start kafka-1
- 等待副本追上进度重新加入 ISR,再次
describe
可见 ISR 恢复。生产与消费恢复稳定。
5) 小结与实践建议
- 生产环境建议:
replication-factor >= 3
、min.insync.replicas >= 2
、生产者acks=all
。 - 在 ISR 数量不足时,
acks=all
会拒绝写入而非“冒险”成功,这是避免数据丢失的关键机制。 - Python 的
kafka-python
不提供生产端幂等/事务能力,如需更强语义(EOS),可选用confluent-kafka-python
并开启enable.idempotence
与事务 API(需要 Kafka 端到端配合)。
进阶练习:把生产者
--acks
切换为1
与all
,在不同 ISR 场景下对比成功率与风险;同时观察消费者端分区与位移的变化。
八、交互式可视化:实时查看 ISR 与发送/消费统计
为了更直观地观察副本 ISR 的变化与发送/消费成效,提供了一个轻量级的可视化页面与本地指标服务:
1) 指标服务代码(Flask)
# lesson_five/metrics_server.py
import json
import threading
import time
from collections import deque, defaultdict
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional, Tuplefrom flask import Flask, jsonify, request
from kafka import KafkaAdminClient
from kafka.errors import KafkaErrorapp = Flask(__name__)@dataclass
class Counters:sent_ok: int = 0sent_err: int = 0recv: int = 0metrics_lock = threading.Lock()
metrics: Dict[str, Counters] = defaultdict(Counters)
recent_errors: Deque[Tuple[float, str]] = deque(maxlen=200)def record_error(msg: str) -> None:with metrics_lock:recent_errors.append((time.time(), msg))def inc(topic: str, key: str, value: int = 1) -> None:with metrics_lock:c = metrics[topic]if key == "sent_ok":c.sent_ok += valueelif key == "sent_err":c.sent_err += valueelif key == "recv":c.recv += value@app.route("/api/metrics")
def api_metrics():with metrics_lock:data = {t: {"sent_ok": c.sent_ok, "sent_err": c.sent_err, "recv": c.recv} for t, c in metrics.items()}errs = list(recent_errors)return jsonify({"metrics": data, "errors": errs, "ts": int(time.time() * 1000)})def describe_topic(bootstrap: str, topic: str) -> Optional[dict]:try:admin = KafkaAdminClient(bootstrap_servers=bootstrap, client_id="lesson5-metrics")# kafka-python AdminClient doesn't expose describe directly; use _client for metadatamd = admin._client.clustermd.request_update()# wait a bit for metadatadeadline = time.time() + 5while time.time() < deadline and not md.topics():time.sleep(0.1)if topic not in md.topics():return Noneparts = md.partitions_for_topic(topic) or []partitions = []for p in parts:leader = md.leader_for_partition(topic, p)replicas = md.replicas_for_partition(topic, p) or []isr = md.in_sync_replicas_for_partition(topic, p) or []partitions.append({"partition": p,"leader": getattr(leader, "id", leader),"replicas": [getattr(n, "id", n) for n in replicas],"isr": [getattr(n, "id", n) for n in isr],})brokers = [getattr(b, "nodeId", getattr(b, "id", None)) for b in md.brokers()]return {"topic": topic, "brokers": brokers, "partitions": partitions}except Exception as e:record_error(f"describe_topic error: {e}")return None@app.route("/api/isr")
def api_isr():bootstrap = request.args.get("bootstrap", "localhost:9092")topic = request.args.get("topic", "demo-reliability")data = describe_topic(bootstrap, topic)if not data:return jsonify({"ok": False, "error": "topic not found or metadata unavailable"}), 404return jsonify({"ok": True, "data": data, "ts": int(time.time() * 1000)})@app.route("/api/mark", methods=["POST"])
def api_mark():try:body = request.get_json(force=True) or {}except Exception:body = {}topic = body.get("topic", "demo-reliability")kind = body.get("kind", "sent_ok") # sent_ok | sent_err | recvvalue = int(body.get("value", 1))inc(topic, kind, value)return jsonify({"ok": True})@app.route("/")
def root():return jsonify({"ok": True, "endpoints": ["/api/metrics", "/api/isr?topic=...", "/api/mark"]})def run(host: str = "127.0.0.1", port: int = 5005):app.run(host=host, port=port, debug=False)if __name__ == "__main__":run()
启动服务:
python lesson_five/metrics_server.py
API 接口说明:
GET /api/isr?bootstrap=...&topic=...
:查询 Topic 的分区 Leader/Replicas/ISRGET /api/metrics
:返回发送/接收累计计数与错误列表POST /api/mark
:可手动上报一条计数(示例页面不需要手动调用)
2) 可视化页面代码
<!-- lesson_five/visualization.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8" /><meta name="viewport" content="width=device-width, initial-scale=1.0" /><title>Kafka ISR 与可靠性可视化</title><link rel="preconnect" href="https://cdn.jsdelivr.net" /><script src="https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js"></script><style>:root {--bg: #0f172a;--panel: #111827;--muted: #9ca3af;--text: #e5e7eb;--accent: #60a5fa;--good: #34d399;--warn: #f59e0b;--bad: #ef4444;--border: #374151;}html, body {margin: 0;padding: 0;background: var(--bg);color: var(--text);font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial;}.container { max-width: 1200px; margin: 0 auto; padding: 24px; }h1, h2 { font-weight: 600; letter-spacing: 0.2px; }h1 { font-size: 24px; margin: 0 0 12px; }h2 { font-size: 18px; margin: 24px 0 12px; }.panel { background: var(--panel); border: 1px solid var(--border); border-radius: 10px; padding: 16px; margin-bottom: 16px; }.row { display: flex; gap: 12px; flex-wrap: wrap; align-items: center; }label { color: var(--muted); }input, select { background: #0b1220; color: var(--text); border: 1px solid var(--border); border-radius: 8px; padding: 8px 10px; }input[type="text"]{ min-width: 220px; }button { background: #1f2937; color: var(--text); border: 1px solid var(--border); border-radius: 8px; padding: 8px 12px; cursor: pointer; }button:hover { border-color: var(--accent); }.grid { display: grid; grid-template-columns: 1fr; gap: 16px; }@media (min-width: 980px) { .grid { grid-template-columns: 1.1fr 0.9fr; } }.diagram { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; overflow: auto; }.logs { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; height: 200px; overflow: auto; font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; font-size: 12px; line-height: 1.5; }.stats { display: grid; grid-template-columns: repeat(3, 1fr); gap: 12px; }.stat { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; text-align: center; }.stat h3 { margin: 0 0 6px; font-size: 12px; color: var(--muted); font-weight: 500; }.stat .v { font-size: 20px; font-weight: 700; }</style>
</head>
<body><div class="container"><h1>Kafka ISR 与可靠性可视化</h1><div class="panel"><div class="row"><label>Bootstrap</label><input id="bootstrap" type="text" value="localhost:9092" /><label>Topic</label><input id="topic" type="text" value="demo-reliability" /><label>Metrics API</label><input id="api" type="text" value="http://127.0.0.1:5005" /><button id="refresh">刷新 ISR</button></div></div><div class="grid"><div class="panel"><h2>分区与 ISR</h2><div id="isr" class="diagram"></div></div><div class="panel"><h2>发送/接收统计</h2><div class="stats"><div class="stat"><h3>发送成功</h3><div id="sent_ok" class="v">0</div></div><div class="stat"><h3>发送失败</h3><div id="sent_err" class="v">0</div></div><div class="stat"><h3>消费条数</h3><div id="recv" class="v">0</div></div></div></div></div><div class="panel"><h2>近期错误</h2><div id="logs" class="logs"></div></div></div><script>(function ensureMermaid(cb){function ok(){ try{ mermaid.initialize({ startOnLoad: false, theme: 'dark', securityLevel: 'loose' }); cb(); }catch(e){ console.error(e); } }if (window.mermaid) return ok();const s = document.createElement('script');s.src = 'https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js';s.onload = ok; s.onerror = ok; document.head.appendChild(s);})(function(){ /* ready */ });const isrDiv = document.getElementById('isr');const logsDiv = document.getElementById('logs');const sentOkEl = document.getElementById('sent_ok');const sentErrEl = document.getElementById('sent_err');const recvEl = document.getElementById('recv');const bootstrapInput = document.getElementById('bootstrap');const topicInput = document.getElementById('topic');const apiInput = document.getElementById('api');const refreshBtn = document.getElementById('refresh');function log(line){const t = new Date().toLocaleTimeString();logsDiv.innerText += `[${t}] ${line}\n`;logsDiv.scrollTop = logsDiv.scrollHeight;}async function renderMermaid(targetEl, def) {try {const id = `m-${Math.random().toString(36).slice(2)}`;const out = await mermaid.render(id, def);targetEl.innerHTML = out.svg || '';} catch (e) {targetEl.innerHTML = `<pre style="white-space:pre-wrap;color:#fca5a5">渲染失败: ${String(e)}</pre>`;}}function buildIsrDiagram(data){// data: {brokers:number[], partitions:[{partition, leader, replicas[], isr[]}]}]let def = 'graph TD\n';def += ' subgraph "Topic ' + data.topic + '"\n';for (const p of data.partitions){const isr = p.isr.join(',');const leader = p.leader;const label = `P${p.partition} (L:${leader} | ISR:[${isr}])`;def += ` P${p.partition}["${label}"]\n`;}def += ' end\n';for (const b of data.brokers){def += ` B${b}((Broker ${b}))\n`;}for (const p of data.partitions){def += ` P${p.partition} --> B${p.leader}\n`;for (const r of p.replicas){ def += ` P${p.partition} -.replica.-> B${r}\n`; }}return def;}async function fetchIsr(){const api = apiInput.value.replace(/\/$/, '');const topic = encodeURIComponent(topicInput.value);const bootstrap = encodeURIComponent(bootstrapInput.value);try{const res = await fetch(`${api}/api/isr?topic=${topic}&bootstrap=${bootstrap}`);if (!res.ok) throw new Error('isr http ' + res.status);const j = await res.json();if (!j.ok) throw new Error('isr api not ok');const def = buildIsrDiagram(j.data);await renderMermaid(isrDiv, def);log('刷新 ISR 成功');}catch(e){ log('刷新 ISR 失败: ' + e); }}async function fetchMetrics(){const api = apiInput.value.replace(/\/$/, '');try{const res = await fetch(`${api}/api/metrics`);if (!res.ok) throw new Error('metrics http ' + res.status);const j = await res.json();const t = topicInput.value;const m = (j.metrics && j.metrics[t]) || {sent_ok:0, sent_err:0, recv:0};sentOkEl.innerText = m.sent_ok;sentErrEl.innerText = m.sent_err;recvEl.innerText = m.recv;// errors listconst errs = j.errors || [];if (errs.length){logsDiv.innerText = '';for (const [ts, line] of errs){const tline = new Date(ts*1000).toLocaleTimeString();logsDiv.innerText += `[${tline}] ${line}\n`;}}}catch(e){ /* ignore transient */ }}refreshBtn.addEventListener('click', fetchIsr);setInterval(fetchMetrics, 1000);setInterval(fetchIsr, 5000);setTimeout(fetchIsr, 300);</script>
</body>
</html>
使用说明:
- 填写 Kafka
Bootstrap
、Topic
与Metrics API
地址(默认localhost:9092
与http://127.0.0.1:5005
) - 点击"刷新 ISR"或等待定时刷新,页面会展示每个分区的 Leader 与 ISR 列表,并通过 Mermaid 图连线到 Broker
- 右侧看板实时展示:发送成功、发送失败、消费条数
统计的来源:
producer_reliability.py
与consumer_reliability.py
可在业务处理处通过 HTTP 调用/api/mark
上报计数;- 本示例为了最少侵入,先使用“外部观察 + 手动触发”模式,推荐你按需将上报逻辑嵌入到生产/消费流程中。
3) 建议的演示流程
- 启动指标服务与可视化页面。
- 启动消费者:
python lesson_five/consumer_reliability.py --topic demo-reliability
- 启动生产者(默认
acks=all
):
python lesson_five/producer_reliability.py --topic demo-reliability --count 50 --acks all
- 在多 Broker 环境下,依照“七、动手实战”的故障注入步骤逐一停/启 Broker:
- 页面左侧 ISR 图会随
describe
结果变化(每 5s 刷新一次); - 发送失败计数会在
ISR < min.insync.replicas
且acks=all
场景上升; - 重新启动 Broker 后,ISR 追上恢复,可再次观察指标变化。
- 页面左侧 ISR 图会随
你也可以将生产者与消费者脚本中对成功/失败、消费计数的位置,调用
POST /api/mark
进行自动上报,使统计更精确。