# drone-landing-safety / app / depth_pipeline.py
# Commit c5794e7 — "Switch rooftop masking to SAM3 and refresh demos" (yakvrz)
from __future__ import annotations

import time
from pathlib import Path

import cv2
import numpy as np
import torch
from PIL import Image
try: # pragma: no cover - optional dependency resolution
from depth_anything_3.api import DepthAnything3 # type: ignore
from depth_anything_3.utils.visualize import visualize_depth # type: ignore
except ModuleNotFoundError: # pragma: no cover
import sys
ROOT = Path(__file__).resolve().parents[1]
sys.path.append(str(ROOT / "src"))
from depth_anything_3.api import DepthAnything3 # type: ignore # noqa: E402
from depth_anything_3.utils.visualize import visualize_depth # type: ignore # noqa: E402
def crop_nonblack(img: Image.Image, frac: float = 0.05) -> Image.Image:
    """Trim a fractional border from every edge of *img*.

    A margin of ``frac`` times the width is removed from the left and right
    sides, and ``frac`` times the height from the top and bottom.
    """
    width, height = img.size
    margin_x = int(round(width * frac))
    margin_y = int(round(height * frac))
    box = (margin_x, margin_y, width - margin_x, height - margin_y)
    return img.crop(box)
def remove_global_plane(depth: np.ndarray, method: str = "least_squares") -> np.ndarray:
    """Subtract a globally fitted plane ``z = a*x + b*y + c`` from *depth*.

    Non-2D inputs are returned unchanged, as are inputs when *method* is
    ``"none"``/``"off"``, unrecognized, or when the least-squares fit fails.
    """
    if depth.ndim != 2:
        return depth
    mode = (method or "least_squares").lower()
    if mode in {"none", "off"}:
        return depth
    if mode not in {"ls", "least_squares", "lstsq"}:
        # Unknown method: leave the depth map untouched rather than guess.
        return depth
    rows, cols = depth.shape
    yy, xx = np.mgrid[0:rows, 0:cols].astype(np.float32)
    coords = np.stack((xx.flatten(), yy.flatten()), axis=1)
    # Design matrix [x, y, 1] for the affine plane fit.
    design = np.hstack([coords, np.ones((coords.shape[0], 1), dtype=np.float32)])
    target = depth.astype(np.float32).reshape(-1, 1)
    try:
        solution, *_ = np.linalg.lstsq(design, target, rcond=None)
    except np.linalg.LinAlgError:
        return depth
    fitted = (coords @ solution[:2] + solution[2]).reshape(rows, cols)
    return depth - fitted
def pick_flat_patch(
    depth: np.ndarray,
    patch: int = 96,
    std_thresh: float = 0.03,
    grad_thresh: float = 0.35,
    water_mask: np.ndarray | None = None,
):
    """Locate the flattest square patch in a 2D depth map.

    Flatness is the local standard deviation of the min-max-normalized depth
    over a ``patch`` x ``patch`` window. Candidate pixels are restricted to
    low-gradient regions (and, when a matching-shape ``water_mask`` is given,
    to pixels outside it); the patch is centered on the argmin of the masked
    std map. Returns ``(x0, y0, x1, y1)`` plus the std map, the normalized
    gradient map, the gradient mask, and the final landing mask.

    Raises ``ValueError`` for non-2D input.

    NOTE(review): ``std_thresh`` is currently unused — selection is argmin of
    the std map rather than a threshold; confirm whether it should gate the
    landing mask.
    """
    depth = depth.astype(np.float32)
    if depth.ndim != 2:
        raise ValueError("Depth map must be 2D (H, W)")
    # Clamp the window to the image and force it odd so it has a center pixel.
    patch = max(3, min(patch, min(depth.shape)))
    if patch % 2 == 0:
        patch += 1
    depth_norm = (depth - depth.min()) / (np.ptp(depth) + 1e-6)
    import torch.nn.functional as F

    def _windowed_mean(values: np.ndarray, kernel: int) -> np.ndarray:
        # Reflect-pad then average-pool with stride 1 so output keeps input size.
        margin = kernel // 2
        tensor = torch.from_numpy(values).unsqueeze(0).unsqueeze(0)
        tensor = F.pad(tensor, (margin, margin, margin, margin), mode="reflect")
        pooled = F.avg_pool2d(tensor, kernel_size=kernel, stride=1, padding=0, count_include_pad=False)
        return pooled.squeeze(0).squeeze(0).numpy()

    local_mean = _windowed_mean(depth_norm, patch)
    local_mean_sq = _windowed_mean(depth_norm * depth_norm, patch)
    # Windowed variance via E[x^2] - E[x]^2, clipped to avoid negative noise.
    std_map = np.sqrt(np.maximum(local_mean_sq - local_mean * local_mean, 0.0))
    grad_y, grad_x = np.gradient(depth_norm)
    grad = np.sqrt(grad_x * grad_x + grad_y * grad_y)
    # Normalize gradients by their 95th percentile so outliers don't dominate.
    grad_norm = np.clip(grad / (np.percentile(grad, 95) + 1e-6), 0.0, 1.0)
    grad_mask = grad_norm < grad_thresh
    landing_mask = grad_mask
    if water_mask is not None and water_mask.shape == grad_mask.shape:
        landing_mask = landing_mask & (~water_mask)
    # Flattest admissible pixel; if the mask leaves no finite candidate,
    # fall back to the unmasked std map.
    candidate_std = np.where(landing_mask, std_map, np.inf)
    if not np.isfinite(candidate_std).any():
        candidate_std = std_map
    cy, cx = np.unravel_index(np.argmin(candidate_std), candidate_std.shape)
    half = patch // 2
    y0, y1 = max(cy - half, 0), min(cy + half, depth.shape[0] - 1)
    x0, x1 = max(cx - half, 0), min(cx + half, depth.shape[1] - 1)
    return (x0, y0, x1, y1), std_map, grad_norm, grad_mask, landing_mask
