room-visualizer / app.py
GitHub Actions
Deploy from GitHub commit c41d8036f5d265065967accde19305ed620eb213
3c84d3b
import aiofiles
import asyncio
import base64
import io
import json
import os
import shutil
import time
try:
import tomllib
except ImportError:
try:
import tomli as tomllib
except ImportError:
try:
import tomlkit as tomllib
except ImportError:
raise ImportError(
"No TOML library found. Please run on Python 3.11+, or run 'pip install tomli' to support Python 3.10."
)
import uuid
from pathlib import Path
import cv2
import numpy as np
import torch
from fastapi import FastAPI, File, HTTPException, Response, UploadFile, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from PIL import Image
from transformers import (
AutoImageProcessor,
AutoModelForDepthEstimation,
Mask2FormerForUniversalSegmentation,
OneFormerForUniversalSegmentation,
OneFormerProcessor,
SegformerForSemanticSegmentation,
)
# Segmentation class names, ordered so that a name's position in this list is
# the label id used in segmentation maps: class_name_for_id() indexes the list
# by id and class_ids() maps names back to positions. Entries must therefore
# never be reordered, inserted, or removed.
ADE20K_CLASSES = [
"wall", "building", "sky", "floor", "tree", "ceiling", "road", "bed",
"window", "grass", "cabinet", "sidewalk", "person", "ground", "door",
"table", "mountain", "plant", "curtain", "chair", "car", "water",
"painting", "sofa", "shelf", "house", "sea", "mirror", "rug", "field",
"armchair", "seat", "fence", "desk", "rock", "wardrobe", "lamp",
"bathtub", "railing", "cushion", "base", "box", "column", "signboard",
"chest of drawers", "counter", "sand", "sink", "skyscraper", "fireplace",
"refrigerator", "stairs", "runway", "bookcase", "blind", "coffee table",
"toilet", "flower", "book", "hill", "bench", "countertop", "stove",
"palm", "kitchen island", "computer", "swivel chair", "boat", "bar",
"arcade machine", "hovel", "bus", "towel", "light", "truck", "tower",
"chandelier", "awning", "streetlight", "booth", "television", "airplane",
"dirt track", "apparel", "pole", "land", "bannister", "escalator",
"ottoman", "bottle", "buffet", "poster", "stage", "van", "ship",
"fountain", "conveyer belt", "canopy", "washer", "plaything",
"swimming pool", "stool", "barrel", "basket", "waterfall", "tent",
"bag", "minibike", "cradle", "oven", "ball", "food", "step", "tank",
"trade name", "microwave", "pot", "animal", "bicycle", "lake",
"dishwasher", "screen", "blanket", "sculpture", "hood", "sconce",
"vase", "traffic light", "tray", "ashcan", "fan", "pier", "crt screen",
"plate", "monitor", "bulletin board", "shower", "radiator", "glass",
"clock", "flag",
]
def load_config() -> dict:
config_path = os.getenv("VISUALIZER_CONFIG")
if not config_path:
return {}
path = Path(config_path).expanduser()
if not path.is_absolute():
path = Path(__file__).resolve().parent / path
if not path.exists():
raise RuntimeError(f"VISUALIZER_CONFIG does not exist: {path}")
with path.open("rb") as config_file:
return tomllib.load(config_file)
CONFIG = load_config()
def config_value(env_name: str, section: str, key: str, default):
if env_name in os.environ:
return os.environ[env_name]
return CONFIG.get(section, {}).get(key, default)
# Strings (compared case-insensitively) that switch a boolean setting on.
_TRUTHY_SETTINGS = {"1", "true", "yes", "on"}


def _setting_text(env_name: str, section: str, key: str, default: str) -> str:
    """Resolve a setting through ``config_value`` and coerce it to ``str``."""
    return str(config_value(env_name, section, key, default))


def _setting_flag(env_name: str, section: str, key: str, default: str) -> bool:
    """Resolve a setting and read it as a case-insensitive on/off switch."""
    return _setting_text(env_name, section, key, default).lower() in _TRUTHY_SETTINGS


# --- Model selection -------------------------------------------------------
# Preferred segmentation backend; compared in lower case by the loader.
SEGMENTATION_MODEL = _setting_text(
    "SEGMENTATION_MODEL", "models", "segmentation_model", "oneformer"
).lower()
ONEFORMER_MODEL_NAME = _setting_text(
    "ONEFORMER_MODEL_NAME",
    "models",
    "oneformer_model_name",
    "shi-labs/oneformer_ade20k_swin_large",
)
MASK2FORMER_MODEL_NAME = _setting_text(
    "MASK2FORMER_MODEL_NAME",
    "models",
    "mask2former_model_name",
    "facebook/mask2former-swin-small-ade-semantic",
)
SEGFORMER_MODEL_NAME = _setting_text(
    "SEGFORMER_MODEL_NAME",
    "models",
    "segformer_model_name",
    "nvidia/segformer-b2-finetuned-ade-512-512",
)
DEPTH_MODEL_NAME = _setting_text(
    "DEPTH_MODEL_NAME", "models", "depth_model_name", "Intel/dpt-large"
)
INTRINSIC_MODEL_VERSION = _setting_text(
    "INTRINSIC_MODEL_VERSION", "models", "intrinsic_model_version", "v2"
)

# --- Runtime switches ------------------------------------------------------
ENABLE_DEPTH_ESTIMATION = _setting_flag(
    "ENABLE_DEPTH_ESTIMATION", "runtime", "enable_depth_estimation", "1"
)
ENABLE_INTRINSIC_SHADING = _setting_flag(
    "ENABLE_INTRINSIC_SHADING", "runtime", "enable_intrinsic_shading", "0"
)
VISUALIZER_DATA_DIR = _setting_text("VISUALIZER_DATA_DIR", "runtime", "data_dir", "data")

# --- Lazily populated model state -------------------------------------------
# Inference runs on CUDA when torch reports it as available, otherwise on CPU.
_device_kind = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_kind)

# Filled in by the loader functions on first use; None means "not loaded yet".
seg_processor = None
seg_model = None
segmentation_backend = "segformer"
depth_processor = None
depth_model = None
intrinsic_models = None
def hf_offline() -> bool:
    """Report whether Hugging Face downloads are disabled via the environment.

    Returns:
        True when ``HF_HUB_OFFLINE`` or ``TRANSFORMERS_OFFLINE`` is exactly
        the string "1"; any other value (or an unset variable) counts as
        online.
    """
    offline_switches = ("HF_HUB_OFFLINE", "TRANSFORMERS_OFFLINE")
    return any(os.getenv(switch) == "1" for switch in offline_switches)
