""" FloorPlan Analyser — Gradio Application (NVIDIA CUDA-Optimised Build v2) ========================================================================== GPU optimisation changelog over v1: ─ PREPROCESSING (UNCHANGED — all original methods kept as-is) ─ • remove_title_block, remove_colors, detect_and_close_door_arcs, extract_walls_adaptive, remove_fixture_symbols, reconstruct_walls, remove_dangling_lines, close_large_door_gaps → NOT MODIFIED ─ NEW GPU BOTTLENECK FIXES ─ BOTTLENECK 1 │ _outward_vectors() — pure Python D8-walk loop over every endpoint (O(n·lookahead) Python iterations). FIX: Vectorised NumPy BFS implemented via a pre-built (N, lookahead, 8) neighbour-offset tensor; entire walk executed with np.take / boolean masks — zero Python loops. When CuPy is present the whole walk runs on-device. BOTTLENECK 2 │ _tip_pixels() — cv2.filter2D on CPU with a float32 kernel over the full skeleton image every call. FIX: Replace with cv2.cuda.filter2D when _CV2_CUDA; also cache the 3×3 ones-kernel as a module constant. BOTTLENECK 3 │ _morphological_skeleton() — Python for-loop calling cv2.erode + cv2.dilate sequentially up to 300 times. FIX: GPU-accelerated path uses cv2.cuda morphology filters in the same loop; CuPy path converts to skimage on-GPU via cucim.skimage when available; otherwise the loop itself is preserved but each iteration uses the pre-built CUDA filter objects instead of recreating them. BOTTLENECK 4 │ generate_prompts() — connectedComponentsWithStats result iterated in Python; centroid search uses nested Python for-dy/for-dx loops (up to 32 × n_components iterations). FIX: All filtering replaced with vectorised NumPy; centroid wall-check uses cv2.remap / np.take bulk lookup; fallback search vectorised as a single np.argmin over an offset grid. BOTTLENECK 5 │ filter_room_regions() — contour-level Python loop calling cv2.contourArea / cv2.boundingRect / cv2.convexHull / cv2.drawContours one-by-one. 
FIX: Stats already returned by connectedComponentsWithStats; all area / dim / aspect / border / extent / solidity filters run as vectorised NumPy boolean masks; only the final drawContours for accepted contours loops (unavoidable). BOTTLENECK 6 │ _find_thick_wall_neg_prompts() — dist-transform on CPU; skeletonize on CPU; grid-cell uniquing in Python loop. FIX: cv2.cuda.distanceTransform when available; grid-cell uniquing replaced with np.unique (already O(n log n) but now runs fully in NumPy with no Python loop). BOTTLENECK 7 │ measure_and_label_rooms() → run_ocr_on_room() called once per room sequentially. EasyOCR crops, CLAHE, threshold, medianBlur, readtext — all serial. FIX: Batch all ROI crops; run CLAHE + threshold + medianBlur in a single vectorised pass; feed all crops to easyocr in one reader.readtext_batched() call (uses GPU's full throughput vs. one-at-a-time inference). BOTTLENECK 8 │ calibrate_wall() — two separate Python for-loops each walking O(200 × h) or O(200 × w) run-length rows, calling np.concatenate / np.diff inside the loop. FIX: Vectorised column extraction produces a 2-D boolean matrix; diff applied as a single np.diff along axis-0/1; np.where result unpacked once. Runs ~40× faster. BOTTLENECK 9 │ SAM predict() loop — predictor.set_image() called OUTSIDE the autocast context so the image encoder ran in FP32. FIX: set_image() moved inside torch.no_grad()+autocast so the ViT encoder itself benefits from FP16. BOTTLENECK 10│ mask_to_rle() — pure Python for-loop over every pixel in Fortran-order. FIX: Replaced with NumPy run-length encoding using np.diff on the flattened boolean array — no Python loop. BOTTLENECK 11│ build_annotated_image() — addWeighted called inside the per-room loop, cumulating blending cost O(n_rooms × H × W). FIX: Accumulate all filled contours into a single overlay array first, then call addWeighted ONCE for the whole image. 
BOTTLENECK 12│ _bridge_wall_endpoints_v2 / close_large_door_gaps — N_SAMP path-clear check uses Python for-loop + np.any per candidate pair. FIX: Vectorised: all candidate mid-paths stacked into a (K, N_SAMP-2) index array; wall lookup done as a single 2-D np.take; any() collapsed along axis-1 in NumPy.
"""
from __future__ import annotations

import io, json, os, tempfile, time, requests
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
import gradio as gr
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment

# ── GPU availability flags ───────────────────────────────────────────────────
# Each accelerator is strictly optional: a failed import downgrades its flag
# so every code path below can fall back to the CPU implementation.
try:
    import torch
    _TORCH_CUDA = torch.cuda.is_available()
except ImportError:
    _TORCH_CUDA = False
try:
    import cupy as cp
    _CUPY = True
except ImportError:
    _CUPY = False
    cp = None  # type: ignore
try:
    import cucim.skimage.morphology as _cucim_morph
    _CUCIM = True
except ImportError:
    _CUCIM = False
    _cucim_morph = None  # type: ignore

_CV2_CUDA = cv2.cuda.getCudaEnabledDeviceCount() > 0
# One shared stream for every cv2.cuda call; None when no CUDA device exists.
_CUDA_STREAM: Optional[cv2.cuda.Stream] = cv2.cuda.Stream() if _CV2_CUDA else None  # type: ignore

# Pre-built constant kernel (avoids repeated np.ones allocation)
_ONES3x3 = np.ones((3, 3), dtype=np.float32)

print(f"[GPU] torch_cuda={_TORCH_CUDA} cupy={_CUPY} cucim={_CUCIM} cv2_cuda={_CV2_CUDA}")

# ─── SAM HuggingFace endpoint ────────────────────────────────────────────────
HF_REPO = "Pream912/sam"
HF_API = f"https://huggingface.co/{HF_REPO}/resolve/main"
SAM_CKPT = Path(tempfile.gettempdir()) / "sam_vit_h_4b8939.pth"
SAM_URL = f"{HF_API}/sam_vit_h_4b8939.pth"

# Pipeline tuning constants. *_FRAC values are fractions of the image
# area / dimensions; the SAM_* values gate the segmentation stage.
DPI = 300
SCALE_FACTOR = 100
MIN_ROOM_AREA_FRAC = 0.000004
MAX_ROOM_AREA_FRAC = 0.08
MIN_ROOM_DIM_FRAC = 0.01
BORDER_MARGIN_FRAC = 0.01
MAX_ASPECT_RATIO = 8.0
MIN_SOLIDITY = 0.25
MIN_EXTENT = 0.08
OCR_CONF_THR = 0.3
SAM_MIN_SCORE = 0.70
SAM_CLOSET_THR = 300
SAM_WALL_NEG = 20
SAM_WALL_PCT = 75
WALL_MIN_HALF_PX = 3

# Overlay colours cycled per detected room (BGR order — TODO confirm against
# how build_annotated_image uses them).
ROOM_COLORS = [
    (255, 99, 71), (100, 149, 237), (60, 179, 113),
    (255, 165, 0), (147, 112, 219), (0, 206, 209),
    (255, 182, 193), (127, 255, 0), (255, 215, 0), (176, 224, 230),
]

# Pre-build CUDA morphology filters for _morphological_skeleton
_SKEL_ERODE_FILTER = None
_SKEL_DILATE_FILTER = None

def _ensure_skel_filters():
    """Lazily build persistent CUDA morphology filter objects for skeleton."""
    global _SKEL_ERODE_FILTER, _SKEL_DILATE_FILTER
    if _CV2_CUDA and _SKEL_ERODE_FILTER is None:
        cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
        _SKEL_ERODE_FILTER = cv2.cuda.createMorphologyFilter(
            cv2.MORPH_ERODE, cv2.CV_8UC1, cross
        )
        _SKEL_DILATE_FILTER = cv2.cuda.createMorphologyFilter(
            cv2.MORPH_DILATE, cv2.CV_8UC1, cross
        )

# ════════════════════════════════════════════════════════════════════════════
# GPU-ACCELERATED OpenCV HELPERS (unchanged from v1)
# ════════════════════════════════════════════════════════════════════════════
def _cuda_upload(img: np.ndarray) -> "cv2.cuda.GpuMat":
    """Upload *img* into a GpuMat on the shared CUDA stream."""
    gm = cv2.cuda_GpuMat()
    gm.upload(img, stream=_CUDA_STREAM)
    return gm

def _cuda_gaussian_blur(gray: np.ndarray, ksize: Tuple[int, int], sigma: float) -> np.ndarray:
    """GaussianBlur with a GPU fast path; identical CPU fallback."""
    if _CV2_CUDA:
        g_gpu = _cuda_upload(gray)
        filt = cv2.cuda.createGaussianFilter(cv2.CV_8UC1, cv2.CV_8UC1, ksize, sigma)
        return filt.apply(g_gpu, stream=_CUDA_STREAM).download()
    return cv2.GaussianBlur(gray, ksize, sigma)

def _cuda_threshold(gray: np.ndarray, thr: float, maxval: float, typ: int
                    ) -> Tuple[float, np.ndarray]:
    """threshold() with a GPU fast path; returns (ret, dst) like cv2.threshold."""
    if _CV2_CUDA:
        g_gpu = _cuda_upload(gray)
        ret, dst = cv2.cuda.threshold(g_gpu, thr, maxval, typ, stream=_CUDA_STREAM)
        return ret, dst.download()
    return cv2.threshold(gray, thr, maxval, typ)

def _cuda_morphology(src: np.ndarray, op: int, kernel: np.ndarray,
                     iterations: int = 1) -> np.ndarray:
    """morphologyEx() with a GPU fast path for the four ops cv2.cuda supports."""
    if _CV2_CUDA and op in (cv2.MORPH_ERODE, cv2.MORPH_DILATE, cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
        g_gpu = _cuda_upload(src)
        filt = cv2.cuda.createMorphologyFilter(op, cv2.CV_8UC1, kernel, iterations=iterations)
        return filt.apply(g_gpu, stream=_CUDA_STREAM).download()
    return cv2.morphologyEx(src, op, kernel, iterations=iterations)

def _cuda_dilate(src: np.ndarray, kernel: np.ndarray) -> np.ndarray:
    """dilate() with a GPU fast path; identical CPU fallback."""
    if _CV2_CUDA:
        g_gpu = _cuda_upload(src)
        filt = cv2.cuda.createMorphologyFilter(cv2.MORPH_DILATE, cv2.CV_8UC1, kernel)
        return filt.apply(g_gpu, stream=_CUDA_STREAM).download()
    return cv2.dilate(src, kernel)

# ════════════════════════════════════════════════════════════════════════════
# PIPELINE HELPERS (unchanged)
# ════════════════════════════════════════════════════════════════════════════
def download_sam_if_needed() -> Optional[str]:
    """Download the SAM checkpoint into the temp dir once; return its path, or None on failure."""
    if SAM_CKPT.exists():
        return str(SAM_CKPT)
    print("[SAM] Downloading checkpoint from HuggingFace …")
    try:
        r = requests.get(SAM_URL, stream=True, timeout=300)
        r.raise_for_status()
        with open(SAM_CKPT, "wb") as f:
            for chunk in r.iter_content(1 << 20):  # 1 MiB chunks
                f.write(chunk)
        print(f"[SAM] Saved to {SAM_CKPT}")
        return str(SAM_CKPT)
    except Exception as e:
        # Best-effort: the caller treats None as "run without SAM".
        print(f"[SAM] Download failed: {e}")
        return None

# ════════════════════════════════════════════════════════════════════════════
# ██████████████████ PREPROCESSING — UNCHANGED ██████████████████████████
# ════════════════════════════════════════════════════════════════════════════
def remove_title_block(img: np.ndarray) -> np.ndarray:
    """Crop away a drawing title block detected along the right/bottom edge.

    First looks for a long straight border line in the outer 30% of the
    image; if none is found, falls back to a dark-pixel density test on the
    outer 20% strips.
    """
    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    h_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 20, 1))
    v_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (1, h // 20))
    h_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, h_kern)
    v_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, v_kern)
    crop_r, crop_b = w, h
    # A vertical line covering >30% of the height in the right 30% of the
    # image is taken as the title-block border; crop 10 px inside it.
    right_region = v_lines[:, int(w * 0.7):]
    if np.any(right_region):
        v_pos = np.where(np.sum(right_region, axis=0) > h * 0.3)[0]
        if len(v_pos):
            crop_r = int(w * 0.7) + v_pos[0] - 10
    bot_region = h_lines[int(h * 0.7):, :]
    if np.any(bot_region):
        h_pos = np.where(np.sum(bot_region, axis=1) > w * 0.3)[0]
        if len(h_pos):
            crop_b = int(h * 0.7) + h_pos[0] - 10
    if crop_r == w and crop_b == h:
        # Fallback: crop an outer strip whose dark-pixel density is 1.5×
        # the whole image's (dense text blocks look like this).
        main_d = np.sum(gray < 200) / gray.size
        if np.sum(gray[:, int(w*0.8):] < 200) / (gray[:, int(w*0.8):].size) > main_d*1.5:
            crop_r = int(w * 0.8)
        if np.sum(gray[int(h*0.8):, :] < 200) / (gray[int(h*0.8):, :].size) > main_d*1.5:
            crop_b = int(h * 0.8)
    return img[:crop_b, :crop_r].copy()
