"""Zero-shot evaluation of the PIGU variants on the cylinder-wake geometry.

The script resamples the scattered cylinder-wake data onto a regular grid,
selects sensor locations with SVD + pivoted QR, runs every model variant
listed in ``VARIANTS`` and reports gate-mask and flow-reconstruction metrics
as a table and a CSV file.
"""

import importlib.util
import os
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.io as sio
import torch
from scipy.interpolate import griddata
from scipy.linalg import qr
from scipy.ndimage import binary_dilation

# =========================================================
# Path configuration and variant registry
# =========================================================
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
NEW_GEOMETRY_PATH = os.path.abspath(
    os.path.join(SCRIPT_DIR, "../Geometric generalization/cylinder_mean_flow.npy")
)
MAT_FILE_PATH = os.path.abspath(
    os.path.join(SCRIPT_DIR, "../Geometric generalization/cylinder_nektar_wake.mat")
)

# The PDE residual engine is taken from the baseline++ implementation.
sys.path.append(os.path.join(SCRIPT_DIR, "baseline++"))
try:
    from model import NavierStokesURANS
except ImportError:
    print("[!] 找不到 baseline++ 中的 model.py,请检查目录结构")
    # Keep the name bound so that main() can fail with a clear message
    # instead of an unrelated NameError much later.
    NavierStokesURANS = None

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Number of sensors passed to every model as `sensor_count`.
N_SENSORS = 65


def _variant(folder, weight_file, gate_type):
    """Build one ``VARIANTS`` entry for the variant stored in *folder*."""
    return {
        "arch": os.path.join(SCRIPT_DIR, folder, "architectures.py"),
        "weight": os.path.join(SCRIPT_DIR, folder, "results", weight_file),
        "type": gate_type,
    }


# Maps a display name to the architecture file, the checkpoint and the kind
# of gate mask that main() builds for that variant.
VARIANTS = {
    "Baseline++": _variant("baseline++", "model_ep1900.pth", "dynamic"),
    "V1 (Hardcoded Gate)": _variant("Variant_1 Hardcoded Gate", "v1_ep1900.pth", "hardcoded_x"),
    "V2 (Global On)": _variant("Variant_2 Spatial Gate", "v2_ep1900.pth", "global_1"),
    "V3 (Global Off)": _variant("Variant_3 High_Freq", "v3_ep1900.pth", "global_0"),
    "V4 (w/o PDE Loss)": _variant("Variant_4 PDE_Loss", "v4_ep1900.pth", "dynamic"),
    "V5 (w/o Base Flow)": _variant("Variant_5 BaseFlow", "v5_ep1900.pth", "dynamic"),
    "V6 (w/o Robust PDE)": _variant("Variant_6 Robust PDE", "v6_ep1900.pth", "dynamic"),
    "V7 (w/o Safe Bound)": _variant("Variant_7 Safe Boundary", "v7_ep1900.pth", "dynamic"),
}


