当前位置: 首页 > wzjs >正文

手机靓号网站制作北京百度网讯科技有限公司

手机靓号网站制作,北京百度网讯科技有限公司,怎么做才能提升网站收录,海淀教育互动平台用LLM AI完成tool开发。 利用ftrace的调度trace event (sched_switch 和 sched_stat_runtime) 进行cpu loading和task loading的统计分析,以及可视化展示。 项目名称 Ftrace 日志分析器 项目概述 此项目是一个用于分析 Ftrace 日志的 Python 脚本,它能够…

用LLM AI完成tool开发。


利用ftrace的调度trace event (sched_switch 和 sched_stat_runtime) 进行cpu loading和task loading的统计分析,以及可视化展示。

项目名称

Ftrace 日志分析器

项目概述

此项目是一个用于分析 Ftrace 日志的 Python 脚本,它能够解析 Ftrace 日志文件,统计每个线程和 CPU 的使用情况,并生成包含 CPU 负载曲线和线程使用情况表格的 HTML 报告。

功能特性

  • 日志解析:解析 Ftrace 日志文件,提取关键信息,如线程名、进程 ID、CPU 编号、时间戳和事件类型。
  • 统计信息:统计每个线程在各个 CPU 上的运行时间,以及每个 CPU 的总活跃时间和空闲时间。
  • 可视化报告:生成 HTML 报告,包含 CPU 负载曲线和线程使用情况表格,方便用户直观地查看系统的 CPU 使用情况。

安装依赖

运行此脚本前,需要安装以下 Python 库:

  • numpy
  • bokeh

可以使用以下命令进行安装:pip install numpy bokeh

使用的 Ftrace 事件

本工具主要使用以下两个 Ftrace 事件进行分析:

  • sched_switch:该事件在进程切换时触发,记录了从一个进程切换到另一个进程的相关信息。通过解析这个事件,可以追踪每个线程在不同 CPU 上的运行时间,以及 CPU 的空闲时间。
  • sched_stat_runtime:该事件用于统计进程的运行时间。通过这个事件,可以获取每个线程的运行时长,进一步完善线程的统计信息。

使用 trace-cmd 抓取 Ftrace 日志

trace-cmd 是一个用于控制 Ftrace 功能的命令行工具,以下是使用 trace-cmd 抓取所需 Ftrace 日志的步骤:

1. 安装 trace-cmd

如果你还没有安装 trace-cmd,可以使用包管理器进行安装。例如,在 Ubuntu 上可以使用以下命令:sudo apt-get install trace-cmd

2. 配置跟踪事件

此脚本需要跟踪 sched_switch 和 sched_stat_runtime 事件,你可以使用以下命令启用这些事件:sudo trace-cmd record -e sched_switch -e sched_stat_runtime

3. 开始记录日志

运行上述命令后,trace-cmd 会开始记录指定事件的日志。你可以让系统运行一段时间,以收集足够的日志数据。

4. 停止记录日志

当你认为已经收集到足够的日志数据时,可以按下 Ctrl + C 停止记录。trace-cmd 会将日志保存到一个文件中,默认文件名是 trace.dat

5. 转换日志文件格式

由于脚本需要的是文本格式的日志文件,你可以使用以下命令将 trace.dat 转换为文本格式:sudo trace-cmd report -i trace.dat > ftrace.log

使用方法

  1. 确保已经安装了所需的依赖库。
  2. 使用 trace-cmd 抓取 Ftrace 日志,并将其保存为文本文件(如 ftrace.log)。
  3. 运行脚本,命令如下:python ftrace_analyzer.py <ftrace.log>其中 <ftrace.log> 是你要分析的 Ftrace 日志文件的路径。
  4. 脚本运行完成后,会在当前目录下生成一个名为 report.html 的 HTML 报告文件,打开该文件即可查看分析结果。

代码结构

  • 类型定义:定义了一些数据类型,如 TimestampCPUID 和 PID,用于明确代码中变量的含义。
  • 日志解析模块FtraceParser 类负责解析 Ftrace 日志文件的每一行,将其转换为结构化的数据。
  • 事件处理模块EventProcessor 类处理 sched_switch 和 sched_stat_runtime 两种事件,统计线程和 CPU 的使用情况。
  • 报告生成模块ReportGenerator 类生成 HTML 报告,包含 CPU 负载曲线和线程使用情况表格。
  • 主流程analyze_ftrace 函数是主分析流程,读取日志文件,调用解析和处理函数,最后生成报告。

