yakvrz committed on
Commit
05c6078
·
1 Parent(s): 34a328a

Add app module and helpers (depth pipeline, data sources, viz)

Browse files
app/__init__.py ADDED
File without changes
app/data_sources.py ADDED
@@ -0,0 +1,51 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ from functools import lru_cache
4
+ from pathlib import Path
5
+
6
+ from .config import HAGDAVS_DIR, IMAGE_EXTS, VIDEO_DIR, VIDEO_EXTS, VISLOC_DIR
7
+
8
+
9
@lru_cache(maxsize=1)
def list_visloc_images() -> list[Path]:
    """Return sorted VisLoc image paths; empty when the directory is absent."""
    if not VISLOC_DIR.exists():
        return []
    return sorted(entry for entry in VISLOC_DIR.iterdir() if entry.suffix in IMAGE_EXTS)
15
+
16
+
17
@lru_cache(maxsize=1)
def list_hagdavs_images() -> list[Path]:
    """Return sorted HAGDAVS image paths; empty when the directory is absent."""
    if not HAGDAVS_DIR.exists():
        return []
    return sorted(entry for entry in HAGDAVS_DIR.iterdir() if entry.suffix in IMAGE_EXTS)
23
+
24
+
25
@lru_cache(maxsize=1)
def list_videos() -> list[Path]:
    """Return sorted video paths; extension match is case-insensitive."""
    if not VIDEO_DIR.exists():
        return []
    return sorted(entry for entry in VIDEO_DIR.iterdir() if entry.suffix.lower() in VIDEO_EXTS)
31
+
32
+
33
@lru_cache(maxsize=1)
def list_all_data_inputs() -> list[str]:
    """Return every selectable input path as a string.

    NOTE(review): despite the name, only VisLoc images are exposed here —
    HAGDAVS images and videos are not included; confirm this is intentional.
    """
    return [str(image_path) for image_path in list_visloc_images()]
36
+
37
+
38
def clear_caches() -> None:
    """Drop all cached directory listings so the next call re-scans disk."""
    for cached_listing in (
        list_visloc_images,
        list_hagdavs_images,
        list_videos,
        list_all_data_inputs,
    ):
        cached_listing.cache_clear()
43
+
44
+
45
+ __all__ = [
46
+ "list_visloc_images",
47
+ "list_hagdavs_images",
48
+ "list_videos",
49
+ "list_all_data_inputs",
50
+ "clear_caches",
51
+ ]
app/depth_pipeline.py ADDED
@@ -0,0 +1,190 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import functools
4
+ import math
5
+ from pathlib import Path
6
+ from typing import Tuple
7
+
8
+ import cv2
9
+ import numpy as np
10
+ import torch
11
+ from PIL import Image
12
+
13
# Prefer an installed `depth_anything_3` package; if it is not importable,
# fall back to the in-repo copy by putting <repo>/src on sys.path.
try:  # pragma: no cover - optional dependency resolution
    from depth_anything_3.api import DepthAnything3  # type: ignore
    from depth_anything_3.utils.visualize import visualize_depth  # type: ignore
except ModuleNotFoundError:  # pragma: no cover
    import sys

    # This file lives in <repo>/app/, so parents[1] is the repository root.
    ROOT = Path(__file__).resolve().parents[1]
    sys.path.append(str(ROOT / "src"))
    from depth_anything_3.api import DepthAnything3  # type: ignore # noqa: E402
    from depth_anything_3.utils.visualize import visualize_depth  # type: ignore # noqa: E402
23
+
24
+
25
def crop_nonblack(img: Image.Image, frac: float = 0.05) -> Image.Image:
    """Trim a *frac*-wide border from every side of *img*.

    NOTE(review): despite the name, this crops a fixed fraction of the image
    rather than detecting black borders — confirm that is the intent.
    """
    width, height = img.size
    margin_x = int(round(width * frac))
    margin_y = int(round(height * frac))
    return img.crop((margin_x, margin_y, width - margin_x, height - margin_y))