def _load_segmentation_model():
    """Load the segmentation processor/model into the module globals.

    Backends are tried in the order OneFormer -> Mask2Former -> SegFormer,
    starting at the one selected by ``SEGMENTATION_MODEL``. A failure of
    OneFormer or Mask2Former is logged and the next backend is tried; a
    SegFormer failure is not caught and propagates to the caller. On success
    ``seg_processor``, ``seg_model`` and ``segmentation_backend`` describe the
    backend that loaded.
    """
    global seg_processor, seg_model, segmentation_backend

    def _install(processor_cls, model_cls, checkpoint, backend, start_msg, done_msg):
        # Assigns the globals in the same order as a straight-line load:
        # processor first, then model, then the backend tag.
        global seg_processor, seg_model, segmentation_backend
        print(start_msg, flush=True)
        seg_processor = processor_cls.from_pretrained(
            checkpoint,
            local_files_only=hf_offline(),
        )
        seg_model = model_cls.from_pretrained(
            checkpoint,
            local_files_only=hf_offline(),
        ).to(device)
        seg_model.eval()
        segmentation_backend = backend
        print(done_msg, flush=True)

    if SEGMENTATION_MODEL == "oneformer":
        try:
            _install(
                OneFormerProcessor,
                OneFormerForUniversalSegmentation,
                ONEFORMER_MODEL_NAME,
                "oneformer",
                f"Loading OneFormer: {ONEFORMER_MODEL_NAME} ...",
                "OneFormer loaded.",
            )
            return
        except Exception as exc:
            print(f"OneFormer failed ({exc}), falling back to Mask2Former.", flush=True)

    if SEGMENTATION_MODEL in {"oneformer", "mask2former"}:
        try:
            _install(
                AutoImageProcessor,
                Mask2FormerForUniversalSegmentation,
                MASK2FORMER_MODEL_NAME,
                "mask2former",
                f"Loading Mask2Former: {MASK2FORMER_MODEL_NAME} ...",
                "Mask2Former loaded.",
            )
            return
        except Exception as exc:
            print(f"Mask2Former failed ({exc}), falling back to SegFormer.", flush=True)

    # Last resort: no try/except here, so a failure surfaces to the caller.
    _install(
        AutoImageProcessor,
        SegformerForSemanticSegmentation,
        SEGFORMER_MODEL_NAME,
        "segformer",
        f"Loading SegFormer: {SEGFORMER_MODEL_NAME} ...",
        "SegFormer loaded.",
    )
def _load_intrinsic_model():
    """Load the intrinsic-decomposition models once, if the feature is enabled.

    Does nothing when ``ENABLE_INTRINSIC_SHADING`` is off or the models are
    already loaded. Any load failure (including a missing ``intrinsic``
    package) is logged and leaves ``intrinsic_models`` as ``None``.
    """
    global intrinsic_models
    if not ENABLE_INTRINSIC_SHADING or intrinsic_models is not None:
        return
    try:
        print(f"Loading Intrinsic Image Decomposition model: {INTRINSIC_MODEL_VERSION} ...", flush=True)
        # Imported lazily so the dependency is only needed when enabled.
        from intrinsic.pipeline import load_models

        intrinsic_models = load_models(INTRINSIC_MODEL_VERSION, device=str(device))
        print("Intrinsic model loaded.", flush=True)
    except Exception as exc:
        print(f"Intrinsic model failed to load ({exc}). Falling back to luminance shading.", flush=True)