代码优化

在 ReportGenerator.generate_html_report 方法中,使用了滑动窗口算法来计算 CPU 负载曲线,使曲线更加平滑,粒度更细致。

注意事项

  • 确保输入的 Ftrace 日志文件格式正确,否则可能会导致解析失败。
  • 报告文件 report.html 会覆盖当前目录下已有的同名文件,请谨慎操作。
  • 使用 trace-cmd 抓取日志需要 root 权限,请确保你有足够的权限执行相关命令。

贡献与反馈

如果你发现任何问题或有改进建议,欢迎提交 issue 或 pull request。

1.Bokeh

import re
import datetime
import numpy as np
import logging
from collections import defaultdict
from typing import Dict, Tuple, Optional, Listfrom bokeh.plotting import figure, output_file, save
from bokeh.models import ColumnDataSource, DataTable, TableColumn, HoverTool
from bokeh.layouts import column# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)# ----------------------
# 类型定义
# ----------------------
Timestamp = float  # 时间戳(秒)
CPUID = int  # CPU标识符
PID = int  # 进程IDclass ThreadStats:"""线程统计数据结构"""def __init__(self):self.name: str = ""  # 线程名self.cpu_time: Dict[CPUID, float] = defaultdict(float)  # 各CPU累计时间self.total_time: float = 0.0  # 总运行时间class CPUStats:"""CPU统计数据结构"""def __init__(self):self.total_active: float = 0.0  # 总活跃时间(非swapper线程)self.total_idle: float = 0.0  # 总空闲时间(swapper线程)self.time_periods: List[Tuple[float, float, bool]] = []  # 时间段记录(开始时间,结束时间,是否活跃)class GlobalState:"""全局状态容器"""def __init__(self):self.threads: Dict[Tuple[str, PID], ThreadStats] = defaultdict(ThreadStats)  # (线程名, PID) -> 线程统计self.cpus: Dict[CPUID, CPUStats] = defaultdict(CPUStats)  # CPU -> 统计信息self.current_pid: Dict[CPUID, Tuple[Tuple[str, PID], Timestamp]] = {}  # CPU当前运行的(线程名, PID)及开始时间self.pid_to_name: Dict[PID, str] = {}  # PID到线程名的映射# ----------------------
# 日志解析模块
# ----------------------
class FtraceParser:"""Ftrace日志解析器"""LOG_REGEX = re.compile(r'^(?P<comm>[^-]+)-(?P<pid>\d+)\s+\[(?P<cpu>\d+)\]\s+'r'(?P<timestamp>\d+\.\d+):\s+'r'(?P<event>\w+):\s+(?P<details>.*)$')@classmethoddef parse_line(cls, line: str) -> Optional[dict]:"""解析单行日志,返回结构化数据"""start_time = datetime.datetime.now()match = cls.LOG_REGEX.match(line.strip())if not match:logger.debug(f"无法解析的行: {line.strip()}")return Noneparsed_data = {'comm': match.group('comm'),'pid': int(match.group('pid')),'cpu': int(match.group('cpu')),'timestamp': float(match.group('timestamp')),'event': match.group('event'),'details': match.group('details')}duration = (datetime.datetime.now() - start_time).total_seconds() * 1000logger.debug(f"解析成功: {parsed_data}, 耗时: {duration:.2f}ms")return parsed_data# ----------------------
# 事件处理模块
# ----------------------
class EventProcessor:"""事件处理器"""SWAPPER_NAMES = ('swapper', 'swapper/')  # 空闲线程名称特征@classmethoddef _is_swapper(cls, comm: str, pid: int) -> bool:"""判断是否为表示空闲的swapper线程"""return pid == 0 or comm.startswith(cls.SWAPPER_NAMES)@classmethoddef handle_sched_switch(cls, state: GlobalState, entry: dict):"""处理进程切换事件"""start_time = datetime.datetime.now()details = entry['details']match = re.match(r'(?P<prev_comm>[^:]+(?::[^:]+)*):(?P<prev_pid>\d+).*?==>\s+'r'(?P<next_comm>[^:]+(?::[^:]+)*):(?P<next_pid>\d+)',details)if not match:logger.warning(f"无法解析的sched_switch事件: {details}")returnlogger.debug(f"开始处理sched_switch事件: CPU={entry['cpu']}, 从 {match.group('prev_comm')}:{match.group('prev_pid')} 切换到 {match.group('next_comm')}:{match.group('next_pid')}")cpu = entry['cpu']ts = entry['timestamp']# 提取 prev 线程信息prev_comm = match.group('prev_comm')prev_pid = int(match.group('prev_pid'))prev_key = (prev_comm, prev_pid)# 提取 next 线程信息next_comm = match.group('next_comm')next_pid = int(match.group('next_pid'))next_key = (next_comm, next_pid)# 结束前一个线程的时间统计if cpu in state.current_pid:(running_comm, running_pid), start_ts = state.current_pid[cpu]running_key = (running_comm, running_pid)duration = ts - start_tsprev_running_comm = running_comm# 记录时间段is_active = not cls._is_swapper(prev_running_comm, running_pid)state.cpus[cpu].time_periods.append((start_ts, ts, is_active))# 根据线程类型统计时间if cls._is_swapper(prev_running_comm, running_pid):state.cpus[cpu].total_idle += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += durationelse:state.cpus[cpu].total_active += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += duration# 记录新线程信息state.current_pid[cpu] = (next_key, ts)state.pid_to_name[prev_pid] = prev_commstate.pid_to_name[next_pid] = next_commstate.threads[prev_key].name = prev_commstate.threads[next_key].name = next_comm@classmethoddef handle_sched_stat_runtime(cls, state: GlobalState, entry: dict):"""处理运行时统计事件"""start_time = datetime.datetime.now()details = entry['details']comm = re.search(r'comm=([^\s]+)', details).group(1)pid = int(re.search(r'pid=(\d+)', details).group(1))runtime_ns = int(re.search(r'runtime=(\d+)', details).group(1))cpu = entry['cpu']runtime = runtime_ns / 1e9  # 转换为秒key = (comm, pid)logger.debug(f"开始处理sched_stat_runtime: CPU={cpu}, {comm}:{pid}, 运行时间={runtime:.6f}s")if cls._is_swapper(comm, pid):state.cpus[cpu].total_idle += runtimestate.threads[key].cpu_time[cpu] += runtimestate.threads[key].total_time += runtimestate.threads[key].name = commstate.pid_to_name[pid] = commelse:state.cpus[cpu].total_active += runtimestate.threads[key].total_time += runtimestate.threads[key].name = commstate.pid_to_name[pid] = comm# ----------------------
# 报告生成模块
# ----------------------
class ReportGenerator:"""统计报告生成器"""@staticmethoddef generate_html_report(state: GlobalState, start_ts: float, end_ts: float, output_filename: str = "report.html"):"""生成HTML可视化报告"""report_start = datetime.datetime.now()logger.info(f"start generate report, duration:{start_ts:.2f}s-{end_ts:.2f}s, output file: {output_filename}")# 生成CPU负载曲线数据window_size = 1.0  # 滑动窗口大小(秒)interval = 0.1  # 采样间隔(秒)time_points = np.arange(start_ts, end_ts, interval)cpu_load_data = defaultdict(list)logger.info(f"start analysis cpu loading, cpu number: {len(state.cpus)}")for cpu in state.cpus:periods = state.cpus[cpu].time_periodsfor t in time_points:window_start = t - window_size / 2window_end = t + window_size / 2if window_start < start_ts or window_end > end_ts:continuetotal_active_time = 0.0for (p_start, p_end, is_active) in periods:if p_end <= window_start or p_start >= window_end:continueoverlap_start = max(p_start, window_start)overlap_end = min(p_end, window_end)if is_active:total_active_time += overlap_end - overlap_startload_pct = (total_active_time / window_size) * 100 if window_size > 0 else 0cpu_load_data[cpu].append((t - start_ts, load_pct))logger.info(f"create chart...")# 创建图表output_file(output_filename)p = figure(title="CPU Load Percentage Over Time",x_axis_label='Time (s from start)',y_axis_label='Load (%)',width=1200,height=400,tools="pan,wheel_zoom,box_zoom,reset")colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728','#9467bd', '#8c564b', '#e377c2', '#7f7f7f']# 为每个CPU添加折线for idx, cpu in enumerate(sorted(cpu_load_data.keys())):data = cpu_load_data[cpu]if not data:continuetimes, loads = zip(*data)source = ColumnDataSource(data={'time': times,'load': loads,'cpu': [f'CPU {cpu}'] * len(times)})p.line('time', 'load',source=source,legend_label=f'CPU {cpu}',line_width=1.5,color=colors[idx % len(colors)])# 配置悬停工具hover = HoverTool(tooltips=[('CPU', '@cpu'),('Time', '@time{0.2f} s'),('Load', '@load{0.2f}%'),])p.add_tools(hover)p.legend.location = "top_left"p.legend.click_policy = "hide"# 生成线程表格数据cpus = sorted(state.cpus.keys())thread_data = []for (name, pid), stats in state.threads.items():total_time = stats.total_timetotal_percent = total_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0row = {'PID': pid, 'Name': name, 'Total (%)': f"{total_percent:.2f}%"}for cpu in cpus:cpu_time = stats.cpu_time.get(cpu, 0.0)cpu_percent = cpu_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0row[f'CPU {cpu}'] = f"{cpu_percent:.2f}%"thread_data.append(row)# 将 thread_data 转换为字典形式column_data = defaultdict(list)for row in thread_data:for key, value in row.items():column_data[key].append(value)# 创建数据表格columns = [TableColumn(field="PID", title="PID", width=80),TableColumn(field="Name", title="Thread Name", width=200),TableColumn(field="Total (%)", title="Total Load (%)", width=100),]for cpu in cpus:columns.append(TableColumn(field=f'CPU {cpu}', title=f'CPU {cpu} (%)', width=100))source = ColumnDataSource(column_data)data_table = DataTable(source=source,columns=columns,width=1200,height=400,editable=False,index_position=None,autosize_mode="none")# 组合布局并保存layout = column(p, data_table)save(layout)report_duration = (datetime.datetime.now() - report_start).total_seconds()logger.info(f"end report, {output_filename}, consume duration: {report_duration:.2f}s")# ----------------------
# 主流程
# ----------------------
def analyze_ftrace(log_path: str):"""主分析流程"""state = GlobalState()first_ts = Nonelast_ts = 0.0parse_start = datetime.datetime.now()logger.info("start parse ftrace log...")with open(log_path) as f:line_count = sum(1 for _ in f)logger.info(f"ftrace log file contains {line_count} lines")f.seek(0)for line in f:entry = FtraceParser.parse_line(line)if not entry:continuets = entry['timestamp']if first_ts is None:first_ts = tslast_ts = tsif entry['event'] == 'sched_switch':EventProcessor.handle_sched_switch(state, entry)elif entry['event'] == 'sched_stat_runtime':EventProcessor.handle_sched_stat_runtime(state, entry)# 处理最后未结束的时间段for cpu in state.current_pid:(running_comm, running_pid), start_ts = state.current_pid[cpu]running_key = (running_comm, running_pid)comm = running_commis_active = not EventProcessor._is_swapper(comm, running_pid)state.cpus[cpu].time_periods.append((start_ts, last_ts, is_active))duration = last_ts - start_tsif is_active:state.cpus[cpu].total_active += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += durationelse:state.cpus[cpu].total_idle += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += durationduration = (datetime.datetime.now() - parse_start).total_seconds()logger.info(f"end parse ftrace log, duration:{duration:.2f}s")# 生成报告if first_ts is not None:ReportGenerator.generate_html_report(state, first_ts, last_ts)if __name__ == "__main__":import sysif len(sys.argv) != 2:print("Usage: python ftrace_analyzer.py <ftrace.log>")sys.exit(1)analyze_ftrace(sys.argv[1])

