"""Depth-map utilities and a small cache around DepthAnything3 inference.

Provides helpers to trim image borders, subtract a fitted global plane from a
depth map, locate the flattest patch of a depth map, smooth a depth map, and a
``DepthEngine`` class that loads each model once and reuses it.
"""

from __future__ import annotations

import logging
import time
from pathlib import Path

import cv2
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image

try:  # pragma: no cover - optional dependency resolution
    from depth_anything_3.api import DepthAnything3  # type: ignore
    from depth_anything_3.utils.visualize import visualize_depth  # type: ignore
except ModuleNotFoundError:  # pragma: no cover
    import sys

    # Fall back to a source checkout located at ``<parent of this dir>/src``.
    ROOT = Path(__file__).resolve().parents[1]
    sys.path.append(str(ROOT / "src"))
    from depth_anything_3.api import DepthAnything3  # type: ignore  # noqa: E402
    from depth_anything_3.utils.visualize import visualize_depth  # type: ignore  # noqa: E402

logger = logging.getLogger(__name__)

# Method names accepted by ``remove_global_plane``.
_PLANE_DISABLED = frozenset({"none", "off"})
_PLANE_LEAST_SQUARES = frozenset({"ls", "least_squares", "lstsq"})


def crop_nonblack(img: Image.Image, frac: float = 0.05) -> Image.Image:
    """Trim a fixed fractional border from every side of ``img``.

    No pixel content is inspected: a margin of ``round(width * frac)`` columns
    and ``round(height * frac)`` rows is removed from each side.

    Args:
        img: Source image.
        frac: Fraction of each dimension to remove per side. Values of 0.5 or
            more produce an empty or inverted crop box; what Pillow does with
            such a box is version dependent, so callers should stay below 0.5.

    Returns:
        The cropped image.
    """
    w, h = img.size
    dx = int(round(w * frac))
    dy = int(round(h * frac))
    return img.crop((dx, dy, w - dx, h - dy))


def remove_global_plane(depth: np.ndarray, method: str = "least_squares") -> np.ndarray:
    """Subtract a best-fit plane ``a*x + b*y + c`` from a 2D depth map.

    Args:
        depth: Depth map. Anything that is not 2D is returned untouched.
        method: ``"ls"``, ``"least_squares"`` or ``"lstsq"`` fit the plane by
            linear least squares; ``"none"`` / ``"off"`` disable the step.
            Matching is case-insensitive, ``None`` or an empty string means
            least squares, and any other value leaves ``depth`` unchanged.

    Returns:
        ``depth - plane`` when a plane was fitted, otherwise ``depth`` itself
        (the same object, not a copy).
    """
    if depth.ndim != 2:
        return depth
    method = (method or "least_squares").lower()
    if method in _PLANE_DISABLED or method not in _PLANE_LEAST_SQUARES:
        # Disabled or unrecognized: skip before building the coordinate grid.
        return depth

    h, w = depth.shape
    yy, xx = np.mgrid[0:h, 0:w].astype(np.float32)
    points = np.stack((xx.flatten(), yy.flatten()), axis=1)
    values = depth.astype(np.float32).reshape(-1, 1)
    # Design matrix columns are [x, y, 1], so coef holds [a, b, c].
    design = np.concatenate(
        [points, np.ones((points.shape[0], 1), dtype=np.float32)], axis=1
    )
    try:
        coef, *_ = np.linalg.lstsq(design, values, rcond=None)
    except np.linalg.LinAlgError:
        # The fit did not converge; keep the depth map as it is.
        return depth

    plane = (points @ coef[:2] + coef[2]).reshape(h, w)
    return depth - plane


def _box_mean(arr: np.ndarray, k: int) -> np.ndarray:
    """Return the ``k`` x ``k`` sliding-window mean of a 2D float array.

    Borders are reflect-padded by ``k // 2`` so that, for odd ``k``, the
    output has the same shape as the input.
    """
    pad = k // 2
    t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0)
    t = F.pad(t, (pad, pad, pad, pad), mode="reflect")
    mean = F.avg_pool2d(t, kernel_size=k, stride=1, padding=0, count_include_pad=False)
    return mean.squeeze(0).squeeze(0).numpy()


