"""
preprocess.py — Data normalization, cloud masking, and patching utilities.

Handles:
  - Per-channel mean/std normalization for satellite images
  - MinMaxScaler on time-series (fit on train split only)
  - Cloud masking via NDVI threshold
  - Saving as .npy for fast DataLoader access
"""

import logging
from pathlib import Path
from typing import Dict, Optional, Tuple

import numpy as np
from sklearn.preprocessing import MinMaxScaler

from src.training.config import (
    PATCH_SIZE,
    IMAGE_CHANNELS,
    TIMESERIES_WINDOW,
    TIMESERIES_FEATURES,
    IMAGE_PATCHES_DIR,
    TIMESERIES_DIR,
    PROCESSED_DIR,
)

logger = logging.getLogger(__name__)


class ImageNormalizer:
    """
    Per-channel mean/std normalization for multi-channel satellite images.

    Fit on the training split only, then apply to val/test to prevent
    data leakage.
    """

    def __init__(self):
        # Both stay None until fit() or load() has been called.
        self.channel_means: Optional[np.ndarray] = None
        self.channel_stds: Optional[np.ndarray] = None

    def _require_fitted(self) -> None:
        """Raise RuntimeError when no statistics are available yet."""
        if self.channel_means is None or self.channel_stds is None:
            raise RuntimeError("ImageNormalizer not fitted. Call .fit() first.")

    def fit(self, images: np.ndarray) -> "ImageNormalizer":
        """
        Compute per-channel statistics from training images.

        Args:
            images: Shape (N, C, H, W).

        Returns:
            self, so calls can be chained.

        Raises:
            ValueError: If ``images`` contains no elements (statistics
                would be NaN and silently poison every later transform).
        """
        if images.size == 0:
            raise ValueError("Cannot fit ImageNormalizer on an empty array.")

        # Reduce over samples and both spatial axes, leaving one value per channel.
        self.channel_means = images.mean(axis=(0, 2, 3))  # (C,)
        channel_stds = images.std(axis=(0, 2, 3))  # (C,)

        # A constant channel has std ~ 0; use 1.0 so transform() does not
        # divide by zero.
        self.channel_stds = np.where(channel_stds < 1e-8, 1.0, channel_stds)

        logger.info(f"Image normalizer fit — means: {self.channel_means}, "
                    f"stds: {self.channel_stds}")
        return self

    def transform(self, images: np.ndarray) -> np.ndarray:
        """
        Normalize images using the fitted statistics.

        Args:
            images: Array whose axis 1 is the channel axis, e.g. (N, C, H, W).

        Returns:
            A new array of the same shape. Floating-point inputs keep their
            dtype; any other dtype (e.g. uint8/uint16 imagery) is promoted to
            float32 so the normalized values are not truncated to integers.

        Raises:
            RuntimeError: If the normalizer has not been fitted or loaded.
            ValueError: If the channel count differs from the fitted one.
        """
        self._require_fitted()

        num_channels = self.channel_means.shape[0]
        if images.shape[1] != num_channels:
            raise ValueError(
                f"Channel count mismatch: images have {images.shape[1]}, "
                f"normalizer was fitted on {num_channels}"
            )

        if np.issubdtype(images.dtype, np.floating):
            out_dtype = images.dtype
        else:
            out_dtype = np.float32

        # Shape the statistics as (1, C, 1, ...) so they broadcast along axis 1.
        stat_shape = (1, -1) + (1,) * (images.ndim - 2)
        means = np.asarray(self.channel_means, dtype=out_dtype).reshape(stat_shape)
        stds = np.asarray(self.channel_stds, dtype=out_dtype).reshape(stat_shape)

        # The subtraction allocates a fresh array, so the input is never modified.
        return (images.astype(out_dtype, copy=False) - means) / stds

    def fit_transform(self, images: np.ndarray) -> np.ndarray:
        """Fit and transform in one step."""
        return self.fit(images).transform(images)

    def save(self, path: Path) -> None:
        """
        Save normalizer statistics as an ``.npz`` archive.

        Raises:
            RuntimeError: If the normalizer has not been fitted; saving
                ``None`` would produce an archive that load() cannot read.
        """
        self._require_fitted()
        np.savez(path, means=self.channel_means, stds=self.channel_stds)

    def load(self, path: Path) -> "ImageNormalizer":
        """Load normalizer statistics written by save() and return self."""
        # NpzFile keeps the archive open; the context manager closes it.
        with np.load(path) as data:
            self.channel_means = data["means"]
            self.channel_stds = data["stds"]
        return self