# ASGI application. Responses of at least 1000 bytes are gzip-compressed.
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
# CORS: only the single hard-coded frontend origin below is allowed (with
# credentials); all methods and headers are accepted from that origin.
app.add_middleware(
CORSMiddleware,
allow_origins=["https://room-editor-9y3b.vercel.app"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# On-disk storage layout. The directories are created at import time, before
# StaticFiles is mounted on the uploads directory.
DATA_DIR = Path(VISUALIZER_DATA_DIR).resolve()
UPLOAD_DIR = DATA_DIR / "uploads"
JOB_DIR = DATA_DIR / "jobs"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
JOB_DIR.mkdir(parents=True, exist_ok=True)
# Everything under UPLOAD_DIR is served publicly at /uploads.
app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads")
# Class-name groups, resolved to label ids through class_ids().
# Classes tried first when building the floor mask.
PRIMARY_FLOOR_CLASSES = {"floor"}
# Wider set of walkable-surface classes, used when the primary mask is too small.
FLOOR_SURFACE_CLASSES = {
"floor", "road", "sidewalk", "ground", "field", "grass", "sand",
"runway", "dirt track", "land", "stairs", "step",
}
# Classes whose pixels are always removed from a floor/surface mask.
REJECT_SURFACE_CLASSES = {"wall", "ceiling", "building", "sky", "window"}
# Objects treated as occluders: their (dilated) pixels are cut out of the
# floor surface mask.
OCCLUDER_CLASSES = {
"bed", "cabinet", "person", "door", "table", "plant", "curtain", "chair",
"car", "painting", "sofa", "shelf", "mirror", "rug", "armchair", "seat", "desk",
"wardrobe", "lamp", "bathtub", "railing", "cushion", "base", "box",
"column", "chest of drawers", "counter", "sink", "fireplace",
"refrigerator", "bookcase", "blind", "coffee table", "toilet", "bench",
"countertop", "stove", "kitchen island", "computer", "swivel chair",
"bar", "ottoman", "bottle", "buffet", "poster", "towel", "television",
"washer", "plaything", "stool", "basket", "bag", "cradle", "oven",
"ball", "food", "microwave", "pot", "dishwasher", "blanket", "sculpture",
"vase", "tray", "fan", "plate", "monitor", "shower", "radiator", "clock",
}
def class_name_for_id(class_id: int) -> str:
    """Return the class name for a segmentation label id.

    Args:
        class_id: Label id, expected to be an index into ``ADE20K_CLASSES``.

    Returns:
        The matching class name, or ``"class_<id>"`` for any id outside the
        table. Negative ids also get the fallback: without the lower-bound
        check Python's negative indexing would silently return a name from
        the end of the list.
    """
    if 0 <= class_id < len(ADE20K_CLASSES):
        return ADE20K_CLASSES[class_id]
    return f"class_{class_id}"
def class_ids(names: set[str]) -> list[int]:
    """Translate class names into their label ids.

    Args:
        names: Class names to look up; names missing from
            ``ADE20K_CLASSES`` are ignored.

    Returns:
        The matching label ids in ascending order.
    """
    matched: list[int] = []
    for label_id, label in enumerate(ADE20K_CLASSES):
        if label in names:
            matched.append(label_id)
    return matched
def estimate_depth(img: Image.Image, width: int, height: int):
    """Estimate a normalised depth map for ``img``, loading the model lazily.

    Args:
        img: Input image handed to the depth processor.
        width: Output width in pixels.
        height: Output height in pixels.

    Returns:
        A ``(height, width)`` float array min-max scaled to ``[0, 1]``, or
        ``None`` when depth estimation is disabled, the prediction is
        (nearly) constant, or any step raises.
    """
    global depth_processor, depth_model
    if not ENABLE_DEPTH_ESTIMATION:
        return None
    model_name = DEPTH_MODEL_NAME
    try:
        needs_loading = depth_processor is None or depth_model is None
        if needs_loading:
            print(f"Loading depth model: {model_name} ...", flush=True)
            depth_processor = AutoImageProcessor.from_pretrained(
                model_name,
                local_files_only=hf_offline(),
            )
            depth_model = AutoModelForDepthEstimation.from_pretrained(
                model_name,
                local_files_only=hf_offline(),
            ).to(device)
            depth_model.eval()
            print("Depth model loaded.", flush=True)

        model_inputs = depth_processor(images=img, return_tensors="pt").to(device)
        with torch.no_grad():
            prediction = depth_model(**model_inputs)

        # Resample the prediction to the requested output size.
        resized = torch.nn.functional.interpolate(
            prediction.predicted_depth.unsqueeze(1),
            size=(height, width),
            mode="bicubic",
            align_corners=False,
        )
        depth_map = resized.squeeze().cpu().numpy()
        depth_map = cv2.GaussianBlur(depth_map.astype(np.float32), (0, 0), sigmaX=3)

        low = float(np.min(depth_map))
        high = float(np.max(depth_map))
        if high - low < 1e-6:
            # Flat prediction: nothing useful to normalise.
            return None
        return (depth_map - low) / (high - low)
    except Exception as exc:
        # Best effort: depth is optional, so failures only disable it.
        print(f"Depth estimation skipped ({exc}).", flush=True)
        return None
# ---------------------------------------------------------------------------
# B4 — Shade Range Expansion
# Encode the shade multiplier using the actual brightness spread of the floor
# rather than a hardcoded [0.55, 1.35] clip, so dark-room images preserve the
# full dynamic range of their shadow patterns.
# ---------------------------------------------------------------------------
def _adaptive_shade_range(relative: np.ndarray, floor_mask: np.ndarray) -> tuple[float, float]:
    """Pick the ``(lo, hi)`` shade-multiplier range used to encode a shade map.

    Args:
        relative: Shade multipliers, same shape as ``floor_mask``.
        floor_mask: Non-zero where a pixel belongs to the floor.

    Returns:
        ``(0.55, 1.35)`` when the mask selects nothing. Otherwise the 1st and
        99th percentile of the floor values, limited to at least 0.25 and at
        most 2.5 respectively; if that span is under 0.4 it is replaced by a
        0.4-wide window centred on its midpoint.
    """
    on_floor = relative[floor_mask > 0]
    if not on_floor.size:
        return (0.55, 1.35)
    lower = max(0.25, float(np.percentile(on_floor, 1)))
    upper = min(2.5, float(np.percentile(on_floor, 99)))
    if upper - lower >= 0.4:
        return lower, upper
    # Too narrow to encode usefully: widen symmetrically around the centre.
    centre = (lower + upper) / 2.0
    return centre - 0.2, centre + 0.2
def _encode_shade(relative: np.ndarray, lo: float, hi: float) -> np.ndarray:
    """Quantise shade multipliers to ``uint8`` over the range ``[lo, hi]``.

    Values are clipped to ``[lo, hi]`` and mapped linearly so that ``lo``
    becomes 0 and ``hi`` becomes 255. ``hi`` must differ from ``lo``.
    """
    scale = 255.0 / (hi - lo)
    clipped = np.clip(relative, lo, hi)
    codes = np.round((clipped - lo) * scale)
    return codes.clip(0, 255).astype(np.uint8)
# ---------------------------------------------------------------------------
# B1 — Shadow Map Extraction
# Luminance-based shade map; returns (encoded_uint8, (lo, hi)) so the frontend
# can decode with the correct range.
# ---------------------------------------------------------------------------
def build_shade_map(
img_np: np.ndarray, surface_mask: np.ndarray
) -> tuple[np.ndarray | None, tuple[float, float]]:
default_range = (0.55, 1.35)
if not surface_mask.any():
return None, default_range
mask = surface_mask.astype(np.uint8)
luminance = (
img_np[:, :, 0].astype(np.float32) * 0.299
+ img_np[:, :, 1].astype(np.float32) * 0.587
+ img_np[:, :, 2].astype(np.float32) * 0.114
)
h, w = mask.shape[:2]
floor_values = luminance[mask > 0]
if floor_values.size < max(256, int(h * w * 0.002)):
return None, default_range
median_lum = float(np.median(floor_values))
if median_lum < 1e-3:
return None, default_range
filled = luminance.copy()
filled[mask == 0] = median_lum
missing = (mask == 0).astype(np.uint8) * 255
try:
filled = cv2.inpaint(
np.clip(filled, 0, 255).astype(np.uint8),
missing,
max(3, min(h, w) // 160),
cv2.INPAINT_TELEA,
).astype(np.float32)
except cv2.error:
pass
sigma = max(8.0, min(h, w) / 28.0)
smooth = cv2.GaussianBlur(filled, (0, 0), sigmaX=sigma, sigmaY=sigma)
relative = smooth / median_lum
relative[mask == 0] = 1.0
lo, hi = _adaptive_shade_range(relative, mask)
return _encode_shade(relative, lo, hi), (lo, hi)
def build_intrinsic_shade_map(
img_np: np.ndarray, surface_mask: np.ndarray
) -> tuple[np.ndarray | None, tuple[float, float]]:
default_range = (0.55, 1.35)
if not surface_mask.any() or intrinsic_models is None:
return None, default_range
try:
img_float = img_np.astype(np.float32) / 255.0
from intrinsic.pipeline import run_pipeline
results = run_pipeline(intrinsic_models, img_float, device=str(device))
shading = None
if "gry_shd" in results:
shading = results["gry_shd"]
elif "dif_shd" in results:
dif = results["dif_shd"]
shading = dif[:, :, 0] * 0.299 + dif[:, :, 1] * 0.587 + dif[:, :, 2] * 0.114
else:
for k in results.keys():
if "shd" in k or "shading" in k:
shading = results[k]
if len(shading.shape) == 3:
shading = shading[:, :, 0] * 0.299 + shading[:, :, 1] * 0.587 + shading[:, :, 2] * 0.114
break
if shading is None:
return None, default_range
h, w = surface_mask.shape[:2]
if shading.shape[:2] != (h, w):
shading = cv2.resize(shading, (w, h), interpolation=cv2.INTER_LINEAR)
sigma = max(3.0, min(h, w) / 80.0)
shading = cv2.GaussianBlur(shading.astype(np.float32), (0, 0), sigmaX=sigma, sigmaY=sigma)
floor_vals = shading[surface_mask > 0]
if floor_vals.size == 0:
return None, default_range
median_val = float(np.median(floor_vals))
if median_val < 1e-3:
return None, default_range
relative_shading = shading / median_val
relative_shading[surface_mask == 0] = 1.0
lo, hi = _adaptive_shade_range(relative_shading, surface_mask)
return _encode_shade(relative_shading, lo, hi), (lo, hi)
except Exception as exc:
print(f"Intrinsic shading decomposition failed: {exc}. Falling back to default luminance shading.", flush=True)
return None, default_range
# ---------------------------------------------------------------------------
# B2 — Color Temperature
# Sample the brightest floor pixels to infer the room's lighting colour cast
# and approximate Kelvin value. Returns a dict with `kelvin` and `cast`
# (normalised RGB multipliers) so the frontend can tint replacement tiles.
# ---------------------------------------------------------------------------
def estimate_color_temperature(
img_np: np.ndarray, surface_mask: np.ndarray
) -> dict | None:
if not surface_mask.any():
return None
pixels = img_np[surface_mask > 0].astype(np.float32)
if len(pixels) < 100:
return None
lum = pixels[:, 0] * 0.299 + pixels[:, 1] * 0.587 + pixels[:, 2] * 0.114
thresh = float(np.percentile(lum, 70))
bright = pixels[lum >= thresh]
if len(bright) < 10:
bright = pixels
mr = float(np.mean(bright[:, 0]))
mg = float(np.mean(bright[:, 1]))
mb = float(np.mean(bright[:, 2]))
ref = max(mr, mg, mb, 1e-3)
rb = mr / max(mb, 1.0)
if rb > 1.6:
kelvin = 2700
elif rb > 1.3:
kelvin = 3200
elif rb > 1.1:
kelvin = 4000
elif rb > 0.9:
kelvin = 5500
elif rb > 0.7:
kelvin = 6500
else:
kelvin = 8000
return {
"kelvin": kelvin,
"cast": {"r": round(mr / ref, 4), "g": round(mg / ref, 4), "b": round(mb / ref, 4)},
}
# ---------------------------------------------------------------------------
# B3 — Light Vector
# Estimate the primary in-plane light direction from the gradient of the shade
# map. Returns a normalised {x, y} vector pointing toward the light source.
# ---------------------------------------------------------------------------
def estimate_light_vector(
shade_map: np.ndarray | None, surface_mask: np.ndarray
) -> dict | None:
if shade_map is None or not surface_mask.any():
return None
shade_f = shade_map.astype(np.float32)
valid = surface_mask.astype(np.float32)
kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
valid_e = cv2.erode(valid, kern, iterations=2)
clean = shade_f * valid_e
gx = cv2.Sobel(clean, cv2.CV_32F, 1, 0, ksize=15) * valid_e
gy = cv2.Sobel(clean, cv2.CV_32F, 0, 1, ksize=15) * valid_e
mag = np.hypot(gx, gy)
total = float(mag.sum())
if total < 1e-6:
return None
lx = float((gx * mag).sum()) / total
ly = float((gy * mag).sum()) / total
norm = float(np.hypot(lx, ly))
if norm < 1e-6:
return None
return {"x": round(lx / norm, 4), "y": round(ly / norm, 4)}
def clean_floor_mask(mask: np.ndarray) -> np.ndarray:
    """Tidy a binary floor mask and keep only plausible floor components.

    The mask is morphologically closed then opened with kernels scaled to the
    image size. Connected components are kept when they are at least
    ``max(1000, 1 % of the image)`` pixels and their bounding box reaches
    below 60 % of the image height. If none qualifies, the single largest
    component is returned instead.
    """

    def _odd_kernel(minimum: int, fraction: float, side: int):
        # ``| 1`` forces an odd size.
        size = max(minimum, int(round(side * fraction))) | 1
        return cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (size, size))

    if mask.dtype != np.uint8:
        mask = mask.astype(np.uint8)
    height, width = mask.shape[:2]
    short_side = max(3, min(height, width))

    closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, _odd_kernel(5, 0.018, short_side))
    cleaned = cv2.morphologyEx(closed, cv2.MORPH_OPEN, _odd_kernel(3, 0.006, short_side))

    count, labels, stats, _ = cv2.connectedComponentsWithStats(cleaned, connectivity=8)
    if count <= 1:
        return cleaned

    gravity_threshold = int(height * 0.60)
    min_area = max(1000, int(height * width * 0.01))
    areas = stats[:, cv2.CC_STAT_AREA]
    bottoms = stats[:, cv2.CC_STAT_TOP] + stats[:, cv2.CC_STAT_HEIGHT]

    # Label 0 is the background, so candidate components start at 1.
    kept = [
        component_id
        for component_id in range(1, count)
        if areas[component_id] >= min_area and bottoms[component_id] > gravity_threshold
    ]
    if kept:
        return np.isin(labels, kept).astype(cleaned.dtype)

    largest = 1 + int(np.argmax(areas[1:]))
    return (labels == largest).astype(np.uint8)
def wall_subtract(mask: np.ndarray, seg_map: np.ndarray, dilation: int = 1) -> np.ndarray:
    """Return a copy of ``mask`` with rejected-surface pixels cleared.

    Args:
        mask: Mask to clean; not modified.
        seg_map: Per-pixel label ids, same shape as ``mask``.
        dilation: Number of 5x5 elliptical dilation passes applied to the
            rejected region before subtracting; ``0`` disables dilation.
    """
    rejected = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8)
    if dilation > 0:
        grow = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        rejected = cv2.dilate(rejected, grow, iterations=dilation)
    cleared = mask.copy()
    cleared[rejected > 0] = 0
    return cleared
