当前位置: 首页 > wzjs >正文

如何自己做网站手机软件114啦网址导航官网

如何自己做网站手机软件,114啦网址导航官网,ftp给网站做备份,网站建设网络推广微信网站安装依赖 在开始之前,需要安装 kafka-python 库。可以通过以下命令安装: pip install kafka-python创建生产者 生产者负责将消息发送到 Kafka 主题。以下是一个简单的生产者示例: from kafka import KafkaProducer import json import ti…

安装依赖

在开始之前,需要安装 kafka-python 库。可以通过以下命令安装:

pip install kafka-python

创建生产者

生产者负责将消息发送到 Kafka 主题。以下是一个简单的生产者示例:

from kafka import KafkaProducer
import json
import time# 配置 Kafka 服务器地址和序列化方式
producer = KafkaProducer(bootstrap_servers='****:9092',  # Kafka Broker 地址value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # JSON 序列化acks='all',  # 确保消息被所有副本确认retries=3  # 失败重试次数
)# 发送单条消息
producer.send('testTopic',  # 目标主题value={'message': 'Hello Kafka', 'timestamp': int(time.time())}
)# 批量发送消息(示例发送10条)
messages = [{'id': i, 'data': f'Message {i}'} for i in range(10)]
for msg in messages:producer.send('testTopic', value=msg)time.sleep(0.1)  # 控制发送频率防止阻塞# 确保所有消息发送完成并关闭连接
producer.flush(timeout=10)
producer.close()

创建消费者

消费者负责从 Kafka 主题中读取消息。以下是一个简单的消费者示例:

from kafka import KafkaConsumer
import json# 配置消费者组和反序列化方式
consumer = KafkaConsumer('testTopic',  # 订阅主题bootstrap_servers='****:9092',group_id='my_consumer_group',  # 消费者组(同一组共享消息)auto_offset_reset='earliest',  # 从最早未消费的消息开始读取value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # JSON 反序列化
)# 持续消费消息
try:for message in consumer:print(f"""收到消息:主题: {message.topic}分区: {message.partition}偏移量: {message.offset}内容: {message.value}""")
except KeyboardInterrupt:print("用户中断操作")
finally:consumer.close()  # 关闭消费者连接

Kafka 服务器

运行 Kafka 服务器,快速启动 Kafka 和 Zookeeper:

bin/kafka-server-start.sh -daemon config/server.properties

创建指定topic的kafka的分区

./kafka-topics.sh --alter --topic testTopic --partitions 2 --bootstrap-server localhost:9092

 查询指定topic的kafka分区

./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic testTopic

通过以上步骤,可以快速构建一个基于 Kafka 的消息队列系统,并在 Python 中实现消息的生产和消费。

注意:

1、kafka广播模式:

不同的分组(group_id参数不同)收到的数据是一致的,类似于广播模式

2、kafka路由模式:

同一分组(group_id参数相同)对同一批数据进行处理,如果kafka服务器的分区数量大于该分组内的消费者数量,则每个消费者可以分到一些分区,每个消费者去处理自己对应分区里面的数据,类似于路由模式下的多消费者情形,如果消费者数量大于分区数,则多出来的消费者是被闲置的

http://www.dtcms.com/wzjs/443301.html

相关文章:

  • 天河商城网站建设网络优化app哪个好
  • 南通网站建设排名公司哪家好正规的培训学校
  • 电影网站怎么做关键词百度排行榜前十名
  • seo服务的内容优化网络的软件下载
  • 旅游网站开发毕业论文网站注册账号
  • 高端网站首页电商软文范例
  • 网站建设策划ppt关键词搜索引擎优化推广
  • 网站建设毕业论文做游戏推广一个月能拿多少钱
  • 国内顶尖工业设计公司鹤岗网站seo
  • 武汉市新洲区做网站制作网站代码
  • 如何学做网站全套教程新浪微指数
  • 中小微企业名录查询安卓优化神器
  • 网站建设人员组成百度店铺怎么开通
  • 网站开发百度云百度广告推广电话
  • 哪个公司网站做的好最好用的搜索引擎排名
  • 网站怎么做防劫持培训加盟
  • 电商网站 建设长沙网络营销公司
  • 杭州学校网站建设福州seo推广
  • 浏览器的网站通知怎么做百度知道提问
  • 玖壹购网站是做啥子的百度引擎搜索
  • 来广营做网站公司百度推广营销方案
  • 新手自学做网站多久软件推广是什么工作
  • 学做网站去哪学网站seo检测
  • 如何给网站流量来源做标记通过在网址后边加问号?国内最好的危机公关公司
  • 漳州网站开发制作棋牌今天国际新闻
  • 个人网站运营怎么做百度seo是啥意思
  • 宜昌网站建设开发费用内容营销策略
  • 德清网站制作网站设计的毕业论文
  • php网站后台模版网站播放视频速度优化
  • 疫情防控政策优化优化大师网页版