| """Pixel-level change detection: z-score rasters, hotspot masks, clustering.""" |
| from __future__ import annotations |
|
|
| from typing import Any |
|
|
| import numpy as np |
| from scipy import ndimage |
|
|
|
|
| def compute_zscore_raster( |
| current: np.ndarray, |
| baseline_mean: np.ndarray, |
| baseline_std: np.ndarray, |
| min_std: float, |
| ) -> np.ndarray: |
| """Compute per-pixel z-score raster.""" |
| effective_std = np.maximum(baseline_std, min_std) |
| return (current - baseline_mean) / effective_std |
|
|
|
|
| def detect_hotspots( |
| zscore_raster: np.ndarray, |
| threshold: float = 2.0, |
| ) -> tuple[np.ndarray, float]: |
| """Create boolean hotspot mask and compute percentage of area affected.""" |
| valid = ~np.isnan(zscore_raster) |
| mask = valid & (np.abs(zscore_raster) > threshold) |
| n_valid = np.sum(valid) |
| pct = float(np.sum(mask) / n_valid * 100) if n_valid > 0 else 0.0 |
| return mask, pct |
|
|
|
|
| def cluster_hotspots( |
| mask: np.ndarray, |
| zscore_raster: np.ndarray, |
| pixel_area_ha: float, |
| min_pixels: int = 4, |
| top_n: int = 3, |
| ) -> list[dict[str, Any]]: |
| """Find connected hotspot clusters and return the top N by area.""" |
| labeled, n_features = ndimage.label(mask) |
|
|
| clusters: list[dict[str, Any]] = [] |
| for label_id in range(1, n_features + 1): |
| pixels = labeled == label_id |
| n_pixels = int(np.sum(pixels)) |
| if n_pixels < min_pixels: |
| continue |
|
|
| rows, cols = np.where(pixels) |
| clusters.append({ |
| "area_ha": n_pixels * pixel_area_ha, |
| "centroid_row": float(np.mean(rows)), |
| "centroid_col": float(np.mean(cols)), |
| "mean_zscore": float(np.nanmean(zscore_raster[pixels])), |
| "n_pixels": n_pixels, |
| }) |
|
|
| clusters.sort(key=lambda c: c["area_ha"], reverse=True) |
| return clusters[:top_n] |
|
|