Spaces:
Sleeping
Sleeping
File size: 4,626 Bytes
5cee5a6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | """
Dataset preprocessing and feature extraction starter for landslide modeling.
Usage example:
python -m app.landslide_preprocessing --pairs_dir data/landslide_pairs --out_csv data/landslide_features.csv
Expected pairs_dir structure:
    pairs_dir/
        event_001/
            before.png
            after.png
            label.png  # optional (binary mask)
"""
from __future__ import annotations
import argparse
import csv
from pathlib import Path
import cv2
import numpy as np
from PIL import Image
def _norm01(x: np.ndarray) -> np.ndarray:
    """Min-max scale ``x`` into the range [0, 1] as a float32 array.

    When the value range of ``x`` is smaller than 1e-8 (an effectively
    constant input), an all-zero array of the same shape is returned
    instead of dividing by a near-zero span.
    """
    values = x.astype(np.float32)
    low, high = float(values.min()), float(values.max())
    span = high - low
    # Keep the comparison in this direction: a NaN span must fall through
    # to the division below rather than be treated as "constant".
    if span < 1e-8:
        return np.zeros_like(values, dtype=np.float32)
    return (values - low) / span
def _green_index(rgb: np.ndarray) -> np.ndarray:
    """Return the per-pixel normalized green-minus-red difference.

    Channels 0 and 1 of the last axis are read as red and green. The result
    is ``(G - R) / (G + R + 1e-6)``; the small constant keeps pixels where
    both channels are zero from dividing by zero.
    """
    red, green = (rgb[:, :, band].astype(np.float32) for band in (0, 1))
    difference = green - red
    total = green + red + 1e-6
    return difference / total
def _soil_score(rgb: np.ndarray) -> np.ndarray:
    """Return a heuristic per-pixel soil-likeness score, rescaled to [0, 1].

    Three cues computed from the HSV image are blended: hue inside the
    8..38 band (weight 0.5), saturation close to 0.45 (weight 0.25) and
    brightness above 0.25 (weight 0.25). The blend is then min-max
    normalized over the whole image.
    """
    hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV).astype(np.float32)
    hue = hsv[:, :, 0]
    saturation = hsv[:, :, 1] / 255.0
    value = hsv[:, :, 2] / 255.0

    # NOTE(review): for 8-bit input OpenCV stores hue as degrees / 2, so this
    # band would be roughly 16..76 degrees — confirm inputs are always uint8.
    in_warm_band = np.logical_and(hue >= 8, hue <= 38).astype(np.float32)
    # Peaks at saturation 0.45 and falls off linearly to 0 at 0.0 and 0.9.
    saturation_fit = np.clip(1.0 - np.abs(saturation - 0.45) / 0.45, 0, 1)
    # Ramps from 0 at brightness 0.25 up to 1 at full brightness.
    brightness_fit = np.clip((value - 0.25) / 0.75, 0, 1)

    blended = 0.5 * in_warm_band + 0.25 * saturation_fit + 0.25 * brightness_fit
    return _norm01(blended)
def _texture(gray: np.ndarray) -> np.ndarray:
    """Return a smoothed edge-strength map of ``gray``, rescaled to [0, 1].

    The absolute 3x3 Laplacian response is blurred with a 5x5 Gaussian and
    then min-max normalized over the whole image.
    """
    as_float = gray.astype(np.float32)
    laplacian = cv2.Laplacian(as_float, cv2.CV_32F, ksize=3)
    magnitude = np.abs(laplacian)
    smoothed = cv2.GaussianBlur(magnitude, (5, 5), 0)
    return _norm01(smoothed)
def _chip_stats(chip: np.ndarray) -> tuple[float, float, float]:
    """Summarize ``chip`` as ``(mean, standard deviation, 0.9 quantile)``.

    All three values are computed over every element of ``chip`` and
    returned as plain Python floats.
    """
    mean = np.mean(chip)
    spread = np.std(chip)
    upper = np.quantile(chip, 0.9)
    return float(mean), float(spread), float(upper)
