# Geometry_Agent_ / wall_pipeline.py
# Pream912's picture
# Update wall_pipeline.py
# bc97b83 verified
"""
Wall Extraction Pipeline
========================
EXACT algorithm from GeometryAgent v5.
Only the GPU capability detection block has been hardened to probe
actual CUDA allocations before committing — this prevents the
cudaErrorInsufficientDriver crash when the host driver is older
than the installed CUDA runtime.
All wall extraction logic (stages 1-8, bridging, calibration, wand)
is byte-for-byte identical to the original GeometryAgent source.
"""
from __future__ import annotations
import numpy as np
import cv2
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple, Optional
# ══════════════════════════════════════════════════════════════════════════════
# GPU capability detection — probe before commit
#
# The key insight: CuPy/PyTorch import successfully even when the CUDA *driver*
# is too old for the installed CUDA *runtime*. The error only fires on the
# first real allocation. We do a tiny probe allocation inside a broad
# except-Exception guard so every possible CUDA error degrades gracefully.
# ══════════════════════════════════════════════════════════════════════════════
# ── CuPy ─────────────────────────────────────────────────────────────────────
try:
    # Import under private names first so the public names are only bound
    # once a real allocation has succeeded.
    import cupy as _cp_probe
    import cupyx.scipy.ndimage as _cpnd_probe
    # A one-byte allocation forces CUDA context creation, which is where a
    # driver/runtime mismatch actually surfaces.
    _cp_probe.zeros(1, dtype=_cp_probe.uint8)
    cp = _cp_probe  # type: ignore[assignment]
    cpnd = _cpnd_probe
    _GPU = True
    _CUPY = True
    print(f"[GPU] CuPy OK version={cp.__version__}")
except Exception as _ce:
    # Any failure (missing package, CUDARuntimeError, CUDADriverError, ...)
    # degrades to NumPy; only the log message differs.
    cp = np  # type: ignore[assignment]
    cpnd = None
    _GPU = False
    _CUPY = False
    if isinstance(_ce, ImportError):
        print("[GPU] CuPy not installed β€” CPU fallback")
    else:
        print(f"[GPU] CuPy DISABLED ({type(_ce).__name__}: {_ce})")
        print("[GPU] All CuPy ops β†’ NumPy fallback")
# ── PyTorch ───────────────────────────────────────────────────────────────────
# Probe PyTorch: importing must never abort this module, and CUDA is only
# trusted after a real device allocation has succeeded.
try:
    import torch as _torch_probe
    _TORCH = True
    try:
        _TORCH_CUDA = _torch_probe.cuda.is_available()
        if _TORCH_CUDA:
            _torch_probe.zeros(1, device="cuda")  # probe real allocation
            print(f"[GPU] PyTorch CUDA OK device={_torch_probe.cuda.get_device_name(0)}")
        else:
            print("[GPU] PyTorch: CUDA not available β€” CPU tensors")
    except Exception as _te:
        # Driver/runtime mismatch or any other CUDA failure: keep torch, drop CUDA.
        _TORCH_CUDA = False
        print(f"[GPU] PyTorch CUDA DISABLED ({type(_te).__name__}: {_te})")
    import torch
    _DEVICE = torch.device("cuda" if _TORCH_CUDA else "cpu")
except ImportError:
    _TORCH = _TORCH_CUDA = False
    _DEVICE = None
    print("[GPU] PyTorch not installed")
except Exception as _tie:
    # A broken install can raise something other than ImportError while
    # importing; treat it like "not installed" instead of crashing, the same
    # way the CuPy probe does.
    _TORCH = _TORCH_CUDA = False
    _DEVICE = None
    print(f"[GPU] PyTorch DISABLED ({type(_tie).__name__}: {_tie})")
# ── OpenCV CUDA ───────────────────────────────────────────────────────────────
# Stays False unless a device is reported AND a tiny upload succeeds.
_CV_CUDA = False
try:
    _n = cv2.cuda.getCudaEnabledDeviceCount()
    if _n <= 0:
        print("[GPU] OpenCV CUDA: no CUDA-enabled devices")
    else:
        # Uploading a 2x2 matrix forces a real device allocation.
        _pm = cv2.cuda_GpuMat()
        _pm.upload(np.zeros((2, 2), np.uint8))
        del _pm
        _CV_CUDA = True
        print(f"[GPU] OpenCV CUDA OK devices={_n}")
except AttributeError:
    # Raised when the attribute lookups on cv2 fail (no CUDA bindings).
    print("[GPU] OpenCV CUDA module absent")
except Exception as _oce:
    print(f"[GPU] OpenCV CUDA DISABLED ({type(_oce).__name__}: {_oce})")
# ── scikit-image skeleton ─────────────────────────────────────────────────────
# Optional dependency: scikit-image skeletonize (a morphological fallback
# is used when it is missing).
try:
    from skimage.morphology import skeletonize as _sk_skel
except ImportError:
    _SKIMAGE = False
else:
    _SKIMAGE = True
# Optional dependency: SciPy KD-tree for endpoint pair search (a dense
# NumPy pair search is used when it is missing).
try:
    from scipy.spatial import cKDTree
except ImportError:
    _SCIPY = False
else:
    _SCIPY = True
# One-line recap of which GPU back-ends passed their probes.
print(f"[GPU] Summary: CuPy={_CUPY} PyTorchCUDA={_TORCH_CUDA} OpenCV-CUDA={_CV_CUDA}")
# ══════════════════════════════════════════════════════════════════════════════
# CuPy / NumPy shims (unchanged from original)
# ══════════════════════════════════════════════════════════════════════════════
def _to_gpu(arr: np.ndarray):
    """Return *arr* as a CuPy array when CuPy is active, else unchanged."""
    if _GPU:
        return cp.asarray(arr)
    return arr
def _to_cpu(arr) -> np.ndarray:
    """Return *arr* as a host NumPy array (no-op when CuPy is inactive)."""
    if _GPU:
        return cp.asnumpy(arr)
    return arr
# ══════════════════════════════════════════════════════════════════════════════
# RLE helpers (original)
# ══════════════════════════════════════════════════════════════════════════════
def mask_to_rle(mask: np.ndarray) -> Dict[str, Any]:
    """Run-length encode a 2-D mask in column-major order.

    The mask is flattened column by column and treated as boolean.  The
    returned ``counts`` alternate background/foreground run lengths and
    always start with a background run, so a mask whose first pixel is set
    yields exactly one leading ``0``.

    Args:
        mask: 2-D array; any non-zero value counts as foreground.

    Returns:
        ``{"counts": [run lengths...], "size": [height, width]}``.
        An empty mask yields ``counts == [0]``.
    """
    h, w = mask.shape
    flat = mask.flatten(order='F').astype(bool)
    if flat.size == 0:
        # Nothing to encode; avoid indexing into an empty array.
        return {"counts": [0], "size": [h, w]}
    # Positions where the value differs from its predecessor start a new run.
    breaks = np.flatnonzero(flat[1:] != flat[:-1]) + 1
    edges = np.concatenate(([0], breaks, [flat.size]))
    counts: List[int] = np.diff(edges).tolist()
    if flat[0]:
        # First run is foreground: prepend the zero-length background run
        # exactly once so run parity stays correct.
        counts.insert(0, 0)
    return {"counts": counts, "size": [h, w]}
