import torch
import time
import threading
import argparse
import math
import random


class SimpleGPUStress:
    def __init__(self, gpu_ids=None, target_usage=80, fluctuation=20,
                 memory_limit=85, matrix_size=2048):
        """Minimal GPU stress test.

        Args:
            gpu_ids: list of GPU IDs, e.g. [0, 1], or None to use all GPUs
            target_usage: target utilization in % (default 80)
            fluctuation: fluctuation range in % (default ±20)
            memory_limit: GPU memory usage cap in % (default 85)
            matrix_size: base matrix size (default 2048)
        """
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is not available")

        if gpu_ids is None:
            self.gpu_ids = list(range(torch.cuda.device_count()))
        else:
            self.gpu_ids = gpu_ids

        # Convert percentages to fractions for internal use.
        self.target_usage = target_usage / 100.0
        self.fluctuation = fluctuation / 100.0
        self.memory_limit = memory_limit / 100.0
        self.matrix_size = matrix_size
        self.running = True

        # Record total and capped memory per GPU, in GB.
        self.gpu_memory_info = {}
        for gpu_id in self.gpu_ids:
            total_memory = torch.cuda.get_device_properties(gpu_id).total_memory
            max_memory = total_memory * self.memory_limit
            self.gpu_memory_info[gpu_id] = {
                'total_gb': total_memory / 1024**3,
                'max_gb': max_memory / 1024**3,
            }

        print(f"Using GPUs: {self.gpu_ids}")
        print(f"Target utilization: {target_usage}% ±{fluctuation}%")
        print(f"Memory limit: {memory_limit}%")
        for gpu_id in self.gpu_ids:
            info = self.gpu_memory_info[gpu_id]
            print(f"GPU {gpu_id}: total memory {info['total_gb']:.1f}GB, "
                  f"cap {info['max_gb']:.1f}GB")

    def get_current_target(self, start_time):
        """Compute the current target intensity (sine-wave fluctuation)."""
        elapsed = time.time() - start_time
        wave = math.sin(elapsed / 30 * 2 * math.pi)  # 30-second period
        target = self.target_usage + wave * self.fluctuation
        return max(0.1, min(1.0, target))  # clamp to [0.1, 1.0]
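    # A worked example of the schedule above, assuming the defaults
    # (target_usage=0.8, fluctuation=0.2): the sine wave has a 30 s period,
    # so at t = 7.5 s it peaks at +1 and the target is
    # min(1.0, 0.8 + 0.2) = 1.0; at t = 22.5 s it bottoms out and the target
    # is max(0.1, 0.8 - 0.2) = 0.6. The intensity therefore sweeps between
    # 0.6 and 1.0 once every 30 seconds.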
    def manage_memory(self, gpu_id, memory_pool):
        """Keep allocated GPU memory near the configured target."""
        torch.cuda.set_device(gpu_id)
        current_memory = torch.cuda.memory_allocated(gpu_id) / 1024**3
        target_memory = self.gpu_memory_info[gpu_id]['max_gb'] * 0.8

        if current_memory < target_memory and len(memory_pool) < 10:
            # Below target: allocate a random-sized matrix to pad memory usage.
            try:
                size = random.randint(1024, 3072)
                matrix = torch.randn(size, size, device=f'cuda:{gpu_id}')
                memory_pool.append(matrix)
            except RuntimeError:
                pass
        elif current_memory > target_memory and len(memory_pool) > 1:
            # Above target: release one cached matrix.
            memory_pool.pop()
            torch.cuda.empty_cache()

    def worker(self, gpu_id):
        """GPU worker thread."""
        torch.cuda.set_device(gpu_id)
        device = f'cuda:{gpu_id}'
        a = torch.randn(self.matrix_size, self.matrix_size, device=device)
        b = torch.randn(self.matrix_size, self.matrix_size, device=device)
        memory_pool = []
        start_time = time.time()
        iteration = 0
        print(f"GPU {gpu_id} worker thread started")

        while self.running:
            try:
                target_intensity = self.get_current_target(start_time)
                if iteration % 100 == 0:
                    self.manage_memory(gpu_id, memory_pool)

                # Run a compute burst with probability equal to the target
                # intensity; higher intensity also adds more element-wise ops.
                if random.random() < target_intensity:
                    c = torch.mm(a, b)
                    if target_intensity > 0.7:
                        c = torch.relu(c)
                        c = torch.sigmoid(c)
                        c = torch.tanh(c)
                    elif target_intensity > 0.4:
                        c = torch.relu(c)
                        c = torch.sigmoid(c)
                    else:
                        c = torch.relu(c)
                    torch.cuda.synchronize()

                # Sleep longer when the target intensity is lower.
                sleep_time = (1 - target_intensity) * 0.01
                if sleep_time > 0:
                    time.sleep(sleep_time)
                iteration += 1
            except RuntimeError as e:
                if "out of memory" in str(e):
                    memory_pool.clear()
                    torch.cuda.empty_cache()
                    print(f"GPU {gpu_id} out of memory, cache cleared")
                else:
                    print(f"GPU {gpu_id} error: {e}")
                    break

        print(f"GPU {gpu_id} worker thread exiting")

    def monitor(self):
        """Monitoring thread: print memory stats every 3 seconds."""
        start_time = time.time()
        while self.running:
            try:
                elapsed = time.time() - start_time
                target = self.get_current_target(start_time)
                print(f"\nTime: {elapsed:.0f}s | target intensity: {target:.2f}")
                for gpu_id in self.gpu_ids:
                    memory_used = torch.cuda.memory_allocated(gpu_id) / 1024**3
                    memory_total = self.gpu_memory_info[gpu_id]['total_gb']
                    memory_percent = (memory_used / memory_total) * 100
                    print(f"GPU {gpu_id}: memory {memory_used:.1f}GB/"
                          f"{memory_total:.1f}GB ({memory_percent:.1f}%)")
                time.sleep(3)
            except KeyboardInterrupt:
                break

    def start(self):
        """Start the stress test."""
        print("\nStarting GPU stress test, press Ctrl+C to stop...\n")
        threads = []
        try:
            for gpu_id in self.gpu_ids:
                t = threading.Thread(target=self.worker, args=(gpu_id,))
                t.daemon = True
                threads.append(t)
                t.start()

            monitor_thread = threading.Thread(target=self.monitor)
            monitor_thread.daemon = True
            monitor_thread.start()

            while self.running:
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nStopping...")
            self.running = False

        for t in threads:
            t.join(timeout=1)
        for gpu_id in self.gpu_ids:
            torch.cuda.set_device(gpu_id)
            torch.cuda.empty_cache()
        print("Stopped")


def parse_gpu_ids(gpu_str):
    """Parse GPU IDs from a command-line string."""
    if gpu_str.lower() == 'all':
        return None
    return [int(x) for x in gpu_str.split(',')]


def main():
    # Note: literal '%' in argparse help strings must be escaped as '%%',
    # otherwise printing --help raises a formatting error.
    parser = argparse.ArgumentParser(description='Minimal GPU stress test')
    parser.add_argument('--gpu', '-g', type=parse_gpu_ids, default='0',
                        help='GPU IDs, e.g. 0 or 0,1 or all')
    parser.add_argument('--target', '-t', type=int, default=80,
                        help='Target utilization %% (default 80)')
    parser.add_argument('--fluctuation', '-f', type=int, default=20,
                        help='Fluctuation range %% (default ±20)')
    parser.add_argument('--memory-limit', '-m', type=int, default=85,
                        help='GPU memory usage cap %% (default 85)')
    parser.add_argument('--matrix-size', '-s', type=int, default=4096,
                        help='Matrix size (default 4096)')
    args = parser.parse_args()

    try:
        stress_test = SimpleGPUStress(
            gpu_ids=args.gpu,
            target_usage=args.target,
            fluctuation=args.fluctuation,
            memory_limit=args.memory_limit,
            matrix_size=args.matrix_size,
        )
        stress_test.start()
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
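# Example invocations (the file name gpu_stress.py is hypothetical; adjust to
# wherever this script is saved):
#
#   python gpu_stress.py                          # GPU 0, 80% ±20%, 85% memory cap
#   python gpu_stress.py --gpu 0,1 --target 70    # GPUs 0 and 1 at 70% ±20%
#   python gpu_stress.py --gpu all -t 60 -f 10 -m 80 -s 2048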