Spaces:
Running on T4
Running on T4
| import aiofiles | |
| import asyncio | |
| import base64 | |
| import io | |
| import json | |
| import os | |
| import shutil | |
| import time | |
| try: | |
| import tomllib | |
| except ImportError: | |
| try: | |
| import tomli as tomllib | |
| except ImportError: | |
| try: | |
| import tomlkit as tomllib | |
| except ImportError: | |
| raise ImportError( | |
| "No TOML library found. Please run on Python 3.11+, or run 'pip install tomli' to support Python 3.10." | |
| ) | |
| import uuid | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| from fastapi import FastAPI, File, HTTPException, Response, UploadFile, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.middleware.gzip import GZipMiddleware | |
| from fastapi.responses import StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from PIL import Image | |
| from transformers import ( | |
| AutoImageProcessor, | |
| AutoModelForDepthEstimation, | |
| Mask2FormerForUniversalSegmentation, | |
| OneFormerForUniversalSegmentation, | |
| OneFormerProcessor, | |
| SegformerForSemanticSegmentation, | |
| ) | |
# ADE20K-150 class names. class_ids() and class_name_for_id() translate between
# names and segmentation ids purely by list position, so the order and length
# must match the 150-class ADE20K label order used by the ADE-finetuned
# checkpoints configured in this file.
#
# The previous list had only 141 entries: "grandstand", "path", "case",
# "pool table", "pillow", "screen door", "stairway", "river" and "bridge" were
# missing, which shifted every id after "refrigerator" (e.g. "bookcase"
# resolved to 53 instead of 62).
#
# A few names are shortened relative to the upstream labels ("window" for
# windowpane, "ground" for earth, "television" for television receiver); the
# class sets in this file refer to the shortened forms, so they are kept.
ADE20K_CLASSES = [
    # 0-9
    "wall", "building", "sky", "floor", "tree",
    "ceiling", "road", "bed", "window", "grass",
    # 10-19
    "cabinet", "sidewalk", "person", "ground", "door",
    "table", "mountain", "plant", "curtain", "chair",
    # 20-29
    "car", "water", "painting", "sofa", "shelf",
    "house", "sea", "mirror", "rug", "field",
    # 30-39
    "armchair", "seat", "fence", "desk", "rock",
    "wardrobe", "lamp", "bathtub", "railing", "cushion",
    # 40-49
    "base", "box", "column", "signboard", "chest of drawers",
    "counter", "sand", "sink", "skyscraper", "fireplace",
    # 50-59
    "refrigerator", "grandstand", "path", "stairs", "runway",
    "case", "pool table", "pillow", "screen door", "stairway",
    # 60-69
    "river", "bridge", "bookcase", "blind", "coffee table",
    "toilet", "flower", "book", "hill", "bench",
    # 70-79
    "countertop", "stove", "palm", "kitchen island", "computer",
    "swivel chair", "boat", "bar", "arcade machine", "hovel",
    # 80-89
    "bus", "towel", "light", "truck", "tower",
    "chandelier", "awning", "streetlight", "booth", "television",
    # 90-99
    "airplane", "dirt track", "apparel", "pole", "land",
    "bannister", "escalator", "ottoman", "bottle", "buffet",
    # 100-109
    "poster", "stage", "van", "ship", "fountain",
    "conveyer belt", "canopy", "washer", "plaything", "swimming pool",
    # 110-119
    "stool", "barrel", "basket", "waterfall", "tent",
    "bag", "minibike", "cradle", "oven", "ball",
    # 120-129
    "food", "step", "tank", "trade name", "microwave",
    "pot", "animal", "bicycle", "lake", "dishwasher",
    # 130-139
    "screen", "blanket", "sculpture", "hood", "sconce",
    "vase", "traffic light", "tray", "ashcan", "fan",
    # 140-149
    "pier", "crt screen", "plate", "monitor", "bulletin board",
    "shower", "radiator", "glass", "clock", "flag",
]
def load_config() -> dict:
    """Load the optional TOML configuration named by ``VISUALIZER_CONFIG``.

    Returns:
        The parsed TOML document, or an empty dict when the environment
        variable is unset or empty.

    Raises:
        RuntimeError: if the resolved configuration path does not exist.
    """
    raw_path = os.getenv("VISUALIZER_CONFIG")
    if not raw_path:
        return {}

    config_file = Path(raw_path).expanduser()
    if not config_file.is_absolute():
        # Relative paths are anchored at the directory containing this file.
        config_file = Path(__file__).resolve().parent / config_file
    if not config_file.exists():
        raise RuntimeError(f"VISUALIZER_CONFIG does not exist: {config_file}")

    # The file is opened in binary mode before being handed to the parser.
    with config_file.open("rb") as handle:
        return tomllib.load(handle)


# Parsed once at import time.
CONFIG = load_config()
def config_value(env_name: str, section: str, key: str, default):
    """Resolve one setting, preferring the environment over the config file.

    Args:
        env_name: Environment variable that overrides everything else.
        section: Top-level table name to look up in ``CONFIG``.
        key: Key inside that table.
        default: Value returned when neither source defines the setting.

    Returns:
        The environment string if ``env_name`` is set, otherwise
        ``CONFIG[section][key]`` if present, otherwise ``default``.
    """
    env_override = os.environ.get(env_name)
    if env_override is not None:
        return env_override
    section_table = CONFIG.get(section, {})
    return section_table.get(key, default)
# Words accepted as "enabled" for boolean settings (compared case-insensitively).
_TRUTHY_WORDS = {"1", "true", "yes", "on"}


def _text_setting(env_name: str, section: str, key: str, default: str) -> str:
    """Resolve a setting through config_value() and coerce it to ``str``."""
    return str(config_value(env_name, section, key, default))


def _flag_setting(env_name: str, section: str, key: str, default: str) -> bool:
    """Resolve a setting and report whether it spells one of the truthy words."""
    return _text_setting(env_name, section, key, default).lower() in _TRUTHY_WORDS


# --- Model selection -------------------------------------------------------
SEGMENTATION_MODEL = _text_setting(
    "SEGMENTATION_MODEL", "models", "segmentation_model", "oneformer"
).lower()
ONEFORMER_MODEL_NAME = _text_setting(
    "ONEFORMER_MODEL_NAME",
    "models",
    "oneformer_model_name",
    "shi-labs/oneformer_ade20k_swin_large",
)
MASK2FORMER_MODEL_NAME = _text_setting(
    "MASK2FORMER_MODEL_NAME",
    "models",
    "mask2former_model_name",
    "facebook/mask2former-swin-small-ade-semantic",
)
SEGFORMER_MODEL_NAME = _text_setting(
    "SEGFORMER_MODEL_NAME",
    "models",
    "segformer_model_name",
    "nvidia/segformer-b2-finetuned-ade-512-512",
)
DEPTH_MODEL_NAME = _text_setting(
    "DEPTH_MODEL_NAME", "models", "depth_model_name", "Intel/dpt-large"
)
INTRINSIC_MODEL_VERSION = _text_setting(
    "INTRINSIC_MODEL_VERSION", "models", "intrinsic_model_version", "v2"
)

# --- Runtime switches ------------------------------------------------------
ENABLE_DEPTH_ESTIMATION = _flag_setting(
    "ENABLE_DEPTH_ESTIMATION", "runtime", "enable_depth_estimation", "1"
)
ENABLE_INTRINSIC_SHADING = _flag_setting(
    "ENABLE_INTRINSIC_SHADING", "runtime", "enable_intrinsic_shading", "0"
)
VISUALIZER_DATA_DIR = _text_setting(
    "VISUALIZER_DATA_DIR", "runtime", "data_dir", "data"
)

# --- Shared model state ----------------------------------------------------
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Populated by _load_segmentation_model().
seg_processor = None
seg_model = None
segmentation_backend = "segformer"

# Populated on first use by estimate_depth().
depth_processor = None
depth_model = None

