| """Depth-discontinuity edge source. |
| |
| Independent from the gestalt segmentation: extracts 2D line segments |
| along sharp depth jumps inside the house silhouette, lifts them to 3D |
| via the affine-fitted depth map, then merges across views. |
| |
| Pipeline: |
| 1. Affine-fit COLMAP-calibrated depth (same as the rest of the pipeline). |
| 2. Inside the eroded ADE20k house mask, run Canny on normalised depth. |
| 3. Connected components → fit 2D line per component. |
| 4. Sample N depth values along each 2D segment, unproject to 3D. |
| 5. RANSAC-fit a 3D line through the unprojected samples. |
| 6. Merge lines across views (direction + midpoint proximity). |
| |
| The merged 3D lines have endpoints (p1, p2) suitable for the same |
| 'edges-only lift onto merged_v' integration that v11 does for gestalt |
| line cloud. Since gestalt and depth-discontinuity sources are independent, |
| their lifts should be additive. |
| |
| Entry point: |
| extract_depth_3d_lines(entry) -> list[Line3D] |
| """ |
|
|
| from __future__ import annotations |
|
|
| import numpy as np |
| import cv2 |
|
|
| from hoho2025.example_solutions import ( |
| convert_entry_to_human_readable, |
| get_sparse_depth, get_house_mask, |
| ) |
|
|
| try: |
| from line_cloud import Line3D, _fit_3d_line_ransac, _unproject_pixel, merge_3d_lines |
| from mvs_utils import collect_views |
| from sklearn_submission import fit_affine_ransac |
| except ImportError: |
| from submission.line_cloud import Line3D, _fit_3d_line_ransac, _unproject_pixel, merge_3d_lines |
| from submission.mvs_utils import collect_views |
| from submission.sklearn_submission import fit_affine_ransac |
|
|
|
|
| def _detect_depth_segments_2d( |
| depth_fitted: np.ndarray, |
| house_mask: np.ndarray, |
| canny_lo: int = 30, |
| canny_hi: int = 80, |
| erode_px: int = 9, |
| min_area_px: int = 20, |
| min_seglen_px: int = 25, |
| ): |
| """Return list of (xs, ys, p1, p2) for each detected 2D line segment.""" |
| if depth_fitted.size == 0: |
| return [] |
| H, W = depth_fitted.shape[:2] |
| eroded = cv2.erode( |
| house_mask.astype(np.uint8), |
| np.ones((erode_px, erode_px), np.uint8), |
| ).astype(bool) |
| if eroded.sum() < 100: |
| return [] |
|
|
| |
| d_in = depth_fitted.copy() |
| in_d = d_in[eroded] |
| if in_d.size == 0: |
| return [] |
| d_min, d_max = float(in_d.min()), float(in_d.max()) |
| if d_max - d_min < 0.5: |
| return [] |
| d_norm = np.clip((d_in - d_min) / (d_max - d_min), 0.0, 1.0) |
| d_u8 = (d_norm * 255).astype(np.uint8) |
| d_u8 = cv2.GaussianBlur(d_u8, (5, 5), 0) |
|
|
| canny = cv2.Canny(d_u8, canny_lo, canny_hi) |
| canny[~eroded] = 0 |
| if canny.sum() == 0: |
| return [] |
|
|
| n_lbl, lbl, stats, _ = cv2.connectedComponentsWithStats(canny, 8) |
| out = [] |
| for i in range(1, n_lbl): |
| area = int(stats[i, cv2.CC_STAT_AREA]) |
| if area < min_area_px: |
| continue |
| ys, xs = np.where(lbl == i) |
| if len(xs) < 3: |
| continue |
| pts = np.column_stack([xs, ys]).astype(np.float32) |
| line = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01) |
| vx, vy, x0, y0 = line.ravel() |
| proj = (xs - x0) * vx + (ys - y0) * vy |
| t_min, t_max = float(proj.min()), float(proj.max()) |
| seglen = t_max - t_min |
| if seglen < min_seglen_px: |
| continue |
| p1 = np.array([x0 + t_min * vx, y0 + t_min * vy]) |
| p2 = np.array([x0 + t_max * vx, y0 + t_max * vy]) |
| out.append((xs, ys, p1, p2, (vx, vy, x0, y0, t_min, t_max))) |
| return out |
|
|
|
|
| def extract_depth_3d_lines_single_view( |
| depth_fitted: np.ndarray, |
| house_mask: np.ndarray, |
| view_info: dict, |
| n_samples: int = 30, |
| ) -> list[Line3D]: |
| """Extract 3D lines from depth discontinuities in a single view.""" |
| H, W = depth_fitted.shape[:2] |
| K = view_info['K'] |
| R = view_info['R'] |
| t = view_info['t'] |
| K_inv = np.linalg.inv(K) |
| R_inv = R.T |
| cam_center = -R_inv @ t |
|
|
| segments = _detect_depth_segments_2d(depth_fitted, house_mask) |
| out: list[Line3D] = [] |
| view_id = view_info['image_id'] |
|
|
| for _, _, _, _, params in segments: |
| vx, vy, x0, y0, t_min, t_max = params |
| ts = np.linspace(t_min, t_max, n_samples) |
| pts3d_list = [] |
| for tv in ts: |
| u = x0 + tv * vx |
| v_px = y0 + tv * vy |
| ui, vi = int(round(u)), int(round(v_px)) |
| if 0 <= ui < W and 0 <= vi < H: |
| d = depth_fitted[vi, ui] |
| p = _unproject_pixel(u, v_px, d, K_inv, R_inv, cam_center) |
| if p is not None: |
| pts3d_list.append(p) |
|
|
| if len(pts3d_list) < 5: |
| continue |
|
|
| pts3d = np.array(pts3d_list, dtype=np.float64) |
| result = _fit_3d_line_ransac(pts3d, n_iter=50, inlier_th=0.3, min_inliers=5) |
| if result is None: |
| continue |
| centroid, direction, inlier_pts = result |
| s = (inlier_pts - centroid) @ direction |
| p1 = centroid + float(s.min()) * direction |
| p2 = centroid + float(s.max()) * direction |
| length = float(np.linalg.norm(p2 - p1)) |
| if length < 0.4: |
| continue |
|
|
| out.append(Line3D( |
| point=centroid, |
| direction=direction, |
| p1=p1, p2=p2, |
| length=length, |
| n_inliers=len(inlier_pts), |
| edge_class='depth_discontinuity', |
| view_id=view_id, |
| )) |
| return out |
|
|
|
|
| def extract_depth_3d_lines(entry) -> tuple[list[Line3D], dict]: |
| """Extract depth-discontinuity 3D lines from all views. |
| |
| Returns (all_lines, good_entry). |
| """ |
| good = convert_entry_to_human_readable(entry) |
| colmap_rec = good.get('colmap') or good.get('colmap_binary') |
| if colmap_rec is None: |
| return [], good |
|
|
| views = collect_views(colmap_rec, good['image_ids']) |
| all_lines: list[Line3D] = [] |
|
|
| for gest, depth, img_id, ade_seg in zip( |
| good['gestalt'], good['depth'], good['image_ids'], good['ade'] |
| ): |
| info = views.get(img_id) |
| if info is None: |
| continue |
| depth_np = np.array(depth).astype(np.float64) / 1000.0 |
| H, W = depth_np.shape[:2] |
|
|
| |
| try: |
| depth_sparse, found, _, _ = get_sparse_depth(colmap_rec, img_id, depth_np) |
| if found: |
| _, _, depth_np = fit_affine_ransac( |
| depth_np, depth_sparse, get_house_mask(ade_seg), |
| ) |
| except Exception: |
| pass |
|
|
| try: |
| house = get_house_mask(ade_seg) |
| house_resized = cv2.resize( |
| house.astype(np.uint8), (W, H), interpolation=cv2.INTER_NEAREST, |
| ) > 0 |
| except Exception: |
| continue |
|
|
| view_lines = extract_depth_3d_lines_single_view( |
| depth_np, house_resized, info, |
| ) |
| all_lines.extend(view_lines) |
|
|
| return all_lines, good |
|
|
|
|
| def extract_and_merge_depth_lines(entry) -> list[Line3D]: |
| """Convenience: extract + merge across views.""" |
| lines, _ = extract_depth_3d_lines(entry) |
| if not lines: |
| return [] |
| return merge_3d_lines(lines) |
|
|