"""Sklearn edge classifier + edge validation for submission — self-contained."""

import numpy as np
import cv2
from typing import Tuple, List
import sys
from pathlib import Path

# Put this file's directory on sys.path so the sibling modules imported below
# (junction, triangulation, ...) resolve regardless of the working directory.
_cur_dir = str(Path(__file__).parent.absolute())
if _cur_dir not in sys.path:
    sys.path.insert(0, _cur_dir)

from hoho2025.example_solutions import (
    convert_entry_to_human_readable, empty_solution,
    filter_vertices_by_background,
    get_sparse_depth, get_house_mask, get_uv_depth,
    project_vertices_to_3d, merge_vertices_3d,
    prune_not_connected, prune_too_far, point_to_segment_dist,
)
from hoho2025.color_mappings import gestalt_color_mapping
|
|
# ``junction`` is required: try the flat layout first, then the packaged one.
# If both fail the ImportError propagates.
try:
    from junction import apply_junction_constraints
except ImportError:
    from submission.junction import apply_junction_constraints

# Optional multi-view triangulation helpers; the flag records availability so
# the pipeline can skip the stages that need them.
try:
    from triangulation import predict_wireframe_tracks, get_high_confidence_tracks
    _TRIANGULATION_OK = True
except Exception:
    try:
        from submission.triangulation import predict_wireframe_tracks, get_high_confidence_tracks
        _TRIANGULATION_OK = True
    except Exception:
        _TRIANGULATION_OK = False

# Optional bundle-adjustment refinement of the final vertices.
try:
    from bundle_adjust import refine_vertices_ba
    _BA_OK = True
except Exception:
    try:
        from submission.bundle_adjust import refine_vertices_ba
        _BA_OK = True
    except Exception:
        _BA_OK = False

# Optional 3D line-cloud helpers.
try:
    from line_cloud import line_based_vertices
    _LINECLOUD_OK = True
except Exception:
    try:
        from submission.line_cloud import line_based_vertices
        _LINECLOUD_OK = True
    except Exception:
        _LINECLOUD_OK = False
|
|
| |
| |
# ---------------------------------------------------------------------------
# Feature switches and tuning constants read by ``predict_wireframe_sklearn``
# and the DGCNN edge helpers. Radii and distances are compared against the 3D
# vertex coordinates produced by the pipeline, so they share those units.
# ---------------------------------------------------------------------------

# Run ``refine_vertices_ba`` on the final vertices (only when it imported).
USE_BUNDLE_ADJUST = False

# Add an edge between two distinct vertices when both endpoints of a merged 3D
# line lie within LINE_EDGE_MATCH_RADIUS of them.
USE_LINE_EDGES = True
LINE_EDGE_MATCH_RADIUS = 0.8

# When True, edges contributed by the track ensemble / the 3D-line stage are
# accepted without the point-cloud ``validate_edge`` check.
BYPASS_VALIDATE_FOR_TRACKS = True
BYPASS_VALIDATE_FOR_LINES = True

# DGCNN vertex refinement. A refined candidate replaces its nearest existing
# vertex when within DGCNN_REPLACE_RADIUS, and is appended as a new vertex when
# its nearest-vertex distance is in (DGCNN_DEDUP_RADIUS, DGCNN_MAX_DIST_TO_CLOUD].
# DGCNN_CLS_THRESHOLD is forwarded to ``refine_winner_candidates``.
USE_DGCNN_REFINEMENT = False
DGCNN_CLS_THRESHOLD = 0.5
DGCNN_DEDUP_RADIUS = 0.5
DGCNN_REPLACE_RADIUS = 0.0
DGCNN_MAX_DIST_TO_CLOUD = 5.0

# DGCNN edge proposals, added after connectivity pruning.
USE_DGCNN_EDGES = False
# Score threshold forwarded to ``score_edges``.
DGCNN_EDGE_THRESHOLD = 0.60
# Probability levels used by ``_passes_dgcnn_edge_gates``.
DGCNN_EDGE_STRONG_THRESHOLD = 0.70
DGCNN_EDGE_VERY_STRONG_THRESHOLD = 0.85
# Learned edges longer than this are rejected.
DGCNN_EDGE_MAX_LENGTH = 6.0
# Cap on learned edges added at any single vertex.
DGCNN_EDGE_MAX_PER_VERTEX = 1
# Dilation (pixels) of the gestalt edge masks used for the reprojection gate.
DGCNN_EDGE_REPROJ_DILATE_PX = 6

# Extra vertex candidates from ``generate_winner_candidates``: a candidate is
# appended when its nearest-vertex distance is in
# (WINNER_DEDUP_RADIUS, WINNER_MAX_DIST_TO_CLOUD].
USE_WINNER_CANDIDATES = True
WINNER_DEDUP_RADIUS = 0.5
WINNER_MAX_DIST_TO_CLOUD = 8.0

# Edges from depth-derived 3D lines, matched to vertices like the line stage
# above; these always go through ``validate_edge``.
USE_DEPTH_EDGES = False
DEPTH_EDGE_MATCH_RADIUS = 0.8

# Add a fixed bonus to the classifier probability of vertex pairs that have
# 3D-line support and/or track-edge support (result is capped at 1.0).
USE_RERANK = True
RERANK_BOOST_LINE = 0.20
RERANK_BOOST_TRACK = 0.25
|
# Optional plane-based edge proposals.
try:
    from plane_wireframe import predict_plane_edges
    _PLANES_OK = True
except Exception:
    try:
        from submission.plane_wireframe import predict_plane_edges
        _PLANES_OK = True
    except Exception:
        _PLANES_OK = False

# Optional depth-derived 3D line extraction.
try:
    from depth_edges import extract_and_merge_depth_lines
    _DEPTH_EDGES_OK = True
except Exception:
    try:
        from submission.depth_edges import extract_and_merge_depth_lines
        _DEPTH_EDGES_OK = True
    except Exception:
        _DEPTH_EDGES_OK = False

# Optional extra vertex-candidate generator.
try:
    from winner_candidates import generate_winner_candidates
    _WINNER_OK = True
except Exception:
    try:
        from submission.winner_candidates import generate_winner_candidates
        _WINNER_OK = True
    except Exception:
        _WINNER_OK = False
|
|
| |
# Lazily populated caches for the optional DGCNN models. Each ``*_TRIED`` flag
# is set on the first lookup so a failed import/load is attempted only once.
_DGCNN_VERTEX_MODEL = None
_DGCNN_VERTEX_TRIED = False

_DGCNN_EDGE_MODEL = None
_DGCNN_EDGE_TRIED = False


