当前位置: 首页 > news >正文

使用Python构建Kafka示例项目

新建项目

mkdir python-kafka-test
cd python-kafka-test

安装依赖

pip install confluent_kafka

创建配置文件

# Kafka配置文件

# Kafka服务器配置
KAFKA_CONFIG = {
    'bootstrap.servers': 'localhost:9092',
    # 生产者特定配置
    'producer': {
        'client.id': 'python-kafka-producer',
        'acks': 'all',                 # 确保消息被所有副本确认
        'retries': 3,                  # 重试次数
        'retry.backoff.ms': 1000,      # 重试间隔
        'batch.size': 16384,           # 批处理大小
        'linger.ms': 5,                # 等待时间以允许更多消息加入批次
        'compression.type': 'snappy',  # 压缩类型
    },
    # 消费者特定配置
    'consumer': {
        'group.id': 'notification-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        'auto.commit.interval.ms': 5000,
        'session.timeout.ms': 30000,
        'max.poll.interval.ms': 300000,
        'heartbeat.interval.ms': 10000,
    }
}

# 主题配置
TOPICS = {
    'email': 'email-topic',
    'sms': 'sms-topic'
}

创建Kafka生产者

import json
import logging
import signal
import sys
from confluent_kafka import Producer
from config import KAFKA_CONFIG, TOPICS

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('kafka-producer')

# 合并配置
producer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('producer', {})}
# 移除嵌套的producer配置,避免冲突
if 'producer' in producer_config:
    del producer_config['producer']
if 'consumer' in producer_config:
    del producer_config['consumer']

# 创建Producer实例
p = Producer(producer_config)

# 标记是否正在关闭
shutting_down = False

def signal_handler(sig, frame):
    """处理终止信号,确保优雅关闭"""
    global shutting_down
    if shutting_down:
        return
    shutting_down = True
    logger.info("接收到终止信号,正在优雅关闭...")
    # 确保所有消息都被发送
    p.flush(10)  # 等待最多10秒
    logger.info("生产者已关闭")
    sys.exit(0)

# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def delivery_report(err, msg):
    """消息发送回调函数"""
    if err is not None:
        logger.error(f'消息发送失败: {err}')
    else:
        logger.info(f'消息已发送到 {msg.topic()} [分区 {msg.partition()}]')

def send_notification(topic_key, payload, key=None):
    """发送通知消息到指定主题
    
    Args:
        topic_key: 主题键名(在TOPICS字典中定义)
        payload: 消息内容(字典或JSON字符串)
        key: 可选的消息键
    
    Returns:
        bool: 是否成功将消息加入发送队列
    """
    try:
        # 获取实际主题名
        topic = TOPICS.get(topic_key, topic_key)
        
        # 如果payload是字典,转换为JSON字符串
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        
        # 发送消息
        p.produce(
            topic, 
            payload.encode('utf-8'), 
            key=key.encode('utf-8') if key else None,
            callback=delivery_report
        )
        # 轮询一次以触发回调
        p.poll(0)
        
        logger.info(f'消息已加入发送队列: {topic}')
        return True
    except Exception as e:
        logger.error(f'发送消息时出错: {e}')
        return False

# 使用示例
if __name__ == "__main__":
    try:
        # 发送邮件通知
        email_payload = {
            "to": "receiver@example.com", 
            "from": "sender@example.com", 
            "subject": "Sample Email", 
            "body": "This is a sample email notification"
        }
        send_notification('email', email_payload)
        
        # 发送短信通知
        sms_payload = {
            "phoneNumber": "1234567890", 
            "message": "This is a sample SMS notification"
        }
        send_notification('sms', sms_payload)
        
        # 确保所有消息都被发送
        remaining = p.flush(timeout=5)
        if remaining > 0:
            logger.warning(f'仍有 {remaining} 条消息未发送完成')
        else:
            logger.info('所有消息已成功发送')
            
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error(f"发生错误: {e}")
    finally:
        # 确保所有消息都被发送
        p.flush(timeout=5)

创建Kafka消费者

import json
import logging
import signal
import sys
from confluent_kafka import Consumer, KafkaError
from config import KAFKA_CONFIG, TOPICS

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('kafka-consumer')

# 合并配置
consumer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('consumer', {})}
# 移除嵌套的配置,避免冲突
if 'producer' in consumer_config:
    del consumer_config['producer']
