# Repository: kuai/diffusion-dpo-test -- file: test_flops.py
# Web-page residue kept for provenance: "Larer's picture"
# Last commit 5c19a88: "Add files using upload-large-folder tool"
import os
os.environ["HF_HOME"] = "/home/wanghongbo06/.cache/huggingface"
import torch
import torch.multiprocessing as mp
from diffusers.pipelines import FluxPipeline
from src.flux.condition import Condition
from src.flux.generate import generate, seed_everything
from color_fix import wavelet_color_fix, adain_color_fix
from PIL import Image
from tqdm import tqdm
import time
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Any
# ============== Configuration ==============
# input_folder: directory the input images are read from.
# output_folder: directory the generated images are written to.
# NOTE(review): input_folder uses "baipudui" while every other path in this
# file uses "baipurui" -- confirm this is not a typo.
input_folder = "/home/wanghongbo06/baipudui/DATA/DIV2K/DIV2K-val-epoch1/lr"
output_folder = "/home/wanghongbo06/baipurui/results/flops"
# LoRA checkpoint paths (commented-out lines are earlier alternatives)
# SR_LORA_PATH = "/home/wanghongbo06/baipurui/CKPTs/FLUX_SR/pytorch_lora_weights_v2.safetensors"
SR_LORA_PATH = "/home/wanghongbo06/baipurui/OminiControl/runs/20260105-171922/ckpt/800/pytorch_lora_weights.safetensors"
# DPO_LORA_PATH = "/home/wanghongbo06/diffusion-dpo-adv/results/results_sobolev_20251212_2/checkpoint-400/lora_dpo/adapter_model.safetensors"
DPO_LORA_PATH = "/home/wanghongbo06/diffusion-dpo-adv/results/results_sobolev_20260107_1356/checkpoint-500/lora_dpo/adapter_model.safetensors"
# Visualising adversarial samples (disabled)
# ADV_LORA_PATH = '/home/wanghongbo06/diffusion-dpo/results/final_lora/adapter_model.safetensors'
# DPO_LORA_PATH = ADV_LORA_PATH
# LoRA scales (adapter weights passed to set_adapters; tune to change strength)
SR_LORA_SCALE = 1.0
DPO_LORA_SCALE = 1.0
# Multi-GPU configuration: number of worker processes / GPUs
NUM_GPUS = 1
# Maximum number of processes loading the model at the same time
# (1 = load serially, to avoid an I/O bottleneck)
MAX_CONCURRENT_LOAD = 1
# Benchmark configuration
WARMUP_IMAGES = 10 # number of warm-up images per GPU (excluded from the statistics)
ENABLE_PROFILING = True # detailed-profiling flag; in this file it is only read when printing the banner in main()
# ================================
@dataclass
class PerformanceMetrics:
    """Benchmark results of a single GPU plus statistics derived from them."""

    gpu_id: int
    inference_times: List[float]  # per-image inference times, warm-up excluded
    warmup_time: float  # time spent on warm-up
    peak_memory_mb: float  # peak GPU memory (MB)
    allocated_memory_mb: float  # GPU memory actually allocated (MB)
    reserved_memory_mb: float  # GPU memory reserved (MB)
    total_images: int  # total number of images processed

    @property
    def avg_inference_time(self) -> float:
        """Mean inference time (warm-up excluded); 0.0 without samples."""
        samples = self.inference_times
        return np.mean(samples) if len(samples) != 0 else 0.0

    @property
    def std_inference_time(self) -> float:
        """Standard deviation of the inference times; 0.0 below two samples."""
        samples = self.inference_times
        return np.std(samples) if len(samples) >= 2 else 0.0

    @property
    def throughput(self) -> float:
        """Throughput in images per second; 0.0 without samples or time."""
        count = len(self.inference_times)
        if count == 0:
            return 0.0
        elapsed = sum(self.inference_times)
        if elapsed > 0:
            return count / elapsed
        return 0.0

    @property
    def memory_efficiency(self) -> float:
        """Allocated memory as a percentage of reserved memory."""
        reserved = self.reserved_memory_mb
        if reserved == 0:
            return 0.0
        return self.allocated_memory_mb / reserved * 100