# Populated by _load_intrinsic_model() when intrinsic shading is enabled.
intrinsic_models = None
def hf_offline() -> bool:
    """Return True when either offline environment switch is set to ``"1"``.

    Checks ``HF_HUB_OFFLINE`` and ``TRANSFORMERS_OFFLINE``; only the exact
    string ``"1"`` counts as enabled.
    """
    offline_switches = ("HF_HUB_OFFLINE", "TRANSFORMERS_OFFLINE")
    return any(os.getenv(switch) == "1" for switch in offline_switches)
def _load_segmentation_model():
    """Load the configured segmentation backend, falling back on failure.

    The fallback chain is OneFormer -> Mask2Former -> SegFormer. The chain is
    entered at the backend named by ``SEGMENTATION_MODEL``; any other value
    goes straight to SegFormer. A failure in OneFormer or Mask2Former is
    logged and the next backend is tried; a SegFormer failure propagates to
    the caller.

    On success the module globals ``seg_processor``, ``seg_model`` and
    ``segmentation_backend`` describe the loaded backend.
    """

    def _activate(processor_cls, model_cls, checkpoint, backend_name):
        # Shared load sequence: processor, then model on `device`, eval mode,
        # and finally the backend tag (only reached if nothing raised).
        global seg_processor, seg_model, segmentation_backend
        seg_processor = processor_cls.from_pretrained(
            checkpoint,
            local_files_only=hf_offline(),
        )
        seg_model = model_cls.from_pretrained(
            checkpoint,
            local_files_only=hf_offline(),
        ).to(device)
        seg_model.eval()
        segmentation_backend = backend_name

    wants_oneformer = SEGMENTATION_MODEL == "oneformer"
    wants_mask2former = SEGMENTATION_MODEL in {"oneformer", "mask2former"}

    if wants_oneformer:
        try:
            print(f"Loading OneFormer: {ONEFORMER_MODEL_NAME} ...", flush=True)
            _activate(
                OneFormerProcessor,
                OneFormerForUniversalSegmentation,
                ONEFORMER_MODEL_NAME,
                "oneformer",
            )
            print("OneFormer loaded.", flush=True)
            return
        except Exception as exc:
            print(f"OneFormer failed ({exc}), falling back to Mask2Former.", flush=True)

    if wants_mask2former:
        try:
            print(f"Loading Mask2Former: {MASK2FORMER_MODEL_NAME} ...", flush=True)
            _activate(
                AutoImageProcessor,
                Mask2FormerForUniversalSegmentation,
                MASK2FORMER_MODEL_NAME,
                "mask2former",
            )
            print("Mask2Former loaded.", flush=True)
            return
        except Exception as exc:
            print(f"Mask2Former failed ({exc}), falling back to SegFormer.", flush=True)

    # Last resort: no try/except here, so a failure surfaces to the caller.
    print(f"Loading SegFormer: {SEGFORMER_MODEL_NAME} ...", flush=True)
    _activate(
        AutoImageProcessor,
        SegformerForSemanticSegmentation,
        SEGFORMER_MODEL_NAME,
        "segformer",
    )
    print("SegFormer loaded.", flush=True)
def _load_intrinsic_model():
    """Load the intrinsic image decomposition models once, if enabled.

    Does nothing when ``ENABLE_INTRINSIC_SHADING`` is off or the models are
    already loaded. A load failure is logged and leaves ``intrinsic_models``
    as ``None``.
    """
    global intrinsic_models
    if not ENABLE_INTRINSIC_SHADING or intrinsic_models is not None:
        return
    try:
        print(f"Loading Intrinsic Image Decomposition model: {INTRINSIC_MODEL_VERSION} ...", flush=True)
        # Imported lazily so the package is only needed when the feature is on.
        from intrinsic.pipeline import load_models

        intrinsic_models = load_models(INTRINSIC_MODEL_VERSION, device=str(device))
        print("Intrinsic model loaded.", flush=True)
    except Exception as exc:
        print(f"Intrinsic model failed to load ({exc}). Falling back to luminance shading.", flush=True)
# Storage layout: uploads and job artefacts live under VISUALIZER_DATA_DIR.
DATA_DIR = Path(VISUALIZER_DATA_DIR).resolve()
UPLOAD_DIR = DATA_DIR / "uploads"
JOB_DIR = DATA_DIR / "jobs"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
JOB_DIR.mkdir(parents=True, exist_ok=True)

# Browser access is limited to the deployed frontend origin.
_CORS_SETTINGS = {
    "allow_origins": ["https://room-editor-9y3b.vercel.app"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}

app = FastAPI()
# Registration order is kept as in the original: GZip first, then CORS.
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)

# Serve the upload directory (created above) under /uploads.
app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads")
# Class-name groups, resolved to segmentation ids through class_ids().

# The single class treated as the primary floor label.
PRIMARY_FLOOR_CLASSES = {"floor"}

# Every class that may count as a walkable/tileable surface.
FLOOR_SURFACE_CLASSES = {
    "floor",
    "road",
    "sidewalk",
    "ground",
    "field",
    "grass",
    "sand",
    "runway",
    "dirt track",
    "land",
    "stairs",
    "step",
}

# Classes that must never be part of the floor surface.
REJECT_SURFACE_CLASSES = {
    "wall",
    "ceiling",
    "building",
    "sky",
    "window",
}

# Objects that sit on or in front of the floor and are cut out of the surface.
OCCLUDER_CLASSES = {
    "bed", "cabinet", "person", "door", "table", "plant",
    "curtain", "chair", "car", "painting", "sofa", "shelf",
    "mirror", "rug", "armchair", "seat", "desk", "wardrobe",
    "lamp", "bathtub", "railing", "cushion", "base", "box",
    "column", "chest of drawers", "counter", "sink", "fireplace", "refrigerator",
    "bookcase", "blind", "coffee table", "toilet", "bench", "countertop",
    "stove", "kitchen island", "computer", "swivel chair", "bar", "ottoman",
    "bottle", "buffet", "poster", "towel", "television", "washer",
    "plaything", "stool", "basket", "bag", "cradle", "oven",
    "ball", "food", "microwave", "pot", "dishwasher", "blanket",
    "sculpture", "vase", "tray", "fan", "plate", "monitor",
    "shower", "radiator", "clock",
}
def class_name_for_id(class_id: int) -> str:
    """Return the ADE20K class name for ``class_id``.

    Ids outside ``0 .. len(ADE20K_CLASSES) - 1`` yield the placeholder
    ``"class_<id>"``. Negative ids are rejected explicitly: plain list
    indexing would otherwise wrap around and return a name from the end of
    the list (``-1`` used to come back as the last class).
    """
    if 0 <= class_id < len(ADE20K_CLASSES):
        return ADE20K_CLASSES[class_id]
    return f"class_{class_id}"
def class_ids(names: set[str]) -> list[int]:
    """Return the ids (list positions in ``ADE20K_CLASSES``) of ``names``.

    Ids come back in ascending order; names that are not in the class list
    are silently ignored.
    """
    return [
        position
        for position, label in enumerate(ADE20K_CLASSES)
        if label in names
    ]