def _get_dgcnn_edge_model():
    """Return the cached DGCNN edge model, loading it on first use.

    The loader is imported from ``winner_inference`` (flat layout) or
    ``submission.winner_inference``. Weights are read from
    ``edge_model_dgcnn.pt`` next to this file, on CUDA when torch reports it
    available and on CPU otherwise.

    Returns:
        The loaded model, or ``None`` when the loader cannot be imported or
        loading fails. The outcome is cached, so later calls never retry.
    """
    global _DGCNN_EDGE_MODEL, _DGCNN_EDGE_TRIED
    if _DGCNN_EDGE_TRIED:
        return _DGCNN_EDGE_MODEL
    _DGCNN_EDGE_TRIED = True
    try:
        from winner_inference import load_edge_model
    except Exception:
        try:
            from submission.winner_inference import load_edge_model
        except Exception:
            return None
    try:
        import torch as _torch
        device = "cuda" if _torch.cuda.is_available() else "cpu"
    except Exception:
        device = "cpu"
    import os
    model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "edge_model_dgcnn.pt")
    try:
        _DGCNN_EDGE_MODEL = load_edge_model(model_path, device=device)
    except Exception as exc:
        # Previously the first call raised while every later call silently
        # returned None; report once and behave consistently instead.
        import warnings
        warnings.warn(f"DGCNN edge model could not be loaded from {model_path}: {exc}")
        _DGCNN_EDGE_MODEL = None
    return _DGCNN_EDGE_MODEL


def _get_dgcnn_vertex_model():
    """Return the cached DGCNN vertex model, loading it on first use.

    Mirrors ``_get_dgcnn_edge_model`` but uses ``load_vertex_model`` and the
    ``vertex_model_dgcnn.pt`` weights file next to this file.

    Returns:
        The loaded model, or ``None`` when the loader cannot be imported or
        loading fails. The outcome is cached, so later calls never retry.
    """
    global _DGCNN_VERTEX_MODEL, _DGCNN_VERTEX_TRIED
    if _DGCNN_VERTEX_TRIED:
        return _DGCNN_VERTEX_MODEL
    _DGCNN_VERTEX_TRIED = True
    try:
        from winner_inference import load_vertex_model
    except Exception:
        try:
            from submission.winner_inference import load_vertex_model
        except Exception:
            return None
    try:
        import torch as _torch
        device = "cuda" if _torch.cuda.is_available() else "cpu"
    except Exception:
        device = "cpu"
    import os
    model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "vertex_model_dgcnn.pt")
    try:
        _DGCNN_VERTEX_MODEL = load_vertex_model(model_path, device=device)
    except Exception as exc:
        import warnings
        warnings.warn(f"DGCNN vertex model could not be loaded from {model_path}: {exc}")
        _DGCNN_VERTEX_MODEL = None
    return _DGCNN_VERTEX_MODEL
|
|
| |
| |
| |
| |
# Merge the wireframe from ``predict_wireframe_tracks`` into the result. A
# track vertex is identified with its nearest merged vertex when within
# ENSEMBLE_MATCH_RADIUS (the same radius is used when lifting track-edge votes
# onto merged vertices for the classifier features).
USE_TRACK_ENSEMBLE = True
ENSEMBLE_MATCH_RADIUS = 0.5

# Unmatched track vertices whose nearest-vertex distance lies in
# [ISOLATED_TRACK_MIN_DIST, ISOLATED_TRACK_MAX_DIST] are appended as new
# vertices so their edges can still be used.
ADD_ISOLATED_TRACK_VERTICES = True
ISOLATED_TRACK_MIN_DIST = 0.8
ISOLATED_TRACK_MAX_DIST = 3.5

# Use high-confidence tracks as vertices. TRACK_MIN_VIEWS and
# TRACK_MAX_REPROJ_PX are forwarded to ``get_high_confidence_tracks``. A track
# within TRACK_REPLACE_RADIUS of a vertex overwrites that vertex's position; a
# track whose nearest-vertex distance is in
# (TRACK_ADD_MIN_RADIUS, TRACK_ADD_MAX_RADIUS] is appended as a new vertex.
USE_TRACKS_AS_VERTICES = True
TRACK_MIN_VIEWS = 2
TRACK_MAX_REPROJ_PX = 2.0
TRACK_REPLACE_RADIUS = 0.6
TRACK_ADD_MAX_RADIUS = 2.0
TRACK_ADD_MIN_RADIUS = 0.6

# Final filter that keeps only edges whose reprojection lands on gestalt edge
# pixels (see ``validate_edge_reprojection``). The mask is dilated by
# REPROJ_MASK_DILATE_PX pixels before testing.
USE_REPROJECTION_EDGE_VAL = False
REPROJ_MIN_VIEWS = 2
REPROJ_MIN_HIT_FRAC = 0.5
REPROJ_MASK_DILATE_PX = 3

