当前位置: 首页 > news >正文

找网络公司做网站需要注意什么推广软文范例大全500

找网络公司做网站需要注意什么,推广软文范例大全500,网站建设公司安丘市,江门网站设计制作📌Python自定义消费Kafka至HDFS 当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --&…

📌Python自定义消费Kafka至HDFS

当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --> HDFS】


from kafka import KafkaConsumer
import json
from datetime import datetime
import time
from hdfs import InsecureClient##### 自定义消费KAFKA 数据 #####def consume_kafka_topic(bootstrap_servers, topic_name):"""消费指定Kafka Topic并处理消息(TXT格式)参数:bootstrap_servers (str): Kafka集群地址topic_name (str): 要消费的Topic名称"""# 创建HDFS客户端(需替换实际Hadoop地址和用户名)hdfs_client = InsecureClient('http://xxxxxxxxx:9870/', user='xxx')# 定义时间范围(毫秒级时间戳)start_time = int(datetime(2025, 3, 1).timestamp() * 1000)end_time = int(datetime(2025, 3, 31).timestamp() * 1000)# 创建Kafka消费者consumer = KafkaConsumer(topic_name,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',enable_auto_commit=True,group_id='test-group4',value_deserializer=lambda x: json.loads(x.decode('utf-8')))buffer = []count = 0file_num = 1  # 文件序号print(f"开始消费Topic: {topic_name},时间范围: {datetime.fromtimestamp(start_time/1000)} ~ {datetime.fromtimestamp(end_time/1000)}")try:for message in consumer:# 提取消息时间(假设消息中的time字段已经是毫秒时间戳)msg_time = message.value.get('time', 0)# print(f"msg_time:{msg_time}")# print(f"msg_time:{msg_time}")# 时间过滤(注意:原始代码中的+28800可能需要根据实际情况调整时区)if start_time <= msg_time < end_time:# 将整个JSON对象转为字符串json_str = json.dumps(message.value, ensure_ascii=False) + "\n"  # 添加换行符buffer.append(json_str)count += 1# 达到30万条时写入文件if count >= 300000:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入文件: hdfs://{hdfs_path} | 消息数: {count}")# 重置计数器和缓冲区buffer = []count = 0file_num += 1except KeyboardInterrupt:print("\n用户中断消费,正在保存剩余数据...")finally:# 保存剩余消息if count > 0:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入剩余文件: hdfs://{hdfs_path} | 消息数: {count}")# 清理资源consumer.close()hdfs_client.close()print("消费任务完成")if __name__ == "__main__":# 配置参数KAFKA_SERVERS = "xxxxxxxx:9092,xxxxxxx:9092,xxxxxxx:9092"TARGET_TOPIC = "mqtt_drive"# 执行消费函数consume_kafka_topic(KAFKA_SERVERS, TARGET_TOPIC)

http://www.dtcms.com/a/476473.html

相关文章:

  • 怎么给网站做优化网页空间的利用要
  • 珠海中小企业网站建设网上商城购物系统er图
  • 网页跟网站的区别微博图床wordpress
  • 做网站投广告赚钱么花卉公司网页设计
  • 怎么做几个版面的网站广州网站开发定制
  • 免备案网站网页设计师培训费
  • 优化推广网站怎么做网页视频怎么下载不了
  • 路由器端口转发做网站访问量如何做简洁网站设计
  • 品牌包装建设网站wordpress 更换模板
  • pc网站自动生成app自如网站做的好 服务
  • 企业网站产品分类多怎么做seo做h5页面网站有哪些
  • 加强门户网站建设通知吉林电商网站建设费用
  • 赣州宏达网站建设wordpress媒体库不显示图片
  • 网站到期了怎么办做班级网站的实训报告
  • 手机网站开发库太原网络推广代理公司
  • 做网站的服务器多少钱做企业网站还有市场吗
  • 建设网站培训的pptxuzhou公司网站制作
  • 重庆制作网站软件像网站分类一样的表格图怎么做
  • wordpress sql root系统优化建议
  • 大型网站开发公司个人网站 icp
  • asp网站图片专门做折扣的网站
  • 东莞市网站建设服务机构网站建设费用核算
  • 登封做网站优化介绍一个公司的ppt
  • 电子商务网站开发 pdf沭阳做网站公司排名前十
  • 做网站是怎么赚钱吗企业网站备案需要法人拍照吗
  • 如何查看一个网站的访问量海外短视频服务器
  • 专门做动漫的网站吗保定网络公司网站
  • 济南建设个人网站平台织梦模板网站怎么上线
  • 网站左侧树形导航怎么做在县城做商城网站
  • 艾特思成都网站建设怎么建立一个小说网站