# s23-model / local_eval.py
# xsponenta
# Add vertex POSITION REGRESSOR (DINOv2) on top of edge classifier
# 9bb88ff
"""Local HSS evaluation harness for S23DR submissions.

Streams N samples from the trainval dataset, runs the full pipeline
(fuse → predict → triangulation → 2D filter), computes HSS against the
ground truth, and reports mean / quartiles plus per-sample DIAG lines.

Use this to validate any change BEFORE pushing to the leaderboard:

    python local_eval.py                  # default 50 samples
    python local_eval.py 100              # 100 samples
    python local_eval.py 100 --no-filter  # skip the 2D edge filter
"""
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
import sys
import time
import argparse
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
import numpy as np
import torch
from datasets import load_dataset
from hoho2025.metric_helper import hss
import script
from s23dr_2026_example.point_fusion import FuserConfig
def parse_args(argv=None):
    """Declare the command-line interface of the harness and parse it.

    Args:
        argv: Optional sequence of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, so the
            historical call ``parse_args()`` behaves exactly as before.
            Passing an explicit list makes the parser usable from tests or
            other scripts.

    Returns:
        argparse.Namespace: one attribute per option declared below
        (dashes in option names become underscores).
    """
    p = argparse.ArgumentParser()

    # --- run size and base pipeline switches -------------------------------
    p.add_argument("n_samples", type=int, nargs="?", default=50,
                   help="number of samples to evaluate")
    p.add_argument("--no-filter", action="store_true",
                   help="disable the 2D edge filter (compare A/B)")
    p.add_argument("--orphan-only", action="store_true",
                   help="skip 2D edge filter, apply only orphan-vertex cleanup")
    p.add_argument("--strict-no-support", action="store_true",
                   help="use the asymmetric filter: drop only edges with NO support in any view")
    p.add_argument("--no-tracks", action="store_true",
                   help="disable the triangulation track ensemble")
    p.add_argument("--seed", type=int, default=2718,
                   help="rng seed for point fusion priority sampling")
    p.add_argument("--label", type=str, default="run",
                   help="label printed in summary line")
    p.add_argument("--conf-thresh", type=float, default=None,
                   help="override CONF_THRESH in script.py for this run")
    p.add_argument("--snap-apex", action="store_true",
                   help="extend snap_to_point_cloud target_classes to include apex (class 0)")

    # --- vertex view-projection refinement ---------------------------------
    p.add_argument("--vertex-refine", action="store_true",
                   help="apply vertex view-projection refinement after all post-process")
    p.add_argument("--refine-max-pixel-dist", type=float, default=15.0,
                   help="vertex refine: max 2D pixel distance for corner matching")
    p.add_argument("--refine-min-views", type=int, default=2,
                   help="vertex refine: min views with 2D match")
    p.add_argument("--refine-max-move", type=float, default=0.5,
                   help="vertex refine: max 3D displacement in meters")

    # --- test-time augmentation --------------------------------------------
    p.add_argument("--tta", action="store_true",
                   help="enable multi-seed TTA (3 priority-sample seeds, concat segments)")
    p.add_argument("--tta-hungarian", action="store_true",
                   help="use Hungarian-matched averaging TTA (rejects unmatched segments)")
    p.add_argument("--tta-min-passes", type=int, default=1,
                   help="hungarian TTA: drop anchor segments without this many supporting passes")
    p.add_argument("--tta-seeds", type=str, default="2718,31415,42",
                   help="comma-separated priority-sample seeds for TTA")

    # --- triangulation-track overrides -------------------------------------
    p.add_argument("--tracks-only", action="store_true",
                   help="output ONLY the triangulation tracks (debug: baseline of tracks alone)")
    p.add_argument("--fallback-to-tracks-when", type=str, default="",
                   help="fallback to tracks-only when pred_v > X and track_v < Y, e.g. 20,8")

    # --- hallucination filter ----------------------------------------------
    p.add_argument("--hallu-filter", action="store_true",
                   help="filter vertices lacking BOTH COLMAP and gestalt-corner support")
    p.add_argument("--hallu-colmap-radius", type=float, default=0.8,
                   help="hallucination filter: COLMAP support radius (meters)")
    p.add_argument("--hallu-gestalt-px", type=float, default=20.0,
                   help="hallucination filter: gestalt corner pixel radius")
    p.add_argument("--hallu-min-views", type=int, default=1,
                   help="hallucination filter: min views with gestalt support")

    # --- checkpoint ensemble -----------------------------------------------
    p.add_argument("--ensemble", type=str, default="",
                   help="comma-separated paths to ensemble checkpoints (additional to default checkpoint.pt)")
    p.add_argument("--ensemble-min-passes", type=int, default=1,
                   help="ensemble: min cross-pass agreement to keep anchor segments")

    # --- wireframe bundle adjustment ---------------------------------------
    p.add_argument("--bundle-adjust", action="store_true",
                   help="apply joint multi-view wireframe bundle adjustment")
    p.add_argument("--ba-iter", type=int, default=50,
                   help="bundle-adjust Adam iterations")
    p.add_argument("--ba-lr", type=float, default=0.003,
                   help="bundle-adjust learning rate")
    p.add_argument("--ba-vertex-weight", type=float, default=1.0,
                   help="bundle-adjust vertex (corner pixel) loss weight")
    p.add_argument("--ba-edge-weight", type=float, default=0.5,
                   help="bundle-adjust edge (edge pixel) loss weight")
    p.add_argument("--ba-anchor-weight", type=float, default=200.0,
                   help="bundle-adjust anchor regularization weight")
    p.add_argument("--ba-max-move", type=float, default=0.4,
                   help="bundle-adjust hard cap on vertex displacement (meters)")

    # --- sparse-scene triangulation supplement -----------------------------
    p.add_argument("--tri-supplement", action="store_true",
                   help="supplement sparse predictions with loose (min_views=2) tracks")
    p.add_argument("--tri-sparse-threshold", type=int, default=5,
                   help="tri-supplement: only activate when pred has < N vertices")
    p.add_argument("--tri-merge-radius", type=float, default=0.7,
                   help="tri-supplement: absorb loose vertex into pred if within radius")

    # --- learned edge / vertex models --------------------------------------
    p.add_argument("--edge-classifier", type=str, default="",
                   help="path to edge_classifier.pt; if set, filter edges via learned model")
    p.add_argument("--edge-classifier-v2", type=str, default="",
                   help="path to edge_classifier_v2.pt; if set, use v2 with image-mask features")
    p.add_argument("--edge-classifier-v3", type=str, default="",
                   help="path to edge_classifier_v3.pt; v3 = CNN patches + v2 features")
    p.add_argument("--edge-classifier-v4", type=str, default="",
                   help="path to edge_classifier_v4.pt; v4 = DINOv2 features + v2 features")
    p.add_argument("--vertex-regressor-v4", type=str, default="",
                   help="path to vertex_regressor_v4.pt; learned 3D position refinement")
    p.add_argument("--vertex-reg-max-move", type=float, default=0.4,
                   help="vertex regressor: clamp predicted offset to this magnitude")
    p.add_argument("--vertex-classifier-v4", type=str, default="",
                   help="path to vertex_classifier_v4.pt; drops low-conf vertices")
    p.add_argument("--vertex-class-thresh", type=float, default=0.3,
                   help="vertex classifier: keep if P(keep) >= threshold")
    p.add_argument("--vertex-class-min-keep", type=float, default=0.85,
                   help="vertex classifier: never drop more than (1 - this) of vertices")
    p.add_argument("--edge-class-thresh", type=float, default=0.5,
                   help="edge classifier: keep edges with P(keep) >= threshold")
    p.add_argument("--edge-class-min-keep", type=float, default=0.5,
                   help="edge classifier: never drop more than (1 - this) of edges")

    # --- edge filling from 2D masks ----------------------------------------
    p.add_argument("--edge-fill", action="store_true",
                   help="enable edge filling from 2D mask evidence")
    p.add_argument("--fill-min-views", type=int, default=2,
                   help="edge fill: min views supporting a new edge")
    p.add_argument("--fill-min-frac", type=float, default=0.40,
                   help="edge fill: min support fraction along projected segment")
    p.add_argument("--fill-max-length", type=float, default=5.0,
                   help="edge fill: max edge length in meters")

    # argv=None makes argparse read sys.argv[1:], preserving the old behavior.
    return p.parse_args(argv)
