#!/usr/bin/env python3 import argparse import json import math import sys import time from pathlib import Path import numpy as np from scipy.spatial import cKDTree from scipy.stats import spearmanr, rankdata sys.path.append(str(Path(__file__).resolve().parent)) import eval_tnt_wrapper as W def sigmoid(x): x = np.asarray(x, dtype=np.float64) return 1.0 / (1.0 + np.exp(-x)) def query_tree(tree, points, k=1, batch_size=50000): ds, inds = [], [] for s in range(0, len(points), batch_size): e = min(s + batch_size, len(points)) try: d, i = tree.query(points[s:e], k=k, workers=-1) except TypeError: d, i = tree.query(points[s:e], k=k) ds.append(d) inds.append(i) return np.concatenate(ds, axis=0), np.concatenate(inds, axis=0) def safe_spearman(x, y): x = np.asarray(x) y = np.asarray(y) m = np.isfinite(x) & np.isfinite(y) if m.sum() < 10: return float("nan") if np.std(x[m]) < 1e-12 or np.std(y[m]) < 1e-12: return float("nan") return float(spearmanr(x[m], y[m]).correlation) def auroc_score(labels, scores): labels = np.asarray(labels).astype(bool) scores = np.asarray(scores, dtype=np.float64) m = np.isfinite(scores) labels = labels[m] scores = scores[m] n_pos = int(labels.sum()) n_neg = int((~labels).sum()) if n_pos == 0 or n_neg == 0: return float("nan") ranks = rankdata(scores) rank_sum_pos = ranks[labels].sum() auc = (rank_sum_pos - n_pos * (n_pos + 1) / 2.0) / (n_pos * n_neg) return float(auc) def average_precision(labels, scores): labels = np.asarray(labels).astype(bool) scores = np.asarray(scores, dtype=np.float64) m = np.isfinite(scores) labels = labels[m] scores = scores[m] n_pos = int(labels.sum()) if n_pos == 0: return float("nan") order = np.argsort(-scores) y = labels[order] tp = np.cumsum(y) precision = tp / (np.arange(len(y)) + 1) ap = precision[y].mean() return float(ap) def quantile_bin_stats(feature, surface_mask, distance, normal_error=None, n_bins=5): feature = np.asarray(feature, dtype=np.float64) surface_mask = np.asarray(surface_mask).astype(bool) distance = np.asarray(distance, dtype=np.float64) valid = np.isfinite(feature) & np.isfinite(distance) if valid.sum() < n_bins * 10: return {} x = feature[valid] sm = surface_mask[valid] dist = distance[valid] qs = np.linspace(0, 1, n_bins + 1) edges = np.quantile(x, qs) edges[0] = -np.inf edges[-1] = np.inf out = {} for b in range(n_bins): lo, hi = edges[b], edges[b + 1] if b == n_bins - 1: idx = (x >= lo) & (x <= hi) else: idx = (x >= lo) & (x < hi) key = f"bin{b+1}" if idx.sum() == 0: continue out[f"{key}_n"] = int(idx.sum()) out[f"{key}_feature_median"] = float(np.median(x[idx])) out[f"{key}_surface_near_ratio"] = float(sm[idx].mean()) out[f"{key}_distance_median"] = float(np.median(dist[idx])) if normal_error is not None: ne = np.asarray(normal_error, dtype=np.float64) if len(ne) == len(feature): ne = ne[valid] for b in range(n_bins): lo, hi = edges[b], edges[b + 1] idx = (x >= lo) & (x <= hi) if b == n_bins - 1 else (x >= lo) & (x < hi) key = f"bin{b+1}" if idx.sum() > 0 and np.isfinite(ne[idx]).sum() > 0: out[f"{key}_normal_error_median"] = float(np.nanmedian(ne[idx])) return out def pca_normals_and_shape(points, neighbor_indices, batch_size=50000): """ Returns: normals: eigenvector of smallest eigenvalue linearity, planarity, scattering from local point distribution """ if neighbor_indices.ndim != 2: raise ValueError("neighbor_indices must be [M, K]") M, K = neighbor_indices.shape normals = np.empty((M, 3), dtype=np.float32) linearity = np.empty(M, dtype=np.float32) planarity = np.empty(M, dtype=np.float32) scattering = np.empty(M, dtype=np.float32) for s in range(0, M, batch_size): e = min(s + batch_size, M) neigh = points[neighbor_indices[s:e]] centered = neigh - neigh.mean(axis=1, keepdims=True) cov = np.einsum("bki,bkj->bij", centered, centered) / max(K - 1, 1) vals, vecs = np.linalg.eigh(cov) # eigh ascending: vals[:,0] <= vals[:,1] <= vals[:,2] l3 = np.maximum(vals[:, 0], 0.0) l2 = np.maximum(vals[:, 1], 0.0) l1 = np.maximum(vals[:, 2], 1e-30) normals[s:e] = vecs[:, :, 0].astype(np.float32) linearity[s:e] = ((l1 - l2) / l1).astype(np.float32) planarity[s:e] = ((l2 - l3) / l1).astype(np.float32) scattering[s:e] = (l3 / l1).astype(np.float32) return normals, linearity, planarity, scattering def covariance_shape_features(v, names, indices): required = ["scale_0", "scale_1", "scale_2"] if not all(k in names for k in required): return None scales_log = np.stack( [v["scale_0"][indices], v["scale_1"][indices], v["scale_2"][indices]], axis=1, ).astype(np.float64) # PLY stores log std in standard 3DGS. sigma = np.exp(scales_log) lamb = sigma ** 2 lamb_sort = np.sort(lamb, axis=1)[:, ::-1] # λ1 >= λ2 >= λ3 l1 = np.maximum(lamb_sort[:, 0], 1e-30) l2 = np.maximum(lamb_sort[:, 1], 0.0) l3 = np.maximum(lamb_sort[:, 2], 0.0) line = (l1 - l2) / l1 plane = (l2 - l3) / l1 scatter = l3 / l1 log_aniso_sigma = np.max(scales_log, axis=1) - np.min(scales_log, axis=1) aniso_sigma_ratio = np.exp(log_aniso_sigma) return { "cov_linearity": line.astype(np.float32), "cov_planarity": plane.astype(np.float32), "cov_scattering": scatter.astype(np.float32), "cov_log_anisotropy_sigma": log_aniso_sigma.astype(np.float32), "cov_anisotropy_sigma_ratio": aniso_sigma_ratio.astype(np.float32), } def gaussian_v3_normals(v, names, indices, transform_mat): required = ["scale_0", "scale_1", "scale_2", "rot_0", "rot_1", "rot_2", "rot_3"] if not all(k in names for k in required): return None idx = indices scales = np.stack( [v["scale_0"][idx], v["scale_1"][idx], v["scale_2"][idx]], axis=1, ).astype(np.float64) min_axis = np.argmin(scales, axis=1) q = np.stack( [v["rot_0"][idx], v["rot_1"][idx], v["rot_2"][idx], v["rot_3"][idx]], axis=1, ).astype(np.float64) q = q / (np.linalg.norm(q, axis=1, keepdims=True) + 1e-12) # 3DGS convention: [w, x, y, z] w, x, y, z = q[:, 0], q[:, 1], q[:, 2], q[:, 3] r00 = 1 - 2 * (y * y + z * z) r01 = 2 * (x * y - w * z) r02 = 2 * (x * z + w * y) r10 = 2 * (x * y + w * z) r11 = 1 - 2 * (x * x + z * z) r12 = 2 * (y * z - w * x) r20 = 2 * (x * z - w * y) r21 = 2 * (y * z + w * x) r22 = 1 - 2 * (x * x + y * y) normals = np.empty((len(idx), 3), dtype=np.float64) m0 = min_axis == 0 normals[m0, 0] = r00[m0] normals[m0, 1] = r10[m0] normals[m0, 2] = r20[m0] m1 = min_axis == 1 normals[m1, 0] = r01[m1] normals[m1, 1] = r11[m1] normals[m1, 2] = r21[m1] m2 = min_axis == 2 normals[m2, 0] = r02[m2] normals[m2, 1] = r12[m2] normals[m2, 2] = r22[m2] A = transform_mat[:3, :3].astype(np.float64) normals = normals @ A.T normals = normals / (np.linalg.norm(normals, axis=1, keepdims=True) + 1e-12) return normals.astype(np.float32) def two_dgs_normals(v, names, indices, transform_mat): """ 2DGS-specific normal. 2D Gaussian has two in-plane scale axes, scale_0 and scale_1. The surface normal is the third local axis of the quaternion rotation, i.e. the axis perpendicular to the two scaled tangent directions. This is NOT the generic 3DGS V3/minor-scale normal. """ required = ["scale_0", "scale_1", "rot_0", "rot_1", "rot_2", "rot_3"] if not all(k in names for k in required): return None # If scale_2 exists, this is probably a standard 3D Gaussian, not 2DGS. # Let generic V3 handle it instead. if "scale_2" in names: return None idx = indices q = np.stack( [v["rot_0"][idx], v["rot_1"][idx], v["rot_2"][idx], v["rot_3"][idx]], axis=1, ).astype(np.float64) q = q / (np.linalg.norm(q, axis=1, keepdims=True) + 1e-12) # 3DGS/2DGS convention is normally quaternion [w, x, y, z]. w, x, y, z = q[:, 0], q[:, 1], q[:, 2], q[:, 3] r02 = 2 * (x * z + w * y) r12 = 2 * (y * z - w * x) r22 = 1 - 2 * (x * x + y * y) # Third column of R: local z axis, perpendicular to the two in-plane axes. normals = np.stack([r02, r12, r22], axis=1) A = transform_mat[:3, :3].astype(np.float64) normals = normals @ A.T normals = normals / (np.linalg.norm(normals, axis=1, keepdims=True) + 1e-12) return normals.astype(np.float32) def explicit_normals(v, names, indices, transform_mat): candidates = [ ("nx", "ny", "nz"), ("normal_x", "normal_y", "normal_z"), ] found = None for a, b, c in candidates: if a in names and b in names and c in names: found = (a, b, c) break if found is None: return None a, b, c = found n = np.stack([v[a][indices], v[b][indices], v[c][indices]], axis=1).astype(np.float64) raw_norm = np.linalg.norm(n, axis=1) valid_ratio = float(np.mean(raw_norm > 1e-8)) # Many 3DGS-format PLYs contain nx/ny/nz as all-zero placeholders. # Treat them as absent, otherwise they produce a fake constant 90-degree error. if valid_ratio < 0.5 or np.nanmedian(raw_norm) < 1e-8: return None A = transform_mat[:3, :3].astype(np.float64) n = n @ A.T n = n / (np.linalg.norm(n, axis=1, keepdims=True) + 1e-12) return n.astype(np.float32) def angular_error(pred, ref): dots = np.sum(pred * ref, axis=1) dots = np.clip(np.abs(dots), 0.0, 1.0) return np.degrees(np.arccos(dots)).astype(np.float32) def angle_stats(prefix, angles): return { f"{prefix}_mean_deg": float(np.mean(angles)), f"{prefix}_median_deg": float(np.median(angles)), f"{prefix}_q25_deg": float(np.quantile(angles, 0.25)), f"{prefix}_q75_deg": float(np.quantile(angles, 0.75)), f"{prefix}_iqr_deg": float(np.quantile(angles, 0.75) - np.quantile(angles, 0.25)), } def feature_summary(prefix, x): x = np.asarray(x, dtype=np.float64) x = x[np.isfinite(x)] if len(x) == 0: return {} return { f"{prefix}_mean": float(np.mean(x)), f"{prefix}_median": float(np.median(x)), f"{prefix}_q25": float(np.quantile(x, 0.25)), f"{prefix}_q75": float(np.quantile(x, 0.75)), } def add_feature_diagnostics(result, name, feature, surface_mask, distances): feature = np.asarray(feature, dtype=np.float64) result.update(feature_summary(name, feature)) result[f"{name}_spearman_vs_minus_distance"] = safe_spearman(feature, -distances) result[f"{name}_auroc_surface_near"] = auroc_score(surface_mask, feature) result[f"{name}_ap_surface_near"] = average_precision(surface_mask, feature) bins = quantile_bin_stats(feature, surface_mask, distances, n_bins=5) for k, v in bins.items(): result[f"{name}_{k}"] = v def main(): ap = argparse.ArgumentParser() ap.add_argument("--method", required=True) ap.add_argument("--scene", required=True, choices=sorted(W.SCENE_MAP.keys())) ap.add_argument("--project-root", default="/root/autodl-tmp/SplatAtlas") ap.add_argument("--outputs-root", default=None) ap.add_argument("--tnt-eval-root", default=None) ap.add_argument("--iteration", type=int, default=None) ap.add_argument("--mode", choices=["all", "subsample"], default="subsample") ap.add_argument("--n-sample", type=int, default=200000) ap.add_argument("--seed", type=int, default=0) ap.add_argument("--distance-multiplier", type=float, default=2.0) ap.add_argument("--gt-normal-k", type=int, default=30) ap.add_argument("--recon-pca-k", type=int, default=30) ap.add_argument("--max-normal-points", type=int, default=50000) ap.add_argument("--batch-size", type=int, default=50000) ap.add_argument("--verbose", action="store_true") args = ap.parse_args() t0 = time.time() method_in = args.method if method_in in ["3dgs", "vanilla", "vanilla"]: method = "vanilla_3dgs" else: method = method_in project_root = Path(args.project_root) outputs_root = Path(args.outputs_root) if args.outputs_root else project_root / "outputs" tnt_eval_root = Path(args.tnt_eval_root) if args.tnt_eval_root else project_root / "data" / "tnt_eval" scene = args.scene.lower() official_scene = W.SCENE_MAP[scene] tau = W.TAU_DICT[scene] near_threshold = args.distance_multiplier * tau ply_path = W.locate_recon_ply(outputs_root, method, scene, args.iteration) scene_eval_dir = tnt_eval_root / official_scene gt_ply_path = scene_eval_dir / f"{official_scene}.ply" crop_path = scene_eval_dir / f"{official_scene}.json" trans_path = scene_eval_dir / f"{official_scene}_trans.txt" trans = W.read_transform(trans_path) crop = W.load_crop(crop_path) if args.verbose: print("=" * 80) print("Surfel hypothesis test") print("method:", method) print("scene:", scene, "->", official_scene) print("tau:", tau, "surface-near:", near_threshold) print("recon:", ply_path) recon_raw, recon_vertex, recon_names = W.load_vertex_data(ply_path) recon_aligned = W.apply_transform(recon_raw, trans) recon_crop_mask = W.crop_mask_tnt(recon_aligned, crop) recon_crop = recon_aligned[recon_crop_mask] recon_crop_raw_idx = np.where(recon_crop_mask)[0].astype(np.int64) if len(recon_crop) == 0: raise RuntimeError("Reconstruction crop is empty.") eval_idx_in_crop = W.choose_eval_indices( len(recon_crop), args.mode, args.n_sample, args.seed ) recon_eval = recon_crop[eval_idx_in_crop] recon_eval_raw_idx = recon_crop_raw_idx[eval_idx_in_crop] gt_raw, _, _ = W.load_vertex_data(gt_ply_path) gt_crop_mask = W.crop_mask_tnt(gt_raw, crop) gt_crop = gt_raw[gt_crop_mask] if len(gt_crop) == 0: raise RuntimeError("GT crop is empty.") gt_tree = cKDTree(gt_crop) d_r2g, nn_gt_idx = query_tree(gt_tree, recon_eval, k=1, batch_size=args.batch_size) surface_mask = d_r2g < near_threshold surface_eval_idx = np.where(surface_mask)[0].astype(np.int64) rng = np.random.default_rng(args.seed) if len(surface_eval_idx) > args.max_normal_points: normal_eval_idx = np.sort( rng.choice(surface_eval_idx, size=args.max_normal_points, replace=False) ).astype(np.int64) else: normal_eval_idx = surface_eval_idx if args.verbose: print("n_gaussians_recon:", len(recon_raw)) print("n_recon_after_crop:", len(recon_crop)) print("n_recon_eval:", len(recon_eval)) print("n_gt_after_crop:", len(gt_crop)) print("n_surface_near:", len(surface_eval_idx), "/", len(recon_eval)) print("n_normal_eval:", len(normal_eval_idx)) result = { "method": method, "method_input": method_in, "scene": scene, "official_scene": official_scene, "eval_protocol": "surfel_hypothesis_test_tnt_dense_gt_pca_proxy", "is_official_tnt_metric": False, "ply_path": str(ply_path), "gt_ply_path": str(gt_ply_path), "crop_path": str(crop_path), "trans_path": str(trans_path), "tau": float(tau), "surface_near_threshold": float(near_threshold), "surface_near_rule": f"d_recon_to_gt < {args.distance_multiplier} * tau", "mode": args.mode, "n_sample_requested": int(args.n_sample), "seed": int(args.seed), "gt_normal_k": int(args.gt_normal_k), "recon_pca_k": int(args.recon_pca_k), "max_normal_points": int(args.max_normal_points), "n_gaussians_recon": int(len(recon_raw)), "n_recon_after_crop": int(len(recon_crop)), "n_recon_eval": int(len(recon_eval)), "n_gt_after_crop": int(len(gt_crop)), "n_surface_near": int(len(surface_eval_idx)), "surface_near_ratio": float(len(surface_eval_idx) / max(len(recon_eval), 1)), "n_normal_eval": int(len(normal_eval_idx)), "distance_recon_to_gt_mean": float(np.mean(d_r2g)), "distance_recon_to_gt_median": float(np.median(d_r2g)), "distance_recon_to_gt_q25": float(np.quantile(d_r2g, 0.25)), "distance_recon_to_gt_q75": float(np.quantile(d_r2g, 0.75)), } # Feature 1: opacity. if "opacity" in recon_names: opacity = sigmoid(recon_vertex["opacity"][recon_eval_raw_idx]) add_feature_diagnostics(result, "opacity_sigmoid", opacity, surface_mask, d_r2g) # Feature 2: covariance shape, only for standard 3D Gaussian PLYs. cov_features = covariance_shape_features(recon_vertex, recon_names, recon_eval_raw_idx) if cov_features is not None: result["has_standard_covariance_shape"] = True for name, feat in cov_features.items(): add_feature_diagnostics(result, name, feat, surface_mask, d_r2g) else: result["has_standard_covariance_shape"] = False # Feature 3: local spatial PCA shape over Gaussian centers, for all methods. recon_tree = cKDTree(recon_crop) _, recon_neighbor_idx_all = query_tree( recon_tree, recon_eval, k=args.recon_pca_k, batch_size=args.batch_size ) recon_pca_normals_all, spatial_linearity, spatial_planarity, spatial_scattering = pca_normals_and_shape( recon_crop, recon_neighbor_idx_all, batch_size=args.batch_size ) add_feature_diagnostics(result, "spatial_pca_linearity", spatial_linearity, surface_mask, d_r2g) add_feature_diagnostics(result, "spatial_pca_planarity", spatial_planarity, surface_mask, d_r2g) add_feature_diagnostics(result, "spatial_pca_scattering", spatial_scattering, surface_mask, d_r2g) # Normal consistency on surface-near subset. if len(normal_eval_idx) > 0: chosen_points = recon_eval[normal_eval_idx] chosen_raw_idx = recon_eval_raw_idx[normal_eval_idx] nearest_gt_points = gt_crop[nn_gt_idx[normal_eval_idx]] _, gt_neighbor_idx = query_tree( gt_tree, nearest_gt_points, k=args.gt_normal_k, batch_size=args.batch_size ) gt_normals, gt_linearity, gt_planarity, gt_scattering = pca_normals_and_shape( gt_crop, gt_neighbor_idx, batch_size=args.batch_size ) # Center PCA normals already computed for all eval points. center_pca_normals = recon_pca_normals_all[normal_eval_idx] center_pca_err = angular_error(center_pca_normals, gt_normals) result.update(angle_stats("center_pca_normal_error", center_pca_err)) # V3 normals if standard. v3 = gaussian_v3_normals(recon_vertex, recon_names, chosen_raw_idx, trans) if v3 is not None: v3_err = angular_error(v3, gt_normals) result["has_v3_normal"] = True result.update(angle_stats("v3_normal_error", v3_err)) result["median_delta_v3_minus_center_pca_deg"] = float( np.median(v3_err) - np.median(center_pca_err) ) result["center_pca_better_than_v3_by_median"] = bool( np.median(center_pca_err) < np.median(v3_err) ) else: result["has_v3_normal"] = False # 2DGS-specific normal: perpendicular to the two in-plane scale axes. n2d = two_dgs_normals(recon_vertex, recon_names, chosen_raw_idx, trans) if n2d is not None: n2d_err = angular_error(n2d, gt_normals) result["has_2dgs_normal"] = True result.update(angle_stats("two_dgs_normal_error", n2d_err)) result["median_delta_2dgs_minus_center_pca_deg"] = float( np.median(n2d_err) - np.median(center_pca_err) ) result["center_pca_better_than_2dgs_by_median"] = bool( np.median(center_pca_err) < np.median(n2d_err) ) else: result["has_2dgs_normal"] = False # Explicit normal if available, e.g. 2DGS / surfel / PGSR / GOF exports. expn = explicit_normals(recon_vertex, recon_names, chosen_raw_idx, trans) if expn is not None: exp_err = angular_error(expn, gt_normals) result["has_explicit_normal"] = True result.update(angle_stats("explicit_normal_error", exp_err)) result["median_delta_explicit_minus_center_pca_deg"] = float( np.median(exp_err) - np.median(center_pca_err) ) result["center_pca_better_than_explicit_by_median"] = bool( np.median(center_pca_err) < np.median(exp_err) ) else: result["has_explicit_normal"] = False # Does anisotropy / planarity predict normal reliability? norm_dist = d_r2g[normal_eval_idx] dummy_surface = np.ones_like(norm_dist, dtype=bool) if cov_features is not None: for name, feat_all in cov_features.items(): feat = feat_all[normal_eval_idx] if result.get("has_v3_normal", False): result[f"{name}_spearman_vs_minus_v3_error"] = safe_spearman(feat, -v3_err) result[f"{name}_spearman_vs_minus_center_pca_error"] = safe_spearman(feat, -center_pca_err) spatial_feats = { "spatial_pca_linearity": spatial_linearity[normal_eval_idx], "spatial_pca_planarity": spatial_planarity[normal_eval_idx], "spatial_pca_scattering": spatial_scattering[normal_eval_idx], } for name, feat in spatial_feats.items(): if result.get("has_v3_normal", False): result[f"{name}_spearman_vs_minus_v3_error"] = safe_spearman(feat, -v3_err) if result.get("has_explicit_normal", False): result[f"{name}_spearman_vs_minus_explicit_error"] = safe_spearman(feat, -exp_err) result[f"{name}_spearman_vs_minus_center_pca_error"] = safe_spearman(feat, -center_pca_err) result.update(feature_summary("gt_local_pca_planarity", gt_planarity)) result.update(feature_summary("gt_local_pca_linearity", gt_linearity)) result.update(feature_summary("gt_local_pca_scattering", gt_scattering)) else: result["has_v3_normal"] = False result["has_explicit_normal"] = False result["normal_eval_skipped_reason"] = "No surface-near Gaussians." result["wall_time_seconds"] = float(time.time() - t0) out_dir = Path(outputs_root) / "surfel_hypothesis" / f"{method}_{scene}" out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / "surfel_hypothesis_eval.json" with open(out_path, "w") as f: json.dump(result, f, indent=2, sort_keys=True) print(json.dumps(result, indent=2, sort_keys=True)) print(f"\n[WROTE] {out_path}") if __name__ == "__main__": main()