当前位置: 首页 > wzjs >正文

成都 网站开发二十条优化措施

成都 网站开发,二十条优化措施,hugo 怎么做网站,偷渡美国做h网站文章目录 官方文档与kafka-python的对比配置文档配置项 Producer代码示例Consumer代码示例 官方文档 confluent_kafka API — confluent-kafka 2.8.0 documentation Quick Start for Confluent Cloud | Confluent Documentation 与kafka-python的对比 对比维度confluent-ka…

文章目录

  • 官方文档
  • 与kafka-python的对比
  • 配置
    • 文档
    • 配置项
  • Producer代码示例
  • Consumer代码示例

官方文档

confluent_kafka API — confluent-kafka 2.8.0 documentation

Quick Start for Confluent Cloud | Confluent Documentation

与kafka-python的对比

对比维度confluent-kafkakafka-python
性能表现基于librdkafka构建,处理大规模消息时,吞吐量高、延迟低,性能出色纯Python实现,受GIL限制,处理大量并发任务时存在性能瓶颈,高负载下消息处理速度较慢
功能特性具备丰富高级特性,如细粒度配置、复杂分区控制、消息压缩、安全认证等功能相对基础,能满足常见Kafka使用场景,高级特性支持不足
易用性功能丰富、配置选项多,学习曲线较陡,初学者上手难度大接口设计简单直观,易于理解和使用,初学者能快速上手
社区支持由Confluent公司维护,有专业团队和丰富资源,更新维护及时,社区活跃度高开源社区项目,社区较活跃,但资源和支持力度相对较弱
兼容性依赖librdkafka,在不同操作系统和环境中可能存在兼容性问题,需额外配置安装纯Python实现,兼容性好,在各种Python环境中可方便使用,无需额外依赖
适用场景适用于对性能要求高、需高级特性的大规模生产环境,如金融交易系统、实时数据处理平台适合开发和测试环境,以及对性能要求不高的小型项目,如简单日志收集系统、数据监控工具

配置

文档

https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

配置项

字典类型, 配置字段如下:

bootstrap.servers: kafka服务地址, 以逗号分隔

statistics.interval.ms: 发送统计间隔, 需要注册一个统计毁掉函数, 默认值为0,表示禁用统计, 粒度为1000ms

security.protocol: 可选值为“plaintext, ssl, sasl_plaintext, sasl_ssl”, 默认值为“plaintext”, 交互协议

sasl.mechanisms: 用于认证的SASL机制, 支持“GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER”, 默认为“GSSAPI”

sasl.username: SASL用户名, 支持的认证机制为“PLAIN and SASL-SCRAM-…”

sasl.password: SASL密码, 支持的认证机制同sasl.username

group.id: 消费者组id

group.instance.id: 启用静态组成员关系, 静态组成员允许在配置的session.timeout.ms时间范围内, 离开或重新加入组而不会引起组内再平衡。这个使用时候最好配置一个大点的session.timeout.ms值, 可以避免组内再平衡引起的短暂的服务不可用。需要kafka server端版本号>=2.3.0

session.timeout.ms: 客户端组回话或检测失败超时时间, 默认值为45000ms

group.protocol: 使用的组协议, 可选值为“classic”和“consumer”, 当前默认值为classic, 以后版本更新默认值会改为consumer

max.poll.interval.ms: 默认值为30000, 高级消费者调用消费消息函数(例如 rd_kafka_consumer_poll ())之间允许的最大时间间隔。如果超过此时间间隔,消费者将被视为失败,并且消费者组将进行重新平衡,以便将分区重新分配给另一个消费者组成员。警告:此时可能无法进行偏移提交。注意:建议为长时间处理应用程序设置 enable.auto.offset.store=false,然后在消息处理后显式存储偏移(使用 offsets_store()),以确保在处理完成之前不会自动提交偏移。

enable.auto.commit: 默认值为true, 在后台自动并定期提交偏移量。注意:设置为false不会阻止使用者获取先前提交的起始偏移量。为了规避此行为,请在调用assign()时设置每个分区的特定起始偏移量。

auto.commit.interval.ms: 默认值为5000ms, 消费者偏移量被提交(写入)到偏移量存储的毫秒级频率。(0 = 禁用)。此设置由高级消费者使用。

