📚 Table of Contents
- Kafka Overview
- Kafka Core Concepts
- Environment Setup
- Python Kafka Clients
- Producer Development
- Consumer Development
- Hands-On Examples
- Best Practices
1. Kafka Overview
1.1 What Is Kafka?
- Definition: Apache Kafka is a distributed streaming platform
- Features: high throughput, low latency, durable log-based storage, horizontal scalability, and fault tolerance
1.2 Kafka Use Cases
- Real-time stream processing
- Message queuing
- Log aggregation
- Event sourcing
- Communication between microservices
1.3 Kafka vs. Other Message Queues

| Feature | Kafka | RabbitMQ | ActiveMQ |
| --- | --- | --- | --- |
| Throughput | High | Medium | Medium |
| Persistence | Yes | Yes | Yes |
| Distributed | Native | Via clustering | Yes |
| Complexity | High | Medium | Medium |
2. Kafka Core Concepts
2.1 Concept Diagram

```
Producer → Topic (Partition 0, 1, 2, ...) → Consumer Group
                      ↓
               Kafka Cluster
                      ↓
                 ZooKeeper
```
2.2 Core Concepts Explained
- Topic: a named category of messages that producers write to and consumers read from, e.g. `topic_name = "user_behavior"` (a topic-creation sketch follows this list)
- Partition: an ordered, append-only log within a topic; splitting a topic into partitions (e.g. `partitions = [0, 1, 2]`) is what enables parallel reads and writes
- Producer: a client that publishes messages to topics
- Consumer: a client that reads messages from topics
- Consumer Group: a set of consumers sharing a `group_id`; Kafka assigns each partition to at most one consumer in the group, so a topic is processed in parallel without duplication
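To make these concepts concrete, the sketch below creates the `user_behavior` topic with three partitions using kafka-python's admin client. This is a minimal sketch for the local single-broker setup from section 3; the broker address and replication factor of 1 are assumptions tied to that environment.

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Assumes the single local broker from section 3 at localhost:9092.
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Three partitions allow up to three consumers in one group to read in
# parallel; replication_factor=1 is only suitable for a dev cluster.
topic = NewTopic(name="user_behavior", num_partitions=3, replication_factor=1)
admin.create_topics([topic])
admin.close()
```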
3. Environment Setup
3.1 Docker (Recommended)

```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
```bash
# Start the containers in the background, then confirm both are running
docker-compose up -d
docker ps
```
3.2 Installing Python Dependencies

```bash
pip install kafka-python
```

`json` and `logging` are part of the Python standard library and do not need to be installed with pip.
4. Python Kafka Clients
4.1 Basic Configuration

```python
import json

# Shared reference config. Note: serializer options apply only to the
# producer; group/offset/deserializer options apply only to the consumer.
KAFKA_CONFIG = {
    'bootstrap_servers': ['localhost:9092'],
    'client_id': 'python-kafka-client',
    'group_id': 'my-group',
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    'auto_commit_interval_ms': 1000,
    'value_serializer': lambda x: json.dumps(x).encode('utf-8'),
    'value_deserializer': lambda m: json.loads(m.decode('utf-8')),
}
```
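`KafkaProducer` and `KafkaConsumer` each accept only their own subset of these keys, so a shared dict like this has to be filtered before constructing the clients. A minimal sketch of one way to do that; the two key sets below are assumptions derived from the config above, not part of the kafka-python API:

```python
from kafka import KafkaProducer, KafkaConsumer

PRODUCER_KEYS = {'bootstrap_servers', 'client_id', 'value_serializer'}
CONSUMER_KEYS = {'bootstrap_servers', 'client_id', 'group_id',
                 'auto_offset_reset', 'enable_auto_commit',
                 'auto_commit_interval_ms', 'value_deserializer'}

producer = KafkaProducer(
    **{k: v for k, v in KAFKA_CONFIG.items() if k in PRODUCER_KEYS})
consumer = KafkaConsumer(
    'user_behavior',
    **{k: v for k, v in KAFKA_CONFIG.items() if k in CONSUMER_KEYS})
```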
4.2 Connection Test

```python
from kafka import KafkaProducer, KafkaConsumer
import json

def test_kafka_connection():
    try:
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            group_id='test-group'
        )
        print("✅ Kafka connection successful!")
        producer.close()
        consumer.close()
    except Exception as e:
        print(f"❌ Kafka connection failed: {e}")

if __name__ == "__main__":
    test_kafka_connection()
```
5. Producer Development
5.1 Basic Producer

```python
from kafka import KafkaProducer
import json
import time

class BasicProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None
        )

    def send_message(self, topic, message, key=None):
        """Send a single message and block until it is acknowledged."""
        try:
            future = self.producer.send(topic, value=message, key=key)
            record_metadata = future.get(timeout=10)
            print(f"Message sent: topic={record_metadata.topic}, "
                  f"partition={record_metadata.partition}, "
                  f"offset={record_metadata.offset}")
        except Exception as e:
            print(f"Message send failed: {e}")

    def send_batch_messages(self, topic, messages):
        """Send a batch of messages one by one."""
        for i, message in enumerate(messages):
            self.send_message(topic, message, key=f"key-{i}")
            time.sleep(0.1)

    def close(self):
        """Flush any buffered messages, then close the producer."""
        self.producer.flush()
        self.producer.close()

if __name__ == "__main__":
    producer = BasicProducer()

    # Single message
    message = {"user_id": 123, "action": "login", "timestamp": time.time()}
    producer.send_message("user_events", message, "user_123")

    # Small batch
    messages = [
        {"user_id": i, "action": "click", "timestamp": time.time()}
        for i in range(1, 6)
    ]
    producer.send_batch_messages("user_events", messages)
    producer.close()
```
5.2 Asynchronous Producer

```python
from kafka import KafkaProducer
import json
import time

class AsyncProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None,
            acks='all',             # wait for all in-sync replicas to acknowledge
            retries=3,              # retry transient send failures
            batch_size=16384,       # batch up to 16 KB per partition
            linger_ms=10,           # wait up to 10 ms to fill a batch
            buffer_memory=33554432  # 32 MB send buffer
        )

    def success_callback(self, record_metadata):
        """Called when a send succeeds."""
        print(f"✅ Message delivered: "
              f"topic={record_metadata.topic}, "
              f"partition={record_metadata.partition}, "
              f"offset={record_metadata.offset}")

    def error_callback(self, exception):
        """Called when a send fails."""
        print(f"❌ Message delivery failed: {exception}")

    def send_async(self, topic, message, key=None):
        """Send a message without blocking; report the result via callbacks."""
        future = self.producer.send(topic, value=message, key=key)
        future.add_callback(self.success_callback)
        future.add_errback(self.error_callback)
        return future

    def close(self):
        self.producer.flush()
        self.producer.close()

if __name__ == "__main__":
    producer = AsyncProducer()
    futures = []
    for i in range(10):
        message = {"id": i, "data": f"message-{i}", "timestamp": time.time()}
        future = producer.send_async("test_topic", message, f"key-{i}")
        futures.append(future)

    # Wait for all sends to finish (the callbacks fire as each completes)
    for future in futures:
        try:
            future.get(timeout=10)
        except Exception as e:
            print(f"Send error: {e}")
    producer.close()
```
5.3 Custom Partitioning Strategy

```python
from kafka import KafkaProducer
import json
import hashlib

class CustomPartitioner:
    def __init__(self, partitions):
        self.partitions = partitions

    def __call__(self, key_bytes, all_partitions, available_partitions):
        """Custom partitioning: hash the serialized key onto a partition."""
        if key_bytes is None:
            return 0
        hash_value = hashlib.md5(key_bytes).hexdigest()
        partition_id = int(hash_value, 16) % len(all_partitions)
        return partition_id

class PartitionProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8'),
            # kafka-python invokes the partitioner with the serialized key
            partitioner=CustomPartitioner(3)
        )

    def send_with_partition_key(self, topic, message, partition_key):
        """Route a message to a partition derived from its key."""
        self.producer.send(topic, value=message, key=partition_key)

    def close(self):
        self.producer.flush()
        self.producer.close()
```
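A short usage sketch (the topic name and key are illustrative): because identical keys always hash to the same partition, all events for one user stay in order.

```python
if __name__ == "__main__":
    producer = PartitionProducer()
    # Every event keyed "user-42" lands on the same partition,
    # so consumers see this user's events in production order.
    for n in range(3):
        producer.send_with_partition_key(
            "user_events", {"user_id": 42, "seq": n}, "user-42")
    producer.close()
```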
6. Consumer Development
6.1 Basic Consumer

```python
from kafka import KafkaConsumer
import json

class BasicConsumer:
    def __init__(self, topics, group_id='default-group'):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=['localhost:9092'],
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            auto_commit_interval_ms=1000,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    def consume_messages(self):
        """Consume messages until interrupted."""
        print("Starting to consume...")
        try:
            for message in self.consumer:
                self.process_message(message)
        except KeyboardInterrupt:
            print("Stopping...")
        finally:
            self.consumer.close()

    def process_message(self, message):
        """Handle a single message."""
        print("Received message:")
        print(f"  Topic: {message.topic}")
        print(f"  Partition: {message.partition}")
        print(f"  Offset: {message.offset}")
        print(f"  Key: {message.key}")
        print(f"  Value: {message.value}")
        print(f"  Timestamp: {message.timestamp}")
        print("-" * 50)

if __name__ == "__main__":
    consumer = BasicConsumer(['user_events'], 'user-event-group')
    consumer.consume_messages()
```
6.2 Manual-Commit Consumer

```python
from kafka import KafkaConsumer
import json
import time

class ManualCommitConsumer:
    def __init__(self, topics, group_id='manual-commit-group'):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=['localhost:9092'],
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # offsets are committed explicitly below
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.message_count = 0
        self.commit_interval = 10  # commit every 10 successfully processed messages

    def consume_messages(self):
        """Consume messages and commit offsets manually."""
        print("Starting to consume (manual-commit mode)...")
        try:
            for message in self.consumer:
                success = self.process_message(message)
                if success:
                    self.message_count += 1
                    if self.message_count % self.commit_interval == 0:
                        self.consumer.commit()
                        print(f"✅ Offsets committed after {self.message_count} messages")
        except KeyboardInterrupt:
            print("Stopping...")
            self.consumer.commit()  # commit whatever was processed so far
        finally:
            self.consumer.close()

    def process_message(self, message):
        """Process a message; return True on success."""
        try:
            print(f"Processing message: {message.value}")
            time.sleep(0.1)  # simulate work
            return True
        except Exception as e:
            print(f"Message processing failed: {e}")
            return False

if __name__ == "__main__":
    consumer = ManualCommitConsumer(['user_events'])
    consumer.consume_messages()
```
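A bare `commit()` commits the latest fetched offsets for every assigned partition. kafka-python can also commit an explicit offset for a single partition, which is useful when each message must be durable before the next is read. A minimal sketch, assuming kafka-python 2.0's two-field `OffsetAndMetadata(offset, metadata)`; `handle()` is a hypothetical processing function:

```python
from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata
import json

consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    handle(message)  # hypothetical per-message processing
    # Commit the offset *after* the message just processed,
    # for that message's partition only.
    tp = TopicPartition(message.topic, message.partition)
    consumer.commit({tp: OffsetAndMetadata(message.offset + 1, '')})
```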
6.3 Multi-Threaded Consumer

```python
from kafka import KafkaConsumer
import json
import threading
import queue
import time

class MultiThreadConsumer:
    def __init__(self, topics, group_id='multithread-group', worker_count=3):
        self.topics = topics
        self.group_id = group_id
        self.worker_count = worker_count
        self.message_queue = queue.Queue()
        self.workers = []
        self.running = False

    def create_consumer(self):
        """Build the Kafka consumer."""
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=['localhost:9092'],
            group_id=self.group_id,
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    def consumer_thread(self):
        """Consumer thread: reads from Kafka and feeds the queue."""
        consumer = self.create_consumer()
        print(f"Consumer thread started: {threading.current_thread().name}")
        try:
            for message in consumer:
                if not self.running:
                    break
                self.message_queue.put(message)
        finally:
            consumer.close()

    def worker_thread(self, worker_id):
        """Worker thread: processes messages taken from the queue."""
        print(f"Worker {worker_id} started")
        while self.running:
            try:
                message = self.message_queue.get(timeout=1)
                self.process_message(message, worker_id)
                self.message_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")

    def process_message(self, message, worker_id):
        """Process one message."""
        print(f"Worker {worker_id} processing: {message.value}")
        time.sleep(0.5)  # simulate work

    def start(self):
        """Start the consumer thread and the worker pool."""
        self.running = True
        consumer_thread = threading.Thread(
            target=self.consumer_thread, name="consumer-thread")
        consumer_thread.start()

        for i in range(self.worker_count):
            worker = threading.Thread(
                target=self.worker_thread, args=(i,), name=f"worker-{i}")
            worker.start()
            self.workers.append(worker)

        try:
            consumer_thread.join()
            self.message_queue.join()
        except KeyboardInterrupt:
            print("Stop signal received...")
        finally:
            self.stop()

    def stop(self):
        """Stop all threads."""
        self.running = False
        for worker in self.workers:
            worker.join()
        print("All threads stopped")

if __name__ == "__main__":
    consumer = MultiThreadConsumer(['user_events'], worker_count=3)
    consumer.start()
```
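This design deliberately keeps a single KafkaConsumer on one thread, since KafkaConsumer instances are not thread-safe, and fans the work out through a queue. An alternative that parallelizes fetching as well is one consumer per thread in the same group, letting Kafka split the partitions across them. A minimal sketch, assuming `user_events` has at least three partitions (see the admin example in section 2.2):

```python
from kafka import KafkaConsumer
import json
import threading

def consume(worker_id):
    # Each thread owns its own consumer; a shared group_id makes Kafka
    # assign each consumer a disjoint subset of the topic's partitions.
    consumer = KafkaConsumer(
        'user_events',
        bootstrap_servers=['localhost:9092'],
        group_id='multithread-group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    for message in consumer:
        print(f"Thread {worker_id} partition={message.partition}: {message.value}")

threads = [threading.Thread(target=consume, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
```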
7. Hands-On Examples
7.1 User-Behavior Log Processing System

```python
from kafka import KafkaProducer, KafkaConsumer
import json
import time
import random
from datetime import datetime
import threading

class UserBehaviorLogger:
    """User-behavior log producer."""
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8')
        )
        self.topic = 'user_behavior'

    def generate_user_event(self):
        """Generate one random user-behavior event."""
        actions = ['login', 'logout', 'click', 'view', 'purchase', 'search']
        user_id = random.randint(1000, 9999)
        event = {
            'user_id': user_id,
            'action': random.choice(actions),
            'page': f'/page/{random.randint(1, 10)}',
            'timestamp': datetime.now().isoformat(),
            'ip': f'192.168.1.{random.randint(1, 255)}',
            'device': random.choice(['mobile', 'desktop', 'tablet'])
        }
        return event, str(user_id)

    def start_logging(self, duration=60):
        """Produce events for the given number of seconds."""
        print(f"Generating user-behavior logs for {duration} seconds...")
        start_time = time.time()
        while time.time() - start_time < duration:
            event, user_key = self.generate_user_event()
            self.producer.send(self.topic, value=event, key=user_key)
            print(f"Sent event: {event['user_id']} - {event['action']}")
            time.sleep(random.uniform(0.1, 1.0))
        self.producer.flush()
        self.producer.close()

class UserBehaviorProcessor:
    """User-behavior log consumer and aggregator."""
    def __init__(self):
        self.consumer = KafkaConsumer(
            'user_behavior',
            bootstrap_servers=['localhost:9092'],
            group_id='behavior-processor',
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.stats = {
            'total_events': 0,
            'actions': {},
            'devices': {},
            'users': set()
        }

    def process_event(self, event):
        """Update running statistics with one event."""
        self.stats['total_events'] += 1
        self.stats['users'].add(event['user_id'])
        action = event['action']
        self.stats['actions'][action] = self.stats['actions'].get(action, 0) + 1
        device = event['device']
        self.stats['devices'][device] = self.stats['devices'].get(device, 0) + 1
        if action == 'purchase':
            self.handle_purchase_event(event)

    def handle_purchase_event(self, event):
        """React to a purchase event."""
        print(f"🛒 Purchase: user {event['user_id']} at {event['timestamp']}")

    def print_stats(self):
        """Print the aggregate statistics."""
        print("\n" + "=" * 50)
        print("User-behavior statistics")
        print("=" * 50)
        print(f"Total events: {self.stats['total_events']}")
        print(f"Unique users: {len(self.stats['users'])}")
        print("\nBy action:")
        for action, count in self.stats['actions'].items():
            print(f"  {action}: {count}")
        print("\nBy device:")
        for device, count in self.stats['devices'].items():
            print(f"  {device}: {count}")
        print("=" * 50)

    def start_processing(self):
        """Consume events and print statistics every 100 events."""
        print("Processing user-behavior logs...")
        try:
            for message in self.consumer:
                event = message.value
                self.process_event(event)
                if self.stats['total_events'] % 100 == 0:
                    self.print_stats()
        except KeyboardInterrupt:
            print("\nStopping...")
            self.print_stats()
        finally:
            self.consumer.close()

def run_demo():
    """Start the processor in the background, then produce for 30 seconds."""
    print("Starting the user-behavior log system demo")
    processor = UserBehaviorProcessor()
    processor_thread = threading.Thread(target=processor.start_processing)
    processor_thread.daemon = True
    processor_thread.start()

    time.sleep(2)  # give the consumer time to join the group
    logger = UserBehaviorLogger()
    logger.start_logging(30)

if __name__ == "__main__":
    run_demo()
```
7.2 Real-Time Stream Processing

```python
from kafka import KafkaProducer, KafkaConsumer
import json
import time
import threading
import random
from collections import deque, defaultdict
import statistics

class DataStream:
    """Sensor-data generator."""
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        self.topic = 'sensor_data'

    def generate_sensor_data(self):
        """Continuously produce random sensor readings."""
        sensors = ['temperature', 'humidity', 'pressure', 'cpu_usage', 'memory_usage']
        while True:
            for sensor in sensors:
                data = {
                    'sensor_id': sensor,
                    'value': round(random.uniform(0, 100), 2),
                    'timestamp': time.time(),
                    'location': random.choice(['server1', 'server2', 'server3'])
                }
                self.producer.send(self.topic, value=data)
                print(f"Sent reading: {sensor} = {data['value']}")
            time.sleep(0.5)

class StreamProcessor:
    """Real-time stream processor with sliding windows and alerts."""
    def __init__(self, window_size=10):
        self.consumer = KafkaConsumer(
            'sensor_data',
            bootstrap_servers=['localhost:9092'],
            group_id='stream-processor',
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.window_size = window_size
        self.data_windows = defaultdict(lambda: deque(maxlen=window_size))
        self.alert_thresholds = {
            'temperature': {'min': 10, 'max': 80},
            'cpu_usage': {'max': 90},
            'memory_usage': {'max': 85}
        }

    def process_stream(self):
        """Window, alert on, and aggregate every reading."""
        print("Processing the real-time data stream...")
        for message in self.consumer:
            data = message.value
            self.add_to_window(data)
            self.check_alerts(data)
            self.calculate_statistics(data['sensor_id'])

    def add_to_window(self, data):
        """Append a reading to its sensor's sliding window."""
        sensor_id = data['sensor_id']
        self.data_windows[sensor_id].append(data)

    def check_alerts(self, data):
        """Compare the reading against the alert thresholds."""
        sensor_id = data['sensor_id']
        value = data['value']
        if sensor_id in self.alert_thresholds:
            thresholds = self.alert_thresholds[sensor_id]
            if 'max' in thresholds and value > thresholds['max']:
                self.trigger_alert(sensor_id, value, f"above max {thresholds['max']}")
            if 'min' in thresholds and value < thresholds['min']:
                self.trigger_alert(sensor_id, value, f"below min {thresholds['min']}")

    def trigger_alert(self, sensor_id, value, reason):
        """Raise an alert."""
        print(f"🚨 Alert: {sensor_id} = {value} ({reason})")

    def calculate_statistics(self, sensor_id):
        """Compute statistics once the window is full."""
        window = self.data_windows[sensor_id]
        if len(window) >= self.window_size:
            values = [item['value'] for item in window]
            stats = {
                'sensor_id': sensor_id,
                'count': len(values),
                'avg': round(statistics.mean(values), 2),
                'min': min(values),
                'max': max(values),
                'median': round(statistics.median(values), 2)
            }
            print(f"📊 {sensor_id} stats: {stats}")

def run_stream_demo():
    """Run the generator in the background and process in the foreground."""
    generator = DataStream()
    generator_thread = threading.Thread(target=generator.generate_sensor_data)
    generator_thread.daemon = True
    generator_thread.start()

    time.sleep(2)  # let the producer start up
    processor = StreamProcessor()
    processor.process_stream()

if __name__ == "__main__":
    run_stream_demo()
```
8. Best Practices
8.1 Performance Tuning

```python
from kafka import KafkaProducer, KafkaConsumer
import json

class OptimizedProducer:
    """Producer configured for throughput."""
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            acks=1,                         # leader-only ack: lower latency than 'all'
            retries=3,                      # retry transient failures
            batch_size=16384,               # 16 KB batches per partition
            linger_ms=10,                   # wait up to 10 ms to fill a batch
            compression_type='snappy',      # compress batches (needs python-snappy)
            buffer_memory=33554432,         # 32 MB send buffer
            max_in_flight_requests_per_connection=5,
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None
        )

class OptimizedConsumer:
    """Consumer configured for throughput."""
    def __init__(self, topics):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=['localhost:9092'],
            fetch_min_bytes=1024,              # wait for at least 1 KB per fetch
            fetch_max_wait_ms=500,             # or at most 500 ms
            max_partition_fetch_bytes=1048576  # up to 1 MB per partition per fetch
        )
```
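Consumer throughput also depends on how the fetch loop is driven. Instead of iterating message by message, batches can be pulled explicitly with `poll()`, which pairs naturally with the `fetch_*` settings above. A minimal sketch; the batch size is illustrative and `process()` is a hypothetical handler:

```python
# Batch-driven consumption with poll()
consumer = OptimizedConsumer(['user_events']).consumer
while True:
    # Returns {TopicPartition: [messages]}, up to 500 records per call
    batches = consumer.poll(timeout_ms=500, max_records=500)
    for tp, messages in batches.items():
        for message in messages:
            process(message)  # hypothetical per-message handler
```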