def load_architecture_class(arch_path, module_name):
    """Import the file at *arch_path* and return its ``PIGU_Hybrid`` class.

    Args:
        arch_path: Path of the ``architectures.py`` file to execute.
        module_name: Name given to the freshly created module object; it
            should differ between variants so the modules stay distinct.

    Returns:
        The ``PIGU_Hybrid`` attribute of the loaded module.

    Raises:
        ImportError: If no import spec/loader can be created for *arch_path*.
    """
    spec = importlib.util.spec_from_file_location(module_name, arch_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot create an import spec for {arch_path!r}")
    arch_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(arch_module)
    return arch_module.PIGU_Hybrid


def compute_vorticity(u, v, dx, dy):
    """Return ``dv/dx - du/dy`` for fields whose last two dims are (y, x).

    Args:
        u: Tensor holding the x-velocity component.
        v: Tensor holding the y-velocity component.
        dx: Grid spacing along the last dimension.
        dy: Grid spacing along the second-to-last dimension.
    """
    # torch.gradient returns one gradient per requested dim, in `dim` order.
    _, dv_dx = torch.gradient(v, spacing=(dy, dx), dim=(-2, -1))
    du_dy, _ = torch.gradient(u, spacing=(dy, dx), dim=(-2, -1))
    return dv_dx - du_dy


def _find_key(keys, candidates):
    """Return the first entry of *keys* whose lower-case form is in *candidates*.

    Raises:
        KeyError: If no key matches, listing what was available.
    """
    for key in keys:
        if key.lower() in candidates:
            return key
    raise KeyError(
        f"None of {list(candidates)} found in the .mat file; available keys: {list(keys)}"
    )


def main():
    """Run the full zero-shot evaluation and write the metrics table to CSV."""
    print("="*80)
    print("🚀 终极测试:跨几何圆柱绕流 (Zero-Shot) 全量物理与掩码指标评估")
    print("="*80)

    # Fail before the expensive gridding if the PDE engine is unavailable.
    if NavierStokesURANS is None:
        raise ImportError(
            "NavierStokesURANS could not be imported from baseline++/model.py; "
            "the continuity residual cannot be evaluated."
        )

    # =========================================================
    # 1. Build the regular grid and parse the reference data
    # =========================================================
    raw_mean = np.load(NEW_GEOMETRY_PATH)
    coords = raw_mean[:, :2]
    x_min, x_max = coords[:, 0].min(), coords[:, 0].max()
    y_min, y_max = coords[:, 1].min(), coords[:, 1].max()

    W, H = 256, 128
    grid_x, grid_y = np.meshgrid(np.linspace(x_min, x_max, W), np.linspace(y_min, y_max, H))
    dx_val, dy_val = (x_max - x_min)/(W-1), (y_max - y_min)/(H-1)
    dx_tensor = torch.tensor(dx_val, device=DEVICE)
    dy_tensor = torch.tensor(dy_val, device=DEVICE)

    def to_grid(point_values):
        """Linearly interpolate scattered values onto the regular grid."""
        return griddata(coords, point_values, (grid_x, grid_y), method='linear', fill_value=np.nan)

    mat_data = sio.loadmat(MAT_FILE_PATH)
    valid_keys = [k for k in mat_data.keys() if not k.startswith('__')]
    if 'U_star' in valid_keys:
        u_trans, v_trans = mat_data['U_star'][:, 0, :], mat_data['U_star'][:, 1, :]
    else:
        u_trans = mat_data[_find_key(valid_keys, ('u', 'u_transient', 'u_star'))]
        v_trans = mat_data[_find_key(valid_keys, ('v', 'v_transient', 'v_star'))]

    # Pressure is optional: fall back to zeros when no pressure variable exists.
    p_trans = np.zeros_like(u_trans)
    p_key = next((k for k in valid_keys if k.lower() in ('p', 'p_transient', 'p_star')), None)
    if p_key is not None:
        p_trans = mat_data[p_key]
        if p_trans.ndim == 3:
            p_trans = p_trans[:, 0, :]
    has_pressure = np.max(np.abs(p_trans)) > 1e-6

    u_mean_true = np.mean(u_trans, axis=1)
    v_mean_true = np.mean(v_trans, axis=1)
    p_mean_true = np.mean(p_trans, axis=1)

    # Interpolate the time-averaged fields onto the grid.
    grid_u_mean = to_grid(u_mean_true)
    grid_v_mean = to_grid(v_mean_true)
    grid_p_mean = to_grid(p_mean_true)

    # Grid points that received NaN are treated as solid.
    # NOTE(review): griddata only fills NaN outside the convex hull of
    # `coords`; if the cylinder lies inside the sampled region its interior
    # will be interpolated rather than masked -- confirm this is intended.
    is_solid = np.isnan(grid_u_mean)
    valid_mask_np = ~is_solid
    valid_mask = torch.from_numpy(valid_mask_np).to(DEVICE)

    # Two-cell ring of fluid points surrounding the solid region.
    boundary_np = binary_dilation(is_solid, iterations=2) ^ is_solid
    boundary_mask = torch.from_numpy(boundary_np).to(DEVICE)

    grid_u_mean = np.nan_to_num(grid_u_mean, nan=0.0)
    grid_v_mean = np.nan_to_num(grid_v_mean, nan=0.0)
    grid_p_mean = np.nan_to_num(grid_p_mean, nan=0.0)

    # Reference turbulent region: TKE above 5% of its maximum.
    u_rms_points = np.sqrt(np.mean((u_trans - u_mean_true[:, None])**2, axis=1))
    v_rms_points = np.sqrt(np.mean((v_trans - v_mean_true[:, None])**2, axis=1))
    tke_points = 0.5 * (u_rms_points**2 + v_rms_points**2)
    grid_tke = np.nan_to_num(to_grid(tke_points), nan=0.0)

    total_tke = np.sum(grid_tke)
    tke_threshold = np.max(grid_tke) * 0.05
    true_turbulent_mask = (grid_tke > tke_threshold).astype(np.float32)
    actual_positives = np.sum(true_turbulent_mask)

    # Grid the first N transient frames used by the flow metrics.
    N_FRAMES = min(100, u_trans.shape[1])
    print(f"[*] 提取前 {N_FRAMES} 帧数据用于物理重建评估...")
    frames = []
    for t in range(N_FRAMES):
        gu = np.nan_to_num(to_grid(u_trans[:, t]), nan=0.0)
        gv = np.nan_to_num(to_grid(v_trans[:, t]), nan=0.0)
        gp = np.nan_to_num(to_grid(p_trans[:, t]), nan=0.0)
        frames.append(np.stack([gu, gv, gp], axis=0))
    grid_trans = np.stack(frames, axis=0)  # [T, 3, H, W]

    data_tensor = torch.from_numpy(grid_trans).float()
    mean_tensor = torch.from_numpy(np.stack([grid_u_mean, grid_v_mean, grid_p_mean], axis=0)).float()

    # Per-channel min/max normalization to [-1, 1]; constant channels keep a
    # unit denominator to avoid division by zero.
    stats_max = data_tensor.amax(dim=(0, 2, 3), keepdim=True)
    stats_min = data_tensor.amin(dim=(0, 2, 3), keepdim=True)
    denom = stats_max - stats_min
    denom[denom < 1e-8] = 1.0

    data_norm = 2 * (data_tensor - stats_min) / denom - 1.0
    mean_norm = 2 * (mean_tensor - stats_min.squeeze(0)) / denom.squeeze(0) - 1.0

    # =========================================================
    # 2. Zero-shot sensor selection via SVD + pivoted QR
    # =========================================================
    print("[*] 正在执行零样本 SVD + QR 分解,寻找圆柱尾迹最佳传感器...")
    fluctuations = (data_tensor[:, 0, :, :] - mean_tensor[0, :, :]).view(N_FRAMES, -1).numpy().T
    k = min(N_FRAMES, N_SENSORS + 10)
    U, _, _ = np.linalg.svd(fluctuations, full_matrices=False)
    _, _, P = qr(U[:, :k].T, pivoting=True)

    # Keep the pivots that fall on fluid cells, preserving the pivot order.
    # A boolean lookup replaces the former per-pivot array membership scan.
    fluid_flat = valid_mask_np.ravel()
    valid_P = P[fluid_flat[P]]
    sensor_locs = [(idx // W, idx % W) for idx in valid_P[:N_SENSORS]]
    if len(sensor_locs) < N_SENSORS:
        print(f"[!] 仅找到 {len(sensor_locs)} 个有效传感器 (期望 {N_SENSORS})")
    sensor_rows = [loc[0] for loc in sensor_locs]
    sensor_cols = [loc[1] for loc in sensor_locs]

    grid_x_norm = 2 * (grid_x - x_min) / (x_max - x_min) - 1
    grid_y_norm = 2 * (grid_y - y_min) / (y_max - y_min) - 1
    grid_coords_norm = torch.from_numpy(np.stack([grid_x_norm, grid_y_norm], axis=-1)).float().to(DEVICE)
    sensor_coords = torch.stack([grid_coords_norm[y, x] for y, x in sensor_locs]).to(DEVICE)  # [65, 2]

    # Move the normalization statistics to the device once.
    stats_min_dev = stats_min.to(DEVICE)
    stats_range_dev = stats_max.to(DEVICE) - stats_min_dev

    def denormalize(norm_tensor):
        """Map the first three channels back to physical units.

        A fourth channel, when present, is passed through unchanged.
        """
        uvp_norm = norm_tensor[:, :3, :, :]
        uvp_phys = (uvp_norm + 1) / 2 * stats_range_dev + stats_min_dev
        if norm_tensor.shape[1] >= 4:
            return torch.cat([uvp_phys, norm_tensor[:, 3:4, :, :]], dim=1)
        return uvp_phys

    pde_engine = NavierStokesURANS({'box_len': [x_max - x_min, y_max - y_min, 1.0], 'dt': 0.05}).to(DEVICE)

    # Inputs that do not depend on the variant or on the frame.
    # The grid coordinates stay as [1, H, W, 2] (no permute), contiguous in memory.
    g_pos = grid_coords_norm.unsqueeze(0).contiguous()
    b_flow = mean_norm.unsqueeze(0).to(DEVICE)
    s_pos = sensor_coords.unsqueeze(0)
    m_phys = mean_tensor.unsqueeze(0).to(DEVICE)
    u_m, v_m = m_phys[0, 0], m_phys[0, 1]
    u_mean_flat = torch.tensor(grid_u_mean).view(1, -1, 1).float().to(DEVICE)

    results_list = []

    # =========================================================
    # 3. Main evaluation loop
    # =========================================================
    for i, (name, config) in enumerate(VARIANTS.items()):
        print(f"\n[{i+1}/{len(VARIANTS)}] 正在深度评估: {name}")
        if not os.path.exists(config['weight']):
            print(f" [!] 未找到权重文件,跳过...")
            continue
        if not os.path.exists(config['arch']):
            print(f" [!] 未找到架构文件,跳过...")
            continue

        PIGU_Class = load_architecture_class(config['arch'], module_name=f"arch_mod_{i}")
        model = PIGU_Class(sensor_in_dim=3, sensor_count=N_SENSORS).to(DEVICE)
        state = torch.load(config['weight'], map_location=DEVICE, weights_only=True)
        # strict=False tolerates architecture differences between variants,
        # but a mismatch is reported so it cannot go unnoticed.
        load_report = model.load_state_dict(state, strict=False)
        if load_report.missing_keys or load_report.unexpected_keys:
            print(
                f" [!] 权重键不完全匹配: 缺失 {len(load_report.missing_keys)} 个, "
                f"多余 {len(load_report.unexpected_keys)} 个"
            )
        model.eval()

        # --- A. Gate-mask metrics (threshold grid search for dynamic gates) ---
        best_mask, capture, precision, recall, iou = np.zeros((H, W)), 0, 0, 0, 0
        best_th = "N/A"

        if config['type'] == 'dynamic':
            gate_sc = model.projector.gate_scale.item()
            best_score = -1
            for th in np.arange(-0.5, 0.95, 0.05):
                with torch.no_grad():
                    beta_flat = torch.sigmoid(gate_sc * (th - u_mean_flat))
                    mask = beta_flat.view(H, W).cpu().numpy()
                if mask.max() > 0:
                    mask = mask / mask.max()

                pred_beta_mask = (mask > 0.5).astype(np.float32)
                e_cap = np.sum(grid_tke * mask) / (total_tke + 1e-8)
                intersection = np.sum(true_turbulent_mask * pred_beta_mask)
                pred_pos = np.sum(pred_beta_mask)
                pre = intersection / (pred_pos + 1e-8) if pred_pos > 0 else 0

                # Rank thresholds by captured energy times precision, and
                # only accept those with precision above 50%.
                score = e_cap * pre
                if score > best_score and pre > 0.50:
                    best_score, best_th, best_mask = score, th, mask
                    capture, precision = e_cap, pre
                    recall = intersection / (actual_positives + 1e-8)
                    iou = intersection / (actual_positives + pred_pos - intersection + 1e-8)
        else:
            if config['type'] == 'hardcoded_x':
                best_mask = 1.0 / (1.0 + np.exp(-50.0 * (grid_x_norm - 0.33)))
            elif config['type'] == 'global_1':
                best_mask = np.ones((H, W))
            elif config['type'] == 'global_0':
                best_mask = np.zeros((H, W))

            pred_beta_mask = (best_mask > 0.5).astype(np.float32)
            capture = np.sum(grid_tke * best_mask) / (total_tke + 1e-8)
            intersection = np.sum(true_turbulent_mask * pred_beta_mask)
            pred_pos = np.sum(pred_beta_mask)
            recall = intersection / (actual_positives + 1e-8)
            precision = intersection / (pred_pos + 1e-8) if pred_pos > 0 else 0
            iou = intersection / (actual_positives + pred_pos - intersection + 1e-8)

        beta_tensor = torch.from_numpy(best_mask).view(1, 1, H, W).float().to(DEVICE)

        # --- B. Flow-reconstruction metrics ---
        m_l2, m_cont, m_vort, m_tke, m_bp = [], [], [], [], []

        with torch.no_grad():
            for t in range(N_FRAMES - 1):
                s_val_t = data_norm[t, :, sensor_rows, sensor_cols].T.unsqueeze(0).to(DEVICE)
                s_val_next = data_norm[t+1, :, sensor_rows, sensor_cols].T.unsqueeze(0).to(DEVICE)

                pred_t = model(s_val_t, s_pos, g_pos, b_flow)
                pred_next = model(s_val_next, s_pos, g_pos, b_flow)

                p_phys = denormalize(pred_t)
                t_phys = data_tensor[t].unsqueeze(0).to(DEVICE)

                u_p, v_p, p_p = p_phys[0, 0], p_phys[0, 1], p_phys[0, 2]
                u_t, v_t, p_t = t_phys[0, 0], t_phys[0, 1], t_phys[0, 2]

                # 1. Relative L2 error of the velocity over fluid cells.
                uv_p, uv_t = p_phys[0, 0:2][:, valid_mask], t_phys[0, 0:2][:, valid_mask]
                m_l2.append((torch.norm(uv_p - uv_t) / (torch.norm(uv_t) + 1e-8)).item())

                # 2. RMS of the third residual returned by the PDE engine.
                _, _, res_c = pde_engine(p_phys, denormalize(pred_next), None, dx=dx_tensor, dy=dy_tensor, beta_mask=beta_tensor)
                m_cont.append(torch.sqrt(torch.mean(res_c[0, 0][valid_mask]**2)).item())

                # 3. Relative L2 error of the vorticity.
                vort_p = compute_vorticity(u_p, v_p, dx_val, dy_val)
                vort_t = compute_vorticity(u_t, v_t, dx_val, dy_val)
                m_vort.append((torch.norm(vort_p[valid_mask] - vort_t[valid_mask]) / (torch.norm(vort_t[valid_mask]) + 1e-8)).item())

                # 4. Relative L2 error of the instantaneous fluctuation energy.
                tke_p = 0.5*((u_p-u_m)**2 + (v_p-v_m)**2)
                tke_t = 0.5*((u_t-u_m)**2 + (v_t-v_m)**2)
                m_tke.append((torch.norm(tke_p[valid_mask] - tke_t[valid_mask]) / (torch.norm(tke_t[valid_mask]) + 1e-8)).item())

                # 5. Relative L2 error of the pressure on the boundary ring.
                if has_pressure:
                    m_bp.append((torch.norm(p_p[boundary_mask] - p_t[boundary_mask]) / (torch.norm(p_t[boundary_mask]) + 1e-8)).item())

        results_list.append({
            "Variant": name,
            "Thresh": f"{best_th:.2f}" if isinstance(best_th, float) else best_th,
            "Cap(%)": capture * 100,
            "IoU(%)": iou * 100,
            "L2_Vel": np.mean(m_l2),
            "Cont_Res": np.mean(m_cont),
            "Vort_L2": np.mean(m_vort),
            "TKE_L2": np.mean(m_tke),
            "Bound_P": np.mean(m_bp) if has_pressure else 0.0
        })

    # =========================================================
    # 4. Print and save the summary table
    # =========================================================
    if results_list:
        df = pd.DataFrame(results_list)
        formatters = {
            "Cap(%)": "{:.1f}".format,
            "IoU(%)": "{:.1f}".format,
            "L2_Vel": "{:.3e}".format,
            "Cont_Res": "{:.3e}".format,
            "Vort_L2": "{:.3e}".format,
            "TKE_L2": "{:.3e}".format,
            "Bound_P": "{:.3e}".format
        }

        print("\n" + "="*110)
        print("🏆 终极版:跨几何泛化 (Zero-Shot Cylinder Wake) 全量评价大满贯 🏆")
        print("="*110)
        print(df.to_string(index=False, justify='center', formatters=formatters))
        print("="*110)

        os.makedirs("eval_results", exist_ok=True)
        csv_path = "eval_results/ultimate_cylinder_zero_shot_metrics.csv"
        df.to_csv(csv_path, index=False)
        print(f"[+] 终极评估结果已保存至: {csv_path}")


if __name__ == "__main__":
    main()