# Extra edges from ``predict_plane_edges``; PLANE_PERP_TOL is forwarded as its
# ``perp_tol`` argument.
USE_PLANE_EDGES = False
PLANE_PERP_TOL = 0.8
|
|
|
|
| def _refine_centroids_subpix(gest_seg_np, centroids, max_shift=4.0, win=5): |
| """Run cv2.cornerSubPix on the grayscale gestalt image, seeded at centroids. |
| |
| Apex blobs sit at junctions where multiple coloured edge classes meet; in |
| the grayscale view that shows up as a real corner pattern. We feed the |
| centroid as a starting point, refine, and reject any refinement whose |
| displacement from the centroid exceeds ``max_shift`` pixels (likely |
| divergence to an unrelated texture). |
| """ |
| if len(centroids) == 0: |
| return centroids |
| gray = cv2.cvtColor(gest_seg_np, cv2.COLOR_RGB2GRAY) |
| gray = cv2.GaussianBlur(gray, (3, 3), 0) |
| pts = np.asarray(centroids, dtype=np.float32).reshape(-1, 1, 2).copy() |
| criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.01) |
| try: |
| refined = cv2.cornerSubPix(gray, pts, (win, win), (-1, -1), criteria) |
| except cv2.error: |
| return centroids |
| refined = refined.reshape(-1, 2) |
| orig = np.asarray(centroids, dtype=np.float32) |
| shifts = np.linalg.norm(refined - orig, axis=1) |
| mask = shifts <= max_shift |
| out = orig.copy() |
| out[mask] = refined[mask] |
| return out |
|
|
|
|
def get_vertices_and_edges_improved(gest_seg_np, edge_th=15.0, refine_subpix=True):
    """Extract 2D vertex candidates and their connections from a gestalt image.

    Vertices are the connected-component centroids of the ``apex``,
    ``eave_end_point`` and ``flashing_end_point`` classes, optionally refined
    with ``_refine_centroids_subpix``. For every connected component of an
    edge class (``eave``, ``ridge``, ``rake``, ``valley``, ``hip``) a line is
    fitted; the vertices within ``edge_th`` pixels of the fitted segment that
    are closest to its two ends are connected.

    Args:
        gest_seg_np: RGB gestalt segmentation as a uint8 array.
        edge_th: maximum vertex-to-segment distance, in pixels.
        refine_subpix: whether to refine blob centroids to sub-pixel corners.

    Returns:
        ``(vertices, connections)`` where ``vertices`` is a list of
        ``{"xy": float32[2], "type": str}`` dicts and ``connections`` is a list
        of ``(i, j)`` index pairs with ``i < j`` (duplicates are possible when
        several components join the same two vertices).
    """
    vertices = []
    for v_class in ('apex', 'eave_end_point', 'flashing_end_point'):
        color = np.array(gestalt_color_mapping[v_class])
        mask = cv2.inRange(gest_seg_np, color - 0.5, color + 0.5)
        if mask.sum() == 0:
            continue
        _, _, _, centroids = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)
        blob_centroids = centroids[1:]  # label 0 is the background
        if refine_subpix and len(blob_centroids) > 0:
            blob_centroids = _refine_centroids_subpix(gest_seg_np, blob_centroids)
        for centroid in blob_centroids:
            vertices.append({"xy": np.asarray(centroid, dtype=np.float32), "type": v_class})

    connections = []
    # An edge needs two distinct vertices, so skip all edge-mask work otherwise.
    if len(vertices) < 2:
        return vertices, connections
    apex_pts = np.array([v['xy'] for v in vertices])

    for edge_class in ('eave', 'ridge', 'rake', 'valley', 'hip'):
        edge_color = np.array(gestalt_color_mapping[edge_class])
        mask_raw = cv2.inRange(gest_seg_np, edge_color - 0.5, edge_color + 0.5)
        # Close small gaps so a broken edge stays a single component.
        mask = cv2.morphologyEx(mask_raw, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8))
        if mask.sum() == 0:
            continue
        _, labels, _, _ = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)
        for lbl in range(1, labels.max() + 1):
            ys, xs = np.where(labels == lbl)
            if len(xs) < 2:
                continue
            pts = np.column_stack([xs, ys]).astype(np.float32)
            vx, vy, x0, y0 = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01).ravel()
            # Segment ends are the extreme projections of the component's
            # pixels onto the fitted line.
            proj = (xs - x0) * vx + (ys - y0) * vy
            p1 = np.array([x0 + proj.min() * vx, y0 + proj.min() * vy])
            p2 = np.array([x0 + proj.max() * vx, y0 + proj.max() * vy])
            dists = np.array([point_to_segment_dist(pt, p1, p2) for pt in apex_pts])
            near = np.where(dists <= edge_th)[0]
            if len(near) < 2:
                continue
            near_pts = apex_pts[near]
            a = int(near[np.argmin(np.linalg.norm(near_pts - p1, axis=1))])
            b = int(near[np.argmin(np.linalg.norm(near_pts - p2, axis=1))])
            if a != b:
                connections.append((a, b) if a < b else (b, a))
    return vertices, connections
|
|
|
|
def fit_affine_ransac(depth, sparse_depth, validity_mask=None, n_iter=200, inlier_th=0.3,
                      rng=None):
    """Fit affine depth correction: depth_corrected = alpha * depth + beta.

    Scale+shift (2 DOF) is more accurate than scale-only when MoGe has systematic offset.
    Falls back to scale-only if not enough sparse points for 2-parameter fit.

    Args:
        depth: dense depth array.
        sparse_depth: array of the same shape; entries ``> 0`` are observations.
        validity_mask: optional boolean array restricting which pixels are used.
        n_iter: number of RANSAC hypotheses.
        inlier_th: absolute residual below which a sample counts as an inlier.
        rng: optional ``numpy.random.Generator`` for reproducible sampling;
            when ``None`` the global ``numpy.random`` state is used.

    Returns:
        ``(alpha, beta, corrected)``. With no usable samples the result is
        ``(1.0, 0.0, depth)``; with fewer than 10 samples a median-ratio scale
        is applied (``beta == 0``, no clipping). Otherwise ``corrected`` is
        ``alpha * depth + beta`` clipped to ``[0.1, 100.0]``.
    """
    mask = sparse_depth > 0
    if validity_mask is not None:
        mask = mask & validity_mask
    mask = mask & (depth < 50) & (sparse_depth < 50) & (depth > 0)
    X, Y = depth[mask], sparse_depth[mask]
    if len(X) == 0:
        return 1.0, 0.0, depth

    # ``depth > 0`` is part of the mask, so the ratio is always defined.
    median_scale = float(np.median(Y / X))
    if len(X) < 10:
        # Too few samples for a stable two-parameter fit.
        return median_scale, 0.0, median_scale * depth

    best_alpha, best_beta, best_n = median_scale, 0.0, 0
    choose = np.random.choice if rng is None else rng.choice

    for _ in range(n_iter):
        i, j = choose(len(X), 2, replace=False)
        x1, x2 = X[i], X[j]
        y1, y2 = Y[i], Y[j]
        if abs(x1 - x2) < 1e-6:
            continue
        alpha = (y1 - y2) / (x1 - x2)
        beta = y1 - alpha * x1
        if alpha <= 0.05 or alpha > 20.0:
            continue
        inlier_mask = np.abs(alpha * X + beta - Y) < inlier_th
        n_inliers = int(inlier_mask.sum())
        if n_inliers <= best_n:
            continue
        best_n = n_inliers
        # Start from the sampled hypothesis so the stored model always belongs
        # to the stored inlier count, even if the refit below is rejected.
        best_alpha, best_beta = float(alpha), float(beta)
        Xi, Yi = X[inlier_mask], Y[inlier_mask]
        A = np.column_stack([Xi, np.ones_like(Xi)])
        try:
            slope, intercept = np.linalg.lstsq(A, Yi, rcond=None)[0]
        except np.linalg.LinAlgError:
            continue
        if slope > 0.05:
            best_alpha, best_beta = float(slope), float(intercept)

    corrected = np.clip(best_alpha * depth + best_beta, 0.1, 100.0)
    return best_alpha, best_beta, corrected
|
|
|
|
def fit_scale_ransac(depth, sparse_depth, validity_mask=None, n_iter=100, inlier_th=0.3):
    """Legacy entry point kept for callers of the old scale-only API.

    Delegates to ``fit_affine_ransac``, so the returned depth carries the
    affine (scale + shift) correction rather than a pure scale.

    Returns:
        ``(None, corrected)`` — the scale slot is always ``None``; callers that
        need the fitted coefficients should call ``fit_affine_ransac``.
    """
    corrected = fit_affine_ransac(depth, sparse_depth, validity_mask, n_iter, inlier_th)[2]
    return None, corrected