class TimeSeriesScaler:
    """
    MinMaxScaler for weather + AQI time series data.

    Fit on the training split only to prevent data leakage.
    """

    def __init__(self):
        self.scaler = MinMaxScaler()
        self._is_fitted = False

    def _require_fitted(self) -> None:
        """Raise RuntimeError when the scaler has not been fitted or loaded."""
        if not self._is_fitted:
            raise RuntimeError("TimeSeriesScaler not fitted. Call .fit() first.")

    def fit(self, data: np.ndarray) -> "TimeSeriesScaler":
        """
        Fit the scaler on training data.

        Args:
            data: Shape (N, T, F) — N samples, T timesteps, F features.

        Returns:
            self, so calls can be chained.
        """
        n, _, f = data.shape
        # sklearn expects 2D input: collapse samples and timesteps to (N*T, F).
        self.scaler.fit(data.reshape(-1, f))
        self._is_fitted = True
        logger.info(f"TimeSeriesScaler fit on {n} samples with {f} features.")
        return self

    def transform(self, data: np.ndarray) -> np.ndarray:
        """
        Scale a (N, T, F) time series with the fitted per-feature min/max.

        Training data lands in [0, 1]; other splits may fall slightly
        outside that range.

        Raises:
            RuntimeError: If the scaler has not been fitted or loaded.
        """
        self._require_fitted()
        n, t, f = data.shape
        scaled = self.scaler.transform(data.reshape(-1, f))
        return scaled.reshape(n, t, f).astype(np.float32)

    def fit_transform(self, data: np.ndarray) -> np.ndarray:
        """Fit and transform in one step."""
        return self.fit(data).transform(data)

    def inverse_transform(self, data: np.ndarray) -> np.ndarray:
        """
        Map a scaled (N, T, F) array back to the original value range.

        Raises:
            RuntimeError: If the scaler has not been fitted or loaded.
        """
        self._require_fitted()
        n, t, f = data.shape
        original = self.scaler.inverse_transform(data.reshape(-1, f))
        return original.reshape(n, t, f).astype(np.float32)

    def save(self, path: Path) -> None:
        """
        Save scaler parameters as an ``.npz`` archive.

        Raises:
            RuntimeError: If the scaler has not been fitted or loaded.
        """
        self._require_fitted()
        np.savez(
            path,
            data_min=self.scaler.data_min_,
            data_max=self.scaler.data_max_,
            scale=self.scaler.scale_,
            min_val=self.scaler.min_,
        )

    def load(self, path: Path) -> "TimeSeriesScaler":
        """Load scaler parameters written by save() and return self."""
        # Read everything inside the context manager so the archive is closed.
        with np.load(path) as data:
            data_min = data["data_min"]
            data_max = data["data_max"]
            scale = data["scale"]
            min_val = data["min_val"]

        self.scaler.data_min_ = data_min
        self.scaler.data_max_ = data_max
        # Not stored in the archive, but derivable from the saved min/max.
        self.scaler.data_range_ = data_max - data_min
        self.scaler.scale_ = scale
        self.scaler.min_ = min_val
        self.scaler.n_features_in_ = len(data_min)
        self._is_fitted = True
        return self


def apply_cloud_mask(
    images: np.ndarray,
    ndvi_threshold: float = 0.1,
) -> np.ndarray:
    """
    Apply cloud masking using an NDVI threshold.

    Pixels where NDVI is below the threshold (likely cloud or snow) are
    zeroed out in every channel.
    NDVI = (NIR - Red) / (NIR + Red + epsilon).

    Args:
        images: Shape (N, 4, H, W) — channels [R, G, B, NIR].
        ndvi_threshold: NDVI values below this are masked.

    Returns:
        Cloud-masked copy of ``images`` (same shape and dtype); the input
        is left untouched.
    """
    # NOTE(review): other low-NDVI surfaces (water, bare soil, possibly burn
    # scars) will be masked as well — confirm this is acceptable downstream.

    # Do the NDVI arithmetic in floating point. With unsigned integer imagery,
    # `nir - red` and `nir + red` would otherwise wrap around and corrupt the mask.
    if np.issubdtype(images.dtype, np.floating):
        calc_dtype = images.dtype
    else:
        calc_dtype = np.result_type(images.dtype, np.float32)

    red = images[:, 0, :, :].astype(calc_dtype, copy=False)  # Red channel
    nir = images[:, 3, :, :].astype(calc_dtype, copy=False)  # NIR channel

    # Epsilon keeps the denominator non-zero where both bands are 0.
    epsilon = 1e-8
    ndvi = (nir - red) / (nir + red + epsilon)

    # True where likely cloud (low NDVI); shape (N, H, W).
    cloud_mask = ndvi < ndvi_threshold

    # Zero the masked pixels in all channels by broadcasting the mask over axis 1.
    masked = images.copy()
    np.copyto(masked, 0, where=cloud_mask[:, np.newaxis, :, :])

    num_masked = int(cloud_mask.sum())
    total_pixels = cloud_mask.size
    # Guard the percentage against an empty batch.
    pct_masked = 100 * num_masked / total_pixels if total_pixels else 0.0
    logger.info(f"Cloud masking: {num_masked}/{total_pixels} pixels masked "
                f"({pct_masked:.1f}%)")

    return masked


