"""Plane-intersection wireframe predictor (Tier 2). Classical-geometry pipeline, orthogonal to the gestalt + depth path: 1. Crop the COLMAP sparse cloud to the top portion along the up-axis so that only roof points remain (the dataset uses +Y as up). 2. Iteratively RANSAC-segment the cropped cloud into planes (open3d). 3. Keep only planes whose normal has a significant +Y component (roof slopes) or is near-horizontal (flat roof / eaves). 4. For each pair of surviving planes, compute the infinite intersection line via scikit-spatial and clip it to the overlap of the two inlier sets (percentile endpoints with a perpendicular tolerance). 5. Vertices = segment endpoints ∪ triple-plane intersections, merged at a small radius. 6. Edges = clipped segments remapped onto the merged vertex set. Only numpy / open3d / scikit-spatial / pycolmap are used — no torch. The main entry point is :func:`predict_wireframe_planes`, which returns ``(vertices, edges)`` in the format expected by ``hss()``. """ from __future__ import annotations import numpy as np import open3d as o3d from skspatial.objects import Plane as SkPlane from hoho2025.example_solutions import ( convert_entry_to_human_readable, empty_solution, ) UP_AXIS = 1 # +Y is up in this dataset (verified across 15 validation samples) # --------------------------------------------------------------------------- # Plane data structure # --------------------------------------------------------------------------- class RoofPlane: """A planar segment of the roof point cloud. ``eq`` stores a normalised (a, b, c, d) plane equation such that ``|n| = 1`` and ``a*x + b*y + c*z + d = 0``. """ __slots__ = ("eq", "normal", "d", "inliers") def __init__(self, eq: np.ndarray, inliers: np.ndarray): eq = np.asarray(eq, dtype=np.float64) n = eq[:3] nn = np.linalg.norm(n) if nn > 1e-9: eq = eq / nn self.eq = eq self.normal = eq[:3] self.d = float(eq[3]) self.inliers = np.asarray(inliers, dtype=np.float64) def signed_distance(self, pts: np.ndarray) -> np.ndarray: return pts @ self.normal + self.d # --------------------------------------------------------------------------- # Roof crop # --------------------------------------------------------------------------- def crop_to_roof( xyz: np.ndarray, up_axis: int = UP_AXIS, top_frac: float = 0.70, pad: float = 1.0, ) -> np.ndarray: """Keep points whose up-axis coordinate is in the top ``top_frac`` of the distribution. COLMAP reconstructions include ground, walls, vegetation and roof. The roof corners live in the upper Y range. A fractional cut along the up axis is a robust proxy that does not need any external scale calibration and works for both peaked and flat roofs. """ if len(xyz) == 0: return xyz up = xyz[:, up_axis] lo, hi = float(up.min()), float(up.max()) if hi - lo < 1e-6: return xyz threshold = lo + (hi - lo) * (1.0 - top_frac) - pad mask = up >= threshold return xyz[mask] def _is_roof_normal(normal: np.ndarray, up_axis: int = UP_AXIS, min_up: float = 0.15) -> bool: """A roof plane either has significant vertical component (pitched surface) or is nearly horizontal (flat roof). Walls have ``|n_up| ≈ 0`` and are rejected. """ return abs(float(normal[up_axis])) >= min_up # --------------------------------------------------------------------------- # T2.1 Iterative RANSAC plane segmentation (open3d backend) # --------------------------------------------------------------------------- def segment_roof_planes( xyz: np.ndarray, distance_threshold: float = 0.15, ransac_n: int = 3, num_iterations: int = 1000, min_inliers: int = 60, max_planes: int = 8, roof_crop_top_frac: float = 0.70, crop_pad: float = 1.0, keep_walls: bool = True, ) -> list[RoofPlane]: """Sequentially RANSAC-fit roof planes. Crops the cloud to the top ``roof_crop_top_frac`` along +Y first, then iteratively removes inliers until no plane with at least ``min_inliers`` remains or ``max_planes`` have been found. Planes whose normal is nearly perpendicular to the up axis (walls) are dropped. """ cropped = crop_to_roof(xyz, top_frac=roof_crop_top_frac, pad=crop_pad) if len(cropped) < min_inliers * 2: # Fall back to the full cloud if the crop is too aggressive. cropped = np.asarray(xyz, dtype=np.float64) if len(cropped) < min_inliers: return [] remaining = cropped.copy() planes: list[RoofPlane] = [] pcd = o3d.geometry.PointCloud() for _ in range(max_planes): if len(remaining) < min_inliers: break pcd.points = o3d.utility.Vector3dVector(remaining) try: eq, inlier_idx = pcd.segment_plane( distance_threshold=distance_threshold, ransac_n=ransac_n, num_iterations=num_iterations, ) except Exception: break if len(inlier_idx) < min_inliers: break eq = np.asarray(eq, dtype=np.float64) inliers = remaining[np.asarray(inlier_idx, dtype=np.int64)] normal = eq[:3] / (np.linalg.norm(eq[:3]) + 1e-12) if keep_walls or _is_roof_normal(normal): planes.append(RoofPlane(eq, inliers)) # Always remove inliers from the remaining cloud even for rejected # planes, otherwise RANSAC keeps returning the same ones. keep_mask = np.ones(len(remaining), dtype=bool) keep_mask[np.asarray(inlier_idx, dtype=np.int64)] = False remaining = remaining[keep_mask] return planes # --------------------------------------------------------------------------- # T2.2 Plane-pair intersection line (scikit-spatial) # --------------------------------------------------------------------------- def intersect_two_planes( p1: RoofPlane, p2: RoofPlane, parallel_cos: float = 0.995, ) -> tuple[np.ndarray, np.ndarray] | None: """Return ``(point_on_line, unit_direction)`` or ``None`` if near parallel.""" dot = abs(float(np.dot(p1.normal, p2.normal))) if dot >= parallel_cos: return None sk1 = SkPlane(point=-p1.d * p1.normal, normal=p1.normal) sk2 = SkPlane(point=-p2.d * p2.normal, normal=p2.normal) try: line = sk1.intersect_plane(sk2) except Exception: return None point = np.asarray(line.point, dtype=np.float64) direction = np.asarray(line.direction, dtype=np.float64) norm = np.linalg.norm(direction) if norm < 1e-9: return None return point, direction / norm # --------------------------------------------------------------------------- # T2.3 Clip the line to a real segment # --------------------------------------------------------------------------- def clip_line_to_segment( point: np.ndarray, direction: np.ndarray, p1: RoofPlane, p2: RoofPlane, perp_tol: float = 0.4, trim_pct: float = 5.0, min_length: float = 0.3, ) -> tuple[np.ndarray, np.ndarray] | None: """Clip the infinite line to the overlap region of the two inlier sets. Only inliers whose projection onto the line is within ``perp_tol`` of the line contribute — otherwise a large plane would stretch the intersection far outside the real roof feature. The segment endpoints are the 5th / 95th percentile of projected scalars taken over the union of the two filtered sets. """ endpoints_s = [] for plane in (p1, p2): rel = plane.inliers - point s = rel @ direction perp = rel - s[:, None] * direction d_perp = np.linalg.norm(perp, axis=1) near = s[d_perp <= perp_tol] if len(near) >= 5: endpoints_s.append(near) if not endpoints_s: return None all_s = np.concatenate(endpoints_s) if len(all_s) < 5: return None lo, hi = np.percentile(all_s, [trim_pct, 100.0 - trim_pct]) if hi - lo < min_length: return None a = point + lo * direction b = point + hi * direction return a, b # --------------------------------------------------------------------------- # T2.4 Triple-plane corners + vertex dedup # --------------------------------------------------------------------------- def _triple_plane_corners( planes: list[RoofPlane], max_dist_to_inlier: float = 1.0, ) -> list[np.ndarray]: """Solve the 3x3 linear system for every non-collinear triple. A corner is kept only if every one of the three parent planes has at least one inlier within ``max_dist_to_inlier`` of the computed point, which removes ghost intersections far outside the roof. """ out: list[np.ndarray] = [] n = len(planes) for i in range(n): for j in range(i + 1, n): for k in range(j + 1, n): A = np.vstack([planes[i].normal, planes[j].normal, planes[k].normal]) if abs(float(np.linalg.det(A))) < 1e-3: continue b = -np.array([planes[i].d, planes[j].d, planes[k].d]) try: X = np.linalg.solve(A, b) except np.linalg.LinAlgError: continue ok = True for p in (planes[i], planes[j], planes[k]): if np.linalg.norm(p.inliers - X, axis=1).min() > max_dist_to_inlier: ok = False break if ok: out.append(X) return out def _merge_points(points: np.ndarray, radius: float) -> tuple[np.ndarray, np.ndarray]: """Greedy dedup by nearest-cluster assignment.""" pts = np.asarray(points, dtype=np.float64) if len(pts) == 0: return np.empty((0, 3)), np.empty((0,), dtype=np.int64) mapping = np.full(len(pts), -1, dtype=np.int64) clusters: list[list[int]] = [] centroids: list[np.ndarray] = [] for i, p in enumerate(pts): if not centroids: clusters.append([i]) centroids.append(p.copy()) mapping[i] = 0 continue c_arr = np.array(centroids) d = np.linalg.norm(c_arr - p, axis=1) j = int(np.argmin(d)) if d[j] <= radius: clusters[j].append(i) centroids[j] = pts[clusters[j]].mean(axis=0) mapping[i] = j else: clusters.append([i]) centroids.append(p.copy()) mapping[i] = len(centroids) - 1 merged = np.array(centroids, dtype=np.float64) return merged, mapping # --------------------------------------------------------------------------- # T2.7 Hybrid integration helpers: snap intersection lines to existing # sklearn-derived vertices. # --------------------------------------------------------------------------- def edges_from_planes_and_vertices( vertices: np.ndarray, planes: list[RoofPlane], perp_tol: float = 0.6, min_length: float = 0.5, max_length: float = 10.0, ) -> list[tuple[int, int]]: """Vote edges between vertices using plane-pair intersection lines. For each line ``L_ij = plane_i ∩ plane_j``: * find all ``vertices`` whose perpendicular distance to L_ij is below ``perp_tol``, * pair the two extremes along the line direction as an edge. The result is a set of 3D edges supported by plane geometry. Because the vertices come from sklearn's depth-based detection, positions are noisy but complete — while the lines come from RANSAC on thousands of COLMAP points and are very accurate in direction. Matching the two gives clean roof ridges / eaves without depending on 2D fitLine noise. """ if len(vertices) < 2 or len(planes) < 2: return [] V = np.asarray(vertices, dtype=np.float64) edges: set[tuple[int, int]] = set() for i in range(len(planes)): for j in range(i + 1, len(planes)): inter = intersect_two_planes(planes[i], planes[j]) if inter is None: continue point, direction = inter rel = V - point s = rel @ direction perp = rel - s[:, None] * direction d_perp = np.linalg.norm(perp, axis=1) near_idx = np.where(d_perp <= perp_tol)[0] if len(near_idx) < 2: continue # Take the two vertices with the most extreme projections s_near = s[near_idx] a = int(near_idx[np.argmin(s_near)]) b = int(near_idx[np.argmax(s_near)]) if a == b: continue dist3d = float(np.linalg.norm(V[a] - V[b])) if dist3d < min_length or dist3d > max_length: continue lo, hi = (a, b) if a < b else (b, a) edges.add((lo, hi)) # Additionally, for each adjacent pair of projections along the # line, add them as an edge if the 3D distance is reasonable. order = np.argsort(s[near_idx]) sorted_idx = near_idx[order] for k in range(len(sorted_idx) - 1): x = int(sorted_idx[k]) y = int(sorted_idx[k + 1]) d = float(np.linalg.norm(V[x] - V[y])) if d < min_length or d > max_length: continue lo, hi = (x, y) if x < y else (y, x) edges.add((lo, hi)) return list(edges) def predict_plane_edges(entry, vertices: np.ndarray, distance_threshold: float = 0.20, min_inliers: int = 60, max_planes: int = 10, roof_crop_top_frac: float = 0.95, perp_tol: float = 0.8, ) -> list[tuple[int, int]]: """High-level helper: given a sklearn wireframe's vertices, return a list of extra edges supported by plane-pair intersection geometry. """ good = convert_entry_to_human_readable(entry) colmap_rec = good.get("colmap") or good.get("colmap_binary") if colmap_rec is None: return [] all_xyz = np.array([p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64) if len(all_xyz) < min_inliers * 2: return [] planes = segment_roof_planes( all_xyz, distance_threshold=distance_threshold, min_inliers=min_inliers, max_planes=max_planes, roof_crop_top_frac=roof_crop_top_frac, ) if len(planes) < 2: return [] return edges_from_planes_and_vertices(vertices, planes, perp_tol=perp_tol) # --------------------------------------------------------------------------- # T2.6 Standalone predictor # --------------------------------------------------------------------------- def predict_wireframe_planes( entry, distance_threshold: float = 0.15, min_inliers: int = 60, max_planes: int = 8, perp_tol: float = 0.4, merge_radius: float = 0.35, roof_crop_top_frac: float = 0.55, ) -> tuple[np.ndarray, list[tuple[int, int]]]: """Build a wireframe from COLMAP sparse points via plane intersection.""" good = convert_entry_to_human_readable(entry) colmap_rec = good.get("colmap") or good.get("colmap_binary") if colmap_rec is None: return empty_solution() all_xyz = np.array([p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64) if len(all_xyz) < min_inliers * 2: return empty_solution() planes = segment_roof_planes( all_xyz, distance_threshold=distance_threshold, min_inliers=min_inliers, max_planes=max_planes, roof_crop_top_frac=roof_crop_top_frac, ) if len(planes) < 2: return empty_solution() endpoint_pool: list[np.ndarray] = [] segments: list[tuple[int, int]] = [] for i in range(len(planes)): for j in range(i + 1, len(planes)): inter = intersect_two_planes(planes[i], planes[j]) if inter is None: continue point, direction = inter seg = clip_line_to_segment( point, direction, planes[i], planes[j], perp_tol=perp_tol ) if seg is None: continue a, b = seg ia = len(endpoint_pool) endpoint_pool.append(a) ib = len(endpoint_pool) endpoint_pool.append(b) segments.append((ia, ib)) if not segments: return empty_solution() corners = _triple_plane_corners(planes) endpoint_pool.extend(corners) all_pts = np.asarray(endpoint_pool, dtype=np.float64) merged, mapping = _merge_points(all_pts, radius=merge_radius) edge_set: set[tuple[int, int]] = set() for ia, ib in segments: ma = int(mapping[ia]) mb = int(mapping[ib]) if ma == mb: continue lo, hi = (ma, mb) if ma < mb else (mb, ma) edge_set.add((lo, hi)) if not edge_set or len(merged) < 2: return empty_solution() return merged, [(int(a), int(b)) for a, b in edge_set]