"""GPU Benchmark Module

GPU performance tests: memory bandwidth, FP32 / FP16 / Tensor Core compute.
Only usable when an NVIDIA GPU is present; every benchmark degrades
gracefully to ``None`` when CUDA (or PyTorch itself) is unavailable.
"""

import time
from typing import Any, Dict, Optional


def check_gpu_available() -> bool:
    """Return True if ``nvidia-smi`` runs and exits successfully.

    A zero exit code is taken as evidence that an NVIDIA GPU and driver
    are present. A missing binary, a non-executable path, or a timeout
    all mean "no usable GPU".
    """
    import subprocess
    try:
        result = subprocess.run(['nvidia-smi'], capture_output=True, timeout=5)
    except (OSError, subprocess.SubprocessError):
        # FileNotFoundError (no nvidia-smi) and TimeoutExpired both land here;
        # deliberately narrow instead of a bare except.
        return False
    return result.returncode == 0


def check_cuda_available() -> bool:
    """Return True if PyTorch is importable and reports CUDA availability."""
    try:
        import torch
    except ImportError:
        return False
    return torch.cuda.is_available()


def benchmark_gpu_memory_bandwidth() -> Optional[Dict[str, Any]]:
    """GPU memory bandwidth test via repeated device-to-device copies.

    Returns:
        None when CUDA is unavailable, a result dict on success, or
        ``{"error": ...}`` if the benchmark itself fails.
    """
    if not check_cuda_available():
        return None
    try:
        import torch
        device = torch.device('cuda')

        # Fixed working-set size for the copy test.
        size_mb = 256
        size_elements = size_mb * 1024 * 1024 // 4  # float32 is 4 bytes

        src = torch.ones(size_elements, dtype=torch.float32, device=device)

        # Warm up so allocator / caching effects don't skew the timed loop.
        for _ in range(10):
            dst = src.clone()
        torch.cuda.synchronize()

        # Timed loop: perf_counter is monotonic, unlike time.time().
        start_time = time.perf_counter()
        iterations = 0
        while time.perf_counter() - start_time < 2.0:
            dst = src.clone()
            iterations += 1
        torch.cuda.synchronize()
        elapsed = time.perf_counter() - start_time

        # Each clone reads the buffer once and writes it once.
        bytes_transferred = size_mb * 1024 * 1024 * 2 * iterations
        bandwidth = bytes_transferred / elapsed / (1024 ** 3)

        return {
            "test": "gpu_memory_bandwidth",
            "description": f"GPU memory bandwidth ({size_mb}MB)",
            "size_mb": size_mb,
            "duration_seconds": round(elapsed, 3),
            "iterations": iterations,
            "bandwidth_gb_s": round(bandwidth, 2),
            "score": round(bandwidth, 2),
        }
    except Exception as e:
        return {"error": str(e)}


def _benchmark_matmul(matrix_size: int, dtype_name: str, test_name: str,
                      description: str, score_factor: float) -> Optional[Dict[str, Any]]:
    """Shared square-matmul throughput driver for the FP32/FP16 benchmarks.

    ``dtype_name`` is a ``torch`` dtype attribute name (e.g. ``"float32"``)
    passed as a string so ``import torch`` stays inside the guarded body
    and a missing torch still yields ``None`` via check_cuda_available().
    """
    if not check_cuda_available():
        return None
    try:
        import torch
        device = torch.device('cuda')
        dtype = getattr(torch, dtype_name)

        a = torch.randn(matrix_size, matrix_size, dtype=dtype, device=device)
        b = torch.randn(matrix_size, matrix_size, dtype=dtype, device=device)

        # Warm-up: trigger kernel selection / cuBLAS heuristics before timing.
        for _ in range(5):
            _ = torch.mm(a, b)
        torch.cuda.synchronize()

        start_time = time.perf_counter()
        iterations = 0
        while time.perf_counter() - start_time < 3.0:
            _ = torch.mm(a, b)
            iterations += 1
        torch.cuda.synchronize()
        elapsed = time.perf_counter() - start_time

        # n x n matmul costs 2*n^3 FLOPs (multiply + add per element).
        flops_per_matmul = 2 * (matrix_size ** 3)
        total_flops = flops_per_matmul * iterations
        tflops = total_flops / elapsed / 1e12

        return {
            "test": test_name,
            "description": description,
            "matrix_size": matrix_size,
            "duration_seconds": round(elapsed, 3),
            "iterations": iterations,
            "tflops": round(tflops, 3),
            "score": round(tflops * score_factor, 2),
        }
    except Exception as e:
        return {"error": str(e)}