def _safe_len(obj):
    """Return ``len(obj)``, or 0 when ``obj`` has no ``__len__`` (e.g. ``None``)."""
    return len(obj) if hasattr(obj, '__len__') else 0


def _usable_tracks(sample):
    """Triangulate tracks with ``min_views=3``.

    Returns ``(vertices, edges)`` (float32 array, list) when at least two
    vertices and one edge were found, otherwise ``None``.
    """
    from triangulation import predict_wireframe_tracks
    track_v, track_e = predict_wireframe_tracks(sample, min_views=3)
    if (track_v is not None and track_e is not None
            and len(track_v) >= 2 and len(track_e) >= 1):
        return np.asarray(track_v, dtype=np.float32), list(track_e)
    return None


def predict_one(sample, model, device, cfg, rng,
                use_tracks=True, use_2d_filter=True, orphan_only=False,
                strict_no_support=False, vertex_refine=False,
                refine_kwargs=None, edge_fill=False, fill_kwargs=None,
                tta=False, tta_seeds=None,
                tracks_only=False, fallback_tracks=None):
    """Run the full inference pipeline on one sample. Returns (pv, pe, diag).

    Stages, in order: COLMAP point count (diagnostic only) -> base prediction
    (ensemble, TTA, or single fuse+predict pass) -> hybrid merge with
    triangulation tracks -> optional sparse-scene supplement -> optional
    vertex view-projection refinement -> one of {orphan cleanup, strict
    no-support filter, 2D support filter} -> optional vertex regressor ->
    optional vertex classifier -> optional edge classifier -> optional edge
    fill -> optional bundle adjustment -> optional hallucination filter ->
    optional replacement by triangulation tracks.

    Besides the explicit parameters, several stages are switched on and
    configured through attributes set on this function object by the caller
    (``predict_one._ensemble_models``, ``_ensemble_min_passes``,
    ``_tta_hungarian``, ``_tta_min_passes``, ``_tri_supplement``,
    ``_tri_sparse_threshold``, ``_tri_merge_radius``, ``_vertex_regressor``,
    ``_vertex_classifier``, ``_edge_classifier``, ``_bundle_adjust``,
    ``_ba_kwargs``, ``_hallu_filter``, ``_hallu_colmap_radius``,
    ``_hallu_gestalt_px``, ``_hallu_min_views``). Missing attributes disable
    the stage or fall back to the defaults written at each ``getattr``.

    Args:
        sample: dataset entry handed to every stage.
        model: primary model for the single-pass and TTA paths.
        device: device handed to the prediction functions.
        cfg: fuser configuration passed to fusion / TTA / ensemble.
        rng: random state used by ``script.fuse_and_sample`` (single pass only).
        use_tracks: merge triangulation tracks into the prediction.
        use_2d_filter: apply the 2D support filter (lowest-priority filter).
        orphan_only: only drop orphan vertices (takes priority over the
            other two filters).
        strict_no_support: use the strict no-support filter (second priority).
        vertex_refine: run view-projection refinement before filtering.
        refine_kwargs: extra keyword arguments for the refinement call.
        edge_fill: add edges supported by 2D evidence.
        fill_kwargs: extra keyword arguments for the edge-fill call.
        tta: use multi-seed TTA (also selects the seeds in ensemble mode).
        tta_seeds: seeds for TTA; defaults to ``(2718, 31415, 42)``.
        tracks_only: replace the final prediction with the tracks.
        fallback_tracks: optional ``(pred_v_thresh, track_v_thresh)``; the
            prediction is replaced by tracks when it has more vertices than
            the first value while the earlier merge saw fewer track vertices
            than the second.

    Returns:
        ``(pred_v, pred_e, diag)``. ``diag`` always holds ``colmap``,
        ``fused``, ``track_v``, ``track_e``, ``pred_v``, ``pred_e``,
        ``2dfilt_in``, ``2dfilt_out`` and ``status``; optional stages add
        their own counters. ``status`` is ``"ok"`` or the tag of the most
        recent failing/overriding stage. When the base prediction fails the
        function returns ``script.empty_solution()`` early.
    """
    diag = {"colmap": -1, "fused": 0, "track_v": 0, "track_e": 0,
            "pred_v": 0, "pred_e": 0, "2dfilt_in": 0, "2dfilt_out": 0,
            "status": "ok"}

    # COLMAP point count is purely diagnostic; on any failure it stays at -1.
    try:
        from hoho2025.example_solutions import convert_entry_to_human_readable
        g = convert_entry_to_human_readable(sample)
        rec = g.get('colmap') or g.get('colmap_binary')
        if rec is not None:
            diag["colmap"] = len(rec.points3D)
    except Exception:
        pass

    # ---- base prediction: ensemble > TTA > single pass ---------------------
    if getattr(predict_one, "_ensemble_models", None):
        try:
            from ensemble import predict_sample_ensemble
            # If --tta and --ensemble both set, use TTA seeds; else single seed.
            if tta:
                seeds = tta_seeds or (2718, 31415, 42)
            else:
                seeds = (2718,)
            pred_v, pred_e = predict_sample_ensemble(
                sample, cfg, predict_one._ensemble_models, device,
                seeds=tuple(seeds),
                min_passes_for_keep=getattr(predict_one, "_ensemble_min_passes", 1),
            )
            diag["fused"] = -1
        except Exception as e:
            diag["status"] = f"ensemble_failed:{type(e).__name__}"
            return *script.empty_solution(), diag
    elif tta:
        try:
            seeds = tta_seeds or (2718, 31415, 42)
            tta_method = (
                "predict_sample_tta_hungarian"
                if getattr(predict_one, "_tta_hungarian", False)
                else "predict_sample_tta"
            )
            import tta as _tta_mod
            fn = getattr(_tta_mod, tta_method)
            if tta_method == "predict_sample_tta_hungarian":
                pred_v, pred_e = fn(
                    sample, cfg, model, device, seeds=tuple(seeds),
                    min_passes_for_keep=getattr(predict_one, "_tta_min_passes", 1),
                )
            else:
                pred_v, pred_e = fn(
                    sample, cfg, model, device, seeds=tuple(seeds))
            diag["fused"] = -1  # not single-seed
        except Exception as e:
            diag["status"] = f"tta_failed:{type(e).__name__}"
            return *script.empty_solution(), diag
    else:
        fused = script.fuse_and_sample(sample, cfg, rng)
        if fused is None:
            diag["status"] = "fuse_failed"
            return *script.empty_solution(), diag
        diag["fused"] = len(fused["xyz_norm"])
        try:
            pred_v, pred_e = script.predict_sample(fused, model, device)
        except Exception as e:
            diag["status"] = f"predict_failed:{type(e).__name__}"
            return *script.empty_solution(), diag

    # ---- merge with triangulation tracks -----------------------------------
    if use_tracks:
        try:
            from triangulation import predict_wireframe_tracks
            track_v, track_e = predict_wireframe_tracks(sample, min_views=3)
            diag["track_v"] = len(track_v) if track_v is not None else 0
            diag["track_e"] = len(track_e) if track_e is not None else 0
            pred_v, pred_e = script.hybrid_merge(
                pred_v, pred_e, track_v, track_e, merge_radius=0.8)
        except Exception as e:
            diag["status"] = f"track_failed:{type(e).__name__}"

    # Sparse-scene supplement: when pred remains tiny after model + min_views=3
    # tracks, fall back to min_views=2 loose tracks.
    if getattr(predict_one, "_tri_supplement", False):
        try:
            from triangulate_supplement import supplement_sparse_with_loose_tracks
            v_before = _safe_len(pred_v)
            pred_v, pred_e = supplement_sparse_with_loose_tracks(
                pred_v, pred_e, sample,
                sparse_threshold=getattr(predict_one, "_tri_sparse_threshold", 5),
                merge_radius=getattr(predict_one, "_tri_merge_radius", 0.7),
            )
            diag["tri_added"] = _safe_len(pred_v) - v_before
        except Exception as e:
            diag["status"] = f"tri_failed:{type(e).__name__}"

    diag["2dfilt_in"] = _safe_len(pred_e)

    # Vertex refinement runs FIRST (refines vertex positions while orphan is still present;
    # orphan/2d-filter then cleans up afterwards).
    if vertex_refine:
        try:
            from vertex_refine import refine_vertices_view_projection
            # Snapshot (not alias) the input so the moved-vertex count stays
            # meaningful even if the refiner updates the array in place.
            pv_before = np.array(pred_v, copy=True)
            pred_v, pred_e = refine_vertices_view_projection(
                pred_v, pred_e, sample,
                **(refine_kwargs or {}))
            if hasattr(pred_v, '__len__') and len(pred_v) == len(pv_before):
                moved = int(np.sum(np.linalg.norm(
                    np.asarray(pred_v) - np.asarray(pv_before), axis=1) > 1e-6))
                diag["refined"] = moved
        except Exception as e:
            diag["status"] = f"refine_failed:{type(e).__name__}"

    # ---- exactly one geometric edge filter (priority: orphan > strict > 2D) -
    if orphan_only:
        try:
            from edge_2d_filter import drop_orphan_vertices
            pred_v, pred_e = drop_orphan_vertices(pred_v, pred_e)
        except Exception as e:
            diag["status"] = f"orphan_failed:{type(e).__name__}"
    elif strict_no_support:
        try:
            from edge_2d_filter import filter_edges_strict_no_support
            pred_v, pred_e = filter_edges_strict_no_support(
                pred_v, pred_e, sample,
                max_support_thresh=0.10, dilate_px=4, sample_steps=20)
        except Exception as e:
            diag["status"] = f"strict_failed:{type(e).__name__}"
    elif use_2d_filter:
        try:
            from edge_2d_filter import filter_edges_by_2d_support
            pred_v, pred_e = filter_edges_by_2d_support(
                pred_v, pred_e, sample,
                min_views_support=2, min_pixel_frac=0.25,
                dilate_px=4, sample_steps=20)
        except Exception as e:
            diag["status"] = f"2dfilt_failed:{type(e).__name__}"
    diag["2dfilt_out"] = _safe_len(pred_e)

    # Vertex regressor (moves vertices toward learned position before classifier)
    vr = getattr(predict_one, "_vertex_regressor", None)
    if vr is not None:
        try:
            from vertex_regressor_v4 import refine_vertices_with_regressor
            pred_v, pred_e = refine_vertices_with_regressor(
                pred_v, pred_e, sample,
                vr["model"], vr["dino"], device=vr["dino_device"],
                feature_mean=vr["mean"], feature_std=vr["std"],
                edge_feat_mean=vr["edge_feat_mean"], edge_feat_std=vr["edge_feat_std"],
                max_move_meters=vr["max_move_meters"],
            )
        except Exception as e:
            diag["status"] = f"vr_failed:{type(e).__name__}"

    # Vertex classifier (drops low-conf vertices before edge classifier)
    vc = getattr(predict_one, "_vertex_classifier", None)
    if vc is not None:
        try:
            from vertex_classifier_v4 import classify_vertices_v4
            v_before = _safe_len(pred_v)
            pred_v, pred_e = classify_vertices_v4(
                pred_v, pred_e, sample,
                vc["model"], vc["dino"], device=vc["dino_device"],
                threshold=vc["threshold"],
                feature_mean=vc["mean"], feature_std=vc["std"],
                edge_feat_mean=vc["edge_feat_mean"], edge_feat_std=vc["edge_feat_std"],
                min_keep_frac=vc["min_keep_frac"],
            )
            diag["vc_kept"] = _safe_len(pred_v)
            diag["vc_dropped"] = v_before - diag["vc_kept"]
        except Exception as e:
            diag["status"] = f"vc_failed:{type(e).__name__}"

    # Edge classifier: learned keep/drop on top of post-filter edges.
    ec = getattr(predict_one, "_edge_classifier", None)
    if ec is not None:
        try:
            e_before = _safe_len(pred_e)
            ver = ec.get("version", 1)
            if ver == 4:
                # v4 additionally needs the DINO backbone and edge-feature stats.
                from edge_classifier_v4 import classify_edges_v4
                pred_v, pred_e = classify_edges_v4(
                    pred_v, pred_e, sample,
                    ec["model"], ec["dino"], device=ec["dino_device"],
                    threshold=ec["threshold"],
                    feature_mean=ec["mean"], feature_std=ec["std"],
                    edge_feat_mean=ec["edge_feat_mean"], edge_feat_std=ec["edge_feat_std"],
                    min_keep_frac=ec["min_keep_frac"],
                )
            else:
                # v1-v3 share one call signature; only the module differs.
                if ver == 3:
                    from edge_classifier_v3 import classify_edges_v3 as _cls_fn
                elif ver == 2:
                    from edge_classifier_v2 import classify_edges_v2 as _cls_fn
                else:
                    from edge_classifier import classify_edges as _cls_fn
                pred_v, pred_e = _cls_fn(
                    pred_v, pred_e, sample,
                    ec["model"], threshold=ec["threshold"],
                    feature_mean=ec["mean"], feature_std=ec["std"],
                    min_keep_frac=ec["min_keep_frac"],
                )
            diag["ec_kept"] = _safe_len(pred_e)
            diag["ec_dropped"] = e_before - diag["ec_kept"]
        except Exception as e:
            diag["status"] = f"ec_failed:{type(e).__name__}"

    if edge_fill:
        e_before = _safe_len(pred_e)
        try:
            from edge_fill import fill_missing_edges_from_2d
            pred_v, pred_e = fill_missing_edges_from_2d(
                pred_v, pred_e, sample,
                **(fill_kwargs or {}))
            diag["filled"] = _safe_len(pred_e) - e_before
        except Exception as e:
            diag["status"] = f"fill_failed:{type(e).__name__}"

    if getattr(predict_one, "_bundle_adjust", False):
        try:
            from bundle_wireframe import bundle_adjust_wireframe
            ba_kwargs = getattr(predict_one, "_ba_kwargs", {})
            pred_v, pred_e = bundle_adjust_wireframe(
                pred_v, pred_e, sample, **ba_kwargs)
        except Exception as e:
            diag["status"] = f"ba_failed:{type(e).__name__}"

    # Optional: hallucination filter (drop vertices lacking both COLMAP and gestalt support)
    if getattr(predict_one, "_hallu_filter", False):
        try:
            from hallucination_filter import filter_hallucinated_vertices
            n_before = _safe_len(pred_v)
            pred_v, pred_e = filter_hallucinated_vertices(
                pred_v, pred_e, sample,
                colmap_radius=getattr(predict_one, "_hallu_colmap_radius", 0.8),
                gestalt_radius_px=getattr(predict_one, "_hallu_gestalt_px", 20.0),
                min_views_with_gestalt=getattr(predict_one, "_hallu_min_views", 1),
            )
            diag["hallu_dropped"] = n_before - _safe_len(pred_v)
        except Exception as e:
            diag["status"] = f"hallu_failed:{type(e).__name__}"

    # Optional: replace prediction with tracks-only on hard scenes
    if tracks_only:
        # Tracks are re-triangulated here (the merge stage above may have been
        # skipped or may have failed), regardless of pred quality.
        try:
            tracks = _usable_tracks(sample)
        except Exception as e:
            # Record the failure: otherwise a tracks-only run would silently
            # report the model prediction as if it were the track baseline.
            diag["status"] = f"tracks_only_failed:{type(e).__name__}"
        else:
            if tracks is not None:
                pred_v, pred_e = tracks
                diag["status"] = "tracks_only_forced"
    elif fallback_tracks is not None:
        pred_v_thresh, track_v_thresh = fallback_tracks  # tuple
        n_pv = _safe_len(pred_v)
        if n_pv > pred_v_thresh and diag.get("track_v", 0) < track_v_thresh:
            try:
                tracks = _usable_tracks(sample)
            except Exception as e:
                diag["status"] = f"fallback_failed:{type(e).__name__}"
            else:
                if tracks is not None:
                    pred_v, pred_e = tracks
                    diag["status"] = "fallback_to_tracks"

    diag["pred_v"] = _safe_len(pred_v)
    diag["pred_e"] = _safe_len(pred_e)
    return pred_v, pred_e, diag
