File size: 17,247 Bytes
31f43c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
"""Plane-intersection wireframe predictor (Tier 2).

Classical-geometry pipeline, orthogonal to the gestalt + depth path:

1. Crop the COLMAP sparse cloud to the top portion along the up-axis so that
   only roof points remain (the dataset uses +Y as up).
2. Iteratively RANSAC-segment the cropped cloud into planes (open3d).
3. Keep only planes whose normal has a significant +Y component (roof
   slopes) or is near-horizontal (flat roof / eaves).
4. For each pair of surviving planes, compute the infinite intersection
   line via scikit-spatial and clip it to the overlap of the two inlier
   sets (percentile endpoints with a perpendicular tolerance).
5. Vertices = segment endpoints ∪ triple-plane intersections, merged at
   a small radius.
6. Edges = clipped segments remapped onto the merged vertex set.

Only numpy / open3d / scikit-spatial / pycolmap are used — no torch.

The main entry point is :func:`predict_wireframe_planes`, which returns
``(vertices, edges)`` in the format expected by ``hss()``.
"""

from __future__ import annotations

import numpy as np
import open3d as o3d
from skspatial.objects import Plane as SkPlane

from hoho2025.example_solutions import (
    convert_entry_to_human_readable,
    empty_solution,
)


UP_AXIS = 1  # +Y is up in this dataset (verified across 15 validation samples)


# ---------------------------------------------------------------------------
# Plane data structure
# ---------------------------------------------------------------------------

class RoofPlane:
    """A planar segment of the roof point cloud.

    ``eq`` stores a normalised (a, b, c, d) plane equation such that
    ``|n| = 1`` and ``a*x + b*y + c*z + d = 0``.
    """

    __slots__ = ("eq", "normal", "d", "inliers")

    def __init__(self, eq: np.ndarray, inliers: np.ndarray):
        eq = np.asarray(eq, dtype=np.float64)
        n = eq[:3]
        nn = np.linalg.norm(n)
        if nn > 1e-9:
            eq = eq / nn
        self.eq = eq
        self.normal = eq[:3]
        self.d = float(eq[3])
        self.inliers = np.asarray(inliers, dtype=np.float64)

    def signed_distance(self, pts: np.ndarray) -> np.ndarray:
        return pts @ self.normal + self.d


# ---------------------------------------------------------------------------
# Roof crop
# ---------------------------------------------------------------------------

def crop_to_roof(
    xyz: np.ndarray,
    up_axis: int = UP_AXIS,
    top_frac: float = 0.70,
    pad: float = 1.0,
) -> np.ndarray:
    """Keep points whose up-axis coordinate is in the top ``top_frac`` of the
    distribution.

    COLMAP reconstructions include ground, walls, vegetation and roof. The
    roof corners live in the upper Y range. A fractional cut along the up
    axis is a robust proxy that does not need any external scale calibration
    and works for both peaked and flat roofs.
    """
    if len(xyz) == 0:
        return xyz
    up = xyz[:, up_axis]
    lo, hi = float(up.min()), float(up.max())
    if hi - lo < 1e-6:
        return xyz
    threshold = lo + (hi - lo) * (1.0 - top_frac) - pad
    mask = up >= threshold
    return xyz[mask]


def _is_roof_normal(normal: np.ndarray, up_axis: int = UP_AXIS,
                    min_up: float = 0.15) -> bool:
    """A roof plane either has significant vertical component (pitched
    surface) or is nearly horizontal (flat roof). Walls have ``|n_up| ≈ 0``
    and are rejected.
    """
    return abs(float(normal[up_axis])) >= min_up


# ---------------------------------------------------------------------------
# T2.1 Iterative RANSAC plane segmentation (open3d backend)
# ---------------------------------------------------------------------------

