import logging
from typing import Optional, Sequence

import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForDepthEstimation

from .base import DepthEstimator, DepthResult


class DepthAnythingV2Estimator(DepthEstimator):
    """Depth-Anything depth estimator (Transformers-compatible)."""

    name = "depth"
    supports_batch = True
    # Upper bound on frames per forward pass; predict_batch chunks to this.
    max_batch_size = 16

    def __init__(self, device: Optional[str] = None) -> None:
        """Load the Depth-Anything model and move it to *device*.

        Args:
            device: torch device string (e.g. "cuda", "cuda:1", "cpu").
                Defaults to CUDA when available, else CPU.
        """
        logging.info("Loading Depth-Anything model from Hugging Face (transformers)...")
        if device:
            self.device = torch.device(device)
        else:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model_id = "LiheYoung/depth-anything-large-hf"
        self.image_processor = AutoImageProcessor.from_pretrained(model_id)
        self.model = AutoModelForDepthEstimation.from_pretrained(model_id).to(self.device).eval()
        # BUG FIX: report the device the model actually lives on, not merely
        # whether CUDA exists on the machine (previously, device="cpu" on a
        # CUDA box would still log "loaded on GPU").
        if self.device.type == "cuda":
            logging.info("Depth-Anything V2 model loaded on GPU")
        else:
            logging.warning("Depth-Anything V2 model loaded on CPU")

    def _resize_depth(self, raw_depth: torch.Tensor, height: int, width: int) -> np.ndarray:
        """Resize a raw depth tensor to (height, width) as float32 numpy.

        Accepts 2-D (H, W), 3-D (1, H, W) / (B, H, W), or 4-D input and
        bilinearly interpolates the spatial dims to the requested size.
        """
        # Normalize to 4-D (N, C, H, W), the shape F.interpolate expects.
        if raw_depth.dim() == 2:
            raw_depth = raw_depth.unsqueeze(0).unsqueeze(0)
        elif raw_depth.dim() == 3:
            # (1, H, W) -> (1, 1, H, W); (B, H, W) -> (1, B, H, W): either way
            # only the trailing two (spatial) dims are interpolated.
            raw_depth = raw_depth.unsqueeze(1) if raw_depth.shape[0] == 1 else raw_depth.unsqueeze(0)
        if raw_depth.shape[-2:] != (height, width):
            raw_depth = F.interpolate(
                raw_depth,
                size=(height, width),
                mode="bilinear",
                align_corners=False,
            )
        return raw_depth.squeeze().cpu().numpy().astype(np.float32, copy=False)

    def predict(self, frame: np.ndarray) -> DepthResult:
        """Run depth estimation on a single frame.

        Args:
            frame: HxWx3 BGR uint8 numpy array (OpenCV format)

        Returns:
            DepthResult with depth_map (HxW float32) and focal_length.
            On inference failure, an all-zero depth map is returned instead
            of raising, so callers can keep processing a stream.
        """
        height, width = frame.shape[:2]
        try:
            rgb_frame = frame[:, :, ::-1]  # BGR -> RGB
            pil_image = Image.fromarray(rgb_frame)
            inputs = self.image_processor(images=pil_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            depth_map = self._resize_depth(outputs.predicted_depth, height, width)
        except Exception as exc:
            logging.error("Depth-Anything inference failed: %s", exc)
            return DepthResult(depth_map=np.zeros((height, width), dtype=np.float32), focal_length=1.0)
        return DepthResult(depth_map=depth_map, focal_length=1.0)

    def predict_batch(self, frames: Sequence[np.ndarray]) -> Sequence[DepthResult]:
        """Run depth estimation on a batch of BGR frames.

        Improvements over the previous version:
        - honors ``max_batch_size`` by chunking the input, bounding GPU
          memory use for arbitrarily large batches;
        - degrades to zero depth maps on inference failure, consistent
          with :meth:`predict` (previously any exception propagated).

        Returns one DepthResult per input frame, in order.
        """
        results: list = []
        for start in range(0, len(frames), self.max_batch_size):
            chunk = frames[start : start + self.max_batch_size]
            pil_images = [Image.fromarray(f[:, :, ::-1]) for f in chunk]  # BGR -> RGB
            sizes = [(img.height, img.width) for img in pil_images]
            try:
                inputs = self.image_processor(images=pil_images, return_tensors="pt").to(self.device)
                with torch.no_grad():
                    outputs = self.model(**inputs)
                depths = outputs.predicted_depth  # (B, H', W')
                for i, (h, w) in enumerate(sizes):
                    results.append(
                        DepthResult(depth_map=self._resize_depth(depths[i], h, w), focal_length=1.0)
                    )
            except Exception as exc:
                logging.error("Depth-Anything batch inference failed: %s", exc)
                results.extend(
                    DepthResult(depth_map=np.zeros((h, w), dtype=np.float32), focal_length=1.0)
                    for (h, w) in sizes
                )
        return results