Spaces:
Sleeping
Sleeping
import logging
from typing import Optional, Sequence

import numpy as np
import torch
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForDepthEstimation

from .base import DepthEstimator, DepthResult
class DepthAnythingV2Estimator(DepthEstimator):
    """Depth-Anything depth estimator (Transformers-compatible).

    Wraps a Hugging Face ``AutoModelForDepthEstimation`` checkpoint and
    exposes single-frame and batched inference, returning dense depth maps
    resized back to each input frame's resolution.

    NOTE(review): despite the class name, the default checkpoint
    ``LiheYoung/depth-anything-large-hf`` is a Depth-Anything V1 model —
    confirm whether a V2 checkpoint was intended.
    """

    name = "depth"
    supports_batch = True
    # Advertised upper bound for callers; predict_batch itself does not chunk.
    max_batch_size = 16

    def __init__(self, device: Optional[str] = None) -> None:
        """Load the model and image processor.

        Args:
            device: explicit torch device string (e.g. "cuda:0", "cpu").
                Defaults to CUDA when available, else CPU.
        """
        logging.info("Loading Depth-Anything model from Hugging Face (transformers)...")
        if device:
            self.device = torch.device(device)
        else:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model_id = "LiheYoung/depth-anything-large-hf"
        self.image_processor = AutoImageProcessor.from_pretrained(model_id)
        self.model = AutoModelForDepthEstimation.from_pretrained(model_id).to(self.device).eval()
        # Report the device actually in use. (The original keyed this message
        # off torch.cuda.is_available(), which mis-reported "GPU" whenever
        # device="cpu" was forced on a CUDA-capable machine.)
        if self.device.type == "cuda":
            logging.info("Depth-Anything model loaded on GPU (%s)", self.device)
        else:
            logging.warning("Depth-Anything model loaded on CPU")

    def _resize_depth(self, raw_depth: torch.Tensor, height: int, width: int) -> np.ndarray:
        """Resize a raw predicted-depth tensor to (height, width).

        Accepts 2-D (H, W), 3-D (1, H, W) or (B, H, W) input, normalizes it
        to NCHW for interpolation, and returns a float32 numpy array with the
        leading singleton dims removed.
        """
        # Local import mirrors the original; F is only needed here.
        import torch.nn.functional as F

        # Normalize to 4-D NCHW so F.interpolate accepts it.
        if raw_depth.dim() == 2:
            raw_depth = raw_depth.unsqueeze(0).unsqueeze(0)
        elif raw_depth.dim() == 3:
            raw_depth = raw_depth.unsqueeze(1) if raw_depth.shape[0] == 1 else raw_depth.unsqueeze(0)
        if raw_depth.shape[-2:] != (height, width):
            raw_depth = F.interpolate(
                raw_depth,
                size=(height, width),
                mode="bilinear",
                align_corners=False,
            )
        # Drop only leading singleton dims: a bare .squeeze() would also
        # collapse a target height or width of 1, changing the output rank.
        while raw_depth.dim() > 2 and raw_depth.shape[0] == 1:
            raw_depth = raw_depth.squeeze(0)
        return raw_depth.cpu().numpy().astype(np.float32, copy=False)

    def predict(self, frame: np.ndarray) -> DepthResult:
        """
        Run depth estimation on a single frame.
        Args:
            frame: HxWx3 BGR uint8 numpy array (OpenCV format)
        Returns:
            DepthResult with depth_map (HxW float32) and focal_length
        """
        try:
            # Contiguous copy: the negative-stride view from [::-1] is not
            # accepted by every Pillow version's fromarray.
            rgb_frame = np.ascontiguousarray(frame[:, :, ::-1])  # BGR -> RGB
            pil_image = Image.fromarray(rgb_frame)
            height, width = pil_image.height, pil_image.width
            inputs = self.image_processor(images=pil_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            raw_depth = outputs.predicted_depth
            depth_map = self._resize_depth(raw_depth, height, width)
        except Exception as exc:
            # Best-effort: a bad frame degrades to a zero depth map instead
            # of crashing the caller's pipeline.
            logging.error("Depth-Anything inference failed: %s", exc)
            h, w = frame.shape[:2]
            return DepthResult(depth_map=np.zeros((h, w), dtype=np.float32), focal_length=1.0)
        # focal_length=1.0 is a placeholder; the model predicts relative depth.
        return DepthResult(depth_map=depth_map, focal_length=1.0)

    def predict_batch(self, frames: Sequence[np.ndarray]) -> Sequence[DepthResult]:
        """Run depth estimation on a batch of frames.

        Args:
            frames: sequence of HxWx3 BGR uint8 numpy arrays (OpenCV format)
        Returns:
            One DepthResult per input frame, each depth map resized to its
            frame's resolution. Mirrors predict(): on failure every frame
            gets a zero depth map rather than raising.
        """
        if not frames:
            return []
        sizes = [f.shape[:2] for f in frames]  # (h, w) per frame
        try:
            pil_images = [
                Image.fromarray(np.ascontiguousarray(f[:, :, ::-1]))  # BGR -> RGB
                for f in frames
            ]
            inputs = self.image_processor(images=pil_images, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            # outputs.predicted_depth is (B, H', W') at model resolution.
            depths = outputs.predicted_depth
            return [
                DepthResult(depth_map=self._resize_depth(depths[i], h, w), focal_length=1.0)
                for i, (h, w) in enumerate(sizes)
            ]
        except Exception as exc:
            # Consistent with predict(): never let one bad batch crash callers.
            logging.error("Depth-Anything batch inference failed: %s", exc)
            return [
                DepthResult(depth_map=np.zeros((h, w), dtype=np.float32), focal_length=1.0)
                for h, w in sizes
            ]