def estimate_depth(img: Image.Image, width: int, height: int):
    """Estimate a depth map for ``img`` scaled to the range 0..1.

    The depth processor and model are loaded on first use and cached in the
    module globals ``depth_processor`` / ``depth_model``.

    Args:
        img: Input image.
        width: Output map width in pixels.
        height: Output map height in pixels.

    Returns:
        A float array resized to ``(height, width)`` and min-max normalised,
        or ``None`` when depth estimation is disabled, the predicted depth is
        effectively constant, or anything in the pipeline raises.
    """
    global depth_processor, depth_model
    if not ENABLE_DEPTH_ESTIMATION:
        return None

    checkpoint = DEPTH_MODEL_NAME
    try:
        needs_loading = depth_processor is None or depth_model is None
        if needs_loading:
            print(f"Loading depth model: {checkpoint} ...", flush=True)
            depth_processor = AutoImageProcessor.from_pretrained(
                checkpoint,
                local_files_only=hf_offline(),
            )
            depth_model = AutoModelForDepthEstimation.from_pretrained(
                checkpoint,
                local_files_only=hf_offline(),
            ).to(device)
            depth_model.eval()
            print("Depth model loaded.", flush=True)

        batch = depth_processor(images=img, return_tensors="pt").to(device)
        with torch.no_grad():
            prediction = depth_model(**batch)

        # Resize the prediction to the requested size, then move it to NumPy.
        resized = torch.nn.functional.interpolate(
            prediction.predicted_depth.unsqueeze(1),
            size=(height, width),
            mode="bicubic",
            align_corners=False,
        )
        depth_map = resized.squeeze().cpu().numpy()
        depth_map = cv2.GaussianBlur(depth_map.astype(np.float32), (0, 0), sigmaX=3)

        nearest = float(np.min(depth_map))
        farthest = float(np.max(depth_map))
        if farthest - nearest < 1e-6:
            # A flat map carries no information and would divide by ~zero.
            return None
        return (depth_map - nearest) / (farthest - nearest)
    except Exception as exc:
        # Depth is optional: log and let callers continue without it.
        print(f"Depth estimation skipped ({exc}).", flush=True)
        return None
| # --------------------------------------------------------------------------- | |
| # B4 — Shade Range Expansion | |
| # Encode the shade multiplier using the actual brightness spread of the floor | |
| # rather than a hardcoded [0.55, 1.35] clip, so dark-room images preserve the | |
| # full dynamic range of their shadow patterns. | |
| # --------------------------------------------------------------------------- | |
| def _adaptive_shade_range(relative: np.ndarray, floor_mask: np.ndarray) -> tuple[float, float]: | |
| floor_vals = relative[floor_mask > 0] | |
| if floor_vals.size == 0: | |
| return (0.55, 1.35) | |
| lo = max(0.25, float(np.percentile(floor_vals, 1))) | |
| hi = min(2.5, float(np.percentile(floor_vals, 99))) | |
| span = hi - lo | |
| if span < 0.4: | |
| mid = (lo + hi) / 2.0 | |
| lo, hi = mid - 0.2, mid + 0.2 | |
| return lo, hi | |
| def _encode_shade(relative: np.ndarray, lo: float, hi: float) -> np.ndarray: | |
| span = hi - lo | |
| return np.round((np.clip(relative, lo, hi) - lo) * (255.0 / span)).clip(0, 255).astype(np.uint8) | |
| # --------------------------------------------------------------------------- | |
| # B1 — Shadow Map Extraction | |
| # Luminance-based shade map; returns (encoded_uint8, (lo, hi)) so the frontend | |
| # can decode with the correct range. | |
| # --------------------------------------------------------------------------- | |
| def build_shade_map( | |
| img_np: np.ndarray, surface_mask: np.ndarray | |
| ) -> tuple[np.ndarray | None, tuple[float, float]]: | |
| default_range = (0.55, 1.35) | |
| if not surface_mask.any(): | |
| return None, default_range | |
| mask = surface_mask.astype(np.uint8) | |
| luminance = ( | |
| img_np[:, :, 0].astype(np.float32) * 0.299 | |
| + img_np[:, :, 1].astype(np.float32) * 0.587 | |
| + img_np[:, :, 2].astype(np.float32) * 0.114 | |
| ) | |
| h, w = mask.shape[:2] | |
| floor_values = luminance[mask > 0] | |
| if floor_values.size < max(256, int(h * w * 0.002)): | |
| return None, default_range | |
| median_lum = float(np.median(floor_values)) | |
| if median_lum < 1e-3: | |
| return None, default_range | |
| filled = luminance.copy() | |
| filled[mask == 0] = median_lum | |
| missing = (mask == 0).astype(np.uint8) * 255 | |
| try: | |
| filled = cv2.inpaint( | |
| np.clip(filled, 0, 255).astype(np.uint8), | |
| missing, | |
| max(3, min(h, w) // 160), | |
| cv2.INPAINT_TELEA, | |
| ).astype(np.float32) | |
| except cv2.error: | |
| pass | |
| sigma = max(8.0, min(h, w) / 28.0) | |
| smooth = cv2.GaussianBlur(filled, (0, 0), sigmaX=sigma, sigmaY=sigma) | |
| relative = smooth / median_lum | |
| relative[mask == 0] = 1.0 | |
| lo, hi = _adaptive_shade_range(relative, mask) | |
| return _encode_shade(relative, lo, hi), (lo, hi) | |
| def build_intrinsic_shade_map( | |
| img_np: np.ndarray, surface_mask: np.ndarray | |
| ) -> tuple[np.ndarray | None, tuple[float, float]]: | |
| default_range = (0.55, 1.35) | |
| if not surface_mask.any() or intrinsic_models is None: | |
| return None, default_range | |
| try: | |
| img_float = img_np.astype(np.float32) / 255.0 | |
| from intrinsic.pipeline import run_pipeline | |
| results = run_pipeline(intrinsic_models, img_float, device=str(device)) | |
| shading = None | |
| if "gry_shd" in results: | |
| shading = results["gry_shd"] | |
| elif "dif_shd" in results: | |
| dif = results["dif_shd"] | |
| shading = dif[:, :, 0] * 0.299 + dif[:, :, 1] * 0.587 + dif[:, :, 2] * 0.114 | |
| else: | |
| for k in results.keys(): | |
| if "shd" in k or "shading" in k: | |
| shading = results[k] | |
| if len(shading.shape) == 3: | |
| shading = shading[:, :, 0] * 0.299 + shading[:, :, 1] * 0.587 + shading[:, :, 2] * 0.114 | |
| break | |
| if shading is None: | |
| return None, default_range | |
| h, w = surface_mask.shape[:2] | |
| if shading.shape[:2] != (h, w): | |
| shading = cv2.resize(shading, (w, h), interpolation=cv2.INTER_LINEAR) | |
| sigma = max(3.0, min(h, w) / 80.0) | |
| shading = cv2.GaussianBlur(shading.astype(np.float32), (0, 0), sigmaX=sigma, sigmaY=sigma) | |
| floor_vals = shading[surface_mask > 0] | |
| if floor_vals.size == 0: | |
| return None, default_range | |
| median_val = float(np.median(floor_vals)) | |
| if median_val < 1e-3: | |
| return None, default_range | |
| relative_shading = shading / median_val | |
| relative_shading[surface_mask == 0] = 1.0 | |
| lo, hi = _adaptive_shade_range(relative_shading, surface_mask) | |
| return _encode_shade(relative_shading, lo, hi), (lo, hi) | |
| except Exception as exc: | |
| print(f"Intrinsic shading decomposition failed: {exc}. Falling back to default luminance shading.", flush=True) | |
| return None, default_range | |
| # --------------------------------------------------------------------------- | |
| # B2 — Color Temperature | |
| # Sample the brightest floor pixels to infer the room's lighting colour cast | |
| # and approximate Kelvin value. Returns a dict with `kelvin` and `cast` | |
| # (normalised RGB multipliers) so the frontend can tint replacement tiles. | |
| # --------------------------------------------------------------------------- | |
| def estimate_color_temperature( | |
| img_np: np.ndarray, surface_mask: np.ndarray | |
| ) -> dict | None: | |
| if not surface_mask.any(): | |
| return None | |
| pixels = img_np[surface_mask > 0].astype(np.float32) | |
| if len(pixels) < 100: | |
| return None | |
| lum = pixels[:, 0] * 0.299 + pixels[:, 1] * 0.587 + pixels[:, 2] * 0.114 | |
| thresh = float(np.percentile(lum, 70)) | |
| bright = pixels[lum >= thresh] | |
| if len(bright) < 10: | |
| bright = pixels | |
| mr = float(np.mean(bright[:, 0])) | |
| mg = float(np.mean(bright[:, 1])) | |
| mb = float(np.mean(bright[:, 2])) | |
| ref = max(mr, mg, mb, 1e-3) | |
| rb = mr / max(mb, 1.0) | |
| if rb > 1.6: | |
| kelvin = 2700 | |
| elif rb > 1.3: | |
| kelvin = 3200 | |
| elif rb > 1.1: | |
| kelvin = 4000 | |
| elif rb > 0.9: | |
| kelvin = 5500 | |
| elif rb > 0.7: | |
| kelvin = 6500 | |
| else: | |
| kelvin = 8000 | |
| return { | |
| "kelvin": kelvin, | |
| "cast": {"r": round(mr / ref, 4), "g": round(mg / ref, 4), "b": round(mb / ref, 4)}, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # B3 — Light Vector | |
| # Estimate the primary in-plane light direction from the gradient of the shade | |
| # map. Returns a normalised {x, y} vector pointing toward the light source. | |
| # --------------------------------------------------------------------------- | |
| def estimate_light_vector( | |
| shade_map: np.ndarray | None, surface_mask: np.ndarray | |
| ) -> dict | None: | |
| if shade_map is None or not surface_mask.any(): | |
| return None | |
| shade_f = shade_map.astype(np.float32) | |
| valid = surface_mask.astype(np.float32) | |
| kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) | |
| valid_e = cv2.erode(valid, kern, iterations=2) | |
| clean = shade_f * valid_e | |
| gx = cv2.Sobel(clean, cv2.CV_32F, 1, 0, ksize=15) * valid_e | |
| gy = cv2.Sobel(clean, cv2.CV_32F, 0, 1, ksize=15) * valid_e | |
| mag = np.hypot(gx, gy) | |
| total = float(mag.sum()) | |
| if total < 1e-6: | |
| return None | |
| lx = float((gx * mag).sum()) / total | |
| ly = float((gy * mag).sum()) / total | |
| norm = float(np.hypot(lx, ly)) | |
| if norm < 1e-6: | |
| return None | |
| return {"x": round(lx / norm, 4), "y": round(ly / norm, 4)} | |
def clean_floor_mask(mask: np.ndarray) -> np.ndarray:
    """Tidy a binary floor mask and keep only plausible floor components.

    Steps: morphological close then open (kernel sizes scale with the
    shorter image side), then connected-component filtering. A component is
    kept when its area is at least ``max(1000, 1% of the image)`` and its
    bounding box reaches below 60% of the image height. If nothing passes,
    the largest component is returned instead.

    Returns:
        A ``uint8`` mask.
    """
    if mask.dtype != np.uint8:
        mask = mask.astype(np.uint8)

    rows, cols = mask.shape[:2]
    shorter_side = max(3, min(rows, cols))
    # "| 1" forces odd kernel sizes.
    close_k = max(5, int(round(shorter_side * 0.018))) | 1
    open_k = max(3, int(round(shorter_side * 0.006))) | 1
    close_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (close_k, close_k))
    open_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (open_k, open_k))

    tidy = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, close_kernel)
    tidy = cv2.morphologyEx(tidy, cv2.MORPH_OPEN, open_kernel)

    n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(tidy, connectivity=8)
    if n_labels <= 1:
        # Only background was found.
        return tidy

    horizon_row = int(rows * 0.60)
    area_floor = max(1000, int(rows * cols * 0.01))
    kept = np.zeros_like(tidy)
    for label in range(1, n_labels):
        too_small = stats[label, cv2.CC_STAT_AREA] < area_floor
        lowest_row = stats[label, cv2.CC_STAT_TOP] + stats[label, cv2.CC_STAT_HEIGHT]
        if too_small or lowest_row <= horizon_row:
            continue
        kept[labels == label] = 1
    if kept.any():
        return kept

    # Nothing qualified: fall back to the single largest component.
    biggest = 1 + int(np.argmax(stats[1:, cv2.CC_STAT_AREA]))
    return (labels == biggest).astype(np.uint8)
