当前位置: 首页 > news >正文

Python脚本(Kafka生产者+消费者)

以mac为例

1)安装kafka : brew install kafka

2)检查安装路径:

brew --prefix kafka
brew --prefix zookeeper

3)启动服务(目录为本人电脑目录,可能与读者目录不一致)

3.1)启动zookeeper:zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

3.2)启动kafka: kafka-server-start /usr/local/etc/kafka/server.properties

4)topic

4.1)创建topic:

# 创建名为 test_topic 的 topic,1个分区,1个副本
kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test_topic

注意:

根据版本选择参数:
Kafka 版本推荐创建 Topic 的方式参数使用
Kafka ≥ 2.2推荐使用 --bootstrap-server(更现代,不依赖 ZK)--bootstrap-server localhost:9092
Kafka < 2.2(比如 2.1.x、1.x)​必须使用 --zookeeper--zookeeper localhost:2181

4.2)查看topic:kafka-topics --list --bootstrap-server localhost:9092

旧版本:kafka-topics --list --zookeeper localhost:2181

5)python3代码(以kafka-python为例):

工具适用场景特点
confluent-kafka生产环境,高性能需安装 librdkafka,API 更底层,速度快
kafka-python学习、测试、开发纯 Python,无需额外依赖,简单易用

5.1)安装依赖:pip install kafka-python

5.2)生产者(producer_kafka_python.py)

from kafka import KafkaProducer
import timebootstrap_servers = 'localhost:9092'
topic = 'test_topic'producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda v: v.encode('utf-8')
)for i in range(5):msg = f'Hello from kafka-python #{i}'print(f'发送: {msg}')future = producer.send(topic, value=msg)metadata = future.get(timeout=10)print(f"发送到 Topic: {metadata.topic}, Partition: {metadata.partition}, Offset: {metadata.offset}")time.sleep(1)producer.close()
生产者运行结果如下:

5.3)消费者(consumer_kafka_python.py)
from kafka import KafkaConsumerbootstrap_servers = 'localhost:9092'
topic = 'test_topic'
group_id = 'my-group-python'consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_servers,group_id=group_id,auto_offset_reset='earliest',value_deserializer=lambda x: x.decode('utf-8')
)print(f"开始消费主题 '{topic}' ...")for message in consumer:print(f"收到消息 -> Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")

消费者运行结果如下:

6)运行步骤总结:

  1. 安装 Kafka(通过 Homebrew)​
  2. 启动 Zookeeper
  3. 启动 Kafka Broker
  4. ​**​(可选)创建 Topic:test_topic
  5. ​运行 Python 生产者:python producer_kafka_python.py
  6. ​运行 Python 消费者:python consumer_kafka_python.py

7)confluent-kafka版python代码:

7.1)安装librdkafka : brew install librdkafka

7.2)安装confluent-kafka : pip3 install confluent-kafka

7.3)生产者 

7.4)消费者

http://www.dtcms.com/a/423292.html

相关文章:

  • 光伏出海加速!中国企业抢占全球市场
  • 网站排名提升软件网站制作先做数据库还是前台
  • 上海工程咨询行业协会网站优化内链怎么做
  • 基于 Vue 3 + TypeScript + TSX + Naive UI 的数据渲染后台管理系统模板开发方案
  • PostgreSQL向量检索:pgvector入门指南
  • 日本复古电影风格街拍人像摄影后期Lr调色教程,手机滤镜PS+Lightroom预设下载!
  • STM32CUBEMX + STM32L051C8T6 + RTC实时时钟 + 闹钟定时唤醒 + 周期唤醒 + STANDBY模式RTC唤醒
  • Redis高可用架构设计:主从复制、哨兵、Cluster集群模式深度对比
  • 网站上线需要哪些步骤站长 网站对比
  • 网站定制的销售情况桂林人网
  • 国外免费网站模板在哪里可以改动网站标题
  • 【笔记】在WPF中 BulletDecorator 的功能、使用方式并对比 HeaderedContentControl 与常见 Panel 布局的区别
  • 13. 初识 NVMe over RDMA
  • 分词和词向量的学习记录
  • QML学习笔记(二十)QML的自定义信号
  • 青岛网站建设方案咨询注册网站时审核是人工审核吗还是电脑审核
  • 【星海出品】直接映射方式
  • LeetCode 7.整数反转
  • 网站开发目前用的是什么语言seo相关ppt
  • C++ 函数指针、回调与 Lambda 全解析
  • UNIX下C语言编程与实践4-UNIX 编程环境搭建:三种安装方式(本机、虚拟机、网络终端)对比与实操
  • 辽宁平台网站建设公司万维网站注册
  • 网站建设div ass抖音代运营合作方案ppt
  • uni-app 开发H5软键盘会顶起底部内容的解决方案
  • Syslog日志集成搭建
  • 基于AI辅助工具的原创音乐创作实践研究——以Tunee首届音乐挑战赛作品《断掉的铜线》为例[特殊字符]
  • mysql数据库学习之用户权限管理(四)
  • 如何做网站网页流程粤icp备案号查询网官网
  • AI使用 Node.js modbus-serial 搭建一个可交互的 Modbus TCP 主站与从站 Demo
  • Websocket+cpolar:如何轻松实现服务远程访问?