# drone-landing-safety / gradio_app.py
# NOTE: the following lines were Hugging Face blob-page residue (author avatar,
# "Initial import" commit 0c4c32b, raw/history/blame links, "52 kB" size badge)
# accidentally captured into the source; kept here as a comment so the module
# remains importable.
#!/usr/bin/env python3
"""
Gradio demo: depth overlays on VISLOC imagery using Depth Anything 3.
Run:
python gradio_app.py
Then open the printed local URL. Requires: gradio, pillow, torch, transformers (for water mask).
"""
import cv2
import functools
import math
import os
from pathlib import Path
import gradio as gr
import numpy as np
import torch
from PIL import Image, ImageDraw, ImageFilter
import matplotlib.cm as cm
# Prefer installed package; fall back to local src for dev runs.
try:
from depth_anything_3.api import DepthAnything3 # type: ignore
from depth_anything_3.utils.visualize import visualize_depth # type: ignore
except ModuleNotFoundError:
import sys
ROOT = Path(__file__).resolve().parent
sys.path.append(str(ROOT / "src"))
from depth_anything_3.api import DepthAnything3 # noqa: E402
from depth_anything_3.utils.visualize import visualize_depth # noqa: E402
# Dataset locations (relative to the working directory).
VISLOC_DIR = Path("data/Image/VISLOC")
HAGDAVS_DIR = Path("data/Image/HAGDAVS")
VIDEO_DIR = Path("data/Video")
# Both upper- and lowercase variants are listed because some membership checks
# compare the raw suffix without lowering it first.
IMAGE_EXTS = (".jpg", ".jpeg", ".png", ".JPG", ".JPEG", ".PNG")
VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".m4v"}
# Camera assumptions used when converting a landing footprint in meters to
# pixels; the UI sliders default to these same values (450 m, 90 deg).
DEFAULT_ALTITUDE_M = 450.0
ASSUMED_FOV_DEG = 90.0
# Water and road masking share a single ADE20K-trained Mask2Former checkpoint,
# so one cached segmenter instance serves both.
WATER_MODEL_ID = "facebook/mask2former-swin-large-ade-semantic"
ROAD_MODEL_ID = "facebook/mask2former-swin-large-ade-semantic"
def crop_nonblack(img: Image.Image, frac: float = 0.05) -> Image.Image:
    """Trim a fixed fraction off every border (naively drops black padding)."""
    width, height = img.size
    margin_x = int(round(frac * width))
    margin_y = int(round(frac * height))
    return img.crop((margin_x, margin_y, width - margin_x, height - margin_y))
@functools.lru_cache(maxsize=1)
def get_water_segmenter(model_id: str):
    """Build and cache the Mask2Former segmentation stack (kept on CPU to avoid OOM).

    Returns (processor, model, device). A single cache slot suffices because
    water and road masking use the same checkpoint id.
    """
    try:
        from transformers import AutoImageProcessor, Mask2FormerForUniversalSegmentation
    except ImportError as e:
        raise ImportError("transformers is required for water masking; install with `pip install transformers`") from e
    cpu = torch.device("cpu")
    try:
        image_processor = AutoImageProcessor.from_pretrained(model_id, use_fast=True)
    except TypeError:
        # Older transformers releases do not accept the `use_fast` kwarg.
        image_processor = AutoImageProcessor.from_pretrained(model_id)
    seg_model = Mask2FormerForUniversalSegmentation.from_pretrained(model_id).to(cpu)
    seg_model.eval()
    return image_processor, seg_model, cpu
def compute_water_mask(img: Image.Image, model_id: str, max_side: int = 640) -> np.ndarray | None:
    """Return a boolean mask of water-like ADE classes, or None if inference fails."""
    processor, model, device = get_water_segmenter(model_id)
    try:
        work = img
        longest = max(img.size)
        if longest > max_side:
            # Downscale for segmentation speed; the mask is upsampled at the end.
            ratio = max_side / longest
            work = img.resize(
                (int(round(img.size[0] * ratio)), int(round(img.size[1] * ratio))),
                resample=Image.BILINEAR,
            )
        try:
            inputs = processor(images=work, return_tensors="pt", use_fast=True).to(device)
        except TypeError:
            inputs = processor(images=work, return_tensors="pt").to(device)
        with torch.inference_mode():
            outputs = model(**inputs)
        seg = processor.post_process_semantic_segmentation(outputs, target_sizes=[work.size[::-1]])[0]
        if torch.is_tensor(seg):
            seg = seg.cpu()
        # Collect every ADE class id whose name looks water-related.
        keywords = ["water", "sea", "lake", "river", "ocean", "pond"]
        water_ids = [
            cls_id
            for cls_id, cls_name in model.config.id2label.items()
            if any(k in cls_name.lower() for k in keywords)
        ]
        hit = np.isin(np.array(seg), water_ids).astype(np.uint8) * 255
        full = Image.fromarray(hit).resize(img.size, resample=Image.NEAREST)
        return np.array(full) > 0
    except RuntimeError as e:
        print(f"[WARN] Water masking failed (fallback to no water mask): {e}")
        return None
def compute_road_mask(img: Image.Image, model_id: str, max_side: int = 640) -> np.ndarray | None:
    """Return a boolean mask of road/highway-like ADE classes, or None if inference fails."""
    processor, model, device = get_water_segmenter(model_id)
    try:
        work = img
        longest = max(img.size)
        if longest > max_side:
            # Downscale for segmentation speed; the mask is upsampled at the end.
            ratio = max_side / longest
            work = img.resize(
                (int(round(img.size[0] * ratio)), int(round(img.size[1] * ratio))),
                resample=Image.BILINEAR,
            )
        try:
            inputs = processor(images=work, return_tensors="pt", use_fast=True).to(device)
        except TypeError:
            inputs = processor(images=work, return_tensors="pt").to(device)
        with torch.inference_mode():
            outputs = model(**inputs)
        seg = processor.post_process_semantic_segmentation(outputs, target_sizes=[work.size[::-1]])[0]
        if torch.is_tensor(seg):
            seg = seg.cpu()
        # Road-like class names, minus greenery/recreation names that merely
        # contain a keyword (e.g. "park").
        keywords = ["highway", "road", "street", "runway"]
        blocklist = ["field", "park", "grass", "lawn", "garden", "court", "yard", "green"]
        road_ids = [
            cls_id
            for cls_id, cls_name in model.config.id2label.items()
            if any(k in cls_name.lower() for k in keywords)
            and not any(b in cls_name.lower() for b in blocklist)
        ]
        hit = np.isin(np.array(seg), road_ids).astype(np.uint8) * 255
        full = Image.fromarray(hit).resize(img.size, resample=Image.NEAREST)
        return np.array(full) > 0
    except RuntimeError as e:
        print(f"[WARN] Road masking failed (fallback to no road mask): {e}")
        return None