def fit_floor_edges(mask: np.ndarray):
    """Fit straight lines to the left and right edges of a floor mask.

    Rows are sampled at a stride that yields roughly 260 rows at most. A row
    is used when it has at least ``max(8, 1 % of the width)`` mask pixels;
    its 3rd and 97th percentile columns serve as the left and right edge.

    Returns:
        ``(left_fit, right_fit)``, each the ``np.polyfit`` degree-1
        coefficients of x as a function of y, or ``None`` when fewer than
        eight rows qualify.
    """
    height, width = mask.shape[:2]
    stride = max(1, height // 260)
    min_pixels = max(8, width * 0.01)

    samples = []
    for row in range(0, height, stride):
        columns = np.where(mask[row] > 0)[0]
        if len(columns) < min_pixels:
            continue
        samples.append(
            (
                float(row),
                float(np.percentile(columns, 3)),
                float(np.percentile(columns, 97)),
            )
        )
    if len(samples) < 8:
        return None

    rows, left_xs, right_xs = (
        np.asarray(series, dtype=np.float32) for series in zip(*samples)
    )
    left_fit = np.polyfit(rows, left_xs, 1)
    right_fit = np.polyfit(rows, right_xs, 1)
    return left_fit, right_fit
# ---------------------------------------------------------------------------
# B8 — Convex Hull Quad Fitting
# Derive a tight bounding quadrilateral from the convex hull of the floor mask.
# Used alongside the linear edge-fit quad so that corners of L-shaped rooms
# and irregular floor boundaries are fully covered.
# ---------------------------------------------------------------------------
def convex_hull_quad(mask: np.ndarray) -> np.ndarray | None:
ys, xs = np.where(mask > 0)
if len(xs) < 50:
return None
pts = np.column_stack([xs, ys]).astype(np.float32)
hull = cv2.convexHull(pts)
if hull is None or len(hull) < 4:
return None
rect = cv2.minAreaRect(hull.squeeze())
box = cv2.boxPoints(rect) # (4, 2) — x,y columns
h, w = mask.shape[:2]
box[:, 0] = np.clip(box[:, 0], 0, w - 1)
box[:, 1] = np.clip(box[:, 1], 0, h - 1)
return box
# ---------------------------------------------------------------------------
# B6 — Dual Vanishing Point Detection
# Detect two independent VPs: one from positive-slope lines (converging right)
# and one from negative-slope lines (converging left), covering oblique shots
# and corner-camera perspectives.
# ---------------------------------------------------------------------------
def detect_dual_vanishing_points(
    img_np: np.ndarray, floor_mask: np.ndarray
) -> tuple[dict | None, dict | None]:
    """Detect up to two vanishing points from line segments on the floor.

    Canny edges restricted to the floor mask are fed to a probabilistic Hough
    transform. Segments are split by slope sign, and each group's pairwise
    intersections are clustered into one candidate point.

    Returns:
        ``(primary, secondary)`` as ``{"x", "y"}`` dicts. The primary is the
        candidate with the smaller ``|y|``; either entry may be ``None``.
    """
    grayscale = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
    grayscale = cv2.GaussianBlur(grayscale, (5, 5), 0)
    edge_map = cv2.Canny(grayscale, 60, 160)
    edge_map[floor_mask == 0] = 0

    segments = cv2.HoughLinesP(
        edge_map,
        rho=1,
        theta=np.pi / 180,
        threshold=60,
        minLineLength=max(40, min(img_np.shape[:2]) // 16),
        maxLineGap=24,
    )
    if segments is None:
        return None, None
    img_h, img_w = img_np.shape[:2]

    positive_slope, negative_slope = [], []
    for raw_segment in segments[:, 0, :]:
        ax, ay, bx, by = (float(value) for value in raw_segment)
        run, rise = bx - ax, by - ay
        seg_len = float(np.hypot(run, rise))
        # Skip short and (near-)vertical segments.
        if seg_len < 40 or abs(run) < 1:
            continue
        slope = rise / run
        # Skip near-horizontal segments.
        if abs(slope) < 0.18:
            continue
        bucket = positive_slope if slope > 0 else negative_slope
        bucket.append((ax, ay, bx, by, slope, seg_len))

    def _cluster_intersections(group: list) -> dict | None:
        hits = []
        for index, (px, py, _, _, slope_a, len_a) in enumerate(group):
            intercept_a = py - slope_a * px
            for qx, qy, _, _, slope_b, len_b in group[index + 1:]:
                slope_gap = slope_a - slope_b
                # Nearly parallel lines give unstable intersections.
                if abs(slope_gap) < 0.08:
                    continue
                intercept_b = qy - slope_b * qx
                cross_x = (intercept_b - intercept_a) / slope_gap
                cross_y = slope_a * cross_x + intercept_a
                inside_x = -img_w * 0.6 <= cross_x <= img_w * 1.6
                inside_y = -img_h * 1.2 <= cross_y <= img_h * 1.0
                if inside_x and inside_y:
                    hits.append((cross_x, cross_y, min(len_a, len_b)))
        if len(hits) < 3:
            return None

        coords = np.array([[hit[0], hit[1]] for hit in hits], np.float32)
        weights = np.array([hit[2] for hit in hits], np.float32)
        centre = np.average(coords, axis=0, weights=weights)
        # Re-average over the 70 % of intersections closest to the first estimate.
        spread = np.linalg.norm(coords - centre, axis=1)
        inliers = spread <= np.percentile(spread, 70)
        if inliers.sum() >= 3:
            centre = np.average(coords[inliers], axis=0, weights=weights[inliers])
        return {"x": float(centre[0]), "y": float(centre[1])}

    vp_right = _cluster_intersections(positive_slope)  # from positive-slope lines
    vp_left = _cluster_intersections(negative_slope)  # from negative-slope lines

    # Primary VP = the candidate with the smallest |y|; ties keep vp_right first.
    found = [vp for vp in (vp_right, vp_left) if vp is not None]
    if not found:
        return None, None
    ranked = sorted(found, key=lambda vp: abs(vp["y"]))
    primary = ranked[0]
    secondary = ranked[1] if len(ranked) > 1 else None
    return primary, secondary
def estimate_floor_plane(mask: np.ndarray, img_np: np.ndarray):
    """Fit a perspective quadrilateral to the floor mask and derive a homography.

    Args:
        mask: Floor mask; pixels > 0 are treated as floor.
        img_np: Image array of the same height/width, used for vanishing
            point detection.

    Returns:
        ``(homography, plane)``. ``homography`` is the flattened 3x3 matrix
        from ``cv2.getPerspectiveTransform`` mapping the fitted quad onto the
        mask's percentile bounding rectangle. ``plane`` holds that rectangle
        (``x``, ``y``, ``width``, ``height``), the fitted ``quad`` (flattened,
        ordered bottom-left, bottom-right, top-right, top-left), the
        ``hullQuad`` and both vanishing points. ``(None, None)`` is returned
        when the mask is too small, the edge fit fails, or the quad is
        degenerate.
    """
    ys, xs = np.where(mask > 0)
    if len(xs) < 1000:
        return None, None
    xs_f, ys_f = xs.astype(np.float32), ys.astype(np.float32)
    # Percentile bounding box (1st-99th) so stray pixels do not stretch it.
    x1, x2 = float(np.percentile(xs_f, 1)), float(np.percentile(xs_f, 99))
    y1, y2 = float(np.percentile(ys_f, 1)), float(np.percentile(ys_f, 99))
    width, height = x2 - x1, y2 - y1
    if width < 20 or height < 20:
        return None, None
    # Rows at which the quad's top and bottom edges are placed.
    top_y = float(np.percentile(ys_f, 8))
    bottom_y = float(np.percentile(ys_f, 97))
    edge_fits = fit_floor_edges(mask)
    if edge_fits is None:
        return None, None
    left_fit, right_fit = edge_fits
    # Evaluate the fitted left/right edge lines at the top and bottom rows.
    top_left = float(np.polyval(left_fit, top_y))
    top_right = float(np.polyval(right_fit, top_y))
    bottom_left = float(np.polyval(left_fit, bottom_y))
    bottom_right = float(np.polyval(right_fit, bottom_y))
    # Widen the bottom edge to cover the lowest 20 % of floor pixels.
    lower_xs = xs_f[ys_f >= np.percentile(ys_f, 80)]
    bottom_left = min(bottom_left, float(np.percentile(lower_xs, 4)))
    bottom_right = max(bottom_right, float(np.percentile(lower_xs, 96)))
    # Enforce minimum edge widths so the quad cannot collapse to a sliver.
    min_top_width = max(24.0, width * 0.18)
    top_center = (top_left + top_right) * 0.5
    if top_right - top_left < min_top_width:
        top_left = top_center - min_top_width * 0.5
        top_right = top_center + min_top_width * 0.5
    min_bottom_width = max(min_top_width * 1.25, width * 0.45)
    bottom_center = (bottom_left + bottom_right) * 0.5
    if bottom_right - bottom_left < min_bottom_width:
        bottom_left = bottom_center - min_bottom_width * 0.5
        bottom_right = bottom_center + min_bottom_width * 0.5
    h, w = mask.shape[:2]
    # Corner order: bottom-left, bottom-right, top-right, top-left.
    src = np.float32([
        [np.clip(bottom_left, 0, w - 1), np.clip(bottom_y, 0, h - 1)],
        [np.clip(bottom_right, 0, w - 1), np.clip(bottom_y, 0, h - 1)],
        [np.clip(top_right, 0, w - 1), np.clip(top_y, 0, h - 1)],
        [np.clip(top_left, 0, w - 1), np.clip(top_y, 0, h - 1)],
    ])
    # B6 — use dual VP; primary VP guides top-edge convergence
    vanishing_point, vanishing_point2 = detect_dual_vanishing_points(img_np, mask)
    if vanishing_point is not None and vanishing_point["y"] < bottom_y:
        vp_x = float(np.clip(vanishing_point["x"], -w * 0.25, w * 1.25))
        top_width = max(src[2][0] - src[3][0], width * 0.16)
        horizon_gap = max(bottom_y - top_y, 1.0)
        convergence = np.clip((top_y - vanishing_point["y"]) / horizon_gap, 0.12, 0.75)
        # Shift the top edge's centre part of the way toward the VP; the
        # blend weight is at most 0.75 * 0.35 of the distance.
        top_center = top_center * (1 - convergence * 0.35) + vp_x * (convergence * 0.35)
        src[3][0] = np.clip(top_center - top_width * 0.5, 0, w - 1)
        src[2][0] = np.clip(top_center + top_width * 0.5, 0, w - 1)
    # B8 — expand src quad to cover convex hull corners not reached by linear fits
    hull_box = convex_hull_quad(mask)
    hull_quad_list = hull_box.flatten().tolist() if hull_box is not None else None
    if hull_box is not None:
        hull_bottom_y = float(np.max(hull_box[:, 1]))
        hull_top_y = float(np.min(hull_box[:, 1]))
        hull_left_x = float(np.min(hull_box[:, 0]))
        hull_right_x = float(np.max(hull_box[:, 0]))
        # Only the bottom corners move horizontally; the top and bottom edges
        # move vertically to the hull box's extremes.
        src[0][0] = min(src[0][0], hull_left_x)
        src[1][0] = max(src[1][0], hull_right_x)
        src[0][1] = src[1][1] = max(src[0][1], hull_bottom_y)
        src[2][1] = src[3][1] = min(src[2][1], hull_top_y)
        src = np.clip(src, [0, 0], [w - 1, h - 1]).astype(np.float32)
    # Reject (nearly) collapsed quads before computing the transform.
    if cv2.contourArea(src.reshape(-1, 1, 2)) < 100:
        return None, None
    # Destination rectangle uses the same corner order as src.
    dst = np.float32([[x1, y2], [x2, y2], [x2, y1], [x1, y1]])
    homography = cv2.getPerspectiveTransform(src, dst).flatten().tolist()
    return homography, {
        "x": x1,
        "y": y1,
        "width": width,
        "height": height,
        "quad": src.flatten().tolist(),
        "hullQuad": hull_quad_list,  # B8
        "vanishingPoint": vanishing_point,  # B6 primary
        "vanishingPoint2": vanishing_point2,  # B6 secondary
    }
# ---------------------------------------------------------------------------
# B5 — Complement-Stamp Furniture
# Use a single dilation pass (down from two) and restore the narrow contact
# zone directly below each occluder so chair legs, table bases, and plant pots
# sit flush against the tile surface without a visible gap or halo.
# ---------------------------------------------------------------------------
def build_floor_surface_mask(
    floor_mask: np.ndarray,
    seg_map: np.ndarray,
    quad: np.ndarray | None,
    depth: np.ndarray | None,
):
    """Build the mask of floor pixels that may be repainted with new tiles.

    Args:
        floor_mask: Binary floor mask (pixels > 0 are floor).
        seg_map: Per-pixel label ids, same shape as ``floor_mask``.
        quad: Optional floor-plane quad; used to extend the surface to
            nearby pixels inside the plane.
        depth: Optional depth map; used to drop pixels whose depth lies
            outside the floor's depth range.

    Returns:
        A ``uint8`` mask. Rejected-surface pixels (dilated) are always
        excluded; dilated occluder pixels are excluded except for the narrow
        contact band restored at the end.
    """
    h, w = floor_mask.shape[:2]
    # Odd kernel size scaled to the image, never below 5.
    kern_size = max(5, min(h, w) // 160) | 1
    kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kern_size, kern_size))
    occluder_mask = np.isin(seg_map, class_ids(OCCLUDER_CLASSES)).astype(np.uint8)
    # One dilation pass instead of two — keeps the occluder boundary tight so
    # furniture feet don't leave a visible halo on the replaced tile surface.
    occ_dilated = cv2.dilate(occluder_mask, kern, iterations=1)
    reject_mask = np.isin(seg_map, class_ids(REJECT_SURFACE_CLASSES)).astype(np.uint8)
    reject_dilated = cv2.dilate(reject_mask, kern, iterations=2)
    surface = floor_mask.copy()
    surface[reject_dilated > 0] = 0
    if not surface.any():
        # Rejecting removed everything; fall back to the raw floor mask.
        surface = floor_mask.copy()
    # Fill holes inside the floor outline (external contours only), then
    # remove rejected pixels again.
    contours, _ = cv2.findContours(surface, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if contours:
        filled = np.zeros((h, w), dtype=np.uint8)
        cv2.drawContours(filled, contours, -1, 1, cv2.FILLED)
        filled[reject_dilated > 0] = 0
        surface = filled
    if quad is not None and surface.any():
        # Extend into the plane quad, but only close to the existing surface.
        plane_mask = np.zeros((h, w), dtype=np.uint8)
        cv2.fillConvexPoly(plane_mask, np.round(quad).astype(np.int32), 1)
        plane_mask[reject_dilated > 0] = 0
        near_floor = cv2.dilate(surface, kern, iterations=6)
        surface = cv2.bitwise_or(surface, cv2.bitwise_and(plane_mask, near_floor))
    surface[occ_dilated > 0] = 0
    if depth is not None and floor_mask.any():
        # Keep only pixels whose depth is within the floor's 2nd-98th
        # percentile range plus a margin.
        floor_depth = depth[floor_mask > 0]
        lo, hi = float(np.percentile(floor_depth, 2)), float(np.percentile(floor_depth, 98))
        margin = max(0.08, (hi - lo) * 0.35)
        depth_keep = (depth >= lo - margin) & (depth <= hi + margin)
        surface = (surface & depth_keep.astype(np.uint8)).astype(np.uint8)
        # Original floor pixels are always kept, whatever the depth test said.
        surface[floor_mask > 0] = np.maximum(surface[floor_mask > 0], 1)
        surface[occ_dilated > 0] = 0
        surface[reject_dilated > 0] = 0
    # The exclusions are re-applied after every step that can grow the mask.
    surface = clean_floor_mask(surface)
    surface[occ_dilated > 0] = 0
    surface[reject_dilated > 0] = 0
    boundary_kern = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    surface = cv2.dilate(surface, boundary_kern, iterations=1)
    surface[occ_dilated > 0] = 0
    surface[reject_dilated > 0] = 0
    # Restore the narrow contact zone at the bottom edge of each occluder so
    # furniture touches the tile surface naturally (B5).
    # NOTE(review): a 1-wide, 3-tall erosion strips both the top and bottom
    # boundary rows, so this band appears to include top edges too — confirm.
    contact_kern_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 3))
    occ_eroded = cv2.erode(occluder_mask, contact_kern_v, iterations=1)
    occ_bottom_edge = cv2.subtract(occluder_mask, occ_eroded)
    contact_tiny = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    contact_zone = cv2.dilate(occ_bottom_edge, contact_tiny, iterations=1)
    # Only restore where the original floor mask already marked floor.
    restore = cv2.bitwise_and(contact_zone, floor_mask)
    surface = cv2.bitwise_or(surface, restore)
    surface[reject_dilated > 0] = 0
    return surface
