当前位置: 首页 > news >正文

cacti导出的1分钟监控数据csv文件读取并按5分钟求平均值,然后计算95计费值,假设31天的月份

cacti导出的1分钟监控数据csv文件读取并按5分钟求平均值,然后计算95计费值,假设31天的月份

import pandas as pd
import openpyxl
from openpyxl.styles import Font
from openpyxl.utils.dataframe import dataframe_to_rows
import os
import chardet
import numpy as np# 文件路径
file_path = r'E:\data\feishu\BGP-CT-SN-同道新龙-9K-IN-AG.csv'
output_dir = r'E:\data\feishu'
output_file = os.path.join(output_dir, "处理结果.xlsx")# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)print(f"开始处理文件: {file_path}")# 第一步:检测文件编码
def detect_encoding(file_path):print("检测文件编码...")with open(file_path, 'rb') as f:rawdata = f.read(10000)result = chardet.detect(rawdata)encoding = result['encoding']confidence = result['confidence']print(f"检测到编码: {encoding} (置信度: {confidence:.2%})")return encoding# 第二步:读取文件并正确分割列
try:# 检测文件编码file_encoding = detect_encoding(file_path)# 如果置信度低,尝试常见中文编码if file_encoding is None or file_encoding.lower() == 'ascii':print("编码检测置信度低,尝试常见中文编码")for encoding in ['gb18030', 'gbk', 'big5', 'utf-8']:try:# 读取前5行检查实际分隔符with open(file_path, 'r', encoding=encoding) as f:sample_lines = [f.readline().strip() for _ in range(5)]# 分析最可能的分隔符possible_seps = [',', '\t', ';', '|']sep_counts = {sep: line.count(sep) for sep in possible_seps for line in sample_lines}most_common_sep = max(sep_counts, key=sep_counts.get)print(f"检测到最可能的分隔符: '{most_common_sep}'")# 使用检测到的分隔符读取文件df = pd.read_csv(file_path, sep=most_common_sep, encoding=encoding, header=None)print(f"成功使用 {encoding} 编码和分隔符 '{most_common_sep}' 读取文件")# 手动设置列名(根据列数)if df.shape[1] == 3:df.columns = ['Date', 'Total IN-MAX', 'Total OUT-MAX']print("手动设置列名为: Date, Total IN-MAX, Total OUT-MAX")else:raise Exception(f"列数异常: {df.shape[1]}列,应为3列")breakexcept Exception as e:print(f"尝试 {encoding} 编码失败: {e}")continueelse:raise Exception("无法使用常见中文编码读取文件")else:# 读取前5行检查实际分隔符with open(file_path, 'r', encoding=file_encoding) as f:sample_lines = [f.readline().strip() for _ in range(5)]# 分析最可能的分隔符possible_seps = [',', '\t', ';', '|']sep_counts = {sep: line.count(sep) for sep in possible_seps for line in sample_lines}most_common_sep = max(sep_counts, key=sep_counts.get)print(f"检测到最可能的分隔符: '{most_common_sep}'")# 使用检测到的分隔符读取文件df = pd.read_csv(file_path, sep=most_common_sep, encoding=file_encoding, header=None)print(f"使用检测到的编码 {file_encoding} 和分隔符 '{most_common_sep}' 成功读取文件")# 手动设置列名(根据列数)if df.shape[1] == 3:df.columns = ['Date', 'Total IN-MAX', 'Total OUT-MAX']print("手动设置列名为: Date, Total IN-MAX, Total OUT-MAX")else:raise Exception(f"列数异常: {df.shape[1]}列,应为3列")
except Exception as e:print(f"文件读取失败: {e}")print("尝试使用错误恢复模式读取...")try:# 尝试使用错误恢复模式df = pd.read_csv(file_path, sep=None, engine='python', encoding='gb18030',on_bad_lines='skip', header=None)print("使用错误恢复模式成功读取文件(可能丢失部分数据)")# 手动设置列名(根据列数)if df.shape[1] == 3:df.columns = ['Date', 'Total IN-MAX', 'Total OUT-MAX']print("手动设置列名为: Date, Total IN-MAX, Total OUT-MAX")else:raise Exception(f"列数异常: {df.shape[1]}列,应为3列")except:raise Exception("无法读取文件,请手动检查文件格式和编码")print(f"成功读取数据,行数: {len(df)}")
print("df.columns:", df.columns)# 检查必要列是否存在
required_columns = ['Date', 'Total IN-MAX', 'Total OUT-MAX']
print("检查必要列...")
for col in df.columns:print(f'列名: {col} (类型: {type(col)})')if not all(col in df.columns for col in required_columns):missing = [col for col in required_columns if col not in df.columns]raise Exception(f"文件缺少必要列: {missing}")
else:print("所有必要列都存在")# 转换数值列为浮点数
print("转换数值列为浮点数...")
for col in ['Total IN-MAX', 'Total OUT-MAX']:# 先尝试直接转换df[col] = pd.to_numeric(df[col], errors='coerce')# 检查转换失败的行failed_conversions = df[col].isna().sum()if failed_conversions > 0:print(f"警告: {col} 列有 {failed_conversions} 个值无法直接转换")# 尝试更复杂的转换(处理科学计数法)def try_convert(val):try:return float(val)except:# 尝试处理可能连接在一起的多个数值if isinstance(val, str) and 'e+' in val:parts = []current = ""for char in val:if char in '0123456789.e+-':current += charelif current:try:parts.append(float(current))current = ""except:current = ""if current:try:parts.append(float(current))except:passif parts:return np.mean(parts)return np.nan# 应用复杂转换df[col] = df[col].apply(try_convert)# 再次检查失败转换still_failed = df[col].isna().sum()if still_failed > 0:print(f"仍有 {still_failed} 个值无法转换,将被设为NaN")# 重命名列以便后续处理
df = df.rename(columns={'Date': '日期','Total IN-MAX': 'Total_in','Total OUT-MAX': 'Total_out'
})# 转换日期列
print("处理日期列...")
try:# 尝试多种日期格式df['日期'] = pd.to_datetime(df['日期'], format='%Y/%m/%d %H:%M', errors='coerce')if df['日期'].isna().any():print("尝试替代日期格式...")df['日期'] = pd.to_datetime(df['日期'], format='%Y-%m-%d %H:%M:%S', errors='coerce')# 检查是否有无效日期invalid_dates = df['日期'].isna().sum()if invalid_dates > 0:print(f"警告: 发现 {invalid_dates} 个无效日期")# 删除无效日期的行df = df.dropna(subset=['日期'])print(f"删除无效日期行后剩余: {len(df)} 行")
except Exception as e:print(f"日期转换错误: {e}")raise# 设置日期为索引并排序
df.set_index('日期', inplace=True)
df.sort_index(inplace=True)  # 确保时间顺序正确
print(f"处理后数据点数: {len(df)}")# 每5分钟重采样求平均值 - 使用新的频率表示法
print("进行5分钟重采样...")
resampled = df.resample('5min').mean()
resampled.columns = ['Total_in_avg', 'Total_out_avg']# 计算每个5分钟区间的最大值
resampled['max_value'] = resampled[['Total_in_avg', 'Total_out_avg']].max(axis=1)# 对所有5分钟区间的最大值进行降序排序
sorted_max = resampled['max_value'].sort_values(ascending=False).reset_index()
sorted_max.columns = ['时间区间', '降序最大值']# 创建Excel工作簿
print("创建Excel工作簿...")
wb = openpyxl.Workbook()# 工作表1: 原始数据
ws1 = wb.active
ws1.title = "原始数据"
for r in dataframe_to_rows(df.reset_index(), index=False, header=True):ws1.append(r)# 工作表2: 5分钟平均值和最大值
ws2 = wb.create_sheet("5分钟平均值")
# 准备数据
avg_data = resampled.reset_index()
avg_data.columns = ['时间区间', 'Total_in_avg', 'Total_out_avg', 'max_value']
for r in dataframe_to_rows(avg_data, index=False, header=True):ws2.append(r)# 工作表3: 降序最大值序列
ws3 = wb.create_sheet("降序最大值")
for r in dataframe_to_rows(sorted_max, index=False, header=True):ws3.append(r)# 标记第447个值为红色(如果存在)
if len(sorted_max) >= 447:# 第447行(表头占第1行,数据从第2行开始)target_row = 447 + 1ws3.cell(row=target_row, column=2).font = Font(color="FF0000", bold=True)# 添加注释说明ws3.cell(row=1, column=3, value="说明")ws3.cell(row=2, column=3, value=f"第447个最大值已用红色标记 (值={sorted_max.iloc[446]['降序最大值']:.2e})")ws3.cell(row=target_row, column=3, value="← 第447个最大值")print(f"已标记第447个值: {sorted_max.iloc[446]['降序最大值']:.2e}")
else:print(f"警告: 数据不足447个点 (只有{len(sorted_max)}个点)")# 自动调整列宽
print("调整列宽...")
for sheet in wb.sheetnames:ws = wb[sheet]for col_idx, column in enumerate(ws.columns, 1):max_length = 0for cell in column:try:value = str(cell.value) if cell.value is not None else ""if len(value) > max_length:max_length = len(value)except:passadjusted_width = min(max_length + 2, 50)  # 限制最大宽度为50ws.column_dimensions[openpyxl.utils.get_column_letter(col_idx)].width = adjusted_width# 保存Excel文件
print(f"保存Excel文件到: {output_file}")
wb.save(output_file)
print("处理完成!")
print("\n=== 处理结果统计 ===")
print(f"原始数据点数: {len(df)}")
print(f"5分钟区间数: {len(resampled)}")
print(f"最大值序列长度: {len(sorted_max)}")
if len(sorted_max) >= 447:print(f"第447个最大值: {sorted_max.iloc[446]['降序最大值']:.2e}")

