"""Benchmark a FLUX + LoRA super-resolution pipeline on one or more GPUs.

The script measures per-image inference latency, throughput and GPU memory
while writing the restored images to ``output_folder``; a separate ``flops``
mode estimates the transformer FLOPs of a single generation.
"""
import os

# Set before the Hugging Face libraries are imported (same ordering as the
# original script), so that they see this cache location.
os.environ["HF_HOME"] = "/home/wanghongbo06/.cache/huggingface"

import argparse
import json
import time
import traceback
from contextlib import nullcontext
from dataclasses import dataclass
from typing import List, Dict, Any

import numpy as np
import torch
import torch.multiprocessing as mp
from diffusers.pipelines import FluxPipeline
from PIL import Image
from tqdm import tqdm

from src.flux.condition import Condition
from src.flux.generate import generate, seed_everything
from color_fix import wavelet_color_fix, adain_color_fix

# ============== Configuration ==============
# NOTE(review): this path says "baipudui" while every other path in this file
# says "baipurui" -- confirm it is not a typo before relying on it.
input_folder = "/home/wanghongbo06/baipudui/DATA/DIV2K/DIV2K-val-epoch1/lr"
output_folder = "/home/wanghongbo06/baipurui/results/flops"

# Local snapshot of the base model (shared by the benchmark and FLOPs modes).
FLUX_MODEL_PATH = (
    "/home/wanghongbo06/baipurui/.cache/huggingface/hub/"
    "models--black-forest-labs--FLUX.1-dev/snapshots/"
    "3de623fc3c33e44ffbe2bad470d0f45bccf2eb21"
)

# LoRA checkpoints
# SR_LORA_PATH = "/home/wanghongbo06/baipurui/CKPTs/FLUX_SR/pytorch_lora_weights_v2.safetensors"
SR_LORA_PATH = "/home/wanghongbo06/baipurui/OminiControl/runs/20260105-171922/ckpt/800/pytorch_lora_weights.safetensors"
# DPO_LORA_PATH = "/home/wanghongbo06/diffusion-dpo-adv/results/results_sobolev_20251212_2/checkpoint-400/lora_dpo/adapter_model.safetensors"
DPO_LORA_PATH = "/home/wanghongbo06/diffusion-dpo-adv/results/results_sobolev_20260107_1356/checkpoint-500/lora_dpo/adapter_model.safetensors"

# To visualise the adversarial sample instead:
# ADV_LORA_PATH = '/home/wanghongbo06/diffusion-dpo/results/final_lora/adapter_model.safetensors'
# DPO_LORA_PATH = ADV_LORA_PATH

# LoRA scales (adapter weight strength)
SR_LORA_SCALE = 1.0
DPO_LORA_SCALE = 1.0

# Multi-GPU configuration
NUM_GPUS = 1

# Maximum number of processes loading the model at the same time
# (1 = load serially to avoid an I/O bottleneck).
MAX_CONCURRENT_LOAD = 1

# Benchmark configuration
WARMUP_IMAGES = 10  # images per GPU that are run but excluded from statistics
ENABLE_PROFILING = True  # only echoed in the banner printed by main()

# Number of text tokens used for the dummy prompt embedding in the FLOPs trace.
# NOTE(review): 512 follows the original comment ("Flux text context length is
# usually 512") -- confirm against the prompt length actually used.
_FLOPS_TEXT_TOKENS = 512
# ===========================================