|
|
|
|
| EDGE_CLASSES_FOR_VAL = ['eave', 'ridge', 'rake', 'valley', 'hip'] |
|
|
|
|
| def _build_gestalt_edge_masks(entry, dilate_px: int = 3): |
| """Build a ``dict[image_id → (H, W) uint8]`` of gestalt edge masks. |
| |
| Each mask is the union of all configured edge classes' pixels, dilated |
| by ``dilate_px`` so that a sub-pixel reprojection line can still land |
| on an edge pixel despite rendering / quantisation noise. |
| |
| Returns ``(masks, views)``: |
| masks : dict[image_id → (H, W) bool] |
| views : dict[image_id → mvs_utils.ViewInfo] for projection. |
| """ |
| try: |
| from hoho2025.example_solutions import convert_entry_to_human_readable as _conv |
| from hoho2025.color_mappings import gestalt_color_mapping as _gcm |
| except Exception: |
| return {}, {} |
|
|
| try: |
| from mvs_utils import collect_views as _cv |
| except Exception: |
| try: |
| from submission.mvs_utils import collect_views as _cv |
| except Exception: |
| return {}, {} |
|
|
| good = _conv(entry) |
| colmap_rec = good.get('colmap') or good.get('colmap_binary') |
| if colmap_rec is None: |
| return {}, {} |
|
|
| views = _cv(colmap_rec, good['image_ids']) |
| masks: dict[str, np.ndarray] = {} |
|
|
| kernel = None |
| if dilate_px > 0: |
| k = 2 * dilate_px + 1 |
| kernel = np.ones((k, k), np.uint8) |
|
|
| for gest, img_id in zip(good['gestalt'], good['image_ids']): |
| if img_id not in views: |
| continue |
| info = views[img_id] |
| W, H = info['width'], info['height'] |
| gest_np = np.array(gest.resize((W, H))).astype(np.uint8) |
| union_mask = np.zeros((H, W), dtype=np.uint8) |
| for ecls in EDGE_CLASSES_FOR_VAL: |
| color = np.array(_gcm[ecls]) |
| m = cv2.inRange(gest_np, color - 0.5, color + 0.5) |
| if m.sum(): |
| union_mask = np.maximum(union_mask, m) |
| if kernel is not None and union_mask.sum(): |
| union_mask = cv2.dilate(union_mask, kernel, iterations=1) |
| masks[img_id] = union_mask > 0 |
|
|
| return masks, views |
|
|
|
|
def validate_edge_reprojection(
    v1: np.ndarray, v2: np.ndarray,
    masks: dict, views: dict,
    n_samples: int = 20,
    min_views: int = 2,
    min_hit_frac: float = 0.4,
) -> bool:
    """Check that the edge's projection lies on gestalt edge pixels in at
    least ``min_views`` views.

    ``n_samples`` points are taken uniformly along the 3D segment and projected
    with each view's 3x4 matrix ``views[id]['P']``. A view is skipped when any
    sample has non-positive depth or when no sample falls inside the image. A
    view counts as supporting when at least ``min_hit_frac`` of its *in-bounds*
    samples land on a mask pixel.

    If no masks at all are available (e.g. entry lacks gestalt images),
    the check returns True so it never blocks the pipeline.
    """
    if not masks or not views:
        return True
    t = np.linspace(0.0, 1.0, n_samples)
    samples = v1 + t[:, None] * (v2 - v1)
    # Homogeneous coordinates do not depend on the view; build them once.
    homog = np.hstack([samples, np.ones((len(samples), 1))])
    ok_views = 0
    for img_id, mask in masks.items():
        info = views.get(img_id)
        if info is None:
            continue
        proj = homog @ info['P'].T
        z = proj[:, 2]
        if np.any(z <= 1e-6):
            continue
        uv = proj[:, :2] / z[:, None]
        u = np.round(uv[:, 0]).astype(np.int64)
        vv = np.round(uv[:, 1]).astype(np.int64)
        H, W = mask.shape
        in_bounds = (u >= 0) & (u < W) & (vv >= 0) & (vv < H)
        n_in = int(in_bounds.sum())
        if n_in == 0:
            continue
        hit_frac = float(mask[vv[in_bounds], u[in_bounds]].sum()) / n_in
        if hit_frac >= min_hit_frac:
            ok_views += 1
            if ok_views >= min_views:
                return True
    return ok_views >= min_views
|
|
|
|
def _passes_dgcnn_edge_gates(
    v1: np.ndarray,
    v2: np.ndarray,
    prob: float,
    all_xyz: np.ndarray,
    kd_tree=None,
    masks: dict | None = None,
    views: dict | None = None,
) -> bool:
    """Conservative accept rule for learned edge candidates.

    The DGCNN classifier is useful for recall, but raw learned edges can hurt
    IoU if accepted without geometry. An edge whose length is outside
    ``[0.25, DGCNN_EDGE_MAX_LENGTH]`` is rejected. Otherwise it is accepted
    when one of these holds:

    1. ``prob >= DGCNN_EDGE_STRONG_THRESHOLD`` and tight point-cloud support;
    2. ``prob >= DGCNN_EDGE_VERY_STRONG_THRESHOLD`` and loose support;
    3. ``prob >= DGCNN_EDGE_STRONG_THRESHOLD``, loose support, and the edge
       reprojects onto gestalt edge pixels in at least one view (only when
       ``masks`` and ``views`` are available).

    NOTE(review): candidates below the strong threshold are never accepted by
    these rules — confirm that is intended for "medium" probabilities.
    """
    length = float(np.linalg.norm(v2 - v1))
    if length < 0.25 or length > DGCNN_EDGE_MAX_LENGTH:
        return False

    is_strong = prob >= DGCNN_EDGE_STRONG_THRESHOLD
    is_very_strong = prob >= DGCNN_EDGE_VERY_STRONG_THRESHOLD
    if not (is_strong or is_very_strong):
        # No rule can fire, so skip the point-cloud queries altogether.
        return False

    if is_strong and validate_edge(
        v1, v2, all_xyz, kd_tree,
        n_samples=24, radius=0.45, min_ratio=0.55,
    ):
        return True

    # Both remaining rules require the looser support test.
    loose_support = validate_edge(
        v1, v2, all_xyz, kd_tree,
        n_samples=24, radius=0.60, min_ratio=0.35,
    )
    if not loose_support:
        return False
    if is_very_strong:
        return True

    if is_strong and masks and views:
        return validate_edge_reprojection(
            v1, v2, masks, views,
            n_samples=24, min_views=1, min_hit_frac=0.35,
        )

    return False