2.Plotly

import re
import datetime
import numpy as np
import logging
from collections import defaultdict
from typing import Dict, Tuple, Optional, List
import plotly.graph_objects as go
import pandas as pd# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)# ----------------------
# 类型定义
# ----------------------
Timestamp = float  # 时间戳(秒)
CPUID = int  # CPU标识符
PID = int  # 进程IDclass ThreadStats:"""线程统计数据结构"""def __init__(self):self.name: str = ""  # 线程名self.cpu_time: Dict[CPUID, float] = defaultdict(float)  # 各CPU累计时间self.total_time: float = 0.0  # 总运行时间class CPUStats:"""CPU统计数据结构"""def __init__(self):self.total_active: float = 0.0  # 总活跃时间(非swapper线程)self.total_idle: float = 0.0  # 总空闲时间(swapper线程)self.time_periods: List[Tuple[float, float, bool]] = []  # 时间段记录(开始时间,结束时间,是否活跃)class GlobalState:"""全局状态容器"""def __init__(self):self.threads: Dict[Tuple[str, PID], ThreadStats] = defaultdict(ThreadStats)  # (线程名, PID) -> 线程统计self.cpus: Dict[CPUID, CPUStats] = defaultdict(CPUStats)  # CPU -> 统计信息self.current_pid: Dict[CPUID, Tuple[Tuple[str, PID], Timestamp]] = {}  # CPU当前运行的(线程名, PID)及开始时间self.pid_to_name: Dict[PID, str] = {}  # PID到线程名的映射# ----------------------
# 日志解析模块
# ----------------------
class FtraceParser:"""Ftrace日志解析器"""LOG_REGEX = re.compile(r'^(?P<comm>[^-]+)-(?P<pid>\d+)\s+\[(?P<cpu>\d+)\]\s+'r'(?P<timestamp>\d+\.\d+):\s+'r'(?P<event>\w+):\s+(?P<details>.*)$')@classmethoddef parse_line(cls, line: str) -> Optional[dict]:"""解析单行日志,返回结构化数据"""start_time = datetime.datetime.now()match = cls.LOG_REGEX.match(line.strip())if not match:logger.debug(f"无法解析的行: {line.strip()}")return Noneparsed_data = {'comm': match.group('comm'),'pid': int(match.group('pid')),'cpu': int(match.group('cpu')),'timestamp': float(match.group('timestamp')),'event': match.group('event'),'details': match.group('details')}duration = (datetime.datetime.now() - start_time).total_seconds() * 1000logger.debug(f"解析成功: {parsed_data}, 耗时: {duration:.2f}ms")return parsed_data# ----------------------
# 事件处理模块
# ----------------------
class EventProcessor:"""事件处理器"""SWAPPER_NAMES = ('swapper', 'swapper/')  # 空闲线程名称特征@classmethoddef _is_swapper(cls, comm: str, pid: int) -> bool:"""判断是否为表示空闲的swapper线程"""return pid == 0 or comm.startswith(cls.SWAPPER_NAMES)@classmethoddef handle_sched_switch(cls, state: GlobalState, entry: dict):"""处理进程切换事件"""start_time = datetime.datetime.now()details = entry['details']match = re.match(r'(?P<prev_comm>[^:]+(?::[^:]+)*):(?P<prev_pid>\d+).*?==>\s+'r'(?P<next_comm>[^:]+(?::[^:]+)*):(?P<next_pid>\d+)',details)if not match:logger.warning(f"无法解析的sched_switch事件: {details}")returnlogger.debug(f"开始处理sched_switch事件: CPU={entry['cpu']}, 从 {match.group('prev_comm')}:{match.group('prev_pid')} 切换到 {match.group('next_comm')}:{match.group('next_pid')}")cpu = entry['cpu']ts = entry['timestamp']# 提取 prev 线程信息prev_comm = match.group('prev_comm')prev_pid = int(match.group('prev_pid'))prev_key = (prev_comm, prev_pid)# 提取 next 线程信息next_comm = match.group('next_comm')next_pid = int(match.group('next_pid'))next_key = (next_comm, next_pid)# 结束前一个线程的时间统计if cpu in state.current_pid:(running_comm, running_pid), start_ts = state.current_pid[cpu]running_key = (running_comm, running_pid)duration = ts - start_tsprev_running_comm = running_comm# 记录时间段is_active = not cls._is_swapper(prev_running_comm, running_pid)state.cpus[cpu].time_periods.append((start_ts, ts, is_active))# 根据线程类型统计时间if cls._is_swapper(prev_running_comm, running_pid):state.cpus[cpu].total_idle += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += durationelse:state.cpus[cpu].total_active += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += duration# 记录新线程信息state.current_pid[cpu] = (next_key, ts)state.pid_to_name[prev_pid] = prev_commstate.pid_to_name[next_pid] = next_commstate.threads[prev_key].name = prev_commstate.threads[next_key].name = next_comm@classmethoddef handle_sched_stat_runtime(cls, state: GlobalState, entry: dict):"""处理运行时统计事件"""start_time = datetime.datetime.now()details = entry['details']comm = re.search(r'comm=([^\s]+)', details).group(1)pid = int(re.search(r'pid=(\d+)', details).group(1))runtime_ns = int(re.search(r'runtime=(\d+)', details).group(1))cpu = entry['cpu']runtime = runtime_ns / 1e9  # 转换为秒key = (comm, pid)logger.debug(f"开始处理sched_stat_runtime: CPU={cpu}, {comm}:{pid}, 运行时间={runtime:.6f}s")if cls._is_swapper(comm, pid):state.cpus[cpu].total_idle += runtimestate.threads[key].cpu_time[cpu] += runtimestate.threads[key].total_time += runtimestate.threads[key].name = commstate.pid_to_name[pid] = commelse:state.cpus[cpu].total_active += runtimestate.threads[key].total_time += runtimestate.threads[key].name = commstate.pid_to_name[pid] = comm# ----------------------
# 报告生成模块
# ----------------------
class ReportGenerator:"""统计报告生成器"""@staticmethoddef generate_html_report(state: GlobalState, start_ts: float, end_ts: float, output_filename: str = "report.html"):"""生成HTML可视化报告"""report_start = datetime.datetime.now()logger.info(f"start generate report, duration:{start_ts:.2f}s-{end_ts:.2f}s, output file: {output_filename}")# 生成CPU负载曲线数据window_size = 1.0  # 滑动窗口大小(秒)interval = 0.1  # 采样间隔(秒)time_points = np.arange(start_ts, end_ts, interval)cpu_load_data = defaultdict(list)logger.info(f"start analysis cpu loading, cpu number: {len(state.cpus)}")for cpu in state.cpus:periods = state.cpus[cpu].time_periodsfor t in time_points:window_start = t - window_size / 2window_end = t + window_size / 2if window_start < start_ts or window_end > end_ts:continuetotal_active_time = 0.0for (p_start, p_end, is_active) in periods:if p_end <= window_start or p_start >= window_end:continueoverlap_start = max(p_start, window_start)overlap_end = min(p_end, window_end)if is_active:total_active_time += overlap_end - overlap_startload_pct = (total_active_time / window_size) * 100 if window_size > 0 else 0cpu_load_data[cpu].append((t - start_ts, load_pct))logger.info(f"create chart...")# 创建图表fig = go.Figure()colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728','#9467bd', '#8c564b', '#e377c2', '#7f7f7f']# 为每个CPU添加折线for idx, cpu in enumerate(sorted(cpu_load_data.keys())):data = cpu_load_data[cpu]if not data:continuetimes, loads = zip(*data)fig.add_trace(go.Scatter(x=list(times),y=list(loads),mode='lines',name=f'CPU {cpu}',line=dict(color=colors[idx % len(colors)]),hovertemplate='CPU: %{name}<br>Time: %{x:.2f} s<br>Load: %{y:.2f}%'))fig.update_layout(title="CPU Load Percentage Over Time",xaxis_title='Time (s from start)',yaxis_title='Load (%)',legend=dict(orientation="h",yanchor="bottom",y=1.02,xanchor="right",x=1),hovermode='x unified')# 生成线程表格数据cpus = sorted(state.cpus.keys())thread_data = []for (name, pid), stats in state.threads.items():total_time = stats.total_timetotal_percent = total_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0row = {'PID': pid, 'Name': name, 'Total (%)': f"{total_percent:.2f}%"}for cpu in cpus:cpu_time = stats.cpu_time.get(cpu, 0.0)cpu_percent = cpu_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0row[f'CPU {cpu}'] = f"{cpu_percent:.2f}%"thread_data.append(row)# 创建数据表格df = pd.DataFrame(thread_data)table = go.Table(header=dict(values=list(df.columns),fill_color='paleturquoise',align='left'),cells=dict(values=[df[col] for col in df.columns],fill_color='lavender',align='left'))# 创建布局fig2 = go.Figure(data=[table])fig2.update_layout(title="Thread Load Statistics")# 组合布局并保存from plotly.subplots import make_subplotscombined_fig = make_subplots(rows=2, cols=1,subplot_titles=("CPU Load Percentage Over Time", "Thread Load Statistics"),specs=[[{'type': 'xy'}], [{'type': 'domain'}]])for trace in fig.data:combined_fig.add_trace(trace, row=1, col=1)for trace in fig2.data:combined_fig.add_trace(trace, row=2, col=1)combined_fig.update_layout(height=800, width=1200)combined_fig.write_html(output_filename)report_duration = (datetime.datetime.now() - report_start).total_seconds()logger.info(f"end report, {output_filename}, consume duration: {report_duration:.2f}s")# ----------------------
# 主流程
# ----------------------
def analyze_ftrace(log_path: str):"""主分析流程"""state = GlobalState()first_ts = Nonelast_ts = 0.0parse_start = datetime.datetime.now()logger.info("start parse ftrace log...")with open(log_path) as f:line_count = sum(1 for _ in f)logger.info(f"ftrace log file contains {line_count} lines")f.seek(0)for line in f:entry = FtraceParser.parse_line(line)if not entry:continuets = entry['timestamp']if first_ts is None:first_ts = tslast_ts = tsif entry['event'] == 'sched_switch':EventProcessor.handle_sched_switch(state, entry)elif entry['event'] == 'sched_stat_runtime':EventProcessor.handle_sched_stat_runtime(state, entry)# 处理最后未结束的时间段for cpu in state.current_pid:(running_comm, running_pid), start_ts = state.current_pid[cpu]running_key = (running_comm, running_pid)comm = running_commis_active = not EventProcessor._is_swapper(comm, running_pid)state.cpus[cpu].time_periods.append((start_ts, last_ts, is_active))duration = last_ts - start_tsif is_active:state.cpus[cpu].total_active += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += durationelse:state.cpus[cpu].total_idle += durationstate.threads[running_key].cpu_time[cpu] += durationstate.threads[running_key].total_time += durationduration = (datetime.datetime.now() - parse_start).total_seconds()logger.info(f"end parse ftrace log, duration:{duration:.2f}s")# 生成报告if first_ts is not None:ReportGenerator.generate_html_report(state, first_ts, last_ts)if __name__ == "__main__":import sysif len(sys.argv) != 2:print("Usage: python ftrace_analyzer.py <ftrace.log>")sys.exit(1)analyze_ftrace(sys.argv[1])