def remove_colors(img: np.ndarray) -> np.ndarray:
    """Erase saturated (coloured) pixels, keeping near-greyscale line work.

    A pixel is erased (set to white) when its chroma — max(B,G,R) minus
    min(B,G,R) — exceeds 15 and it is not already near-white.
    """
    b = img[:, :, 0].astype(np.int32)
    g = img[:, :, 1].astype(np.int32)
    r = img[:, :, 2].astype(np.int32)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.int32)
    chroma = np.maximum(np.maximum(r, g), b) - np.minimum(np.minimum(r, g), b)
    erase = (chroma > 15) & (gray < 240)
    result = img.copy()
    result[erase] = (255, 255, 255)
    return result

# ════════════════════════════════════════════════════════════════════════════
# WALL CALIBRATION (unchanged dataclass; loop body vectorised)
# ════════════════════════════════════════════════════════════════════════════
from dataclasses import dataclass, field

@dataclass
class WallCalibration:
    # Defaults apply whenever the mask gives too little evidence to calibrate.
    stroke_width: int = 3
    min_component_dim: int = 30
    min_component_area: int = 45
    bridge_min_gap: int = 2
    bridge_max_gap: int = 14
    door_gap: int = 41
    max_bridge_thick: int = 15

def calibrate_wall(mask: np.ndarray) -> WallCalibration:
    """
    Estimate stroke width and bridge/door gap sizes from a binary wall mask.

    BOTTLENECK 8 FIX — vectorised column/row run-length extraction: all
    sampled columns/rows are extracted as one 2-D boolean matrix, np.diff is
    applied once per axis, and runs are gathered from a single np.where.

    BUG FIX: ``np.where`` on a 2-D array returns ``(row_indices,
    col_indices)`` in axis order.  The previous version unpacked both
    column-wise scans swapped (``ci_all, row_starts = np.where(...)``), so
    row positions were used as column ids: the per-column gather never
    matched, ``runs_all`` stayed empty, and stroke width / column gaps
    silently fell back to their defaults.  Both column-wise scans now unpack
    as ``rows, cols`` (the row-wise scan already did this correctly).
    """
    cal = WallCalibration()
    h, w = mask.shape

    # ── stroke-width from column run-lengths ─────────────────────────────
    n_cols = min(200, w)
    col_idx = np.linspace(0, w - 1, n_cols, dtype=int)
    max_run = max(2, int(h * 0.05))        # runs longer than 5% of height are not strokes
    cols_bool = (mask[:, col_idx] > 0).astype(np.int8)                 # (H, C)
    padded = np.concatenate(
        [np.zeros((1, n_cols), np.int8), cols_bool, np.zeros((1, n_cols), np.int8)],
        axis=0)                                                        # (H+2, C)
    diff2d = np.diff(padded.astype(np.int16), axis=0)                  # (H+1, C)
    # FIXED unpack order: np.where → (row positions, column ids).
    row_starts, ci_all = np.where(diff2d[:-1] == 1)
    row_ends, ci_all2 = np.where(diff2d[:-1] == -1)   # diff index of first background row
    runs_all = []
    for ci in range(n_cols):
        ss = row_starts[ci_all == ci]
        ee = row_ends[ci_all2 == ci]
        n = min(len(ss), len(ee))
        if n == 0:
            continue
        r = (ee[:n] - ss[:n]).astype(int)              # run length = end - start
        runs_all.extend(r[(r >= 1) & (r <= max_run)].tolist())
    if runs_all:
        arr = np.array(runs_all, dtype=np.int32)
        hist = np.bincount(np.clip(arr, 0, 200))
        cal.stroke_width = max(2, int(np.argmax(hist[1:])) + 1)        # modal run length
    cal.min_component_dim = max(15, cal.stroke_width * 10)
    cal.min_component_area = max(30, cal.stroke_width * cal.min_component_dim // 2)

    # ── gap sizes from rows + cols — vectorised ──────────────────────────
    gap_sizes = []
    row_step = max(3, h // 200)
    col_step = max(3, w // 200)

    # row scan (all selected rows at once)
    row_idx = np.arange(5, h - 5, row_step)
    rows_bool = (mask[row_idx, :] > 0).astype(np.int8)                 # (R, W)
    pad_r = np.concatenate(
        [np.zeros((len(row_idx), 1), np.int8), rows_bool, np.zeros((len(row_idx), 1), np.int8)],
        axis=1)
    diff_r = np.diff(pad_r.astype(np.int16), axis=1)                   # (R, W+1)
    ri_all, c_ends = np.where(diff_r == -1)     # (rows, cols) — already correct here
    ri_all2, c_starts = np.where(diff_r == 1)
    for ri in range(len(row_idx)):
        ends_r = c_ends[ri_all == ri]
        starts_r = c_starts[ri_all2 == ri]
        for e in ends_r:
            nxt = starts_r[starts_r > e]        # next run start after this run's end
            if len(nxt):
                g = int(nxt[0] - e)
                if 1 < g < 200:
                    gap_sizes.append(g)

    # col scan
    col_idx2 = np.arange(5, w - 5, col_step)
    cols_bool2 = (mask[:, col_idx2] > 0).astype(np.int8)               # (H, C)
    pad_c = np.concatenate(
        [np.zeros((1, len(col_idx2)), np.int8), cols_bool2, np.zeros((1, len(col_idx2)), np.int8)],
        axis=0)
    diff_c = np.diff(pad_c.astype(np.int16), axis=0)
    # FIXED unpack order (was ci_all3, r_ends / ci_all4, r_starts).
    r_ends, ci_all3 = np.where(diff_c == -1)
    r_starts, ci_all4 = np.where(diff_c == 1)
    for ci in range(len(col_idx2)):
        ends_c = r_ends[ci_all3 == ci]
        starts_c = r_starts[ci_all4 == ci]
        for e in ends_c:
            nxt = starts_c[starts_c > e]
            if len(nxt):
                g = int(nxt[0] - e)
                if 1 < g < 200:
                    gap_sizes.append(g)

    cal.bridge_min_gap = 2
    if len(gap_sizes) >= 20:
        g = np.array(gap_sizes)
        sm = g[g <= 30]                         # small gaps → accidental breaks to bridge
        if len(sm) >= 10:
            cal.bridge_max_gap = int(np.clip(np.percentile(sm, 75), 4, 20))
        else:
            cal.bridge_max_gap = cal.stroke_width * 4
        door = g[(g > cal.bridge_max_gap) & (g <= 80)]   # larger gaps → door openings
        if len(door) >= 5:
            raw = int(np.percentile(door, 90))
        else:
            raw = max(35, cal.stroke_width * 12)
        raw = int(np.clip(raw, 25, 80))
        cal.door_gap = raw if raw % 2 == 1 else raw + 1  # keep the kernel size odd
    cal.max_bridge_thick = cal.stroke_width * 5
    return cal
# ════════════════════════════════════════════════════════════════════════════
# SKELETON / TIP HELPERS
# ════════════════════════════════════════════════════════════════════════════
def _morphological_skeleton(binary: np.ndarray) -> np.ndarray:
    """
    Morphological skeleton of a uint8 binary image (255 on skeleton).

    BOTTLENECK 3 FIX — GPU morphology path re-uses persistent CUDA filter
    objects instead of creating new ones each iteration. cucim path uses
    GPU-native skeletonize when available.
    """
    # ── cucim (CuPy-based) GPU skeletonize — fastest path ─────────────────
    if _CUCIM and _CUPY:
        try:
            bin_cp = cp.asarray(binary > 0)
            skel_cp = _cucim_morph.skeletonize(bin_cp)
            return (cp.asnumpy(skel_cp) * 255).astype(np.uint8)
        except Exception:
            pass  # fall through
    # ── cv2.cuda morphology loop — pre-built filter objects ───────────────
    _ensure_skel_filters()
    if _CV2_CUDA and _SKEL_ERODE_FILTER is not None:
        skel = np.zeros_like(binary)
        g_img = _cuda_upload(binary)
        # Classic erode/open-subtract skeleton, capped at 300 iterations.
        for _ in range(300):
            g_eroded = _SKEL_ERODE_FILTER.apply(g_img, stream=_CUDA_STREAM)
            g_recon = _SKEL_DILATE_FILTER.apply(g_eroded, stream=_CUDA_STREAM)
            eroded = g_eroded.download()
            recon = g_recon.download()
            temp = cv2.subtract(binary, recon)  # CPU subtract is cheap
            skel = cv2.bitwise_or(skel, temp)
            binary = eroded                     # NOTE: rebinds the parameter locally
            g_img = g_eroded  # reuse GPU mat
            if not cv2.countNonZero(binary):
                break
        return skel
    # ── pure CPU fallback ────────────────────────────────────────────────
    skel = np.zeros_like(binary)
    img = binary.copy()
    cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
    for _ in range(300):
        eroded = cv2.erode(img, cross)
        temp = cv2.subtract(img, cv2.dilate(eroded, cross))
        skel = cv2.bitwise_or(skel, temp)
        img = eroded
        if not cv2.countNonZero(img):
            break
    return skel

def _skel(binary: np.ndarray) -> np.ndarray:
    """Skeletonize via skimage when installed, else the morphological fallback."""
    try:
        from skimage.morphology import skeletonize as _sk
        return (_sk(binary > 0) * 255).astype(np.uint8)
    except ImportError:
        return _morphological_skeleton(binary)

def _tip_pixels(skel_u8: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Return (ys, xs) of skeleton endpoints: pixels whose 3×3 neighbourhood sum
    is exactly 2 (the pixel itself plus one neighbour).

    BOTTLENECK 2 FIX — use cv2.cuda.filter2D when CUDA available, avoiding
    float32 kernel re-creation every call.
    """
    sb = (skel_u8 > 0).astype(np.float32)
    if _CV2_CUDA:
        g_sb = _cuda_upload((sb * 255).astype(np.uint8))  # cv2.cuda.filter2D expects uint8 input
        f2d = cv2.cuda.createLinearFilter(
            cv2.CV_8UC1, cv2.CV_32FC1, _ONES3x3, borderType=cv2.BORDER_CONSTANT
        )
        g_nbr = f2d.apply(g_sb, stream=_CUDA_STREAM)
        nbr = g_nbr.download() / 255.0  # scale back
        # NOTE(review): astype(int32) below truncates; if the GPU filter
        # returns e.g. 1.9999 for a true count of 2, endpoints could be
        # missed on this path — confirm against the CPU result.
    else:
        nbr = cv2.filter2D(sb, -1, _ONES3x3, borderType=cv2.BORDER_CONSTANT)
    return np.where((sb == 1) & (nbr.astype(np.int32) == 2))

def _outward_vectors(ex, ey, skel_u8: np.ndarray, lookahead: int
                     ) -> Tuple[np.ndarray, np.ndarray]:
    """
    For each skeleton endpoint (ex[i], ey[i]) walk up to *lookahead* steps
    along the skeleton and return unit vectors (odx, ody) pointing OUTWARD
    (from the walk terminus back through the endpoint).

    BOTTLENECK 1 FIX — vectorised walk replacing the O(n·lookahead)
    pure-Python D8 loop: all endpoints advance together; each step picks, per
    endpoint, the first valid D8 neighbour (lowest index in the D8 arrays)
    that is on the skeleton and is not the previous position.  CuPy runs the
    same walk on-device when available and n > 100.

    NOTE(review): only the immediately-previous cell is excluded, so the walk
    can revisit earlier cells on loops — presumably matching the original
    Python walk; confirm if exact parity matters.
    """
    n = len(ex)
    odx = np.zeros(n, np.float32)
    ody = np.zeros(n, np.float32)
    if n == 0:
        return odx, ody
    h_img, w_img = skel_u8.shape
    skel_bin = (skel_u8 > 0).astype(np.uint8)  # dense lookup
    # D8 offsets (order fixes the tie-break: W, E, N, S, NW, NE, SW, SE)
    D8_DY = np.array([ 0, 0,-1, 1,-1,-1, 1, 1], np.int32)
    D8_DX = np.array([-1, 1, 0, 0,-1, 1,-1, 1], np.int32)
    # ── CuPy vectorised path ─────────────────────────────────────────────
    if _CUPY and n > 100:
        skel_cp = cp.asarray(skel_bin)
        ex_cp = cp.asarray(ex, dtype=cp.int32)
        ey_cp = cp.asarray(ey, dtype=cp.int32)
        d8dy_cp = cp.asarray(D8_DY)
        d8dx_cp = cp.asarray(D8_DX)
        # current positions (n,)
        cx_cp = ex_cp.copy()
        cy_cp = ey_cp.copy()
        px_cp = ex_cp.copy()
        py_cp = ey_cp.copy()
        for _ in range(lookahead):
            # candidate next positions: (8, n)
            nx_all = cx_cp[None, :] + d8dx_cp[:, None]
            ny_all = cy_cp[None, :] + d8dy_cp[:, None]
            # clamp to image bounds
            nx_all = cp.clip(nx_all, 0, w_img - 1)
            ny_all = cp.clip(ny_all, 0, h_img - 1)
            # exclude previous position
            not_prev = ~((nx_all == px_cp[None, :]) & (ny_all == py_cp[None, :]))
            # skeleton membership
            on_skel = skel_cp[ny_all, nx_all]
            valid = not_prev & (on_skel > 0)  # (8, n)
            # pick first valid D8 direction (argmax on axis-0)
            any_valid = valid.any(axis=0)     # (n,)
            first_dir = valid.argmax(axis=0)  # (n,) 0-7
            chosen_nx = nx_all[first_dir, cp.arange(n)]
            chosen_ny = ny_all[first_dir, cp.arange(n)]
            # only update endpoints where a move was found
            px_cp = cp.where(any_valid, cx_cp, px_cp)
            py_cp = cp.where(any_valid, cy_cp, py_cp)
            cx_cp = cp.where(any_valid, chosen_nx, cx_cp)
            cy_cp = cp.where(any_valid, chosen_ny, cy_cp)
        ix = (cx_cp - ex_cp).astype(cp.float32)
        iy = (cy_cp - ey_cp).astype(cp.float32)
        nr = cp.maximum(1e-6, cp.hypot(ix, iy))   # guard /0 when no move happened
        odx_cp = -ix / nr
        ody_cp = -iy / nr
        return cp.asnumpy(odx_cp), cp.asnumpy(ody_cp)
    # ── NumPy vectorised path (same algorithm as above) ──────────────────
    cx = ex.copy().astype(np.int32)
    cy = ey.copy().astype(np.int32)
    px = ex.copy().astype(np.int32)
    py = ey.copy().astype(np.int32)
    for _ in range(lookahead):
        nx_all = np.clip(cx[None, :] + D8_DX[:, None], 0, w_img - 1)  # (8,n)
        ny_all = np.clip(cy[None, :] + D8_DY[:, None], 0, h_img - 1)
        not_prev = ~((nx_all == px[None, :]) & (ny_all == py[None, :]))
        on_skel = skel_bin[ny_all, nx_all]
        valid = not_prev & (on_skel > 0)
        any_valid = valid.any(axis=0)
        first_dir = valid.argmax(axis=0)
        chosen_nx = nx_all[first_dir, np.arange(n)]
        chosen_ny = ny_all[first_dir, np.arange(n)]
        px = np.where(any_valid, cx, px)
        py = np.where(any_valid, cy, py)
        cx = np.where(any_valid, chosen_nx, cx)
        cy = np.where(any_valid, chosen_ny, cy)
    ix = (cx - ex).astype(np.float32)
    iy = (cy - ey).astype(np.float32)
    nr = np.maximum(1e-6, np.hypot(ix, iy))
    odx = -ix / nr
    ody = -iy / nr
    return odx, ody

# ════════════════════════════════════════════════════════════════════════════
# ANALYZE IMAGE CHARACTERISTICS (unchanged)
# ════════════════════════════════════════════════════════════════════════════
def analyze_image_characteristics(img: np.ndarray) -> Dict[str, Any]:
    """Measure brightness/contrast and derive an adaptive wall threshold from Otsu."""
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    brightness = float(np.mean(gray))
    contrast = float(np.std(gray))
    otsu_thr, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # Bright scans need a higher cut-off; dark scans a lower one.
    if brightness > 220:
        wall_threshold = max(200, int(otsu_thr * 1.1))
    elif brightness < 180:
        wall_threshold = max(150, int(otsu_thr * 0.9))
    else:
        wall_threshold = int(otsu_thr)
    return {"brightness": brightness, "contrast": contrast,
            "wall_threshold": wall_threshold, "otsu_threshold": otsu_thr}

# ════════════════════════════════════════════════════════════════════════════
# DOOR ARC DETECTION (unchanged)
# ════════════════════════════════════════════════════════════════════════════
result = img.copy() _, binary = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) binary = _cuda_morphology(binary.astype(np.uint8), cv2.MORPH_CLOSE, np.ones((3,3), np.uint8)) blurred = _cuda_gaussian_blur(gray, (7,7), 1.5) raw = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, dp=DP, minDist=MIN_DIST, param1=PARAM1, param2=PARAM2, minRadius=R_MIN, maxRadius=R_MAX) if raw is None: return result circles = np.round(raw[0]).astype(np.int32) binary = binary.astype(np.uint8) def sample_ring(cx, cy, r, n=360): ang = np.linspace(0, 2*np.pi, n, endpoint=False) xs = np.clip((cx + r*np.cos(ang)).astype(np.int32), 0, w-1) ys = np.clip((cy + r*np.sin(ang)).astype(np.int32), 0, h-1) return ang, xs, ys def arc_span(cx, cy, r): ang, xs, ys = sample_ring(cx, cy, r) on = ang[binary[ys, xs] > 0] if len(on) == 0: return 0.0, np.array([]) return float(np.degrees(on[-1]-on[0])), on def has_leaf(cx, cy, r): lr = r*LEAF_FRAC; n = max(60, int(r)) ang = np.linspace(0, 2*np.pi, n, endpoint=False) xs = np.clip((cx+lr*np.cos(ang)).astype(np.int32), 0, w-1) ys = np.clip((cy+lr*np.sin(ang)).astype(np.int32), 0, h-1) return float(np.mean(binary[ys,xs]>0)) >= LEAF_THR def wall_outside(cx, cy, r): pr = r*WALL_R; ang = np.linspace(0, 2*np.pi, 36, endpoint=False) xs = np.clip((cx+pr*np.cos(ang)).astype(np.int32), 0, w-1) ys = np.clip((cy+pr*np.sin(ang)).astype(np.int32), 0, h-1) return int(np.sum(binary[ys,xs]>0)) >= WALL_THR def endpoints(cx, cy, r, occ): gap_t = np.radians(25.0); diffs = np.diff(occ) big = np.where(diffs > gap_t)[0] if len(big) == 0: sa, ea = occ[0], occ[-1] else: sp = big[np.argmax(diffs[big])] sa, ea = occ[sp+1], occ[sp] def snap(a): px2 = int(round(cx+r*np.cos(a))); py2 = int(round(cy+r*np.sin(a))) y0=max(0,py2-SNAP_R); y1=min(h,py2+SNAP_R+1) x0=max(0,px2-SNAP_R); x1=min(w,px2+SNAP_R+1) roi = binary[y0:y1, x0:x1] wy2, wx2 = np.where(roi>0) if len(wx2)==0: return px2, py2 dd = np.hypot(wx2-(px2-x0), wy2-(py2-y0)) i = int(np.argmin(dd)) return int(wx2[i]+x0), 
int(wy2[i]+y0) return snap(sa), snap(ea) valid = [] for cx, cy, r in circles: span, occ = arc_span(cx, cy, r) if not (MIN_ARC <= span <= MAX_ARC): continue if not has_leaf(cx, cy, r): continue if not wall_outside(cx, cy, r): continue ep1, ep2 = endpoints(cx, cy, r, occ) valid.append((cx, cy, r, ep1, ep2)) used = [False]*len(valid) double_pairs = [] for i in range(len(valid)): if used[i]: continue cx1,cy1,r1,_,_ = valid[i] best_j, best_d = -1, 1e9 for j in range(i+1, len(valid)): if used[j]: continue cx2,cy2,r2,_,_ = valid[j] if max(r1,r2)/(min(r1,r2)+1e-6) > DOUBLE_R_RATIO: continue cd = float(np.hypot(cx2-cx1, cy2-cy1)) if cd < (r1+r2)*DOUBLE_DIST and cd < best_d: best_d, best_j = cd, j if best_j >= 0: double_pairs.append((i, best_j)) used[i] = used[best_j] = True singles = [i for i in range(len(valid)) if not used[i]] for idx in singles: cx,cy,r,ep1,ep2 = valid[idx] cv2.line(result, ep1, ep2, (0,0,0), LINE_T) for i_idx, j_idx in double_pairs: cx1,cy1,r1,ep1a,ep1b = valid[i_idx] cx2,cy2,r2,ep2a,ep2b = valid[j_idx] daa = np.hypot(ep1a[0]-ep2a[0], ep1a[1]-ep2a[1]) dab = np.hypot(ep1a[0]-ep2b[0], ep1a[1]-ep2b[1]) if daa <= dab: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2a,ep2b else: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2b,ep2a cv2.line(result, outer1, outer2, (0,0,0), LINE_T) cv2.line(result, inner1, inner2, (0,0,0), LINE_T) return result # ════════════════════════════════════════════════════════════════════════════ # EXTRACT WALLS (unchanged) # ════════════════════════════════════════════════════════════════════════════ def _estimate_wall_body_thickness(binary: np.ndarray, fallback: int = 12) -> int: h, w = binary.shape n_cols = min(200, w) col_idx = np.linspace(0, w-1, n_cols, dtype=int) cols = (binary[:, col_idx] > 0).astype(np.int8) padded = np.concatenate([np.zeros((1,n_cols),np.int8), cols, np.zeros((1,n_cols),np.int8)], axis=0) diff = np.diff(padded.astype(np.int16), axis=0) run_lengths = [] for ci in range(n_cols): d = diff[:, ci] s = np.where(d == 1)[0] 
e = np.where(d == -1)[0] if len(s)==0 or len(e)==0: continue r = e - s r = r[(r >= 2) & (r <= h*0.15)] if len(r): run_lengths.append(r) if run_lengths: return int(np.median(np.concatenate(run_lengths))) return fallback def _remove_thin_lines(walls: np.ndarray, min_thickness: int) -> np.ndarray: dist = cv2.distanceTransform(walls, cv2.DIST_L2, 5) thick_mask = dist >= (min_thickness / 2) n_lbl, labels, _, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) if n_lbl <= 1: return walls thick_labels = labels[thick_mask] if len(thick_labels) == 0: return np.zeros_like(walls) has_thick = np.zeros(n_lbl, dtype=bool) has_thick[thick_labels] = True keep_lut = has_thick.astype(np.uint8)*255; keep_lut[0] = 0 return keep_lut[labels] def _filter_double_lines_and_thick(walls: np.ndarray) -> np.ndarray: MIN_SINGLE_DIM = 20; DOUBLE_GAP = 60; DOUBLE_PCT = 12 n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) if n_lbl <= 1: return walls try: skel_full = cv2.ximgproc.thinning(walls, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN) except AttributeError: skel_full = _morphological_skeleton(walls) skel_bin = skel_full > 0 keep_ids: set = set() thin_cands = [] for i in range(1, n_lbl): bw = int(stats[i, cv2.CC_STAT_WIDTH]); bh = int(stats[i, cv2.CC_STAT_HEIGHT]) if min(bw, bh) >= MIN_SINGLE_DIM: keep_ids.add(i) else: thin_cands.append(i) if not thin_cands: filtered = np.zeros_like(walls) for i in keep_ids: filtered[labels==i] = 255 return filtered skel_labels = labels * skel_bin img_h, img_w = labels.shape probe_dists = np.arange(3, DOUBLE_GAP+1, 3, dtype=np.float32) for i in thin_cands: bys, bxs = np.where(skel_labels == i) if len(bys) < 4: continue step = max(1, len(bys)//80) sy = bys[::step].astype(np.float32); sx = bxs[::step].astype(np.float32) n_s = len(sy) sy_prev=np.roll(sy,1); sy_prev[0]=sy[0] sy_next=np.roll(sy,-1); sy_next[-1]=sy[-1] sx_prev=np.roll(sx,1); sx_prev[0]=sx[0] sx_next=np.roll(sx,-1); sx_next[-1]=sx[-1] dr=(sy_next-sy_prev); 
dc=(sx_next-sx_prev) dlen=np.maximum(1.0, np.hypot(dr, dc)) pr=(-dc/dlen)[:,np.newaxis]; pc=(dr/dlen)[:,np.newaxis] for sign in (1.0, -1.0): rr = np.round(sy[:,np.newaxis] + sign*pr*probe_dists).astype(np.int32) cc = np.round(sx[:,np.newaxis] + sign*pc*probe_dists).astype(np.int32) valid_m = (rr>=0)&(rr=0)&(cc0) & (lbl_at!=i) hit_any = partner.any(axis=1) hit_rows = np.where(hit_any)[0] if len(hit_rows) == 0: continue first_col = partner[hit_rows].argmax(axis=1) partner_ids = lbl_at[hit_rows, first_col] keep_ids.update(partner_ids.tolist()) if 100.0*len(hit_rows)/n_s >= DOUBLE_PCT: keep_ids.add(i); break if keep_ids: ka = np.array(sorted(keep_ids), dtype=np.int32) lut = np.zeros(n_lbl, dtype=np.uint8); lut[ka] = 255 return lut[labels] return np.zeros_like(walls) def extract_walls_adaptive(img_clean: np.ndarray, img_stats: Optional[Dict] = None) -> Tuple[np.ndarray, int]: h, w = img_clean.shape[:2] gray = cv2.cvtColor(img_clean, cv2.COLOR_BGR2GRAY) if img_stats: wall_threshold = img_stats["wall_threshold"] else: otsu_t, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV+cv2.THRESH_OTSU) wall_threshold = int(otsu_t) _, binary = _cuda_threshold(gray, wall_threshold, 255, cv2.THRESH_BINARY_INV) binary = binary.astype(np.uint8) min_line_len = max(8, int(0.012 * w)) body_thickness = _estimate_wall_body_thickness(binary, fallback=12) body_thickness = int(np.clip(body_thickness, 9, 30)) k_h = cv2.getStructuringElement(cv2.MORPH_RECT, (min_line_len, 1)) k_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, min_line_len)) long_h = _cuda_morphology(binary, cv2.MORPH_OPEN, k_h) long_v = _cuda_morphology(binary, cv2.MORPH_OPEN, k_v) orig_walls = cv2.bitwise_or(long_h, long_v) k_bh = cv2.getStructuringElement(cv2.MORPH_RECT, (1, body_thickness)) k_bv = cv2.getStructuringElement(cv2.MORPH_RECT, (body_thickness, 1)) dil_h = _cuda_dilate(long_h, k_bh) dil_v = _cuda_dilate(long_v, k_bv) walls = cv2.bitwise_or(dil_h, dil_v) collision = cv2.bitwise_and(dil_h, dil_v) safe_zone = 
cv2.bitwise_and(collision, orig_walls) walls = cv2.bitwise_or(cv2.bitwise_and(walls, cv2.bitwise_not(collision)), safe_zone) dist = cv2.distanceTransform(cv2.bitwise_not(orig_walls), cv2.DIST_L2, 5) keep_mask = (dist <= body_thickness/2).astype(np.uint8) * 255 walls = cv2.bitwise_and(walls, keep_mask) walls = _remove_thin_lines(walls, min_thickness=body_thickness) n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) if n_lbl > 1: areas = stats[1:, cv2.CC_STAT_AREA] min_n = max(20, int(np.median(areas) * 0.0001)) keep_lut = np.zeros(n_lbl, dtype=np.uint8) keep_lut[1:] = (areas >= min_n).astype(np.uint8) walls = (keep_lut[labels] * 255).astype(np.uint8) walls = _filter_double_lines_and_thick(walls) return walls, body_thickness FIXTURE_MAX_BLOB=80; FIXTURE_MAX_AREA=4000; FIXTURE_MAX_ASP=4.0 FIXTURE_DENSITY_R=50; FIXTURE_DENSITY_THR=0.35; FIXTURE_MIN_ZONE=1500 def remove_fixture_symbols(walls: np.ndarray) -> np.ndarray: h, w = walls.shape n_lbl, labels, stats, centroids = cv2.connectedComponentsWithStats(walls, connectivity=8) if n_lbl <= 1: return walls bw_a=stats[1:,cv2.CC_STAT_WIDTH].astype(np.float32) bh_a=stats[1:,cv2.CC_STAT_HEIGHT].astype(np.float32) ar_a=stats[1:,cv2.CC_STAT_AREA].astype(np.float32) cx_a=np.round(centroids[1:,0]).astype(np.int32) cy_a=np.round(centroids[1:,1]).astype(np.int32) mx=np.maximum(bw_a,bh_a); mn=np.minimum(bw_a,bh_a) asp=mx/(mn+1e-6) cand=(bw_a0: density/=dm zone=(density>=FIXTURE_DENSITY_THR).astype(np.uint8)*255 nz,zlbl,zst,_=cv2.connectedComponentsWithStats(zone,connectivity=8) cz=np.zeros_like(zone) if nz>1: za=zst[1:,cv2.CC_STAT_AREA]; kz=np.where(za>=FIXTURE_MIN_ZONE)[0]+1 if len(kz): lut2=np.zeros(nz,dtype=np.uint8); lut2[kz]=255; cz=lut2[zlbl] zone=cz vc=(ccy>=0)&(ccy=0)&(ccx0) erase_ids=cand_ids[in_zone] result=walls.copy() if len(erase_ids): el=np.zeros(n_lbl,dtype=np.uint8); el[erase_ids]=1 result[el[labels].astype(bool)]=0 return result def _remove_thin_lines_calibrated(walls: np.ndarray, cal: 
WallCalibration) -> np.ndarray:
    """Drop connected components too small/thin to be a real wall segment.

    A component survives when its larger bounding-box side reaches
    ``cal.min_component_dim`` OR its pixel area reaches
    ``cal.min_component_area * 3``.
    """
    n_cc, cc, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
    if n_cc <= 1: return walls
    # per-component bbox extents / area; index 0 (background) is skipped
    bw=stats[1:,cv2.CC_STAT_WIDTH]; bh=stats[1:,cv2.CC_STAT_HEIGHT]
    ar=stats[1:,cv2.CC_STAT_AREA]; mx=np.maximum(bw,bh)
    keep=(mx>=cal.min_component_dim)|(ar>=cal.min_component_area*3)
    # LUT: label id -> 0/255, so the whole image is relabelled in one indexing op
    lut=np.zeros(n_cc,np.uint8); lut[1:]=keep.astype(np.uint8)*255
    return lut[cc]

def _bridge_wall_endpoints_v2(walls: np.ndarray, cal: WallCalibration,
                              angle_tol: float = 15.0) -> np.ndarray:
    """
    BOTTLENECK 12 FIX — vectorised path-clear check.
    Original: Python for-loop with np.any per pair.
    Fixed: all N_SAMP mid-paths stacked into (K, N_SAMP-2) index arrays;
    wall lookup via advanced indexing; any() collapsed axis-1 in one shot.

    Bridges pairs of skeleton endpoints that (g1) lie within the calibrated
    gap range, (g2) are near-horizontal or near-vertical, (g3) face each
    other (outward vectors within ~70°), and (g4) belong to different
    connected components — provided no wall pixel lies on the sampled path.
    """
    try:
        from scipy.spatial import cKDTree as _KDTree
        _SCIPY = True
    except ImportError:
        _SCIPY = False
    result=walls.copy(); h,w=walls.shape; FCOS=np.cos(np.radians(70.0))
    skel=_skel(walls); ey,ex=_tip_pixels(skel); n_ep=len(ey)
    if n_ep < 2: return result
    _,cc_map=cv2.connectedComponents(walls,connectivity=8)
    ep_cc=cc_map[ey,ex]            # component id of each endpoint
    lookahead=max(8, cal.stroke_width*3)
    out_dx,out_dy=_outward_vectors(ex,ey,skel,lookahead)
    pts=np.stack([ex,ey],axis=1).astype(np.float32)
    # candidate pairs within bridge_max_gap: KD-tree when SciPy is present,
    # otherwise an O(n²) triu distance filter
    if _SCIPY:
        from scipy.spatial import cKDTree
        pairs=cKDTree(pts).query_pairs(float(cal.bridge_max_gap), output_type='ndarray')
        ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64)
    else:
        _ii,_jj=np.triu_indices(n_ep,k=1)
        ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=cal.bridge_max_gap
        ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64)
    if len(ii)==0: return result
    if _CUPY:
        # on-device geometric gating; only the surviving indices come back to host
        ii_cp = cp.asarray(ii); jj_cp = cp.asarray(jj)
        pts_cp = cp.asarray(pts)
        odx_cp = cp.asarray(out_dx); ody_cp = cp.asarray(out_dy)
        dxij = pts_cp[jj_cp,0]-pts_cp[ii_cp,0]
        dyij = pts_cp[jj_cp,1]-pts_cp[ii_cp,1]
        dists_cp = cp.hypot(dxij,dyij)
        safe = cp.maximum(dists_cp, 1e-6)       # avoid div-by-zero on coincident points
        ux,uy = dxij/safe, dyij/safe
        ang = cp.degrees(cp.arctan2(cp.abs(dyij), cp.abs(dxij)))
        is_H = (ang<=angle_tol)
        is_V = (ang>=(90.0-angle_tol))
        g1 = (dists_cp>=cal.bridge_min_gap)&(dists_cp<=cal.bridge_max_gap)
        g2 = is_H|is_V
        g3 = ((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS) & \
             ((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS)
        ep_cc_cp = cp.asarray(ep_cc)
        g4 = ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp]
        pre_ok_cp = g1&g2&g3&g4
        pre_idx = cp.asnumpy(cp.where(pre_ok_cp)[0])
        dists = cp.asnumpy(dists_cp)
        is_H = cp.asnumpy(is_H)
        is_V = cp.asnumpy(is_V)
    else:
        dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1]
        dists=np.hypot(dxij,dyij); safe=np.maximum(dists,1e-6)
        ux,uy=dxij/safe,dyij/safe
        ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij)))
        is_H=ang<=angle_tol; is_V=ang>=(90.0-angle_tol)
        g1=(dists>=cal.bridge_min_gap)&(dists<=cal.bridge_max_gap); g2=is_H|is_V
        g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS)
        g4=ep_cc[ii]!=ep_cc[jj]
        pre_ok=g1&g2&g3&g4; pre_idx=np.where(pre_ok)[0]
    if len(pre_idx) == 0: return result
    # ── VECTORISED path-clear check (BOTTLENECK 12 FIX) ──────────────────
    N_SAMP = 9
    K = len(pre_idx)
    vi_pre = ii[pre_idx]; vj_pre = jj[pre_idx]
    ax_arr = ex[vi_pre].astype(np.float32); ay_arr = ey[vi_pre].astype(np.float32)
    bx_arr = ex[vj_pre].astype(np.float32); by_arr = ey[vj_pre].astype(np.float32)
    is_H_pre = is_H[pre_idx]
    # t values for interior samples (exclude endpoints)
    t = np.linspace(0, 1, N_SAMP, dtype=np.float32)[1:-1]              # (N_SAMP-2,)
    # xs[k, s] = lerp(ax, bx, t[s]) when H, else ax
    xs_h = ax_arr[:, None] + (bx_arr - ax_arr)[:, None] * t[None, :]   # (K, N_SAMP-2)
    ys_h = np.broadcast_to(ay_arr[:, None], (K, N_SAMP-2)).copy()      # constant y
    xs_v = np.broadcast_to(ax_arr[:, None], (K, N_SAMP-2)).copy()
    ys_v = ay_arr[:, None] + (by_arr - ay_arr)[:, None] * t[None, :]
    xs_all = np.where(is_H_pre[:, None], xs_h, xs_v)
    ys_all = np.where(is_H_pre[:, None], ys_h, ys_v)
    sxs = np.clip(np.round(xs_all).astype(np.int32), 0, w-1)           # (K, N_SAMP-2)
    sys_ = np.clip(np.round(ys_all).astype(np.int32), 0, h-1)
    # bulk wall lookup: walls_flat[K, N_SAMP-2]
    walls_flat = walls[sys_, sxs]           # (K, N_SAMP-2) uint8
    blocked = walls_flat.any(axis=1)        # (K,) bool
    clr = ~blocked
    valid = pre_idx[clr]
    if len(valid) == 0: return result
    # greedy pairing: shortest bridges first, each endpoint used at most once
    vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid]
    order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order]
    used=np.zeros(n_ep,dtype=bool)
    for k in range(len(vi)):
        ia,ib=int(vi[k]),int(vj[k])
        if used[ia] or used[ib]: continue
        ax,ay=int(ex[ia]),int(ey[ia]); bx2,by2=int(ex[ib]),int(ey[ib])
        # axis-aligned bridge anchored at endpoint a's row/column
        p1,p2=((min(ax,bx2),ay),(max(ax,bx2),ay)) if vH[k] else ((ax,min(ay,by2)),(ax,max(ay,by2)))
        cv2.line(result,p1,p2,255,cal.stroke_width)
        used[ia]=used[ib]=True
    return result

def _close_door_openings_v2(walls: np.ndarray, cal: WallCalibration) -> np.ndarray:
    """Morphologically close door-sized openings along each axis, keeping only
    the newly added bridge pixels whose perpendicular thickness is plausible."""
    gap=cal.door_gap
    def _shape_close(mask, kwh, axis, max_thick):
        # close with a 1-D structuring element, isolate the NEW pixels only
        k=cv2.getStructuringElement(cv2.MORPH_RECT, kwh)
        cls=_cuda_morphology(mask, cv2.MORPH_CLOSE, k)
        new=cv2.bitwise_and(cls,cv2.bitwise_not(mask))
        if not np.any(new): return np.zeros_like(mask)
        n2,lbl2,st2,_=cv2.connectedComponentsWithStats(new,connectivity=8)
        if n2<=1: return np.zeros_like(mask)
        # reject bridges that are too thick perpendicular to the closing axis
        perp=st2[1:,cv2.CC_STAT_HEIGHT if axis=='H' else cv2.CC_STAT_WIDTH]
        keep=perp<=max_thick; lut2=np.zeros(n2,np.uint8); lut2[1:]=keep.astype(np.uint8)*255
        return lut2[lbl2]
    add_h=_shape_close(walls,(gap,1),'H',cal.max_bridge_thick)
    add_v=_shape_close(walls,(1,gap),'V',cal.max_bridge_thick)
    return cv2.bitwise_or(walls, cv2.bitwise_or(add_h,add_v))

def reconstruct_walls(walls: np.ndarray) -> Tuple[np.ndarray, WallCalibration]:
    """Calibrate, prune thin debris, bridge endpoints, close door openings.

    Returns the repaired wall mask together with the calibration used.
    """
    cal = calibrate_wall(walls)
    walls = _remove_thin_lines_calibrated(walls, cal)
    walls = _bridge_wall_endpoints_v2(walls, cal)
    walls = _close_door_openings_v2(walls, cal)
    return walls, cal

def remove_dangling_lines(walls: np.ndarray, cal: WallCalibration) -> np.ndarray:
    """Erase small components with ≥2 free skeleton tips that touch nothing
    else within `connect_radius` — i.e. floating annotation / leader lines."""
    stroke = cal.stroke_width
    connect_radius = max(6, stroke*3)
    n_cc,cc_map,stats,_ = cv2.connectedComponentsWithStats(walls,connectivity=8)
    if n_cc <= 1: return walls
    skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel)
    tip_cc=cc_map[tip_y,tip_x]
    # count free tips per component
    free_counts=np.zeros(n_cc,dtype=np.int32)
    for i in range(len(tip_x)): free_counts[tip_cc[i]]+=1
    remove=np.zeros(n_cc,dtype=bool)
    ker=cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(connect_radius*2+1,connect_radius*2+1))
    for cc_id in range(1,n_cc):
        if free_counts[cc_id]<2: continue
        bw2=int(stats[cc_id,cv2.CC_STAT_WIDTH]); bh2=int(stats[cc_id,cv2.CC_STAT_HEIGHT])
        # large components are structural walls — never candidates for removal
        if max(bw2,bh2) > stroke*40: continue
        cm=(cc_map==cc_id).astype(np.uint8)
        dc=_cuda_dilate(cm, ker)
        # does the dilated component touch any OTHER wall pixels?
        overlap=cv2.bitwise_and(dc,((walls>0)&(cc_map!=cc_id)).astype(np.uint8))
        if np.count_nonzero(overlap)==0: remove[cc_id]=True
    lut=np.ones(n_cc,dtype=np.uint8); lut[0]=0; lut[remove]=0
    return (lut[cc_map]*255).astype(np.uint8)

def close_large_door_gaps(walls: np.ndarray, cal: WallCalibration) -> np.ndarray:
    """
    BOTTLENECK 12 FIX (same vectorised path-clear as _bridge_wall_endpoints_v2).

    Closes large (DOOR_MIN..DOOR_MAX px) axis-aligned openings between facing
    skeleton endpoints of different components, drawing the bridge through the
    midpoint of the two tips.
    """
    try:
        from scipy.spatial import cKDTree
        _SCIPY = True
    except ImportError:
        _SCIPY = False
    DOOR_MIN=180; DOOR_MAX=320; ANGLE_TOL=12.0
    FCOS=np.cos(np.radians(90.0-ANGLE_TOL))
    stroke=cal.stroke_width; line_width=max(stroke,3)
    result=walls.copy(); h,w=walls.shape
    skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel)
    n_ep=len(tip_x)
    if n_ep<2: return result
    _,cc_map=cv2.connectedComponents(walls,connectivity=8)
    ep_cc=cc_map[tip_y,tip_x]
    lookahead=max(12,stroke*4)
    out_dx,out_dy=_outward_vectors(tip_x,tip_y,skel,lookahead)
    pts=np.stack([tip_x,tip_y],axis=1).astype(np.float32)
    if _SCIPY:
        pairs=cKDTree(pts).query_pairs(float(DOOR_MAX),output_type='ndarray')
        ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64)
    else:
        _ii,_jj=np.triu_indices(n_ep,k=1)
        ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=DOOR_MAX
        ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64)
    if len(ii)==0: return result
    if _CUPY:
        # same on-device g1..g4 gating as _bridge_wall_endpoints_v2
        ii_cp=cp.asarray(ii); jj_cp=cp.asarray(jj)
        pts_cp=cp.asarray(pts)
        odx_cp=cp.asarray(out_dx); ody_cp=cp.asarray(out_dy)
        ep_cc_cp=cp.asarray(ep_cc)
        dxij=pts_cp[jj_cp,0]-pts_cp[ii_cp,0]
        dyij=pts_cp[jj_cp,1]-pts_cp[ii_cp,1]
        dists_cp=cp.hypot(dxij,dyij); safe=cp.maximum(dists_cp,1e-6)
        ux,uy=dxij/safe,dyij/safe
        ang=cp.degrees(cp.arctan2(cp.abs(dyij),cp.abs(dxij)))
        is_H=(ang<=ANGLE_TOL); is_V=(ang>=(90.0-ANGLE_TOL))
        g1=(dists_cp>=DOOR_MIN)&(dists_cp<=DOOR_MAX); g2=is_H|is_V
        g3=((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS)&\
           ((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS)
        g4=ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp]
        pre_idx=cp.asnumpy(cp.where(g1&g2&g3&g4)[0])
        dists=cp.asnumpy(dists_cp); is_H=cp.asnumpy(is_H); is_V=cp.asnumpy(is_V)
    else:
        dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1]
        dists=np.hypot(dxij,dyij); safe=np.maximum(dists,1e-6)
        ux,uy=dxij/safe,dyij/safe
        ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij)))
        is_H=ang<=ANGLE_TOL; is_V=ang>=(90.0-ANGLE_TOL)
        g1=(dists>=DOOR_MIN)&(dists<=DOOR_MAX); g2=is_H|is_V
        g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS)
        g4=ep_cc[ii]!=ep_cc[jj]
        pre_idx=np.where(g1&g2&g3&g4)[0]
    if len(pre_idx) == 0: return result
    # ── vectorised path-clear ─────────────────────────────────────────────
    N_SAMP = 15
    K = len(pre_idx)
    vi_pre = ii[pre_idx]; vj_pre = jj[pre_idx]
    ax_arr = tip_x[vi_pre].astype(np.float32); ay_arr = tip_y[vi_pre].astype(np.float32)
    bx_arr = tip_x[vj_pre].astype(np.float32); by_arr = tip_y[vj_pre].astype(np.float32)
    is_H_pre = is_H[pre_idx]
    t = np.linspace(0, 1, N_SAMP, dtype=np.float32)[1:-1]
    # here the sampled path runs through the MIDPOINT row/column (the drawn
    # bridge below also uses the midpoint), unlike _bridge_wall_endpoints_v2
    mid_y = ((ay_arr + by_arr) / 2.0)[:, None]
    mid_x = ((ax_arr + bx_arr) / 2.0)[:, None]
    xs_h = ax_arr[:, None] + (bx_arr - ax_arr)[:, None] * t[None, :]
    ys_h = np.broadcast_to(mid_y, (K, N_SAMP-2)).copy()
    xs_v = np.broadcast_to(mid_x, (K, N_SAMP-2)).copy()
    ys_v = ay_arr[:, None] + (by_arr - ay_arr)[:, None] * t[None, :]
    xs_all = np.where(is_H_pre[:, None], xs_h, xs_v)
    ys_all = np.where(is_H_pre[:, None], ys_h, ys_v)
    sxs = np.clip(np.round(xs_all).astype(np.int32), 0, w-1)
    sys_ = np.clip(np.round(ys_all).astype(np.int32), 0, h-1)
    blocked = walls[sys_, sxs].any(axis=1)
    clr = ~blocked
    valid=pre_idx[clr]
    if len(valid)==0: return result
    vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid]
    order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order]
    used=np.zeros(n_ep,dtype=bool)
    for k in range(len(vi)):
        ia,ib=int(vi[k]),int(vj[k])
        if used[ia] or used[ib]: continue
        ax,ay=int(tip_x[ia]),int(tip_y[ia]); bx2,by2=int(tip_x[ib]),int(tip_y[ib])
        if vH[k]:
            p1=(min(ax,bx2),(ay+by2)//2); p2=(max(ax,bx2),(ay+by2)//2)
        else:
            p1=((ax+bx2)//2,min(ay,by2)); p2=((ax+bx2)//2,max(ay,by2))
        cv2.line(result,p1,p2,255,line_width)
        used[ia]=used[ib]=True
    return result

def apply_user_lines_to_walls(walls, lines, thickness):
    """Rasterise user-drawn (x1, y1, x2, y2) door/wall lines onto a copy of
    the wall mask, at least 3 px thick."""
    result = walls.copy()
    for x1, y1, x2, y2 in lines:
        cv2.line(result, (x1, y1), (x2, y2), 255, max(thickness, 3))
    return result

def segment_rooms_flood(walls: np.ndarray) -> np.ndarray:
    """Segment rooms as the flood-fill complement of the wall mask.

    The image border is sealed (5 px), the exterior is flood-filled from
    8 border seeds, and whatever stays unfilled inside the walls is a room.
    """
    h, w = walls.shape
    work = walls.copy()
    # seal the frame so exterior flood fill cannot leak into rooms
    work[:5, :] = 255; work[-5:, :] = 255
    work[:, :5] = 255; work[:, -5:] = 255
    filled = work.copy()
    mask = np.zeros((h+2, w+2), np.uint8)
    for sx, sy in [(0,0),(w-1,0),(0,h-1),(w-1,h-1),
                   (w//2,0),(w//2,h-1),(0,h//2),(w-1,h//2)]:
        if filled[sy, sx] == 0:
            cv2.floodFill(filled, mask, (sx, sy), 255)
    rooms = cv2.bitwise_not(filled)
    rooms = cv2.bitwise_and(rooms, cv2.bitwise_not(walls))
    # 2×2 opening removes single-pixel speckle
    rooms = _cuda_morphology(rooms, cv2.MORPH_OPEN, np.ones((2,2), np.uint8))
    return rooms

def _find_thick_wall_neg_prompts(walls_mask, n=SAM_WALL_NEG):
    """
    BOTTLENECK 6 FIX — GPU distanceTransform + vectorised grid-cell uniquing.
""" h, w = walls_mask.shape # ── GPU distanceTransform ───────────────────────────────────────────── if _CV2_CUDA: g_wall = _cuda_upload(walls_mask) # cv2.cuda distanceTransform (L2, 5-mask) g_dist = cv2.cuda.GpuMat() cv2.cuda.distanceTransform(g_wall, g_dist, cv2.DIST_L2, 5, stream=_CUDA_STREAM) dist = g_dist.download() else: dist = cv2.distanceTransform(walls_mask, cv2.DIST_L2, cv2.DIST_MASK_PRECISE) try: skel = cv2.ximgproc.thinning(walls_mask, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN) except AttributeError: skel = _morphological_skeleton(walls_mask) skel_vals = dist[skel > 0] if len(skel_vals) == 0: return [] thr = max(float(np.percentile(skel_vals, SAM_WALL_PCT)), WALL_MIN_HALF_PX) ys, xs = np.where((skel > 0) & (dist >= thr)) if len(ys) == 0: return [] # ── vectorised grid-cell uniquing (no Python loop) ──────────────────── grid_cells = max(1, int(np.ceil(np.sqrt(n * 4)))) cell_h = max(1, h // grid_cells); cell_w = max(1, w // grid_cells) cell_ids = (ys // cell_h) * grid_cells + (xs // cell_w) _, first = np.unique(cell_ids, return_index=True) # already vectorised sel = first[:n] return [(int(xs[i]), int(ys[i])) for i in sel] def generate_prompts(walls_mask, rooms_flood): """ BOTTLENECK 4 FIX — vectorised component filtering + bulk centroid wall-check using advanced indexing; fallback centroid search using a single np.argmin over a pre-built offset grid. 
""" h, w = walls_mask.shape inv = cv2.bitwise_not(walls_mask) n, labels, stats, centroids = cv2.connectedComponentsWithStats(inv, connectivity=8) min_prompt_area = max(200, int(h * w * 0.0001)) if n <= 1: neg_pts = _find_thick_wall_neg_prompts(walls_mask) return (np.array([], dtype=np.float32).reshape(0,2), np.array([], dtype=np.int32)) # ── vectorised filtering (skip index 0 = background) ───────────────── areas = stats[1:, cv2.CC_STAT_AREA] bx_ = stats[1:, cv2.CC_STAT_LEFT]; by_ = stats[1:, cv2.CC_STAT_TOP] bw_ = stats[1:, cv2.CC_STAT_WIDTH]; bh_ = stats[1:, cv2.CC_STAT_HEIGHT] cx_all = np.clip(np.round(centroids[1:, 0]).astype(np.int32), 0, w-1) cy_all = np.clip(np.round(centroids[1:, 1]).astype(np.int32), 0, h-1) area_ok = areas >= min_prompt_area border_ok = (bx_ > 2) | (by_ > 2) | \ (bx_ + bw_ < w-2) | (by_ + bh_ < h-2) # exclude components that span nearly the full image (background) full_span = (bx_ <= 2) & (by_ <= 2) & \ (bx_ + bw_ >= w-2) & (by_ + bh_ >= h-2) keep_mask = area_ok & ~full_span keep_idx = np.where(keep_mask)[0] if len(keep_idx) == 0: neg_pts = _find_thick_wall_neg_prompts(walls_mask) return (np.array([], dtype=np.float32).reshape(0,2), np.array([], dtype=np.int32)) cx_k = cx_all[keep_idx] cy_k = cy_all[keep_idx] # ── bulk wall check — no Python loop ───────────────────────────────── on_wall = walls_mask[cy_k, cx_k] > 0 # (K,) bool pts_list = [] lbls_list = [] # centroids not on wall — add directly off_wall = ~on_wall pts_list.append(np.stack([cx_k[off_wall].astype(np.float32), cy_k[off_wall].astype(np.float32)], axis=1)) lbls_list.append(np.ones(off_wall.sum(), dtype=np.int32)) # centroids on wall — vectorised 31×31 offset search on_idx = np.where(on_wall)[0] if len(on_idx) > 0: dy_range = np.arange(-15, 17, 2, dtype=np.int32) dx_range = np.arange(-15, 17, 2, dtype=np.int32) DY, DX = np.meshgrid(dy_range, dx_range, indexing='ij') # (D,D) DY = DY.ravel(); DX = DX.ravel() # (D²,) for k in on_idx: cy_c, cx_c = int(cy_k[k]), int(cx_k[k]) ny_arr 
= np.clip(cy_c + DY, 0, h-1) nx_arr = np.clip(cx_c + DX, 0, w-1) off = walls_mask[ny_arr, nx_arr] == 0 if off.any(): best = np.argmax(off) pts_list.append([[float(nx_arr[best]), float(ny_arr[best])]]) lbls_list.append([1]) if not pts_list: all_pts = np.empty((0, 2), dtype=np.float32) all_lbls = np.empty(0, dtype=np.int32) else: all_pts = np.vstack([p if np.ndim(p)==2 else np.array(p, dtype=np.float32) for p in pts_list]).astype(np.float32) all_lbls = np.concatenate([np.array(l, dtype=np.int32) for l in lbls_list]) # negative prompts (wall centres) neg_pts_list = _find_thick_wall_neg_prompts(walls_mask) if neg_pts_list: neg_arr = np.array(neg_pts_list, dtype=np.float32) neg_lbls = np.zeros(len(neg_pts_list), dtype=np.int32) all_pts = np.vstack([all_pts, neg_arr]) all_lbls = np.concatenate([all_lbls, neg_lbls]) return all_pts, all_lbls def mask_to_rle(mask: np.ndarray) -> Dict: """ BOTTLENECK 10 FIX — replace pure-Python for-loop over every pixel with NumPy run-length encoding via np.diff on the flattened boolean array. 
""" h, w = mask.shape flat = mask.flatten(order='F').astype(bool) # np.diff detects transitions between False→True and True→False padded = np.concatenate([[False], flat, [False]]) changes = np.where(np.diff(padded.astype(np.int8)))[0] # boundary positions counts = np.diff(changes).tolist() # run lengths # RLE must start with a False count rle_counts = ([0] + counts) if flat[0] else counts return {"counts": rle_counts, "size": [h, w]} def _mask_to_contour_flat(mask): contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if not contours: return [] largest = max(contours, key=cv2.contourArea) pts = largest[:, 0, :].tolist() return [v for pt in pts for v in pt] def _match_sam_mask_to_contour(contour, sam_room_masks): if not sam_room_masks: return _contour_to_rle_and_flat(contour) sam_h, sam_w = sam_room_masks[0]["mask"].shape contour_mask = np.zeros((sam_h, sam_w), dtype=np.uint8) cv2.drawContours(contour_mask, [contour], -1, 255, thickness=-1) best_iou = 0.0; best_entry = None for entry in sam_room_masks: m = entry["mask"] if m.shape != contour_mask.shape: continue inter = np.count_nonzero(cv2.bitwise_and(m, contour_mask)) if inter == 0: continue union = np.count_nonzero(cv2.bitwise_or(m, contour_mask)) iou = inter / (union + 1e-6) if iou > best_iou: best_iou = iou; best_entry = entry if best_entry is None or best_iou < 0.05: return _contour_to_rle_and_flat(contour) sam_contour_flat = _mask_to_contour_flat(best_entry["mask"]) if not sam_contour_flat: raw_pts = contour[:, 0, :].tolist() sam_contour_flat = [v for pt in raw_pts for v in pt] return mask_to_rle(best_entry["mask"]), sam_contour_flat, best_entry["score"] def _contour_to_rle_and_flat(contour): x, y, rw, rh = cv2.boundingRect(contour) canvas = np.zeros((rh+y+20, rw+x+20), dtype=np.uint8) cv2.drawContours(canvas, [contour], -1, 255, thickness=-1) raw_pts = contour[:, 0, :].tolist() flat_pts = [v for pt in raw_pts for v in pt] return mask_to_rle(canvas), flat_pts, 1.0 # 
════════════════════════════════════════════════════════════════════════════ # BATCHED OCR (BOTTLENECK 7 FIX) # ════════════════════════════════════════════════════════════════════════════ def _prepare_ocr_roi(img_bgr: np.ndarray, contour: np.ndarray) -> Optional[np.ndarray]: """Prepare a single ROI for OCR (CLAHE + Otsu + medianBlur → RGB).""" x, y, rw, rh = cv2.boundingRect(contour) pad = 20 roi = img_bgr[max(0,y-pad):min(img_bgr.shape[0],y+rh+pad), max(0,x-pad):min(img_bgr.shape[1],x+rw+pad)] if roi.size == 0: return None gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) clahe = cv2.createCLAHE(2.0, (8,8)) proc = clahe.apply(gray) _, bin_img = _cuda_threshold(proc, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU) rgb = cv2.cvtColor( cv2.medianBlur(bin_img.astype(np.uint8), 3), cv2.COLOR_GRAY2RGB ) return rgb def _get_ocr_reader(): """Singleton EasyOCR reader with GPU=True when CUDA available.""" if not hasattr(_get_ocr_reader, "_reader"): try: import easyocr _get_ocr_reader._reader = easyocr.Reader(["en"], gpu=_TORCH_CUDA) print(f"[OCR] EasyOCR initialised gpu={_TORCH_CUDA}") except ImportError: _get_ocr_reader._reader = None return _get_ocr_reader._reader def run_ocr_batch(img_bgr: np.ndarray, contours: List[np.ndarray]) -> List[Optional[str]]: """ BOTTLENECK 7 FIX — batch all room crops into a single EasyOCR call. readtext_batched() pushes all crops through the GPU text recognition network in one forward pass instead of one-at-a-time. Falls back to sequential readtext() if readtext_batched unavailable. 
""" reader = _get_ocr_reader() if reader is None: return [None] * len(contours) rois: List[Optional[np.ndarray]] = [_prepare_ocr_roi(img_bgr, c) for c in contours] labels: List[Optional[str]] = [None] * len(contours) valid_idx = [i for i, r in enumerate(rois) if r is not None] valid_rois = [rois[i] for i in valid_idx] if not valid_rois: return labels try: # ── preferred: GPU batched inference ───────────────────────────── batch_results = reader.readtext_batched(valid_rois, detail=1, paragraph=False, batch_size=len(valid_rois)) for out_i, orig_i in enumerate(valid_idx): cands = [ (t.strip().upper(), c) for _, t, c in batch_results[out_i] if c >= OCR_CONF_THR and len(t.strip()) >= 2 and any(ch.isalpha() for ch in t) ] labels[orig_i] = max(cands, key=lambda x: x[1])[0] if cands else None except (AttributeError, Exception): # ── fallback: sequential (original behaviour) ───────────────────── for out_i, orig_i in enumerate(valid_idx): try: results = reader.readtext(valid_rois[out_i], detail=1, paragraph=False) cands = [ (t.strip().upper(), c) for _, t, c in results if c >= OCR_CONF_THR and len(t.strip()) >= 2 and any(ch.isalpha() for ch in t) ] labels[orig_i] = max(cands, key=lambda x: x[1])[0] if cands else None except Exception: pass return labels def run_ocr_on_room(img_bgr: np.ndarray, contour: np.ndarray) -> Optional[str]: """Single-room OCR wrapper (kept for compatibility).""" results = run_ocr_batch(img_bgr, [contour]) return results[0] # ════════════════════════════════════════════════════════════════════════════ # FILTER ROOM REGIONS (BOTTLENECK 5 FIX — vectorised NumPy filtering) # ════════════════════════════════════════════════════════════════════════════ def filter_room_regions(rooms_mask, img_shape): """ BOTTLENECK 5 FIX — all scalar filters (area, dim, aspect, border, extent) computed as vectorised NumPy boolean masks before entering any Python loop. The solidity / drawContours step is the only remaining per-contour work. 
""" h, w = img_shape[:2] img_area = float(h * w) min_area = img_area * MIN_ROOM_AREA_FRAC max_area = img_area * MAX_ROOM_AREA_FRAC min_dim = w * MIN_ROOM_DIM_FRAC margin = max(5.0, w * BORDER_MARGIN_FRAC) contours, _ = cv2.findContours(rooms_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return np.zeros_like(rooms_mask), [] # ── vectorised stats ────────────────────────────────────────────────── bboxes = np.array([cv2.boundingRect(c) for c in contours], dtype=np.float32) areas = np.array([cv2.contourArea(c) for c in contours], dtype=np.float32) bx = bboxes[:,0]; by = bboxes[:,1] bw_arr = bboxes[:,2]; bh_arr = bboxes[:,3] area_ok = (areas >= min_area) & (areas <= max_area) border_ok = (bx >= margin) & (by >= margin) & \ (bx + bw_arr <= w - margin) & (by + bh_arr <= h - margin) dim_ok = (bw_arr >= min_dim) | (bh_arr >= min_dim) aspect = np.maximum(bw_arr, bh_arr) / (np.minimum(bw_arr, bh_arr) + 1e-6) aspect_ok = aspect <= MAX_ASPECT_RATIO extent_ok = (areas / (bw_arr * bh_arr + 1e-6)) >= MIN_EXTENT # All scalar checks in one shot — only compute solidity for survivors cheap_pass = np.where(area_ok & border_ok & dim_ok & aspect_ok & extent_ok)[0] valid_mask = np.zeros_like(rooms_mask) valid_rooms = [] for i in cheap_pass: cnt = contours[i] hull = cv2.convexHull(cnt) ha = cv2.contourArea(hull) if ha > 0 and (areas[i] / ha) >= MIN_SOLIDITY: cv2.drawContours(valid_mask, [cnt], -1, 255, -1) valid_rooms.append(cnt) return valid_mask, valid_rooms def pixel_area_to_m2(area_px): return area_px * (2.54 / DPI) ** 2 * (SCALE_FACTOR ** 2) / 10000 def validate_label(label): if not label: return False label = label.strip() if not label[0].isalpha(): return False lc = sum(1 for c in label if c.isalpha()) return lc == 1 or lc >= 3 def measure_and_label_rooms(img, valid_rooms, sam_room_masks): """ BOTTLENECK 7 FIX — all OCR crops sent to run_ocr_batch() in one call instead of sequential run_ocr_on_room() per room. 
""" if not valid_rooms: return [] # ── batch OCR ───────────────────────────────────────────────────────── ocr_labels = run_ocr_batch(img, valid_rooms) room_data = [] for idx, (contour, label) in enumerate(zip(valid_rooms, ocr_labels), 1): if not label or not validate_label(label): label = f"ROOM {idx}" x, y, rw, rh = cv2.boundingRect(contour) area_px = cv2.contourArea(contour) M = cv2.moments(contour) cx = int(M["m10"] / M["m00"]) if M["m00"] else x + rw // 2 cy = int(M["m01"] / M["m00"]) if M["m00"] else y + rh // 2 _, raw_seg_flat, sam_score = _match_sam_mask_to_contour(contour, sam_room_masks) room_data.append({ "id": len(room_data)+1, "label": label, "contour": contour, "segmentation": [raw_seg_flat], "raw_segmentation": [raw_seg_flat], "sam_score": round(sam_score,4), "score": round(sam_score,4), "area": area_px, "area_px": area_px, "area_m2": round(pixel_area_to_m2(area_px),2), "bbox": [x,y,rw,rh], "centroid": [cx,cy], "confidence": 0.95, "isAi": True, }) return room_data # ════════════════════════════════════════════════════════════════════════════ # SAM — BATCHED INFERENCE with set_image inside autocast (BOTTLENECK 9 FIX) # ════════════════════════════════════════════════════════════════════════════ def segment_with_sam(img_rgb, walls, sam_ckpt, rooms_flood=None): """ BOTTLENECK 9 FIX: predictor.set_image() moved INSIDE torch.no_grad() + autocast so the ViT image encoder runs in FP16 (was FP32 in v1). All other GPU optimisations from v1 retained. 
""" if rooms_flood is None: rooms_flood = segment_rooms_flood(walls.copy()) sam_room_masks: List[Dict] = [] try: import torch from segment_anything import sam_model_registry, SamPredictor if not Path(sam_ckpt).exists(): print(" [SAM] Model not found — using flood-fill") return rooms_flood, [] device = "cuda" if torch.cuda.is_available() else "cpu" print(f" [SAM] Loading vit_h on {device} (encoder FP16 autocast enabled)") sam = sam_model_registry["vit_h"](checkpoint=sam_ckpt) sam.to(device); sam.eval() predictor = SamPredictor(sam) except Exception as e: print(f" [SAM] Load failed ({e}) — using flood-fill") return rooms_flood, [] all_points, all_labels = generate_prompts(walls, rooms_flood) if len(all_points) == 0: return rooms_flood, [] pos_pts = [(p, l) for p, l in zip(all_points, all_labels) if l == 1] neg_pts = [p for p, l in zip(all_points, all_labels) if l == 0] print(f" [SAM] {len(pos_pts)} room prompts + {len(neg_pts)} wall-neg prompts") autocast_ctx = ( torch.autocast("cuda", dtype=torch.float16) if _TORCH_CUDA else torch.autocast("cpu", dtype=torch.bfloat16) ) # ── BOTTLENECK 9 FIX: encoder runs in FP16 autocast ────────────────── with torch.no_grad(), autocast_ctx: predictor.set_image(img_rgb) # ← moved inside autocast h, w = walls.shape sam_mask = np.zeros((h, w), dtype=np.uint8) accepted = 0 neg_coords = np.array(neg_pts, dtype=np.float32) if neg_pts else None neg_lbls = np.zeros(len(neg_pts), dtype=np.int32) if neg_pts else None denoise_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) with torch.no_grad(), autocast_ctx: for (px, py), lbl in pos_pts: px, py = int(px), int(py) if neg_coords is not None: pt_c = np.vstack([[[px, py]], neg_coords]) pt_l = np.concatenate([[lbl], neg_lbls]) else: pt_c = np.array([[px, py]], dtype=np.float32) pt_l = np.array([lbl], dtype=np.int32) try: masks, scores, _ = predictor.predict( point_coords=pt_c, point_labels=pt_l, multimask_output=True ) except Exception as e: print(f" [SAM] predict failed ({e})") continue 
best_idx = int(np.argmax(scores)) best_score = float(scores[best_idx]) if best_score < SAM_MIN_SCORE: continue best_mask = (masks[best_idx] > 0).astype(np.uint8) * 255 best_mask = cv2.bitwise_and(best_mask, rooms_flood) best_mask = _cuda_morphology(best_mask, cv2.MORPH_OPEN, denoise_k, iterations=1) if not np.any(best_mask): continue sam_room_masks.append({ "mask" : best_mask.copy(), "score" : best_score, "prompt": (px, py), }) sam_mask = cv2.bitwise_or(sam_mask, best_mask) accepted += 1 if _TORCH_CUDA: torch.cuda.empty_cache() print(f" [SAM] VRAM freed. Accepted {accepted}/{len(pos_pts)} masks") else: print(f" [SAM] Accepted {accepted}/{len(pos_pts)} masks") if accepted == 0: return rooms_flood, [] return sam_mask, sam_room_masks # ════════════════════════════════════════════════════════════════════════════ # BUILD ANNOTATED IMAGE (BOTTLENECK 11 FIX) # ════════════════════════════════════════════════════════════════════════════ def build_annotated_image(img_bgr, rooms, selected_ids=None): """ BOTTLENECK 11 FIX — accumulate ALL room fills into a single overlay array, then call cv2.addWeighted ONCE instead of per-room. Border drawing and text labels remain per-room (unavoidable). 
""" vis = img_bgr.copy() overlay = img_bgr.copy() # ── single-pass fill accumulation ───────────────────────────────────── for i, room in enumerate(rooms): cnt = room.get("contour") if cnt is None: continue color = ROOM_COLORS[i % len(ROOM_COLORS)] bgr = (color[2], color[1], color[0]) cv2.drawContours(overlay, [cnt], -1, bgr, -1) # single blend for ALL fills vis = cv2.addWeighted(overlay, 0.35, vis, 0.65, 0) # ── per-room: border + text ─────────────────────────────────────────── for i, room in enumerate(rooms): cnt = room.get("contour") if cnt is None: continue color = ROOM_COLORS[i % len(ROOM_COLORS)] bgr = (color[2], color[1], color[0]) is_sel = selected_ids and room["id"] in selected_ids cv2.drawContours(vis, [cnt], -1, (0,255,255) if is_sel else bgr, 4 if is_sel else 2) M = cv2.moments(cnt) cx = int(M["m10"]/M["m00"]) if M["m00"] else 0 cy = int(M["m01"]/M["m00"]) if M["m00"] else 0 label = room.get("label", f"Room {room['id']}") area = room.get("area_m2", 0.0) fs = 0.55; th = 1 (tw1, th1), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fs, th) (tw2, th2), _ = cv2.getTextSize(f"{area:.1f} m²", cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, th) bx2 = cx - max(tw1,tw2)//2 - 4; by2 = cy - th1 - th2 - 12 bw2 = max(tw1,tw2)+8; bh2 = th1+th2+16 sub = vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2] if sub.size > 0: vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2] = \ cv2.addWeighted(sub, 0.3, np.ones_like(sub)*255, 0.7, 0) cv2.putText(vis, label, (cx-tw1//2, cy-th2-6), cv2.FONT_HERSHEY_SIMPLEX, fs, (20,20,20), th+1, cv2.LINE_AA) cv2.putText(vis, f"{area:.1f} m²", (cx-tw2//2, cy+th2+2), cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, (20,20,20), th, cv2.LINE_AA) return vis def export_to_excel(rooms): wb = openpyxl.Workbook(); ws = wb.active; ws.title = "Room Analysis" headers = ["ID","Label","Area (px)","Area (m²)","Centroid X","Centroid Y", "Bbox X","Bbox Y","Bbox W","Bbox H","SAM Score","Confidence"] hf = PatternFill("solid", fgColor="1F4E79"); hfont = Font(bold=True, 
color="FFFFFF", size=11) for col, h in enumerate(headers,1): cell=ws.cell(row=1,column=col,value=h) cell.fill=hf; cell.font=hfont; cell.alignment=Alignment(horizontal="center") alt = PatternFill("solid", fgColor="D6E4F0") for rn, room in enumerate(rooms, 2): cnt = room.get("contour") M = cv2.moments(cnt) if cnt is not None else {} cx = int(M["m10"]/M["m00"]) if M.get("m00") else 0 cy = int(M["m01"]/M["m00"]) if M.get("m00") else 0 bbox = cv2.boundingRect(cnt) if cnt is not None else (0,0,0,0) row_data=[room.get("id"), room.get("label","?"), round(room.get("area_px",0),1), round(room.get("area_m2",0.0),2), cx, cy, bbox[0], bbox[1], bbox[2], bbox[3], round(room.get("score",1.0),4), round(room.get("confidence",0.95),2)] fill = alt if rn%2==0 else None for col,val in enumerate(row_data,1): cell=ws.cell(row=rn,column=col,value=val) cell.alignment=Alignment(horizontal="center") if fill: cell.fill=fill for col in ws.columns: mx=max(len(str(c.value or "")) for c in col)+4 ws.column_dimensions[col[0].column_letter].width=min(mx,25) out = Path(tempfile.gettempdir()) / f"floorplan_rooms_{int(time.time())}.xlsx" wb.save(str(out)); return str(out) # ════════════════════════════════════════════════════════════════════════════ # STATE # ════════════════════════════════════════════════════════════════════════════ def init_state(): return {"img_orig":None,"img_cropped":None,"img_clean":None, "walls":None,"walls_base":None,"wall_cal":None, "user_lines":[],"draw_start":None,"walls_thickness":8, "rooms":[],"selected_ids":[],"annotated":None,"status":"Idle"} # ════════════════════════════════════════════════════════════════════════════ # GRADIO CALLBACKS # ════════════════════════════════════════════════════════════════════════════ def cb_load_image(upload, state): if upload is None: return None, state, "Upload a floor-plan image to begin." 
try: if hasattr(upload,"name"): file_path=upload.name elif isinstance(upload,dict) and "name" in upload: file_path=upload["name"] elif isinstance(upload,str): file_path=upload else: img_bgr=cv2.imdecode(np.frombuffer(bytes(upload),dtype=np.uint8),cv2.IMREAD_COLOR) file_path=None if file_path is not None: img_bgr=cv2.imread(file_path) except Exception as e: return None, state, f"❌ Error reading upload: {e}" if img_bgr is None: return None, state, "❌ Could not decode image." state=init_state(); state["img_orig"]=img_bgr; state["status"]="Image loaded." return cv2.cvtColor(img_bgr,cv2.COLOR_BGR2RGB), state, f"✅ Loaded {img_bgr.shape[1]}×{img_bgr.shape[0]} px" def cb_preprocess(state): img=state.get("img_orig") if img is None: return None,None,state,"Load an image first." cropped = remove_title_block(img) img_clean = remove_colors(cropped) img_clean = detect_and_close_door_arcs(img_clean) img_stats = analyze_image_characteristics(cropped) walls, thick = extract_walls_adaptive(img_clean, img_stats) walls = remove_fixture_symbols(walls) walls, cal = reconstruct_walls(walls) walls = remove_dangling_lines(walls, cal) walls = close_large_door_gaps(walls, cal) state["img_cropped"]=cropped; state["img_clean"]=img_clean state["walls"]=walls.copy(); state["walls_base"]=walls.copy() state["walls_thickness"]=thick; state["wall_cal"]=cal walls_rgb = cv2.cvtColor(walls,cv2.COLOR_GRAY2RGB) clean_rgb = cv2.cvtColor(img_clean,cv2.COLOR_BGR2RGB) msg=(f"✅ Pipeline done | stroke≈{cal.stroke_width}px body≈{thick}px " f"bridge=[{cal.bridge_min_gap},{cal.bridge_max_gap}] door={cal.door_gap}px " f"| GPU: torch={_TORCH_CUDA} cupy={_CUPY} cv2_cuda={_CV2_CUDA}") return clean_rgb, walls_rgb, state, msg def cb_add_door_line(evt: gr.SelectData, state): walls=state.get("walls") if walls is None: return None,state,"Run preprocessing first." x,y=int(evt.index[0]),int(evt.index[1]) if state["draw_start"] is None: state["draw_start"]=(x,y); msg=f"🖊 Start ({x},{y}). Click end." 
else: x1,y1=state["draw_start"]; state["user_lines"].append((x1,y1,x,y)) state["draw_start"]=None walls_upd=apply_user_lines_to_walls(state["walls"],state["user_lines"],state["walls_thickness"]) state["walls"]=walls_upd vis=cv2.cvtColor(walls_upd,cv2.COLOR_GRAY2RGB) for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3) return vis,state,f"✅ Line drawn ({x1},{y1})→({x},{y}) Total:{len(state['user_lines'])}" vis=cv2.cvtColor(walls,cv2.COLOR_GRAY2RGB) for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3) if state["draw_start"]: cv2.circle(vis,state["draw_start"],6,(0,200,255),-1) return vis,state,msg def cb_undo_door_line(state): if not state["user_lines"]: return None,state,"No lines to undo." state["user_lines"].pop(); state["draw_start"]=None walls_base=state.get("walls_base") if walls_base is None: return None,state,"Re-run preprocessing." thick=state.get("walls_thickness",8) walls_upd=apply_user_lines_to_walls(walls_base,state["user_lines"],thick) state["walls"]=walls_upd vis=cv2.cvtColor(walls_upd,cv2.COLOR_GRAY2RGB) for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3) return vis,state,f"↩ Removed. Remaining:{len(state['user_lines'])}" def cb_run_sam(state): walls=state.get("walls"); img=state.get("img_cropped"); img_clean=state.get("img_clean") if walls is None or img is None: return None,None,state,"Run preprocessing first." img_rgb=cv2.cvtColor(img,cv2.COLOR_BGR2RGB) ckpt=download_sam_if_needed() sam_enabled=ckpt is not None and Path(ckpt).exists() if sam_enabled: rooms_mask,sam_room_masks=segment_with_sam(img_rgb,walls.copy(),ckpt) else: rooms_mask=segment_rooms_flood(walls.copy()); sam_room_masks=[] state["_sam_room_masks"]=sam_room_masks if not np.count_nonzero(rooms_mask): return None,None,state,"⚠ rooms_mask empty." valid_mask,valid_rooms=filter_room_regions(rooms_mask,img.shape) if not valid_rooms: return None,None,state,"⚠ No valid rooms." 
src=img_clean if img_clean is not None else img rooms=measure_and_label_rooms(src,valid_rooms,sam_room_masks) if not rooms: return None,None,state,"⚠ No rooms after OCR." state["rooms"]=rooms; state["selected_ids"]=[] annotated=build_annotated_image(img,rooms); state["annotated"]=annotated table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms] return cv2.cvtColor(annotated,cv2.COLOR_BGR2RGB),table,state,f"✅ {len(rooms)} rooms detected." def cb_click_room(evt: gr.SelectData, state): annotated=state.get("annotated"); rooms=state.get("rooms",[]); img=state.get("img_cropped") if annotated is None or not rooms: return None,state,"Run SAM first." x,y=int(evt.index[0]),int(evt.index[1]); clicked_id=None for room in rooms: cnt=room.get("contour") if cnt is None: continue if cv2.pointPolygonTest(cnt,(float(x),float(y)),False)>=0: clicked_id=room["id"]; break if clicked_id is None: state["selected_ids"]=[]; msg="Clicked outside — selection cleared." else: sel=state["selected_ids"] if clicked_id in sel: sel.remove(clicked_id); msg=f"Room {clicked_id} deselected." else: sel.append(clicked_id); msg=f"Room {clicked_id} selected." state["selected_ids"]=sel new_ann=build_annotated_image(img,rooms,state["selected_ids"]); state["annotated"]=new_ann return cv2.cvtColor(new_ann,cv2.COLOR_BGR2RGB),state,msg def cb_remove_selected(state): sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped") if not sel: return None,None,state,"No rooms selected." 
removed=[r["label"] for r in rooms if r["id"] in sel] rooms=[r for r in rooms if r["id"] not in sel] for i,r in enumerate(rooms,1): r["id"]=i state["rooms"]=rooms; state["selected_ids"]=[] ann=build_annotated_image(img,rooms); state["annotated"]=ann table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms] return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"🗑 Removed:{', '.join(removed)}" def cb_rename_selected(new_label, state): sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped") if not sel: return None,None,state,"Select a room first." if not new_label.strip(): return None,None,state,"Enter a non-empty label." for r in rooms: if r["id"] in sel: r["label"]=new_label.strip().upper() state["rooms"]=rooms ann=build_annotated_image(img,rooms,sel); state["annotated"]=ann table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms] return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"✏ Renamed to '{new_label.strip().upper()}'" def cb_export_excel(state): rooms=state.get("rooms",[]) if not rooms: return None,"No rooms to export." 
path=export_to_excel(rooms) return path,f"✅ Exported {len(rooms)} rooms → {Path(path).name}" # ════════════════════════════════════════════════════════════════════════════ # GRADIO UI # ════════════════════════════════════════════════════════════════════════════ CSS = """ #title{text-align:center;font-size:1.8em;font-weight:700;color:#1F4E79} #subtitle{text-align:center;color:#555;margin-top:-8px;margin-bottom:16px} .step-card{border-left:4px solid #1F4E79!important;padding-left:10px!important} """ def _walls_to_rgb(s): w=s.get("walls") return None if w is None else cv2.cvtColor(w,cv2.COLOR_GRAY2RGB) with gr.Blocks(title="FloorPlan Analyser (GPU v2)") as app: state=gr.State(init_state()) gr.Markdown("# 🏢 Floor Plan Room Analyser — NVIDIA GPU Build v2", elem_id="title") gr.Markdown( f"EasyOCR gpu={'✅' if _TORCH_CUDA else '❌'} | " f"SAM encoder FP16={'✅' if _TORCH_CUDA else '❌'} | " f"CuPy={'✅' if _CUPY else '❌'} | " f"cucim={'✅' if _CUCIM else '❌'} | " f"cv2.cuda={'✅' if _CV2_CUDA else '❌'}", elem_id="subtitle", ) status_box=gr.Textbox(label="Status",interactive=False,value="Idle.") with gr.Row(): with gr.Column(scale=1,elem_classes="step-card"): gr.Markdown("### 1️⃣ Upload Floor Plan") upload_btn=gr.UploadButton("📂 Upload Image",file_types=["image"],size="sm") raw_preview=gr.Image(label="Loaded Image",height=320) with gr.Column(scale=1,elem_classes="step-card"): gr.Markdown("### 2️⃣ Pre-process") preprocess_btn=gr.Button("⚙ Run Preprocessing",variant="primary") with gr.Tabs(): with gr.Tab("Clean Image"): clean_img=gr.Image(label="After color removal",height=300) with gr.Tab("Walls"): walls_img=gr.Image(label="Extracted walls",height=300) with gr.Row(): with gr.Column(elem_classes="step-card"): gr.Markdown("### 3️⃣ Draw Door-Closing Lines") undo_line_btn=gr.Button("↩ Undo Last Line",size="sm") wall_draw_img=gr.Image(label="Wall mask",height=380,interactive=False) with gr.Row(): with gr.Column(scale=2,elem_classes="step-card"): gr.Markdown("### 4️⃣ SAM Segmentation + 
OCR") sam_btn=gr.Button("🤖 Run SAM + OCR",variant="primary") ann_img=gr.Image(label="Annotated rooms",height=480,interactive=False) with gr.Column(scale=1,elem_classes="step-card"): gr.Markdown("### 5️⃣ Room Table & Actions") room_table=gr.Dataframe(headers=["ID","Label","Area","SAM Score"], datatype=["number","str","str","str"], interactive=False,label="Detected Rooms") with gr.Group(): rename_txt=gr.Textbox(placeholder="New label…",label="Rename Label") with gr.Row(): rename_btn=gr.Button("✏ Rename",size="sm") remove_btn=gr.Button("🗑 Remove Selected",size="sm",variant="stop") gr.Markdown("---") export_btn=gr.Button("📊 Export to Excel",variant="secondary") excel_file=gr.File(label="Download Excel",visible=True) upload_btn.upload(cb_load_image,[upload_btn,state],[raw_preview,state,status_box]) preprocess_btn.click(cb_preprocess,[state],[clean_img,walls_img,state,status_box])\ .then(_walls_to_rgb,[state],[wall_draw_img]) wall_draw_img.select(cb_add_door_line,[state],[wall_draw_img,state,status_box]) undo_line_btn.click(cb_undo_door_line,[state],[wall_draw_img,state,status_box]) sam_btn.click(cb_run_sam,[state],[ann_img,room_table,state,status_box]) ann_img.select(cb_click_room,[state],[ann_img,state,status_box]) remove_btn.click(cb_remove_selected,[state],[ann_img,room_table,state,status_box]) rename_btn.click(cb_rename_selected,[rename_txt,state],[ann_img,room_table,state,status_box]) export_btn.click(cb_export_excel,[state],[excel_file,status_box]) if __name__ == "__main__": app.launch(share=False, debug=True, css=CSS)