def extract_pair_features(before_rgb: np.ndarray, after_rgb: np.ndarray, chip: int = 64):
    """Compute per-tile change features for a before/after RGB image pair.

    Three change maps are built over the full image, each rescaled to
    [0, 1]: vegetation loss (drop in green index), soil gain (rise in soil
    score) and texture change (absolute difference of texture maps). The
    maps are then cut into non-overlapping ``chip`` x ``chip`` tiles and
    each tile is summarized by mean, standard deviation and 0.9 quantile.

    Args:
        before_rgb: Image captured before the event.
        after_rgb: Image captured after the event. It is resized to the
            size of ``before_rgb`` when the two shapes differ.
        chip: Tile edge length in pixels.

    Returns:
        A list with one dict per tile. ``x`` and ``y`` hold the tile's
        top-left pixel; the remaining keys hold the nine statistics. Tiles
        that would extend past the right or bottom edge are not emitted, so
        an image smaller than ``chip`` yields an empty list.
    """
    if after_rgb.shape != before_rgb.shape:
        # cv2.resize expects the target size as (width, height).
        target_size = (before_rgb.shape[1], before_rgb.shape[0])
        after_rgb = cv2.resize(after_rgb, target_size)

    # Only positive differences count: greenness that dropped, soil that rose.
    green_drop = _green_index(before_rgb) - _green_index(after_rgb)
    veg_loss = _norm01(np.clip(green_drop, 0, None))

    soil_before = _soil_score(before_rgb)
    soil_rise = _soil_score(after_rgb) - soil_before
    soil_gain = _norm01(np.clip(soil_rise, 0, None))

    tex_before = _texture(cv2.cvtColor(before_rgb, cv2.COLOR_RGB2GRAY))
    tex_after = _texture(cv2.cvtColor(after_rgb, cv2.COLOR_RGB2GRAY))
    tex_delta = _norm01(np.abs(tex_after - tex_before))

    # Each layer is paired with its output column names, in column order.
    layers = (
        (veg_loss, ("veg_loss_mean", "veg_loss_std", "veg_loss_q90")),
        (soil_gain, ("soil_gain_mean", "soil_gain_std", "soil_gain_q90")),
        (tex_delta, ("tex_delta_mean", "tex_delta_std", "tex_delta_q90")),
    )

    height, width = veg_loss.shape
    records = []
    for top in range(0, height - chip + 1, chip):
        for left in range(0, width - chip + 1, chip):
            record = {"x": left, "y": top}
            for layer, columns in layers:
                tile = layer[top:top + chip, left:left + chip]
                record.update(zip(columns, _chip_stats(tile)))
            records.append(record)
    return records
def _load_rgb(path: Path) -> np.ndarray:
    """Read the image at ``path`` and return its pixels as an RGB array."""
    # The context manager releases the file handle even when decoding fails.
    with Image.open(path) as img:
        return np.array(img.convert("RGB"))


def main() -> None:
    """Command-line entry point: extract features for every event folder.

    Each sub-directory of ``--pairs_dir`` that contains both ``before.png``
    and ``after.png`` is processed with ``extract_pair_features``. The
    resulting rows are tagged with the folder name as ``event_id`` and
    written to ``--out_csv``. Folders missing either image are skipped with
    a notice on stderr. ``label.png`` is not read here.

    Exits with status 2 (argparse usage error) when ``--chip`` is not a
    positive integer.
    """
    import sys  # local import: only needed for the skip notice below

    parser = argparse.ArgumentParser()
    parser.add_argument("--pairs_dir", required=True, help="Directory containing event folders with before/after images.")
    parser.add_argument("--out_csv", required=True, help="Output CSV path.")
    parser.add_argument("--chip", type=int, default=64, help="Chip size for feature aggregation.")
    args = parser.parse_args()

    # A zero chip crashes inside range(); a negative one silently produces no
    # tiles and a misleading "no pairs" message. Reject both up front.
    if args.chip <= 0:
        parser.error("--chip must be a positive integer.")

    pairs_dir = Path(args.pairs_dir)
    out_csv = Path(args.out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    all_rows = []
    # Sorted so the CSV row order is stable across runs and platforms.
    for event_dir in sorted(p for p in pairs_dir.iterdir() if p.is_dir()):
        before_path = event_dir / "before.png"
        after_path = event_dir / "after.png"
        if not (before_path.exists() and after_path.exists()):
            print(
                f"Skipping {event_dir.name}: before.png or after.png is missing.",
                file=sys.stderr,
            )
            continue

        rows = extract_pair_features(
            _load_rgb(before_path), _load_rgb(after_path), chip=args.chip
        )
        for row in rows:
            row["event_id"] = event_dir.name
        all_rows.extend(rows)

    if not all_rows:
        print("No valid before/after pairs found.")
        return

    # Every row has the same keys, so the first row defines the header.
    fieldnames = list(all_rows[0].keys())
    with out_csv.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_rows)
    print(f"Wrote {len(all_rows)} rows to {out_csv}")
# Run the CLI only when this file is executed as a script (for example via
# ``python -m``), not when it is imported as a module.
if __name__ == "__main__":
    main()
|