当前位置: 首页 > wzjs >正文

太原网站制作多少钱怎么样增加网站权重

太原网站制作多少钱,怎么样增加网站权重,wordpress全站启用ssl张戈,化工产品网站建设📌Python自定义消费Kafka至HDFS 当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --&…

📌Python自定义消费Kafka至HDFS

当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --> HDFS】


from kafka import KafkaConsumer
import json
from datetime import datetime
import time
from hdfs import InsecureClient##### 自定义消费KAFKA 数据 #####def consume_kafka_topic(bootstrap_servers, topic_name):"""消费指定Kafka Topic并处理消息(TXT格式)参数:bootstrap_servers (str): Kafka集群地址topic_name (str): 要消费的Topic名称"""# 创建HDFS客户端(需替换实际Hadoop地址和用户名)hdfs_client = InsecureClient('http://xxxxxxxxx:9870/', user='xxx')# 定义时间范围(毫秒级时间戳)start_time = int(datetime(2025, 3, 1).timestamp() * 1000)end_time = int(datetime(2025, 3, 31).timestamp() * 1000)# 创建Kafka消费者consumer = KafkaConsumer(topic_name,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',enable_auto_commit=True,group_id='test-group4',value_deserializer=lambda x: json.loads(x.decode('utf-8')))buffer = []count = 0file_num = 1  # 文件序号print(f"开始消费Topic: {topic_name},时间范围: {datetime.fromtimestamp(start_time/1000)} ~ {datetime.fromtimestamp(end_time/1000)}")try:for message in consumer:# 提取消息时间(假设消息中的time字段已经是毫秒时间戳)msg_time = message.value.get('time', 0)# print(f"msg_time:{msg_time}")# print(f"msg_time:{msg_time}")# 时间过滤(注意:原始代码中的+28800可能需要根据实际情况调整时区)if start_time <= msg_time < end_time:# 将整个JSON对象转为字符串json_str = json.dumps(message.value, ensure_ascii=False) + "\n"  # 添加换行符buffer.append(json_str)count += 1# 达到30万条时写入文件if count >= 300000:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入文件: hdfs://{hdfs_path} | 消息数: {count}")# 重置计数器和缓冲区buffer = []count = 0file_num += 1except KeyboardInterrupt:print("\n用户中断消费,正在保存剩余数据...")finally:# 保存剩余消息if count > 0:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入剩余文件: hdfs://{hdfs_path} | 消息数: {count}")# 清理资源consumer.close()hdfs_client.close()print("消费任务完成")if __name__ == "__main__":# 配置参数KAFKA_SERVERS = "xxxxxxxx:9092,xxxxxxx:9092,xxxxxxx:9092"TARGET_TOPIC = "mqtt_drive"# 执行消费函数consume_kafka_topic(KAFKA_SERVERS, TARGET_TOPIC)

http://www.dtcms.com/wzjs/534773.html

相关文章:

  • 自助网站能在百度上搜到么网址查询地址查询站长之家
  • 天津河北区做网站官网微信
  • 博客和网站的区别服务器价格一览表
  • 图片生成二维码软件免费建站网站 seo
  • 欧美个人网站网站群 seo
  • 有多少种做网站后台程序加强会计师事务所品牌建设
  • 产品外观设计网站代练平台
  • 网站26个页面收费台州网站建设找哪家好点
  • 我的世界做指令的网站wordpress前端登陆
  • 专业的网站制作专业公司福建龙岩天宫山
  • 虹口品牌网站建设软件开发工具的基本功能是什么
  • 网站基本参数设置模块注册一家公司
  • 贵阳建设网站软文范例大全500字
  • 自己搭建一个博客网站站内推广有哪些具体方式
  • 网站专题页面设计网站开发应该怎么学
  • 制作游戏的软件app网站建设优化服务信息
  • 海淘返利网站怎么做wordpress菠菜插件
  • 网站建设的相关知识白山做网站
  • 自助创建网站怎么将自己做的网站发到网上去
  • 网站关闭备案适合小县城开的加盟店
  • 高效网站推广设计如何做一个二维码相册
  • 网站后台数据采集建设网站基础知识
  • 海南省建设与执业资格注册中心网站彩虹云免费主机
  • 网站备案号查询网址网站如何运作
  • 吉林省住房城乡建设厅网站首页特价手机网站建设
  • 工信部网站备案进度查询阿里云机器怎么做网站
  • 门户网站和微网站的区别淘宝交易指数换算工具
  • 建设一个网站需要考虑什么南京高端网站定制
  • 建设银行天津招聘网站怎样建设购物网站
  • 网站开发及服务合同模板什么网站赚的钱最多