Spaces:
Sleeping
Sleeping
"""
Dataset preprocessing and feature extraction starter for landslide modeling.

Usage example:
    python -m app.landslide_preprocessing --pairs_dir data/landslide_pairs --out_csv data/landslide_features.csv

Expected pairs_dir structure:
    pairs_dir/
        event_001/
            before.png
            after.png
            label.png  # optional (binary mask); not read by this script
"""
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
def _norm01(x: np.ndarray) -> np.ndarray:
    """Min-max rescale ``x`` into the [0, 1] range as a float32 array.

    When the value range of ``x`` is smaller than 1e-8 (an essentially
    constant input) there is no contrast to stretch, so an all-zero array
    of the same shape is returned instead of dividing by ~0.
    """
    values = x.astype(np.float32)
    low, high = float(values.min()), float(values.max())
    span = high - low
    if span < 1e-8:
        # Flat input: avoid amplifying noise / dividing by zero.
        return np.zeros_like(values, dtype=np.float32)
    return (values - low) / span
def _green_index(rgb: np.ndarray) -> np.ndarray:
    """Per-pixel normalized green-minus-red difference: (G - R) / (G + R).

    Channel 0 of ``rgb`` is taken as red and channel 1 as green. A small
    epsilon (1e-6) in the denominator keeps black pixels from dividing by
    zero. The result is float32.
    """
    red_green = rgb[:, :, :2].astype(np.float32)
    red = red_green[:, :, 0]
    green = red_green[:, :, 1]
    difference = green - red
    total = green + red + 1e-6
    return difference / total
def _soil_score(rgb: np.ndarray) -> np.ndarray:
    """Heuristic per-pixel bare-soil score from HSV cues, rescaled to [0, 1].

    Three terms are blended (weights 0.5 / 0.25 / 0.25):
      * warm hue: 1 where the OpenCV hue value lies in [8, 38], else 0;
      * moderate saturation: peaks at S = 0.45 and falls linearly to 0;
      * brightness: ramps from 0 at V = 0.25 up to 1 at V = 1.0.
    The blend is then min-max normalized over the whole image.

    NOTE(review): S and V are divided by 255, so ``rgb`` is presumably an
    8-bit image (for which OpenCV reports hue in half-degrees) — confirm.
    """
    hsv_f32 = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV).astype(np.float32)
    hue = hsv_f32[..., 0]
    saturation = hsv_f32[..., 1] / 255.0
    value = hsv_f32[..., 2] / 255.0

    in_warm_band = np.logical_and(hue >= 8, hue <= 38)
    warm_term = in_warm_band.astype(np.float32)
    sat_term = np.clip(1.0 - np.abs(saturation - 0.45) / 0.45, 0, 1)
    bright_term = np.clip((value - 0.25) / 0.75, 0, 1)

    blended = 0.5 * warm_term + 0.25 * sat_term + 0.25 * bright_term
    return _norm01(blended)
def _texture(gray: np.ndarray) -> np.ndarray:
    """Local texture strength of ``gray``, rescaled to [0, 1].

    Takes the absolute 3x3 Laplacian response, smooths it with a 5x5
    Gaussian blur, and min-max normalizes the result over the image.
    """
    gray_f32 = gray.astype(np.float32)
    edge_response = np.abs(cv2.Laplacian(gray_f32, cv2.CV_32F, ksize=3))
    smoothed = cv2.GaussianBlur(edge_response, (5, 5), 0)
    return _norm01(smoothed)
def _chip_stats(chip: np.ndarray) -> tuple[float, float, float]:
    """Summarize ``chip`` as (mean, standard deviation, 0.9 quantile).

    All three values are returned as plain Python floats.
    """
    mean_val = np.mean(chip)
    std_val = np.std(chip)
    q90_val = np.quantile(chip, 0.9)
    return float(mean_val), float(std_val), float(q90_val)
