"""Multi-view corner triangulation pipeline (T1.2 – T1.6).

Drop-in replacement for the depth-based ``project_vertices_to_3d`` step in
``sklearn_submission.py``. The depth map is only used as a sanity filter, never
as the source of 3D positions — the actual geometry comes from COLMAP cameras
via DLT triangulation.

Entry points:
    detect_corners_per_view(entry) -> dict[view_id -> List[Corner]]
    triangulate_wireframe(entry, corners_per_view) -> Tracks + per-track obs

Everything is pure numpy + pycolmap + cv2 — no torch, no kornia.
"""
|
|
| from __future__ import annotations |
|
|
| import numpy as np |
| import cv2 |
| from dataclasses import dataclass, field |
|
|
| from hoho2025.example_solutions import ( |
| convert_entry_to_human_readable, |
| filter_vertices_by_background, |
| point_to_segment_dist, |
| ) |
| from hoho2025.color_mappings import gestalt_color_mapping |
|
|
| try: |
| from mvs_utils import ( |
| collect_views, triangulate_dlt, mean_reprojection_error, |
| fundamental_matrix, epipolar_line, point_to_line_distance, |
| project_world_to_image, |
| ) |
| except ImportError: |
| from submission.mvs_utils import ( |
| collect_views, triangulate_dlt, mean_reprojection_error, |
| fundamental_matrix, epipolar_line, point_to_line_distance, |
| project_world_to_image, |
| ) |
|
|
|
|
| |
# Gestalt segmentation classes treated as wireframe vertex candidates.
VERTEX_CLASSES = ['apex', 'eave_end_point', 'flashing_end_point']
# Gestalt segmentation classes treated as wireframe edge candidates.
EDGE_CLASSES = ['eave', 'ridge', 'rake', 'valley', 'hip']
|
|
|
|
@dataclass
class Corner:
    """A 2D corner detected on a single view."""
    view_id: str        # image id of the view this corner was detected in
    xy: np.ndarray      # (2,) float32 pixel coordinates (x, y)
    cls: str            # vertex class name (one of VERTEX_CLASSES)
    blob_area: int      # area (px) of the gestalt blob the corner came from
|
|
|
|
@dataclass
class Track:
    """A 3D wireframe vertex with its per-view observations."""
    xyz: np.ndarray     # (3,) triangulated world-space position
    cls: str            # vertex class inherited from the anchor corner
    # (view_id, xy) pairs actually used in the final triangulation
    observations: list[tuple[str, np.ndarray]] = field(default_factory=list)
    # mean reprojection error (px) over all observations; inf until set
    reproj_err: float = float("inf")

    # view_id -> index into that view's corner list (filled by build_tracks)
    corner_indices: dict[str, int] = field(default_factory=dict)
