diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,22 +1,21 @@ """ -FloorPlan Analyser — Gradio Application (NVIDIA CUDA-Optimised Build) -======================================================================= -GPU improvements over baseline: - • EasyOCR : gpu=True (was hardcoded gpu=False) - • SAM inference : batched predict_batch() under torch.no_grad() + - torch.autocast("cuda") for FP16 speed-up - • OpenCV : cv2.cuda.* used for GaussianBlur, threshold, - morphologyEx, dilate wherever CUDA mat is valid - • Heavy NumPy : CuPy (cp.*) used for distance/angle arrays in - _bridge_wall_endpoints_v2 and close_large_door_gaps - • Memory mgmt : torch.cuda.empty_cache() after SAM; pin_memory - transfers; torch.no_grad() guard throughout - • cv2.cuda stream: single persistent CUDA stream for all cv2.cuda ops +FloorPlan Analyser — Streamlined GPU Build +========================================== +Pipeline: Upload → Crop → Remove Colours → Extract Walls → SAM → OCR Validate → Annotate → Excel +Removed: calibrate_wall, WallCalibration, skeleton/tip helpers, _outward_vectors, + _bridge_wall_endpoints_v2, _close_door_openings_v2, reconstruct_walls, + remove_dangling_lines, close_large_door_gaps, _filter_double_lines_and_thick, + remove_fixture_symbols, detect_and_close_door_arcs, _morphological_skeleton, + _skel, _tip_pixels, _find_thick_wall_neg_prompts (replaced inline), + _remove_thin_lines_calibrated, _estimate_wall_body_thickness (simplified), + apply_user_lines_to_walls, render_wall_canvas, door-closing UI panel, + cb_add_door_line, cb_undo_door_line, cb_clear_door_lines, cb_set_tool """ from __future__ import annotations import io, json, os, tempfile, time, requests, threading +from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple @@ -26,7 +25,7 @@ import gradio as gr import openpyxl from openpyxl.styles import Font, PatternFill, Alignment -# ── GPU availability flags 
─────────────────────────────────────────────────── +# ── GPU flags ──────────────────────────────────────────────────────────────── try: import torch _TORCH_CUDA = torch.cuda.is_available() @@ -38,23 +37,25 @@ try: _CUPY = True except ImportError: _CUPY = False - cp = None # type: ignore + cp = None # type: ignore -# Persistent CUDA stream for cv2.cuda ops (avoids per-call stream creation) -_CV2_CUDA = cv2.cuda.getCudaEnabledDeviceCount() > 0 -_CUDA_STREAM: Optional[cv2.cuda.Stream] = cv2.cuda.Stream() if _CV2_CUDA else None # type: ignore +try: + _CV2_CUDA = cv2.cuda.getCudaEnabledDeviceCount() > 0 + _CUDA_STREAM: Optional[Any] = cv2.cuda.Stream() if _CV2_CUDA else None +except Exception: + _CV2_CUDA = False + _CUDA_STREAM = None print(f"[GPU] torch_cuda={_TORCH_CUDA} cupy={_CUPY} cv2_cuda={_CV2_CUDA}") -# ─── SAM HuggingFace endpoint ──────────────────────────────────────────────── +# ── SAM checkpoint ─────────────────────────────────────────────────────────── HF_REPO = "Pream912/sam" -HF_API = f"https://huggingface.co/{HF_REPO}/resolve/main" SAM_CKPT = Path(tempfile.gettempdir()) / "sam_vit_h_4b8939.pth" -SAM_URL = f"{HF_API}/sam_vit_h_4b8939.pth" - -DPI = 300 -SCALE_FACTOR = 100 +SAM_URL = f"https://huggingface.co/{HF_REPO}/resolve/main/sam_vit_h_4b8939.pth" +# ── Constants ──────────────────────────────────────────────────────────────── +DPI = 300 +SCALE_FACTOR = 100 MIN_ROOM_AREA_FRAC = 0.000004 MAX_ROOM_AREA_FRAC = 0.08 MIN_ROOM_DIM_FRAC = 0.01 @@ -64,10 +65,8 @@ MIN_SOLIDITY = 0.25 MIN_EXTENT = 0.08 OCR_CONF_THR = 0.3 SAM_MIN_SCORE = 0.70 -SAM_CLOSET_THR = 300 SAM_WALL_NEG = 20 SAM_WALL_PCT = 75 -WALL_MIN_HALF_PX = 3 ROOM_COLORS = [ (255, 99, 71), (100, 149, 237), (60, 179, 113), @@ -78,61 +77,37 @@ ROOM_COLORS = [ # ════════════════════════════════════════════════════════════════════════════ -# GPU-ACCELERATED OpenCV HELPERS +# GPU HELPERS (cv2.cuda with CPU fallback) # ═════════════════════════════════════════════════��══════════════════════════ -def 
_cuda_upload(img: np.ndarray) -> "cv2.cuda.GpuMat": - """Upload a numpy array to GPU memory.""" - gm = cv2.cuda_GpuMat() - gm.upload(img, stream=_CUDA_STREAM) - return gm - - -def _cuda_gaussian_blur(gray: np.ndarray, ksize: Tuple[int,int], sigma: float) -> np.ndarray: - """GaussianBlur on GPU when available, CPU fallback.""" - if _CV2_CUDA: - g_gpu = _cuda_upload(gray) - filt = cv2.cuda.createGaussianFilter( - cv2.CV_8UC1, cv2.CV_8UC1, ksize, sigma - ) - out = filt.apply(g_gpu, stream=_CUDA_STREAM) - return out.download() - return cv2.GaussianBlur(gray, ksize, sigma) - - def _cuda_threshold(gray: np.ndarray, thr: float, maxval: float, typ: int ) -> Tuple[float, np.ndarray]: - """Threshold on GPU when available.""" if _CV2_CUDA: - g_gpu = _cuda_upload(gray) - ret, dst = cv2.cuda.threshold(g_gpu, thr, maxval, typ, stream=_CUDA_STREAM) - return ret, dst.download() + try: + gm = cv2.cuda_GpuMat(); gm.upload(gray, stream=_CUDA_STREAM) + ret, dst = cv2.cuda.threshold(gm, thr, maxval, typ, stream=_CUDA_STREAM) + return ret, dst.download() + except Exception: + pass return cv2.threshold(gray, thr, maxval, typ) def _cuda_morphology(src: np.ndarray, op: int, kernel: np.ndarray, iterations: int = 1) -> np.ndarray: - """MorphologyEx on GPU — falls back to CPU for unsupported ops.""" if _CV2_CUDA and op in (cv2.MORPH_ERODE, cv2.MORPH_DILATE, cv2.MORPH_OPEN, cv2.MORPH_CLOSE): - g_gpu = _cuda_upload(src) - filt = cv2.cuda.createMorphologyFilter( - op, cv2.CV_8UC1, kernel, iterations=iterations - ) - return filt.apply(g_gpu, stream=_CUDA_STREAM).download() + try: + gm = cv2.cuda_GpuMat(); gm.upload(src, stream=_CUDA_STREAM) + f = cv2.cuda.createMorphologyFilter(op, cv2.CV_8UC1, kernel, + iterations=iterations) + return f.apply(gm, stream=_CUDA_STREAM).download() + except Exception: + pass return cv2.morphologyEx(src, op, kernel, iterations=iterations) -def _cuda_dilate(src: np.ndarray, kernel: np.ndarray) -> np.ndarray: - if _CV2_CUDA: - g_gpu = _cuda_upload(src) - filt = 
cv2.cuda.createMorphologyFilter(cv2.MORPH_DILATE, cv2.CV_8UC1, kernel) - return filt.apply(g_gpu, stream=_CUDA_STREAM).download() - return cv2.dilate(src, kernel) - - # ════════════════════════════════════════════════════════════════════════════ -# PIPELINE HELPERS +# STEP 1 — DOWNLOAD SAM # ════════════════════════════════════════════════════════════════════════════ def download_sam_if_needed() -> Optional[str]: @@ -152,13 +127,17 @@ def download_sam_if_needed() -> Optional[str]: return None +# ════════════════════════════════════════════════════════════════════════════ +# STEP 2 — CROP (remove title block / legend border) +# ════════════════════════════════════════════════════════════════════════════ + def remove_title_block(img: np.ndarray) -> np.ndarray: h, w = img.shape[:2] gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 50, 150) - h_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 20, 1)) - v_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (1, h // 20)) + h_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 20, 1)) + v_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (1, h // 20)) h_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, h_kern) v_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, v_kern) @@ -178,20 +157,24 @@ def remove_title_block(img: np.ndarray) -> np.ndarray: if crop_r == w and crop_b == h: main_d = np.sum(gray < 200) / gray.size - if np.sum(gray[:, int(w*0.8):] < 200) / (gray[:, int(w*0.8):].size) > main_d*1.5: + if np.sum(gray[:, int(w*0.8):] < 200) / (gray[:, int(w*0.8):].size + 1e-6) > main_d * 1.5: crop_r = int(w * 0.8) - if np.sum(gray[int(h*0.8):, :] < 200) / (gray[int(h*0.8):, :].size) > main_d*1.5: + if np.sum(gray[int(h*0.8):, :] < 200) / (gray[int(h*0.8):, :].size + 1e-6) > main_d * 1.5: crop_b = int(h * 0.8) - return img[:crop_b, :crop_r].copy() + return img[:max(crop_b, h // 4), :max(crop_r, w // 4)].copy() + +# ════════════════════════════════════════════════════════════════════════════ +# STEP 3 — 
REMOVE COLOURS (highlight / fill → white) +# ════════════════════════════════════════════════════════════════════════════ def remove_colors(img: np.ndarray) -> np.ndarray: - b = img[:,:,0].astype(np.int32) - g = img[:,:,1].astype(np.int32) - r = img[:,:,2].astype(np.int32) + b = img[:, :, 0].astype(np.int32) + g = img[:, :, 1].astype(np.int32) + r = img[:, :, 2].astype(np.int32) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.int32) - chroma = np.maximum(np.maximum(r,g),b) - np.minimum(np.minimum(r,g),b) + chroma = np.maximum(np.maximum(r, g), b) - np.minimum(np.minimum(r, g), b) erase = (chroma > 15) & (gray < 240) result = img.copy() result[erase] = (255, 255, 255) @@ -199,981 +182,311 @@ def remove_colors(img: np.ndarray) -> np.ndarray: # ════════════════════════════════════════════════════════════════════════════ -# WALL CALIBRATION -# ════════════════════════════════════════════════════════════════════════════ - -from dataclasses import dataclass, field - -@dataclass -class WallCalibration: - stroke_width : int = 3 - min_component_dim : int = 30 - min_component_area: int = 45 - bridge_min_gap : int = 2 - bridge_max_gap : int = 14 - door_gap : int = 41 - max_bridge_thick : int = 15 - - -def calibrate_wall(mask: np.ndarray) -> WallCalibration: - cal = WallCalibration() - h, w = mask.shape - - n_cols = min(200, w) - col_idx = np.linspace(0, w-1, n_cols, dtype=int) - runs: List[int] = [] - max_run = max(2, int(h * 0.05)) - for ci in col_idx: - col = (mask[:, ci] > 0).astype(np.int8) - pad = np.concatenate([[0], col, [0]]) - d = np.diff(pad.astype(np.int16)) - s = np.where(d == 1)[0] - e = np.where(d == -1)[0] - n = min(len(s), len(e)) - r = (e[:n] - s[:n]).astype(int) - runs.extend(r[(r >= 1) & (r <= max_run)].tolist()) - if runs: - arr = np.array(runs, dtype=np.int32) - hist = np.bincount(np.clip(arr, 0, 200)) - cal.stroke_width = max(2, int(np.argmax(hist[1:])) + 1) - - cal.min_component_dim = max(15, cal.stroke_width * 10) - cal.min_component_area = 
max(30, cal.stroke_width * cal.min_component_dim // 2) - - gap_sizes: List[int] = [] - row_step = max(3, h // 200) - col_step = max(3, w // 200) - for row in range(5, h-5, row_step): - rd = (mask[row, :] > 0).astype(np.int8) - pad = np.concatenate([[0], rd, [0]]) - dif = np.diff(pad.astype(np.int16)) - ends = np.where(dif == -1)[0] - starts = np.where(dif == 1)[0] - for e in ends: - nxt = starts[starts > e] - if len(nxt): - g = int(nxt[0] - e) - if 1 < g < 200: gap_sizes.append(g) - for col in range(5, w-5, col_step): - cd = (mask[:, col] > 0).astype(np.int8) - pad = np.concatenate([[0], cd, [0]]) - dif = np.diff(pad.astype(np.int16)) - ends = np.where(dif == -1)[0] - starts = np.where(dif == 1)[0] - for e in ends: - nxt = starts[starts > e] - if len(nxt): - g = int(nxt[0] - e) - if 1 < g < 200: gap_sizes.append(g) - - cal.bridge_min_gap = 2 - if len(gap_sizes) >= 20: - g = np.array(gap_sizes) - sm = g[g <= 30] - if len(sm) >= 10: - cal.bridge_max_gap = int(np.clip(np.percentile(sm, 75), 4, 20)) - else: - cal.bridge_max_gap = cal.stroke_width * 4 - door = g[(g > cal.bridge_max_gap) & (g <= 80)] - if len(door) >= 5: - raw = int(np.percentile(door, 90)) - else: - raw = max(35, cal.stroke_width * 12) - raw = int(np.clip(raw, 25, 80)) - cal.door_gap = raw if raw % 2 == 1 else raw + 1 - - cal.max_bridge_thick = cal.stroke_width * 5 - return cal - - -# ════════════════════════════════════════════════════════════════════════════ -# SKELETON / TIP HELPERS -# ════════════════════════════════════════════════════════════════════════════ - -def _skel(binary: np.ndarray) -> np.ndarray: - try: - from skimage.morphology import skeletonize as _sk - return (_sk(binary > 0) * 255).astype(np.uint8) - except ImportError: - return _morphological_skeleton(binary) - - -def _tip_pixels(skel_u8: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: - sb = (skel_u8 > 0).astype(np.float32) - nbr = cv2.filter2D(sb, -1, np.ones((3,3), np.float32), borderType=cv2.BORDER_CONSTANT) - return np.where((sb 
== 1) & (nbr.astype(np.int32) == 2)) - - -def _outward_vectors(ex, ey, skel_u8: np.ndarray, lookahead: int): - n = len(ex) - odx = np.zeros(n, np.float32) - ody = np.zeros(n, np.float32) - sy, sx = np.where(skel_u8 > 0) - skel_set = set(zip(sx.tolist(), sy.tolist())) - D8 = [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(-1,1),(1,-1),(1,1)] - for i in range(n): - ox, oy = int(ex[i]), int(ey[i]) - cx, cy = ox, oy - px, py = ox, oy - for _ in range(lookahead): - moved = False - for dx, dy in D8: - nx2, ny2 = cx+dx, cy+dy - if (nx2, ny2) == (px, py): continue - if (nx2, ny2) in skel_set: - px, py = cx, cy; cx, cy = nx2, ny2; moved = True; break - if not moved: break - ix, iy = float(cx-ox), float(cy-oy) - nr = max(1e-6, float(np.hypot(ix, iy))) - odx[i], ody[i] = -ix/nr, -iy/nr - return odx, ody - - -# ════════════════════════════════════════════════════════════════════════════ -# ANALYZE IMAGE CHARACTERISTICS -# ════════════════════════════════════════════════════════════════════════════ - -def analyze_image_characteristics(img: np.ndarray) -> Dict[str, Any]: - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - brightness = float(np.mean(gray)) - contrast = float(np.std(gray)) - otsu_thr, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) - if brightness > 220: - wall_threshold = max(200, int(otsu_thr * 1.1)) - elif brightness < 180: - wall_threshold = max(150, int(otsu_thr * 0.9)) - else: - wall_threshold = int(otsu_thr) - return {"brightness": brightness, "contrast": contrast, - "wall_threshold": wall_threshold, "otsu_threshold": otsu_thr} - - -# ════════════════════════════════════════════════════════════════════════════ -# DOOR ARC DETECTION — GPU-accelerated GaussianBlur + HoughCircles +# STEP 4 — EXTRACT WALLS (threshold → long-line morphology → dilate) +# No calibration, no skeleton, no dangling-line removal. 
# ════════════════════════════════════════════════════════════════════════════ -def detect_and_close_door_arcs(img: np.ndarray) -> np.ndarray: - R_MIN=60; R_MAX=320; DP=1.2; PARAM1=50; PARAM2=22; MIN_DIST=50 - MAX_ARC=115.0; MIN_ARC=60.0; LEAF_FRAC=0.92; LEAF_THR=0.35 - WALL_R=1.25; WALL_THR=12; SNAP_R=30 - DOUBLE_R_RATIO=1.4; DOUBLE_DIST=1.8; LINE_T=3 - - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - h, w = gray.shape - result = img.copy() - - _, binary = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) - binary = _cuda_morphology(binary.astype(np.uint8), cv2.MORPH_CLOSE, - np.ones((3,3), np.uint8)) - # GPU GaussianBlur for HoughCircles input - blurred = _cuda_gaussian_blur(gray, (7,7), 1.5) - - raw = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, dp=DP, minDist=MIN_DIST, - param1=PARAM1, param2=PARAM2, minRadius=R_MIN, maxRadius=R_MAX) - if raw is None: - return result - - circles = np.round(raw[0]).astype(np.int32) - binary = binary.astype(np.uint8) - - def sample_ring(cx, cy, r, n=360): - ang = np.linspace(0, 2*np.pi, n, endpoint=False) - xs = np.clip((cx + r*np.cos(ang)).astype(np.int32), 0, w-1) - ys = np.clip((cy + r*np.sin(ang)).astype(np.int32), 0, h-1) - return ang, xs, ys - - def arc_span(cx, cy, r): - ang, xs, ys = sample_ring(cx, cy, r) - on = ang[binary[ys, xs] > 0] - if len(on) == 0: return 0.0, np.array([]) - return float(np.degrees(on[-1]-on[0])), on - - def has_leaf(cx, cy, r): - lr = r*LEAF_FRAC; n = max(60, int(r)) - ang = np.linspace(0, 2*np.pi, n, endpoint=False) - xs = np.clip((cx+lr*np.cos(ang)).astype(np.int32), 0, w-1) - ys = np.clip((cy+lr*np.sin(ang)).astype(np.int32), 0, h-1) - return float(np.mean(binary[ys,xs]>0)) >= LEAF_THR - - def wall_outside(cx, cy, r): - pr = r*WALL_R; ang = np.linspace(0, 2*np.pi, 36, endpoint=False) - xs = np.clip((cx+pr*np.cos(ang)).astype(np.int32), 0, w-1) - ys = np.clip((cy+pr*np.sin(ang)).astype(np.int32), 0, h-1) - return int(np.sum(binary[ys,xs]>0)) >= WALL_THR - - def 
endpoints(cx, cy, r, occ): - gap_t = np.radians(25.0); diffs = np.diff(occ) - big = np.where(diffs > gap_t)[0] - if len(big) == 0: sa, ea = occ[0], occ[-1] - else: - sp = big[np.argmax(diffs[big])] - sa, ea = occ[sp+1], occ[sp] - def snap(a): - px2 = int(round(cx+r*np.cos(a))); py2 = int(round(cy+r*np.sin(a))) - y0=max(0,py2-SNAP_R); y1=min(h,py2+SNAP_R+1) - x0=max(0,px2-SNAP_R); x1=min(w,px2+SNAP_R+1) - roi = binary[y0:y1, x0:x1] - wy2, wx2 = np.where(roi>0) - if len(wx2)==0: return px2, py2 - dd = np.hypot(wx2-(px2-x0), wy2-(py2-y0)) - i = int(np.argmin(dd)) - return int(wx2[i]+x0), int(wy2[i]+y0) - return snap(sa), snap(ea) - - valid = [] - for cx, cy, r in circles: - span, occ = arc_span(cx, cy, r) - if not (MIN_ARC <= span <= MAX_ARC): continue - if not has_leaf(cx, cy, r): continue - if not wall_outside(cx, cy, r): continue - ep1, ep2 = endpoints(cx, cy, r, occ) - valid.append((cx, cy, r, ep1, ep2)) - - used = [False]*len(valid) - double_pairs = [] - for i in range(len(valid)): - if used[i]: continue - cx1,cy1,r1,_,_ = valid[i] - best_j, best_d = -1, 1e9 - for j in range(i+1, len(valid)): - if used[j]: continue - cx2,cy2,r2,_,_ = valid[j] - if max(r1,r2)/(min(r1,r2)+1e-6) > DOUBLE_R_RATIO: continue - cd = float(np.hypot(cx2-cx1, cy2-cy1)) - if cd < (r1+r2)*DOUBLE_DIST and cd < best_d: - best_d, best_j = cd, j - if best_j >= 0: - double_pairs.append((i, best_j)) - used[i] = used[best_j] = True - - singles = [i for i in range(len(valid)) if not used[i]] - for idx in singles: - cx,cy,r,ep1,ep2 = valid[idx] - cv2.line(result, ep1, ep2, (0,0,0), LINE_T) - for i_idx, j_idx in double_pairs: - cx1,cy1,r1,ep1a,ep1b = valid[i_idx] - cx2,cy2,r2,ep2a,ep2b = valid[j_idx] - daa = np.hypot(ep1a[0]-ep2a[0], ep1a[1]-ep2a[1]) - dab = np.hypot(ep1a[0]-ep2b[0], ep1a[1]-ep2b[1]) - if daa <= dab: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2a,ep2b - else: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2b,ep2a - cv2.line(result, outer1, outer2, (0,0,0), LINE_T) - cv2.line(result, inner1, 
inner2, (0,0,0), LINE_T) - - return result - +def extract_walls(img_clean: np.ndarray) -> Tuple[np.ndarray, int]: + """ + Simple, fast wall extraction: + 1. Otsu threshold → binary ink mask + 2. Keep only long horizontal / vertical strokes (morphological open) + 3. Dilate to body thickness + 4. Remove tiny noise blobs by area + Returns (walls_uint8, body_thickness_px). + """ + h, w = img_clean.shape[:2] + gray = cv2.cvtColor(img_clean, cv2.COLOR_BGR2GRAY) -# ════════════════════════════════════════════════════════════════════════════ -# EXTRACT WALLS ADAPTIVE — GPU morphology + GPU threshold -# ════════════════════════════════════════════════════════════════════════════ + _, binary = _cuda_threshold(gray, 0, 255, + cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) + binary = binary.astype(np.uint8) -def _estimate_wall_body_thickness(binary: np.ndarray, fallback: int = 12) -> int: - h, w = binary.shape - n_cols = min(200, w) - col_idx = np.linspace(0, w-1, n_cols, dtype=int) + # Estimate wall body thickness from column run-lengths (vectorised) + n_cols = min(100, w) + col_idx = np.linspace(0, w - 1, n_cols, dtype=int) cols = (binary[:, col_idx] > 0).astype(np.int8) - padded = np.concatenate([np.zeros((1,n_cols),np.int8), cols, - np.zeros((1,n_cols),np.int8)], axis=0) - diff = np.diff(padded.astype(np.int16), axis=0) - run_lengths = [] + pad = np.vstack([np.zeros((1, n_cols), np.int8), + cols, + np.zeros((1, n_cols), np.int8)]) + diff = np.diff(pad.astype(np.int16), axis=0) + runs = [] for ci in range(n_cols): - d = diff[:, ci] - s = np.where(d == 1)[0] - e = np.where(d == -1)[0] - if len(s)==0 or len(e)==0: continue - r = e - s - r = r[(r >= 2) & (r <= h*0.15)] - if len(r): run_lengths.append(r) - if run_lengths: - return int(np.median(np.concatenate(run_lengths))) - return fallback - - -def _remove_thin_lines(walls: np.ndarray, min_thickness: int) -> np.ndarray: - dist = cv2.distanceTransform(walls, cv2.DIST_L2, 5) - thick_mask = dist >= (min_thickness / 2) - n_lbl, labels, 
_, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) - if n_lbl <= 1: return walls - thick_labels = labels[thick_mask] - if len(thick_labels) == 0: return np.zeros_like(walls) - has_thick = np.zeros(n_lbl, dtype=bool) - has_thick[thick_labels] = True - keep_lut = has_thick.astype(np.uint8)*255; keep_lut[0] = 0 - return keep_lut[labels] - - -def _filter_double_lines_and_thick(walls: np.ndarray) -> np.ndarray: - MIN_SINGLE_DIM = 20; DOUBLE_GAP = 60; DOUBLE_PCT = 12 - - n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) - if n_lbl <= 1: return walls - - try: - skel_full = cv2.ximgproc.thinning(walls, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN) - except AttributeError: - skel_full = _morphological_skeleton(walls) - - skel_bin = skel_full > 0 - keep_ids: set = set() - thin_cands = [] - - for i in range(1, n_lbl): - bw = int(stats[i, cv2.CC_STAT_WIDTH]); bh = int(stats[i, cv2.CC_STAT_HEIGHT]) - if min(bw, bh) >= MIN_SINGLE_DIM: keep_ids.add(i) - else: thin_cands.append(i) - - if not thin_cands: - filtered = np.zeros_like(walls) - for i in keep_ids: filtered[labels==i] = 255 - return filtered - - skel_labels = labels * skel_bin - img_h, img_w = labels.shape - probe_dists = np.arange(3, DOUBLE_GAP+1, 3, dtype=np.float32) - - for i in thin_cands: - bys, bxs = np.where(skel_labels == i) - if len(bys) < 4: continue - step = max(1, len(bys)//80) - sy = bys[::step].astype(np.float32); sx = bxs[::step].astype(np.float32) - n_s = len(sy) - sy_prev=np.roll(sy,1); sy_prev[0]=sy[0] - sy_next=np.roll(sy,-1); sy_next[-1]=sy[-1] - sx_prev=np.roll(sx,1); sx_prev[0]=sx[0] - sx_next=np.roll(sx,-1); sx_next[-1]=sx[-1] - dr=(sy_next-sy_prev); dc=(sx_next-sx_prev) - dlen=np.maximum(1.0, np.hypot(dr, dc)) - pr=(-dc/dlen)[:,np.newaxis]; pc=(dr/dlen)[:,np.newaxis] - for sign in (1.0, -1.0): - rr = np.round(sy[:,np.newaxis] + sign*pr*probe_dists).astype(np.int32) - cc = np.round(sx[:,np.newaxis] + sign*pc*probe_dists).astype(np.int32) - valid_m = 
(rr>=0)&(rr=0)&(cc0) & (lbl_at!=i) - hit_any = partner.any(axis=1) - hit_rows = np.where(hit_any)[0] - if len(hit_rows) == 0: continue - first_col = partner[hit_rows].argmax(axis=1) - partner_ids = lbl_at[hit_rows, first_col] - keep_ids.update(partner_ids.tolist()) - if 100.0*len(hit_rows)/n_s >= DOUBLE_PCT: - keep_ids.add(i); break - - if keep_ids: - ka = np.array(sorted(keep_ids), dtype=np.int32) - lut = np.zeros(n_lbl, dtype=np.uint8); lut[ka] = 255 - return lut[labels] - return np.zeros_like(walls) - - -def extract_walls_adaptive(img_clean: np.ndarray, - img_stats: Optional[Dict] = None) -> Tuple[np.ndarray, int]: - h, w = img_clean.shape[:2] - gray = cv2.cvtColor(img_clean, cv2.COLOR_BGR2GRAY) - - if img_stats: - wall_threshold = img_stats["wall_threshold"] - else: - otsu_t, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV+cv2.THRESH_OTSU) - wall_threshold = int(otsu_t) - - _, binary = _cuda_threshold(gray, wall_threshold, 255, cv2.THRESH_BINARY_INV) - binary = binary.astype(np.uint8) - - min_line_len = max(8, int(0.012 * w)) - body_thickness = _estimate_wall_body_thickness(binary, fallback=12) - body_thickness = int(np.clip(body_thickness, 9, 30)) - - k_h = cv2.getStructuringElement(cv2.MORPH_RECT, (min_line_len, 1)) - k_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, min_line_len)) - long_h = _cuda_morphology(binary, cv2.MORPH_OPEN, k_h) - long_v = _cuda_morphology(binary, cv2.MORPH_OPEN, k_v) - orig_walls = cv2.bitwise_or(long_h, long_v) - - k_bh = cv2.getStructuringElement(cv2.MORPH_RECT, (1, body_thickness)) - k_bv = cv2.getStructuringElement(cv2.MORPH_RECT, (body_thickness, 1)) - dil_h = _cuda_dilate(long_h, k_bh) - dil_v = _cuda_dilate(long_v, k_bv) - walls = cv2.bitwise_or(dil_h, dil_v) - - collision = cv2.bitwise_and(dil_h, dil_v) - safe_zone = cv2.bitwise_and(collision, orig_walls) - walls = cv2.bitwise_or(cv2.bitwise_and(walls, cv2.bitwise_not(collision)), safe_zone) + s = np.where(diff[:, ci] == 1)[0] + e = np.where(diff[:, ci] == -1)[0] + 
n = min(len(s), len(e)) + if n == 0: continue + r = (e[:n] - s[:n]) + runs.extend(r[(r >= 2) & (r <= h * 0.15)].tolist()) + body = int(np.clip(int(np.median(runs)) if runs else 12, 6, 30)) + + # Keep long lines only + min_len = max(8, int(0.012 * w)) + kh = cv2.getStructuringElement(cv2.MORPH_RECT, (min_len, 1)) + kv = cv2.getStructuringElement(cv2.MORPH_RECT, (1, min_len)) + long_h = _cuda_morphology(binary, cv2.MORPH_OPEN, kh) + long_v = _cuda_morphology(binary, cv2.MORPH_OPEN, kv) + lines = cv2.bitwise_or(long_h, long_v) + + # Dilate to body thickness + kdh = cv2.getStructuringElement(cv2.MORPH_RECT, (1, body)) + kdv = cv2.getStructuringElement(cv2.MORPH_RECT, (body, 1)) + walls = cv2.bitwise_or( + cv2.dilate(long_h, kdh), + cv2.dilate(long_v, kdv), + ) - dist = cv2.distanceTransform(cv2.bitwise_not(orig_walls), cv2.DIST_L2, 5) - keep_mask = (dist <= body_thickness/2).astype(np.uint8) * 255 + # Clamp to where original ink actually was + dist = cv2.distanceTransform(cv2.bitwise_not(lines), cv2.DIST_L2, 5) + keep_mask = (dist <= body / 2).astype(np.uint8) * 255 walls = cv2.bitwise_and(walls, keep_mask) - walls = _remove_thin_lines(walls, min_thickness=body_thickness) + # Remove tiny noise blobs n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) if n_lbl > 1: - areas = stats[1:, cv2.CC_STAT_AREA] - min_n = max(20, int(np.median(areas) * 0.0001)) - keep_lut = np.zeros(n_lbl, dtype=np.uint8) - keep_lut[1:] = (areas >= min_n).astype(np.uint8) - walls = (keep_lut[labels] * 255).astype(np.uint8) - - walls = _filter_double_lines_and_thick(walls) - return walls, body_thickness - - -# ════════════════════════════════════════════════════════════════════════════ -# REMOVE FIXTURE SYMBOLS -# ════════════════════════════════════════════════════════════════════════════ - -FIXTURE_MAX_BLOB=80; FIXTURE_MAX_AREA=4000; FIXTURE_MAX_ASP=4.0 -FIXTURE_DENSITY_R=50; FIXTURE_DENSITY_THR=0.35; FIXTURE_MIN_ZONE=1500 - -def remove_fixture_symbols(walls: 
np.ndarray) -> np.ndarray: - h, w = walls.shape - n_lbl, labels, stats, centroids = cv2.connectedComponentsWithStats(walls, connectivity=8) - if n_lbl <= 1: return walls - - bw_a=stats[1:,cv2.CC_STAT_WIDTH].astype(np.float32) - bh_a=stats[1:,cv2.CC_STAT_HEIGHT].astype(np.float32) - ar_a=stats[1:,cv2.CC_STAT_AREA].astype(np.float32) - cx_a=np.round(centroids[1:,0]).astype(np.int32) - cy_a=np.round(centroids[1:,1]).astype(np.int32) - mx=np.maximum(bw_a,bh_a); mn=np.minimum(bw_a,bh_a) - asp=mx/(mn+1e-6) - cand=(bw_a0: density/=dm - zone=(density>=FIXTURE_DENSITY_THR).astype(np.uint8)*255 - nz,zlbl,zst,_=cv2.connectedComponentsWithStats(zone,connectivity=8) - cz=np.zeros_like(zone) - if nz>1: - za=zst[1:,cv2.CC_STAT_AREA]; kz=np.where(za>=FIXTURE_MIN_ZONE)[0]+1 - if len(kz): - lut2=np.zeros(nz,dtype=np.uint8); lut2[kz]=255; cz=lut2[zlbl] - zone=cz - vc=(ccy>=0)&(ccy=0)&(ccx0) - erase_ids=cand_ids[in_zone] - result=walls.copy() - if len(erase_ids): - el=np.zeros(n_lbl,dtype=np.uint8); el[erase_ids]=1 - result[el[labels].astype(bool)]=0 - return result - - -# ════════════════════════════════════════════════════════════════════════════ -# WALL RECONSTRUCTION — 3-stage calibrated pipeline -# ════════════════════════════════════════════════════════════════════════════ - -def _remove_thin_lines_calibrated(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: - n_cc, cc, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) - if n_cc <= 1: return walls - bw=stats[1:,cv2.CC_STAT_WIDTH]; bh=stats[1:,cv2.CC_STAT_HEIGHT] - ar=stats[1:,cv2.CC_STAT_AREA]; mx=np.maximum(bw,bh) - keep=(mx>=cal.min_component_dim)|(ar>=cal.min_component_area*3) - lut=np.zeros(n_cc,np.uint8); lut[1:]=keep.astype(np.uint8)*255 - return lut[cc] - - -def _bridge_wall_endpoints_v2(walls: np.ndarray, cal: WallCalibration, - angle_tol: float = 15.0) -> np.ndarray: - """ - GPU-accelerated version: distance/angle arrays computed with CuPy when - available; scipy.spatial.cKDTree for pair lookup. 
- """ - try: - from scipy.spatial import cKDTree as _KDTree - _SCIPY = True - except ImportError: - _SCIPY = False - - result=walls.copy(); h,w=walls.shape; FCOS=np.cos(np.radians(70.0)) - skel=_skel(walls); ey,ex=_tip_pixels(skel); n_ep=len(ey) - if n_ep < 2: return result - - _,cc_map=cv2.connectedComponents(walls,connectivity=8) - ep_cc=cc_map[ey,ex] - lookahead=max(8, cal.stroke_width*3) - out_dx,out_dy=_outward_vectors(ex,ey,skel,lookahead) - pts=np.stack([ex,ey],axis=1).astype(np.float32) - - if _SCIPY: - from scipy.spatial import cKDTree - pairs=cKDTree(pts).query_pairs(float(cal.bridge_max_gap), output_type='ndarray') - ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64) - else: - _ii,_jj=np.triu_indices(n_ep,k=1) - ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=cal.bridge_max_gap - ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64) - if len(ii)==0: return result - - # ── CuPy GPU acceleration for vectorised distance/angle math ────────── - if _CUPY: - ii_cp = cp.asarray(ii); jj_cp = cp.asarray(jj) - pts_cp = cp.asarray(pts) - odx_cp = cp.asarray(out_dx); ody_cp = cp.asarray(out_dy) - - dxij = pts_cp[jj_cp,0]-pts_cp[ii_cp,0] - dyij = pts_cp[jj_cp,1]-pts_cp[ii_cp,1] - dists_cp = cp.hypot(dxij,dyij) - safe = cp.maximum(dists_cp, 1e-6) - ux,uy = dxij/safe, dyij/safe - ang = cp.degrees(cp.arctan2(cp.abs(dyij), cp.abs(dxij))) - is_H = (ang<=angle_tol) - is_V = (ang>=(90.0-angle_tol)) - - g1 = (dists_cp>=cal.bridge_min_gap)&(dists_cp<=cal.bridge_max_gap) - g2 = is_H|is_V - g3 = ((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS) & \ - ((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS) - ep_cc_cp = cp.asarray(ep_cc) - g4 = ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp] - pre_ok_cp = g1&g2&g3&g4 - - # pull back to CPU for the line-clearing CPU loop - pre_idx = cp.asnumpy(cp.where(pre_ok_cp)[0]) - dists = cp.asnumpy(dists_cp) - is_H = cp.asnumpy(is_H) - is_V = cp.asnumpy(is_V) - else: - dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1] - dists=np.hypot(dxij,dyij); 
safe=np.maximum(dists,1e-6) - ux,uy=dxij/safe,dyij/safe - ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij))) - is_H=ang<=angle_tol; is_V=ang>=(90.0-angle_tol) - g1=(dists>=cal.bridge_min_gap)&(dists<=cal.bridge_max_gap); g2=is_H|is_V - g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS) - g4=ep_cc[ii]!=ep_cc[jj] - pre_ok=g1&g2&g3&g4; pre_idx=np.where(pre_ok)[0] - - N_SAMP=9; clr=np.ones(len(pre_idx),dtype=bool) - for k,pidx in enumerate(pre_idx): - ia,ib=int(ii[pidx]),int(jj[pidx]) - ax,ay=int(ex[ia]),int(ey[ia]); bx2,by2=int(ex[ib]),int(ey[ib]) - if is_H[pidx]: - xs=np.linspace(ax,bx2,N_SAMP,np.float32); ys=np.full(N_SAMP,ay,np.float32) - else: - xs=np.full(N_SAMP,ax,np.float32); ys=np.linspace(ay,by2,N_SAMP,np.float32) - sxs=np.clip(np.round(xs[1:-1]).astype(np.int32),0,w-1) - sys_=np.clip(np.round(ys[1:-1]).astype(np.int32),0,h-1) - if np.any(walls[sys_,sxs]>0): clr[k]=False - valid=pre_idx[clr] - if len(valid)==0: return result - - vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid] - order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order] - used=np.zeros(n_ep,dtype=bool) - for k in range(len(vi)): - ia,ib=int(vi[k]),int(vj[k]) - if used[ia] or used[ib]: continue - ax,ay=int(ex[ia]),int(ey[ia]); bx2,by2=int(ex[ib]),int(ey[ib]) - p1,p2=((min(ax,bx2),ay),(max(ax,bx2),ay)) if vH[k] else ((ax,min(ay,by2)),(ax,max(ay,by2))) - cv2.line(result,p1,p2,255,cal.stroke_width) - used[ia]=used[ib]=True - return result - - -def _close_door_openings_v2(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: - gap=cal.door_gap - def _shape_close(mask, kwh, axis, max_thick): - k=cv2.getStructuringElement(cv2.MORPH_RECT, kwh) - cls=_cuda_morphology(mask, cv2.MORPH_CLOSE, k) - new=cv2.bitwise_and(cls,cv2.bitwise_not(mask)) - if not np.any(new): return np.zeros_like(mask) - n2,lbl2,st2,_=cv2.connectedComponentsWithStats(new,connectivity=8) - if n2<=1: return np.zeros_like(mask) - perp=st2[1:,cv2.CC_STAT_HEIGHT if axis=='H' 
else cv2.CC_STAT_WIDTH] - keep=perp<=max_thick; lut2=np.zeros(n2,np.uint8); lut2[1:]=keep.astype(np.uint8)*255 - return lut2[lbl2] - add_h=_shape_close(walls,(gap,1),'H',cal.max_bridge_thick) - add_v=_shape_close(walls,(1,gap),'V',cal.max_bridge_thick) - return cv2.bitwise_or(walls, cv2.bitwise_or(add_h,add_v)) - - -def reconstruct_walls(walls: np.ndarray) -> Tuple[np.ndarray, WallCalibration]: - cal = calibrate_wall(walls) - walls = _remove_thin_lines_calibrated(walls, cal) - walls = _bridge_wall_endpoints_v2(walls, cal) - walls = _close_door_openings_v2(walls, cal) - return walls, cal + areas = stats[1:, cv2.CC_STAT_AREA] + min_a = max(20, int(np.median(areas) * 0.001)) + lut = np.zeros(n_lbl, dtype=np.uint8) + lut[1:] = (areas >= min_a).astype(np.uint8) * 255 + walls = lut[labels] - -# ════════════════════════════════════════════════════════════════════════════ -# REMOVE DANGLING LINES -# ════════════════════════════════════════════════════════════════════════════ - -def remove_dangling_lines(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: - stroke = cal.stroke_width - connect_radius = max(6, stroke*3) - n_cc,cc_map,stats,_ = cv2.connectedComponentsWithStats(walls,connectivity=8) - if n_cc <= 1: return walls - - skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel) - tip_cc=cc_map[tip_y,tip_x] - free_counts=np.zeros(n_cc,dtype=np.int32) - for i in range(len(tip_x)): free_counts[tip_cc[i]]+=1 - - remove=np.zeros(n_cc,dtype=bool) - ker=cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(connect_radius*2+1,connect_radius*2+1)) - for cc_id in range(1,n_cc): - if free_counts[cc_id]<2: continue - bw2=int(stats[cc_id,cv2.CC_STAT_WIDTH]); bh2=int(stats[cc_id,cv2.CC_STAT_HEIGHT]) - if max(bw2,bh2) > stroke*40: continue - cm=(cc_map==cc_id).astype(np.uint8) - dc=_cuda_dilate(cm, ker) - overlap=cv2.bitwise_and(dc,((walls>0)&(cc_map!=cc_id)).astype(np.uint8)) - if np.count_nonzero(overlap)==0: remove[cc_id]=True - - lut=np.ones(n_cc,dtype=np.uint8); lut[0]=0; lut[remove]=0 - 
return (lut[cc_map]*255).astype(np.uint8) + return walls.astype(np.uint8), body # ════════════════════════════════════════════════════════════════════════════ -# CLOSE LARGE DOOR GAPS — CuPy-accelerated distance/angle math +# STEP 5 — SAM SEGMENTATION (batched, FP16, one encoder pass) # ════════════════════════════════════════════════════════════════════════════ -def close_large_door_gaps(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: - try: - from scipy.spatial import cKDTree - _SCIPY = True - except ImportError: - _SCIPY = False - - DOOR_MIN=180; DOOR_MAX=320; ANGLE_TOL=12.0 - FCOS=np.cos(np.radians(90.0-ANGLE_TOL)) - stroke=cal.stroke_width; line_width=max(stroke,3) - result=walls.copy(); h,w=walls.shape - - skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel) - n_ep=len(tip_x) - if n_ep<2: return result - - _,cc_map=cv2.connectedComponents(walls,connectivity=8) - ep_cc=cc_map[tip_y,tip_x] - lookahead=max(12,stroke*4) - out_dx,out_dy=_outward_vectors(tip_x,tip_y,skel,lookahead) - pts=np.stack([tip_x,tip_y],axis=1).astype(np.float32) - - if _SCIPY: - pairs=cKDTree(pts).query_pairs(float(DOOR_MAX),output_type='ndarray') - ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64) - else: - _ii,_jj=np.triu_indices(n_ep,k=1) - ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=DOOR_MAX - ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64) - if len(ii)==0: return result - - # ── CuPy for vectorised math ────────────────────────────────────────── - if _CUPY: - ii_cp=cp.asarray(ii); jj_cp=cp.asarray(jj) - pts_cp=cp.asarray(pts) - odx_cp=cp.asarray(out_dx); ody_cp=cp.asarray(out_dy) - ep_cc_cp=cp.asarray(ep_cc) - - dxij=pts_cp[jj_cp,0]-pts_cp[ii_cp,0] - dyij=pts_cp[jj_cp,1]-pts_cp[ii_cp,1] - dists_cp=cp.hypot(dxij,dyij); safe=cp.maximum(dists_cp,1e-6) - ux,uy=dxij/safe,dyij/safe - ang=cp.degrees(cp.arctan2(cp.abs(dyij),cp.abs(dxij))) - is_H=(ang<=ANGLE_TOL); is_V=(ang>=(90.0-ANGLE_TOL)) - g1=(dists_cp>=DOOR_MIN)&(dists_cp<=DOOR_MAX); g2=is_H|is_V - 
g3=((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS)&\ - ((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS) - g4=ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp] - pre_idx=cp.asnumpy(cp.where(g1&g2&g3&g4)[0]) - dists=cp.asnumpy(dists_cp); is_H=cp.asnumpy(is_H); is_V=cp.asnumpy(is_V) - else: - dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1] - dists=np.hypot(dxij,dyij); safe=np.maximum(dists,1e-6) - ux,uy=dxij/safe,dyij/safe - ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij))) - is_H=ang<=ANGLE_TOL; is_V=ang>=(90.0-ANGLE_TOL) - g1=(dists>=DOOR_MIN)&(dists<=DOOR_MAX); g2=is_H|is_V - g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS) - g4=ep_cc[ii]!=ep_cc[jj] - pre_idx=np.where(g1&g2&g3&g4)[0] - - N_SAMP=15; clr=np.ones(len(pre_idx),dtype=bool) - for k,pidx in enumerate(pre_idx): - ia,ib=int(ii[pidx]),int(jj[pidx]) - ax,ay=int(tip_x[ia]),int(tip_y[ia]); bx2,by2=int(tip_x[ib]),int(tip_y[ib]) - if is_H[pidx]: - xs=np.linspace(ax,bx2,N_SAMP,dtype=np.float32) - ys=np.full(N_SAMP,(ay+by2)/2.0,dtype=np.float32) - else: - xs=np.full(N_SAMP,(ax+bx2)/2.0,dtype=np.float32) - ys=np.linspace(ay,by2,N_SAMP,dtype=np.float32) - sxs=np.clip(np.round(xs[1:-1]).astype(np.int32),0,w-1) - sys_=np.clip(np.round(ys[1:-1]).astype(np.int32),0,h-1) - if np.any(walls[sys_,sxs]>0): clr[k]=False - valid=pre_idx[clr] - if len(valid)==0: return result - - vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid] - order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order] - used=np.zeros(n_ep,dtype=bool) - for k in range(len(vi)): - ia,ib=int(vi[k]),int(vj[k]) - if used[ia] or used[ib]: continue - ax,ay=int(tip_x[ia]),int(tip_y[ia]); bx2,by2=int(tip_x[ib]),int(tip_y[ib]) - if vH[k]: p1=(min(ax,bx2),(ay+by2)//2); p2=(max(ax,bx2),(ay+by2)//2) - else: p1=((ax+bx2)//2,min(ay,by2)); p2=((ax+bx2)//2,max(ay,by2)) - cv2.line(result,p1,p2,255,line_width) - used[ia]=used[ib]=True - return result - - -def apply_user_lines_to_walls(walls, lines, thickness): - result = walls.copy() - 
for x1, y1, x2, y2 in lines: - cv2.line(result, (x1, y1), (x2, y2), 255, max(thickness, 3)) - return result - - -def segment_rooms_flood(walls: np.ndarray) -> np.ndarray: - h, w = walls.shape - work = walls.copy() - work[:5, :] = 255; work[-5:, :] = 255 - work[:, :5] = 255; work[:, -5:] = 255 +def _flood_rooms(walls: np.ndarray) -> np.ndarray: + """Flood-fill from image border to find enclosed regions.""" + h, w = walls.shape + work = walls.copy() + work[:5, :] = 255; work[-5:, :] = 255 + work[:, :5] = 255; work[:, -5:] = 255 filled = work.copy() - mask = np.zeros((h+2, w+2), np.uint8) + mask = np.zeros((h + 2, w + 2), np.uint8) for sx, sy in [(0,0),(w-1,0),(0,h-1),(w-1,h-1), (w//2,0),(w//2,h-1),(0,h//2),(w-1,h//2)]: if filled[sy, sx] == 0: cv2.floodFill(filled, mask, (sx, sy), 255) rooms = cv2.bitwise_not(filled) rooms = cv2.bitwise_and(rooms, cv2.bitwise_not(walls)) - rooms = _cuda_morphology(rooms, cv2.MORPH_OPEN, np.ones((2,2), np.uint8)) + rooms = _cuda_morphology(rooms, cv2.MORPH_OPEN, np.ones((2, 2), np.uint8)) return rooms -def _morphological_skeleton(binary: np.ndarray) -> np.ndarray: - skel = np.zeros_like(binary) - img = binary.copy() - cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3,3)) - for _ in range(300): - eroded = cv2.erode(img, cross) - temp = cv2.subtract(img, cv2.dilate(eroded, cross)) - skel = cv2.bitwise_or(skel, temp) - img = eroded - if not cv2.countNonZero(img): - break - return skel - - -def _find_thick_wall_neg_prompts(walls_mask, n=SAM_WALL_NEG): - h, w = walls_mask.shape - dist = cv2.distanceTransform(walls_mask, cv2.DIST_L2, cv2.DIST_MASK_PRECISE) - try: - skel = cv2.ximgproc.thinning(walls_mask, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN) - except AttributeError: - skel = _morphological_skeleton(walls_mask) - skel_vals = dist[skel > 0] - if len(skel_vals) == 0: return [] - thr = max(float(np.percentile(skel_vals, SAM_WALL_PCT)), WALL_MIN_HALF_PX) - ys, xs = np.where((skel > 0) & (dist >= thr)) - if len(ys) == 0: return [] - 
grid_cells = max(1, int(np.ceil(np.sqrt(n * 4)))) - cell_h = max(1, h // grid_cells); cell_w = max(1, w // grid_cells) - cell_ids = (ys // cell_h) * grid_cells + (xs // cell_w) - _, first = np.unique(cell_ids, return_index=True) - sel = first[:n] - return [(int(xs[i]), int(ys[i])) for i in sel] - - -def generate_prompts(walls_mask, rooms_flood): - h, w = walls_mask.shape - inv = cv2.bitwise_not(walls_mask) +def _make_prompts(walls: np.ndarray, rooms_flood: np.ndarray): + """Generate SAM point prompts: one positive per flood region, negatives on thick wall centres.""" + h, w = walls.shape + inv = cv2.bitwise_not(walls) n, labels, stats, centroids = cv2.connectedComponentsWithStats(inv, connectivity=8) - min_prompt_area = max(200, int(h * w * 0.0001)) + min_area = max(200, int(h * w * 0.0001)) pts, lbls = [], [] + for i in range(1, n): - area = int(stats[i, cv2.CC_STAT_AREA]) - if area < min_prompt_area: continue + if int(stats[i, cv2.CC_STAT_AREA]) < min_area: continue bx = int(stats[i, cv2.CC_STAT_LEFT]); by = int(stats[i, cv2.CC_STAT_TOP]) - bw = int(stats[i, cv2.CC_STAT_WIDTH]); bh = int(stats[i, cv2.CC_STAT_HEIGHT]) - if bx <= 2 and by <= 2 and bx+bw >= w-2 and by+bh >= h-2: continue - cx = int(np.clip(centroids[i][0], 0, w-1)) - cy = int(np.clip(centroids[i][1], 0, h-1)) - if walls_mask[cy, cx] > 0: + bw = int(stats[i, cv2.CC_STAT_WIDTH]); bh2 = int(stats[i, cv2.CC_STAT_HEIGHT]) + if bx <= 2 and by <= 2 and bx + bw >= w - 2 and by + bh2 >= h - 2: continue + cx = int(np.clip(centroids[i][0], 0, w - 1)) + cy = int(np.clip(centroids[i][1], 0, h - 1)) + if walls[cy, cx] > 0: found = False for dy in range(-15, 16, 2): for dx in range(-15, 16, 2): - ny2, nx2 = cy+dy, cx+dx - if 0<=ny2 0).astype(np.uint8) * 255 # fallback: whole wall + skel_vals = dist[skel > 0] + if len(skel_vals): + thr = max(float(np.percentile(skel_vals, SAM_WALL_PCT)), 3.0) + ys, xs = np.where((skel > 0) & (dist >= thr)) + if len(ys): + step = max(1, len(ys) // SAM_WALL_NEG) + for yi, xi in 
zip(ys[::step][:SAM_WALL_NEG], xs[::step][:SAM_WALL_NEG]): + pts.append([int(xi), int(yi)]); lbls.append(0) -# ════════════════════════════════════════════════════════════════════════════ -# SAM — BATCHED INFERENCE with torch.no_grad + torch.autocast (FP16) -# ════════════════════════════════════════════════════════════════════════════ + return np.array(pts, dtype=np.float32), np.array(lbls, dtype=np.int32) -def segment_with_sam(img_rgb, walls, sam_ckpt, rooms_flood=None): - """ - GPU-optimised SAM segmentation: - • torch.no_grad() — disables gradient tape entirely - • torch.autocast("cuda", dtype=torch.float16) — FP16 for 2× speed on Tensor cores - • Batched predict: all positive prompts sent in ONE predictor call - (negative prompts broadcast to every positive point) - • torch.cuda.empty_cache() after inference to release VRAM - """ - if rooms_flood is None: - rooms_flood = segment_rooms_flood(walls.copy()) +def segment_with_sam(img_rgb: np.ndarray, walls: np.ndarray, + sam_ckpt: str) -> Tuple[np.ndarray, List[Dict]]: + rooms_flood = _flood_rooms(walls) sam_room_masks: List[Dict] = [] try: import torch from segment_anything import sam_model_registry, SamPredictor - - if not Path(sam_ckpt).exists(): - print(" [SAM] Model not found — using flood-fill") - return rooms_flood, [] - - device = "cuda" if torch.cuda.is_available() else "cpu" - print(f" [SAM] Loading vit_h on {device} (FP16 autocast enabled)") + device = "cuda" if _TORCH_CUDA else "cpu" + print(f" [SAM] Loading vit_h on {device}") sam = sam_model_registry["vit_h"](checkpoint=sam_ckpt) sam.to(device); sam.eval() predictor = SamPredictor(sam) - except Exception as e: - print(f" [SAM] Load failed ({e}) — using flood-fill") + print(f" [SAM] Load failed ({e}) — flood-fill fallback") return rooms_flood, [] - all_points, all_labels = generate_prompts(walls, rooms_flood) - if len(all_points) == 0: + all_pts, all_lbls = _make_prompts(walls, rooms_flood) + if len(all_pts) == 0: return rooms_flood, [] - pos_pts = 
[(p, l) for p, l in zip(all_points, all_labels) if l == 1] - neg_pts = [p for p, l in zip(all_points, all_labels) if l == 0] - print(f" [SAM] {len(pos_pts)} room prompts + {len(neg_pts)} wall-neg prompts") + pos_pts = [(p, l) for p, l in zip(all_pts, all_lbls) if l == 1] + neg_pts = [p for p, l in zip(all_pts, all_lbls) if l == 0] + neg_coords = np.array(neg_pts, dtype=np.float32) if neg_pts else None + neg_lbls = np.zeros(len(neg_pts), dtype=np.int32) if neg_pts else None + print(f" [SAM] {len(pos_pts)} pos + {len(neg_pts)} neg prompts") - # ── Set image ONCE (encoder runs once on GPU) ───────────────────────── with torch.no_grad(): predictor.set_image(img_rgb) - h, w = walls.shape - sam_mask = np.zeros((h, w), dtype=np.uint8) - accepted = 0 + h, w = walls.shape + sam_mask = np.zeros((h, w), dtype=np.uint8) + accepted = 0 + denoise_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + autocast = (torch.autocast("cuda", dtype=torch.float16) if _TORCH_CUDA + else torch.autocast("cpu", dtype=torch.bfloat16)) - neg_coords = np.array(neg_pts, dtype=np.float32) if neg_pts else None - neg_lbls = np.zeros(len(neg_pts), dtype=np.int32) if neg_pts else None - denoise_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) - - # ── BATCH: stack all positive prompts (with shared negatives) ───────── - # SAM's predict() accepts (N,2) point_coords and (N,) point_labels for - # multi-point inference per call. We run one call per positive centroid - # but inside torch.no_grad + autocast to maximise GPU throughput. 
- autocast_ctx = ( - torch.autocast("cuda", dtype=torch.float16) - if _TORCH_CUDA else - torch.autocast("cpu", dtype=torch.bfloat16) - ) + # ── Batched decode via mask_decoder ────────────────────────────────── + use_fallback = False + try: + import torch + BATCH = len(pos_pts) + pos_t = torch.from_numpy( + np.array([p for p, _ in pos_pts], dtype=np.float32) + ).to(predictor.device) # (B, 2) + pos_l = torch.ones(BATCH, dtype=torch.int, device=predictor.device) + + trans_pos = predictor.transform.apply_coords_torch( + pos_t.unsqueeze(0), predictor.original_size # (1, B, 2) + ) # → (1, B, 2) + + if neg_coords is not None: + neg_t = torch.from_numpy(neg_coords).to(predictor.device) + neg_l = torch.zeros(len(neg_pts), dtype=torch.int, device=predictor.device) + trans_neg = predictor.transform.apply_coords_torch( + neg_t.unsqueeze(0).expand(BATCH, -1, -1), + predictor.original_size, + ) # (B, N_neg, 2) + all_coords = torch.cat( + [trans_pos.squeeze(0).unsqueeze(1), trans_neg], dim=1) + all_labels = torch.cat( + [pos_l.unsqueeze(1), + neg_l.unsqueeze(0).expand(BATCH, -1)], dim=1) + else: + all_coords = trans_pos.squeeze(0).unsqueeze(1) # (B, 1, 2) + all_labels = pos_l.unsqueeze(1) # (B, 1) + + with torch.no_grad(), autocast: + sparse_emb, dense_emb = predictor.model.prompt_encoder( + points=(all_coords, all_labels), boxes=None, masks=None) + low_res, iou_preds, _ = predictor.model.mask_decoder( + image_embeddings = predictor.features.expand(BATCH, -1, -1, -1), + image_pe = predictor.model.prompt_encoder.get_dense_pe(), + sparse_prompt_embeddings= sparse_emb, + dense_prompt_embeddings = dense_emb, + multimask_output = True, + ) + + best_idx = iou_preds.argmax(dim=1) # (B,) + best_score = iou_preds[torch.arange(BATCH), best_idx] + upscaled = predictor.model.postprocess_masks( + low_res[torch.arange(BATCH), best_idx].unsqueeze(1), + input_size = predictor.input_size, + original_size = predictor.original_size, + ).squeeze(1) + masks_np = (upscaled > 
predictor.model.mask_threshold).cpu().numpy() + scores_np = best_score.cpu().float().numpy() - with torch.no_grad(), autocast_ctx: - for (px, py), lbl in pos_pts: - px, py = int(px), int(py) - if neg_coords is not None: - pt_c = np.vstack([[[px, py]], neg_coords]) - pt_l = np.concatenate([[lbl], neg_lbls]) - else: - pt_c = np.array([[px, py]], dtype=np.float32) - pt_l = np.array([lbl], dtype=np.int32) - - try: - masks, scores, _ = predictor.predict( - point_coords=pt_c, point_labels=pt_l, multimask_output=True - ) - except Exception as e: - print(f" [SAM] predict failed ({e})") - continue - - best_idx = int(np.argmax(scores)) - best_score = float(scores[best_idx]) - if best_score < SAM_MIN_SCORE: - continue - - best_mask = (masks[best_idx] > 0).astype(np.uint8) * 255 - best_mask = cv2.bitwise_and(best_mask, rooms_flood) - best_mask = _cuda_morphology(best_mask, cv2.MORPH_OPEN, denoise_k, iterations=1) - - if not np.any(best_mask): - continue - - sam_room_masks.append({ - "mask" : best_mask.copy(), - "score" : best_score, - "prompt": (px, py), - }) - sam_mask = cv2.bitwise_or(sam_mask, best_mask) + except Exception as e: + print(f" [SAM] Batched decode failed ({e}) — serial fallback") + use_fallback = True + + if use_fallback: + with torch.no_grad(), autocast: + for (px, py), _ in pos_pts: + px, py = int(px), int(py) + pt_c = (np.vstack([[[px, py]], neg_coords]) + if neg_coords is not None + else np.array([[px, py]], np.float32)) + pt_l = (np.concatenate([[1], neg_lbls]) + if neg_lbls is not None + else np.array([1], np.int32)) + try: + m, s, _ = predictor.predict( + point_coords=pt_c, point_labels=pt_l, multimask_output=True) + bi = int(np.argmax(s)) + sc = float(s[bi]) + if sc < SAM_MIN_SCORE: continue + bm = (m[bi] > 0).astype(np.uint8) * 255 + bm = cv2.bitwise_and(bm, rooms_flood) + bm = _cuda_morphology(bm, cv2.MORPH_OPEN, denoise_k) + if not np.any(bm): continue + sam_room_masks.append({"mask": bm.copy(), "score": sc, "prompt": (px, py)}) + sam_mask = 
cv2.bitwise_or(sam_mask, bm) + accepted += 1 + except Exception: + continue + else: + for idx in range(BATCH): + sc = float(scores_np[idx]) + if sc < SAM_MIN_SCORE: continue + bm = masks_np[idx].astype(np.uint8) * 255 + bm = cv2.bitwise_and(bm, rooms_flood) + bm = _cuda_morphology(bm, cv2.MORPH_OPEN, denoise_k) + if not np.any(bm): continue + px, py = int(pos_pts[idx][0][0]), int(pos_pts[idx][0][1]) + sam_room_masks.append({"mask": bm.copy(), "score": sc, "prompt": (px, py)}) + sam_mask = cv2.bitwise_or(sam_mask, bm) accepted += 1 - # ── Free GPU VRAM after inference ───────────────────────────────────── if _TORCH_CUDA: torch.cuda.empty_cache() - print(f" [SAM] VRAM freed. Accepted {accepted}/{len(pos_pts)} masks") - else: - print(f" [SAM] Accepted {accepted}/{len(pos_pts)} masks") - - if accepted == 0: - return rooms_flood, [] + print(f" [SAM] Accepted {accepted}/{len(pos_pts)} masks") + return (sam_mask if accepted else rooms_flood), sam_room_masks - return sam_mask, sam_room_masks +# ════════════════════════════════════════════════════════════════════════════ +# STEP 6 — FILTER VALID ROOMS +# ════════════════════════════════════════════════════════════════════════════ -def filter_room_regions(rooms_mask, img_shape): +def filter_room_regions(rooms_mask: np.ndarray, + img_shape: Tuple) -> Tuple[np.ndarray, List]: h, w = img_shape[:2] img_area = float(h * w) min_area = img_area * MIN_ROOM_AREA_FRAC max_area = img_area * MAX_ROOM_AREA_FRAC - min_dim = w * MIN_ROOM_DIM_FRAC - margin = max(5.0, w * BORDER_MARGIN_FRAC) + min_dim = w * MIN_ROOM_DIM_FRAC + margin = max(5.0, w * BORDER_MARGIN_FRAC) - contours, _ = cv2.findContours(rooms_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - if not contours: return np.zeros_like(rooms_mask), [] + contours, _ = cv2.findContours(rooms_mask, cv2.RETR_EXTERNAL, + cv2.CHAIN_APPROX_SIMPLE) + if not contours: + return np.zeros_like(rooms_mask), [] - bboxes = np.array([cv2.boundingRect(c) for c in contours], dtype=np.float32) - areas 
= np.array([cv2.contourArea(c) for c in contours], dtype=np.float32) - bx, by, bw_arr, bh_arr = bboxes[:,0], bboxes[:,1], bboxes[:,2], bboxes[:,3] + bboxes = np.array([cv2.boundingRect(c) for c in contours], dtype=np.float32) + areas = np.array([cv2.contourArea(c) for c in contours], dtype=np.float32) + bx, by, bw_a, bh_a = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3] - area_ok = (areas >= min_area) & (areas <= max_area) - border_ok = (bx >= margin) & (by >= margin) & \ - (bx + bw_arr <= w - margin) & (by + bh_arr <= h - margin) - dim_ok = (bw_arr >= min_dim) | (bh_arr >= min_dim) - aspect = np.maximum(bw_arr, bh_arr) / (np.minimum(bw_arr, bh_arr) + 1e-6) - aspect_ok = aspect <= MAX_ASPECT_RATIO - extent_ok = (areas / (bw_arr * bh_arr + 1e-6)) >= MIN_EXTENT - cheap_pass = np.where(area_ok & border_ok & dim_ok & aspect_ok & extent_ok)[0] + ok = ((areas >= min_area) & (areas <= max_area) & + (bx >= margin) & (by >= margin) & + (bx + bw_a <= w - margin) & (by + bh_a <= h - margin) & + ((bw_a >= min_dim) | (bh_a >= min_dim)) & + (np.maximum(bw_a, bh_a) / (np.minimum(bw_a, bh_a) + 1e-6) <= MAX_ASPECT_RATIO) & + (areas / (bw_a * bh_a + 1e-6) >= MIN_EXTENT)) valid_mask = np.zeros_like(rooms_mask) valid_rooms = [] - for i in cheap_pass: + for i in np.where(ok)[0]: cnt = contours[i] hull = cv2.convexHull(cnt) ha = cv2.contourArea(hull) @@ -1184,146 +497,58 @@ def filter_room_regions(rooms_mask, img_shape): return valid_mask, valid_rooms -def pixel_area_to_m2(area_px): - return area_px * (2.54 / DPI) ** 2 * (SCALE_FACTOR ** 2) / 10000 - - -def _mask_to_contour_flat(mask): - contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) - if not contours: return [] - largest = max(contours, key=cv2.contourArea) - pts = largest[:, 0, :].tolist() - return [v for pt in pts for v in pt] - - -def _match_sam_mask_to_contour(contour, sam_room_masks): - if not sam_room_masks: - return _contour_to_rle_and_flat(contour) - sam_h, sam_w = 
sam_room_masks[0]["mask"].shape - contour_mask = np.zeros((sam_h, sam_w), dtype=np.uint8) - cv2.drawContours(contour_mask, [contour], -1, 255, thickness=-1) - best_iou = 0.0; best_entry = None - for entry in sam_room_masks: - m = entry["mask"] - if m.shape != contour_mask.shape: continue - inter = np.count_nonzero(cv2.bitwise_and(m, contour_mask)) - if inter == 0: continue - union = np.count_nonzero(cv2.bitwise_or(m, contour_mask)) - iou = inter / (union + 1e-6) - if iou > best_iou: best_iou = iou; best_entry = entry - if best_entry is None or best_iou < 0.05: - return _contour_to_rle_and_flat(contour) - sam_contour_flat = _mask_to_contour_flat(best_entry["mask"]) - if not sam_contour_flat: - raw_pts = contour[:, 0, :].tolist() - sam_contour_flat = [v for pt in raw_pts for v in pt] - return mask_to_rle(best_entry["mask"]), sam_contour_flat, best_entry["score"] - - -def _contour_to_rle_and_flat(contour): - x, y, rw, rh = cv2.boundingRect(contour) - canvas = np.zeros((rh+y+20, rw+x+20), dtype=np.uint8) - cv2.drawContours(canvas, [contour], -1, 255, thickness=-1) - raw_pts = contour[:, 0, :].tolist() - flat_pts = [v for pt in raw_pts for v in pt] - return mask_to_rle(canvas), flat_pts, 1.0 - - -def measure_and_label_rooms(img, valid_rooms, sam_room_masks): - room_data = [] - for idx, contour in enumerate(valid_rooms, 1): - x, y, rw, rh = cv2.boundingRect(contour) - label = run_ocr_on_room(img, contour) - if not label or not validate_label(label): - label = f"ROOM {idx}" - area_px = cv2.contourArea(contour) - M = cv2.moments(contour) - cx = int(M["m10"] / M["m00"]) if M["m00"] else x + rw // 2 - cy = int(M["m01"] / M["m00"]) if M["m00"] else y + rh // 2 - _, raw_seg_flat, sam_score = _match_sam_mask_to_contour(contour, sam_room_masks) - room_data.append({ - "id": len(room_data)+1, "label": label, "contour": contour, - "segmentation": [raw_seg_flat], "raw_segmentation": [raw_seg_flat], - "sam_score": round(sam_score,4), "score": round(sam_score,4), - "area": area_px, 
"area_px": area_px, - "area_m2": round(pixel_area_to_m2(area_px),2), - "bbox": [x,y,rw,rh], "centroid": [cx,cy], - "confidence": 0.95, "isAi": True, - }) - return room_data - - # ════════════════════════════════════════════════════════════════════════════ -# OCR — GPU-ENABLED EasyOCR + EAGER WARM-UP -# -# The delay you saw ("Downloading detection model…") happens because EasyOCR -# downloads craft_mlt_25k.pth + english_g2.pth lazily on first readtext() call. -# Fix: initialise the Reader in a background thread at import time so the -# models are already in GPU memory by the time the user clicks "Run SAM+OCR". +# STEP 7 — OCR VALIDATE (EasyOCR, eager warm-up at import) # ════════════════════════════════════════════════════════════════════════════ -_OCR_READER: Optional[Any] = None # module-level singleton -_OCR_READY = threading.Event() # set once Reader is warm +_OCR_READER: Optional[Any] = None +_OCR_READY = threading.Event() + def _warm_up_ocr() -> None: - """Background thread: build EasyOCR Reader and run a 1-pixel dummy to - force ALL model weights (CRAFT + recognition) onto GPU immediately.""" global _OCR_READER try: import easyocr - print("[OCR] Warming up EasyOCR in background …") - reader = easyocr.Reader( - ["en"], - gpu = _TORCH_CUDA, - verbose = False, # suppress per-file progress bars - download_enabled = True, # allow downloads if cache missing - ) - # Force CRAFT + recognition model into GPU memory now, not on first call - dummy = np.ones((8, 8, 3), dtype=np.uint8) * 255 - reader.readtext(dummy, detail=0) + print("[OCR] Warming up in background …") + reader = easyocr.Reader(["en"], gpu=_TORCH_CUDA, verbose=False, + download_enabled=True) + reader.readtext(np.ones((8, 8, 3), dtype=np.uint8) * 255, detail=0) _OCR_READER = reader print(f"[OCR] Ready gpu={_TORCH_CUDA}") except Exception as e: - print(f"[OCR] Warm-up failed ({e}) — OCR will be skipped") + print(f"[OCR] Warm-up failed ({e})") finally: _OCR_READY.set() -# Kick off warm-up immediately at 
import time (non-blocking) + threading.Thread(target=_warm_up_ocr, daemon=True, name="ocr-warmup").start() def run_ocr_on_room(img_bgr: np.ndarray, contour: np.ndarray) -> Optional[str]: - # Block only if Reader not yet ready (should already be done by the time - # the user finishes preprocessing + SAM steps) _OCR_READY.wait(timeout=120) - reader = _OCR_READER - if reader is None: + if _OCR_READER is None: return None - x, y, rw, rh = cv2.boundingRect(contour) pad = 20 - roi = img_bgr[max(0,y-pad):min(img_bgr.shape[0],y+rh+pad), - max(0,x-pad):min(img_bgr.shape[1],x+rw+pad)] - if roi.size == 0: return None - + roi = img_bgr[max(0, y-pad):min(img_bgr.shape[0], y+rh+pad), + max(0, x-pad):min(img_bgr.shape[1], x+rw+pad)] + if roi.size == 0: + return None gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) - clahe = cv2.createCLAHE(2.0, (8,8)) - proc = clahe.apply(gray) - _, bin_img = _cuda_threshold(proc, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU) + proc = cv2.createCLAHE(2.0, (8, 8)).apply(gray) + _, bin_img = _cuda_threshold(proc, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) rgb = cv2.cvtColor(cv2.medianBlur(bin_img.astype(np.uint8), 3), cv2.COLOR_GRAY2RGB) try: - results = reader.readtext(rgb, detail=1, paragraph=False) - cands = [ - (t.strip().upper(), c) - for _, t, c in results - if c >= OCR_CONF_THR and len(t.strip()) >= 2 and any(ch.isalpha() for ch in t) - ] + results = _OCR_READER.readtext(rgb, detail=1, paragraph=False) + cands = [(t.strip().upper(), c) for _, t, c in results + if c >= OCR_CONF_THR and len(t.strip()) >= 2 + and any(ch.isalpha() for ch in t)] return max(cands, key=lambda xc: xc[1])[0] if cands else None except Exception: return None -def validate_label(label): +def _validate_label(label: Optional[str]) -> bool: if not label: return False label = label.strip() if not label[0].isalpha(): return False @@ -1331,410 +556,322 @@ def validate_label(label): return lc == 1 or lc >= 3 -def build_annotated_image(img_bgr, rooms, selected_ids=None): +# 
════════════════════════════════════════════════════════════════════════════ +# STEP 8 — MEASURE & ANNOTATE +# ════════════════════════════════════════════════════════════════════════════ + +def pixel_area_to_m2(area_px: float) -> float: + return area_px * (2.54 / DPI) ** 2 * (SCALE_FACTOR ** 2) / 10000 + + +def _match_sam_mask(contour: np.ndarray, + sam_room_masks: List[Dict]) -> Tuple[Any, List, float]: + if not sam_room_masks: + raw = contour[:, 0, :].tolist() + return None, [v for pt in raw for v in pt], 1.0 + h2, w2 = sam_room_masks[0]["mask"].shape + cmask = np.zeros((h2, w2), dtype=np.uint8) + cv2.drawContours(cmask, [contour], -1, 255, -1) + best_iou, best = 0.0, None + for entry in sam_room_masks: + m = entry["mask"] + if m.shape != cmask.shape: continue + inter = np.count_nonzero(cv2.bitwise_and(m, cmask)) + if inter == 0: continue + iou = inter / (np.count_nonzero(cv2.bitwise_or(m, cmask)) + 1e-6) + if iou > best_iou: best_iou = iou; best = entry + if best is None or best_iou < 0.05: + raw = contour[:, 0, :].tolist() + return None, [v for pt in raw for v in pt], 1.0 + cnts, _ = cv2.findContours(best["mask"], cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) + flat = [] + if cnts: + pts = max(cnts, key=cv2.contourArea)[:, 0, :].tolist() + flat = [v for pt in pts for v in pt] + if not flat: + raw = contour[:, 0, :].tolist() + flat = [v for pt in raw for v in pt] + return best["mask"], flat, best["score"] + + +def measure_and_label_rooms(img_bgr: np.ndarray, + valid_rooms: List, + sam_room_masks: List[Dict]) -> List[Dict]: + room_data = [] + for idx, contour in enumerate(valid_rooms, 1): + x, y, rw, rh = cv2.boundingRect(contour) + label = run_ocr_on_room(img_bgr, contour) + if not _validate_label(label): + label = f"ROOM {idx}" + area_px = cv2.contourArea(contour) + M = cv2.moments(contour) + cx = int(M["m10"] / M["m00"]) if M["m00"] else x + rw // 2 + cy = int(M["m01"] / M["m00"]) if M["m00"] else y + rh // 2 + _, seg_flat, score = _match_sam_mask(contour, 
sam_room_masks) + room_data.append({ + "id": idx, + "label": label, + "contour": contour, + "area_px": area_px, + "area_m2": round(pixel_area_to_m2(area_px), 2), + "bbox": [x, y, rw, rh], + "centroid": [cx, cy], + "score": round(score, 4), + }) + return room_data + + +# ════════════════════════════════════════════════════════════════════════════ +# STEP 9 — BUILD ANNOTATED IMAGE +# ════════════════════════════════════════════════════════════════════════════ + +def build_annotated_image(img_bgr: np.ndarray, + rooms: List[Dict], + selected_ids: Optional[List[int]] = None) -> np.ndarray: vis = img_bgr.copy(); overlay = vis.copy() for i, room in enumerate(rooms): + cnt = room.get("contour") + if cnt is None: continue color = ROOM_COLORS[i % len(ROOM_COLORS)] bgr = (color[2], color[1], color[0]) - cnt = room.get("contour") - if cnt is None: continue cv2.drawContours(overlay, [cnt], -1, bgr, -1) - vis = cv2.addWeighted(overlay, 0.35, vis, 0.65, 0) + vis = cv2.addWeighted(overlay, 0.35, vis, 0.65, 0) overlay = vis.copy() - is_sel = selected_ids and room["id"] in selected_ids - cv2.drawContours(vis, [cnt], -1, (0,255,255) if is_sel else bgr, 4 if is_sel else 2) + is_sel = selected_ids and room["id"] in selected_ids + cv2.drawContours(vis, [cnt], -1, (0, 255, 255) if is_sel else bgr, + 4 if is_sel else 2) M = cv2.moments(cnt) - cx = int(M["m10"]/M["m00"]) if M["m00"] else 0 - cy = int(M["m01"]/M["m00"]) if M["m00"] else 0 - label = room.get("label", f"Room {room['id']}") - area = room.get("area_m2", 0.0) + cx = int(M["m10"] / M["m00"]) if M["m00"] else 0 + cy = int(M["m01"] / M["m00"]) if M["m00"] else 0 + lbl = room.get("label", f"Room {room['id']}") + area = room.get("area_m2", 0.0) fs = 0.55; th = 1 - (tw1, th1), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fs, th) - (tw2, th2), _ = cv2.getTextSize(f"{area:.1f} m²", cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, th) - bx2 = cx - max(tw1,tw2)//2 - 4; by2 = cy - th1 - th2 - 12 - bw2 = max(tw1,tw2)+8; bh2 = th1+th2+16 - sub = 
vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2] - if sub.size > 0: - vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2] = \ - cv2.addWeighted(sub, 0.3, np.ones_like(sub)*255, 0.7, 0) - cv2.putText(vis, label, (cx-tw1//2, cy-th2-6), - cv2.FONT_HERSHEY_SIMPLEX, fs, (20,20,20), th+1, cv2.LINE_AA) - cv2.putText(vis, f"{area:.1f} m²", (cx-tw2//2, cy+th2+2), - cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, (20,20,20), th, cv2.LINE_AA) + (tw1, _), _ = cv2.getTextSize(lbl, cv2.FONT_HERSHEY_SIMPLEX, fs, th) + (tw2, th2), _ = cv2.getTextSize(f"{area:.1f} m²", + cv2.FONT_HERSHEY_SIMPLEX, fs - 0.1, th) + cv2.putText(vis, lbl, + (cx - tw1 // 2, cy - th2 - 6), + cv2.FONT_HERSHEY_SIMPLEX, fs, (20, 20, 20), th + 1, cv2.LINE_AA) + cv2.putText(vis, f"{area:.1f} m²", + (cx - tw2 // 2, cy + th2 + 2), + cv2.FONT_HERSHEY_SIMPLEX, fs - 0.1, (20, 20, 20), th, cv2.LINE_AA) return vis -def export_to_excel(rooms): +# ════════════════════════════════════════════════════════════════════════════ +# STEP 10 — EXPORT EXCEL +# ════════════════════════════════════════════════════════════════════════════ + +def export_to_excel(rooms: List[Dict]) -> str: wb = openpyxl.Workbook(); ws = wb.active; ws.title = "Room Analysis" - headers = ["ID","Label","Area (px)","Area (m²)","Centroid X","Centroid Y", - "Bbox X","Bbox Y","Bbox W","Bbox H","SAM Score","Confidence"] - hf = PatternFill("solid", fgColor="1F4E79"); hfont = Font(bold=True, color="FFFFFF", size=11) - for col, h in enumerate(headers,1): - cell=ws.cell(row=1,column=col,value=h) - cell.fill=hf; cell.font=hfont; cell.alignment=Alignment(horizontal="center") + headers = ["ID", "Label", "Area (px)", "Area (m²)", + "Centroid X", "Centroid Y", "Bbox X", "Bbox Y", + "Bbox W", "Bbox H", "SAM Score"] + hfill = PatternFill("solid", fgColor="1F4E79") + hfont = Font(bold=True, color="FFFFFF", size=11) + for col, h in enumerate(headers, 1): + c = ws.cell(row=1, column=col, value=h) + c.fill = hfill; c.font = hfont + c.alignment = 
Alignment(horizontal="center") alt = PatternFill("solid", fgColor="D6E4F0") for rn, room in enumerate(rooms, 2): - cnt = room.get("contour") - M = cv2.moments(cnt) if cnt is not None else {} - cx = int(M["m10"]/M["m00"]) if M.get("m00") else 0 - cy = int(M["m01"]/M["m00"]) if M.get("m00") else 0 - bbox = cv2.boundingRect(cnt) if cnt is not None else (0,0,0,0) - row_data=[room.get("id"), room.get("label","?"), - round(room.get("area_px",0),1), round(room.get("area_m2",0.0),2), - cx, cy, bbox[0], bbox[1], bbox[2], bbox[3], - round(room.get("score",1.0),4), round(room.get("confidence",0.95),2)] - fill = alt if rn%2==0 else None - for col,val in enumerate(row_data,1): - cell=ws.cell(row=rn,column=col,value=val) - cell.alignment=Alignment(horizontal="center") - if fill: cell.fill=fill + cnt = room.get("contour") + M = cv2.moments(cnt) if cnt is not None else {} + cx = int(M["m10"] / M["m00"]) if M.get("m00") else 0 + cy = int(M["m01"] / M["m00"]) if M.get("m00") else 0 + bbox = cv2.boundingRect(cnt) if cnt is not None else (0, 0, 0, 0) + row_data = [room["id"], room["label"], + round(room["area_px"], 1), room["area_m2"], + cx, cy, bbox[0], bbox[1], bbox[2], bbox[3], + room["score"]] + fill = alt if rn % 2 == 0 else None + for col, val in enumerate(row_data, 1): + c = ws.cell(row=rn, column=col, value=val) + c.alignment = Alignment(horizontal="center") + if fill: c.fill = fill for col in ws.columns: - mx=max(len(str(c.value or "")) for c in col)+4 - ws.column_dimensions[col[0].column_letter].width=min(mx,25) - out = Path(tempfile.gettempdir()) / f"floorplan_rooms_{int(time.time())}.xlsx" - wb.save(str(out)); return str(out) + mx = max(len(str(c.value or "")) for c in col) + 4 + ws.column_dimensions[col[0].column_letter].width = min(mx, 25) + out = Path(tempfile.gettempdir()) / f"rooms_{int(time.time())}.xlsx" + wb.save(str(out)) + return str(out) # ════════════════════════════════════════════════════════════════════════════ -# WAND TOOL — flood-fill colour-region 
selector on the annotated image -# -# How it works: -# • The annotated image has each room painted a distinct flat colour. -# • A click gives us (x, y). We sample the colour at that pixel. -# • We flood-fill from that seed on the colour image with a small -# tolerance, returning the set of pixels that belong to that colour blob. -# • We then find which room contour overlaps those pixels most → select it. +# WAND TOOL — flood-fill colour-region selector on annotated image # ═══════════════════════════════════════════════════���════════════════════════ -def wand_select_room( - x: int, y: int, - annotated_bgr: np.ndarray, - rooms: List[Dict], - tolerance: int = 30, -) -> Optional[int]: - """ - Return the room id under pixel (x, y) using a colour flood-fill wand. - tolerance: max per-channel colour distance for flood-fill neighbour inclusion. - Returns None if no room found. - """ +def wand_select_room(x: int, y: int, + annotated_bgr: np.ndarray, + rooms: List[Dict], + tolerance: int = 30) -> Optional[int]: h, w = annotated_bgr.shape[:2] if not (0 <= x < w and 0 <= y < h): return None - - # Fast path: just hit-test contours (works well after SAM) + # Fast contour hit-test first for room in rooms: cnt = room.get("contour") - if cnt is None: - continue - if cv2.pointPolygonTest(cnt, (float(x), float(y)), False) >= 0: + if cnt is not None and cv2.pointPolygonTest(cnt, (float(x), float(y)), False) >= 0: return room["id"] - - # Fallback: colour flood-fill wand (handles edge cases / aliased borders) - seed_colour = annotated_bgr[y, x].astype(np.int32) - if np.all(seed_colour > 240): - return None # clicked on white background / label box - - mask = np.zeros((h + 2, w + 2), dtype=np.uint8) + # Colour flood-fill fallback + if np.all(annotated_bgr[y, x].astype(np.int32) > 240): + return None + mask = np.zeros((h + 2, w + 2), dtype=np.uint8) flood = annotated_bgr.copy() - flags = (4 # 4-connectivity - | cv2.FLOODFILL_MASK_ONLY - | (255 << 8)) # fill value in mask - 
cv2.floodFill( - flood, mask, - seedPoint=(x, y), - newVal=(0, 0, 0), - loDiff=(tolerance,)*3, - upDiff=(tolerance,)*3, - flags=flags, - ) - wand_mask = mask[1:-1, 1:-1] # strip the 2-px border + cv2.floodFill(flood, mask, (x, y), (0, 0, 0), + loDiff=(tolerance,)*3, upDiff=(tolerance,)*3, + flags=4 | cv2.FLOODFILL_MASK_ONLY | (255 << 8)) + wand_mask = mask[1:-1, 1:-1] if not np.any(wand_mask): return None - - # Which room contour overlaps the wand mask most? - best_id, best_overlap = None, 0 + best_id, best_ov = None, 0 for room in rooms: cnt = room.get("contour") - if cnt is None: - continue - room_mask = np.zeros((h, w), dtype=np.uint8) - cv2.drawContours(room_mask, [cnt], -1, 255, -1) - overlap = int(np.count_nonzero(cv2.bitwise_and(room_mask, wand_mask))) - if overlap > best_overlap: - best_overlap = overlap - best_id = room["id"] - + if cnt is None: continue + rm = np.zeros((h, w), dtype=np.uint8) + cv2.drawContours(rm, [cnt], -1, 255, -1) + ov = int(np.count_nonzero(cv2.bitwise_and(rm, wand_mask))) + if ov > best_ov: best_ov = ov; best_id = room["id"] return best_id # ════════════════════════════════════════════════════════════════════════════ -# DOOR CLOSING TOOL — interactive rubber-band line drawing -# -# Two modes available (user picks via radio): -# LINE — two-click straight line (existing behaviour, improved UX) -# BRUSH — click+drag freehand stroke burned as thick wall pixels -# -# The wall canvas shows: -# • existing wall mask in white -# • committed user lines in red -# • live rubber-band preview in cyan (pending first→cursor) +# STATE # ════════════════════════════════════════════════════════════════════════════ -def render_wall_canvas( - walls_base: np.ndarray, - user_lines: List[Tuple[int,int,int,int]], - thickness: int, - pending_start: Optional[Tuple[int,int]] = None, - hover_pt: Optional[Tuple[int,int]] = None, -) -> np.ndarray: - """Render walls + committed lines + optional rubber-band preview.""" - # Apply committed lines to the mask - mask = 
walls_base.copy() - for x1, y1, x2, y2 in user_lines: - cv2.line(mask, (x1, y1), (x2, y2), 255, max(thickness, 3)) - - vis = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB) - - # Draw committed lines in red on top for visibility - for x1, y1, x2, y2 in user_lines: - cv2.line(vis, (x1, y1), (x2, y2), (255, 60, 60), max(thickness, 3)) - - # Rubber-band preview in cyan - if pending_start and hover_pt: - cv2.line(vis, pending_start, hover_pt, (0, 220, 255), 2) - cv2.circle(vis, pending_start, 7, (0, 220, 255), -1) - cv2.circle(vis, hover_pt, 5, (0, 220, 255), 2) - elif pending_start: - cv2.circle(vis, pending_start, 7, (0, 220, 255), -1) - - return vis - - - -def init_state(): +def init_state() -> Dict: return { - "img_orig": None, - "img_cropped": None, - "img_clean": None, - "walls": None, - "walls_base": None, - "wall_cal": None, - "user_lines": [], # committed door-closing lines - "draw_start": None, # pending line start (x,y) or None - "walls_thickness": 8, - # wand / annotation state - "rooms": [], - "selected_ids": [], - "annotated": None, - # active tool: "wand" | "line" - "active_tool": "wand", - "status": "Idle", + "img_orig": None, + "img_cropped": None, + "img_clean": None, + "walls": None, + "wall_thick": 8, + "rooms": [], + "selected_ids": [], + "annotated": None, } # ════════════════════════════════════════════════════════════════════════════ -# GRADIO CALLBACKS (unchanged logic, GPU benefits come from helpers above) +# GRADIO CALLBACKS # ════════════════════════════════════════════════════════════════════════════ def cb_load_image(upload, state): if upload is None: return None, state, "Upload a floor-plan image to begin." 
try: - if hasattr(upload,"name"): file_path=upload.name - elif isinstance(upload,dict) and "name" in upload: file_path=upload["name"] - elif isinstance(upload,str): file_path=upload - else: - img_bgr=cv2.imdecode(np.frombuffer(bytes(upload),dtype=np.uint8),cv2.IMREAD_COLOR) - file_path=None - if file_path is not None: img_bgr=cv2.imread(file_path) + fp = (upload.name if hasattr(upload, "name") + else upload["name"] if isinstance(upload, dict) else upload) + img = cv2.imread(fp) if isinstance(fp, str) else \ + cv2.imdecode(np.frombuffer(bytes(upload), dtype=np.uint8), cv2.IMREAD_COLOR) except Exception as e: - return None, state, f"❌ Error reading upload: {e}" - if img_bgr is None: return None, state, "❌ Could not decode image." - state=init_state(); state["img_orig"]=img_bgr; state["status"]="Image loaded." - return cv2.cvtColor(img_bgr,cv2.COLOR_BGR2RGB), state, f"✅ Loaded {img_bgr.shape[1]}×{img_bgr.shape[0]} px" + return None, state, f"❌ {e}" + if img is None: + return None, state, "❌ Could not decode image." + state = init_state(); state["img_orig"] = img + return cv2.cvtColor(img, cv2.COLOR_BGR2RGB), state, \ + f"✅ Loaded {img.shape[1]}×{img.shape[0]} px" def cb_preprocess(state): - img=state.get("img_orig") - if img is None: return None,None,state,"Load an image first." + img = state.get("img_orig") + if img is None: + return None, None, state, "Load an image first." 
cropped = remove_title_block(img) img_clean = remove_colors(cropped) - img_clean = detect_and_close_door_arcs(img_clean) - img_stats = analyze_image_characteristics(cropped) - walls, thick = extract_walls_adaptive(img_clean, img_stats) - walls = remove_fixture_symbols(walls) - walls, cal = reconstruct_walls(walls) - walls = remove_dangling_lines(walls, cal) - walls = close_large_door_gaps(walls, cal) - state["img_cropped"]=cropped; state["img_clean"]=img_clean - state["walls"]=walls.copy(); state["walls_base"]=walls.copy() - state["walls_thickness"]=thick; state["wall_cal"]=cal - walls_rgb = cv2.cvtColor(walls,cv2.COLOR_GRAY2RGB) - clean_rgb = cv2.cvtColor(img_clean,cv2.COLOR_BGR2RGB) - msg=(f"✅ Pipeline done | stroke≈{cal.stroke_width}px body≈{thick}px " - f"bridge=[{cal.bridge_min_gap},{cal.bridge_max_gap}] door={cal.door_gap}px " - f"| GPU: torch={_TORCH_CUDA} cupy={_CUPY} cv2_cuda={_CV2_CUDA}") - return clean_rgb, walls_rgb, state, msg - - -def cb_add_door_line(evt: gr.SelectData, state): - """Two-click line drawing on wall mask with rubber-band preview.""" - walls_base = state.get("walls_base") - if walls_base is None: - return None, state, "Run preprocessing first." - - x, y = int(evt.index[0]), int(evt.index[1]) - thick = state.get("walls_thickness", 8) - - if state["draw_start"] is None: - # First click — record start, show pending dot - state["draw_start"] = (x, y) - vis = render_wall_canvas(walls_base, state["user_lines"], thick, - pending_start=(x, y)) - msg = f"🖊 Start set ({x},{y}) — click endpoint to close door gap." - else: - # Second click — commit line - x1, y1 = state["draw_start"] - state["user_lines"].append((x1, y1, x, y)) - state["draw_start"] = None - - # Rebuild walls with all lines applied - walls_upd = apply_user_lines_to_walls( - walls_base, state["user_lines"], thick - ) - state["walls"] = walls_upd - vis = render_wall_canvas(walls_base, state["user_lines"], thick) - msg = (f"✅ Door line ({x1},{y1})→({x},{y}) committed. 
" - f"Total lines: {len(state['user_lines'])}") - - return vis, state, msg - - -def cb_undo_door_line(state): - if not state["user_lines"]: - return None, state, "No lines to undo." - state["user_lines"].pop() - state["draw_start"] = None - - walls_base = state.get("walls_base") - if walls_base is None: - return None, state, "Re-run preprocessing." - - thick = state.get("walls_thickness", 8) - walls_upd = apply_user_lines_to_walls(walls_base, state["user_lines"], thick) - state["walls"] = walls_upd - vis = render_wall_canvas(walls_base, state["user_lines"], thick) - return vis, state, f"↩ Last line removed. Remaining: {len(state['user_lines'])}" - - -def cb_clear_door_lines(state): - """Remove all user-drawn door lines.""" - state["user_lines"] = [] - state["draw_start"] = None - walls_base = state.get("walls_base") - if walls_base is None: - return None, state, "Run preprocessing first." - state["walls"] = walls_base.copy() - vis = render_wall_canvas(walls_base, [], state.get("walls_thickness", 8)) - return vis, state, "🗑 All door lines cleared." - - -def cb_set_tool(tool_name: str, state): - """Switch active tool between 'wand' and 'line'.""" - state["active_tool"] = tool_name - state["draw_start"] = None # cancel any pending line start - return state, f"🔧 Active tool: {tool_name.upper()}" + walls, thick = extract_walls(img_clean) + state["img_cropped"] = cropped + state["img_clean"] = img_clean + state["walls"] = walls + state["wall_thick"] = thick + return (cv2.cvtColor(img_clean, cv2.COLOR_BGR2RGB), + cv2.cvtColor(walls, cv2.COLOR_GRAY2RGB), + state, + f"✅ Walls extracted body≈{thick}px | " + f"GPU: torch={_TORCH_CUDA} cupy={_CUPY} cv2_cuda={_CV2_CUDA}") def cb_run_sam(state): - walls=state.get("walls"); img=state.get("img_cropped"); img_clean=state.get("img_clean") - if walls is None or img is None: return None,None,state,"Run preprocessing first." 
- img_rgb=cv2.cvtColor(img,cv2.COLOR_BGR2RGB) - ckpt=download_sam_if_needed() - sam_enabled=ckpt is not None and Path(ckpt).exists() - if sam_enabled: - rooms_mask,sam_room_masks=segment_with_sam(img_rgb,walls.copy(),ckpt) + walls = state.get("walls"); img = state.get("img_cropped") + img_clean = state.get("img_clean") + if walls is None or img is None: + return None, None, state, "Run preprocessing first." + ckpt = download_sam_if_needed() + if ckpt: + img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) + rooms_mask, sam_masks = segment_with_sam(img_rgb, walls.copy(), ckpt) else: - rooms_mask=segment_rooms_flood(walls.copy()); sam_room_masks=[] - state["_sam_room_masks"]=sam_room_masks - if not np.count_nonzero(rooms_mask): - return None,None,state,"⚠ rooms_mask empty." - valid_mask,valid_rooms=filter_room_regions(rooms_mask,img.shape) - if not valid_rooms: return None,None,state,"⚠ No valid rooms." - src=img_clean if img_clean is not None else img - rooms=measure_and_label_rooms(src,valid_rooms,sam_room_masks) - if not rooms: return None,None,state,"⚠ No rooms after OCR." - state["rooms"]=rooms; state["selected_ids"]=[] - annotated=build_annotated_image(img,rooms); state["annotated"]=annotated - table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms] - return cv2.cvtColor(annotated,cv2.COLOR_BGR2RGB),table,state,f"✅ {len(rooms)} rooms detected." + rooms_mask = _flood_rooms(walls.copy()); sam_masks = [] + state["_sam_masks"] = sam_masks + valid_mask, valid_rooms = filter_room_regions(rooms_mask, img.shape) + if not valid_rooms: + return None, None, state, "⚠ No valid rooms found." + src = img_clean if img_clean is not None else img + rooms = measure_and_label_rooms(src, valid_rooms, sam_masks) + if not rooms: + return None, None, state, "⚠ No rooms after labelling." 
+    state["rooms"] = rooms; state["selected_ids"] = []
+    ann = build_annotated_image(img, rooms); state["annotated"] = ann
+    table = [[r["id"], r["label"], f"{r['area_m2']} m²", f"{r['score']:.2f}"]
+             for r in rooms]
+    return cv2.cvtColor(ann, cv2.COLOR_BGR2RGB), table, state, \
+        f"✅ {len(rooms)} rooms detected."


 def cb_click_room(evt: gr.SelectData, state):
-    """
-    Unified click handler for the annotated image.
-    Uses the WAND tool — colour flood-fill region detection — to identify
-    which room was clicked, then toggles selection.
-    Falls back to contour hit-test if wand finds nothing.
-    """
-    annotated = state.get("annotated")
-    rooms = state.get("rooms", [])
-    img = state.get("img_cropped")
+    annotated = state.get("annotated"); rooms = state.get("rooms", [])
+    img = state.get("img_cropped")
     if annotated is None or not rooms:
         return None, state, "Run SAM first."
-    x, y = int(evt.index[0]), int(evt.index[1])
-
-    # ── WAND: colour-aware region selection ──────────────────────────────
-    clicked_id = wand_select_room(x, y, annotated, rooms, tolerance=30)
-
-    if clicked_id is None:
-        state["selected_ids"] = []
-        msg = "Clicked outside all rooms — selection cleared."
+    x, y = int(evt.index[0]), int(evt.index[1]); rid = wand_select_room(x, y, annotated, rooms)
+    if rid is None:
+        state["selected_ids"] = []; msg = "Clicked outside — selection cleared."
     else:
         sel = state["selected_ids"]
-        if clicked_id in sel:
-            sel.remove(clicked_id)
-            msg = f"🪄 Room {clicked_id} deselected."
-        else:
-            sel.append(clicked_id)
-            msg = f"🪄 Room {clicked_id} selected."
+        if rid in sel: sel.remove(rid); msg = f"🪄 Room {rid} deselected."
+        else: sel.append(rid); msg = f"🪄 Room {rid} selected."
state["selected_ids"] = sel - - new_ann = build_annotated_image(img, rooms, state["selected_ids"]) - state["annotated"] = new_ann - return cv2.cvtColor(new_ann, cv2.COLOR_BGR2RGB), state, msg + ann = build_annotated_image(img, rooms, state["selected_ids"]) + state["annotated"] = ann + return cv2.cvtColor(ann, cv2.COLOR_BGR2RGB), state, msg def cb_remove_selected(state): - sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped") - if not sel: return None,None,state,"No rooms selected." - removed=[r["label"] for r in rooms if r["id"] in sel] - rooms=[r for r in rooms if r["id"] not in sel] - for i,r in enumerate(rooms,1): r["id"]=i - state["rooms"]=rooms; state["selected_ids"]=[] - ann=build_annotated_image(img,rooms); state["annotated"]=ann - table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms] - return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"🗑 Removed:{', '.join(removed)}" + sel = state.get("selected_ids", []); rooms = state.get("rooms", []) + img = state.get("img_cropped") + if not sel: return None, None, state, "No rooms selected." + removed = [r["label"] for r in rooms if r["id"] in sel] + rooms = [r for r in rooms if r["id"] not in sel] + for i, r in enumerate(rooms, 1): r["id"] = i + state["rooms"] = rooms; state["selected_ids"] = [] + ann = build_annotated_image(img, rooms); state["annotated"] = ann + table = [[r["id"], r["label"], f"{r['area_m2']} m²", f"{r['score']:.2f}"] + for r in rooms] + return cv2.cvtColor(ann, cv2.COLOR_BGR2RGB), table, state, \ + f"🗑 Removed: {', '.join(removed)}" def cb_rename_selected(new_label, state): - sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped") - if not sel: return None,None,state,"Select a room first." - if not new_label.strip(): return None,None,state,"Enter a non-empty label." 
+ sel = state.get("selected_ids", []); rooms = state.get("rooms", []) + img = state.get("img_cropped") + if not sel: return None, None, state, "Select a room first." + if not new_label.strip(): return None, None, state, "Enter a label." for r in rooms: - if r["id"] in sel: r["label"]=new_label.strip().upper() - state["rooms"]=rooms - ann=build_annotated_image(img,rooms,sel); state["annotated"]=ann - table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms] - return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"✏ Renamed to '{new_label.strip().upper()}'" + if r["id"] in sel: r["label"] = new_label.strip().upper() + state["rooms"] = rooms + ann = build_annotated_image(img, rooms, sel); state["annotated"] = ann + table = [[r["id"], r["label"], f"{r['area_m2']} m²", f"{r['score']:.2f}"] + for r in rooms] + return cv2.cvtColor(ann, cv2.COLOR_BGR2RGB), table, state, \ + f"✏ Renamed → '{new_label.strip().upper()}'" def cb_export_excel(state): - rooms=state.get("rooms",[]) - if not rooms: return None,"No rooms to export." - path=export_to_excel(rooms) - return path,f"✅ Exported {len(rooms)} rooms → {Path(path).name}" + rooms = state.get("rooms", []) + if not rooms: return None, "No rooms to export." 
+ path = export_to_excel(rooms) + return path, f"✅ Exported {len(rooms)} rooms → {Path(path).name}" # ════════════════════════════════════════════════════════════════════════════ @@ -1743,46 +880,36 @@ def cb_export_excel(state): CSS = """ #title{text-align:center;font-size:1.8em;font-weight:700;color:#1F4E79} -#subtitle{text-align:center;color:#666;margin-top:-6px;margin-bottom:14px;font-size:0.9em} -.step-card{border-left:4px solid #1F4E79!important;padding-left:10px!important;border-radius:4px} -.tool-active{background:#e8f4fd!important;border:2px solid #1F4E79!important;border-radius:6px} -.tool-btn{border-radius:6px!important} -.wand-hint{font-size:0.82em;color:#888;margin-top:4px} +#subtitle{text-align:center;color:#666;margin-top:-6px;margin-bottom:14px;font-size:0.85em} +.card{border-left:4px solid #1F4E79!important;padding-left:10px!important;border-radius:4px} +.hint{font-size:0.82em;color:#888;margin-top:4px} """ -def _walls_to_rgb(s): - wb = s.get("walls_base") - ul = s.get("user_lines", []) - th = s.get("walls_thickness", 8) - if wb is None: - return None - return render_wall_canvas(wb, ul, th) - - -with gr.Blocks(title="FloorPlan Analyser (GPU)", css=CSS) as app: +with gr.Blocks(title="FloorPlan Analyser", css=CSS) as app: state = gr.State(init_state()) - gr.Markdown("# 🏢 Floor Plan Room Analyser — NVIDIA GPU Build", elem_id="title") + gr.Markdown("# 🏢 Floor Plan Room Analyser", elem_id="title") gr.Markdown( f"EasyOCR gpu={'✅' if _TORCH_CUDA else '❌'} | " - f"SAM FP16 autocast={'✅' if _TORCH_CUDA else '❌'} | " + f"SAM FP16={'✅' if _TORCH_CUDA else '❌'} | " f"CuPy={'✅' if _CUPY else '❌'} | " f"cv2.cuda={'✅' if _CV2_CUDA else '❌'} | " - f"OCR warm-up: background thread started at import", + f"OCR warm-up: background thread", elem_id="subtitle", ) - - status_box = gr.Textbox(label="Status", interactive=False, value="Idle — upload a floor plan.") + status_box = gr.Textbox(label="Status", interactive=False, + value="Idle — upload a floor plan.") # ── Row 
1: Upload + Preprocess ─────────────────────────────────────────── with gr.Row(): - with gr.Column(scale=1, elem_classes="step-card"): - gr.Markdown("### 1️⃣ Upload Floor Plan") - upload_btn = gr.UploadButton("📂 Upload Image", file_types=["image"], size="sm") - raw_preview = gr.Image(label="Loaded Image", height=300) - - with gr.Column(scale=1, elem_classes="step-card"): - gr.Markdown("### 2️⃣ Pre-process (Crop → De-color → Walls)") + with gr.Column(scale=1, elem_classes="card"): + gr.Markdown("### 1️⃣ Upload") + upload_btn = gr.UploadButton("📂 Upload Image", + file_types=["image"], size="sm") + raw_preview = gr.Image(label="Loaded", height=300) + + with gr.Column(scale=1, elem_classes="card"): + gr.Markdown("### 2️⃣ Preprocess (Crop → De-colour → Walls)") preprocess_btn = gr.Button("⚙ Run Preprocessing", variant="primary") with gr.Tabs(): with gr.Tab("Clean Image"): @@ -1790,126 +917,57 @@ with gr.Blocks(title="FloorPlan Analyser (GPU)", css=CSS) as app: with gr.Tab("Walls"): walls_img = gr.Image(label="Extracted walls", height=280) - # ── Row 2: Door Closing Tool ───────────────────────────────────────────── + # ── Row 2: SAM + Results ───────────────────────────────────────────────── with gr.Row(): - with gr.Column(elem_classes="step-card"): - gr.Markdown("### 3️⃣ Door-Closing Line Tool") - gr.Markdown( - "**How to use:** Click once on the wall image to set the **start** of a " - "closing line, then click again for the **end**. The line is burned into " - "the wall mask (shown in 🔴 red). 
This prevents rooms from leaking through " - "open door gaps before SAM runs.", - elem_classes=["wand-hint"], - ) - with gr.Row(): - undo_line_btn = gr.Button("↩ Undo Last", size="sm") - clear_lines_btn = gr.Button("🗑 Clear All", size="sm", variant="stop") - wall_draw_img = gr.Image( - label="Wall mask — click start then end to draw a door-closing line", - height=400, - interactive=False, - ) - - # ── Row 3: SAM + Annotation (with Wand tool) ──────────────────────────── - with gr.Row(): - with gr.Column(scale=2, elem_classes="step-card"): - gr.Markdown("### 4️⃣ SAM Segmentation + OCR") + with gr.Column(scale=2, elem_classes="card"): + gr.Markdown("### 3️⃣ SAM + OCR") sam_btn = gr.Button("🤖 Run SAM + OCR", variant="primary") gr.Markdown( - "**🪄 Wand Tool** — click any coloured room region to select / deselect it. " - "Uses colour flood-fill for pixel-accurate selection even near boundaries.", - elem_classes=["wand-hint"], - ) - ann_img = gr.Image( - label="Annotated rooms — 🪄 click to select/deselect with Wand", - height=500, - interactive=False, + "**🪄 Wand** — click any coloured room to select / deselect.", + elem_classes=["hint"], ) + ann_img = gr.Image(label="Annotated rooms", height=520, + interactive=False) - with gr.Column(scale=1, elem_classes="step-card"): - gr.Markdown("### 5️⃣ Room Table & Actions") + with gr.Column(scale=1, elem_classes="card"): + gr.Markdown("### 4️⃣ Rooms & Export") room_table = gr.Dataframe( - headers=["ID", "Label", "Area", "SAM Score"], + headers=["ID", "Label", "Area", "Score"], datatype=["number", "str", "str", "str"], - interactive=False, - label="Detected Rooms", + interactive=False, label="Detected Rooms", ) - with gr.Group(): - gr.Markdown("**Edit selected room(s)**") - rename_txt = gr.Textbox(placeholder="New label…", label="Rename Label") + rename_txt = gr.Textbox(placeholder="New label…", + label="Rename selected") with gr.Row(): rename_btn = gr.Button("✏ Rename", size="sm") - remove_btn = gr.Button("🗑 Remove Selected", 
size="sm", variant="stop") - + remove_btn = gr.Button("🗑 Remove Selected", size="sm", + variant="stop") gr.Markdown("---") export_btn = gr.Button("📊 Export to Excel", variant="secondary") - excel_file = gr.File(label="Download Excel", visible=True) + excel_file = gr.File(label="Download Excel") - # ── Event Wiring ───────────────────────────────────────────────────────── + # ── Wiring ─────────────────────────────────────────────────────────────── + upload_btn.upload(cb_load_image, + [upload_btn, state], [raw_preview, state, status_box]) - upload_btn.upload( - cb_load_image, - inputs=[upload_btn, state], - outputs=[raw_preview, state, status_box], - ) + preprocess_btn.click(cb_preprocess, + [state], [clean_img, walls_img, state, status_box]) - preprocess_btn.click( - cb_preprocess, - inputs=[state], - outputs=[clean_img, walls_img, state, status_box], - ).then( - _walls_to_rgb, - inputs=[state], - outputs=[wall_draw_img], - ) + sam_btn.click(cb_run_sam, + [state], [ann_img, room_table, state, status_box]) - # Door line drawing (two-click on wall image) - wall_draw_img.select( - cb_add_door_line, - inputs=[state], - outputs=[wall_draw_img, state, status_box], - ) - undo_line_btn.click( - cb_undo_door_line, - inputs=[state], - outputs=[wall_draw_img, state, status_box], - ) - clear_lines_btn.click( - cb_clear_door_lines, - inputs=[state], - outputs=[wall_draw_img, state, status_box], - ) + ann_img.select(cb_click_room, + [state], [ann_img, state, status_box]) - # SAM run - sam_btn.click( - cb_run_sam, - inputs=[state], - outputs=[ann_img, room_table, state, status_box], - ) + remove_btn.click(cb_remove_selected, + [state], [ann_img, room_table, state, status_box]) - # Wand selection on annotated image - ann_img.select( - cb_click_room, - inputs=[state], - outputs=[ann_img, state, status_box], - ) + rename_btn.click(cb_rename_selected, + [rename_txt, state], [ann_img, room_table, state, status_box]) - remove_btn.click( - cb_remove_selected, - inputs=[state], - 
outputs=[ann_img, room_table, state, status_box], - ) - rename_btn.click( - cb_rename_selected, - inputs=[rename_txt, state], - outputs=[ann_img, room_table, state, status_box], - ) - export_btn.click( - cb_export_excel, - inputs=[state], - outputs=[excel_file, status_box], - ) + export_btn.click(cb_export_excel, + [state], [excel_file, status_box]) if __name__ == "__main__":