def combine_weather_aqi(
    weather: np.ndarray,
    aqi_history: np.ndarray,
) -> np.ndarray:
    """
    Combine weather features with PM2.5 into a single time-series tensor.

    Args:
        weather: Shape (N, T, 5) — [temp, humidity, wind_speed, wind_dir, precip]
        aqi_history: Shape (N, T, 1) — daily PM2.5

    Returns:
        Combined array of shape (N, T, 6).

    Raises:
        AssertionError: If sample or timestep counts differ.
    """
    # These asserts only provide friendlier messages; under `python -O`
    # np.concatenate still rejects mismatched shapes with a ValueError.
    assert weather.shape[0] == aqi_history.shape[0], \
        f"Sample count mismatch: weather={weather.shape[0]}, aqi={aqi_history.shape[0]}"
    assert weather.shape[1] == aqi_history.shape[1], \
        f"Timestep mismatch: weather={weather.shape[1]}, aqi={aqi_history.shape[1]}"

    # Stack along the feature axis.
    combined = np.concatenate([weather, aqi_history], axis=2)
    logger.info(f"Combined time series: {combined.shape}")
    return combined


def create_geographic_splits(
    num_samples: int,
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
    seed: int = 42,
) -> Dict[str, np.ndarray]:
    """
    Create train/val/test split indices simulating geographic splitting.

    Instead of random splitting (which causes spatial data leakage), we
    shuffle contiguous blocks of indices to simulate geographic regions.

    Args:
        num_samples: Total number of samples.
        train_ratio: Fraction for training.
        val_ratio: Fraction for validation.
        seed: Random seed for reproducibility.

    Returns:
        Dict with 'train', 'val', 'test' integer arrays of indices. Every
        index in ``range(num_samples)`` appears in exactly one split.

    Raises:
        ValueError: If ``num_samples`` is negative.
    """
    if num_samples < 0:
        raise ValueError(f"num_samples must be non-negative, got {num_samples}")

    # A private legacy RandomState yields the same permutation that
    # np.random.seed(seed) + np.random.shuffle would, without touching the
    # process-wide RNG.
    rng = np.random.RandomState(seed)

    # Simulate geographic regions by dividing samples into blocks. Clamp to at
    # least one sample per block; a size of 0 (num_samples < num_blocks) would
    # make every block, and therefore every split, empty.
    num_blocks = 10
    block_size = max(num_samples // num_blocks, 1)

    # Shuffle blocks (not individual samples).
    block_starts = np.arange(0, num_samples, block_size)
    rng.shuffle(block_starts)

    # Reassemble the shuffled blocks into one index sequence. An explicit int
    # dtype keeps the result usable as an index even when it is empty.
    shuffled = np.array(
        [
            idx
            for start in block_starts
            for idx in range(start, min(start + block_size, num_samples))
        ],
        dtype=int,
    )

    # Split by ratio.
    # NOTE(review): the cut points are not aligned to block boundaries, so one
    # block can straddle two splits — confirm whether that is acceptable.
    n_train = int(num_samples * train_ratio)
    n_val = int(num_samples * val_ratio)

    splits = {
        "train": shuffled[:n_train],
        "val": shuffled[n_train:n_train + n_val],
        "test": shuffled[n_train + n_val:],
    }

    logger.info(f"Data splits — train: {len(splits['train'])}, "
                f"val: {len(splits['val'])}, test: {len(splits['test'])}")
    return splits


def preprocess_pipeline(
    images: np.ndarray,
    weather: np.ndarray,
    aqi_history: np.ndarray,
    aqi_targets: np.ndarray,
    fire_masks: np.ndarray,
    save: bool = True,
) -> Dict:
    """
    Complete preprocessing pipeline.

    1. Apply cloud masking to images
    2. Create geographic splits
    3. Fit normalizers on training data only (weather + AQI are combined
       into one time-series tensor before scaling)
    4. Transform all splits
    5. Save processed data (when ``save`` is True)

    Args:
        images: Shape (N, 4, H, W) — channels [R, G, B, NIR].
        weather: Shape (N, T, 5) weather features.
        aqi_history: Shape (N, T, 1) PM2.5 history.
        aqi_targets: Prediction targets; passed through and saved unchanged.
        fire_masks: Fire masks; passed through and saved unchanged.
        save: Whether to write arrays, split indices and fitted scalers to
            the configured output directories.

    Returns:
        Dict with all processed arrays and fitted scalers.

    Raises:
        ValueError: If ``images`` and ``weather`` disagree on sample count.
    """
    logger.info("=" * 60)
    logger.info("Starting preprocessing pipeline...")

    # Split indices are derived from the image count and then used to index the
    # time series too, so a mismatch would silently misalign the two modalities.
    n = images.shape[0]
    if weather.shape[0] != n:
        raise ValueError(
            f"Sample count mismatch: images={n}, weather={weather.shape[0]}"
        )

    # Step 1: Cloud masking
    images = apply_cloud_mask(images)

    # Step 2: Geographic splits
    splits = create_geographic_splits(n)

    # Step 3: Fit normalizers on training data only
    train_idx = splits["train"]

    img_normalizer = ImageNormalizer()
    img_normalizer.fit(images[train_idx])

    # Combine weather + AQI for time-series scaling
    timeseries = combine_weather_aqi(weather, aqi_history)
    ts_scaler = TimeSeriesScaler()
    ts_scaler.fit(timeseries[train_idx])

    # Step 4: Transform all data
    images_norm = img_normalizer.transform(images)
    timeseries_norm = ts_scaler.transform(timeseries)

    # Step 5: Save processed data
    if save:
        splits_dir = PROCESSED_DIR.parent / "splits"
        # np.save does not create missing parent directories.
        for out_dir in (IMAGE_PATCHES_DIR, TIMESERIES_DIR, splits_dir, PROCESSED_DIR):
            out_dir.mkdir(parents=True, exist_ok=True)

        np.save(IMAGE_PATCHES_DIR / "images_normalized.npy", images_norm)
        np.save(IMAGE_PATCHES_DIR / "fire_masks.npy", fire_masks)
        np.save(TIMESERIES_DIR / "timeseries_normalized.npy", timeseries_norm)
        np.save(TIMESERIES_DIR / "aqi_targets.npy", aqi_targets)

        for split_name, idx in splits.items():
            np.save(splits_dir / f"{split_name}_indices.npy", idx)

        img_normalizer.save(PROCESSED_DIR / "image_normalizer.npz")
        ts_scaler.save(PROCESSED_DIR / "timeseries_scaler.npz")
        logger.info(f"Saved all processed data to {PROCESSED_DIR}")

    logger.info("Preprocessing pipeline complete.")
    logger.info("=" * 60)

    return {
        "images": images_norm,
        "timeseries": timeseries_norm,
        "fire_masks": fire_masks,
        "aqi_targets": aqi_targets,
        "splits": splits,
        "img_normalizer": img_normalizer,
        "ts_scaler": ts_scaler,
    }


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Quick test with synthetic data
    from src.data.fetch_firms import generate_synthetic_fire_data
    from src.data.fetch_weather import generate_synthetic_weather
    from src.data.fetch_aqi import generate_synthetic_aqi

    n = 100
    images, masks = generate_synthetic_fire_data(n)
    weather = generate_synthetic_weather(n)
    aqi_hist, aqi_tgt = generate_synthetic_aqi(n)

    result = preprocess_pipeline(images, weather, aqi_hist, aqi_tgt, masks)

    print(f"Images:      {result['images'].shape}")
    print(f"Timeseries:  {result['timeseries'].shape}")
    print(f"Fire masks:  {result['fire_masks'].shape}")
    print(f"AQI targets: {result['aqi_targets'].shape}")