def compute_roof_mask_depth(depth: np.ndarray, aggressiveness: float = 1.3, morph_kernel: int = 5) -> np.ndarray:
    """Flag pixels significantly closer than the median depth (raised roofs/structures).

    Uses a median/MAD threshold so outliers do not skew the cutoff, then
    morphologically opens+closes the mask to drop speckle and fill small gaps.
    """
    values = depth.astype(np.float32)
    center = np.median(values)
    spread = np.median(np.abs(values - center)) + 1e-6
    raised = (values < center - aggressiveness * spread).astype(np.uint8)
    size = max(1, int(morph_kernel))
    if size % 2 == 0:
        size += 1  # structuring elements need an odd side
    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (size, size))
    try:
        raised = cv2.morphologyEx(raised, cv2.MORPH_OPEN, element)
        raised = cv2.morphologyEx(raised, cv2.MORPH_CLOSE, element)
    except Exception:
        pass  # morphology is best-effort cleanup only
    return raised > 0
def remove_global_plane(depth: np.ndarray) -> np.ndarray:
    """Subtract the best-fit global plane so large tilted-but-flat areas score as flat.

    Non-2D inputs are returned unchanged; on a numerically failed fit the
    original depth is returned as-is.
    """
    if depth.ndim != 2:
        return depth
    rows, cols = depth.shape
    ys, xs = np.mgrid[0:rows, 0:cols].astype(np.float32)
    design = np.column_stack(
        (xs.ravel(), ys.ravel(), np.ones(rows * cols, dtype=np.float32))
    )
    target = depth.astype(np.float32).reshape(-1, 1)
    try:
        params, _, _, _ = np.linalg.lstsq(design, target, rcond=None)
    except np.linalg.LinAlgError:
        return depth
    fitted = (design @ params).reshape(rows, cols)
    return depth - fitted
def pick_flat_patch(
depth: np.ndarray,
patch: int = 96,
std_thresh: float = 0.03,
grad_thresh: float = 0.35,
water_mask: np.ndarray | None = None,
):
"""Find a low-variance depth window as a proxy for flat landing area."""
depth = depth.astype(np.float32)
if depth.ndim != 2:
raise ValueError("Depth map must be 2D (H, W)")
patch = max(3, min(patch, min(depth.shape)))
if patch % 2 == 0:
patch += 1 # keeps pooling output same size
depth_norm = (depth - depth.min()) / (depth.ptp() + 1e-6)
# Efficient box std via torch avg pooling
import torch.nn.functional as F
def box_mean(arr, k):
pad = k // 2
t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0)
# Reflective padding avoids dark/bright rims in the std map
t = F.pad(t, (pad, pad, pad, pad), mode="reflect")
mean = F.avg_pool2d(t, kernel_size=k, stride=1, padding=0, count_include_pad=False)
return mean.squeeze(0).squeeze(0).numpy()
mean = box_mean(depth_norm, patch)
mean_sq = box_mean(depth_norm * depth_norm, patch)
var = np.maximum(mean_sq - mean * mean, 0.0)
std_map = np.sqrt(var)
# Gradient mask to down-weight slopes/edges
dy, dx = np.gradient(depth_norm)
grad = np.sqrt(dx * dx + dy * dy)
grad_ref = np.percentile(grad, 95) + 1e-6
grad_norm = np.clip(grad / grad_ref, 0.0, 1.0)
grad_mask = grad_norm < grad_thresh
landing_mask = grad_mask
if water_mask is not None and water_mask.shape == grad_mask.shape:
landing_mask = landing_mask & (~water_mask)
masked_std = np.where(landing_mask, std_map, np.inf)
if not np.isfinite(masked_std).any():
masked_std = std_map # fallback: just take the flattest spot
y, x = np.unravel_index(np.argmin(masked_std), masked_std.shape)
half = patch // 2
y0, y1 = max(y - half, 0), min(y + half, depth.shape[0] - 1)
x0, x1 = max(x - half, 0), min(x + half, depth.shape[1] - 1)
return (x0, y0, x1, y1), std_map, grad_norm, grad_mask, landing_mask
def make_safety_heatmap(
    rgb: Image.Image,
    safe_mask: np.ndarray,
):
    """Build a red(unsafe)->green(safe) overlay plus a grayscale score image.

    Both outputs are resized (nearest-neighbor) to the RGB image size.
    """
    score = np.clip(safe_mask.astype(np.float32), 0.0, 1.0)
    heat = np.zeros(score.shape + (3,), dtype=np.uint8)
    # Gamma-shape both channels and cap green at 200 so bright green does not
    # overpower red when blended onto the base image.
    heat[..., 0] = np.power(1.0 - score, 0.9) * 255.0
    heat[..., 1] = np.power(score, 1.2) * 200.0
    heat_img = Image.fromarray(heat).resize(rgb.size, resample=Image.NEAREST)
    score_gray = Image.fromarray((score * 255).astype(np.uint8)).resize(rgb.size, resample=Image.NEAREST)
    return heat_img, score_gray