def _to_numpy(stat):
    """Return ``stat.cpu().numpy()`` when ``stat`` has a ``cpu`` method, else ``stat`` unchanged."""
    return stat.cpu().numpy() if hasattr(stat, "cpu") else stat


def main():
    """Entry point: evaluate the pipeline on the first N streamed samples.

    Parses the CLI, loads the checkpoint (downloading it next to this script
    when missing or smaller than 1000 bytes), loads the optional ensemble /
    classifier / regressor models, configures ``predict_one`` through
    function attributes, then streams samples from
    ``usm3d/hoho22k_2026_trainval``, scores each prediction with ``hss`` and
    prints a per-sample line. Finally prints mean / quantile statistics and
    writes the per-sample diagnostics to ``local_eval_<label>.json`` next to
    this script.

    Raises:
        SystemExit: when ``--tta-seeds`` or ``--fallback-to-tracks-when`` is
            malformed (reported once, before any model is loaded).
    """
    import itertools
    import json
    import traceback

    args = parse_args()

    # Validate the string-encoded options once, up front, so a typo is
    # reported immediately instead of surfacing as a crash on every sample.
    try:
        tta_seeds_tuple = tuple(int(s) for s in args.tta_seeds.split(","))
    except ValueError:
        raise SystemExit(
            f"--tta-seeds expects comma-separated integers, got {args.tta_seeds!r}"
        ) from None
    fallback_tracks = None
    if args.fallback_to_tracks_when:
        try:
            pv_thr, tv_thr = args.fallback_to_tracks_when.split(",")
            fallback_tracks = (int(pv_thr), int(tv_thr))
        except ValueError:
            raise SystemExit(
                "--fallback-to-tracks-when expects two comma-separated integers "
                f"'X,Y', got {args.fallback_to_tracks_when!r}"
            ) from None

    print(f"=== Local eval | {args.n_samples} samples | "
          f"tracks={'on' if not args.no_tracks else 'OFF'} | "
          f"2dfilt={'on' if not args.no_filter else 'OFF'} | "
          f"label={args.label} ===")

    # Prefer CUDA when present, then Apple MPS, then CPU.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")
    print(f"Device: {device}")

    ckpt_path = SCRIPT_DIR / "checkpoint.pt"
    # A missing or tiny (< 1000 bytes) file is treated as "not downloaded yet".
    if not ckpt_path.exists() or ckpt_path.stat().st_size < 1000:
        import urllib.request
        url = ("https://huggingface.co/jacklangerman/s23dr-2026-submission/"
               "resolve/main/checkpoint.pt")
        print("Downloading checkpoint.pt ...")
        urllib.request.urlretrieve(url, str(ckpt_path))
    model = script.load_model(ckpt_path, device)
    print(f"Model: {sum(p.numel() for p in model.parameters()):,} params")

    ensemble_models = None
    if args.ensemble:
        from ensemble import load_two_checkpoints
        extra_paths = [p.strip() for p in args.ensemble.split(",") if p.strip()]
        ensemble_models = [model] + load_two_checkpoints(extra_paths, device)
        print(f"Ensemble: {len(ensemble_models)} models")

    # ---- optional learned models -------------------------------------------
    # Edge classifiers all write the same slot, so when several versions are
    # requested the one loaded last (v1 < v2 < v3 < v4) wins.
    if args.edge_classifier:
        from edge_classifier import load_classifier
        ec_model, ec_mean, ec_std = load_classifier(args.edge_classifier, device="cpu")
        print(f"Edge classifier loaded from {args.edge_classifier} "
              f"(thresh={args.edge_class_thresh}, min_keep={args.edge_class_min_keep})")
        predict_one._edge_classifier_loaded = {
            "model": ec_model,
            "mean": _to_numpy(ec_mean),
            "std": _to_numpy(ec_std),
            "threshold": args.edge_class_thresh,
            "min_keep_frac": args.edge_class_min_keep,
            "version": 1,
        }
    if args.edge_classifier_v2:
        from edge_classifier_v2 import load_classifier_v2
        ec_model, ec_mean, ec_std = load_classifier_v2(args.edge_classifier_v2, device="cpu")
        print(f"Edge classifier V2 loaded from {args.edge_classifier_v2}")
        predict_one._edge_classifier_loaded = {
            "model": ec_model,
            "mean": _to_numpy(ec_mean),
            "std": _to_numpy(ec_std),
            "threshold": args.edge_class_thresh,
            "min_keep_frac": args.edge_class_min_keep,
            "version": 2,
        }
    if args.edge_classifier_v3:
        from edge_classifier_v3 import load_classifier_v3
        ec_model, ec_mean, ec_std = load_classifier_v3(args.edge_classifier_v3, device="cpu")
        print(f"Edge classifier V3 (CNN) loaded from {args.edge_classifier_v3}")
        predict_one._edge_classifier_loaded = {
            "model": ec_model,
            "mean": _to_numpy(ec_mean),
            "std": _to_numpy(ec_std),
            "threshold": args.edge_class_thresh,
            "min_keep_frac": args.edge_class_min_keep,
            "version": 3,
        }
    if args.vertex_regressor_v4:
        from vertex_regressor_v4 import load_regressor_v4
        from edge_classifier_v4 import get_dino_model
        vr_model, vrg_mean, vrg_std, vre_mean, vre_std = load_regressor_v4(
            args.vertex_regressor_v4, device="cpu")
        dino_vr = get_dino_model(device=device)
        print(f"Vertex regressor V4 loaded from {args.vertex_regressor_v4}")
        predict_one._vertex_regressor_loaded = {
            "model": vr_model,
            "dino": dino_vr,
            "dino_device": device,
            "mean": _to_numpy(vrg_mean),
            "std": _to_numpy(vrg_std),
            "edge_feat_mean": _to_numpy(vre_mean),
            "edge_feat_std": _to_numpy(vre_std),
            "max_move_meters": args.vertex_reg_max_move,
        }
    if args.vertex_classifier_v4:
        from vertex_classifier_v4 import load_classifier_v4 as load_vc4
        from edge_classifier_v4 import get_dino_model
        vc_model, vg_mean, vg_std, ve_mean, ve_std = load_vc4(
            args.vertex_classifier_v4, device="cpu")
        dino_v = get_dino_model(device=device)
        print(f"Vertex classifier V4 loaded from {args.vertex_classifier_v4}")
        predict_one._vertex_classifier_loaded = {
            "model": vc_model,
            "dino": dino_v,
            "dino_device": device,
            "mean": _to_numpy(vg_mean),
            "std": _to_numpy(vg_std),
            "edge_feat_mean": _to_numpy(ve_mean),
            "edge_feat_std": _to_numpy(ve_std),
            "threshold": args.vertex_class_thresh,
            "min_keep_frac": args.vertex_class_min_keep,
        }
    if args.edge_classifier_v4:
        from edge_classifier_v4 import load_classifier_v4, get_dino_model
        ec_model, g_mean, g_std, e_mean, e_std = load_classifier_v4(
            args.edge_classifier_v4, device="cpu")
        # DINO runs on the inference device for speed
        dino = get_dino_model(device=device)
        print(f"Edge classifier V4 (DINOv2) loaded from {args.edge_classifier_v4}")
        predict_one._edge_classifier_loaded = {
            "model": ec_model,
            "dino": dino,
            "dino_device": device,
            "mean": _to_numpy(g_mean),
            "std": _to_numpy(g_std),
            "edge_feat_mean": _to_numpy(e_mean),
            "edge_feat_std": _to_numpy(e_std),
            "threshold": args.edge_class_thresh,
            "min_keep_frac": args.edge_class_min_keep,
            "version": 4,
        }

    # ---- runtime overrides of the submission script ------------------------
    if args.conf_thresh is not None:
        print(f"Overriding script.CONF_THRESH: {script.CONF_THRESH} -> {args.conf_thresh}")
        script.CONF_THRESH = args.conf_thresh
    if args.snap_apex:
        print("Monkey-patching snap_to_point_cloud to include apex (class 0)")
        from s23dr_2026_example import postprocess_v2 as _pp
        _orig_snap = _pp.snap_to_point_cloud

        def _snap_with_apex(vertices, xyz, class_id, snap_radius=0.5, target_classes=None):
            return _orig_snap(vertices, xyz, class_id, snap_radius=snap_radius,
                              target_classes=target_classes or [0, 1, 2])

        _pp.snap_to_point_cloud = _snap_with_apex
        script.snap_to_point_cloud = _snap_with_apex

    # ---- per-run configuration (identical for every sample) ----------------
    refine_kwargs = {
        "max_pixel_dist": args.refine_max_pixel_dist,
        "min_views": args.refine_min_views,
        "max_move_meters": args.refine_max_move,
    }
    fill_kwargs = {
        "min_views_support": args.fill_min_views,
        "min_pixel_frac": args.fill_min_frac,
        "max_edge_length_meters": args.fill_max_length,
    }
    # predict_one reads these optional-stage settings from its own attributes.
    predict_one._tta_hungarian = args.tta_hungarian
    predict_one._tta_min_passes = args.tta_min_passes
    predict_one._hallu_filter = args.hallu_filter
    predict_one._hallu_colmap_radius = args.hallu_colmap_radius
    predict_one._hallu_gestalt_px = args.hallu_gestalt_px
    predict_one._hallu_min_views = args.hallu_min_views
    predict_one._ensemble_models = ensemble_models
    predict_one._ensemble_min_passes = args.ensemble_min_passes
    predict_one._bundle_adjust = args.bundle_adjust
    predict_one._ba_kwargs = {
        "n_iter": args.ba_iter,
        "lr": args.ba_lr,
        "vertex_weight": args.ba_vertex_weight,
        "edge_weight": args.ba_edge_weight,
        "anchor_weight": args.ba_anchor_weight,
        "max_move_meters": args.ba_max_move,
    }
    predict_one._tri_supplement = args.tri_supplement
    predict_one._tri_sparse_threshold = args.tri_sparse_threshold
    predict_one._tri_merge_radius = args.tri_merge_radius
    predict_one._edge_classifier = getattr(predict_one, "_edge_classifier_loaded", None)
    predict_one._vertex_classifier = getattr(predict_one, "_vertex_classifier_loaded", None)
    predict_one._vertex_regressor = getattr(predict_one, "_vertex_regressor_loaded", None)

    ds = load_dataset(
        'usm3d/hoho22k_2026_trainval', split='train',
        streaming=True, trust_remote_code=True)
    cfg = FuserConfig()
    rng = np.random.RandomState(args.seed)

    scores = []
    diags = []
    t_start = time.time()
    # islice stops after n_samples items without pulling one extra sample
    # from the streaming dataset; max() keeps a non-positive N an empty run.
    for idx, sample in enumerate(itertools.islice(ds, max(args.n_samples, 0))):
        order_id = sample.get('order_id', str(idx))
        gt_v = sample.get('wf_vertices')
        gt_e = sample.get('wf_edges')
        if gt_v is None or gt_e is None:
            print(f"[{idx}] {order_id}: SKIP (no GT)")
            continue
        try:
            pred_v, pred_e, diag = predict_one(
                sample, model, device, cfg, rng,
                use_tracks=not args.no_tracks,
                use_2d_filter=not args.no_filter,
                orphan_only=args.orphan_only,
                strict_no_support=args.strict_no_support,
                vertex_refine=args.vertex_refine,
                refine_kwargs=refine_kwargs,
                edge_fill=args.edge_fill,
                fill_kwargs=fill_kwargs,
                tta=args.tta,
                tta_seeds=tta_seeds_tuple,
                tracks_only=args.tracks_only,
                fallback_tracks=fallback_tracks)
            # Release cached accelerator memory between samples.
            if device.type == "mps":
                torch.mps.empty_cache()
            elif device.type == "cuda":
                torch.cuda.empty_cache()
            res = hss(np.asarray(pred_v), pred_e, np.asarray(gt_v), gt_e)
            score = float(res.hss) if hasattr(res, 'hss') else float(res)
            scores.append(score)
            diags.append({"order_id": order_id, "score": score, **diag})
            print(f"[{idx:3d}] {order_id} hss={score:.4f} "
                  f"colmap={diag['colmap']} fused={diag['fused']} "
                  f"track={diag['track_v']}/{diag['track_e']} "
                  f"pred={diag['pred_v']}/{diag['pred_e']} "
                  f"2dfilt={diag['2dfilt_in']}->{diag['2dfilt_out']} "
                  f"{diag['status']}")
        except Exception as e:
            # One bad sample must not abort the whole evaluation run.
            print(f"[{idx}] {order_id} EVAL CRASH: {e}")
            traceback.print_exc()
    elapsed = time.time() - t_start

    scores = np.array(scores)
    if len(scores) == 0:
        print("\nNo valid scores.")
        return
    print(f"\n=== {args.label} | {len(scores)}/{args.n_samples} samples | "
          f"{elapsed:.0f}s ({elapsed/max(len(scores),1):.1f}s/sample) ===")
    print(f" hss_mean = {scores.mean():.4f}")
    for pct, q in ((5, 0.05), (25, 0.25), (50, 0.50), (75, 0.75), (95, 0.95)):
        print(f" hss_q{pct} = {np.quantile(scores, q):.4f}")
    print(f" hss_min = {scores.min():.4f} hss_max = {scores.max():.4f}")

    # Save per-sample details for later analysis
    out_path = SCRIPT_DIR / f"local_eval_{args.label}.json"
    with out_path.open("w") as f:
        json.dump(diags, f, indent=2)
    print(f" Per-sample details: {out_path}")
# Run the evaluation only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()