# neuroscan-ai / scripts / benchmark.py
# (upload metadata: cyd0806's picture / Upload NeuroScan AI code / 35b365b verified)
#!/usr/bin/env python3
"""
NeuroScan AI 完整基准测试
测试 CPU/GPU 高并发性能,生成详细报告
"""
import os
import sys
import time
import json
import threading
import subprocess
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import psutil
import numpy as np
sys.path.insert(0, str(Path(__file__).parent.parent))
# ========================================
# 全局监控
# ========================================
# Shared time-series buffers filled by the resource_monitor() thread.
# Each key maps to a list of samples; index i across all lists corresponds
# to the sampling instant recorded at timestamps[i].
monitor_data = {
    "cpu_percent": [],     # overall CPU utilization (%)
    "cpu_per_core": [],    # per-core utilization (list of % per sample)
    "memory_used_gb": [],  # system RAM in use (GiB)
    "memory_percent": [],  # system RAM in use (%)
    "gpu_memory_gb": [],   # GPU 0 memory in use (GiB)
    "gpu_util": [],        # GPU 0 utilization (%)
    "timestamps": []       # wall-clock sample times (time.time())
}
# Flag polled by resource_monitor(); set to True to stop the sampling loop.
stop_monitor = False
def get_gpu_stats():
    """Query GPU 0 memory usage and utilization via nvidia-smi.

    Returns:
        tuple: (memory_used_gb, memory_total_gb, utilization_percent).
        Falls back to (0, 0, 0) when nvidia-smi is absent, times out,
        returns a non-zero status, or produces unparseable output.
    """
    try:
        result = subprocess.run(
            ['nvidia-smi', '--query-gpu=memory.used,memory.total,utilization.gpu',
             '--format=csv,noheader,nounits', '-i', '0'],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            parts = result.stdout.strip().split(',')
            mem_used = float(parts[0].strip()) / 1024  # MiB -> GiB
            mem_total = float(parts[1].strip()) / 1024
            gpu_util = float(parts[2].strip())
            return mem_used, mem_total, gpu_util
    # Was a bare `except:` which also swallowed SystemExit/KeyboardInterrupt;
    # catch only the failures this probe can legitimately produce.
    except (OSError, subprocess.SubprocessError, ValueError, IndexError):
        pass
    return 0, 0, 0
def resource_monitor(interval=0.3):
    """Background sampler loop.

    Appends one CPU/memory/GPU reading to every monitor_data list each
    `interval` seconds until the global stop_monitor flag becomes True.
    """
    global stop_monitor, monitor_data
    while not stop_monitor:
        # Take all readings for this tick, then append them together so the
        # per-key lists stay index-aligned.
        now = time.time()
        total_cpu = psutil.cpu_percent(interval=None)
        per_core = psutil.cpu_percent(interval=None, percpu=True)
        vm = psutil.virtual_memory()
        gpu_mem, _gpu_total, gpu_util = get_gpu_stats()
        sample = {
            "timestamps": now,
            "cpu_percent": total_cpu,
            "cpu_per_core": per_core,
            "memory_used_gb": vm.used / (1024 ** 3),
            "memory_percent": vm.percent,
            "gpu_memory_gb": gpu_mem,
            "gpu_util": gpu_util,
        }
        for key, value in sample.items():
            monitor_data[key].append(value)
        time.sleep(interval)
def reset_monitor():
    """Discard all collected samples and re-arm the sampling loop."""
    global monitor_data, stop_monitor
    stop_monitor = False
    # Rebuild with the same keys but fresh empty lists.
    fresh = {}
    for key in monitor_data:
        fresh[key] = []
    monitor_data = fresh
def get_monitor_stats():
    """Summarize collected samples as min/max/mean/std per tracked metric.

    Metrics that have no samples yet are omitted from the result dict.
    """
    summary = {}
    tracked = ("cpu_percent", "memory_used_gb", "memory_percent",
               "gpu_memory_gb", "gpu_util")
    for metric in tracked:
        samples = monitor_data[metric]
        if not samples:
            continue
        values = np.array(samples)
        summary[metric] = {
            "min": float(values.min()),
            "max": float(values.max()),
            "mean": float(values.mean()),
            "std": float(values.std()),
        }
    return summary
# ========================================
# 测试任务
# ========================================
def get_test_data():
    """Collect baseline/followup NIfTI pairs from data/processed.

    Scans case directories named real_lung_* and keeps only those where
    both baseline.nii.gz and followup.nii.gz exist.

    Returns:
        list[dict]: entries with "name", "baseline", "followup" keys
        (paths as strings). Empty when the data directory is missing,
        instead of potentially raising from glob on a nonexistent path.
    """
    data_path = Path(__file__).parent.parent / "data" / "processed"
    if not data_path.is_dir():
        # Datasets not downloaded yet -- caller prints the download hint.
        return []
    pairs = []
    for case_dir in sorted(data_path.glob("real_lung_*")):
        baseline = case_dir / "baseline.nii.gz"
        followup = case_dir / "followup.nii.gz"
        if baseline.exists() and followup.exists():
            pairs.append({
                "name": case_dir.name,
                "baseline": str(baseline),
                "followup": str(followup)
            })
    return pairs
def run_cpu_task(task_id, data_pair):
    """One CPU pipeline run: load a pair, register followup to baseline,
    then detect changes; returns per-stage timings and task metadata."""
    from app.services.dicom import DicomLoader
    from app.services.registration import ImageRegistrator
    from app.services.analysis import ChangeDetector

    loader = DicomLoader()
    registrator = ImageRegistrator()
    detector = ChangeDetector()

    overall_start = time.time()

    # Stage 1: load both volumes.
    stage_start = time.time()
    baseline, _ = loader.load_nifti(data_pair["baseline"])
    followup, _ = loader.load_nifti(data_pair["followup"])
    load_elapsed = time.time() - stage_start

    # Stage 2: deformable registration of followup onto baseline.
    stage_start = time.time()
    registration = registrator.register(followup, baseline, use_deformable=True)
    reg_elapsed = time.time() - stage_start

    # Stage 3: change detection on the warped followup.
    stage_start = time.time()
    detector.detect_changes(baseline, registration["warped_image"])
    detect_elapsed = time.time() - stage_start

    return {
        "task_id": task_id,
        "name": data_pair["name"],
        "shape": list(baseline.shape),
        "load_time": load_elapsed,
        "reg_time": reg_elapsed,
        "detect_time": detect_elapsed,
        "total_time": time.time() - overall_start,
        "status": "success",
    }
def run_gpu_task(task_id, nifti_path, device_id=0):
    """GPU task: load a NIfTI volume and run organ segmentation.

    Args:
        task_id: Identifier echoed back in the result dict.
        nifti_path: Path to the .nii.gz volume to segment.
        device_id: GPU index exported via CUDA_VISIBLE_DEVICES.

    Returns:
        dict with per-stage timings, input shape, peak GPU memory (GiB),
        and status "success".
    """
    import torch
    # NOTE(review): CUDA_VISIBLE_DEVICES is process-wide and only takes effect
    # if CUDA has not been initialized yet; setting it here from concurrently
    # running tasks is racy -- confirm this ordering is intentional.
    os.environ['CUDA_VISIBLE_DEVICES'] = str(device_id)
    from app.services.dicom import DicomLoader
    from app.services.segmentation import OrganSegmentor
    # Reset the peak-memory counter so gpu_peak_gb reflects only this task.
    torch.cuda.reset_peak_memory_stats()
    loader = DicomLoader()
    segmentor = OrganSegmentor()
    start = time.time()
    # Load the volume from disk.
    t0 = time.time()
    data, _ = loader.load_nifti(nifti_path)
    load_time = time.time() - t0
    # Run segmentation inference.
    t0 = time.time()
    result = segmentor.segment(data)
    seg_time = time.time() - t0
    total = time.time() - start
    peak_mem = torch.cuda.max_memory_allocated() / (1024**3)  # bytes -> GiB
    return {
        "task_id": task_id,
        "shape": list(data.shape),
        "load_time": load_time,
        "seg_time": seg_time,
        "total_time": total,
        "gpu_peak_gb": peak_mem,
        "status": "success"
    }
# ========================================
# 基准测试
# ========================================
def benchmark_cpu_concurrent(data_pairs, concurrency_levels=(1, 2, 3, 4, 5)):
    """Run the CPU pipeline at increasing concurrency, with resource sampling.

    Args:
        data_pairs: Test cases from get_test_data(); level n uses the first n.
        concurrency_levels: Worker counts to test in order; the loop stops at
            the first level exceeding len(data_pairs). The default is now a
            tuple -- a mutable default list is a classic Python pitfall.

    Returns:
        dict mapping concurrency level -> {"concurrency", "total_time",
        "tasks", "resource_stats"}.
    """
    global stop_monitor  # hoisted: was declared mid-function before
    results = {}
    for n in concurrency_levels:
        if n > len(data_pairs):
            break
        print(f"\n 🔄 测试 {n} 并发...")
        reset_monitor()
        # Start the background resource sampler for this run.
        stop_monitor = False
        monitor_thread = threading.Thread(target=resource_monitor, args=(0.2,))
        monitor_thread.start()
        start = time.time()
        task_results = []
        with ThreadPoolExecutor(max_workers=n) as executor:
            futures = [executor.submit(run_cpu_task, i + 1, data_pairs[i])
                       for i in range(n)]
            for future in as_completed(futures):
                try:
                    task_results.append(future.result())
                except Exception as e:
                    # Record the failure instead of aborting the whole level.
                    task_results.append({"status": "error", "error": str(e)})
        total_time = time.time() - start
        stop_monitor = True
        monitor_thread.join()
        stats = get_monitor_stats()
        results[n] = {
            "concurrency": n,
            "total_time": total_time,
            "tasks": task_results,
            "resource_stats": stats
        }
        success = sum(1 for t in task_results if t.get("status") == "success")
        print(f" ✅ {success}/{n} 成功, 耗时 {total_time:.2f}s")
        print(f" 📊 CPU峰值: {stats['cpu_percent']['max']:.1f}%, 内存峰值: {stats['memory_used_gb']['max']:.1f}GB")
    return results
def benchmark_gpu_concurrent(data_pairs, concurrency_levels=(1, 2)):
    """Run GPU segmentation at the given concurrency levels.

    Every task targets GPU 0, so with a single physical GPU higher levels
    mostly measure contention for the shared device.

    Args:
        data_pairs: Test cases; level n segments the first n baselines.
        concurrency_levels: Worker counts to test; the loop stops at the
            first level exceeding len(data_pairs). Default changed from a
            mutable list to a tuple (same values).

    Returns:
        dict mapping level -> {"concurrency", "total_time", "tasks",
        "resource_stats"}.
    """
    global stop_monitor
    results = {}
    for n in concurrency_levels:
        if n > len(data_pairs):
            break
        print(f"\n 🧠 测试 {n} GPU并发...")
        reset_monitor()
        stop_monitor = False
        monitor_thread = threading.Thread(target=resource_monitor, args=(0.2,))
        monitor_thread.start()
        start = time.time()
        task_results = []
        # All submissions use device 0; the previous special case for n == 1
        # was byte-for-byte identical to this general path and was removed.
        with ThreadPoolExecutor(max_workers=n) as executor:
            futures = [executor.submit(run_gpu_task, i + 1, data_pairs[i]["baseline"], 0)
                       for i in range(n)]
            for future in as_completed(futures):
                try:
                    task_results.append(future.result())
                except Exception as e:
                    task_results.append({"status": "error", "error": str(e)})
        total_time = time.time() - start
        stop_monitor = True
        monitor_thread.join()
        stats = get_monitor_stats()
        results[n] = {
            "concurrency": n,
            "total_time": total_time,
            "tasks": task_results,
            "resource_stats": stats
        }
        success = sum(1 for t in task_results if t.get("status") == "success")
        print(f" ✅ {success}/{n} 成功, 耗时 {total_time:.2f}s")
        if stats.get('gpu_memory_gb'):
            print(f" 📊 GPU显存峰值: {stats['gpu_memory_gb']['max']:.1f}GB, GPU利用率峰值: {stats['gpu_util']['max']:.1f}%")
    return results
def get_system_info():
    """Collect CPU/memory/GPU hardware specs and key software versions.

    Best-effort: each probe (nvidia-smi, /proc/cpuinfo, optional libraries)
    degrades to a default value instead of failing the benchmark. The bare
    `except:` clauses were narrowed to the exceptions each probe can raise.

    Returns:
        dict with "timestamp", "cpu", "memory", "gpu" (list, may be empty)
        and "software" sections.
    """
    cpu_freq = psutil.cpu_freq()  # call once; may be None on some platforms
    info = {
        "timestamp": datetime.now().isoformat(),
        "cpu": {
            "model": "Unknown",
            "physical_cores": psutil.cpu_count(logical=False),
            "logical_cores": psutil.cpu_count(logical=True),
            "freq_mhz": cpu_freq.max if cpu_freq else 0
        },
        "memory": {
            "total_gb": psutil.virtual_memory().total / (1024**3)
        },
        "gpu": []
    }
    # CPU model name (Linux-only /proc interface).
    try:
        with open('/proc/cpuinfo', 'r') as f:
            for line in f:
                if 'model name' in line:
                    info["cpu"]["model"] = line.split(':')[1].strip()
                    break
    except OSError:
        pass  # not Linux, or /proc unavailable
    # GPU inventory via nvidia-smi.
    try:
        result = subprocess.run(
            ['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader'],
            capture_output=True, text=True
        )
        if result.returncode == 0:
            for line in result.stdout.strip().split('\n'):
                parts = line.split(',')
                info["gpu"].append({
                    "name": parts[0].strip(),
                    "memory_mb": int(parts[1].strip().replace(' MiB', ''))
                })
    except (OSError, subprocess.SubprocessError, ValueError, IndexError):
        pass  # no NVIDIA driver, or unexpected output format
    # Python / library versions; the deep-learning stack is optional.
    info["software"] = {
        "python": sys.version.split()[0],
    }
    try:
        import torch
        info["software"]["pytorch"] = torch.__version__
        info["software"]["cuda"] = torch.version.cuda if torch.cuda.is_available() else "N/A"
    except ImportError:
        pass
    try:
        import monai
        info["software"]["monai"] = monai.__version__
    except ImportError:
        pass
    try:
        import SimpleITK as sitk
        # NOTE: major version only (an int), unlike the string versions above.
        info["software"]["simpleitk"] = sitk.Version_MajorVersion()
    except ImportError:
        pass
    return info
def generate_markdown_report(sys_info, cpu_results, gpu_results, data_info):
    """Render the benchmark results as a Markdown report string.

    Args:
        sys_info: Output of get_system_info().
        cpu_results: Output of benchmark_cpu_concurrent() (level -> result).
        gpu_results: Output of benchmark_gpu_concurrent(); may be empty when
            no GPU is available.
        data_info: dict with "count", "shape", "size_mb" describing the data.

    Returns:
        str: the Markdown report body (without the top-level title).
    """
    # Bug fix: guard BOTH GPU cells. Previously only the name row was
    # guarded and an empty sys_info['gpu'] raised IndexError on the
    # GPU-memory row.
    if sys_info['gpu']:
        gpu_name = sys_info['gpu'][0]['name']
        gpu_mem = f"{sys_info['gpu'][0]['memory_mb']/1024:.0f} GB"
    else:
        gpu_name = 'N/A'
        gpu_mem = 'N/A'
    report = f"""
## 🔬 性能基准测试报告
> 测试时间: {sys_info['timestamp'][:19].replace('T', ' ')}
### 测试环境
| 组件 | 配置 |
|------|------|
| **CPU** | {sys_info['cpu']['model']} |
| **CPU核心** | {sys_info['cpu']['physical_cores']} 物理核 / {sys_info['cpu']['logical_cores']} 逻辑核 |
| **内存** | {sys_info['memory']['total_gb']:.0f} GB |
| **GPU** | {gpu_name} |
| **GPU显存** | {gpu_mem} |
| **Python** | {sys_info['software'].get('python', 'N/A')} |
| **PyTorch** | {sys_info['software'].get('pytorch', 'N/A')} |
| **CUDA** | {sys_info['software'].get('cuda', 'N/A')} |
| **MONAI** | {sys_info['software'].get('monai', 'N/A')} |
### 测试数据
| 属性 | 值 |
|------|------|
| **数据集** | Learn2Reg Lung CT |
| **样本数量** | {data_info['count']} 对 |
| **输入尺寸** | {data_info['shape']} |
| **数据类型** | float32 |
| **单卷大小** | ~{data_info['size_mb']:.1f} MB |
### CPU 并发测试结果 (配准 + 变化检测)
| 并发数 | 总耗时 | 吞吐量 | CPU峰值 | CPU均值 | 内存峰值 | 并行效率 |
|--------|--------|--------|---------|---------|----------|----------|
"""
    # Parallel efficiency is measured relative to the single-task run time.
    single_time = cpu_results.get(1, {}).get('total_time', 1)
    for n, data in sorted(cpu_results.items()):
        stats = data['resource_stats']
        efficiency = (single_time * n / data['total_time']) * 100 if data['total_time'] > 0 else 0
        throughput = n / data['total_time'] * 60  # tasks per minute
        report += f"| {n} | {data['total_time']:.2f}s | {throughput:.1f}/min | "
        report += f"{stats['cpu_percent']['max']:.1f}% | {stats['cpu_percent']['mean']:.1f}% | "
        report += f"{stats['memory_used_gb']['max']:.1f} GB | {efficiency:.0f}% |\n"
    report += """
### GPU 并发测试结果 (MONAI 器官分割)
| 并发数 | 总耗时 | GPU显存峰值 | GPU利用率峰值 | CPU峰值 | 内存峰值 |
|--------|--------|-------------|---------------|---------|----------|
"""
    for n, data in sorted(gpu_results.items()):
        stats = data['resource_stats']
        gpu_peak = stats.get('gpu_memory_gb', {}).get('max', 0)
        gpu_util = stats.get('gpu_util', {}).get('max', 0)
        report += f"| {n} | {data['total_time']:.2f}s | {gpu_peak:.1f} GB | {gpu_util:.0f}% | "
        report += f"{stats['cpu_percent']['max']:.1f}% | {stats['memory_used_gb']['max']:.1f} GB |\n"
    # Per-stage breakdown of the single-concurrency CPU task.
    # NOTE(review): the "~1.0s" rigid-registration row is a hard-coded
    # estimate subtracted from reg_time -- confirm against actual timings.
    if cpu_results.get(1) and cpu_results[1]['tasks']:
        task = cpu_results[1]['tasks'][0]
        report += f"""
### 单任务耗时分解 (CPU 配准流程)
| 阶段 | 耗时 | 占比 |
|------|------|------|
| 数据加载 | {task.get('load_time', 0):.2f}s | {task.get('load_time', 0)/task.get('total_time', 1)*100:.0f}% |
| 刚性配准 | ~1.0s | ~13% |
| 非刚性配准 | ~{task.get('reg_time', 0)-1:.1f}s | ~{(task.get('reg_time', 0)-1)/task.get('total_time', 1)*100:.0f}% |
| 变化检测 | {task.get('detect_time', 0):.2f}s | {task.get('detect_time', 0)/task.get('total_time', 1)*100:.0f}% |
| **总计** | **{task.get('total_time', 0):.2f}s** | **100%** |
"""
    if gpu_results.get(1) and gpu_results[1]['tasks']:
        task = gpu_results[1]['tasks'][0]
        report += f"""
### 单任务耗时分解 (GPU 分割流程)
| 阶段 | 耗时 | 占比 |
|------|------|------|
| 数据加载 | {task.get('load_time', 0):.2f}s | {task.get('load_time', 0)/task.get('total_time', 1)*100:.0f}% |
| 模型推理 | {task.get('seg_time', 0):.2f}s | {task.get('seg_time', 0)/task.get('total_time', 1)*100:.0f}% |
| **总计** | **{task.get('total_time', 0):.2f}s** | **100%** |
| **GPU显存峰值** | **{task.get('gpu_peak_gb', 0):.2f} GB** | - |
"""
    report += """
### 资源需求总结
根据以上测试结果,推荐以下硬件配置:
| 部署场景 | CPU | 内存 | GPU | 预估并发能力 |
|----------|-----|------|-----|--------------|
| **最低配置** | 4核 | 8 GB | 无 | 1 任务 (仅配准) |
| **推荐配置** | 8核 | 16 GB | RTX 3060 12GB | 2-3 任务 |
| **专业配置** | 16核 | 32 GB | RTX 4090 24GB | 5+ 任务 |
| **服务器配置** | 32核+ | 64 GB+ | A100 40GB+ | 10+ 任务 |
"""
    return report
def main():
    """Run the full benchmark suite and write BENCHMARK.md.

    Steps: collect system info, load test data, run the CPU concurrency
    benchmark, run the GPU benchmark when CUDA is available, render the
    Markdown report, save it next to the project root, and print a summary.

    Returns:
        dict with "sys_info", "cpu_results", "gpu_results", "data_info" and
        "report", or None when no test data is found.
    """
    global stop_monitor
    print("=" * 70)
    print("🔬 NeuroScan AI 完整基准测试")
    print("=" * 70)
    # System information.
    print("\n📊 收集系统信息...")
    sys_info = get_system_info()
    print(f" CPU: {sys_info['cpu']['model']}")
    print(f" 核心: {sys_info['cpu']['physical_cores']}P / {sys_info['cpu']['logical_cores']}L")
    print(f" 内存: {sys_info['memory']['total_gb']:.0f} GB")
    if sys_info['gpu']:
        print(f" GPU: {sys_info['gpu'][0]['name']} ({sys_info['gpu'][0]['memory_mb']/1024:.0f} GB)")
    # Test data.
    print("\n📁 加载测试数据...")
    data_pairs = get_test_data()
    print(f" 找到 {len(data_pairs)} 对测试数据")
    if not data_pairs:
        print("❌ 没有测试数据!请先运行: python scripts/download_datasets.py")
        return
    # Probe one volume for its dimensions and in-memory size.
    from app.services.dicom import DicomLoader
    loader = DicomLoader()
    sample_data, _ = loader.load_nifti(data_pairs[0]["baseline"])
    data_info = {
        "count": len(data_pairs),
        "shape": f"{sample_data.shape[0]} x {sample_data.shape[1]} x {sample_data.shape[2]}",
        "size_mb": sample_data.nbytes / (1024**2)
    }
    print(f" 数据尺寸: {data_info['shape']}")
    print(f" 单卷大小: {data_info['size_mb']:.1f} MB")
    # CPU concurrency benchmark.
    print("\n" + "=" * 70)
    print("🔄 CPU 并发基准测试 (配准 + 变化检测)")
    print("=" * 70)
    # Cap concurrency levels at the number of available test pairs.
    cpu_levels = [1, 2, 3, 4, 5] if len(data_pairs) >= 5 else list(range(1, len(data_pairs)+1))
    cpu_results = benchmark_cpu_concurrent(data_pairs, cpu_levels)
    # GPU benchmark (skipped gracefully when CUDA is unavailable).
    print("\n" + "=" * 70)
    print("🧠 GPU 基准测试 (MONAI 器官分割)")
    print("=" * 70)
    gpu_results = {}
    try:
        import torch
        if torch.cuda.is_available():
            gpu_results = benchmark_gpu_concurrent(data_pairs, [1, 2])
        else:
            print(" ⚠️ GPU 不可用,跳过GPU测试")
    except Exception as e:
        print(f" ⚠️ GPU测试失败: {e}")
    # Render the report.
    print("\n" + "=" * 70)
    print("📝 生成测试报告")
    print("=" * 70)
    report = generate_markdown_report(sys_info, cpu_results, gpu_results, data_info)
    # Save the report at the project root.
    report_path = Path(__file__).parent.parent / "BENCHMARK.md"
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write("# NeuroScan AI 性能基准测试\n")
        f.write(report)
    print(f" ✅ 报告已保存: {report_path}")
    # Console summary.
    print("\n" + "=" * 70)
    print("📋 测试摘要")
    print("=" * 70)
    print("\n🔄 CPU 测试 (配准流程):")
    for n, data in sorted(cpu_results.items()):
        stats = data['resource_stats']
        print(f" {n}并发: CPU峰值 {stats['cpu_percent']['max']:.1f}%, "
              f"内存峰值 {stats['memory_used_gb']['max']:.1f}GB, "
              f"耗时 {data['total_time']:.1f}s")
    if gpu_results:
        print("\n🧠 GPU 测试 (分割流程):")
        for n, data in sorted(gpu_results.items()):
            stats = data['resource_stats']
            gpu_peak = stats.get('gpu_memory_gb', {}).get('max', 0)
            print(f" {n}并发: GPU显存峰值 {gpu_peak:.1f}GB, "
                  f"CPU峰值 {stats['cpu_percent']['max']:.1f}%, "
                  f"耗时 {data['total_time']:.1f}s")
    print("\n✅ 基准测试完成!")
    print(f" 详细报告: {report_path}")
    # Return results for programmatic callers.
    return {
        "sys_info": sys_info,
        "cpu_results": cpu_results,
        "gpu_results": gpu_results,
        "data_info": data_info,
        "report": report
    }
if __name__ == "__main__":
    # Keep the result dict bound for interactive (python -i) inspection.
    results = main()