# ---------------------------------------------------------------------------
# B10 — Confidence-Aware Boundaries
# Distance-transform the surface mask so pixels near its edge get a low
# confidence score. The frontend uses this to feather tile blending at
# boundary transitions instead of a hard cut.
# ---------------------------------------------------------------------------
def build_confidence_map(surface_mask: np.ndarray) -> np.ndarray | None:
if not surface_mask.any():
return None
dist = cv2.distanceTransform(surface_mask.astype(np.uint8), cv2.DIST_L2, 5)
feather = max(10.0, min(surface_mask.shape[:2]) / 50.0)
confidence = np.clip(dist / feather, 0.0, 1.0)
return (confidence * 255).astype(np.uint8)
# ---------------------------------------------------------------------------
# B7 — Multi-Room Grid Alignment
# Find all connected floor regions large enough to tile. All regions share
# the primary region's homography so the tile grid continues seamlessly across
# doorways without restarting.
# ---------------------------------------------------------------------------
def find_floor_regions(surface_mask: np.ndarray, min_area: int) -> list[np.ndarray]:
    """Split the surface mask into connected regions, largest first.

    Args:
        surface_mask: Binary surface mask.
        min_area: Minimum pixel count for a region to be returned.

    Returns:
        One ``uint8`` mask per 8-connected region of at least ``min_area``
        pixels, sorted by descending area.
    """
    count, labels, stats, _ = cv2.connectedComponentsWithStats(
        surface_mask.astype(np.uint8), connectivity=8
    )

    def _area(component_id: int) -> int:
        return int(stats[component_id, cv2.CC_STAT_AREA])

    # Label 0 is the background. The sort is stable, so equal areas keep
    # their label order.
    selected = [component_id for component_id in range(1, count) if _area(component_id) >= min_area]
    selected.sort(key=_area, reverse=True)
    return [(labels == component_id).astype(np.uint8) for component_id in selected]