http://www.dtcms.com/wzjs/276001.html

相关文章:

  • 做网站后用浏览量百度给钱排名点击软件怎样
  • 建设网站收取广告费用百度热线客服24小时
  • 病毒杂志WordPress主题志鸿优化设计
  • 做装修公司网站百度app下载安装官方免费下载
  • 网站首页banner图海外营销推广服务
  • 的网站制作百度手机助手安卓版下载
  • 临朐网站建设定制首选哪家公司精准网站seo诊断报告
  • 帮人做网站如何收费企业网站管理系统源码
  • 商城网站怎么做推广方案网站快速搜索
  • 网站制作哪家好薇百度建一个网站多少钱
  • 专业建设购物网站南宁seo标准
  • 网络哪个公司好手机网站关键词seo
  • 网站建设合同印花税税目百度一下就知道手机版
  • 遵义网站制作教程培训机构是干什么的
  • 南山的网站建设公司有利于seo优化的是
  • 中山优秀网站建设seo代码优化包括哪些
  • 最牛的网站建设线上购买链接
  • wordpress开发官网seo优化基础教程pdf
  • 哪个网站有成品的毕业论文网站如何做优化排名
  • 网站正在建设中 打不开怎么办站长之家ppt模板
  • 应用网站开发百度seo排名培训优化
  • 女人做春梦网站百度seo指数查询
  • 网站个人博客怎么做谷歌推广优化
  • 无锡企业自助建站系统178软文网
  • wordpress中文附件乱码seo资源是什么意思
  • 北京故宫网页设计公司网站如何seo
  • 如何优选网站建设公司百度一下生活更好
  • 自己做购物网站seo职业
  • 平板电脑可以做淘宝网站吗百度推广app下载安卓版
  • 承德做网站boyun营销策划的概念