Spaces:
Running
Running
| """ | |
| Landslide Detection Engine (Uttarakhand-focused starter). | |
| This module is intentionally separate from the generic change detection engine. | |
| It uses landslide-oriented cues from before/after optical imagery: | |
| - vegetation loss | |
| - bare-soil increase | |
| - texture roughness change | |
| - edge disruption | |
| """ | |
| from __future__ import annotations | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| def _preprocess(image: Image.Image, max_size: int = 2200) -> np.ndarray: | |
| arr = np.array(image.convert("RGB")) | |
| h, w = arr.shape[:2] | |
| if max(h, w) > max_size: | |
| s = max_size / max(h, w) | |
| arr = cv2.resize(arr, (max(1, int(w * s)), max(1, int(h * s))), interpolation=cv2.INTER_AREA) | |
| return arr | |
| def _norm01(x: np.ndarray) -> np.ndarray: | |
| x = x.astype(np.float32) | |
| lo = float(np.min(x)) | |
| hi = float(np.max(x)) | |
| if hi - lo < 1e-8: | |
| return np.zeros_like(x, dtype=np.float32) | |
| return (x - lo) / (hi - lo) | |
| def _green_index(rgb: np.ndarray) -> np.ndarray: | |
| # RGB proxy for vegetation index when NIR is unavailable. | |
| r = rgb[:, :, 0].astype(np.float32) | |
| g = rgb[:, :, 1].astype(np.float32) | |
| return (g - r) / (g + r + 1e-6) | |
| def _soil_score(rgb: np.ndarray) -> np.ndarray: | |
| hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV).astype(np.float32) | |
| h = hsv[:, :, 0] | |
| s = hsv[:, :, 1] / 255.0 | |
| v = hsv[:, :, 2] / 255.0 | |
| # Dry/bare soil often: warm hue, medium saturation, medium/high brightness. | |
| warm = ((h >= 8) & (h <= 38)).astype(np.float32) | |
| sat = np.clip(1.0 - np.abs(s - 0.45) / 0.45, 0, 1) | |
| bri = np.clip((v - 0.25) / 0.75, 0, 1) | |
| return _norm01(0.5 * warm + 0.25 * sat + 0.25 * bri) | |
| def _texture_roughness(gray: np.ndarray) -> np.ndarray: | |
| lap = cv2.Laplacian(gray, cv2.CV_32F, ksize=3) | |
| rough = cv2.GaussianBlur(np.abs(lap), (5, 5), 0) | |
| return _norm01(rough) | |
| def _edge_change(before: np.ndarray, after: np.ndarray) -> np.ndarray: | |
| g1 = cv2.cvtColor(before, cv2.COLOR_RGB2GRAY) | |
| g2 = cv2.cvtColor(after, cv2.COLOR_RGB2GRAY) | |
| e1 = cv2.Canny(g1, 60, 140) | |
| e2 = cv2.Canny(g2, 60, 140) | |
| diff = cv2.absdiff(e1, e2).astype(np.float32) / 255.0 | |
| return cv2.GaussianBlur(diff, (5, 5), 0) | |
| def _clean(mask: np.ndarray) -> np.ndarray: | |
| m = mask.copy() | |
| h, w = m.shape[:2] | |
| b = max(8, int(min(h, w) * 0.01)) | |
| m[:b, :] = 0 | |
| m[-b:, :] = 0 | |
| m[:, :b] = 0 | |
| m[:, -b:] = 0 | |
| m = cv2.medianBlur(m, 5) | |
| k_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) | |
| k_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9)) | |
| m = cv2.morphologyEx(m, cv2.MORPH_OPEN, k_open) | |
| m = cv2.morphologyEx(m, cv2.MORPH_CLOSE, k_close) | |
| return m | |
| def _extract_regions(mask: np.ndarray, after: np.ndarray, min_area: int = 350): | |
| n, labels, stats, cents = cv2.connectedComponentsWithStats(mask, connectivity=8) | |
| h, w = mask.shape[:2] | |
| img_area = h * w | |
| regions = [] | |
| rid = 0 | |
| for i in range(1, n): | |
| area = int(stats[i, cv2.CC_STAT_AREA]) | |
| if area < min_area: | |
| continue | |
| x = int(stats[i, cv2.CC_STAT_LEFT]) | |
| y = int(stats[i, cv2.CC_STAT_TOP]) | |
| bw = int(stats[i, cv2.CC_STAT_WIDTH]) | |
| bh = int(stats[i, cv2.CC_STAT_HEIGHT]) | |
| if bw * bh > img_area * 0.9: | |
| continue | |
| cx, cy = cents[i] | |
| ratio = area / max(1, bw * bh) | |
| conf = float(np.clip(0.25 + ratio * 0.65, 0.25, 0.95)) | |
| sev = "minor" | |
| if area / img_area > 0.02: | |
| sev = "major" | |
| elif area / img_area > 0.006: | |
| sev = "moderate" | |
| rid += 1 | |
| regions.append( | |
| { | |
| "id": rid, | |
| "area": area, | |
| "bbox": (x, y, bw, bh), | |
| "center": (int(cx), int(cy)), | |
| "object_type": "Landslide Suspected Zone", | |
| "confidence": conf, | |
| "severity": sev, | |
| "sub_type": "Debris / Slope Failure", | |
| "sub_type_confidence": conf, | |
| "estimated_stories": None, | |
| "estimated_height_m": None, | |
| "construction_stage": None, | |
| } | |
| ) | |
| return regions[:80] | |
| def _visualize(after: np.ndarray, mask: np.ndarray, regions: list[dict]) -> np.ndarray: | |
| out = after.copy().astype(np.float32) | |
| m = (mask > 127).astype(np.float32) | |
| amber = np.zeros_like(out) | |
| amber[:, :, 0] = 255 # R | |
| amber[:, :, 1] = 165 # G | |
| alpha = 0.35 | |
| for c in range(3): | |
| out[:, :, c] = out[:, :, c] * (1 - m * alpha) + amber[:, :, c] * (m * alpha) | |
| vis = np.clip(out, 0, 255).astype(np.uint8) | |
| for r in regions: | |
| x, y, w, h = r["bbox"] | |
| color = (0, 140, 255) # BGR-like style for warning tone in RGB draw context | |
| cv2.rectangle(vis, (x, y), (x + w, y + h), color, 2) | |
| label = f'{r["id"]}' | |
| cv2.putText(vis, label, (x + 4, max(14, y - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA) | |
| return vis | |
| def run_landslide_detection( | |
| before_pil: Image.Image, | |
| after_pil: Image.Image, | |
| model_name: str = "Rule-Based v1", | |
| detection_sensitivity: float = 0.6, | |
| min_region_area: int | None = None, | |
| ): | |
| """ | |
| Returns: change_mask, result_image, stats, regions. | |
| """ | |
| before = _preprocess(before_pil) | |
| after = _preprocess(after_pil) | |
| if before.shape != after.shape: | |
| after = cv2.resize(after, (before.shape[1], before.shape[0]), interpolation=cv2.INTER_LINEAR) | |
| g_before = _green_index(before) | |
| g_after = _green_index(after) | |
| veg_loss = _norm01(np.clip(g_before - g_after, 0, None)) | |
| soil_before = _soil_score(before) | |
| soil_after = _soil_score(after) | |
| soil_gain = _norm01(np.clip(soil_after - soil_before, 0, None)) | |
| gray_before = cv2.cvtColor(before, cv2.COLOR_RGB2GRAY).astype(np.float32) | |
| gray_after = cv2.cvtColor(after, cv2.COLOR_RGB2GRAY).astype(np.float32) | |
| rough_before = _texture_roughness(gray_before) | |
| rough_after = _texture_roughness(gray_after) | |
| rough_change = _norm01(np.abs(rough_after - rough_before)) | |
| edge_change = _edge_change(before, after) | |
| sens = float(np.clip(detection_sensitivity, 0.0, 1.0)) | |
| # Landslide-oriented fusion | |
| fused = ( | |
| 0.38 * veg_loss | |
| + 0.30 * soil_gain | |
| + 0.20 * rough_change | |
| + 0.12 * edge_change | |
| ) | |
| fused = cv2.GaussianBlur(fused.astype(np.float32), (7, 7), 0) | |
| # Higher sensitivity => lower quantile threshold. | |
| q = float(np.clip(0.965 - (sens - 0.5) * 0.08, 0.88, 0.98)) | |
| thr = float(np.quantile(fused, q)) | |
| mask = (fused >= thr).astype(np.uint8) * 255 | |
| mask = _clean(mask) | |
| if min_region_area is None: | |
| min_region_area = int(max(250, min(1400, mask.shape[0] * mask.shape[1] * 0.00010))) | |
| regions = _extract_regions(mask, after, min_area=int(min_region_area)) | |
| result = _visualize(after, mask, regions) | |
| total = int(mask.shape[0] * mask.shape[1]) | |
| changed = int(np.sum(mask > 127)) | |
| stats = { | |
| "total_pixels": total, | |
| "changed_pixels": changed, | |
| "unchanged_pixels": total - changed, | |
| "change_percentage": (changed / total * 100.0) if total else 0.0, | |
| "image_width": mask.shape[1], | |
| "image_height": mask.shape[0], | |
| "threshold_debug": { | |
| "method": f"Landslide Detection ({model_name})", | |
| "threshold_used": int(np.clip(thr * 255.0, 0, 255)), | |
| "threshold_percentile_q": q, | |
| "sensitivity": sens, | |
| }, | |
| "params": { | |
| "detection_sensitivity": sens, | |
| "min_region_area": int(min_region_area), | |
| "model_name": model_name, | |
| }, | |
| } | |
| return mask, result, stats, regions | |