import asyncio import base64 import io import json import os import shutil import time try: import tomllib except ImportError: try: import tomli as tomllib except ImportError: try: import tomlkit as tomllib except ImportError: raise ImportError( "No TOML library found. Please run on Python 3.11+, or run 'pip install tomli' to support Python 3.10." ) import uuid from concurrent.futures import ThreadPoolExecutor from pathlib import Path import cv2 import numpy as np import torch from fastapi import FastAPI, File, HTTPException, Response, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from PIL import Image, ImageOps from transformers import ( AutoImageProcessor, AutoModelForDepthEstimation, Mask2FormerForUniversalSegmentation, OneFormerForUniversalSegmentation, OneFormerProcessor, SegformerForSemanticSegmentation, ) ADE20K_CLASSES = [ "wall", "building", "sky", "floor", "tree", "ceiling", "road", "bed", "window", "grass", "cabinet", "sidewalk", "person", "ground", "door", "table", "mountain", "plant", "curtain", "chair", "car", "water", "painting", "sofa", "shelf", "house", "sea", "mirror", "rug", "field", "armchair", "seat", "fence", "desk", "rock", "wardrobe", "lamp", "bathtub", "railing", "cushion", "base", "box", "column", "signboard", "chest of drawers", "counter", "sand", "sink", "skyscraper", "fireplace", "refrigerator", "stairs", "runway", "bookcase", "blind", "coffee table", "toilet", "flower", "book", "hill", "bench", "countertop", "stove", "palm", "kitchen island", "computer", "swivel chair", "boat", "bar", "arcade machine", "hovel", "bus", "towel", "light", "truck", "tower", "chandelier", "awning", "streetlight", "booth", "television", "airplane", "dirt track", "apparel", "pole", "land", "bannister", "escalator", "ottoman", "bottle", "buffet", "poster", "stage", "van", "ship", "fountain", "conveyer belt", "canopy", "washer", "plaything", "swimming pool", "stool", "barrel", "basket", "waterfall", "tent", "bag", "minibike", "cradle", "oven", "ball", "food", "step", "tank", "trade name", "microwave", "pot", "animal", "bicycle", "lake", "dishwasher", "screen", "blanket", "sculpture", "hood", "sconce", "vase", "traffic light", "tray", "ashcan", "fan", "pier", "crt screen", "plate", "monitor", "bulletin board", "shower", "radiator", "glass", "clock", "flag", ] def load_config() -> dict: config_path = os.getenv("VISUALIZER_CONFIG") if not config_path: return {} path = Path(config_path).expanduser() if not path.is_absolute(): path = Path(__file__).resolve().parent / path if not path.exists(): raise RuntimeError(f"VISUALIZER_CONFIG does not exist: {path}") with path.open("rb") as config_file: return tomllib.load(config_file) CONFIG = load_config() def config_value(env_name: str, section: str, key: str, default): if env_name in os.environ: return os.environ[env_name] return CONFIG.get(section, {}).get(key, default) SEGMENTATION_MODEL = str( config_value("SEGMENTATION_MODEL", "models", "segmentation_model", "oneformer") ).lower() ONEFORMER_MODEL_NAME = str(config_value( "ONEFORMER_MODEL_NAME", "models", "oneformer_model_name", "shi-labs/oneformer_ade20k_swin_large", )) MASK2FORMER_MODEL_NAME = str(config_value( "MASK2FORMER_MODEL_NAME", "models", "mask2former_model_name", "facebook/mask2former-swin-small-ade-semantic", )) SEGFORMER_MODEL_NAME = str(config_value( "SEGFORMER_MODEL_NAME", "models", "segformer_model_name", "nvidia/segformer-b2-finetuned-ade-512-512", )) DEPTH_MODEL_NAME = str(config_value( "DEPTH_MODEL_NAME", "models", "depth_model_name", "Intel/dpt-large", )) ENABLE_DEPTH_ESTIMATION = str(config_value( "ENABLE_DEPTH_ESTIMATION", "runtime", "enable_depth_estimation", "1", )).lower() in {"1", "true", "yes", "on"} INTRINSIC_MODEL_VERSION = str(config_value( "INTRINSIC_MODEL_VERSION", "models", "intrinsic_model_version", "v2", )) ENABLE_INTRINSIC_SHADING = str(config_value( "ENABLE_INTRINSIC_SHADING", "runtime", "enable_intrinsic_shading", "0", )).lower() in {"1", "true", "yes", "on"} VISUALIZER_DATA_DIR = str(config_value( "VISUALIZER_DATA_DIR", "runtime", "data_dir", "data", )) # Use the depth map (when available) to estimate the floor plane / homography # instead of the geometric edge-fit. The edge-fit only sees perspective when the # floor's side boundaries converge inside the frame, so it badly under-estimates # perspective for floors that fill the frame. The depth-based fit recovers true # foreshortening from the depth gradient. Edge-fit remains the fallback. FLOOR_PLANE_FROM_DEPTH = str(config_value( "FLOOR_PLANE_FROM_DEPTH", "runtime", "floor_plane_from_depth", "1", )).lower() in {"1", "true", "yes", "on"} # Where to place the floor's horizon relative to the visible depth range, as a # fraction of that range beyond the farthest visible floor pixel. Smaller -> the # horizon sits closer to the visible floor -> stronger foreshortening. Larger -> # milder perspective. This is the main knob for tuning perspective strength. FLOOR_HORIZON_GAMMA = float(config_value( "FLOOR_HORIZON_GAMMA", "runtime", "floor_horizon_gamma", "0.25", )) # Assumed pinhole focal length as a multiple of the image's long side. Only # affects the cross-floor (X) scale / tile aspect, not foreshortening strength. FLOOR_FOCAL_FACTOR = float(config_value( "FLOOR_FOCAL_FACTOR", "runtime", "floor_focal_factor", "1.0", )) # Number of concurrent inference jobs. The segmentation/depth models are shared, # global, and not thread-safe, and a single GPU/CPU can only run one at a time, # so this defaults to 1 (fully serialized). Raise only with per-worker models. INFERENCE_WORKERS = max(1, int(config_value( "INFERENCE_WORKERS", "runtime", "inference_workers", "1", ))) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") seg_processor = None seg_model = None segmentation_backend = "segformer" depth_processor = None depth_model = None intrinsic_models = None def hf_offline() -> bool: return os.getenv("HF_HUB_OFFLINE") == "1" or os.getenv("TRANSFORMERS_OFFLINE") == "1" def _load_segmentation_model(): global seg_processor, seg_model, segmentation_backend if SEGMENTATION_MODEL == "oneformer": try: print(f"Loading OneFormer: {ONEFORMER_MODEL_NAME} ...", flush=True) start_time = time.perf_counter() seg_processor = OneFormerProcessor.from_pretrained( ONEFORMER_MODEL_NAME, local_files_only=hf_offline(), ) seg_model = OneFormerForUniversalSegmentation.from_pretrained( ONEFORMER_MODEL_NAME, local_files_only=hf_offline(), ).to(device) seg_model.eval() segmentation_backend = "oneformer" print(f"OneFormer loaded in {time.perf_counter() - start_time:.4f}s.", flush=True) return except Exception as exc: print(f"OneFormer failed ({exc}), falling back to Mask2Former.", flush=True) if SEGMENTATION_MODEL in {"oneformer", "mask2former"}: try: print(f"Loading Mask2Former: {MASK2FORMER_MODEL_NAME} ...", flush=True) start_time = time.perf_counter() seg_processor = AutoImageProcessor.from_pretrained( MASK2FORMER_MODEL_NAME, local_files_only=hf_offline(), ) seg_model = Mask2FormerForUniversalSegmentation.from_pretrained( MASK2FORMER_MODEL_NAME, local_files_only=hf_offline(), ).to(device) seg_model.eval() segmentation_backend = "mask2former" print(f"Mask2Former loaded in {time.perf_counter() - start_time:.4f}s.", flush=True) return except Exception as exc: print(f"Mask2Former failed ({exc}), falling back to SegFormer.", flush=True) print(f"Loading SegFormer: {SEGFORMER_MODEL_NAME} ...", flush=True) start_time = time.perf_counter() seg_processor = AutoImageProcessor.from_pretrained( SEGFORMER_MODEL_NAME, local_files_only=hf_offline(), ) seg_model = SegformerForSemanticSegmentation.from_pretrained( SEGFORMER_MODEL_NAME, local_files_only=hf_offline(), ).to(device) seg_model.eval() segmentation_backend = "segformer" print(f"SegFormer loaded in {time.perf_counter() - start_time:.4f}s.", flush=True) _load_segmentation_model() def _load_intrinsic_model(): global intrinsic_models if ENABLE_INTRINSIC_SHADING and intrinsic_models is None: try: print(f"Loading Intrinsic Image Decomposition model: {INTRINSIC_MODEL_VERSION} ...", flush=True) start_time = time.perf_counter() from intrinsic.pipeline import load_models intrinsic_models = load_models(INTRINSIC_MODEL_VERSION, device=str(device)) print(f"Intrinsic model loaded in {time.perf_counter() - start_time:.4f}s.", flush=True) except Exception as exc: print(f"Intrinsic model failed to load ({exc}). Falling back to luminance shading.", flush=True) _load_intrinsic_model() def _load_depth_model(): global depth_processor, depth_model if ENABLE_DEPTH_ESTIMATION and (depth_processor is None or depth_model is None): try: model_name = DEPTH_MODEL_NAME print(f"Loading depth model: {model_name} ...", flush=True) start_time = time.perf_counter() depth_processor = AutoImageProcessor.from_pretrained( model_name, local_files_only=hf_offline(), ) depth_model = AutoModelForDepthEstimation.from_pretrained( model_name, local_files_only=hf_offline(), ).to(device) depth_model.eval() print(f"Depth model loaded in {time.perf_counter() - start_time:.4f}s.", flush=True) except Exception as exc: print(f"Depth model failed to load ({exc}).", flush=True) _load_depth_model() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) DATA_DIR = Path(VISUALIZER_DATA_DIR).resolve() UPLOAD_DIR = DATA_DIR / "uploads" JOB_DIR = DATA_DIR / "jobs" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) JOB_DIR.mkdir(parents=True, exist_ok=True) app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads") # All heavy model inference runs here, off the event loop and serialized to # INFERENCE_WORKERS (default 1) so the shared, non-thread-safe models are never # run concurrently. Both /segment and the background /viz2d/convert jobs submit # to this single pool, bounding total in-flight inference regardless of traffic. INFERENCE_POOL = ThreadPoolExecutor( max_workers=INFERENCE_WORKERS, thread_name_prefix="inference", ) PRIMARY_FLOOR_CLASSES = {"floor"} FLOOR_SURFACE_CLASSES = { "floor", "road", "sidewalk", "ground", "field", "grass", "sand", "runway", "dirt track", "land", "stairs", "step", } REJECT_SURFACE_CLASSES = {"wall", "ceiling", "building", "sky", "window"} OCCLUDER_CLASSES = { "bed", "cabinet", "person", "door", "table", "plant", "curtain", "chair", "car", "painting", "sofa", "shelf", "mirror", "rug", "armchair", "seat", "desk", "wardrobe", "lamp", "bathtub", "railing", "cushion", "base", "box", "column", "chest of drawers", "counter", "sink", "fireplace", "refrigerator", "bookcase", "blind", "coffee table", "toilet", "bench", "countertop", "stove", "kitchen island", "computer", "swivel chair", "bar", "ottoman", "bottle", "buffet", "poster", "towel", "television", "washer", "plaything", "stool", "basket", "bag", "cradle", "oven", "ball", "food", "microwave", "pot", "dishwasher", "blanket", "sculpture", "vase", "tray", "fan", "plate", "monitor", "shower", "radiator", "clock", } def class_name_for_id(class_id: int) -> str: return ADE20K_CLASSES[class_id] if class_id < len(ADE20K_CLASSES) else f"class_{class_id}" def class_ids(names: set[str]) -> list[int]: return [idx for idx, name in enumerate(ADE20K_CLASSES) if name in names] def estimate_depth(img: Image.Image, width: int, height: int, task_id: str = "segment"): global depth_processor, depth_model if not ENABLE_DEPTH_ESTIMATION: return None model_name = DEPTH_MODEL_NAME print(f"[{task_id}] Starting depth estimation...", flush=True) start_time = time.perf_counter() try: if depth_processor is None or depth_model is None: print(f"[{task_id}] Loading depth model: {model_name} ...", flush=True) start_load = time.perf_counter() depth_processor = AutoImageProcessor.from_pretrained( model_name, local_files_only=hf_offline(), ) depth_model = AutoModelForDepthEstimation.from_pretrained( model_name, local_files_only=hf_offline(), ).to(device) depth_model.eval() print(f"[{task_id}] Depth model loaded in {time.perf_counter() - start_load:.4f}s.", flush=True) inputs = depth_processor(images=img, return_tensors="pt").to(device) with torch.no_grad(): outputs = depth_model(**inputs) depth = torch.nn.functional.interpolate( outputs.predicted_depth.unsqueeze(1), size=(height, width), mode="bicubic", align_corners=False, ).squeeze().cpu().numpy() depth = cv2.GaussianBlur(depth.astype(np.float32), (0, 0), sigmaX=3) depth_min, depth_max = float(np.min(depth)), float(np.max(depth)) duration = time.perf_counter() - start_time print(f"[{task_id}] Depth estimation completed in {duration:.4f}s", flush=True) if depth_max - depth_min < 1e-6: return None return (depth - depth_min) / (depth_max - depth_min) except Exception as exc: print(f"[{task_id}] Depth estimation skipped ({exc}).", flush=True) return None def build_shade_map(img_np: np.ndarray, surface_mask: np.ndarray, task_id: str = "segment") -> np.ndarray | None: if not surface_mask.any(): return None print(f"[{task_id}] Starting shade map build...", flush=True) start_time = time.perf_counter() mask = surface_mask.astype(np.uint8) luminance = ( img_np[:, :, 0].astype(np.float32) * 0.299 + img_np[:, :, 1].astype(np.float32) * 0.587 + img_np[:, :, 2].astype(np.float32) * 0.114 ) h, w = mask.shape[:2] floor_values = luminance[mask > 0] if floor_values.size < max(256, int(h * w * 0.002)): return None median_lum = float(np.median(floor_values)) if median_lum < 1e-3: return None filled = luminance.copy() filled[mask == 0] = median_lum missing = (mask == 0).astype(np.uint8) * 255 try: filled = cv2.inpaint( np.clip(filled, 0, 255).astype(np.uint8), missing, max(3, min(h, w) // 160), cv2.INPAINT_TELEA, ).astype(np.float32) except cv2.error: pass sigma = max(8.0, min(h, w) / 28.0) smooth = cv2.GaussianBlur(filled, (0, 0), sigmaX=sigma, sigmaY=sigma) shade = np.clip(smooth / median_lum, 0.55, 1.35) shade[mask == 0] = 1.0 result = np.round((shade - 0.55) * (255.0 / 0.80)).clip(0, 255).astype(np.uint8) duration = time.perf_counter() - start_time print(f"[{task_id}] Shade map built in {duration:.4f}s", flush=True) return result def build_intrinsic_shade_map(img_np: np.ndarray, surface_mask: np.ndarray, task_id: str = "segment") -> np.ndarray | None: if not surface_mask.any() or intrinsic_models is None: return None print(f"[{task_id}] Starting intrinsic shade map build...", flush=True) start_time = time.perf_counter() try: # Convert image to float32 range [0.0, 1.0] as expected by compphoto/Intrinsic img_float = img_np.astype(np.float32) / 255.0 # Run pipeline from intrinsic.pipeline import run_pipeline # Use CPU/CUDA device string results = run_pipeline(intrinsic_models, img_float, stage=1, device=str(device)) # Extract shading map shading = None if "gry_shd" in results: shading = results["gry_shd"] elif "dif_shd" in results: # If 3-channel diffuse shading, convert to grayscale luminance dif = results["dif_shd"] shading = dif[:, :, 0] * 0.299 + dif[:, :, 1] * 0.587 + dif[:, :, 2] * 0.114 else: # Check other fallback keys matching "shd" or "shading" for k in results.keys(): if "shd" in k or "shading" in k: shading = results[k] if len(shading.shape) == 3: shading = shading[:, :, 0] * 0.299 + shading[:, :, 1] * 0.587 + shading[:, :, 2] * 0.114 break if shading is None: return None # Resize shading to match original image size if there's any mismatch h, w = surface_mask.shape[:2] if shading.shape[:2] != (h, w): shading = cv2.resize(shading, (w, h), interpolation=cv2.INTER_LINEAR) # Smooth the shading map to eliminate high-frequency grout lines/patterns # while preserving broad ambient shadows (e.g. plant shadows). sigma = max(3.0, min(h, w) / 80.0) shading = cv2.GaussianBlur(shading.astype(np.float32), (0, 0), sigmaX=sigma, sigmaY=sigma) floor_vals = shading[surface_mask > 0] if floor_vals.size == 0: return None median_val = float(np.median(floor_vals)) if median_val < 1e-3: return None # Divide by median to get relative shading multiplier (1.0 is neutral) relative_shading = shading / median_val # Clip relative shading to [0.55, 1.35] relative_shading = np.clip(relative_shading, 0.55, 1.35) # Non-floor pixels are neutral (1.0) relative_shading[surface_mask == 0] = 1.0 # Encode to [0, 255] byte range matching the frontend result = np.round((relative_shading - 0.55) * (255.0 / 0.80)).clip(0, 255).astype(np.uint8) duration = time.perf_counter() - start_time print(f"[{task_id}] Intrinsic shade map built in {duration:.4f}s", flush=True) return result except Exception as exc: print(f"[{task_id}] Intrinsic shading decomposition failed: {exc}. Falling back to default luminance shading.", flush=True) return None def clean_floor_mask(mask: np.ndarray) -> np.ndarray: if mask.dtype != np.uint8: mask = mask.astype(np.uint8) h, w = mask.shape[:2] min_side = max(3, min(h, w)) close_size = max(5, int(round(min_side * 0.018))) | 1 open_size = max(3, int(round(min_side * 0.006))) | 1 closed = cv2.morphologyEx( mask, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (close_size, close_size)), ) cleaned = cv2.morphologyEx( closed, cv2.MORPH_OPEN, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (open_size, open_size)), ) count, labels, stats, _ = cv2.connectedComponentsWithStats(cleaned, connectivity=8) if count <= 1: return cleaned gravity_threshold = int(h * 0.60) min_area = max(1000, int(h * w * 0.01)) result = np.zeros_like(cleaned) for component_id in range(1, count): area = stats[component_id, cv2.CC_STAT_AREA] if area < min_area: continue comp_bottom = stats[component_id, cv2.CC_STAT_TOP] + stats[component_id, cv2.CC_STAT_HEIGHT] if comp_bottom <= gravity_threshold: continue result[labels == component_id] = 1 if result.any(): return result largest = 1 + int(np.argmax(stats[1:, cv2.CC_STAT_AREA])) return (labels == largest).astype(np.uint8) def wall_subtract(mask: np.ndarray, seg_map: np.ndarray, dilation: int = 1) -> np.ndarray: reject_raw = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8) if dilation > 0: kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) reject_raw = cv2.dilate(reject_raw, kern, iterations=dilation) result = mask.copy() result[reject_raw > 0] = 0 return result def fit_floor_edges(mask: np.ndarray): h, w = mask.shape[:2] row_ys, lefts, rights = [], [], [] step = max(1, h // 260) for y in range(0, h, step): row_xs = np.where(mask[y] > 0)[0] if len(row_xs) < max(8, w * 0.01): continue row_ys.append(float(y)) lefts.append(float(np.percentile(row_xs, 3))) rights.append(float(np.percentile(row_xs, 97))) if len(row_ys) < 8: return None row_ys_np = np.asarray(row_ys, dtype=np.float32) return np.polyfit(row_ys_np, np.asarray(lefts, dtype=np.float32), 1), np.polyfit( row_ys_np, np.asarray(rights, dtype=np.float32), 1, ) def detect_vanishing_point(img_np: np.ndarray, floor_mask: np.ndarray): gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) gray = cv2.GaussianBlur(gray, (5, 5), 0) edges = cv2.Canny(gray, 60, 160) edges[floor_mask == 0] = 0 lines = cv2.HoughLinesP( edges, rho=1, theta=np.pi / 180, threshold=60, minLineLength=max(40, min(img_np.shape[:2]) // 16), maxLineGap=24, ) if lines is None: return None h, w = img_np.shape[:2] candidates = [] for line in lines[:, 0, :]: x1, y1, x2, y2 = [float(v) for v in line] dx, dy = x2 - x1, y2 - y1 length = float(np.hypot(dx, dy)) if length < 40 or abs(dx) < 1: continue slope = dy / dx if abs(slope) >= 0.18: candidates.append((x1, y1, x2, y2, slope, length)) intersections = [] for i, (x1, y1, _, _, s1, l1) in enumerate(candidates): a1 = y1 - s1 * x1 for x3, y3, _, _, s2, l2 in candidates[i + 1:]: if s1 * s2 > 0 or abs(s1 - s2) < 0.12: continue a2 = y3 - s2 * x3 x = (a2 - a1) / (s1 - s2) y = s1 * x + a1 if -w * 0.5 <= x <= w * 1.5 and -h <= y <= h * 0.95: intersections.append((x, y, min(l1, l2))) if not intersections: return None pts = np.asarray([[p[0], p[1]] for p in intersections], dtype=np.float32) weights = np.asarray([p[2] for p in intersections], dtype=np.float32) center = np.average(pts, axis=0, weights=weights) dist = np.linalg.norm(pts - center, axis=1) keep = dist <= np.percentile(dist, 70) if keep.sum() >= 3: center = np.average(pts[keep], axis=0, weights=weights[keep]) return {"x": float(center[0]), "y": float(center[1])} def _fit_disparity_plane(xs: np.ndarray, ys: np.ndarray, d: np.ndarray): """Least-squares fit of d ~= a*x + b*y + c with one robust reweighting pass. For a planar floor the (inverse-)depth a DPT-style model produces is an affine function of image coordinates, so this recovers the plane directly. """ A = np.stack([xs, ys, np.ones_like(xs)], axis=1) coeffs, *_ = np.linalg.lstsq(A, d, rcond=None) resid = d - A @ coeffs scale = float(np.median(np.abs(resid))) + 1e-9 keep = np.abs(resid) < 3.0 * scale if int(keep.sum()) > A.shape[1] * 8: coeffs, *_ = np.linalg.lstsq(A[keep], d[keep], rcond=None) return float(coeffs[0]), float(coeffs[1]), float(coeffs[2]) def estimate_floor_plane_from_depth(mask: np.ndarray, depth: np.ndarray, task_id: str = "segment"): """Estimate the floor homography from the depth map. A planar floor's inverse-depth d is affine in pixel coords, d = a*u + b*v + c, so the map image-pixel -> world floor coords (X, Z) is the homography H = [[1/f, 0, -cx/f], [0, 0, 1 ], [a, b, c-b ]] (b = horizon offset beta) where Z = 1/(a*u+b*v+(c-beta)) is world depth and X = (u-cx)/(f*Z) is the world cross-floor coordinate. Tiling uniformly in (X, Z) yields correct perspective foreshortening. Returns (homography_flat, plane) in the same format as estimate_floor_plane, or (None, None) if depth is uninformative. """ print(f"[{task_id}] Starting depth-based floor plane estimation...", flush=True) start_time = time.perf_counter() h, w = mask.shape[:2] ys, xs = np.where(mask > 0) if len(xs) < 1000: return None, None xs_f = xs.astype(np.float64) ys_f = ys.astype(np.float64) d_all = depth[ys, xs].astype(np.float64) # Subsample for the fit to keep it fast on full floors. if len(xs_f) > 150000: idx = np.linspace(0, len(xs_f) - 1, 150000).astype(np.int64) a, b, c = _fit_disparity_plane(xs_f[idx], ys_f[idx], d_all[idx]) else: a, b, c = _fit_disparity_plane(xs_f, ys_f, d_all) if np.hypot(a, b) < 1e-9: # Disparity is ~constant across the floor: no usable perspective signal. print(f"[{task_id}] Depth plane fit degenerate (flat disparity), falling back.", flush=True) return None, None # Predicted disparity over the floor; place the horizon GAMMA*range beyond the # farthest visible pixel so the denominator stays strictly positive everywhere. d_pred = a * xs_f + b * ys_f + c d_min = float(d_pred.min()) d_range = float(np.percentile(d_pred, 99.5) - np.percentile(d_pred, 0.5)) if d_range < 1e-9: return None, None beta = d_min - max(FLOOR_HORIZON_GAMMA, 1e-3) * d_range cc = c - beta cx, cy = w * 0.5, h * 0.5 f = max(FLOOR_FOCAL_FACTOR * max(w, h), 1.0) H = np.array([ [1.0 / f, 0.0, -cx / f], [0.0, 0.0, 1.0], [a, b, cc], ], dtype=np.float64) denom = a * xs_f + b * ys_f + cc if float(denom.min()) <= 1e-6: # Numerical safety: keep every floor pixel strictly in front of the horizon. return None, None fx = (xs_f / f - cx / f) / denom fy = 1.0 / denom x1, x2 = float(np.percentile(fx, 1)), float(np.percentile(fx, 99)) y1, y2 = float(np.percentile(fy, 1)), float(np.percentile(fy, 99)) width, height = x2 - x1, y2 - y1 if width < 1e-9 or height < 1e-9: return None, None # The rectified coords above are in world/disparity units (tens), but the # frontend sizes tiles from plane.width/height assuming image-pixel scale # (like the legacy edge-fit). Without this the tiles come out enormous. # Rescale the homography output (rows 0/1) and the plane rect to ~pixel scale. target = float(max(w, h)) k = target / max(width, height) H[0, :] *= k H[1, :] *= k x1, x2, y1, y2 = x1 * k, x2 * k, y1 * k, y2 * k width, height = width * k, height * k # Image-space quad = rectified bbox corners mapped back through H^-1. try: h_inv = np.linalg.inv(H) except np.linalg.LinAlgError: return None, None rect = np.array([ [x1, y2, 1.0], [x2, y2, 1.0], [x2, y1, 1.0], [x1, y1, 1.0], ], dtype=np.float64).T quad_h = h_inv @ rect quad = (quad_h[:2] / quad_h[2]).T quad[:, 0] = np.clip(quad[:, 0], 0, w - 1) quad[:, 1] = np.clip(quad[:, 1], 0, h - 1) # Vanishing point: image location of the horizon along the recede direction # (denominator -> 0), for debugging / downstream consumers. vanishing_point = None norm2 = a * a + b * b if norm2 > 1e-12: vp_x = cx - a * cc / norm2 vp_y = cy - b * cc / norm2 vanishing_point = {"x": float(vp_x), "y": float(vp_y)} homography = H.flatten().tolist() duration = time.perf_counter() - start_time print(f"[{task_id}] Depth-based floor plane estimation completed in {duration:.4f}s " f"(grad=({a:.3e},{b:.3e}), beta={beta:.4f}).", flush=True) return homography, { "x": x1, "y": y1, "width": width, "height": height, "quad": quad.flatten().tolist(), "vanishingPoint": vanishing_point, } def estimate_floor_plane(mask: np.ndarray, img_np: np.ndarray, task_id: str = "segment"): print(f"[{task_id}] Starting floor plane estimation...", flush=True) start_time = time.perf_counter() ys, xs = np.where(mask > 0) if len(xs) < 1000: return None, None xs_f, ys_f = xs.astype(np.float32), ys.astype(np.float32) x1, x2 = float(np.percentile(xs_f, 1)), float(np.percentile(xs_f, 99)) y1, y2 = float(np.percentile(ys_f, 1)), float(np.percentile(ys_f, 99)) width, height = x2 - x1, y2 - y1 if width < 20 or height < 20: return None, None top_y = float(np.percentile(ys_f, 8)) bottom_y = float(np.percentile(ys_f, 97)) edge_fits = fit_floor_edges(mask) if edge_fits is None: return None, None left_fit, right_fit = edge_fits top_left = float(np.polyval(left_fit, top_y)) top_right = float(np.polyval(right_fit, top_y)) bottom_left = float(np.polyval(left_fit, bottom_y)) bottom_right = float(np.polyval(right_fit, bottom_y)) lower_xs = xs_f[ys_f >= np.percentile(ys_f, 80)] bottom_left = min(bottom_left, float(np.percentile(lower_xs, 4))) bottom_right = max(bottom_right, float(np.percentile(lower_xs, 96))) min_top_width = max(24.0, width * 0.18) top_center = (top_left + top_right) * 0.5 if top_right - top_left < min_top_width: top_left = top_center - min_top_width * 0.5 top_right = top_center + min_top_width * 0.5 min_bottom_width = max(min_top_width * 1.25, width * 0.45) bottom_center = (bottom_left + bottom_right) * 0.5 if bottom_right - bottom_left < min_bottom_width: bottom_left = bottom_center - min_bottom_width * 0.5 bottom_right = bottom_center + min_bottom_width * 0.5 h, w = mask.shape[:2] src = np.float32([ [np.clip(bottom_left, 0, w - 1), np.clip(bottom_y, 0, h - 1)], [np.clip(bottom_right, 0, w - 1), np.clip(bottom_y, 0, h - 1)], [np.clip(top_right, 0, w - 1), np.clip(top_y, 0, h - 1)], [np.clip(top_left, 0, w - 1), np.clip(top_y, 0, h - 1)], ]) vanishing_point = detect_vanishing_point(img_np, mask) if vanishing_point is not None and vanishing_point["y"] < bottom_y: vp_x = float(np.clip(vanishing_point["x"], -w * 0.25, w * 1.25)) top_width = max(src[2][0] - src[3][0], width * 0.16) horizon_gap = max(bottom_y - top_y, 1.0) convergence = np.clip((top_y - vanishing_point["y"]) / horizon_gap, 0.12, 0.75) top_center = top_center * (1 - convergence * 0.35) + vp_x * (convergence * 0.35) src[3][0] = np.clip(top_center - top_width * 0.5, 0, w - 1) src[2][0] = np.clip(top_center + top_width * 0.5, 0, w - 1) if cv2.contourArea(src) < 100: return None, None dst = np.float32([[x1, y2], [x2, y2], [x2, y1], [x1, y1]]) homography = cv2.getPerspectiveTransform(src, dst).flatten().tolist() duration = time.perf_counter() - start_time print(f"[{task_id}] Floor plane estimation completed in {duration:.4f}s", flush=True) return homography, { "x": x1, "y": y1, "width": width, "height": height, "quad": src.flatten().tolist(), "vanishingPoint": vanishing_point, } def build_floor_surface_mask( floor_mask: np.ndarray, seg_map: np.ndarray, quad: np.ndarray | None, depth: np.ndarray | None, task_id: str = "segment", ): print(f"[{task_id}] Starting floor surface mask build...", flush=True) start_time = time.perf_counter() h, w = floor_mask.shape[:2] kern_size = max(5, min(h, w) // 160) | 1 kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kern_size, kern_size)) occluder_mask = np.isin(seg_map, class_ids(OCCLUDER_CLASSES)).astype(np.uint8) occ_dilated = cv2.dilate(occluder_mask, kern, iterations=2) reject_mask = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8) reject_dilated = cv2.dilate(reject_mask, kern, iterations=2) surface = floor_mask.copy() surface[reject_dilated > 0] = 0 if not surface.any(): surface = floor_mask.copy() contours, _ = cv2.findContours(surface, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: filled = np.zeros((h, w), dtype=np.uint8) cv2.drawContours(filled, contours, -1, 1, cv2.FILLED) filled[reject_dilated > 0] = 0 surface = filled if quad is not None and surface.any(): plane_mask = np.zeros((h, w), dtype=np.uint8) cv2.fillConvexPoly(plane_mask, np.round(quad).astype(np.int32), 1) plane_mask[reject_dilated > 0] = 0 near_floor = cv2.dilate(surface, kern, iterations=6) surface = cv2.bitwise_or(surface, cv2.bitwise_and(plane_mask, near_floor)) surface[occ_dilated > 0] = 0 if depth is not None and floor_mask.any(): floor_depth = depth[floor_mask > 0] lo, hi = float(np.percentile(floor_depth, 2)), float(np.percentile(floor_depth, 98)) margin = max(0.08, (hi - lo) * 0.35) depth_keep = (depth >= lo - margin) & (depth <= hi + margin) surface = (surface & depth_keep.astype(np.uint8)).astype(np.uint8) surface[floor_mask > 0] = np.maximum(surface[floor_mask > 0], 1) surface[occ_dilated > 0] = 0 surface[reject_dilated > 0] = 0 surface = clean_floor_mask(surface) surface[occ_dilated > 0] = 0 surface[reject_dilated > 0] = 0 boundary_kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)) surface = cv2.dilate(surface, boundary_kern, iterations=1) surface[occ_dilated > 0] = 0 surface[reject_dilated > 0] = 0 duration = time.perf_counter() - start_time print(f"[{task_id}] Floor surface mask built in {duration:.4f}s", flush=True) return surface def run_segmentation(img: Image.Image, img_np: np.ndarray, task_id: str = "segment"): h, w = img_np.shape[:2] print(f"[{task_id}] Running segmentation (backend: {segmentation_backend})...", flush=True) start_time = time.perf_counter() if segmentation_backend == "oneformer": inputs = seg_processor( images=img, task_inputs=["semantic"], return_tensors="pt", ).to(device) with torch.no_grad(): outputs = seg_model(**inputs) result = seg_processor.post_process_semantic_segmentation( outputs, target_sizes=[(h, w)], )[0] seg_map = result.cpu().numpy().astype(np.uint8) elif segmentation_backend == "mask2former": inputs = seg_processor(images=img, return_tensors="pt").to(device) with torch.no_grad(): outputs = seg_model(**inputs) is_panoptic = "panoptic" in MASK2FORMER_MODEL_NAME if is_panoptic: pan_result = seg_processor.post_process_panoptic_segmentation( outputs, target_sizes=[(h, w)], )[0] seg_map = np.zeros((h, w), dtype=np.uint8) pan_map = pan_result["segmentation"].cpu().numpy() for seg_info in pan_result["segments_info"]: seg_map[pan_map == seg_info["id"]] = min(seg_info["label_id"], 255) else: result = seg_processor.post_process_semantic_segmentation( outputs, target_sizes=[(h, w)], )[0] seg_map = result.cpu().numpy().astype(np.uint8) else: inputs = seg_processor(images=img, return_tensors="pt").to(device) with torch.no_grad(): outputs = seg_model(**inputs) seg = outputs.logits.argmax(dim=1).squeeze().cpu().numpy() seg_map = cv2.resize(seg.astype(np.uint8), (w, h), interpolation=cv2.INTER_NEAREST) duration = time.perf_counter() - start_time print(f"[{task_id}] Segmentation completed in {duration:.4f}s", flush=True) return seg_map def segmenter_metadata_name() -> str: if segmentation_backend == "oneformer": return "oneformer-ade20k-swin-large" return segmentation_backend def build_segmentation_bundle(contents: bytes, task_id: str = "segment"): print(f"[{task_id}] Starting bundle build...", flush=True) start_total = time.perf_counter() # Load original high-resolution image. Bake in the EXIF orientation so the # pixels are upright before any processing — phone photos (e.g. iPhone) are # stored in sensor orientation with an EXIF rotation tag, which we re-encode # away below, so without this the output bundle would come out rotated. img = Image.open(io.BytesIO(contents)) img = ImageOps.exif_transpose(img).convert("RGB") img_np = np.array(img) h, w = img_np.shape[:2] # Calculate scale factor and downscaled image max_dim = 1024 if w > max_dim or h > max_dim: if w > h: new_w = max_dim new_h = int(round(h * max_dim / w)) else: new_h = max_dim new_w = int(round(w * max_dim / h)) img_resized = img.resize((new_w, new_h), Image.Resampling.BILINEAR) img_np_resized = np.array(img_resized) else: img_resized = img img_np_resized = img_np new_w, new_h = w, h # Run the full processing pipeline at the downscaled resolution for high speed min_floor_area_resized = max(1200, int(new_w * new_h * 0.015)) seg_map_resized = run_segmentation(img_resized, img_np_resized, task_id=task_id) primary_floor_ids = class_ids(PRIMARY_FLOOR_CLASSES) floor_class_ids = class_ids(FLOOR_SURFACE_CLASSES) floor_mask_resized = np.isin(seg_map_resized, primary_floor_ids).astype(np.uint8) floor_mask_resized = wall_subtract(floor_mask_resized, seg_map_resized, dilation=1) floor_mask_resized = clean_floor_mask(floor_mask_resized) if int(floor_mask_resized.sum()) < min_floor_area_resized: floor_mask_resized = np.isin(seg_map_resized, floor_class_ids).astype(np.uint8) floor_mask_resized = wall_subtract(floor_mask_resized, seg_map_resized, dilation=1) floor_mask_resized = clean_floor_mask(floor_mask_resized) depth_resized = estimate_depth(img_resized, new_w, new_h, task_id=task_id) homography_resized, plane_resized = None, None if FLOOR_PLANE_FROM_DEPTH and depth_resized is not None: homography_resized, plane_resized = estimate_floor_plane_from_depth( floor_mask_resized, depth_resized, task_id=task_id ) if homography_resized is None: homography_resized, plane_resized = estimate_floor_plane( floor_mask_resized, img_np_resized, task_id=task_id ) quad_resized = np.asarray(plane_resized["quad"], dtype=np.float32).reshape(4, 2) if plane_resized and plane_resized.get("quad") else None surface_mask_resized = build_floor_surface_mask(floor_mask_resized, seg_map_resized, quad_resized, depth_resized, task_id=task_id) shade_map_resized = None if ENABLE_INTRINSIC_SHADING and intrinsic_models is not None: shade_map_resized = build_intrinsic_shade_map(img_np_resized, surface_mask_resized, task_id=task_id) if shade_map_resized is None: shade_map_resized = build_shade_map(img_np_resized, surface_mask_resized, task_id=task_id) # Now upscale the results back to the original high-resolution space # 1. Base64-encode the original high-resolution image as a high-quality JPEG buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=95) pixels_b64 = base64.b64encode(buffer.getvalue()).decode() # 2. Resize masks back to original resolution seg_map = cv2.resize(seg_map_resized, (w, h), interpolation=cv2.INTER_NEAREST) floor_mask = cv2.resize(floor_mask_resized, (w, h), interpolation=cv2.INTER_NEAREST) surface_mask = cv2.resize(surface_mask_resized, (w, h), interpolation=cv2.INTER_NEAREST) surface_indices = np.flatnonzero(surface_mask.ravel()).astype(np.uint32) if depth_resized is not None: depth = cv2.resize(depth_resized, (w, h), interpolation=cv2.INTER_LINEAR) else: depth = None if shade_map_resized is not None: shade_map = cv2.resize(shade_map_resized, (w, h), interpolation=cv2.INTER_LINEAR) else: shade_map = None # 3. Scale homography and plane coordinates to original resolution sx, sy = w / new_w, h / new_h homography = None plane = None if homography_resized is not None: # The homography maps source image pixels -> destination floor-plane # coords, both estimated in the downscaled space. To express it in the # original-resolution space we conjugate by the scale transform: # H_orig = diag(sx, sy, 1) @ H_resized @ diag(1/sx, 1/sy, 1) # The right factor rescales the input pixels (columns); the left factor # rescales the output floor coords (rows) so they match the plane rect, # which is also scaled to original resolution below. h_matrix = np.array(homography_resized).reshape(3, 3) h_matrix[:, 0] /= sx h_matrix[:, 1] /= sy h_matrix[0, :] *= sx h_matrix[1, :] *= sy homography = h_matrix.flatten().tolist() if plane_resized is not None: # Scale quad coords quad_orig = [] if plane_resized.get("quad"): for i, val in enumerate(plane_resized["quad"]): if i % 2 == 0: quad_orig.append(val * sx) else: quad_orig.append(val * sy) # Scale vanishing point vp_orig = None if plane_resized.get("vanishingPoint") is not None: vp_orig = { "x": plane_resized["vanishingPoint"]["x"] * sx, "y": plane_resized["vanishingPoint"]["y"] * sy, } plane = { "x": plane_resized["x"] * sx, "y": plane_resized["y"] * sy, "width": plane_resized["width"] * sx, "height": plane_resized["height"] * sy, "quad": quad_orig, "vanishingPoint": vp_orig, } # Populate final segments bundle list min_floor_area = max(1200, int(w * h * 0.015)) segments = [] if len(surface_indices) >= min_floor_area: segments.append({ "id": 0, "className": "floor", "mask": base64.b64encode(surface_indices.tobytes()).decode(), "homography": homography, "plane": plane, "shadeMap": base64.b64encode(shade_map.tobytes()).decode() if shade_map is not None else None, "metadata": { "segmenter": segmenter_metadata_name(), "floorPixels": int(floor_mask.sum()), "surfacePixels": int(surface_mask.sum()), "depthEnabled": depth is not None, "shadingEnabled": shade_map is not None, }, }) if not segments: flat_seg = seg_map.ravel() for seg_id, class_id in enumerate(np.unique(flat_seg)): indices = np.where(flat_seg == class_id)[0].astype(np.uint32) if len(indices) < 1000: continue segments.append({ "id": int(seg_id), "className": class_name_for_id(int(class_id)), "mask": base64.b64encode(indices.tobytes()).decode(), "homography": None, "plane": None, "shadeMap": None, "metadata": { "segmenter": segmenter_metadata_name(), "depthEnabled": depth is not None, "shadingEnabled": False, }, }) duration = time.perf_counter() - start_total print(f"[{task_id}] Bundle build completed in {duration:.4f}s", flush=True) return {"width": w, "height": h, "pixels": pixels_b64, "segments": segments} def job_path(job_id: str) -> Path: return JOB_DIR / f"{job_id}.json" def read_job(job_id: str): path = job_path(job_id) if not path.exists(): raise HTTPException(status_code=404, detail="Job not found.") return json.loads(path.read_text()) def write_job(job: dict): job_path(job["id"]).write_text(json.dumps(job)) def run_conversion_task(job_id: str, upload_path: Path): print(f"[{job_id}] Starting background conversion task...", flush=True) start_time = time.perf_counter() try: image_bytes = upload_path.read_bytes() bundle = build_segmentation_bundle(image_bytes, task_id=job_id) (JOB_DIR / f"{job_id}.bundle.json").write_text(json.dumps(bundle)) job = read_job(job_id) job["status"] = "COMPLETED" write_job(job) duration = time.perf_counter() - start_time print(f"[{job_id}] Background conversion task completed in {duration:.4f}s", flush=True) except Exception as exc: duration = time.perf_counter() - start_time print(f"[{job_id}] Background conversion failed after {duration:.4f}s: {exc}", flush=True) try: job = read_job(job_id) job["status"] = "FAILED" job["error"] = str(exc) write_job(job) except Exception: pass @app.get("/") async def root(): return Response(status_code=200) @app.post("/viz2d/convert") async def convert_to_viz2d(file: UploadFile = File(...)): if file.content_type and not file.content_type.startswith("image/"): raise HTTPException(status_code=400, detail="Upload must be a JPG or PNG image.") job_id = uuid.uuid4().hex ext = Path(file.filename or "room.jpg").suffix.lower() if ext not in {".jpg", ".jpeg", ".png", ".webp"}: ext = ".jpg" upload_path = UPLOAD_DIR / f"{job_id}{ext}" with upload_path.open("wb") as out: shutil.copyfileobj(file.file, out) job = { "id": job_id, "status": "PROCESSING", "inputUrl": f"/uploads/{upload_path.name}", "outputUrl": f"/viz2d/jobs/{job_id}/file", } write_job(job) # Queue the work on the serialized inference pool rather than Starlette's # background-task thread pool, which would run many jobs against the shared # model at once. The job is fire-and-forget; status is tracked on disk. INFERENCE_POOL.submit(run_conversion_task, job_id, upload_path) return job @app.get("/viz2d/jobs/{job_id}") async def viz2d_job_status(job_id: str): return read_job(job_id) @app.get("/viz2d/jobs/{job_id}/file") async def viz2d_job_file(job_id: str): job = read_job(job_id) if job.get("status") != "COMPLETED": raise HTTPException(status_code=409, detail="Job is not completed yet.") bundle_path = JOB_DIR / f"{job_id}.bundle.json" if not bundle_path.exists(): raise HTTPException(status_code=404, detail="Job output not found.") return Response( content=bundle_path.read_bytes(), media_type="application/json", headers={"Content-Disposition": 'attachment; filename="visualizer.vizbundle.json"'}, ) @app.post("/segment") async def segment(file: UploadFile = File(...)): contents = await file.read() task_id = f"segment_{uuid.uuid4().hex[:8]}" # Run the heavy inference in the serialized pool so it never blocks the event # loop (status polls, health checks) and never overlaps another inference. loop = asyncio.get_running_loop() return await loop.run_in_executor( INFERENCE_POOL, build_segmentation_bundle, contents, task_id ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)