30
+
31
+
32
def compute_roof_mask_depth(depth: np.ndarray, aggressiveness: float = 1.3, morph_kernel: int = 5) -> np.ndarray:
    """Flag pixels well below the median depth value.

    Thresholds at ``median - aggressiveness * MAD`` and cleans the result with
    morphological open/close. Returns a boolean array shaped like *depth*.
    NOTE(review): assumes smaller depth values correspond to roofs/elevated
    surfaces — confirm against the depth model's convention.
    """
    values = depth.astype(np.float32)
    center = np.median(values)
    spread = np.median(np.abs(values - center)) + 1e-6  # MAD with zero-guard
    raw_mask = (values < center - aggressiveness * spread).astype(np.uint8)

    # Structuring element size: odd and at least 1.
    size = max(1, int(morph_kernel))
    if size % 2 == 0:
        size += 1
    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (size, size))
    try:
        raw_mask = cv2.morphologyEx(raw_mask, cv2.MORPH_OPEN, element)
        raw_mask = cv2.morphologyEx(raw_mask, cv2.MORPH_CLOSE, element)
    except Exception:
        # Best-effort cleanup: keep the unsmoothed mask if OpenCV balks.
        pass
    return raw_mask > 0
49
+
50
+
51
+ def fit_plane_ransac(points: np.ndarray, values: np.ndarray, iterations: int = 200, threshold: float = 0.01):
52
+ best_coef = None
53
+ best_inliers = -1
54
+ n_samples = points.shape[0]
55
+ if n_samples < 3:
56
+ return None
57
+ for _ in range(iterations):
58
+ idx = np.random.choice(n_samples, 3, replace=False)
59
+ A = np.concatenate([points[idx], np.ones((3, 1))], axis=1)
60
+ try:
61
+ coef = np.linalg.lstsq(A, values[idx], rcond=None)[0]
62
+ except np.linalg.LinAlgError:
63
+ continue
64
+ residuals = np.abs(points[:, 0] * coef[0] + points[:, 1] * coef[1] + coef[2] - values.flatten())
65
+ inliers = np.sum(residuals < threshold)
66
+ if inliers > best_inliers:
67
+ best_inliers = inliers
68
+ best_coef = coef
69
+ return best_coef
70
+
71
+
72
def remove_global_plane(depth: np.ndarray) -> np.ndarray:
    """Subtract a RANSAC-fitted global plane from a 2D depth map.

    Non-2D input and failed fits are returned unchanged.
    """
    if depth.ndim != 2:
        return depth
    height, width = depth.shape
    yy, xx = np.mgrid[0:height, 0:width].astype(np.float32)
    coords = np.stack((xx.flatten(), yy.flatten()), axis=1)
    samples = depth.astype(np.float32).reshape(-1, 1)
    # Inlier threshold scales with the depth range of this particular map.
    coef = fit_plane_ransac(coords, samples, iterations=300, threshold=0.01 * np.ptp(depth))
    if coef is None:
        return depth
    plane = (coords @ coef[:2] + coef[2]).reshape(height, width)
    return depth - plane