enable.auto.offset.store: 默认值为true, 自动存储提供给应用程序的最后一条消息的偏移量。偏移量存储是每个分区下一个要(自动)提交的偏移量的内存存储。

linger.ms: 默认值为5ms, 在将消息发送前,等待生产者队列中的消息累积以构建消息批次(消息集)的毫秒级延迟。较高的值允许更大、更有效的(开销更小、压缩更好)消息批次累积,但会增加消息传递延迟。

retries: 默认值为2147483647, 消息发送失败的重试次数

retry.backoff.ms: 默认值为100,重试时间间隔, 指数级增长, 受 retry.backoff.max.ms 的限制。

batch.size: 默认值为1000000字节数, 消息批次的最大字节数

Producer代码示例

from random import choice
from confluent_kafka import Producerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username':     '<CLUSTER API KEY>','sasl.password':     '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms':   'PLAIN','acks':              'all'}# Create Producer instanceproducer = Producer(config)# Optional per-message delivery callback (triggered by poll() or flush())# when a message has been successfully delivered or permanently# failed delivery (after retries).def delivery_callback(err, msg):if err:print('ERROR: Message failed delivery: {}'.format(err))else:print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))# Produce data by selecting random values from these lists.topic = "purchases"user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']count = 0for _ in range(10):user_id = choice(user_ids)product = choice(products)producer.produce(topic, product, user_id, callback=delivery_callback)count += 1# Block until the messages are sent.producer.poll(10000)producer.flush()

Consumer代码示例

from confluent_kafka import Consumerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username':     '<CLUSTER API KEY>','sasl.password':     '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms':   'PLAIN','group.id':          'kafka-python-getting-started','auto.offset.reset': 'earliest'}# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topictopic = "purchases"consumer.subscribe([topic])# Poll for new messages from Kafka and print them.try:while True:# 如果一次想拉取多个消息, 可以用consumer.consume方法, 该方法返回的是一个Message列表# msg_list = consumer.consume(num_messages=消息数量, timeout=如果没有消息, 最长等待的超时时间msg = consumer.poll(1.0)if msg is None:# Initial message consumption may take up to# `session.timeout.ms` for the consumer group to# rebalance and start consumingprint("Waiting...")elif msg.error():print("ERROR: %s".format(msg.error()))else:# Extract the (optional) key and value, and print.print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))except KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()
http://www.dtcms.com/wzjs/102113.html

相关文章:

  • 大网站都开放自己的cms系统新郑网络推广
  • 织梦做的网站网速打开慢是怎么回事百度推广登录手机版
  • 常州网站推广机构做百度推广怎么做才能有电话
  • 什么网站做前端练手好上海外包seo
  • 重庆网站建设制作设计公司哪家好自助网站建设平台
  • 啥也不懂怎么建设网站郑州网络运营培训
  • wordpress page post网站seo服务商
  • 网络推广网站大全软件推广接单平台
  • 中卫网站设计公司有哪些信息流广告投放工作内容
  • 网站营销怎么做自媒体
  • 四川网站备案营销软文是什么
  • 那里有网站建设电子商务
  • 哪里可以学网站开发网页搜索排名提升
  • 如何在自己的网站上做歌单网站广告制作
  • 互动网站建设网站开发技术有哪些
  • 时尚大气网站设计关键词歌词简谱
  • 成都市建设网站首页北京关键词优化服务
  • 营销型 展示类网站东莞公司seo优化
  • 大连凯杰建设有限公司网站产品营销策略怎么写
  • scala做网站无锡百度竞价
  • 红色系列的网站网络营销有哪些模式
  • 网站建设seo优化的好处如何让百度能查到自己
  • 如何自己做代理网站的想法品牌网络seo方案外包
  • 做学科竞赛的网站购物网站如何推广
  • avada 做的网站全媒体广告投放平台
  • 南阳市网站制作网络广告电话
  • 青岛建设集团建兴工程有限公司南阳网站优化公司
  • 怎么做卖外挂网站免费的找百度
  • iis做网站的流程seo关键词优化如何
  • 重庆集团网站建设seo领导屋