Kafka Basics with Python - Study Notes

📚 Table of Contents

  1. Kafka Overview
  2. Kafka Core Concepts
  3. Environment Setup
  4. Python Kafka Clients
  5. Producer Development
  6. Consumer Development
  7. Hands-On Examples
  8. Best Practices

1. Kafka Overview

1.1 What Is Kafka?

  • Definition: Apache Kafka is a distributed streaming platform
  • Key characteristics:
    • High throughput and low latency
    • Highly scalable
    • Durable, persistent storage
    • Good fault tolerance

1.2 Typical Use Cases

  • Real-time data stream processing
  • Message queueing
  • Log aggregation
  • Event sourcing
  • Communication between microservices

1.3 Kafka vs. Other Message Queues

# Comparison table
"""
Feature        Kafka    RabbitMQ    ActiveMQ
Throughput     High     Medium      Medium
Persistence    Yes      Yes         Yes
Distributed    Yes      Limited     Yes
Complexity     High     Medium      Medium
"""

2. Kafka Core Concepts

2.1 Concept Diagram

Producer → Topic (Partition 0, 1, 2 ...) → Consumer Group
                      ↓
                Kafka Cluster
                      ↓
                  ZooKeeper

2.2 Concepts in Detail

Topic
# A topic is a category label for messages
# e.g., a topic for user-behavior data
topic_name = "user_behavior"

Partition
# Each topic can have multiple partitions to enable parallel processing
# Partition numbering starts at 0
partitions = [0, 1, 2]  # 3 partitions

Producer
# A client that publishes messages to Kafka
# Decides which partition each message is written to

Consumer
# A client that reads messages from Kafka
# Can subscribe to one or more topics

Consumer Group
# Several consumers form a group and consume a topic together
# Each partition is consumed by at most one consumer within the group
# (see the topic-creation sketch after this list)
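
To make topics, partitions, and consumer groups concrete, here is a minimal sketch (not part of the original notes) that creates the user_behavior topic with 3 partitions using kafka-python's admin client. It assumes the single-broker setup from section 3 is running on localhost:9092 and that the topic does not already exist.

# topic_admin.py - illustrative sketch
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaConsumer

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Single broker, so replication_factor must be 1; 3 partitions allow up to
# 3 consumers in the same group to read in parallel
admin.create_topics([
    NewTopic(name='user_behavior', num_partitions=3, replication_factor=1)
])

# Verify that the new topic is visible to clients
print(KafkaConsumer(bootstrap_servers='localhost:9092').topics())
admin.close()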

3. Environment Setup

3.1 Docker (Recommended)

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# Start the services
docker-compose up -d

# Verify that the containers are running
docker ps

3.2 Installing the Python Dependencies

# Install the kafka-python library
pip install kafka-python

# Note: json and logging ship with the Python standard library,
# so they do not need to be installed with pip.
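
A quick way to confirm both the library installation and broker connectivity is to list topics from Python; this small check assumes the Docker broker from section 3.1 is running.

# verify_install.py - quick sanity check
import kafka
from kafka import KafkaConsumer

print("kafka-python version:", kafka.__version__)

# topics() triggers a metadata request, so it also verifies the broker is reachable
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print("Topics on the broker:", consumer.topics())
consumer.close()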

4. Python Kafka Clients

4.1 Basic Configuration

# config.py
import json

# Shared settings: the serializer key applies to KafkaProducer; the group,
# offset, commit, and deserializer keys apply to KafkaConsumer
KAFKA_CONFIG = {
    'bootstrap_servers': ['localhost:9092'],
    'client_id': 'python-kafka-client',
    'group_id': 'my-group',
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    'auto_commit_interval_ms': 1000,
    'value_serializer': lambda x: json.dumps(x).encode('utf-8'),
    'value_deserializer': lambda m: json.loads(m.decode('utf-8')),
}
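
Note that this dict mixes producer-only keys (value_serializer) with consumer-only keys (group_id, offset/commit settings, value_deserializer), and kafka-python generally rejects options it does not recognize. Below is a minimal sketch of splitting the shared config; the PRODUCER_KEYS / CONSUMER_KEYS names are illustrative helpers, not part of any library.

# config_split.py - illustrative sketch reusing KAFKA_CONFIG from config.py
from kafka import KafkaProducer, KafkaConsumer
from config import KAFKA_CONFIG

PRODUCER_KEYS = {'bootstrap_servers', 'client_id', 'value_serializer'}
CONSUMER_KEYS = {'bootstrap_servers', 'client_id', 'group_id', 'auto_offset_reset',
                 'enable_auto_commit', 'auto_commit_interval_ms', 'value_deserializer'}

producer = KafkaProducer(**{k: v for k, v in KAFKA_CONFIG.items() if k in PRODUCER_KEYS})
consumer = KafkaConsumer(
    'user_events',  # example topic
    **{k: v for k, v in KAFKA_CONFIG.items() if k in CONSUMER_KEYS}
)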

4.2 Connection Test

# connection_test.py
from kafka import KafkaProducer, KafkaConsumer
import json


def test_kafka_connection():
    try:
        # Test the producer connection
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        # Test the consumer connection
        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            group_id='test-group'
        )
        print("✅ Kafka connection succeeded!")
        producer.close()
        consumer.close()
    except Exception as e:
        print(f"❌ Kafka connection failed: {e}")


if __name__ == "__main__":
    test_kafka_connection()

5. Producer Development

5.1 Basic Producer

# producer_basic.py
from kafka import KafkaProducer
import json
import time


class BasicProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None
        )

    def send_message(self, topic, message, key=None):
        """Send a single message."""
        try:
            future = self.producer.send(topic, value=message, key=key)
            # Block until the send completes
            record_metadata = future.get(timeout=10)
            print(f"Message sent: topic={record_metadata.topic}, "
                  f"partition={record_metadata.partition}, "
                  f"offset={record_metadata.offset}")
        except Exception as e:
            print(f"Failed to send message: {e}")

    def send_batch_messages(self, topic, messages):
        """Send a batch of messages."""
        for i, message in enumerate(messages):
            self.send_message(topic, message, key=f"key-{i}")
            time.sleep(0.1)  # Avoid sending too fast

    def close(self):
        """Close the producer."""
        self.producer.flush()  # Make sure all buffered messages are sent
        self.producer.close()


# Usage example
if __name__ == "__main__":
    producer = BasicProducer()

    # Send a single message
    message = {"user_id": 123, "action": "login", "timestamp": time.time()}
    producer.send_message("user_events", message, "user_123")

    # Send a batch
    messages = [
        {"user_id": i, "action": "click", "timestamp": time.time()}
        for i in range(1, 6)
    ]
    producer.send_batch_messages("user_events", messages)

    producer.close()

5.2 Asynchronous Producer

# producer_async.py
from kafka import KafkaProducer
import json
import time


class AsyncProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None,
            # Async-friendly settings
            acks='all',              # Wait for all in-sync replicas to acknowledge
            retries=3,               # Number of retries
            batch_size=16384,        # Batch size in bytes
            linger_ms=10,            # How long to wait to fill a batch
            buffer_memory=33554432   # Total buffer memory
        )

    def success_callback(self, record_metadata):
        """Callback for successful sends."""
        print(f"✅ Message sent: "
              f"topic={record_metadata.topic}, "
              f"partition={record_metadata.partition}, "
              f"offset={record_metadata.offset}")

    def error_callback(self, exception):
        """Callback for failed sends."""
        print(f"❌ Failed to send message: {exception}")

    def send_async(self, topic, message, key=None):
        """Send a message asynchronously."""
        future = self.producer.send(topic, value=message, key=key)
        future.add_callback(self.success_callback)
        future.add_errback(self.error_callback)
        return future

    def close(self):
        self.producer.flush()
        self.producer.close()


# Usage example
if __name__ == "__main__":
    producer = AsyncProducer()

    # Send several messages asynchronously
    futures = []
    for i in range(10):
        message = {"id": i, "data": f"message-{i}", "timestamp": time.time()}
        future = producer.send_async("test_topic", message, f"key-{i}")
        futures.append(future)

    # Wait for all sends to complete
    for future in futures:
        try:
            future.get(timeout=10)
        except Exception as e:
            print(f"Send raised an exception: {e}")

    producer.close()

5.3 Custom Partitioning Strategy

# producer_partition.py
from kafka import KafkaProducer
import json
import hashlib


class CustomPartitioner:
    def __init__(self, partitions):
        self.partitions = partitions

    def __call__(self, key_bytes, all_partitions, available_partitions):
        """Custom partitioning logic."""
        if key_bytes is None:
            return 0
        # Pick a partition based on the hash of the key
        hash_value = hashlib.md5(key_bytes).hexdigest()
        partition_id = int(hash_value, 16) % len(all_partitions)
        return partition_id


class PartitionProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8'),
            partitioner=CustomPartitioner(3)  # Assumes the topic has 3 partitions
        )

    def send_with_partition_key(self, topic, message, partition_key):
        """Send a message routed by its partition key."""
        self.producer.send(topic, value=message, key=partition_key)

    def close(self):
        self.producer.flush()
        self.producer.close()
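
A short usage sketch for PartitionProducer (the topic name and keys are made up for illustration): because the partitioner hashes the message key, all events with the same key land on the same partition, which preserves per-key ordering.

# Illustrative usage of PartitionProducer
if __name__ == "__main__":
    producer = PartitionProducer()
    for user_id in ["1001", "1002", "1001", "1003", "1001"]:
        producer.send_with_partition_key(
            "user_events",
            {"user_id": user_id, "action": "click"},
            partition_key=user_id
        )
    producer.close()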

6. Consumer Development

6.1 Basic Consumer

# consumer_basic.py
from kafka import KafkaConsumer
import json


class BasicConsumer:
    def __init__(self, topics, group_id='default-group'):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=['localhost:9092'],
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            auto_commit_interval_ms=1000,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    def consume_messages(self):
        """Consume messages in a loop."""
        print("Starting to consume messages...")
        try:
            for message in self.consumer:
                self.process_message(message)
        except KeyboardInterrupt:
            print("Stopping consumption...")
        finally:
            self.consumer.close()

    def process_message(self, message):
        """Process a single message."""
        print("Received message:")
        print(f"  Topic: {message.topic}")
        print(f"  Partition: {message.partition}")
        print(f"  Offset: {message.offset}")
        print(f"  Key: {message.key}")
        print(f"  Value: {message.value}")
        print(f"  Timestamp: {message.timestamp}")
        print("-" * 50)


# Usage example
if __name__ == "__main__":
    consumer = BasicConsumer(['user_events'], 'user-event-group')
    consumer.consume_messages()

6.2 Manual-Commit Consumer

# consumer_manual_commit.py
from kafka import KafkaConsumer, TopicPartition
import json


class ManualCommitConsumer:
    def __init__(self, topics, group_id='manual-commit-group'):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=['localhost:9092'],
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # Disable auto-commit
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.message_count = 0
        self.commit_interval = 10  # Commit once every 10 messages

    def consume_messages(self):
        """Consume messages and commit offsets manually."""
        print("Starting to consume messages (manual-commit mode)...")
        try:
            for message in self.consumer:
                success = self.process_message(message)
                if success:
                    self.message_count += 1
                    # Commit after every N processed messages
                    if self.message_count % self.commit_interval == 0:
                        self.consumer.commit()
                        print(f"✅ Offsets committed, {self.message_count} messages processed")
        except KeyboardInterrupt:
            print("Stopping consumption...")
            # Commit one last time
            self.consumer.commit()
        finally:
            self.consumer.close()

    def process_message(self, message):
        """Process a message; return whether it succeeded."""
        try:
            print(f"Processing message: {message.value}")
            # Business logic goes here
            # Simulate processing time
            import time
            time.sleep(0.1)
            return True
        except Exception as e:
            print(f"Failed to process message: {e}")
            return False


# Usage example
if __name__ == "__main__":
    consumer = ManualCommitConsumer(['user_events'])
    consumer.consume_messages()
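
When a blocking commit() every N messages is too slow, kafka-python also offers commit_async(), which does not wait for the broker. Below is a minimal sketch under that assumption (the callback name is arbitrary); a final synchronous commit in the finally block keeps the last offsets from being lost.

# consumer_async_commit.py - illustrative sketch
from kafka import KafkaConsumer
import json


def on_commit(offsets, response_or_error):
    # kafka-python invokes the callback with the committed offsets and
    # either a response struct or an exception
    print("commit result:", response_or_error)


consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

try:
    for message in consumer:
        print("processing:", message.value)
        consumer.commit_async(callback=on_commit)  # non-blocking commit
except KeyboardInterrupt:
    pass
finally:
    consumer.commit()  # one last blocking commit so progress is not lost
    consumer.close()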

6.3 Multithreaded Consumer

# consumer_multithread.py
from kafka import KafkaConsumer
import json
import threading
import queue
import time


class MultiThreadConsumer:
    def __init__(self, topics, group_id='multithread-group', worker_count=3):
        self.topics = topics
        self.group_id = group_id
        self.worker_count = worker_count
        self.message_queue = queue.Queue()
        self.workers = []
        self.running = False

    def create_consumer(self):
        """Create a Kafka consumer."""
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=['localhost:9092'],
            group_id=self.group_id,
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    def consumer_thread(self):
        """Consumer thread - reads messages from Kafka."""
        consumer = self.create_consumer()
        print(f"Consumer thread started: {threading.current_thread().name}")
        try:
            for message in consumer:
                if not self.running:
                    break
                self.message_queue.put(message)
        finally:
            consumer.close()

    def worker_thread(self, worker_id):
        """Worker thread - processes messages."""
        print(f"Worker thread {worker_id} started")
        while self.running:
            try:
                # Take a message from the queue, with a 1-second timeout
                message = self.message_queue.get(timeout=1)
                self.process_message(message, worker_id)
                self.message_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Worker thread {worker_id} failed to process a message: {e}")

    def process_message(self, message, worker_id):
        """Process a message."""
        print(f"Worker thread {worker_id} processing message: {message.value}")
        # Simulate processing time
        time.sleep(0.5)

    def start(self):
        """Start multithreaded consumption."""
        self.running = True

        # Start the consumer thread
        consumer_thread = threading.Thread(
            target=self.consumer_thread,
            name="consumer-thread"
        )
        consumer_thread.start()

        # Start the worker threads
        for i in range(self.worker_count):
            worker = threading.Thread(
                target=self.worker_thread,
                args=(i,),
                name=f"worker-{i}"
            )
            worker.start()
            self.workers.append(worker)

        try:
            # Wait for the consumer thread
            consumer_thread.join()
            # Wait for the queue to drain
            self.message_queue.join()
        except KeyboardInterrupt:
            print("Received stop signal...")
        finally:
            self.stop()

    def stop(self):
        """Stop consumption."""
        self.running = False
        # Wait for all worker threads to finish
        for worker in self.workers:
            worker.join()
        print("All threads stopped")


# Usage example
if __name__ == "__main__":
    consumer = MultiThreadConsumer(['user_events'], worker_count=3)
    consumer.start()

7. Hands-On Examples

7.1 User Behavior Log Processing System

# user_behavior_system.py
from kafka import KafkaProducer, KafkaConsumer
import json
import time
import random
from datetime import datetime
import threading


class UserBehaviorLogger:
    """User behavior log producer."""

    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8')
        )
        self.topic = 'user_behavior'

    def generate_user_event(self):
        """Generate a user behavior event."""
        actions = ['login', 'logout', 'click', 'view', 'purchase', 'search']
        user_id = random.randint(1000, 9999)
        event = {
            'user_id': user_id,
            'action': random.choice(actions),
            'page': f'/page/{random.randint(1, 10)}',
            'timestamp': datetime.now().isoformat(),
            'ip': f'192.168.1.{random.randint(1, 255)}',
            'device': random.choice(['mobile', 'desktop', 'tablet'])
        }
        return event, str(user_id)

    def start_logging(self, duration=60):
        """Start generating log events."""
        print(f"Generating user behavior events for {duration} seconds...")
        start_time = time.time()
        while time.time() - start_time < duration:
            event, user_key = self.generate_user_event()
            self.producer.send(self.topic, value=event, key=user_key)
            print(f"Sent event: {event['user_id']} - {event['action']}")
            time.sleep(random.uniform(0.1, 1.0))
        self.producer.flush()
        self.producer.close()


class UserBehaviorProcessor:
    """User behavior log processor."""

    def __init__(self):
        self.consumer = KafkaConsumer(
            'user_behavior',
            bootstrap_servers=['localhost:9092'],
            group_id='behavior-processor',
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.stats = {
            'total_events': 0,
            'actions': {},
            'devices': {},
            'users': set()
        }

    def process_event(self, event):
        """Process a single event."""
        self.stats['total_events'] += 1
        self.stats['users'].add(event['user_id'])

        # Count actions
        action = event['action']
        self.stats['actions'][action] = self.stats['actions'].get(action, 0) + 1

        # Count devices
        device = event['device']
        self.stats['devices'][device] = self.stats['devices'].get(device, 0) + 1

        # Special case: purchase events
        if action == 'purchase':
            self.handle_purchase_event(event)

    def handle_purchase_event(self, event):
        """Handle a purchase event."""
        print(f"🛒 Purchase event: user {event['user_id']} at {event['timestamp']}")

    def print_stats(self):
        """Print statistics."""
        print("\n" + "=" * 50)
        print("User behavior statistics")
        print("=" * 50)
        print(f"Total events: {self.stats['total_events']}")
        print(f"Unique users: {len(self.stats['users'])}")
        print("\nBy action:")
        for action, count in self.stats['actions'].items():
            print(f"  {action}: {count}")
        print("\nBy device:")
        for device, count in self.stats['devices'].items():
            print(f"  {device}: {count}")
        print("=" * 50)

    def start_processing(self):
        """Start processing the log stream."""
        print("Processing user behavior events...")
        try:
            for message in self.consumer:
                event = message.value
                self.process_event(event)
                # Print statistics every 100 messages
                if self.stats['total_events'] % 100 == 0:
                    self.print_stats()
        except KeyboardInterrupt:
            print("\nStopping processing...")
            self.print_stats()
        finally:
            self.consumer.close()


# Demo runner
def run_demo():
    """Run the demo."""
    print("Starting the user behavior logging demo")

    # Start the processor in a background thread
    processor = UserBehaviorProcessor()
    processor_thread = threading.Thread(target=processor.start_processing)
    processor_thread.daemon = True
    processor_thread.start()

    # Give the consumer a moment to start
    time.sleep(2)

    # Start generating events
    logger = UserBehaviorLogger()
    logger.start_logging(30)  # Run for 30 seconds


if __name__ == "__main__":
    run_demo()

7.2 Real-Time Stream Processing

# realtime_stream_processor.py
from kafka import KafkaProducer, KafkaConsumer
import json
import time
import threading
from collections import deque, defaultdict
import statistics


class DataStream:
    """Data stream generator."""

    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        self.topic = 'sensor_data'

    def generate_sensor_data(self):
        """Generate sensor readings."""
        import random
        sensors = ['temperature', 'humidity', 'pressure', 'cpu_usage', 'memory_usage']

        while True:
            for sensor in sensors:
                data = {
                    'sensor_id': sensor,
                    'value': round(random.uniform(0, 100), 2),
                    'timestamp': time.time(),
                    'location': random.choice(['server1', 'server2', 'server3'])
                }
                self.producer.send(self.topic, value=data)
                print(f"Sent reading: {sensor} = {data['value']}")
            time.sleep(0.5)


class StreamProcessor:
    """Real-time stream processor."""

    def __init__(self, window_size=10):
        self.consumer = KafkaConsumer(
            'sensor_data',
            bootstrap_servers=['localhost:9092'],
            group_id='stream-processor',
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.window_size = window_size
        self.data_windows = defaultdict(lambda: deque(maxlen=window_size))
        self.alert_thresholds = {
            'temperature': {'min': 10, 'max': 80},
            'cpu_usage': {'max': 90},
            'memory_usage': {'max': 85}
        }

    def process_stream(self):
        """Process the data stream."""
        print("Processing the real-time data stream...")
        for message in self.consumer:
            data = message.value
            self.add_to_window(data)
            self.check_alerts(data)
            self.calculate_statistics(data['sensor_id'])

    def add_to_window(self, data):
        """Add a reading to its sliding window."""
        sensor_id = data['sensor_id']
        self.data_windows[sensor_id].append(data)

    def check_alerts(self, data):
        """Check alert thresholds."""
        sensor_id = data['sensor_id']
        value = data['value']

        if sensor_id in self.alert_thresholds:
            thresholds = self.alert_thresholds[sensor_id]

            if 'max' in thresholds and value > thresholds['max']:
                self.trigger_alert(sensor_id, value, f"above maximum {thresholds['max']}")

            if 'min' in thresholds and value < thresholds['min']:
                self.trigger_alert(sensor_id, value, f"below minimum {thresholds['min']}")

    def trigger_alert(self, sensor_id, value, reason):
        """Trigger an alert."""
        print(f"🚨 Alert: {sensor_id} = {value} ({reason})")
        # Email, SMS, or other notifications could be sent here

    def calculate_statistics(self, sensor_id):
        """Compute statistics over the sliding window."""
        window = self.data_windows[sensor_id]
        if len(window) >= self.window_size:
            values = [item['value'] for item in window]
            stats = {
                'sensor_id': sensor_id,
                'count': len(values),
                'avg': round(statistics.mean(values), 2),
                'min': min(values),
                'max': max(values),
                'median': round(statistics.median(values), 2)
            }
            print(f"📊 {sensor_id} stats: {stats}")


# Demo runner
def run_stream_demo():
    """Run the stream-processing demo."""
    # Start the data generator
    generator = DataStream()
    generator_thread = threading.Thread(target=generator.generate_sensor_data)
    generator_thread.daemon = True
    generator_thread.start()

    # Give the generator a moment to start
    time.sleep(2)

    # Start the stream processor
    processor = StreamProcessor()
    processor.process_stream()


if __name__ == "__main__":
    run_stream_demo()

8. Best Practices

8.1 Performance Tuning

# performance_optimization.py
from kafka import KafkaProducer, KafkaConsumer
import json


class OptimizedProducer:
    """Producer tuned for high throughput."""

    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            # Performance-related settings
            acks=1,                     # Wait only for the leader's acknowledgement
            retries=3,                  # Number of retries
            batch_size=16384,           # 16 KB batch size
            linger_ms=10,               # Wait up to 10 ms to collect more messages
            compression_type='snappy',  # Compression codec
            buffer_memory=33554432,     # 32 MB send buffer
            max_in_flight_requests_per_connection=5,  # Concurrent in-flight requests
            # Serializers
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None
        )


class OptimizedConsumer:
    """Consumer tuned for high throughput."""

    def __init__(self, topics):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=['localhost:9092'],
            # Performance-related settings
            fetch_min_bytes=1024,               # Fetch at least 1 KB per request
            fetch_max_wait_ms=500,              # Wait at most 500 ms for data
            max_partition_fetch_bytes=1048576,  # Fetch at most 1 MB per partition
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
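
Consumer throughput also improves when records are handled in batches rather than one at a time through the iterator. The sketch below uses kafka-python's poll() and commits once per batch; the topic name and batch sizes are illustrative, not prescriptive.

# batch_poll.py - illustrative sketch of batch consumption
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['localhost:9092'],
    group_id='batch-poll-group',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

try:
    while True:
        # poll() returns {TopicPartition: [records]} with up to max_records messages
        batch = consumer.poll(timeout_ms=500, max_records=500)
        for tp, records in batch.items():
            for record in records:
                pass  # process record.value here
        if batch:
            consumer.commit()  # commit once per batch instead of per message
except KeyboardInterrupt:
    pass
finally:
    consumer.close()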