# Copyright (c) 2025 ByteDance Ltd. and/or its affiliates # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Depth Anything 3 API module. This module provides the main API for Depth Anything 3, including model loading, inference, and export capabilities. It supports both single and nested model architectures. """ from __future__ import annotations import time from typing import Optional, Sequence import numpy as np import torch import torch.nn as nn from huggingface_hub import PyTorchModelHubMixin from PIL import Image from depth_anything_3.cache import get_model_cache from depth_anything_3.cfg import create_object, load_config from depth_anything_3.registry import MODEL_REGISTRY from depth_anything_3.specs import Prediction from depth_anything_3.utils.adaptive_batching import ( AdaptiveBatchConfig, AdaptiveBatchSizeCalculator, adaptive_batch_iterator, estimate_max_batch_size, ) from depth_anything_3.utils.export import export from depth_anything_3.utils.geometry import affine_inverse from depth_anything_3.utils.io.gpu_input_processor import GPUInputProcessor from depth_anything_3.utils.io.input_processor import InputProcessor from depth_anything_3.utils.io.output_processor import OutputProcessor from depth_anything_3.utils.logger import logger from depth_anything_3.utils.pose_align import align_poses_umeyama torch.backends.cudnn.benchmark = False # logger.info("CUDNN Benchmark Disabled") SAFETENSORS_NAME = "model.safetensors" CONFIG_NAME = "config.json" class DepthAnything3(nn.Module, PyTorchModelHubMixin): """ Depth Anything 3 main API class. This class provides a high-level interface for depth estimation using Depth Anything 3. It supports both single and nested model architectures with metric scaling capabilities. Features: - Hugging Face Hub integration via PyTorchModelHubMixin - Support for multiple model presets (vitb, vitg, nested variants) - Automatic mixed precision inference - Export capabilities for various formats (GLB, PLY, NPZ, etc.) - Camera pose estimation and metric depth scaling Usage: # Load from Hugging Face Hub model = DepthAnything3.from_pretrained("huggingface/model-name") # Or create with specific preset model = DepthAnything3(preset="vitg") # Run inference prediction = model.inference(images, export_dir="output", export_format="glb") """ _commit_hash: str | None = None # Set by mixin when loading from Hub def __init__(self, model_name: str = "da3-large", device: str | torch.device | None = None, use_cache: bool = True, **kwargs): """ Initialize DepthAnything3 with specified preset. Args: model_name: The name of the model preset to use. Examples: 'da3-giant', 'da3-large', 'da3metric-large', 'da3nested-giant-large'. device: Target device ('cuda', 'mps', 'cpu'). If None, auto-detect. use_cache: Whether to use model caching (default: True). Set to False to force reload model from disk. **kwargs: Additional keyword arguments (currently unused). """ super().__init__() self.model_name = model_name self.use_cache = use_cache # Determine device if device is None: device = self._auto_detect_device() self.device = torch.device(device) if isinstance(device, str) else device # Load model configuration self.config = load_config(MODEL_REGISTRY[self.model_name]) # Build or retrieve model from cache if use_cache: cache = get_model_cache() self.model = cache.get( model_name=self.model_name, device=self.device, loader_fn=lambda: self._create_model() ) else: logger.info(f"Model cache disabled, loading {self.model_name} from disk") self.model = self._create_model() # Ensure model is on correct device and in eval mode self.model = self.model.to(self.device) self.model.eval() # Initialize processors # Use GPUInputProcessor for CUDA/MPS devices to enable GPU ops # Note: NVJPEG decoding is specific to CUDA, MPS will use optimized CPU decoding + GPU resize if self.device.type in ("cuda", "mps"): self.input_processor = GPUInputProcessor(device=self.device) decoding_info = "NVJPEG support enabled" if self.device.type == "cuda" else "TorchVision decoding" logger.info(f"Using GPUInputProcessor ({decoding_info} on {self.device})") else: self.input_processor = InputProcessor() logger.info("Using standard InputProcessor (optimized CPU pipeline)") self.output_processor = OutputProcessor() def _auto_detect_device(self) -> torch.device: """Auto-detect best available device.""" if torch.cuda.is_available(): return torch.device("cuda") elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): return torch.device("mps") else: return torch.device("cpu") def _create_model(self) -> nn.Module: """Create and return new model instance on correct device.""" model = create_object(self.config) model = model.to(self.device) # Move to device before caching model.eval() return model @torch.inference_mode() def forward( self, image: torch.Tensor, extrinsics: torch.Tensor | None = None, intrinsics: torch.Tensor | None = None, export_feat_layers: list[int] | None = None, infer_gs: bool = False, use_ray_pose: bool = False, ref_view_strategy: str = "saddle_balanced", ) -> dict[str, torch.Tensor]: """ Forward pass through the model. Args: image: Input batch with shape ``(B, N, 3, H, W)`` on the model device. extrinsics: Optional camera extrinsics with shape ``(B, N, 4, 4)``. intrinsics: Optional camera intrinsics with shape ``(B, N, 3, 3)``. export_feat_layers: Layer indices to return intermediate features for. infer_gs: Enable Gaussian Splatting branch. use_ray_pose: Use ray-based pose estimation instead of camera decoder. ref_view_strategy: Strategy for selecting reference view from multiple views. Returns: Dictionary containing model predictions """ with torch.no_grad(): # MPS doesn't support autocast well - use float32 for stability if image.device.type == "mps": return self.model( image, extrinsics, intrinsics, export_feat_layers, infer_gs, use_ray_pose, ref_view_strategy ) else: # CUDA: use autocast for performance autocast_dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16 with torch.autocast(device_type=image.device.type, dtype=autocast_dtype): return self.model( image, extrinsics, intrinsics, export_feat_layers, infer_gs, use_ray_pose, ref_view_strategy ) def inference( self, image: list[np.ndarray | Image.Image | str], extrinsics: np.ndarray | None = None, intrinsics: np.ndarray | None = None, align_to_input_ext_scale: bool = True, infer_gs: bool = False, use_ray_pose: bool = False, ref_view_strategy: str = "saddle_balanced", render_exts: np.ndarray | None = None, render_ixts: np.ndarray | None = None, render_hw: tuple[int, int] | None = None, process_res: int = 504, process_res_method: str = "upper_bound_resize", export_dir: str | None = None, export_format: str = "mini_npz", export_feat_layers: Sequence[int] | None = None, # GLB export parameters conf_thresh_percentile: float = 40.0, num_max_points: int = 1_000_000, show_cameras: bool = True, # Feat_vis export parameters feat_vis_fps: int = 15, # Other export parameters, e.g., gs_ply, gs_video export_kwargs: Optional[dict] = {}, ) -> Prediction: """ Run inference on input images. Args: image: List of input images (numpy arrays, PIL Images, or file paths) extrinsics: Camera extrinsics (N, 4, 4) intrinsics: Camera intrinsics (N, 3, 3) align_to_input_ext_scale: whether to align the input pose scale to the prediction infer_gs: Enable the 3D Gaussian branch (needed for `gs_ply`/`gs_video` exports) use_ray_pose: Use ray-based pose estimation instead of camera decoder (default: False) ref_view_strategy: Strategy for selecting reference view from multiple views. Options: "first", "middle", "saddle_balanced", "saddle_sim_range". Default: "saddle_balanced". For single view input (S ≤ 2), no reordering is performed. render_exts: Optional render extrinsics for Gaussian video export render_ixts: Optional render intrinsics for Gaussian video export render_hw: Optional render resolution for Gaussian video export process_res: Processing resolution process_res_method: Resize method for processing export_dir: Directory to export results export_format: Export format (mini_npz, npz, glb, ply, gs, gs_video) export_feat_layers: Layer indices to export intermediate features from conf_thresh_percentile: [GLB] Lower percentile for adaptive confidence threshold (default: 40.0) # noqa: E501 num_max_points: [GLB] Maximum number of points in the point cloud (default: 1,000,000) show_cameras: [GLB] Show camera wireframes in the exported scene (default: True) feat_vis_fps: [FEAT_VIS] Frame rate for output video (default: 15) export_kwargs: additional arguments to export functions. Returns: Prediction object containing depth maps and camera parameters """ if "gs" in export_format: assert infer_gs, "must set `infer_gs=True` to perform gs-related export." if "colmap" in export_format: assert isinstance(image[0], str), "`image` must be image paths for COLMAP export." # Preprocess images imgs_cpu, extrinsics, intrinsics = self._preprocess_inputs( image, extrinsics, intrinsics, process_res, process_res_method ) # Prepare tensors for model imgs, ex_t, in_t = self._prepare_model_inputs(imgs_cpu, extrinsics, intrinsics) # Normalize extrinsics ex_t_norm = self._normalize_extrinsics(ex_t.clone() if ex_t is not None else None) # Run model forward pass export_feat_layers = list(export_feat_layers) if export_feat_layers is not None else [] raw_output = self._run_model_forward( imgs, ex_t_norm, in_t, export_feat_layers, infer_gs, use_ray_pose, ref_view_strategy ) # Convert raw output to prediction prediction = self._convert_to_prediction(raw_output) # Align prediction to extrinsincs prediction = self._align_to_input_extrinsics_intrinsics( extrinsics, intrinsics, prediction, align_to_input_ext_scale ) # Add processed images for visualization prediction = self._add_processed_images(prediction, imgs_cpu) # Export if requested if export_dir is not None: if "gs" in export_format: if infer_gs and "gs_video" not in export_format: export_format = f"{export_format}-gs_video" if "gs_video" in export_format: if "gs_video" not in export_kwargs: export_kwargs["gs_video"] = {} export_kwargs["gs_video"].update( { "extrinsics": render_exts, "intrinsics": render_ixts, "out_image_hw": render_hw, } ) # Add GLB export parameters if "glb" in export_format: if "glb" not in export_kwargs: export_kwargs["glb"] = {} export_kwargs["glb"].update( { "conf_thresh_percentile": conf_thresh_percentile, "num_max_points": num_max_points, "show_cameras": show_cameras, } ) # Add Feat_vis export parameters if "feat_vis" in export_format: if "feat_vis" not in export_kwargs: export_kwargs["feat_vis"] = {} export_kwargs["feat_vis"].update( { "fps": feat_vis_fps, } ) # Add COLMAP export parameters if "colmap" in export_format: if "colmap" not in export_kwargs: export_kwargs["colmap"] = {} export_kwargs["colmap"].update( { "image_paths": image, "conf_thresh_percentile": conf_thresh_percentile, "process_res_method": process_res_method, } ) self._export_results(prediction, export_format, export_dir, **export_kwargs) return prediction def _preprocess_inputs( self, image: list[np.ndarray | Image.Image | str], extrinsics: np.ndarray | None = None, intrinsics: np.ndarray | None = None, process_res: int = 504, process_res_method: str = "upper_bound_resize", ) -> tuple[torch.Tensor, torch.Tensor | None, torch.Tensor | None]: """Preprocess input images using input processor.""" start_time = time.time() # Determine normalization strategy: # 1. Hybrid (CPU Proc + GPU Device): Skip CPU norm (return uint8), norm on GPU later. # 2. GPU Proc (NVJPEG/Kornia): Perform norm on GPU immediately. # 3. Standard CPU: Perform norm on CPU. perform_norm = True if self.device.type in ("cuda", "mps") and not isinstance(self.input_processor, GPUInputProcessor): perform_norm = False imgs_cpu, extrinsics, intrinsics = self.input_processor( image, extrinsics.copy() if extrinsics is not None else None, intrinsics.copy() if intrinsics is not None else None, process_res, process_res_method, perform_normalization=perform_norm, ) end_time = time.time() logger.info( "Processed Images Done taking", end_time - start_time, "seconds. Shape: ", imgs_cpu.shape, ) return imgs_cpu, extrinsics, intrinsics def _prepare_model_inputs( self, imgs_cpu: torch.Tensor, extrinsics: torch.Tensor | None, intrinsics: torch.Tensor | None, ) -> tuple[torch.Tensor, torch.Tensor | None, torch.Tensor | None]: """ Prepare tensors for model input with optimized device transfer. """ device = self._get_model_device() # 1. Handle Image Tensor # Compare device types (handles cuda:0 vs cuda comparison) imgs_on_target_device = (imgs_cpu.device.type == device.type) if imgs_on_target_device: # Case A: Already on correct device (GPUInputProcessor) # Ensure correct shape: (B, S, C, H, W) where B=1 imgs = imgs_cpu if imgs.dim() == 3: # Single image (C, H, W) -> (1, 1, C, H, W) imgs = imgs.unsqueeze(0).unsqueeze(0) elif imgs.dim() == 4: # Batch of images (N, C, H, W) -> (1, N, C, H, W) imgs = imgs.unsqueeze(0) # dim() == 5 means already correct shape if imgs.dtype == torch.uint8: # Should not happen with GPUInputProcessor default, but safety fallback imgs = imgs.float() / 255.0 imgs = InputProcessor.normalize_tensor( imgs, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] ) else: # Case B & C: Needs transfer from CPU if imgs_cpu.dtype == torch.uint8: # Hybrid mode: uint8 -> GPU -> float -> normalize if device.type == "cuda": imgs_cpu = imgs_cpu.pin_memory() imgs = imgs_cpu.to(device, non_blocking=True).float() / 255.0 imgs = InputProcessor.normalize_tensor( imgs, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] ) imgs = imgs[None] # Add batch dimension (1, N, 3, H, W) else: # Standard mode: float -> GPU if device.type == "cuda": imgs_cpu = imgs_cpu.pin_memory() imgs = imgs_cpu.to(device, non_blocking=True)[None].float() # Convert camera parameters to tensors with non-blocking transfer ex_t = ( extrinsics.pin_memory().to(device, non_blocking=True)[None].float() if extrinsics is not None and device.type == "cuda" and extrinsics.device.type == "cpu" else extrinsics.to(device, non_blocking=True)[None].float() if extrinsics is not None and extrinsics.device != device else extrinsics[None].float() if extrinsics is not None else None ) in_t = ( intrinsics.pin_memory().to(device, non_blocking=True)[None].float() if intrinsics is not None and device.type == "cuda" and intrinsics.device.type == "cpu" else intrinsics.to(device, non_blocking=True)[None].float() if intrinsics is not None and intrinsics.device != device else intrinsics[None].float() if intrinsics is not None else None ) return imgs, ex_t, in_t def _normalize_extrinsics(self, ex_t: torch.Tensor | None) -> torch.Tensor | None: """Normalize extrinsics""" if ex_t is None: return None transform = affine_inverse(ex_t[:, :1]) ex_t_norm = ex_t @ transform c2ws = affine_inverse(ex_t_norm) translations = c2ws[..., :3, 3] dists = translations.norm(dim=-1) median_dist = torch.median(dists) median_dist = torch.clamp(median_dist, min=1e-1) ex_t_norm[..., :3, 3] = ex_t_norm[..., :3, 3] / median_dist return ex_t_norm def _align_to_input_extrinsics_intrinsics( self, extrinsics: torch.Tensor | None, intrinsics: torch.Tensor | None, prediction: Prediction, align_to_input_ext_scale: bool = True, ransac_view_thresh: int = 10, ) -> Prediction: """Align depth map to input extrinsics""" if extrinsics is None: return prediction prediction.intrinsics = intrinsics.numpy() _, _, scale, aligned_extrinsics = align_poses_umeyama( prediction.extrinsics, extrinsics.numpy(), ransac=len(extrinsics) >= ransac_view_thresh, return_aligned=True, random_state=42, ) if align_to_input_ext_scale: prediction.extrinsics = extrinsics[..., :3, :].numpy() prediction.depth /= scale else: prediction.extrinsics = aligned_extrinsics return prediction def _run_model_forward( self, imgs: torch.Tensor, ex_t: torch.Tensor | None, in_t: torch.Tensor | None, export_feat_layers: Sequence[int] | None = None, infer_gs: bool = False, use_ray_pose: bool = False, ref_view_strategy: str = "saddle_balanced", ) -> dict[str, torch.Tensor]: """Run model forward pass.""" device = imgs.device need_sync = device.type == "cuda" if need_sync: torch.cuda.synchronize(device) start_time = time.time() feat_layers = list(export_feat_layers) if export_feat_layers is not None else None output = self.forward(imgs, ex_t, in_t, feat_layers, infer_gs, use_ray_pose, ref_view_strategy) if need_sync: torch.cuda.synchronize(device) end_time = time.time() logger.info(f"Model Forward Pass Done. Time: {end_time - start_time} seconds") return output def _convert_to_prediction(self, raw_output: dict[str, torch.Tensor]) -> Prediction: """Convert raw model output to Prediction object.""" start_time = time.time() output = self.output_processor(raw_output) end_time = time.time() logger.info(f"Conversion to Prediction Done. Time: {end_time - start_time} seconds") return output def _add_processed_images(self, prediction: Prediction, imgs_cpu: torch.Tensor) -> Prediction: """Add processed images to prediction for visualization.""" # Convert from (N, 3, H, W) to (N, H, W, 3) processed_imgs = imgs_cpu.permute(0, 2, 3, 1).cpu().numpy() # (N, H, W, 3) if imgs_cpu.dtype == torch.uint8: # Already uint8, no need to denormalize pass else: # Denormalize from ImageNet normalization mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) processed_imgs = processed_imgs * std + mean processed_imgs = np.clip(processed_imgs, 0, 1) processed_imgs = (processed_imgs * 255).astype(np.uint8) prediction.processed_images = processed_imgs return prediction def _export_results( self, prediction: Prediction, export_format: str, export_dir: str, **kwargs ) -> None: """Export results to specified format and directory.""" start_time = time.time() export(prediction, export_format, export_dir, **kwargs) end_time = time.time() logger.info(f"Export Results Done. Time: {end_time - start_time} seconds") def _get_model_device(self) -> torch.device: """ Get the device where the model is located. Returns: Device where the model parameters are located Raises: ValueError: If no tensors are found in the model """ if self.device is not None: return self.device # Find device from parameters for param in self.parameters(): self.device = param.device return param.device # Find device from buffers for buffer in self.buffers(): self.device = buffer.device return buffer.device raise ValueError("No tensor found in model") # ========================================================================= # Adaptive Batching Methods # ========================================================================= def batch_inference( self, images: list[np.ndarray | Image.Image | str], process_res: int = 504, batch_size: int | str = "auto", max_batch_size: int = 64, target_memory_utilization: float = 0.85, progress_callback: callable | None = None, ) -> list[Prediction]: """ Run inference on multiple images with adaptive batching. This method automatically determines optimal batch sizes based on available GPU memory, maximizing throughput while preventing OOM errors. Args: images: List of input images (numpy arrays, PIL Images, or file paths) process_res: Processing resolution (default: 504) batch_size: Batch size or "auto" for adaptive batching (default: "auto") max_batch_size: Maximum batch size when using adaptive batching (default: 64) target_memory_utilization: Target GPU memory usage 0.0-1.0 (default: 0.85) progress_callback: Optional callback(processed, total) for progress updates Returns: List of Prediction objects, one per batch Example: >>> model = DepthAnything3(model_name="da3-large") >>> images = ["img1.jpg", "img2.jpg", ..., "img100.jpg"] >>> >>> # Adaptive batching (recommended) >>> results = model.batch_inference(images, process_res=518) >>> >>> # Fixed batch size >>> results = model.batch_inference(images, batch_size=4) >>> >>> # With progress callback >>> def on_progress(done, total): ... print(f"Processed {done}/{total}") >>> results = model.batch_inference(images, progress_callback=on_progress) """ import gc num_images = len(images) if num_images == 0: return [] results: list[Prediction] = [] # Determine batch size if batch_size == "auto": config = AdaptiveBatchConfig( max_batch_size=max_batch_size, target_memory_utilization=target_memory_utilization, ) calculator = AdaptiveBatchSizeCalculator( model_name=self.model_name, device=self.device, config=config, ) for batch_info in adaptive_batch_iterator(images, calculator, process_res): # Run inference on this batch prediction = self.inference( image=batch_info.items, process_res=process_res, ) results.append(prediction) # Progress callback if progress_callback: progress_callback(batch_info.end_idx, num_images) # Memory cleanup between batches if not batch_info.is_last: gc.collect() if self.device.type == "cuda": torch.cuda.empty_cache() elif self.device.type == "mps": torch.mps.empty_cache() # Update profiling data for better estimates if calculator.config.enable_profiling and self.device.type == "cuda": memory_used = torch.cuda.max_memory_allocated(self.device) / (1024 * 1024) calculator.update_from_profiling( batch_size=batch_info.batch_size, memory_used_mb=memory_used, process_res=process_res, ) torch.cuda.reset_peak_memory_stats(self.device) else: # Fixed batch size fixed_batch_size = int(batch_size) for i in range(0, num_images, fixed_batch_size): end_idx = min(i + fixed_batch_size, num_images) batch_images = images[i:end_idx] prediction = self.inference( image=batch_images, process_res=process_res, ) results.append(prediction) if progress_callback: progress_callback(end_idx, num_images) # Memory cleanup if end_idx < num_images: gc.collect() if self.device.type == "cuda": torch.cuda.empty_cache() elif self.device.type == "mps": torch.mps.empty_cache() return results def get_optimal_batch_size( self, process_res: int = 504, target_utilization: float = 0.85, ) -> int: """ Get the optimal batch size for current GPU memory state. Args: process_res: Processing resolution (default: 504) target_utilization: Target GPU memory usage 0.0-1.0 (default: 0.85) Returns: Recommended batch size Example: >>> model = DepthAnything3(model_name="da3-large") >>> batch_size = model.get_optimal_batch_size(process_res=518) >>> print(f"Optimal batch size: {batch_size}") """ return estimate_max_batch_size( model_name=self.model_name, device=self.device, process_res=process_res, target_utilization=target_utilization, )