84
+
85
+
86
+ def pick_flat_patch(
87
+ depth: np.ndarray,
88
+ patch: int = 96,
89
+ std_thresh: float = 0.03,
90
+ grad_thresh: float = 0.35,
91
+ water_mask: np.ndarray | None = None,
92
+ ):
93
+ depth = depth.astype(np.float32)
94
+ if depth.ndim != 2:
95
+ raise ValueError("Depth map must be 2D (H, W)")
96
+
97
+ patch = max(3, min(patch, min(depth.shape)))
98
+ if patch % 2 == 0:
99
+ patch += 1
100
+ depth_norm = (depth - depth.min()) / (np.ptp(depth) + 1e-6)
101
+
102
+ import torch.nn.functional as F
103
+
104
+ def box_mean(arr, k):
105
+ pad = k // 2
106
+ t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0)
107
+ t = F.pad(t, (pad, pad, pad, pad), mode="reflect")
108
+ mean = F.avg_pool2d(t, kernel_size=k, stride=1, padding=0, count_include_pad=False)
109
+ return mean.squeeze(0).squeeze(0).numpy()
110
+
111
+ mean = box_mean(depth_norm, patch)
112
+ mean_sq = box_mean(depth_norm * depth_norm, patch)
113
+ var = np.maximum(mean_sq - mean * mean, 0.0)
114
+ std_map = np.sqrt(var)
115
+
116
+ dy, dx = np.gradient(depth_norm)
117
+ grad = np.sqrt(dx * dx + dy * dy)
118
+ grad_ref = np.percentile(grad, 95) + 1e-6
119
+ grad_norm = np.clip(grad / grad_ref, 0.0, 1.0)
120
+ grad_mask = grad_norm < grad_thresh
121
+
122
+ landing_mask = grad_mask
123
+ if water_mask is not None and water_mask.shape == grad_mask.shape:
124
+ landing_mask = landing_mask & (~water_mask)
125
+
126
+ masked_std = np.where(landing_mask, std_map, np.inf)
127
+ if not np.isfinite(masked_std).any():
128
+ masked_std = std_map
129
+ y, x = np.unravel_index(np.argmin(masked_std), masked_std.shape)
130
+ half = patch // 2
131
+ y0, y1 = max(y - half, 0), min(y + half, depth.shape[0] - 1)
132
+ x0, x1 = max(x - half, 0), min(x + half, depth.shape[1] - 1)
133
+ return (x0, y0, x1, y1), std_map, grad_norm, grad_mask, landing_mask
134
+
135
+
136
class DepthEngine:
    """Caches DepthAnything models and runs inference at bounded resolution."""

    def __init__(self):
        # model_id -> (loaded model, device it lives on)
        self._model_cache: dict[str, tuple[DepthAnything3, torch.device]] = {}

    def _load_model(self, model_id: str) -> tuple[DepthAnything3, torch.device]:
        """Instantiate *model_id* in eval mode on GPU when available, else CPU."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = DepthAnything3.from_pretrained(model_id).to(device)
        model.eval()
        return model, device

    def get_model(self, model_id: str) -> tuple[DepthAnything3, torch.device]:
        """Return the cached (model, device) pair, loading on first request."""
        try:
            return self._model_cache[model_id]
        except KeyError:
            pair = self._load_model(model_id)
            self._model_cache[model_id] = pair
            return pair

    def predict_depth(
        self, image: np.ndarray, model_id: str, process_res_cap: int
    ) -> tuple[np.ndarray, np.ndarray, int]:
        """Run depth inference on *image*, capping the processing resolution.

        Returns ``(raw depth, plane-removed depth, resolution actually used)``.
        """
        model, device = self.get_model(model_id)
        largest_side = max(image.shape[0], image.shape[1])
        process_res = min(largest_side, int(process_res_cap))
        with torch.inference_mode():
            prediction = model.inference(
                image=[image],
                process_res=process_res,
                process_res_method="upper_bound_resize",
                export_dir=None,
            )
        depth_raw = np.array(prediction.depth[0])
        depth = remove_global_plane(depth_raw)
        return depth_raw, depth, process_res
168
+
169
+
170
def smooth_depth(depth: np.ndarray, sigma: float) -> np.ndarray:
    """Gaussian-smooth *depth*; a non-positive *sigma* is a no-op."""
    if sigma <= 0:
        return depth
    # Odd kernel size covering roughly +/- 3 sigma, never below 3.
    kernel_size = max(3, int(round(sigma * 3)) * 2 + 1)
    try:
        return cv2.GaussianBlur(depth, (kernel_size, kernel_size), sigmaX=sigma, sigmaY=sigma)
    except Exception:
        # Best effort: hand the input back unchanged if OpenCV rejects the call.
        return depth
179
+
180
+
181
+ __all__ = [
182
+ "DepthEngine",
183
+ "compute_roof_mask_depth",
184
+ "crop_nonblack",
185
+ "fit_plane_ransac",
186
+ "pick_flat_patch",
187
+ "remove_global_plane",
188
+ "smooth_depth",
189
+ "visualize_depth",
190
+ ]
app/visualization.py ADDED
@@ -0,0 +1,214 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ from typing import Dict, Tuple
4
+
5
+ import numpy as np
6
+ from PIL import Image, ImageDraw
7
+
8
+ from .depth_pipeline import visualize_depth
9
+
10
# Fixed overlay opacities (0-1) for the depth-gradient and flatness layers.
GRAD_ALPHA = 0.35
FLAT_ALPHA = 0.25
12
+
13
def make_safety_heatmap(
    rgb: Image.Image,
    safe_mask: np.ndarray,
    hazard_mask: np.ndarray,
    risk_map: np.ndarray,
    risk_threshold: float = 0.35,
):
    """Build RGBA safe/hazard overlays and a grayscale safety-score image.

    Returns ``(safe overlay, hazard overlay, score grayscale)``, each resized
    to ``rgb.size`` with nearest-neighbour sampling.
    """
    safe = np.clip(safe_mask.astype(np.float32), 0.0, 1.0)
    hazard = hazard_mask.astype(bool)
    risk = np.clip(risk_map.astype(np.float32), 0.0, 1.0)

    height, width = safe.shape
    # Opaque green wherever the safe score is non-zero.
    safe_overlay = np.zeros((height, width, 4), dtype=np.uint8)
    safe_pixels = safe > 0.0
    safe_overlay[safe_pixels, 1] = 255
    safe_overlay[safe_pixels, 3] = 255

    # Red overlay whose alpha tracks risk above the threshold; explicit hazard
    # pixels are forced fully opaque.
    risk_focus = np.where(risk > risk_threshold, risk, 0.0).astype(np.float32)
    hazard_intensity = np.where(hazard, np.maximum(risk_focus, 1.0), risk_focus)
    hazard_alpha = (np.clip(hazard_intensity, 0.0, 1.0) * 255).astype(np.uint8)
    hazard_overlay = np.zeros((height, width, 4), dtype=np.uint8)
    hazard_overlay[..., 0] = 255
    hazard_overlay[..., 3] = hazard_alpha

    safe_img = Image.fromarray(safe_overlay, mode="RGBA").resize(rgb.size, resample=Image.NEAREST)
    hazard_img = Image.fromarray(hazard_overlay, mode="RGBA").resize(rgb.size, resample=Image.NEAREST)
    score_gray = Image.fromarray((safe * 255).astype(np.uint8)).resize(rgb.size, resample=Image.NEAREST)
    return safe_img, hazard_img, score_gray
42
+
43
+
44
def build_result_layers(
    image: Image.Image,
    depth_raw: np.ndarray,
    std_map_vis: np.ndarray,
    grad_norm: np.ndarray,
    grad_thresh: float,
    safe_mask: np.ndarray,
    risk_map: np.ndarray,
    footprint_img_px: int,
    center_img: Tuple[int, int],
    water_mask: np.ndarray | None,
    road_mask: np.ndarray | None,
    roof_mask: np.ndarray | None,
    seg_mask_union: np.ndarray | None,
    hazard_mask: np.ndarray,
) -> Dict[str, Image.Image]:
    """Render every named visualization layer for one analyzed frame.

    Returns a dict mapping UI view names to PIL images, all sized to
    ``image.size``. Continuous maps are resampled bilinearly; masks use
    nearest-neighbour so their edges stay crisp.
    """
    depth_vis = Image.fromarray(visualize_depth(depth_raw, cmap="Spectral")).resize(
        image.size, resample=Image.BILINEAR
    )
    flatness_img = Image.fromarray((std_map_vis / (std_map_vis.max() + 1e-6) * 255).astype(np.uint8)).resize(
        image.size, resample=Image.NEAREST
    )
    grad_img = Image.fromarray((grad_norm * 255).astype(np.uint8)).resize(image.size, resample=Image.BILINEAR)
    grad_mask_img = Image.fromarray(((grad_norm < grad_thresh).astype(np.uint8) * 255)).resize(
        image.size, resample=Image.NEAREST
    )

    def _mask_to_image(mask: np.ndarray | None) -> Image.Image:
        # 0/1 (or boolean) mask -> full-resolution 8-bit grayscale image.
        if mask is None:
            return Image.new("L", image.size, 0)
        return Image.fromarray((mask.astype(np.uint8) * 255)).resize(image.size, resample=Image.NEAREST)

    water_mask_img = _mask_to_image(water_mask)
    road_mask_img = _mask_to_image(road_mask)
    roof_mask_img = _mask_to_image(roof_mask)
    seg_mask_img = _mask_to_image(seg_mask_union) if seg_mask_union is not None else Image.new("L", image.size, 0)

    safe_overlay, hazard_overlay, heat_gray = make_safety_heatmap(image, safe_mask, hazard_mask, risk_map)

    # Landing-spot box: an odd-sided square centred on center_img, shifted
    # back inside the frame when it would spill over an edge.
    # Fix: removed an unused `ImageDraw.Draw(spot_overlay)` binding that was
    # created and never drawn on.
    spot_overlay = Image.new("RGBA", image.size, (0, 0, 0, 0))
    cx_img, cy_img = center_img
    side_img = max(3, footprint_img_px | 1)  # force an odd side length >= 3
    half_img = side_img // 2
    bx0 = cx_img - half_img
    by0 = cy_img - half_img
    bx1 = bx0 + side_img - 1
    by1 = by0 + side_img - 1
    clipped_x = False
    clipped_y = False
    if bx0 < 0:
        shift = -bx0
        bx0 = 0
        bx1 += shift
        clipped_x = True
    if bx1 >= image.width:
        shift = bx1 - (image.width - 1)
        bx1 = image.width - 1
        bx0 = max(0, bx0 - shift)
        clipped_x = True
    if by0 < 0:
        shift = -by0
        by0 = 0
        by1 += shift
        clipped_y = True
    if by1 >= image.height:
        shift = by1 - (image.height - 1)
        by1 = image.height - 1
        by0 = max(0, by0 - shift)
        clipped_y = True
    # Crosshair centre: box midpoint when the box was shifted, otherwise the
    # requested centre clamped into the box.
    if clipped_x:
        cx_draw = int(round((bx0 + bx1) / 2.0))
    else:
        cx_draw = int(round(min(max(cx_img, bx0), bx1)))
    if clipped_y:
        cy_draw = int(round((by0 + by1) / 2.0))
    else:
        cy_draw = int(round(min(max(cy_img, by0), by1)))
    overlay_box = Image.new("RGBA", image.size, (0, 0, 0, 0))
    box_draw = ImageDraw.Draw(overlay_box)
    fill = (0, 102, 255, 60)
    outline = (0, 102, 255, 255)
    box_draw.rectangle((bx0, by0, bx1, by1), fill=fill, outline=outline, width=4)
    box_draw.line((cx_draw, by0, cx_draw, by1), fill=outline, width=2)
    box_draw.line((bx0, cy_draw, bx1, cy_draw), fill=outline, width=2)
    radius = 8
    box_draw.ellipse((cx_draw - radius, cy_draw - radius, cx_draw + radius, cy_draw + radius), fill=outline)

    return {
        "RGB": image,
        "Depth": depth_vis,
        "Flatness map (std)": flatness_img,
        "Depth gradient": grad_img,
        "Gradient mask": grad_mask_img,
        "Water mask": water_mask_img,
        "Road mask": road_mask_img,
        "Roof mask": roof_mask_img,
        "Segmentation hazards": seg_mask_img,
        "Safety heatmap overlay": safe_overlay,
        "Hazard overlay": hazard_overlay,
        "Safety score": heat_gray,
        "Landing spot overlay": Image.alpha_composite(spot_overlay, overlay_box),
    }
147
+
148
+
149
def compose_view(
    images_dict: dict,
    base_view: str,
    heat_on: bool,
    heat_alpha: float,
    hazard_on: bool,
    hazard_alpha: float,
    grad_on: bool,
    flat_on: bool,
    spot_on: bool,
) -> Image.Image:
    """Composite the selected base view with the enabled overlay layers.

    Overlays are applied in a fixed order: safety heatmap, segmentation
    hazards, depth gradient, flatness map, landing-spot box. Raises
    ``gr.Error`` when no results exist or the requested view is missing.
    """
    import gradio as gr

    if not images_dict:
        raise gr.Error("Run inference first, then select a view.")
    if base_view not in images_dict:
        raise gr.Error(f"Unknown view: {base_view}")

    base = images_dict.get(base_view)
    if base is None:
        raise gr.Error(f"No image for view: {base_view}")
    out = base.convert("RGBA")

    if heat_on:
        heat = images_dict.get("Safety heatmap overlay")
        if heat is not None:
            # Scale the heatmap's own alpha channel by the requested opacity.
            heat_rgba = heat.convert("RGBA")
            opacity = max(0.0, min(1.0, heat_alpha))
            alpha = np.array(heat_rgba.getchannel("A"), dtype=np.uint8)
            alpha = (alpha.astype(np.float32) * opacity).astype(np.uint8)
            heat_rgba.putalpha(Image.fromarray(alpha, mode="L"))
            out = Image.alpha_composite(out, heat_rgba)

    if hazard_on:
        hazard = images_dict.get("Segmentation hazards")
        if hazard is not None:
            # Paint flat red through the hazard mask at the requested opacity.
            mask = hazard.convert("L")
            level = int(max(0.0, min(1.0, hazard_alpha)) * 255)
            blank = Image.new("RGBA", hazard.size, (0, 0, 0, 0))
            red = Image.new("RGBA", hazard.size, (255, 0, 0, level))
            out = Image.alpha_composite(out, Image.composite(red, blank, mask))

    if grad_on:
        grad_layer = images_dict.get("Depth gradient")
        if grad_layer is not None:
            grad_rgba = grad_layer.convert("RGBA")
            grad_rgba.putalpha(int(GRAD_ALPHA * 255))
            out = Image.alpha_composite(out, grad_rgba)

    if flat_on:
        flat_layer = images_dict.get("Flatness map (std)")
        if flat_layer is not None:
            flat_rgba = flat_layer.convert("RGBA")
            flat_rgba.putalpha(int(FLAT_ALPHA * 255))
            out = Image.alpha_composite(out, flat_rgba)

    if spot_on:
        spot = images_dict.get("Landing spot overlay")
        if spot is not None:
            out = Image.alpha_composite(out, spot.convert("RGBA"))

    return out.convert("RGB")
212
+
213
+
214
+ __all__ = ["build_result_layers", "compose_view", "make_safety_heatmap"]