|
|
|
|
def _select_dgcnn_edges(
    final_v: np.ndarray,
    final_e: list,
    dgcnn_edges: list,
    all_xyz: np.ndarray,
    kd_tree=None,
    masks: dict | None = None,
    views: dict | None = None,
) -> list[tuple[int, int]]:
    """Filter and degree-cap DGCNN edge proposals.

    Existing edges are never removed here. At most
    ``DGCNN_EDGE_MAX_PER_VERTEX`` learned edges are added at each vertex,
    prioritising higher classifier probabilities.

    Args:
        final_v: vertex coordinates indexed by the proposals.
        final_e: edges already in the wireframe (not modified).
        dgcnn_edges: iterable of ``(i, j, prob)`` proposals.
        all_xyz, kd_tree, masks, views: forwarded to
            ``_passes_dgcnn_edge_gates``.

    Returns:
        New ``(lo, hi)`` index pairs, with ``lo < hi``, to append to the
        wireframe.
    """
    existing = {tuple(sorted(e)) for e in final_e}
    candidates = []
    for i, j, prob in dgcnn_edges:
        lo, hi = (int(i), int(j)) if i < j else (int(j), int(i))
        if lo == hi or (lo, hi) in existing:
            continue
        prob = float(prob)
        if _passes_dgcnn_edge_gates(
            final_v[lo], final_v[hi], prob,
            all_xyz, kd_tree, masks=masks, views=views,
        ):
            candidates.append((prob, lo, hi))

    # Highest probability first, so the per-vertex budget goes to the most
    # confident proposals.
    candidates.sort(reverse=True)
    added_per_vertex = np.zeros(len(final_v), dtype=np.int32)
    accepted: list[tuple[int, int]] = []
    accepted_set = set()
    for _prob, lo, hi in candidates:
        if (lo, hi) in accepted_set:
            continue
        if (added_per_vertex[lo] >= DGCNN_EDGE_MAX_PER_VERTEX
                or added_per_vertex[hi] >= DGCNN_EDGE_MAX_PER_VERTEX):
            continue
        accepted.append((lo, hi))
        accepted_set.add((lo, hi))
        added_per_vertex[lo] += 1
        added_per_vertex[hi] += 1
    return accepted
|
|
|
|
def validate_edge(v1, v2, all_xyz, kd_tree=None, n_samples=20, radius=0.35, min_ratio=0.70):
    """Check if edge v1→v2 is supported by COLMAP point cloud.

    ``n_samples`` points are taken uniformly along the segment; the edge is
    supported when at least ``min_ratio`` of them have a cloud point within
    ``radius``. Pass a KD-tree built over ``all_xyz`` to use nearest-neighbour
    queries; without one each sample is compared against every cloud point.

    Returns True when the cloud is empty (nothing to validate against) and
    False when ``n_samples`` is not positive.

    History of this parameter:
      v4: loose (n=10, r=0.5, mr=0.4)       public 0.3815
      v6: tight (n=20, r=0.35, mr=0.7)      public 0.3559  → regression!
      v7: tight (same) + tracks ensemble    public 0.4095  → big win
      v9: loose (reverted, by mistake) + tracks  public 0.3832 → regression
      v10 (current): tight restored         → target parity with v7 at 0.4095

    The tight validate_edge is ONLY good in combination with the multi-view
    tracks ensemble. Alone (v6) it removes too many real edges and loses
    IoU. With tracks ensemble adding complementary edges, the tight filter
    becomes a net win. Do not revert without also removing the tracks
    ensemble.
    """
    if len(all_xyz) == 0:
        return True
    if n_samples <= 0:
        # No samples means no evidence; also avoids dividing by zero below.
        return False
    t = np.linspace(0, 1, n_samples)
    samples = v1 + t[:, None] * (v2 - v1)
    if kd_tree is not None:
        dists, _ = kd_tree.query(samples, k=1)
    else:
        dists = np.array([np.linalg.norm(all_xyz - s, axis=1).min() for s in samples])
    supported = int(np.count_nonzero(dists <= radius))
    return supported / n_samples >= min_ratio
|
|
|
|
def extract_edge_features(v1, v2, all_xyz, gestalt_support=0, n_views=0,
                          line_support=None, track_support=None):
    """Build the per-pair edge feature vector.

    By default returns the original 15-D vector (v1 sklearn model).
    If either ``line_support`` or ``track_support`` is supplied, returns
    a 17-D vector compatible with the v2 sklearn model.

    Layout (float32): ``[length, |dz|, horizontal length, slope angle,
    n cloud points near the segment, n cloud points near the midpoint,
    points per unit length, gestalt_support, n_views, 0, 0, 0, 0, v1.z, v2.z]``
    followed, for the 17-D variant, by ``[line_support, track_support]``.
    Slots 9-12 are constant zeros.
    """
    diff = v2 - v1
    dist = np.linalg.norm(diff)
    mid = (v1 + v2) / 2.0
    h_diff = abs(diff[2])
    h_dist = np.linalg.norm(diff[:2])
    slope = np.arctan2(h_diff, h_dist + 1e-6)

    n_along, n_mid, density = 0, 0, 0
    if len(all_xyz) > 0 and dist > 0.01:
        edge_dir = diff / dist
        rel = all_xyz - v1
        proj = rel @ edge_dir
        perp = np.linalg.norm(rel - proj[:, None] * edge_dir, axis=1)
        # Cloud points inside a radius-0.5 cylinder around the segment,
        # extended by 0.5 beyond each end.
        in_cyl = (proj >= -0.5) & (proj <= dist + 0.5) & (perp <= 0.5)
        n_along = in_cyl.sum()
        n_mid = (np.linalg.norm(all_xyz - mid, axis=1) <= 1.0).sum()
        density = n_along / max(dist, 0.01)

    base = [dist, h_diff, h_dist, slope, n_along, n_mid, density,
            gestalt_support, n_views, 0, 0, 0, 0, v1[2], v2[2]]
    if line_support is not None or track_support is not None:
        base.append(int(line_support or 0))
        base.append(int(track_support or 0))
    return np.array(base, dtype=np.float32)
|
|
|
|
| def _line_support_for_edge(v1, v2, lines, perp_tol=0.5, min_overlap=0.5): |
| """1 if any 3D line in ``lines`` runs alongside the (v1, v2) edge. |
| |
| Both line endpoints must lie within ``perp_tol`` perpendicular distance |
| of the edge's infinite line, AND the projection overlap must be at |
| least ``min_overlap`` × edge length. |
| """ |
| if not lines: |
| return 0 |
| edge_dir = v2 - v1 |
| edge_len = float(np.linalg.norm(edge_dir)) |
| if edge_len < 0.05: |
| return 0 |
| edge_dir = edge_dir / edge_len |
| for ln in lines: |
| s1 = float(np.dot(ln.p1 - v1, edge_dir)) |
| s2 = float(np.dot(ln.p2 - v1, edge_dir)) |
| perp1 = ln.p1 - v1 - s1 * edge_dir |
| perp2 = ln.p2 - v1 - s2 * edge_dir |
| if np.linalg.norm(perp1) > perp_tol or np.linalg.norm(perp2) > perp_tol: |
| continue |
| lo = max(0.0, min(s1, s2)) |
| hi = min(edge_len, max(s1, s2)) |
| if hi - lo >= min_overlap * edge_len: |
| return 1 |
| return 0 |
|
|
|
|
| def _lift_track_edges_to_merged_v(tracks, t_edges, merged_v, match_radius=0.5): |
| """Map per-track edge votes onto pairs of merged_v indices.""" |
| if not tracks or not t_edges or len(merged_v) == 0: |
| return set() |
| track_xyz = np.array([t.xyz for t in tracks], dtype=np.float64) |
| from scipy.spatial import cKDTree |
| tree = cKDTree(merged_v) |
| track_to_merged = {} |
| for ti in range(len(tracks)): |
| d, j = tree.query(track_xyz[ti]) |
| if d <= match_radius: |
| track_to_merged[ti] = int(j) |
| out = set() |
| for ti, tj, _votes in t_edges: |
| a = track_to_merged.get(ti) |
| b = track_to_merged.get(tj) |
| if a is None or b is None or a == b: |
| continue |
| out.add((a, b) if a < b else (b, a)) |
| return out |
|
|
|
|
def _edge_set(edges):
    """Return ``edges`` as a set of index pairs, each sorted ascending."""
    return set(tuple(sorted(e)) for e in edges)