@dataclass
class PerformanceMetrics:
    """Per-GPU performance numbers collected by ``process_images``."""

    gpu_id: int
    inference_times: List[float]  # per-image latency in seconds, warm-up excluded
    warmup_time: float  # total seconds spent on warm-up images
    peak_memory_mb: float  # peak allocated GPU memory (MB)
    allocated_memory_mb: float  # currently allocated GPU memory (MB)
    reserved_memory_mb: float  # memory reserved by the caching allocator (MB)
    total_images: int  # number of images processed, warm-up included

    @property
    def avg_inference_time(self) -> float:
        """Mean latency in seconds (0.0 when nothing was measured)."""
        if not self.inference_times:
            return 0.0
        return float(np.mean(self.inference_times))

    @property
    def std_inference_time(self) -> float:
        """Population standard deviation of the latency (0.0 below 2 samples)."""
        if len(self.inference_times) < 2:
            return 0.0
        return float(np.std(self.inference_times))

    @property
    def throughput(self) -> float:
        """Images per second, computed from the summed measured latencies."""
        if not self.inference_times:
            return 0.0
        total_time = sum(self.inference_times)
        return len(self.inference_times) / total_time if total_time > 0 else 0.0

    @property
    def memory_efficiency(self) -> float:
        """Allocated memory as a percentage of reserved memory."""
        if self.reserved_memory_mb == 0:
            return 0.0
        return self.allocated_memory_mb / self.reserved_memory_mb * 100


