from pathlib import Path
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import onnxruntime as ort


class DepthAnythingV2:
    """
    Depth Anything V2 model wrapper for ONNX inference

    Supports both small (25M) and large (1.3B) models. The wrapper feeds the
    first input of the ONNX graph and reads its first output.
    """

    # Spatial size used when the ONNX graph declares a dynamic height/width.
    DEFAULT_INPUT_SIZE = 518

    # ImageNet per-channel statistics (RGB order), applied after scaling to 0-1.
    IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)

    def __init__(
        self,
        model_path: str,
        use_gpu: bool = True,
        use_tensorrt: bool = False
    ):
        """
        Initialize Depth Anything V2 model

        Args:
            model_path: Path to ONNX model file
            use_gpu: Whether to use GPU acceleration
            use_tensorrt: Whether to use TensorRT optimization

        Raises:
            FileNotFoundError: If ``model_path`` does not exist
        """
        self.model_path = Path(model_path)

        if not self.model_path.exists():
            raise FileNotFoundError(f"Model not found: {model_path}")

        # Setup ONNX Runtime session
        providers = self._get_providers(use_gpu, use_tensorrt)

        session_options = ort.SessionOptions()
        session_options.graph_optimization_level = (
            ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        )

        self.session = ort.InferenceSession(
            str(self.model_path),
            sess_options=session_options,
            providers=providers
        )

        # Get input/output names
        model_input = self.session.get_inputs()[0]
        self.input_name = model_input.name
        self.output_name = self.session.get_outputs()[0].name

        # Get expected input shape. Dynamic dimensions are not integers
        # (e.g., ['batch_size', 3, 'height', 'width'], or None entries), and
        # height and width may be dynamic independently of each other, so
        # each one is resolved on its own.
        input_shape = model_input.shape
        height_dim = input_shape[2] if len(input_shape) > 2 else None
        width_dim = input_shape[3] if len(input_shape) > 3 else None
        self.input_height = self._resolve_dim(height_dim, self.DEFAULT_INPUT_SIZE)
        self.input_width = self._resolve_dim(width_dim, self.DEFAULT_INPUT_SIZE)

        print(f"✓ Loaded model: {self.model_path.name}")
        print(f"  Input shape: {input_shape}")
        # Report the providers the session actually registered rather than
        # the requested list, which may name providers that are unavailable.
        print(f"  Providers: {self.session.get_providers()}")

    @staticmethod
    def _resolve_dim(dim, default: int) -> int:
        """Return ``dim`` if it is a positive integer, otherwise ``default``"""
        if isinstance(dim, int) and dim > 0:
            return dim
        return default

    def _get_providers(self, use_gpu: bool, use_tensorrt: bool) -> List[str]:
        """
        Get ONNX Runtime execution providers

        Args:
            use_gpu: Whether to request the CUDA provider
            use_tensorrt: Whether to request the TensorRT provider
                (only honoured together with ``use_gpu``)

        Returns:
            Provider names in priority order; the CPU provider is always last
        """
        providers = []

        if use_tensorrt and use_gpu:
            providers.append('TensorrtExecutionProvider')

        if use_gpu:
            providers.append('CUDAExecutionProvider')

        providers.append('CPUExecutionProvider')

        return providers

    def preprocess(self, image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
        """
        Preprocess image for model input

        Args:
            image: Input image (RGB, HxWx3). Values are divided by 255, so
                a 0-255 value range is assumed.

        Returns:
            Tuple of (preprocessed_image, original_size) where
            preprocessed_image is float32 with shape
            (1, 3, input_height, input_width) and original_size is (h, w)

        Raises:
            ValueError: If ``image`` is not a 3-channel HxWx3 array
        """
        if image.ndim != 3 or image.shape[2] != 3:
            raise ValueError(
                f"Expected an HxWx3 image, got array with shape {image.shape}"
            )

        h, w = image.shape[:2]
        original_size = (h, w)

        # Resize to model input size (cv2 takes the size as (width, height))
        image = cv2.resize(
            image,
            (self.input_width, self.input_height),
            interpolation=cv2.INTER_LINEAR
        )

        # Scale to 0-1, then apply ImageNet normalization
        image = image.astype(np.float32) / 255.0
        image = (image - self.IMAGENET_MEAN) / self.IMAGENET_STD

        # Transpose HWC -> CHW and add the batch dimension (NCHW)
        image = image.transpose(2, 0, 1)
        image = np.expand_dims(image, axis=0)

        return image, original_size

    def postprocess(
        self,
        depth: np.ndarray,
        original_size: Tuple[int, int]
    ) -> np.ndarray:
        """
        Postprocess depth map output

        Args:
            depth: Raw depth output from model (2D, 3D or 4D array)
            original_size: Original image size (h, w)

        Returns:
            Depth map resized to original size and min-max normalized
            to the 0-1 range
        """
        # Remove batch dimension
        if depth.ndim == 4:
            depth = depth[0]

        # Remove channel dimension if present
        if depth.ndim == 3:
            depth = depth[0]

        # Resize to original size (cv2 takes the size as (width, height))
        h, w = original_size
        depth = cv2.resize(depth, (w, h), interpolation=cv2.INTER_LINEAR)

        # Normalize to 0-1 range; the epsilon avoids division by zero
        # when the depth map is constant
        depth_min = depth.min()
        depth = (depth - depth_min) / (depth.max() - depth_min + 1e-8)

        return depth

    def predict(
        self,
        image: np.ndarray,
        resize_output: bool = True
    ) -> np.ndarray:
        """
        Run depth estimation on image

        Args:
            image: Input image (RGB, HxWx3)
            resize_output: Whether to resize output to original size

        Returns:
            If resize_output is True, a depth map with the same height and
            width as the input, normalized to 0-1. Otherwise the raw model
            output is returned unchanged (no squeezing, resizing or
            normalization).
        """
        # Preprocess
        input_tensor, original_size = self.preprocess(image)

        # Run inference
        outputs = self.session.run(
            [self.output_name],
            {self.input_name: input_tensor}
        )

        depth = outputs[0]

        # Postprocess
        if resize_output:
            depth = self.postprocess(depth, original_size)

        return depth

    def __call__(self, image: np.ndarray) -> np.ndarray:
        """Convenience method for prediction"""
        return self.predict(image)


class ModelManager:
    """
    Manages multiple depth models and provides a unified interface
    """

    def __init__(self):
        # Loaded models keyed by the name given to load_model()
        self.models: Dict[str, DepthAnythingV2] = {}

    def load_model(
        self,
        name: str,
        model_path: str,
        use_gpu: bool = True,
        use_tensorrt: bool = False
    ) -> DepthAnythingV2:
        """
        Load a depth model

        A model already registered under ``name`` is replaced.

        Args:
            name: Model identifier (e.g., 'small', 'large')
            model_path: Path to ONNX model
            use_gpu: Whether to use GPU
            use_tensorrt: Whether to use TensorRT

        Returns:
            Loaded model instance
        """
        model = DepthAnythingV2(model_path, use_gpu, use_tensorrt)
        self.models[name] = model
        return model

    def get_model(self, name: str) -> Optional[DepthAnythingV2]:
        """Get a loaded model by name, or None if it has not been loaded"""
        return self.models.get(name)

    def predict(self, image: np.ndarray, model_name: str = 'small') -> np.ndarray:
        """
        Run prediction using specified model

        Args:
            image: Input image
            model_name: Name of model to use

        Returns:
            Depth map

        Raises:
            ValueError: If no model is loaded under ``model_name``
        """
        model = self.get_model(model_name)
        if model is None:
            raise ValueError(f"Model '{model_name}' not loaded")

        return model.predict(image)