# flake8: noqa: E501
# Copyright (c) 2025 ByteDance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Model backend service for Depth Anything 3.
Provides HTTP API for model inference with persistent model loading.
"""

import os
import posixpath
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import numpy as np
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel
from PIL import Image

from ..api import DepthAnything3
from ..utils.memory import (
    get_gpu_memory_info,
    cleanup_cuda_memory,
    check_memory_availability,
    estimate_memory_requirement,
)


class InferenceRequest(BaseModel):
    """Request model for inference API."""

    # Image files handed to the model as its ``image`` argument.
    image_paths: List[str]
    # Only forwarded to the model when set (truthy).
    export_dir: Optional[str] = None
    export_format: str = "mini_npz-glb"
    # Nested float lists; converted to float32 arrays before inference when
    # provided.
    extrinsics: Optional[List[List[List[float]]]] = None
    intrinsics: Optional[List[List[List[float]]]] = None
    # When unset or non-positive, the memory estimate falls back to the
    # longest side of the first image.
    process_res: Optional[int] = None
    process_res_method: str = "keep"
    export_feat_layers: List[int] = []
    align_to_input_ext_scale: bool = True

    # GLB export parameters
    conf_thresh_percentile: float = 40.0
    num_max_points: int = 1_000_000
    show_cameras: bool = True

    # Feat_vis export parameters
    feat_vis_fps: int = 15


class InferenceResponse(BaseModel):
    """Response model for inference API."""

    success: bool
    message: str
    task_id: Optional[str] = None
    export_dir: Optional[str] = None
    export_format: str = "mini_npz-glb"
    processing_time: Optional[float] = None


class TaskStatus(BaseModel):
    """Task status model."""

    task_id: str
    status: str  # "pending", "running", "completed", "failed"
    message: str
    progress: Optional[float] = None  # 0.0 to 1.0

    # Timestamps as returned by time.time().
    created_at: float
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    export_dir: Optional[str] = None
    request: Optional[InferenceRequest] = None  # Store the original request

    # Essential task parameters
    num_images: Optional[int] = None  # Number of input images
    export_format: Optional[str] = None  # Export format
    process_res_method: Optional[str] = None  # Processing resolution method
    video_path: Optional[str] = None  # Source video path
class ModelBackend:
    """Model backend service with persistent model loading.

    The model is loaded lazily on first request and then kept on the
    instance, together with timestamps describing the load and the most
    recent hand-out.
    """

    def __init__(self, model_dir: str, device: str = "cuda"):
        """Remember where to load the model from; nothing is loaded yet.

        Args:
            model_dir: Passed to ``DepthAnything3.from_pretrained``.
            device: Target passed to ``.to(...)`` after loading.
        """
        self.model_dir = model_dir
        self.device = device
        self.model = None
        self.model_loaded = False
        self.load_time = None  # Duration of the last successful load (seconds)
        self.load_start_time = None  # Time when model loading started
        self.load_completed_time = None  # Time when model loading completed
        self.last_used = None  # Time the model was last handed out

    def load_model(self):
        """Load model if not already loaded.

        Returns:
            The loaded model (the cached instance on subsequent calls).

        Raises:
            Exception: Whatever the underlying load raised; it is logged and
                re-raised unchanged.
        """
        if self.model_loaded and self.model is not None:
            self.last_used = time.time()
            return self.model

        try:
            print(f"Loading model from {self.model_dir}...")
            self.load_start_time = time.time()

            # Build into a local first so a failure in .to()/.eval() never
            # leaves a half-initialised model published on the instance.
            model = DepthAnything3.from_pretrained(self.model_dir).to(self.device)
            model.eval()

            completed = time.time()
            self.model = model
            self.model_loaded = True
            self.load_time = completed - self.load_start_time
            self.load_completed_time = completed
            self.last_used = completed
            print(f"Model loaded successfully in {self.load_time:.2f}s")
            return self.model
        except Exception as e:
            print(f"Failed to load model: {e}")
            raise  # bare raise keeps the original traceback intact

    def get_model(self):
        """Get model, loading if necessary.

        Delegates to ``load_model`` so both entry points apply the same
        "flag set AND model present" check and refresh ``last_used``.
        """
        return self.load_model()

    def get_status(self) -> Dict[str, Any]:
        """Get backend status information.

        Returns:
            Dict with ``model_loaded``, ``model_dir``, ``device``,
            ``load_time``, ``last_used`` and ``uptime`` (seconds since the
            load completed, or 0 while no model is loaded).
        """
        # Calculate uptime from when model loading completed
        uptime = 0
        if self.model_loaded and self.load_completed_time:
            uptime = time.time() - self.load_completed_time

        return {
            "model_loaded": self.model_loaded,
            "model_dir": self.model_dir,
            "device": self.device,
            "load_time": self.load_time,
            "last_used": self.last_used,
            "uptime": uptime,
        }
# Global backend instance
_backend: Optional[ModelBackend] = None
_app: Optional[FastAPI] = None
_tasks: Dict[str, TaskStatus] = {}
_executor = ThreadPoolExecutor(max_workers=1)  # Restrict to single-task execution
_running_task_id: Optional[str] = None  # Currently running task ID
_task_queue: List[str] = []  # Pending task queue

# Task cleanup configuration
MAX_TASK_HISTORY = 100  # Maximum number of tasks to keep in memory
CLEANUP_INTERVAL = 300  # Cleanup interval in seconds (5 minutes)


def _process_next_task():
    """Process the next task in the queue.

    Does nothing while a task is marked as running. Otherwise pops queued
    task ids in FIFO order until one is found that is still present in
    ``_tasks`` and submits it to the single-worker executor.

    Ids that are no longer tracked are discarded and the scan continues, so
    one stale entry cannot leave the rest of the queue waiting for an
    unrelated trigger.
    """
    if _running_task_id is not None:
        return

    while _task_queue:
        # Get next task from queue
        task_id = _task_queue.pop(0)

        # The request lives on the TaskStatus entry; without it there is
        # nothing to run.
        if task_id not in _tasks:
            print(f"[{task_id}] Skipping queued task: no longer tracked")
            continue

        # Submit task to executor
        _executor.submit(_run_inference_task, task_id)
        return


# get_gpu_memory_info imported from depth_anything_3.utils.memory
# cleanup_cuda_memory imported from depth_anything_3.utils.memory
# check_memory_availability imported from depth_anything_3.utils.memory
# estimate_memory_requirement imported from depth_anything_3.utils.memory
def _resolve_effective_res(request) -> int:
    """Return the resolution used to estimate memory for *request*.

    Uses ``request.process_res`` when it is a positive number; otherwise the
    longest side of the first input image, or 504 if that image cannot be
    opened.
    """
    effective_res = request.process_res
    if not effective_res or effective_res <= 0:
        try:
            with Image.open(request.image_paths[0]) as img:
                effective_res = max(img.size)
        except Exception:
            effective_res = 504  # Fall back to baseline heuristic
    return effective_res


def _build_inference_kwargs(request) -> Dict[str, Any]:
    """Translate *request* into keyword arguments for ``model.inference``."""
    inference_kwargs = {
        "image": request.image_paths,
        "export_format": request.export_format,
        "process_res": request.process_res,
        "process_res_method": request.process_res_method,
        "export_feat_layers": request.export_feat_layers,
        "align_to_input_ext_scale": request.align_to_input_ext_scale,
        "conf_thresh_percentile": request.conf_thresh_percentile,
        "num_max_points": request.num_max_points,
        "show_cameras": request.show_cameras,
        "feat_vis_fps": request.feat_vis_fps,
    }

    # Optional arguments are only passed through when the caller set them.
    if request.export_dir:
        inference_kwargs["export_dir"] = request.export_dir
    if request.extrinsics:
        inference_kwargs["extrinsics"] = np.array(request.extrinsics, dtype=np.float32)
    if request.intrinsics:
        inference_kwargs["intrinsics"] = np.array(request.intrinsics, dtype=np.float32)
    return inference_kwargs


def _run_inference_task(task_id: str):
    """Run inference task in background thread with OOM protection.

    Looks up the request stored on ``_tasks[task_id]``, checks that enough
    GPU memory is available (retrying once after a cleanup), obtains the
    model from the backend, runs ``model.inference`` and records progress,
    status and timing on the task entry. On any exception the task is marked
    ``"failed"`` with the error text as its message.

    Whatever the outcome, the running marker is cleared, the next queued
    task is started and a task-history cleanup is scheduled.

    Args:
        task_id: Key of the task in the module-level ``_tasks`` dict.
    """
    global _running_task_id

    inference_started = False
    start_time = time.time()

    try:
        # Get task request
        if task_id not in _tasks or _tasks[task_id].request is None:
            print(f"[{task_id}] Task not found or request missing")
            # Nothing to run for this id; keep the queue moving instead of
            # waiting for an unrelated trigger.
            _process_next_task()
            return

        request = _tasks[task_id].request
        num_images = len(request.image_paths)

        # Set current running task
        _running_task_id = task_id

        # Update task status to running
        _tasks[task_id].status = "running"
        _tasks[task_id].started_at = start_time
        _tasks[task_id].message = f"[{task_id}] Starting inference on {num_images} frames..."
        print(f"[{task_id}] Starting inference on {num_images} frames")

        # Pre-inference cleanup to ensure maximum available memory
        print(f"[{task_id}] Pre-inference cleanup...")
        cleanup_cuda_memory()

        # Check memory availability
        effective_res = _resolve_effective_res(request)
        estimated_memory = estimate_memory_requirement(num_images, effective_res)
        mem_available, mem_msg = check_memory_availability(estimated_memory)
        print(f"[{task_id}] {mem_msg}")

        if not mem_available:
            # Try aggressive cleanup
            print(f"[{task_id}] Insufficient memory, attempting aggressive cleanup...")
            cleanup_cuda_memory()
            time.sleep(0.5)  # Give system time to reclaim memory

            # Check again
            mem_available, mem_msg = check_memory_availability(estimated_memory)
            if not mem_available:
                raise RuntimeError(
                    f"Insufficient GPU memory after cleanup. {mem_msg}\n"
                    f"Suggestions:\n"
                    f" 1. Reduce process_res (current: {request.process_res})\n"
                    f" 2. Process fewer images at once (current: {num_images})\n"
                    f" 3. Clear other GPU processes"
                )

        # Get model (with error handling)
        print(f"[{task_id}] Loading model...")
        _tasks[task_id].message = f"[{task_id}] Loading model..."
        _tasks[task_id].progress = 0.1

        try:
            model = _backend.get_model()
        except RuntimeError as e:
            if "out of memory" in str(e).lower():
                cleanup_cuda_memory()
                raise RuntimeError(
                    f"OOM during model loading: {str(e)}\n"
                    f"Try reducing the batch size or resolution."
                ) from e
            raise

        print(f"[{task_id}] Model loaded successfully")
        _tasks[task_id].progress = 0.2

        # Prepare inference parameters
        inference_kwargs = _build_inference_kwargs(request)

        # Run inference with timing
        inference_start_time = time.time()
        print(f"[{task_id}] Running model inference...")
        _tasks[task_id].message = f"[{task_id}] Running model inference on {num_images} images..."
        _tasks[task_id].progress = 0.3
        inference_started = True

        try:
            model.inference(**inference_kwargs)
            inference_time = time.time() - inference_start_time
            avg_time_per_image = inference_time / num_images if num_images > 0 else 0
            print(
                f"[{task_id}] Inference completed in {inference_time:.2f}s "
                f"({avg_time_per_image:.2f}s per image)"
            )
        except RuntimeError as e:
            if "out of memory" in str(e).lower():
                cleanup_cuda_memory()
                # request.process_res may be None (its default); base the
                # suggestion on the resolution actually used for the memory
                # estimate so building this message cannot itself fail.
                raise RuntimeError(
                    f"OOM during inference: {str(e)}\n"
                    f"Settings: {num_images} images, resolution={request.process_res}\n"
                    f"Suggestions:\n"
                    f" 1. Reduce process_res to {int(effective_res * 0.75)}\n"
                    f" 2. Process images in smaller batches\n"
                    f" 3. Use process_res_method='resize' instead of 'upper_bound_resize'"
                ) from e
            raise

        _tasks[task_id].progress = 0.9

        # Post-inference cleanup
        print(f"[{task_id}] Post-inference cleanup...")
        cleanup_cuda_memory()

        # Calculate total processing time
        total_time = time.time() - start_time

        # Update task status to completed
        _tasks[task_id].status = "completed"
        _tasks[task_id].completed_at = time.time()
        _tasks[task_id].message = (
            f"[{task_id}] Completed in {total_time:.2f}s "
            f"({avg_time_per_image:.2f}s per image)"
        )
        _tasks[task_id].progress = 1.0
        _tasks[task_id].export_dir = request.export_dir

        # Clear running state
        _running_task_id = None

        # Process next task in queue
        _process_next_task()

        print(f"[{task_id}] Task completed successfully")
        print(
            f"[{task_id}] Total time: {total_time:.2f}s, "
            f"Inference time: {inference_time:.2f}s, "
            f"Avg per image: {avg_time_per_image:.2f}s"
        )

    except Exception as e:
        error_msg = str(e)
        total_time = time.time() - start_time
        print(f"[{task_id}] Task failed after {total_time:.2f}s: {error_msg}")

        # Update task status to failed. This happens before the GPU cleanup,
        # and tolerates a missing entry, so nothing below can leave the task
        # reported as "running".
        failed_task = _tasks.get(task_id)
        if failed_task is not None:
            failed_task.status = "failed"
            failed_task.completed_at = time.time()
            failed_task.message = f"[{task_id}] Failed after {total_time:.2f}s: {error_msg}"

        # Always attempt cleanup on failure, but never let a cleanup error
        # skip the queue hand-over below (the queue only advances while
        # _running_task_id is None).
        try:
            cleanup_cuda_memory()
        except Exception as cleanup_error:
            print(f"[{task_id}] Warning: cleanup after failure failed: {cleanup_error}")

        # Clear running state
        _running_task_id = None

        # Process next task in queue
        _process_next_task()

    finally:
        # Final cleanup in finally block to ensure it always runs
        # This is critical for releasing resources even if unexpected errors occur
        try:
            if inference_started:
                print(f"[{task_id}] Final cleanup in finally block...")
                cleanup_cuda_memory()
        except Exception as e:
            print(f"[{task_id}] Warning: Finally block cleanup failed: {e}")

        # Schedule cleanup after task completion
        _schedule_task_cleanup()
def _cleanup_old_tasks():
    """Clean up old completed/failed tasks to prevent memory buildup.

    Two passes over the module-level ``_tasks`` dict:

    1. Drop every completed/failed task whose ``completed_at`` is more than
       10 minutes in the past.
    2. If more than ``MAX_TASK_HISTORY`` tasks remain, drop the
       oldest-finished completed/failed tasks until the limit is met or no
       finished task is left.

    Pending and running tasks are never removed.
    """
    max_finished_age = 600  # 10 minutes
    finished_states = ("completed", "failed")
    current_time = time.time()

    # Work on snapshots and remove with pop(): NOTE(review) _tasks appears to
    # be shared with code running on other threads, and scanning the live
    # dict while it changes size would abort the whole cleanup.
    expired_ids = [
        task_id
        for task_id, task in list(_tasks.items())
        if task.status in finished_states
        and task.completed_at
        and current_time - task.completed_at > max_finished_age
    ]

    # Remove old tasks
    for task_id in expired_ids:
        _tasks.pop(task_id, None)
        print(f"[CLEANUP] Removed old task: {task_id}")

    # If still too many tasks, remove oldest completed/failed tasks
    if len(_tasks) > MAX_TASK_HISTORY:
        finished = sorted(
            (
                (task_id, task)
                for task_id, task in list(_tasks.items())
                if task.status in finished_states
            ),
            key=lambda entry: entry[1].completed_at or 0,
        )
        excess_count = len(_tasks) - MAX_TASK_HISTORY
        for task_id, _task in finished[:excess_count]:
            _tasks.pop(task_id, None)
            print(f"[CLEANUP] Removed excess task: {task_id}")

    # Count active tasks (only pending and running)
    active_count = sum(
        1 for task in list(_tasks.values()) if task.status in ("pending", "running")
    )
    print(
        "[CLEANUP] Task cleanup completed. "
        f"Total tasks: {len(_tasks)}, Active tasks: {active_count}"
    )
def _schedule_task_cleanup(delay: float = 2.0):
    """Schedule task cleanup in background.

    Runs ``_cleanup_old_tasks`` once, ``delay`` seconds from now, on a
    daemon timer thread. The wait deliberately does not happen on the
    single-worker inference executor, where it would hold up the next queued
    inference task.

    Args:
        delay: Seconds to wait before cleaning up, so the finishing task's
            status is fully written first.
    """
    import threading

    def cleanup_worker():
        try:
            _cleanup_old_tasks()
        except Exception as e:
            print(f"[CLEANUP] Cleanup worker failed: {e}")

    # Run cleanup in background thread
    timer = threading.Timer(delay, cleanup_worker)
    timer.daemon = True  # never keep the process alive just for housekeeping
    timer.start()


# ============================================================================
# Gallery utilities (extracted from gallery.py)
# ============================================================================

GALLERY_IMAGE_EXTS = (".png", ".jpg", ".jpeg", ".webp", ".bmp")


def _load_gallery_html() -> str:
    """
    Load and modify gallery HTML to work under /gallery/ subdirectory.
    Replaces API paths from root to /gallery/ prefix.
    """
    from ..services.gallery import HTML_PAGE

    # Replace API paths to be under /gallery/ subdirectory
    html = (
        HTML_PAGE.replace("fetch('/manifest.json'", "fetch('/gallery/manifest.json'")
        .replace("fetch('/manifest/'+", "fetch('/gallery/manifest/'+")
        .replace(
            "if(location.pathname!=\"/\")history.replaceState(null,'','/'+location.search)",
            "if(!location.pathname.startsWith(\"/gallery\"))history.replaceState(null,'','/gallery/'+location.search)",
        )
    )
    return html


def _gallery_url_join(*parts: str) -> str:
    """Join URL parts safely.

    Backslashes are treated as separators; empty, ``.`` and ``..`` segments
    are dropped and every remaining segment is percent-encoded. Each part
    only ever appends segments, so a part that starts with ``/`` cannot
    discard the parts before it. Returns ``""`` when nothing is left.
    """
    segments = [
        seg
        for part in parts
        for seg in part.replace("\\", "/").split("/")
        if seg not in ("", ".", "..")
    ]
    return "/".join(quote(seg) for seg in segments)


def _is_plain_name(name: str) -> bool:
    """Check if name is safe for use in paths.

    A plain name is a non-empty single path component: no ``/``, ``\\`` or
    NUL characters, and not ``.`` or ``..``.
    """
    if not name or name in (".", ".."):
        return False
    return not any(ch in name for ch in ("/", "\\", "\x00"))


def _has_scene_assets(scene_dir: str) -> bool:
    """Return True if *scene_dir* is a directory holding scene.glb and scene.jpg."""
    return (
        os.path.isdir(scene_dir)
        and os.path.exists(os.path.join(scene_dir, "scene.glb"))
        and os.path.exists(os.path.join(scene_dir, "scene.jpg"))
    )


def build_group_list(root_dir: str) -> dict:
    """Build list of groups from gallery directory.

    A group is a sub-directory of *root_dir* containing at least one scene
    directory with both ``scene.glb`` and ``scene.jpg``.

    Returns:
        ``{"groups": [{"id": name, "title": name}, ...]}`` sorted by name.
        Filesystem errors are logged and yield a shorter (possibly empty)
        list rather than an exception.
    """
    groups = []
    try:
        for gname in sorted(os.listdir(root_dir)):
            gpath = os.path.join(root_dir, gname)
            if not os.path.isdir(gpath):
                continue
            try:
                has_scene = any(
                    _has_scene_assets(os.path.join(gpath, sname))
                    for sname in os.listdir(gpath)
                )
            except Exception as e:
                # Best effort: an unreadable group is skipped, but say so.
                print(f"[warn] build_group_list: cannot scan {gname}: {e}")
                has_scene = False
            if has_scene:
                groups.append({"id": gname, "title": gname})
    except Exception as e:
        print(f"[warn] build_group_list failed: {e}")
    return {"groups": groups}


def build_group_manifest(root_dir: str, group: str) -> dict:
    """Build manifest for a specific group.

    Args:
        root_dir: Gallery root directory.
        group: Name of a direct sub-directory of *root_dir*. Anything that
            is not a plain single path component yields an empty manifest,
            so the lookup cannot leave *root_dir*.

    Returns:
        ``{"group": group, "items": [...]}`` with one item per scene
        directory that has both ``scene.glb`` and ``scene.jpg``. Each item
        carries ``id``, ``title``, ``model``, ``thumbnail`` and
        ``depth_images`` (sorted ``/gallery/...`` URLs of the image files in
        its ``depth_vis`` sub-directory). Filesystem errors are logged and
        the items collected so far are returned.
    """
    items = []
    if not _is_plain_name(group):
        return {"group": group, "items": []}

    gpath = os.path.join(root_dir, group)
    try:
        if not os.path.isdir(gpath):
            return {"group": group, "items": []}

        for sname in sorted(os.listdir(gpath)):
            spath = os.path.join(gpath, sname)
            if not _has_scene_assets(spath):
                continue

            depth_images = []
            dpath = os.path.join(spath, "depth_vis")
            if os.path.isdir(dpath):
                depth_images = [
                    "/gallery/" + _gallery_url_join(group, sname, "depth_vis", fn)
                    for fn in sorted(os.listdir(dpath))
                    if os.path.splitext(fn)[1].lower() in GALLERY_IMAGE_EXTS
                ]

            items.append(
                {
                    "id": sname,
                    "title": sname,
                    "model": "/gallery/" + _gallery_url_join(group, sname, "scene.glb"),
                    "thumbnail": "/gallery/" + _gallery_url_join(group, sname, "scene.jpg"),
                    "depth_images": depth_images,
                }
            )
    except Exception as e:
        print(f"[warn] build_group_manifest failed for {group}: {e}")
    return {"group": group, "items": items}
os.path.exists(jpg_fs)): continue depth_images = [] dpath = os.path.join(spath, "depth_vis") if os.path.isdir(dpath): files = [ f for f in os.listdir(dpath) if os.path.splitext(f)[1].lower() in GALLERY_IMAGE_EXTS ] for fn in sorted(files): depth_images.append( "/gallery/" + _gallery_url_join(group, sname, "depth_vis", fn) ) items.append( { "id": sname, "title": sname, "model": "/gallery/" + _gallery_url_join(group, sname, "scene.glb"), "thumbnail": "/gallery/" + _gallery_url_join(group, sname, "scene.jpg"), "depth_images": depth_images, } ) except Exception as e: print(f"[warn] build_group_manifest failed for {group}: {e}") return {"group": group, "items": items} def create_app(model_dir: str, device: str = "cuda", gallery_dir: Optional[str] = None) -> FastAPI: """Create FastAPI application with model backend.""" global _backend, _app _backend = ModelBackend(model_dir, device) _app = FastAPI( title="Depth Anything 3 Backend", description="Model inference service for Depth Anything 3", version="1.0.0", ) # Store gallery directory globally for use in routes _gallery_dir = gallery_dir @_app.get("/", response_class=HTMLResponse) async def root(): """Home page with navigation to dashboard and gallery.""" html_content = ( """
Model Backend Service
No active tasks
" completed_tasks_html = "" if completed_tasks: for task in completed_tasks[-10:]: task_details = f"""No completed tasks
" # Generate HTML html_content = f"""Real-time monitoring of model status and inference tasks