"""Depth-discontinuity edge source. Independent from the gestalt segmentation: extracts 2D line segments along sharp depth jumps inside the house silhouette, lifts them to 3D via the affine-fitted depth map, then merges across views. Pipeline: 1. Affine-fit COLMAP-calibrated depth (same as the rest of the pipeline). 2. Inside the eroded ADE20k house mask, run Canny on normalised depth. 3. Connected components → fit 2D line per component. 4. Sample N depth values along each 2D segment, unproject to 3D. 5. RANSAC-fit a 3D line through the unprojected samples. 6. Merge lines across views (direction + midpoint proximity). The merged 3D lines have endpoints (p1, p2) suitable for the same 'edges-only lift onto merged_v' integration that v11 does for gestalt line cloud. Since gestalt and depth-discontinuity sources are independent, their lifts should be additive. Entry point: extract_depth_3d_lines(entry) -> list[Line3D] """ from __future__ import annotations import numpy as np import cv2 from hoho2025.example_solutions import ( convert_entry_to_human_readable, get_sparse_depth, get_house_mask, ) try: from line_cloud import Line3D, _fit_3d_line_ransac, _unproject_pixel, merge_3d_lines from mvs_utils import collect_views from sklearn_submission import fit_affine_ransac except ImportError: from submission.line_cloud import Line3D, _fit_3d_line_ransac, _unproject_pixel, merge_3d_lines from submission.mvs_utils import collect_views from submission.sklearn_submission import fit_affine_ransac def _detect_depth_segments_2d( depth_fitted: np.ndarray, house_mask: np.ndarray, canny_lo: int = 30, canny_hi: int = 80, erode_px: int = 9, min_area_px: int = 20, min_seglen_px: int = 25, ): """Return list of (xs, ys, p1, p2) for each detected 2D line segment.""" if depth_fitted.size == 0: return [] H, W = depth_fitted.shape[:2] eroded = cv2.erode( house_mask.astype(np.uint8), np.ones((erode_px, erode_px), np.uint8), ).astype(bool) if eroded.sum() < 100: return [] # Normalise depth inside the eroded house mask to [0, 255] d_in = depth_fitted.copy() in_d = d_in[eroded] if in_d.size == 0: return [] d_min, d_max = float(in_d.min()), float(in_d.max()) if d_max - d_min < 0.5: return [] d_norm = np.clip((d_in - d_min) / (d_max - d_min), 0.0, 1.0) d_u8 = (d_norm * 255).astype(np.uint8) d_u8 = cv2.GaussianBlur(d_u8, (5, 5), 0) canny = cv2.Canny(d_u8, canny_lo, canny_hi) canny[~eroded] = 0 if canny.sum() == 0: return [] n_lbl, lbl, stats, _ = cv2.connectedComponentsWithStats(canny, 8) out = [] for i in range(1, n_lbl): area = int(stats[i, cv2.CC_STAT_AREA]) if area < min_area_px: continue ys, xs = np.where(lbl == i) if len(xs) < 3: continue pts = np.column_stack([xs, ys]).astype(np.float32) line = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01) vx, vy, x0, y0 = line.ravel() proj = (xs - x0) * vx + (ys - y0) * vy t_min, t_max = float(proj.min()), float(proj.max()) seglen = t_max - t_min if seglen < min_seglen_px: continue p1 = np.array([x0 + t_min * vx, y0 + t_min * vy]) p2 = np.array([x0 + t_max * vx, y0 + t_max * vy]) out.append((xs, ys, p1, p2, (vx, vy, x0, y0, t_min, t_max))) return out def extract_depth_3d_lines_single_view( depth_fitted: np.ndarray, house_mask: np.ndarray, view_info: dict, n_samples: int = 30, ) -> list[Line3D]: """Extract 3D lines from depth discontinuities in a single view.""" H, W = depth_fitted.shape[:2] K = view_info['K'] R = view_info['R'] t = view_info['t'] K_inv = np.linalg.inv(K) R_inv = R.T cam_center = -R_inv @ t segments = _detect_depth_segments_2d(depth_fitted, house_mask) out: list[Line3D] = [] view_id = view_info['image_id'] for _, _, _, _, params in segments: vx, vy, x0, y0, t_min, t_max = params ts = np.linspace(t_min, t_max, n_samples) pts3d_list = [] for tv in ts: u = x0 + tv * vx v_px = y0 + tv * vy ui, vi = int(round(u)), int(round(v_px)) if 0 <= ui < W and 0 <= vi < H: d = depth_fitted[vi, ui] p = _unproject_pixel(u, v_px, d, K_inv, R_inv, cam_center) if p is not None: pts3d_list.append(p) if len(pts3d_list) < 5: continue pts3d = np.array(pts3d_list, dtype=np.float64) result = _fit_3d_line_ransac(pts3d, n_iter=50, inlier_th=0.3, min_inliers=5) if result is None: continue centroid, direction, inlier_pts = result s = (inlier_pts - centroid) @ direction p1 = centroid + float(s.min()) * direction p2 = centroid + float(s.max()) * direction length = float(np.linalg.norm(p2 - p1)) if length < 0.4: continue out.append(Line3D( point=centroid, direction=direction, p1=p1, p2=p2, length=length, n_inliers=len(inlier_pts), edge_class='depth_discontinuity', view_id=view_id, )) return out def extract_depth_3d_lines(entry) -> tuple[list[Line3D], dict]: """Extract depth-discontinuity 3D lines from all views. Returns (all_lines, good_entry). """ good = convert_entry_to_human_readable(entry) colmap_rec = good.get('colmap') or good.get('colmap_binary') if colmap_rec is None: return [], good views = collect_views(colmap_rec, good['image_ids']) all_lines: list[Line3D] = [] for gest, depth, img_id, ade_seg in zip( good['gestalt'], good['depth'], good['image_ids'], good['ade'] ): info = views.get(img_id) if info is None: continue depth_np = np.array(depth).astype(np.float64) / 1000.0 H, W = depth_np.shape[:2] # Affine fit (same as main pipeline) try: depth_sparse, found, _, _ = get_sparse_depth(colmap_rec, img_id, depth_np) if found: _, _, depth_np = fit_affine_ransac( depth_np, depth_sparse, get_house_mask(ade_seg), ) except Exception: pass try: house = get_house_mask(ade_seg) house_resized = cv2.resize( house.astype(np.uint8), (W, H), interpolation=cv2.INTER_NEAREST, ) > 0 except Exception: continue view_lines = extract_depth_3d_lines_single_view( depth_np, house_resized, info, ) all_lines.extend(view_lines) return all_lines, good def extract_and_merge_depth_lines(entry) -> list[Line3D]: """Convenience: extract + merge across views.""" lines, _ = extract_depth_3d_lines(entry) if not lines: return [] return merge_3d_lines(lines)