# PINN / evaluate.py
# inaniloquentee's picture
# Upload folder using huggingface_hub
# 7fa74f9 verified
# Raw
# History Blame Contribute Delete
# 16.2 kB
import sys
import os
import torch
import numpy as np
import pandas as pd
import scipy.io as sio
import matplotlib.pyplot as plt
from scipy.interpolate import griddata
from scipy.linalg import qr
from scipy.ndimage import binary_dilation
import importlib.util
# =========================================================
# Path configuration and dictionary definitions
# =========================================================
# Directory containing this script; every other path is resolved against it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
NEW_GEOMETRY_PATH = os.path.abspath(
    os.path.join(SCRIPT_DIR, "../Geometric generalization/cylinder_mean_flow.npy")
)
MAT_FILE_PATH = os.path.abspath(
    os.path.join(SCRIPT_DIR, "../Geometric generalization/cylinder_nektar_wake.mat")
)

# Pull in the baseline's robust physics engine: its folder is appended to the
# import path so that `model.py` inside it can be imported by bare name.
sys.path.append(os.path.join(SCRIPT_DIR, "baseline++"))
try:
    from model import NavierStokesURANS
except ImportError:
    # Best-effort: report the problem and keep the module importable.
    print("[!] 找不到 baseline++ 中的 model.py,请检查目录结构")

# Run on the GPU whenever CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
# Registry of the model variants to evaluate. Each entry maps a display name to
#   "arch"   - path of the `architectures.py` that defines the network class,
#   "weight" - path of the checkpoint stored in that variant's `results` folder,
#   "type"   - how the gate mask is obtained during evaluation
#              ("dynamic", "hardcoded_x", "global_1" or "global_0").
VARIANTS = {
    label: {
        "arch": os.path.join(SCRIPT_DIR, folder, "architectures.py"),
        "weight": os.path.join(SCRIPT_DIR, folder, "results", checkpoint),
        "type": gate_type,
    }
    for label, folder, checkpoint, gate_type in (
        ("Baseline++", "baseline++", "model_ep1900.pth", "dynamic"),
        ("V1 (Hardcoded Gate)", "Variant_1 Hardcoded Gate", "v1_ep1900.pth", "hardcoded_x"),
        ("V2 (Global On)", "Variant_2 Spatial Gate", "v2_ep1900.pth", "global_1"),
        ("V3 (Global Off)", "Variant_3 High_Freq", "v3_ep1900.pth", "global_0"),
        ("V4 (w/o PDE Loss)", "Variant_4 PDE_Loss", "v4_ep1900.pth", "dynamic"),
        ("V5 (w/o Base Flow)", "Variant_5 BaseFlow", "v5_ep1900.pth", "dynamic"),
        ("V6 (w/o Robust PDE)", "Variant_6 Robust PDE", "v6_ep1900.pth", "dynamic"),
        ("V7 (w/o Safe Bound)", "Variant_7 Safe Boundary", "v7_ep1900.pth", "dynamic"),
    )
}
def load_architecture_class(arch_path, module_name):
    """Load ``PIGU_Hybrid`` from the Python source file at *arch_path*.

    The file is executed as a fresh module named *module_name*, so several
    files that share a basename can be loaded side by side as long as each
    call uses a distinct *module_name*.

    Args:
        arch_path: Filesystem path of the architecture source file.
        module_name: Name given to the module object that is created.

    Returns:
        The ``PIGU_Hybrid`` attribute of the executed module.

    Raises:
        ImportError: If no import spec/loader can be built for *arch_path*
            (for example because its suffix is not a recognised Python one).
        AttributeError: If the module does not define ``PIGU_Hybrid``.
    """
    spec = importlib.util.spec_from_file_location(module_name, arch_path)
    # spec_from_file_location returns None when no loader matches the path;
    # surface that as an import failure instead of an AttributeError on None.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"Cannot build an import spec for architecture file: {arch_path!r}"
        )
    arch_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(arch_module)
    return arch_module.PIGU_Hybrid
def compute_vorticity(u, v, dx, dy):
    """Return the vorticity ``dv/dx - du/dy`` of a gridded velocity field.

    Derivatives are taken with ``torch.gradient``: the last dimension is
    differentiated with spacing *dx* and the second-to-last with spacing *dy*.

    Args:
        u: Tensor whose last two dimensions are the grid; first velocity
            component.
        v: Tensor laid out like *u*; second velocity component.
        dx: Grid spacing along the last dimension.
        dy: Grid spacing along the second-to-last dimension.

    Returns:
        Tensor with the same shape as the inputs.
    """
    # Only the two cross derivatives are needed, so the unused dv/dy and
    # du/dx components are not computed.
    (dv_dx,) = torch.gradient(v, spacing=(dx,), dim=(-1,))
    (du_dy,) = torch.gradient(u, spacing=(dy,), dim=(-2,))
    return dv_dx - du_dy
