satdetect / app /landslide_preprocessing.py
coderuday21's picture
Add landslide detection menu, separate engine, and Uttarakhand integration plan
5cee5a6
raw
history blame
4.63 kB
"""
Dataset preprocessing and feature extraction starter for landslide modeling.
Usage example:
    python -m app.landslide_preprocessing --pairs_dir data/landslide_pairs --out_csv data/landslide_features.csv

Expected pairs_dir structure:
    pairs_dir/
        event_001/
            before.png
            after.png
            label.png  # optional binary mask; currently not read by this script
"""
from __future__ import annotations
import argparse
import csv
from pathlib import Path
import cv2
import numpy as np
from PIL import Image
def _norm01(x: np.ndarray) -> np.ndarray:
    """Min-max scale ``x`` into the range [0, 1] as ``float32``.

    Args:
        x: Array of any numeric dtype and shape.

    Returns:
        A ``float32`` array with the same shape as ``x``. An empty input
        yields an empty array, and an input whose value range is narrower
        than 1e-8 yields all zeros.
    """
    x = x.astype(np.float32)
    # np.min / np.max raise ValueError on zero-size arrays; an empty input
    # has nothing to scale, so return it as-is (it is already float32).
    if x.size == 0:
        return x
    lo = float(np.min(x))
    hi = float(np.max(x))
    span = hi - lo
    # A (near-)constant input would divide by ~0; report it as all zeros.
    if span < 1e-8:
        return np.zeros_like(x, dtype=np.float32)
    return (x - lo) / span
def _green_index(rgb: np.ndarray) -> np.ndarray:
    """Return the per-pixel normalized green-minus-red difference.

    Computes ``(G - R) / (G + R + 1e-6)`` from channels 0 (red) and 1 (green)
    of an ``(H, W, C)`` array; the small constant keeps the division defined
    where both channels are zero. The blue channel is not used.
    """
    red, green = (rgb[:, :, c].astype(np.float32) for c in (0, 1))
    difference = green - red
    total = green + red + 1e-6
    return difference / total
def _soil_score(rgb: np.ndarray) -> np.ndarray:
    """Compute a heuristic per-pixel "soil" score, rescaled to [0, 1].

    The score blends three cues taken from OpenCV's HSV conversion of ``rgb``:
    a hue-band indicator (weight 0.5), closeness of saturation to 0.45
    (weight 0.25) and brightness above 0.25 (weight 0.25). The blend is then
    min-max normalized over the whole image.

    NOTE(review): the /255 scaling and the 8..38 hue band presume an 8-bit
    image as OpenCV encodes it -- confirm callers always pass uint8 RGB.
    """
    hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV).astype(np.float32)
    hue = hsv[:, :, 0]
    saturation = hsv[:, :, 1] / 255.0
    value = hsv[:, :, 2] / 255.0

    # 1.0 where the hue lies inside the band, 0.0 elsewhere.
    in_warm_band = np.logical_and(hue >= 8, hue <= 38).astype(np.float32)
    # Peaks at saturation 0.45 and falls off linearly to 0 at 0.0 and 0.9.
    sat_term = np.clip(1.0 - np.abs(saturation - 0.45) / 0.45, 0, 1)
    # Ramps linearly from 0 at brightness 0.25 to 1 at brightness 1.0.
    bright_term = np.clip((value - 0.25) / 0.75, 0, 1)

    blended = 0.5 * in_warm_band + 0.25 * sat_term + 0.25 * bright_term
    return _norm01(blended)
def _texture(gray: np.ndarray) -> np.ndarray:
    """Return a smoothed edge-magnitude map of ``gray``, rescaled to [0, 1].

    Takes the absolute 3x3 Laplacian response, blurs it with a 5x5 Gaussian
    kernel, and min-max normalizes the result over the image.
    """
    gray_f32 = gray.astype(np.float32)
    laplacian = cv2.Laplacian(gray_f32, cv2.CV_32F, ksize=3)
    magnitude = np.abs(laplacian)
    smoothed = cv2.GaussianBlur(magnitude, (5, 5), 0)
    return _norm01(smoothed)
def _chip_stats(chip: np.ndarray) -> tuple[float, float, float]:
    """Summarize a chip as ``(mean, standard deviation, 90th-percentile value)``.

    All three statistics are taken over every element of ``chip`` and are
    returned as plain Python floats.
    """
    mean = np.mean(chip)
    spread = np.std(chip)
    q90 = np.quantile(chip, 0.9)
    return float(mean), float(spread), float(q90)