@functools.lru_cache(maxsize=1)
def get_model(model_id: str = "depth-anything/DA3METRIC-LARGE"):
    """Instantiate the DepthAnything3 checkpoint once and reuse it (GPU if available)."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = DepthAnything3.from_pretrained(model_id).to(device)
    net.eval()
    return net, device
@functools.lru_cache(maxsize=1)
def list_visloc_images() -> list[Path]:
    """Return sorted VISLOC image paths from data/Image/VISLOC.

    Extension matching is case-insensitive (consistent with list_videos), so
    suffixes like ``.Jpg`` are also accepted; IMAGE_EXTS already contains the
    lowercase forms.
    """
    if not VISLOC_DIR.exists():
        return []
    return sorted(p for p in VISLOC_DIR.iterdir() if p.suffix.lower() in IMAGE_EXTS)
@functools.lru_cache(maxsize=1)
def list_hagdavs_images() -> list[Path]:
    """Return sorted HAGDAVS image paths from data/Image/HAGDAVS.

    Extension matching is case-insensitive (consistent with list_videos), so
    suffixes like ``.Jpg`` are also accepted; IMAGE_EXTS already contains the
    lowercase forms.
    """
    if not HAGDAVS_DIR.exists():
        return []
    return sorted(p for p in HAGDAVS_DIR.iterdir() if p.suffix.lower() in IMAGE_EXTS)
@functools.lru_cache(maxsize=1)
def list_videos() -> list[Path]:
    """Return sorted video paths under data/Video (lowercase-extension match)."""
    if not VIDEO_DIR.exists():
        return []
    return sorted(p for p in VIDEO_DIR.iterdir() if p.suffix.lower() in VIDEO_EXTS)
@functools.lru_cache(maxsize=1)
def list_all_data_inputs() -> list[str]:
    """Collect VISLOC image files (as strings) for the input dropdown."""
    return [str(image_path) for image_path in list_visloc_images()]
# Simple cache for water/road masks keyed by (model_id, path)
# Keys are (model_id, source_path, segmentation_max_side); values are boolean
# masks at the original image resolution.
WATER_MASK_CACHE: dict[tuple[str, str, int], np.ndarray] = {}
ROAD_MASK_CACHE: dict[tuple[str, str, int], np.ndarray] = {}
def run_on_image(
    image: Image.Image,
    footprint_m: float,
    std_thresh: float,
    grad_thresh: float,
    use_water_mask: bool,
    use_road_mask: bool,
    use_roof_mask: bool,
    altitude_m: float,
    fov_deg: float,
    flatness_detail: float,
    clearance_factor: float,
    process_res_cap: int,
    roof_aggressiveness: float,
    roof_morph_frac: float,
    segmentation_max_side: int,
    depth_smoothing_base: float,
    coverage_strictness: float,
    min_component_multiplier: float,
    model_id: str,
    source_path: str | None = None,
) -> dict:
    """Run depth inference and the landing-safety pipeline on one image.

    Returns a dict of named PIL images (RGB, depth, flatness, gradient, masks,
    safety heatmap/score, landing-spot overlay), all at the input resolution.
    `source_path` is used only as a cache key for the segmentation masks.
    """
    rgb_np = np.array(image)
    model, device = get_model(model_id)
    # Fixed upper-bound resolution (cap) while avoiding upscaling small images.
    process_res = min(max(image.size), int(process_res_cap))
    with torch.inference_mode():
        pred = model.inference(
            image=[rgb_np],
            process_res=process_res,
            process_res_method="upper_bound_resize",
            export_dir=None,
        )
    depth_raw = np.array(pred.depth[0])
    depth = remove_global_plane(depth_raw)
    # Smooth depth for resolution-invariant flatness/gradient (higher res -> slightly more smoothing)
    res_scale = max(0.5, min(2.5, process_res / 1024))
    sigma = max(0.0, depth_smoothing_base) * res_scale
    k = max(3, int(round(sigma * 3)) * 2 + 1)
    try:
        depth = cv2.GaussianBlur(depth, (k, k), sigmaX=sigma, sigmaY=sigma)
    except Exception:
        pass  # best-effort: the unsmoothed, plane-removed depth is still usable
    # Convert landing footprint (meters) to pixels at current processed resolution
    fov = max(10.0, min(170.0, float(fov_deg)))
    altitude = max(1.0, float(altitude_m))
    fx = (depth.shape[1] / 2.0) / math.tan(math.radians(fov) / 2.0)
    patch_px = footprint_m * fx / altitude
    patch_px = max(3, min(int(round(patch_px)), min(depth.shape) - 1))
    if patch_px % 2 == 0:
        patch_px += 1  # keep pooling symmetric
    # For visualization, compute a flatness map with a smaller, sharper window (decoupled from footprint)
    # np.ptp(depth): the ndarray.ptp() method was removed in NumPy 2.0.
    depth_norm = (depth - depth.min()) / (np.ptp(depth) + 1e-6)
    vis_patch = max(
        5,
        min(
            int(max(1.0, flatness_detail) * patch_px),
            min(depth.shape) // 10,
            min(depth.shape) - 1,
        ),
    )
    if vis_patch % 2 == 0:
        vis_patch += 1
    import torch.nn.functional as F

    def box_mean_np(arr: np.ndarray, k: int):
        # Box mean with reflect padding so borders are not biased dark/bright.
        pad = k // 2
        t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0)
        t = F.pad(t, (pad, pad, pad, pad), mode="reflect")
        mean = F.avg_pool2d(t, kernel_size=k, stride=1, padding=0, count_include_pad=False)
        return mean.squeeze(0).squeeze(0).numpy()

    std_map_vis = np.sqrt(
        np.maximum(box_mean_np(depth_norm * depth_norm, vis_patch) - box_mean_np(depth_norm, vis_patch) ** 2, 0.0)
    )
    # Optional water mask (resized to depth resolution)
    water_mask_resized = None
    water_mask_img = None
    if use_water_mask:
        cache_key = (WATER_MODEL_ID, source_path or "", int(segmentation_max_side))
        if cache_key in WATER_MASK_CACHE:
            water_mask_img = WATER_MASK_CACHE[cache_key]
        else:
            water_mask_img = compute_water_mask(image, WATER_MODEL_ID, max_side=segmentation_max_side)
            if source_path is not None and water_mask_img is not None:
                WATER_MASK_CACHE[cache_key] = water_mask_img
        if water_mask_img is not None:
            # NEAREST resize keeps the mask binary at depth resolution.
            # (np.asarray replaces the original no-op isinstance conditional.)
            water_mask_resized = (
                Image.fromarray(np.asarray(water_mask_img).astype(np.uint8) * 255)
                .resize((depth.shape[1], depth.shape[0]), resample=Image.NEAREST)
            )
            water_mask_resized = np.array(water_mask_resized) > 0
    road_mask_resized = None
    road_mask_img = None
    if use_road_mask:
        cache_key_r = (ROAD_MODEL_ID, source_path or "", int(segmentation_max_side))
        if cache_key_r in ROAD_MASK_CACHE:
            road_mask_img = ROAD_MASK_CACHE[cache_key_r]
        else:
            road_mask_img = compute_road_mask(image, ROAD_MODEL_ID, max_side=segmentation_max_side)
            if source_path is not None and road_mask_img is not None:
                ROAD_MASK_CACHE[cache_key_r] = road_mask_img
        if road_mask_img is not None:
            road_mask_resized = (
                Image.fromarray(np.asarray(road_mask_img).astype(np.uint8) * 255)
                .resize((depth.shape[1], depth.shape[0]), resample=Image.NEAREST)
            )
            road_mask_resized = np.array(road_mask_resized) > 0
    roof_mask_resized = None
    if use_roof_mask:
        # Depth-based elevation mask: closer-than-median surfaces are treated as roofs/structures.
        aggressiveness = max(0.5, min(3.0, roof_aggressiveness))
        morph_k = max(3, int(round(patch_px * roof_morph_frac)))
        roof_mask_resized = compute_roof_mask_depth(depth, aggressiveness=aggressiveness, morph_kernel=morph_k)
    box, std_map, grad_norm, grad_mask, landing_mask = pick_flat_patch(
        depth,
        patch=patch_px,
        std_thresh=std_thresh,
        grad_thresh=grad_thresh,
        water_mask=water_mask_resized,
    )
    if road_mask_resized is not None:
        landing_mask = landing_mask & (~road_mask_resized)
    if roof_mask_resized is not None:
        landing_mask = landing_mask & (~roof_mask_resized)
    safe_mask = (std_map < std_thresh) & (grad_norm < grad_thresh) & landing_mask
    # Clearance: dilate hazards to enforce buffer around unsafe regions
    try:
        clearance_px = max(1, int(round(clearance_factor * patch_px)))
        if clearance_px % 2 == 0:
            clearance_px += 1
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (clearance_px, clearance_px))
        hazard = (~safe_mask).astype(np.uint8)
        buffered = cv2.dilate(hazard, kernel, iterations=1).astype(bool)
        safe_mask = safe_mask & (~buffered)
    except Exception:
        pass  # clearance buffering is best-effort
    # Strict footprint coverage: a center is safe only if the full footprint is safe
    try:
        coverage = cv2.boxFilter(
            safe_mask.astype(np.float32),
            ddepth=-1,
            ksize=(patch_px, patch_px),
            normalize=True,
            anchor=(patch_px // 2, patch_px // 2),
        )
        safe_mask = coverage >= max(0.0, min(1.0, coverage_strictness))
    except Exception:
        pass  # coverage filtering is best-effort
    # Drop tiny components: require at least one footprint area
    area_thresh = max(1, int(patch_px * patch_px * max(0.1, min_component_multiplier)))
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(safe_mask.astype(np.uint8), connectivity=8)
    if num_labels > 1:
        keep = np.zeros_like(labels, dtype=bool)
        for i in range(1, num_labels):
            if stats[i, cv2.CC_STAT_AREA] >= area_thresh:
                keep |= labels == i
        safe_mask = keep
    # Recommended landing spot overlay (scaled to input image size)
    # Prefer centers where the full footprint is safe; fall back to best flat spot
    safe_fit = safe_mask.astype(np.float32)
    try:
        coverage = cv2.boxFilter(
            safe_fit,
            ddepth=-1,
            ksize=(patch_px, patch_px),
            normalize=True,
            anchor=(patch_px // 2, patch_px // 2),
        )
        valid_centers = coverage >= 1.0
    except Exception:
        valid_centers = safe_fit > 0.5
    if valid_centers.any():
        cc_mask = valid_centers.astype(np.uint8)
        num_c, labels_c, stats_c, _ = cv2.connectedComponentsWithStats(cc_mask, connectivity=8)
        target_mask = valid_centers
        if num_c > 1:
            # Pick largest safe component by area (skip background)
            areas = stats_c[1:, cv2.CC_STAT_AREA]
            largest_idx = 1 + int(np.argmax(areas))
            target_mask = labels_c == largest_idx
        cand = np.where(target_mask)
        std_cand = std_map[cand]
        idx = np.argmin(std_cand)
        cy, cx = cand[0][idx], cand[1][idx]
    else:
        y0, x0, y1, x1 = box[1], box[0], box[3], box[2]
        cy, cx = (y0 + y1) // 2, (x0 + x1) // 2
    half = patch_px // 2
    x0 = max(int(cx - half), 0)
    x1 = min(int(cx + half), depth.shape[1] - 1)
    y0 = max(int(cy - half), 0)
    y1 = min(int(cy + half), depth.shape[0] - 1)
    scale_x = image.width / depth.shape[1]
    scale_y = image.height / depth.shape[0]
    # Draw a box whose side length matches the footprint in input-image pixels
    # (side uses scale_x only; assumes near-square pixels -- TODO confirm)
    side_img = max(3, int(round(patch_px * scale_x)))
    cx_img = int(round(cx * scale_x))
    cy_img = int(round(cy * scale_y))
    half_img = side_img // 2
    bx0 = max(cx_img - half_img, 0)
    bx1 = min(cx_img + half_img, image.width - 1)
    by0 = max(cy_img - half_img, 0)
    by1 = min(cy_img + half_img, image.height - 1)
    spot_overlay = Image.new("RGBA", image.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(spot_overlay)
    draw.rectangle((bx0, by0, bx1, by1), outline=(0, 255, 0, 255), width=4)
    cx, cy = (bx0 + bx1) // 2, (by0 + by1) // 2
    draw.ellipse((cx - 5, cy - 5, cx + 5, cy + 5), fill=(0, 255, 0, 255))
    depth_vis = Image.fromarray(visualize_depth(depth_raw, cmap="Spectral")).resize(
        image.size, resample=Image.BILINEAR
    )
    flatness_img = Image.fromarray((std_map_vis / (std_map_vis.max() + 1e-6) * 255).astype(np.uint8)).resize(
        image.size, resample=Image.NEAREST
    )
    grad_img = Image.fromarray((grad_norm * 255).astype(np.uint8)).resize(
        image.size, resample=Image.BILINEAR
    )
    grad_mask_img = Image.fromarray(((grad_norm < grad_thresh).astype(np.uint8) * 255)).resize(
        image.size, resample=Image.NEAREST
    )
    water_mask_view = None
    if use_water_mask and water_mask_img is not None:
        water_mask_view = Image.fromarray((np.array(water_mask_img).astype(np.uint8) * 255))
        water_mask_view = water_mask_view.resize(image.size, resample=Image.NEAREST)
    road_mask_view = None
    if use_road_mask and road_mask_img is not None:
        road_mask_view = Image.fromarray((np.array(road_mask_img).astype(np.uint8) * 255))
        road_mask_view = road_mask_view.resize(image.size, resample=Image.NEAREST)
    roof_mask_view = None
    if use_roof_mask and roof_mask_resized is not None:
        roof_mask_view = Image.fromarray((roof_mask_resized.astype(np.uint8) * 255))
        roof_mask_view = roof_mask_view.resize(image.size, resample=Image.NEAREST)
    heat_overlay, heat_gray = make_safety_heatmap(image, safe_mask)
    images = {
        "RGB": image,
        "Depth": depth_vis,
        "Flatness map (std)": flatness_img,
        "Depth gradient": grad_img,
        "Gradient mask": grad_mask_img,
        "Water mask": water_mask_view if water_mask_view is not None else Image.new("L", image.size, 0),
        "Road mask": road_mask_view if road_mask_view is not None else Image.new("L", image.size, 0),
        "Roof mask": roof_mask_view if roof_mask_view is not None else Image.new("L", image.size, 0),
        "Safety heatmap overlay": heat_overlay,
        "Safety score": heat_gray,
        "Landing spot overlay": spot_overlay,
    }
    return images
def process_image(
    input_path: str,
    footprint_m: float,
    std_thresh: float,
    grad_thresh: float,
    use_water_mask: bool,
    use_road_mask: bool,
    use_roof_mask: bool,
    altitude_m: float,
    fov_deg: float,
    flatness_detail: float,
    clearance_factor: float,
    process_res_cap: int = 1024,
    roof_aggressiveness: float = 1.3,
    roof_morph_frac: float = 0.15,
    segmentation_max_side: int = 640,
    depth_smoothing_base: float = 0.8,
    coverage_strictness: float = 0.999,
    min_component_multiplier: float = 1.0,
    model_id: str = "depth-anything/DA3MONO-LARGE",
    source_path: str | None = None,
) -> dict:
    """Validate an image path, trim its border padding, and run the analysis pipeline.

    The tuning parameters now default to the same values as the UI sliders,
    because the UI callback invokes this function without supplying them
    (previously a guaranteed TypeError). Raises gr.Error for missing paths or
    unsupported extensions. `source_path` is ignored; the validated input path
    is used as the mask-cache key.
    """
    src = Path(input_path)
    if not src.exists():
        raise gr.Error(f"Input path not found: {src}")
    if src.suffix.lower() not in IMAGE_EXTS:
        raise gr.Error(f"Unsupported image type for path: {src}")
    image = crop_nonblack(Image.open(src).convert("RGB"))
    # Forward every tuning knob unchanged.
    settings = dict(
        footprint_m=footprint_m,
        std_thresh=std_thresh,
        grad_thresh=grad_thresh,
        use_water_mask=use_water_mask,
        use_road_mask=use_road_mask,
        use_roof_mask=use_roof_mask,
        altitude_m=altitude_m,
        fov_deg=fov_deg,
        flatness_detail=flatness_detail,
        clearance_factor=clearance_factor,
        process_res_cap=process_res_cap,
        roof_aggressiveness=roof_aggressiveness,
        roof_morph_frac=roof_morph_frac,
        segmentation_max_side=segmentation_max_side,
        depth_smoothing_base=depth_smoothing_base,
        coverage_strictness=coverage_strictness,
        min_component_multiplier=min_component_multiplier,
        model_id=model_id,
    )
    return run_on_image(image=image, source_path=str(src), **settings)
def _alpha255(value: float) -> int:
    """Clamp a 0..1 opacity to an integer alpha in 0..255."""
    return int(min(max(value, 0.0), 1.0) * 255)


def _composite_uniform(out: Image.Image, layer: Image.Image, opacity: float) -> Image.Image:
    """Blend `layer` over `out` at a uniform opacity."""
    rgba = layer.convert("RGBA")
    rgba.putalpha(_alpha255(opacity))
    return Image.alpha_composite(out, rgba)


def _composite_tinted_mask(out: Image.Image, mask_img: Image.Image, tint: tuple, opacity: float) -> Image.Image:
    """Blend a solid `tint` over `out`, with per-pixel alpha scaled by the mask."""
    gray = mask_img.convert("L")
    overlay = Image.new("RGBA", mask_img.size, tint)
    alpha = _alpha255(opacity)
    overlay.putalpha(Image.eval(gray, lambda px: int(px * (alpha / 255.0))))
    return Image.alpha_composite(out, overlay)


def compose_view(
    images_dict: dict,
    base_view: str,
    heat_on: bool,
    heat_alpha: float,
    grad_on: bool,
    grad_alpha: float,
    flat_on: bool,
    flat_alpha: float,
    water_on: bool,
    water_alpha: float,
    water_enabled: bool,
    spot_on: bool,
    road_on: bool,
    road_alpha: float,
    road_enabled: bool,
    roof_on: bool,
    roof_alpha: float,
    roof_enabled: bool,
) -> Image.Image:
    """Return a composited view with per-layer alpha controls.

    Overlays are applied in a fixed order (heatmap, gradient, flatness, water,
    road, roof, landing spot). Raises gr.Error if inference has not run yet or
    the requested base view is unknown/missing.
    """
    if not images_dict:
        raise gr.Error("Run inference first, then select a view.")
    if base_view not in images_dict:
        raise gr.Error(f"Unknown view: {base_view}")
    base = images_dict.get(base_view)
    if base is None:
        raise gr.Error(f"No image for view: {base_view}")
    out = base.convert("RGBA")
    # Uniform-opacity layers: (toggle, dict key, opacity).
    for enabled, key, alpha in (
        (heat_on, "Safety heatmap overlay", heat_alpha),
        (grad_on, "Depth gradient", grad_alpha),
        (flat_on, "Flatness map (std)", flat_alpha),
    ):
        if enabled and key in images_dict and images_dict[key] is not None:
            out = _composite_uniform(out, images_dict[key], alpha)
    # Tinted mask layers: (toggle, mask-was-computed flag, key, RGBA tint, opacity).
    for on, computed, key, tint, alpha in (
        (water_on, water_enabled, "Water mask", (255, 0, 0, 0), water_alpha),  # red
        (road_on, road_enabled, "Road mask", (255, 165, 0, 0), road_alpha),  # orange
        (roof_on, roof_enabled, "Roof mask", (255, 0, 255, 0), roof_alpha),  # magenta
    ):
        if on and computed and key in images_dict and images_dict[key] is not None:
            out = _composite_tinted_mask(out, images_dict[key], tint, alpha)
    if spot_on and "Landing spot overlay" in images_dict:
        spot = images_dict["Landing spot overlay"]
        if spot is not None:
            out = Image.alpha_composite(out, spot.convert("RGBA"))
    return out.convert("RGB")
def build_ui():
with gr.Blocks(title="Landing Site Safety Analyzer (VISLOC)") as demo:
gr.Markdown(
"## Landing Site Safety Analyzer\n"
"Run DepthAnything3 on VISLOC images under `data/Image/VISLOC` to evaluate landing zones: depth, safety heatmap, gradients, flatness, and water masks. Toggle layers, footprint, and opacity to assess safety."
)
with gr.Row():
with gr.Column(scale=1, min_width=320):
gr.Markdown("### Input")
all_choices = list_all_data_inputs()
input_path = gr.Dropdown(
label="Input file",
choices=all_choices,
value=all_choices[0] if all_choices else "",
info="Pick any VISLOC image under data/Image/VISLOC/.",
)
footprint_m = gr.Slider(
label="Landing footprint (meters)",
value=10,
minimum=1,
maximum=150,
step=1,
info="Side length (meters) of the clear area required for landing (assumes ~450m altitude, 90° FOV).",
)
std_thresh = gr.Slider(
label="Flatness threshold",
value=0.01,
minimum=0.001,
maximum=0.08,
step=0.001,
info="Lower values favor flatter regions when computing the heatmap.",
)
grad_thresh = gr.Slider(
label="Gradient threshold",
value=0.1,
minimum=0.02,
maximum=1.0,
step=0.01,
info="Lower values suppress sloped/edgy areas in the heatmap.",
)
flatness_detail = gr.Slider(
label="Flatness detail (relative)",
value=1.0,
minimum=0.5,
maximum=2.5,
step=0.1,
info="Scales the window for the flatness visualization; lower = finer detail.",
)
clearance_factor = gr.Slider(
label="Clearance factor",
value=0.5,
minimum=0.0,
maximum=2.0,
step=0.05,
info="How much to dilate unsafe regions relative to the footprint to enforce buffer distance.",
)
process_res_cap = gr.Slider(
label="Processing resolution cap",
value=1024,
minimum=512,
maximum=2048,
step=32,
info="Upper bound on the longest side fed to the depth model; avoids oversized, noisy inference.",
)
depth_smoothing_base = gr.Slider(
label="Depth smoothing base",
value=0.8,
minimum=0.0,
maximum=2.0,
step=0.05,
info="Base Gaussian sigma multiplier for depth smoothing (scaled by resolution).",
)
coverage_strictness = gr.Slider(
label="Coverage strictness",
value=0.999,
minimum=0.8,
maximum=1.0,
step=0.001,
info="Minimum fraction of a footprint that must be safe to count a center as safe.",
)
min_component_multiplier = gr.Slider(
label="Min safe area (x footprint)",
value=1.0,
minimum=0.1,
maximum=5.0,
step=0.1,
info="Minimum safe component area in multiples of footprint^2.",
)
segmentation_max_side = gr.Slider(
label="Segmentation max side",
value=640,
minimum=256,
maximum=1024,
step=32,
info="Resize longest image side to this for water/road segmentation.",
)
with gr.Accordion("Camera settings", open=False):
altitude_m = gr.Slider(
label="Camera altitude (m)",
value=450,
minimum=10,
maximum=1500,
step=5,
info="Altitude used to convert footprint meters to pixels.",
)
fov_deg = gr.Slider(
label="Camera FOV (deg)",
value=90,
minimum=30,
maximum=150,
step=1,
info="Horizontal field of view used for footprint sizing.",
)
model_id = gr.Dropdown(
label="Model",
value="depth-anything/DA3MONO-LARGE",
choices=[
"depth-anything/DA3MONO-LARGE",
"depth-anything/DA3METRIC-LARGE",
"depth-anything/DA3-BASE",
"depth-anything/DA3NESTED-GIANT-LARGE",
],
info="Which pretrained DepthAnything3 checkpoint to use.",
)
with gr.Accordion("Masking", open=True):
with gr.Row():
use_water_mask = gr.Checkbox(
label="Exclude water (segmentation)", value=True, info="Apply water segmentation to down-weight water regions."
)
use_road_mask = gr.Checkbox(
label="Exclude roads (segmentation)", value=True, info="Apply road segmentation to avoid roads/highways."
)
use_roof_mask = gr.Checkbox(
label="Exclude rooftops (depth)", value=True, info="Use depth (closer-than-median) to avoid rooftops/raised structures."
)
roof_aggressiveness = gr.Slider(
label="Rooftop aggressiveness (MAD multiplier)",
value=1.3,
minimum=0.5,
maximum=3.0,
step=0.05,
info="Higher = more aggressive exclusion of raised areas in the depth-based rooftop mask.",
)
roof_morph_frac = gr.Slider(
label="Rooftop morph kernel (fraction of footprint px)",
value=0.15,
minimum=0.05,
maximum=0.5,
step=0.01,
info="Controls smoothing/merging of rooftop mask relative to footprint size.",
)
with gr.Row():
run_btn = gr.Button("Run", variant="primary", scale=1)
stop_btn = gr.Button("Stop", variant="stop", scale=1)
images_state = gr.State({})
with gr.Column(scale=3):
gr.Markdown("### Preview")
main_view = gr.Image(
label="Preview",
height=800,
elem_id="main-preview",
show_fullscreen_button=False,
)
gr.HTML(
"""
<style>
#main-preview img,
#main-preview canvas { cursor: zoom-in; }
#main-preview-zoom-overlay {
position: fixed;
inset: 0;
z-index: 1000;
display: none;
align-items: center;
justify-content: center;
background: rgba(0, 0, 0, 0.85);
}
#main-preview-zoom-overlay img {
max-width: 95vw;
max-height: 95vh;
box-shadow: 0 0 24px rgba(0, 0, 0, 0.6);
}
</style>
<div id="main-preview-zoom-overlay"></div>
<script>
(() => {
const containerId = "main-preview";
const overlayId = "main-preview-zoom-overlay";
const ensureOverlay = () => {
let overlay = document.getElementById(overlayId);
if (!overlay) {
overlay = document.createElement("div");
overlay.id = overlayId;
document.body.appendChild(overlay);
}
overlay.onclick = () => {
overlay.style.display = "none";
overlay.innerHTML = "";
};
return overlay;
};
const getMedia = (container) => {
if (!container) return null;
const img = container.querySelector("img");
if (img) return { type: "img", el: img, getSrc: () => img.currentSrc || img.src };
const canvas = container.querySelector("canvas");
if (canvas) return { type: "canvas", el: canvas, getSrc: () => canvas.toDataURL("image/png") };
return null;
};
const bind = () => {
const container = document.getElementById(containerId);
if (!container || container.dataset.zoomBound) return;
container.dataset.zoomBound = "1";
container.addEventListener("click", (ev) => {
const media = getMedia(container);
if (!media) return;
const src = media.getSrc();
if (!src) return;
const overlay = ensureOverlay();
overlay.innerHTML = "";
const zoomed = document.createElement("img");
zoomed.src = src;
overlay.appendChild(zoomed);
overlay.style.display = "flex";
ev.stopPropagation();
});
};
// Poll because Gradio swaps the image element on updates.
const interval = setInterval(() => {
const media = getMedia(document.getElementById(containerId));
if (media && media.el && !media.el.dataset.cursorSet) {
media.el.dataset.cursorSet = "1";
media.el.style.cursor = "zoom-in";
}
bind();
}, 500);
window.addEventListener("beforeunload", () => clearInterval(interval));
})();
</script>
""",
elem_id="main-preview-zoom-helper",
)
with gr.Column(scale=1, min_width=260):
gr.Markdown("### Overlays")
base_view = gr.Dropdown(
label="Base view",
value="RGB",
choices=[
"RGB",
"Depth",
"Flatness map (std)",
"Depth gradient",
"Gradient mask",
"Water mask",
"Safety score",
"Safety heatmap overlay",
],
)
heat_on = gr.Checkbox(label="Heatmap", value=True, info="Show the safety heatmap overlay.")
heat_alpha = gr.Slider(
label="Heatmap alpha", value=0.15, minimum=0.0, maximum=1.0, step=0.05, info="Heatmap opacity."
)
grad_on = gr.Checkbox(label="Depth gradient", value=False, info="Overlay the depth gradient magnitude.")
grad_alpha = gr.Slider(
label="Gradient alpha", value=0.35, minimum=0.0, maximum=1.0, step=0.05, info="Gradient overlay opacity."
)
flat_on = gr.Checkbox(label="Flatness map", value=False, info="Overlay per-pixel flatness (std).")
flat_alpha = gr.Slider(
label="Flatness alpha", value=0.25, minimum=0.0, maximum=1.0, step=0.05, info="Flatness overlay opacity."
)
spot_on = gr.Checkbox(label="Show landing spot", value=True, info="Overlay the recommended landing box.")
with gr.Accordion("Mask overlays", open=True):
water_on = gr.Checkbox(label="Water mask overlay", value=False, info="Overlay detected water regions.")
water_alpha = gr.Slider(
label="Water mask alpha",
value=0.5,
minimum=0.0,
maximum=1.0,
step=0.05,
info="Water overlay opacity.",
)
road_on = gr.Checkbox(label="Road mask overlay", value=False, info="Overlay detected road regions.")
road_alpha = gr.Slider(
label="Road mask alpha",
value=0.5,
minimum=0.0,
maximum=1.0,
step=0.05,
info="Road overlay opacity.",
)
roof_on = gr.Checkbox(label="Roof mask overlay", value=False, info="Overlay detected roof regions.")
roof_alpha = gr.Slider(
label="Roof mask alpha",
value=0.5,
minimum=0.0,
maximum=1.0,
step=0.05,
info="Roof overlay opacity.",
)
def process_any(
    input_path,
    footprint_m,
    std_thresh,
    grad_thresh,
    use_water_mask,
    use_road_mask,
    use_roof_mask,
    altitude_m,
    fov_deg,
    flatness_detail,
    clearance_factor,
    model_id,
    base_view,
    heat_on,
    heat_alpha,
    grad_on,
    grad_alpha,
    flat_on,
    flat_alpha,
    water_on,
    water_alpha,
    spot_on,
    road_on,
    road_alpha,
    roof_on,
    roof_alpha,
):
    """Validate the selected input, run the image pipeline, and yield results.

    Written as a generator so the Run button streams the (results, composed
    preview) pair and the Stop button can cancel the in-flight event.
    Raises gr.Error for missing, non-existent, or non-image inputs.
    """
    # Guard clauses: reject bad inputs before any heavy processing starts.
    if not input_path:
        raise gr.Error("Select an input image first.")
    path = Path(input_path)
    if not path.exists():
        raise gr.Error(f"Input not found: {path}")
    if path.suffix.lower() not in IMAGE_EXTS:
        raise gr.Error(f"Unsupported input type for path: {path} (images only)")
    # Full model pass: depth, flatness, masks, safety scoring.
    results = process_image(
        input_path=str(path),
        footprint_m=footprint_m,
        std_thresh=std_thresh,
        grad_thresh=grad_thresh,
        use_water_mask=use_water_mask,
        use_road_mask=use_road_mask,
        use_roof_mask=use_roof_mask,
        altitude_m=altitude_m,
        fov_deg=fov_deg,
        flatness_detail=flatness_detail,
        clearance_factor=clearance_factor,
        model_id=model_id,
        source_path=str(path),
    )
    # Blend the selected base view with the enabled overlays.
    preview = compose_view(
        results,
        base_view,
        heat_on,
        heat_alpha,
        grad_on,
        grad_alpha,
        flat_on,
        flat_alpha,
        water_on,
        water_alpha,
        water_enabled=use_water_mask,
        road_on=road_on,
        road_alpha=road_alpha,
        road_enabled=use_road_mask,
        roof_on=roof_on,
        roof_alpha=roof_alpha,
        roof_enabled=use_roof_mask,
        spot_on=spot_on,
    )
    yield results, preview
# Wire the Run button: stream (results, composed preview) from process_any.
# The inputs list order MUST match process_any's positional parameter order.
run_event = run_btn.click(
    fn=process_any,
    inputs=[
        input_path,
        footprint_m,
        std_thresh,
        grad_thresh,
        use_water_mask,
        use_road_mask,
        use_roof_mask,
        altitude_m,
        fov_deg,
        flatness_detail,
        clearance_factor,
        model_id,
        base_view,
        heat_on,
        heat_alpha,
        grad_on,
        grad_alpha,
        flat_on,
        flat_alpha,
        water_on,
        water_alpha,
        spot_on,
        road_on,
        road_alpha,
        roof_on,
        roof_alpha,
    ],
    outputs=[images_state, main_view],
)
# Stop cancels any in-flight Run via Gradio event cancellation (fn=None means
# the click itself performs no callback work).
stop_btn.click(fn=None, inputs=None, outputs=None, cancels=[run_event])
def update_preview_ui(
    images_state_val,
    input_path_val,
    footprint_m_val,
    std_thresh_val,
    grad_thresh_val,
    use_water_mask_val,
    use_road_mask_val,
    use_roof_mask_val,
    altitude_m_val,
    fov_deg_val,
    flatness_detail_val,
    clearance_factor_val,
    model_id_val,
    base_view_val,
    heat_on_val,
    heat_alpha_val,
    grad_on_val,
    grad_alpha_val,
    flat_on_val,
    flat_alpha_val,
    water_on_val,
    water_alpha_val,
    spot_on_val,
    road_on_val,
    road_alpha_val,
    roof_on_val,
    roof_alpha_val,
):
    """Re-run processing (image inputs only) and recompose the preview.

    Triggered when any model-affecting control changes. Falls back to the
    cached results when processing fails or the input is not an image, and
    leaves the preview untouched when nothing is available at all.

    Returns:
        (images, composed) — the (possibly refreshed) results dict for
        images_state and the composed preview image, or gr.update() when
        there is nothing to show.
    """
    path = Path(str(input_path_val))
    imgs_val = images_state_val
    # Only re-run the model when the current input is an existing image file.
    if path.exists() and path.suffix.lower() in IMAGE_EXTS:
        try:
            imgs_val = process_image(
                input_path=str(path),
                footprint_m=footprint_m_val,
                std_thresh=std_thresh_val,
                grad_thresh=grad_thresh_val,
                use_water_mask=use_water_mask_val,
                use_road_mask=use_road_mask_val,
                use_roof_mask=use_roof_mask_val,
                altitude_m=altitude_m_val,
                fov_deg=fov_deg_val,
                flatness_detail=flatness_detail_val,
                clearance_factor=clearance_factor_val,
                model_id=model_id_val,
                # Keep parity with the Run-button path (process_any), which
                # also passes source_path; previously omitted here.
                source_path=str(path),
            )
        except Exception:
            # Deliberate best-effort: keep showing the last good results
            # instead of surfacing an error on every transient control change.
            imgs_val = images_state_val
    if not imgs_val:
        # Nothing cached and nothing computed: leave the preview as-is.
        return images_state_val, gr.update()
    # Keyword style mirrors the compose_view call in process_any.
    composed = compose_view(
        imgs_val,
        base_view_val,
        heat_on_val,
        heat_alpha_val,
        grad_on_val,
        grad_alpha_val,
        flat_on_val,
        flat_alpha_val,
        water_on_val,
        water_alpha_val,
        water_enabled=use_water_mask_val,
        spot_on=spot_on_val,
        road_on=road_on_val,
        road_alpha=road_alpha_val,
        road_enabled=use_road_mask_val,
        roof_on=roof_on_val,
        roof_alpha=roof_alpha_val,
        roof_enabled=use_roof_mask_val,
    )
    return imgs_val, composed
# Inputs for overlay-only refreshes; order MUST match update_overlays_only's
# positional parameter order.
overlay_inputs = [
    images_state,
    base_view,
    heat_on,
    heat_alpha,
    grad_on,
    grad_alpha,
    flat_on,
    flat_alpha,
    water_on,
    water_alpha,
    spot_on,
    use_water_mask,
    road_on,
    road_alpha,
    use_road_mask,
    roof_on,
    roof_alpha,
    use_roof_mask,
]
def update_overlays_only(
    images_state_val,
    base_view_val,
    heat_on_val,
    heat_alpha_val,
    grad_on_val,
    grad_alpha_val,
    flat_on_val,
    flat_alpha_val,
    water_on_val,
    water_alpha_val,
    spot_on_val,
    use_water_mask_val,
    road_on_val,
    road_alpha_val,
    use_road_mask_val,
    roof_on_val,
    roof_alpha_val,
    use_roof_mask_val,
):
    """Recompose the preview from cached results without re-running the model.

    Returns the unchanged state plus either a freshly composed view or
    gr.update() when no results are cached yet.
    """
    if not images_state_val:
        # Nothing processed yet: keep the state and leave the preview alone.
        return images_state_val, gr.update()
    # Positional argument order for compose_view, gathered up front.
    compose_args = (
        images_state_val,
        base_view_val,
        heat_on_val,
        heat_alpha_val,
        grad_on_val,
        grad_alpha_val,
        flat_on_val,
        flat_alpha_val,
        water_on_val,
        water_alpha_val,
        use_water_mask_val,
        spot_on_val,
        road_on_val,
        road_alpha_val,
        use_road_mask_val,
        roof_on_val,
        roof_alpha_val,
        use_roof_mask_val,
    )
    return images_state_val, compose_view(*compose_args)
# Overlay-only controls recompose the cached images; no model re-run needed.
base_view.change(fn=update_overlays_only, inputs=overlay_inputs, outputs=[images_state, main_view])
for control in (
    heat_on,
    heat_alpha,
    grad_on,
    grad_alpha,
    flat_on,
    flat_alpha,
    water_on,
    water_alpha,
    spot_on,
    use_water_mask,
    road_on,
    road_alpha,
    use_road_mask,
    roof_on,
    roof_alpha,
    use_roof_mask,
):
    control.change(fn=update_overlays_only, inputs=overlay_inputs, outputs=[images_state, main_view])
# Inputs for model-affecting refreshes; order MUST match update_preview_ui's
# positional parameter order.
model_inputs = [
    images_state,
    input_path,
    footprint_m,
    std_thresh,
    grad_thresh,
    use_water_mask,
    use_road_mask,
    use_roof_mask,
    altitude_m,
    fov_deg,
    flatness_detail,
    clearance_factor,
    model_id,
    base_view,
    heat_on,
    heat_alpha,
    grad_on,
    grad_alpha,
    flat_on,
    flat_alpha,
    water_on,
    water_alpha,
    spot_on,
    road_on,
    road_alpha,
    roof_on,
    roof_alpha,
]
# Changing any model-affecting control re-runs processing for image inputs.
for control in (
    input_path,
    footprint_m,
    std_thresh,
    grad_thresh,
    use_water_mask,
    use_road_mask,
    use_roof_mask,
    altitude_m,
    fov_deg,
    flatness_detail,
    clearance_factor,
    model_id,
):
    control.change(fn=update_preview_ui, inputs=model_inputs, outputs=[images_state, main_view])
return demo
if __name__ == "__main__":
    # Build the Gradio app and serve it with request queuing enabled so long
    # model runs do not block other sessions.
    app = build_ui()
    app.queue().launch()