satdetect / app /landslide_engine.py
coderuday21's picture
Add landslide detection menu, separate engine, and Uttarakhand integration plan
5cee5a6
raw
history blame
7.65 kB
"""
Landslide Detection Engine (Uttarakhand-focused starter).
This module is intentionally separate from the generic change detection engine.
It uses landslide-oriented cues from before/after optical imagery:
- vegetation loss
- bare-soil increase
- texture roughness change
- edge disruption
"""
from __future__ import annotations
import cv2
import numpy as np
from PIL import Image
def _preprocess(image: Image.Image, max_size: int = 2200) -> np.ndarray:
arr = np.array(image.convert("RGB"))
h, w = arr.shape[:2]
if max(h, w) > max_size:
s = max_size / max(h, w)
arr = cv2.resize(arr, (max(1, int(w * s)), max(1, int(h * s))), interpolation=cv2.INTER_AREA)
return arr
def _norm01(x: np.ndarray) -> np.ndarray:
x = x.astype(np.float32)
lo = float(np.min(x))
hi = float(np.max(x))
if hi - lo < 1e-8:
return np.zeros_like(x, dtype=np.float32)
return (x - lo) / (hi - lo)
def _green_index(rgb: np.ndarray) -> np.ndarray:
# RGB proxy for vegetation index when NIR is unavailable.
r = rgb[:, :, 0].astype(np.float32)
g = rgb[:, :, 1].astype(np.float32)
return (g - r) / (g + r + 1e-6)
def _soil_score(rgb: np.ndarray) -> np.ndarray:
hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV).astype(np.float32)
h = hsv[:, :, 0]
s = hsv[:, :, 1] / 255.0
v = hsv[:, :, 2] / 255.0
# Dry/bare soil often: warm hue, medium saturation, medium/high brightness.
warm = ((h >= 8) & (h <= 38)).astype(np.float32)
sat = np.clip(1.0 - np.abs(s - 0.45) / 0.45, 0, 1)
bri = np.clip((v - 0.25) / 0.75, 0, 1)
return _norm01(0.5 * warm + 0.25 * sat + 0.25 * bri)
def _texture_roughness(gray: np.ndarray) -> np.ndarray:
lap = cv2.Laplacian(gray, cv2.CV_32F, ksize=3)
rough = cv2.GaussianBlur(np.abs(lap), (5, 5), 0)
return _norm01(rough)
def _edge_change(before: np.ndarray, after: np.ndarray) -> np.ndarray:
g1 = cv2.cvtColor(before, cv2.COLOR_RGB2GRAY)
g2 = cv2.cvtColor(after, cv2.COLOR_RGB2GRAY)
e1 = cv2.Canny(g1, 60, 140)
e2 = cv2.Canny(g2, 60, 140)
diff = cv2.absdiff(e1, e2).astype(np.float32) / 255.0
return cv2.GaussianBlur(diff, (5, 5), 0)
def _clean(mask: np.ndarray) -> np.ndarray:
m = mask.copy()
h, w = m.shape[:2]
b = max(8, int(min(h, w) * 0.01))
m[:b, :] = 0
m[-b:, :] = 0
m[:, :b] = 0
m[:, -b:] = 0
m = cv2.medianBlur(m, 5)
k_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
k_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
m = cv2.morphologyEx(m, cv2.MORPH_OPEN, k_open)
m = cv2.morphologyEx(m, cv2.MORPH_CLOSE, k_close)
return m
def _extract_regions(mask: np.ndarray, after: np.ndarray, min_area: int = 350):
n, labels, stats, cents = cv2.connectedComponentsWithStats(mask, connectivity=8)
h, w = mask.shape[:2]
img_area = h * w
regions = []
rid = 0
for i in range(1, n):
area = int(stats[i, cv2.CC_STAT_AREA])
if area < min_area:
continue
x = int(stats[i, cv2.CC_STAT_LEFT])
y = int(stats[i, cv2.CC_STAT_TOP])
bw = int(stats[i, cv2.CC_STAT_WIDTH])
bh = int(stats[i, cv2.CC_STAT_HEIGHT])
if bw * bh > img_area * 0.9:
continue
cx, cy = cents[i]
ratio = area / max(1, bw * bh)
conf = float(np.clip(0.25 + ratio * 0.65, 0.25, 0.95))
sev = "minor"
if area / img_area > 0.02:
sev = "major"
elif area / img_area > 0.006:
sev = "moderate"
rid += 1
regions.append(
{
"id": rid,
"area": area,
"bbox": (x, y, bw, bh),
"center": (int(cx), int(cy)),
"object_type": "Landslide Suspected Zone",
"confidence": conf,
"severity": sev,
"sub_type": "Debris / Slope Failure",
"sub_type_confidence": conf,
"estimated_stories": None,
"estimated_height_m": None,
"construction_stage": None,
}
)
return regions[:80]
def _visualize(after: np.ndarray, mask: np.ndarray, regions: list[dict]) -> np.ndarray:
out = after.copy().astype(np.float32)
m = (mask > 127).astype(np.float32)
amber = np.zeros_like(out)
amber[:, :, 0] = 255 # R
amber[:, :, 1] = 165 # G
alpha = 0.35
for c in range(3):
out[:, :, c] = out[:, :, c] * (1 - m * alpha) + amber[:, :, c] * (m * alpha)
vis = np.clip(out, 0, 255).astype(np.uint8)
for r in regions:
x, y, w, h = r["bbox"]
color = (0, 140, 255) # BGR-like style for warning tone in RGB draw context
cv2.rectangle(vis, (x, y), (x + w, y + h), color, 2)
label = f'{r["id"]}'
cv2.putText(vis, label, (x + 4, max(14, y - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
return vis
def run_landslide_detection(
before_pil: Image.Image,
after_pil: Image.Image,
model_name: str = "Rule-Based v1",
detection_sensitivity: float = 0.6,
min_region_area: int | None = None,
):
"""
Returns: change_mask, result_image, stats, regions.
"""
before = _preprocess(before_pil)
after = _preprocess(after_pil)
if before.shape != after.shape:
after = cv2.resize(after, (before.shape[1], before.shape[0]), interpolation=cv2.INTER_LINEAR)
g_before = _green_index(before)
g_after = _green_index(after)
veg_loss = _norm01(np.clip(g_before - g_after, 0, None))
soil_before = _soil_score(before)
soil_after = _soil_score(after)
soil_gain = _norm01(np.clip(soil_after - soil_before, 0, None))
gray_before = cv2.cvtColor(before, cv2.COLOR_RGB2GRAY).astype(np.float32)
gray_after = cv2.cvtColor(after, cv2.COLOR_RGB2GRAY).astype(np.float32)
rough_before = _texture_roughness(gray_before)
rough_after = _texture_roughness(gray_after)
rough_change = _norm01(np.abs(rough_after - rough_before))
edge_change = _edge_change(before, after)
sens = float(np.clip(detection_sensitivity, 0.0, 1.0))
# Landslide-oriented fusion
fused = (
0.38 * veg_loss
+ 0.30 * soil_gain
+ 0.20 * rough_change
+ 0.12 * edge_change
)
fused = cv2.GaussianBlur(fused.astype(np.float32), (7, 7), 0)
# Higher sensitivity => lower quantile threshold.
q = float(np.clip(0.965 - (sens - 0.5) * 0.08, 0.88, 0.98))
thr = float(np.quantile(fused, q))
mask = (fused >= thr).astype(np.uint8) * 255
mask = _clean(mask)
if min_region_area is None:
min_region_area = int(max(250, min(1400, mask.shape[0] * mask.shape[1] * 0.00010)))
regions = _extract_regions(mask, after, min_area=int(min_region_area))
result = _visualize(after, mask, regions)
total = int(mask.shape[0] * mask.shape[1])
changed = int(np.sum(mask > 127))
stats = {
"total_pixels": total,
"changed_pixels": changed,
"unchanged_pixels": total - changed,
"change_percentage": (changed / total * 100.0) if total else 0.0,
"image_width": mask.shape[1],
"image_height": mask.shape[0],
"threshold_debug": {
"method": f"Landslide Detection ({model_name})",
"threshold_used": int(np.clip(thr * 255.0, 0, 255)),
"threshold_percentile_q": q,
"sensitivity": sens,
},
"params": {
"detection_sensitivity": sens,
"min_region_area": int(min_region_area),
"model_name": model_name,
},
}
return mask, result, stats, regions