def _flux_token_count(height, width):
    """Return the number of image tokens Flux uses for a height x width image.

    The VAE downsamples by 8 and the pipeline packs every 2x2 latent patch
    into one token, i.e. one token per 16x16 pixel tile.
    """
    return (height // 16) * (width // 16)


def estimate_model_flops(pipe, height=512, width=512, num_inference_steps=28):
    """Estimate the FLOPs of one full sampling run of ``pipe.transformer``.

    Two strategies are tried in order:

    1. Trace one transformer forward pass with fvcore's ``FlopCountAnalysis``
       on dummy inputs shaped from the transformer config, then multiply by
       ``num_inference_steps``.
    2. If that fails for any reason (fvcore not installed, tracing error,
       unexpected forward signature, ...), fall back to the rough rule
       ``2 * parameter_count * image_tokens`` per step.

    Only the transformer is counted; the VAE and the text encoders are not.
    Extra condition tokens that a custom generation routine may append are
    not modelled either, so treat the result as a lower-bound style estimate.

    NOTE(review): fvcore counts one fused multiply-add as one FLOP, whereas
    the fallback counts two per multiply-add, so the two methods are not
    directly comparable -- confirm which convention the report should use.

    Args:
        pipe: Pipeline object exposing a ``transformer`` module that has a
            ``config`` attribute.
        height: Image height in pixels.
        width: Image width in pixels.
        num_inference_steps: Number of denoising steps of one sampling run.

    Returns:
        Tuple ``(total_flops, method)`` where ``method`` names the strategy
        that produced the number. ``(0, "failed")`` when both strategies
        fail; ordinary errors are never propagated to the caller.
    """
    try:
        from fvcore.nn import FlopCountAnalysis

        transformer = pipe.transformer
        config = transformer.config
        hidden_size = config.num_attention_heads * config.attention_head_dim

        # Feature sizes expected by the input projections. The fallbacks keep
        # the previous hard-coded guesses for configs lacking these fields.
        latent_dim = getattr(config, "in_channels", None) or hidden_size
        text_dim = getattr(config, "joint_attention_dim", None) or hidden_size
        pooled_dim = getattr(config, "pooled_projection_dim", None) or 768

        seq_len = _flux_token_count(height, width)
        text_len = 512  # text context length used for the dummy prompt
        print(f"DEBUG: Estimating with Hidden Size: {hidden_size}, Seq Len: {seq_len}")

        reference = next(transformer.parameters())
        device, dtype = reference.device, reference.dtype

        # Positional order follows the transformer's forward signature:
        # hidden_states, encoder_hidden_states, pooled_projections, timestep,
        # img_ids, txt_ids[, guidance].
        inputs = [
            torch.randn(1, seq_len, latent_dim, device=device, dtype=dtype),
            torch.randn(1, text_len, text_dim, device=device, dtype=dtype),
            torch.randn(1, pooled_dim, device=device, dtype=dtype),
            torch.tensor([500], device=device, dtype=dtype),
            # Position ids feed the rotary embedding; zeros are enough for
            # counting operations.
            torch.zeros(seq_len, 3, device=device, dtype=dtype),
            torch.zeros(text_len, 3, device=device, dtype=dtype),
        ]
        # Guidance-distilled checkpoints require a guidance tensor. It is only
        # appended when needed because tracing cannot take None inputs.
        if getattr(config, "guidance_embeds", False):
            inputs.append(torch.full((1,), 3.5, device=device, dtype=dtype))

        # No gradients are needed for counting; this keeps memory use down.
        with torch.no_grad():
            flops_analysis = FlopCountAnalysis(transformer, tuple(inputs))
            # Silence warnings about operators fvcore cannot count.
            flops_analysis.unsupported_ops_warnings(False)
            single_forward_flops = flops_analysis.total()

        total_flops = single_forward_flops * num_inference_steps
        print(f"DEBUG: Single step FLOPs: {single_forward_flops/1e12:.4f} TFLOPs")
        return total_flops, "fvcore (Transformer Only)"
    except Exception as e:
        print(f"fvcore FLOPs 估算失败: {e}")

    # Fallback: every parameter takes part in roughly one multiply-add per
    # token, i.e. FLOPs ~= 2 * params * tokens for each denoising step.
    try:
        total_params = sum(p.numel() for p in pipe.transformer.parameters())
        seq_len = _flux_token_count(height, width)
        theoretical_flops = 2 * total_params * seq_len * num_inference_steps
        return theoretical_flops, "Theoretical (2*Params*SeqLen)"
    except Exception as e:
        print(f"理论 FLOPs 估算失败: {e}")
        return 0, "failed"
def profile_single_inference(pipe, image, prompt, condition, device):
    """Run one ``generate`` call and measure its latency and GPU memory use.

    Args:
        pipe: Pipeline handed to ``generate``.
        image: Input image; not used by this function.
        prompt: Text prompt forwarded to ``generate``.
        condition: Condition object; passed as the only entry of ``conditions``.
        device: Either a string of the form ``"cuda:<index>"`` or a value that
            ``torch.cuda.device`` accepts directly.

    Returns:
        Tuple ``(result_img, inference_time, peak_memory, allocated_memory,
        reserved_memory)``: the first generated image, the elapsed wall-clock
        time in seconds and three memory readings in MB.
    """
    bytes_per_mb = 1024 ** 2

    # A device string carries the index after the colon; anything else is
    # used as given.
    if isinstance(device, str):
        device_id = int(device.split(':')[1])
    else:
        device_id = device

    # Start from a clean peak counter so the peak reflects this call only.
    torch.cuda.reset_peak_memory_stats(device)

    # Make the target GPU current so the synchronisations apply to it.
    with torch.cuda.device(device_id):
        torch.cuda.synchronize()
        # perf_counter brackets the call; the synchronize calls make sure
        # pending GPU work is not left outside the timed region.
        tic = time.perf_counter()
        output = generate(
            pipe,
            prompt=prompt,
            conditions=[condition],
            default_lora=True,
        )
        result_img = output.images[0]
        torch.cuda.synchronize()
        toc = time.perf_counter()

    inference_time = toc - tic

    # Memory readings, converted from bytes to MB.
    peak_memory = torch.cuda.max_memory_allocated(device) / bytes_per_mb
    allocated_memory = torch.cuda.memory_allocated(device) / bytes_per_mb
    reserved_memory = torch.cuda.memory_reserved(device) / bytes_per_mb

    return result_img, inference_time, peak_memory, allocated_memory, reserved_memory
def load_pipeline(gpu_id, load_semaphore=None):
    """Load the FLUX pipeline with the SR and DPO LoRA adapters on one GPU.

    Args:
        gpu_id: Index of the CUDA device to load onto; it is also made the
            current device of the calling process.
        load_semaphore: Optional semaphore limiting how many processes load
            the model at the same time. ``None`` disables the limit.

    Returns:
        The pipeline, moved to ``cuda:<gpu_id>``, with adapters ``"sr"`` and
        ``"dpo"`` active at ``SR_LORA_SCALE`` / ``DPO_LORA_SCALE``.
    """
    device = f"cuda:{gpu_id}"
    # Explicitly select the GPU used by the current process.
    torch.cuda.set_device(gpu_id)

    # Acquire outside the try block so the finally clause only releases a
    # semaphore that was actually acquired.
    if load_semaphore is not None:
        load_semaphore.acquire()
    try:
        print(f"[GPU {gpu_id}] 开始加载模型...")
        load_start = time.perf_counter()
        pipe = FluxPipeline.from_pretrained(
            '/home/wanghongbo06/baipurui/.cache/huggingface/hub/models--black-forest-labs--FLUX.1-dev/snapshots/3de623fc3c33e44ffbe2bad470d0f45bccf2eb21',
            torch_dtype=torch.bfloat16,
            # Never hard-code access tokens in source. The weights come from
            # a local snapshot, so a token is optional; HF_TOKEN is honoured
            # when set.
            token=os.environ.get("HF_TOKEN"),
            # These two keywords were previously misspelled ("local_files_on",
            # "catch_dir"), which are not recognised option names.
            local_files_only=True,
            cache_dir=".cache/flux-sr",
        ).to(device)

        # Attach both LoRA adapters and activate them with their scales.
        pipe.load_lora_weights(SR_LORA_PATH, adapter_name="sr")
        pipe.load_lora_weights(DPO_LORA_PATH, adapter_name="dpo")
        pipe.set_adapters(["sr", "dpo"], adapter_weights=[SR_LORA_SCALE, DPO_LORA_SCALE])

        load_time = time.perf_counter() - load_start
        print(f"[GPU {gpu_id}] 模型加载完成,耗时 {load_time:.1f}s")
    finally:
        if load_semaphore is not None:
            load_semaphore.release()
    return pipe
def _load_center_square(image_path, size=512):
    """Open an image, centre-crop it to a square and resize it to size x size."""
    # convert() returns an independent image, so the file handle can be
    # closed right away instead of waiting for garbage collection.
    with Image.open(image_path) as source:
        image = source.convert("RGB")
    w, h = image.size
    min_dim = min(w, h)
    box = ((w - min_dim) // 2, (h - min_dim) // 2, (w + min_dim) // 2, (h + min_dim) // 2)
    return image.crop(box).resize((size, size), Image.BICUBIC)


def process_images(gpu_id, image_list, output_folder, load_semaphore, ready_event, start_barrier, metrics_dict=None, warmup_images=None):
    """Worker entry point: run super-resolution on one GPU and record metrics.

    Args:
        gpu_id: Index of the GPU this worker uses.
        image_list: File names (relative to ``input_folder``) to process.
        output_folder: Directory the results are saved to.
        load_semaphore: Semaphore limiting concurrent model loading.
        ready_event: Set once this worker has finished loading (or failed),
            so the parent process stops waiting for it.
        start_barrier: Barrier all workers pass before inference starts.
        metrics_dict: Optional shared mapping; the worker stores its metrics
            under the key ``gpu_id``.
        warmup_images: Number of leading images treated as warm-up and left
            out of the statistics. ``None`` uses the module-level
            ``WARMUP_IMAGES``. Pass it explicitly when the module constant is
            overridden at runtime: a worker started with the ``spawn`` method
            re-imports the module and only sees the default value.

    Raises:
        Exception: Any error is printed with its traceback and re-raised
            after the barrier has been aborted.
    """
    n_warmup = WARMUP_IMAGES if warmup_images is None else warmup_images
    try:
        if not image_list:
            # Nothing to do, but still take part in the synchronisation so
            # the other workers are not blocked.
            ready_event.set()
            start_barrier.wait()
            return

        device = f"cuda:{gpu_id}"
        # Select this worker's GPU before anything touches CUDA.
        torch.cuda.set_device(gpu_id)
        # Load the model (concurrency is limited by the semaphore).
        pipe = load_pipeline(gpu_id, load_semaphore)
        # Tell the parent process that this GPU is ready.
        ready_event.set()
        # Start inference only after every worker has finished loading.
        start_barrier.wait()

        print(f"[GPU {gpu_id}] 开始处理 {len(image_list)} 张图片")
        prompt = ""

        # Accumulators for the statistics.
        inference_times = []
        warmup_time = 0.0
        peak_memory_mb = 0.0

        torch.cuda.reset_peak_memory_stats(device)

        # One progress bar per worker, placed on its own terminal row.
        pbar = tqdm(
            enumerate(image_list),
            total=len(image_list),
            desc=f"GPU {gpu_id}",
            position=gpu_id,
            leave=True,
            ncols=120,
            bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]'
        )
        for idx, filename in pbar:
            # Centre crop + resize to 512x512.
            image = _load_center_square(os.path.join(input_folder, filename))

            # Build the condition and generate with a fixed seed.
            condition = Condition("sr", image)
            seed_everything(1)
            result_img, inf_time, peak_mem, _, _ = profile_single_inference(
                pipe, image, prompt, condition, device
            )
            peak_memory_mb = max(peak_memory_mb, peak_mem)

            # Warm-up images are timed separately and kept out of the stats.
            if idx < n_warmup:
                warmup_time += inf_time
                pbar.set_postfix({
                    'warmup': f'{inf_time:.2f}s',
                    'mem': f'{peak_mem:.0f}MB'
                })
            else:
                inference_times.append(inf_time)
                avg_time = np.mean(inference_times)
                pbar.set_postfix({
                    'time': f'{inf_time:.2f}s',
                    'avg': f'{avg_time:.2f}s',
                    'mem': f'{peak_mem:.0f}MB'
                })

            result_img = adain_color_fix(result_img, image)
            result_img.save(os.path.join(output_folder, filename))

        # Final memory readings, in MB.
        final_peak_memory = torch.cuda.max_memory_allocated(device) / (1024 ** 2)
        final_allocated = torch.cuda.memory_allocated(device) / (1024 ** 2)
        final_reserved = torch.cuda.memory_reserved(device) / (1024 ** 2)

        metrics = PerformanceMetrics(
            gpu_id=gpu_id,
            inference_times=inference_times,
            warmup_time=warmup_time,
            peak_memory_mb=max(peak_memory_mb, final_peak_memory),
            allocated_memory_mb=final_allocated,
            reserved_memory_mb=final_reserved,
            total_images=len(image_list)
        )

        # Publish the results through the shared dictionary.
        if metrics_dict is not None:
            metrics_dict[gpu_id] = {
                'inference_times': inference_times,
                'warmup_time': warmup_time,
                'peak_memory_mb': metrics.peak_memory_mb,
                'allocated_memory_mb': metrics.allocated_memory_mb,
                'reserved_memory_mb': metrics.reserved_memory_mb,
                'total_images': len(image_list),
                'avg_inference_time': metrics.avg_inference_time,
                'std_inference_time': metrics.std_inference_time,
                'throughput': metrics.throughput,
                'memory_efficiency': metrics.memory_efficiency
            }

        # Per-GPU summary.
        print(f"\n[GPU {gpu_id}] ✅ 完成!")
        print(f" 📊 性能摘要:")
        print(f" - 处理图片数: {len(image_list)} (预热: {n_warmup}, 统计: {len(inference_times)})")
        print(f" - 预热时间: {warmup_time:.2f}s")
        print(f" - 平均推理时间: {metrics.avg_inference_time:.3f}s ± {metrics.std_inference_time:.3f}s")
        print(f" - 吞吐量: {metrics.throughput:.2f} 图/秒")
        print(f" - 显存峰值: {metrics.peak_memory_mb:.1f} MB")
        print(f" - 显存效率: {metrics.memory_efficiency:.1f}%")
    except Exception as e:
        print(f"\n[GPU {gpu_id}] ❌ 错误: {e}")
        import traceback
        traceback.print_exc()
        # Unblock the parent, which waits on the event ...
        ready_event.set()
        # ... and the sibling workers: this worker will never reach the
        # barrier, so break it instead of letting them wait forever. Aborting
        # a barrier nobody is waiting on is harmless.
        start_barrier.abort()
        raise
def print_performance_report(metrics_dict: Dict[int, Dict], load_time: float, total_time: float, total_images: int):
    """Print the detailed benchmark report for all GPUs.

    Args:
        metrics_dict: Maps GPU id to that worker's metrics dictionary. Each
            entry must provide ``inference_times``, ``warmup_time``,
            ``peak_memory_mb``, ``allocated_memory_mb``,
            ``reserved_memory_mb`` and ``memory_efficiency``;
            ``total_images`` is used when present.
        load_time: Seconds spent until all models were loaded.
        total_time: Seconds of the whole run (loading + inference).
        total_images: Number of images handed to the workers.
    """
    print("\n" + "=" * 70)
    print(" 📊 详细性能报告")
    print("=" * 70)

    # Aggregate the per-GPU data.
    all_inference_times = []
    total_warmup_time = 0.0
    max_peak_memory = 0.0
    total_allocated_memory = 0.0
    warmup_count = 0
    for gpu_id, metrics in sorted(metrics_dict.items()):
        measured = metrics['inference_times']
        all_inference_times.extend(measured)
        total_warmup_time += metrics['warmup_time']
        max_peak_memory = max(max_peak_memory, metrics['peak_memory_mb'])
        total_allocated_memory += metrics['allocated_memory_mb']
        # Images a worker processed but did not measure are its warm-up
        # images. Deriving the count from the data stays correct when a
        # worker had fewer images than the configured warm-up, or used a
        # different warm-up setting than this process.
        warmup_count += max(metrics.get('total_images', len(measured)) - len(measured), 0)

    has_samples = len(all_inference_times) > 0
    # Wall-clock time left after model loading.
    inference_wall_time = total_time - load_time

    # ============== 1. Inference time statistics ==============
    print("\n🕐 推理时间统计:")
    print("-" * 50)
    if has_samples:
        avg_time = np.mean(all_inference_times)
        std_time = np.std(all_inference_times)
        min_time = np.min(all_inference_times)
        max_time = np.max(all_inference_times)
        median_time = np.median(all_inference_times)
        p95_time = np.percentile(all_inference_times, 95)
        p99_time = np.percentile(all_inference_times, 99)
        print(f" 统计图片数: {len(all_inference_times)} (排除预热 {warmup_count} 张)")
        print(f" 平均推理时间: {avg_time:.4f} 秒/张")
        print(f" 标准差: {std_time:.4f} 秒")
        print(f" 最小值: {min_time:.4f} 秒")
        print(f" 最大值: {max_time:.4f} 秒")
        print(f" 中位数: {median_time:.4f} 秒")
        print(f" P95: {p95_time:.4f} 秒")
        print(f" P99: {p99_time:.4f} 秒")
        print(f" 预热总时间: {total_warmup_time:.2f} 秒")
    else:
        print(" ⚠️ 没有有效的推理时间数据")

    # ============== 2. Throughput ==============
    print("\n⚡ 吞吐量 (Throughput):")
    print("-" * 50)
    if has_samples:
        total_inference_time = sum(all_inference_times)
        throughput_per_sec = len(all_inference_times) / total_inference_time if total_inference_time > 0 else 0
        throughput_per_min = throughput_per_sec * 60
        throughput_per_hour = throughput_per_sec * 3600
        # Multi-GPU throughput based on wall-clock time.
        parallel_throughput_sec = total_images / inference_wall_time if inference_wall_time > 0 else 0
        parallel_throughput_min = parallel_throughput_sec * 60
        print(f" 单 GPU 吞吐量:")
        print(f" - {throughput_per_sec:.3f} 图/秒")
        print(f" - {throughput_per_min:.1f} 图/分钟")
        print(f" - {throughput_per_hour:.0f} 图/小时")
        print(f" {len(metrics_dict)} GPU 并行吞吐量 (wall-clock):")
        print(f" - {parallel_throughput_sec:.3f} 图/秒")
        print(f" - {parallel_throughput_min:.1f} 图/分钟")

    # ============== 3. GPU memory ==============
    print("\n💾 显存 (GPU Memory):")
    print("-" * 50)
    for gpu_id, metrics in sorted(metrics_dict.items()):
        print(f" GPU {gpu_id}:")
        print(f" - 显存峰值: {metrics['peak_memory_mb']:.1f} MB ({metrics['peak_memory_mb']/1024:.2f} GB)")
        print(f" - 实际分配: {metrics['allocated_memory_mb']:.1f} MB")
        print(f" - 保留显存: {metrics['reserved_memory_mb']:.1f} MB")
        print(f" - 显存效率: {metrics['memory_efficiency']:.1f}%")
    if len(metrics_dict) > 1:
        print(f" 汇总:")
        print(f" - 最大显存峰值: {max_peak_memory:.1f} MB ({max_peak_memory/1024:.2f} GB)")
        print(f" - 总分配显存: {total_allocated_memory:.1f} MB")

    # ============== 4. FLOPs (pointer only) ==============
    print("\n🔢 计算量 (FLOPs) - 估算:")
    print("-" * 50)
    print(" ⚠️ FLOPs 估算需要在单 GPU 模式下单独运行")
    print(" 💡 提示: 设置 NUM_GPUS=1 并运行 estimate_flops_standalone() 获取准确值")

    # ============== 5. Time breakdown ==============
    print("\n⏱️ 时间分解:")
    print("-" * 50)
    # Guard the percentages against a zero-length run.
    load_pct = load_time / total_time * 100 if total_time > 0 else 0.0
    inference_pct = inference_wall_time / total_time * 100 if total_time > 0 else 0.0
    print(f" 模型加载时间: {load_time:.1f} 秒 ({load_pct:.1f}%)")
    print(f" 推理时间: {inference_wall_time:.1f} 秒 ({inference_pct:.1f}%)")
    print(f" 总时间: {total_time:.1f} 秒")

    # ============== 6. Summary ==============
    print("\n" + "=" * 70)
    print(" 📈 性能汇总")
    print("=" * 70)
    if has_samples:
        # The table lines start at column 0 on purpose: they are part of the
        # printed string.
        print(f"""
┌─────────────────────────────────────────────────────────────────┐
│ 指标 │ 值 │
├─────────────────────────────────────────────────────────────────┤
│ 平均推理时间 (不含预热) │ {avg_time:.4f} 秒/张 │
│ 吞吐量 (单GPU) │ {throughput_per_sec:.3f} 图/秒 │
│ 吞吐量 ({len(metrics_dict)}GPU 并行) │ {parallel_throughput_sec:.3f} 图/秒 │
│ 显存峰值 │ {max_peak_memory:.1f} MB ({max_peak_memory/1024:.2f} GB) │
│ 总处理图片 │ {total_images} 张 │
└─────────────────────────────────────────────────────────────────┘
""")
    print("=" * 70)
def estimate_flops_standalone():
    """Load the model on GPU 0 and print a FLOPs estimate for one inference.

    The pipeline is obtained through ``load_pipeline`` so that the model
    path, loading options and LoRA setup are defined in one place only.

    Returns:
        The estimated FLOPs of one inference, or 0 when estimation failed.
    """
    print("=" * 60)
    print("🔢 正在估算模型 FLOPs...")
    print("=" * 60)

    # Load the model (single GPU, no concurrency limit needed).
    print("加载模型中...")
    pipe = load_pipeline(0)

    # Estimate the FLOPs.
    flops, method = estimate_model_flops(pipe)
    if flops > 0:
        print(f"\n📊 FLOPs 估算结果 (方法: {method}):")
        print(f" - 每次推理 FLOPs: {flops:.2e}")
        print(f" - 每次推理 TFLOPs: {flops / 1e12:.2f}")
        # With a measured inference time, the achieved rate would be
        # FLOPS = FLOPs / inference_time.
    else:
        print("❌ FLOPs 估算失败")
    return flops
def save_metrics_to_json(metrics_dict: Dict, output_path: str, load_time: float, total_time: float, total_images: int):
    """Write the benchmark metrics to a JSON file.

    The file contains a ``summary`` section with statistics aggregated over
    all GPUs and a ``per_gpu_metrics`` section holding each worker's entry
    unchanged (keys converted to strings).

    Args:
        metrics_dict: Maps GPU id to that worker's metrics dictionary; each
            entry must provide ``inference_times`` and ``peak_memory_mb``.
            ``total_images`` is used when present.
        output_path: Destination file; missing parent directories are created.
        load_time: Seconds spent until all models were loaded.
        total_time: Seconds of the whole run.
        total_images: Number of images handed to the workers.
    """
    import json

    per_gpu = list(metrics_dict.values())
    all_times = [t for m in per_gpu for t in m['inference_times']]
    inference_wall_time = total_time - load_time

    # Images processed but not measured are warm-up images. Counting them
    # from the data is exact even when a worker had fewer images than the
    # configured warm-up.
    warmup_images = sum(
        max(m.get('total_images', len(m['inference_times'])) - len(m['inference_times']), 0)
        for m in per_gpu
    )

    def _stat(fn, *args):
        """Apply a numpy reduction to the samples; 0 when there are none."""
        return float(fn(all_times, *args)) if all_times else 0

    total_measured_time = sum(all_times)
    peak_memory_mb = max((m['peak_memory_mb'] for m in per_gpu), default=0)

    summary = {
        'avg_inference_time_sec': _stat(np.mean),
        'std_inference_time_sec': _stat(np.std),
        'min_inference_time_sec': _stat(np.min),
        'max_inference_time_sec': _stat(np.max),
        'median_inference_time_sec': _stat(np.median),
        'p95_inference_time_sec': _stat(np.percentile, 95),
        'p99_inference_time_sec': _stat(np.percentile, 99),
        'throughput_single_gpu_per_sec': float(len(all_times) / total_measured_time) if total_measured_time > 0 else 0,
        'throughput_parallel_per_sec': float(total_images / inference_wall_time) if inference_wall_time > 0 else 0,
        'peak_memory_mb': peak_memory_mb,
        'peak_memory_gb': peak_memory_mb / 1024 if per_gpu else 0,
        'total_images': total_images,
        'warmup_images': warmup_images,
        'measured_images': len(all_times),
        'model_load_time_sec': load_time,
        'inference_wall_time_sec': inference_wall_time,
        'total_time_sec': total_time,
        'num_gpus': len(metrics_dict),
    }
    result = {
        'summary': summary,
        'per_gpu_metrics': {str(k): v for k, v in metrics_dict.items()}
    }

    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    # ensure_ascii=False may emit non-ASCII text, so fix the encoding instead
    # of relying on the platform default.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)
    print(f"📄 性能指标已保存到: {output_path}")
def main(save_metrics_path: str = None):
    """Run the full benchmark: spawn one worker per GPU and report the results.

    Args:
        save_metrics_path: Optional path of a JSON file the metrics are
            saved to.

    Raises:
        ValueError: If ``NUM_GPUS`` is smaller than 1.
    """
    if NUM_GPUS < 1:
        raise ValueError(f"NUM_GPUS must be >= 1, got {NUM_GPUS}")

    os.makedirs(output_folder, exist_ok=True)

    # Collect every image to process.
    all_images = sorted([
        f for f in os.listdir(input_folder)
        if f.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".webp"))
    ])
    total_images = len(all_images)

    print("=" * 70)
    print(" 🚀 Diffusion 超分性能测试")
    print("=" * 70)
    print(f"📁 输入目录: {input_folder}")
    print(f"📁 输出目录: {output_folder}")
    print(f"🖼️ 总图片数: {total_images}")
    print(f"🎮 GPU 数量: {NUM_GPUS}")
    print(f"📦 每 GPU 处理: ~{total_images // NUM_GPUS} 张")
    print(f"⚙️ 模型加载并发数: {MAX_CONCURRENT_LOAD}")
    print(f"🔥 预热图片数: {WARMUP_IMAGES} (每个GPU)")
    print(f"📊 性能分析: {'开启' if ENABLE_PROFILING else '关闭'}")
    print("=" * 70)

    # Distribute the images round-robin over the GPUs.
    image_chunks = [[] for _ in range(NUM_GPUS)]
    for i, img in enumerate(all_images):
        image_chunks[i % NUM_GPUS].append(img)

    start_time = time.time()

    # NOTE: with the "spawn" start method each worker re-imports this module,
    # so module-level settings changed at runtime in this process (for
    # example from the command line) are not visible inside the workers.
    mp.set_start_method('spawn', force=True)

    # Semaphore limiting how many processes load the model simultaneously.
    load_semaphore = mp.Semaphore(MAX_CONCURRENT_LOAD)
    # One event per worker, set when its model has been loaded.
    ready_events = [mp.Event() for _ in range(NUM_GPUS)]
    # Barrier that lets all workers start inference together.
    start_barrier = mp.Barrier(NUM_GPUS)

    # The context manager shuts the manager process down when done.
    with mp.Manager() as manager:
        # Shared dictionary the workers store their metrics in.
        metrics_dict = manager.dict()
        processes = []

        print(f"\n⏳ 开始加载模型(最多 {MAX_CONCURRENT_LOAD} 个并发,避免I/O瓶颈)...")
        for gpu_id in range(NUM_GPUS):
            p = mp.Process(
                target=process_images,
                args=(gpu_id, image_chunks[gpu_id], output_folder,
                      load_semaphore, ready_events[gpu_id], start_barrier, metrics_dict)
            )
            p.start()
            processes.append(p)

        # Wait until every model has been loaded. Poll with a timeout so a
        # worker that died without setting its event cannot hang the parent.
        loaded_count = 0
        for i, event in enumerate(ready_events):
            while not event.wait(timeout=1.0):
                if not processes[i].is_alive():
                    break
            if event.is_set():
                loaded_count += 1
                print(f" ✅ GPU {i} 就绪 ({loaded_count}/{NUM_GPUS})")
            else:
                # The dead worker will never reach the barrier; break it so
                # the remaining workers do not wait forever.
                start_barrier.abort()
                print(f" ❌ GPU {i} 进程在模型加载完成前退出")

        load_time = time.time() - start_time
        print(f"\n⏱️ 模型加载总耗时: {load_time:.1f}s ({load_time/60:.1f} 分钟)")
        print("🚀 所有模型加载完成,开始并行推理...\n")

        # Wait for all workers to finish.
        for p in processes:
            p.join()

        total_time = time.time() - start_time

        # Copy the results out before the manager shuts down.
        metrics_dict_normal = dict(metrics_dict)

    # Make worker failures visible instead of reporting partial data silently.
    failed_gpus = [i for i, p in enumerate(processes) if p.exitcode != 0]
    if failed_gpus:
        print(f"⚠️ 以下 GPU 进程异常退出: {failed_gpus}")

    # Detailed report.
    print_performance_report(metrics_dict_normal, load_time, total_time, total_images)

    # Optionally persist the metrics.
    if save_metrics_path:
        save_metrics_to_json(metrics_dict_normal, save_metrics_path, load_time, total_time, total_images)

    print(f"\n📁 结果保存在: {output_folder}")
    print("=" * 70)
