#!/usr/bin/env python3 from __future__ import annotations import json import os from functools import lru_cache from pathlib import Path from typing import Any import numpy as np import trimesh import uvicorn from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse, HTMLResponse, JSONResponse DEFAULT_DATA_ROOT = Path(__file__).resolve().parent / "demo_data" REQUIRED_FILES = ("mesh.ply", "instance_seg_mesh.ply", "metadata.json") HIDDEN_CLASS_NAMES = {"wall", "floor", "ceiling"} app = FastAPI(title="SceneVerse++ Demo") def list_scene_ids(data_root: Path) -> list[str]: if not data_root.is_dir(): raise FileNotFoundError(f"Data root does not exist: {data_root}") scene_ids: list[str] = [] for scene_dir in sorted(path for path in data_root.iterdir() if path.is_dir()): if all((scene_dir / name).is_file() for name in REQUIRED_FILES): scene_ids.append(scene_dir.name) return scene_ids def load_mesh(path: Path) -> trimesh.Trimesh: mesh = trimesh.load(str(path), process=False, force="mesh") if isinstance(mesh, trimesh.Scene): geometries = [geom for geom in mesh.geometry.values() if isinstance(geom, trimesh.Trimesh)] if not geometries: raise ValueError(f"No mesh geometry found in {path}") mesh = trimesh.util.concatenate(geometries) if not isinstance(mesh, trimesh.Trimesh): raise TypeError(f"Unsupported mesh type for {path}: {type(mesh)!r}") return mesh def sanitize_ids(point_ids: list[int], vertex_count: int) -> np.ndarray: array = np.asarray(point_ids, dtype=np.int64) return array[(array >= 0) & (array < vertex_count)] def compute_instances(metadata: dict[str, Any], vertices: np.ndarray) -> list[dict[str, Any]]: instances: list[dict[str, Any]] = [] for instance_id, payload in metadata.items(): class_name = str(payload.get("pred_class_name", "unknown")).strip() if class_name.lower() in HIDDEN_CLASS_NAMES: continue point_ids = sanitize_ids(payload.get("point_ids", []), len(vertices)) if point_ids.size == 0: continue points = vertices[point_ids] mins = points.min(axis=0) maxs = points.max(axis=0) center = ((mins + maxs) / 2.0).tolist() size = (maxs - mins).tolist() instances.append( { "instance_id": str(instance_id), "class_name": class_name or "unknown", "describe": payload.get("pred_describe", ""), "class_id": payload.get("pred_class_id"), "bbox_min": mins.tolist(), "bbox_max": maxs.tolist(), "bbox_center": center, "bbox_size": size, } ) instances.sort(key=lambda item: int(item["instance_id"]) if item["instance_id"].isdigit() else item["instance_id"]) return instances @lru_cache(maxsize=32) def load_scene_bundle(scene_id: str) -> dict[str, Any]: scene_dir = DEFAULT_DATA_ROOT / scene_id if not scene_dir.is_dir(): raise FileNotFoundError(f"Scene directory does not exist: {scene_dir}") paths = {name: scene_dir / name for name in REQUIRED_FILES} for name, path in paths.items(): if not path.is_file(): raise FileNotFoundError(f"Missing required file {name} for scene {scene_id}") mesh = load_mesh(paths["mesh.ply"]) with open(paths["metadata.json"], "r", encoding="utf-8") as f: metadata = json.load(f) instances = compute_instances(metadata, np.asarray(mesh.vertices)) return { "scene_id": scene_id, "mesh_url": f"/api/scenes/{scene_id}/files/mesh.ply", "seg_mesh_url": f"/api/scenes/{scene_id}/files/instance_seg_mesh.ply", "metadata_url": f"/api/scenes/{scene_id}/files/metadata.json", "instances": instances, } INDEX_HTML = """ SceneVerse++ Demo

SceneVerse++ Demo

Load SceneVerse++ scene, view mesh.ply and instance_seg_mesh.ply side by side, double-click a bbox wireframe (edges) to inspect its pred_describe, and keep both views synchronized while rotating.

Loading scene list...
mesh.ply
Original reconstructed scene mesh
Loading...
instance_seg_mesh.ply
Instance-segmented scene mesh
Loading...
Instance
Select manually if needed

Double-click any bbox wireframe (edges) in either viewer. The corresponding bbox will highlight in both viewers, and both cameras stay synchronized.

pred_describe
Instance-level description
""" @app.get("/", response_class=HTMLResponse) def index() -> str: return INDEX_HTML @app.get("/api/scenes", response_class=JSONResponse) def api_scenes() -> list[str]: try: return list_scene_ids(DEFAULT_DATA_ROOT) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc @app.get("/api/scenes/{scene_id}", response_class=JSONResponse) def api_scene(scene_id: str) -> dict[str, Any]: try: return load_scene_bundle(scene_id) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc @app.get("/api/scenes/{scene_id}/files/{filename}") def api_scene_file(scene_id: str, filename: str) -> FileResponse: if filename not in REQUIRED_FILES: raise HTTPException(status_code=404, detail=f"Unsupported file: {filename}") file_path = DEFAULT_DATA_ROOT / scene_id / filename if not file_path.is_file(): raise HTTPException(status_code=404, detail=f"File not found: {file_path}") return FileResponse(file_path) if __name__ == "__main__": port = int(os.getenv("PORT", "7860")) uvicorn.run("app:app", host="0.0.0.0", port=port, reload=False)