|
|
|
|
| def _refine_centroids_subpix(gest_seg_np, centroids, max_shift=4.0, win=5): |
| """cv2.cornerSubPix refinement inside an apex blob. Identical to the |
| version in sklearn_submission.py β duplicated here to keep triangulation.py |
| importable on its own. |
| """ |
| if len(centroids) == 0: |
| return centroids |
| gray = cv2.cvtColor(gest_seg_np, cv2.COLOR_RGB2GRAY) |
| gray = cv2.GaussianBlur(gray, (3, 3), 0) |
| pts = np.asarray(centroids, dtype=np.float32).reshape(-1, 1, 2).copy() |
| criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.01) |
| try: |
| refined = cv2.cornerSubPix(gray, pts, (win, win), (-1, -1), criteria) |
| except cv2.error: |
| return centroids |
| refined = refined.reshape(-1, 2) |
| orig = np.asarray(centroids, dtype=np.float32) |
| shifts = np.linalg.norm(refined - orig, axis=1) |
| mask = shifts <= max_shift |
| out = orig.copy() |
| out[mask] = refined[mask] |
| return out |
|
|
|
|
| def _detect_edges_2d( |
| gest_np: np.ndarray, |
| corners: list[Corner], |
| edge_th: float = 15.0, |
| ) -> list[tuple[int, int, str]]: |
| """Detect 2D gestalt edges and connect them to existing corner indices. |
| |
| Mirrors ``get_vertices_and_edges_improved`` from sklearn_submission but |
| keeps *all* edge classes and returns triples ``(ci, cj, edge_cls)`` so |
| we can aggregate edge-class votes downstream. |
| """ |
| if len(corners) < 2: |
| return [] |
| apex_pts = np.array([c.xy for c in corners], dtype=np.float32) |
| connections: list[tuple[int, int, str]] = [] |
| for edge_class in EDGE_CLASSES: |
| color = np.array(gestalt_color_mapping[edge_class]) |
| mask_raw = cv2.inRange(gest_np, color - 0.5, color + 0.5) |
| mask = cv2.morphologyEx(mask_raw, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8)) |
| if mask.sum() == 0: |
| continue |
| _, labels, _, _ = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S) |
| for lbl in range(1, labels.max() + 1): |
| ys, xs = np.where(labels == lbl) |
| if len(xs) < 2: |
| continue |
| pts = np.column_stack([xs, ys]).astype(np.float32) |
| line_params = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01) |
| vx, vy, x0, y0 = line_params.ravel() |
| proj = (xs - x0) * vx + (ys - y0) * vy |
| p1 = np.array([x0 + proj.min() * vx, y0 + proj.min() * vy]) |
| p2 = np.array([x0 + proj.max() * vx, y0 + proj.max() * vy]) |
| dists = np.array( |
| [point_to_segment_dist(apex_pts[i], p1, p2) for i in range(len(apex_pts))] |
| ) |
| near = np.where(dists <= edge_th)[0] |
| if len(near) < 2: |
| continue |
| near_pts = apex_pts[near] |
| a = int(near[np.argmin(np.linalg.norm(near_pts - p1, axis=1))]) |
| b = int(near[np.argmin(np.linalg.norm(near_pts - p2, axis=1))]) |
| if a != b: |
| lo, hi = (a, b) if a < b else (b, a) |
| connections.append((lo, hi, edge_class)) |
| return connections |
|
|
|
|
def detect_corners_per_view(
    entry,
    vertex_classes: list[str] | None = None,
    filter_background: bool = True,
    return_edges: bool = False,
):
    """Run per-view corner detection + subpixel refinement.

    Parameters
    ----------
    entry : raw dataset entry, decoded via ``convert_entry_to_human_readable``.
    vertex_classes : gestalt classes to detect; defaults to ``VERTEX_CLASSES``.
    filter_background : drop corners rejected by ``filter_vertices_by_background``.
    return_edges : additionally run :func:`_detect_edges_2d` per view.

    Returns
    -------
    corners_per_view : dict[image_id -> list[Corner]]
    good_entry : the convert_entry_to_human_readable output (caller reuses it)
    edges_per_view (if ``return_edges``) : dict[image_id -> list[(ci, cj, edge_cls)]]
    """
    if vertex_classes is None:
        vertex_classes = VERTEX_CLASSES

    good = convert_entry_to_human_readable(entry)
    corners_per_view: dict[str, list[Corner]] = {}
    edges_per_view: dict[str, list[tuple[int, int, str]]] = {}

    # FIX: the loop index from enumerate() was never used — iterate directly.
    for gest, depth, img_id, ade_seg in zip(
        good['gestalt'], good['depth'], good['image_ids'], good['ade']
    ):
        # The depth map only fixes the working resolution here; gestalt and
        # ADE segmentations are resized to match it.
        depth_np = np.array(depth)
        H, W = depth_np.shape[:2]
        gest_np = np.array(gest.resize((W, H))).astype(np.uint8)
        ade_np = np.array(ade_seg.resize((W, H))).astype(np.uint8)

        corners: list[Corner] = []
        for v_class in vertex_classes:
            color = np.array(gestalt_color_mapping[v_class])
            mask = cv2.inRange(gest_np, color - 0.5, color + 0.5)
            if mask.sum() == 0:
                continue
            _, _, stats, centroids = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)
            # Row 0 is the background component — skip it.
            blob_centroids = centroids[1:]
            areas = stats[1:, cv2.CC_STAT_AREA]
            if len(blob_centroids) == 0:
                continue
            refined = _refine_centroids_subpix(gest_np, blob_centroids)
            for xy, area in zip(refined, areas):
                corners.append(Corner(
                    view_id=img_id,
                    xy=np.asarray(xy, dtype=np.float32),
                    cls=v_class,
                    blob_area=int(area),
                ))

        if filter_background and corners:
            # filter_vertices_by_background works on plain dicts; round-trip
            # through that format and keep only the surviving corners.
            fake_verts = [{"xy": c.xy, "type": c.cls} for c in corners]
            fake_verts, _ = filter_vertices_by_background(fake_verts, [], ade_np)
            kept_keys = {(float(v['xy'][0]), float(v['xy'][1]), v['type']) for v in fake_verts}
            corners = [c for c in corners
                       if (float(c.xy[0]), float(c.xy[1]), c.cls) in kept_keys]

        corners_per_view[img_id] = corners
        if return_edges:
            edges_per_view[img_id] = _detect_edges_2d(gest_np, corners)

    if return_edges:
        return corners_per_view, good, edges_per_view
    return corners_per_view, good
