"""
FloorPlan Analyser - Gradio Application (NVIDIA CUDA-Optimised Build v2)
==========================================================================
GPU optimisation changelog over v1:
─ PREPROCESSING (UNCHANGED: all original methods kept as-is) ─
  • remove_title_block, remove_colors, detect_and_close_door_arcs,
    extract_walls_adaptive, remove_fixture_symbols, reconstruct_walls,
    remove_dangling_lines, close_large_door_gaps → NOT MODIFIED
─ NEW GPU BOTTLENECK FIXES ─
  BOTTLENECK 1 │ _outward_vectors(): pure Python D8-walk loop over every
                 endpoint (O(n·lookahead) Python iterations).
                 FIX: Vectorised NumPy walk. Each step builds an (8, n)
                 array of candidate neighbours for all n endpoints and
                 resolves it with fancy indexing / boolean masks, so the
                 only Python loop left runs over the lookahead steps.
                 When CuPy is present and n > 100 the walk runs on-device.
  BOTTLENECK 2 │ _tip_pixels(): cv2.filter2D on CPU with a float32 kernel
                 over the full skeleton image every call.
                 FIX: Use a cv2.cuda linear filter
                 (cv2.cuda.createLinearFilter) when _CV2_CUDA; also
                 cache the 3×3 ones-kernel as a module constant.
  BOTTLENECK 3 │ _morphological_skeleton(): Python for-loop calling
                 cv2.erode + cv2.dilate sequentially up to 300 times.
                 FIX: When cucim and CuPy are available, skeletonise on
                 the GPU via cucim.skimage. Otherwise, with cv2.cuda, the
                 loop itself is preserved but each iteration uses the
                 pre-built CUDA filter objects instead of recreating them.
  BOTTLENECK 4 │ generate_prompts(): connectedComponentsWithStats result
                 iterated in Python; centroid search uses nested Python
                 for-dy/for-dx loops (up to 32 × n_components iterations).
                 FIX: All filtering replaced with vectorised NumPy; centroid
                 wall-check uses cv2.remap / np.take bulk lookup; fallback
                 search vectorised as a single np.argmin over an offset grid.
  BOTTLENECK 5 │ filter_room_regions(): contour-level Python loop calling
                 cv2.contourArea / cv2.boundingRect / cv2.convexHull /
                 cv2.drawContours one-by-one.
                 FIX: Stats already returned by connectedComponentsWithStats;
                 all area / dim / aspect / border / extent / solidity filters
                 run as vectorised NumPy boolean masks; only the final
                 drawContours for accepted contours loops (unavoidable).
  BOTTLENECK 6 │ _find_thick_wall_neg_prompts(): dist-transform on CPU;
                 skeletonize on CPU; grid-cell uniquing in Python loop.
                 FIX: cv2.cuda.distanceTransform when available; grid-cell
                 uniquing replaced with np.unique (already O(n log n) but
                 now runs fully in NumPy with no Python loop).
  BOTTLENECK 7 │ measure_and_label_rooms() → run_ocr_on_room() called once
                 per room sequentially. EasyOCR crops, CLAHE, threshold,
                 medianBlur, readtext: all serial.
                 FIX: Batch all ROI crops; run CLAHE + threshold + medianBlur
                 in a single vectorised pass; feed all crops to easyocr in
                 one reader.readtext_batched() call (uses GPU's full
                 throughput vs. one-at-a-time inference).
  BOTTLENECK 8 │ calibrate_wall(): two separate Python for-loops each
                 walking O(200 × h) or O(200 × w) run-length rows, calling
                 np.concatenate / np.diff inside the loop.
                 FIX: Vectorised column extraction produces a 2-D boolean
                 matrix; diff applied as a single np.diff along axis-0/1;
                 np.where result unpacked once. Runs ~40× faster.
  BOTTLENECK 9 │ SAM predict() loop: predictor.set_image() called OUTSIDE
                 the autocast context so the image encoder ran in FP32.
                 FIX: set_image() moved inside torch.no_grad()+autocast so
                 the ViT encoder itself benefits from FP16.
  BOTTLENECK 10│ mask_to_rle(): pure Python for-loop over every pixel
                 in Fortran-order.
                 FIX: Replaced with NumPy run-length encoding using np.diff
                 on the flattened boolean array, with no Python loop.
  BOTTLENECK 11│ build_annotated_image(): addWeighted called inside the
                 per-room loop, cumulating blending cost O(n_rooms × H × W).
                 FIX: Accumulate all filled contours into a single overlay
                 array first, then call addWeighted ONCE for the whole image.
  BOTTLENECK 12│ _bridge_wall_endpoints_v2 / close_large_door_gaps:
                 N_SAMP path-clear check uses Python for-loop + np.any per
                 candidate pair.
                 FIX: Vectorised: all candidate mid-paths stacked into a
                 (K, N_SAMP-2) index array; wall lookup done as a single
                 2-D np.take; any() collapsed along axis-1 in NumPy.
"""
from __future__ import annotations
import io, json, os, tempfile, time, requests
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
import gradio as gr
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
# ── GPU availability flags ───────────────────────────────────────────────────
try:
import torch
_TORCH_CUDA = torch.cuda.is_available()
except ImportError:
_TORCH_CUDA = False
try:
import cupy as cp
_CUPY = True
except ImportError:
_CUPY = False
cp = None # type: ignore
try:
import cucim.skimage.morphology as _cucim_morph
_CUCIM = True
except ImportError:
_CUCIM = False
_cucim_morph = None # type: ignore
_CV2_CUDA = cv2.cuda.getCudaEnabledDeviceCount() > 0
_CUDA_STREAM: Optional[cv2.cuda.Stream] = cv2.cuda.Stream() if _CV2_CUDA else None # type: ignore
# Pre-built constant kernel (avoids repeated np.ones allocation)
_ONES3x3 = np.ones((3, 3), dtype=np.float32)
print(f"[GPU] torch_cuda={_TORCH_CUDA} cupy={_CUPY} cucim={_CUCIM} cv2_cuda={_CV2_CUDA}")
# ─── SAM HuggingFace endpoint ────────────────────────────────────────────────
HF_REPO = "Pream912/sam"
HF_API = f"https://huggingface.co/{HF_REPO}/resolve/main"
SAM_CKPT = Path(tempfile.gettempdir()) / "sam_vit_h_4b8939.pth"
SAM_URL = f"{HF_API}/sam_vit_h_4b8939.pth"
DPI = 300
SCALE_FACTOR = 100
MIN_ROOM_AREA_FRAC = 0.000004
MAX_ROOM_AREA_FRAC = 0.08
MIN_ROOM_DIM_FRAC = 0.01
BORDER_MARGIN_FRAC = 0.01
MAX_ASPECT_RATIO = 8.0
MIN_SOLIDITY = 0.25
MIN_EXTENT = 0.08
OCR_CONF_THR = 0.3
SAM_MIN_SCORE = 0.70
SAM_CLOSET_THR = 300
SAM_WALL_NEG = 20
SAM_WALL_PCT = 75
WALL_MIN_HALF_PX = 3
ROOM_COLORS = [
(255, 99, 71), (100, 149, 237), (60, 179, 113),
(255, 165, 0), (147, 112, 219), (0, 206, 209),
(255, 182, 193), (127, 255, 0), (255, 215, 0),
(176, 224, 230),
]
# Pre-build CUDA morphology filters for _morphological_skeleton
_SKEL_ERODE_FILTER = None
_SKEL_DILATE_FILTER = None
def _ensure_skel_filters():
"""Lazily build persistent CUDA morphology filter objects for skeleton."""
global _SKEL_ERODE_FILTER, _SKEL_DILATE_FILTER
if _CV2_CUDA and _SKEL_ERODE_FILTER is None:
cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
_SKEL_ERODE_FILTER = cv2.cuda.createMorphologyFilter(
cv2.MORPH_ERODE, cv2.CV_8UC1, cross
)
_SKEL_DILATE_FILTER = cv2.cuda.createMorphologyFilter(
cv2.MORPH_DILATE, cv2.CV_8UC1, cross
)
# ════════════════════════════════════════════════════════════════════════════
# GPU-ACCELERATED OpenCV HELPERS (unchanged from v1)
# ════════════════════════════════════════════════════════════════════════════
def _cuda_upload(img: np.ndarray) -> "cv2.cuda.GpuMat":
gm = cv2.cuda_GpuMat()
gm.upload(img, stream=_CUDA_STREAM)
return gm
def _cuda_gaussian_blur(gray: np.ndarray, ksize: Tuple[int,int], sigma: float) -> np.ndarray:
if _CV2_CUDA:
g_gpu = _cuda_upload(gray)
filt = cv2.cuda.createGaussianFilter(cv2.CV_8UC1, cv2.CV_8UC1, ksize, sigma)
return filt.apply(g_gpu, stream=_CUDA_STREAM).download()
return cv2.GaussianBlur(gray, ksize, sigma)
def _cuda_threshold(gray: np.ndarray, thr: float, maxval: float, typ: int
) -> Tuple[float, np.ndarray]:
if _CV2_CUDA:
g_gpu = _cuda_upload(gray)
ret, dst = cv2.cuda.threshold(g_gpu, thr, maxval, typ, stream=_CUDA_STREAM)
return ret, dst.download()
return cv2.threshold(gray, thr, maxval, typ)
def _cuda_morphology(src: np.ndarray, op: int, kernel: np.ndarray,
iterations: int = 1) -> np.ndarray:
if _CV2_CUDA and op in (cv2.MORPH_ERODE, cv2.MORPH_DILATE,
cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
g_gpu = _cuda_upload(src)
filt = cv2.cuda.createMorphologyFilter(op, cv2.CV_8UC1, kernel, iterations=iterations)
return filt.apply(g_gpu, stream=_CUDA_STREAM).download()
return cv2.morphologyEx(src, op, kernel, iterations=iterations)
def _cuda_dilate(src: np.ndarray, kernel: np.ndarray) -> np.ndarray:
if _CV2_CUDA:
g_gpu = _cuda_upload(src)
filt = cv2.cuda.createMorphologyFilter(cv2.MORPH_DILATE, cv2.CV_8UC1, kernel)
return filt.apply(g_gpu, stream=_CUDA_STREAM).download()
return cv2.dilate(src, kernel)
# ════════════════════════════════════════════════════════════════════════════
# PIPELINE HELPERS (unchanged)
# ════════════════════════════════════════════════════════════════════════════
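def download_sam_if_needed() -> Optional[str]:
    """Return the local SAM checkpoint path, downloading it on first use."""
    if SAM_CKPT.exists():
        return str(SAM_CKPT)
    print("[SAM] Downloading checkpoint from HuggingFace …")
    # Stream into a temporary ".part" file and rename only on success, so an
    # interrupted download is never mistaken for a complete checkpoint.
    tmp_path = SAM_CKPT.with_name(SAM_CKPT.name + ".part")
    try:
        r = requests.get(SAM_URL, stream=True, timeout=300)
        r.raise_for_status()
        with open(tmp_path, "wb") as f:
            for chunk in r.iter_content(1 << 20):
                f.write(chunk)
        os.replace(tmp_path, SAM_CKPT)
        print(f"[SAM] Saved to {SAM_CKPT}")
        return str(SAM_CKPT)
    except Exception as e:
        print(f"[SAM] Download failed: {e}")
        return None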
# ════════════════════════════════════════════════════════════════════════════
# ██████████████████ PREPROCESSING - UNCHANGED ██████████████████████████
# ════════════════════════════════════════════════════════════════════════════
def remove_title_block(img: np.ndarray) -> np.ndarray:
h, w = img.shape[:2]
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)
h_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 20, 1))
v_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (1, h // 20))
h_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, h_kern)
v_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, v_kern)
crop_r, crop_b = w, h
right_region = v_lines[:, int(w * 0.7):]
if np.any(right_region):
v_pos = np.where(np.sum(right_region, axis=0) > h * 0.3)[0]
if len(v_pos):
crop_r = int(w * 0.7) + v_pos[0] - 10
bot_region = h_lines[int(h * 0.7):, :]
if np.any(bot_region):
h_pos = np.where(np.sum(bot_region, axis=1) > w * 0.3)[0]
if len(h_pos):
crop_b = int(h * 0.7) + h_pos[0] - 10
if crop_r == w and crop_b == h:
main_d = np.sum(gray < 200) / gray.size
if np.sum(gray[:, int(w*0.8):] < 200) / (gray[:, int(w*0.8):].size) > main_d*1.5:
crop_r = int(w * 0.8)
if np.sum(gray[int(h*0.8):, :] < 200) / (gray[int(h*0.8):, :].size) > main_d*1.5:
crop_b = int(h * 0.8)
return img[:crop_b, :crop_r].copy()
def remove_colors(img: np.ndarray) -> np.ndarray:
b = img[:,:,0].astype(np.int32)
g = img[:,:,1].astype(np.int32)
r = img[:,:,2].astype(np.int32)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.int32)
chroma = np.maximum(np.maximum(r,g),b) - np.minimum(np.minimum(r,g),b)
erase = (chroma > 15) & (gray < 240)
result = img.copy()
result[erase] = (255, 255, 255)
return result
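

# --- Illustrative sketch (not called by the pipeline) -----------------------
# remove_colors() treats a pixel as "coloured" when its channel spread
# (max minus min over B, G, R) exceeds 15. The helper below isolates that
# chroma measure so it can be checked on a toy image. The name
# `_example_chroma` is hypothetical and exists only for this sketch.
import numpy as np


def _example_chroma(img_bgr: np.ndarray) -> np.ndarray:
    """Per-pixel max-minus-min over the channel axis, as int32."""
    px = img_bgr.astype(np.int32)  # widen first so the subtraction cannot wrap
    return px.max(axis=2) - px.min(axis=2)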
# ════════════════════════════════════════════════════════════════════════════
# WALL CALIBRATION (unchanged dataclass; loop body vectorised)
# ════════════════════════════════════════════════════════════════════════════
from dataclasses import dataclass, field
@dataclass
class WallCalibration:
stroke_width : int = 3
min_component_dim : int = 30
min_component_area: int = 45
bridge_min_gap : int = 2
bridge_max_gap : int = 14
door_gap : int = 41
max_bridge_thick : int = 15
def calibrate_wall(mask: np.ndarray) -> WallCalibration:
    """
    BOTTLENECK 8 FIX: vectorised column/row run-length extraction.
    Original: two Python for-loops, each calling np.concatenate + np.diff
    inside the loop body.
    Fixed: extract all columns at once as a 2-D boolean matrix, apply
    np.diff along axis-0 once, then gather all runs with a single
    np.where + arithmetic.
    """
cal = WallCalibration()
h, w = mask.shape
# ── stroke-width from column run-lengths ──────────────────────────────
n_cols = min(200, w)
col_idx = np.linspace(0, w-1, n_cols, dtype=int)
max_run = max(2, int(h * 0.05))
# (h, n_cols) bool matrix – extracted in one shot
cols_bool = (mask[:, col_idx] > 0).astype(np.int8) # (H, C)
padded = np.concatenate(
[np.zeros((1, n_cols), np.int8), cols_bool, np.zeros((1, n_cols), np.int8)],
axis=0
) # (H+2, C)
diff2d = np.diff(padded.astype(np.int16), axis=0) # (H+1, C)
    # np.where on the transpose yields (column, row) pairs grouped by column.
    # The zero padding gives every run exactly one +1 (start) and one -1 (end)
    # edge, so the k-th start pairs with the k-th end.
    _, row_starts = np.where(diff2d.T == 1)
    _, row_ends = np.where(diff2d.T == -1)
    runs = row_ends - row_starts
    runs_all: List[int] = runs[(runs >= 1) & (runs <= max_run)].tolist()
if runs_all:
arr = np.array(runs_all, dtype=np.int32)
hist = np.bincount(np.clip(arr, 0, 200))
cal.stroke_width = max(2, int(np.argmax(hist[1:])) + 1)
cal.min_component_dim = max(15, cal.stroke_width * 10)
cal.min_component_area = max(30, cal.stroke_width * cal.min_component_dim // 2)
    # ── gap sizes from rows + cols: vectorised ────────────────────────────
gap_sizes: List[int] = []
row_step = max(3, h // 200)
col_step = max(3, w // 200)
# row scan (all selected rows at once)
row_idx = np.arange(5, h-5, row_step)
rows_bool = (mask[row_idx, :] > 0).astype(np.int8) # (R, W)
pad_r = np.concatenate(
[np.zeros((len(row_idx),1),np.int8), rows_bool, np.zeros((len(row_idx),1),np.int8)],
axis=1
)
diff_r = np.diff(pad_r.astype(np.int16), axis=1) # (R, W+1)
ri_all, c_ends = np.where(diff_r == -1)
ri_all2, c_starts = np.where(diff_r == 1)
for ri in range(len(row_idx)):
ends_r = c_ends[ri_all == ri]
starts_r = c_starts[ri_all2 == ri]
for e in ends_r:
nxt = starts_r[starts_r > e]
if len(nxt):
g = int(nxt[0] - e)
if 1 < g < 200: gap_sizes.append(g)
# col scan
col_idx2 = np.arange(5, w-5, col_step)
cols_bool2 = (mask[:, col_idx2] > 0).astype(np.int8) # (H, C)
pad_c = np.concatenate(
[np.zeros((1,len(col_idx2)),np.int8), cols_bool2, np.zeros((1,len(col_idx2)),np.int8)],
axis=0
)
diff_c = np.diff(pad_c.astype(np.int16), axis=0)
    # np.where returns (row, column) index arrays for the (H+1, C) matrix
    r_ends, ci_all3 = np.where(diff_c == -1)
    r_starts, ci_all4 = np.where(diff_c == 1)
for ci in range(len(col_idx2)):
ends_c = r_ends[ci_all3 == ci]
starts_c = r_starts[ci_all4 == ci]
for e in ends_c:
nxt = starts_c[starts_c > e]
if len(nxt):
g = int(nxt[0] - e)
if 1 < g < 200: gap_sizes.append(g)
cal.bridge_min_gap = 2
if len(gap_sizes) >= 20:
g = np.array(gap_sizes)
sm = g[g <= 30]
if len(sm) >= 10:
cal.bridge_max_gap = int(np.clip(np.percentile(sm, 75), 4, 20))
else:
cal.bridge_max_gap = cal.stroke_width * 4
door = g[(g > cal.bridge_max_gap) & (g <= 80)]
if len(door) >= 5:
raw = int(np.percentile(door, 90))
else:
raw = max(35, cal.stroke_width * 12)
raw = int(np.clip(raw, 25, 80))
cal.door_gap = raw if raw % 2 == 1 else raw + 1
cal.max_bridge_thick = cal.stroke_width * 5
return cal
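

# --- Illustrative sketch (not called by the pipeline) -----------------------
# The padded-diff trick used in calibrate_wall() is easiest to see on a single
# 1-D scan line. `_example_run_lengths` is a hypothetical helper introduced
# only for this sketch; it assumes a 1-D array where non-zero means "wall".
import numpy as np


def _example_run_lengths(line: np.ndarray) -> np.ndarray:
    """Return the lengths of consecutive non-zero runs in a 1-D array."""
    fg = (np.asarray(line) > 0).astype(np.int16)
    # Zero-pad both ends so every run has one rising and one falling edge.
    padded = np.concatenate([[0], fg, [0]]).astype(np.int16)
    edges = np.diff(padded)
    starts = np.flatnonzero(edges == 1)   # index of the first pixel of a run
    ends = np.flatnonzero(edges == -1)    # index one past the last pixel
    return ends - starts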
# ════════════════════════════════════════════════════════════════════════════
# SKELETON / TIP HELPERS
# ════════════════════════════════════════════════════════════════════════════
def _morphological_skeleton(binary: np.ndarray) -> np.ndarray:
    """
    BOTTLENECK 3 FIX: GPU morphology path re-uses persistent CUDA filter
    objects instead of creating new ones each iteration.
    cucim path uses GPU-native skeletonize when available.
    """
    # ── cucim (CuPy-based) GPU skeletonize: fastest path ──────────────────
if _CUCIM and _CUPY:
try:
bin_cp = cp.asarray(binary > 0)
skel_cp = _cucim_morph.skeletonize(bin_cp)
return (cp.asnumpy(skel_cp) * 255).astype(np.uint8)
except Exception:
pass # fall through
    # ── cv2.cuda morphology loop: pre-built filter objects ────────────────
_ensure_skel_filters()
if _CV2_CUDA and _SKEL_ERODE_FILTER is not None:
skel = np.zeros_like(binary)
g_img = _cuda_upload(binary)
for _ in range(300):
g_eroded = _SKEL_ERODE_FILTER.apply(g_img, stream=_CUDA_STREAM)
g_recon = _SKEL_DILATE_FILTER.apply(g_eroded, stream=_CUDA_STREAM)
eroded = g_eroded.download()
recon = g_recon.download()
temp = cv2.subtract(binary, recon) # CPU subtract is cheap
skel = cv2.bitwise_or(skel, temp)
binary = eroded
g_img = g_eroded # reuse GPU mat
if not cv2.countNonZero(binary):
break
return skel
# ── pure CPU fallback ─────────────────────────────────────────────────
skel = np.zeros_like(binary)
img = binary.copy()
cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
for _ in range(300):
eroded = cv2.erode(img, cross)
temp = cv2.subtract(img, cv2.dilate(eroded, cross))
skel = cv2.bitwise_or(skel, temp)
img = eroded
if not cv2.countNonZero(img):
break
return skel
def _skel(binary: np.ndarray) -> np.ndarray:
try:
from skimage.morphology import skeletonize as _sk
return (_sk(binary > 0) * 255).astype(np.uint8)
except ImportError:
return _morphological_skeleton(binary)
def _tip_pixels(skel_u8: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    BOTTLENECK 2 FIX: count 3x3 neighbours with a cv2.cuda linear filter
    when CUDA is available, using the cached _ONES3x3 kernel.
    """
    sb = (skel_u8 > 0).astype(np.float32)
    if _CV2_CUDA:
        # cv2.cuda.createLinearFilter only supports dstType == srcType, so
        # the 0/1 float32 image is filtered as CV_32FC1 end to end.
        g_sb = _cuda_upload(sb)
        f2d = cv2.cuda.createLinearFilter(
            cv2.CV_32FC1, cv2.CV_32FC1, _ONES3x3, borderMode=cv2.BORDER_CONSTANT
        )
        g_nbr = f2d.apply(g_sb, stream=_CUDA_STREAM)
        nbr = g_nbr.download()
    else:
        nbr = cv2.filter2D(sb, -1, _ONES3x3, borderType=cv2.BORDER_CONSTANT)
    # A tip is a skeleton pixel whose 3x3 sum is 2: itself plus one neighbour.
    return np.where((sb == 1) & (np.rint(nbr).astype(np.int32) == 2))
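

# --- Illustrative sketch (not called by the pipeline) -----------------------
# _tip_pixels() marks a skeleton pixel as a tip when its 3x3 window sums to 2
# (the pixel itself plus exactly one neighbour). The same test can be written
# in plain NumPy, which may help when checking the filter-based version on
# small inputs. `_example_tips_numpy` is a hypothetical name used only here.
import numpy as np


def _example_tips_numpy(skel_u8: np.ndarray):
    """Return (rows, cols) of skeleton pixels with exactly one 8-neighbour."""
    sb = (skel_u8 > 0).astype(np.int32)
    h, w = sb.shape
    p = np.pad(sb, 1)  # zero border, matching BORDER_CONSTANT
    # Sum the nine shifted views of the padded image: a 3x3 box sum.
    win = sum(p[dy:dy + h, dx:dx + w] for dy in range(3) for dx in range(3))
    return np.where((sb == 1) & (win == 2))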
def _outward_vectors(ex, ey, skel_u8: np.ndarray, lookahead: int
                     ) -> Tuple[np.ndarray, np.ndarray]:
    """
    BOTTLENECK 1 FIX: vectorised walk replacing the O(n·lookahead)
    pure-Python D8 loop.
    Strategy:
    • Build a dense (H×W) uint8 lookup image of the skeleton.
    • Advance all n endpoints together: each step forms an (8, n) array of
      D8 candidates, masks out the previous pixel and off-skeleton pixels,
      and moves every endpoint to its first valid candidate. The only
      Python loop runs over the `lookahead` steps.
    • The outward vector is the negated unit direction from the endpoint to
      the walk terminus.
    When CuPy is available and n > 100, the same walk runs on the GPU.
    """
n = len(ex)
odx = np.zeros(n, np.float32)
ody = np.zeros(n, np.float32)
if n == 0:
return odx, ody
h_img, w_img = skel_u8.shape
skel_bin = (skel_u8 > 0).astype(np.uint8) # dense lookup
# D8 offsets
D8_DY = np.array([ 0, 0,-1, 1,-1,-1, 1, 1], np.int32)
D8_DX = np.array([-1, 1, 0, 0,-1, 1,-1, 1], np.int32)
# ── CuPy vectorised path ──────────────────────────────────────────────
if _CUPY and n > 100:
skel_cp = cp.asarray(skel_bin)
ex_cp = cp.asarray(ex, dtype=cp.int32)
ey_cp = cp.asarray(ey, dtype=cp.int32)
d8dy_cp = cp.asarray(D8_DY)
d8dx_cp = cp.asarray(D8_DX)
# current positions (n,)
cx_cp = ex_cp.copy()
cy_cp = ey_cp.copy()
px_cp = ex_cp.copy()
py_cp = ey_cp.copy()
for _ in range(lookahead):
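            # candidate next positions: (8, n)
            nx_all = cx_cp[None, :] + d8dx_cp[:, None]
            ny_all = cy_cp[None, :] + d8dy_cp[:, None]
            # remember which candidates lie inside the image, then clamp so
            # the lookup stays in range (a clamped candidate would otherwise
            # alias the current pixel at the image border)
            in_img = ((nx_all >= 0) & (nx_all < w_img) &
                      (ny_all >= 0) & (ny_all < h_img))
            nx_all = cp.clip(nx_all, 0, w_img - 1)
            ny_all = cp.clip(ny_all, 0, h_img - 1)
            # exclude previous position
            not_prev = ~((nx_all == px_cp[None, :]) & (ny_all == py_cp[None, :]))
            # skeleton membership
            on_skel = skel_cp[ny_all, nx_all]
            valid = in_img & not_prev & (on_skel > 0)  # (8, n)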
# pick first valid D8 direction (argmax on axis-0)
any_valid = valid.any(axis=0) # (n,)
first_dir = valid.argmax(axis=0) # (n,) 0-7
chosen_nx = nx_all[first_dir, cp.arange(n)]
chosen_ny = ny_all[first_dir, cp.arange(n)]
# only update endpoints where a move was found
px_cp = cp.where(any_valid, cx_cp, px_cp)
py_cp = cp.where(any_valid, cy_cp, py_cp)
cx_cp = cp.where(any_valid, chosen_nx, cx_cp)
cy_cp = cp.where(any_valid, chosen_ny, cy_cp)
ix = (cx_cp - ex_cp).astype(cp.float32)
iy = (cy_cp - ey_cp).astype(cp.float32)
nr = cp.maximum(1e-6, cp.hypot(ix, iy))
odx_cp = -ix / nr
ody_cp = -iy / nr
return cp.asnumpy(odx_cp), cp.asnumpy(ody_cp)
# ── NumPy vectorised path ─────────────────────────────────────────────
cx = ex.copy().astype(np.int32)
cy = ey.copy().astype(np.int32)
px = ex.copy().astype(np.int32)
py = ey.copy().astype(np.int32)
for _ in range(lookahead):
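        nx_raw = cx[None, :] + D8_DX[:, None]                       # (8,n)
        ny_raw = cy[None, :] + D8_DY[:, None]
        # candidates outside the image are invalid; clamping alone would
        # alias them onto the current pixel at the border
        in_img = (nx_raw >= 0) & (nx_raw < w_img) & (ny_raw >= 0) & (ny_raw < h_img)
        nx_all = np.clip(nx_raw, 0, w_img - 1)
        ny_all = np.clip(ny_raw, 0, h_img - 1)
        not_prev = ~((nx_all == px[None, :]) & (ny_all == py[None, :]))
        on_skel = skel_bin[ny_all, nx_all]
        valid = in_img & not_prev & (on_skel > 0)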
any_valid = valid.any(axis=0)
first_dir = valid.argmax(axis=0)
chosen_nx = nx_all[first_dir, np.arange(n)]
chosen_ny = ny_all[first_dir, np.arange(n)]
px = np.where(any_valid, cx, px)
py = np.where(any_valid, cy, py)
cx = np.where(any_valid, chosen_nx, cx)
cy = np.where(any_valid, chosen_ny, cy)
ix = (cx - ex).astype(np.float32)
iy = (cy - ey).astype(np.float32)
nr = np.maximum(1e-6, np.hypot(ix, iy))
odx = -ix / nr
ody = -iy / nr
return odx, ody
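

# --- Illustrative sketch (not called by the pipeline) -----------------------
# _outward_vectors() picks the first valid D8 direction with argmax over an
# (8, n) boolean array. argmax returns 0 for an all-False column, so the
# result has to be gated by any(axis=0), which is what the walk does via
# `any_valid`. `_example_first_valid` is a hypothetical name for this sketch.
import numpy as np


def _example_first_valid(valid: np.ndarray):
    """Return (has_move, direction) per column of an (8, n) boolean array."""
    has_move = valid.any(axis=0)
    direction = valid.argmax(axis=0)  # first True; 0 when no entry is True
    # Report -1 where no direction is valid instead of the misleading 0.
    return has_move, np.where(has_move, direction, -1)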
# ════════════════════════════════════════════════════════════════════════════
# ANALYZE IMAGE CHARACTERISTICS (unchanged)
# ════════════════════════════════════════════════════════════════════════════
def analyze_image_characteristics(img: np.ndarray) -> Dict[str, Any]:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
brightness = float(np.mean(gray))
contrast = float(np.std(gray))
otsu_thr, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
if brightness > 220:
wall_threshold = max(200, int(otsu_thr * 1.1))
elif brightness < 180:
wall_threshold = max(150, int(otsu_thr * 0.9))
else:
wall_threshold = int(otsu_thr)
return {"brightness": brightness, "contrast": contrast,
"wall_threshold": wall_threshold, "otsu_threshold": otsu_thr}
# ════════════════════════════════════════════════════════════════════════════
# DOOR ARC DETECTION (unchanged)
# ════════════════════════════════════════════════════════════════════════════
def detect_and_close_door_arcs(img: np.ndarray) -> np.ndarray:
R_MIN=60; R_MAX=320; DP=1.2; PARAM1=50; PARAM2=22; MIN_DIST=50
MAX_ARC=115.0; MIN_ARC=60.0; LEAF_FRAC=0.92; LEAF_THR=0.35
WALL_R=1.25; WALL_THR=12; SNAP_R=30
DOUBLE_R_RATIO=1.4; DOUBLE_DIST=1.8; LINE_T=3
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
h, w = gray.shape
result = img.copy()
_, binary = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
binary = _cuda_morphology(binary.astype(np.uint8), cv2.MORPH_CLOSE, np.ones((3,3), np.uint8))
blurred = _cuda_gaussian_blur(gray, (7,7), 1.5)
raw = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, dp=DP, minDist=MIN_DIST,
param1=PARAM1, param2=PARAM2, minRadius=R_MIN, maxRadius=R_MAX)
if raw is None:
return result
circles = np.round(raw[0]).astype(np.int32)
binary = binary.astype(np.uint8)
def sample_ring(cx, cy, r, n=360):
ang = np.linspace(0, 2*np.pi, n, endpoint=False)
xs = np.clip((cx + r*np.cos(ang)).astype(np.int32), 0, w-1)
ys = np.clip((cy + r*np.sin(ang)).astype(np.int32), 0, h-1)
return ang, xs, ys
def arc_span(cx, cy, r):
ang, xs, ys = sample_ring(cx, cy, r)
on = ang[binary[ys, xs] > 0]
if len(on) == 0: return 0.0, np.array([])
return float(np.degrees(on[-1]-on[0])), on
def has_leaf(cx, cy, r):
lr = r*LEAF_FRAC; n = max(60, int(r))
ang = np.linspace(0, 2*np.pi, n, endpoint=False)
xs = np.clip((cx+lr*np.cos(ang)).astype(np.int32), 0, w-1)
ys = np.clip((cy+lr*np.sin(ang)).astype(np.int32), 0, h-1)
return float(np.mean(binary[ys,xs]>0)) >= LEAF_THR
def wall_outside(cx, cy, r):
pr = r*WALL_R; ang = np.linspace(0, 2*np.pi, 36, endpoint=False)
xs = np.clip((cx+pr*np.cos(ang)).astype(np.int32), 0, w-1)
ys = np.clip((cy+pr*np.sin(ang)).astype(np.int32), 0, h-1)
return int(np.sum(binary[ys,xs]>0)) >= WALL_THR
def endpoints(cx, cy, r, occ):
gap_t = np.radians(25.0); diffs = np.diff(occ)
big = np.where(diffs > gap_t)[0]
if len(big) == 0: sa, ea = occ[0], occ[-1]
else:
sp = big[np.argmax(diffs[big])]
sa, ea = occ[sp+1], occ[sp]
def snap(a):
px2 = int(round(cx+r*np.cos(a))); py2 = int(round(cy+r*np.sin(a)))
y0=max(0,py2-SNAP_R); y1=min(h,py2+SNAP_R+1)
x0=max(0,px2-SNAP_R); x1=min(w,px2+SNAP_R+1)
roi = binary[y0:y1, x0:x1]
wy2, wx2 = np.where(roi>0)
if len(wx2)==0: return px2, py2
dd = np.hypot(wx2-(px2-x0), wy2-(py2-y0))
i = int(np.argmin(dd))
return int(wx2[i]+x0), int(wy2[i]+y0)
return snap(sa), snap(ea)
valid = []
for cx, cy, r in circles:
span, occ = arc_span(cx, cy, r)
if not (MIN_ARC <= span <= MAX_ARC): continue
if not has_leaf(cx, cy, r): continue
if not wall_outside(cx, cy, r): continue
ep1, ep2 = endpoints(cx, cy, r, occ)
valid.append((cx, cy, r, ep1, ep2))
used = [False]*len(valid)
double_pairs = []
for i in range(len(valid)):
if used[i]: continue
cx1,cy1,r1,_,_ = valid[i]
best_j, best_d = -1, 1e9
for j in range(i+1, len(valid)):
if used[j]: continue
cx2,cy2,r2,_,_ = valid[j]
if max(r1,r2)/(min(r1,r2)+1e-6) > DOUBLE_R_RATIO: continue
cd = float(np.hypot(cx2-cx1, cy2-cy1))
if cd < (r1+r2)*DOUBLE_DIST and cd < best_d:
best_d, best_j = cd, j
if best_j >= 0:
double_pairs.append((i, best_j))
used[i] = used[best_j] = True
singles = [i for i in range(len(valid)) if not used[i]]
for idx in singles:
cx,cy,r,ep1,ep2 = valid[idx]
cv2.line(result, ep1, ep2, (0,0,0), LINE_T)
for i_idx, j_idx in double_pairs:
cx1,cy1,r1,ep1a,ep1b = valid[i_idx]
cx2,cy2,r2,ep2a,ep2b = valid[j_idx]
daa = np.hypot(ep1a[0]-ep2a[0], ep1a[1]-ep2a[1])
dab = np.hypot(ep1a[0]-ep2b[0], ep1a[1]-ep2b[1])
if daa <= dab: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2a,ep2b
else: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2b,ep2a
cv2.line(result, outer1, outer2, (0,0,0), LINE_T)
cv2.line(result, inner1, inner2, (0,0,0), LINE_T)
return result
# ════════════════════════════════════════════════════════════════════════════
# EXTRACT WALLS (unchanged)
# ════════════════════════════════════════════════════════════════════════════
def _estimate_wall_body_thickness(binary: np.ndarray, fallback: int = 12) -> int:
h, w = binary.shape
n_cols = min(200, w)
col_idx = np.linspace(0, w-1, n_cols, dtype=int)
cols = (binary[:, col_idx] > 0).astype(np.int8)
padded = np.concatenate([np.zeros((1,n_cols),np.int8), cols,
np.zeros((1,n_cols),np.int8)], axis=0)
diff = np.diff(padded.astype(np.int16), axis=0)
run_lengths = []
for ci in range(n_cols):
d = diff[:, ci]
s = np.where(d == 1)[0]
e = np.where(d == -1)[0]
if len(s)==0 or len(e)==0: continue
r = e - s
r = r[(r >= 2) & (r <= h*0.15)]
if len(r): run_lengths.append(r)
if run_lengths:
return int(np.median(np.concatenate(run_lengths)))
return fallback
def _remove_thin_lines(walls: np.ndarray, min_thickness: int) -> np.ndarray:
dist = cv2.distanceTransform(walls, cv2.DIST_L2, 5)
thick_mask = dist >= (min_thickness / 2)
n_lbl, labels, _, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
if n_lbl <= 1: return walls
thick_labels = labels[thick_mask]
if len(thick_labels) == 0: return np.zeros_like(walls)
has_thick = np.zeros(n_lbl, dtype=bool)
has_thick[thick_labels] = True
keep_lut = has_thick.astype(np.uint8)*255; keep_lut[0] = 0
return keep_lut[labels]
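

# --- Illustrative sketch (not called by the pipeline) -----------------------
# Several filters in this file keep or drop whole connected components by
# indexing a per-label lookup table with the label image (`lut[labels]`),
# which avoids a Python loop over components. The helper name and its
# arguments are assumptions made for illustration only.
import numpy as np


def _example_keep_labels(labels: np.ndarray, keep_ids) -> np.ndarray:
    """Return a 0/255 uint8 mask containing only the listed label ids."""
    lut = np.zeros(int(labels.max()) + 1, dtype=np.uint8)
    lut[np.asarray(list(keep_ids), dtype=np.intp)] = 255
    lut[0] = 0  # label 0 is background in connectedComponents output
    return lut[labels]  # one fancy-indexing pass over the whole image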
def _filter_double_lines_and_thick(walls: np.ndarray) -> np.ndarray:
MIN_SINGLE_DIM = 20; DOUBLE_GAP = 60; DOUBLE_PCT = 12
n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
if n_lbl <= 1: return walls
try:
skel_full = cv2.ximgproc.thinning(walls, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN)
except AttributeError:
skel_full = _morphological_skeleton(walls)
skel_bin = skel_full > 0
keep_ids: set = set()
thin_cands = []
for i in range(1, n_lbl):
bw = int(stats[i, cv2.CC_STAT_WIDTH]); bh = int(stats[i, cv2.CC_STAT_HEIGHT])
if min(bw, bh) >= MIN_SINGLE_DIM: keep_ids.add(i)
else: thin_cands.append(i)
if not thin_cands:
filtered = np.zeros_like(walls)
for i in keep_ids: filtered[labels==i] = 255
return filtered
skel_labels = labels * skel_bin
img_h, img_w = labels.shape
probe_dists = np.arange(3, DOUBLE_GAP+1, 3, dtype=np.float32)
for i in thin_cands:
bys, bxs = np.where(skel_labels == i)
if len(bys) < 4: continue
step = max(1, len(bys)//80)
sy = bys[::step].astype(np.float32); sx = bxs[::step].astype(np.float32)
n_s = len(sy)
sy_prev=np.roll(sy,1); sy_prev[0]=sy[0]
sy_next=np.roll(sy,-1); sy_next[-1]=sy[-1]
sx_prev=np.roll(sx,1); sx_prev[0]=sx[0]
sx_next=np.roll(sx,-1); sx_next[-1]=sx[-1]
dr=(sy_next-sy_prev); dc=(sx_next-sx_prev)
dlen=np.maximum(1.0, np.hypot(dr, dc))
pr=(-dc/dlen)[:,np.newaxis]; pc=(dr/dlen)[:,np.newaxis]
for sign in (1.0, -1.0):
rr = np.round(sy[:,np.newaxis] + sign*pr*probe_dists).astype(np.int32)
cc = np.round(sx[:,np.newaxis] + sign*pc*probe_dists).astype(np.int32)
valid_m = (rr>=0)&(rr<img_h)&(cc>=0)&(cc<img_w)
safe_rr = np.clip(rr, 0, img_h-1); safe_cc = np.clip(cc, 0, img_w-1)
lbl_at = labels[safe_rr, safe_cc]
partner = valid_m & (lbl_at>0) & (lbl_at!=i)
hit_any = partner.any(axis=1)
hit_rows = np.where(hit_any)[0]
if len(hit_rows) == 0: continue
first_col = partner[hit_rows].argmax(axis=1)
partner_ids = lbl_at[hit_rows, first_col]
keep_ids.update(partner_ids.tolist())
if 100.0*len(hit_rows)/n_s >= DOUBLE_PCT:
keep_ids.add(i); break
if keep_ids:
ka = np.array(sorted(keep_ids), dtype=np.int32)
lut = np.zeros(n_lbl, dtype=np.uint8); lut[ka] = 255
return lut[labels]
return np.zeros_like(walls)
def extract_walls_adaptive(img_clean: np.ndarray,
img_stats: Optional[Dict] = None) -> Tuple[np.ndarray, int]:
h, w = img_clean.shape[:2]
gray = cv2.cvtColor(img_clean, cv2.COLOR_BGR2GRAY)
if img_stats:
wall_threshold = img_stats["wall_threshold"]
else:
otsu_t, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV+cv2.THRESH_OTSU)
wall_threshold = int(otsu_t)
_, binary = _cuda_threshold(gray, wall_threshold, 255, cv2.THRESH_BINARY_INV)
binary = binary.astype(np.uint8)
min_line_len = max(8, int(0.012 * w))
body_thickness = _estimate_wall_body_thickness(binary, fallback=12)
body_thickness = int(np.clip(body_thickness, 9, 30))
k_h = cv2.getStructuringElement(cv2.MORPH_RECT, (min_line_len, 1))
k_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, min_line_len))
long_h = _cuda_morphology(binary, cv2.MORPH_OPEN, k_h)
long_v = _cuda_morphology(binary, cv2.MORPH_OPEN, k_v)
orig_walls = cv2.bitwise_or(long_h, long_v)
k_bh = cv2.getStructuringElement(cv2.MORPH_RECT, (1, body_thickness))
k_bv = cv2.getStructuringElement(cv2.MORPH_RECT, (body_thickness, 1))
dil_h = _cuda_dilate(long_h, k_bh)
dil_v = _cuda_dilate(long_v, k_bv)
walls = cv2.bitwise_or(dil_h, dil_v)
collision = cv2.bitwise_and(dil_h, dil_v)
safe_zone = cv2.bitwise_and(collision, orig_walls)
walls = cv2.bitwise_or(cv2.bitwise_and(walls, cv2.bitwise_not(collision)), safe_zone)
dist = cv2.distanceTransform(cv2.bitwise_not(orig_walls), cv2.DIST_L2, 5)
keep_mask = (dist <= body_thickness/2).astype(np.uint8) * 255
walls = cv2.bitwise_and(walls, keep_mask)
walls = _remove_thin_lines(walls, min_thickness=body_thickness)
n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
if n_lbl > 1:
areas = stats[1:, cv2.CC_STAT_AREA]
min_n = max(20, int(np.median(areas) * 0.0001))
keep_lut = np.zeros(n_lbl, dtype=np.uint8)
keep_lut[1:] = (areas >= min_n).astype(np.uint8)
walls = (keep_lut[labels] * 255).astype(np.uint8)
walls = _filter_double_lines_and_thick(walls)
return walls, body_thickness
FIXTURE_MAX_BLOB=80; FIXTURE_MAX_AREA=4000; FIXTURE_MAX_ASP=4.0
FIXTURE_DENSITY_R=50; FIXTURE_DENSITY_THR=0.35; FIXTURE_MIN_ZONE=1500
def remove_fixture_symbols(walls: np.ndarray) -> np.ndarray:
h, w = walls.shape
n_lbl, labels, stats, centroids = cv2.connectedComponentsWithStats(walls, connectivity=8)
if n_lbl <= 1: return walls
bw_a=stats[1:,cv2.CC_STAT_WIDTH].astype(np.float32)
bh_a=stats[1:,cv2.CC_STAT_HEIGHT].astype(np.float32)
ar_a=stats[1:,cv2.CC_STAT_AREA].astype(np.float32)
cx_a=np.round(centroids[1:,0]).astype(np.int32)
cy_a=np.round(centroids[1:,1]).astype(np.int32)
mx=np.maximum(bw_a,bh_a); mn=np.minimum(bw_a,bh_a)
asp=mx/(mn+1e-6)
cand=(bw_a<FIXTURE_MAX_BLOB)&(bh_a<FIXTURE_MAX_BLOB)&(ar_a<FIXTURE_MAX_AREA)&(asp<=FIXTURE_MAX_ASP)
ci=np.where(cand)[0]; cand_ids=ci+1; ccx=cx_a[ci]; ccy=cy_a[ci]
if len(cand_ids)==0: return walls
heatmap=np.zeros((h,w),dtype=np.float32)
for x2,y2 in zip(ccx.tolist(), ccy.tolist()):
cv2.circle(heatmap,(x2,y2),int(FIXTURE_DENSITY_R),1.0,-1)
bk=max(3,(int(FIXTURE_DENSITY_R)//2)|1)
density = _cuda_gaussian_blur(
(heatmap * 255).astype(np.uint8), (bk*4+1, bk*4+1), bk
).astype(np.float32) / 255.0
dm=float(density.max())
if dm>0: density/=dm
zone=(density>=FIXTURE_DENSITY_THR).astype(np.uint8)*255
nz,zlbl,zst,_=cv2.connectedComponentsWithStats(zone,connectivity=8)
cz=np.zeros_like(zone)
if nz>1:
za=zst[1:,cv2.CC_STAT_AREA]; kz=np.where(za>=FIXTURE_MIN_ZONE)[0]+1
if len(kz):
lut2=np.zeros(nz,dtype=np.uint8); lut2[kz]=255; cz=lut2[zlbl]
zone=cz
vc=(ccy>=0)&(ccy<h)&(ccx>=0)&(ccx<w)
in_zone=vc&(zone[ccy.clip(0,h-1), ccx.clip(0,w-1)]>0)
erase_ids=cand_ids[in_zone]
result=walls.copy()
if len(erase_ids):
el=np.zeros(n_lbl,dtype=np.uint8); el[erase_ids]=1
result[el[labels].astype(bool)]=0
return result
def _remove_thin_lines_calibrated(walls: np.ndarray, cal: WallCalibration) -> np.ndarray:
n_cc, cc, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
if n_cc <= 1: return walls
bw=stats[1:,cv2.CC_STAT_WIDTH]; bh=stats[1:,cv2.CC_STAT_HEIGHT]
ar=stats[1:,cv2.CC_STAT_AREA]; mx=np.maximum(bw,bh)
keep=(mx>=cal.min_component_dim)|(ar>=cal.min_component_area*3)
lut=np.zeros(n_cc,np.uint8); lut[1:]=keep.astype(np.uint8)*255
return lut[cc]
def _bridge_wall_endpoints_v2(walls: np.ndarray, cal: WallCalibration,
angle_tol: float = 15.0) -> np.ndarray:
"""
BOTTLENECK 12 FIX: vectorised path-clear check.
Original: Python for-loop with np.any per pair.
Fixed: all N_SAMP mid-paths stacked into (K, N_SAMP-2) index arrays;
wall lookup via advanced indexing; any() collapsed axis-1 in one shot.
"""
try:
from scipy.spatial import cKDTree as _KDTree
_SCIPY = True
except ImportError:
_SCIPY = False
result=walls.copy(); h,w=walls.shape; FCOS=np.cos(np.radians(70.0))
skel=_skel(walls); ey,ex=_tip_pixels(skel); n_ep=len(ey)
if n_ep < 2: return result
_,cc_map=cv2.connectedComponents(walls,connectivity=8)
ep_cc=cc_map[ey,ex]
lookahead=max(8, cal.stroke_width*3)
out_dx,out_dy=_outward_vectors(ex,ey,skel,lookahead)
pts=np.stack([ex,ey],axis=1).astype(np.float32)
if _SCIPY:
pairs=_KDTree(pts).query_pairs(float(cal.bridge_max_gap), output_type='ndarray')
ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64)
else:
_ii,_jj=np.triu_indices(n_ep,k=1)
ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=cal.bridge_max_gap
ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64)
if len(ii)==0: return result
if _CUPY:
ii_cp = cp.asarray(ii); jj_cp = cp.asarray(jj)
pts_cp = cp.asarray(pts)
odx_cp = cp.asarray(out_dx); ody_cp = cp.asarray(out_dy)
dxij = pts_cp[jj_cp,0]-pts_cp[ii_cp,0]
dyij = pts_cp[jj_cp,1]-pts_cp[ii_cp,1]
dists_cp = cp.hypot(dxij,dyij)
safe = cp.maximum(dists_cp, 1e-6)
ux,uy = dxij/safe, dyij/safe
ang = cp.degrees(cp.arctan2(cp.abs(dyij), cp.abs(dxij)))
is_H = (ang<=angle_tol)
is_V = (ang>=(90.0-angle_tol))
g1 = (dists_cp>=cal.bridge_min_gap)&(dists_cp<=cal.bridge_max_gap)
g2 = is_H|is_V
g3 = ((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS) & \
((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS)
ep_cc_cp = cp.asarray(ep_cc)
g4 = ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp]
pre_ok_cp = g1&g2&g3&g4
pre_idx = cp.asnumpy(cp.where(pre_ok_cp)[0])
dists = cp.asnumpy(dists_cp)
is_H = cp.asnumpy(is_H)
is_V = cp.asnumpy(is_V)
else:
dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1]
dists=np.hypot(dxij,dyij); safe=np.maximum(dists,1e-6)
ux,uy=dxij/safe,dyij/safe
ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij)))
is_H=ang<=angle_tol; is_V=ang>=(90.0-angle_tol)
g1=(dists>=cal.bridge_min_gap)&(dists<=cal.bridge_max_gap); g2=is_H|is_V
g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS)
g4=ep_cc[ii]!=ep_cc[jj]
pre_ok=g1&g2&g3&g4; pre_idx=np.where(pre_ok)[0]
if len(pre_idx) == 0:
return result
# ── VECTORISED path-clear check (BOTTLENECK 12 FIX) ──────────────────
N_SAMP = 9
K = len(pre_idx)
vi_pre = ii[pre_idx]; vj_pre = jj[pre_idx]
ax_arr = ex[vi_pre].astype(np.float32); ay_arr = ey[vi_pre].astype(np.float32)
bx_arr = ex[vj_pre].astype(np.float32); by_arr = ey[vj_pre].astype(np.float32)
is_H_pre = is_H[pre_idx]
# t values for interior samples (exclude endpoints)
t = np.linspace(0, 1, N_SAMP, dtype=np.float32)[1:-1] # (N_SAMP-2,)
# xs[k, s] = lerp(ax, bx, t[s]) when H, else ax
xs_h = ax_arr[:, None] + (bx_arr - ax_arr)[:, None] * t[None, :] # (K, N_SAMP-2)
ys_h = np.broadcast_to(ay_arr[:, None], (K, N_SAMP-2)).copy() # constant y
xs_v = np.broadcast_to(ax_arr[:, None], (K, N_SAMP-2)).copy()
ys_v = ay_arr[:, None] + (by_arr - ay_arr)[:, None] * t[None, :]
xs_all = np.where(is_H_pre[:, None], xs_h, xs_v)
ys_all = np.where(is_H_pre[:, None], ys_h, ys_v)
sxs = np.clip(np.round(xs_all).astype(np.int32), 0, w-1) # (K, N_SAMP-2)
sys_ = np.clip(np.round(ys_all).astype(np.int32), 0, h-1)
# bulk wall lookup: walls_flat[K, N_SAMP-2]
walls_flat = walls[sys_, sxs] # (K, N_SAMP-2) uint8
blocked = walls_flat.any(axis=1) # (K,) bool
clr = ~blocked
valid = pre_idx[clr]
if len(valid) == 0:
return result
vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid]
order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order]
used=np.zeros(n_ep,dtype=bool)
for k in range(len(vi)):
ia,ib=int(vi[k]),int(vj[k])
if used[ia] or used[ib]: continue
ax,ay=int(ex[ia]),int(ey[ia]); bx2,by2=int(ex[ib]),int(ey[ib])
p1,p2=((min(ax,bx2),ay),(max(ax,bx2),ay)) if vH[k] else ((ax,min(ay,by2)),(ax,max(ay,by2)))
cv2.line(result,p1,p2,255,cal.stroke_width)
used[ia]=used[ib]=True
return result
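# Illustrative sketch only (not part of the pipeline): the pre-filter gates
# g1-g3 used above, written out for a single endpoint pair in plain Python.
# `pair_passes_gates` and its parameter names are hypothetical. The component
# check g4 (endpoints must lie in different connected components) is left out
# because it needs the label map.

```python
import math


def pair_passes_gates(a, b, out_a, out_b, min_gap, max_gap,
                      angle_tol=15.0, facing_deg=70.0):
    """Return 'H', 'V' or None for one endpoint pair (hypothetical helper).

    a, b         : (x, y) endpoint coordinates
    out_a, out_b : unit vectors pointing away from each wall stub
    """
    dx, dy = b[0] - a[0], b[1] - a[1]
    dist = math.hypot(dx, dy)
    # g1: the gap length must lie inside the calibrated range
    if dist < 1e-6 or not (min_gap <= dist <= max_gap):
        return None
    # g2: the connecting segment must be near-horizontal or near-vertical
    ang = math.degrees(math.atan2(abs(dy), abs(dx)))
    if ang <= angle_tol:
        axis = "H"
    elif ang >= 90.0 - angle_tol:
        axis = "V"
    else:
        return None
    # g3: both stubs must point towards each other (within facing_deg)
    ux, uy = dx / dist, dy / dist
    fcos = math.cos(math.radians(facing_deg))
    faces_a = out_a[0] * ux + out_a[1] * uy >= fcos
    faces_b = out_b[0] * -ux + out_b[1] * -uy >= fcos
    return axis if (faces_a and faces_b) else None
```

# Two stubs that face each other across a 20 px horizontal gap pass as "H";
# flipping either outward vector, or making the pair diagonal, rejects it.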
def _close_door_openings_v2(walls: np.ndarray, cal: WallCalibration) -> np.ndarray:
gap=cal.door_gap
def _shape_close(mask, kwh, axis, max_thick):
k=cv2.getStructuringElement(cv2.MORPH_RECT, kwh)
cls=_cuda_morphology(mask, cv2.MORPH_CLOSE, k)
new=cv2.bitwise_and(cls,cv2.bitwise_not(mask))
if not np.any(new): return np.zeros_like(mask)
n2,lbl2,st2,_=cv2.connectedComponentsWithStats(new,connectivity=8)
if n2<=1: return np.zeros_like(mask)
perp=st2[1:,cv2.CC_STAT_HEIGHT if axis=='H' else cv2.CC_STAT_WIDTH]
keep=perp<=max_thick; lut2=np.zeros(n2,np.uint8); lut2[1:]=keep.astype(np.uint8)*255
return lut2[lbl2]
add_h=_shape_close(walls,(gap,1),'H',cal.max_bridge_thick)
add_v=_shape_close(walls,(1,gap),'V',cal.max_bridge_thick)
return cv2.bitwise_or(walls, cv2.bitwise_or(add_h,add_v))
def reconstruct_walls(walls: np.ndarray) -> Tuple[np.ndarray, WallCalibration]:
cal = calibrate_wall(walls)
walls = _remove_thin_lines_calibrated(walls, cal)
walls = _bridge_wall_endpoints_v2(walls, cal)
walls = _close_door_openings_v2(walls, cal)
return walls, cal
def remove_dangling_lines(walls: np.ndarray, cal: WallCalibration) -> np.ndarray:
stroke = cal.stroke_width
connect_radius = max(6, stroke*3)
n_cc,cc_map,stats,_ = cv2.connectedComponentsWithStats(walls,connectivity=8)
if n_cc <= 1: return walls
skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel)
tip_cc=cc_map[tip_y,tip_x]
free_counts=np.bincount(tip_cc,minlength=n_cc).astype(np.int32)
remove=np.zeros(n_cc,dtype=bool)
ker=cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(connect_radius*2+1,connect_radius*2+1))
for cc_id in range(1,n_cc):
if free_counts[cc_id]<2: continue
bw2=int(stats[cc_id,cv2.CC_STAT_WIDTH]); bh2=int(stats[cc_id,cv2.CC_STAT_HEIGHT])
if max(bw2,bh2) > stroke*40: continue
cm=(cc_map==cc_id).astype(np.uint8)
dc=_cuda_dilate(cm, ker)
overlap=cv2.bitwise_and(dc,((walls>0)&(cc_map!=cc_id)).astype(np.uint8))
if np.count_nonzero(overlap)==0: remove[cc_id]=True
lut=np.ones(n_cc,dtype=np.uint8); lut[0]=0; lut[remove]=0
return (lut[cc_map]*255).astype(np.uint8)
def close_large_door_gaps(walls: np.ndarray, cal: WallCalibration) -> np.ndarray:
"""
BOTTLENECK 12 FIX (same vectorised path-clear as _bridge_wall_endpoints_v2).
"""
try:
from scipy.spatial import cKDTree
_SCIPY = True
except ImportError:
_SCIPY = False
DOOR_MIN=180; DOOR_MAX=320; ANGLE_TOL=12.0
FCOS=np.cos(np.radians(90.0-ANGLE_TOL))
stroke=cal.stroke_width; line_width=max(stroke,3)
result=walls.copy(); h,w=walls.shape
skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel)
n_ep=len(tip_x)
if n_ep<2: return result
_,cc_map=cv2.connectedComponents(walls,connectivity=8)
ep_cc=cc_map[tip_y,tip_x]
lookahead=max(12,stroke*4)
out_dx,out_dy=_outward_vectors(tip_x,tip_y,skel,lookahead)
pts=np.stack([tip_x,tip_y],axis=1).astype(np.float32)
if _SCIPY:
pairs=cKDTree(pts).query_pairs(float(DOOR_MAX),output_type='ndarray')
ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64)
else:
_ii,_jj=np.triu_indices(n_ep,k=1)
ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=DOOR_MAX
ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64)
if len(ii)==0: return result
if _CUPY:
ii_cp=cp.asarray(ii); jj_cp=cp.asarray(jj)
pts_cp=cp.asarray(pts)
odx_cp=cp.asarray(out_dx); ody_cp=cp.asarray(out_dy)
ep_cc_cp=cp.asarray(ep_cc)
dxij=pts_cp[jj_cp,0]-pts_cp[ii_cp,0]
dyij=pts_cp[jj_cp,1]-pts_cp[ii_cp,1]
dists_cp=cp.hypot(dxij,dyij); safe=cp.maximum(dists_cp,1e-6)
ux,uy=dxij/safe,dyij/safe
ang=cp.degrees(cp.arctan2(cp.abs(dyij),cp.abs(dxij)))
is_H=(ang<=ANGLE_TOL); is_V=(ang>=(90.0-ANGLE_TOL))
g1=(dists_cp>=DOOR_MIN)&(dists_cp<=DOOR_MAX); g2=is_H|is_V
g3=((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS)&\
((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS)
g4=ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp]
pre_idx=cp.asnumpy(cp.where(g1&g2&g3&g4)[0])
dists=cp.asnumpy(dists_cp); is_H=cp.asnumpy(is_H); is_V=cp.asnumpy(is_V)
else:
dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1]
dists=np.hypot(dxij,dyij); safe=np.maximum(dists,1e-6)
ux,uy=dxij/safe,dyij/safe
ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij)))
is_H=ang<=ANGLE_TOL; is_V=ang>=(90.0-ANGLE_TOL)
g1=(dists>=DOOR_MIN)&(dists<=DOOR_MAX); g2=is_H|is_V
g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS)
g4=ep_cc[ii]!=ep_cc[jj]
pre_idx=np.where(g1&g2&g3&g4)[0]
if len(pre_idx) == 0:
return result
# ── vectorised path-clear ─────────────────────────────────────────────
N_SAMP = 15
K = len(pre_idx)
vi_pre = ii[pre_idx]; vj_pre = jj[pre_idx]
ax_arr = tip_x[vi_pre].astype(np.float32); ay_arr = tip_y[vi_pre].astype(np.float32)
bx_arr = tip_x[vj_pre].astype(np.float32); by_arr = tip_y[vj_pre].astype(np.float32)
is_H_pre = is_H[pre_idx]
t = np.linspace(0, 1, N_SAMP, dtype=np.float32)[1:-1]
mid_y = ((ay_arr + by_arr) / 2.0)[:, None]
mid_x = ((ax_arr + bx_arr) / 2.0)[:, None]
xs_h = ax_arr[:, None] + (bx_arr - ax_arr)[:, None] * t[None, :]
ys_h = np.broadcast_to(mid_y, (K, N_SAMP-2)).copy()
xs_v = np.broadcast_to(mid_x, (K, N_SAMP-2)).copy()
ys_v = ay_arr[:, None] + (by_arr - ay_arr)[:, None] * t[None, :]
xs_all = np.where(is_H_pre[:, None], xs_h, xs_v)
ys_all = np.where(is_H_pre[:, None], ys_h, ys_v)
sxs = np.clip(np.round(xs_all).astype(np.int32), 0, w-1)
sys_ = np.clip(np.round(ys_all).astype(np.int32), 0, h-1)
blocked = walls[sys_, sxs].any(axis=1)
clr = ~blocked
valid=pre_idx[clr]
if len(valid)==0: return result
vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid]
order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order]
used=np.zeros(n_ep,dtype=bool)
for k in range(len(vi)):
ia,ib=int(vi[k]),int(vj[k])
if used[ia] or used[ib]: continue
ax,ay=int(tip_x[ia]),int(tip_y[ia]); bx2,by2=int(tip_x[ib]),int(tip_y[ib])
if vH[k]: p1=(min(ax,bx2),(ay+by2)//2); p2=(max(ax,bx2),(ay+by2)//2)
else: p1=((ax+bx2)//2,min(ay,by2)); p2=((ax+bx2)//2,max(ay,by2))
cv2.line(result,p1,p2,255,line_width)
used[ia]=used[ib]=True
return result
def apply_user_lines_to_walls(walls, lines, thickness):
result = walls.copy()
for x1, y1, x2, y2 in lines:
cv2.line(result, (x1, y1), (x2, y2), 255, max(thickness, 3))
return result
def segment_rooms_flood(walls: np.ndarray) -> np.ndarray:
h, w = walls.shape
work = walls.copy()
work[:5, :] = 255; work[-5:, :] = 255
work[:, :5] = 255; work[:, -5:] = 255
filled = work.copy()
mask = np.zeros((h+2, w+2), np.uint8)
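# NOTE: the 5-px border band was forced to 255 above, so every seed below
# sits on a wall pixel and the floodFill branch is never taken as written.
for sx, sy in [(0,0),(w-1,0),(0,h-1),(w-1,h-1),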
(w//2,0),(w//2,h-1),(0,h//2),(w-1,h//2)]:
if filled[sy, sx] == 0:
cv2.floodFill(filled, mask, (sx, sy), 255)
rooms = cv2.bitwise_not(filled)
rooms = cv2.bitwise_and(rooms, cv2.bitwise_not(walls))
rooms = _cuda_morphology(rooms, cv2.MORPH_OPEN, np.ones((2,2), np.uint8))
return rooms
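# Illustrative sketch only: the general border flood-fill idea (mark every
# free cell reachable from the image border as "outside"; what remains is
# enclosed) on a tiny grid in plain Python. `enclosed_cells` is a
# hypothetical helper and uses 4-connectivity; it is not the function above.

```python
from collections import deque


def enclosed_cells(grid):
    """grid: list of rows, 1 = wall, 0 = free. Returns the enclosed free cells."""
    h, w = len(grid), len(grid[0])
    # seed the search with every free cell on the outer border
    queue = deque((y, x) for y in range(h) for x in range(w)
                  if (y in (0, h - 1) or x in (0, w - 1)) and grid[y][x] == 0)
    outside = set(queue)
    while queue:
        y, x = queue.popleft()
        for ny, nx in ((y + 1, x), (y - 1, x), (y, x + 1), (y, x - 1)):
            if (0 <= ny < h and 0 <= nx < w and grid[ny][nx] == 0
                    and (ny, nx) not in outside):
                outside.add((ny, nx))
                queue.append((ny, nx))
    # free cells never reached from the border are enclosed by walls
    return {(y, x) for y in range(h) for x in range(w)
            if grid[y][x] == 0 and (y, x) not in outside}


closed = [[0, 0, 0, 0, 0],
          [0, 1, 1, 1, 0],
          [0, 1, 0, 1, 0],
          [0, 1, 1, 1, 0],
          [0, 0, 0, 0, 0]]
leaky = [row[:] for row in closed]
leaky[2][3] = 0  # open a gap in the right-hand wall
```

# With the closed ring only the centre cell is enclosed; once the wall has a
# gap the centre leaks to the outside and nothing is reported.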
def _find_thick_wall_neg_prompts(walls_mask, n=SAM_WALL_NEG):
    """
    BOTTLENECK 6 FIX: GPU distanceTransform (where the OpenCV build exposes
    one) + vectorised grid-cell uniquing.
    """
    h, w = walls_mask.shape
    # ── distance transform: try the CUDA path, otherwise use the CPU ──────
    dist = None
    if _CV2_CUDA:
        try:
            g_wall = _cuda_upload(walls_mask)
            g_dist = cv2.cuda.GpuMat()
            cv2.cuda.distanceTransform(g_wall, g_dist, cv2.DIST_L2, 5,
                                       stream=_CUDA_STREAM)
            dist = g_dist.download()
        except (AttributeError, cv2.error):
            # cv2.cuda.distanceTransform may be missing from this build
            dist = None
    if dist is None:
        dist = cv2.distanceTransform(walls_mask, cv2.DIST_L2,
                                     cv2.DIST_MASK_PRECISE)
    try:
        skel = cv2.ximgproc.thinning(walls_mask,
                                     thinningType=cv2.ximgproc.THINNING_ZHANGSUEN)
    except AttributeError:
        skel = _morphological_skeleton(walls_mask)
    skel_vals = dist[skel > 0]
    if len(skel_vals) == 0: return []
    thr = max(float(np.percentile(skel_vals, SAM_WALL_PCT)), WALL_MIN_HALF_PX)
    ys, xs = np.where((skel > 0) & (dist >= thr))
    if len(ys) == 0: return []
    # ── vectorised grid-cell uniquing (no Python loop) ────────────────────
    grid_cells = max(1, int(np.ceil(np.sqrt(n * 4))))
    cell_h = max(1, h // grid_cells); cell_w = max(1, w // grid_cells)
    cell_ids = (ys // cell_h) * grid_cells + (xs // cell_w)
    # index of the first pixel seen in each occupied cell
    _, first = np.unique(cell_ids, return_index=True)
    sel = first[:n]
    return [(int(xs[i]), int(ys[i])) for i in sel]
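# Illustrative sketch only: the grid-cell uniquing step above, spelled out
# with a dict in plain Python. `one_point_per_cell` is a hypothetical helper;
# it keeps the first point seen in each cell and returns at most n points in
# ascending cell-id order, which is what np.unique(..., return_index=True)
# followed by first[:n] selects.

```python
import math


def one_point_per_cell(points, height, width, n):
    """points: iterable of (x, y) pixel coordinates; returns up to n of them."""
    grid_cells = max(1, math.ceil(math.sqrt(n * 4)))
    cell_h = max(1, height // grid_cells)
    cell_w = max(1, width // grid_cells)
    chosen = {}
    for x, y in points:
        cell = (y // cell_h) * grid_cells + (x // cell_w)
        chosen.setdefault(cell, (x, y))  # keep only the first point per cell
    return [chosen[c] for c in sorted(chosen)][:n]


pts = [(1, 1), (2, 2), (9, 1), (1, 9)]
spread = one_point_per_cell(pts, height=12, width=12, n=4)
```

# On a 12x12 image with n=4 the grid is 4x4 cells of 3 px each, so (1, 1)
# and (2, 2) share a cell and only the first of them survives.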
def generate_prompts(walls_mask, rooms_flood):
    """
    BOTTLENECK 4 FIX: vectorised component filtering + bulk centroid
    wall-check using advanced indexing. Centroids that land on a wall fall
    back to a 16×16 offset grid (±15 px, step 2) and take the first
    off-wall sample in row-major order via np.argmax.
    """
    h, w = walls_mask.shape
    inv = cv2.bitwise_not(walls_mask)
    n, labels, stats, centroids = cv2.connectedComponentsWithStats(inv, connectivity=8)
    min_prompt_area = max(200, int(h * w * 0.0001))
    if n <= 1:
        return (np.array([], dtype=np.float32).reshape(0,2),
                np.array([], dtype=np.int32))
    # ── vectorised filtering (skip index 0 = background) ─────────────────
    areas = stats[1:, cv2.CC_STAT_AREA]
    bx_ = stats[1:, cv2.CC_STAT_LEFT]; by_ = stats[1:, cv2.CC_STAT_TOP]
    bw_ = stats[1:, cv2.CC_STAT_WIDTH]; bh_ = stats[1:, cv2.CC_STAT_HEIGHT]
    cx_all = np.clip(np.round(centroids[1:, 0]).astype(np.int32), 0, w-1)
    cy_all = np.clip(np.round(centroids[1:, 1]).astype(np.int32), 0, h-1)
    area_ok = areas >= min_prompt_area
    # exclude components that span nearly the full image (background)
    full_span = (bx_ <= 2) & (by_ <= 2) & \
                (bx_ + bw_ >= w-2) & (by_ + bh_ >= h-2)
    keep_mask = area_ok & ~full_span
    keep_idx = np.where(keep_mask)[0]
    if len(keep_idx) == 0:
        return (np.array([], dtype=np.float32).reshape(0,2),
                np.array([], dtype=np.int32))
    cx_k = cx_all[keep_idx]
    cy_k = cy_all[keep_idx]
    # ── bulk wall check: no Python loop ──────────────────────────────────
    on_wall = walls_mask[cy_k, cx_k] > 0   # (K,) bool
    pts_list = []
    lbls_list = []
    # centroids not on wall: add directly
    off_wall = ~on_wall
    pts_list.append(np.stack([cx_k[off_wall].astype(np.float32),
                              cy_k[off_wall].astype(np.float32)], axis=1))
    lbls_list.append(np.ones(off_wall.sum(), dtype=np.int32))
    # centroids on wall: 16×16 offset grid, one bulk lookup per centroid
    on_idx = np.where(on_wall)[0]
    if len(on_idx) > 0:
        dy_range = np.arange(-15, 17, 2, dtype=np.int32)
        dx_range = np.arange(-15, 17, 2, dtype=np.int32)
        DY, DX = np.meshgrid(dy_range, dx_range, indexing='ij')   # (D,D)
        DY = DY.ravel(); DX = DX.ravel()                          # (D²,)
        for k in on_idx:
            cy_c, cx_c = int(cy_k[k]), int(cx_k[k])
            ny_arr = np.clip(cy_c + DY, 0, h-1)
            nx_arr = np.clip(cx_c + DX, 0, w-1)
            off = walls_mask[ny_arr, nx_arr] == 0
            if off.any():
                best = np.argmax(off)   # first off-wall offset, not the nearest
                pts_list.append([[float(nx_arr[best]), float(ny_arr[best])]])
                lbls_list.append([1])
    if not pts_list:
        all_pts = np.empty((0, 2), dtype=np.float32)
        all_lbls = np.empty(0, dtype=np.int32)
    else:
        all_pts = np.vstack([p if np.ndim(p)==2 else np.array(p, dtype=np.float32)
                             for p in pts_list]).astype(np.float32)
        all_lbls = np.concatenate([np.array(l, dtype=np.int32)
                                   for l in lbls_list])
    # negative prompts (wall centres)
    neg_pts_list = _find_thick_wall_neg_prompts(walls_mask)
    if neg_pts_list:
        neg_arr = np.array(neg_pts_list, dtype=np.float32)
        neg_lbls = np.zeros(len(neg_pts_list), dtype=np.int32)
        all_pts = np.vstack([all_pts, neg_arr])
        all_lbls = np.concatenate([all_lbls, neg_lbls])
    return all_pts, all_lbls
def mask_to_rle(mask: np.ndarray) -> Dict:
    """
    BOTTLENECK 10 FIX: replace pure-Python for-loop over every pixel with
    NumPy run-length encoding on the column-major flattened boolean array.
    """
    h, w = mask.shape
    flat = mask.flatten(order='F').astype(bool)
    # indices where the value changes (False→True or True→False)
    changes = np.flatnonzero(flat[1:] != flat[:-1]) + 1
    # include the array start and end as boundaries so that the leading and
    # trailing runs are counted as well
    bounds = np.concatenate([[0], changes, [flat.size]])
    counts = np.diff(bounds).tolist()   # run lengths
    # RLE must start with a False count
    rle_counts = ([0] + counts) if flat[0] else counts
    return {"counts": rle_counts, "size": [h, w]}
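# Illustrative sketch only: column-major run-length encoding in plain Python,
# with a decoder for a round-trip check. Both helper names are hypothetical.
# The convention assumed here is the one the comment above states: the
# counts list always begins with a run of zeros, which may have length 0.

```python
def rle_encode_column_major(mask):
    """mask: list of rows holding 0/1 ints. Returns the list of run lengths."""
    h, w = len(mask), len(mask[0])
    flat = [mask[y][x] for x in range(w) for y in range(h)]  # column by column
    counts, current, run = [], 0, 0
    for v in flat:
        if v == current:
            run += 1
        else:
            counts.append(run)      # close the finished run
            current, run = v, 1
    counts.append(run)              # close the final run
    return counts


def rle_decode_column_major(counts, h, w):
    """Inverse of rle_encode_column_major for an h x w mask."""
    flat, value = [], 0
    for c in counts:
        flat.extend([value] * c)
        value = 1 - value           # runs alternate between 0 and 1
    return [[flat[x * h + y] for x in range(w)] for y in range(h)]


demo = [[0, 1],
        [0, 1],
        [1, 1]]
demo_counts = rle_encode_column_major(demo)
```

# Read column by column, demo is 0,0,1,1,1,1, so demo_counts is [2, 4]. A
# mask whose first pixel is set starts with a zero-length run, e.g.
# [[1, 0]] encodes to [0, 1, 1].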
def _mask_to_contour_flat(mask):
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
if not contours: return []
largest = max(contours, key=cv2.contourArea)
pts = largest[:, 0, :].tolist()
return [v for pt in pts for v in pt]
def _match_sam_mask_to_contour(contour, sam_room_masks):
if not sam_room_masks:
return _contour_to_rle_and_flat(contour)
sam_h, sam_w = sam_room_masks[0]["mask"].shape
contour_mask = np.zeros((sam_h, sam_w), dtype=np.uint8)
cv2.drawContours(contour_mask, [contour], -1, 255, thickness=-1)
best_iou = 0.0; best_entry = None
for entry in sam_room_masks:
m = entry["mask"]
if m.shape != contour_mask.shape: continue
inter = np.count_nonzero(cv2.bitwise_and(m, contour_mask))
if inter == 0: continue
union = np.count_nonzero(cv2.bitwise_or(m, contour_mask))
iou = inter / (union + 1e-6)
if iou > best_iou: best_iou = iou; best_entry = entry
if best_entry is None or best_iou < 0.05:
return _contour_to_rle_and_flat(contour)
sam_contour_flat = _mask_to_contour_flat(best_entry["mask"])
if not sam_contour_flat:
raw_pts = contour[:, 0, :].tolist()
sam_contour_flat = [v for pt in raw_pts for v in pt]
return mask_to_rle(best_entry["mask"]), sam_contour_flat, best_entry["score"]
def _contour_to_rle_and_flat(contour):
x, y, rw, rh = cv2.boundingRect(contour)
canvas = np.zeros((rh+y+20, rw+x+20), dtype=np.uint8)
cv2.drawContours(canvas, [contour], -1, 255, thickness=-1)
raw_pts = contour[:, 0, :].tolist()
flat_pts = [v for pt in raw_pts for v in pt]
return mask_to_rle(canvas), flat_pts, 1.0
# ════════════════════════════════════════════════════════════════════════════
# BATCHED OCR (BOTTLENECK 7 FIX)
# ════════════════════════════════════════════════════════════════════════════
def _prepare_ocr_roi(img_bgr: np.ndarray, contour: np.ndarray) -> Optional[np.ndarray]:
"""Prepare a single ROI for OCR (CLAHE + Otsu + medianBlur → RGB)."""
x, y, rw, rh = cv2.boundingRect(contour)
pad = 20
roi = img_bgr[max(0,y-pad):min(img_bgr.shape[0],y+rh+pad),
max(0,x-pad):min(img_bgr.shape[1],x+rw+pad)]
if roi.size == 0: return None
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
clahe = cv2.createCLAHE(2.0, (8,8))
proc = clahe.apply(gray)
_, bin_img = _cuda_threshold(proc, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU)
rgb = cv2.cvtColor(
cv2.medianBlur(bin_img.astype(np.uint8), 3), cv2.COLOR_GRAY2RGB
)
return rgb
def _get_ocr_reader():
"""Singleton EasyOCR reader with GPU=True when CUDA available."""
if not hasattr(_get_ocr_reader, "_reader"):
try:
import easyocr
_get_ocr_reader._reader = easyocr.Reader(["en"], gpu=_TORCH_CUDA)
print(f"[OCR] EasyOCR initialised gpu={_TORCH_CUDA}")
except ImportError:
_get_ocr_reader._reader = None
return _get_ocr_reader._reader
def run_ocr_batch(img_bgr: np.ndarray,
contours: List[np.ndarray]) -> List[Optional[str]]:
"""
BOTTLENECK 7 FIX: batch all room crops into a single EasyOCR call.
readtext_batched() pushes all crops through the GPU text recognition
network in one forward pass instead of one-at-a-time.
Falls back to sequential readtext() if readtext_batched unavailable.
"""
reader = _get_ocr_reader()
if reader is None:
return [None] * len(contours)
rois: List[Optional[np.ndarray]] = [_prepare_ocr_roi(img_bgr, c) for c in contours]
labels: List[Optional[str]] = [None] * len(contours)
valid_idx = [i for i, r in enumerate(rois) if r is not None]
valid_rois = [rois[i] for i in valid_idx]
if not valid_rois:
return labels
try:
# ── preferred: GPU batched inference ─────────────────────────────
batch_results = reader.readtext_batched(valid_rois, detail=1,
paragraph=False,
batch_size=len(valid_rois))
for out_i, orig_i in enumerate(valid_idx):
cands = [
(t.strip().upper(), c)
for _, t, c in batch_results[out_i]
if c >= OCR_CONF_THR and len(t.strip()) >= 2
and any(ch.isalpha() for ch in t)
]
labels[orig_i] = max(cands, key=lambda x: x[1])[0] if cands else None
except Exception:
# ── fallback: sequential (original behaviour) ─────────────────────
for out_i, orig_i in enumerate(valid_idx):
try:
results = reader.readtext(valid_rois[out_i], detail=1, paragraph=False)
cands = [
(t.strip().upper(), c)
for _, t, c in results
if c >= OCR_CONF_THR and len(t.strip()) >= 2
and any(ch.isalpha() for ch in t)
]
labels[orig_i] = max(cands, key=lambda x: x[1])[0] if cands else None
except Exception:
pass
return labels
def run_ocr_on_room(img_bgr: np.ndarray, contour: np.ndarray) -> Optional[str]:
"""Single-room OCR wrapper (kept for compatibility)."""
results = run_ocr_batch(img_bgr, [contour])
return results[0]
# ════════════════════════════════════════════════════════════════════════════
# FILTER ROOM REGIONS (BOTTLENECK 5 FIX β€” vectorised NumPy filtering)
# ════════════════════════════════════════════════════════════════════════════
def filter_room_regions(rooms_mask, img_shape):
"""
BOTTLENECK 5 FIX: bounding boxes and areas are gathered once per contour,
then all scalar filters (area, dim, aspect, border, extent) are evaluated
as vectorised NumPy boolean masks. Only contours that pass them reach the
per-contour convex-hull solidity test and drawContours.
"""
h, w = img_shape[:2]
img_area = float(h * w)
min_area = img_area * MIN_ROOM_AREA_FRAC
max_area = img_area * MAX_ROOM_AREA_FRAC
min_dim = w * MIN_ROOM_DIM_FRAC
margin = max(5.0, w * BORDER_MARGIN_FRAC)
contours, _ = cv2.findContours(rooms_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours: return np.zeros_like(rooms_mask), []
# ── vectorised stats ──────────────────────────────────────────────────
bboxes = np.array([cv2.boundingRect(c) for c in contours], dtype=np.float32)
areas = np.array([cv2.contourArea(c) for c in contours], dtype=np.float32)
bx = bboxes[:,0]; by = bboxes[:,1]
bw_arr = bboxes[:,2]; bh_arr = bboxes[:,3]
area_ok = (areas >= min_area) & (areas <= max_area)
border_ok = (bx >= margin) & (by >= margin) & \
(bx + bw_arr <= w - margin) & (by + bh_arr <= h - margin)
dim_ok = (bw_arr >= min_dim) | (bh_arr >= min_dim)
aspect = np.maximum(bw_arr, bh_arr) / (np.minimum(bw_arr, bh_arr) + 1e-6)
aspect_ok = aspect <= MAX_ASPECT_RATIO
extent_ok = (areas / (bw_arr * bh_arr + 1e-6)) >= MIN_EXTENT
# All scalar checks in one shot; only compute solidity for survivors
cheap_pass = np.where(area_ok & border_ok & dim_ok & aspect_ok & extent_ok)[0]
valid_mask = np.zeros_like(rooms_mask)
valid_rooms = []
for i in cheap_pass:
cnt = contours[i]
hull = cv2.convexHull(cnt)
ha = cv2.contourArea(hull)
if ha > 0 and (areas[i] / ha) >= MIN_SOLIDITY:
cv2.drawContours(valid_mask, [cnt], -1, 255, -1)
valid_rooms.append(cnt)
return valid_mask, valid_rooms
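# Illustrative sketch only: the three shape descriptors used by the filter
# above (aspect ratio, extent, solidity) computed for an L-shaped polygon in
# plain Python. All helper names are hypothetical. The sketch uses continuous
# geometry, so its bounding box differs slightly from cv2.boundingRect, which
# counts pixels inclusively.

```python
def polygon_area(pts):
    """Shoelace formula for a simple polygon given as a list of (x, y)."""
    s = 0.0
    for (x1, y1), (x2, y2) in zip(pts, pts[1:] + pts[:1]):
        s += x1 * y2 - x2 * y1
    return abs(s) / 2.0


def convex_hull(pts):
    """Andrew's monotone chain; returns hull vertices in counter-clockwise order."""
    pts = sorted(set(pts))
    if len(pts) <= 2:
        return pts

    def cross(o, a, b):
        return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])

    lower, upper = [], []
    for p in pts:
        while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
            lower.pop()
        lower.append(p)
    for p in reversed(pts):
        while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
            upper.pop()
        upper.append(p)
    return lower[:-1] + upper[:-1]


def shape_descriptors(pts):
    xs = [p[0] for p in pts]
    ys = [p[1] for p in pts]
    bw, bh = max(xs) - min(xs), max(ys) - min(ys)
    area = polygon_area(pts)
    return {
        "aspect": max(bw, bh) / min(bw, bh),               # long side / short side
        "extent": area / (bw * bh),                        # fill of the bounding box
        "solidity": area / polygon_area(convex_hull(pts)), # fill of the convex hull
    }


l_room = [(0, 0), (2, 0), (2, 1), (1, 1), (1, 2), (0, 2)]
desc = shape_descriptors(l_room)
```

# The L-shape has area 3 inside a 2x2 box, so extent is 0.75; its convex hull
# has area 3.5, so solidity is 3 / 3.5, roughly 0.857.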
def pixel_area_to_m2(area_px):
return area_px * (2.54 / DPI) ** 2 * (SCALE_FACTOR ** 2) / 10000
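# Illustrative sketch only: the unit chain behind the conversion above, step
# by step. DPI=300 and SCALE_FACTOR=100 (a 1:100 drawing) are assumed values
# chosen for the example; the real constants are defined elsewhere in this
# file. `pixel_area_to_m2_sketch` is a hypothetical name.

```python
def pixel_area_to_m2_sketch(area_px, dpi=300, scale_factor=100):
    cm_per_px = 2.54 / dpi                      # 1 inch = 2.54 cm, dpi pixels per inch
    paper_cm2 = area_px * cm_per_px ** 2        # area on the printed sheet, in cm^2
    real_cm2 = paper_cm2 * scale_factor ** 2    # lengths scale by s, areas by s^2
    return real_cm2 / 10_000                    # 1 m^2 = 10,000 cm^2


example_m2 = pixel_area_to_m2_sketch(1_000_000)
```

# Under these assumptions a 1000x1000 px region (1,000,000 px) comes out at
# roughly 71.68 m^2.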
def validate_label(label):
if not label: return False
label = label.strip()
if not label[0].isalpha(): return False
lc = sum(1 for c in label if c.isalpha())
return lc == 1 or lc >= 3
def measure_and_label_rooms(img, valid_rooms, sam_room_masks):
"""
BOTTLENECK 7 FIX: all OCR crops sent to run_ocr_batch() in one call
instead of sequential run_ocr_on_room() per room.
"""
if not valid_rooms:
return []
# ── batch OCR ─────────────────────────────────────────────────────────
ocr_labels = run_ocr_batch(img, valid_rooms)
room_data = []
for idx, (contour, label) in enumerate(zip(valid_rooms, ocr_labels), 1):
if not label or not validate_label(label):
label = f"ROOM {idx}"
x, y, rw, rh = cv2.boundingRect(contour)
area_px = cv2.contourArea(contour)
M = cv2.moments(contour)
cx = int(M["m10"] / M["m00"]) if M["m00"] else x + rw // 2
cy = int(M["m01"] / M["m00"]) if M["m00"] else y + rh // 2
_, raw_seg_flat, sam_score = _match_sam_mask_to_contour(contour, sam_room_masks)
room_data.append({
"id": len(room_data)+1, "label": label, "contour": contour,
"segmentation": [raw_seg_flat], "raw_segmentation": [raw_seg_flat],
"sam_score": round(sam_score,4), "score": round(sam_score,4),
"area": area_px, "area_px": area_px,
"area_m2": round(pixel_area_to_m2(area_px),2),
"bbox": [x,y,rw,rh], "centroid": [cx,cy],
"confidence": 0.95, "isAi": True,
})
return room_data
# ════════════════════════════════════════════════════════════════════════════
# SAM β€” BATCHED INFERENCE with set_image inside autocast (BOTTLENECK 9 FIX)
# ════════════════════════════════════════════════════════════════════════════
def segment_with_sam(img_rgb, walls, sam_ckpt, rooms_flood=None):
"""
BOTTLENECK 9 FIX: predictor.set_image() moved INSIDE torch.no_grad() +
autocast so the ViT image encoder runs in FP16 (was FP32 in v1).
All other GPU optimisations from v1 retained.
"""
if rooms_flood is None:
rooms_flood = segment_rooms_flood(walls.copy())
sam_room_masks: List[Dict] = []
try:
import torch
from segment_anything import sam_model_registry, SamPredictor
if not Path(sam_ckpt).exists():
print(" [SAM] Model not found - using flood-fill")
return rooms_flood, []
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f" [SAM] Loading vit_h on {device} (encoder FP16 autocast enabled)")
sam = sam_model_registry["vit_h"](checkpoint=sam_ckpt)
sam.to(device); sam.eval()
predictor = SamPredictor(sam)
except Exception as e:
print(f" [SAM] Load failed ({e}) - using flood-fill")
return rooms_flood, []
all_points, all_labels = generate_prompts(walls, rooms_flood)
if len(all_points) == 0:
return rooms_flood, []
pos_pts = [(p, l) for p, l in zip(all_points, all_labels) if l == 1]
neg_pts = [p for p, l in zip(all_points, all_labels) if l == 0]
print(f" [SAM] {len(pos_pts)} room prompts + {len(neg_pts)} wall-neg prompts")
autocast_ctx = (
torch.autocast("cuda", dtype=torch.float16)
if _TORCH_CUDA else
torch.autocast("cpu", dtype=torch.bfloat16)
)
# ── BOTTLENECK 9 FIX: encoder runs in FP16 autocast ──────────────────
with torch.no_grad(), autocast_ctx:
predictor.set_image(img_rgb) # ← moved inside autocast
h, w = walls.shape
sam_mask = np.zeros((h, w), dtype=np.uint8)
accepted = 0
neg_coords = np.array(neg_pts, dtype=np.float32) if neg_pts else None
neg_lbls = np.zeros(len(neg_pts), dtype=np.int32) if neg_pts else None
denoise_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
with torch.no_grad(), autocast_ctx:
for (px, py), lbl in pos_pts:
px, py = int(px), int(py)
if neg_coords is not None:
pt_c = np.vstack([[[px, py]], neg_coords])
pt_l = np.concatenate([[lbl], neg_lbls])
else:
pt_c = np.array([[px, py]], dtype=np.float32)
pt_l = np.array([lbl], dtype=np.int32)
try:
masks, scores, _ = predictor.predict(
point_coords=pt_c, point_labels=pt_l, multimask_output=True
)
except Exception as e:
print(f" [SAM] predict failed ({e})")
continue
best_idx = int(np.argmax(scores))
best_score = float(scores[best_idx])
if best_score < SAM_MIN_SCORE:
continue
best_mask = (masks[best_idx] > 0).astype(np.uint8) * 255
best_mask = cv2.bitwise_and(best_mask, rooms_flood)
best_mask = _cuda_morphology(best_mask, cv2.MORPH_OPEN, denoise_k, iterations=1)
if not np.any(best_mask):
continue
sam_room_masks.append({
"mask" : best_mask.copy(),
"score" : best_score,
"prompt": (px, py),
})
sam_mask = cv2.bitwise_or(sam_mask, best_mask)
accepted += 1
if _TORCH_CUDA:
torch.cuda.empty_cache()
print(f" [SAM] VRAM freed. Accepted {accepted}/{len(pos_pts)} masks")
else:
print(f" [SAM] Accepted {accepted}/{len(pos_pts)} masks")
if accepted == 0:
return rooms_flood, []
return sam_mask, sam_room_masks
# ════════════════════════════════════════════════════════════════════════════
# BUILD ANNOTATED IMAGE (BOTTLENECK 11 FIX)
# ════════════════════════════════════════════════════════════════════════════
def build_annotated_image(img_bgr, rooms, selected_ids=None):
"""
BOTTLENECK 11 FIX: accumulate ALL room fills into a single overlay
array, then call cv2.addWeighted ONCE instead of per-room.
Border drawing and text labels remain per-room (unavoidable).
"""
vis = img_bgr.copy()
overlay = img_bgr.copy()
# ── single-pass fill accumulation ─────────────────────────────────────
for i, room in enumerate(rooms):
cnt = room.get("contour")
if cnt is None: continue
color = ROOM_COLORS[i % len(ROOM_COLORS)]
bgr = (color[2], color[1], color[0])
cv2.drawContours(overlay, [cnt], -1, bgr, -1)
# single blend for ALL fills
vis = cv2.addWeighted(overlay, 0.35, vis, 0.65, 0)
# ── per-room: border + text ───────────────────────────────────────────
for i, room in enumerate(rooms):
cnt = room.get("contour")
if cnt is None: continue
color = ROOM_COLORS[i % len(ROOM_COLORS)]
bgr = (color[2], color[1], color[0])
is_sel = selected_ids and room["id"] in selected_ids
cv2.drawContours(vis, [cnt], -1, (0,255,255) if is_sel else bgr,
4 if is_sel else 2)
M = cv2.moments(cnt)
cx = int(M["m10"]/M["m00"]) if M["m00"] else 0
cy = int(M["m01"]/M["m00"]) if M["m00"] else 0
label = room.get("label", f"Room {room['id']}")
area = room.get("area_m2", 0.0)
fs = 0.55; th = 1
(tw1, th1), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fs, th)
(tw2, th2), _ = cv2.getTextSize(f"{area:.1f} m²", cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, th)
bx2 = cx - max(tw1,tw2)//2 - 4; by2 = cy - th1 - th2 - 12
bw2 = max(tw1,tw2)+8; bh2 = th1+th2+16
sub = vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2]
if sub.size > 0:
vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2] = \
cv2.addWeighted(sub, 0.3, np.ones_like(sub)*255, 0.7, 0)
cv2.putText(vis, label, (cx-tw1//2, cy-th2-6),
cv2.FONT_HERSHEY_SIMPLEX, fs, (20,20,20), th+1, cv2.LINE_AA)
cv2.putText(vis, f"{area:.1f} m²", (cx-tw2//2, cy+th2+2),
cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, (20,20,20), th, cv2.LINE_AA)
return vis
def export_to_excel(rooms):
wb = openpyxl.Workbook(); ws = wb.active; ws.title = "Room Analysis"
headers = ["ID","Label","Area (px)","Area (m²)","Centroid X","Centroid Y",
"Bbox X","Bbox Y","Bbox W","Bbox H","SAM Score","Confidence"]
hf = PatternFill("solid", fgColor="1F4E79"); hfont = Font(bold=True, color="FFFFFF", size=11)
for col, h in enumerate(headers,1):
cell=ws.cell(row=1,column=col,value=h)
cell.fill=hf; cell.font=hfont; cell.alignment=Alignment(horizontal="center")
alt = PatternFill("solid", fgColor="D6E4F0")
for rn, room in enumerate(rooms, 2):
cnt = room.get("contour")
M = cv2.moments(cnt) if cnt is not None else {}
cx = int(M["m10"]/M["m00"]) if M.get("m00") else 0
cy = int(M["m01"]/M["m00"]) if M.get("m00") else 0
bbox = cv2.boundingRect(cnt) if cnt is not None else (0,0,0,0)
row_data=[room.get("id"), room.get("label","?"),
round(room.get("area_px",0),1), round(room.get("area_m2",0.0),2),
cx, cy, bbox[0], bbox[1], bbox[2], bbox[3],
round(room.get("score",1.0),4), round(room.get("confidence",0.95),2)]
fill = alt if rn%2==0 else None
for col,val in enumerate(row_data,1):
cell=ws.cell(row=rn,column=col,value=val)
cell.alignment=Alignment(horizontal="center")
if fill: cell.fill=fill
for col in ws.columns:
mx=max(len(str(c.value or "")) for c in col)+4
ws.column_dimensions[col[0].column_letter].width=min(mx,25)
out = Path(tempfile.gettempdir()) / f"floorplan_rooms_{int(time.time())}.xlsx"
wb.save(str(out)); return str(out)
# ════════════════════════════════════════════════════════════════════════════
# STATE
# ════════════════════════════════════════════════════════════════════════════
def init_state():
    return {"img_orig":None,"img_cropped":None,"img_clean":None,
            "walls":None,"walls_base":None,"wall_cal":None,
            "user_lines":[],"draw_start":None,"walls_thickness":8,
            "rooms":[],"selected_ids":[],"annotated":None,"status":"Idle"}
# ════════════════════════════════════════════════════════════════════════════
# GRADIO CALLBACKS
# ════════════════════════════════════════════════════════════════════════════
def cb_load_image(upload, state):
    if upload is None:
        return None, state, "Upload a floor-plan image to begin."
    try:
        if hasattr(upload,"name"): file_path=upload.name
        elif isinstance(upload,dict) and "name" in upload: file_path=upload["name"]
        elif isinstance(upload,str): file_path=upload
        else:
            img_bgr=cv2.imdecode(np.frombuffer(bytes(upload),dtype=np.uint8),cv2.IMREAD_COLOR)
            file_path=None
        if file_path is not None: img_bgr=cv2.imread(file_path)
    except Exception as e:
        return None, state, f"❌ Error reading upload: {e}"
    if img_bgr is None: return None, state, "❌ Could not decode image."
    state=init_state(); state["img_orig"]=img_bgr; state["status"]="Image loaded."
    return cv2.cvtColor(img_bgr,cv2.COLOR_BGR2RGB), state, f"✅ Loaded {img_bgr.shape[1]}×{img_bgr.shape[0]} px"
def cb_preprocess(state):
    img=state.get("img_orig")
    if img is None: return None,None,state,"Load an image first."
    cropped = remove_title_block(img)
    img_clean = remove_colors(cropped)
    img_clean = detect_and_close_door_arcs(img_clean)
    img_stats = analyze_image_characteristics(cropped)
    walls, thick = extract_walls_adaptive(img_clean, img_stats)
    walls = remove_fixture_symbols(walls)
    walls, cal = reconstruct_walls(walls)
    walls = remove_dangling_lines(walls, cal)
    walls = close_large_door_gaps(walls, cal)
    state["img_cropped"]=cropped; state["img_clean"]=img_clean
    state["walls"]=walls.copy(); state["walls_base"]=walls.copy()
    state["walls_thickness"]=thick; state["wall_cal"]=cal
    walls_rgb = cv2.cvtColor(walls,cv2.COLOR_GRAY2RGB)
    clean_rgb = cv2.cvtColor(img_clean,cv2.COLOR_BGR2RGB)
    msg=(f"✅ Pipeline done | stroke≈{cal.stroke_width}px body≈{thick}px "
         f"bridge=[{cal.bridge_min_gap},{cal.bridge_max_gap}] door={cal.door_gap}px "
         f"| GPU: torch={_TORCH_CUDA} cupy={_CUPY} cv2_cuda={_CV2_CUDA}")
    return clean_rgb, walls_rgb, state, msg
def cb_add_door_line(evt: gr.SelectData, state):
    walls=state.get("walls")
    if walls is None: return None,state,"Run preprocessing first."
    x,y=int(evt.index[0]),int(evt.index[1])
    if state["draw_start"] is None:
        state["draw_start"]=(x,y); msg=f"🖊 Start ({x},{y}). Click end."
    else:
        x1,y1=state["draw_start"]; state["user_lines"].append((x1,y1,x,y))
        state["draw_start"]=None
        walls_upd=apply_user_lines_to_walls(state["walls"],state["user_lines"],state["walls_thickness"])
        state["walls"]=walls_upd
        vis=cv2.cvtColor(walls_upd,cv2.COLOR_GRAY2RGB)
        for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3)
        return vis,state,f"✅ Line drawn ({x1},{y1})→({x},{y}) Total:{len(state['user_lines'])}"
    vis=cv2.cvtColor(walls,cv2.COLOR_GRAY2RGB)
    for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3)
    if state["draw_start"]: cv2.circle(vis,state["draw_start"],6,(0,200,255),-1)
    return vis,state,msg
def cb_undo_door_line(state):
    if not state["user_lines"]: return None,state,"No lines to undo."
    state["user_lines"].pop(); state["draw_start"]=None
    walls_base=state.get("walls_base")
    if walls_base is None: return None,state,"Re-run preprocessing."
    thick=state.get("walls_thickness",8)
    walls_upd=apply_user_lines_to_walls(walls_base,state["user_lines"],thick)
    state["walls"]=walls_upd
    vis=cv2.cvtColor(walls_upd,cv2.COLOR_GRAY2RGB)
    for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3)
    return vis,state,f"↩ Removed. Remaining:{len(state['user_lines'])}"
def cb_run_sam(state):
    walls=state.get("walls"); img=state.get("img_cropped"); img_clean=state.get("img_clean")
    if walls is None or img is None: return None,None,state,"Run preprocessing first."
    img_rgb=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
    ckpt=download_sam_if_needed()
    sam_enabled=ckpt is not None and Path(ckpt).exists()
    if sam_enabled:
        rooms_mask,sam_room_masks=segment_with_sam(img_rgb,walls.copy(),ckpt)
    else:
        rooms_mask=segment_rooms_flood(walls.copy()); sam_room_masks=[]
    state["_sam_room_masks"]=sam_room_masks
    if not np.count_nonzero(rooms_mask):
        return None,None,state,"⚠ rooms_mask empty."
    valid_mask,valid_rooms=filter_room_regions(rooms_mask,img.shape)
    if not valid_rooms: return None,None,state,"⚠ No valid rooms."
    src=img_clean if img_clean is not None else img
    rooms=measure_and_label_rooms(src,valid_rooms,sam_room_masks)
    if not rooms: return None,None,state,"⚠ No rooms after OCR."
    state["rooms"]=rooms; state["selected_ids"]=[]
    annotated=build_annotated_image(img,rooms); state["annotated"]=annotated
    table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms]
    return cv2.cvtColor(annotated,cv2.COLOR_BGR2RGB),table,state,f"✅ {len(rooms)} rooms detected."
def cb_click_room(evt: gr.SelectData, state):
    annotated=state.get("annotated"); rooms=state.get("rooms",[]); img=state.get("img_cropped")
    if annotated is None or not rooms: return None,state,"Run SAM first."
    x,y=int(evt.index[0]),int(evt.index[1]); clicked_id=None
    for room in rooms:
        cnt=room.get("contour")
        if cnt is None: continue
        if cv2.pointPolygonTest(cnt,(float(x),float(y)),False)>=0:
            clicked_id=room["id"]; break
    if clicked_id is None:
        state["selected_ids"]=[]; msg="Clicked outside - selection cleared."
    else:
        sel=state["selected_ids"]
        if clicked_id in sel: sel.remove(clicked_id); msg=f"Room {clicked_id} deselected."
        else: sel.append(clicked_id); msg=f"Room {clicked_id} selected."
        state["selected_ids"]=sel
    new_ann=build_annotated_image(img,rooms,state["selected_ids"]); state["annotated"]=new_ann
    return cv2.cvtColor(new_ann,cv2.COLOR_BGR2RGB),state,msg
def cb_remove_selected(state):
    sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped")
    if not sel: return None,None,state,"No rooms selected."
    removed=[r["label"] for r in rooms if r["id"] in sel]
    rooms=[r for r in rooms if r["id"] not in sel]
    for i,r in enumerate(rooms,1): r["id"]=i
    state["rooms"]=rooms; state["selected_ids"]=[]
    ann=build_annotated_image(img,rooms); state["annotated"]=ann
    table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms]
    return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"🗑 Removed:{', '.join(removed)}"
def cb_rename_selected(new_label, state):
    sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped")
    if not sel: return None,None,state,"Select a room first."
    # Normalise once; tolerate a None value from the textbox.
    label=(new_label or "").strip().upper()
    if not label: return None,None,state,"Enter a non-empty label."
    for r in rooms:
        if r["id"] in sel: r["label"]=label
    state["rooms"]=rooms
    ann=build_annotated_image(img,rooms,sel); state["annotated"]=ann
    table=[[r["id"],r["label"],f"{r['area_m2']} m²",f"{r['score']:.2f}"] for r in rooms]
    return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"✏ Renamed to '{label}'"
def cb_export_excel(state):
    rooms=state.get("rooms",[])
    if not rooms: return None,"No rooms to export."
    path=export_to_excel(rooms)
    return path,f"✅ Exported {len(rooms)} rooms → {Path(path).name}"
# ════════════════════════════════════════════════════════════════════════════
# GRADIO UI
# ════════════════════════════════════════════════════════════════════════════
CSS = """
#title{text-align:center;font-size:1.8em;font-weight:700;color:#1F4E79}
#subtitle{text-align:center;color:#555;margin-top:-8px;margin-bottom:16px}
.step-card{border-left:4px solid #1F4E79!important;padding-left:10px!important}
"""
def _walls_to_rgb(s):
    w=s.get("walls")
    return None if w is None else cv2.cvtColor(w,cv2.COLOR_GRAY2RGB)
with gr.Blocks(title="FloorPlan Analyser (GPU v2)") as app:
    state=gr.State(init_state())
    gr.Markdown("# 🏢 Floor Plan Room Analyser - NVIDIA GPU Build v2", elem_id="title")
    gr.Markdown(
        f"EasyOCR gpu={'✅' if _TORCH_CUDA else '❌'} | "
        f"SAM encoder FP16={'✅' if _TORCH_CUDA else '❌'} | "
        f"CuPy={'✅' if _CUPY else '❌'} | "
        f"cucim={'✅' if _CUCIM else '❌'} | "
        f"cv2.cuda={'✅' if _CV2_CUDA else '❌'}",
        elem_id="subtitle",
    )
    status_box=gr.Textbox(label="Status",interactive=False,value="Idle.")
    with gr.Row():
        with gr.Column(scale=1,elem_classes="step-card"):
            gr.Markdown("### 1️⃣ Upload Floor Plan")
            upload_btn=gr.UploadButton("📂 Upload Image",file_types=["image"],size="sm")
            raw_preview=gr.Image(label="Loaded Image",height=320)
        with gr.Column(scale=1,elem_classes="step-card"):
            gr.Markdown("### 2️⃣ Pre-process")
            preprocess_btn=gr.Button("⚙ Run Preprocessing",variant="primary")
            with gr.Tabs():
                with gr.Tab("Clean Image"): clean_img=gr.Image(label="After color removal",height=300)
                with gr.Tab("Walls"): walls_img=gr.Image(label="Extracted walls",height=300)
    with gr.Row():
        with gr.Column(elem_classes="step-card"):
            gr.Markdown("### 3️⃣ Draw Door-Closing Lines")
            undo_line_btn=gr.Button("↩ Undo Last Line",size="sm")
            wall_draw_img=gr.Image(label="Wall mask",height=380,interactive=False)
    with gr.Row():
        with gr.Column(scale=2,elem_classes="step-card"):
            gr.Markdown("### 4️⃣ SAM Segmentation + OCR")
            sam_btn=gr.Button("🤖 Run SAM + OCR",variant="primary")
            ann_img=gr.Image(label="Annotated rooms",height=480,interactive=False)
        with gr.Column(scale=1,elem_classes="step-card"):
            gr.Markdown("### 5️⃣ Room Table & Actions")
            room_table=gr.Dataframe(headers=["ID","Label","Area","SAM Score"],
                                    datatype=["number","str","str","str"],
                                    interactive=False,label="Detected Rooms")
            with gr.Group():
                rename_txt=gr.Textbox(placeholder="New label…",label="Rename Label")
                with gr.Row():
                    rename_btn=gr.Button("✏ Rename",size="sm")
                    remove_btn=gr.Button("🗑 Remove Selected",size="sm",variant="stop")
            gr.Markdown("---")
            export_btn=gr.Button("📊 Export to Excel",variant="secondary")
            excel_file=gr.File(label="Download Excel",visible=True)
    upload_btn.upload(cb_load_image,[upload_btn,state],[raw_preview,state,status_box])
    preprocess_btn.click(cb_preprocess,[state],[clean_img,walls_img,state,status_box])\
        .then(_walls_to_rgb,[state],[wall_draw_img])
    wall_draw_img.select(cb_add_door_line,[state],[wall_draw_img,state,status_box])
    undo_line_btn.click(cb_undo_door_line,[state],[wall_draw_img,state,status_box])
    sam_btn.click(cb_run_sam,[state],[ann_img,room_table,state,status_box])
    ann_img.select(cb_click_room,[state],[ann_img,state,status_box])
    remove_btn.click(cb_remove_selected,[state],[ann_img,room_table,state,status_box])
    rename_btn.click(cb_rename_selected,[rename_txt,state],[ann_img,room_table,state,status_box])
    export_btn.click(cb_export_excel,[state],[excel_file,status_box])
if __name__ == "__main__":
    app.launch(share=False, debug=True, css=CSS)