def _mask_to_contour_flat(mask: np.ndarray) -> List[float]:
    """Return the largest external contour of *mask* as ``[x0, y0, x1, y1, ...]``.

    An empty list is returned when no contour is found.
    """
    found, _hierarchy = cv2.findContours(
        mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
    if not found:
        return []
    biggest = max(found, key=cv2.contourArea)
    # Contours are (N, 1, 2); flattening row-major interleaves x and y.
    return biggest.reshape(-1).tolist()
# ══════════════════════════════════════════════════════════════════════════════
# Calibration dataclass (original)
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class WallCalibration:
    """Pixel-space tuning values measured from a wall mask."""

    # Most frequent vertical run length; also used as bridge line thickness.
    stroke_width: int = 3
    # Components survive pruning if their longest bbox side reaches this...
    min_component_dim: int = 30
    # ...or their area reaches three times this.
    min_component_area: int = 45
    # Distance window for bridging facing wall endpoints.
    bridge_min_gap: int = 2
    bridge_max_gap: int = 14
    # Length of the 1-D closing kernel used to seal door openings.
    door_gap: int = 41
    # Maximum perpendicular thickness accepted for a closing-generated fill.
    max_bridge_thick: int = 15

    def as_dict(self):
        """Return the calibration as a plain ``{field name: value}`` dict."""
        names = (
            "stroke_width",
            "min_component_dim",
            "min_component_area",
            "bridge_min_gap",
            "bridge_max_gap",
            "door_gap",
            "max_bridge_thick",
        )
        return {name: getattr(self, name) for name in names}
# ══════════════════════════════════════════════════════════════════════════════
# Core pipeline class — EXACT original GeometryAgent implementation
# ══════════════════════════════════════════════════════════════════════════════
class WallPipeline:
    """
    Stateless (per-call) wall extraction + room segmentation.
    All intermediate images are stored in stage_images for the UI.
    """
    # Room-filter thresholds. Not referenced in the part of the file visible
    # here; presumably consumed by the room filtering stage (TODO confirm).
    MIN_ROOM_AREA_FRAC = 0.000004
    MAX_ROOM_AREA_FRAC = 0.08
    MIN_ROOM_DIM_FRAC = 0.01
    BORDER_MARGIN_FRAC = 0.01
    MAX_ASPECT_RATIO = 8.0
    MIN_SOLIDITY = 0.25
    MIN_EXTENT = 0.08
    # Fixture removal: a blob is a fixture candidate when both bbox sides,
    # its area and its aspect ratio are below these limits.
    FIXTURE_MAX_BLOB_DIM = 80
    FIXTURE_MAX_AREA = 4000
    FIXTURE_MAX_ASPECT = 4.0
    # Fixture removal: radius of the disc stamped per candidate, normalised
    # density cut-off, and minimum area of a dense zone.
    FIXTURE_DENSITY_RADIUS = 50
    FIXTURE_DENSITY_THRESHOLD = 0.35
    FIXTURE_MIN_ZONE_AREA = 1500
    # Radius range (pixels) passed to the Hough circle search for door arcs.
    DOOR_ARC_MIN_RADIUS = 60
    DOOR_ARC_MAX_RADIUS = 320
def __init__(self, progress_cb=None, sam_checkpoint: str = ""):
self.progress_cb = progress_cb or (lambda msg, pct: None)
self._wall_cal : Optional[WallCalibration] = None
self._wall_thickness : int = 8
self.stage_images : Dict[str, np.ndarray] = {}
self._sam_checkpoint = sam_checkpoint
def _log(self, msg: str, pct: int):
print(f" [{pct:3d}%] {msg}")
self.progress_cb(msg, pct)
def _save(self, key: str, img: np.ndarray):
self.stage_images[key] = img.copy()
# ──────────────────────────────────────────────────────────────────────────
# Public entry point (original flow, original step names)
# ──────────────────────────────────────────────────────────────────────────
def run(self, img_bgr: np.ndarray,
        extra_door_lines: List[Tuple[int,int,int,int]] = None,
        use_sam: bool = True,
        ) -> Tuple[np.ndarray, np.ndarray, WallCalibration]:
    """Run the full pipeline on a BGR image.

    Each stage logs its progress and stores its output in
    ``stage_images``; every stage consumes the previous stage's output.

    Args:
        img_bgr: input image.
        extra_door_lines: optional ``(x1, y1, x2, y2)`` segments drawn onto
            the wall mask before room segmentation.
        use_sam: try SAM segmentation first (only when Torch CUDA passed
            its probe); flood-fill segmentation is used otherwise or when
            SAM returns ``None``.

    Returns:
        ``(wall_mask, filtered_room_mask, calibration)``.
    """
    self.stage_images = {}

    def _calibrate_then_prune(mask):
        # Calibration must be stored before the pruning step reads it.
        self._wall_cal = self._calibrate_wall(mask)
        return self._remove_thin_lines_calibrated(mask)

    stages = (
        ("Step 1 β€” Removing title block", 5,
         self._remove_title_block, "01_title_removed"),
        ("Step 2 β€” Removing colored annotations", 12,
         self._remove_colors, "02_colors_removed"),
        ("Step 3 β€” Closing door arcs", 20,
         self._close_door_arcs, "03_door_arcs"),
        ("Step 4 β€” Extracting walls", 30,
         self._extract_walls, "04_walls_raw"),
        ("Step 5b β€” Removing fixture symbols", 38,
         self._remove_fixtures, "05b_no_fixtures"),
        ("Step 5c β€” Calibrating & removing thin lines", 45,
         _calibrate_then_prune, "05c_thin_removed"),
        ("Step 5d β€” Bridging wall endpoints", 55,
         self._bridge_endpoints, "05d_bridged"),
        ("Step 5e β€” Closing door openings", 63,
         self._close_door_openings, "05e_doors_closed"),
        ("Step 5f β€” Removing dangling lines", 70,
         self._remove_dangling, "05f_dangling_removed"),
        ("Step 5g β€” Sealing large door gaps", 76,
         self._close_large_gaps, "05g_large_gaps"),
    )
    current = img_bgr
    for message, pct, operation, key in stages:
        self._log(message, pct)
        current = operation(current)
        self._save(key, current)
    walls = current

    # Paint extra door-seal lines supplied by the caller.
    if extra_door_lines:
        self._log("Applying manual door seal lines", 79)
        thickness = max(3, self._wall_cal.stroke_width if self._wall_cal else 3)
        for x1, y1, x2, y2 in extra_door_lines:
            cv2.line(walls, (x1, y1), (x2, y2), 255, thickness)
        self._save("05h_manual_doors", walls)

    # SAM segmentation is optional; flood-fill is the fallback.
    rooms = None
    if use_sam and _TORCH_CUDA:
        self._log("Step 7 β€” SAM segmentation [Torch GPU]", 80)
        rooms = self._segment_with_sam(img_bgr, walls)
    if rooms is None:
        self._log("Step 7 β€” Flood-fill room segmentation", 85)
        rooms = self._segment_rooms(walls)
    self._save("07_rooms", rooms)

    self._log("Step 8 β€” Filtering room regions", 93)
    valid_mask, _ = self._filter_rooms(rooms, img_bgr.shape)
    self._save("08_rooms_filtered", valid_mask)
    self._log("Done", 100)
    return walls, valid_mask, self._wall_cal
# ══════════════════════════════════════════════════════════════════════════
# Stage 1 — Remove title block (original)
# ══════════════════════════════════════════════════════════════════════════
def _remove_title_block(self, img: np.ndarray) -> np.ndarray:
    """Crop away a title block on the right and/or bottom of the sheet.

    Long horizontal and vertical edges are isolated with a Canny +
    morphological-open pass.  The first qualifying vertical line in the
    right 30 % of the image sets the right crop, and the first qualifying
    horizontal line in the bottom 30 % sets the bottom crop (each moved
    10 px inward).

    Args:
        img: BGR image.

    Returns:
        A cropped copy of *img* (a full copy when nothing is detected).
    """
    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    # Kernel sides must be >= 1; w // 20 or h // 20 is 0 for images smaller
    # than 20 px, which is not a valid structuring-element size.
    h_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (max(1, w // 20), 1))
    v_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (1, max(1, h // 20)))
    h_lines = cv2.morphologyEx(edges, cv2.MORPH_OPEN, h_kern)
    v_lines = cv2.morphologyEx(edges, cv2.MORPH_OPEN, v_kern)
    crop_right, crop_bottom = w, h

    right_start = int(w * 0.7)
    right_region = v_lines[:, right_start:]
    if np.any(right_region):
        # NOTE(review): the sum is over 0/255 pixel values, not a pixel
        # count, so this threshold is far looser than "30 % of the height";
        # kept as-is to preserve existing behaviour.
        vp = np.where(np.sum(right_region, axis=0) > h * 0.3)[0]
        if len(vp):
            # Clamp so small images never produce an empty/negative slice.
            crop_right = max(1, right_start + int(vp[0]) - 10)

    bottom_start = int(h * 0.7)
    bot_region = h_lines[bottom_start:, :]
    if np.any(bot_region):
        hp = np.where(np.sum(bot_region, axis=1) > w * 0.3)[0]
        if len(hp):
            crop_bottom = max(1, bottom_start + int(hp[0]) - 10)

    return img[:crop_bottom, :crop_right].copy()
# ══════════════════════════════════════════════════════════════════════════
# Stage 2 — Remove colors (original — GPU via CuPy when available)
# ══════════════════════════════════════════════════════════════════════════
def _remove_colors(self, img: np.ndarray) -> np.ndarray:
    """Paint coloured (non-grey) pixels white.

    A pixel is erased when its channel spread (max - min over B, G, R)
    exceeds 15 and its grey level is below 240.  The CuPy path is used
    when CuPy is active, otherwise plain NumPy/OpenCV.

    Returns:
        A new image; *img* is not modified.
    """
    if not _GPU:
        channels = img[:, :, :3].astype(np.int32)
        luminance = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.int32)
        spread = channels.max(axis=2) - channels.min(axis=2)
        cleaned = img.copy()
        cleaned[(spread > 15) & (luminance < 240)] = (255, 255, 255)
        return cleaned

    device_img = _to_gpu(img.astype(np.int32))
    blue = device_img[:, :, 0]
    green = device_img[:, :, 1]
    red = device_img[:, :, 2]
    luminance = (0.114*blue + 0.587*green + 0.299*red)
    brightest = cp.maximum(cp.maximum(red, green), blue)
    darkest = cp.minimum(cp.minimum(red, green), blue)
    to_erase = ((brightest - darkest) > 15) & (luminance < 240)
    cleaned = _to_gpu(img.copy())
    cleaned[to_erase] = cp.array([255, 255, 255], dtype=cp.uint8)
    return _to_cpu(cleaned)
# ══════════════════════════════════════════════════════════════════════════
# Stage 3 — Close door arcs (original)
# ══════════════════════════════════════════════════════════════════════════
def _close_door_arcs(self, img: np.ndarray) -> np.ndarray:
    """Draw a straight black chord across circles that look like door swings.

    Circles are detected with a Hough transform on the blurred grey image.
    A circle is accepted only if (1) the angular extent between the first
    and last ink-covered sample on its circumference is 60-115 degrees and
    (2) at least 35 % of the samples on a ring at 0.92 * radius are ink.
    For accepted circles a 3 px black line is drawn between the two arc
    endpoints.

    Returns:
        A copy of *img* with the chords drawn (unchanged copy when no
        circles are found).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    h, w = gray.shape
    result = img.copy()
    # Inverted Otsu threshold: dark ink becomes 255 in `binary`.
    _, binary = cv2.threshold(gray, 0, 255,
                              cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # 3x3 close to heal pixel-size breaks in the ink before sampling.
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, np.ones((3,3), np.uint8))
    blurred = cv2.GaussianBlur(gray, (7,7), 1.5)
    raw = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT,
                           dp=1.2, minDist=50, param1=50, param2=22,
                           minRadius=self.DOOR_ARC_MIN_RADIUS,
                           maxRadius=self.DOOR_ARC_MAX_RADIUS)
    if raw is None:
        return result
    circles = np.round(raw[0]).astype(np.int32)
    for cx, cy, r in circles:
        # Sample the circumference once per degree (clipped to the image).
        angles = np.linspace(0, 2*np.pi, 360, endpoint=False)
        xs = np.clip((cx + r*np.cos(angles)).astype(np.int32), 0, w-1)
        ys = np.clip((cy + r*np.sin(angles)).astype(np.int32), 0, h-1)
        on_wall = binary[ys, xs] > 0
        if not np.any(on_wall):
            continue
        occ = angles[on_wall]
        # Extent between first and last occupied angle (no wrap-around handling).
        span = float(np.degrees(occ[-1] - occ[0]))
        if not (60 <= span <= 115):
            continue
        # Second check on a slightly smaller ring.
        leaf_r = r * 0.92
        n_pts = max(60, int(r))
        la = np.linspace(0, 2*np.pi, n_pts, endpoint=False)
        lx = np.clip((cx + leaf_r*np.cos(la)).astype(np.int32), 0, w-1)
        ly = np.clip((cy + leaf_r*np.sin(la)).astype(np.int32), 0, h-1)
        if float(np.mean(binary[ly, lx] > 0)) < 0.35:
            continue
        # Locate the arc ends: if the occupied angles contain a gap wider
        # than 25 degrees, the arc runs from just after the widest such gap
        # to just before it; otherwise it runs from first to last sample.
        gap_thresh = np.radians(25.0)
        diffs = np.diff(occ)
        big = np.where(diffs > gap_thresh)[0]
        if len(big) == 0:
            start_a, end_a = occ[0], occ[-1]
        else:
            split = big[np.argmax(diffs[big])]
            start_a, end_a = occ[split+1], occ[split]
        ep1 = (int(round(cx + r*np.cos(start_a))),
               int(round(cy + r*np.sin(start_a))))
        ep2 = (int(round(cx + r*np.cos(end_a))),
               int(round(cy + r*np.sin(end_a))))
        # Keep both endpoints inside the image.
        ep1 = (np.clip(ep1[0],0,w-1), np.clip(ep1[1],0,h-1))
        ep2 = (np.clip(ep2[0],0,w-1), np.clip(ep2[1],0,h-1))
        cv2.line(result, ep1, ep2, (0,0,0), 3)
    return result
# ══════════════════════════════════════════════════════════════════════════
# Stage 4 — Extract walls (exact GeometryAgent.extract_walls_adaptive)
# ══════════════════════════════════════════════════════════════════════════
def _extract_walls(self, img: np.ndarray) -> np.ndarray:
    """Extract a binary wall mask from a cleaned BGR plan image.

    Steps: brightness-adapted inverse threshold -> horizontal/vertical
    morphological open (long strokes only) -> dilate each direction to the
    estimated wall-body thickness -> resolve H/V dilation collisions ->
    keep only pixels within half a body thickness of an original stroke ->
    thin-line removal -> small-component noise filter -> double-line /
    thickness filter.

    Side effect: stores the clipped body thickness in ``_wall_thickness``.

    Returns:
        uint8 mask, 255 on walls.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    w = gray.shape[1]

    # Adaptive threshold: start from Otsu and bias it by overall brightness.
    brightness = float(np.mean(gray))
    otsu_thr, _ = cv2.threshold(gray, 0, 255,
                                cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    if brightness > 220:
        wall_threshold = max(200, int(otsu_thr * 1.1))
    elif brightness < 180:
        wall_threshold = max(150, int(otsu_thr * 0.9))
    else:
        wall_threshold = int(otsu_thr)
    _, binary = cv2.threshold(gray, wall_threshold, 255, cv2.THRESH_BINARY_INV)

    min_line_len = max(8, int(0.012 * w))
    body_thickness = self._estimate_wall_body_thickness(binary, fallback=12)
    body_thickness = int(np.clip(body_thickness, 9, 30))
    print(f" min_line={min_line_len}px body={body_thickness}px (w={w}px)")

    # Keep only strokes at least min_line_len long in either direction.
    k_h = cv2.getStructuringElement(cv2.MORPH_RECT, (min_line_len, 1))
    k_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, min_line_len))
    long_h = cv2.morphologyEx(binary, cv2.MORPH_OPEN, k_h)
    long_v = cv2.morphologyEx(binary, cv2.MORPH_OPEN, k_v)
    orig_walls = cv2.bitwise_or(long_h, long_v)

    # Thicken each direction perpendicular to its strokes.
    k_bh = cv2.getStructuringElement(cv2.MORPH_RECT, (1, body_thickness))
    k_bv = cv2.getStructuringElement(cv2.MORPH_RECT, (body_thickness, 1))
    dilated_h = cv2.dilate(long_h, k_bh)
    dilated_v = cv2.dilate(long_v, k_bv)
    walls = cv2.bitwise_or(dilated_h, dilated_v)

    # Where the two dilations overlap, keep only pixels that were ink originally.
    collision = cv2.bitwise_and(dilated_h, dilated_v)
    safe_zone = cv2.bitwise_and(collision, orig_walls)
    walls = cv2.bitwise_or(
        cv2.bitwise_and(walls, cv2.bitwise_not(collision)),
        safe_zone
    )

    # Distance gate: drop anything further than half a body from a stroke.
    dist = cv2.distanceTransform(cv2.bitwise_not(orig_walls), cv2.DIST_L2, 5)
    keep_mask = (dist <= (body_thickness / 2)).astype(np.uint8) * 255
    walls = cv2.bitwise_and(walls, keep_mask)
    walls = self._remove_thin_lines(walls, min_thickness=body_thickness)

    # Remove tiny connected components.
    n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
    if n_lbl > 1:
        areas = stats[1:, cv2.CC_STAT_AREA]
        min_noise = max(20, int(np.median(areas) * 0.0001))
        keep_lut = np.zeros(n_lbl, dtype=np.uint8)
        keep_lut[1:] = (areas >= min_noise).astype(np.uint8)
        walls = (keep_lut[labels] * 255).astype(np.uint8)

    walls = self._filter_double_lines_and_thick(walls)
    self._wall_thickness = body_thickness
    print(f" Walls: {np.count_nonzero(walls)} px "
          f"({100*np.count_nonzero(walls)/walls.size:.1f}%)")
    return walls
def _estimate_wall_body_thickness(self, binary: np.ndarray,
fallback: int = 12) -> int:
"""Exact GeometryAgent._estimate_wall_body_thickness β€” vectorised column scan."""
try:
h, w = binary.shape
n_cols = min(200, w)
col_indices = np.linspace(0, w - 1, n_cols, dtype=int)
cols = (binary[:, col_indices] > 0).astype(np.int8)
padded = np.concatenate(
[np.zeros((1, n_cols), dtype=np.int8), cols,
np.zeros((1, n_cols), dtype=np.int8)], axis=0
)
diff = np.diff(padded.astype(np.int16), axis=0)
run_lengths = []
for ci in range(n_cols):
d = diff[:, ci]
starts = np.where(d == 1)[0]
ends = np.where(d == -1)[0]
if len(starts) == 0 or len(ends) == 0:
continue
runs = ends - starts
runs = runs[(runs >= 2) & (runs <= h * 0.15)]
if len(runs):
run_lengths.append(runs)
if run_lengths:
all_runs = np.concatenate(run_lengths)
thickness = int(np.median(all_runs))
print(f" [WallThickness] Estimated: {thickness} px")
return thickness
except Exception as exc:
print(f" [WallThickness] Estimation failed ({exc}), fallback={fallback}")
return fallback
def _remove_thin_lines(self, walls: np.ndarray,
                       min_thickness: int) -> np.ndarray:
    """Keep only components containing a sufficiently thick spot.

    A component survives when at least one of its pixels lies
    ``min_thickness / 2`` or more from the background (distance transform).
    """
    depth = cv2.distanceTransform(walls, cv2.DIST_L2, 5)
    count, label_map, _, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
    if count <= 1:
        return walls
    thick_ids = np.unique(label_map[depth >= (min_thickness / 2)])
    if thick_ids.size == 0:
        return np.zeros_like(walls)
    lut = np.zeros(count, dtype=np.uint8)
    lut[thick_ids] = 255
    lut[0] = 0  # background never survives
    return lut[label_map]
def _filter_double_lines_and_thick(
    self,
    walls: np.ndarray,
    min_single_dim: int = 20,
    double_line_gap: int = 60,
    double_line_search_pct: int = 12,
) -> np.ndarray:
    """
    Keep wall blobs that are either thick enough or part of a double line.

    A blob is kept when:
    (a) min(bbox_w, bbox_h) >= min_single_dim, OR
    (b) probing perpendicular to its skeleton (both sides, every 3 px up
        to double_line_gap) hits a *different* blob for at least
        double_line_search_pct percent of the sampled skeleton points.
    Any blob that is hit by such a probe is kept as well, regardless of
    the percentage reached by the probing blob.

    Args:
        walls: uint8 wall mask.
        min_single_dim: minimum short bbox side for rule (a).
        double_line_gap: maximum probe distance in pixels.
        double_line_search_pct: minimum percentage of samples with a hit.

    Returns:
        uint8 mask containing only the kept blobs (the input itself when
        it has no foreground components).
    """
    n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
    if n_lbl <= 1:
        return walls
    # Try ximgproc thinning, fall back to morphological skeleton
    try:
        skel_full = cv2.ximgproc.thinning(
            walls, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN
        )
    except AttributeError:
        skel_full = self._morphological_skeleton(walls)
    skel_bin = (skel_full > 0)
    keep_ids: set = set()
    thin_candidates = []
    # Rule (a): size test on the bounding box.
    for i in range(1, n_lbl):
        bw = int(stats[i, cv2.CC_STAT_WIDTH])
        bh = int(stats[i, cv2.CC_STAT_HEIGHT])
        if min(bw, bh) >= min_single_dim:
            keep_ids.add(i)
        else:
            thin_candidates.append(i)
    if not thin_candidates:
        filtered = np.zeros_like(walls)
        for i in keep_ids:
            filtered[labels == i] = 255
        print(f" [DblLineFilter] Kept {len(keep_ids)}/{n_lbl-1} blobs "
              "(all passed size test)")
        return filtered
    # Skeleton pixels carry the label of the blob they belong to.
    skel_labels = labels * skel_bin
    img_h, img_w = labels.shape
    probe_dists = np.arange(3, double_line_gap + 1, 3, dtype=np.float32)
    # Rule (b): look for a parallel partner next to each thin blob.
    for i in thin_candidates:
        blob_skel_ys, blob_skel_xs = np.where(skel_labels == i)
        if len(blob_skel_ys) < 4:
            continue
        # Sub-sample to roughly 80 skeleton points.
        step = max(1, len(blob_skel_ys) // 80)
        sy = blob_skel_ys[::step].astype(np.float32)
        sx = blob_skel_xs[::step].astype(np.float32)
        n_s = len(sy)
        # Central differences between neighbouring samples approximate the
        # tangent; the first/last sample fall back to one-sided differences.
        sy_prev = np.roll(sy, 1); sy_prev[0] = sy[0]
        sy_next = np.roll(sy, -1); sy_next[-1] = sy[-1]
        sx_prev = np.roll(sx, 1); sx_prev[0] = sx[0]
        sx_next = np.roll(sx, -1); sx_next[-1] = sx[-1]
        dr = (sy_next - sy_prev).astype(np.float32)
        dc = (sx_next - sx_prev).astype(np.float32)
        dlen = np.maximum(1.0, np.hypot(dr, dc))
        # Perpendicular of tangent (dr, dc) is (-dc, dr), as (row, col).
        pr = (-dc / dlen)[:, np.newaxis]
        pc = ( dr / dlen)[:, np.newaxis]
        for sign in (1.0, -1.0):
            # (samples x probe distances) grid of probe coordinates.
            rr = np.round(sy[:, np.newaxis] + sign * pr * probe_dists).astype(np.int32)
            cc = np.round(sx[:, np.newaxis] + sign * pc * probe_dists).astype(np.int32)
            valid = (rr >= 0) & (rr < img_h) & (cc >= 0) & (cc < img_w)
            safe_rr = np.clip(rr, 0, img_h - 1)
            safe_cc = np.clip(cc, 0, img_w - 1)
            lbl_at = labels[safe_rr, safe_cc]
            partner_mask = valid & (lbl_at > 0) & (lbl_at != i)
            hit_any = partner_mask.any(axis=1)
            hit_rows = np.where(hit_any)[0]
            if len(hit_rows) == 0:
                continue
            # Nearest hit per sample identifies the partner blob.
            first_hit_col = partner_mask[hit_rows].argmax(axis=1)
            partner_ids = lbl_at[hit_rows, first_hit_col]
            keep_ids.update(partner_ids.tolist())
            if 100.0 * len(hit_rows) / n_s >= double_line_search_pct:
                keep_ids.add(i)
                break
    if keep_ids:
        keep_arr = np.array(sorted(keep_ids), dtype=np.int32)
        keep_lut = np.zeros(n_lbl, dtype=np.uint8)
        keep_lut[keep_arr] = 255
        filtered = keep_lut[labels]
    else:
        filtered = np.zeros_like(walls)
    print(f" [DblLineFilter] Kept {len(keep_ids)}/{n_lbl-1} blobs "
          f"(min_dim>={min_single_dim}px OR double-line partner found)")
    return filtered
# ══════════════════════════════════════════════════════════════════════════
# Stage 5b — Remove fixture symbols (original)
# ══════════════════════════════════════════════════════════════════════════
def _remove_fixtures(self, walls: np.ndarray) -> np.ndarray:
    """Erase small blobs that sit inside dense clusters of small blobs.

    Candidate blobs (small bbox, small area, moderate aspect ratio) each
    stamp a disc on a heat map; the blurred, normalised map is thresholded
    into zones, small zones are dropped, and every candidate whose centroid
    lies in a remaining zone is removed from the mask.

    Returns:
        The input itself when there is nothing to examine, otherwise a
        cleaned copy.
    """
    height, width = walls.shape
    count, label_map, stats, centroids = cv2.connectedComponentsWithStats(
        walls, connectivity=8)
    if count <= 1:
        return walls

    box_w = stats[1:, cv2.CC_STAT_WIDTH].astype(np.float32)
    box_h = stats[1:, cv2.CC_STAT_HEIGHT].astype(np.float32)
    area = stats[1:, cv2.CC_STAT_AREA].astype(np.float32)
    centre_x = np.round(centroids[1:, 0]).astype(np.int32)
    centre_y = np.round(centroids[1:, 1]).astype(np.int32)
    aspect = np.maximum(box_w, box_h) / (np.minimum(box_w, box_h) + 1e-6)

    is_candidate = ((box_w < self.FIXTURE_MAX_BLOB_DIM)
                    & (box_h < self.FIXTURE_MAX_BLOB_DIM)
                    & (area < self.FIXTURE_MAX_AREA)
                    & (aspect <= self.FIXTURE_MAX_ASPECT))
    cand_idx = np.where(is_candidate)[0]
    if len(cand_idx) == 0:
        return walls
    cand_x = centre_x[cand_idx]
    cand_y = centre_y[cand_idx]

    # Density map: one filled disc per candidate centroid, then blurred.
    heat = np.zeros((height, width), dtype=np.float32)
    radius = int(self.FIXTURE_DENSITY_RADIUS)
    for px, py in zip(cand_x.tolist(), cand_y.tolist()):
        cv2.circle(heat, (px, py), radius, 1.0, -1)
    sigma = max(3, (radius // 2) | 1)
    ksize = sigma * 4 + 1
    density = cv2.GaussianBlur(heat, (ksize, ksize), sigma)
    peak = float(density.max())
    if peak > 0:
        density /= peak

    # Threshold into zones and discard zones below the minimum area.
    dense = (density >= self.FIXTURE_DENSITY_THRESHOLD).astype(np.uint8) * 255
    zone_count, zone_map, zone_stats, _ = cv2.connectedComponentsWithStats(dense)
    zone = np.zeros_like(dense)
    if zone_count > 1:
        zone_area = zone_stats[1:, cv2.CC_STAT_AREA]
        big_zones = np.where(zone_area >= self.FIXTURE_MIN_ZONE_AREA)[0] + 1
        if len(big_zones):
            zone_lut = np.zeros(zone_count, np.uint8)
            zone_lut[big_zones] = 255
            zone = zone_lut[zone_map]

    # Candidates whose (clipped) centroid falls inside a kept zone are erased.
    inside = zone[cand_y.clip(0, height - 1), cand_x.clip(0, width - 1)] > 0
    doomed = cand_idx[inside] + 1
    cleaned = walls.copy()
    if len(doomed):
        erase_lut = np.zeros(count, np.uint8)
        erase_lut[doomed] = 1
        cleaned[erase_lut[label_map].astype(bool)] = 0
    return cleaned
# ══════════════════════════════════════════════════════════════════════════
# Stage 5c — Calibrate wall + remove thin lines (original)
# ══════════════════════════════════════════════════════════════════════════
def _calibrate_wall(self, mask: np.ndarray) -> WallCalibration:
    """Measure stroke width and gap statistics from a wall mask.

    * stroke_width: most frequent vertical foreground run length (runs of
      1 px up to 5 % of the height) over up to 200 sampled columns.
    * bridge_max_gap / door_gap: percentiles of the background gaps found
      between consecutive foreground runs on sampled rows and columns
      (only when at least 20 gaps were collected).

    Side effect: stores the stroke width in ``_wall_thickness``.
    """
    def _transitions(line):
        # Indices where a foreground run starts / where it has just ended.
        filled = (line > 0).astype(np.int16)
        steps = np.diff(np.concatenate(([0], filled, [0])))
        return np.flatnonzero(steps == 1), np.flatnonzero(steps == -1)

    def _gaps(line):
        # Runs alternate start/end, so the gap after run k is
        # start[k + 1] - end[k].
        starts, ends = _transitions(line)
        between = starts[1:] - ends[:-1]
        return between[(between > 1) & (between < 200)].tolist()

    cal = WallCalibration()
    height, width = mask.shape

    # --- stroke width from vertical run lengths ---
    run_cap = max(2, int(height * 0.05))
    run_lengths = []
    for x in np.linspace(0, width - 1, min(200, width), dtype=int):
        starts, ends = _transitions(mask[:, x])
        paired = min(len(starts), len(ends))
        lengths = (ends[:paired] - starts[:paired]).astype(int)
        run_lengths.extend(lengths[(lengths >= 1) & (lengths <= run_cap)].tolist())
    if run_lengths:
        histogram = np.bincount(np.clip(np.array(run_lengths, np.int32), 0, 200))
        cal.stroke_width = max(2, int(np.argmax(histogram[1:])) + 1)
    cal.min_component_dim = max(15, cal.stroke_width * 10)
    cal.min_component_area = max(30, cal.stroke_width * cal.min_component_dim // 2)

    # --- gap statistics along sampled rows, then sampled columns ---
    gap_sizes = []
    for y in range(5, height - 5, max(3, height // 200)):
        gap_sizes.extend(_gaps(mask[y, :]))
    for x in range(5, width - 5, max(3, width // 200)):
        gap_sizes.extend(_gaps(mask[:, x]))

    cal.bridge_min_gap = 2
    if len(gap_sizes) >= 20:
        gaps = np.array(gap_sizes)
        small = gaps[gaps <= 30]
        if len(small) >= 10:
            cal.bridge_max_gap = int(np.clip(np.percentile(small, 75), 4, 20))
        else:
            cal.bridge_max_gap = cal.stroke_width * 4
        door_like = gaps[(gaps > cal.bridge_max_gap) & (gaps <= 80)]
        if len(door_like) >= 5:
            door = int(np.percentile(door_like, 90))
        else:
            door = max(35, cal.stroke_width * 12)
        door = int(np.clip(door, 25, 80))
        # Force an odd length so the closing kernel has a centre pixel.
        cal.door_gap = door if door % 2 == 1 else door + 1
    cal.max_bridge_thick = cal.stroke_width * 5
    self._wall_thickness = cal.stroke_width
    return cal
def _remove_thin_lines_calibrated(self, walls: np.ndarray) -> np.ndarray:
    """Drop components that are both short and light.

    A component is kept when its longest bbox side reaches
    ``min_component_dim`` or its area reaches three times
    ``min_component_area`` (both read from the stored calibration).
    """
    cal = self._wall_cal
    count, label_map, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
    if count <= 1:
        return walls
    boxes = stats[1:]
    longest = np.maximum(boxes[:, cv2.CC_STAT_WIDTH], boxes[:, cv2.CC_STAT_HEIGHT])
    long_enough = longest >= cal.min_component_dim
    heavy_enough = boxes[:, cv2.CC_STAT_AREA] >= cal.min_component_area * 3
    lut = np.zeros(count, np.uint8)
    lut[1:][long_enough | heavy_enough] = 255
    return lut[label_map]
# ══════════════════════════════════════════════════════════════════════════
# Stage 5d — Bridge endpoints (original)
# ══════════════════════════════════════════════════════════════════════════
def _skel(self, binary: np.ndarray) -> np.ndarray:
    """Skeletonise *binary*: scikit-image when available, else morphological."""
    if not _SKIMAGE:
        return self._morphological_skeleton(binary)
    thinned = _sk_skel(binary > 0)
    return (thinned * 255).astype(np.uint8)
def _morphological_skeleton(self, binary: np.ndarray) -> np.ndarray:
    """Skeleton by repeated erosion with a 3x3 cross (at most 300 passes).

    Each pass adds the pixels that an erode-then-dilate of the current
    image would lose, then continues with the eroded image until it is
    empty.
    """
    kernel = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
    skeleton = np.zeros_like(binary)
    remaining = binary.copy()
    passes = 0
    while passes < 300:
        passes += 1
        shrunk = cv2.erode(remaining, kernel)
        residue = cv2.subtract(remaining, cv2.dilate(shrunk, kernel))
        skeleton = cv2.bitwise_or(skeleton, residue)
        remaining = shrunk
        if cv2.countNonZero(remaining) == 0:
            break
    return skeleton
def _tip_pixels(self, skel: np.ndarray):
    """Return ``(rows, cols)`` of skeleton pixels with exactly one neighbour.

    The 3x3 box sum includes the pixel itself, so a tip sums to 2.
    """
    on = (skel > 0).astype(np.float32)
    box = np.ones((3, 3), np.float32)
    window_sum = cv2.filter2D(on, -1, box, borderType=cv2.BORDER_CONSTANT)
    is_tip = (on == 1) & (window_sum.astype(np.int32) == 2)
    return np.where(is_tip)
def _outward_vectors(self, ex, ey, skel, lookahead):
n = len(ex)
odx = np.zeros(n, np.float32)
ody = np.zeros(n, np.float32)
sy, sx = np.where(skel > 0)
skel_set = set(zip(sx.tolist(), sy.tolist()))
D8 = [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(-1,1),(1,-1),(1,1)]
for i in range(n):
ox, oy = int(ex[i]), int(ey[i])
cx, cy = ox, oy
px, py = ox, oy
for _ in range(lookahead):
moved = False
for dx, dy in D8:
nx_, ny_ = cx+dx, cy+dy
if (nx_, ny_) == (px, py):
continue
if (nx_, ny_) in skel_set:
px, py = cx, cy
cx, cy = nx_, ny_
moved = True
break
if not moved:
break
ix, iy = float(cx-ox), float(cy-oy)
nr = max(1e-6, np.hypot(ix, iy))
odx[i], ody[i] = -ix/nr, -iy/nr
return odx, ody
def _bridge_endpoints(self, walls: np.ndarray) -> np.ndarray:
    """Connect pairs of facing wall endpoints separated by a small gap.

    Skeleton tips are paired when all of the following hold:
      g1  distance within [bridge_min_gap, bridge_max_gap];
      g2  the connecting direction is within 15 degrees of horizontal or
          vertical;
      g3  each tip's outward direction points toward the other tip
          (within a 70 degree cone);
      g4  the tips belong to different connected components;
      and no wall pixel lies on the sampled interior of the axis-aligned
      path between them.
    Pairs are then drawn shortest first, each tip being used at most once.

    Returns:
        A copy of *walls* with the bridge segments drawn.
    """
    cal = self._wall_cal
    result = walls.copy()
    h, w = walls.shape
    FCOS = np.cos(np.radians(70.0))
    skel = self._skel(walls)
    ey, ex = self._tip_pixels(skel)
    n_ep = len(ey)
    if n_ep < 2:
        return result
    _, cc_map = cv2.connectedComponents(walls, connectivity=8)
    ep_cc = cc_map[ey, ex]
    lookahead = max(8, cal.stroke_width * 3)
    out_dx, out_dy = self._outward_vectors(ex, ey, skel, lookahead)
    pts = np.stack([ex, ey], axis=1).astype(np.float32)
    # Candidate pairs within bridge_max_gap: KD-tree if SciPy is present,
    # otherwise a dense upper-triangle search.
    if _SCIPY:
        pairs = cKDTree(pts).query_pairs(float(cal.bridge_max_gap), output_type='ndarray')
        ii = pairs[:,0].astype(np.int64)
        jj = pairs[:,1].astype(np.int64)
    else:
        _ii, _jj = np.triu_indices(n_ep, k=1)
        ok = np.hypot(pts[_jj,0]-pts[_ii,0], pts[_jj,1]-pts[_ii,1]) <= cal.bridge_max_gap
        ii = _ii[ok].astype(np.int64)
        jj = _jj[ok].astype(np.int64)
    if len(ii) == 0:
        return result
    dxij = pts[jj,0] - pts[ii,0]
    dyij = pts[jj,1] - pts[ii,1]
    dists = np.hypot(dxij, dyij)
    safe = np.maximum(dists, 1e-6)
    # Unit vector from tip i to tip j.
    ux, uy = dxij/safe, dyij/safe
    ang = np.degrees(np.arctan2(np.abs(dyij), np.abs(dxij)))
    is_H = ang <= 15.0
    is_V = ang >= 75.0
    g1 = (dists >= cal.bridge_min_gap) & (dists <= cal.bridge_max_gap)
    g2 = is_H | is_V
    g3 = ((out_dx[ii]*ux + out_dy[ii]*uy) >= FCOS) & \
         ((out_dx[jj]*-ux + out_dy[jj]*-uy) >= FCOS)
    g4 = ep_cc[ii] != ep_cc[jj]
    pre_ok = g1 & g2 & g3 & g4
    pre_idx = np.where(pre_ok)[0]
    # Clearance check: sample the axis-aligned path (endpoints excluded)
    # and reject the pair if any sample is already wall.
    N_SAMP = 9
    clr = np.ones(len(pre_idx), dtype=bool)
    for k, pidx in enumerate(pre_idx):
        ia, ib = int(ii[pidx]), int(jj[pidx])
        ax, ay = int(ex[ia]), int(ey[ia])
        bx, by = int(ex[ib]), int(ey[ib])
        if is_H[pidx]:
            xs = np.linspace(ax, bx, N_SAMP, np.float32)
            ys = np.full(N_SAMP, ay, np.float32)
        else:
            xs = np.full(N_SAMP, ax, np.float32)
            ys = np.linspace(ay, by, N_SAMP, np.float32)
        sxs = np.clip(np.round(xs[1:-1]).astype(np.int32), 0, w-1)
        sys_ = np.clip(np.round(ys[1:-1]).astype(np.int32), 0, h-1)
        if np.any(walls[sys_, sxs] > 0):
            clr[k] = False
    valid = pre_idx[clr]
    if len(valid) == 0:
        return result
    vi = ii[valid]; vj = jj[valid]
    vd = dists[valid]; vH = is_H[valid]
    # Greedy matching, shortest gap first; each tip is used once.
    order = np.argsort(vd)
    vi, vj, vd, vH = vi[order], vj[order], vd[order], vH[order]
    used = np.zeros(n_ep, dtype=bool)
    for k in range(len(vi)):
        ia, ib = int(vi[k]), int(vj[k])
        if used[ia] or used[ib]:
            continue
        ax, ay = int(ex[ia]), int(ey[ia])
        bx, by = int(ex[ib]), int(ey[ib])
        # Bridges are snapped to the axis: tip a's row for horizontal
        # pairs, tip a's column for vertical pairs.
        p1, p2 = ((min(ax,bx),ay),(max(ax,bx),ay)) if vH[k] \
            else ((ax,min(ay,by)),(ax,max(ay,by)))
        cv2.line(result, p1, p2, 255, cal.stroke_width)
        used[ia] = used[ib] = True
    return result
# ══════════════════════════════════════════════════════════════════════════
# Stage 5e — Close door openings (original)
# ══════════════════════════════════════════════════════════════════════════
def _close_door_openings(self, walls: np.ndarray) -> np.ndarray:
    """Seal door-sized openings with 1-D morphological closings.

    The mask is closed with a ``door_gap``-long horizontal and vertical
    line kernel.  Of the pixels each closing adds, only components whose
    thickness perpendicular to the kernel is at most ``max_bridge_thick``
    are merged into the result.
    """
    cal = self._wall_cal
    span = cal.door_gap
    thickness_cap = cal.max_bridge_thick

    def _thin_fill(kernel_size, thickness_column):
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
        closed = cv2.morphologyEx(walls, cv2.MORPH_CLOSE, kernel)
        added = cv2.bitwise_and(closed, cv2.bitwise_not(walls))
        if not np.any(added):
            return np.zeros_like(walls)
        count, label_map, stats, _ = cv2.connectedComponentsWithStats(
            added, connectivity=8)
        if count <= 1:
            return np.zeros_like(walls)
        lut = np.zeros(count, np.uint8)
        lut[1:] = np.where(stats[1:, thickness_column] <= thickness_cap, 255, 0)
        return lut[label_map]

    horizontal = _thin_fill((span, 1), cv2.CC_STAT_HEIGHT)
    vertical = _thin_fill((1, span), cv2.CC_STAT_WIDTH)
    return cv2.bitwise_or(walls, cv2.bitwise_or(horizontal, vertical))
# ══════════════════════════════════════════════════════════════════════════
# Stage 5f — Remove dangling lines (original)
# ══════════════════════════════════════════════════════════════════════════
def _remove_dangling(self, walls: np.ndarray) -> np.ndarray:
    """Remove small isolated components with two or more free skeleton tips.

    A component is removed when it has at least two skeleton tips, its
    longest bbox side is at most 40 stroke widths, and no other wall pixel
    lies within ``max(6, 3 * stroke)`` pixels of it.
    """
    stroke = self._wall_cal.stroke_width if self._wall_cal else self._wall_thickness
    reach = max(6, stroke * 3)
    count, label_map, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8)
    if count <= 1:
        return walls

    skeleton = self._skel(walls)
    tip_rows, tip_cols = self._tip_pixels(skeleton)
    # Number of skeleton tips per component label.
    tips_per_label = np.bincount(label_map[tip_rows, tip_cols], minlength=count)

    probe = cv2.getStructuringElement(
        cv2.MORPH_ELLIPSE, (reach*2+1, reach*2+1))
    is_wall = walls > 0
    doomed = np.zeros(count, dtype=bool)
    for label in range(1, count):
        if tips_per_label[label] < 2:
            continue
        longest = max(int(stats[label, cv2.CC_STAT_WIDTH]),
                      int(stats[label, cv2.CC_STAT_HEIGHT]))
        if longest > stroke * 40:
            continue
        own = label_map == label
        grown = cv2.dilate(own.astype(np.uint8), probe)
        # Isolated when the grown footprint touches no foreign wall pixel.
        if not np.any((grown > 0) & is_wall & ~own):
            doomed[label] = True

    keep = ~doomed
    keep[0] = False  # background
    return keep[label_map].astype(np.uint8) * 255
# ══════════════════════════════════════════════════════════════════════════
# Stage 5g — Close large gaps (original)
# ══════════════════════════════════════════════════════════════════════════
def _close_large_gaps(self, walls: np.ndarray) -> np.ndarray:
    """Bridge door-sized openings between facing wall tips with straight lines.

    Tips come from ``self._tip_pixels(self._skel(walls))``. A pair of tips is
    bridged with an axis-aligned line when every gate below passes:

    * g1 -- their distance lies in ``[DOOR_MIN_GAP, DOOR_MAX_GAP]`` pixels;
    * g2 -- the connecting direction is within ``ANGLE_TOL_DEG`` of
      horizontal or of vertical;
    * g3 -- for both tips, the dot product of the tip's outward vector (from
      ``self._outward_vectors``; presumably unit length -- TODO confirm) with
      the unit direction towards the other tip is at least
      ``cos(90 - ANGLE_TOL_DEG)``;
    * g4 -- the tips lie on different 8-connected components of ``walls``;
    * clearance -- the interior sample points of the candidate segment hit
      no wall pixel.

    Surviving candidates are drawn shortest first, and each tip takes part in
    at most one bridge.

    Args:
        walls: single-channel wall mask; non-zero pixels are wall.

    Returns:
        A copy of ``walls`` with the bridges drawn in value 255.
    """
    DOOR_MIN_GAP = 180
    DOOR_MAX_GAP = 320
    ANGLE_TOL_DEG = 12.0
    N_SAMP = 15  # points sampled along each candidate bridge
    FCOS = np.cos(np.radians(90.0 - ANGLE_TOL_DEG))

    stroke = self._wall_cal.stroke_width if self._wall_cal else self._wall_thickness
    line_width = max(stroke, 3)
    result = walls.copy()
    h, w = walls.shape

    skel = self._skel(walls)
    tip_y, tip_x = self._tip_pixels(skel)
    n_ep = len(tip_x)
    if n_ep < 2:
        return result

    _, cc_map = cv2.connectedComponents(walls, connectivity=8)
    ep_cc = cc_map[tip_y, tip_x]
    lookahead = max(12, stroke * 4)
    out_dx, out_dy = self._outward_vectors(tip_x, tip_y, skel, lookahead)
    pts = np.stack([tip_x, tip_y], axis=1).astype(np.float32)

    # Candidate index pairs (i < j) no farther apart than DOOR_MAX_GAP.
    if _SCIPY:
        pairs = cKDTree(pts).query_pairs(float(DOOR_MAX_GAP), output_type='ndarray')
        ii = pairs[:, 0].astype(np.int64)
        jj = pairs[:, 1].astype(np.int64)
    else:
        # Brute-force fallback over all unordered pairs.
        _ii, _jj = np.triu_indices(n_ep, k=1)
        ok = np.hypot(pts[_jj, 0] - pts[_ii, 0],
                      pts[_jj, 1] - pts[_ii, 1]) <= DOOR_MAX_GAP
        ii = _ii[ok].astype(np.int64)
        jj = _jj[ok].astype(np.int64)
    if len(ii) == 0:
        return result

    # Vectorised geometric gates.
    dxij = pts[jj, 0] - pts[ii, 0]
    dyij = pts[jj, 1] - pts[ii, 1]
    dists = np.hypot(dxij, dyij)
    safe = np.maximum(dists, 1e-6)  # guards the division for coincident tips
    ux, uy = dxij / safe, dyij / safe
    ang = np.degrees(np.arctan2(np.abs(dyij), np.abs(dxij)))
    is_H = ang <= ANGLE_TOL_DEG
    is_V = ang >= (90.0 - ANGLE_TOL_DEG)
    g1 = (dists >= DOOR_MIN_GAP) & (dists <= DOOR_MAX_GAP)
    g2 = is_H | is_V
    g3 = ((out_dx[ii] * ux + out_dy[ii] * uy) >= FCOS) & \
         ((out_dx[jj] * -ux + out_dy[jj] * -uy) >= FCOS)
    g4 = ep_cc[ii] != ep_cc[jj]
    pre_idx = np.where(g1 & g2 & g3 & g4)[0]

    # Clearance gate: sample the axis-aligned segment that would be drawn.
    clr = np.ones(len(pre_idx), dtype=bool)
    for k, pidx in enumerate(pre_idx):
        ia, ib = int(ii[pidx]), int(jj[pidx])
        ax, ay = int(tip_x[ia]), int(tip_y[ia])
        bx, by = int(tip_x[ib]), int(tip_y[ib])
        # np.linspace's fourth positional parameter is `endpoint`, not
        # `dtype`, so only the sample count is passed; samples are float64.
        if is_H[pidx]:
            xs = np.linspace(ax, bx, N_SAMP)
            ys = np.full(N_SAMP, (ay + by) / 2.0, np.float32)
        else:
            xs = np.full(N_SAMP, (ax + bx) / 2.0, np.float32)
            ys = np.linspace(ay, by, N_SAMP)
        # Skip the two end samples: they sit on the tips, which are wall.
        sxs = np.clip(np.round(xs[1:-1]).astype(np.int32), 0, w - 1)
        sys_ = np.clip(np.round(ys[1:-1]).astype(np.int32), 0, h - 1)
        if np.any(walls[sys_, sxs] > 0):
            clr[k] = False

    valid = pre_idx[clr]
    if len(valid) == 0:
        return result

    # Greedy matching, shortest gap first; each tip is consumed once.
    order = np.argsort(dists[valid])
    vi = ii[valid][order]
    vj = jj[valid][order]
    vH = is_H[valid][order]
    used = np.zeros(n_ep, dtype=bool)
    for ia, ib, horizontal in zip(vi, vj, vH):
        ia, ib = int(ia), int(ib)
        if used[ia] or used[ib]:
            continue
        ax, ay = int(tip_x[ia]), int(tip_y[ia])
        bx, by = int(tip_x[ib]), int(tip_y[ib])
        if horizontal:
            # Snap to the mean row so the bridge is exactly horizontal.
            p1 = (min(ax, bx), (ay + by) // 2)
            p2 = (max(ax, bx), (ay + by) // 2)
        else:
            # Snap to the mean column so the bridge is exactly vertical.
            p1 = ((ax + bx) // 2, min(ay, by))
            p2 = ((ax + bx) // 2, max(ay, by))
        cv2.line(result, p1, p2, 255, line_width)
        used[ia] = used[ib] = True
    return result
# ══════════════════════════════════════════════════════════════════════════
# Stage 7 — Flood-fill segmentation (original)
# ══════════════════════════════════════════════════════════════════════════
def _segment_rooms(self, walls: np.ndarray) -> np.ndarray:
    """Derive a room mask from a wall mask.

    A 5-pixel frame is painted as wall on a copy of ``walls``. Pixels that
    are neither flood-filled from the border seeds nor wall are returned as
    255, after a 2x2 morphological opening.

    NOTE(review): every seed sits on the outermost row or column, which the
    frame has just set to 255, so the ``== 0`` test cannot succeed and
    ``cv2.floodFill`` is never reached as written. Kept as-is to preserve
    current output; confirm whether the seeds were meant to lie inside the
    frame.

    Args:
        walls: single-channel wall mask; the caller's array is not modified.

    Returns:
        Mask of the same shape with candidate room pixels set to 255.
    """
    h, w = walls.shape
    framed = walls.copy()
    framed[:5, :] = 255
    framed[-5:, :] = 255
    framed[:, :5] = 255
    framed[:, -5:] = 255

    filled = framed.copy()
    # floodFill requires a mask two pixels larger than the image.
    ff_mask = np.zeros((h + 2, w + 2), np.uint8)
    right, bottom = w - 1, h - 1
    seeds = [
        (0, 0), (right, 0), (0, bottom), (right, bottom),
        (w // 2, 0), (w // 2, bottom), (0, h // 2), (right, h // 2),
    ]
    for sx, sy in seeds:
        if filled[sy, sx] != 0:
            continue
        cv2.floodFill(filled, ff_mask, (sx, sy), 255)

    unfilled = cv2.bitwise_not(filled)
    not_wall = cv2.bitwise_not(framed)
    rooms = cv2.bitwise_and(unfilled, not_wall)
    # A tiny opening removes isolated specks.
    return cv2.morphologyEx(rooms, cv2.MORPH_OPEN, np.ones((2, 2), np.uint8))
# ══════════════════════════════════════════════════════════════════════════
# Stage 7 (optional) — SAM segmentation (GPU Torch)
# ══════════════════════════════════════════════════════════════════════════
def _segment_with_sam(self, orig_bgr: np.ndarray,
                      walls: np.ndarray) -> Optional[np.ndarray]:
    """Run a point-prompted SAM pass over the non-wall regions.

    One positive point prompt is placed at the centroid of each connected
    component of the inverted wall mask that has an area of at least 300
    pixels, does not span the whole image, and whose centroid is not on a
    wall. For each prompt the highest-scoring SAM mask is kept if its score
    is at least 0.70; it is intersected with the flood-fill result of
    ``self._segment_rooms``, opened with a 5x5 ellipse and OR-ed into the
    output.

    Args:
        orig_bgr: original image in BGR channel order (converted to RGB
            before being handed to the predictor).
        walls: single-channel wall mask.

    Returns:
        The accumulated uint8 mask, or ``None`` when CUDA Torch or the
        predictor is unavailable, no prompt qualifies, nothing was
        segmented, or any exception occurred. ``None`` tells the caller to
        fall back to flood-fill segmentation.
    """
    if not _TORCH_CUDA:
        return None
    predictor = self._get_sam_predictor()
    if predictor is None:
        return None

    try:
        import torch

        h, w = walls.shape
        flood = self._segment_rooms(walls)
        n, _labels, stats, centroids = cv2.connectedComponentsWithStats(
            cv2.bitwise_not(walls), 8)

        prompts = []
        for idx in range(1, n):
            if int(stats[idx, cv2.CC_STAT_AREA]) < 300:
                continue
            left = int(stats[idx, cv2.CC_STAT_LEFT])
            top = int(stats[idx, cv2.CC_STAT_TOP])
            bw = int(stats[idx, cv2.CC_STAT_WIDTH])
            bh = int(stats[idx, cv2.CC_STAT_HEIGHT])
            # Skip a component whose bounding box covers the whole image.
            spans_image = (left <= 5 and top <= 5
                           and left + bw >= w - 5 and top + bh >= h - 5)
            if spans_image:
                continue
            cx_ = int(np.clip(centroids[idx][0], 0, w - 1))
            cy_ = int(np.clip(centroids[idx][1], 0, h - 1))
            # The centroid can land on a wall; such a prompt is dropped.
            if walls[cy_, cx_] > 0:
                continue
            prompts.append((cx_, cy_))
        if not prompts:
            return None

        predictor.set_image(cv2.cvtColor(orig_bgr, cv2.COLOR_BGR2RGB))
        sam_mask = np.zeros((h, w), np.uint8)
        open_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        for px, py in prompts:
            coords = np.array([[px, py]], np.float32)
            labels = np.array([1], np.int32)  # 1 marks a positive prompt
            with torch.inference_mode():
                masks, scores, _ = predictor.predict(
                    point_coords=coords, point_labels=labels,
                    multimask_output=True)
            best = int(np.argmax(scores))
            if float(scores[best]) < 0.70:
                continue
            candidate = (masks[best] > 0).astype(np.uint8) * 255
            candidate = cv2.bitwise_and(candidate, flood)
            candidate = cv2.morphologyEx(candidate, cv2.MORPH_OPEN, open_kernel)
            if np.any(candidate):
                sam_mask = cv2.bitwise_or(sam_mask, candidate)

        if np.any(sam_mask):
            return sam_mask
        return None
    except Exception as exc:
        # Best-effort stage: report the failure and let the caller fall back.
        import traceback
        print(f"[SAM] Error: {exc}\n{traceback.format_exc()}")
        return None
# Class-level cache holding the SAM predictor once it has loaded.
_sam_predictor_cache = None

def _get_sam_predictor(self):
    """Return the cached SAM predictor, loading it on first use.

    The checkpoint path is ``self._sam_checkpoint`` when that file exists,
    otherwise whatever ``self._download_sam_checkpoint()`` returns. The
    model type is picked from the checkpoint file name (``vit_h``,
    ``vit_l`` or ``vit_b``; ``vit_h`` when none matches) and the model is
    moved to ``cuda``.

    Returns:
        The predictor stored on ``WallPipeline._sam_predictor_cache``, or
        ``None`` when no checkpoint file is available or loading failed.
        A failed load is not cached, so the next call tries again.
    """
    cached = WallPipeline._sam_predictor_cache
    if cached is not None:
        return cached

    ckpt = self._sam_checkpoint
    if not (ckpt and os.path.isfile(ckpt)):
        ckpt = self._download_sam_checkpoint()
    if not (ckpt and os.path.isfile(ckpt)):
        return None

    try:
        from segment_anything import sam_model_registry, SamPredictor
        fname = os.path.basename(ckpt).lower()
        mtype = next(
            (tag for tag in ("vit_h", "vit_l", "vit_b") if tag in fname),
            "vit_h")
        # torch is not referenced below; importing it here means a missing
        # installation is reported by the handler of this try block.
        import torch
        sam = sam_model_registry[mtype](checkpoint=ckpt)
        sam.to(device="cuda")
        sam.eval()
        WallPipeline._sam_predictor_cache = SamPredictor(sam)
        print(f"[SAM] {mtype} loaded on cuda")
    except Exception as exc:
        print(f"[SAM] Load failed: {exc}")
        WallPipeline._sam_predictor_cache = None
    return WallPipeline._sam_predictor_cache
@staticmethod
def _download_sam_checkpoint() -> str:
    """Return a local path to the SAM ViT-H checkpoint, downloading if needed.

    The file is looked up under ``.models/sam`` relative to the current
    working directory. When it is absent it is fetched with
    ``huggingface_hub.hf_hub_download`` into that directory.

    NOTE(review): confirm that the ``facebook/sam-vit-huge`` repository
    actually hosts a file with this name.

    Returns:
        The checkpoint path, or ``""`` when the download (or the import of
        ``huggingface_hub``) fails.
    """
    import os

    filename = "sam_vit_h_4b8939.pth"
    target_dir = os.path.join(".models", "sam")
    dest = os.path.join(target_dir, filename)
    if os.path.isfile(dest):
        return dest

    try:
        from huggingface_hub import hf_hub_download
        os.makedirs(target_dir, exist_ok=True)
        return hf_hub_download(
            repo_id="facebook/sam-vit-huge",
            filename=filename,
            local_dir=target_dir)
    except Exception as exc:
        # Best-effort download: report and signal failure with an empty path.
        print(f"[SAM] Download failed: {exc}")
        return ""
# ══════════════════════════════════════════════════════════════════════════
# Stage 8 — Filter room regions (original)
# ══════════════════════════════════════════════════════════════════════════
def _filter_rooms(self, rooms_mask: np.ndarray,
                  img_shape: Tuple) -> Tuple[np.ndarray, List]:
    """Keep only the external contours of ``rooms_mask`` that look like rooms.

    A contour is kept when it passes every test, checked in this order:

    1. its area lies between ``MIN_ROOM_AREA_FRAC`` and
       ``MAX_ROOM_AREA_FRAC`` of the image area;
    2. its bounding box stays at least ``margin`` pixels from every image
       edge, where ``margin = max(5, w * BORDER_MARGIN_FRAC)``;
    3. at least one bounding-box side reaches ``w * MIN_ROOM_DIM_FRAC``;
    4. the bounding-box aspect ratio does not exceed ``MAX_ASPECT_RATIO``;
    5. area / bounding-box area is at least ``MIN_EXTENT``;
    6. area / convex-hull area is at least ``MIN_SOLIDITY`` (skipped when
       the hull area is zero).

    Args:
        rooms_mask: single-channel candidate room mask.
        img_shape: image shape; only the first two entries (h, w) are read.

    Returns:
        ``(valid_mask, valid_rooms)``: a mask with the accepted contours
        filled with 255, and the list of accepted contours.
    """
    h, w = img_shape[:2]
    img_area = float(h * w)
    area_lo = img_area * self.MIN_ROOM_AREA_FRAC
    area_hi = img_area * self.MAX_ROOM_AREA_FRAC
    min_dim = w * self.MIN_ROOM_DIM_FRAC
    margin = max(5.0, w * self.BORDER_MARGIN_FRAC)

    def looks_like_room(cnt) -> bool:
        """Apply the six tests to a single contour."""
        area = cv2.contourArea(cnt)
        if area < area_lo or area > area_hi:
            return False
        bx, by, bw, bh = cv2.boundingRect(cnt)
        too_close = (bx < margin or by < margin
                     or bx + bw > w - margin or by + bh > h - margin)
        if too_close:
            return False
        if bw < min_dim and bh < min_dim:
            return False
        aspect = max(bw, bh) / (min(bw, bh) + 1e-6)
        if aspect > self.MAX_ASPECT_RATIO:
            return False
        extent = area / (bw * bh + 1e-6)
        if extent < self.MIN_EXTENT:
            return False
        hull_area = cv2.contourArea(cv2.convexHull(cnt))
        if hull_area > 0 and (area / hull_area) < self.MIN_SOLIDITY:
            return False
        return True

    contours, _ = cv2.findContours(rooms_mask, cv2.RETR_EXTERNAL,
                                   cv2.CHAIN_APPROX_SIMPLE)
    valid_mask = np.zeros_like(rooms_mask)
    valid_rooms = []
    for cnt in contours:
        if looks_like_room(cnt):
            cv2.drawContours(valid_mask, [cnt], -1, 255, -1)
            valid_rooms.append(cnt)
    return valid_mask, valid_rooms
# ══════════════════════════════════════════════════════════════════════════
# Wand — click-to-segment (original)
# ══════════════════════════════════════════════════════════════════════════
def wand_segment(self, walls: np.ndarray, click_x: int, click_y: int,
                 existing_rooms: List[Dict]) -> Optional[Dict]:
    """Build a room record from the free region under a click.

    The region is found by flood-filling from ``(click_x, click_y)`` on the
    non-wall mask, with a 5-pixel frame treated as wall.

    NOTE(review): the border seeds all sit on the outermost row or column,
    which the frame has just set to 255, so the border ``cv2.floodFill``
    is never reached as written. Kept as-is to preserve current output;
    confirm the intended seed positions.

    Args:
        walls: single-channel wall mask; non-zero pixels are wall.
        click_x: column of the click.
        click_y: row of the click.
        existing_rooms: room dicts already present; only their ``"id"``
            values are read, to pick the next id.

    Returns:
        A room dict (keys ``id``, ``label``, ``segmentation``, ``area``,
        ``bbox``, ``centroid``, ``confidence``, ``isWand``), or ``None``
        when the click is outside the image, on a wall or the frame, the
        region is smaller than 100 pixels, or no contour is found.
    """
    h, w = walls.shape
    inside_image = 0 <= click_x < w and 0 <= click_y < h
    if not inside_image:
        return None
    if walls[click_y, click_x] > 0:
        return None  # clicked on a wall

    framed = walls.copy()
    framed[:5, :] = 255
    framed[-5:, :] = 255
    framed[:, :5] = 255
    framed[:, -5:] = 255

    filled = framed.copy()
    border_mask = np.zeros((h + 2, w + 2), np.uint8)
    right, bottom = w - 1, h - 1
    seeds = [
        (0, 0), (right, 0), (0, bottom), (right, bottom),
        (w // 2, 0), (w // 2, bottom), (0, h // 2), (right, h // 2),
    ]
    for sx, sy in seeds:
        if filled[sy, sx] != 0:
            continue
        cv2.floodFill(filled, border_mask, (sx, sy), 255)

    rooms = cv2.bitwise_and(cv2.bitwise_not(filled), cv2.bitwise_not(framed))
    if rooms[click_y, click_x] == 0:
        return None

    # Recolour the clicked region to 128 so it can be isolated by value.
    painted = rooms.copy()
    click_mask = np.zeros((h + 2, w + 2), np.uint8)
    cv2.floodFill(painted, click_mask, (click_x, click_y), 128)
    room_mask = (painted == 128).astype(np.uint8) * 255

    area = float(np.count_nonzero(room_mask))
    if area < 100:
        return None

    contours, _ = cv2.findContours(room_mask, cv2.RETR_EXTERNAL,
                                   cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None
    cnt = max(contours, key=cv2.contourArea)
    bx, by, bw, bh = cv2.boundingRect(cnt)

    # Centroid from image moments, or the bounding-box centre when the
    # contour has zero area.
    M = cv2.moments(cnt)
    if M["m00"]:
        cx = int(M["m10"] / M["m00"])
        cy = int(M["m01"] / M["m00"])
    else:
        cx = bx + bw // 2
        cy = by + bh // 2

    # Flatten the (N, 1, 2) contour into [x0, y0, x1, y1, ...].
    flat_seg = cnt[:, 0, :].ravel().tolist()
    new_id = max((r["id"] for r in existing_rooms), default=0) + 1
    return {
        "id"          : new_id,
        "label"       : f"Room {new_id}",
        "segmentation": [flat_seg],
        "area"        : area,
        "bbox"        : [bx, by, bw, bh],
        "centroid"    : [cx, cy],
        "confidence"  : 0.90,
        "isWand"      : True,
    }
import os