def wall_subtract(mask: np.ndarray, seg_map: np.ndarray, dilation: int = 1) -> np.ndarray:
    """Return a copy of ``mask`` with rejected-surface pixels cleared.

    Args:
        mask: Mask to clean; it is not modified.
        seg_map: Per-pixel class ids.
        dilation: Number of 5x5 elliptical dilation passes applied to the
            reject region before subtraction; ``0`` disables dilation.
    """
    rejected = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8)
    if dilation > 0:
        grow_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        rejected = cv2.dilate(rejected, grow_kernel, iterations=dilation)

    cleaned = mask.copy()
    cleaned[rejected > 0] = 0
    return cleaned
def fit_floor_edges(mask: np.ndarray):
    """Fit straight lines ``x = a*y + b`` to the left and right mask edges.

    Rows are sampled at a stride of ``max(1, height // 260)``. A row is used
    only if it has at least ``max(8, 1% of the width)`` mask pixels; its
    edges are the 3rd and 97th percentile of the occupied columns.

    Returns:
        ``(left_fit, right_fit)`` as degree-1 ``np.polyfit`` coefficient
        arrays, or ``None`` when fewer than 8 rows qualify.
    """
    height, width = mask.shape[:2]
    stride = max(1, height // 260)
    min_pixels = max(8, width * 0.01)

    samples = []
    for row in range(0, height, stride):
        occupied = np.nonzero(mask[row] > 0)[0]
        if len(occupied) < min_pixels:
            continue
        samples.append(
            (
                float(row),
                float(np.percentile(occupied, 3)),
                float(np.percentile(occupied, 97)),
            )
        )
    if len(samples) < 8:
        return None

    row_coords, left_edge, right_edge = (
        np.asarray(column, dtype=np.float32) for column in zip(*samples)
    )
    left_fit = np.polyfit(row_coords, left_edge, 1)
    right_fit = np.polyfit(row_coords, right_edge, 1)
    return left_fit, right_fit
| # --------------------------------------------------------------------------- | |
| # B8 — Convex Hull Quad Fitting | |
| # Derive a tight bounding quadrilateral from the convex hull of the floor mask. | |
| # Used alongside the linear edge-fit quad so that corners of L-shaped rooms | |
| # and irregular floor boundaries are fully covered. | |
| # --------------------------------------------------------------------------- | |
| def convex_hull_quad(mask: np.ndarray) -> np.ndarray | None: | |
| ys, xs = np.where(mask > 0) | |
| if len(xs) < 50: | |
| return None | |
| pts = np.column_stack([xs, ys]).astype(np.float32) | |
| hull = cv2.convexHull(pts) | |
| if hull is None or len(hull) < 4: | |
| return None | |
| rect = cv2.minAreaRect(hull.squeeze()) | |
| box = cv2.boxPoints(rect) # (4, 2) — x,y columns | |
| h, w = mask.shape[:2] | |
| box[:, 0] = np.clip(box[:, 0], 0, w - 1) | |
| box[:, 1] = np.clip(box[:, 1], 0, h - 1) | |
| return box | |
| # --------------------------------------------------------------------------- | |
| # B6 — Dual Vanishing Point Detection | |
| # Detect two independent VPs: one from positive-slope lines (converging right) | |
| # and one from negative-slope lines (converging left), covering oblique shots | |
| # and corner-camera perspectives. | |
| # --------------------------------------------------------------------------- | |
def detect_dual_vanishing_points(
    img_np: np.ndarray, floor_mask: np.ndarray
) -> tuple[dict | None, dict | None]:
    """Detect up to two vanishing points from line segments on the floor.

    Canny edges restricted to the floor mask are fed to a probabilistic
    Hough transform. Segments are split by the sign of their image-space
    slope, and each group votes for one vanishing point through pairwise
    line intersections.

    Args:
        img_np: RGB image array.
        floor_mask: Mask with the same height and width as the image;
            edges where it is zero are discarded.

    Returns:
        ``(primary, secondary)``, each ``{"x": float, "y": float}`` or
        ``None``. When both groups produce a point, the primary is the one
        with the smaller ``|y|``. ``(None, None)`` when no usable lines are
        found.
    """
    gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
    gray = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(gray, 60, 160)
    edges[floor_mask == 0] = 0
    lines = cv2.HoughLinesP(
        edges,
        rho=1,
        theta=np.pi / 180,
        threshold=60,
        minLineLength=max(40, min(img_np.shape[:2]) // 16),
        maxLineGap=24,
    )
    if lines is None:
        return None, None

    h, w = img_np.shape[:2]
    pos_lines, neg_lines = [], []
    for line in lines[:, 0, :]:
        x1, y1, x2, y2 = [float(v) for v in line]
        dx, dy = x2 - x1, y2 - y1
        length = float(np.hypot(dx, dy))
        # Skip short and (near-)vertical segments; the slope form below
        # cannot represent a vertical line.
        if length < 40 or abs(dx) < 1:
            continue
        slope = dy / dx
        # Near-horizontal segments are ignored.
        if abs(slope) < 0.18:
            continue
        entry = (x1, y1, slope, length)
        if slope > 0:
            pos_lines.append(entry)
        else:
            neg_lines.append(entry)

    def _find_vp(group: list) -> dict | None:
        """Cluster pairwise intersections of ``group`` into one point."""
        intersections = []
        for i, (xa, ya, s1, l1) in enumerate(group):
            a1 = ya - s1 * xa  # intercept of y = s1 * x + a1
            for xb, yb, s2, l2 in group[i + 1:]:
                denom = s1 - s2
                # Nearly parallel pairs intersect too far away to be stable.
                # This also rules out division by zero.
                if abs(denom) < 0.08:
                    continue
                a2 = yb - s2 * xb
                x = (a2 - a1) / denom
                y = s1 * x + a1
                # Keep intersections within a generous window around the
                # frame, but not below its bottom edge.
                if -w * 0.6 <= x <= w * 1.6 and -h * 1.2 <= y <= h * 1.0:
                    intersections.append((x, y, min(l1, l2)))
        if len(intersections) < 3:
            return None

        pts = np.array([[p[0], p[1]] for p in intersections], np.float32)
        weights = np.array([p[2] for p in intersections], np.float32)
        center = np.average(pts, axis=0, weights=weights)
        # One refinement pass: drop the 30% of intersections farthest from
        # the weighted centre and re-average if enough remain.
        dist = np.linalg.norm(pts - center, axis=1)
        keep = dist <= np.percentile(dist, 70)
        if keep.sum() >= 3:
            center = np.average(pts[keep], axis=0, weights=weights[keep])
        return {"x": float(center[0]), "y": float(center[1])}

    vp_right = _find_vp(pos_lines)  # from positive-slope segments
    vp_left = _find_vp(neg_lines)  # from negative-slope segments

    # Primary VP = the candidate whose |y| is smallest. The sort is stable,
    # so a tie keeps the positive-slope candidate first.
    found = [vp for vp in (vp_right, vp_left) if vp is not None]
    if not found:
        return None, None
    found.sort(key=lambda vp: abs(vp["y"]))
    primary = found[0]
    secondary = found[1] if len(found) > 1 else None
    return primary, secondary
def estimate_floor_plane(mask: np.ndarray, img_np: np.ndarray):
    """Estimate a floor quadrilateral and the homography that rectifies it.

    The quad is built from linear fits to the mask's left/right edges,
    nudged toward the primary vanishing point, and widened to cover the
    convex-hull rectangle of the mask. It is then mapped onto the mask's
    1st..99th percentile bounding box.

    Returns:
        ``(homography, plane)`` where ``homography`` is the 3x3 perspective
        transform flattened to a list of 9 floats and ``plane`` is a dict
        with keys ``x``, ``y``, ``width``, ``height``, ``quad``,
        ``hullQuad``, ``vanishingPoint`` and ``vanishingPoint2``.
        ``(None, None)`` when the mask is too small, the edge fit fails, or
        the quad is degenerate.
    """
    row_idx, col_idx = np.where(mask > 0)
    if len(col_idx) < 1000:
        return None, None
    px = col_idx.astype(np.float32)
    py = row_idx.astype(np.float32)

    def _pct(values, q):
        return float(np.percentile(values, q))

    # Robust bounding box of the mask (also the rectification target).
    x1, x2 = _pct(px, 1), _pct(px, 99)
    y1, y2 = _pct(py, 1), _pct(py, 99)
    width = x2 - x1
    height = y2 - y1
    if width < 20 or height < 20:
        return None, None
    top_y = _pct(py, 8)
    bottom_y = _pct(py, 97)

    fits = fit_floor_edges(mask)
    if fits is None:
        return None, None
    left_fit, right_fit = fits
    top_left = float(np.polyval(left_fit, top_y))
    top_right = float(np.polyval(right_fit, top_y))
    bottom_left = float(np.polyval(left_fit, bottom_y))
    bottom_right = float(np.polyval(right_fit, bottom_y))

    # Widen the bottom edge to the extent of the lowest 20% of mask rows.
    lowest_px = px[py >= np.percentile(py, 80)]
    bottom_left = min(bottom_left, _pct(lowest_px, 4))
    bottom_right = max(bottom_right, _pct(lowest_px, 96))

    # Enforce minimum edge widths, growing symmetrically about each centre.
    min_top_width = max(24.0, width * 0.18)
    top_center = (top_left + top_right) * 0.5
    if top_right - top_left < min_top_width:
        top_left = top_center - min_top_width * 0.5
        top_right = top_center + min_top_width * 0.5
    min_bottom_width = max(min_top_width * 1.25, width * 0.45)
    bottom_center = (bottom_left + bottom_right) * 0.5
    if bottom_right - bottom_left < min_bottom_width:
        bottom_left = bottom_center - min_bottom_width * 0.5
        bottom_right = bottom_center + min_bottom_width * 0.5

    h, w = mask.shape[:2]
    max_x, max_y = w - 1, h - 1
    # Corner order: bottom-left, bottom-right, top-right, top-left.
    src = np.float32([
        [np.clip(bottom_left, 0, max_x), np.clip(bottom_y, 0, max_y)],
        [np.clip(bottom_right, 0, max_x), np.clip(bottom_y, 0, max_y)],
        [np.clip(top_right, 0, max_x), np.clip(top_y, 0, max_y)],
        [np.clip(top_left, 0, max_x), np.clip(top_y, 0, max_y)],
    ])

    # B6 - the primary vanishing point pulls the top edge toward it.
    vanishing_point, vanishing_point2 = detect_dual_vanishing_points(img_np, mask)
    if vanishing_point is not None and vanishing_point["y"] < bottom_y:
        vp_x = float(np.clip(vanishing_point["x"], -w * 0.25, w * 1.25))
        top_width = max(src[2][0] - src[3][0], width * 0.16)
        horizon_gap = max(bottom_y - top_y, 1.0)
        convergence = np.clip((top_y - vanishing_point["y"]) / horizon_gap, 0.12, 0.75)
        pull = convergence * 0.35
        top_center = top_center * (1 - pull) + vp_x * pull
        src[3][0] = np.clip(top_center - top_width * 0.5, 0, max_x)
        src[2][0] = np.clip(top_center + top_width * 0.5, 0, max_x)

    # B8 - grow the quad so it covers the convex-hull rectangle.
    hull_box = convex_hull_quad(mask)
    hull_quad_list = None if hull_box is None else hull_box.flatten().tolist()
    if hull_box is not None:
        hull_left_x = float(np.min(hull_box[:, 0]))
        hull_right_x = float(np.max(hull_box[:, 0]))
        hull_top_y = float(np.min(hull_box[:, 1]))
        hull_bottom_y = float(np.max(hull_box[:, 1]))
        src[0][0] = min(src[0][0], hull_left_x)
        src[1][0] = max(src[1][0], hull_right_x)
        src[0][1] = src[1][1] = max(src[0][1], hull_bottom_y)
        src[2][1] = src[3][1] = min(src[2][1], hull_top_y)
    src = np.clip(src, [0, 0], [max_x, max_y]).astype(np.float32)

    # A collapsed quad cannot define a usable perspective transform.
    if cv2.contourArea(src.reshape(-1, 1, 2)) < 100:
        return None, None

    dst = np.float32([[x1, y2], [x2, y2], [x2, y1], [x1, y1]])
    homography = cv2.getPerspectiveTransform(src, dst).flatten().tolist()
    plane = {
        "x": x1,
        "y": y1,
        "width": width,
        "height": height,
        "quad": src.flatten().tolist(),
        "hullQuad": hull_quad_list,  # B8
        "vanishingPoint": vanishing_point,  # B6 primary
        "vanishingPoint2": vanishing_point2,  # B6 secondary
    }
    return homography, plane
| # --------------------------------------------------------------------------- | |
| # B5 — Complement-Stamp Furniture | |
| # Use a single dilation pass (down from two) and restore the narrow contact | |
| # zone directly below each occluder so chair legs, table bases, and plant pots | |
| # sit flush against the tile surface without a visible gap or halo. | |
| # --------------------------------------------------------------------------- | |
def build_floor_surface_mask(
    floor_mask: np.ndarray,
    seg_map: np.ndarray,
    quad: np.ndarray | None,
    depth: np.ndarray | None,
):
    """Build the final tileable surface mask from the raw floor mask.

    Args:
        floor_mask: ``uint8`` mask of floor pixels.
        seg_map: Per-pixel class ids used to locate occluders and rejected
            surfaces.
        quad: Optional floor quadrilateral; when given, the surface may be
            extended inside it near existing floor.
        depth: Optional depth map; when given, pixels far outside the floor's
            depth range are dropped.

    Returns:
        A ``uint8`` mask with occluder and rejected regions removed, except
        for a thin contact strip along occluder edges that is restored where
        it overlaps ``floor_mask``.
    """
    rows, cols = floor_mask.shape[:2]
    kernel_side = max(5, min(rows, cols) // 160) | 1  # "| 1" keeps it odd
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_side, kernel_side))

    # One dilation pass for occluders keeps their outline tight; rejected
    # surfaces get two passes.
    occluders = np.isin(seg_map, class_ids(OCCLUDER_CLASSES)).astype(np.uint8)
    in_occluder = cv2.dilate(occluders, kernel, iterations=1) > 0
    rejected = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8)
    in_reject = cv2.dilate(rejected, kernel, iterations=2) > 0

    def _carve(target):
        # Clear both exclusion zones in place.
        target[in_occluder] = 0
        target[in_reject] = 0

    surface = floor_mask.copy()
    surface[in_reject] = 0
    if not surface.any():
        # Everything was rejected: restart from the raw floor mask.
        surface = floor_mask.copy()

    # Fill interior holes by redrawing the outer contours solid.
    outlines, _ = cv2.findContours(surface, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if outlines:
        solid = np.zeros((rows, cols), dtype=np.uint8)
        cv2.drawContours(solid, outlines, -1, 1, cv2.FILLED)
        solid[in_reject] = 0
        surface = solid

    # Extend into the fitted plane, but only close to existing surface.
    if quad is not None and surface.any():
        plane = np.zeros((rows, cols), dtype=np.uint8)
        cv2.fillConvexPoly(plane, np.round(quad).astype(np.int32), 1)
        plane[in_reject] = 0
        vicinity = cv2.dilate(surface, kernel, iterations=6)
        surface = cv2.bitwise_or(surface, cv2.bitwise_and(plane, vicinity))
    surface[in_occluder] = 0

    # Drop pixels whose depth is far outside the floor's own depth range,
    # then force the original floor pixels back in.
    if depth is not None and floor_mask.any():
        floor_depth = depth[floor_mask > 0]
        near = float(np.percentile(floor_depth, 2))
        far = float(np.percentile(floor_depth, 98))
        margin = max(0.08, (far - near) * 0.35)
        depth_keep = (depth >= near - margin) & (depth <= far + margin)
        surface = (surface & depth_keep.astype(np.uint8)).astype(np.uint8)
        on_floor = floor_mask > 0
        surface[on_floor] = np.maximum(surface[on_floor], 1)
        _carve(surface)

    surface = clean_floor_mask(surface)
    _carve(surface)

    # Grow the boundary by one 3x3 pass, then re-apply the exclusions.
    boundary_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    surface = cv2.dilate(surface, boundary_kernel, iterations=1)
    _carve(surface)

    # B5 - restore a thin contact strip so furniture meets the surface.
    # The 1-wide, 3-tall erosion strips one pixel row from the vertical
    # extremes of each occluder. NOTE(review): that covers top edges as well
    # as bottom edges; only the part overlapping floor_mask is restored.
    vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 3))
    occluder_core = cv2.erode(occluders, vertical_kernel, iterations=1)
    occluder_rim = cv2.subtract(occluders, occluder_core)
    rim_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    contact_zone = cv2.dilate(occluder_rim, rim_kernel, iterations=1)
    surface = cv2.bitwise_or(surface, cv2.bitwise_and(contact_zone, floor_mask))
    surface[in_reject] = 0
    return surface