def benchmark_gpu_fp32(matrix_size: int = 4096) -> Optional[Dict[str, Any]]:
    """GPU FP32 compute test (square matmul); None when CUDA is unavailable."""
    return _benchmark_matmul(
        matrix_size, 'float32', "gpu_fp32",
        f"GPU FP32 compute ({matrix_size}x{matrix_size} matmul)",
        10,
    )


def benchmark_gpu_fp16(matrix_size: int = 4096) -> Optional[Dict[str, Any]]:
    """GPU FP16 compute test (square matmul); None when CUDA is unavailable."""
    return _benchmark_matmul(
        matrix_size, 'float16', "gpu_fp16",
        f"GPU FP16 compute ({matrix_size}x{matrix_size} matmul)",
        5,
    )


def benchmark_gpu_tensor_cores(matrix_size: int = 4096) -> Optional[Dict[str, Any]]:
    """GPU Tensor Core mixed-precision compute test.

    Requires compute capability >= 7.0 (Volta or newer); otherwise an
    ``{"error": ...}`` dict is returned. None when CUDA is unavailable.
    """
    if not check_cuda_available():
        return None
    try:
        import torch

        if not hasattr(torch.cuda, 'amp') or torch.cuda.get_device_capability()[0] < 7:
            return {"error": "Tensor Cores not available (requires compute capability >= 7.0)"}

        device = torch.device('cuda')

        # FP16 inputs under autocast route matmuls through Tensor Cores.
        # NOTE(review): torch.cuda.amp.autocast is deprecated in newer torch
        # in favor of torch.amp.autocast('cuda'); kept for compatibility with
        # the torch versions this module already targets.
        a = torch.randn(matrix_size, matrix_size, dtype=torch.float16, device=device)
        b = torch.randn(matrix_size, matrix_size, dtype=torch.float16, device=device)

        # Warm-up
        with torch.cuda.amp.autocast():
            for _ in range(5):
                _ = torch.mm(a, b)
        torch.cuda.synchronize()

        # Timed loop
        start_time = time.perf_counter()
        iterations = 0
        with torch.cuda.amp.autocast():
            while time.perf_counter() - start_time < 3.0:
                _ = torch.mm(a, b)
                iterations += 1
        torch.cuda.synchronize()
        elapsed = time.perf_counter() - start_time

        # n x n matmul costs 2*n^3 FLOPs.
        flops_per_matmul = 2 * (matrix_size ** 3)
        total_flops = flops_per_matmul * iterations
        tflops = total_flops / elapsed / 1e12

        return {
            "test": "gpu_tensor_cores",
            "description": f"GPU Tensor Cores mixed precision ({matrix_size}x{matrix_size})",
            "matrix_size": matrix_size,
            "duration_seconds": round(elapsed, 3),
            "iterations": iterations,
            "tflops": round(tflops, 3),
            "score": round(tflops * 3, 2),
        }
    except Exception as e:
        return {"error": str(e)}


def run_all_gpu_benchmarks() -> Optional[Dict[str, Any]]:
    """Run every GPU benchmark and aggregate the scores.

    Returns None when no GPU is detected or no benchmark produced a result;
    otherwise a dict of per-benchmark results plus a ``total_score`` that
    sums the scores of the benchmarks that did not error.
    """
    if not check_gpu_available():
        return None

    results: Dict[str, Any] = {}
    for key, bench in (
        ("memory_bandwidth", benchmark_gpu_memory_bandwidth),
        ("fp32", benchmark_gpu_fp32),
        ("fp16", benchmark_gpu_fp16),
        ("tensor_cores", benchmark_gpu_tensor_cores),
    ):
        result = bench()
        if result:
            results[key] = result

    if not results:
        return None

    # Errored benchmarks contribute 0 to the aggregate.
    total_score = sum(
        r.get("score", 0)
        for r in results.values()
        if isinstance(r, dict) and "error" not in r
    )
    results["total_score"] = round(total_score, 2)
    return results