def _packed_image_tokens(height, width):
    """Return the number of image tokens Flux sees for a ``height`` x ``width`` image.

    The VAE downsamples by 8 and the pipeline packs every 2x2 latent patch
    into one token, hence the division by 16 along each axis.
    """
    return (height // 16) * (width // 16)


def estimate_model_flops(pipe, height=512, width=512, num_inference_steps=28):
    """Estimate the transformer FLOPs of one full generation.

    First tries to trace ``pipe.transformer`` with fvcore using dummy inputs
    whose sizes are read from the transformer config.  If that fails for any
    reason (fvcore missing, tracing error, ...), falls back to the rule of
    thumb ``2 * parameters * tokens``.

    Both estimates cover the transformer only (no VAE / text encoders) and
    only the packed image tokens of a plain Flux forward pass; extra
    condition tokens added by the project's ``generate`` are not counted.

    NOTE(review): fvcore counts one multiply-add as one FLOP, whereas the
    fallback counts it as two, so the two methods are not directly comparable.

    Args:
        pipe: pipeline object exposing ``transformer`` (with ``config`` and
            ``parameters()``).
        height: image height in pixels.
        width: image width in pixels.
        num_inference_steps: number of transformer forward passes.

    Returns:
        ``(flops, method)`` where ``method`` names the estimator used, or
        ``(0, "failed")`` when neither estimator works.  Never raises.
    """
    try:
        from fvcore.nn import FlopCountAnalysis

        transformer = pipe.transformer
        config = transformer.config

        hidden_size = config.num_attention_heads * config.attention_head_dim
        seq_len = _packed_image_tokens(height, width)
        print(f"DEBUG: Estimating with Hidden Size: {hidden_size}, Seq Len: {seq_len}")

        first_param = next(transformer.parameters())
        device, dtype = first_param.device, first_param.dtype

        # The transformer projects its inputs itself, so the dummies must use
        # the *input* feature sizes from the config, not the hidden size.
        in_channels = getattr(config, "in_channels", 64)
        text_dim = getattr(config, "joint_attention_dim", 4096)
        pooled_dim = getattr(config, "pooled_projection_dim", 768)

        dummy_inputs = [
            torch.randn(1, seq_len, in_channels, device=device, dtype=dtype),
            torch.randn(1, _FLOPS_TEXT_TOKENS, text_dim, device=device, dtype=dtype),
            torch.randn(1, pooled_dim, device=device, dtype=dtype),
            torch.tensor([0.5], device=device, dtype=dtype),  # timestep
            # Positional ids for RoPE; values do not affect the FLOP count.
            # NOTE(review): diffusers releases differ on whether ids are 2-D
            # or batched 3-D; a mismatch lands in the fallback below.
            torch.zeros(seq_len, 3, device=device, dtype=dtype),
            torch.zeros(_FLOPS_TEXT_TOKENS, 3, device=device, dtype=dtype),
        ]
        # Guidance is the last positional argument and only exists for
        # guidance-distilled checkpoints; omit it rather than pass None.
        if getattr(config, "guidance_embeds", False):
            dummy_inputs.append(torch.full((1,), 3.5, device=device, dtype=dtype))

        with torch.no_grad():
            flops_analysis = FlopCountAnalysis(transformer, tuple(dummy_inputs))
            flops_analysis.unsupported_ops_warnings(False)
            single_forward_flops = flops_analysis.total()

        total_flops = single_forward_flops * num_inference_steps
        print(f"DEBUG: Single step FLOPs: {single_forward_flops/1e12:.4f} TFLOPs")
        return total_flops, "fvcore (Transformer Only)"

    except Exception as e:
        print(f"fvcore FLOPs 估算失败: {e}")
        # Fallback: every parameter takes part in one multiply-add per token,
        # i.e. roughly 2 * params * tokens FLOPs per forward pass.  This
        # ignores the quadratic attention term and is only a coarse bound.
        try:
            total_params = sum(p.numel() for p in pipe.transformer.parameters())
            tokens = _packed_image_tokens(height, width)
            theoretical_flops = 2 * total_params * tokens * num_inference_steps
            return theoretical_flops, "Theoretical (2*Params*SeqLen)"
        except Exception:
            return 0, "failed"


def profile_single_inference(pipe, image, prompt, condition, device):
    """Run one generation and measure wall-clock time and GPU memory.

    Args:
        pipe: the loaded pipeline, forwarded to ``generate``.
        image: unused here; kept so existing callers keep working.
        prompt: text prompt forwarded to ``generate``.
        condition: condition object, passed as ``conditions=[condition]``.
        device: ``"cuda:N"`` string or an integer device index.

    Returns:
        ``(result_img, seconds, peak_mb, allocated_mb, reserved_mb)``.
    """
    device_id = int(device.split(':')[1]) if isinstance(device, str) else device

    # Peak statistics are reset per call, so peak_mb refers to this image only.
    torch.cuda.reset_peak_memory_stats(device)

    with torch.cuda.device(device_id):
        # Synchronise on both sides so that queued GPU work is included in
        # (and only in) the timed region.
        torch.cuda.synchronize()
        start_time = time.perf_counter()

        result_img = generate(
            pipe,
            prompt=prompt,
            conditions=[condition],
            default_lora=True,
        ).images[0]

        torch.cuda.synchronize()
        inference_time = time.perf_counter() - start_time

    mb = 1024 ** 2
    peak_memory = torch.cuda.max_memory_allocated(device) / mb
    allocated_memory = torch.cuda.memory_allocated(device) / mb
    reserved_memory = torch.cuda.memory_reserved(device) / mb

    return result_img, inference_time, peak_memory, allocated_memory, reserved_memory


def _build_pipeline(device):
    """Load the base FLUX pipeline onto ``device`` and activate both LoRAs.

    The Hugging Face token is read from the ``HF_TOKEN`` environment variable
    instead of being embedded in the source.
    NOTE(review): the token that used to be hard-coded here has been exposed
    in source control and should be revoked.
    """
    pipe = FluxPipeline.from_pretrained(
        FLUX_MODEL_PATH,
        torch_dtype=torch.bfloat16,
        token=os.environ.get("HF_TOKEN"),
        # These two were misspelled ("local_files_on", "catch_dir") and are
        # the documented from_pretrained options.
        local_files_only=True,
        cache_dir=".cache/flux-sr",
    ).to(device)

    pipe.load_lora_weights(SR_LORA_PATH, adapter_name="sr")
    pipe.load_lora_weights(DPO_LORA_PATH, adapter_name="dpo")
    pipe.set_adapters(["sr", "dpo"], adapter_weights=[SR_LORA_SCALE, DPO_LORA_SCALE])
    return pipe


def load_pipeline(gpu_id, load_semaphore=None):
    """Load the pipeline on GPU ``gpu_id``, optionally throttled by a semaphore.

    Args:
        gpu_id: CUDA device index.
        load_semaphore: optional semaphore limiting how many processes load
            the model concurrently; ``None`` disables throttling.

    Returns:
        The loaded pipeline with the "sr" and "dpo" adapters enabled.
    """
    device = f"cuda:{gpu_id}"
    torch.cuda.set_device(gpu_id)

    # The semaphore is released even if loading raises.
    guard = load_semaphore if load_semaphore is not None else nullcontext()
    with guard:
        print(f"[GPU {gpu_id}] 开始加载模型...")
        load_start = time.time()
        pipe = _build_pipeline(device)
        load_time = time.time() - load_start
        print(f"[GPU {gpu_id}] 模型加载完成,耗时 {load_time:.1f}s")

    return pipe


def process_images(gpu_id, image_list, output_folder, load_semaphore, ready_event,
                   start_barrier, metrics_dict=None, warmup_images=None):
    """Worker entry point: load the model on one GPU and process its images.

    Args:
        gpu_id: CUDA device index used by this worker.
        image_list: file names (relative to ``input_folder``) to process.
        output_folder: directory the restored images are written to.
        load_semaphore: limits concurrent model loading.
        ready_event: set once this worker has finished loading (or failed).
        start_barrier: all workers wait here so inference starts together.
        metrics_dict: optional shared dict; receives this GPU's metrics
            under the key ``gpu_id``.
        warmup_images: number of leading images excluded from statistics.
            Defaults to the module constant ``WARMUP_IMAGES``.  It is passed
            explicitly because workers started with the "spawn" method
            re-import this module and would not see a value overridden in
            the parent process.

    Raises:
        Re-raises any exception after signalling the parent and breaking the
        barrier so that the other workers do not wait forever.
    """
    if warmup_images is None:
        warmup_images = WARMUP_IMAGES

    try:
        if not image_list:
            # Nothing to do, but still take part in the synchronisation.
            ready_event.set()
            start_barrier.wait()
            return

        device = f"cuda:{gpu_id}"
        torch.cuda.set_device(gpu_id)

        pipe = load_pipeline(gpu_id, load_semaphore)

        ready_event.set()
        start_barrier.wait()

        print(f"[GPU {gpu_id}] 开始处理 {len(image_list)} 张图片")

        prompt = ""

        inference_times = []
        warmup_time = 0.0
        peak_memory_mb = 0.0

        torch.cuda.reset_peak_memory_stats(device)

        # One progress bar per worker, stacked by GPU index.
        pbar = tqdm(
            enumerate(image_list),
            total=len(image_list),
            desc=f"GPU {gpu_id}",
            position=gpu_id,
            leave=True,
            ncols=120,
            bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]'
        )

        for idx, filename in pbar:
            image_path = os.path.join(input_folder, filename)
            with Image.open(image_path) as source_image:
                image = source_image.convert("RGB")

            # Centre-crop to a square, then resize to 512x512.
            w, h = image.size
            min_dim = min(w, h)
            image = image.crop(
                ((w - min_dim) // 2, (h - min_dim) // 2,
                 (w + min_dim) // 2, (h + min_dim) // 2)
            ).resize((512, 512), Image.BICUBIC)

            condition = Condition("sr", image)
            seed_everything(1)

            result_img, inf_time, peak_mem, _alloc_mem, _reserved_mem = profile_single_inference(
                pipe, image, prompt, condition, device
            )

            peak_memory_mb = max(peak_memory_mb, peak_mem)

            if idx < warmup_images:
                warmup_time += inf_time
                pbar.set_postfix({
                    'warmup': f'{inf_time:.2f}s',
                    'mem': f'{peak_mem:.0f}MB'
                })
            else:
                inference_times.append(inf_time)
                avg_time = np.mean(inference_times)
                pbar.set_postfix({
                    'time': f'{inf_time:.2f}s',
                    'avg': f'{avg_time:.2f}s',
                    'mem': f'{peak_mem:.0f}MB'
                })

            # Colour correction and saving happen outside the timed region.
            result_img = adain_color_fix(result_img, image)
            result_img.save(os.path.join(output_folder, filename))

        mb = 1024 ** 2
        final_peak_memory = torch.cuda.max_memory_allocated(device) / mb
        final_allocated = torch.cuda.memory_allocated(device) / mb
        final_reserved = torch.cuda.memory_reserved(device) / mb

        metrics = PerformanceMetrics(
            gpu_id=gpu_id,
            inference_times=inference_times,
            warmup_time=warmup_time,
            peak_memory_mb=max(peak_memory_mb, final_peak_memory),
            allocated_memory_mb=final_allocated,
            reserved_memory_mb=final_reserved,
            total_images=len(image_list)
        )

        if metrics_dict is not None:
            metrics_dict[gpu_id] = {
                'inference_times': inference_times,
                'warmup_time': warmup_time,
                'peak_memory_mb': metrics.peak_memory_mb,
                'allocated_memory_mb': metrics.allocated_memory_mb,
                'reserved_memory_mb': metrics.reserved_memory_mb,
                'total_images': len(image_list),
                'avg_inference_time': metrics.avg_inference_time,
                'std_inference_time': metrics.std_inference_time,
                'throughput': metrics.throughput,
                'memory_efficiency': metrics.memory_efficiency
            }

        # Images actually used as warm-up (may be fewer than requested).
        warmup_done = len(image_list) - len(inference_times)

        print(f"\n[GPU {gpu_id}] ✅ 完成!")
        print("   📊 性能摘要:")
        print(f"      - 处理图片数: {len(image_list)} (预热: {warmup_done}, 统计: {len(inference_times)})")
        print(f"      - 预热时间: {warmup_time:.2f}s")
        print(f"      - 平均推理时间: {metrics.avg_inference_time:.3f}s ± {metrics.std_inference_time:.3f}s")
        print(f"      - 吞吐量: {metrics.throughput:.2f} 图/秒")
        print(f"      - 显存峰值: {metrics.peak_memory_mb:.1f} MB")
        print(f"      - 显存效率: {metrics.memory_efficiency:.1f}%")

    except Exception as e:
        print(f"\n[GPU {gpu_id}] ❌ 错误: {e}")
        traceback.print_exc()
        # Unblock the parent, which waits on the event ...
        ready_event.set()
        # ... and the sibling workers, which would otherwise wait on the
        # barrier forever for a participant that will never arrive.
        start_barrier.abort()
        raise


def _count_warmup_images(metrics_dict):
    """Return how many processed images were excluded from the statistics."""
    return sum(
        m['total_images'] - len(m['inference_times'])
        for m in metrics_dict.values()
    )


def print_performance_report(metrics_dict: Dict[int, Dict], load_time: float,
                             total_time: float, total_images: int):
    """Print the aggregated benchmark report to stdout.

    Args:
        metrics_dict: per-GPU metric dicts as written by ``process_images``.
        load_time: seconds from start until every worker reported ready.
        total_time: seconds from start until every worker exited.
        total_images: number of input images across all GPUs.
    """
    print("\n" + "=" * 70)
    print("                    📊 详细性能报告")
    print("=" * 70)

    # Aggregate across GPUs.
    all_inference_times = []
    total_warmup_time = 0.0
    max_peak_memory = 0.0
    total_allocated_memory = 0.0

    for _gpu_id, metrics in sorted(metrics_dict.items()):
        all_inference_times.extend(metrics['inference_times'])
        total_warmup_time += metrics['warmup_time']
        max_peak_memory = max(max_peak_memory, metrics['peak_memory_mb'])
        total_allocated_memory += metrics['allocated_memory_mb']

    has_samples = bool(all_inference_times)
    inference_wall_time = total_time - load_time

    # ---- 1. latency statistics ----
    print("\n🕐 推理时间统计:")
    print("-" * 50)
    if has_samples:
        avg_time = np.mean(all_inference_times)
        print(f"   统计图片数:     {len(all_inference_times)} (排除预热 {_count_warmup_images(metrics_dict)} 张)")
        print(f"   平均推理时间:   {avg_time:.4f} 秒/张")
        print(f"   标准差:         {np.std(all_inference_times):.4f} 秒")
        print(f"   最小值:         {np.min(all_inference_times):.4f} 秒")
        print(f"   最大值:         {np.max(all_inference_times):.4f} 秒")
        print(f"   中位数:         {np.median(all_inference_times):.4f} 秒")
        print(f"   P95:            {np.percentile(all_inference_times, 95):.4f} 秒")
        print(f"   P99:            {np.percentile(all_inference_times, 99):.4f} 秒")
        print(f"   预热总时间:     {total_warmup_time:.2f} 秒")
    else:
        print("   ⚠️ 没有有效的推理时间数据")

    # ---- 2. throughput ----
    print("\n⚡ 吞吐量 (Throughput):")
    print("-" * 50)
    if has_samples:
        total_inference_time = sum(all_inference_times)
        throughput_per_sec = len(all_inference_times) / total_inference_time if total_inference_time > 0 else 0
        throughput_per_min = throughput_per_sec * 60
        throughput_per_hour = throughput_per_sec * 3600

        # Wall-clock throughput across all GPUs (includes warm-up images).
        parallel_throughput_sec = total_images / inference_wall_time if inference_wall_time > 0 else 0
        parallel_throughput_min = parallel_throughput_sec * 60

        print("   单 GPU 吞吐量:")
        print(f"      - {throughput_per_sec:.3f} 图/秒")
        print(f"      - {throughput_per_min:.1f} 图/分钟")
        print(f"      - {throughput_per_hour:.0f} 图/小时")
        print(f"   {len(metrics_dict)} GPU 并行吞吐量 (wall-clock):")
        print(f"      - {parallel_throughput_sec:.3f} 图/秒")
        print(f"      - {parallel_throughput_min:.1f} 图/分钟")

    # ---- 3. memory ----
    print("\n💾 显存 (GPU Memory):")
    print("-" * 50)
    for gpu_id, metrics in sorted(metrics_dict.items()):
        print(f"   GPU {gpu_id}:")
        print(f"      - 显存峰值:     {metrics['peak_memory_mb']:.1f} MB ({metrics['peak_memory_mb']/1024:.2f} GB)")
        print(f"      - 实际分配:     {metrics['allocated_memory_mb']:.1f} MB")
        print(f"      - 保留显存:     {metrics['reserved_memory_mb']:.1f} MB")
        print(f"      - 显存效率:     {metrics['memory_efficiency']:.1f}%")

    if len(metrics_dict) > 1:
        print("   汇总:")
        print(f"      - 最大显存峰值: {max_peak_memory:.1f} MB ({max_peak_memory/1024:.2f} GB)")
        print(f"      - 总分配显存:   {total_allocated_memory:.1f} MB")

    # ---- 4. FLOPs (hint only; computed by the separate flops mode) ----
    print("\n🔢 计算量 (FLOPs) - 估算:")
    print("-" * 50)
    print("   ⚠️ FLOPs 估算需要在单 GPU 模式下单独运行")
    print("   💡 提示: 设置 NUM_GPUS=1 并运行 estimate_flops_standalone() 获取准确值")

    # ---- 5. time breakdown ----
    print("\n⏱️ 时间分解:")
    print("-" * 50)
    # Guard the percentages against a zero total (e.g. nothing was run).
    load_pct = load_time / total_time * 100 if total_time > 0 else 0.0
    inference_pct = inference_wall_time / total_time * 100 if total_time > 0 else 0.0
    print(f"   模型加载时间:   {load_time:.1f} 秒 ({load_pct:.1f}%)")
    print(f"   推理时间:       {inference_wall_time:.1f} 秒 ({inference_pct:.1f}%)")
    print(f"   总时间:         {total_time:.1f} 秒")

    # ---- 6. summary table ----
    print("\n" + "=" * 70)
    print("                    📈 性能汇总")
    print("=" * 70)

    if has_samples:
        table = [
            "",
            "   ┌─────────────────────────────────────────────────────────────────┐",
            "   │ 指标                           │ 值                             │",
            "   ├─────────────────────────────────────────────────────────────────┤",
            f"   │ 平均推理时间 (不含预热)        │ {avg_time:.4f} 秒/张             │",
            f"   │ 吞吐量 (单GPU)                 │ {throughput_per_sec:.3f} 图/秒               │",
            f"   │ 吞吐量 ({len(metrics_dict)}GPU 并行)            │ {parallel_throughput_sec:.3f} 图/秒               │",
            f"   │ 显存峰值                       │ {max_peak_memory:.1f} MB ({max_peak_memory/1024:.2f} GB)    │",
            f"   │ 总处理图片                     │ {total_images} 张                    │",
            "   └─────────────────────────────────────────────────────────────────┘",
            "",
        ]
        print("\n".join(table))

    print("=" * 70)


def estimate_flops_standalone():
    """Load the pipeline on ``cuda:0`` and print a FLOPs estimate.

    Returns:
        The estimated FLOPs of one generation (0 when estimation failed).
    """
    print("=" * 60)
    print("🔢 正在估算模型 FLOPs...")
    print("=" * 60)

    print("加载模型中...")
    pipe = _build_pipeline("cuda:0")

    flops, method = estimate_model_flops(pipe)

    if flops > 0:
        print(f"\n📊 FLOPs 估算结果 (方法: {method}):")
        print(f"   - 每次推理 FLOPs: {flops:.2e}")
        print(f"   - 每次推理 TFLOPs: {flops / 1e12:.2f}")
        # Achieved FLOP/s would be flops / measured inference time.
    else:
        print("❌ FLOPs 估算失败")

    return flops


def save_metrics_to_json(metrics_dict: Dict, output_path: str, load_time: float,
                         total_time: float, total_images: int):
    """Write the summary and per-GPU metrics to ``output_path`` as JSON.

    Args:
        metrics_dict: per-GPU metric dicts as written by ``process_images``.
        output_path: destination file; overwritten if it exists.
        load_time: seconds until every worker reported ready.
        total_time: seconds until every worker exited.
        total_images: number of input images across all GPUs.
    """
    all_times = []
    for m in metrics_dict.values():
        all_times.extend(m['inference_times'])

    inference_wall_time = total_time - load_time
    total_measured = sum(all_times)
    peak_memory_mb = max((m['peak_memory_mb'] for m in metrics_dict.values()), default=0)

    def stat(fn, *args):
        """Apply a NumPy reduction, or return 0 when there are no samples."""
        return float(fn(all_times, *args)) if all_times else 0

    summary = {
        'avg_inference_time_sec': stat(np.mean),
        'std_inference_time_sec': stat(np.std),
        'min_inference_time_sec': stat(np.min),
        'max_inference_time_sec': stat(np.max),
        'median_inference_time_sec': stat(np.median),
        'p95_inference_time_sec': stat(np.percentile, 95),
        'p99_inference_time_sec': stat(np.percentile, 99),
        'throughput_single_gpu_per_sec': float(len(all_times) / total_measured) if total_measured > 0 else 0,
        'throughput_parallel_per_sec': float(total_images / inference_wall_time) if inference_wall_time > 0 else 0,
        'peak_memory_mb': peak_memory_mb,
        'peak_memory_gb': peak_memory_mb / 1024 if metrics_dict else 0,
        'total_images': total_images,
        # Derived from what the workers actually did rather than from the
        # configured warm-up count.
        'warmup_images': _count_warmup_images(metrics_dict),
        'measured_images': len(all_times),
        'model_load_time_sec': load_time,
        'inference_wall_time_sec': inference_wall_time,
        'total_time_sec': total_time,
        'num_gpus': len(metrics_dict),
    }

    result = {
        'summary': summary,
        'per_gpu_metrics': {str(k): v for k, v in metrics_dict.items()}
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    print(f"📄 性能指标已保存到: {output_path}")


def main(save_metrics_path: str = None, num_gpus: int = None, warmup_images: int = None):
    """Run the multi-GPU benchmark.

    Args:
        save_metrics_path: optional path of a JSON file to write metrics to.
        num_gpus: number of worker processes / GPUs; defaults to ``NUM_GPUS``.
        warmup_images: warm-up images per GPU; defaults to ``WARMUP_IMAGES``.

    Raises:
        ValueError: if the resolved GPU count is smaller than 1.
    """
    if num_gpus is None:
        num_gpus = NUM_GPUS
    if warmup_images is None:
        warmup_images = WARMUP_IMAGES
    if num_gpus < 1:
        raise ValueError(f"num_gpus must be >= 1, got {num_gpus}")

    os.makedirs(output_folder, exist_ok=True)

    all_images = sorted(
        f for f in os.listdir(input_folder)
        if f.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".webp"))
    )
    total_images = len(all_images)

    print("=" * 70)
    print("           🚀 Diffusion 超分性能测试")
    print("=" * 70)
    print(f"📁 输入目录: {input_folder}")
    print(f"📁 输出目录: {output_folder}")
    print(f"🖼️  总图片数: {total_images}")
    print(f"🎮 GPU 数量: {num_gpus}")
    print(f"📦 每 GPU 处理: ~{total_images // num_gpus} 张")
    print(f"⚙️  模型加载并发数: {MAX_CONCURRENT_LOAD}")
    print(f"🔥 预热图片数: {warmup_images} (每个GPU)")
    print(f"📊 性能分析: {'开启' if ENABLE_PROFILING else '关闭'}")
    print("=" * 70)

    # Round-robin split so every GPU gets a similar number of images.
    image_chunks = [all_images[i::num_gpus] for i in range(num_gpus)]

    start_time = time.time()

    mp.set_start_method('spawn', force=True)

    load_semaphore = mp.Semaphore(MAX_CONCURRENT_LOAD)
    ready_events = [mp.Event() for _ in range(num_gpus)]
    start_barrier = mp.Barrier(num_gpus)

    # The manager owns a helper process; the with-block shuts it down.
    with mp.Manager() as manager:
        metrics_dict = manager.dict()

        processes = []

        print(f"\n⏳ 开始加载模型(最多 {MAX_CONCURRENT_LOAD} 个并发,避免I/O瓶颈)...")

        for gpu_id in range(num_gpus):
            p = mp.Process(
                target=process_images,
                args=(gpu_id, image_chunks[gpu_id], output_folder,
                      load_semaphore, ready_events[gpu_id], start_barrier,
                      metrics_dict, warmup_images)
            )
            p.start()
            processes.append(p)

        # Workers set their event on success *and* on failure, so this
        # loop cannot block on a crashed worker.
        for loaded_count, event in enumerate(ready_events, start=1):
            event.wait()
            print(f"   ✅ GPU {loaded_count - 1} 就绪 ({loaded_count}/{num_gpus})")

        load_time = time.time() - start_time
        print(f"\n⏱️  模型加载总耗时: {load_time:.1f}s ({load_time/60:.1f} 分钟)")
        print("🚀 所有模型加载完成,开始并行推理...\n")

        for p in processes:
            p.join()

        total_time = time.time() - start_time

        # Copy out of the proxy before the manager goes away.
        metrics_dict_normal = dict(metrics_dict)

    failed_gpus = [i for i, p in enumerate(processes) if p.exitcode != 0]
    if failed_gpus:
        print(f"⚠️ 以下 GPU 进程异常退出: {failed_gpus}")

    print_performance_report(metrics_dict_normal, load_time, total_time, total_images)

    if save_metrics_path:
        save_metrics_to_json(metrics_dict_normal, save_metrics_path,
                             load_time, total_time, total_images)

    print(f"\n📁 结果保存在: {output_folder}")
    print("=" * 70)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Diffusion 超分性能测试')
    parser.add_argument('--mode', type=str, default='benchmark',
                        choices=['benchmark', 'flops'],
                        help='运行模式: benchmark (默认) 或 flops (仅估算FLOPs)')
    parser.add_argument('--save-metrics', type=str, default=None,
                        help='保存性能指标到 JSON 文件的路径')
    parser.add_argument('--num-gpus', type=int, default=None,
                        help='使用的 GPU 数量 (覆盖默认值)')
    parser.add_argument('--warmup', type=int, default=None,
                        help='预热图片数量 (覆盖默认值)')

    args = parser.parse_args()

    if args.mode == 'flops':
        # FLOPs estimation only.
        estimate_flops_standalone()
    else:
        # Full benchmark; overrides are passed as arguments so that they also
        # reach the spawned worker processes.
        main(save_metrics_path=args.save_metrics,
             num_gpus=args.num_gpus,
             warmup_images=args.warmup)

# pyiqa psnr ssim lpips musiq clipiqa+ --target /home/wanghongbo06/diffusion-dpo-test/DIV2K-val/sobolev-400 --r /home/wanghongbo06/baipurui/DATA/DIV2K-val/gt