def extract_pair_features(before_rgb: np.ndarray, after_rgb: np.ndarray, chip: int = 64) -> list[dict[str, float]]:
    """Compute per-chip change features for one before/after image pair.

    Three per-pixel change maps are built, each min-max normalized over the
    whole image (so values are relative within this pair):
      * ``veg_loss``  - decrease of the green index from before to after;
      * ``soil_gain`` - increase of the soil score from before to after;
      * ``tex_delta`` - absolute change in texture strength.

    The maps are then tiled into non-overlapping ``chip`` x ``chip`` squares
    and each square is summarized by mean, standard deviation and 0.9
    quantile.

    Args:
        before_rgb: RGB image taken before the event.
        after_rgb: RGB image taken after the event. If its shape differs
            from ``before_rgb`` it is resized to match (``cv2.resize`` with
            its default interpolation).
        chip: Side length of the square tiles in pixels; must be positive.

    Returns:
        One dict per full tile with keys ``x``, ``y`` (integer top-left
        pixel of the tile) followed by ``<map>_mean``, ``<map>_std`` and
        ``<map>_q90`` for each of the three maps. Partial tiles at the
        right/bottom edges are skipped, so an image smaller than ``chip``
        yields an empty list.

    Raises:
        ValueError: If ``chip`` is not positive.
    """
    # Fail fast with a clear message: chip == 0 would otherwise surface as an
    # opaque range() error and a negative chip would silently yield no rows.
    if chip <= 0:
        raise ValueError(f"chip must be a positive integer, got {chip}")

    if before_rgb.shape != after_rgb.shape:
        # cv2.resize expects (width, height), i.e. (cols, rows).
        after_rgb = cv2.resize(after_rgb, (before_rgb.shape[1], before_rgb.shape[0]))

    # Only positive changes count as vegetation loss / soil gain.
    veg_loss = _norm01(np.clip(_green_index(before_rgb) - _green_index(after_rgb), 0, None))
    soil_gain = _norm01(np.clip(_soil_score(after_rgb) - _soil_score(before_rgb), 0, None))

    tex_before = _texture(cv2.cvtColor(before_rgb, cv2.COLOR_RGB2GRAY))
    tex_after = _texture(cv2.cvtColor(after_rgb, cv2.COLOR_RGB2GRAY))
    tex_delta = _norm01(np.abs(tex_after - tex_before))

    # Insertion order here fixes the column order of the emitted rows.
    change_maps = {
        "veg_loss": veg_loss,
        "soil_gain": soil_gain,
        "tex_delta": tex_delta,
    }

    h, w = veg_loss.shape
    rows = []
    for y in range(0, h - chip + 1, chip):
        for x in range(0, w - chip + 1, chip):
            row = {"x": x, "y": y}
            for name, change_map in change_maps.items():
                mean, std, q90 = _chip_stats(change_map[y:y + chip, x:x + chip])
                row[f"{name}_mean"] = mean
                row[f"{name}_std"] = std
                row[f"{name}_q90"] = q90
            rows.append(row)
    return rows
def main() -> None:
    """Command-line entry point: extract chip features for every event pair.

    Scans each sub-directory of ``--pairs_dir`` (in sorted order) for
    ``before.png`` and ``after.png``, runs ``extract_pair_features`` on the
    pair, tags every resulting row with the folder name as ``event_id`` and
    writes all rows to ``--out_csv``. Folders missing either image are
    skipped. If no valid pair is found, a message is printed and no CSV is
    written.

    Invalid arguments (non-positive ``--chip`` or a ``--pairs_dir`` that is
    not an existing directory) are reported through ``parser.error``, which
    prints a usage message and exits with status 2.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--pairs_dir", required=True, help="Directory containing event folders with before/after images.")
    parser.add_argument("--out_csv", required=True, help="Output CSV path.")
    parser.add_argument("--chip", type=int, default=64, help="Chip size for feature aggregation.")
    args = parser.parse_args()

    # Validate inputs up front so users get a usage error, not a traceback.
    if args.chip <= 0:
        parser.error(f"--chip must be a positive integer, got {args.chip}")
    pairs_dir = Path(args.pairs_dir)
    if not pairs_dir.is_dir():
        parser.error(f"--pairs_dir is not an existing directory: {pairs_dir}")

    out_csv = Path(args.out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    all_rows = []
    for event_dir in sorted(p for p in pairs_dir.iterdir() if p.is_dir()):
        before_path = event_dir / "before.png"
        after_path = event_dir / "after.png"
        if not (before_path.exists() and after_path.exists()):
            continue

        # Context managers release the underlying file handles promptly
        # instead of leaving that to garbage collection.
        with Image.open(before_path) as img:
            before = np.array(img.convert("RGB"))
        with Image.open(after_path) as img:
            after = np.array(img.convert("RGB"))

        rows = extract_pair_features(before, after, chip=args.chip)
        for r in rows:
            # Added last so "event_id" stays the final CSV column.
            r["event_id"] = event_dir.name
        all_rows.extend(rows)

    if not all_rows:
        print("No valid before/after pairs found.")
        return

    # Every row has the same keys, so the first row defines the header.
    fieldnames = list(all_rows[0].keys())
    with out_csv.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_rows)
    print(f"Wrote {len(all_rows)} rows to {out_csv}")


if __name__ == "__main__":
    main()