def run_segmentation(img: Image.Image, img_np: np.ndarray):
    """Run the loaded segmentation backend and return a per-pixel label map.

    The model is loaded on first use. OneFormer and Mask2Former outputs are
    post-processed at the image size; SegFormer logits are arg-maxed at model
    resolution and resized with nearest-neighbour interpolation.

    Args:
        img: Image handed to the processor.
        img_np: Array form of the same image; only its height/width are read.

    Returns:
        A ``uint8`` array of shape ``(height, width)`` holding label ids.
    """
    if seg_model is None:
        _load_segmentation_model()
    height, width = img_np.shape[:2]
    target_sizes = [(height, width)]

    if segmentation_backend == "oneformer":
        encoded = seg_processor(
            images=img,
            task_inputs=["semantic"],
            return_tensors="pt",
        )
    else:
        encoded = seg_processor(images=img, return_tensors="pt")
    encoded = encoded.to(device)
    with torch.no_grad():
        outputs = seg_model(**encoded)

    if segmentation_backend not in {"oneformer", "mask2former"}:
        # SegFormer path: raw logits, upscaled after the arg-max.
        coarse = outputs.logits.argmax(dim=1).squeeze().cpu().numpy()
        return cv2.resize(coarse.astype(np.uint8), (width, height), interpolation=cv2.INTER_NEAREST)

    if segmentation_backend == "mask2former" and "panoptic" in MASK2FORMER_MODEL_NAME:
        panoptic = seg_processor.post_process_panoptic_segmentation(
            outputs,
            target_sizes=target_sizes,
        )[0]
        segment_ids = panoptic["segmentation"].cpu().numpy()
        # Pixels not covered by any segment keep label 0.
        label_map = np.zeros((height, width), dtype=np.uint8)
        for segment in panoptic["segments_info"]:
            label_map[segment_ids == segment["id"]] = min(segment["label_id"], 255)
        return label_map

    semantic = seg_processor.post_process_semantic_segmentation(
        outputs,
        target_sizes=target_sizes,
    )[0]
    return semantic.cpu().numpy().astype(np.uint8)