def main():
    """Evaluate every configured variant zero-shot on the cylinder-wake data.

    Steps, in order:

    1. Load scattered coordinates from ``NEW_GEOMETRY_PATH`` and transient
       fields from ``MAT_FILE_PATH``, then interpolate them onto a regular
       ``H x W`` grid.
    2. Pick sensor locations with an SVD followed by a column-pivoted QR of
       the u-velocity fluctuations.
    3. For each entry of ``VARIANTS`` whose weight file exists, score its gate
       mask against a TKE-based reference mask and score the reconstructed
       fields frame by frame.
    4. Print the metric table and write it to
       ``eval_results/ultimate_cylinder_zero_shot_metrics.csv`` (relative to
       the current working directory).

    Raises:
        RuntimeError: If ``NavierStokesURANS`` is not available because the
            module-level import failed.
        KeyError: If the ``.mat`` file has no ``U_star`` entry and no
            recognisable u/v velocity arrays.
    """
    # The module-level import of the PDE engine is best-effort; fail here with
    # a clear message instead of a NameError after all the data preparation.
    if "NavierStokesURANS" not in globals():
        raise RuntimeError(
            "NavierStokesURANS is unavailable: baseline++/model.py could not be imported"
        )

    print("="*80)
    print("🚀 终极测试:跨几何圆柱绕流 (Zero-Shot) 全量物理与掩码指标评估")
    print("="*80)

    # =========================================================
    # 1. Build the grid and parse the reference data
    # =========================================================
    raw_mean = np.load(NEW_GEOMETRY_PATH)
    coords = raw_mean[:, :2]
    x_min, x_max = coords[:, 0].min(), coords[:, 0].max()
    y_min, y_max = coords[:, 1].min(), coords[:, 1].max()

    W, H = 256, 128
    grid_x, grid_y = np.meshgrid(np.linspace(x_min, x_max, W), np.linspace(y_min, y_max, H))
    dx_val, dy_val = (x_max - x_min)/(W-1), (y_max - y_min)/(H-1)
    dx_tensor = torch.tensor(dx_val, device=DEVICE)
    dy_tensor = torch.tensor(dy_val, device=DEVICE)

    mat_data = sio.loadmat(MAT_FILE_PATH)
    valid_keys = [key for key in mat_data.keys() if not key.startswith('__')]

    def find_key(aliases):
        """Return the first key of the .mat file whose lowercase form is in *aliases*."""
        for key in valid_keys:
            if key.lower() in aliases:
                return key
        raise KeyError(
            f"None of {list(aliases)} found in {MAT_FILE_PATH}; available keys: {valid_keys}"
        )

    if 'U_star' in valid_keys:
        u_trans, v_trans = mat_data['U_star'][:, 0, :], mat_data['U_star'][:, 1, :]
    else:
        u_trans = mat_data[find_key(['u', 'u_transient', 'u_star'])]
        v_trans = mat_data[find_key(['v', 'v_transient', 'v_star'])]

    # Pressure is optional: fall back to zeros when no pressure key exists.
    p_trans = np.zeros_like(u_trans)
    for key in valid_keys:
        if key.lower() in ['p', 'p_transient', 'p_star']:
            p_trans = mat_data[key]
            if p_trans.ndim == 3:
                p_trans = p_trans[:, 0, :]
            break
    has_pressure = np.max(np.abs(p_trans)) > 1e-6

    # Time averages over axis 1 of the transient arrays.
    u_mean_true = np.mean(u_trans, axis=1)
    v_mean_true = np.mean(v_trans, axis=1)
    p_mean_true = np.mean(p_trans, axis=1)

    # Interpolate the mean fields onto the regular grid.
    grid_u_mean = griddata(coords, u_mean_true, (grid_x, grid_y), method='linear', fill_value=np.nan)
    grid_v_mean = griddata(coords, v_mean_true, (grid_x, grid_y), method='linear', fill_value=np.nan)
    grid_p_mean = griddata(coords, p_mean_true, (grid_x, grid_y), method='linear', fill_value=np.nan)

    # Cells that came back NaN are treated as solid wall.
    # NOTE(review): linear griddata only yields NaN outside the convex hull of
    # `coords`; an obstacle lying inside the hull would be interpolated across
    # rather than marked solid - confirm this matches the dataset geometry.
    is_solid = np.isnan(grid_u_mean)
    valid_mask_np = ~is_solid
    valid_mask = torch.from_numpy(valid_mask_np).to(DEVICE)
    # Boundary band: cells added by a two-iteration dilation of the solid region.
    boundary_np = binary_dilation(is_solid, iterations=2) ^ is_solid
    boundary_mask = torch.from_numpy(boundary_np).to(DEVICE)

    grid_u_mean = np.nan_to_num(grid_u_mean, nan=0.0)
    grid_v_mean = np.nan_to_num(grid_v_mean, nan=0.0)
    grid_p_mean = np.nan_to_num(grid_p_mean, nan=0.0)

    # Reference turbulent region: TKE above 5% of its gridded maximum.
    u_rms_points = np.sqrt(np.mean((u_trans - u_mean_true[:, None])**2, axis=1))
    v_rms_points = np.sqrt(np.mean((v_trans - v_mean_true[:, None])**2, axis=1))
    tke_points = 0.5 * (u_rms_points**2 + v_rms_points**2)
    grid_tke = np.nan_to_num(griddata(coords, tke_points, (grid_x, grid_y), method='linear'), nan=0.0)
    total_tke = np.sum(grid_tke)
    tke_threshold = np.max(grid_tke) * 0.05
    true_turbulent_mask = (grid_tke > tke_threshold).astype(np.float32)
    actual_positives = np.sum(true_turbulent_mask)

    # Grid the first N transient frames for the physical reconstruction tests.
    N_FRAMES = min(100, u_trans.shape[1])
    print(f"[*] 提取前 {N_FRAMES} 帧数据用于物理重建评估...")
    grid_trans = []
    for t in range(N_FRAMES):
        gu = np.nan_to_num(griddata(coords, u_trans[:, t], (grid_x, grid_y), method='linear'), nan=0.0)
        gv = np.nan_to_num(griddata(coords, v_trans[:, t], (grid_x, grid_y), method='linear'), nan=0.0)
        gp = np.nan_to_num(griddata(coords, p_trans[:, t], (grid_x, grid_y), method='linear'), nan=0.0)
        grid_trans.append(np.stack([gu, gv, gp], axis=0))
    grid_trans = np.stack(grid_trans, axis=0)  # [T, 3, H, W]

    data_tensor = torch.from_numpy(grid_trans).float()
    mean_tensor = torch.from_numpy(np.stack([grid_u_mean, grid_v_mean, grid_p_mean], axis=0)).float()

    # Per-channel min/max scaling to [-1, 1]; near-constant channels use a
    # denominator of 1 to avoid dividing by zero.
    stats_max = data_tensor.amax(dim=(0, 2, 3), keepdim=True)
    stats_min = data_tensor.amin(dim=(0, 2, 3), keepdim=True)
    denom = stats_max - stats_min
    denom[denom < 1e-8] = 1.0
    data_norm = 2 * (data_tensor - stats_min) / denom - 1.0
    mean_norm = 2 * (mean_tensor - stats_min.squeeze(0)) / denom.squeeze(0) - 1.0

    # =========================================================
    # 2. Zero-shot sensor selection via SVD + pivoted QR
    # =========================================================
    print("[*] 正在执行零样本 SVD + QR 分解,寻找圆柱尾迹最佳传感器...")
    sensor_count = 65
    fluctuations = (data_tensor[:, 0, :, :] - mean_tensor[0, :, :]).view(N_FRAMES, -1).numpy().T
    n_modes = min(N_FRAMES, sensor_count + 10)
    U, _, _ = np.linalg.svd(fluctuations, full_matrices=False)
    _, _, P = qr(U[:, :n_modes].T, pivoting=True)

    # Keep pivots that fall on fluid cells. A boolean-mask lookup replaces the
    # per-pivot array scan while selecting the same indices in the same order.
    fluid_flat = valid_mask_np.ravel()
    valid_P = [idx for idx in P if fluid_flat[idx]]
    sensor_locs = [(idx // W, idx % W) for idx in valid_P[:sensor_count]]
    sensor_rows = [loc[0] for loc in sensor_locs]
    sensor_cols = [loc[1] for loc in sensor_locs]

    grid_x_norm = 2 * (grid_x - x_min) / (x_max - x_min) - 1
    grid_y_norm = 2 * (grid_y - y_min) / (y_max - y_min) - 1
    grid_coords_norm = torch.from_numpy(np.stack([grid_x_norm, grid_y_norm], axis=-1)).float().to(DEVICE)
    sensor_coords = torch.stack([grid_coords_norm[y, x] for y, x in sensor_locs]).to(DEVICE)  # [65, 2]

    # Device copies of the scaling statistics, moved once instead of per call.
    stats_min_dev = stats_min.to(DEVICE)
    stats_range_dev = stats_max.to(DEVICE) - stats_min_dev

    def denormalize(norm_tensor):
        """Map the first three channels back to physical units; keep a 4th channel as-is."""
        uvp_norm = norm_tensor[:, :3, :, :]
        uvp_phys = (uvp_norm + 1) / 2 * stats_range_dev + stats_min_dev
        if norm_tensor.shape[1] >= 4:
            return torch.cat([uvp_phys, norm_tensor[:, 3:4, :, :]], dim=1)
        return uvp_phys

    pde_engine = NavierStokesURANS({'box_len': [x_max - x_min, y_max - y_min, 1.0], 'dt': 0.05}).to(DEVICE)
    results_list = []

    # Inputs that are identical for every variant and every frame.
    u_mean_flat = torch.tensor(grid_u_mean).view(1, -1, 1).float().to(DEVICE)
    # Grid coordinates stay in [1, H, W, 2] layout (no permute) and contiguous.
    g_pos = grid_coords_norm.unsqueeze(0).contiguous()
    b_flow = mean_norm.unsqueeze(0).to(DEVICE)
    s_pos = sensor_coords.unsqueeze(0)
    m_phys = mean_tensor.unsqueeze(0).to(DEVICE)
    u_m, v_m = m_phys[0, 0], m_phys[0, 1]

    # =========================================================
    # 3. Core evaluation loop
    # =========================================================
    for i, (name, config) in enumerate(VARIANTS.items()):
        print(f"\n[{i+1}/{len(VARIANTS)}] 正在深度评估: {name}")
        if not os.path.exists(config['weight']):
            print(" [!] 未找到权重文件,跳过...")
            continue

        PIGU_Class = load_architecture_class(config['arch'], module_name=f"arch_mod_{i}")
        model = PIGU_Class(sensor_in_dim=3, sensor_count=sensor_count).to(DEVICE)
        model.load_state_dict(torch.load(config['weight'], map_location=DEVICE, weights_only=True), strict=False)
        model.eval()

        # --- A. Gate-mask grid search (mask metrics) ---
        best_mask, capture, precision, recall, iou = np.zeros((H, W)), 0, 0, 0, 0
        best_th = "N/A"

        if config['type'] == 'dynamic':
            gate_sc = model.projector.gate_scale.item()
            best_score = -1
            # Sweep the threshold; a candidate is only accepted when its
            # precision exceeds 0.5 and capture * precision improves.
            for th in np.arange(-0.5, 0.95, 0.05):
                with torch.no_grad():
                    beta_flat = torch.sigmoid(gate_sc * (th - u_mean_flat))
                mask = beta_flat.view(H, W).cpu().numpy()
                if mask.max() > 0:
                    mask = mask / mask.max()
                pred_beta_mask = (mask > 0.5).astype(np.float32)
                e_cap = np.sum(grid_tke * mask) / (total_tke + 1e-8)
                intersection = np.sum(true_turbulent_mask * pred_beta_mask)
                pred_pos = np.sum(pred_beta_mask)
                pre = intersection / (pred_pos + 1e-8) if pred_pos > 0 else 0
                score = e_cap * pre
                if score > best_score and pre > 0.50:
                    best_score, best_th, best_mask = score, th, mask
                    capture, precision = e_cap, pre
                    recall = intersection / (actual_positives + 1e-8)
                    iou = intersection / (actual_positives + pred_pos - intersection + 1e-8)
        else:
            if config['type'] == 'hardcoded_x':
                best_mask = 1.0 / (1.0 + np.exp(-50.0 * (grid_x_norm - 0.33)))
            elif config['type'] == 'global_1':
                best_mask = np.ones((H, W))
            elif config['type'] == 'global_0':
                best_mask = np.zeros((H, W))
            pred_beta_mask = (best_mask > 0.5).astype(np.float32)
            capture = np.sum(grid_tke * best_mask) / (total_tke + 1e-8)
            intersection = np.sum(true_turbulent_mask * pred_beta_mask)
            pred_pos = np.sum(pred_beta_mask)
            recall = intersection / (actual_positives + 1e-8)
            precision = intersection / (pred_pos + 1e-8) if pred_pos > 0 else 0
            iou = intersection / (actual_positives + pred_pos - intersection + 1e-8)

        beta_tensor = torch.from_numpy(best_mask).view(1, 1, H, W).float().to(DEVICE)

        # --- B. Flow reconstruction metrics ---
        m_l2, m_cont, m_vort, m_tke, m_bp = [], [], [], [], []
        with torch.no_grad():
            # Frames are consumed in consecutive pairs (t, t+1).
            for t in range(N_FRAMES - 1):
                # Normalised sensor readings at frames t and t+1.
                # presumably [1, n_sensors, 3] after the transpose - TODO confirm against the model
                s_val_t = data_norm[t, :, sensor_rows, sensor_cols].T.unsqueeze(0).to(DEVICE)
                s_val_next = data_norm[t+1, :, sensor_rows, sensor_cols].T.unsqueeze(0).to(DEVICE)

                pred_t = model(s_val_t, s_pos, g_pos, b_flow)
                pred_next = model(s_val_next, s_pos, g_pos, b_flow)

                p_phys = denormalize(pred_t)
                t_phys = data_tensor[t].unsqueeze(0).to(DEVICE)

                u_p, v_p, p_p = p_phys[0, 0], p_phys[0, 1], p_phys[0, 2]
                u_t, v_t, p_t = t_phys[0, 0], t_phys[0, 1], t_phys[0, 2]

                # 1. Relative L2 error of the velocity on fluid cells
                uv_p, uv_t = p_phys[0, 0:2][:, valid_mask], t_phys[0, 0:2][:, valid_mask]
                m_l2.append((torch.norm(uv_p - uv_t) / (torch.norm(uv_t) + 1e-8)).item())

                # 2. RMS of the third value returned by the PDE engine on
                #    fluid cells (reported as the continuity residual)
                _, _, res_c = pde_engine(p_phys, denormalize(pred_next), None, dx=dx_tensor, dy=dy_tensor, beta_mask=beta_tensor)
                m_cont.append(torch.sqrt(torch.mean(res_c[0, 0][valid_mask]**2)).item())

                # 3. Relative L2 error of the vorticity
                vort_p, vort_t = compute_vorticity(u_p, v_p, dx_val, dy_val), compute_vorticity(u_t, v_t, dx_val, dy_val)
                m_vort.append((torch.norm(vort_p[valid_mask] - vort_t[valid_mask]) / (torch.norm(vort_t[valid_mask]) + 1e-8)).item())

                # 4. Relative L2 error of the instantaneous TKE about the mean flow
                tke_p, tke_t = 0.5*((u_p-u_m)**2 + (v_p-v_m)**2), 0.5*((u_t-u_m)**2 + (v_t-v_m)**2)
                m_tke.append((torch.norm(tke_p[valid_mask] - tke_t[valid_mask]) / (torch.norm(tke_t[valid_mask]) + 1e-8)).item())

                # 5. Relative L2 error of the pressure on the boundary band
                if has_pressure:
                    m_bp.append((torch.norm(p_p[boundary_mask] - p_t[boundary_mask]) / (torch.norm(p_t[boundary_mask]) + 1e-8)).item())

        results_list.append({
            "Variant": name,
            "Thresh": f"{best_th:.2f}" if isinstance(best_th, float) else best_th,
            "Cap(%)": capture * 100,
            "IoU(%)": iou * 100,
            "L2_Vel": np.mean(m_l2),
            "Cont_Res": np.mean(m_cont),
            "Vort_L2": np.mean(m_vort),
            "TKE_L2": np.mean(m_tke),
            "Bound_P": np.mean(m_bp) if has_pressure else 0.0
        })

    # =========================================================
    # 4. Print and save the final table
    # =========================================================
    if results_list:
        df = pd.DataFrame(results_list)
        formatters = {
            "Cap(%)": "{:.1f}".format, "IoU(%)": "{:.1f}".format,
            "L2_Vel": "{:.3e}".format, "Cont_Res": "{:.3e}".format,
            "Vort_L2": "{:.3e}".format, "TKE_L2": "{:.3e}".format,
            "Bound_P": "{:.3e}".format
        }
        print("\n" + "="*110)
        print("🏆 终极版:跨几何泛化 (Zero-Shot Cylinder Wake) 全量评价大满贯 🏆")
        print("="*110)
        print(df.to_string(index=False, justify='center', formatters=formatters))
        print("="*110)

        os.makedirs("eval_results", exist_ok=True)
        csv_path = "eval_results/ultimate_cylinder_zero_shot_metrics.csv"
        df.to_csv(csv_path, index=False)
        print(f"[+] 终极评估结果已保存至: {csv_path}")


if __name__ == "__main__":
    main()