def pick_flat_patch(
    depth: np.ndarray,
    patch: int = 96,
    std_thresh: float = 0.03,
    grad_thresh: float = 0.35,
    water_mask: np.ndarray | None = None,
) -> tuple[tuple[int, int, int, int], np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Locate the flattest window of a depth map.

    The depth map is min-max normalized, then the pixel with the lowest local
    standard deviation is chosen among pixels whose normalized gradient is
    below ``grad_thresh`` (and that are not flagged as water).

    Args:
        depth: 2D depth map of shape (H, W).
        patch: Window side length in pixels. It is clamped to at least 3 and
            at most the smaller image dimension, then bumped to the next odd
            number if it is even.
        std_thresh: Accepted for interface compatibility; this function does
            not currently use it.
        grad_thresh: Upper bound (exclusive) on the normalized gradient
            magnitude for a pixel to be a candidate centre.
        water_mask: Optional mask with the same shape as ``depth``; truthy
            (non-zero) entries are excluded from the candidates. A mask whose
            shape does not match is ignored.

    Returns:
        A tuple ``(box, std_map, grad_norm, grad_mask, landing_mask)`` where
        ``box`` is ``(x0, y0, x1, y1)`` with inclusive pixel coordinates
        clamped to the image, ``std_map`` is the local standard deviation of
        the normalized depth, ``grad_norm`` is the gradient magnitude scaled
        by its 95th percentile and clipped to [0, 1], ``grad_mask`` is
        ``grad_norm < grad_thresh`` and ``landing_mask`` is ``grad_mask``
        with water pixels removed.

    Raises:
        ValueError: If ``depth`` is not 2D.
    """
    depth = depth.astype(np.float32)
    if depth.ndim != 2:
        raise ValueError("Depth map must be 2D (H, W)")

    patch = max(3, min(patch, min(depth.shape)))
    if patch % 2 == 0:
        patch += 1  # an odd window has a well-defined centre pixel

    depth_norm = (depth - depth.min()) / (np.ptp(depth) + 1e-6)

    # Local standard deviation via E[x^2] - E[x]^2; clamp tiny negative values
    # caused by floating-point rounding before taking the square root.
    mean = _box_mean(depth_norm, patch)
    mean_sq = _box_mean(depth_norm * depth_norm, patch)
    std_map = np.sqrt(np.maximum(mean_sq - mean * mean, 0.0))

    dy, dx = np.gradient(depth_norm)
    grad = np.sqrt(dx * dx + dy * dy)
    grad_ref = np.percentile(grad, 95) + 1e-6
    grad_norm = np.clip(grad / grad_ref, 0.0, 1.0)
    grad_mask = grad_norm < grad_thresh

    landing_mask = grad_mask
    if water_mask is not None:
        water = np.asarray(water_mask)
        if water.shape == grad_mask.shape:
            # Coerce to bool first: "~" on an integer mask is a bitwise NOT,
            # which would treat values such as 2 or 128 as "not water".
            landing_mask = grad_mask & ~water.astype(bool)

    masked_std = np.where(landing_mask, std_map, np.inf)
    if not np.isfinite(masked_std).any():
        # Every pixel was rejected; fall back to the unmasked map.
        masked_std = std_map

    row, col = np.unravel_index(np.argmin(masked_std), masked_std.shape)
    row, col = int(row), int(col)
    half = patch // 2
    y0, y1 = max(row - half, 0), min(row + half, depth.shape[0] - 1)
    x0, x1 = max(col - half, 0), min(col + half, depth.shape[1] - 1)
    return (x0, y0, x1, y1), std_map, grad_norm, grad_mask, landing_mask


class DepthEngine:
    """Caches DepthAnything models and runs inference at bounded resolution."""

    def __init__(self):
        # Maps model id -> (loaded model, device it was moved to).
        self._model_cache: dict[str, tuple[DepthAnything3, torch.device]] = {}

    def _load_model(self, model_id: str) -> tuple[DepthAnything3, torch.device]:
        """Load ``model_id``, move it to CUDA when available and set eval mode."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = DepthAnything3.from_pretrained(model_id).to(device)
        model.eval()
        return model, device

    def get_model(self, model_id: str) -> tuple[DepthAnything3, torch.device]:
        """Return the cached ``(model, device)`` pair, loading it on first use."""
        if model_id not in self._model_cache:
            self._model_cache[model_id] = self._load_model(model_id)
        return self._model_cache[model_id]

    def predict_depth(
        self, image: np.ndarray, model_id: str, process_res_cap: int, plane_method: str = "least_squares"
    ) -> tuple[np.ndarray, np.ndarray, int, dict[str, float]]:
        """Run depth inference on one image and remove the global plane.

        Args:
            image: Input image array; its first two axes are read as the
                spatial size.
            model_id: Identifier passed to ``DepthAnything3.from_pretrained``.
            process_res_cap: Upper bound on the processing resolution.
            plane_method: Forwarded to ``remove_global_plane``.

        Returns:
            ``(depth_raw, depth, process_res, timings)`` where ``depth_raw`` is
            the model output, ``depth`` is the plane-corrected map,
            ``process_res`` is the resolution actually requested and
            ``timings`` holds ``prep_ms``, ``model_ms`` and ``plane_ms`` in
            milliseconds. ``prep_ms`` includes model loading on the first call
            for a given ``model_id``.
        """
        t0 = time.perf_counter()
        model, device = self.get_model(model_id)
        # Never ask for more than the image's longer side, nor more than the cap.
        process_res = min(max(image.shape[0], image.shape[1]), int(process_res_cap))
        t_pre = time.perf_counter()

        with torch.inference_mode():
            pred = model.inference(
                image=[image],
                process_res=process_res,
                process_res_method="upper_bound_resize",
                export_dir=None,
            )
        # NOTE(review): there is no explicit CUDA synchronization before this
        # timestamp, so model_ms may under-report if inference returns before
        # the GPU has finished - confirm against the model API.
        t_model = time.perf_counter()

        depth_raw = np.array(pred.depth[0])
        depth = remove_global_plane(depth_raw, method=plane_method)
        t_post = time.perf_counter()

        timings = {
            "prep_ms": (t_pre - t0) * 1000.0,
            "model_ms": (t_model - t_pre) * 1000.0,
            "plane_ms": (t_post - t_model) * 1000.0,
        }
        return depth_raw, depth, process_res, timings


def smooth_depth(depth: np.ndarray, sigma: float) -> np.ndarray:
    """Gaussian-blur a depth map on a best-effort basis.

    Args:
        depth: Depth map to smooth.
        sigma: Gaussian standard deviation in pixels; values of zero or less
            disable smoothing.

    Returns:
        The blurred map, or ``depth`` unchanged when smoothing is disabled or
        the blur fails. A failure is logged as a warning, never raised.
    """
    if sigma <= 0:
        return depth
    # Odd kernel covering roughly +/- 3 sigma, never smaller than 3x3.
    k = max(3, int(round(sigma * 3)) * 2 + 1)
    try:
        return cv2.GaussianBlur(depth, (k, k), sigmaX=sigma, sigmaY=sigma)
    except Exception:
        # Smoothing is optional, so keep going with the unsmoothed map, but
        # leave a trace instead of hiding the failure.
        logger.warning(
            "Gaussian smoothing failed (sigma=%s, kernel=%d); returning unsmoothed depth",
            sigma,
            k,
            exc_info=True,
        )
        return depth


__all__ = [
    "DepthEngine",
    "crop_nonblack",
    "pick_flat_patch",
    "remove_global_plane",
    "smooth_depth",
    "visualize_depth",
]