Spaces:
Paused
Paused
| """ | |
| Wall Extraction Pipeline | |
| ======================== | |
| EXACT algorithm from GeometryAgent v5. | |
| Only the GPU capability detection block has been hardened to probe | |
| actual CUDA allocations before committing β this prevents the | |
| cudaErrorInsufficientDriver crash when the host driver is older | |
| than the installed CUDA runtime. | |
| All wall extraction logic (stages 1-8, bridging, calibration, wand) | |
| is byte-for-byte identical to the original GeometryAgent source. | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| import cv2 | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Any, Tuple, Optional | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GPU capability detection β probe before commit | |
| # | |
| # The key insight: CuPy/PyTorch import successfully even when the CUDA *driver* | |
| # is too old for the installed CUDA *runtime*. The error only fires on the | |
| # first real allocation. We do a tiny probe allocation inside a broad | |
| # except-Exception guard so every possible CUDA error degrades gracefully. | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ββ CuPy βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| import cupy as _cp_probe | |
| import cupyx.scipy.ndimage as _cpnd_probe | |
| # Force a real CUDA context + allocation to expose driver mismatches | |
| _cp_probe.zeros(1, dtype=_cp_probe.uint8) | |
| # Survived β re-bind to public names | |
| import cupy as cp # type: ignore[assignment] | |
| import cupyx.scipy.ndimage as cpnd | |
| _GPU = True | |
| _CUPY = True | |
| print(f"[GPU] CuPy OK version={cp.__version__}") | |
| except ImportError: | |
| cp = np # type: ignore[assignment] | |
| cpnd = None | |
| _GPU = False | |
| _CUPY = False | |
| print("[GPU] CuPy not installed β CPU fallback") | |
| except Exception as _ce: | |
| # Catches CUDARuntimeError (driver too old), CUDADriverError, etc. | |
| cp = np # type: ignore[assignment] | |
| cpnd = None | |
| _GPU = False | |
| _CUPY = False | |
| print(f"[GPU] CuPy DISABLED ({type(_ce).__name__}: {_ce})") | |
| print("[GPU] All CuPy ops β NumPy fallback") | |
| # ββ PyTorch βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| import torch as _torch_probe | |
| _TORCH = True | |
| try: | |
| _TORCH_CUDA = _torch_probe.cuda.is_available() | |
| if _TORCH_CUDA: | |
| _torch_probe.zeros(1, device="cuda") # probe real allocation | |
| print(f"[GPU] PyTorch CUDA OK device={_torch_probe.cuda.get_device_name(0)}") | |
| else: | |
| print("[GPU] PyTorch: CUDA not available β CPU tensors") | |
| except Exception as _te: | |
| _TORCH_CUDA = False | |
| print(f"[GPU] PyTorch CUDA DISABLED ({type(_te).__name__}: {_te})") | |
| import torch | |
| _DEVICE = torch.device("cuda" if _TORCH_CUDA else "cpu") | |
| except ImportError: | |
| _TORCH = _TORCH_CUDA = False | |
| _DEVICE = None | |
| print("[GPU] PyTorch not installed") | |
| # ββ OpenCV CUDA βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _CV_CUDA = False | |
| try: | |
| _n = cv2.cuda.getCudaEnabledDeviceCount() | |
| if _n > 0: | |
| _pm = cv2.cuda_GpuMat() | |
| _pm.upload(np.zeros((2, 2), np.uint8)) # probe | |
| del _pm | |
| _CV_CUDA = True | |
| print(f"[GPU] OpenCV CUDA OK devices={_n}") | |
| else: | |
| print("[GPU] OpenCV CUDA: no CUDA-enabled devices") | |
| except AttributeError: | |
| print("[GPU] OpenCV CUDA module absent") | |
| except Exception as _oce: | |
| print(f"[GPU] OpenCV CUDA DISABLED ({type(_oce).__name__}: {_oce})") | |
| # ββ scikit-image skeleton βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| from skimage.morphology import skeletonize as _sk_skel | |
| _SKIMAGE = True | |
| except ImportError: | |
| _SKIMAGE = False | |
| # ββ scipy KD-tree βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| from scipy.spatial import cKDTree | |
| _SCIPY = True | |
| except ImportError: | |
| _SCIPY = False | |
| print(f"[GPU] Summary: CuPy={_CUPY} PyTorchCUDA={_TORCH_CUDA} OpenCV-CUDA={_CV_CUDA}") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CuPy / NumPy shims (unchanged from original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _to_gpu(arr: np.ndarray): | |
| return cp.asarray(arr) if _GPU else arr | |
| def _to_cpu(arr) -> np.ndarray: | |
| return cp.asnumpy(arr) if _GPU else arr | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RLE helpers (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def mask_to_rle(mask: np.ndarray) -> Dict[str, Any]: | |
| h, w = mask.shape | |
| flat = mask.flatten(order='F').astype(bool) | |
| counts: List[int] = [] | |
| current_val = False | |
| run = 0 | |
| for v in flat: | |
| if v == current_val: | |
| run += 1 | |
| else: | |
| counts.append(run) | |
| run = 1 | |
| current_val = v | |
| counts.append(run) | |
| if mask[0, 0]: | |
| counts.insert(0, 0) | |
| return {"counts": counts, "size": [h, w]} | |
| def _mask_to_contour_flat(mask: np.ndarray) -> List[float]: | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | |
| if not contours: | |
| return [] | |
| largest = max(contours, key=cv2.contourArea) | |
| pts = largest[:, 0, :].tolist() | |
| return [v for pt in pts for v in pt] | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Calibration dataclass (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class WallCalibration: | |
| stroke_width : int = 3 | |
| min_component_dim : int = 30 | |
| min_component_area: int = 45 | |
| bridge_min_gap : int = 2 | |
| bridge_max_gap : int = 14 | |
| door_gap : int = 41 | |
| max_bridge_thick : int = 15 | |
| def as_dict(self): | |
| return { | |
| "stroke_width" : self.stroke_width, | |
| "min_component_dim" : self.min_component_dim, | |
| "min_component_area": self.min_component_area, | |
| "bridge_min_gap" : self.bridge_min_gap, | |
| "bridge_max_gap" : self.bridge_max_gap, | |
| "door_gap" : self.door_gap, | |
| "max_bridge_thick" : self.max_bridge_thick, | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Core pipeline class β EXACT original GeometryAgent implementation | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class WallPipeline: | |
| """ | |
| Stateless (per-call) wall extraction + room segmentation. | |
| All intermediate images are stored in stage_images for the UI. | |
| """ | |
| MIN_ROOM_AREA_FRAC = 0.000004 | |
| MAX_ROOM_AREA_FRAC = 0.08 | |
| MIN_ROOM_DIM_FRAC = 0.01 | |
| BORDER_MARGIN_FRAC = 0.01 | |
| MAX_ASPECT_RATIO = 8.0 | |
| MIN_SOLIDITY = 0.25 | |
| MIN_EXTENT = 0.08 | |
| FIXTURE_MAX_BLOB_DIM = 80 | |
| FIXTURE_MAX_AREA = 4000 | |
| FIXTURE_MAX_ASPECT = 4.0 | |
| FIXTURE_DENSITY_RADIUS = 50 | |
| FIXTURE_DENSITY_THRESHOLD = 0.35 | |
| FIXTURE_MIN_ZONE_AREA = 1500 | |
| DOOR_ARC_MIN_RADIUS = 60 | |
| DOOR_ARC_MAX_RADIUS = 320 | |
| def __init__(self, progress_cb=None, sam_checkpoint: str = ""): | |
| self.progress_cb = progress_cb or (lambda msg, pct: None) | |
| self._wall_cal : Optional[WallCalibration] = None | |
| self._wall_thickness : int = 8 | |
| self.stage_images : Dict[str, np.ndarray] = {} | |
| self._sam_checkpoint = sam_checkpoint | |
| def _log(self, msg: str, pct: int): | |
| print(f" [{pct:3d}%] {msg}") | |
| self.progress_cb(msg, pct) | |
| def _save(self, key: str, img: np.ndarray): | |
| self.stage_images[key] = img.copy() | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Public entry point (original flow, original step names) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def run(self, img_bgr: np.ndarray, | |
| extra_door_lines: List[Tuple[int,int,int,int]] = None, | |
| use_sam: bool = True, | |
| ) -> Tuple[np.ndarray, np.ndarray, WallCalibration]: | |
| """ | |
| Returns (wall_mask uint8, room_mask uint8, WallCalibration). | |
| extra_door_lines: [(x1,y1,x2,y2), β¦] painted onto walls before seg. | |
| """ | |
| self.stage_images = {} | |
| self._log("Step 1 β Removing title block", 5) | |
| img = self._remove_title_block(img_bgr) | |
| self._save("01_title_removed", img) | |
| self._log("Step 2 β Removing colored annotations", 12) | |
| img = self._remove_colors(img) | |
| self._save("02_colors_removed", img) | |
| self._log("Step 3 β Closing door arcs", 20) | |
| img = self._close_door_arcs(img) | |
| self._save("03_door_arcs", img) | |
| self._log("Step 4 β Extracting walls", 30) | |
| walls = self._extract_walls(img) | |
| self._save("04_walls_raw", walls) | |
| self._log("Step 5b β Removing fixture symbols", 38) | |
| walls = self._remove_fixtures(walls) | |
| self._save("05b_no_fixtures", walls) | |
| self._log("Step 5c β Calibrating & removing thin lines", 45) | |
| self._wall_cal = self._calibrate_wall(walls) | |
| walls = self._remove_thin_lines_calibrated(walls) | |
| self._save("05c_thin_removed", walls) | |
| self._log("Step 5d β Bridging wall endpoints", 55) | |
| walls = self._bridge_endpoints(walls) | |
| self._save("05d_bridged", walls) | |
| self._log("Step 5e β Closing door openings", 63) | |
| walls = self._close_door_openings(walls) | |
| self._save("05e_doors_closed", walls) | |
| self._log("Step 5f β Removing dangling lines", 70) | |
| walls = self._remove_dangling(walls) | |
| self._save("05f_dangling_removed", walls) | |
| self._log("Step 5g β Sealing large door gaps", 76) | |
| walls = self._close_large_gaps(walls) | |
| self._save("05g_large_gaps", walls) | |
| # Paint extra door-seal lines from UI | |
| if extra_door_lines: | |
| self._log("Applying manual door seal lines", 79) | |
| lw = max(3, self._wall_cal.stroke_width if self._wall_cal else 3) | |
| for x1, y1, x2, y2 in extra_door_lines: | |
| cv2.line(walls, (x1, y1), (x2, y2), 255, lw) | |
| self._save("05h_manual_doors", walls) | |
| # SAM segmentation (optional, falls back to flood-fill) | |
| rooms = None | |
| if use_sam and _TORCH_CUDA: | |
| self._log("Step 7 β SAM segmentation [Torch GPU]", 80) | |
| rooms = self._segment_with_sam(img_bgr, walls) | |
| if rooms is None: | |
| self._log("Step 7 β Flood-fill room segmentation", 85) | |
| rooms = self._segment_rooms(walls) | |
| self._save("07_rooms", rooms) | |
| self._log("Step 8 β Filtering room regions", 93) | |
| valid_mask, _ = self._filter_rooms(rooms, img_bgr.shape) | |
| self._save("08_rooms_filtered", valid_mask) | |
| self._log("Done", 100) | |
| return walls, valid_mask, self._wall_cal | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 1 β Remove title block (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_title_block(self, img: np.ndarray) -> np.ndarray: | |
| h, w = img.shape[:2] | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| h_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 20, 1)) | |
| v_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (1, h // 20)) | |
| h_lines = cv2.morphologyEx(edges, cv2.MORPH_OPEN, h_kern) | |
| v_lines = cv2.morphologyEx(edges, cv2.MORPH_OPEN, v_kern) | |
| crop_right, crop_bottom = w, h | |
| right_region = v_lines[:, int(w * 0.7):] | |
| if np.any(right_region): | |
| vp = np.where(np.sum(right_region, axis=0) > h * 0.3)[0] | |
| if len(vp): | |
| crop_right = int(w * 0.7) + vp[0] - 10 | |
| bot_region = h_lines[int(h * 0.7):, :] | |
| if np.any(bot_region): | |
| hp = np.where(np.sum(bot_region, axis=1) > w * 0.3)[0] | |
| if len(hp): | |
| crop_bottom = int(h * 0.7) + hp[0] - 10 | |
| return img[:crop_bottom, :crop_right].copy() | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 2 β Remove colors (original β GPU via CuPy when available) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_colors(self, img: np.ndarray) -> np.ndarray: | |
| if _GPU: | |
| g_img = _to_gpu(img.astype(np.int32)) | |
| b, gch, r = g_img[:,:,0], g_img[:,:,1], g_img[:,:,2] | |
| gray = (0.114*b + 0.587*gch + 0.299*r) | |
| chroma = cp.maximum(cp.maximum(r,gch),b) - cp.minimum(cp.minimum(r,gch),b) | |
| erase = (chroma > 15) & (gray < 240) | |
| result = _to_gpu(img.copy()) | |
| result[erase] = cp.array([255,255,255], dtype=cp.uint8) | |
| return _to_cpu(result) | |
| else: | |
| b = img[:,:,0].astype(np.int32) | |
| g = img[:,:,1].astype(np.int32) | |
| r = img[:,:,2].astype(np.int32) | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.int32) | |
| chroma = np.maximum(np.maximum(r,g),b) - np.minimum(np.minimum(r,g),b) | |
| erase = (chroma > 15) & (gray < 240) | |
| result = img.copy() | |
| result[erase] = (255, 255, 255) | |
| return result | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 3 β Close door arcs (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _close_door_arcs(self, img: np.ndarray) -> np.ndarray: | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| h, w = gray.shape | |
| result = img.copy() | |
| _, binary = cv2.threshold(gray, 0, 255, | |
| cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) | |
| binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, np.ones((3,3), np.uint8)) | |
| blurred = cv2.GaussianBlur(gray, (7,7), 1.5) | |
| raw = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, | |
| dp=1.2, minDist=50, param1=50, param2=22, | |
| minRadius=self.DOOR_ARC_MIN_RADIUS, | |
| maxRadius=self.DOOR_ARC_MAX_RADIUS) | |
| if raw is None: | |
| return result | |
| circles = np.round(raw[0]).astype(np.int32) | |
| for cx, cy, r in circles: | |
| angles = np.linspace(0, 2*np.pi, 360, endpoint=False) | |
| xs = np.clip((cx + r*np.cos(angles)).astype(np.int32), 0, w-1) | |
| ys = np.clip((cy + r*np.sin(angles)).astype(np.int32), 0, h-1) | |
| on_wall = binary[ys, xs] > 0 | |
| if not np.any(on_wall): | |
| continue | |
| occ = angles[on_wall] | |
| span = float(np.degrees(occ[-1] - occ[0])) | |
| if not (60 <= span <= 115): | |
| continue | |
| leaf_r = r * 0.92 | |
| n_pts = max(60, int(r)) | |
| la = np.linspace(0, 2*np.pi, n_pts, endpoint=False) | |
| lx = np.clip((cx + leaf_r*np.cos(la)).astype(np.int32), 0, w-1) | |
| ly = np.clip((cy + leaf_r*np.sin(la)).astype(np.int32), 0, h-1) | |
| if float(np.mean(binary[ly, lx] > 0)) < 0.35: | |
| continue | |
| gap_thresh = np.radians(25.0) | |
| diffs = np.diff(occ) | |
| big = np.where(diffs > gap_thresh)[0] | |
| if len(big) == 0: | |
| start_a, end_a = occ[0], occ[-1] | |
| else: | |
| split = big[np.argmax(diffs[big])] | |
| start_a, end_a = occ[split+1], occ[split] | |
| ep1 = (int(round(cx + r*np.cos(start_a))), | |
| int(round(cy + r*np.sin(start_a)))) | |
| ep2 = (int(round(cx + r*np.cos(end_a))), | |
| int(round(cy + r*np.sin(end_a)))) | |
| ep1 = (np.clip(ep1[0],0,w-1), np.clip(ep1[1],0,h-1)) | |
| ep2 = (np.clip(ep2[0],0,w-1), np.clip(ep2[1],0,h-1)) | |
| cv2.line(result, ep1, ep2, (0,0,0), 3) | |
| return result | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 4 β Extract walls (exact GeometryAgent.extract_walls_adaptive) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _extract_walls(self, img: np.ndarray) -> np.ndarray: | |
| """ | |
| Exact port of GeometryAgent.extract_walls_adaptive(). | |
| Uses analyze_image_characteristics() for the threshold, then: | |
| H/V morph-open β body dilate β collision resolve β distance gate | |
| β _remove_thin_lines β small-CC noise filter β _filter_double_lines_and_thick | |
| """ | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| h, w = gray.shape | |
| # ββ adaptive threshold (identical to analyze_image_characteristics) ββ | |
| brightness = float(np.mean(gray)) | |
| contrast = float(np.std(gray)) | |
| otsu_thr, _ = cv2.threshold(gray, 0, 255, | |
| cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) | |
| wall_pct = np.sum(_ > 0) / _.size * 100 | |
| if brightness > 220: | |
| wall_threshold = max(200, int(otsu_thr * 1.1)) | |
| elif brightness < 180: | |
| wall_threshold = max(150, int(otsu_thr * 0.9)) | |
| else: | |
| wall_threshold = int(otsu_thr) | |
| _, binary = cv2.threshold(gray, wall_threshold, 255, cv2.THRESH_BINARY_INV) | |
| min_line_len = max(8, int(0.012 * w)) | |
| body_thickness = self._estimate_wall_body_thickness(binary, fallback=12) | |
| body_thickness = int(np.clip(body_thickness, 9, 30)) | |
| print(f" min_line={min_line_len}px body={body_thickness}px (w={w}px)") | |
| k_h = cv2.getStructuringElement(cv2.MORPH_RECT, (min_line_len, 1)) | |
| k_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, min_line_len)) | |
| long_h = cv2.morphologyEx(binary, cv2.MORPH_OPEN, k_h) | |
| long_v = cv2.morphologyEx(binary, cv2.MORPH_OPEN, k_v) | |
| orig_walls = cv2.bitwise_or(long_h, long_v) | |
| k_bh = cv2.getStructuringElement(cv2.MORPH_RECT, (1, body_thickness)) | |
| k_bv = cv2.getStructuringElement(cv2.MORPH_RECT, (body_thickness, 1)) | |
| dilated_h = cv2.dilate(long_h, k_bh) | |
| dilated_v = cv2.dilate(long_v, k_bv) | |
| walls = cv2.bitwise_or(dilated_h, dilated_v) | |
| collision = cv2.bitwise_and(dilated_h, dilated_v) | |
| safe_zone = cv2.bitwise_and(collision, orig_walls) | |
| walls = cv2.bitwise_or( | |
| cv2.bitwise_and(walls, cv2.bitwise_not(collision)), | |
| safe_zone | |
| ) | |
| dist = cv2.distanceTransform(cv2.bitwise_not(orig_walls), cv2.DIST_L2, 5) | |
| keep_mask = (dist <= (body_thickness / 2)).astype(np.uint8) * 255 | |
| walls = cv2.bitwise_and(walls, keep_mask) | |
| walls = self._remove_thin_lines(walls, min_thickness=body_thickness) | |
| n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_lbl > 1: | |
| areas = stats[1:, cv2.CC_STAT_AREA] | |
| min_noise = max(20, int(np.median(areas) * 0.0001)) | |
| keep_lut = np.zeros(n_lbl, dtype=np.uint8) | |
| keep_lut[1:] = (areas >= min_noise).astype(np.uint8) | |
| walls = (keep_lut[labels] * 255).astype(np.uint8) | |
| walls = self._filter_double_lines_and_thick(walls) | |
| self._wall_thickness = body_thickness | |
| print(f" Walls: {np.count_nonzero(walls)} px " | |
| f"({100*np.count_nonzero(walls)/walls.size:.1f}%)") | |
| return walls | |
| def _estimate_wall_body_thickness(self, binary: np.ndarray, | |
| fallback: int = 12) -> int: | |
| """Exact GeometryAgent._estimate_wall_body_thickness β vectorised column scan.""" | |
| try: | |
| h, w = binary.shape | |
| n_cols = min(200, w) | |
| col_indices = np.linspace(0, w - 1, n_cols, dtype=int) | |
| cols = (binary[:, col_indices] > 0).astype(np.int8) | |
| padded = np.concatenate( | |
| [np.zeros((1, n_cols), dtype=np.int8), cols, | |
| np.zeros((1, n_cols), dtype=np.int8)], axis=0 | |
| ) | |
| diff = np.diff(padded.astype(np.int16), axis=0) | |
| run_lengths = [] | |
| for ci in range(n_cols): | |
| d = diff[:, ci] | |
| starts = np.where(d == 1)[0] | |
| ends = np.where(d == -1)[0] | |
| if len(starts) == 0 or len(ends) == 0: | |
| continue | |
| runs = ends - starts | |
| runs = runs[(runs >= 2) & (runs <= h * 0.15)] | |
| if len(runs): | |
| run_lengths.append(runs) | |
| if run_lengths: | |
| all_runs = np.concatenate(run_lengths) | |
| thickness = int(np.median(all_runs)) | |
| print(f" [WallThickness] Estimated: {thickness} px") | |
| return thickness | |
| except Exception as exc: | |
| print(f" [WallThickness] Estimation failed ({exc}), fallback={fallback}") | |
| return fallback | |
| def _remove_thin_lines(self, walls: np.ndarray, | |
| min_thickness: int) -> np.ndarray: | |
| """Exact GeometryAgent._remove_thin_lines β distance transform CC gate.""" | |
| dist = cv2.distanceTransform(walls, cv2.DIST_L2, 5) | |
| thick_mask = dist >= (min_thickness / 2) | |
| n_lbl, labels, _, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_lbl <= 1: | |
| return walls | |
| thick_labels = labels[thick_mask] | |
| if len(thick_labels) == 0: | |
| return np.zeros_like(walls) | |
| has_thick = np.zeros(n_lbl, dtype=bool) | |
| has_thick[thick_labels] = True | |
| keep_lut = has_thick.astype(np.uint8) * 255 | |
| keep_lut[0] = 0 | |
| return keep_lut[labels] | |
| def _filter_double_lines_and_thick( | |
| self, | |
| walls: np.ndarray, | |
| min_single_dim: int = 20, | |
| double_line_gap: int = 60, | |
| double_line_search_pct: int = 12, | |
| ) -> np.ndarray: | |
| """ | |
| Exact GeometryAgent._filter_double_lines_and_thick. | |
| Keeps blobs that either: | |
| (a) have min(bbox_w, bbox_h) >= min_single_dim (proper wall body), OR | |
| (b) have a parallel partner blob within double_line_gap px | |
| (double-line wall conventions used in CAD drawings). | |
| """ | |
| n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_lbl <= 1: | |
| return walls | |
| # Try ximgproc thinning, fall back to morphological skeleton | |
| try: | |
| skel_full = cv2.ximgproc.thinning( | |
| walls, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN | |
| ) | |
| except AttributeError: | |
| skel_full = self._morphological_skeleton(walls) | |
| skel_bin = (skel_full > 0) | |
| keep_ids: set = set() | |
| thin_candidates = [] | |
| for i in range(1, n_lbl): | |
| bw = int(stats[i, cv2.CC_STAT_WIDTH]) | |
| bh = int(stats[i, cv2.CC_STAT_HEIGHT]) | |
| if min(bw, bh) >= min_single_dim: | |
| keep_ids.add(i) | |
| else: | |
| thin_candidates.append(i) | |
| if not thin_candidates: | |
| filtered = np.zeros_like(walls) | |
| for i in keep_ids: | |
| filtered[labels == i] = 255 | |
| print(f" [DblLineFilter] Kept {len(keep_ids)}/{n_lbl-1} blobs " | |
| "(all passed size test)") | |
| return filtered | |
| skel_labels = labels * skel_bin | |
| img_h, img_w = labels.shape | |
| probe_dists = np.arange(3, double_line_gap + 1, 3, dtype=np.float32) | |
| for i in thin_candidates: | |
| blob_skel_ys, blob_skel_xs = np.where(skel_labels == i) | |
| if len(blob_skel_ys) < 4: | |
| continue | |
| step = max(1, len(blob_skel_ys) // 80) | |
| sy = blob_skel_ys[::step].astype(np.float32) | |
| sx = blob_skel_xs[::step].astype(np.float32) | |
| n_s = len(sy) | |
| sy_prev = np.roll(sy, 1); sy_prev[0] = sy[0] | |
| sy_next = np.roll(sy, -1); sy_next[-1] = sy[-1] | |
| sx_prev = np.roll(sx, 1); sx_prev[0] = sx[0] | |
| sx_next = np.roll(sx, -1); sx_next[-1] = sx[-1] | |
| dr = (sy_next - sy_prev).astype(np.float32) | |
| dc = (sx_next - sx_prev).astype(np.float32) | |
| dlen = np.maximum(1.0, np.hypot(dr, dc)) | |
| pr = (-dc / dlen)[:, np.newaxis] | |
| pc = ( dr / dlen)[:, np.newaxis] | |
| for sign in (1.0, -1.0): | |
| rr = np.round(sy[:, np.newaxis] + sign * pr * probe_dists).astype(np.int32) | |
| cc = np.round(sx[:, np.newaxis] + sign * pc * probe_dists).astype(np.int32) | |
| valid = (rr >= 0) & (rr < img_h) & (cc >= 0) & (cc < img_w) | |
| safe_rr = np.clip(rr, 0, img_h - 1) | |
| safe_cc = np.clip(cc, 0, img_w - 1) | |
| lbl_at = labels[safe_rr, safe_cc] | |
| partner_mask = valid & (lbl_at > 0) & (lbl_at != i) | |
| hit_any = partner_mask.any(axis=1) | |
| hit_rows = np.where(hit_any)[0] | |
| if len(hit_rows) == 0: | |
| continue | |
| first_hit_col = partner_mask[hit_rows].argmax(axis=1) | |
| partner_ids = lbl_at[hit_rows, first_hit_col] | |
| keep_ids.update(partner_ids.tolist()) | |
| if 100.0 * len(hit_rows) / n_s >= double_line_search_pct: | |
| keep_ids.add(i) | |
| break | |
| if keep_ids: | |
| keep_arr = np.array(sorted(keep_ids), dtype=np.int32) | |
| keep_lut = np.zeros(n_lbl, dtype=np.uint8) | |
| keep_lut[keep_arr] = 255 | |
| filtered = keep_lut[labels] | |
| else: | |
| filtered = np.zeros_like(walls) | |
| print(f" [DblLineFilter] Kept {len(keep_ids)}/{n_lbl-1} blobs " | |
| f"(min_dim>={min_single_dim}px OR double-line partner found)") | |
| return filtered | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 5b β Remove fixture symbols (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_fixtures(self, walls: np.ndarray) -> np.ndarray: | |
| h, w = walls.shape | |
| n, labels, stats, centroids = cv2.connectedComponentsWithStats( | |
| walls, connectivity=8) | |
| if n <= 1: | |
| return walls | |
| bw = stats[1:, cv2.CC_STAT_WIDTH].astype(np.float32) | |
| bh = stats[1:, cv2.CC_STAT_HEIGHT].astype(np.float32) | |
| ar = stats[1:, cv2.CC_STAT_AREA].astype(np.float32) | |
| cx = np.round(centroids[1:, 0]).astype(np.int32) | |
| cy = np.round(centroids[1:, 1]).astype(np.int32) | |
| maxs = np.maximum(bw, bh) | |
| mins = np.minimum(bw, bh) | |
| asp = maxs / (mins + 1e-6) | |
| cand = ((bw < self.FIXTURE_MAX_BLOB_DIM) & (bh < self.FIXTURE_MAX_BLOB_DIM) | |
| & (ar < self.FIXTURE_MAX_AREA) & (asp <= self.FIXTURE_MAX_ASPECT)) | |
| ci = np.where(cand)[0] | |
| if len(ci) == 0: | |
| return walls | |
| heatmap = np.zeros((h, w), dtype=np.float32) | |
| r_heat = int(self.FIXTURE_DENSITY_RADIUS) | |
| for px, py in zip(cx[ci].tolist(), cy[ci].tolist()): | |
| cv2.circle(heatmap, (px, py), r_heat, 1.0, -1) | |
| blur_k = max(3, (r_heat // 2) | 1) | |
| density = cv2.GaussianBlur(heatmap, (blur_k*4+1, blur_k*4+1), blur_k) | |
| d_max = float(density.max()) | |
| if d_max > 0: | |
| density /= d_max | |
| zone = (density >= self.FIXTURE_DENSITY_THRESHOLD).astype(np.uint8) * 255 | |
| n_z, z_labels, z_stats, _ = cv2.connectedComponentsWithStats(zone) | |
| clean = np.zeros_like(zone) | |
| if n_z > 1: | |
| za = z_stats[1:, cv2.CC_STAT_AREA] | |
| kz = np.where(za >= self.FIXTURE_MIN_ZONE_AREA)[0] + 1 | |
| if len(kz): | |
| lut = np.zeros(n_z, np.uint8) | |
| lut[kz] = 255 | |
| clean = lut[z_labels] | |
| zone = clean | |
| valid = (cy[ci].clip(0,h-1) >= 0) & (cx[ci].clip(0,w-1) >= 0) | |
| in_zone = valid & (zone[cy[ci].clip(0,h-1), cx[ci].clip(0,w-1)] > 0) | |
| erase_ids= ci[in_zone] + 1 | |
| result = walls.copy() | |
| if len(erase_ids): | |
| lut = np.zeros(n, np.uint8) | |
| lut[erase_ids] = 1 | |
| result[(lut[labels]).astype(bool)] = 0 | |
| return result | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 5c β Calibrate wall + remove thin lines (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _calibrate_wall(self, mask: np.ndarray) -> WallCalibration: | |
| cal = WallCalibration() | |
| h, w = mask.shape | |
| n_cols = min(200, w) | |
| col_idx = np.linspace(0, w-1, n_cols, dtype=int) | |
| runs = [] | |
| max_run = max(2, int(h * 0.05)) | |
| for ci in col_idx: | |
| col = (mask[:, ci] > 0).astype(np.int8) | |
| pad = np.concatenate([[0], col, [0]]) | |
| d = np.diff(pad.astype(np.int16)) | |
| s = np.where(d == 1)[0] | |
| e = np.where(d == -1)[0] | |
| n_ = min(len(s), len(e)) | |
| r = (e[:n_] - s[:n_]).astype(int) | |
| runs.extend(r[(r >= 1) & (r <= max_run)].tolist()) | |
| if runs: | |
| arr = np.array(runs, np.int32) | |
| hist = np.bincount(np.clip(arr, 0, 200)) | |
| cal.stroke_width = max(2, int(np.argmax(hist[1:])) + 1) | |
| cal.min_component_dim = max(15, cal.stroke_width * 10) | |
| cal.min_component_area = max(30, cal.stroke_width * cal.min_component_dim // 2) | |
| gap_sizes = [] | |
| row_step = max(3, h // 200) | |
| col_step = max(3, w // 200) | |
| for row in range(5, h-5, row_step): | |
| rd = (mask[row, :] > 0).astype(np.int8) | |
| pad = np.concatenate([[0], rd, [0]]) | |
| dif = np.diff(pad.astype(np.int16)) | |
| ends = np.where(dif == -1)[0] | |
| starts = np.where(dif == 1)[0] | |
| for e in ends: | |
| nxt = starts[starts > e] | |
| if len(nxt): | |
| g = int(nxt[0] - e) | |
| if 1 < g < 200: | |
| gap_sizes.append(g) | |
| for col in range(5, w-5, col_step): | |
| cd = (mask[:, col] > 0).astype(np.int8) | |
| pad = np.concatenate([[0], cd, [0]]) | |
| dif = np.diff(pad.astype(np.int16)) | |
| ends = np.where(dif == -1)[0] | |
| starts = np.where(dif == 1)[0] | |
| for e in ends: | |
| nxt = starts[starts > e] | |
| if len(nxt): | |
| g = int(nxt[0] - e) | |
| if 1 < g < 200: | |
| gap_sizes.append(g) | |
| cal.bridge_min_gap = 2 | |
| if len(gap_sizes) >= 20: | |
| g = np.array(gap_sizes) | |
| sm = g[g <= 30] | |
| if len(sm) >= 10: | |
| cal.bridge_max_gap = int(np.clip(np.percentile(sm, 75), 4, 20)) | |
| else: | |
| cal.bridge_max_gap = cal.stroke_width * 4 | |
| door = g[(g > cal.bridge_max_gap) & (g <= 80)] | |
| if len(door) >= 5: | |
| raw = int(np.percentile(door, 90)) | |
| else: | |
| raw = max(35, cal.stroke_width * 12) | |
| raw = int(np.clip(raw, 25, 80)) | |
| cal.door_gap = raw if raw % 2 == 1 else raw + 1 | |
| cal.max_bridge_thick = cal.stroke_width * 5 | |
| self._wall_thickness = cal.stroke_width | |
| return cal | |
| def _remove_thin_lines_calibrated(self, walls: np.ndarray) -> np.ndarray: | |
| cal = self._wall_cal | |
| n, cc, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n <= 1: | |
| return walls | |
| bw = stats[1:, cv2.CC_STAT_WIDTH] | |
| bh = stats[1:, cv2.CC_STAT_HEIGHT] | |
| ar = stats[1:, cv2.CC_STAT_AREA] | |
| mx = np.maximum(bw, bh) | |
| keep = (mx >= cal.min_component_dim) | (ar >= cal.min_component_area * 3) | |
| lut = np.zeros(n, np.uint8) | |
| lut[1:] = keep.astype(np.uint8) * 255 | |
| return lut[cc] | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 5d β Bridge endpoints (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _skel(self, binary: np.ndarray) -> np.ndarray: | |
| if _SKIMAGE: | |
| return (_sk_skel(binary > 0) * 255).astype(np.uint8) | |
| return self._morphological_skeleton(binary) | |
| def _morphological_skeleton(self, binary: np.ndarray) -> np.ndarray: | |
| skel = np.zeros_like(binary) | |
| img = binary.copy() | |
| cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3,3)) | |
| for _ in range(300): | |
| eroded = cv2.erode(img, cross) | |
| temp = cv2.subtract(img, cv2.dilate(eroded, cross)) | |
| skel = cv2.bitwise_or(skel, temp) | |
| img = eroded | |
| if not cv2.countNonZero(img): | |
| break | |
| return skel | |
| def _tip_pixels(self, skel: np.ndarray): | |
| sb = (skel > 0).astype(np.float32) | |
| nbr = cv2.filter2D(sb, -1, np.ones((3,3), np.float32), | |
| borderType=cv2.BORDER_CONSTANT) | |
| return np.where((sb == 1) & (nbr.astype(np.int32) == 2)) | |
| def _outward_vectors(self, ex, ey, skel, lookahead): | |
| n = len(ex) | |
| odx = np.zeros(n, np.float32) | |
| ody = np.zeros(n, np.float32) | |
| sy, sx = np.where(skel > 0) | |
| skel_set = set(zip(sx.tolist(), sy.tolist())) | |
| D8 = [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(-1,1),(1,-1),(1,1)] | |
| for i in range(n): | |
| ox, oy = int(ex[i]), int(ey[i]) | |
| cx, cy = ox, oy | |
| px, py = ox, oy | |
| for _ in range(lookahead): | |
| moved = False | |
| for dx, dy in D8: | |
| nx_, ny_ = cx+dx, cy+dy | |
| if (nx_, ny_) == (px, py): | |
| continue | |
| if (nx_, ny_) in skel_set: | |
| px, py = cx, cy | |
| cx, cy = nx_, ny_ | |
| moved = True | |
| break | |
| if not moved: | |
| break | |
| ix, iy = float(cx-ox), float(cy-oy) | |
| nr = max(1e-6, np.hypot(ix, iy)) | |
| odx[i], ody[i] = -ix/nr, -iy/nr | |
| return odx, ody | |
| def _bridge_endpoints(self, walls: np.ndarray) -> np.ndarray: | |
| cal = self._wall_cal | |
| result = walls.copy() | |
| h, w = walls.shape | |
| FCOS = np.cos(np.radians(70.0)) | |
| skel = self._skel(walls) | |
| ey, ex = self._tip_pixels(skel) | |
| n_ep = len(ey) | |
| if n_ep < 2: | |
| return result | |
| _, cc_map = cv2.connectedComponents(walls, connectivity=8) | |
| ep_cc = cc_map[ey, ex] | |
| lookahead = max(8, cal.stroke_width * 3) | |
| out_dx, out_dy = self._outward_vectors(ex, ey, skel, lookahead) | |
| pts = np.stack([ex, ey], axis=1).astype(np.float32) | |
| if _SCIPY: | |
| pairs = cKDTree(pts).query_pairs(float(cal.bridge_max_gap), output_type='ndarray') | |
| ii = pairs[:,0].astype(np.int64) | |
| jj = pairs[:,1].astype(np.int64) | |
| else: | |
| _ii, _jj = np.triu_indices(n_ep, k=1) | |
| ok = np.hypot(pts[_jj,0]-pts[_ii,0], pts[_jj,1]-pts[_ii,1]) <= cal.bridge_max_gap | |
| ii = _ii[ok].astype(np.int64) | |
| jj = _jj[ok].astype(np.int64) | |
| if len(ii) == 0: | |
| return result | |
| dxij = pts[jj,0] - pts[ii,0] | |
| dyij = pts[jj,1] - pts[ii,1] | |
| dists = np.hypot(dxij, dyij) | |
| safe = np.maximum(dists, 1e-6) | |
| ux, uy = dxij/safe, dyij/safe | |
| ang = np.degrees(np.arctan2(np.abs(dyij), np.abs(dxij))) | |
| is_H = ang <= 15.0 | |
| is_V = ang >= 75.0 | |
| g1 = (dists >= cal.bridge_min_gap) & (dists <= cal.bridge_max_gap) | |
| g2 = is_H | is_V | |
| g3 = ((out_dx[ii]*ux + out_dy[ii]*uy) >= FCOS) & \ | |
| ((out_dx[jj]*-ux + out_dy[jj]*-uy) >= FCOS) | |
| g4 = ep_cc[ii] != ep_cc[jj] | |
| pre_ok = g1 & g2 & g3 & g4 | |
| pre_idx = np.where(pre_ok)[0] | |
| N_SAMP = 9 | |
| clr = np.ones(len(pre_idx), dtype=bool) | |
| for k, pidx in enumerate(pre_idx): | |
| ia, ib = int(ii[pidx]), int(jj[pidx]) | |
| ax, ay = int(ex[ia]), int(ey[ia]) | |
| bx, by = int(ex[ib]), int(ey[ib]) | |
| if is_H[pidx]: | |
| xs = np.linspace(ax, bx, N_SAMP, np.float32) | |
| ys = np.full(N_SAMP, ay, np.float32) | |
| else: | |
| xs = np.full(N_SAMP, ax, np.float32) | |
| ys = np.linspace(ay, by, N_SAMP, np.float32) | |
| sxs = np.clip(np.round(xs[1:-1]).astype(np.int32), 0, w-1) | |
| sys_ = np.clip(np.round(ys[1:-1]).astype(np.int32), 0, h-1) | |
| if np.any(walls[sys_, sxs] > 0): | |
| clr[k] = False | |
| valid = pre_idx[clr] | |
| if len(valid) == 0: | |
| return result | |
| vi = ii[valid]; vj = jj[valid] | |
| vd = dists[valid]; vH = is_H[valid] | |
| order = np.argsort(vd) | |
| vi, vj, vd, vH = vi[order], vj[order], vd[order], vH[order] | |
| used = np.zeros(n_ep, dtype=bool) | |
| for k in range(len(vi)): | |
| ia, ib = int(vi[k]), int(vj[k]) | |
| if used[ia] or used[ib]: | |
| continue | |
| ax, ay = int(ex[ia]), int(ey[ia]) | |
| bx, by = int(ex[ib]), int(ey[ib]) | |
| p1, p2 = ((min(ax,bx),ay),(max(ax,bx),ay)) if vH[k] \ | |
| else ((ax,min(ay,by)),(ax,max(ay,by))) | |
| cv2.line(result, p1, p2, 255, cal.stroke_width) | |
| used[ia] = used[ib] = True | |
| return result | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 5e β Close door openings (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _close_door_openings(self, walls: np.ndarray) -> np.ndarray: | |
| cal = self._wall_cal | |
| gap = cal.door_gap | |
| def _shape_close(mask, kwh, axis, max_thick): | |
| k = cv2.getStructuringElement(cv2.MORPH_RECT, kwh) | |
| cls = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k) | |
| new = cv2.bitwise_and(cls, cv2.bitwise_not(mask)) | |
| if not np.any(new): | |
| return np.zeros_like(mask) | |
| n, lbl, stats, _ = cv2.connectedComponentsWithStats(new, connectivity=8) | |
| if n <= 1: | |
| return np.zeros_like(mask) | |
| perp = stats[1:, cv2.CC_STAT_HEIGHT if axis == 'H' else cv2.CC_STAT_WIDTH] | |
| keep = perp <= max_thick | |
| lut = np.zeros(n, np.uint8) | |
| lut[1:] = keep.astype(np.uint8) * 255 | |
| return lut[lbl] | |
| add_h = _shape_close(walls, (gap,1), 'H', cal.max_bridge_thick) | |
| add_v = _shape_close(walls, (1,gap), 'V', cal.max_bridge_thick) | |
| return cv2.bitwise_or(walls, cv2.bitwise_or(add_h, add_v)) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 5f β Remove dangling lines (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_dangling(self, walls: np.ndarray) -> np.ndarray: | |
| stroke = self._wall_cal.stroke_width if self._wall_cal else self._wall_thickness | |
| connect_radius = max(6, stroke * 3) | |
| n, cc_map, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n <= 1: | |
| return walls | |
| skel = self._skel(walls) | |
| tip_y, tip_x = self._tip_pixels(skel) | |
| tip_cc = cc_map[tip_y, tip_x] | |
| free_counts = np.zeros(n, np.int32) | |
| for i in range(len(tip_x)): | |
| free_counts[tip_cc[i]] += 1 | |
| remove = np.zeros(n, dtype=bool) | |
| for cc_id in range(1, n): | |
| if free_counts[cc_id] < 2: | |
| continue | |
| bw_ = int(stats[cc_id, cv2.CC_STAT_WIDTH]) | |
| bh_ = int(stats[cc_id, cv2.CC_STAT_HEIGHT]) | |
| if max(bw_, bh_) > stroke * 40: | |
| continue | |
| comp = (cc_map == cc_id).astype(np.uint8) | |
| dcomp = cv2.dilate(comp, cv2.getStructuringElement( | |
| cv2.MORPH_ELLIPSE, (connect_radius*2+1, connect_radius*2+1))) | |
| overlap = cv2.bitwise_and( | |
| dcomp, ((walls > 0) & (cc_map != cc_id)).astype(np.uint8)) | |
| if np.count_nonzero(overlap) == 0: | |
| remove[cc_id] = True | |
| lut = np.ones(n, np.uint8); lut[0] = 0; lut[remove] = 0 | |
| return (lut[cc_map] * 255).astype(np.uint8) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 5g β Close large gaps (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _close_large_gaps(self, walls: np.ndarray) -> np.ndarray: | |
| DOOR_MIN_GAP = 180 | |
| DOOR_MAX_GAP = 320 | |
| ANGLE_TOL_DEG = 12.0 | |
| FCOS = np.cos(np.radians(90.0 - ANGLE_TOL_DEG)) | |
| stroke = self._wall_cal.stroke_width if self._wall_cal else self._wall_thickness | |
| line_width = max(stroke, 3) | |
| result = walls.copy() | |
| h, w = walls.shape | |
| skel = self._skel(walls) | |
| tip_y, tip_x = self._tip_pixels(skel) | |
| n_ep = len(tip_x) | |
| if n_ep < 2: | |
| return result | |
| _, cc_map = cv2.connectedComponents(walls, connectivity=8) | |
| ep_cc = cc_map[tip_y, tip_x] | |
| lookahead = max(12, stroke * 4) | |
| out_dx, out_dy = self._outward_vectors(tip_x, tip_y, skel, lookahead) | |
| pts = np.stack([tip_x, tip_y], axis=1).astype(np.float32) | |
| if _SCIPY: | |
| pairs = cKDTree(pts).query_pairs(float(DOOR_MAX_GAP), output_type='ndarray') | |
| ii = pairs[:,0].astype(np.int64) | |
| jj = pairs[:,1].astype(np.int64) | |
| else: | |
| _ii, _jj = np.triu_indices(n_ep, k=1) | |
| ok = np.hypot(pts[_jj,0]-pts[_ii,0], pts[_jj,1]-pts[_ii,1]) <= DOOR_MAX_GAP | |
| ii = _ii[ok].astype(np.int64) | |
| jj = _jj[ok].astype(np.int64) | |
| if len(ii) == 0: | |
| return result | |
| dxij = pts[jj,0] - pts[ii,0] | |
| dyij = pts[jj,1] - pts[ii,1] | |
| dists = np.hypot(dxij, dyij) | |
| safe = np.maximum(dists, 1e-6) | |
| ux, uy = dxij/safe, dyij/safe | |
| ang = np.degrees(np.arctan2(np.abs(dyij), np.abs(dxij))) | |
| is_H = ang <= ANGLE_TOL_DEG | |
| is_V = ang >= (90.0 - ANGLE_TOL_DEG) | |
| g1 = (dists >= DOOR_MIN_GAP) & (dists <= DOOR_MAX_GAP) | |
| g2 = is_H | is_V | |
| g3 = ((out_dx[ii]*ux + out_dy[ii]*uy) >= FCOS) & \ | |
| ((out_dx[jj]*-ux + out_dy[jj]*-uy) >= FCOS) | |
| g4 = ep_cc[ii] != ep_cc[jj] | |
| pre_ok = g1 & g2 & g3 & g4 | |
| pre_idx = np.where(pre_ok)[0] | |
| N_SAMP = 15 | |
| clr = np.ones(len(pre_idx), dtype=bool) | |
| for k, pidx in enumerate(pre_idx): | |
| ia, ib = int(ii[pidx]), int(jj[pidx]) | |
| ax, ay = int(tip_x[ia]), int(tip_y[ia]) | |
| bx, by = int(tip_x[ib]), int(tip_y[ib]) | |
| if is_H[pidx]: | |
| xs = np.linspace(ax, bx, N_SAMP, np.float32) | |
| ys = np.full(N_SAMP, (ay+by)/2.0, np.float32) | |
| else: | |
| xs = np.full(N_SAMP, (ax+bx)/2.0, np.float32) | |
| ys = np.linspace(ay, by, N_SAMP, np.float32) | |
| sxs = np.clip(np.round(xs[1:-1]).astype(np.int32), 0, w-1) | |
| sys_ = np.clip(np.round(ys[1:-1]).astype(np.int32), 0, h-1) | |
| if np.any(walls[sys_, sxs] > 0): | |
| clr[k] = False | |
| valid = pre_idx[clr] | |
| if len(valid) == 0: | |
| return result | |
| vi = ii[valid]; vj = jj[valid] | |
| vd = dists[valid]; vH = is_H[valid] | |
| order = np.argsort(vd) | |
| vi, vj, vd, vH = vi[order], vj[order], vd[order], vH[order] | |
| used = np.zeros(n_ep, dtype=bool) | |
| for k in range(len(vi)): | |
| ia, ib = int(vi[k]), int(vj[k]) | |
| if used[ia] or used[ib]: | |
| continue | |
| ax, ay = int(tip_x[ia]), int(tip_y[ia]) | |
| bx, by = int(tip_x[ib]), int(tip_y[ib]) | |
| if vH[k]: | |
| p1 = (min(ax,bx), (ay+by)//2) | |
| p2 = (max(ax,bx), (ay+by)//2) | |
| else: | |
| p1 = ((ax+bx)//2, min(ay,by)) | |
| p2 = ((ax+bx)//2, max(ay,by)) | |
| cv2.line(result, p1, p2, 255, line_width) | |
| used[ia] = used[ib] = True | |
| return result | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 7 β Flood-fill segmentation (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _segment_rooms(self, walls: np.ndarray) -> np.ndarray: | |
| h, w = walls.shape | |
| walls = walls.copy() | |
| walls[:5,:] = 255; walls[-5:,:] = 255 | |
| walls[:,:5] = 255; walls[:,-5:] = 255 | |
| filled = walls.copy() | |
| mask = np.zeros((h+2, w+2), np.uint8) | |
| for sx, sy in [(0,0),(w-1,0),(0,h-1),(w-1,h-1), | |
| (w//2,0),(w//2,h-1),(0,h//2),(w-1,h//2)]: | |
| if filled[sy, sx] == 0: | |
| cv2.floodFill(filled, mask, (sx, sy), 255) | |
| rooms = cv2.bitwise_not(filled) | |
| rooms = cv2.bitwise_and(rooms, cv2.bitwise_not(walls)) | |
| rooms = cv2.morphologyEx(rooms, cv2.MORPH_OPEN, np.ones((2,2), np.uint8)) | |
| return rooms | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 7 (optional) β SAM segmentation (GPU Torch) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _segment_with_sam(self, orig_bgr: np.ndarray, | |
| walls: np.ndarray) -> Optional[np.ndarray]: | |
| """GPU SAM pass; returns mask or None to trigger flood-fill fallback.""" | |
| if not _TORCH_CUDA: | |
| return None | |
| predictor = self._get_sam_predictor() | |
| if predictor is None: | |
| return None | |
| try: | |
| import torch | |
| h, w = walls.shape | |
| flood = self._segment_rooms(walls) | |
| n, labels, stats, centroids = cv2.connectedComponentsWithStats( | |
| cv2.bitwise_not(walls), 8) | |
| pos_pts = [] | |
| for i in range(1, n): | |
| if int(stats[i, cv2.CC_STAT_AREA]) < 300: | |
| continue | |
| bx,by,bw,bh = (int(stats[i,cv2.CC_STAT_LEFT]), | |
| int(stats[i,cv2.CC_STAT_TOP]), | |
| int(stats[i,cv2.CC_STAT_WIDTH]), | |
| int(stats[i,cv2.CC_STAT_HEIGHT])) | |
| if bx<=5 and by<=5 and bx+bw>=w-5 and by+bh>=h-5: | |
| continue | |
| cx_ = int(np.clip(centroids[i][0], 0, w-1)) | |
| cy_ = int(np.clip(centroids[i][1], 0, h-1)) | |
| if walls[cy_, cx_] > 0: | |
| continue | |
| pos_pts.append((cx_, cy_)) | |
| if not pos_pts: | |
| return None | |
| rgb = cv2.cvtColor(orig_bgr, cv2.COLOR_BGR2RGB) | |
| predictor.set_image(rgb) | |
| sam_mask = np.zeros((h,w), np.uint8) | |
| dk = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(5,5)) | |
| for px, py in pos_pts: | |
| pi = np.array([[px,py]], np.float32) | |
| pl = np.array([1], np.int32) | |
| with torch.inference_mode(): | |
| masks, scores, _ = predictor.predict( | |
| point_coords=pi, point_labels=pl, multimask_output=True) | |
| best = int(np.argmax(scores)) | |
| if float(scores[best]) < 0.70: | |
| continue | |
| m = (masks[best]>0).astype(np.uint8)*255 | |
| m = cv2.bitwise_and(m, flood) | |
| m = cv2.morphologyEx(m, cv2.MORPH_OPEN, dk) | |
| if np.any(m): | |
| sam_mask = cv2.bitwise_or(sam_mask, m) | |
| return sam_mask if np.any(sam_mask) else None | |
| except Exception as exc: | |
| import traceback | |
| print(f"[SAM] Error: {exc}\n{traceback.format_exc()}") | |
| return None | |
| _sam_predictor_cache = None | |
| def _get_sam_predictor(self): | |
| if WallPipeline._sam_predictor_cache is not None: | |
| return WallPipeline._sam_predictor_cache | |
| ckpt = self._sam_checkpoint | |
| if not ckpt or not os.path.isfile(ckpt): | |
| ckpt = self._download_sam_checkpoint() | |
| if not ckpt or not os.path.isfile(ckpt): | |
| return None | |
| try: | |
| from segment_anything import sam_model_registry, SamPredictor | |
| name = os.path.basename(ckpt).lower() | |
| mtype = ("vit_h" if "vit_h" in name else | |
| "vit_l" if "vit_l" in name else | |
| "vit_b" if "vit_b" in name else "vit_h") | |
| import torch | |
| sam = sam_model_registry[mtype](checkpoint=ckpt) | |
| sam.to(device="cuda"); sam.eval() | |
| WallPipeline._sam_predictor_cache = SamPredictor(sam) | |
| print(f"[SAM] {mtype} loaded on cuda") | |
| except Exception as exc: | |
| print(f"[SAM] Load failed: {exc}") | |
| WallPipeline._sam_predictor_cache = None | |
| return WallPipeline._sam_predictor_cache | |
| def _download_sam_checkpoint() -> str: | |
| import os | |
| dest = os.path.join(".models", "sam", "sam_vit_h_4b8939.pth") | |
| if os.path.isfile(dest): | |
| return dest | |
| try: | |
| from huggingface_hub import hf_hub_download | |
| os.makedirs(os.path.dirname(dest), exist_ok=True) | |
| path = hf_hub_download( | |
| repo_id="facebook/sam-vit-huge", | |
| filename="sam_vit_h_4b8939.pth", | |
| local_dir=os.path.dirname(dest)) | |
| return path | |
| except Exception as exc: | |
| print(f"[SAM] Download failed: {exc}") | |
| return "" | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stage 8 β Filter room regions (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _filter_rooms(self, rooms_mask: np.ndarray, | |
| img_shape: Tuple) -> Tuple[np.ndarray, List]: | |
| h, w = img_shape[:2] | |
| img_area = float(h * w) | |
| min_area = img_area * self.MIN_ROOM_AREA_FRAC | |
| max_area = img_area * self.MAX_ROOM_AREA_FRAC | |
| min_dim = w * self.MIN_ROOM_DIM_FRAC | |
| margin = max(5.0, w * self.BORDER_MARGIN_FRAC) | |
| contours, _ = cv2.findContours(rooms_mask, cv2.RETR_EXTERNAL, | |
| cv2.CHAIN_APPROX_SIMPLE) | |
| if not contours: | |
| return np.zeros_like(rooms_mask), [] | |
| valid_mask = np.zeros_like(rooms_mask) | |
| valid_rooms = [] | |
| for cnt in contours: | |
| area = cv2.contourArea(cnt) | |
| if not (min_area <= area <= max_area): | |
| continue | |
| bx, by, bw, bh = cv2.boundingRect(cnt) | |
| if bx < margin or by < margin or bx+bw > w-margin or by+bh > h-margin: | |
| continue | |
| if not (bw >= min_dim or bh >= min_dim): | |
| continue | |
| asp = max(bw,bh) / (min(bw,bh) + 1e-6) | |
| if asp > self.MAX_ASPECT_RATIO: | |
| continue | |
| if (area / (bw*bh + 1e-6)) < self.MIN_EXTENT: | |
| continue | |
| hull = cv2.convexHull(cnt) | |
| ha = cv2.contourArea(hull) | |
| if ha > 0 and (area / ha) < self.MIN_SOLIDITY: | |
| continue | |
| cv2.drawContours(valid_mask, [cnt], -1, 255, -1) | |
| valid_rooms.append(cnt) | |
| return valid_mask, valid_rooms | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Wand β click-to-segment (original) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def wand_segment(self, walls: np.ndarray, click_x: int, click_y: int, | |
| existing_rooms: List[Dict]) -> Optional[Dict]: | |
| """Flood-fill from click point β return new room dict or None.""" | |
| h, w = walls.shape | |
| if not (0 <= click_x < w and 0 <= click_y < h): | |
| return None | |
| if walls[click_y, click_x] > 0: | |
| return None # clicked on a wall | |
| tmp = walls.copy() | |
| tmp[:5,:] = 255; tmp[-5:,:] = 255 | |
| tmp[:,:5] = 255; tmp[:,-5:] = 255 | |
| filled = tmp.copy() | |
| mask = np.zeros((h+2, w+2), np.uint8) | |
| for sx, sy in [(0,0),(w-1,0),(0,h-1),(w-1,h-1), | |
| (w//2,0),(w//2,h-1),(0,h//2),(w-1,h//2)]: | |
| if filled[sy, sx] == 0: | |
| cv2.floodFill(filled, mask, (sx, sy), 255) | |
| rooms = cv2.bitwise_not(filled) | |
| rooms = cv2.bitwise_and(rooms, cv2.bitwise_not(tmp)) | |
| if rooms[click_y, click_x] == 0: | |
| return None | |
| ff_mask = rooms.copy() | |
| fill_mask = np.zeros((h+2, w+2), np.uint8) | |
| cv2.floodFill(ff_mask, fill_mask, (click_x, click_y), 128) | |
| room_mask = (ff_mask == 128).astype(np.uint8) * 255 | |
| area = float(np.count_nonzero(room_mask)) | |
| if area < 100: | |
| return None | |
| contours, _ = cv2.findContours(room_mask, cv2.RETR_EXTERNAL, | |
| cv2.CHAIN_APPROX_SIMPLE) | |
| if not contours: | |
| return None | |
| cnt = max(contours, key=cv2.contourArea) | |
| bx, by, bw, bh = cv2.boundingRect(cnt) | |
| M = cv2.moments(cnt) | |
| cx = int(M["m10"]/M["m00"]) if M["m00"] else bx+bw//2 | |
| cy = int(M["m01"]/M["m00"]) if M["m00"] else by+bh//2 | |
| flat_seg = cnt[:,0,:].tolist() | |
| flat_seg = [v for pt in flat_seg for v in pt] | |
| new_id = max((r["id"] for r in existing_rooms), default=0) + 1 | |
| return { | |
| "id" : new_id, | |
| "label" : f"Room {new_id}", | |
| "segmentation": [flat_seg], | |
| "area" : area, | |
| "bbox" : [bx, by, bw, bh], | |
| "centroid" : [cx, cy], | |
| "confidence" : 0.90, | |
| "isWand" : True, | |
| } | |
| import os |