#!/usr/bin/env python3
"""
Gradio demo: depth overlays on VISLOC imagery using Depth Anything 3.

Run:
    python gradio_app.py
Then open the printed local URL.

Requires: gradio, pillow, torch, transformers (for water mask).
"""

import cv2
import functools
import math
import os
from pathlib import Path

import gradio as gr
import numpy as np
import torch
from PIL import Image, ImageDraw, ImageFilter
import matplotlib.cm as cm

# Prefer installed package; fall back to local src for dev runs.
try:
    from depth_anything_3.api import DepthAnything3  # type: ignore
    from depth_anything_3.utils.visualize import visualize_depth  # type: ignore
except ModuleNotFoundError:
    import sys

    ROOT = Path(__file__).resolve().parent
    sys.path.append(str(ROOT / "src"))
    from depth_anything_3.api import DepthAnything3  # noqa: E402
    from depth_anything_3.utils.visualize import visualize_depth  # noqa: E402

VISLOC_DIR = Path("data/Image/VISLOC")
HAGDAVS_DIR = Path("data/Image/HAGDAVS")
VIDEO_DIR = Path("data/Video")
IMAGE_EXTS = (".jpg", ".jpeg", ".png", ".JPG", ".JPEG", ".PNG")
VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".m4v"}
DEFAULT_ALTITUDE_M = 450.0
ASSUMED_FOV_DEG = 90.0
WATER_MODEL_ID = "facebook/mask2former-swin-large-ade-semantic"
ROAD_MODEL_ID = "facebook/mask2former-swin-large-ade-semantic"

# Defaults shared between the processing functions and the UI sliders, so a
# programmatic call to run_on_image()/process_image() behaves like the UI.
DEFAULT_PROCESS_RES_CAP = 1024
DEFAULT_ROOF_AGGRESSIVENESS = 1.3
DEFAULT_ROOF_MORPH_FRAC = 0.15
DEFAULT_SEGMENTATION_MAX_SIDE = 640
DEFAULT_DEPTH_SMOOTHING_BASE = 0.8
DEFAULT_COVERAGE_STRICTNESS = 0.999
DEFAULT_MIN_COMPONENT_MULTIPLIER = 1.0
DEFAULT_MODEL_ID = "depth-anything/DA3MONO-LARGE"


def crop_nonblack(img: Image.Image, frac: float = 0.05) -> Image.Image:
    """Naively crop a fixed fraction off each border (to drop black padding)."""
    w, h = img.size
    dx = int(round(w * frac))
    dy = int(round(h * frac))
    return img.crop((dx, dy, w - dx, h - dy))


@functools.lru_cache(maxsize=1)
def get_water_segmenter(model_id: str):
    """Load Mask2Former for water masking (kept on CPU to avoid OOM).

    Returns (processor, model, device). Cached so the weights load only once.
    """
    try:
        from transformers import AutoImageProcessor, Mask2FormerForUniversalSegmentation
    except ImportError as e:
        raise ImportError(
            "transformers is required for water masking; install with `pip install transformers`"
        ) from e
    device = torch.device("cpu")
    try:
        # Newer transformers accept use_fast; older versions raise TypeError.
        processor = AutoImageProcessor.from_pretrained(model_id, use_fast=True)
    except TypeError:
        processor = AutoImageProcessor.from_pretrained(model_id)
    model = Mask2FormerForUniversalSegmentation.from_pretrained(model_id).to(device)
    model.eval()
    return processor, model, device


def _compute_class_mask(
    img: Image.Image,
    model_id: str,
    keywords: list[str],
    blocklist: list[str] | None,
    max_side: int,
    fail_msg: str,
) -> np.ndarray | None:
    """Shared Mask2Former semantic-mask helper.

    Returns a boolean mask (at ``img`` resolution) that is True wherever the
    predicted ADE label contains one of ``keywords`` (and none of ``blocklist``),
    or None if segmentation fails at runtime.
    """
    processor, model, device = get_water_segmenter(model_id)
    try:
        # Downscale the longest side to max_side to bound segmentation cost.
        img_proc = img
        if max(img.size) > max_side:
            scale = max_side / max(img.size)
            new_size = (int(round(img.size[0] * scale)), int(round(img.size[1] * scale)))
            img_proc = img.resize(new_size, resample=Image.BILINEAR)
        try:
            inputs = processor(images=img_proc, return_tensors="pt", use_fast=True).to(device)
        except TypeError:
            inputs = processor(images=img_proc, return_tensors="pt").to(device)
        with torch.inference_mode():
            outputs = model(**inputs)
        seg = processor.post_process_semantic_segmentation(
            outputs, target_sizes=[img_proc.size[::-1]]
        )[0]
        if torch.is_tensor(seg):
            seg = seg.cpu()
        labels = model.config.id2label
        block = blocklist or []
        target_ids = {
            i
            for i, name in labels.items()
            if any(k in name.lower() for k in keywords)
            and not any(b in name.lower() for b in block)
        }
        seg_np = np.array(seg)
        mask_small = np.isin(seg_np, list(target_ids)).astype(np.uint8) * 255
        # Upscale back to the original image size with NEAREST to keep it binary.
        mask_img = Image.fromarray(mask_small).resize(img.size, resample=Image.NEAREST)
        return np.array(mask_img) > 0
    except RuntimeError as e:
        print(f"[WARN] {fail_msg}: {e}")
        return None


def compute_water_mask(img: Image.Image, model_id: str, max_side: int = 640) -> np.ndarray | None:
    """Return boolean mask for water-like classes using Mask2Former ADE weights."""
    return _compute_class_mask(
        img,
        model_id,
        keywords=["water", "sea", "lake", "river", "ocean", "pond"],
        blocklist=None,
        max_side=max_side,
        fail_msg="Water masking failed (fallback to no water mask)",
    )


def compute_road_mask(img: Image.Image, model_id: str, max_side: int = 640) -> np.ndarray | None:
    """Return boolean mask for road/highway classes using Mask2Former ADE weights."""
    return _compute_class_mask(
        img,
        model_id,
        keywords=["highway", "road", "street", "runway"],
        # Exclude green/open-space labels whose names contain road-like substrings.
        blocklist=["field", "park", "grass", "lawn", "garden", "court", "yard", "green"],
        max_side=max_side,
        fail_msg="Road masking failed (fallback to no road mask)",
    )