def segmenter_metadata_name() -> str:
    """Return the segmenter name reported in response metadata.

    OneFormer is reported under a fixed descriptive name; every other
    backend is reported by its backend tag.
    """
    using_oneformer = segmentation_backend == "oneformer"
    return "oneformer-ade20k-swin-large" if using_oneformer else segmentation_backend
def build_segmentation_bundle(contents: bytes):
    """Run the floor-analysis pipeline on an encoded image and package the result.

    The image is decoded, downscaled so its longest side is at most 1280 px,
    segmented, and analysed for floor mask, depth, floor plane, shading and
    lighting. Each stage prints a ``[TIMING]`` line.

    Args:
        contents: Raw bytes of the uploaded image file.

    Returns:
        A dict with:
          * ``width`` / ``height`` – size of the (possibly downscaled) image.
          * ``pixels`` – base64 of that image encoded as JPEG (quality 90).
          * ``segments`` – one entry per detected floor region. If no floor
            region qualifies, one entry per segmentation class covering at
            least 1000 pixels is returned instead, with all floor-specific
            fields set to ``None``.
        Every segment ``mask`` is the base64 of ``uint32`` indices into the
        flattened image.

    Raises:
        RuntimeError: If the image cannot be re-encoded as JPEG.
        Exception: Errors raised by Pillow while decoding ``contents`` and by
            the pipeline helpers are propagated unchanged.
    """
    t_start = time.perf_counter()

    t0 = time.perf_counter()
    img = Image.open(io.BytesIO(contents)).convert("RGB")
    MAX_DIM = 1280
    longest_side = max(img.width, img.height)
    if longest_side > MAX_DIM:
        scale = MAX_DIM / longest_side
        # Clamp to 1 px: for extreme aspect ratios the short side would
        # otherwise truncate to 0, which is not a valid resize target.
        new_size = (max(1, int(img.width * scale)), max(1, int(img.height * scale)))
        img = img.resize(new_size, Image.LANCZOS)
    img_np = np.array(img)
    h, w = img_np.shape[:2]
    min_floor_area = max(1200, int(w * h * 0.015))
    print(f"[TIMING] Image loading/parsing took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    seg_map = run_segmentation(img, img_np)
    print(f"[TIMING] Floor segmentation took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    success, jpeg_buf = cv2.imencode(
        ".jpg", cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR), [cv2.IMWRITE_JPEG_QUALITY, 90]
    )
    if not success:
        # Fail loudly instead of tripping over an unusable buffer below.
        raise RuntimeError("Failed to encode the input image as JPEG.")
    pixels_b64 = base64.b64encode(jpeg_buf.tobytes()).decode()
    print(f"[TIMING] Image JPEG encoding took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    primary_floor_ids = class_ids(PRIMARY_FLOOR_CLASSES)
    floor_class_ids = class_ids(FLOOR_SURFACE_CLASSES)

    def floor_mask_for(ids):
        # Select the given classes, subtract walls, then clean the mask.
        selected = np.isin(seg_map, ids).astype(np.uint8)
        selected = wall_subtract(selected, seg_map, dilation=1)
        return clean_floor_mask(selected)

    floor_mask = floor_mask_for(primary_floor_ids)
    if int(floor_mask.sum()) < min_floor_area:
        # Too little primary floor found: retry with the wider class set.
        floor_mask = floor_mask_for(floor_class_ids)
    print(f"[TIMING] Floor masking/cleanup took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    depth = estimate_depth(img, w, h)
    print(f"[TIMING] Depth estimation took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    homography, plane = estimate_floor_plane(floor_mask, img_np)
    print(f"[TIMING] Plane fitting / homography calculation took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    quad = np.asarray(plane["quad"], dtype=np.float32).reshape(4, 2) if plane and plane.get("quad") else None
    surface_mask = build_floor_surface_mask(floor_mask, seg_map, quad, depth)
    print(f"[TIMING] Surface masking took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    shade_map, shade_range = None, (0.55, 1.35)
    if ENABLE_INTRINSIC_SHADING:
        if intrinsic_models is None:
            _load_intrinsic_model()
        if intrinsic_models is not None:
            shade_map, shade_range = build_intrinsic_shade_map(img_np, surface_mask)
    if shade_map is None:
        # Intrinsic shading disabled, unavailable, or produced nothing.
        shade_map, shade_range = build_shade_map(img_np, surface_mask)
    print(f"[TIMING] Shade map construction took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    color_temperature = estimate_color_temperature(img_np, surface_mask)  # B2
    light_vector = estimate_light_vector(shade_map, surface_mask)  # B3
    # B10 — confidence maps are built per region below; a whole-surface map
    # is not part of the bundle, so none is computed here.
    print(f"[TIMING] Lighting analysis took {time.perf_counter() - t0:.3f} seconds", flush=True)

    # B7 — split the surface mask into connected regions; all share the same
    # homography so the tile grid is continuous across doorways.
    t0 = time.perf_counter()
    floor_regions = find_floor_regions(surface_mask, min_floor_area)
    multi_room = len(floor_regions) > 1
    print(f"[TIMING] Floor region detection took {time.perf_counter() - t0:.3f} seconds", flush=True)

    t0 = time.perf_counter()
    segmenter_name = segmenter_metadata_name()
    depth_enabled = depth is not None

    # These values are identical for every region, so compute them once
    # instead of re-encoding the full shade map per region.
    shade_b64 = base64.b64encode(shade_map.tobytes()).decode() if shade_map is not None else None
    floor_pixels = int(floor_mask.sum())

    segments = []
    for region_idx, region_mask in enumerate(floor_regions):
        region_indices = np.flatnonzero(region_mask.ravel()).astype(np.uint32)
        if len(region_indices) < min_floor_area:
            continue
        # Per-region confidence sub-map
        region_conf = build_confidence_map(region_mask)
        segments.append({
            "id": region_idx,
            "className": "floor",
            "mask": base64.b64encode(region_indices.tobytes()).decode(),
            "homography": homography,  # shared across all regions (B7)
            "plane": plane,
            "shadeMap": shade_b64,
            "shadeRange": list(shade_range),  # B4 — frontend decodes with this
            "colorTemperature": color_temperature,  # B2
            "lightVector": light_vector,  # B3
            "confidenceMap": base64.b64encode(region_conf.tobytes()).decode() if region_conf is not None else None,  # B10
            "multiRoom": multi_room,  # B7
            "gridGroup": "primary" if region_idx == 0 else f"room_{region_idx}",  # B7
            "metadata": {
                "segmenter": segmenter_name,
                "floorPixels": floor_pixels,
                "surfacePixels": int(region_mask.sum()),
                "depthEnabled": depth_enabled,
                "shadingEnabled": shade_map is not None,
            },
        })

    if not segments:
        # No usable floor region: fall back to exposing the raw segmentation
        # classes so the caller still receives something.
        flat_seg = seg_map.ravel()
        for seg_id, class_id in enumerate(np.unique(flat_seg)):
            indices = np.flatnonzero(flat_seg == class_id).astype(np.uint32)
            if len(indices) < 1000:
                continue
            segments.append({
                "id": int(seg_id),
                "className": class_name_for_id(int(class_id)),
                "mask": base64.b64encode(indices.tobytes()).decode(),
                "homography": None,
                "plane": None,
                "shadeMap": None,
                "shadeRange": None,
                "colorTemperature": None,
                "lightVector": None,
                "confidenceMap": None,
                "multiRoom": False,
                "gridGroup": None,
                "metadata": {
                    "segmenter": segmenter_name,
                    "depthEnabled": depth_enabled,
                    "shadingEnabled": False,
                },
            })

    print(f"[TIMING] Total bundle processing completed in {time.perf_counter() - t_start:.3f} seconds", flush=True)
    return {"width": w, "height": h, "pixels": pixels_b64, "segments": segments}
def job_path(job_id: str) -> Path:
    """Return the location of the JSON record for ``job_id`` inside ``JOB_DIR``."""
    record_name = f"{job_id}.json"
    return JOB_DIR.joinpath(record_name)
def read_job(job_id: str):
    """Load and return the stored job record for ``job_id``.

    Raises:
        HTTPException: 404 when no record file exists for ``job_id``.
    """
    record_file = job_path(job_id)
    if record_file.exists():
        raw_record = record_file.read_text()
        return json.loads(raw_record)
    raise HTTPException(status_code=404, detail="Job not found.")
def write_job(job: dict):
    """Persist ``job`` as JSON at the path derived from ``job["id"]``.

    The record is written to a uniquely named temporary file in the same
    directory and then renamed over the target. A plain in-place write would
    truncate the file first, so a concurrent reader (for example a status
    poll while a background task updates the job) could read an empty or
    partial document.

    Args:
        job: JSON-serialisable job record; must contain an ``"id"`` key.
    """
    # Serialise first so a serialisation error leaves no file behind.
    payload = json.dumps(job)
    target = job_path(job["id"])
    scratch = target.with_name(f"{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        scratch.write_text(payload)
        scratch.replace(target)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        scratch.unlink(missing_ok=True)
def run_conversion_task(job_id: str, upload_path: Path):
    """Background worker: build the bundle for an upload and update its job record.

    On success the bundle is written to ``JOB_DIR/<job_id>.bundle.json`` and
    the job status becomes ``COMPLETED``. On any failure the job is marked
    ``FAILED`` with the error message. This function never raises: problems
    are printed so a background failure cannot propagate to the caller.

    Args:
        job_id: Identifier of the job whose record should be updated.
        upload_path: Path of the stored upload to process.
    """
    try:
        t_start = time.perf_counter()
        image_bytes = upload_path.read_bytes()
        bundle = build_segmentation_bundle(image_bytes)
        # Write the bundle before flipping the status so a COMPLETED job
        # always has its output on disk.
        (JOB_DIR / f"{job_id}.bundle.json").write_text(json.dumps(bundle))
        job = read_job(job_id)
        job["status"] = "COMPLETED"
        write_job(job)
        print(f"[TIMING] Background conversion task for job {job_id} took {time.perf_counter() - t_start:.3f} seconds", flush=True)
    except Exception as exc:
        print(f"Background conversion failed: {exc}", flush=True)
        try:
            job = read_job(job_id)
            job["status"] = "FAILED"
            job["error"] = str(exc)
            write_job(job)
        except Exception as status_exc:
            # Still best-effort, but no longer silent: without this line a job
            # stuck in PROCESSING would leave no trace of why.
            print(f"Could not record FAILED status for job {job_id}: {status_exc}", flush=True)
@app.post("/viz2d/convert")
async def convert_to_viz2d(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """Store an uploaded image, register a job for it and schedule conversion.

    The upload is rejected with 400 when it declares a non-image content
    type. The file is saved under ``UPLOAD_DIR`` using a fresh job id as its
    name; unrecognised extensions are stored as ``.jpg``. The new job record
    (status ``PROCESSING``) is returned immediately while the conversion runs
    as a background task.
    """
    declared_type = file.content_type
    if declared_type and not declared_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="Upload must be a JPG or PNG image.")

    job_id = uuid.uuid4().hex

    allowed_suffixes = {".jpg", ".jpeg", ".png", ".webp"}
    suffix = Path(file.filename or "room.jpg").suffix.lower()
    stored_name = job_id + (suffix if suffix in allowed_suffixes else ".jpg")
    upload_path = UPLOAD_DIR / stored_name

    with upload_path.open("wb") as sink:
        shutil.copyfileobj(file.file, sink)

    job = dict(
        id=job_id,
        status="PROCESSING",
        inputUrl=f"/uploads/{upload_path.name}",
        outputUrl=f"/viz2d/jobs/{job_id}/file",
    )
    write_job(job)
    background_tasks.add_task(run_conversion_task, job_id, upload_path)
    return job
@app.get("/viz2d/jobs/{job_id}")
async def viz2d_job_status(job_id: str):
    """Return the stored job record for ``job_id``; ``read_job`` raises 404 if it is missing."""
    job_record = read_job(job_id)
    return job_record
@app.get("/viz2d/jobs/{job_id}/file")
async def viz2d_job_file(job_id: str):
    """Stream the bundle JSON produced for a completed job.

    Raises:
        HTTPException: 404 if the job record or its bundle file is missing,
            409 if the job has not reached ``COMPLETED``.
    """
    job = read_job(job_id)
    if job.get("status") != "COMPLETED":
        raise HTTPException(status_code=409, detail="Job is not completed yet.")
    bundle_path = JOB_DIR / f"{job_id}.bundle.json"
    if not bundle_path.exists():
        raise HTTPException(status_code=404, detail="Job output not found.")

    async def iter_file(chunk_size: int = 64 * 1024):
        # Read in fixed-size chunks so the whole bundle is never held in
        # memory at once; a single read() would defeat the streaming response.
        async with aiofiles.open(bundle_path, "rb") as f:
            while chunk := await f.read(chunk_size):
                yield chunk

    return StreamingResponse(iter_file(), media_type="application/json")
@app.post("/segment")
async def segment(file: UploadFile = File(...)):
    """Synchronously segment an uploaded image and return the bundle.

    Raises:
        HTTPException: 400 if the upload is empty or Pillow cannot identify
            it as an image.
    """
    # Local import: only the exception type is needed, and only here.
    from PIL import UnidentifiedImageError

    contents = await file.read()
    if not contents:
        raise HTTPException(status_code=400, detail="Uploaded file is empty.")
    try:
        # The pipeline is synchronous and compute-heavy; run it in a worker
        # thread so other requests (e.g. job status polls) are not blocked.
        return await asyncio.to_thread(build_segmentation_bundle, contents)
    except UnidentifiedImageError as exc:
        # A bad upload is a client error, not a server fault.
        raise HTTPException(status_code=400, detail="Upload is not a readable image.") from exc
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces, port 8002.
    import uvicorn

    bind_host, bind_port = "0.0.0.0", 8002
    uvicorn.run(app, host=bind_host, port=bind_port)