def segment_roof_planes(
    xyz: np.ndarray,
    distance_threshold: float = 0.15,
    ransac_n: int = 3,
    num_iterations: int = 1000,
    min_inliers: int = 60,
    max_planes: int = 8,
    roof_crop_top_frac: float = 0.70,
    crop_pad: float = 1.0,
    keep_walls: bool = True,
) -> list[RoofPlane]:
    """Sequentially RANSAC-fit roof planes.

    Crops the cloud to the top ``roof_crop_top_frac`` along +Y first, then
    iteratively removes inliers until no plane with at least ``min_inliers``
    remains or ``max_planes`` have been found. Planes whose normal is nearly
    perpendicular to the up axis (walls) are dropped.
    """
    cropped = crop_to_roof(xyz, top_frac=roof_crop_top_frac, pad=crop_pad)
    if len(cropped) < min_inliers * 2:
        # Fall back to the full cloud if the crop is too aggressive.
        cropped = np.asarray(xyz, dtype=np.float64)
    if len(cropped) < min_inliers:
        return []

    remaining = cropped.copy()
    planes: list[RoofPlane] = []

    pcd = o3d.geometry.PointCloud()
    for _ in range(max_planes):
        if len(remaining) < min_inliers:
            break
        pcd.points = o3d.utility.Vector3dVector(remaining)
        try:
            eq, inlier_idx = pcd.segment_plane(
                distance_threshold=distance_threshold,
                ransac_n=ransac_n,
                num_iterations=num_iterations,
            )
        except Exception:
            break
        if len(inlier_idx) < min_inliers:
            break
        eq = np.asarray(eq, dtype=np.float64)
        inliers = remaining[np.asarray(inlier_idx, dtype=np.int64)]
        normal = eq[:3] / (np.linalg.norm(eq[:3]) + 1e-12)
        if keep_walls or _is_roof_normal(normal):
            planes.append(RoofPlane(eq, inliers))
        # Always remove inliers from the remaining cloud even for rejected
        # planes, otherwise RANSAC keeps returning the same ones.
        keep_mask = np.ones(len(remaining), dtype=bool)
        keep_mask[np.asarray(inlier_idx, dtype=np.int64)] = False
        remaining = remaining[keep_mask]

    return planes


# ---------------------------------------------------------------------------
# T2.2 Plane-pair intersection line (scikit-spatial)
# ---------------------------------------------------------------------------

def intersect_two_planes(
    p1: RoofPlane, p2: RoofPlane, parallel_cos: float = 0.995,
) -> tuple[np.ndarray, np.ndarray] | None:
    """Return ``(point_on_line, unit_direction)`` or ``None`` if near parallel."""
    dot = abs(float(np.dot(p1.normal, p2.normal)))
    if dot >= parallel_cos:
        return None
    sk1 = SkPlane(point=-p1.d * p1.normal, normal=p1.normal)
    sk2 = SkPlane(point=-p2.d * p2.normal, normal=p2.normal)
    try:
        line = sk1.intersect_plane(sk2)
    except Exception:
        return None
    point = np.asarray(line.point, dtype=np.float64)
    direction = np.asarray(line.direction, dtype=np.float64)
    norm = np.linalg.norm(direction)
    if norm < 1e-9:
        return None
    return point, direction / norm


# ---------------------------------------------------------------------------
# T2.3 Clip the line to a real segment
# ---------------------------------------------------------------------------

def clip_line_to_segment(
    point: np.ndarray,
    direction: np.ndarray,
    p1: RoofPlane,
    p2: RoofPlane,
    perp_tol: float = 0.4,
    trim_pct: float = 5.0,
    min_length: float = 0.3,
) -> tuple[np.ndarray, np.ndarray] | None:
    """Clip the infinite line to the overlap region of the two inlier sets.

    Only inliers whose projection onto the line is within ``perp_tol`` of the
    line contribute — otherwise a large plane would stretch the intersection
    far outside the real roof feature. The segment endpoints are the
    5th / 95th percentile of projected scalars taken over the union of the
    two filtered sets.
    """
    endpoints_s = []
    for plane in (p1, p2):
        rel = plane.inliers - point
        s = rel @ direction
        perp = rel - s[:, None] * direction
        d_perp = np.linalg.norm(perp, axis=1)
        near = s[d_perp <= perp_tol]
        if len(near) >= 5:
            endpoints_s.append(near)
    if not endpoints_s:
        return None
    all_s = np.concatenate(endpoints_s)
    if len(all_s) < 5:
        return None
    lo, hi = np.percentile(all_s, [trim_pct, 100.0 - trim_pct])
    if hi - lo < min_length:
        return None
    a = point + lo * direction
    b = point + hi * direction
    return a, b


