"""Pixel-level change detection: z-score rasters, hotspot masks, clustering.""" from __future__ import annotations from typing import Any import numpy as np from scipy import ndimage def compute_zscore_raster( current: np.ndarray, baseline_mean: np.ndarray, baseline_std: np.ndarray, min_std: float, ) -> np.ndarray: """Compute per-pixel z-score raster.""" effective_std = np.maximum(baseline_std, min_std) return (current - baseline_mean) / effective_std def detect_hotspots( zscore_raster: np.ndarray, threshold: float = 2.0, ) -> tuple[np.ndarray, float]: """Create boolean hotspot mask and compute percentage of area affected.""" valid = ~np.isnan(zscore_raster) mask = valid & (np.abs(zscore_raster) > threshold) n_valid = np.sum(valid) pct = float(np.sum(mask) / n_valid * 100) if n_valid > 0 else 0.0 return mask, pct def cluster_hotspots( mask: np.ndarray, zscore_raster: np.ndarray, pixel_area_ha: float, min_pixels: int = 4, top_n: int = 3, ) -> list[dict[str, Any]]: """Find connected hotspot clusters and return the top N by area.""" labeled, n_features = ndimage.label(mask) clusters: list[dict[str, Any]] = [] for label_id in range(1, n_features + 1): pixels = labeled == label_id n_pixels = int(np.sum(pixels)) if n_pixels < min_pixels: continue rows, cols = np.where(pixels) clusters.append({ "area_ha": n_pixels * pixel_area_ha, "centroid_row": float(np.mean(rows)), "centroid_col": float(np.mean(cols)), "mean_zscore": float(np.nanmean(zscore_raster[pixels])), "n_pixels": n_pixels, }) clusters.sort(key=lambda c: c["area_ha"], reverse=True) return clusters[:top_n]