第一讲、Kafka 初识与环境搭建
一、Kafka 是什么?
Apache Kafka 是一个分布式的消息队列(Message Queue)与流处理平台。
它最早由 LinkedIn 开发,后来捐赠给 Apache 基金会,现已广泛应用于日志收集、实时数据管道和大数据处理。
Kafka 的特点:
- 高吞吐:单机可处理百万级消息/秒。
- 低延迟:毫秒级别,支持实时应用。
- 可扩展:天然支持水平扩展,集群规模可随需求增加。
- 持久化:消息存储在磁盘日志文件中,可回放和追溯。
- 可靠性:数据副本机制,保证故障恢复。
二、为什么需要消息队列?
在分布式系统中,如果系统间直接同步调用,会遇到:
- 耦合度高:调用链复杂,服务间强依赖。
- 抗压能力差:高并发流量容易压垮下游。
- 响应时间长:耗时任务导致用户体验差。
- 数据丢失风险:网络波动或服务宕机时,消息容易丢。
消息队列的优势:
- 解耦:上游只需把消息放进队列,不关心下游实现。
- 削峰填谷:高峰请求存入队列,消费者按能力消费。
- 异步处理:耗时任务后台执行,前端快速响应。
- 可追溯:消息持久化,可回放,便于数据分析。
三、Kafka 与 RabbitMQ、RocketMQ 对比
特性 | Kafka | RabbitMQ | RocketMQ |
---|---|---|---|
定位 | 分布式日志系统 / 流处理平台 | 传统消息队列 | 分布式消息中间件(阿里出品) |
消息模型 | 发布-订阅(Topic/Partition) | 队列 + 交换机 | Topic + Tag |
吞吐量 | 极高(百万级/秒) | 中等(万级/秒) | 高(十万级/秒) |
延迟 | 毫秒级 | 毫秒级 | 毫秒级 |
持久化 | 磁盘顺序写(高效) | 内存 + 磁盘 | 磁盘存储 |
场景 | 日志收集、实时数据流 | 传统异步通信 | 金融、电商事务消息 |
👉 总结:
- Kafka 擅长大数据、日志流处理。
- RabbitMQ 更适合传统企业系统的异步解耦。
- RocketMQ 常用于金融、电商,强调事务消息。
四、Kafka 核心组件
- Producer(生产者):消息发送方。
- Consumer(消费者):消息消费方。
- Broker(代理节点):Kafka 服务器实例,集群由多个 Broker 组成。
- Topic(主题):消息按主题分类。
- Partition(分区):一个主题可拆分成多个分区,提升并行能力。
- Offset(偏移量):消费者在分区里的游标位置。
- Consumer Group(消费者组):一组消费者共享消费任务,实现负载均衡。
📌 工作原理简图:
Producer → Topic → Partition0 [offset 0,1,2…]Partition1 [offset 0,1,2…]Partition2 [offset 0,1,2…]Consumer Group G1:- Consumer A ← Partition0- Consumer B ← Partition1- Consumer C ← Partition2
Kafka 的工作原理:
- Producer 把消息发送到 Topic。
- Kafka 将消息按分区存储,每个分区内保证顺序。
- 消费者组里的消费者按分区消费,每条消息只会被组内一个消费者消费。
- Offset 保证消费者能从上次中断的位置继续。
📊 可视化架构图:
五、本地快速环境搭建
方案一:Mac 安装 Kafka(推荐)
1. 使用 Homebrew 安装
Homebrew 是 Mac 最简单的安装方式:
# 安装 Homebrew(如果尚未安装)
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"# 安装 Kafka(会自动安装 Zookeeper 依赖)
brew install kafka# 查看安装位置
brew --prefix kafka
# 通常在 /opt/homebrew/bin/kafka 或 /usr/local/bin/kafka
2. 启动服务
# 启动 Zookeeper
brew services start zookeeper# 启动 Kafka
brew services start kafka# 或者手动启动(前台运行)
zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建 Topic
kafka-topics --create --topic demo.hello \--bootstrap-server localhost:9092 \--partitions 3 --replication-factor 1# 验证创建成功
kafka-topics --list --bootstrap-server localhost:9092
4. 停止服务
# 停止 Kafka 和 Zookeeper
brew services stop kafka
brew services stop zookeeper
方案二:传统安装(手动下载)
如果不使用 Homebrew,也可以手动安装:
1. 下载
# 下载 Kafka
cd ~/Downloads
wget https://downloads.apache.org/kafka/2.13-3.6.0/kafka_2.13-3.6.0.tgz# 解压
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0# 添加到 PATH(可选)
echo 'export PATH="$HOME/Downloads/kafka_2.13-3.6.0/bin:$PATH"' >> ~/.zshrc
source ~/.zshrc
2. 启动服务
# 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties# 新终端启动 Kafka
bin/kafka-server-start.sh config/server.properties
3. 创建 Topic
bin/kafka-topics.sh --create --topic demo.hello \--bootstrap-server localhost:9092 \--partitions 3 --replication-factor 1
方案三:Docker 一键启动(推荐新手)
如果不想本机安装,可用 Docker 快速拉起单机环境:
1. 创建 docker-compose.yml
在项目根目录创建 docker-compose.yml
:
version: '3.8'
services:zookeeper:image: bitnami/zookeeper:3.8environment:- ALLOW_ANONYMOUS_LOGIN=yesports:- "2181:2181"kafka:image: bitnami/kafka:3ports:- "9092:9092"environment:- KAFKA_BROKER_ID=1- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092- KAFKA_LISTENERS=PLAINTEXT://:9092depends_on:- zookeeper
2. 启动/停止
# 启动服务
docker compose up -d# 查看日志
docker compose logs -f kafka# 停止服务
docker compose down -v
安装 Python 环境和 kafka-python
1. Python 环境准备
# 检查 Python 版本(推荐 3.7+)
python3 --version# 创建虚拟环境(推荐)
python3 -m venv kafka_env
source kafka_env/bin/activate# 或使用 conda
conda create -n kafka_env python=3.9
conda activate kafka_env
2. 安装 kafka-python
# 基础安装
pip install kafka-python# 如果需要额外功能,可以安装可选依赖
pip install kafka-python[crc32c] # 更快的CRC32校验# 验证安装
python -c "import kafka; print(f'kafka-python version: {kafka.__version__}')"
3. 环境变量配置
# 设置 Kafka 连接地址(可选)
export KAFKA_BOOTSTRAP=localhost:9092# 添加到 shell 配置文件(永久生效)
echo 'export KAFKA_BOOTSTRAP=localhost:9092' >> ~/.zshrc
source ~/.zshrc
六、案例:Hello Kafka
1. 命令行体验
启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo.hello
输入几条消息:
hello-1
hello-2
hello-3
启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic demo.hello --from-beginning
输出:
hello-1
hello-2
hello-3
🎉 恭喜!你完成了第一个 Kafka "生产-消费"实验。
2. Python 代码实现
我们提供了增强版的 Python 脚本,展示了消息键、头部、批量发送、消息过滤等高级特性:
核心脚本介绍
lesson01_hello_producer.py
- 增强版生产者,支持消息键、自定义头部、批量发送lesson01_hello_consumer.py
- 增强版消费者,支持消息过滤、偏移量管理、错误处理lesson01_consumer_group_demo.py
- 消费者组演示,展示分区分配和负载均衡common.py
- 公共工具函数,包含生产者和消费者工厂函数
运行示例
# 1. 先启动消费者(观察消息)
python lesson01_hello_consumer.py# 2. 新终端启动生产者(发送消息)
python lesson01_hello_producer.py# 3. 或运行消费者组演示
python lesson01_consumer_group_demo.py
增强版生产者特性
生产者脚本包含两个函数,演示不同场景:
带键消息发送:
# 发送带键的消息,确保相同键分配到相同分区
user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]
key = random.choice(user_ids)# 构造消息内容
message = {"id": i + 1,"user_id": key,"message": f"Hello from producer! Message #{i + 1}","timestamp": datetime.now().isoformat(),"source": "lesson01_producer"
}# 添加自定义头部
headers = [("message_type", b"greeting"),("priority", b"normal"),("batch_id", str(i // 3).encode())
]
批量发送优化:
# 高吞吐量配置
producer = make_producer(acks="all", # 等待所有副本确认linger_ms=50, # 批量发送延迟batch_size=32768, # 更大的批次compression_type="gzip", # 启用压缩
)# 预热发送确保连接稳定
producer.send(TOPIC, value={"warmup": True}).get(timeout=30)# 批量异步发送 + 统一flush
futures = [producer.send(TOPIC, value=msg) for msg in messages]
producer.flush(timeout=60)
增强版消费者特性
消息过滤:
def filter_messages(self, message):"""消息过滤逻辑"""headers_raw = _headers_map(message)# 只处理 message_type=greetingmsg_type = _hget(headers_raw, "message_type", None)if msg_type is not None and msg_type != "greeting":return False# 只处理特定用户if isinstance(message.value, dict) and "user_id" in message.value:return message.value["user_id"] in {"user_001", "user_002"}return True
优雅关闭和统计:
# 信号处理优雅关闭
signal.signal(signal.SIGINT, self.signal_handler)# 实时统计
def print_stats(self):duration = time.time() - self.start_timethroughput = self.message_count / durationprint(f"📊 消费统计: {self.message_count} 条消息, "f"吞吐量: {throughput:.2f} 消息/秒")
预期输出示例
生产者输出:
🎬 Kafka Producer 示例开始
⏳ 等待Kafka服务就绪...
✅ Kafka服务已就绪
🚀 启动带键的生产者...
✅ 消息 1 发送成功: topic=demo.hello, partition=1, offset=10, key=user_002
✅ 消息 2 发送成功: topic=demo.hello, partition=0, offset=8, key=user_001
📦 启动批量消息生产者...
🧩 主题 demo.hello 分区: [0, 1, 2] | 压缩: gzip
🔥 预热完成
🎯 批量发送完成:成功 50/50,耗时 2.15s,吞吐 23.3 msg/s
消费者输出:
🎬 Kafka Consumer 示例开始
⏳ 等待Kafka服务就绪...
✅ Kafka服务已就绪
🎯 消费者已创建: topic=demo.hello, group_id=lesson01_consumer_group
🚀 开始消费消息... (按 Ctrl+C 停止)📨 消息 #1主题: demo.hello分区: 1偏移量: 10键: user_002头部: {"message_type": "greeting", "priority": "normal"}内容: {"id": 1,"user_id": "user_002","message": "Hello from producer! Message #1","timestamp": "2024-01-15T10:30:15.123456"}👤 用户消息: user_002
配置参数说明
你可以在 common.py
中调整各种参数:
生产者参数:
acks
- 确认级别(0/1/all),影响可靠性enable_idempotence
- 幂等性,避免重复消息linger_ms
- 批量发送延迟,影响吞吐量batch_size
- 批次大小,影响内存使用compression_type
- 压缩类型(gzip/snappy/lz4)
消费者参数:
auto_offset_reset
- 偏移量重置策略earliest
: 从最早的消息开始消费(适合首次启动或重新处理历史数据)latest
: 从最新的消息开始消费(适合只关心新消息的场景)none
: 如果没有已提交的偏移量则抛出异常
enable_auto_commit
- 自动提交偏移量True
: 每隔auto_commit_interval_ms
自动提交,简单但可能重复消费False
: 手动提交,可精确控制处理完成后再提交,避免消息丢失
max_poll_records
- 单次拉取最大消息数- 默认500,建议根据消息大小和处理速度调整
- 数值越大吞吐量越高,但内存占用和处理延迟也会增加
session_timeout_ms
- 会话超时时间(默认30秒)- 消费者多久没有发送心跳就被认为已死亡
- 过短容易误判,过长影响故障检测速度
heartbeat_interval_ms
- 心跳间隔时间(默认3秒)- 向协调器发送心跳的频率,通常设为 session_timeout_ms 的 1/3
- 确保消费者存活状态及时同步
参数配置示例
高可靠性消费者配置(金融、订单等关键业务):
consumer = make_consumer(topic="critical_orders",group_id="order_processing_group",auto_offset_reset="earliest", # 确保不丢失消息enable_auto_commit=False, # 手动提交,确保处理完才确认max_poll_records=10, # 小批量处理,降低风险session_timeout_ms=60000, # 60秒超时,避免网络抖动误判heartbeat_interval_ms=20000 # 20秒心跳,减少网络开销
)
高吞吐量消费者配置(日志、监控等允许少量丢失的场景):
consumer = make_consumer(topic="application_logs",group_id="log_analysis_group", auto_offset_reset="latest", # 只处理新日志enable_auto_commit=True, # 自动提交,简化逻辑max_poll_records=500, # 大批量处理,提升吞吐session_timeout_ms=10000, # 10秒超时,快速故障检测heartbeat_interval_ms=3000 # 3秒心跳,及时同步状态
)
实时处理消费者配置(消息推送、实时计算等):
consumer = make_consumer(topic="realtime_events",group_id="realtime_processing_group",auto_offset_reset="latest", # 只处理最新事件enable_auto_commit=False, # 手动提交,控制处理节奏max_poll_records=50, # 中等批量,平衡延迟和吞吐session_timeout_ms=15000, # 15秒超时,快速响应heartbeat_interval_ms=5000 # 5秒心跳,保持连接活跃
)
七、小结
- Kafka 是一款分布式消息中间件,擅长高吞吐和实时流处理。
- 消息队列的价值:解耦、削峰、异步、可靠。
- 与 RabbitMQ、RocketMQ 对比,Kafka 更适合大数据、日志、实时管道。
- 核心概念:Producer、Consumer、Broker、Topic、Partition、Offset。
- 本地环境可快速启动 Zookeeper + Kafka,并用命令行体验"Hello Kafka"。
- Python 代码示例展示了如何在实际项目中使用 Kafka。
附录 A:术语速查(Glossary)
- Topic:消息主题,按业务维度分类。
- Partition:主题的分片,提升并发度;分区内有序,分区间无序。
- Offset:偏移量,消费者在分区内的位置游标。
- Consumer Group:同一组内的消费者共享分区实现水平扩展。
- Replication Factor:副本数,提高容灾能力。
- ACKS:生产者写入确认级别(0/1/all)。
- Idempotence:幂等写入,避免重复消息。
- Broker:Kafka 服务器实例,负责消息存储和转发。
- Producer:消息生产者,负责向 Topic 发送消息。
- Consumer:消息消费者,负责从 Topic 消费消息。
附录 B:常见问题排查(Troubleshooting)
-
错误:
NoBrokersAvailable
- 检查 Kafka 是否启动、
localhost:9092
端口是否可达,或设置KAFKA_BOOTSTRAP
。
- 检查 Kafka 是否启动、
-
错误:连接被拒绝或超时
- 如使用 Docker,确保
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
;Windows/WLS2 或远程容器需用宿主 IP。
- 如使用 Docker,确保
-
主题不存在/消费不到数据
- 先用 CLI 创建
demo.hello
;或确保 Broker 允许自动创建主题;消费者组首次消费需设置auto_offset_reset=earliest
(已在示例中设置)。
- 先用 CLI 创建
-
端口占用
- 停掉本地占用
2181/9092
的进程,或改端口。
- 停掉本地占用
-
Docker 容器启动失败
- 检查 Docker 服务状态,确保有足够内存(建议 4GB+)。
- 查看容器日志:
docker compose logs zookeeper
或docker compose logs kafka
。
附录 C:练习与思考(建议动手)
基础练习
-
Mac 环境搭建:使用 Homebrew 安装 Kafka,体验
brew services
管理服务的便利性。 -
分区分配观察:
# 修改分区数并观察 kafka-topics --alter --topic demo.hello --partitions 2 --bootstrap-server localhost:9092# 同时启动 2 个消费者(同一组) python lesson01_consumer_group_demo.py
-
消息键实验:
- 运行生产者,观察相同
user_id
的消息如何分配到相同分区 - 修改
user_ids
列表,测试不同键的分区分配
- 运行生产者,观察相同
进阶实验
-
性能对比测试:
# 在 lesson01_hello_producer.py 中修改配置 # 测试1:高可靠性 producer = make_producer(acks="all", enable_idempotence=True)# 测试2:高吞吐量 producer = make_producer(acks="1", linger_ms=100, batch_size=65536)# 测试3:无确认(最快但不可靠) producer = make_producer(acks="0", linger_ms=0)
-
压缩效果对比:
# 分别测试不同压缩算法 compression_types = [None, "gzip", "snappy", "lz4"] # 观察吞吐量和网络使用的差异
-
消息过滤实验:
- 修改消费者的
filter_messages()
函数 - 测试只消费特定优先级或特定用户的消息
- 观察过滤对性能的影响
- 修改消费者的
-
错误处理和重试:
# 在发送过程中故意制造错误 # 观察重试机制和幂等性的效果 try:future = producer.send("nonexistent-topic", value=message)future.get(timeout=5) except KafkaError as e:print(f"发送失败:{e}")
消费者组实验
-
负载均衡观察:
# 启动消费者组演示,观察分区分配 python lesson01_consumer_group_demo.py# 动态添加/移除消费者,观察 rebalance 过程
-
偏移量管理:
- 对比自动提交 vs 手动提交的区别
- 模拟消费者崩溃,观察重启后的消费位置
- 尝试重置偏移量:
kafka-consumer-groups --bootstrap-server localhost:9092 \--group lesson01_consumer_group --reset-offsets --to-earliest \--topic demo.hello --execute
性能调优实验
-
批量大小优化:
# 测试不同的 batch_size 值 batch_sizes = [1024, 16384, 32768, 65536] # 记录每种配置下的吞吐量
-
预热效果验证:
- 注释掉预热代码,对比首次发送的延迟
- 观察元数据获取对性能的影响
-
连接池优化:
# 测试不同的连接参数 max_in_flight_requests = [1, 5, 10] # 观察对吞吐量和消息顺序的影响
思考题与参考答案
- 架构设计:
Q: 为什么 Kafka 选择"分区内有序,分区间无序"的设计?
A: 这是性能与一致性的巧妙平衡:
- 分区内有序:保证单分区内消息按发送顺序消费,满足大多数业务场景的顺序需求
- 分区间无序:允许多分区并行处理,大幅提升吞吐量
- 设计优势:
- 避免了全局排序的性能瓶颈
- 通过消息键确保相关消息在同一分区内有序
- 支持水平扩展,分区数可根据需求调整
Q: 消费者组中的消费者数量超过分区数会发生什么?
A: 会出现消费者闲置情况:
主题有3个分区,但消费者组有5个消费者:
分区0 -> 消费者A ✅
分区1 -> 消费者B ✅
分区2 -> 消费者C ✅
空闲 -> 消费者D ❌ (无分区分配)
空闲 -> 消费者E ❌ (无分区分配)
- 最佳实践:消费者数量 ≤ 分区数
- 动态调整:当消费者退出时,闲置消费者会被重新分配分区
Q: 如何设计一个高可用的 Kafka 集群?
A: 关键要素包括:
# 集群配置示例
集群规模: >= 3 个 Broker (奇数个,便于选举)
副本因子: >= 3 (容忍 1 个 Broker 故障)
分区分布: 副本分散在不同 Broker 上
网络隔离: 不同机架/可用区部署
监控告警: JMX 指标 + 日志监控
备份策略: 定期快照 + 增量备份
- 性能权衡:
Q: acks=all
vs acks=1
vs acks=0
的性能和可靠性差异?
A: 三种模式的对比:
模式 | 可靠性 | 性能 | 使用场景 |
---|---|---|---|
acks=0 | 最低 | 最高 | 日志收集、指标上报 |
acks=1 | 中等 | 中等 | 一般业务消息 |
acks=all | 最高 | 最低 | 金融交易、订单处理 |
# 性能测试示例
# acks=0: ~100,000 msg/s, 0% 确保送达
# acks=1: ~50,000 msg/s, 99% 确保送达
# acks=all: ~20,000 msg/s, 99.99% 确保送达
Q: 启用压缩的CPU开销是否值得?
A: 通常值得,特别是网络带宽有限时:
压缩效果对比(JSON消息):
无压缩: 100MB/s 网络, 0% CPU
gzip: 40MB/s 网络, 5% CPU (压缩比60%)
snappy: 60MB/s 网络, 2% CPU (压缩比40%)
lz4: 70MB/s 网络, 1% CPU (压缩比30%)
- 推荐:生产环境使用
gzip
或snappy
- 权衡:网络成本 > CPU成本时启用压缩
Q: 批量发送的延迟 vs 吞吐量权衡?
A: 关键在于 linger.ms
的设置:
# 延迟敏感应用
linger_ms=0 # 立即发送,延迟 <1ms,吞吐量较低
linger_ms=5 # 等待5ms,延迟 ~5ms,吞吐量提升50%# 吞吐量优先应用
linger_ms=50 # 等待50ms,延迟 ~50ms,吞吐量提升200%
linger_ms=100 # 等待100ms,延迟 ~100ms,吞吐量提升300%
- 实际应用:
Q: 如何选择合适的分区数?
A: 综合考虑多个因素:
分区数计算公式:
分区数 = max(目标吞吐量 / 单分区吞吐量,目标吞吐量 / (消费者数 × 单消费者吞吐量)
)实际建议:
- 起始值:max(预期消费者数, 目标吞吐量MB/s ÷ 10)
- 上限:单Broker建议不超过1000个分区
- 扩展:只能增加不能减少,提前规划
Q: 什么时候使用消息键,什么时候不用?
A: 根据业务需求决定:
使用消息键的场景:
# 1. 需要顺序处理
user_orders = {"user_id": "12345", "order": "..."}
producer.send("orders", key="12345", value=user_orders)# 2. 相关消息聚合
log_events = {"session_id": "abc", "event": "click"}
producer.send("logs", key="abc", value=log_events)# 3. 负载均衡
producer.send("tasks", key=f"worker_{task_id % 10}", value=task)
不使用消息键的场景:
# 1. 独立事件(随机分布即可)
metrics = {"cpu_usage": 80, "timestamp": "..."}
producer.send("metrics", value=metrics) # key=None# 2. 需要最大并行度
notifications = {"message": "系统维护通知"}
producer.send("notifications", value=notifications)
Q: 如何处理消费者处理失败的消息?
A: 多种策略组合使用:
# 1. 重试机制
def process_message(message):max_retries = 3for attempt in range(max_retries):try:# 业务处理逻辑handle_business_logic(message.value)return Trueexcept RetryableError as e:if attempt == max_retries - 1:send_to_dlq(message) # 发送到死信队列time.sleep(2 ** attempt) # 指数退避except FatalError:send_to_dlq(message) # 直接进入死信队列return False# 2. 死信队列模式
def send_to_dlq(message):dlq_producer.send("dead_letter_queue",key=message.key,value={"original_topic": message.topic,"original_message": message.value,"error_reason": "processing_failed","failed_at": datetime.now().isoformat()})# 3. 手动确认模式
consumer = make_consumer("my_topic", group_id="my_group",enable_auto_commit=False # 关闭自动提交
)for message in consumer:try:process_message(message)consumer.commit() # 处理成功才提交except Exception:# 不提交,消息会重新消费continue
最佳实践组合:
- 幂等处理 - 确保重复消费不会产生副作用
- 错误分类 - 区分可重试错误和致命错误
- 监控告警 - 监控失败率和死信队列大小
- 人工介入 - 定期处理死信队列中的消息
扩展挑战
-
监控和可观测性:
- 添加消息发送成功率统计
- 实现消费延迟监控
- 添加分区级别的吞吐量统计
-
错误恢复机制:
- 实现死信队列(DLQ)
- 添加消息重试机制
- 实现优雅降级策略
-
多环境适配:
- 支持 SASL/SSL 认证
- 适配云服务(如 Confluent Cloud)
- 实现配置外部化
完整案例代码
common:
# -*- coding: utf-8 -*-
import os, json, time
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import NoBrokersAvailable, KafkaTimeoutError, KafkaError# 优先用 IPv4,避免 macOS 把 localhost 解析为 ::1 的坑
BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "127.0.0.1:9092")
DEFAULT_COMPRESSION = os.getenv("KAFKA_COMPRESSION", "gzip") # gzip/snappy/lz4/zstd/none# ─────────────────────────── 基础连通性 ───────────────────────────def wait_kafka(timeout=30):"""等待 Kafka 可用(尝试建立短连接)。可结合上层打印 BOOTSTRAP,便于排查。"""start = time.time()last_err = Nonewhile time.time() - start < timeout:try:KafkaProducer(bootstrap_servers=BOOTSTRAP).close()returnexcept Exception as e:last_err = etime.sleep(1)raise RuntimeError(f"Kafka not available at {BOOTSTRAP}; last error: {last_err}")# ─────────────────────────── Producer ───────────────────────────def _normalize_compression(name: str | None):if not name or str(name).lower() in ("none", "false", "0", ""):return Nonereturn str(name).lower()def make_producer(acks: str = "all",enable_idempotence: bool = False, # 仅作语义标记;kafka-python 不支持该开关linger_ms: int = 50, # 适中等待,避免首条卡顿batch_size: int = 64 * 1024, # 64KB 批compression_type: str | None = DEFAULT_COMPRESSION,security: dict | None = None, # {username, password}(如接入云厂商时)request_timeout_ms: int = 60_000,retries: int = 8,retry_backoff_ms: int = 300,max_in_flight_requests_per_connection: int = 5,metadata_max_age_ms: int = 10_000,
):"""生产者:稳健默认值- acks="all" 生产更可靠(单副本等价于1)- gzip 默认免安装;snappy/lz4/zstd 需额外依赖- 如需幂等/事务,请使用 confluent-kafka 客户端"""params = dict(bootstrap_servers=BOOTSTRAP,acks=acks,value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),key_serializer=lambda v: None if v is None else str(v).encode("utf-8"),linger_ms=linger_ms,batch_size=batch_size,compression_type=_normalize_compression(compression_type),retries=retries,retry_backoff_ms=retry_backoff_ms,request_timeout_ms=request_timeout_ms,max_in_flight_requests_per_connection=max_in_flight_requests_per_connection,metadata_max_age_ms=metadata_max_age_ms,)# 可选:SASL/SSL(连云服务时开启)# if security:# params.update(dict(# security_protocol="SASL_SSL",# sasl_mechanism="PLAIN",# sasl_plain_username=security["username"],# sasl_plain_password=security["password"],# ))# 处理压缩库缺失的场景:snappy/lz4/zstd 没装则自动降级到 gziptry:return KafkaProducer(**params)except AssertionError as e:msg = str(e).lower()if "compression codec" in msg:# 自动降级到 gzipparams["compression_type"] = "gzip"return KafkaProducer(**params)raisedef warmup_producer(producer: KafkaProducer, topic: str, timeout: int = 30):"""发送一条极小消息并等待 ACK,完成连接/元数据/leader 预热。"""fut = producer.send(topic, value={"__warmup__": True, "ts": datetime.now().isoformat()})fut.get(timeout=timeout)producer.flush(timeout=timeout)# ─────────────────────────── Consumer ───────────────────────────def make_consumer(topic: str,group_id: str | None,auto_offset_reset: str = "earliest", # 教学用 earliest;生产更常用 latestenable_auto_commit: bool = False, # 推荐手动提交,先处理后提交max_poll_records: int = 100,security: dict | None = None,session_timeout_ms: int = 10_000,heartbeat_interval_ms: int = 3_000,request_timeout_ms: int = 305_000,fetch_min_bytes: int = 1,fetch_max_wait_ms: int = 500,fetch_max_bytes: int = 50 * 1024 * 1024, # 50MBconsumer_timeout_ms: int = 0, # >0: 无消息 N ms 后抛 StopIterationretry_backoff_ms: int = 100,reconnect_backoff_ms: int = 50,reconnect_backoff_max_ms: int = 1000,
):"""消费者:稳健默认值(更少超时、更平滑批量)"""params = dict(bootstrap_servers=BOOTSTRAP,group_id=group_id,auto_offset_reset=auto_offset_reset,enable_auto_commit=enable_auto_commit,value_deserializer=lambda v: json.loads(v.decode("utf-8")),key_deserializer=lambda v: None if v is None else v.decode("utf-8"),max_poll_records=max_poll_records,# 稳定性 & 吞吐session_timeout_ms=session_timeout_ms,heartbeat_interval_ms=heartbeat_interval_ms,request_timeout_ms=request_timeout_ms,fetch_min_bytes=fetch_min_bytes,fetch_max_wait_ms=fetch_max_wait_ms,fetch_max_bytes=fetch_max_bytes,# 控制退出consumer_timeout_ms=consumer_timeout_ms,# 退避重连retry_backoff_ms=retry_backoff_ms,reconnect_backoff_ms=reconnect_backoff_ms,reconnect_backoff_max_ms=reconnect_backoff_max_ms,)# if security:# params.update(dict(# security_protocol="SASL_SSL",# sasl_mechanism="PLAIN",# sasl_plain_username=security["username"],# sasl_plain_password=security["password"],# ))return KafkaConsumer(topic, **params)# ─────────────────────────── 小工具 ───────────────────────────def partitions_of(producer: KafkaProducer, topic: str):"""获取主题分区集合(None/空集合 => 元数据未取到或主题不存在)"""return producer.partitions_for(topic) or set()def close_safely(x):try:x.flush(timeout=30) if hasattr(x, "flush") else Noneexcept Exception:passtry:x.close()except Exception:pass
生产者:
#!/usr/bin/env python3
"""
Kafka Producer 示例 - 发送消息到 demo.hello 主题
演示:消息键、自定义头部、错误处理、配置调优
"""
import os, time
import json
import random
from datetime import datetime
from kafka.errors import KafkaTimeoutError, KafkaError
from common import make_producer, BOOTSTRAP,wait_kafkaTOPIC = "demo.hello"def send_messages_with_keys():"""发送带键的消息,演示分区分配"""print("🚀 启动带键的生产者...")# 创建生产者,启用幂等性producer = make_producer(acks="all", # 等待所有副本确认enable_idempotence=True, # 启用幂等性,避免重复linger_ms=10, # 批量发送延迟batch_size=16384, # 批次大小compression_type="gzip" # 启用压缩)# 定义一些用户ID作为消息键user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]try:for i in range(10):# 随机选择用户ID作为键key = random.choice(user_ids)# 构造消息内容message = {"id": i + 1,"user_id": key,"message": f"Hello from producer! Message #{i + 1}","timestamp": datetime.now().isoformat(),"source": "lesson01_producer"}# 添加自定义头部headers = [("message_type", b"greeting"),("priority", b"normal"),("batch_id", str(i // 3).encode())]# 发送消息future = producer.send(topic="demo.hello",key=key,value=message,headers=headers)# 等待发送完成并检查结果record_metadata = future.get(timeout=10)print(f"✅ 消息 {i + 1} 发送成功: "f"topic={record_metadata.topic}, "f"partition={record_metadata.partition}, "f"offset={record_metadata.offset}, "f"key={key}")# 模拟消息间隔time.sleep(0.5)except Exception as e:print(f"❌ 发送消息时出错: {e}")finally:# 确保所有消息都发送完成producer.flush()producer.close()print("🔒 生产者已关闭")def send_batch_messages():"""批量发送消息(预热 + 更稳的超时/重试 + gzip)"""print("\n📦 启动批量消息生产者...")print(f"🔌 BOOTSTRAP = {BOOTSTRAP}")# 1) 使用更稳的参数;先用 gzip,排除 snappy 干扰producer = make_producer(acks="all", # 单副本等价于1,稳定一点linger_ms=50, # 避免首条等太久batch_size=32768,compression_type="gzip",# 如果你在 common.py 里支持以下键,建议设置;否则忽略# request_timeout_ms=60000,# retries=8,# retry_backoff_ms=300,# max_in_flight_requests_per_connection=5,)# 2) 元数据/Topic 自检parts = producer.partitions_for(TOPIC)if not parts:print(f"❌ 无法获取 {TOPIC} 的分区信息;请先创建主题或检查 Kafka 监听地址。")producer.close()returnprint(f"🧩 主题 {TOPIC} 分区: {sorted(parts)} | 压缩: gzip")# 3) 预热发送(确保连接/元数据/leader 完全就绪)try:producer.send(TOPIC, value={"warmup": True}).get(timeout=30)producer.flush(timeout=30)print("🔥 预热完成")except Exception as e:print(f"❌ 预热失败:{e}")producer.close()return# 4) 准备批量数据messages = [{"batch_id": i // 10,"sequence": i + 1,"content": f"Batch message #{i + 1}","timestamp": datetime.now().isoformat()} for i in range(50)]# 5) 异步发送 + 最后统一 flush;只在 flush 后检查结果start = time.time()futures = [producer.send(TOPIC, value=m) for m in messages]# 6) 等待缓冲区落盘errors = 0try:producer.flush(timeout=60)# 可选:逐条确认(此时元数据/连接稳定,失败概率更低)for i, fut in enumerate(futures, 1):try:fut.get(timeout=10)if i % 10 == 0:print(f"📊 已确认 {i}/{len(messages)} 条")except (KafkaTimeoutError, KafkaError) as e:errors += 1print(f"❌ 第 {i} 条失败:{e}")finally:duration = time.time() - startok = len(messages) - errorstput = ok / duration if duration > 0 else 0print(f"🎯 批量发送完成:成功 {ok}/{len(messages)},耗时 {duration:.2f}s,吞吐 {tput:.1f} msg/s")producer.close()print("🔒 批量生产者已关闭")def main():"""主函数"""print("🎬 Kafka Producer 示例开始")print("=" * 50)# 等待Kafka就绪print("⏳ 等待Kafka服务就绪...")wait_kafka()print("✅ Kafka服务已就绪")# 发送带键的消息send_messages_with_keys()# 发送批量消息send_batch_messages()print("\n🎉 所有消息发送完成!")print("💡 提示:现在可以启动消费者来接收这些消息")if __name__ == "__main__":main()
消费者:
#!/usr/bin/env python3
"""
Kafka Consumer 示例 - 从 demo.hello 主题消费消息
演示:消息过滤、偏移量管理、错误处理、消费者组管理
"""import json
import time
import signal
from datetime import datetime
from common import make_consumer, wait_kafka# ────────────── Headers 工具函数 ──────────────
def _headers_map(msg):"""将 message.headers 转为 {str: bytes} 的字典"""# kafka-python: headers 是 List[Tuple[str, bytes]]return dict(msg.headers or [])def _hget(headers: dict, key: str, default=None, decode=True):"""从 headers 获取 key;默认把 bytes 解码成 str。"""val = headers.get(key, default)if decode and isinstance(val, (bytes, bytearray)):try:return val.decode("utf-8")except Exception:return defaultreturn val# ────────────── 消费者类 ──────────────
class KafkaMessageConsumer:"""Kafka 消息消费者类"""def __init__(self, topic, group_id, auto_commit=True):self.topic = topicself.group_id = group_idself.auto_commit = auto_commitself.running = Trueself.message_count = 0self.start_time = time.time()# 设置信号处理,优雅关闭signal.signal(signal.SIGINT, self.signal_handler)signal.signal(signal.SIGTERM, self.signal_handler)# 创建消费者self.consumer = make_consumer(topic=topic,group_id=group_id,auto_offset_reset="earliest", # 从最早的消息开始消费(首次启动或重新处理历史数据)enable_auto_commit=auto_commit,# 是否自动提交偏移量(False=手动提交,更安全)max_poll_records=100, # 每次拉取的最大消息数(平衡吞吐量与内存使用)session_timeout_ms=30000, # 会话超时时间30秒(心跳丢失多久认为消费者死亡)heartbeat_interval_ms=3000, # 心跳间隔3秒(通常为session_timeout的1/3))print(f"🎯 消费者已创建: topic={topic}, group_id={group_id}")def signal_handler(self, signum, frame):"""信号处理函数,优雅关闭"""print(f"\n🛑 收到信号 {signum},正在优雅关闭...")self.running = Falsedef process_message(self, message):"""处理单条消息"""try:value = message.valuekey = message.keypartition = message.partitionoffset = message.offsettimestamp = message.timestampheaders_raw = _headers_map(message) # {str: bytes}headers = {k: (v.decode("utf-8", "ignore") if isinstance(v, (bytes, bytearray)) else v)for k, v in headers_raw.items()}self.message_count += 1# 格式化输出print(f"\n📨 消息 #{self.message_count}")print(f" 主题: {message.topic}")print(f" 分区: {partition}")print(f" 偏移量: {offset}")print(f" 键: {key}")print(f" 时间戳: {datetime.fromtimestamp(timestamp/1000) if timestamp else 'N/A'}")print(f" 头部: {headers}")print(f" 内容: {json.dumps(value, ensure_ascii=False, indent=2)}")# 示例业务逻辑if isinstance(value, dict):if "user_id" in value:print(f" 👤 用户消息: {value['user_id']}")if "batch_id" in value:print(f" 📦 批量消息: 批次 {value['batch_id']}")# 判断优先级priority = _hget(headers_raw, "priority", "normal")if priority == "high":print(" ⚠️ 高优先级消息!")# 模拟处理耗时time.sleep(0.1)except Exception as e:print(f"❌ 处理消息时出错: {e}")def filter_messages(self, message):"""消息过滤逻辑"""try:value = message.valueheaders_raw = _headers_map(message)# 条件1:只处理 message_type=greetingmsg_type = _hget(headers_raw, "message_type", None)if msg_type is not None and msg_type != "greeting":return False# 条件2:只处理特定用户if isinstance(value, dict) and "user_id" in value:return value["user_id"] in {"user_001", "user_002"}return Trueexcept Exception as e:print(f"❌ 消息过滤出错: {e}")return True # 出错时默认放行def commit_offsets(self):"""手动提交偏移量"""if not self.auto_commit:try:self.consumer.commit()print("💾 偏移量已手动提交")except Exception as e:print(f"❌ 提交偏移量失败: {e}")def print_stats(self):"""打印统计信息"""if self.message_count > 0:duration = time.time() - self.start_timethroughput = self.message_count / durationprint(f"\n📊 消费统计:")print(f" 总消息数: {self.message_count}")print(f" 运行时间: {duration:.2f} 秒")print(f" 吞吐量: {throughput:.2f} 消息/秒")def run(self):"""运行消费者"""print(f"🚀 开始消费消息... (按 Ctrl+C 停止)")print("=" * 60)try:while self.running:# 拉取消息messages = self.consumer.poll(timeout_ms=1000, max_records=10)for _, partition_messages in messages.items():for message in partition_messages:if not self.running:breakif self.filter_messages(message):self.process_message(message)# 定期手动提交偏移量if not self.auto_commit and self.message_count % 10 == 0:self.commit_offsets()if self.message_count > 0 and self.message_count % 20 == 0:print(f"💡 已处理 {self.message_count} 条消息...")except KeyboardInterrupt:print("\n🛑 用户中断,正在关闭...")except Exception as e:print(f"❌ 消费过程中出错: {e}")finally:self.cleanup()def cleanup(self):"""清理资源"""try:self.print_stats()if not self.auto_commit:self.commit_offsets()self.consumer.close()print("🔒 消费者已关闭")except Exception as e:print(f"❌ 清理资源时出错: {e}")# ────────────── 主入口 ──────────────
def main():print("🎬 Kafka Consumer 示例开始")print("=" * 50)topic = "demo.hello"group_id = "lesson01_consumer_group"auto_commit = False # 手动提交,便于观察print("⏳ 等待Kafka服务就绪...")wait_kafka()print("✅ Kafka服务已就绪")consumer = KafkaMessageConsumer(topic, group_id, auto_commit)try:consumer.run()except Exception as e:print(f"❌ 消费者运行出错: {e}")print("\n👋 消费者示例结束")if __name__ == "__main__":main()
演示demo:
#!/usr/bin/env python3
"""
Kafka 消费者组演示 - 展示分区分配和负载均衡
演示:多个消费者如何共享分区、消费者组的行为
"""import json
import time
import signal
import sys
import threading
from datetime import datetime
from common import make_consumer, wait_kafkaclass GroupConsumer:"""消费者组中的单个消费者"""def __init__(self, consumer_id, topic, group_id, auto_commit=True):self.consumer_id = consumer_idself.topic = topicself.group_id = group_idself.auto_commit = auto_commitself.running = Trueself.message_count = 0self.start_time = time.time()# 创建消费者self.consumer = make_consumer(topic=topic,group_id=group_id,auto_offset_reset="earliest", # 从最早的消息开始消费enable_auto_commit=auto_commit, # 自动提交偏移量设置max_poll_records=50, # 每次拉取50条消息(演示用,实际可调整)session_timeout_ms=30000, # 30秒会话超时heartbeat_interval_ms=3000 # 3秒心跳间隔)print(f"👤 消费者 {consumer_id} 已创建: group_id={group_id}")def process_message(self, message):"""处理消息"""try:value = message.valuekey = message.keypartition = message.partitionoffset = message.offsetself.message_count += 1# 格式化输出,显示消费者IDprint(f"[{self.consumer_id}] 📨 消息 #{self.message_count} "f"| 分区:{partition} | 偏移量:{offset} | 键:{key}")if isinstance(value, dict):if "user_id" in value:print(f" 👤 用户: {value['user_id']}")if "batch_id" in value:print(f" 📦 批次: {value['batch_id']}")# 模拟处理时间time.sleep(0.2)except Exception as e:print(f"[{self.consumer_id}] ❌ 处理消息出错: {e}")def run(self):"""运行消费者"""print(f"[{self.consumer_id}] 🚀 开始消费...")try:while self.running:messages = self.consumer.poll(timeout_ms=1000, max_records=10)for topic_partition, partition_messages in messages.items():for message in partition_messages:if not self.running:breakself.process_message(message)# 定期打印状态if self.message_count > 0 and self.message_count % 10 == 0:print(f"[{self.consumer_id}] 💡 已处理 {self.message_count} 条消息")except Exception as e:print(f"[{self.consumer_id}] ❌ 运行出错: {e}")finally:self.cleanup()def stop(self):"""停止消费者"""self.running = Falsedef cleanup(self):"""清理资源"""try:if self.message_count > 0:duration = time.time() - self.start_timethroughput = self.message_count / durationprint(f"[{self.consumer_id}] 📊 统计: {self.message_count} 条消息, "f"吞吐量 {throughput:.1f} 消息/秒")if not self.auto_commit:self.consumer.commit()self.consumer.close()print(f"[{self.consumer_id}] 🔒 已关闭")except Exception as e:print(f"[{self.consumer_id}] ❌ 清理出错: {e}")def run_consumer_group(num_consumers=3, topic="demo.hello", group_id="demo_group"):"""运行消费者组"""print(f"🎬 启动消费者组演示")print(f" 主题: {topic}")print(f" 消费者组: {group_id}")print(f" 消费者数量: {num_consumers}")print("=" * 60)# 等待Kafka就绪print("⏳ 等待Kafka服务就绪...")wait_kafka()print("✅ Kafka服务已就绪")# 创建消费者列表consumers = []threads = []try:# 创建并启动所有消费者for i in range(num_consumers):consumer_id = f"Consumer-{i+1}"consumer = GroupConsumer(consumer_id, topic, group_id, auto_commit=False)consumers.append(consumer)# 在新线程中运行消费者thread = threading.Thread(target=consumer.run, daemon=True)threads.append(thread)thread.start()# 间隔启动,便于观察分区分配time.sleep(2)print(f"\n🎯 所有消费者已启动,观察分区分配情况...")print("💡 提示:相同键的消息会分配到相同分区")print("💡 提示:按 Ctrl+C 停止所有消费者")print("-" * 60)# 等待用户中断while True:time.sleep(1)except KeyboardInterrupt:print(f"\n🛑 收到中断信号,正在停止所有消费者...")except Exception as e:print(f"❌ 运行出错: {e}")finally:# 停止所有消费者for consumer in consumers:consumer.stop()# 等待所有线程结束for thread in threads:thread.join(timeout=5)print("\n🎉 消费者组演示结束")def main():"""主函数"""print("🎬 Kafka 消费者组演示")print("=" * 50)# 配置参数topic = "demo.hello"group_id = "lesson01_demo_group"num_consumers = 3 # 建议与主题分区数相同print("📋 演示说明:")print("1. 启动多个消费者,观察分区如何分配")print("2. 相同键的消息会分配到相同分区")print("3. 每个分区只会被组内一个消费者消费")print("4. 观察负载均衡效果")print()try:run_consumer_group(num_consumers, topic, group_id)except Exception as e:print(f"❌ 演示运行出错: {e}")print("\n👋 演示结束")if __name__ == "__main__":main()
📖 扩展阅读
- Kafka 官方文档
- kafka-python 文档
- Kafka 设计原理
👉 下一篇课程:Kafka Topic 与 Partition 深入理解 —— 我们将通过实验观察"消息如何分区"和"消费者组如何分工"。