# ---------------------------------------------------------------------------
# T2.4 Triple-plane corners + vertex dedup
# ---------------------------------------------------------------------------

def _triple_plane_corners(
    planes: list[RoofPlane], max_dist_to_inlier: float = 1.0,
) -> list[np.ndarray]:
    """Solve the 3x3 linear system for every non-collinear triple.

    A corner is kept only if every one of the three parent planes has at
    least one inlier within ``max_dist_to_inlier`` of the computed point,
    which removes ghost intersections far outside the roof.
    """
    out: list[np.ndarray] = []
    n = len(planes)
    for i in range(n):
        for j in range(i + 1, n):
            for k in range(j + 1, n):
                A = np.vstack([planes[i].normal, planes[j].normal, planes[k].normal])
                if abs(float(np.linalg.det(A))) < 1e-3:
                    continue
                b = -np.array([planes[i].d, planes[j].d, planes[k].d])
                try:
                    X = np.linalg.solve(A, b)
                except np.linalg.LinAlgError:
                    continue
                ok = True
                for p in (planes[i], planes[j], planes[k]):
                    if np.linalg.norm(p.inliers - X, axis=1).min() > max_dist_to_inlier:
                        ok = False
                        break
                if ok:
                    out.append(X)
    return out


def _merge_points(points: np.ndarray, radius: float) -> tuple[np.ndarray, np.ndarray]:
    """Greedy dedup by nearest-cluster assignment."""
    pts = np.asarray(points, dtype=np.float64)
    if len(pts) == 0:
        return np.empty((0, 3)), np.empty((0,), dtype=np.int64)
    mapping = np.full(len(pts), -1, dtype=np.int64)
    clusters: list[list[int]] = []
    centroids: list[np.ndarray] = []
    for i, p in enumerate(pts):
        if not centroids:
            clusters.append([i])
            centroids.append(p.copy())
            mapping[i] = 0
            continue
        c_arr = np.array(centroids)
        d = np.linalg.norm(c_arr - p, axis=1)
        j = int(np.argmin(d))
        if d[j] <= radius:
            clusters[j].append(i)
            centroids[j] = pts[clusters[j]].mean(axis=0)
            mapping[i] = j
        else:
            clusters.append([i])
            centroids.append(p.copy())
            mapping[i] = len(centroids) - 1
    merged = np.array(centroids, dtype=np.float64)
    return merged, mapping


# ---------------------------------------------------------------------------
# T2.7 Hybrid integration helpers: snap intersection lines to existing
# sklearn-derived vertices.
# ---------------------------------------------------------------------------

def edges_from_planes_and_vertices(
    vertices: np.ndarray,
    planes: list[RoofPlane],
    perp_tol: float = 0.6,
    min_length: float = 0.5,
    max_length: float = 10.0,
) -> list[tuple[int, int]]:
    """Vote edges between vertices using plane-pair intersection lines.

    For each line ``L_ij = plane_i ∩ plane_j``:
      * find all ``vertices`` whose perpendicular distance to L_ij is
        below ``perp_tol``,
      * pair the two extremes along the line direction as an edge.

    The result is a set of 3D edges supported by plane geometry. Because
    the vertices come from sklearn's depth-based detection, positions are
    noisy but complete — while the lines come from RANSAC on thousands
    of COLMAP points and are very accurate in direction. Matching the two
    gives clean roof ridges / eaves without depending on 2D fitLine noise.
    """
    if len(vertices) < 2 or len(planes) < 2:
        return []
    V = np.asarray(vertices, dtype=np.float64)
    edges: set[tuple[int, int]] = set()

    for i in range(len(planes)):
        for j in range(i + 1, len(planes)):
            inter = intersect_two_planes(planes[i], planes[j])
            if inter is None:
                continue
            point, direction = inter
            rel = V - point
            s = rel @ direction
            perp = rel - s[:, None] * direction
            d_perp = np.linalg.norm(perp, axis=1)
            near_idx = np.where(d_perp <= perp_tol)[0]
            if len(near_idx) < 2:
                continue
            # Take the two vertices with the most extreme projections
            s_near = s[near_idx]
            a = int(near_idx[np.argmin(s_near)])
            b = int(near_idx[np.argmax(s_near)])
            if a == b:
                continue
            dist3d = float(np.linalg.norm(V[a] - V[b]))
            if dist3d < min_length or dist3d > max_length:
                continue
            lo, hi = (a, b) if a < b else (b, a)
            edges.add((lo, hi))

            # Additionally, for each adjacent pair of projections along the
            # line, add them as an edge if the 3D distance is reasonable.
            order = np.argsort(s[near_idx])
            sorted_idx = near_idx[order]
            for k in range(len(sorted_idx) - 1):
                x = int(sorted_idx[k])
                y = int(sorted_idx[k + 1])
                d = float(np.linalg.norm(V[x] - V[y]))
                if d < min_length or d > max_length:
                    continue
                lo, hi = (x, y) if x < y else (y, x)
                edges.add((lo, hi))

    return list(edges)


