| import os |
| import torch |
| import numpy as np |
| import cv2 |
| from transformers import pipeline |
| from transformers.utils import logging as hf_logging |
| from PIL import Image |
|
|
| class DepthEstimator: |
| """ |
| Depth estimation using Depth Anything v2 |
| """ |
| def __init__(self, model_size='small', device=None): |
| """ |
| Initialize the depth estimator |
| |
| Args: |
| model_size (str): Model size ('small', 'base', 'large') |
| device (str): Device to run inference on ('cuda', 'cpu', 'mps') |
| """ |
| |
| if device is None: |
| if torch.cuda.is_available(): |
| device = 'cuda' |
| elif hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): |
| device = 'mps' |
| else: |
| device = 'cpu' |
| |
| self.device = device |
| hf_logging.set_verbosity_error() |
| hf_logging.disable_progress_bar() |
| |
| |
| if self.device == 'mps': |
| print("Using MPS device with CPU fallback for unsupported operations") |
| os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1' |
| |
| self.pipe_device = -1 |
| print("Forcing CPU for depth estimation pipeline due to MPS compatibility issues") |
| elif self.device == 'cpu': |
| self.pipe_device = -1 |
| elif isinstance(self.device, str) and self.device.startswith('cuda'): |
| self.pipe_device = 0 |
| else: |
| self.pipe_device = -1 |
| |
| print(f"Using device: {self.device} for depth estimation (pipeline on {self.pipe_device})") |
| |
| |
| model_map = { |
| 'small': 'depth-anything/Depth-Anything-V2-Small-hf', |
| 'base': 'depth-anything/Depth-Anything-V2-Base-hf', |
| 'large': 'depth-anything/Depth-Anything-V2-Large-hf' |
| } |
| |
| model_name = model_map.get(model_size.lower(), model_map['small']) |
| |
| |
| try: |
| self.pipe = pipeline(task="depth-estimation", model=model_name, device=self.pipe_device) |
| print(f"Loaded Depth Anything v2 {model_size} model on {self.pipe_device}") |
| except Exception as e: |
| |
| print(f"Error loading model on {self.pipe_device}: {e}") |
| print("Falling back to CPU for depth estimation") |
| self.pipe_device = -1 |
| self.pipe = pipeline(task="depth-estimation", model=model_name, device=self.pipe_device) |
| print(f"Loaded Depth Anything v2 {model_size} model on CPU (fallback)") |
| |
| def estimate_depth(self, image): |
| """ |
| Estimate depth from an image |
| |
| Args: |
| image (numpy.ndarray): Input image (BGR format) |
| |
| Returns: |
| numpy.ndarray: Depth map (normalized to 0-1) |
| """ |
| |
| image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
| |
| |
| pil_image = Image.fromarray(image_rgb) |
| |
| |
| try: |
| depth_result = self.pipe(pil_image) |
| depth_map = depth_result["depth"] |
| |
| |
| if isinstance(depth_map, Image.Image): |
| depth_map = np.array(depth_map) |
| elif isinstance(depth_map, torch.Tensor): |
| depth_map = depth_map.cpu().numpy() |
| except RuntimeError as e: |
| |
| if self.device == 'mps': |
| print(f"MPS error during depth estimation: {e}") |
| print("Temporarily falling back to CPU for this frame") |
| |
| cpu_pipe = pipeline(task="depth-estimation", model=self.pipe.model.config._name_or_path, device=-1) |
| depth_result = cpu_pipe(pil_image) |
| depth_map = depth_result["depth"] |
| |
| |
| if isinstance(depth_map, Image.Image): |
| depth_map = np.array(depth_map) |
| elif isinstance(depth_map, torch.Tensor): |
| depth_map = depth_map.cpu().numpy() |
| else: |
| |
| raise |
| |
| |
| depth_min = depth_map.min() |
| depth_max = depth_map.max() |
| if depth_max > depth_min: |
| depth_map = (depth_map - depth_min) / (depth_max - depth_min) |
| |
| return depth_map |
| |
| def colorize_depth(self, depth_map, cmap=cv2.COLORMAP_INFERNO): |
| """ |
| Colorize depth map for visualization |
| |
| Args: |
| depth_map (numpy.ndarray): Depth map (normalized to 0-1) |
| cmap (int): OpenCV colormap |
| |
| Returns: |
| numpy.ndarray: Colorized depth map (BGR format) |
| """ |
| depth_map_uint8 = (depth_map * 255).astype(np.uint8) |
| colored_depth = cv2.applyColorMap(depth_map_uint8, cmap) |
| return colored_depth |
| |
| def get_depth_at_point(self, depth_map, x, y): |
| """ |
| Get depth value at a specific point |
| |
| Args: |
| depth_map (numpy.ndarray): Depth map |
| x (int): X coordinate |
| y (int): Y coordinate |
| |
| Returns: |
| float: Depth value at (x, y) |
| """ |
| if 0 <= y < depth_map.shape[0] and 0 <= x < depth_map.shape[1]: |
| return depth_map[y, x] |
| return 0.0 |
| |
| def get_depth_in_region(self, depth_map, bbox, method='median'): |
| """ |
| Get depth value in a region defined by a bounding box |
| |
| Args: |
| depth_map (numpy.ndarray): Depth map |
| bbox (list): Bounding box [x1, y1, x2, y2] |
| method (str): Method to compute depth ('median', 'mean', 'min') |
| |
| Returns: |
| float: Depth value in the region |
| """ |
| x1, y1, x2, y2 = [int(coord) for coord in bbox] |
| |
| |
| x1 = max(0, x1) |
| y1 = max(0, y1) |
| x2 = min(depth_map.shape[1] - 1, x2) |
| y2 = min(depth_map.shape[0] - 1, y2) |
| |
| |
| region = depth_map[y1:y2, x1:x2] |
| |
| if region.size == 0: |
| return 0.0 |
| |
| |
| if method == 'median': |
| return float(np.median(region)) |
| elif method == 'mean': |
| return float(np.mean(region)) |
| elif method == 'min': |
| return float(np.min(region)) |
| else: |
| return float(np.median(region)) |
|
|