|
|
|
|
def build_tracks(
    corners_per_view: dict[str, list[Corner]],
    views: dict[str, dict],
    class_strict: bool = True,
    epipolar_px: float = 6.0,
    reproj_px: float = 4.0,
    min_views: int = 2,
) -> list[Track]:
    """Greedy multi-view matching and triangulation with epipolar gating.

    Strategy (classical, mirrors PC2WF / COLMAP incremental triangulation):

    1. Build a pool of unmatched corners from every view.
    2. For every ordered pair of views compute the fundamental matrix.
    3. For each corner in view_a, find all corners in view_b of the same class
       whose perpendicular distance to the epipolar line is below
       ``epipolar_px``. Triangulate each candidate pair via DLT.
    4. For each candidate 3D point, reproject it back into every other view
       where it has positive depth. A corner of the same class within
       ``reproj_px`` of the reprojection becomes an additional observation.
       Re-triangulate with the enlarged observation list.
    5. Accept the track if it has >= ``min_views`` observations and mean
       reprojection error <= ``reproj_px``.
    6. Mark all corners in the track as matched so they are not reused.

    Parameters are intentionally tight — noise-reducing rather than
    permissive — because a wrongly triangulated vertex can sit meters
    away from any real roof feature.
    """
    view_ids = sorted(vid for vid in corners_per_view if vid in views)

    # Pool of corners not yet assigned to a track, keyed by (view_id, index).
    remaining: dict[tuple[str, int], Corner] = {}
    for vid in view_ids:
        for idx, c in enumerate(corners_per_view[vid]):
            remaining[(vid, idx)] = c

    tracks: list[Track] = []

    for anchor_vid in view_ids:
        for (r_vid, r_idx), anchor in list(remaining.items()):
            if r_vid != anchor_vid:
                continue
            # BUGFIX: we iterate a snapshot of ``remaining``, so this corner
            # may already have been consumed as an extension observation of a
            # track accepted earlier in this same pass. Skip it instead of
            # re-triangulating a duplicate track from a used corner.
            if (r_vid, r_idx) not in remaining:
                continue

            best_track: Track | None = None
            best_used: set[tuple[str, int]] = set()

            for other_vid in view_ids:
                if other_vid == anchor_vid:
                    continue
                F = fundamental_matrix(views[anchor_vid], views[other_vid])
                line = epipolar_line(F, anchor.xy)

                for (o_vid, o_idx), cand in remaining.items():
                    if o_vid != other_vid:
                        continue
                    if class_strict and cand.cls != anchor.cls:
                        continue
                    # Epipolar gate: candidate must lie near the anchor's line.
                    d = point_to_line_distance(line, cand.xy)
                    if d > epipolar_px:
                        continue

                    # Two-view DLT seed.
                    Ps = [views[anchor_vid]["P"], views[other_vid]["P"]]
                    pts = [anchor.xy, cand.xy]
                    X = triangulate_dlt(Ps, pts)
                    if not np.all(np.isfinite(X)):
                        continue

                    # Extend greedily: reproject into every remaining view and
                    # take the closest same-class corner within ``reproj_px``.
                    obs = [(anchor_vid, anchor.xy), (other_vid, cand.xy)]
                    used_keys = {(anchor_vid, r_idx), (other_vid, o_idx)}
                    for ext_vid in view_ids:
                        if ext_vid in (anchor_vid, other_vid):
                            continue
                        uv, z = project_world_to_image(views[ext_vid]["P"], X.reshape(1, 3))
                        if z[0] <= 0:
                            continue  # point is behind this camera
                        u_pred = uv[0]
                        best_match = None
                        best_dist = reproj_px
                        for (e_vid, e_idx), ec in remaining.items():
                            if e_vid != ext_vid:
                                continue
                            if class_strict and ec.cls != anchor.cls:
                                continue
                            d2 = float(np.linalg.norm(ec.xy - u_pred))
                            if d2 < best_dist:
                                best_dist = d2
                                best_match = (e_vid, e_idx, ec)
                        if best_match is not None:
                            obs.append((best_match[0], best_match[2].xy))
                            used_keys.add((best_match[0], best_match[1]))

                    if len(obs) < min_views:
                        continue

                    # Final triangulation over all observations + quality gate.
                    Ps_full = [views[vid]["P"] for vid, _ in obs]
                    pts_full = [uv for _, uv in obs]
                    X_full = triangulate_dlt(Ps_full, pts_full)
                    if not np.all(np.isfinite(X_full)):
                        continue
                    err = mean_reprojection_error(X_full, Ps_full, pts_full)
                    if err > reproj_px:
                        continue

                    track = Track(
                        xyz=X_full,
                        cls=anchor.cls,
                        observations=obs,
                        reproj_err=err,
                    )
                    # Prefer more observations; tie-break on reprojection error.
                    # (Carries used_keys in a local instead of the previous
                    # ad-hoc ``track._used_keys`` attribute.)
                    if best_track is None \
                            or len(track.observations) > len(best_track.observations) \
                            or (len(track.observations) == len(best_track.observations)
                                and err < best_track.reproj_err):
                        best_track = track
                        best_used = used_keys

            if best_track is not None:
                best_track.corner_indices = {vid: int(idx) for vid, idx in best_used}
                tracks.append(best_track)
                # Remove consumed corners so later anchors cannot reuse them.
                for key in best_used:
                    remaining.pop(key, None)

    return tracks