| # --------------------------------------------------------------------------- | |
| # B10 — Confidence-Aware Boundaries | |
| # Distance-transform the surface mask so pixels near its edge get a low | |
| # confidence score. The frontend uses this to feather tile blending at | |
| # boundary transitions instead of a hard cut. | |
| # --------------------------------------------------------------------------- | |
def build_confidence_map(surface_mask: np.ndarray) -> np.ndarray | None:
    """Return a uint8 confidence map that fades towards the mask boundary.

    Every pixel gets its L2 distance to the nearest zero pixel of
    ``surface_mask``. That distance is divided by a feather width (at least
    10 px, otherwise 1/50 of the shorter image side), clipped to [0, 1] and
    scaled to the 0-255 range.

    Returns ``None`` when the mask contains no non-zero pixel.
    """
    if not surface_mask.any():
        return None

    shorter_side = min(surface_mask.shape[:2])
    feather_px = max(10.0, shorter_side / 50.0)

    mask_u8 = surface_mask.astype(np.uint8)
    edge_distance = cv2.distanceTransform(mask_u8, cv2.DIST_L2, 5)

    scaled = np.clip(edge_distance / feather_px, 0.0, 1.0) * 255
    return scaled.astype(np.uint8)
| # --------------------------------------------------------------------------- | |
| # B7 — Multi-Room Grid Alignment | |
| # Find all connected floor regions large enough to tile. All regions share | |
| # the primary region's homography so the tile grid continues seamlessly across | |
| # doorways without restarting. | |
| # --------------------------------------------------------------------------- | |
def find_floor_regions(surface_mask: np.ndarray, min_area: int) -> list[np.ndarray]:
    """Split ``surface_mask`` into its 8-connected regions, largest first.

    Only components whose pixel area is at least ``min_area`` are kept. Each
    returned region is a uint8 mask with the same shape as ``surface_mask``
    (1 inside the component, 0 elsewhere).
    """
    num_labels, label_map, comp_stats, _centroids = cv2.connectedComponentsWithStats(
        surface_mask.astype(np.uint8), connectivity=8
    )

    # Label 0 is skipped: iteration starts at 1, as in OpenCV's convention
    # where label 0 is the background.
    kept_regions = [
        (label_map == label).astype(np.uint8)
        for label in range(1, num_labels)
        if int(comp_stats[label, cv2.CC_STAT_AREA]) >= min_area
    ]
    return sorted(kept_regions, key=lambda region: int(region.sum()), reverse=True)