class DepthEngine:
    """Caches DepthAnything models and runs inference at bounded resolution."""

    def __init__(self):
        # model_id -> (loaded model, device it lives on); populated lazily.
        self._model_cache: dict[str, tuple[DepthAnything3, torch.device]] = {}

    def _load_model(self, model_id: str) -> tuple[DepthAnything3, torch.device]:
        """Instantiate *model_id* in eval mode on CUDA when available, else CPU."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = DepthAnything3.from_pretrained(model_id).to(device)
        model.eval()
        return model, device

    def get_model(self, model_id: str) -> tuple[DepthAnything3, torch.device]:
        """Return the cached ``(model, device)`` pair, loading it on first use."""
        if model_id not in self._model_cache:
            self._model_cache[model_id] = self._load_model(model_id)
        return self._model_cache[model_id]

    def predict_depth(
        self, image: np.ndarray, model_id: str, process_res_cap: int, plane_method: str = "least_squares"
    ) -> tuple[np.ndarray, np.ndarray, int, dict[str, float]]:
        """Run depth inference on *image* and detrend the result.

        Parameters
        ----------
        image:
            Image array of shape (H, W, ...) as accepted by
            ``DepthAnything3.inference`` (passed through unmodified).
        model_id:
            Model identifier; loaded lazily and cached across calls.
        process_res_cap:
            Upper bound on the processing resolution (longest image side).
        plane_method:
            Forwarded to :func:`remove_global_plane`.

        Returns
        -------
        ``(depth_raw, depth_detrended, process_res, timings)`` where
        *timings* maps stage names to milliseconds.
        """
        t0 = time.perf_counter()
        model, _ = self.get_model(model_id)
        # Never process above the source image's longest side, nor above the cap.
        process_res = min(max(image.shape[0], image.shape[1]), int(process_res_cap))
        t_pre = time.perf_counter()
        with torch.inference_mode():
            pred = model.inference(
                image=[image],
                process_res=process_res,
                process_res_method="upper_bound_resize",
                export_dir=None,
            )
        t_model = time.perf_counter()
        depth_raw = np.array(pred.depth[0])
        depth = remove_global_plane(depth_raw, method=plane_method)
        t_post = time.perf_counter()
        timings = {
            "prep_ms": (t_pre - t0) * 1000.0,
            "model_ms": (t_model - t_pre) * 1000.0,
            "plane_ms": (t_post - t_model) * 1000.0,
        }
        return depth_raw, depth, process_res, timings
def smooth_depth(depth: np.ndarray, sigma: float) -> np.ndarray:
    """Best-effort Gaussian blur of *depth*.

    ``sigma <= 0`` disables smoothing and returns the input as-is. The kernel
    covers roughly three sigma per side, forced odd and at least 3.
    """
    if sigma <= 0:
        return depth
    kernel = max(3, 2 * int(round(sigma * 3)) + 1)
    try:
        return cv2.GaussianBlur(depth, (kernel, kernel), sigmaX=sigma, sigmaY=sigma)
    except Exception:
        # Deliberate best-effort: smoothing is cosmetic, never fatal.
        return depth
# Explicit public API: names exported by ``from depth_pipeline import *``
# (includes the re-exported ``visualize_depth`` helper).
__all__ = [
    "DepthEngine",
    "crop_nonblack",
    "pick_flat_patch",
    "remove_global_plane",
    "smooth_depth",
    "visualize_depth",
]