def _edges_from_line_endpoints(lines, vertices, match_radius, known_edges):
    """Collect vertex pairs joined by 3D lines whose endpoints snap to vertices.

    A line contributes the pair ``(lo, hi)`` when its ``p1`` and ``p2`` each
    lie within ``match_radius`` of their nearest vertex, those vertices are
    distinct, and the pair is not already in ``known_edges``.
    """
    from scipy.spatial import cKDTree
    vtree = cKDTree(vertices)
    found = set()
    for line in lines:
        d1, i1 = vtree.query(line.p1)
        d2, i2 = vtree.query(line.p2)
        if d1 > match_radius or d2 > match_radius:
            continue
        if i1 == i2:
            continue
        pair = (int(i1), int(i2)) if i1 < i2 else (int(i2), int(i1))
        if pair not in known_edges:
            found.add(pair)
    return found


def predict_wireframe_sklearn(entry, sklearn_model=None, edge_threshold=0.45):
    """Predict a 3D wireframe (vertices, edges) for one dataset entry.

    Stages, each optional one gated by a module-level flag and wrapped so a
    failure leaves the current result untouched:

    1. Per image: detect gestalt vertices/connections, lift them to 3D using
       depth corrected against COLMAP sparse depth, then merge across views.
    2. Vertex augmentation: high-confidence tracks, DGCNN refinement, extra
       "winner" candidates.
    3. Edge proposal: heuristic edges plus, when ``sklearn_model`` is given,
       classifier-scored vertex pairs (optionally boosted by line/track
       support) at ``edge_threshold``.
    4. Point-cloud validation, then extra edges from planes, the track
       ensemble, 3D lines and depth lines; optional reprojection filter.
    5. Connectivity pruning, optional DGCNN edges and bundle adjustment.

    Args:
        entry: raw dataset entry accepted by ``convert_entry_to_human_readable``.
        sklearn_model: optional classifier exposing ``predict_proba``; a model
            with ``n_features_in_ == 17`` receives the 17-D feature vector.
        edge_threshold: minimum (boosted) probability for a classifier edge.

    Returns:
        ``(vertices, edges)`` with edges as ``(int, int)`` pairs, or
        ``empty_solution()`` when too little structure is found.
    """
    good = convert_entry_to_human_readable(entry)
    # Fall back to 'colmap_binary' also when 'colmap' is present but None.
    colmap_rec = good.get('colmap')
    if colmap_rec is None:
        colmap_rec = good.get('colmap_binary')

    # ---- Stage 1: per-image 2D detection lifted to 3D ---------------------
    vert_edge_per_image = {}
    for i, (gest, depth, img_id, ade_seg) in enumerate(zip(
        good['gestalt'], good['depth'], good['image_ids'], good['ade']
    )):
        depth_raw = np.array(depth)
        depth_size = (depth_raw.shape[1], depth_raw.shape[0])
        gest_np = np.array(gest.resize(depth_size)).astype(np.uint8)
        verts, conns = get_vertices_and_edges_improved(gest_np, edge_th=15.0)
        ade_np = np.array(ade_seg.resize(depth_size)).astype(np.uint8)
        verts, conns = filter_vertices_by_background(verts, conns, ade_np)
        if len(verts) < 2 or len(conns) < 1:
            vert_edge_per_image[i] = [], [], np.empty((0, 3))
            continue
        # Stored depth is divided by 1000 before use.
        depth_np = depth_raw / 1000.0
        depth_sparse, found, col_img, proj_pts = get_sparse_depth(colmap_rec, img_id, depth_np)
        if found:
            # NOTE(review): the house mask comes from the un-resized ADE image;
            # confirm it matches the depth resolution.
            _, _, depth_fitted = fit_affine_ransac(depth_np, depth_sparse, get_house_mask(ade_seg))
        else:
            depth_fitted = depth_np
        uv, dv = get_uv_depth(verts, depth_fitted,
                              depth_sparse if found else np.zeros_like(depth_np),
                              search_radius=10, proj_pts=proj_pts)
        v3d = project_vertices_to_3d(uv, dv, col_img, colmap_rec=colmap_rec)
        vert_edge_per_image[i] = verts, conns, v3d

    if not any(len(v[0]) > 0 for v in vert_edge_per_image.values()):
        return empty_solution()

    merged_v, heur_edges, vertex_views, _ = merge_vertices_3d(vert_edge_per_image, 0.8)
    # NOTE(review): ``vertex_views`` is not re-indexed here; verify that
    # prune_too_far keeps vertex order/count or the view counts may misalign.
    merged_v, heur_edges = prune_too_far(merged_v, heur_edges, colmap_rec, th=5.0)
    if len(merged_v) < 2:
        return empty_solution()

    # ---- Stage 2a: snap to / add high-confidence triangulated tracks ------
    if USE_TRACKS_AS_VERTICES and _TRIANGULATION_OK and len(merged_v) >= 1:
        try:
            hc_tracks = get_high_confidence_tracks(
                entry,
                min_views=TRACK_MIN_VIEWS,
                max_reproj_px=TRACK_MAX_REPROJ_PX,
            )
            if hc_tracks:
                from scipy.spatial import cKDTree as _cKD13
                # copy_data=True: merged_v is overwritten in place below and
                # the tree must keep answering against the original positions.
                tree13 = _cKD13(merged_v, copy_data=True)
                added = []
                replaced_set = set()
                for t in hc_tracks:
                    d, j = tree13.query(t.xyz, k=1)
                    if d <= TRACK_REPLACE_RADIUS:
                        # Only the first track to claim a vertex moves it.
                        if j in replaced_set:
                            continue
                        merged_v[j] = t.xyz
                        replaced_set.add(int(j))
                    elif TRACK_ADD_MIN_RADIUS < d <= TRACK_ADD_MAX_RADIUS:
                        added.append(t.xyz)
                if added:
                    merged_v = np.vstack([merged_v, np.asarray(added, dtype=np.float64)])
                    # Added vertices were seen in no gestalt view.
                    vertex_views = list(vertex_views) + [0] * len(added)
        except Exception:
            pass

    # ---- Stage 2b: DGCNN-refined vertex candidates ------------------------
    if USE_DGCNN_REFINEMENT:
        try:
            from s23dr.data_prep.vertex_candidates import generate_vertex_candidates
            from winner_inference import refine_winner_candidates
        except Exception:
            try:
                from submission.winner_inference import refine_winner_candidates
                from s23dr.data_prep.vertex_candidates import generate_vertex_candidates
            except Exception:
                generate_vertex_candidates = None
                refine_winner_candidates = None
        try:
            model = _get_dgcnn_vertex_model()
        except Exception:
            # A failing model load must not abort the whole prediction.
            model = None
        if model is not None and generate_vertex_candidates is not None:
            try:
                cands = generate_vertex_candidates(entry, colmap_rec)
                if cands:
                    refined = refine_winner_candidates(
                        cands, entry, model,
                        device=("cuda" if __import__('torch').cuda.is_available() else "cpu"),
                        cls_threshold=DGCNN_CLS_THRESHOLD,
                    )
                    if refined:
                        from scipy.spatial import cKDTree as _cKD17
                        # copy_data=True for the same reason as in stage 2a.
                        tree17 = _cKD17(merged_v, copy_data=True) if len(merged_v) >= 1 else None
                        new_pts = []
                        replaced = set()
                        for xyz, _score in refined:
                            xyz_arr = np.asarray(xyz, dtype=np.float64)
                            if tree17 is None:
                                new_pts.append(xyz_arr)
                                continue
                            d, j = tree17.query(xyz_arr, k=1)
                            if d <= DGCNN_REPLACE_RADIUS:
                                if int(j) not in replaced:
                                    merged_v[int(j)] = xyz_arr
                                    replaced.add(int(j))
                            elif DGCNN_DEDUP_RADIUS < d <= DGCNN_MAX_DIST_TO_CLOUD:
                                new_pts.append(xyz_arr)
                        if new_pts:
                            merged_v = np.vstack([merged_v, np.array(new_pts, dtype=np.float64)])
                            vertex_views = list(vertex_views) + [0] * len(new_pts)
            except Exception:
                pass

    # ---- Stage 2c: extra "winner" vertex candidates -----------------------
    if USE_WINNER_CANDIDATES and _WINNER_OK and len(merged_v) >= 1:
        try:
            cands, _ = generate_winner_candidates(entry)
            if cands:
                cand_xyz = np.array([c.centroid for c in cands], dtype=np.float64)
                from scipy.spatial import cKDTree as _cKD16
                tree16 = _cKD16(merged_v)
                d, _j = tree16.query(cand_xyz, k=1)
                # Keep candidates that are neither duplicates of an existing
                # vertex nor too far from all of them.
                keep_mask = (d > WINNER_DEDUP_RADIUS) & (d <= WINNER_MAX_DIST_TO_CLOUD)
                new = cand_xyz[keep_mask]
                if len(new) > 0:
                    merged_v = np.vstack([merged_v, new])
                    vertex_views = list(vertex_views) + [0] * len(new)
        except Exception:
            pass

    all_xyz = np.array([p.xyz for p in colmap_rec.points3D.values()])
    heur_set = set(tuple(sorted(e)) for e in heur_edges)

    # KD-tree over the sparse cloud for validate_edge; falls back to brute
    # force when scipy is unavailable.
    kd_tree = None
    if len(all_xyz) > 0:
        try:
            from scipy.spatial import KDTree
            kd_tree = KDTree(all_xyz)
        except Exception:
            pass

    # ---- Stage 3: classifier edges ----------------------------------------
    # Line/track support is needed either as model input (17-D model) or for
    # the re-rank bonus.
    _v2_model = (
        sklearn_model is not None
        and getattr(sklearn_model, 'n_features_in_', 15) == 17
    )
    _need_line_track = (_v2_model or USE_RERANK) and _TRIANGULATION_OK
    _precomputed_lines = None
    _precomputed_tracks_lifted = None
    if _need_line_track:
        try:
            from triangulation import triangulate_wireframe as _triwf
        except ImportError:
            try:
                from submission.triangulation import triangulate_wireframe as _triwf
            except ImportError:
                _triwf = None
        try:
            from line_cloud import extract_3d_lines as _e3l, merge_3d_lines as _m3l
        except ImportError:
            try:
                from submission.line_cloud import extract_3d_lines as _e3l, merge_3d_lines as _m3l
            except ImportError:
                _e3l = _m3l = None
        if _triwf is not None:
            try:
                _t, _v, _g, _te = _triwf(entry, want_edges=True)
                _precomputed_tracks_lifted = _lift_track_edges_to_merged_v(
                    _t, _te, merged_v, match_radius=ENSEMBLE_MATCH_RADIUS,
                )
            except Exception:
                pass
        if _e3l is not None:
            try:
                _raw_lines, _ = _e3l(entry)
                _precomputed_lines = _m3l(_raw_lines)
            except Exception:
                _precomputed_lines = None

    if sklearn_model is not None:
        features_list, pairs, supports = [], [], []
        n = len(merged_v)
        for i in range(n):
            for j in range(i + 1, n):
                # Pairs farther apart than 8.0 are never scored.
                if np.linalg.norm(merged_v[i] - merged_v[j]) > 8.0:
                    continue
                gs = 1 if (i, j) in heur_set else 0
                nv = min(vertex_views[i], vertex_views[j]) if len(vertex_views) > max(i, j) else 0

                ls = ts = 0
                if _need_line_track:
                    ls = _line_support_for_edge(
                        merged_v[i], merged_v[j], _precomputed_lines or [],
                    )
                    # (i, j) is already ordered because j > i.
                    ts = 1 if (_precomputed_tracks_lifted and (i, j) in _precomputed_tracks_lifted) else 0

                if _v2_model:
                    feat = extract_edge_features(
                        merged_v[i], merged_v[j], all_xyz, gs, nv,
                        line_support=ls, track_support=ts,
                    )
                else:
                    feat = extract_edge_features(merged_v[i], merged_v[j], all_xyz, gs, nv)
                features_list.append(feat)
                pairs.append((i, j))
                supports.append((ls, ts))

        if features_list:
            X = np.array(features_list)
            probs = sklearn_model.predict_proba(X)[:, 1]
            if USE_RERANK:
                # Geometric evidence adds a fixed bonus, capped at 1.0.
                for k, (ls, ts) in enumerate(supports):
                    if ls:
                        probs[k] = min(1.0, probs[k] + RERANK_BOOST_LINE)
                    if ts:
                        probs[k] = min(1.0, probs[k] + RERANK_BOOST_TRACK)
            for k, pair in enumerate(pairs):
                if probs[k] >= edge_threshold:
                    heur_set.add(tuple(sorted(pair)))

    edges = list(heur_set)

    # ---- Stage 4: validation and additional edge sources ------------------
    validated = [e for e in edges if validate_edge(merged_v[e[0]], merged_v[e[1]], all_xyz, kd_tree)]
    if not validated:
        # Rather keep unvalidated edges than return nothing.
        validated = edges

    if USE_PLANE_EDGES and _PLANES_OK and len(merged_v) >= 2:
        try:
            extra = predict_plane_edges(entry, merged_v, perp_tol=PLANE_PERP_TOL)
            if extra:
                validated_set = _edge_set(validated)
                new_edges = [
                    (a, b) for (a, b) in extra
                    if (min(a, b), max(a, b)) not in validated_set
                ]
                new_valid = [
                    e for e in new_edges
                    if validate_edge(merged_v[e[0]], merged_v[e[1]], all_xyz, kd_tree)
                ]
                validated = list(validated_set | _edge_set(new_valid))
        except Exception:
            pass

    if USE_TRACK_ENSEMBLE and _TRIANGULATION_OK:
        try:
            tv, te = predict_wireframe_tracks(entry)
            tv = np.asarray(tv, dtype=np.float64)
            if len(tv) >= 2 and len(te) >= 1 and len(merged_v) >= 2:
                # Map each track vertex to a merged vertex, or to a freshly
                # appended one when it is isolated but plausibly close.
                t_idx_map: list[int | None] = [None] * len(tv)
                added_vertices: list[np.ndarray] = []
                for i in range(len(tv)):
                    d = np.linalg.norm(merged_v - tv[i], axis=1)
                    j = int(np.argmin(d))
                    if d[j] <= ENSEMBLE_MATCH_RADIUS:
                        t_idx_map[i] = j
                    elif (ADD_ISOLATED_TRACK_VERTICES
                          and ISOLATED_TRACK_MIN_DIST <= d[j] <= ISOLATED_TRACK_MAX_DIST):
                        added_vertices.append(tv[i])
                        t_idx_map[i] = len(merged_v) + len(added_vertices) - 1

                if added_vertices:
                    merged_v = np.vstack([merged_v, np.asarray(added_vertices, dtype=np.float64)])

                extra_edges: set[tuple[int, int]] = set()
                for (a, b) in te:
                    ia = t_idx_map[a]
                    ib = t_idx_map[b]
                    if ia is None or ib is None or ia == ib:
                        continue
                    lo, hi = (ia, ib) if ia < ib else (ib, ia)
                    extra_edges.add((lo, hi))

                if BYPASS_VALIDATE_FOR_TRACKS:
                    extra_valid = list(extra_edges)
                else:
                    extra_valid = [
                        e for e in extra_edges
                        if validate_edge(merged_v[e[0]], merged_v[e[1]], all_xyz, kd_tree)
                    ]
                validated = list(_edge_set(validated) | set(extra_valid))
        except Exception:
            pass

    if USE_LINE_EDGES and _LINECLOUD_OK and len(merged_v) >= 2:
        try:
            # Imports live inside the guard so a missing symbol cannot abort
            # the prediction.
            try:
                from line_cloud import extract_3d_lines, merge_3d_lines
            except ImportError:
                from submission.line_cloud import extract_3d_lines, merge_3d_lines
            lines_3d, _ = extract_3d_lines(entry)
            if lines_3d:
                merged_lines = merge_3d_lines(lines_3d)
                validated_set = _edge_set(validated)
                line_edges = _edges_from_line_endpoints(
                    merged_lines, merged_v, LINE_EDGE_MATCH_RADIUS, validated_set,
                )
                if BYPASS_VALIDATE_FOR_LINES:
                    new_valid = list(line_edges)
                else:
                    new_valid = [
                        e for e in line_edges
                        if validate_edge(merged_v[e[0]], merged_v[e[1]], all_xyz, kd_tree)
                    ]
                validated = list(validated_set | set(new_valid))
        except Exception:
            pass

    if USE_DEPTH_EDGES and _DEPTH_EDGES_OK and len(merged_v) >= 2:
        try:
            d_lines = extract_and_merge_depth_lines(entry)
            if d_lines:
                validated_set = _edge_set(validated)
                depth_edges = _edges_from_line_endpoints(
                    d_lines, merged_v, DEPTH_EDGE_MATCH_RADIUS, validated_set,
                )
                new_valid = [
                    e for e in depth_edges
                    if validate_edge(merged_v[e[0]], merged_v[e[1]], all_xyz, kd_tree)
                ]
                validated = list(validated_set | set(new_valid))
        except Exception:
            pass

    if USE_REPROJECTION_EDGE_VAL and validated:
        try:
            masks, mvs_views = _build_gestalt_edge_masks(
                entry, dilate_px=REPROJ_MASK_DILATE_PX
            )
            if masks and mvs_views:
                kept = [
                    e for e in validated
                    if validate_edge_reprojection(
                        merged_v[e[0]], merged_v[e[1]],
                        masks, mvs_views,
                        min_views=REPROJ_MIN_VIEWS,
                        min_hit_frac=REPROJ_MIN_HIT_FRAC,
                    )
                ]
                # Safety valve: ignore the filter if it would drop more than
                # about two thirds of the edges.
                if len(kept) >= max(1, len(validated) // 3):
                    validated = kept
        except Exception:
            pass

    # ---- Stage 5: pruning and late refinements ----------------------------
    final_v, final_e = prune_not_connected(merged_v, validated, keep_largest=False)
    if len(final_v) < 2 or len(final_e) < 1:
        return empty_solution()

    if USE_DGCNN_EDGES and len(final_v) >= 2:
        try:
            edge_model = _get_dgcnn_edge_model()
        except Exception:
            # A failing model load must not abort the whole prediction.
            edge_model = None
        if edge_model is not None:
            try:
                from winner_inference import score_edges
            except ImportError:
                try:
                    from submission.winner_inference import score_edges
                except ImportError:
                    score_edges = None
            if score_edges is not None:
                try:
                    import torch as _torch
                    device = "cuda" if _torch.cuda.is_available() else "cpu"
                    dgcnn_edges = score_edges(
                        np.asarray(final_v, dtype=np.float64),
                        entry, edge_model,
                        device=device,
                        threshold=DGCNN_EDGE_THRESHOLD,
                    )
                    if dgcnn_edges:
                        masks, mvs_views = {}, {}
                        try:
                            masks, mvs_views = _build_gestalt_edge_masks(
                                entry, dilate_px=DGCNN_EDGE_REPROJ_DILATE_PX,
                            )
                        except Exception:
                            pass
                        extra = _select_dgcnn_edges(
                            np.asarray(final_v, dtype=np.float64),
                            final_e,
                            dgcnn_edges,
                            all_xyz,
                            kd_tree,
                            masks=masks,
                            views=mvs_views,
                        )
                        if extra:
                            final_e.extend(extra)
                except Exception:
                    pass

    if USE_BUNDLE_ADJUST and _BA_OK and len(final_v) >= 2:
        try:
            final_v = refine_vertices_ba(
                np.asarray(final_v, dtype=np.float64), entry,
                min_initial_err_px=3.0,
            )
        except Exception:
            pass

    return final_v, [(int(a), int(b)) for a, b in final_e]
|
|