相关文章:

  • yolov12 训练json格式
  • 数据安全合规体系构建的“三道防线“
  • 百度云盘 vs Zoho网盘:哪个更适合作为同步盘?
  • Cursor配置python解释器方法
  • 《当AutoScheduler遇见边缘端:Apache TVM如何重塑模型算子的极限》
  • LeetCode 300 最长递增子序列
  • 沟通频率不合适,如何找到平衡点
  • [特殊字符] Unity UI 性能优化终极指南 — ScrollRect篇
  • 灵光一现的问题和常见错误4
  • 安全编码规范与标准:对比与分析及应用案例
  • Spring Boot使用Redis实现分布式锁
  • SpringBoot 和 Spring 的区别是什么?
  • vue-15 (实践练习:使用路由防护实现身份验证和授权)
  • LeetCode hot100-11
  • Silky-CTF: 0x02靶场
  • Linux中断与异常:内核的事件驱动引擎
  • 接口测试的用例设计
  • 2025年浙江安全员C证考试题库
  • 基于langchain的简单RAG的实现
  • 12、企业应收账款(AR)全流程解析:从发票开具到回款完成
  • 做门户网站代码质量方面具体需要注意什么/搜狗seo培训
  • 免费做推广的网站/厦门搜索引擎优化
  • 合肥城乡建设网站首页/网络营销的应用
  • 河北手机网站制作哪家好/网页制作软件推荐
  • 网站建设包括什么/seo 优化教程
  • 中企动力做网站/青岛网站开发公司