# perception/models/depth_estimators/depth_anything_v2.py
# Author: Zhen Ye
# perf: Tune batch sizes and queues for A10 GPUs (commit a5f8d15)
import logging
from typing import Optional, Sequence

import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForDepthEstimation

from .base import DepthEstimator, DepthResult
class DepthAnythingV2Estimator(DepthEstimator):
    """Depth-Anything depth estimator (Transformers-compatible).

    Wraps a Hugging Face ``AutoModelForDepthEstimation`` checkpoint and exposes
    single-frame and batched inference returning per-pixel relative depth.

    NOTE(review): the checkpoint below ("LiheYoung/depth-anything-large-hf")
    is the Depth-Anything V1 release despite the V2 class name and log
    messages — confirm this is intentional.
    """

    name = "depth"
    supports_batch = True
    # Tuned for A10-class GPUs; lower this on smaller cards.
    max_batch_size = 16

    def __init__(self, device: Optional[str] = None) -> None:
        """Load the model and image processor, placing the model on *device*.

        Args:
            device: torch device string (e.g. "cuda:0", "cpu"). When omitted,
                CUDA is used if available, otherwise CPU.
        """
        logging.info("Loading Depth-Anything model from Hugging Face (transformers)...")
        if device:
            self.device = torch.device(device)
        else:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model_id = "LiheYoung/depth-anything-large-hf"
        self.image_processor = AutoImageProcessor.from_pretrained(model_id)
        self.model = AutoModelForDepthEstimation.from_pretrained(model_id).to(self.device).eval()
        # Report where the model actually landed, not merely whether CUDA
        # exists on the machine (the caller may have forced "cpu").
        if self.device.type == "cuda":
            logging.info("Depth-Anything V2 model loaded on GPU")
        else:
            logging.warning("Depth-Anything V2 model loaded on CPU")

    def _resize_depth(self, raw_depth: torch.Tensor, height: int, width: int) -> np.ndarray:
        """Resize a raw model depth prediction to ``(height, width)``.

        Args:
            raw_depth: predicted depth as a 2-D ``(H', W')`` tensor or a 3-D
                tensor (``(1, H', W')`` or ``(B, H', W')``).
            height: target height in pixels.
            width: target width in pixels.

        Returns:
            float32 numpy array with singleton dims squeezed away —
            ``(height, width)`` for a single prediction.
        """
        # Bring the tensor to NCHW so F.interpolate can operate on it.
        if raw_depth.dim() == 2:
            raw_depth = raw_depth.unsqueeze(0).unsqueeze(0)
        elif raw_depth.dim() == 3:
            # (1, H, W) -> (1, 1, H, W); (B, H, W) -> (1, B, H, W): the batch
            # rides in the channel dim, which bilinear resize handles per-channel.
            raw_depth = raw_depth.unsqueeze(1) if raw_depth.shape[0] == 1 else raw_depth.unsqueeze(0)
        if raw_depth.shape[-2:] != (height, width):
            raw_depth = F.interpolate(
                raw_depth,
                size=(height, width),
                mode="bilinear",
                align_corners=False,
            )
        return raw_depth.squeeze().cpu().numpy().astype(np.float32, copy=False)

    def predict(self, frame: np.ndarray) -> DepthResult:
        """Run depth estimation on a single frame.

        Args:
            frame: HxWx3 BGR uint8 numpy array (OpenCV format)

        Returns:
            DepthResult with depth_map (HxW float32) and focal_length.
            On inference failure an all-zero depth map is returned instead of
            raising, so a single bad frame does not kill the pipeline.
        """
        try:
            rgb_frame = frame[:, :, ::-1]  # BGR -> RGB
            pil_image = Image.fromarray(rgb_frame)
            height, width = pil_image.height, pil_image.width
            inputs = self.image_processor(images=pil_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            raw_depth = outputs.predicted_depth
            depth_map = self._resize_depth(raw_depth, height, width)
        except Exception as exc:
            logging.error("Depth-Anything inference failed: %s", exc)
            h, w = frame.shape[:2]
            return DepthResult(depth_map=np.zeros((h, w), dtype=np.float32), focal_length=1.0)
        return DepthResult(depth_map=depth_map, focal_length=1.0)

    def predict_batch(self, frames: Sequence[np.ndarray]) -> Sequence[DepthResult]:
        """Run depth estimation on a batch of frames.

        Args:
            frames: sequence of HxWx3 BGR uint8 numpy arrays (OpenCV format).

        Returns:
            One DepthResult per input frame, in order. Mirrors ``predict()``:
            on failure, all-zero depth maps are returned rather than raising.
        """
        if not frames:
            return []
        try:
            pil_images = [Image.fromarray(f[:, :, ::-1]) for f in frames]  # BGR->RGB
            sizes = [(img.height, img.width) for img in pil_images]
            inputs = self.image_processor(images=pil_images, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            # outputs.predicted_depth is (B, H, W)
            depths = outputs.predicted_depth
            results = [
                DepthResult(depth_map=self._resize_depth(depths[i], h, w), focal_length=1.0)
                for i, (h, w) in enumerate(sizes)
            ]
        except Exception as exc:
            # Consistent with predict(): degrade to zero maps on failure so
            # one bad batch does not take down the pipeline.
            logging.error("Depth-Anything batch inference failed: %s", exc)
            return [
                DepthResult(depth_map=np.zeros(f.shape[:2], dtype=np.float32), focal_length=1.0)
                for f in frames
            ]
        return results