subm / plane_wireframe.py
Neritz's picture
Add handcrafted_submission_2026 contents
5b6e3d8 verified
"""Plane-intersection wireframe predictor (Tier 2).
Classical-geometry pipeline, orthogonal to the gestalt + depth path:
1. Crop the COLMAP sparse cloud to the top portion along the up-axis so that
only roof points remain (the dataset uses +Y as up).
2. Iteratively RANSAC-segment the cropped cloud into planes (open3d).
3. Keep only planes whose normal has a significant +Y component (roof
slopes) or is near-horizontal (flat roof / eaves).
4. For each pair of surviving planes, compute the infinite intersection
line via scikit-spatial and clip it to the overlap of the two inlier
sets (percentile endpoints with a perpendicular tolerance).
5. Vertices = segment endpoints ∪ triple-plane intersections, merged at
a small radius.
6. Edges = clipped segments remapped onto the merged vertex set.
Only numpy / open3d / scikit-spatial / pycolmap are used — no torch.
The main entry point is :func:`predict_wireframe_planes`, which returns
``(vertices, edges)`` in the format expected by ``hss()``.
"""
from __future__ import annotations
import numpy as np
import open3d as o3d
from skspatial.objects import Plane as SkPlane
from hoho2025.example_solutions import (
convert_entry_to_human_readable,
empty_solution,
)
UP_AXIS = 1 # +Y is up in this dataset (verified across 15 validation samples)
# ---------------------------------------------------------------------------
# Plane data structure
# ---------------------------------------------------------------------------
class RoofPlane:
"""A planar segment of the roof point cloud.
``eq`` stores a normalised (a, b, c, d) plane equation such that
``|n| = 1`` and ``a*x + b*y + c*z + d = 0``.
"""
__slots__ = ("eq", "normal", "d", "inliers")
def __init__(self, eq: np.ndarray, inliers: np.ndarray):
eq = np.asarray(eq, dtype=np.float64)
n = eq[:3]
nn = np.linalg.norm(n)
if nn > 1e-9:
eq = eq / nn
self.eq = eq
self.normal = eq[:3]
self.d = float(eq[3])
self.inliers = np.asarray(inliers, dtype=np.float64)
def signed_distance(self, pts: np.ndarray) -> np.ndarray:
return pts @ self.normal + self.d
# ---------------------------------------------------------------------------
# Roof crop
# ---------------------------------------------------------------------------
def crop_to_roof(
xyz: np.ndarray,
up_axis: int = UP_AXIS,
top_frac: float = 0.70,
pad: float = 1.0,
) -> np.ndarray:
"""Keep points whose up-axis coordinate is in the top ``top_frac`` of the
distribution.
COLMAP reconstructions include ground, walls, vegetation and roof. The
roof corners live in the upper Y range. A fractional cut along the up
axis is a robust proxy that does not need any external scale calibration
and works for both peaked and flat roofs.
"""
if len(xyz) == 0:
return xyz
up = xyz[:, up_axis]
lo, hi = float(up.min()), float(up.max())
if hi - lo < 1e-6:
return xyz
threshold = lo + (hi - lo) * (1.0 - top_frac) - pad
mask = up >= threshold
return xyz[mask]
def _is_roof_normal(normal: np.ndarray, up_axis: int = UP_AXIS,
min_up: float = 0.15) -> bool:
"""A roof plane either has significant vertical component (pitched
surface) or is nearly horizontal (flat roof). Walls have ``|n_up| ≈ 0``
and are rejected.
"""
return abs(float(normal[up_axis])) >= min_up
# ---------------------------------------------------------------------------
# T2.1 Iterative RANSAC plane segmentation (open3d backend)
# ---------------------------------------------------------------------------
def segment_roof_planes(
xyz: np.ndarray,
distance_threshold: float = 0.15,
ransac_n: int = 3,
num_iterations: int = 1000,
min_inliers: int = 60,
max_planes: int = 8,
roof_crop_top_frac: float = 0.70,
crop_pad: float = 1.0,
keep_walls: bool = True,
) -> list[RoofPlane]:
"""Sequentially RANSAC-fit roof planes.
Crops the cloud to the top ``roof_crop_top_frac`` along +Y first, then
iteratively removes inliers until no plane with at least ``min_inliers``
remains or ``max_planes`` have been found. Planes whose normal is nearly
perpendicular to the up axis (walls) are dropped.
"""
cropped = crop_to_roof(xyz, top_frac=roof_crop_top_frac, pad=crop_pad)
if len(cropped) < min_inliers * 2:
# Fall back to the full cloud if the crop is too aggressive.
cropped = np.asarray(xyz, dtype=np.float64)
if len(cropped) < min_inliers:
return []
remaining = cropped.copy()
planes: list[RoofPlane] = []
pcd = o3d.geometry.PointCloud()
for _ in range(max_planes):
if len(remaining) < min_inliers:
break
pcd.points = o3d.utility.Vector3dVector(remaining)
try:
eq, inlier_idx = pcd.segment_plane(
distance_threshold=distance_threshold,
ransac_n=ransac_n,
num_iterations=num_iterations,
)
except Exception:
break
if len(inlier_idx) < min_inliers:
break
eq = np.asarray(eq, dtype=np.float64)
inliers = remaining[np.asarray(inlier_idx, dtype=np.int64)]
normal = eq[:3] / (np.linalg.norm(eq[:3]) + 1e-12)
if keep_walls or _is_roof_normal(normal):
planes.append(RoofPlane(eq, inliers))
# Always remove inliers from the remaining cloud even for rejected
# planes, otherwise RANSAC keeps returning the same ones.
keep_mask = np.ones(len(remaining), dtype=bool)
keep_mask[np.asarray(inlier_idx, dtype=np.int64)] = False
remaining = remaining[keep_mask]
return planes
# ---------------------------------------------------------------------------
# T2.2 Plane-pair intersection line (scikit-spatial)
# ---------------------------------------------------------------------------
def intersect_two_planes(
p1: RoofPlane, p2: RoofPlane, parallel_cos: float = 0.995,
) -> tuple[np.ndarray, np.ndarray] | None:
"""Return ``(point_on_line, unit_direction)`` or ``None`` if near parallel."""
dot = abs(float(np.dot(p1.normal, p2.normal)))
if dot >= parallel_cos:
return None
sk1 = SkPlane(point=-p1.d * p1.normal, normal=p1.normal)
sk2 = SkPlane(point=-p2.d * p2.normal, normal=p2.normal)
try:
line = sk1.intersect_plane(sk2)
except Exception:
return None
point = np.asarray(line.point, dtype=np.float64)
direction = np.asarray(line.direction, dtype=np.float64)
norm = np.linalg.norm(direction)
if norm < 1e-9:
return None
return point, direction / norm
# ---------------------------------------------------------------------------
# T2.3 Clip the line to a real segment
# ---------------------------------------------------------------------------
def clip_line_to_segment(
point: np.ndarray,
direction: np.ndarray,
p1: RoofPlane,
p2: RoofPlane,
perp_tol: float = 0.4,
trim_pct: float = 5.0,
min_length: float = 0.3,
) -> tuple[np.ndarray, np.ndarray] | None:
"""Clip the infinite line to the overlap region of the two inlier sets.
Only inliers whose projection onto the line is within ``perp_tol`` of the
line contribute — otherwise a large plane would stretch the intersection
far outside the real roof feature. The segment endpoints are the
5th / 95th percentile of projected scalars taken over the union of the
two filtered sets.
"""
endpoints_s = []
for plane in (p1, p2):
rel = plane.inliers - point
s = rel @ direction
perp = rel - s[:, None] * direction
d_perp = np.linalg.norm(perp, axis=1)
near = s[d_perp <= perp_tol]
if len(near) >= 5:
endpoints_s.append(near)
if not endpoints_s:
return None
all_s = np.concatenate(endpoints_s)
if len(all_s) < 5:
return None
lo, hi = np.percentile(all_s, [trim_pct, 100.0 - trim_pct])
if hi - lo < min_length:
return None
a = point + lo * direction
b = point + hi * direction
return a, b
# ---------------------------------------------------------------------------
# T2.4 Triple-plane corners + vertex dedup
# ---------------------------------------------------------------------------
def _triple_plane_corners(
planes: list[RoofPlane], max_dist_to_inlier: float = 1.0,
) -> list[np.ndarray]:
"""Solve the 3x3 linear system for every non-collinear triple.
A corner is kept only if every one of the three parent planes has at
least one inlier within ``max_dist_to_inlier`` of the computed point,
which removes ghost intersections far outside the roof.
"""
out: list[np.ndarray] = []
n = len(planes)
for i in range(n):
for j in range(i + 1, n):
for k in range(j + 1, n):
A = np.vstack([planes[i].normal, planes[j].normal, planes[k].normal])
if abs(float(np.linalg.det(A))) < 1e-3:
continue
b = -np.array([planes[i].d, planes[j].d, planes[k].d])
try:
X = np.linalg.solve(A, b)
except np.linalg.LinAlgError:
continue
ok = True
for p in (planes[i], planes[j], planes[k]):
if np.linalg.norm(p.inliers - X, axis=1).min() > max_dist_to_inlier:
ok = False
break
if ok:
out.append(X)
return out
def _merge_points(points: np.ndarray, radius: float) -> tuple[np.ndarray, np.ndarray]:
"""Greedy dedup by nearest-cluster assignment."""
pts = np.asarray(points, dtype=np.float64)
if len(pts) == 0:
return np.empty((0, 3)), np.empty((0,), dtype=np.int64)
mapping = np.full(len(pts), -1, dtype=np.int64)
clusters: list[list[int]] = []
centroids: list[np.ndarray] = []
for i, p in enumerate(pts):
if not centroids:
clusters.append([i])
centroids.append(p.copy())
mapping[i] = 0
continue
c_arr = np.array(centroids)
d = np.linalg.norm(c_arr - p, axis=1)
j = int(np.argmin(d))
if d[j] <= radius:
clusters[j].append(i)
centroids[j] = pts[clusters[j]].mean(axis=0)
mapping[i] = j
else:
clusters.append([i])
centroids.append(p.copy())
mapping[i] = len(centroids) - 1
merged = np.array(centroids, dtype=np.float64)
return merged, mapping
# ---------------------------------------------------------------------------
# T2.7 Hybrid integration helpers: snap intersection lines to existing
# sklearn-derived vertices.
# ---------------------------------------------------------------------------
def edges_from_planes_and_vertices(
vertices: np.ndarray,
planes: list[RoofPlane],
perp_tol: float = 0.6,
min_length: float = 0.5,
max_length: float = 10.0,
) -> list[tuple[int, int]]:
"""Vote edges between vertices using plane-pair intersection lines.
For each line ``L_ij = plane_i ∩ plane_j``:
* find all ``vertices`` whose perpendicular distance to L_ij is
below ``perp_tol``,
* pair the two extremes along the line direction as an edge.
The result is a set of 3D edges supported by plane geometry. Because
the vertices come from sklearn's depth-based detection, positions are
noisy but complete — while the lines come from RANSAC on thousands
of COLMAP points and are very accurate in direction. Matching the two
gives clean roof ridges / eaves without depending on 2D fitLine noise.
"""
if len(vertices) < 2 or len(planes) < 2:
return []
V = np.asarray(vertices, dtype=np.float64)
edges: set[tuple[int, int]] = set()
for i in range(len(planes)):
for j in range(i + 1, len(planes)):
inter = intersect_two_planes(planes[i], planes[j])
if inter is None:
continue
point, direction = inter
rel = V - point
s = rel @ direction
perp = rel - s[:, None] * direction
d_perp = np.linalg.norm(perp, axis=1)
near_idx = np.where(d_perp <= perp_tol)[0]
if len(near_idx) < 2:
continue
# Take the two vertices with the most extreme projections
s_near = s[near_idx]
a = int(near_idx[np.argmin(s_near)])
b = int(near_idx[np.argmax(s_near)])
if a == b:
continue
dist3d = float(np.linalg.norm(V[a] - V[b]))
if dist3d < min_length or dist3d > max_length:
continue
lo, hi = (a, b) if a < b else (b, a)
edges.add((lo, hi))
# Additionally, for each adjacent pair of projections along the
# line, add them as an edge if the 3D distance is reasonable.
order = np.argsort(s[near_idx])
sorted_idx = near_idx[order]
for k in range(len(sorted_idx) - 1):
x = int(sorted_idx[k])
y = int(sorted_idx[k + 1])
d = float(np.linalg.norm(V[x] - V[y]))
if d < min_length or d > max_length:
continue
lo, hi = (x, y) if x < y else (y, x)
edges.add((lo, hi))
return list(edges)
def predict_plane_edges(entry, vertices: np.ndarray,
distance_threshold: float = 0.20,
min_inliers: int = 60,
max_planes: int = 10,
roof_crop_top_frac: float = 0.95,
perp_tol: float = 0.8,
) -> list[tuple[int, int]]:
"""High-level helper: given a sklearn wireframe's vertices, return a
list of extra edges supported by plane-pair intersection geometry.
"""
good = convert_entry_to_human_readable(entry)
colmap_rec = good.get("colmap") or good.get("colmap_binary")
if colmap_rec is None:
return []
all_xyz = np.array([p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64)
if len(all_xyz) < min_inliers * 2:
return []
planes = segment_roof_planes(
all_xyz,
distance_threshold=distance_threshold,
min_inliers=min_inliers,
max_planes=max_planes,
roof_crop_top_frac=roof_crop_top_frac,
)
if len(planes) < 2:
return []
return edges_from_planes_and_vertices(vertices, planes, perp_tol=perp_tol)
# ---------------------------------------------------------------------------
# T2.6 Standalone predictor
# ---------------------------------------------------------------------------
def predict_wireframe_planes(
entry,
distance_threshold: float = 0.15,
min_inliers: int = 60,
max_planes: int = 8,
perp_tol: float = 0.4,
merge_radius: float = 0.35,
roof_crop_top_frac: float = 0.55,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
"""Build a wireframe from COLMAP sparse points via plane intersection."""
good = convert_entry_to_human_readable(entry)
colmap_rec = good.get("colmap") or good.get("colmap_binary")
if colmap_rec is None:
return empty_solution()
all_xyz = np.array([p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64)
if len(all_xyz) < min_inliers * 2:
return empty_solution()
planes = segment_roof_planes(
all_xyz,
distance_threshold=distance_threshold,
min_inliers=min_inliers,
max_planes=max_planes,
roof_crop_top_frac=roof_crop_top_frac,
)
if len(planes) < 2:
return empty_solution()
endpoint_pool: list[np.ndarray] = []
segments: list[tuple[int, int]] = []
for i in range(len(planes)):
for j in range(i + 1, len(planes)):
inter = intersect_two_planes(planes[i], planes[j])
if inter is None:
continue
point, direction = inter
seg = clip_line_to_segment(
point, direction, planes[i], planes[j], perp_tol=perp_tol
)
if seg is None:
continue
a, b = seg
ia = len(endpoint_pool)
endpoint_pool.append(a)
ib = len(endpoint_pool)
endpoint_pool.append(b)
segments.append((ia, ib))
if not segments:
return empty_solution()
corners = _triple_plane_corners(planes)
endpoint_pool.extend(corners)
all_pts = np.asarray(endpoint_pool, dtype=np.float64)
merged, mapping = _merge_points(all_pts, radius=merge_radius)
edge_set: set[tuple[int, int]] = set()
for ia, ib in segments:
ma = int(mapping[ia])
mb = int(mapping[ib])
if ma == mb:
continue
lo, hi = (ma, mb) if ma < mb else (mb, ma)
edge_set.add((lo, hi))
if not edge_set or len(merged) < 2:
return empty_solution()
return merged, [(int(a), int(b)) for a, b in edge_set]