def compute_roof_mask_depth(
    depth: np.ndarray, aggressiveness: float = 1.3, morph_kernel: int = 5
) -> np.ndarray:
    """Depth-based roof/structure mask: flag pixels significantly closer than the median (raised surfaces)."""
    d = depth.astype(np.float32)
    med = np.median(d)
    # Median absolute deviation as a robust spread estimate.
    mad = np.median(np.abs(d - med)) + 1e-6
    threshold = med - aggressiveness * mad
    mask = d < threshold
    mask = mask.astype(np.uint8)
    k = max(1, int(morph_kernel))
    if k % 2 == 0:
        k += 1  # cv2 structuring elements want odd sizes
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k, k))
    try:
        # Open to drop speckles, close to fill holes; best-effort only.
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    except Exception:
        pass
    return mask > 0


def remove_global_plane(depth: np.ndarray) -> np.ndarray:
    """Remove best-fit global plane from depth to avoid penalizing large flat areas viewed at an angle."""
    if depth.ndim != 2:
        return depth
    h, w = depth.shape
    yy, xx = np.mgrid[0:h, 0:w].astype(np.float32)
    A = np.stack((xx, yy, np.ones_like(xx)), axis=-1).reshape(-1, 3)
    b = depth.astype(np.float32).reshape(-1, 1)
    try:
        coef, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
        plane = (A @ coef).reshape(h, w)
        return depth - plane
    except np.linalg.LinAlgError:
        return depth


def pick_flat_patch(
    depth: np.ndarray,
    patch: int = 96,
    std_thresh: float = 0.03,
    grad_thresh: float = 0.35,
    water_mask: np.ndarray | None = None,
):
    """Find a low-variance depth window as a proxy for flat landing area.

    Returns ``((x0, y0, x1, y1), std_map, grad_norm, grad_mask, landing_mask)``.
    NOTE(review): ``std_thresh`` is currently unused here — thresholding is done
    by the caller; the parameter is kept for interface compatibility.
    """
    depth = depth.astype(np.float32)
    if depth.ndim != 2:
        raise ValueError("Depth map must be 2D (H, W)")
    patch = max(3, min(patch, min(depth.shape)))
    if patch % 2 == 0:
        patch += 1  # keeps pooling output same size
    # np.ptp (function form) — ndarray.ptp was removed in NumPy 2.0.
    depth_norm = (depth - depth.min()) / (np.ptp(depth) + 1e-6)

    # Efficient box std via torch avg pooling
    import torch.nn.functional as F

    def box_mean(arr, k):
        pad = k // 2
        t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0)
        # Reflective padding avoids dark/bright rims in the std map
        t = F.pad(t, (pad, pad, pad, pad), mode="reflect")
        mean = F.avg_pool2d(t, kernel_size=k, stride=1, padding=0, count_include_pad=False)
        return mean.squeeze(0).squeeze(0).numpy()

    mean = box_mean(depth_norm, patch)
    mean_sq = box_mean(depth_norm * depth_norm, patch)
    # Var = E[x^2] - E[x]^2, clamped against tiny negative rounding errors.
    var = np.maximum(mean_sq - mean * mean, 0.0)
    std_map = np.sqrt(var)
    # Gradient mask to down-weight slopes/edges
    dy, dx = np.gradient(depth_norm)
    grad = np.sqrt(dx * dx + dy * dy)
    grad_ref = np.percentile(grad, 95) + 1e-6
    grad_norm = np.clip(grad / grad_ref, 0.0, 1.0)
    grad_mask = grad_norm < grad_thresh
    landing_mask = grad_mask
    if water_mask is not None and water_mask.shape == grad_mask.shape:
        landing_mask = landing_mask & (~water_mask)
    masked_std = np.where(landing_mask, std_map, np.inf)
    if not np.isfinite(masked_std).any():
        masked_std = std_map  # fallback: just take the flattest spot
    y, x = np.unravel_index(np.argmin(masked_std), masked_std.shape)
    half = patch // 2
    y0, y1 = max(y - half, 0), min(y + half, depth.shape[0] - 1)
    x0, x1 = max(x - half, 0), min(x + half, depth.shape[1] - 1)
    return (x0, y0, x1, y1), std_map, grad_norm, grad_mask, landing_mask


def make_safety_heatmap(
    rgb: Image.Image,
    safe_mask: np.ndarray,
):
    """Produce a safety heatmap overlay on RGB from a provided safe mask.

    Returns (heat_img, score_gray) both resized to the RGB image size.
    """
    score = np.clip(safe_mask.astype(np.float32), 0.0, 1.0)
    # Color: red (unsafe) -> green (safe). Gamma the green channel and cap its max
    # so bright green does not overpower red when blended on the base image.
    green = np.power(score, 1.2) * 200.0
    red = np.power(1.0 - score, 0.9) * 255.0
    heat = np.zeros((*score.shape, 3), dtype=np.uint8)
    heat[..., 0] = red
    heat[..., 1] = green
    heat_img = Image.fromarray(heat).resize(rgb.size, resample=Image.NEAREST)
    score_gray = Image.fromarray((score * 255).astype(np.uint8)).resize(
        rgb.size, resample=Image.NEAREST
    )
    return heat_img, score_gray


