import aiofiles import asyncio import base64 import io import json import os import shutil import time try: import tomllib except ImportError: try: import tomli as tomllib except ImportError: try: import tomlkit as tomllib except ImportError: raise ImportError( "No TOML library found. Please run on Python 3.11+, or run 'pip install tomli' to support Python 3.10." ) import uuid from pathlib import Path import cv2 import numpy as np import torch from fastapi import FastAPI, File, HTTPException, Response, UploadFile, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import StreamingResponse from fastapi.staticfiles import StaticFiles from PIL import Image from transformers import ( AutoImageProcessor, AutoModelForDepthEstimation, Mask2FormerForUniversalSegmentation, OneFormerForUniversalSegmentation, OneFormerProcessor, SegformerForSemanticSegmentation, ) ADE20K_CLASSES = [ "wall", "building", "sky", "floor", "tree", "ceiling", "road", "bed", "window", "grass", "cabinet", "sidewalk", "person", "ground", "door", "table", "mountain", "plant", "curtain", "chair", "car", "water", "painting", "sofa", "shelf", "house", "sea", "mirror", "rug", "field", "armchair", "seat", "fence", "desk", "rock", "wardrobe", "lamp", "bathtub", "railing", "cushion", "base", "box", "column", "signboard", "chest of drawers", "counter", "sand", "sink", "skyscraper", "fireplace", "refrigerator", "stairs", "runway", "bookcase", "blind", "coffee table", "toilet", "flower", "book", "hill", "bench", "countertop", "stove", "palm", "kitchen island", "computer", "swivel chair", "boat", "bar", "arcade machine", "hovel", "bus", "towel", "light", "truck", "tower", "chandelier", "awning", "streetlight", "booth", "television", "airplane", "dirt track", "apparel", "pole", "land", "bannister", "escalator", "ottoman", "bottle", "buffet", "poster", "stage", "van", "ship", "fountain", "conveyer belt", "canopy", "washer", "plaything", "swimming pool", "stool", "barrel", "basket", "waterfall", "tent", "bag", "minibike", "cradle", "oven", "ball", "food", "step", "tank", "trade name", "microwave", "pot", "animal", "bicycle", "lake", "dishwasher", "screen", "blanket", "sculpture", "hood", "sconce", "vase", "traffic light", "tray", "ashcan", "fan", "pier", "crt screen", "plate", "monitor", "bulletin board", "shower", "radiator", "glass", "clock", "flag", ] def load_config() -> dict: config_path = os.getenv("VISUALIZER_CONFIG") if not config_path: return {} path = Path(config_path).expanduser() if not path.is_absolute(): path = Path(__file__).resolve().parent / path if not path.exists(): raise RuntimeError(f"VISUALIZER_CONFIG does not exist: {path}") with path.open("rb") as config_file: return tomllib.load(config_file) CONFIG = load_config() def config_value(env_name: str, section: str, key: str, default): if env_name in os.environ: return os.environ[env_name] return CONFIG.get(section, {}).get(key, default) SEGMENTATION_MODEL = str( config_value("SEGMENTATION_MODEL", "models", "segmentation_model", "oneformer") ).lower() ONEFORMER_MODEL_NAME = str(config_value( "ONEFORMER_MODEL_NAME", "models", "oneformer_model_name", "shi-labs/oneformer_ade20k_swin_large", )) MASK2FORMER_MODEL_NAME = str(config_value( "MASK2FORMER_MODEL_NAME", "models", "mask2former_model_name", "facebook/mask2former-swin-small-ade-semantic", )) SEGFORMER_MODEL_NAME = str(config_value( "SEGFORMER_MODEL_NAME", "models", "segformer_model_name", "nvidia/segformer-b2-finetuned-ade-512-512", )) DEPTH_MODEL_NAME = str(config_value( "DEPTH_MODEL_NAME", "models", "depth_model_name", "Intel/dpt-large", )) ENABLE_DEPTH_ESTIMATION = str(config_value( "ENABLE_DEPTH_ESTIMATION", "runtime", "enable_depth_estimation", "1", )).lower() in {"1", "true", "yes", "on"} INTRINSIC_MODEL_VERSION = str(config_value( "INTRINSIC_MODEL_VERSION", "models", "intrinsic_model_version", "v2", )) ENABLE_INTRINSIC_SHADING = str(config_value( "ENABLE_INTRINSIC_SHADING", "runtime", "enable_intrinsic_shading", "0", )).lower() in {"1", "true", "yes", "on"} VISUALIZER_DATA_DIR = str(config_value( "VISUALIZER_DATA_DIR", "runtime", "data_dir", "data", )) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") seg_processor = None seg_model = None segmentation_backend = "segformer" depth_processor = None depth_model = None intrinsic_models = None def hf_offline() -> bool: return os.getenv("HF_HUB_OFFLINE") == "1" or os.getenv("TRANSFORMERS_OFFLINE") == "1" def _load_segmentation_model(): global seg_processor, seg_model, segmentation_backend if SEGMENTATION_MODEL == "oneformer": try: print(f"Loading OneFormer: {ONEFORMER_MODEL_NAME} ...", flush=True) seg_processor = OneFormerProcessor.from_pretrained( ONEFORMER_MODEL_NAME, local_files_only=hf_offline(), ) seg_model = OneFormerForUniversalSegmentation.from_pretrained( ONEFORMER_MODEL_NAME, local_files_only=hf_offline(), ).to(device) seg_model.eval() segmentation_backend = "oneformer" print("OneFormer loaded.", flush=True) return except Exception as exc: print(f"OneFormer failed ({exc}), falling back to Mask2Former.", flush=True) if SEGMENTATION_MODEL in {"oneformer", "mask2former"}: try: print(f"Loading Mask2Former: {MASK2FORMER_MODEL_NAME} ...", flush=True) seg_processor = AutoImageProcessor.from_pretrained( MASK2FORMER_MODEL_NAME, local_files_only=hf_offline(), ) seg_model = Mask2FormerForUniversalSegmentation.from_pretrained( MASK2FORMER_MODEL_NAME, local_files_only=hf_offline(), ).to(device) seg_model.eval() segmentation_backend = "mask2former" print("Mask2Former loaded.", flush=True) return except Exception as exc: print(f"Mask2Former failed ({exc}), falling back to SegFormer.", flush=True) print(f"Loading SegFormer: {SEGFORMER_MODEL_NAME} ...", flush=True) seg_processor = AutoImageProcessor.from_pretrained( SEGFORMER_MODEL_NAME, local_files_only=hf_offline(), ) seg_model = SegformerForSemanticSegmentation.from_pretrained( SEGFORMER_MODEL_NAME, local_files_only=hf_offline(), ).to(device) seg_model.eval() segmentation_backend = "segformer" print("SegFormer loaded.", flush=True) def _load_intrinsic_model(): global intrinsic_models if ENABLE_INTRINSIC_SHADING and intrinsic_models is None: try: print(f"Loading Intrinsic Image Decomposition model: {INTRINSIC_MODEL_VERSION} ...", flush=True) from intrinsic.pipeline import load_models intrinsic_models = load_models(INTRINSIC_MODEL_VERSION, device=str(device)) print("Intrinsic model loaded.", flush=True) except Exception as exc: print(f"Intrinsic model failed to load ({exc}). Falling back to luminance shading.", flush=True) app = FastAPI() app.add_middleware(GZipMiddleware, minimum_size=1000) app.add_middleware( CORSMiddleware, allow_origins=["https://room-editor-9y3b.vercel.app"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) DATA_DIR = Path(VISUALIZER_DATA_DIR).resolve() UPLOAD_DIR = DATA_DIR / "uploads" JOB_DIR = DATA_DIR / "jobs" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) JOB_DIR.mkdir(parents=True, exist_ok=True) app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads") PRIMARY_FLOOR_CLASSES = {"floor"} FLOOR_SURFACE_CLASSES = { "floor", "road", "sidewalk", "ground", "field", "grass", "sand", "runway", "dirt track", "land", "stairs", "step", } REJECT_SURFACE_CLASSES = { "wall", "ceiling", "building", "sky", "window", # Horizontal work surfaces — excluded before contour fill so the segmentation # model mis-labelling them as floor doesn't cause tile bleed onto counters/appliances. "countertop", "kitchen island", "stove", "refrigerator", "oven", "microwave", "dishwasher", "sink", } OCCLUDER_CLASSES = { "bed", "cabinet", "person", "door", "table", "plant", "curtain", "chair", "car", "painting", "sofa", "shelf", "mirror", "rug", "armchair", "seat", "desk", "wardrobe", "lamp", "bathtub", "railing", "cushion", "base", "box", "column", "chest of drawers", "counter", "fireplace", "bookcase", "blind", "coffee table", "toilet", "bench", "computer", "swivel chair", "bar", "ottoman", "bottle", "buffet", "poster", "towel", "television", "washer", "plaything", "stool", "basket", "bag", "cradle", "ball", "food", "blanket", "sculpture", "vase", "tray", "fan", "plate", "monitor", "shower", "radiator", "clock", } def class_name_for_id(class_id: int) -> str: return ADE20K_CLASSES[class_id] if class_id < len(ADE20K_CLASSES) else f"class_{class_id}" def class_ids(names: set[str]) -> list[int]: return [idx for idx, name in enumerate(ADE20K_CLASSES) if name in names] def estimate_depth(img: Image.Image, width: int, height: int): global depth_processor, depth_model if not ENABLE_DEPTH_ESTIMATION: return None model_name = DEPTH_MODEL_NAME try: if depth_processor is None or depth_model is None: print(f"Loading depth model: {model_name} ...", flush=True) depth_processor = AutoImageProcessor.from_pretrained( model_name, local_files_only=hf_offline(), ) depth_model = AutoModelForDepthEstimation.from_pretrained( model_name, local_files_only=hf_offline(), ).to(device) depth_model.eval() print("Depth model loaded.", flush=True) inputs = depth_processor(images=img, return_tensors="pt").to(device) with torch.no_grad(): outputs = depth_model(**inputs) depth = torch.nn.functional.interpolate( outputs.predicted_depth.unsqueeze(1), size=(height, width), mode="bicubic", align_corners=False, ).squeeze().cpu().numpy() depth = cv2.GaussianBlur(depth.astype(np.float32), (0, 0), sigmaX=3) depth_min, depth_max = float(np.min(depth)), float(np.max(depth)) if depth_max - depth_min < 1e-6: return None return (depth - depth_min) / (depth_max - depth_min) except Exception as exc: print(f"Depth estimation skipped ({exc}).", flush=True) return None # --------------------------------------------------------------------------- # B4 — Shade Range Expansion # Encode the shade multiplier using the actual brightness spread of the floor # rather than a hardcoded [0.55, 1.35] clip, so dark-room images preserve the # full dynamic range of their shadow patterns. # --------------------------------------------------------------------------- def _adaptive_shade_range(relative: np.ndarray, floor_mask: np.ndarray) -> tuple[float, float]: floor_vals = relative[floor_mask > 0] if floor_vals.size == 0: return (0.55, 1.35) lo = max(0.25, float(np.percentile(floor_vals, 1))) hi = min(2.5, float(np.percentile(floor_vals, 99))) span = hi - lo if span < 0.4: mid = (lo + hi) / 2.0 lo, hi = mid - 0.2, mid + 0.2 return lo, hi def _encode_shade(relative: np.ndarray, lo: float, hi: float) -> np.ndarray: span = hi - lo return np.round((np.clip(relative, lo, hi) - lo) * (255.0 / span)).clip(0, 255).astype(np.uint8) # --------------------------------------------------------------------------- # B1 — Shadow Map Extraction # Luminance-based shade map; returns (encoded_uint8, (lo, hi)) so the frontend # can decode with the correct range. # --------------------------------------------------------------------------- def build_shade_map( img_np: np.ndarray, surface_mask: np.ndarray ) -> tuple[np.ndarray | None, tuple[float, float]]: default_range = (0.55, 1.35) if not surface_mask.any(): return None, default_range mask = surface_mask.astype(np.uint8) luminance = ( img_np[:, :, 0].astype(np.float32) * 0.299 + img_np[:, :, 1].astype(np.float32) * 0.587 + img_np[:, :, 2].astype(np.float32) * 0.114 ) h, w = mask.shape[:2] floor_values = luminance[mask > 0] if floor_values.size < max(256, int(h * w * 0.002)): return None, default_range median_lum = float(np.median(floor_values)) if median_lum < 1e-3: return None, default_range filled = luminance.copy() filled[mask == 0] = median_lum missing = (mask == 0).astype(np.uint8) * 255 try: filled = cv2.inpaint( np.clip(filled, 0, 255).astype(np.uint8), missing, max(3, min(h, w) // 160), cv2.INPAINT_TELEA, ).astype(np.float32) except cv2.error: pass sigma = max(8.0, min(h, w) / 28.0) smooth = cv2.GaussianBlur(filled, (0, 0), sigmaX=sigma, sigmaY=sigma) relative = smooth / median_lum relative[mask == 0] = 1.0 lo, hi = _adaptive_shade_range(relative, mask) return _encode_shade(relative, lo, hi), (lo, hi) def build_intrinsic_shade_map( img_np: np.ndarray, surface_mask: np.ndarray ) -> tuple[np.ndarray | None, tuple[float, float]]: default_range = (0.55, 1.35) if not surface_mask.any() or intrinsic_models is None: return None, default_range try: img_float = img_np.astype(np.float32) / 255.0 from intrinsic.pipeline import run_pipeline results = run_pipeline(intrinsic_models, img_float, device=str(device)) shading = None if "gry_shd" in results: shading = results["gry_shd"] elif "dif_shd" in results: dif = results["dif_shd"] shading = dif[:, :, 0] * 0.299 + dif[:, :, 1] * 0.587 + dif[:, :, 2] * 0.114 else: for k in results.keys(): if "shd" in k or "shading" in k: shading = results[k] if len(shading.shape) == 3: shading = shading[:, :, 0] * 0.299 + shading[:, :, 1] * 0.587 + shading[:, :, 2] * 0.114 break if shading is None: return None, default_range h, w = surface_mask.shape[:2] if shading.shape[:2] != (h, w): shading = cv2.resize(shading, (w, h), interpolation=cv2.INTER_LINEAR) sigma = max(3.0, min(h, w) / 80.0) shading = cv2.GaussianBlur(shading.astype(np.float32), (0, 0), sigmaX=sigma, sigmaY=sigma) floor_vals = shading[surface_mask > 0] if floor_vals.size == 0: return None, default_range median_val = float(np.median(floor_vals)) if median_val < 1e-3: return None, default_range relative_shading = shading / median_val relative_shading[surface_mask == 0] = 1.0 lo, hi = _adaptive_shade_range(relative_shading, surface_mask) return _encode_shade(relative_shading, lo, hi), (lo, hi) except Exception as exc: print(f"Intrinsic shading decomposition failed: {exc}. Falling back to default luminance shading.", flush=True) return None, default_range # --------------------------------------------------------------------------- # B2 — Color Temperature # Sample the brightest floor pixels to infer the room's lighting colour cast # and approximate Kelvin value. Returns a dict with `kelvin` and `cast` # (normalised RGB multipliers) so the frontend can tint replacement tiles. # --------------------------------------------------------------------------- def estimate_color_temperature( img_np: np.ndarray, surface_mask: np.ndarray ) -> dict | None: if not surface_mask.any(): return None pixels = img_np[surface_mask > 0].astype(np.float32) if len(pixels) < 100: return None lum = pixels[:, 0] * 0.299 + pixels[:, 1] * 0.587 + pixels[:, 2] * 0.114 thresh = float(np.percentile(lum, 70)) bright = pixels[lum >= thresh] if len(bright) < 10: bright = pixels mr = float(np.mean(bright[:, 0])) mg = float(np.mean(bright[:, 1])) mb = float(np.mean(bright[:, 2])) ref = max(mr, mg, mb, 1e-3) rb = mr / max(mb, 1.0) if rb > 1.6: kelvin = 2700 elif rb > 1.3: kelvin = 3200 elif rb > 1.1: kelvin = 4000 elif rb > 0.9: kelvin = 5500 elif rb > 0.7: kelvin = 6500 else: kelvin = 8000 return { "kelvin": kelvin, "cast": {"r": round(mr / ref, 4), "g": round(mg / ref, 4), "b": round(mb / ref, 4)}, } # --------------------------------------------------------------------------- # B3 — Light Vector # Estimate the primary in-plane light direction from the gradient of the shade # map. Returns a normalised {x, y} vector pointing toward the light source. # --------------------------------------------------------------------------- def estimate_light_vector( shade_map: np.ndarray | None, surface_mask: np.ndarray ) -> dict | None: if shade_map is None or not surface_mask.any(): return None shade_f = shade_map.astype(np.float32) valid = surface_mask.astype(np.float32) kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) valid_e = cv2.erode(valid, kern, iterations=2) clean = shade_f * valid_e gx = cv2.Sobel(clean, cv2.CV_32F, 1, 0, ksize=15) * valid_e gy = cv2.Sobel(clean, cv2.CV_32F, 0, 1, ksize=15) * valid_e mag = np.hypot(gx, gy) total = float(mag.sum()) if total < 1e-6: return None lx = float((gx * mag).sum()) / total ly = float((gy * mag).sum()) / total norm = float(np.hypot(lx, ly)) if norm < 1e-6: return None return {"x": round(lx / norm, 4), "y": round(ly / norm, 4)} def clean_floor_mask(mask: np.ndarray) -> np.ndarray: if mask.dtype != np.uint8: mask = mask.astype(np.uint8) h, w = mask.shape[:2] min_side = max(3, min(h, w)) close_size = max(5, int(round(min_side * 0.018))) | 1 open_size = max(3, int(round(min_side * 0.006))) | 1 closed = cv2.morphologyEx( mask, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (close_size, close_size)), ) cleaned = cv2.morphologyEx( closed, cv2.MORPH_OPEN, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (open_size, open_size)), ) count, labels, stats, _ = cv2.connectedComponentsWithStats(cleaned, connectivity=8) if count <= 1: return cleaned gravity_threshold = int(h * 0.60) min_area = max(1000, int(h * w * 0.01)) result = np.zeros_like(cleaned) for component_id in range(1, count): area = stats[component_id, cv2.CC_STAT_AREA] if area < min_area: continue comp_bottom = stats[component_id, cv2.CC_STAT_TOP] + stats[component_id, cv2.CC_STAT_HEIGHT] if comp_bottom <= gravity_threshold: continue result[labels == component_id] = 1 if result.any(): return result largest = 1 + int(np.argmax(stats[1:, cv2.CC_STAT_AREA])) return (labels == largest).astype(np.uint8) def wall_subtract(mask: np.ndarray, seg_map: np.ndarray, dilation: int = 1) -> np.ndarray: reject_raw = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8) if dilation > 0: kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) reject_raw = cv2.dilate(reject_raw, kern, iterations=dilation) result = mask.copy() result[reject_raw > 0] = 0 return result def fit_floor_edges(mask: np.ndarray): h, w = mask.shape[:2] row_ys, lefts, rights = [], [], [] step = max(1, h // 260) for y in range(0, h, step): row_xs = np.where(mask[y] > 0)[0] if len(row_xs) < max(8, w * 0.01): continue row_ys.append(float(y)) lefts.append(float(np.percentile(row_xs, 3))) rights.append(float(np.percentile(row_xs, 97))) if len(row_ys) < 8: return None row_ys_np = np.asarray(row_ys, dtype=np.float32) return np.polyfit(row_ys_np, np.asarray(lefts, dtype=np.float32), 1), np.polyfit( row_ys_np, np.asarray(rights, dtype=np.float32), 1, ) # --------------------------------------------------------------------------- # B8 — Convex Hull Quad Fitting # Derive a tight bounding quadrilateral from the convex hull of the floor mask. # Used alongside the linear edge-fit quad so that corners of L-shaped rooms # and irregular floor boundaries are fully covered. # --------------------------------------------------------------------------- def convex_hull_quad(mask: np.ndarray) -> np.ndarray | None: ys, xs = np.where(mask > 0) if len(xs) < 50: return None pts = np.column_stack([xs, ys]).astype(np.float32) hull = cv2.convexHull(pts) if hull is None or len(hull) < 4: return None rect = cv2.minAreaRect(hull.squeeze()) box = cv2.boxPoints(rect) # (4, 2) — x,y columns h, w = mask.shape[:2] box[:, 0] = np.clip(box[:, 0], 0, w - 1) box[:, 1] = np.clip(box[:, 1], 0, h - 1) return box # --------------------------------------------------------------------------- # B6 — Dual Vanishing Point Detection # Detect two independent VPs: one from positive-slope lines (converging right) # and one from negative-slope lines (converging left), covering oblique shots # and corner-camera perspectives. # --------------------------------------------------------------------------- def detect_dual_vanishing_points( img_np: np.ndarray, floor_mask: np.ndarray ) -> tuple[dict | None, dict | None]: gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) gray = cv2.GaussianBlur(gray, (5, 5), 0) edges = cv2.Canny(gray, 60, 160) edges[floor_mask == 0] = 0 lines = cv2.HoughLinesP( edges, rho=1, theta=np.pi / 180, threshold=60, minLineLength=max(40, min(img_np.shape[:2]) // 16), maxLineGap=24, ) if lines is None: return None, None h, w = img_np.shape[:2] pos_lines, neg_lines = [], [] for line in lines[:, 0, :]: x1, y1, x2, y2 = [float(v) for v in line] dx, dy = x2 - x1, y2 - y1 length = float(np.hypot(dx, dy)) if length < 40 or abs(dx) < 1: continue slope = dy / dx if abs(slope) < 0.18: continue entry = (x1, y1, x2, y2, slope, length) if slope > 0: pos_lines.append(entry) else: neg_lines.append(entry) def _find_vp(group: list) -> dict | None: intersections = [] for i, (x1, y1, _, _, s1, l1) in enumerate(group): a1 = y1 - s1 * x1 for x3, y3, _, _, s2, l2 in group[i + 1:]: if abs(s1 - s2) < 0.08: continue denom = s1 - s2 if abs(denom) < 1e-9: continue x = (a2 := y3 - s2 * x3, (a2 - a1) / denom)[1] y = s1 * x + a1 if -w * 0.6 <= x <= w * 1.6 and -h * 1.2 <= y <= h * 1.0: intersections.append((x, y, min(l1, l2))) if len(intersections) < 3: return None pts = np.array([[p[0], p[1]] for p in intersections], np.float32) weights = np.array([p[2] for p in intersections], np.float32) center = np.average(pts, axis=0, weights=weights) dist = np.linalg.norm(pts - center, axis=1) keep = dist <= np.percentile(dist, 70) if keep.sum() >= 3: center = np.average(pts[keep], axis=0, weights=weights[keep]) return {"x": float(center[0]), "y": float(center[1])} vp_right = _find_vp(pos_lines) # positive-slope lines converge to the right vp_left = _find_vp(neg_lines) # negative-slope lines converge to the left # Primary VP = the one whose y is lower in the image (closer to the horizon) candidates = [(vp, abs(vp["y"])) for vp in [vp_right, vp_left] if vp is not None] if not candidates: return None, None candidates.sort(key=lambda t: t[1]) primary = candidates[0][0] secondary = candidates[1][0] if len(candidates) > 1 else None return primary, secondary def estimate_floor_plane(mask: np.ndarray, img_np: np.ndarray): ys, xs = np.where(mask > 0) if len(xs) < 1000: return None, None xs_f, ys_f = xs.astype(np.float32), ys.astype(np.float32) x1, x2 = float(np.percentile(xs_f, 1)), float(np.percentile(xs_f, 99)) y1, y2 = float(np.percentile(ys_f, 1)), float(np.percentile(ys_f, 99)) width, height = x2 - x1, y2 - y1 if width < 20 or height < 20: return None, None top_y = float(np.percentile(ys_f, 8)) bottom_y = float(np.percentile(ys_f, 97)) edge_fits = fit_floor_edges(mask) if edge_fits is None: return None, None left_fit, right_fit = edge_fits top_left = float(np.polyval(left_fit, top_y)) top_right = float(np.polyval(right_fit, top_y)) bottom_left = float(np.polyval(left_fit, bottom_y)) bottom_right = float(np.polyval(right_fit, bottom_y)) lower_xs = xs_f[ys_f >= np.percentile(ys_f, 80)] bottom_left = min(bottom_left, float(np.percentile(lower_xs, 4))) bottom_right = max(bottom_right, float(np.percentile(lower_xs, 96))) min_top_width = max(24.0, width * 0.18) top_center = (top_left + top_right) * 0.5 if top_right - top_left < min_top_width: top_left = top_center - min_top_width * 0.5 top_right = top_center + min_top_width * 0.5 min_bottom_width = max(min_top_width * 1.25, width * 0.45) bottom_center = (bottom_left + bottom_right) * 0.5 if bottom_right - bottom_left < min_bottom_width: bottom_left = bottom_center - min_bottom_width * 0.5 bottom_right = bottom_center + min_bottom_width * 0.5 h, w = mask.shape[:2] src = np.float32([ [np.clip(bottom_left, 0, w - 1), np.clip(bottom_y, 0, h - 1)], [np.clip(bottom_right, 0, w - 1), np.clip(bottom_y, 0, h - 1)], [np.clip(top_right, 0, w - 1), np.clip(top_y, 0, h - 1)], [np.clip(top_left, 0, w - 1), np.clip(top_y, 0, h - 1)], ]) # B6 — use dual VP; primary VP guides top-edge convergence vanishing_point, vanishing_point2 = detect_dual_vanishing_points(img_np, mask) if vanishing_point is not None and vanishing_point["y"] < bottom_y: vp_x = float(np.clip(vanishing_point["x"], -w * 0.25, w * 1.25)) top_width = max(src[2][0] - src[3][0], width * 0.16) horizon_gap = max(bottom_y - top_y, 1.0) convergence = np.clip((top_y - vanishing_point["y"]) / horizon_gap, 0.12, 0.75) top_center = top_center * (1 - convergence * 0.35) + vp_x * (convergence * 0.35) src[3][0] = np.clip(top_center - top_width * 0.5, 0, w - 1) src[2][0] = np.clip(top_center + top_width * 0.5, 0, w - 1) # B8 — expand src quad to cover convex hull corners not reached by linear fits hull_box = convex_hull_quad(mask) hull_quad_list = hull_box.flatten().tolist() if hull_box is not None else None if hull_box is not None: hull_bottom_y = float(np.max(hull_box[:, 1])) hull_top_y = float(np.min(hull_box[:, 1])) hull_left_x = float(np.min(hull_box[:, 0])) hull_right_x = float(np.max(hull_box[:, 0])) src[0][0] = min(src[0][0], hull_left_x) src[1][0] = max(src[1][0], hull_right_x) src[0][1] = src[1][1] = max(src[0][1], hull_bottom_y) src[2][1] = src[3][1] = min(src[2][1], hull_top_y) src = np.clip(src, [0, 0], [w - 1, h - 1]).astype(np.float32) if cv2.contourArea(src.reshape(-1, 1, 2)) < 100: return None, None dst = np.float32([[x1, y2], [x2, y2], [x2, y1], [x1, y1]]) homography = cv2.getPerspectiveTransform(src, dst).flatten().tolist() return homography, { "x": x1, "y": y1, "width": width, "height": height, "quad": src.flatten().tolist(), "hullQuad": hull_quad_list, # B8 "vanishingPoint": vanishing_point, # B6 primary "vanishingPoint2": vanishing_point2, # B6 secondary } # --------------------------------------------------------------------------- # B5 — Complement-Stamp Furniture # Use a single dilation pass (down from two) and restore the narrow contact # zone directly below each occluder so chair legs, table bases, and plant pots # sit flush against the tile surface without a visible gap or halo. # --------------------------------------------------------------------------- def build_floor_surface_mask( floor_mask: np.ndarray, seg_map: np.ndarray, quad: np.ndarray | None, depth: np.ndarray | None, ): h, w = floor_mask.shape[:2] kern_size = max(5, min(h, w) // 160) | 1 kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kern_size, kern_size)) occluder_mask = np.isin(seg_map, class_ids(OCCLUDER_CLASSES)).astype(np.uint8) # One dilation pass instead of two — keeps the occluder boundary tight so # furniture feet don't leave a visible halo on the replaced tile surface. occ_dilated = cv2.dilate(occluder_mask, kern, iterations=1) reject_mask = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8) reject_dilated = cv2.dilate(reject_mask, kern, iterations=2) surface = floor_mask.copy() surface[reject_dilated > 0] = 0 if not surface.any(): surface = floor_mask.copy() contours, _ = cv2.findContours(surface, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: filled = np.zeros((h, w), dtype=np.uint8) cv2.drawContours(filled, contours, -1, 1, cv2.FILLED) filled[reject_dilated > 0] = 0 surface = filled if quad is not None and surface.any(): plane_mask = np.zeros((h, w), dtype=np.uint8) cv2.fillConvexPoly(plane_mask, np.round(quad).astype(np.int32), 1) plane_mask[reject_dilated > 0] = 0 near_floor = cv2.dilate(surface, kern, iterations=6) surface = cv2.bitwise_or(surface, cv2.bitwise_and(plane_mask, near_floor)) surface[occ_dilated > 0] = 0 if depth is not None and floor_mask.any(): floor_depth = depth[floor_mask > 0] lo, hi = float(np.percentile(floor_depth, 2)), float(np.percentile(floor_depth, 98)) margin = max(0.08, (hi - lo) * 0.35) depth_keep = (depth >= lo - margin) & (depth <= hi + margin) surface = (surface & depth_keep.astype(np.uint8)).astype(np.uint8) surface[floor_mask > 0] = np.maximum(surface[floor_mask > 0], 1) surface[occ_dilated > 0] = 0 surface[reject_dilated > 0] = 0 surface = clean_floor_mask(surface) surface[occ_dilated > 0] = 0 surface[reject_dilated > 0] = 0 # Pull the outer boundary slightly INWARD instead of dilating it outward. # The previous outward dilation grew the mask onto adjacent wall/baseboard/ # rug pixels that segmentation didn't reject; combined with the confidence # feather those overshoot pixels turned semi-transparent and let the bright # background bleed through as a white halo. Eroding keeps the tile just # short of the true floor edge so the feather only ever blends tile->floor. boundary_kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)) surface = cv2.erode(surface, boundary_kern, iterations=1) # Hard-clamp to the segmented floor so the surface never tiles outside it. surface[floor_mask == 0] = 0 surface[occ_dilated > 0] = 0 surface[reject_dilated > 0] = 0 # Restore the narrow contact zone at the bottom edge of each occluder so # furniture touches the tile surface naturally (B5). contact_kern_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 3)) occ_eroded = cv2.erode(occluder_mask, contact_kern_v, iterations=1) occ_bottom_edge = cv2.subtract(occluder_mask, occ_eroded) contact_tiny = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)) contact_zone = cv2.dilate(occ_bottom_edge, contact_tiny, iterations=1) restore = cv2.bitwise_and(contact_zone, floor_mask) surface = cv2.bitwise_or(surface, restore) surface[reject_dilated > 0] = 0 return surface # --------------------------------------------------------------------------- # B10 — Confidence-Aware Boundaries # Distance-transform the surface mask so pixels near its edge get a low # confidence score. The frontend uses this to feather tile blending at # boundary transitions instead of a hard cut. # --------------------------------------------------------------------------- def build_confidence_map(surface_mask: np.ndarray) -> np.ndarray | None: if not surface_mask.any(): return None dist = cv2.distanceTransform(surface_mask.astype(np.uint8), cv2.DIST_L2, 5) # Narrow feather: a ~6px transition instead of ~26px. The wide feather made # a thick translucent ring at every boundary, which is what let the bright # background show through as a halo. Keep the blend confined to a thin edge. feather = max(4.0, min(surface_mask.shape[:2]) / 200.0) confidence = np.clip(dist / feather, 0.0, 1.0) return (confidence * 255).astype(np.uint8) # --------------------------------------------------------------------------- # B7 — Multi-Room Grid Alignment # Find all connected floor regions large enough to tile. All regions share # the primary region's homography so the tile grid continues seamlessly across # doorways without restarting. # --------------------------------------------------------------------------- def find_floor_regions(surface_mask: np.ndarray, min_area: int) -> list[np.ndarray]: count, labels, stats, _ = cv2.connectedComponentsWithStats( surface_mask.astype(np.uint8), connectivity=8 ) regions = [] for comp_id in range(1, count): if int(stats[comp_id, cv2.CC_STAT_AREA]) >= min_area: regions.append((labels == comp_id).astype(np.uint8)) regions.sort(key=lambda m: int(m.sum()), reverse=True) return regions def run_segmentation(img: Image.Image, img_np: np.ndarray): global seg_processor, seg_model if seg_model is None: _load_segmentation_model() h, w = img_np.shape[:2] if segmentation_backend == "oneformer": inputs = seg_processor( images=img, task_inputs=["semantic"], return_tensors="pt", ).to(device) with torch.no_grad(): outputs = seg_model(**inputs) result = seg_processor.post_process_semantic_segmentation( outputs, target_sizes=[(h, w)], )[0] return result.cpu().numpy().astype(np.uint8) if segmentation_backend == "mask2former": inputs = seg_processor(images=img, return_tensors="pt").to(device) with torch.no_grad(): outputs = seg_model(**inputs) is_panoptic = "panoptic" in MASK2FORMER_MODEL_NAME if is_panoptic: pan_result = seg_processor.post_process_panoptic_segmentation( outputs, target_sizes=[(h, w)], )[0] seg_map = np.zeros((h, w), dtype=np.uint8) pan_map = pan_result["segmentation"].cpu().numpy() for seg_info in pan_result["segments_info"]: seg_map[pan_map == seg_info["id"]] = min(seg_info["label_id"], 255) return seg_map result = seg_processor.post_process_semantic_segmentation( outputs, target_sizes=[(h, w)], )[0] return result.cpu().numpy().astype(np.uint8) inputs = seg_processor(images=img, return_tensors="pt").to(device) with torch.no_grad(): outputs = seg_model(**inputs) seg = outputs.logits.argmax(dim=1).squeeze().cpu().numpy() return cv2.resize(seg.astype(np.uint8), (w, h), interpolation=cv2.INTER_NEAREST) def segmenter_metadata_name() -> str: if segmentation_backend == "oneformer": return "oneformer-ade20k-swin-large" return segmentation_backend def build_segmentation_bundle(contents: bytes): t_start = time.perf_counter() t0 = time.perf_counter() img = Image.open(io.BytesIO(contents)).convert("RGB") MAX_DIM = 1280 if max(img.width, img.height) > MAX_DIM: scale = MAX_DIM / max(img.width, img.height) img = img.resize((int(img.width * scale), int(img.height * scale)), Image.LANCZOS) img_np = np.array(img) h, w = img_np.shape[:2] min_floor_area = max(1200, int(w * h * 0.015)) print(f"[TIMING] Image loading/parsing took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() seg_map = run_segmentation(img, img_np) print(f"[TIMING] Floor segmentation took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() success, jpeg_buf = cv2.imencode(".jpg", cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR), [cv2.IMWRITE_JPEG_QUALITY, 90]) pixels_b64 = base64.b64encode(jpeg_buf.tobytes()).decode() print(f"[TIMING] Image JPEG encoding took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() primary_floor_ids = class_ids(PRIMARY_FLOOR_CLASSES) floor_class_ids = class_ids(FLOOR_SURFACE_CLASSES) floor_mask = np.isin(seg_map, primary_floor_ids).astype(np.uint8) floor_mask = wall_subtract(floor_mask, seg_map, dilation=1) floor_mask = clean_floor_mask(floor_mask) if int(floor_mask.sum()) < min_floor_area: floor_mask = np.isin(seg_map, floor_class_ids).astype(np.uint8) floor_mask = wall_subtract(floor_mask, seg_map, dilation=1) floor_mask = clean_floor_mask(floor_mask) print(f"[TIMING] Floor masking/cleanup took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() depth = estimate_depth(img, w, h) print(f"[TIMING] Depth estimation took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() homography, plane = estimate_floor_plane(floor_mask, img_np) print(f"[TIMING] Plane fitting / homography calculation took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() quad = np.asarray(plane["quad"], dtype=np.float32).reshape(4, 2) if plane and plane.get("quad") else None surface_mask = build_floor_surface_mask(floor_mask, seg_map, quad, depth) print(f"[TIMING] Surface masking took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() shade_map, shade_range = None, (0.55, 1.35) if ENABLE_INTRINSIC_SHADING: if intrinsic_models is None: _load_intrinsic_model() if intrinsic_models is not None: shade_map, shade_range = build_intrinsic_shade_map(img_np, surface_mask) if shade_map is None: shade_map, shade_range = build_shade_map(img_np, surface_mask) print(f"[TIMING] Shade map construction took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() color_temperature = estimate_color_temperature(img_np, surface_mask) # B2 light_vector = estimate_light_vector(shade_map, surface_mask) # B3 confidence_map = build_confidence_map(surface_mask) # B10 print(f"[TIMING] Lighting analysis took {time.perf_counter() - t0:.3f} seconds", flush=True) # B7 — split the surface mask into connected regions; all share the same # homography so the tile grid is continuous across doorways. t0 = time.perf_counter() floor_regions = find_floor_regions(surface_mask, min_floor_area) multi_room = len(floor_regions) > 1 print(f"[TIMING] Floor region detection took {time.perf_counter() - t0:.3f} seconds", flush=True) t0 = time.perf_counter() segments = [] if floor_regions: for region_idx, region_mask in enumerate(floor_regions): region_indices = np.flatnonzero(region_mask.ravel()).astype(np.uint32) if len(region_indices) < min_floor_area: continue # Per-region confidence sub-map region_conf = build_confidence_map(region_mask) segments.append({ "id": region_idx, "className": "floor", "mask": base64.b64encode(region_indices.tobytes()).decode(), "homography": homography, # shared across all regions (B7) "plane": plane, "shadeMap": base64.b64encode(shade_map.tobytes()).decode() if shade_map is not None else None, "shadeRange": list(shade_range), # B4 — frontend decodes with this "colorTemperature": color_temperature, # B2 "lightVector": light_vector, # B3 "confidenceMap": base64.b64encode(region_conf.tobytes()).decode() if region_conf is not None else None, # B10 "multiRoom": multi_room, # B7 "gridGroup": "primary" if region_idx == 0 else f"room_{region_idx}", # B7 "metadata": { "segmenter": segmenter_metadata_name(), "floorPixels": int(floor_mask.sum()), "surfacePixels": int(region_mask.sum()), "depthEnabled": depth is not None, "shadingEnabled": shade_map is not None, }, }) if not segments: flat_seg = seg_map.ravel() for seg_id, class_id in enumerate(np.unique(flat_seg)): indices = np.where(flat_seg == class_id)[0].astype(np.uint32) if len(indices) < 1000: continue segments.append({ "id": int(seg_id), "className": class_name_for_id(int(class_id)), "mask": base64.b64encode(indices.tobytes()).decode(), "homography": None, "plane": None, "shadeMap": None, "shadeRange": None, "colorTemperature": None, "lightVector": None, "confidenceMap": None, "multiRoom": False, "gridGroup": None, "metadata": { "segmenter": segmenter_metadata_name(), "depthEnabled": depth is not None, "shadingEnabled": False, }, }) print(f"[TIMING] Total bundle processing completed in {time.perf_counter() - t_start:.3f} seconds", flush=True) return {"width": w, "height": h, "pixels": pixels_b64, "segments": segments} def job_path(job_id: str) -> Path: return JOB_DIR / f"{job_id}.json" def read_job(job_id: str): path = job_path(job_id) if not path.exists(): raise HTTPException(status_code=404, detail="Job not found.") return json.loads(path.read_text()) def write_job(job: dict): job_path(job["id"]).write_text(json.dumps(job)) def run_conversion_task(job_id: str, upload_path: Path): try: t_start = time.perf_counter() image_bytes = upload_path.read_bytes() bundle = build_segmentation_bundle(image_bytes) (JOB_DIR / f"{job_id}.bundle.json").write_text(json.dumps(bundle)) job = read_job(job_id) job["status"] = "COMPLETED" write_job(job) print(f"[TIMING] Background conversion task for job {job_id} took {time.perf_counter() - t_start:.3f} seconds", flush=True) except Exception as exc: print(f"Background conversion failed: {exc}", flush=True) try: job = read_job(job_id) job["status"] = "FAILED" job["error"] = str(exc) write_job(job) except Exception: pass @app.post("/viz2d/convert") async def convert_to_viz2d(background_tasks: BackgroundTasks, file: UploadFile = File(...)): if file.content_type and not file.content_type.startswith("image/"): raise HTTPException(status_code=400, detail="Upload must be a JPG or PNG image.") job_id = uuid.uuid4().hex ext = Path(file.filename or "room.jpg").suffix.lower() if ext not in {".jpg", ".jpeg", ".png", ".webp"}: ext = ".jpg" upload_path = UPLOAD_DIR / f"{job_id}{ext}" with upload_path.open("wb") as out: shutil.copyfileobj(file.file, out) job = { "id": job_id, "status": "PROCESSING", "inputUrl": f"/uploads/{upload_path.name}", "outputUrl": f"/viz2d/jobs/{job_id}/file", } write_job(job) background_tasks.add_task(run_conversion_task, job_id, upload_path) return job @app.get("/viz2d/jobs/{job_id}") async def viz2d_job_status(job_id: str): return read_job(job_id) @app.get("/viz2d/jobs/{job_id}/file") async def viz2d_job_file(job_id: str): job = read_job(job_id) if job.get("status") != "COMPLETED": raise HTTPException(status_code=409, detail="Job is not completed yet.") bundle_path = JOB_DIR / f"{job_id}.bundle.json" if not bundle_path.exists(): raise HTTPException(status_code=404, detail="Job output not found.") async def iter_file(): async with aiofiles.open(bundle_path, "rb") as f: data = await f.read() yield data return StreamingResponse(iter_file(), media_type="application/json") @app.post("/segment") async def segment(file: UploadFile = File(...)): contents = await file.read() return build_segmentation_bundle(contents) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8002)