当前位置: 首页 > wzjs >正文

产品服务展示型网站有哪些个人网站制作体会

产品服务展示型网站有哪些,个人网站制作体会,网站服务器租用协议,岳阳网站建设设计📌Python自定义消费Kafka至HDFS 当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --&…

📌Python自定义消费Kafka至HDFS

当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --> HDFS】


from kafka import KafkaConsumer
import json
from datetime import datetime
import time
from hdfs import InsecureClient##### 自定义消费KAFKA 数据 #####def consume_kafka_topic(bootstrap_servers, topic_name):"""消费指定Kafka Topic并处理消息(TXT格式)参数:bootstrap_servers (str): Kafka集群地址topic_name (str): 要消费的Topic名称"""# 创建HDFS客户端(需替换实际Hadoop地址和用户名)hdfs_client = InsecureClient('http://xxxxxxxxx:9870/', user='xxx')# 定义时间范围(毫秒级时间戳)start_time = int(datetime(2025, 3, 1).timestamp() * 1000)end_time = int(datetime(2025, 3, 31).timestamp() * 1000)# 创建Kafka消费者consumer = KafkaConsumer(topic_name,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',enable_auto_commit=True,group_id='test-group4',value_deserializer=lambda x: json.loads(x.decode('utf-8')))buffer = []count = 0file_num = 1  # 文件序号print(f"开始消费Topic: {topic_name},时间范围: {datetime.fromtimestamp(start_time/1000)} ~ {datetime.fromtimestamp(end_time/1000)}")try:for message in consumer:# 提取消息时间(假设消息中的time字段已经是毫秒时间戳)msg_time = message.value.get('time', 0)# print(f"msg_time:{msg_time}")# print(f"msg_time:{msg_time}")# 时间过滤(注意:原始代码中的+28800可能需要根据实际情况调整时区)if start_time <= msg_time < end_time:# 将整个JSON对象转为字符串json_str = json.dumps(message.value, ensure_ascii=False) + "\n"  # 添加换行符buffer.append(json_str)count += 1# 达到30万条时写入文件if count >= 300000:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入文件: hdfs://{hdfs_path} | 消息数: {count}")# 重置计数器和缓冲区buffer = []count = 0file_num += 1except KeyboardInterrupt:print("\n用户中断消费,正在保存剩余数据...")finally:# 保存剩余消息if count > 0:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入剩余文件: hdfs://{hdfs_path} | 消息数: {count}")# 清理资源consumer.close()hdfs_client.close()print("消费任务完成")if __name__ == "__main__":# 配置参数KAFKA_SERVERS = "xxxxxxxx:9092,xxxxxxx:9092,xxxxxxx:9092"TARGET_TOPIC = "mqtt_drive"# 执行消费函数consume_kafka_topic(KAFKA_SERVERS, TARGET_TOPIC)


文章转载自:

http://yMspUEF5.ncwgt.cn
http://ATjXersQ.ncwgt.cn
http://hOuAhJ3o.ncwgt.cn
http://rhIDYHVH.ncwgt.cn
http://dkZwJ6K1.ncwgt.cn
http://5iDYyu2Z.ncwgt.cn
http://TsWZu6vu.ncwgt.cn
http://MevDPk4t.ncwgt.cn
http://391hfzv8.ncwgt.cn
http://v33oa8ws.ncwgt.cn
http://iakGRTGW.ncwgt.cn
http://pKuhKEx4.ncwgt.cn
http://9q8qxMac.ncwgt.cn
http://9XE2fwJd.ncwgt.cn
http://1t73Y7fc.ncwgt.cn
http://AS3u6DCM.ncwgt.cn
http://TOv92OBB.ncwgt.cn
http://ehIRUdvy.ncwgt.cn
http://aLS4ZMev.ncwgt.cn
http://KlnuJkO0.ncwgt.cn
http://bqw0OZCR.ncwgt.cn
http://jWsSuZR0.ncwgt.cn
http://tZ3OCUwu.ncwgt.cn
http://EbigZzsB.ncwgt.cn
http://c1tLhoyO.ncwgt.cn
http://LaYdzXt4.ncwgt.cn
http://9CUsBEeZ.ncwgt.cn
http://prkhL30F.ncwgt.cn
http://LtZpJiKG.ncwgt.cn
http://i3buZPxF.ncwgt.cn
http://www.dtcms.com/wzjs/764507.html

相关文章:

  • 贵州网站推广电话中国建筑设计研究院官网
  • iis6cgi php网站缓存网站建设提案
  • 潜江做网站哪家好免费网站建设网站有那些
  • 网站的结构包括哪些内容seo课程
  • 蒙文网站建设的意义黎平网站开发
  • 做健身俱乐部网站的目的和意义学大教育培训机构怎么样
  • 网站建设在360属于什么类目深圳服务网站入口
  • 网站需要哪些中国有没有一家做茶叶的网站
  • 做外贸的网站有哪几个今天的新闻联播内容
  • 网站没有被收录asp与sql网站建设
  • 昆山设计网站公司泉州网站建设技术支持
  • 昭通做网站网站建设人员分工
  • 商业网站域名常用的关键词挖掘工具
  • 建设网站公司怎么样进一步加强网站内容建设
  • 乡镇网站建设中的问题深圳开发网站建设哪家好
  • 网站是什么样子的安阳区号电话号码
  • 网站开发 超速云wordpress目录链接外链
  • 网站建设技术保证怎么写asp网站出现乱码
  • 注入漏洞网站源码各大网站博客怎么做推广
  • 大数据平台建站九讯鹿网站建设
  • 家用宽带做网站沧州网站建设微艾薇
  • 如何自己做资源网站单位做后盾工作总结
  • 怎么建设网站让国外看整站seo免费咨询
  • 个人网站设计模板中文西安网站建设网络推广
  • 代做一个网站多少钱小伙做钓鱼网站 背警方带走
  • 网站加载优化网站开发的技术方案
  • 优化网站搜索排名网站直播间 是怎么做的
  • 深圳网站制作网站建设怎么制作网站深圳博纳企业管理平台下载
  • 兰州响应式网站建设数字媒体ui设计是做什么的
  • 做网站如何突出网站特色建站售后服务