|
|
|
|
def get_high_confidence_tracks(
    entry,
    min_views: int = 3,
    max_reproj_px: float = 2.0,
    epipolar_px: float = 6.0,
    build_reproj_px: float = 4.0,
) -> list[Track]:
    """Run the full triangulation pipeline and keep only the tracks that
    pass a stricter quality gate.

    The default ``min_views=3`` and ``max_reproj_px=2.0`` are tighter than
    the ``predict_wireframe_tracks`` defaults: these tracks are intended
    as **vertex sources**, not just edge sources, so a multi-view DLT
    triangulation with low mean reprojection error is required.
    """
    all_tracks, _views, _good = triangulate_wireframe(
        entry,
        epipolar_px=epipolar_px,
        reproj_px=build_reproj_px,
        min_views=2,
        want_edges=False,
    )
    keep: list[Track] = []
    for t in all_tracks:
        if len(t.observations) >= min_views and t.reproj_err <= max_reproj_px:
            keep.append(t)
    return keep
|
|
|
|
def predict_wireframe_tracks(
    entry,
    min_views: int = 2,
    min_votes: int = 1,
    epipolar_px: float = 6.0,
    reproj_px: float = 4.0,
    merge_radius: float = 0.3,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
    """Standalone triangulation-based wireframe predictor.

    Returns (vertices, edges) in the same format as
    ``predict_wireframe_sklearn`` — ready to feed into ``hss()``.

    Tracks closer than ``merge_radius`` metres are merged via union-find
    and 2D edge votes are re-aggregated onto the merged vertices.  When
    nothing usable is found, a degenerate 2-vertex / 1-edge wireframe is
    returned so downstream scoring never sees an empty prediction.
    """
    # FIX: dropped the redundant local ``import numpy as _np`` — numpy is
    # already imported at module level as ``np``.
    tracks, _views, _good, t_edges = triangulate_wireframe(
        entry,
        epipolar_px=epipolar_px,
        reproj_px=reproj_px,
        min_views=min_views,
        want_edges=True,
    )
    if not tracks:
        return np.zeros((2, 3), dtype=np.float64), [(0, 1)]

    xyz = np.array([t.xyz for t in tracks], dtype=np.float64)

    # --- union-find merge of near-duplicate 3D vertices -----------------
    n = len(xyz)
    parent = list(range(n))

    def find(x):
        # Path-halving find.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[ra] = rb

    diff = xyz[:, None, :] - xyz[None, :, :]
    dists = np.sqrt((diff ** 2).sum(-1))
    for i in range(n):
        for j in range(i + 1, n):
            if dists[i, j] <= merge_radius:
                union(i, j)

    groups: dict[int, list[int]] = {}
    for i in range(n):
        groups.setdefault(find(i), []).append(i)

    # Remap old track indices to merged vertex indices; merged position is
    # the centroid of each group.
    old_to_new: dict[int, int] = {}
    new_xyz = []
    for new_idx, members in enumerate(groups.values()):
        for m in members:
            old_to_new[m] = new_idx
        new_xyz.append(xyz[members].mean(axis=0))
    new_xyz = np.array(new_xyz, dtype=np.float64)

    # --- re-aggregate edge votes onto merged vertices -------------------
    edge_set: dict[tuple[int, int], int] = {}
    for ti, tj, votes in t_edges:
        if votes < min_votes:
            continue
        a = old_to_new[ti]
        b = old_to_new[tj]
        if a == b:
            continue  # edge collapsed into a single merged vertex
        key = (a, b) if a < b else (b, a)
        edge_set[key] = edge_set.get(key, 0) + votes

    edges = list(edge_set.keys())
    if not edges or len(new_xyz) < 2:
        return np.zeros((2, 3), dtype=np.float64), [(0, 1)]

    return new_xyz, [(int(a), int(b)) for a, b in edges]
|
|
|
|
def build_track_edges(
    tracks: list[Track],
    edges_per_view: dict[str, list[tuple[int, int, str]]],
    min_votes: int = 1,
    max_3d_len: float = 8.0,
) -> list[tuple[int, int, int]]:
    """Aggregate 3D edges from per-view 2D gestalt edges.

    Parameters
    ----------
    tracks : list of Track
    edges_per_view : dict[view_id -> list[(corner_i_idx, corner_j_idx, edge_cls)]]
    min_votes : minimum number of views that must agree on an edge.
    max_3d_len : drop edges that would be absurdly long in 3D.

    Returns
    -------
    list of (track_i, track_j, vote_count)
    """
    # Invert the track -> corner mapping: (view_id, corner_idx) -> track idx.
    corner_to_track: dict[tuple[str, int], int] = {}
    for tk, track in enumerate(tracks):
        for view_id, corner_idx in track.corner_indices.items():
            corner_to_track[(view_id, corner_idx)] = tk

    # Count, per unordered track pair, how many views report a 2D edge.
    tally: dict[tuple[int, int], int] = {}
    for view_id, view_edges in edges_per_view.items():
        for ca, cb, _cls in view_edges:
            ta = corner_to_track.get((view_id, ca))
            tb = corner_to_track.get((view_id, cb))
            if ta is None or tb is None or ta == tb:
                continue
            pair = (min(ta, tb), max(ta, tb))
            tally[pair] = tally.get(pair, 0) + 1

    result: list[tuple[int, int, int]] = []
    for (ta, tb), n_votes in tally.items():
        if n_votes < min_votes:
            continue
        if float(np.linalg.norm(tracks[ta].xyz - tracks[tb].xyz)) > max_3d_len:
            continue
        result.append((ta, tb, n_votes))
    return result
|
|
|
|
def triangulate_wireframe(
    entry,
    epipolar_px: float = 6.0,
    reproj_px: float = 4.0,
    min_views: int = 2,
    want_edges: bool = False,
):
    """High-level wrapper: detect corners, build views, triangulate tracks.

    Returns
    -------
    (tracks, views, good_entry)
        when ``want_edges=False`` (default, backwards compatible).
    (tracks, views, good_entry, track_edges)
        when ``want_edges=True``. ``track_edges`` is the output of
        :func:`build_track_edges` — a list of ``(track_i, track_j, vote_count)``.
    """
    edges_per_view = None
    if want_edges:
        corners_per_view, good, edges_per_view = detect_corners_per_view(
            entry, return_edges=True
        )
    else:
        corners_per_view, good = detect_corners_per_view(entry)

    # Prefer the 'colmap' entry; fall back to 'colmap_binary'.
    colmap_rec = good.get('colmap') or good.get('colmap_binary')
    views = collect_views(colmap_rec, good['image_ids'])
    tracks = build_tracks(
        corners_per_view, views,
        epipolar_px=epipolar_px,
        reproj_px=reproj_px,
        min_views=min_views,
    )
    if want_edges:
        return tracks, views, good, build_track_edges(tracks, edges_per_view or {})
    return tracks, views, good
|
|
|
|
| |
| |
| |
| |
|
|
def refine_vertices_with_tracks(
    merged_v: np.ndarray,
    tracks: list[Track],
    snap_radius: float = 1.0,
    min_views_for_snap: int = 2,
    max_reproj_err_px: float = float("inf"),
) -> tuple[np.ndarray, np.ndarray]:
    """Snap each vertex in ``merged_v`` to its nearest triangulated track.

    A vertex moves to the closest track position only when that track lies
    within ``snap_radius`` metres.  The graph structure is preserved — only
    positions move.  Tracks with fewer than ``min_views_for_snap``
    observations are ignored (2-view DLT is noisy on short baselines).

    Returns
    -------
    refined_v : (N, 3) float64 — refined vertex positions
    snap_mask : (N,) bool — True where a snap happened
    """
    out = np.asarray(merged_v, dtype=np.float64).copy()
    snapped = np.zeros(len(out), dtype=bool)

    usable = [
        t for t in tracks
        if len(t.observations) >= min_views_for_snap
        and t.reproj_err <= max_reproj_err_px
    ]
    if not usable or len(out) == 0:
        return out, snapped

    candidates = np.array([t.xyz for t in usable], dtype=np.float64)
    for vi in range(len(out)):
        gaps = np.linalg.norm(candidates - out[vi], axis=1)
        nearest = int(np.argmin(gaps))
        if gaps[nearest] <= snap_radius:
            out[vi] = candidates[nearest]
            snapped[vi] = True
    return out, snapped
|
|
|
|
def augment_with_tracks(
    merged_v: np.ndarray,
    heur_edges: list,
    tracks: list[Track],
    dup_radius: float = 0.4,
    min_views_for_add: int = 3,
    max_reproj_err_px: float = 2.5,
) -> tuple[np.ndarray, list]:
    """Append high-confidence triangulated tracks as new vertices.

    Unlike ``refine_vertices_with_tracks`` (which moves existing vertices
    and risks regressions on already-good ones), this only adds new points
    that sit more than ``dup_radius`` metres from any existing vertex.

    The edge list is returned unchanged — new vertices only get edges via
    the downstream sklearn classifier or heuristic edge-detection step,
    not here.
    """
    base = np.asarray(merged_v, dtype=np.float64)
    keep = [
        t for t in tracks
        if len(t.observations) >= min_views_for_add
        and t.reproj_err <= max_reproj_err_px
    ]
    if not keep:
        return base, heur_edges
    candidates = np.array([t.xyz for t in keep], dtype=np.float64)
    if len(base) == 0:
        return candidates, heur_edges

    # Distance from every candidate to its nearest existing vertex; keep
    # only candidates far enough from all of them.
    gaps = np.sqrt(((candidates[:, None, :] - base[None, :, :]) ** 2).sum(-1))
    fresh = candidates[gaps.min(axis=1) > dup_radius]
    if len(fresh) == 0:
        return base, heur_edges
    return np.vstack([base, fresh]), heur_edges
|
|