def predict_plane_edges(entry, vertices: np.ndarray,
                         distance_threshold: float = 0.20,
                         min_inliers: int = 60,
                         max_planes: int = 10,
                         roof_crop_top_frac: float = 0.95,
                         perp_tol: float = 0.8,
                         ) -> list[tuple[int, int]]:
    """High-level helper: given a sklearn wireframe's vertices, return a
    list of extra edges supported by plane-pair intersection geometry.
    """
    good = convert_entry_to_human_readable(entry)
    colmap_rec = good.get("colmap") or good.get("colmap_binary")
    if colmap_rec is None:
        return []
    all_xyz = np.array([p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64)
    if len(all_xyz) < min_inliers * 2:
        return []
    planes = segment_roof_planes(
        all_xyz,
        distance_threshold=distance_threshold,
        min_inliers=min_inliers,
        max_planes=max_planes,
        roof_crop_top_frac=roof_crop_top_frac,
    )
    if len(planes) < 2:
        return []
    return edges_from_planes_and_vertices(vertices, planes, perp_tol=perp_tol)


# ---------------------------------------------------------------------------
# T2.6 Standalone predictor
# ---------------------------------------------------------------------------

def predict_wireframe_planes(
    entry,
    distance_threshold: float = 0.15,
    min_inliers: int = 60,
    max_planes: int = 8,
    perp_tol: float = 0.4,
    merge_radius: float = 0.35,
    roof_crop_top_frac: float = 0.55,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
    """Build a wireframe from COLMAP sparse points via plane intersection."""
    good = convert_entry_to_human_readable(entry)
    colmap_rec = good.get("colmap") or good.get("colmap_binary")
    if colmap_rec is None:
        return empty_solution()

    all_xyz = np.array([p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64)
    if len(all_xyz) < min_inliers * 2:
        return empty_solution()

    planes = segment_roof_planes(
        all_xyz,
        distance_threshold=distance_threshold,
        min_inliers=min_inliers,
        max_planes=max_planes,
        roof_crop_top_frac=roof_crop_top_frac,
    )
    if len(planes) < 2:
        return empty_solution()

    endpoint_pool: list[np.ndarray] = []
    segments: list[tuple[int, int]] = []
    for i in range(len(planes)):
        for j in range(i + 1, len(planes)):
            inter = intersect_two_planes(planes[i], planes[j])
            if inter is None:
                continue
            point, direction = inter
            seg = clip_line_to_segment(
                point, direction, planes[i], planes[j], perp_tol=perp_tol
            )
            if seg is None:
                continue
            a, b = seg
            ia = len(endpoint_pool)
            endpoint_pool.append(a)
            ib = len(endpoint_pool)
            endpoint_pool.append(b)
            segments.append((ia, ib))

    if not segments:
        return empty_solution()

    corners = _triple_plane_corners(planes)
    endpoint_pool.extend(corners)

    all_pts = np.asarray(endpoint_pool, dtype=np.float64)
    merged, mapping = _merge_points(all_pts, radius=merge_radius)

    edge_set: set[tuple[int, int]] = set()
    for ia, ib in segments:
        ma = int(mapping[ia])
        mb = int(mapping[ib])
        if ma == mb:
            continue
        lo, hi = (ma, mb) if ma < mb else (mb, ma)
        edge_set.add((lo, hi))

    if not edge_set or len(merged) < 2:
        return empty_solution()

    return merged, [(int(a), int(b)) for a, b in edge_set]