@functools.lru_cache(maxsize=1)
def get_model(model_id: str = "depth-anything/DA3METRIC-LARGE"):
    """Load model once and cache."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = DepthAnything3.from_pretrained(model_id).to(device)
    model.eval()
    return model, device


@functools.lru_cache(maxsize=1)
def list_visloc_images() -> list[Path]:
    """Return sorted VISLOC image paths from data/Image/VISLOC."""
    if not VISLOC_DIR.exists():
        return []
    files = [p for p in VISLOC_DIR.iterdir() if p.suffix in IMAGE_EXTS]
    return sorted(files)


@functools.lru_cache(maxsize=1)
def list_hagdavs_images() -> list[Path]:
    """Return sorted HAGDAVS image paths from data/Image/HAGDAVS."""
    if not HAGDAVS_DIR.exists():
        return []
    files = [p for p in HAGDAVS_DIR.iterdir() if p.suffix in IMAGE_EXTS]
    return sorted(files)


@functools.lru_cache(maxsize=1)
def list_videos() -> list[Path]:
    """Return sorted video paths from data/Video."""
    if not VIDEO_DIR.exists():
        return []
    files = [p for p in VIDEO_DIR.iterdir() if p.suffix.lower() in VIDEO_EXTS]
    return sorted(files)


@functools.lru_cache(maxsize=1)
def list_all_data_inputs() -> list[str]:
    """Collect VISLOC image files for selection."""
    return [str(p) for p in list_visloc_images()]


# Simple cache for water/road masks keyed by (model_id, path, segmentation_max_side)
WATER_MASK_CACHE: dict[tuple[str, str, int], np.ndarray] = {}
ROAD_MASK_CACHE: dict[tuple[str, str, int], np.ndarray] = {}


def _resize_bool_mask(mask: np.ndarray, shape_hw: tuple[int, int]) -> np.ndarray:
    """Resize a boolean mask to (H, W) with NEAREST and return it as bool."""
    mask_u8 = np.asarray(mask).astype(np.uint8) * 255
    resized = Image.fromarray(mask_u8).resize((shape_hw[1], shape_hw[0]), resample=Image.NEAREST)
    return np.array(resized) > 0


def run_on_image(
    image: Image.Image,
    footprint_m: float,
    std_thresh: float,
    grad_thresh: float,
    use_water_mask: bool,
    use_road_mask: bool,
    use_roof_mask: bool,
    altitude_m: float,
    fov_deg: float,
    flatness_detail: float,
    clearance_factor: float,
    # Defaults added so callers that predate these knobs (and programmatic use)
    # keep working; values mirror the UI slider defaults.
    process_res_cap: int = DEFAULT_PROCESS_RES_CAP,
    roof_aggressiveness: float = DEFAULT_ROOF_AGGRESSIVENESS,
    roof_morph_frac: float = DEFAULT_ROOF_MORPH_FRAC,
    segmentation_max_side: int = DEFAULT_SEGMENTATION_MAX_SIDE,
    depth_smoothing_base: float = DEFAULT_DEPTH_SMOOTHING_BASE,
    coverage_strictness: float = DEFAULT_COVERAGE_STRICTNESS,
    min_component_multiplier: float = DEFAULT_MIN_COMPONENT_MULTIPLIER,
    model_id: str = DEFAULT_MODEL_ID,
    source_path: str | None = None,
) -> dict:
    """Run depth inference + safety analysis on one image.

    Returns a dict of named PIL views (RGB, Depth, masks, heatmaps, overlays)
    all at the input image resolution.
    """
    rgb_np = np.array(image)
    model, device = get_model(model_id)
    # Fixed upper-bound resolution (cap) while avoiding upscaling small images.
    process_res = min(max(image.size), int(process_res_cap))
    with torch.inference_mode():
        pred = model.inference(
            image=[rgb_np],
            process_res=process_res,
            process_res_method="upper_bound_resize",
            export_dir=None,
        )
    depth_raw = np.array(pred.depth[0])
    depth = remove_global_plane(depth_raw)

    # Smooth depth for resolution-invariant flatness/gradient (higher res -> slightly more smoothing)
    res_scale = max(0.5, min(2.5, process_res / 1024))
    sigma = max(0.0, depth_smoothing_base) * res_scale
    k = max(3, int(round(sigma * 3)) * 2 + 1)
    try:
        depth = cv2.GaussianBlur(depth, (k, k), sigmaX=sigma, sigmaY=sigma)
    except Exception:
        pass

    # Convert landing footprint (meters) to pixels at current processed resolution
    fov = max(10.0, min(170.0, float(fov_deg)))
    altitude = max(1.0, float(altitude_m))
    fx = (depth.shape[1] / 2.0) / math.tan(math.radians(fov) / 2.0)
    patch_px = footprint_m * fx / altitude
    patch_px = max(3, min(int(round(patch_px)), min(depth.shape) - 1))
    if patch_px % 2 == 0:
        patch_px += 1  # keep pooling symmetric

    # For visualization, compute a flatness map with a smaller, sharper window (decoupled from footprint)
    depth_norm = (depth - depth.min()) / (np.ptp(depth) + 1e-6)
    vis_patch = max(
        5,
        min(
            int(max(1.0, flatness_detail) * patch_px),
            min(depth.shape) // 10,
            min(depth.shape) - 1,
        ),
    )
    if vis_patch % 2 == 0:
        vis_patch += 1

    import torch.nn.functional as F

    def box_mean_np(arr: np.ndarray, k: int):
        pad = k // 2
        t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0)
        t = F.pad(t, (pad, pad, pad, pad), mode="reflect")
        mean = F.avg_pool2d(t, kernel_size=k, stride=1, padding=0, count_include_pad=False)
        return mean.squeeze(0).squeeze(0).numpy()

    std_map_vis = np.sqrt(
        np.maximum(
            box_mean_np(depth_norm * depth_norm, vis_patch)
            - box_mean_np(depth_norm, vis_patch) ** 2,
            0.0,
        )
    )

    # Optional water mask (resized to depth resolution)
    water_mask_resized = None
    water_mask_img = None
    if use_water_mask:
        cache_key = (WATER_MODEL_ID, source_path or "", int(segmentation_max_side))
        if cache_key in WATER_MASK_CACHE:
            water_mask_img = WATER_MASK_CACHE[cache_key]
        else:
            water_mask_img = compute_water_mask(image, WATER_MODEL_ID, max_side=segmentation_max_side)
            if source_path is not None and water_mask_img is not None:
                WATER_MASK_CACHE[cache_key] = water_mask_img
        if water_mask_img is not None:
            water_mask_resized = _resize_bool_mask(water_mask_img, depth.shape)

    road_mask_resized = None
    road_mask_img = None
    if use_road_mask:
        cache_key_r = (ROAD_MODEL_ID, source_path or "", int(segmentation_max_side))
        if cache_key_r in ROAD_MASK_CACHE:
            road_mask_img = ROAD_MASK_CACHE[cache_key_r]
        else:
            road_mask_img = compute_road_mask(image, ROAD_MODEL_ID, max_side=segmentation_max_side)
            if source_path is not None and road_mask_img is not None:
                ROAD_MASK_CACHE[cache_key_r] = road_mask_img
        if road_mask_img is not None:
            road_mask_resized = _resize_bool_mask(road_mask_img, depth.shape)

    roof_mask_resized = None
    if use_roof_mask:
        # Depth-based elevation mask: closer-than-median surfaces are treated as roofs/structures.
        aggressiveness = max(0.5, min(3.0, roof_aggressiveness))
        morph_k = max(3, int(round(patch_px * roof_morph_frac)))
        roof_mask_resized = compute_roof_mask_depth(
            depth, aggressiveness=aggressiveness, morph_kernel=morph_k
        )

    box, std_map, grad_norm, grad_mask, landing_mask = pick_flat_patch(
        depth,
        patch=patch_px,
        std_thresh=std_thresh,
        grad_thresh=grad_thresh,
        water_mask=water_mask_resized,
    )
    if road_mask_resized is not None:
        landing_mask = landing_mask & (~road_mask_resized)
    if roof_mask_resized is not None:
        landing_mask = landing_mask & (~roof_mask_resized)

    safe_mask = (std_map < std_thresh) & (grad_norm < grad_thresh) & landing_mask

    # Clearance: dilate hazards to enforce buffer around unsafe regions
    try:
        clearance_px = max(1, int(round(clearance_factor * patch_px)))
        if clearance_px % 2 == 0:
            clearance_px += 1
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (clearance_px, clearance_px))
        hazard = (~safe_mask).astype(np.uint8)
        buffered = cv2.dilate(hazard, kernel, iterations=1).astype(bool)
        safe_mask = safe_mask & (~buffered)
    except Exception:
        pass

    # Strict footprint coverage: a center is safe only if the full footprint is safe
    try:
        coverage = cv2.boxFilter(
            safe_mask.astype(np.float32),
            ddepth=-1,
            ksize=(patch_px, patch_px),
            normalize=True,
            anchor=(patch_px // 2, patch_px // 2),
        )
        safe_mask = coverage >= max(0.0, min(1.0, coverage_strictness))
    except Exception:
        pass

    # Drop tiny components: require at least one footprint area
    area_thresh = max(1, int(patch_px * patch_px * max(0.1, min_component_multiplier)))
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
        safe_mask.astype(np.uint8), connectivity=8
    )
    if num_labels > 1:
        keep = np.zeros_like(labels, dtype=bool)
        for i in range(1, num_labels):
            if stats[i, cv2.CC_STAT_AREA] >= area_thresh:
                keep |= labels == i
        safe_mask = keep

    # Recommended landing spot overlay (scaled to input image size)
    # Prefer centers where the full footprint is safe; fall back to best flat spot
    safe_fit = safe_mask.astype(np.float32)
    try:
        coverage = cv2.boxFilter(
            safe_fit,
            ddepth=-1,
            ksize=(patch_px, patch_px),
            normalize=True,
            anchor=(patch_px // 2, patch_px // 2),
        )
        valid_centers = coverage >= 1.0
    except Exception:
        valid_centers = safe_fit > 0.5
    if valid_centers.any():
        cc_mask = valid_centers.astype(np.uint8)
        num_c, labels_c, stats_c, _ = cv2.connectedComponentsWithStats(cc_mask, connectivity=8)
        target_mask = valid_centers
        if num_c > 1:
            # Pick largest safe component by area (skip background)
            areas = stats_c[1:, cv2.CC_STAT_AREA]
            largest_idx = 1 + int(np.argmax(areas))
            target_mask = labels_c == largest_idx
        cand = np.where(target_mask)
        std_cand = std_map[cand]
        idx = np.argmin(std_cand)
        cy, cx = cand[0][idx], cand[1][idx]
    else:
        y0, x0, y1, x1 = box[1], box[0], box[3], box[2]
        cy, cx = (y0 + y1) // 2, (x0 + x1) // 2

    half = patch_px // 2
    x0 = max(int(cx - half), 0)
    x1 = min(int(cx + half), depth.shape[1] - 1)
    y0 = max(int(cy - half), 0)
    y1 = min(int(cy + half), depth.shape[0] - 1)
    scale_x = image.width / depth.shape[1]
    scale_y = image.height / depth.shape[0]
    # Draw a box whose side length matches the footprint in input-image pixels
    side_img = max(3, int(round(patch_px * scale_x)))
    cx_img = int(round(cx * scale_x))
    cy_img = int(round(cy * scale_y))
    half_img = side_img // 2
    bx0 = max(cx_img - half_img, 0)
    bx1 = min(cx_img + half_img, image.width - 1)
    by0 = max(cy_img - half_img, 0)
    by1 = min(cy_img + half_img, image.height - 1)
    spot_overlay = Image.new("RGBA", image.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(spot_overlay)
    draw.rectangle((bx0, by0, bx1, by1), outline=(0, 255, 0, 255), width=4)
    cx, cy = (bx0 + bx1) // 2, (by0 + by1) // 2
    draw.ellipse((cx - 5, cy - 5, cx + 5, cy + 5), fill=(0, 255, 0, 255))

    depth_vis = Image.fromarray(visualize_depth(depth_raw, cmap="Spectral")).resize(
        image.size, resample=Image.BILINEAR
    )
    flatness_img = Image.fromarray(
        (std_map_vis / (std_map_vis.max() + 1e-6) * 255).astype(np.uint8)
    ).resize(image.size, resample=Image.NEAREST)
    grad_img = Image.fromarray((grad_norm * 255).astype(np.uint8)).resize(
        image.size, resample=Image.BILINEAR
    )
    grad_mask_img = Image.fromarray(((grad_norm < grad_thresh).astype(np.uint8) * 255)).resize(
        image.size, resample=Image.NEAREST
    )

    water_mask_view = None
    if use_water_mask and water_mask_img is not None:
        water_mask_view = Image.fromarray((np.array(water_mask_img).astype(np.uint8) * 255))
        water_mask_view = water_mask_view.resize(image.size, resample=Image.NEAREST)
    road_mask_view = None
    if use_road_mask and road_mask_img is not None:
        road_mask_view = Image.fromarray((np.array(road_mask_img).astype(np.uint8) * 255))
        road_mask_view = road_mask_view.resize(image.size, resample=Image.NEAREST)
    roof_mask_view = None
    if use_roof_mask and roof_mask_resized is not None:
        roof_mask_view = Image.fromarray((roof_mask_resized.astype(np.uint8) * 255))
        roof_mask_view = roof_mask_view.resize(image.size, resample=Image.NEAREST)

    heat_overlay, heat_gray = make_safety_heatmap(image, safe_mask)
    images = {
        "RGB": image,
        "Depth": depth_vis,
        "Flatness map (std)": flatness_img,
        "Depth gradient": grad_img,
        "Gradient mask": grad_mask_img,
        "Water mask": water_mask_view if water_mask_view is not None else Image.new("L", image.size, 0),
        "Road mask": road_mask_view if road_mask_view is not None else Image.new("L", image.size, 0),
        "Roof mask": roof_mask_view if roof_mask_view is not None else Image.new("L", image.size, 0),
        "Safety heatmap overlay": heat_overlay,
        "Safety score": heat_gray,
        "Landing spot overlay": spot_overlay,
    }
    return images


def process_image(
    input_path: str,
    footprint_m: float,
    std_thresh: float,
    grad_thresh: float,
    use_water_mask: bool,
    use_road_mask: bool,
    use_roof_mask: bool,
    altitude_m: float,
    fov_deg: float,
    flatness_detail: float,
    clearance_factor: float,
    # Defaults mirror run_on_image; previously these were required positionals
    # and the UI callbacks omitted them, crashing with TypeError on Run.
    process_res_cap: int = DEFAULT_PROCESS_RES_CAP,
    roof_aggressiveness: float = DEFAULT_ROOF_AGGRESSIVENESS,
    roof_morph_frac: float = DEFAULT_ROOF_MORPH_FRAC,
    segmentation_max_side: int = DEFAULT_SEGMENTATION_MAX_SIDE,
    depth_smoothing_base: float = DEFAULT_DEPTH_SMOOTHING_BASE,
    coverage_strictness: float = DEFAULT_COVERAGE_STRICTNESS,
    min_component_multiplier: float = DEFAULT_MIN_COMPONENT_MULTIPLIER,
    model_id: str = DEFAULT_MODEL_ID,
    source_path: str | None = None,
) -> dict:
    """Validate the path, load + crop the image, and run the full analysis."""
    path = Path(input_path)
    if not path.exists():
        raise gr.Error(f"Input path not found: {path}")
    if path.suffix.lower() not in IMAGE_EXTS:
        raise gr.Error(f"Unsupported image type for path: {path}")
    image = crop_nonblack(Image.open(path).convert("RGB"))
    return run_on_image(
        image=image,
        footprint_m=footprint_m,
        std_thresh=std_thresh,
        grad_thresh=grad_thresh,
        use_water_mask=use_water_mask,
        use_road_mask=use_road_mask,
        use_roof_mask=use_roof_mask,
        altitude_m=altitude_m,
        fov_deg=fov_deg,
        flatness_detail=flatness_detail,
        clearance_factor=clearance_factor,
        process_res_cap=process_res_cap,
        roof_aggressiveness=roof_aggressiveness,
        roof_morph_frac=roof_morph_frac,
        segmentation_max_side=segmentation_max_side,
        depth_smoothing_base=depth_smoothing_base,
        coverage_strictness=coverage_strictness,
        min_component_multiplier=min_component_multiplier,
        model_id=model_id,
        source_path=str(path),
    )


def compose_view(
    images_dict: dict,
    base_view: str,
    heat_on: bool,
    heat_alpha: float,
    grad_on: bool,
    grad_alpha: float,
    flat_on: bool,
    flat_alpha: float,
    water_on: bool,
    water_alpha: float,
    water_enabled: bool,
    spot_on: bool,
    road_on: bool,
    road_alpha: float,
    road_enabled: bool,
    roof_on: bool,
    roof_alpha: float,
    roof_enabled: bool,
) -> Image.Image:
    """Return a composited view with per-layer alpha controls."""
    if not images_dict:
        raise gr.Error("Run inference first, then select a view.")
    if base_view not in images_dict:
        raise gr.Error(f"Unknown view: {base_view}")
    base = images_dict.get(base_view)
    if base is None:
        raise gr.Error(f"No image for view: {base_view}")
    out = base.convert("RGBA")
    if heat_on and "Safety heatmap overlay" in images_dict:
        heat = images_dict["Safety heatmap overlay"]
        if heat is not None:
            heat_rgba = heat.convert("RGBA")
            alpha = int(min(max(heat_alpha, 0.0), 1.0) * 255)
            heat_rgba.putalpha(alpha)
            out = Image.alpha_composite(out, heat_rgba)
    if grad_on and "Depth gradient" in images_dict:
        grad_img = images_dict["Depth gradient"]
        if grad_img is not None:
            grad_rgba = grad_img.convert("RGBA")
            grad_rgba.putalpha(int(min(max(grad_alpha, 0.0), 1.0) * 255))
            out = Image.alpha_composite(out, grad_rgba)
    if flat_on and "Flatness map (std)" in images_dict:
        flat_img = images_dict["Flatness map (std)"]
        if flat_img is not None:
            flat_rgba = flat_img.convert("RGBA")
            flat_rgba.putalpha(int(min(max(flat_alpha, 0.0), 1.0) * 255))
            out = Image.alpha_composite(out, flat_rgba)
    if water_on and water_enabled and "Water mask" in images_dict:
        wm = images_dict["Water mask"]
        if wm is not None:
            m = wm.convert("L")
            overlay = Image.new("RGBA", wm.size, (255, 0, 0, 0))
            alpha = int(min(max(water_alpha, 0.0), 1.0) * 255)
            # Per-pixel alpha: mask intensity scaled by the slider value.
            overlay.putalpha(Image.eval(m, lambda px: int(px * (alpha / 255.0))))
            out = Image.alpha_composite(out, overlay)
    if road_on and road_enabled and "Road mask" in images_dict:
        rm = images_dict["Road mask"]
        if rm is not None:
            m = rm.convert("L")
            overlay = Image.new("RGBA", rm.size, (255, 165, 0, 0))  # orange
            alpha = int(min(max(road_alpha, 0.0), 1.0) * 255)
            overlay.putalpha(Image.eval(m, lambda px: int(px * (alpha / 255.0))))
            out = Image.alpha_composite(out, overlay)
    if roof_on and roof_enabled and "Roof mask" in images_dict:
        rf = images_dict["Roof mask"]
        if rf is not None:
            m = rf.convert("L")
            overlay = Image.new("RGBA", rf.size, (255, 0, 255, 0))  # magenta tint for roofs
            alpha = int(min(max(roof_alpha, 0.0), 1.0) * 255)
            overlay.putalpha(Image.eval(m, lambda px: int(px * (alpha / 255.0))))
            out = Image.alpha_composite(out, overlay)
    if spot_on and "Landing spot overlay" in images_dict:
        spot = images_dict["Landing spot overlay"]
        if spot is not None:
            out = Image.alpha_composite(out, spot.convert("RGBA"))
    return out.convert("RGB")


def build_ui():
    """Construct and return the Gradio Blocks app."""
    with gr.Blocks(title="Landing Site Safety Analyzer (VISLOC)") as demo:
        gr.Markdown(
            "## Landing Site Safety Analyzer\n"
            "Run DepthAnything3 on VISLOC images under `data/Image/VISLOC` to evaluate landing zones: depth, safety heatmap, gradients, flatness, and water masks. Toggle layers, footprint, and opacity to assess safety."
        )
        with gr.Row():
            with gr.Column(scale=1, min_width=320):
                gr.Markdown("### Input")
                all_choices = list_all_data_inputs()
                input_path = gr.Dropdown(
                    label="Input file",
                    choices=all_choices,
                    value=all_choices[0] if all_choices else "",
                    info="Pick any VISLOC image under data/Image/VISLOC/.",
                )
                footprint_m = gr.Slider(
                    label="Landing footprint (meters)",
                    value=10,
                    minimum=1,
                    maximum=150,
                    step=1,
                    info="Side length (meters) of the clear area required for landing (assumes ~450m altitude, 90° FOV).",
                )
                std_thresh = gr.Slider(
                    label="Flatness threshold",
                    value=0.01,
                    minimum=0.001,
                    maximum=0.08,
                    step=0.001,
                    info="Lower values favor flatter regions when computing the heatmap.",
                )
                grad_thresh = gr.Slider(
                    label="Gradient threshold",
                    value=0.1,
                    minimum=0.02,
                    maximum=1.0,
                    step=0.01,
                    info="Lower values suppress sloped/edgy areas in the heatmap.",
                )
                flatness_detail = gr.Slider(
                    label="Flatness detail (relative)",
                    value=1.0,
                    minimum=0.5,
                    maximum=2.5,
                    step=0.1,
                    info="Scales the window for the flatness visualization; lower = finer detail.",
                )
                clearance_factor = gr.Slider(
                    label="Clearance factor",
                    value=0.5,
                    minimum=0.0,
                    maximum=2.0,
                    step=0.05,
                    info="How much to dilate unsafe regions relative to the footprint to enforce buffer distance.",
                )
                process_res_cap = gr.Slider(
                    label="Processing resolution cap",
                    value=1024,
                    minimum=512,
                    maximum=2048,
                    step=32,
                    info="Upper bound on the longest side fed to the depth model; avoids oversized, noisy inference.",
                )
                depth_smoothing_base = gr.Slider(
                    label="Depth smoothing base",
                    value=0.8,
                    minimum=0.0,
                    maximum=2.0,
                    step=0.05,
                    info="Base Gaussian sigma multiplier for depth smoothing (scaled by resolution).",
                )
                coverage_strictness = gr.Slider(
                    label="Coverage strictness",
                    value=0.999,
                    minimum=0.8,
                    maximum=1.0,
                    step=0.001,
                    info="Minimum fraction of a footprint that must be safe to count a center as safe.",
                )
                min_component_multiplier = gr.Slider(
                    label="Min safe area (x footprint)",
                    value=1.0,
                    minimum=0.1,
                    maximum=5.0,
                    step=0.1,
                    info="Minimum safe component area in multiples of footprint^2.",
                )
                segmentation_max_side = gr.Slider(
                    label="Segmentation max side",
                    value=640,
                    minimum=256,
                    maximum=1024,
                    step=32,
                    info="Resize longest image side to this for water/road segmentation.",
                )
                with gr.Accordion("Camera settings", open=False):
                    altitude_m = gr.Slider(
                        label="Camera altitude (m)",
                        value=450,
                        minimum=10,
                        maximum=1500,
                        step=5,
                        info="Altitude used to convert footprint meters to pixels.",
                    )
                    fov_deg = gr.Slider(
                        label="Camera FOV (deg)",
                        value=90,
                        minimum=30,
                        maximum=150,
                        step=1,
                        info="Horizontal field of view used for footprint sizing.",
                    )
                model_id = gr.Dropdown(
                    label="Model",
                    value="depth-anything/DA3MONO-LARGE",
                    choices=[
                        "depth-anything/DA3MONO-LARGE",
                        "depth-anything/DA3METRIC-LARGE",
                        "depth-anything/DA3-BASE",
                        "depth-anything/DA3NESTED-GIANT-LARGE",
                    ],
                    info="Which pretrained DepthAnything3 checkpoint to use.",
                )
                with gr.Accordion("Masking", open=True):
                    with gr.Row():
                        use_water_mask = gr.Checkbox(
                            label="Exclude water (segmentation)",
                            value=True,
                            info="Apply water segmentation to down-weight water regions.",
                        )
                        use_road_mask = gr.Checkbox(
                            label="Exclude roads (segmentation)",
                            value=True,
                            info="Apply road segmentation to avoid roads/highways.",
                        )
                        use_roof_mask = gr.Checkbox(
                            label="Exclude rooftops (depth)",
                            value=True,
                            info="Use depth (closer-than-median) to avoid rooftops/raised structures.",
                        )
                    roof_aggressiveness = gr.Slider(
                        label="Rooftop aggressiveness (MAD multiplier)",
                        value=1.3,
                        minimum=0.5,
                        maximum=3.0,
                        step=0.05,
                        info="Higher = more aggressive exclusion of raised areas in the depth-based rooftop mask.",
                    )
                    roof_morph_frac = gr.Slider(
                        label="Rooftop morph kernel (fraction of footprint px)",
                        value=0.15,
                        minimum=0.05,
                        maximum=0.5,
                        step=0.01,
                        info="Controls smoothing/merging of rooftop mask relative to footprint size.",
                    )
                with gr.Row():
                    run_btn = gr.Button("Run", variant="primary", scale=1)
                    stop_btn = gr.Button("Stop", variant="stop", scale=1)
                images_state = gr.State({})
            with gr.Column(scale=3):
                gr.Markdown("### Preview")
                main_view = gr.Image(
                    label="Preview",
                    height=800,
                    elem_id="main-preview",
                    show_fullscreen_button=False,
                )
                gr.HTML(
                    """
                    """,
                    elem_id="main-preview-zoom-helper",
                )
            with gr.Column(scale=1, min_width=260):
                gr.Markdown("### Overlays")
                base_view = gr.Dropdown(
                    label="Base view",
                    value="RGB",
                    choices=[
                        "RGB",
                        "Depth",
                        "Flatness map (std)",
                        "Depth gradient",
                        "Gradient mask",
                        "Water mask",
                        "Safety score",
                        "Safety heatmap overlay",
                    ],
                )
                heat_on = gr.Checkbox(label="Heatmap", value=True, info="Show the safety heatmap overlay.")
                heat_alpha = gr.Slider(
                    label="Heatmap alpha", value=0.15, minimum=0.0, maximum=1.0, step=0.05, info="Heatmap opacity."
                )
                grad_on = gr.Checkbox(label="Depth gradient", value=False, info="Overlay the depth gradient magnitude.")
                grad_alpha = gr.Slider(
                    label="Gradient alpha", value=0.35, minimum=0.0, maximum=1.0, step=0.05, info="Gradient overlay opacity."
                )
                flat_on = gr.Checkbox(label="Flatness map", value=False, info="Overlay per-pixel flatness (std).")
                flat_alpha = gr.Slider(
                    label="Flatness alpha", value=0.25, minimum=0.0, maximum=1.0, step=0.05, info="Flatness overlay opacity."
                )
                spot_on = gr.Checkbox(label="Show landing spot", value=True, info="Overlay the recommended landing box.")
                with gr.Accordion("Mask overlays", open=True):
                    water_on = gr.Checkbox(label="Water mask overlay", value=False, info="Overlay detected water regions.")
                    water_alpha = gr.Slider(
                        label="Water mask alpha",
                        value=0.5,
                        minimum=0.0,
                        maximum=1.0,
                        step=0.05,
                        info="Water overlay opacity.",
                    )
                    road_on = gr.Checkbox(label="Road mask overlay", value=False, info="Overlay detected road regions.")
                    road_alpha = gr.Slider(
                        label="Road mask alpha",
                        value=0.5,
                        minimum=0.0,
                        maximum=1.0,
                        step=0.05,
                        info="Road overlay opacity.",
                    )
                    roof_on = gr.Checkbox(label="Roof mask overlay", value=False, info="Overlay detected roof regions.")
                    roof_alpha = gr.Slider(
                        label="Roof mask alpha",
                        value=0.5,
                        minimum=0.0,
                        maximum=1.0,
                        step=0.05,
                        info="Roof overlay opacity.",
                    )

        def process_any(
            input_path,
            footprint_m,
            std_thresh,
            grad_thresh,
            use_water_mask,
            use_road_mask,
            use_roof_mask,
            altitude_m,
            fov_deg,
            flatness_detail,
            clearance_factor,
            process_res_cap,
            roof_aggressiveness,
            roof_morph_frac,
            segmentation_max_side,
            depth_smoothing_base,
            coverage_strictness,
            min_component_multiplier,
            model_id,
            base_view,
            heat_on,
            heat_alpha,
            grad_on,
            grad_alpha,
            flat_on,
            flat_alpha,
            water_on,
            water_alpha,
            spot_on,
            road_on,
            road_alpha,
            roof_on,
            roof_alpha,
        ):
            """Run full processing for an image input and yield the composited preview."""
            if not input_path:
                raise gr.Error("Select an input image first.")
            path = Path(input_path)
            if not path.exists():
                raise gr.Error(f"Input not found: {path}")
            if path.suffix.lower() in IMAGE_EXTS:
                imgs = process_image(
                    input_path=str(path),
                    footprint_m=footprint_m,
                    std_thresh=std_thresh,
                    grad_thresh=grad_thresh,
                    use_water_mask=use_water_mask,
                    use_road_mask=use_road_mask,
                    use_roof_mask=use_roof_mask,
                    altitude_m=altitude_m,
                    fov_deg=fov_deg,
                    flatness_detail=flatness_detail,
                    clearance_factor=clearance_factor,
                    process_res_cap=process_res_cap,
                    roof_aggressiveness=roof_aggressiveness,
                    roof_morph_frac=roof_morph_frac,
                    segmentation_max_side=segmentation_max_side,
                    depth_smoothing_base=depth_smoothing_base,
                    coverage_strictness=coverage_strictness,
                    min_component_multiplier=min_component_multiplier,
                    model_id=model_id,
                    source_path=str(path),
                )
                composed = compose_view(
                    imgs,
                    base_view,
                    heat_on,
                    heat_alpha,
                    grad_on,
                    grad_alpha,
                    flat_on,
                    flat_alpha,
                    water_on,
                    water_alpha,
                    water_enabled=use_water_mask,
                    road_on=road_on,
                    road_alpha=road_alpha,
                    road_enabled=use_road_mask,
                    roof_on=roof_on,
                    roof_alpha=roof_alpha,
                    roof_enabled=use_roof_mask,
                    spot_on=spot_on,
                )
                yield imgs, composed
            else:
                raise gr.Error(f"Unsupported input type for path: {path} (images only)")

        run_event = run_btn.click(
            fn=process_any,
            inputs=[
                input_path,
                footprint_m,
                std_thresh,
                grad_thresh,
                use_water_mask,
                use_road_mask,
                use_roof_mask,
                altitude_m,
                fov_deg,
                flatness_detail,
                clearance_factor,
                process_res_cap,
                roof_aggressiveness,
                roof_morph_frac,
                segmentation_max_side,
                depth_smoothing_base,
                coverage_strictness,
                min_component_multiplier,
                model_id,
                base_view,
                heat_on,
                heat_alpha,
                grad_on,
                grad_alpha,
                flat_on,
                flat_alpha,
                water_on,
                water_alpha,
                spot_on,
                road_on,
                road_alpha,
                roof_on,
                roof_alpha,
            ],
            outputs=[images_state, main_view],
        )
        stop_btn.click(fn=None, inputs=None, outputs=None, cancels=[run_event])

        def update_preview_ui(
            images_state_val,
            input_path_val,
            footprint_m_val,
            std_thresh_val,
            grad_thresh_val,
            use_water_mask_val,
            use_road_mask_val,
            use_roof_mask_val,
            altitude_m_val,
            fov_deg_val,
            flatness_detail_val,
            clearance_factor_val,
            process_res_cap_val,
            roof_aggressiveness_val,
            roof_morph_frac_val,
            segmentation_max_side_val,
            depth_smoothing_base_val,
            coverage_strictness_val,
            min_component_multiplier_val,
            model_id_val,
            base_view_val,
            heat_on_val,
            heat_alpha_val,
            grad_on_val,
            grad_alpha_val,
            flat_on_val,
            flat_alpha_val,
            water_on_val,
            water_alpha_val,
            spot_on_val,
            road_on_val,
            road_alpha_val,
            roof_on_val,
            roof_alpha_val,
        ):
            """Re-process (best effort) when a model-affecting setting changes, then recompose."""
            path = Path(str(input_path_val))
            imgs_val = images_state_val
            # If current input is an image, re-run processing to reflect new settings
            if path.exists() and path.suffix.lower() in IMAGE_EXTS:
                try:
                    imgs_val = process_image(
                        input_path=str(path),
                        footprint_m=footprint_m_val,
                        std_thresh=std_thresh_val,
                        grad_thresh=grad_thresh_val,
                        use_water_mask=use_water_mask_val,
                        use_road_mask=use_road_mask_val,
                        use_roof_mask=use_roof_mask_val,
                        altitude_m=altitude_m_val,
                        fov_deg=fov_deg_val,
                        flatness_detail=flatness_detail_val,
                        clearance_factor=clearance_factor_val,
                        process_res_cap=process_res_cap_val,
                        roof_aggressiveness=roof_aggressiveness_val,
                        roof_morph_frac=roof_morph_frac_val,
                        segmentation_max_side=segmentation_max_side_val,
                        depth_smoothing_base=depth_smoothing_base_val,
                        coverage_strictness=coverage_strictness_val,
                        min_component_multiplier=min_component_multiplier_val,
                        model_id=model_id_val,
                        source_path=str(path),
                    )
                except Exception:
                    # Best-effort live preview: keep the last good result on failure.
                    imgs_val = images_state_val
            if not imgs_val:
                return images_state_val, gr.update()
            composed = compose_view(
                imgs_val,
                base_view_val,
                heat_on_val,
                heat_alpha_val,
                grad_on_val,
                grad_alpha_val,
                flat_on_val,
                flat_alpha_val,
                water_on_val,
                water_alpha_val,
                use_water_mask_val,
                spot_on_val,
                road_on_val,
                road_alpha_val,
                use_road_mask_val,
                roof_on_val,
                roof_alpha_val,
                use_roof_mask_val,
            )
            return imgs_val, composed

        overlay_inputs = [
            images_state,
            base_view,
            heat_on,
            heat_alpha,
            grad_on,
            grad_alpha,
            flat_on,
            flat_alpha,
            water_on,
            water_alpha,
            spot_on,
            use_water_mask,
            road_on,
            road_alpha,
            use_road_mask,
            roof_on,
            roof_alpha,
            use_roof_mask,
        ]

        def update_overlays_only(
            images_state_val,
            base_view_val,
            heat_on_val,
            heat_alpha_val,
            grad_on_val,
            grad_alpha_val,
            flat_on_val,
            flat_alpha_val,
            water_on_val,
            water_alpha_val,
            spot_on_val,
            use_water_mask_val,
            road_on_val,
            road_alpha_val,
            use_road_mask_val,
            roof_on_val,
            roof_alpha_val,
            use_roof_mask_val,
        ):
            """Recompose the preview from cached results without re-running the model."""
            if not images_state_val:
                return images_state_val, gr.update()
            return images_state_val, compose_view(
                images_state_val,
                base_view_val,
                heat_on_val,
                heat_alpha_val,
                grad_on_val,
                grad_alpha_val,
                flat_on_val,
                flat_alpha_val,
                water_on_val,
                water_alpha_val,
                use_water_mask_val,
                spot_on_val,
                road_on_val,
                road_alpha_val,
                use_road_mask_val,
                roof_on_val,
                roof_alpha_val,
                use_roof_mask_val,
            )

        base_view.change(fn=update_overlays_only, inputs=overlay_inputs, outputs=[images_state, main_view])
        for control in (
            heat_on,
            heat_alpha,
            grad_on,
            grad_alpha,
            flat_on,
            flat_alpha,
            water_on,
            water_alpha,
            spot_on,
            use_water_mask,
            road_on,
            road_alpha,
            use_road_mask,
            roof_on,
            roof_alpha,
            use_roof_mask,
        ):
            control.change(fn=update_overlays_only, inputs=overlay_inputs, outputs=[images_state, main_view])

        model_inputs = [
            images_state,
            input_path,
            footprint_m,
            std_thresh,
            grad_thresh,
            use_water_mask,
            use_road_mask,
            use_roof_mask,
            altitude_m,
            fov_deg,
            flatness_detail,
            clearance_factor,
            process_res_cap,
            roof_aggressiveness,
            roof_morph_frac,
            segmentation_max_side,
            depth_smoothing_base,
            coverage_strictness,
            min_component_multiplier,
            model_id,
            base_view,
            heat_on,
            heat_alpha,
            grad_on,
            grad_alpha,
            flat_on,
            flat_alpha,
            water_on,
            water_alpha,
            spot_on,
            road_on,
            road_alpha,
            roof_on,
            roof_alpha,
        ]
        for control in (
            input_path,
            footprint_m,
            std_thresh,
            grad_thresh,
            use_water_mask,
            use_road_mask,
            use_roof_mask,
            altitude_m,
            fov_deg,
            flatness_detail,
            clearance_factor,
            process_res_cap,
            roof_aggressiveness,
            roof_morph_frac,
            segmentation_max_side,
            depth_smoothing_base,
            coverage_strictness,
            min_component_multiplier,
            model_id,
        ):
            control.change(fn=update_preview_ui, inputs=model_inputs, outputs=[images_state, main_view])
    return demo


if __name__ == "__main__":
    demo = build_ui()
    demo.queue().launch()