def extract_pair_features(
    before_rgb: np.ndarray,
    after_rgb: np.ndarray,
    chip: int = 64,
) -> list[dict[str, float | int]]:
    """Extract per-chip change features from a before/after image pair.

    Three per-pixel change maps are computed, each min-max normalized to
    [0, 1] over the image:

    * ``veg_loss``  -- decrease of the green index from before to after
      (increases are clipped to 0).
    * ``soil_gain`` -- increase of the soil score from before to after
      (decreases are clipped to 0).
    * ``tex_delta`` -- absolute change of the texture map.

    The maps are then tiled into non-overlapping ``chip`` x ``chip`` windows
    and each window is summarized by mean, standard deviation and 90th
    percentile.

    Args:
        before_rgb: ``(H, W, 3)`` RGB image taken before the event.
            NOTE(review): the helpers appear to assume uint8 -- confirm for
            callers other than ``main``.
        after_rgb: RGB image taken after the event. If its shape differs from
            ``before_rgb`` it is resized to match.
        chip: Side length, in pixels, of the square aggregation window. Must
            be a positive integer.

    Returns:
        One dict per full chip, in row-major order, holding the chip's
        top-left corner (``x``, ``y``) and the nine ``<map>_<stat>`` values.
        Partial chips at the right and bottom edges are dropped, so an image
        smaller than ``chip`` in either dimension yields an empty list.

    Raises:
        ValueError: If ``chip`` is less than 1.
    """
    # Fail fast: chip == 0 would otherwise surface as an obscure range() error
    # after all the image work is done, and a negative chip would silently
    # produce no rows.
    if chip < 1:
        raise ValueError(f"chip must be a positive integer, got {chip!r}")

    if before_rgb.shape != after_rgb.shape:
        height, width = before_rgb.shape[0], before_rgb.shape[1]
        # cv2.resize takes the target size as (width, height).
        after_rgb = cv2.resize(after_rgb, (width, height))

    g_before = _green_index(before_rgb)
    g_after = _green_index(after_rgb)
    veg_loss = _norm01(np.clip(g_before - g_after, 0, None))

    soil_before = _soil_score(before_rgb)
    soil_after = _soil_score(after_rgb)
    soil_gain = _norm01(np.clip(soil_after - soil_before, 0, None))

    gray_before = cv2.cvtColor(before_rgb, cv2.COLOR_RGB2GRAY)
    gray_after = cv2.cvtColor(after_rgb, cv2.COLOR_RGB2GRAY)
    tex_delta = _norm01(np.abs(_texture(gray_after) - _texture(gray_before)))

    # Insertion order here fixes the column order of every emitted row.
    feature_maps = {
        "veg_loss": veg_loss,
        "soil_gain": soil_gain,
        "tex_delta": tex_delta,
    }

    h, w = veg_loss.shape
    rows = []
    # Stopping at (size - chip) keeps only windows that fit entirely inside
    # the image.
    for y in range(0, h - chip + 1, chip):
        for x in range(0, w - chip + 1, chip):
            row = {"x": x, "y": y}
            for name, fmap in feature_maps.items():
                mean, std, q90 = _chip_stats(fmap[y:y + chip, x:x + chip])
                row[f"{name}_mean"] = mean
                row[f"{name}_std"] = std
                row[f"{name}_q90"] = q90
            rows.append(row)
    return rows
def _load_rgb(path: Path) -> np.ndarray:
    """Read the image at ``path`` as an RGB array and close the file handle."""
    # Image.open is lazy and keeps the file open; the context manager closes
    # it deterministically once the pixel data has been copied out.
    with Image.open(path) as img:
        return np.array(img.convert("RGB"))


def main() -> None:
    """Command-line entry point: extract chip features for every event pair.

    Walks the immediate subdirectories of ``--pairs_dir`` in sorted order,
    skips any that lack ``before.png`` or ``after.png``, runs
    ``extract_pair_features`` on the rest, tags each row with the directory
    name as ``event_id``, and writes all rows to ``--out_csv``.

    Invalid arguments (a non-positive ``--chip`` or a ``--pairs_dir`` that is
    not a directory) are reported through argparse, which prints a usage
    message and exits with status 2. If no usable pair is found, a message is
    printed and no CSV is written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--pairs_dir", required=True, help="Directory containing event folders with before/after images.")
    parser.add_argument("--out_csv", required=True, help="Output CSV path.")
    parser.add_argument("--chip", type=int, default=64, help="Chip size for feature aggregation.")
    args = parser.parse_args()

    # Reject bad arguments up front with a usage error instead of a traceback
    # from iterdir() or range() later on.
    if args.chip < 1:
        parser.error("--chip must be a positive integer.")
    pairs_dir = Path(args.pairs_dir)
    if not pairs_dir.is_dir():
        parser.error(f"--pairs_dir is not a directory: {pairs_dir}")

    out_csv = Path(args.out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    all_rows = []
    for event_dir in sorted(p for p in pairs_dir.iterdir() if p.is_dir()):
        before_path = event_dir / "before.png"
        after_path = event_dir / "after.png"
        # An event without both images cannot be compared; skip it quietly.
        if not (before_path.exists() and after_path.exists()):
            continue
        rows = extract_pair_features(
            _load_rgb(before_path),
            _load_rgb(after_path),
            chip=args.chip,
        )
        for r in rows:
            r["event_id"] = event_dir.name
        all_rows.extend(rows)

    if not all_rows:
        print("No valid before/after pairs found.")
        return

    # Every row has the same keys, so the first row defines the CSV header.
    fieldnames = list(all_rows[0].keys())
    with out_csv.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_rows)
    print(f"Wrote {len(all_rows)} rows to {out_csv}")


if __name__ == "__main__":
    main()