def run_segmentation(img: Image.Image, img_np: np.ndarray):
    """Run the configured segmentation backend and return a per-pixel class map.

    The model is loaded lazily on first use. The result is a uint8 array with
    the same height and width as ``img_np``.

    Backend handling, selected by ``segmentation_backend``:
      * ``"oneformer"``: semantic task input, semantic post-processing.
      * ``"mask2former"``: panoptic post-processing when the model name
        contains ``"panoptic"`` (segment labels are painted into a
        zero-initialised map and capped at 255), otherwise semantic
        post-processing.
      * anything else: arg-max over the raw logits, resized to the image size
        with nearest-neighbour interpolation.
    """
    global seg_processor, seg_model
    if seg_model is None:
        _load_segmentation_model()

    height, width = img_np.shape[:2]
    target_sizes = [(height, width)]

    # Only OneFormer needs the extra task prompt; everything else about
    # preprocessing and the forward pass is shared.
    if segmentation_backend == "oneformer":
        model_inputs = seg_processor(
            images=img,
            task_inputs=["semantic"],
            return_tensors="pt",
        ).to(device)
    else:
        model_inputs = seg_processor(images=img, return_tensors="pt").to(device)

    with torch.no_grad():
        model_outputs = seg_model(**model_inputs)

    if segmentation_backend == "mask2former" and "panoptic" in MASK2FORMER_MODEL_NAME:
        panoptic = seg_processor.post_process_panoptic_segmentation(
            model_outputs,
            target_sizes=target_sizes,
        )[0]
        label_map = np.zeros((height, width), dtype=np.uint8)
        segment_ids = panoptic["segmentation"].cpu().numpy()
        for info in panoptic["segments_info"]:
            # Cap at 255 so the label always fits the uint8 map.
            label_map[segment_ids == info["id"]] = min(info["label_id"], 255)
        return label_map

    if segmentation_backend in ("oneformer", "mask2former"):
        semantic = seg_processor.post_process_semantic_segmentation(
            model_outputs,
            target_sizes=target_sizes,
        )[0]
        return semantic.cpu().numpy().astype(np.uint8)

    class_map = model_outputs.logits.argmax(dim=1).squeeze().cpu().numpy()
    return cv2.resize(class_map.astype(np.uint8), (width, height), interpolation=cv2.INTER_NEAREST)