if __name__ == "__main__":
    # Command-line entry point: parse the options, apply overrides to the
    # module-level settings and dispatch to the selected mode.
    import argparse

    parser = argparse.ArgumentParser(description='Diffusion 超分性能测试')
    parser.add_argument('--mode', type=str, default='benchmark', choices=['benchmark', 'flops'],
                        help='运行模式: benchmark (默认) 或 flops (仅估算FLOPs)')
    parser.add_argument('--save-metrics', type=str, default=None,
                        help='保存性能指标到 JSON 文件的路径')
    parser.add_argument('--num-gpus', type=int, default=None,
                        help='使用的 GPU 数量 (覆盖默认值)')
    parser.add_argument('--warmup', type=int, default=None,
                        help='预热图片数量 (覆盖默认值)')
    args = parser.parse_args()

    # Override the configuration. These are plain module-level assignments
    # (this code runs at module scope, so no `global` statement is involved).
    if args.num_gpus is not None:
        if args.num_gpus < 1:
            # 0 or negative values would crash later while splitting the
            # image list; fail early with a usage message instead.
            parser.error("--num-gpus 必须 >= 1")
        NUM_GPUS = args.num_gpus
    if args.warmup is not None:
        if args.warmup < 0:
            parser.error("--warmup 必须 >= 0")
        # NOTE(review): main() starts its workers with the "spawn" method,
        # which re-imports this module without running this block, so this
        # override only changes the value seen by the parent process --
        # confirm how the workers are meant to receive it.
        WARMUP_IMAGES = args.warmup

    if args.mode == 'flops':
        # Only estimate the FLOPs.
        estimate_flops_standalone()
    else:
        # Run the complete benchmark.
        main(save_metrics_path=args.save_metrics)
# pyiqa psnr ssim lpips musiq clipiqa+ --target /home/wanghongbo06/diffusion-dpo-test/DIV2K-val/sobolev-400 --r /home/wanghongbo06/baipurui/DATA/DIV2K-val/gt