"""Multi-view corner triangulation pipeline (T1.2 – T1.6).
Drop-in replacement for the depth-based ``project_vertices_to_3d`` step in
``sklearn_submission.py``. The depth map is only used as a sanity filter, never
as the source of 3D positions - the actual geometry comes from COLMAP cameras
via DLT triangulation.
Entry points:
    detect_corners_per_view(entry) → (dict[view_id → list[Corner]], good_entry)
    triangulate_wireframe(entry) → (tracks, views, good_entry[, track_edges])
Everything is pure numpy + pycolmap + cv2 - no torch, no kornia.
An illustrative usage sketch (``__main__`` block) sits at the bottom of this file.
"""
from __future__ import annotations
import numpy as np
import cv2
from dataclasses import dataclass, field
from hoho2025.example_solutions import (
convert_entry_to_human_readable,
filter_vertices_by_background,
point_to_segment_dist,
)
from hoho2025.color_mappings import gestalt_color_mapping
try:
from mvs_utils import (
collect_views, triangulate_dlt, mean_reprojection_error,
fundamental_matrix, epipolar_line, point_to_line_distance,
project_world_to_image,
)
except ImportError:
from submission.mvs_utils import (
collect_views, triangulate_dlt, mean_reprojection_error,
fundamental_matrix, epipolar_line, point_to_line_distance,
project_world_to_image,
)
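# ``triangulate_dlt`` (imported above from mvs_utils) does the actual lifting to
# 3D. Purely as a hedged reference - it is never called - the sketch below shows
# the standard DLT construction that function is assumed to implement: every view
# with 3x4 projection matrix P and pixel (u, v) contributes the two rows
# u*P[2] - P[0] and v*P[2] - P[1]; the homogeneous solution is the right singular
# vector with the smallest singular value of the stacked system.
def _dlt_reference_sketch(Ps, pts):
    """Illustrative DLT sketch; the pipeline uses mvs_utils.triangulate_dlt."""
    rows = []
    for P, (u, v) in zip(Ps, pts):
        P = np.asarray(P, dtype=np.float64)
        rows.append(u * P[2] - P[0])
        rows.append(v * P[2] - P[1])
    _, _, vt = np.linalg.svd(np.stack(rows))  # (2N, 4) design matrix
    X_h = vt[-1]                              # homogeneous 3D point
    return X_h[:3] / X_h[3]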
# Vertex classes we consider (minus 'post' - added later in T1.7 when safe).
VERTEX_CLASSES = ['apex', 'eave_end_point', 'flashing_end_point']
EDGE_CLASSES = ['eave', 'ridge', 'rake', 'valley', 'hip']
@dataclass
class Corner:
"""A 2D corner detected on a single view."""
view_id: str
xy: np.ndarray # (2,) float32 pixel coords at COLMAP-native resolution
cls: str # gestalt class label
blob_area: int # area of the connected component, for tie-breaks
@dataclass
class Track:
"""A 3D wireframe vertex with its per-view observations."""
xyz: np.ndarray # (3,) float64
cls: str
observations: list[tuple[str, np.ndarray]] = field(default_factory=list)
reproj_err: float = float("inf")
    # view_id → index into corners_per_view[view_id]. Populated by build_tracks
# when per-view edges need to be lifted to 3D.
corner_indices: dict[str, int] = field(default_factory=dict)
def _refine_centroids_subpix(gest_seg_np, centroids, max_shift=4.0, win=5):
"""cv2.cornerSubPix refinement inside an apex blob. Identical to the
    version in sklearn_submission.py - duplicated here to keep triangulation.py
importable on its own.
"""
if len(centroids) == 0:
return centroids
gray = cv2.cvtColor(gest_seg_np, cv2.COLOR_RGB2GRAY)
gray = cv2.GaussianBlur(gray, (3, 3), 0)
pts = np.asarray(centroids, dtype=np.float32).reshape(-1, 1, 2).copy()
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.01)
try:
refined = cv2.cornerSubPix(gray, pts, (win, win), (-1, -1), criteria)
except cv2.error:
return centroids
refined = refined.reshape(-1, 2)
orig = np.asarray(centroids, dtype=np.float32)
shifts = np.linalg.norm(refined - orig, axis=1)
mask = shifts <= max_shift
out = orig.copy()
out[mask] = refined[mask]
return out
def _detect_edges_2d(
gest_np: np.ndarray,
corners: list[Corner],
edge_th: float = 15.0,
) -> list[tuple[int, int, str]]:
"""Detect 2D gestalt edges and connect them to existing corner indices.
Mirrors ``get_vertices_and_edges_improved`` from sklearn_submission but
keeps *all* edge classes and returns triples ``(ci, cj, edge_cls)`` so
we can aggregate edge-class votes downstream.
"""
if len(corners) < 2:
return []
apex_pts = np.array([c.xy for c in corners], dtype=np.float32)
connections: list[tuple[int, int, str]] = []
for edge_class in EDGE_CLASSES:
color = np.array(gestalt_color_mapping[edge_class])
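        # Binary mask of pixels carrying this edge class's gestalt color; the
        # +/-0.5 band amounts to an exact color match on uint8 images.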
mask_raw = cv2.inRange(gest_np, color - 0.5, color + 0.5)
mask = cv2.morphologyEx(mask_raw, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8))
if mask.sum() == 0:
continue
_, labels, _, _ = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)
for lbl in range(1, labels.max() + 1):
ys, xs = np.where(labels == lbl)
if len(xs) < 2:
continue
pts = np.column_stack([xs, ys]).astype(np.float32)
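            # Fit a line to the blob pixels and project them onto its direction;
            # the extremal projections give the segment endpoints p1/p2.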
line_params = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01)
vx, vy, x0, y0 = line_params.ravel()
proj = (xs - x0) * vx + (ys - y0) * vy
p1 = np.array([x0 + proj.min() * vx, y0 + proj.min() * vy])
p2 = np.array([x0 + proj.max() * vx, y0 + proj.max() * vy])
dists = np.array(
[point_to_segment_dist(apex_pts[i], p1, p2) for i in range(len(apex_pts))]
)
near = np.where(dists <= edge_th)[0]
if len(near) < 2:
continue
near_pts = apex_pts[near]
a = int(near[np.argmin(np.linalg.norm(near_pts - p1, axis=1))])
b = int(near[np.argmin(np.linalg.norm(near_pts - p2, axis=1))])
if a != b:
lo, hi = (a, b) if a < b else (b, a)
connections.append((lo, hi, edge_class))
return connections
def detect_corners_per_view(
entry,
vertex_classes: list[str] | None = None,
filter_background: bool = True,
return_edges: bool = False,
):
"""Run per-view corner detection + subpixel refinement.
Returns
-------
    corners_per_view : dict[image_id → list[Corner]]
good_entry : the convert_entry_to_human_readable output (caller reuses it)
    edges_per_view (if ``return_edges``) : dict[image_id → list[(ci, cj, edge_cls)]]
"""
if vertex_classes is None:
vertex_classes = VERTEX_CLASSES
good = convert_entry_to_human_readable(entry)
corners_per_view: dict[str, list[Corner]] = {}
edges_per_view: dict[str, list[tuple[int, int, str]]] = {}
for i, (gest, depth, img_id, ade_seg) in enumerate(zip(
good['gestalt'], good['depth'], good['image_ids'], good['ade']
)):
# Native resolution used by the COLMAP camera is the depth resolution
        # (768×576 in practice). Resize gestalt to match so pixel coordinates
# are compatible with our projection matrices.
depth_np = np.array(depth)
H, W = depth_np.shape[:2]
gest_np = np.array(gest.resize((W, H))).astype(np.uint8)
ade_np = np.array(ade_seg.resize((W, H))).astype(np.uint8)
corners: list[Corner] = []
for v_class in vertex_classes:
color = np.array(gestalt_color_mapping[v_class])
mask = cv2.inRange(gest_np, color - 0.5, color + 0.5)
if mask.sum() == 0:
continue
_, _, stats, centroids = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)
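            # Label 0 is the background component; keep foreground blobs only.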
blob_centroids = centroids[1:]
areas = stats[1:, cv2.CC_STAT_AREA]
if len(blob_centroids) == 0:
continue
refined = _refine_centroids_subpix(gest_np, blob_centroids)
for xy, area in zip(refined, areas):
corners.append(Corner(
view_id=img_id,
xy=np.asarray(xy, dtype=np.float32),
cls=v_class,
blob_area=int(area),
))
if filter_background and corners:
fake_verts = [{"xy": c.xy, "type": c.cls} for c in corners]
fake_verts, _ = filter_vertices_by_background(fake_verts, [], ade_np)
kept_keys = {(float(v['xy'][0]), float(v['xy'][1]), v['type']) for v in fake_verts}
corners = [c for c in corners
if (float(c.xy[0]), float(c.xy[1]), c.cls) in kept_keys]
corners_per_view[img_id] = corners
if return_edges:
edges_per_view[img_id] = _detect_edges_2d(gest_np, corners)
if return_edges:
return corners_per_view, good, edges_per_view
return corners_per_view, good
def build_tracks(
corners_per_view: dict[str, list[Corner]],
views: dict[str, dict],
class_strict: bool = True,
epipolar_px: float = 6.0,
reproj_px: float = 4.0,
min_views: int = 2,
) -> list[Track]:
"""Greedy multi-view matching and triangulation with epipolar gating.
Strategy (classical, mirrors PC2WF / COLMAP incremental triangulation):
1. Build a pool of unmatched corners from every view.
2. For every ordered pair of views compute the fundamental matrix.
3. For each corner in view_a, find all corners in view_b of the same class
whose perpendicular distance to the epipolar line is below
``epipolar_px``. Triangulate each candidate pair via DLT.
4. For each candidate 3D point, reproject it back into every other view.
A corner of the same class within ``reproj_px`` of the reprojection
becomes an additional observation. Re-triangulate with the enlarged
observation list.
    5. Accept the track if it has ≥ ``min_views`` observations, mean
reprojection error < ``reproj_px``, and positive depth everywhere.
6. Mark all corners in the track as matched so they are not reused.
    Parameters are intentionally tight - noise-reducing rather than
    permissive - because a wrongly triangulated vertex can sit meters
away from any real roof feature.
"""
# Stable ordering: view ids sorted
view_ids = [vid for vid in corners_per_view.keys() if vid in views]
view_ids.sort()
    # Index remaining corners (view_id, idx) → Corner
remaining: dict[tuple[str, int], Corner] = {}
for vid in view_ids:
for idx, c in enumerate(corners_per_view[vid]):
remaining[(vid, idx)] = c
tracks: list[Track] = []
for anchor_vid in view_ids:
for (r_vid, r_idx), anchor in list(remaining.items()):
if r_vid != anchor_vid:
continue
# Try matching this anchor against each other view.
best_track: Track | None = None
for other_vid in view_ids:
if other_vid == anchor_vid:
continue
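                # Steps 2-3 of the docstring: epipolar gating. Same-class corners
                # in other_vid must lie within epipolar_px of the anchor's
                # epipolar line before a two-view DLT is attempted.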
F = fundamental_matrix(views[anchor_vid], views[other_vid])
line = epipolar_line(F, anchor.xy)
for (o_vid, o_idx), cand in remaining.items():
if o_vid != other_vid:
continue
if class_strict and cand.cls != anchor.cls:
continue
d = point_to_line_distance(line, cand.xy)
if d > epipolar_px:
continue
# Two-view DLT
Ps = [views[anchor_vid]["P"], views[other_vid]["P"]]
pts = [anchor.xy, cand.xy]
X = triangulate_dlt(Ps, pts)
if not np.all(np.isfinite(X)):
continue
# Extend with all other views that also see this point.
obs = [(anchor_vid, anchor.xy), (other_vid, cand.xy)]
used_keys = {(anchor_vid, r_idx), (other_vid, o_idx)}
for ext_vid in view_ids:
if ext_vid in (anchor_vid, other_vid):
continue
uv, z = project_world_to_image(views[ext_vid]["P"], X.reshape(1, 3))
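                            # Cheirality check: only extend with views that see
                            # the candidate point in front of the camera.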
if z[0] <= 0:
continue
u_pred = uv[0]
best_match = None
best_dist = reproj_px
for (e_vid, e_idx), ec in remaining.items():
if e_vid != ext_vid:
continue
if class_strict and ec.cls != anchor.cls:
continue
d2 = float(np.linalg.norm(ec.xy - u_pred))
if d2 < best_dist:
best_dist = d2
best_match = (e_vid, e_idx, ec)
if best_match is not None:
obs.append((best_match[0], best_match[2].xy))
used_keys.add((best_match[0], best_match[1]))
if len(obs) < min_views:
continue
# Retriangulate on full observation set for stability
Ps_full = [views[vid]["P"] for vid, _ in obs]
pts_full = [uv for _, uv in obs]
X_full = triangulate_dlt(Ps_full, pts_full)
if not np.all(np.isfinite(X_full)):
continue
err = mean_reprojection_error(X_full, Ps_full, pts_full)
if err > reproj_px:
continue
track = Track(
xyz=X_full,
cls=anchor.cls,
observations=obs,
reproj_err=err,
)
track._used_keys = used_keys # type: ignore[attr-defined]
if best_track is None or len(track.observations) > len(best_track.observations) \
or (len(track.observations) == len(best_track.observations) and err < best_track.reproj_err):
best_track = track
if best_track is not None:
# Freeze the corner-index mapping and forget the private attr.
used = getattr(best_track, "_used_keys", set())
best_track.corner_indices = {vid: int(idx) for vid, idx in used}
try:
delattr(best_track, "_used_keys")
except AttributeError:
pass
tracks.append(best_track)
# Retire matched corners so they aren't reused.
for key in used:
remaining.pop(key, None)
return tracks
def get_high_confidence_tracks(
entry,
min_views: int = 3,
max_reproj_px: float = 2.0,
epipolar_px: float = 6.0,
build_reproj_px: float = 4.0,
) -> list[Track]:
"""Run the full triangulation pipeline and return only the tracks
that pass a stricter quality gate.
The default ``min_views=3`` and ``max_reproj_px=2.0`` are tighter
than ``predict_wireframe_tracks`` defaults and are designed for
using these tracks as **vertex sources** rather than just edge
    sources. A ≥3-view DLT triangulation with <2 px mean reprojection
    error has a 3D accuracy of 5–10 cm - substantially better than
depth-based unprojection.
"""
tracks, _views, _good = triangulate_wireframe(
entry,
epipolar_px=epipolar_px,
reproj_px=build_reproj_px,
min_views=2,
want_edges=False,
)
return [
t for t in tracks
if len(t.observations) >= min_views and t.reproj_err <= max_reproj_px
]
def predict_wireframe_tracks(
entry,
min_views: int = 2,
min_votes: int = 1,
epipolar_px: float = 6.0,
reproj_px: float = 4.0,
merge_radius: float = 0.3,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
"""Standalone triangulation-based wireframe predictor.
Returns (vertices, edges) in the same format as
    ``predict_wireframe_sklearn`` - ready to feed into ``hss()``.
"""
import numpy as _np
tracks, _views, _good, t_edges = triangulate_wireframe(
entry,
epipolar_px=epipolar_px,
reproj_px=reproj_px,
min_views=min_views,
want_edges=True,
)
if not tracks:
return _np.zeros((2, 3), dtype=_np.float64), [(0, 1)]
xyz = _np.array([t.xyz for t in tracks], dtype=_np.float64)
    # Merge vertices closer than ``merge_radius``. A simple union-find over all
    # pairwise distances keeps it O(N^2), but N ≤ 200 in practice.
n = len(xyz)
parent = list(range(n))
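    # Union-find with path halving in find(); every pair of tracks within
    # merge_radius ends up in the same group and is averaged below.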
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
ra, rb = find(a), find(b)
if ra != rb:
parent[ra] = rb
diff = xyz[:, None, :] - xyz[None, :, :]
dists = _np.sqrt((diff ** 2).sum(-1))
for i in range(n):
for j in range(i + 1, n):
if dists[i, j] <= merge_radius:
union(i, j)
groups: dict[int, list[int]] = {}
for i in range(n):
r = find(i)
groups.setdefault(r, []).append(i)
old_to_new: dict[int, int] = {}
new_xyz = []
for new_idx, (root, members) in enumerate(groups.items()):
for m in members:
old_to_new[m] = new_idx
new_xyz.append(xyz[members].mean(axis=0))
new_xyz = _np.array(new_xyz, dtype=_np.float64)
# Remap edges, dedup
edge_set: dict[tuple[int, int], int] = {}
for ti, tj, votes in t_edges:
if votes < min_votes:
continue
a = old_to_new[ti]
b = old_to_new[tj]
if a == b:
continue
key = (a, b) if a < b else (b, a)
edge_set[key] = edge_set.get(key, 0) + votes
edges = list(edge_set.keys())
if not edges or len(new_xyz) < 2:
return _np.zeros((2, 3), dtype=_np.float64), [(0, 1)]
return new_xyz, [(int(a), int(b)) for a, b in edges]
def build_track_edges(
tracks: list[Track],
edges_per_view: dict[str, list[tuple[int, int, str]]],
min_votes: int = 1,
max_3d_len: float = 8.0,
) -> list[tuple[int, int, int]]:
"""Aggregate 3D edges from per-view 2D gestalt edges.
Parameters
----------
tracks : list of Track
    edges_per_view : dict[view_id → list[(corner_i_idx, corner_j_idx, edge_cls)]]
min_votes : minimum number of views that must agree on an edge.
max_3d_len : drop edges that would be absurdly long in 3D.
Returns
-------
list of (track_i, track_j, vote_count)
"""
    # (view_id, corner_idx) → track_idx
key_to_track: dict[tuple[str, int], int] = {}
for t_idx, t in enumerate(tracks):
for vid, cidx in t.corner_indices.items():
key_to_track[(vid, cidx)] = t_idx
votes: dict[tuple[int, int], int] = {}
for vid, edges in edges_per_view.items():
for ci, cj, _ecls in edges:
ti = key_to_track.get((vid, ci))
tj = key_to_track.get((vid, cj))
if ti is None or tj is None or ti == tj:
continue
key = (ti, tj) if ti < tj else (tj, ti)
votes[key] = votes.get(key, 0) + 1
out: list[tuple[int, int, int]] = []
for (ti, tj), v in votes.items():
if v < min_votes:
continue
d = float(np.linalg.norm(tracks[ti].xyz - tracks[tj].xyz))
if d > max_3d_len:
continue
out.append((ti, tj, v))
return out
def triangulate_wireframe(
entry,
epipolar_px: float = 6.0,
reproj_px: float = 4.0,
min_views: int = 2,
want_edges: bool = False,
):
"""High-level wrapper: detect corners, build views, triangulate tracks.
Returns
-------
(tracks, views, good_entry)
when ``want_edges=False`` (default, backwards compatible).
(tracks, views, good_entry, track_edges)
when ``want_edges=True``. ``track_edges`` is the output of
        :func:`build_track_edges` - a list of ``(track_i, track_j, vote_count)``.
"""
if want_edges:
corners_per_view, good, edges_per_view = detect_corners_per_view(
entry, return_edges=True
)
else:
corners_per_view, good = detect_corners_per_view(entry)
edges_per_view = None
colmap_rec = good.get('colmap') or good.get('colmap_binary')
views = collect_views(colmap_rec, good['image_ids'])
tracks = build_tracks(
corners_per_view, views,
epipolar_px=epipolar_px,
reproj_px=reproj_px,
min_views=min_views,
)
if not want_edges:
return tracks, views, good
track_edges = build_track_edges(tracks, edges_per_view or {})
return tracks, views, good, track_edges
# ---------------------------------------------------------------------------
# T1.6: integration helper - refine an existing depth-based 3D vertex set
# by snapping each vertex to its closest triangulated track.
# ---------------------------------------------------------------------------
def refine_vertices_with_tracks(
merged_v: np.ndarray,
tracks: list[Track],
snap_radius: float = 1.0,
min_views_for_snap: int = 2,
max_reproj_err_px: float = float("inf"),
) -> tuple[np.ndarray, np.ndarray]:
"""For each vertex in ``merged_v``, find the closest triangulated track
(by 3D distance) and, if it sits within ``snap_radius`` metres, move the
vertex to that track's position.
The graph structure is preserved β€” only positions move. Tracks with
fewer than ``min_views_for_snap`` observations are ignored (2-view DLT
is noisy on short baselines).
Returns
-------
    refined_v : (N, 3) float64 - refined vertex positions
    snap_mask : (N,) bool - True where a snap happened
"""
refined = np.asarray(merged_v, dtype=np.float64).copy()
snap = np.zeros(len(refined), dtype=bool)
good_tracks = [
t for t in tracks
if len(t.observations) >= min_views_for_snap and t.reproj_err <= max_reproj_err_px
]
if not good_tracks or len(refined) == 0:
return refined, snap
track_xyz = np.array([t.xyz for t in good_tracks], dtype=np.float64)
for i in range(len(refined)):
d = np.linalg.norm(track_xyz - refined[i], axis=1)
j = int(np.argmin(d))
if d[j] <= snap_radius:
refined[i] = track_xyz[j]
snap[i] = True
return refined, snap
def augment_with_tracks(
merged_v: np.ndarray,
heur_edges: list,
tracks: list[Track],
dup_radius: float = 0.4,
min_views_for_add: int = 3,
max_reproj_err_px: float = 2.5,
) -> tuple[np.ndarray, list]:
"""Append high-confidence triangulated tracks as new vertices.
Unlike ``refine_vertices_with_tracks`` (which moves existing vertices and
risks regressions on already-good ones), this only adds new points that
sit more than ``dup_radius`` metres from any existing vertex.
    The edge list is returned unchanged - new vertices only get edges via the
downstream sklearn classifier or heuristic edge-detection step, not here.
"""
merged = np.asarray(merged_v, dtype=np.float64)
confident = [t for t in tracks
if len(t.observations) >= min_views_for_add
and t.reproj_err <= max_reproj_err_px]
if not confident:
return merged, heur_edges
tvs = np.array([t.xyz for t in confident], dtype=np.float64)
if len(merged) == 0:
return tvs, heur_edges
# Keep tracks that are not a duplicate of any existing merged vertex.
diffs = tvs[:, None, :] - merged[None, :, :]
dists = np.sqrt((diffs ** 2).sum(-1))
min_d = dists.min(axis=1)
new = tvs[min_d > dup_radius]
if len(new) == 0:
return merged, heur_edges
augmented = np.vstack([merged, new])
return augmented, heur_edges
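# ---------------------------------------------------------------------------
# Illustrative usage sketch (hedged; not part of the pipeline). ``entry`` is
# assumed to be a single dataset sample in whatever format
# ``convert_entry_to_human_readable`` expects - the placeholder below is not a
# real loading API, wire in your own harness.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    entry = None  # placeholder: obtain one dataset entry here
    if entry is None:
        print("No entry loaded; see the module docstring for the entry points.")
    else:
        # Standalone predictor: triangulated vertices + voted edges.
        vertices, edges = predict_wireframe_tracks(entry)
        print(f"predicted {len(vertices)} vertices / {len(edges)} edges")
        # Stricter gate, e.g. for refine_vertices_with_tracks / augment_with_tracks.
        tracks = get_high_confidence_tracks(entry)
        print(f"{len(tracks)} tracks with >=3 views and <=2 px reprojection error")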