def segmenter_metadata_name() -> str:
    """Return the segmenter identifier reported in segment metadata.

    The OneFormer backend is reported under its full model name; every other
    backend is reported as the raw ``segmentation_backend`` value.
    """
    is_oneformer = segmentation_backend == "oneformer"
    return "oneformer-ade20k-swin-large" if is_oneformer else segmentation_backend
def _encode_array_b64(array: np.ndarray | None) -> str | None:
    """Base64-encode an array's raw bytes; ``None`` is passed through."""
    if array is None:
        return None
    return base64.b64encode(array.tobytes()).decode()


def _log_stage_timing(stage: str, started: float) -> None:
    """Print the elapsed wall time of one pipeline stage in the ``[TIMING]`` format."""
    print(f"[TIMING] {stage} took {time.perf_counter() - started:.3f} seconds", flush=True)


def build_segmentation_bundle(contents: bytes):
    """Run the full floor-analysis pipeline on an encoded image.

    Steps, in order: decode and downscale the image (longest side capped at
    1280 px), segment it, JPEG-encode the working image, derive a floor mask,
    estimate depth and the floor plane, build the tileable surface mask, the
    shade map and the lighting estimates, then split the surface into
    connected regions and emit one segment per region. If no floor segment is
    produced, one plain segment per sufficiently large class (>= 1000 pixels)
    is emitted instead.

    Args:
        contents: Raw bytes of the uploaded image file.

    Returns:
        A dict with ``width``, ``height``, ``pixels`` (base64 JPEG of the
        working image) and ``segments`` (list of dicts). Each segment's
        ``mask`` is the base64 encoding of uint32 flat pixel indices.

    Raises:
        RuntimeError: If OpenCV reports that JPEG encoding failed.
    """
    t_start = time.perf_counter()

    t0 = time.perf_counter()
    img = Image.open(io.BytesIO(contents)).convert("RGB")
    max_dim = 1280
    longest_side = max(img.width, img.height)
    if longest_side > max_dim:
        scale = max_dim / longest_side
        # Clamp to 1 px: for extreme aspect ratios int() could otherwise round
        # the short side down to 0 and make the resize fail.
        new_size = (max(1, int(img.width * scale)), max(1, int(img.height * scale)))
        img = img.resize(new_size, Image.LANCZOS)
    img_np = np.array(img)
    h, w = img_np.shape[:2]
    min_floor_area = max(1200, int(w * h * 0.015))
    _log_stage_timing("Image loading/parsing", t0)

    t0 = time.perf_counter()
    seg_map = run_segmentation(img, img_np)
    _log_stage_timing("Floor segmentation", t0)

    t0 = time.perf_counter()
    encoded_ok, jpeg_buf = cv2.imencode(
        ".jpg", cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR), [cv2.IMWRITE_JPEG_QUALITY, 90]
    )
    if not encoded_ok:
        # Fail loudly instead of shipping an unusable ``pixels`` payload.
        raise RuntimeError("JPEG encoding of the input image failed.")
    pixels_b64 = base64.b64encode(jpeg_buf.tobytes()).decode()
    _log_stage_timing("Image JPEG encoding", t0)

    t0 = time.perf_counter()
    primary_floor_ids = class_ids(PRIMARY_FLOOR_CLASSES)
    floor_class_ids = class_ids(FLOOR_SURFACE_CLASSES)

    def floor_mask_for(ids):
        mask = np.isin(seg_map, ids).astype(np.uint8)
        mask = wall_subtract(mask, seg_map, dilation=1)
        return clean_floor_mask(mask)

    floor_mask = floor_mask_for(primary_floor_ids)
    if int(floor_mask.sum()) < min_floor_area:
        # Too little primary floor: retry with the broader floor-surface classes.
        floor_mask = floor_mask_for(floor_class_ids)
    _log_stage_timing("Floor masking/cleanup", t0)

    t0 = time.perf_counter()
    depth = estimate_depth(img, w, h)
    _log_stage_timing("Depth estimation", t0)

    t0 = time.perf_counter()
    homography, plane = estimate_floor_plane(floor_mask, img_np)
    _log_stage_timing("Plane fitting / homography calculation", t0)

    t0 = time.perf_counter()
    quad = np.asarray(plane["quad"], dtype=np.float32).reshape(4, 2) if plane and plane.get("quad") else None
    surface_mask = build_floor_surface_mask(floor_mask, seg_map, quad, depth)
    _log_stage_timing("Surface masking", t0)

    t0 = time.perf_counter()
    shade_map, shade_range = None, (0.55, 1.35)
    if ENABLE_INTRINSIC_SHADING:
        if intrinsic_models is None:
            _load_intrinsic_model()
        if intrinsic_models is not None:
            shade_map, shade_range = build_intrinsic_shade_map(img_np, surface_mask)
    if shade_map is None:
        shade_map, shade_range = build_shade_map(img_np, surface_mask)
    _log_stage_timing("Shade map construction", t0)

    t0 = time.perf_counter()
    color_temperature = estimate_color_temperature(img_np, surface_mask)  # B2
    light_vector = estimate_light_vector(shade_map, surface_mask)  # B3
    confidence_map = build_confidence_map(surface_mask)  # B10
    _log_stage_timing("Lighting analysis", t0)

    # B7 — split the surface mask into connected regions; all share the same
    # homography so the tile grid is continuous across doorways.
    t0 = time.perf_counter()
    floor_regions = find_floor_regions(surface_mask, min_floor_area)
    multi_room = len(floor_regions) > 1
    _log_stage_timing("Floor region detection", t0)

    # Values shared by every segment are computed once, not once per region.
    shade_map_b64 = _encode_array_b64(shade_map)
    segmenter_name = segmenter_metadata_name()
    floor_pixel_count = int(floor_mask.sum())
    depth_enabled = depth is not None
    shading_enabled = shade_map is not None

    segments = []
    for region_idx, region_mask in enumerate(floor_regions):
        region_indices = np.flatnonzero(region_mask.ravel()).astype(np.uint32)
        if len(region_indices) < min_floor_area:
            continue
        # Per-region confidence sub-map
        region_conf = build_confidence_map(region_mask)
        segments.append({
            "id": region_idx,
            "className": "floor",
            "mask": _encode_array_b64(region_indices),
            "homography": homography,  # shared across all regions (B7)
            "plane": plane,
            "shadeMap": shade_map_b64,
            "shadeRange": list(shade_range),  # B4 — frontend decodes with this
            "colorTemperature": color_temperature,  # B2
            "lightVector": light_vector,  # B3
            "confidenceMap": _encode_array_b64(region_conf),  # B10
            "multiRoom": multi_room,  # B7
            "gridGroup": "primary" if region_idx == 0 else f"room_{region_idx}",  # B7
            "metadata": {
                "segmenter": segmenter_name,
                "floorPixels": floor_pixel_count,
                "surfacePixels": int(region_mask.sum()),
                "depthEnabled": depth_enabled,
                "shadingEnabled": shading_enabled,
            },
        })

    if not segments:
        # Fallback: no usable floor region, so expose every class that covers
        # at least 1000 pixels as a plain segment without plane/lighting data.
        flat_seg = seg_map.ravel()
        present_classes, pixel_counts = np.unique(flat_seg, return_counts=True)
        for seg_id, (class_id, pixel_count) in enumerate(zip(present_classes, pixel_counts)):
            if pixel_count < 1000:
                continue
            indices = np.flatnonzero(flat_seg == class_id).astype(np.uint32)
            segments.append({
                "id": int(seg_id),
                "className": class_name_for_id(int(class_id)),
                "mask": _encode_array_b64(indices),
                "homography": None,
                "plane": None,
                "shadeMap": None,
                "shadeRange": None,
                "colorTemperature": None,
                "lightVector": None,
                "confidenceMap": None,
                "multiRoom": False,
                "gridGroup": None,
                "metadata": {
                    "segmenter": segmenter_name,
                    "depthEnabled": depth_enabled,
                    "shadingEnabled": False,
                },
            })

    print(f"[TIMING] Total bundle processing completed in {time.perf_counter() - t_start:.3f} seconds", flush=True)
    return {"width": w, "height": h, "pixels": pixels_b64, "segments": segments}