if 'consumer' in consumer_config:
    del consumer_config['consumer']

# 创建Consumer实例
c = Consumer(consumer_config)

# 标记是否正在关闭
shutting_down = False

def signal_handler(sig, frame):
    """处理终止信号,确保优雅关闭"""
    global shutting_down
    if shutting_down:
        return
    shutting_down = True
    logger.info("接收到终止信号,正在优雅关闭...")
    sys.exit(0)

# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def process_message(msg):
    """处理接收到的消息
    
    Args:
        msg: Kafka消息对象
    """
    try:
        topic = msg.topic()
        value = msg.value().decode("utf-8")
        key = msg.key().decode("utf-8") if msg.key() else None
        
        # 尝试解析JSON
        try:
            payload = json.loads(value)
            logger.info(f'接收到消息 [主题: {topic}, 键: {key}]')
            logger.debug(f'消息内容: {payload}')
        except json.JSONDecodeError:
            logger.info(f'接收到非JSON消息 [主题: {topic}, 键: {key}]: {value}')
        
        # 根据主题类型处理不同的消息
        if topic == TOPICS['email']:
            handle_email_notification(payload if 'payload' in locals() else value)
        elif topic == TOPICS['sms']:
            handle_sms_notification(payload if 'payload' in locals() else value)
        else:
            logger.warning(f'收到未知主题的消息: {topic}')
            
    except Exception as e:
        logger.error(f'处理消息时出错: {e}')

def handle_email_notification(payload):
    """处理邮件通知"""
    # 这里实现实际的邮件发送逻辑
    logger.info(f'处理邮件通知: {payload}')

def handle_sms_notification(payload):
    """处理短信通知"""
    # 这里实现实际的短信发送逻辑
    logger.info(f'处理短信通知: {payload}')

def main():
    try:
        # 订阅主题
        topics_to_subscribe = list(TOPICS.values())
        logger.info(f'订阅主题: {topics_to_subscribe}')
        c.subscribe(topics_to_subscribe)
        
        logger.info('开始消费消息...')
        while not shutting_down:
            msg = c.poll(1.0)  # 超时时间1秒
            
            if msg is None:
                continue
                
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # 到达分区末尾,不是错误
                    logger.debug(f'到达分区末尾: {msg.topic()} [{msg.partition()}]')
                    continue
                else:
                    # 其他错误
                    logger.error(f'Kafka错误: {msg.error()}')
                    break
            
            # 处理消息
            process_message(msg)
            
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error(f"发生错误: {e}")
    finally:
        # 关闭消费者
        logger.info("关闭消费者...")
        c.close()
        logger.info("消费者已关闭")

if __name__ == "__main__":
    main()

运行项目

打开终端运行命令

python producer.py
python consumer.py

可以看到终端输出正常

详细代码:https://github.com/wan88888/python-kafka-test

相关文章:

  • 视频设备轨迹回放平台EasyCVR综合智能化,搭建运动场体育赛事直播方案
  • es8实现向量检索与关键词匹配混合搜索
  • 算法题(114):矩阵距离
  • 计算机网络 3-1 数据链路层
  • 从零开始开发HarmonyOS应用并上架
  • AI 防口误指南_LLM 输出安全实践
  • 问题:tomcat下部署eureka双重路径
  • Laraver SQL日志 服务开发
  • wsl2配置proxy
  • git配置github
  • [c语言日寄]文件操作
  • OpenAI发布PaperBench,AI代理复现研究能力面临新考验
  • Ubuntu 22.04 一键部署openManus
  • 轻量级搜索接口技术解析:快速实现关键词检索的Java/Python实践
  • 最新全开源码支付系统,赠送3套模板
  • 深度学习基础
  • 在线Pdf文档转换成Excel文档,无需下载,快速转换,批量转换
  • 再来1章linux 系列-0. C语言过、Java半静对、Python纯动和C++对+C
  • 代码随想录算法训练营第三十五天 | 416.分割等和子集
  • 32、web前端开发之JavaScript(一)
  • 信誉好的做网站公司/soe搜索优化
  • 做网站的公司前三名/网站seo服务商
  • 付费阅读wordpress/seo外包优化网站
  • 兰州专业做网站/营销图片大全
  • 免费php源码资源网/网站seo优化分析
  • 网站优化排名如何做/百度搜索网站排名