def job_path(job_id: str) -> Path:
    """Return the path of the JSON status file for ``job_id`` inside ``JOB_DIR``."""
    filename = f"{job_id}.json"
    return JOB_DIR.joinpath(filename)
def read_job(job_id: str):
    """Load and return the stored job record for ``job_id``.

    Args:
        job_id: Identifier of the job whose status file should be read.

    Returns:
        The decoded JSON content of the job's status file.

    Raises:
        HTTPException: 404 when no status file exists for ``job_id``.
    """
    # Read directly and handle the miss instead of checking ``exists()``
    # first: the file could disappear between the check and the read.
    try:
        raw = job_path(job_id).read_text()
    except (FileNotFoundError, NotADirectoryError):
        raise HTTPException(status_code=404, detail="Job not found.") from None
    return json.loads(raw)
def write_job(job: dict):
    """Persist ``job`` to its status file, replacing the old file atomically.

    The record is first written to a uniquely named temporary file in the same
    directory and then swapped into place with ``os.replace``. A reader polling
    the status file therefore sees either the previous or the new content,
    never a truncated or half-written file.

    Args:
        job: Job record; ``job["id"]`` selects the status file.
    """
    target = job_path(job["id"])
    payload = json.dumps(job)
    # The ".tmp" suffix keeps scratch files distinguishable from "*.json" records.
    scratch = target.with_name(f"{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        scratch.write_text(payload)
        os.replace(scratch, target)
    finally:
        # After a successful replace the scratch file is already gone; on
        # failure this removes the leftover.
        scratch.unlink(missing_ok=True)
def run_conversion_task(job_id: str, upload_path: Path):
    """Build the segmentation bundle for an uploaded image and update its job.

    On success the bundle is written to ``<job_id>.bundle.json`` in ``JOB_DIR``
    and the job status becomes ``COMPLETED``. On any failure the job is marked
    ``FAILED`` with the error text. Exceptions never propagate to the caller;
    they are reported on stdout.

    Args:
        job_id: Identifier of the job being processed.
        upload_path: Location of the uploaded image on disk.
    """
    try:
        t_start = time.perf_counter()
        image_bytes = upload_path.read_bytes()
        bundle = build_segmentation_bundle(image_bytes)
        (JOB_DIR / f"{job_id}.bundle.json").write_text(json.dumps(bundle))
        # The status flips to COMPLETED only after the bundle file is written.
        job = read_job(job_id)
        job["status"] = "COMPLETED"
        write_job(job)
        print(f"[TIMING] Background conversion task for job {job_id} took {time.perf_counter() - t_start:.3f} seconds", flush=True)
    except Exception as exc:
        print(f"Background conversion failed: {exc}", flush=True)
        try:
            job = read_job(job_id)
            job["status"] = "FAILED"
            job["error"] = str(exc)
            write_job(job)
        except Exception as status_exc:
            # Still best-effort, but no longer silent: without this message a
            # job stuck in PROCESSING would leave no trace of why.
            print(f"Could not record failure for job {job_id}: {status_exc}", flush=True)
async def convert_to_viz2d(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """Accept an image upload, store it, and queue its background conversion.

    A declared content type that is not ``image/*`` is rejected with HTTP 400.
    The upload is saved in ``UPLOAD_DIR`` under a fresh job id; unknown file
    extensions fall back to ``.jpg``. A ``PROCESSING`` job record is written
    and returned, and ``run_conversion_task`` is scheduled to do the work.
    """
    declared_type = file.content_type
    if declared_type and not declared_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="Upload must be a JPG or PNG image.")

    job_id = uuid.uuid4().hex

    allowed_suffixes = {".jpg", ".jpeg", ".png", ".webp"}
    suffix = Path(file.filename or "room.jpg").suffix.lower()
    if suffix not in allowed_suffixes:
        suffix = ".jpg"

    upload_path = UPLOAD_DIR / f"{job_id}{suffix}"
    with upload_path.open("wb") as destination:
        shutil.copyfileobj(file.file, destination)

    job = {
        "id": job_id,
        "status": "PROCESSING",
        "inputUrl": f"/uploads/{upload_path.name}",
        "outputUrl": f"/viz2d/jobs/{job_id}/file",
    }
    write_job(job)

    background_tasks.add_task(run_conversion_task, job_id, upload_path)
    return job
async def viz2d_job_status(job_id: str):
    """Return the stored job record for ``job_id`` as loaded by ``read_job``."""
    job_record = read_job(job_id)
    return job_record
async def viz2d_job_file(job_id: str):
    """Stream the finished bundle JSON of a completed job.

    Args:
        job_id: Identifier of the job whose output is requested.

    Returns:
        A ``StreamingResponse`` with media type ``application/json``.

    Raises:
        HTTPException: 409 when the job status is not ``COMPLETED``; 404 when
            the bundle file is missing (or, via ``read_job``, the job itself).
    """
    job = read_job(job_id)
    if job.get("status") != "COMPLETED":
        raise HTTPException(status_code=409, detail="Job is not completed yet.")
    bundle_path = JOB_DIR / f"{job_id}.bundle.json"
    if not bundle_path.exists():
        raise HTTPException(status_code=404, detail="Job output not found.")

    async def iter_file(chunk_size: int = 1024 * 1024):
        # Yield fixed-size chunks so the whole bundle is never held in memory
        # at once.
        async with aiofiles.open(bundle_path, "rb") as f:
            while (chunk := await f.read(chunk_size)):
                yield chunk

    return StreamingResponse(iter_file(), media_type="application/json")
async def segment(file: UploadFile = File(...)):
    """Segment an uploaded image and return its bundle in the same request.

    Args:
        file: The uploaded image.

    Returns:
        The dict produced by ``build_segmentation_bundle``.

    Raises:
        HTTPException: 400 when the upload contains no bytes.
    """
    contents = await file.read()
    if not contents:
        raise HTTPException(status_code=400, detail="Uploaded file is empty.")
    # The pipeline is synchronous and compute-heavy; running it in a worker
    # thread keeps the event loop free to serve other requests meanwhile.
    return await asyncio.to_thread(build_segmentation_bundle, contents)
if __name__ == "__main__":
    # Script entry point: serve ``app`` with uvicorn on all interfaces, port 8002.
    import uvicorn

    uvicorn.run(app=app, host="0.0.0.0", port=8002)