| """ |
| preprocess.py — Data normalization, cloud masking, and patching utilities. |
| |
| Handles: |
| - Per-channel mean/std normalization for satellite images |
| - MinMaxScaler on time-series (fit on train split only) |
| - Cloud masking via NDVI threshold |
| - Saving as .npy for fast DataLoader access |
| """ |
|
|
| import logging |
| from pathlib import Path |
| from typing import Dict, Optional, Tuple |
|
|
| import numpy as np |
| from sklearn.preprocessing import MinMaxScaler |
|
|
| from src.training.config import ( |
| PATCH_SIZE, IMAGE_CHANNELS, TIMESERIES_WINDOW, TIMESERIES_FEATURES, |
| IMAGE_PATCHES_DIR, TIMESERIES_DIR, PROCESSED_DIR, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class ImageNormalizer:
    """
    Per-channel mean/std normalization for satellite images.

    Fit on the training split only, then apply to val/test to prevent data
    leakage. Statistics are reduced over the sample and spatial axes of an
    (N, C, H, W) array, giving one mean and one std per channel.
    """

    def __init__(self):
        self.channel_means: Optional[np.ndarray] = None
        self.channel_stds: Optional[np.ndarray] = None

    def _require_fitted(self) -> None:
        """Raise RuntimeError unless both statistics are available."""
        if self.channel_means is None or self.channel_stds is None:
            raise RuntimeError("ImageNormalizer not fitted. Call .fit() first.")

    def fit(self, images: np.ndarray) -> "ImageNormalizer":
        """
        Compute per-channel statistics from training images.

        Args:
            images: Shape (N, C, H, W).

        Returns:
            self, so calls can be chained.

        Raises:
            ValueError: If ``images`` is not 4-D or contains no elements.
        """
        images = np.asarray(images)
        if images.ndim != 4:
            raise ValueError(
                f"Expected images of shape (N, C, H, W), got {images.shape}"
            )
        if images.size == 0:
            # Mean/std of nothing is NaN, which would silently poison every
            # later transform.
            raise ValueError("Cannot fit ImageNormalizer on an empty array.")

        # Accumulate in float64 so low-precision inputs (float32, integers)
        # do not lose accuracy over large reductions.
        means = images.mean(axis=(0, 2, 3), dtype=np.float64)
        stds = images.std(axis=(0, 2, 3), dtype=np.float64)

        self.channel_means = means
        # A (near-)constant channel would cause division by ~0; use 1.0 so the
        # channel is only mean-centred.
        self.channel_stds = np.where(stds < 1e-8, 1.0, stds)

        logger.info(f"Image normalizer fit — means: {self.channel_means}, "
                    f"stds: {self.channel_stds}")
        return self

    def transform(self, images: np.ndarray) -> np.ndarray:
        """
        Normalize images using fitted statistics.

        Args:
            images: Array whose axis 1 is the channel axis, e.g. (N, C, H, W).

        Returns:
            A new array of the same shape. Floating-point inputs keep their
            dtype; integer/bool inputs are promoted to a float dtype so the
            normalized values are not truncated.

        Raises:
            RuntimeError: If the normalizer has not been fitted or loaded.
            ValueError: If the channel axis does not match the fitted stats.
        """
        self._require_fitted()

        images = np.asarray(images)
        if images.ndim < 2:
            raise ValueError(
                f"Expected images with a channel axis at position 1, "
                f"got shape {images.shape}"
            )
        n_channels = len(self.channel_means)
        if images.shape[1] != n_channels:
            raise ValueError(
                f"Channel mismatch: normalizer was fit on {n_channels} "
                f"channels, got {images.shape[1]}"
            )

        if np.issubdtype(images.dtype, np.floating):
            out_dtype = images.dtype
        else:
            out_dtype = np.result_type(images.dtype, np.float32)

        # Reshape stats to (1, C, 1, ..., 1) so they broadcast along axis 1.
        stat_shape = (1, n_channels) + (1,) * (images.ndim - 2)
        means = np.asarray(self.channel_means, dtype=out_dtype).reshape(stat_shape)
        stds = np.asarray(self.channel_stds, dtype=out_dtype).reshape(stat_shape)

        return ((images - means) / stds).astype(out_dtype, copy=False)

    def fit_transform(self, images: np.ndarray) -> np.ndarray:
        """Fit on ``images`` and return their normalized version."""
        return self.fit(images).transform(images)

    def save(self, path: Path) -> None:
        """
        Save normalizer statistics to an ``.npz`` archive.

        Raises:
            RuntimeError: If the normalizer has not been fitted or loaded.
        """
        # Saving None would produce object arrays that load() cannot read back.
        self._require_fitted()
        np.savez(path, means=self.channel_means, stds=self.channel_stds)

    def load(self, path: Path) -> "ImageNormalizer":
        """Load statistics previously written by :meth:`save`; returns self."""
        # The context manager closes the underlying archive file handle.
        with np.load(path) as data:
            self.channel_means = data["means"]
            self.channel_stds = data["stds"]
        return self
|
|
|
|
class TimeSeriesScaler:
    """
    MinMaxScaler for weather + AQI time series data.

    Fit on the training split only to prevent data leakage. Inputs are
    (N, T, F) arrays; they are flattened to (N*T, F) so every feature gets a
    single min/max across all samples and timesteps.
    """

    def __init__(self):
        self.scaler = MinMaxScaler()
        self._is_fitted = False

    def _require_fitted(self) -> None:
        """Raise RuntimeError unless fit() or load() has been called."""
        if not self._is_fitted:
            raise RuntimeError("TimeSeriesScaler not fitted. Call .fit() first.")

    @staticmethod
    def _flatten(data: np.ndarray) -> Tuple[np.ndarray, Tuple[int, ...]]:
        """Return ``data`` reshaped to (N*T, F) together with its original shape."""
        data = np.asarray(data)
        if data.ndim != 3:
            raise ValueError(
                f"Expected data of shape (N, T, F), got {data.shape}"
            )
        return data.reshape(-1, data.shape[2]), data.shape

    def fit(self, data: np.ndarray) -> "TimeSeriesScaler":
        """
        Fit scaler on training data.

        Args:
            data: Shape (N, T, F) — N samples, T timesteps, F features.

        Returns:
            self, so calls can be chained.

        Raises:
            ValueError: If ``data`` is not 3-D.
        """
        flat, (n, _, f) = self._flatten(data)
        self.scaler.fit(flat)
        self._is_fitted = True
        logger.info(f"TimeSeriesScaler fit on {n} samples with {f} features.")
        return self

    def transform(self, data: np.ndarray) -> np.ndarray:
        """
        Scale time series using the fitted per-feature min/max.

        Returns:
            float32 array with the same (N, T, F) shape as ``data``.

        Raises:
            RuntimeError: If the scaler has not been fitted or loaded.
        """
        self._require_fitted()
        flat, shape = self._flatten(data)
        return self.scaler.transform(flat).reshape(shape).astype(np.float32)

    def fit_transform(self, data: np.ndarray) -> np.ndarray:
        """Fit on ``data`` and return its scaled version."""
        return self.fit(data).transform(data)

    def inverse_transform(self, data: np.ndarray) -> np.ndarray:
        """
        Map scaled values back to the original feature ranges.

        Returns:
            float32 array with the same (N, T, F) shape as ``data``.

        Raises:
            RuntimeError: If the scaler has not been fitted or loaded.
        """
        # Same guard as transform() so both directions fail the same way.
        self._require_fitted()
        flat, shape = self._flatten(data)
        return self.scaler.inverse_transform(flat).reshape(shape).astype(np.float32)

    def save(self, path: Path) -> None:
        """
        Save scaler parameters to an ``.npz`` archive.

        Raises:
            RuntimeError: If the scaler has not been fitted or loaded.
        """
        self._require_fitted()
        np.savez(
            path,
            data_min=self.scaler.data_min_,
            data_max=self.scaler.data_max_,
            scale=self.scaler.scale_,
            min_val=self.scaler.min_,
        )

    def load(self, path: Path) -> "TimeSeriesScaler":
        """
        Load parameters previously written by :meth:`save`; returns self.

        Only the arrays stored by save() (plus the derived ``data_range_``)
        are restored; ``n_samples_seen_`` is not, so the restored scaler is
        meant for transform/inverse_transform rather than further fitting.
        """
        # The context manager closes the underlying archive file handle.
        with np.load(path) as data:
            data_min = data["data_min"]
            data_max = data["data_max"]
            scale = data["scale"]
            min_val = data["min_val"]

        self.scaler.data_min_ = data_min
        self.scaler.data_max_ = data_max
        self.scaler.data_range_ = data_max - data_min
        self.scaler.scale_ = scale
        self.scaler.min_ = min_val
        self.scaler.n_features_in_ = len(data_min)
        self._is_fitted = True
        return self
|
|
|
|
def apply_cloud_mask(
    images: np.ndarray,
    ndvi_threshold: float = 0.1,
) -> np.ndarray:
    """
    Apply cloud masking using an NDVI threshold.

    Pixels where NDVI is below the threshold (likely cloud or snow)
    are zeroed out in every channel.
    NDVI = (NIR - Red) / (NIR + Red + epsilon).

    Args:
        images: Shape (N, 4, H, W) — channels [R, G, B, NIR].
        ndvi_threshold: NDVI values below this are masked.

    Returns:
        A masked copy of ``images`` with the same shape and dtype; the input
        array is not modified.
    """
    images = np.asarray(images)

    # Compute NDVI in floating point. In the input dtype, unsigned integer
    # imagery would wrap around on `nir - red` (and overflow on `nir + red`),
    # yielding a bogus positive NDVI for exactly the pixels we want to mask.
    calc_dtype = np.result_type(images.dtype, np.float32)
    red = images[:, 0, :, :].astype(calc_dtype, copy=False)
    nir = images[:, 3, :, :].astype(calc_dtype, copy=False)

    # epsilon keeps the denominator non-zero for all-dark pixels.
    epsilon = 1e-8
    ndvi = (nir - red) / (nir + red + epsilon)

    cloud_mask = ndvi < ndvi_threshold  # shape (N, H, W)

    # Broadcast the per-pixel mask over the channel axis and zero in one pass.
    masked = images.copy()
    masked[np.broadcast_to(cloud_mask[:, np.newaxis, :, :], masked.shape)] = 0

    num_masked = int(cloud_mask.sum())
    total_pixels = cloud_mask.size
    # Guard the percentage so an empty batch does not divide by zero.
    percent_masked = 100 * num_masked / total_pixels if total_pixels else 0.0
    logger.info(f"Cloud masking: {num_masked}/{total_pixels} pixels masked "
                f"({percent_masked:.1f}%)")

    return masked
|
|
|
|
def combine_weather_aqi(
    weather: np.ndarray,
    aqi_history: np.ndarray,
) -> np.ndarray:
    """
    Combine weather features with PM2.5 into a single time-series tensor.

    Args:
        weather: Shape (N, T, 5) — [temp, humidity, wind_speed, wind_dir, precip]
        aqi_history: Shape (N, T, 1) — daily PM2.5. A 2-D (N, T) array is
            also accepted and treated as a single feature.

    Returns:
        Array of shape (N, T, F_weather + F_aqi) — (N, T, 6) for the shapes
        above — with the weather features first and the AQI feature(s) last.

    Raises:
        ValueError: If the inputs have the wrong rank or disagree on the
            number of samples or timesteps.
    """
    weather = np.asarray(weather)
    aqi_history = np.asarray(aqi_history)

    # Accept a squeezed (N, T) PM2.5 series by restoring the feature axis.
    if aqi_history.ndim == 2:
        aqi_history = aqi_history[:, :, np.newaxis]

    if weather.ndim != 3 or aqi_history.ndim != 3:
        raise ValueError(
            f"Expected 3-D inputs (N, T, F): weather={weather.shape}, "
            f"aqi={aqi_history.shape}"
        )

    # Explicit raises instead of `assert`, which is stripped under `python -O`.
    if weather.shape[0] != aqi_history.shape[0]:
        raise ValueError(
            f"Sample count mismatch: weather={weather.shape[0]}, "
            f"aqi={aqi_history.shape[0]}"
        )
    if weather.shape[1] != aqi_history.shape[1]:
        raise ValueError(
            f"Timestep mismatch: weather={weather.shape[1]}, "
            f"aqi={aqi_history.shape[1]}"
        )

    combined = np.concatenate([weather, aqi_history], axis=2)
    logger.info(f"Combined time series: {combined.shape}")
    return combined
|
|
|
|
def create_geographic_splits(
    num_samples: int,
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
    seed: int = 42,
    num_blocks: int = 10,
) -> Dict[str, np.ndarray]:
    """
    Create train/val/test split indices simulating geographic splitting.

    Instead of shuffling individual samples (which causes spatial data
    leakage), the index range is cut into contiguous blocks that stand in for
    geographic regions. The blocks are shuffled, laid end to end, and the
    resulting sequence is cut into train / val / test by the given ratios.

    Args:
        num_samples: Total number of samples.
        train_ratio: Fraction for training.
        val_ratio: Fraction for validation; the remainder becomes the test set.
        seed: Random seed for reproducibility.
        num_blocks: Target number of contiguous blocks. One extra, shorter
            block is produced when ``num_samples`` is not a multiple of it.

    Returns:
        Dict with 'train', 'val', 'test' integer index arrays that together
        contain every index in ``range(num_samples)`` exactly once.

    Raises:
        ValueError: On a negative sample count, a non-positive block count,
            or ratios that are negative or sum to more than 1.
    """
    if num_samples < 0:
        raise ValueError(f"num_samples must be >= 0, got {num_samples}")
    if num_blocks < 1:
        raise ValueError(f"num_blocks must be >= 1, got {num_blocks}")
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1.0 + 1e-9:
        raise ValueError(
            f"Invalid split ratios: train_ratio={train_ratio}, "
            f"val_ratio={val_ratio}"
        )

    # A private generator gives the same stream as seeding the global legacy
    # RNG, without clobbering random state used elsewhere in the process.
    rng = np.random.RandomState(seed)

    # At least one sample per block: with fewer samples than blocks the
    # integer division is 0, which would otherwise produce only empty blocks.
    block_size = max(num_samples // num_blocks, 1)

    block_starts = np.arange(0, num_samples, block_size)
    rng.shuffle(block_starts)

    blocks = [
        np.arange(start, min(start + block_size, num_samples))
        for start in block_starts
    ]
    if blocks:
        shuffled = np.concatenate(blocks).astype(np.intp, copy=False)
    else:
        # Keep an integer dtype so the result is still usable as an index.
        shuffled = np.empty(0, dtype=np.intp)

    n_train = int(num_samples * train_ratio)
    n_val = int(num_samples * val_ratio)

    splits = {
        "train": shuffled[:n_train],
        "val": shuffled[n_train:n_train + n_val],
        "test": shuffled[n_train + n_val:],
    }

    logger.info(f"Data splits — train: {len(splits['train'])}, "
                f"val: {len(splits['val'])}, test: {len(splits['test'])}")

    return splits
|
|
|
|
def _save_processed_outputs(
    images_norm: np.ndarray,
    timeseries_norm: np.ndarray,
    fire_masks: np.ndarray,
    aqi_targets: np.ndarray,
    splits: Dict[str, np.ndarray],
    img_normalizer: "ImageNormalizer",
    ts_scaler: "TimeSeriesScaler",
) -> None:
    """Write processed arrays, split indices and fitted scalers to disk."""
    splits_dir = PROCESSED_DIR.parent / "splits"

    # np.save / np.savez do not create parent directories, so make sure every
    # target exists before writing anything.
    for directory in (IMAGE_PATCHES_DIR, TIMESERIES_DIR, PROCESSED_DIR, splits_dir):
        Path(directory).mkdir(parents=True, exist_ok=True)

    np.save(IMAGE_PATCHES_DIR / "images_normalized.npy", images_norm)
    np.save(IMAGE_PATCHES_DIR / "fire_masks.npy", fire_masks)
    np.save(TIMESERIES_DIR / "timeseries_normalized.npy", timeseries_norm)
    np.save(TIMESERIES_DIR / "aqi_targets.npy", aqi_targets)

    for split_name, idx in splits.items():
        np.save(splits_dir / f"{split_name}_indices.npy", idx)

    img_normalizer.save(PROCESSED_DIR / "image_normalizer.npz")
    ts_scaler.save(PROCESSED_DIR / "timeseries_scaler.npz")

    logger.info(f"Saved all processed data to {PROCESSED_DIR}")


def preprocess_pipeline(
    images: np.ndarray,
    weather: np.ndarray,
    aqi_history: np.ndarray,
    aqi_targets: np.ndarray,
    fire_masks: np.ndarray,
    save: bool = True,
) -> Dict:
    """
    Complete preprocessing pipeline.

    1. Apply cloud masking to images
    2. Create geographic splits
    3. Fit the image normalizer on the training split only
    4. Combine weather + AQI into one time-series tensor
    5. Fit the time-series scaler on the training split only
    6. Transform all samples with the fitted normalizer / scaler
    7. Optionally save processed data, split indices and scalers

    Args:
        images: Satellite images, shape (N, 4, H, W).
        weather: Weather features, shape (N, T, F).
        aqi_history: PM2.5 history, shape (N, T, 1).
        aqi_targets: Prediction targets; returned (and saved) unchanged.
        fire_masks: Fire masks; returned (and saved) unchanged.
        save: When True, write all outputs under the configured directories.

    Returns:
        Dict with keys 'images', 'timeseries', 'fire_masks', 'aqi_targets',
        'splits', 'img_normalizer' and 'ts_scaler'.

    Raises:
        ValueError: If the images and the time series disagree on the number
            of samples.
    """
    logger.info("=" * 60)
    logger.info("Starting preprocessing pipeline...")

    # The same split indices are applied to images and time series below, so
    # a sample-count mismatch would either fail late or misalign the data.
    n = images.shape[0]
    for name, series in (("weather", weather), ("aqi_history", aqi_history)):
        if series.shape[0] != n:
            raise ValueError(
                f"Sample count mismatch: images={n}, {name}={series.shape[0]}"
            )

    # NOTE: masking runs before the normalizer is fit, so zeroed (masked)
    # pixels contribute to the per-channel statistics.
    images = apply_cloud_mask(images)

    splits = create_geographic_splits(n)
    train_idx = splits["train"]

    # Fit on the training split only to avoid leaking val/test statistics.
    img_normalizer = ImageNormalizer()
    img_normalizer.fit(images[train_idx])

    timeseries = combine_weather_aqi(weather, aqi_history)

    ts_scaler = TimeSeriesScaler()
    ts_scaler.fit(timeseries[train_idx])

    # Transform every sample (all splits) with the train-fitted statistics.
    images_norm = img_normalizer.transform(images)
    timeseries_norm = ts_scaler.transform(timeseries)

    if save:
        _save_processed_outputs(
            images_norm,
            timeseries_norm,
            fire_masks,
            aqi_targets,
            splits,
            img_normalizer,
            ts_scaler,
        )

    logger.info("Preprocessing pipeline complete.")
    logger.info("=" * 60)

    return {
        "images": images_norm,
        "timeseries": timeseries_norm,
        "fire_masks": fire_masks,
        "aqi_targets": aqi_targets,
        "splits": splits,
        "img_normalizer": img_normalizer,
        "ts_scaler": ts_scaler,
    }
|
|
|
|
if __name__ == "__main__":
    # Smoke test: push a small synthetic dataset through the whole pipeline
    # and print the shapes of the processed arrays.
    logging.basicConfig(level=logging.INFO)

    # Imported here so the generators are only needed when run as a script.
    from src.data.fetch_firms import generate_synthetic_fire_data
    from src.data.fetch_weather import generate_synthetic_weather
    from src.data.fetch_aqi import generate_synthetic_aqi

    sample_count = 100
    synthetic_images, synthetic_masks = generate_synthetic_fire_data(sample_count)
    synthetic_weather = generate_synthetic_weather(sample_count)
    pm25_history, pm25_targets = generate_synthetic_aqi(sample_count)

    outputs = preprocess_pipeline(
        images=synthetic_images,
        weather=synthetic_weather,
        aqi_history=pm25_history,
        aqi_targets=pm25_targets,
        fire_masks=synthetic_masks,
    )

    report_rows = (
        ("Images", "images"),
        ("Timeseries", "timeseries"),
        ("Fire masks", "fire_masks"),
        ("AQI targets", "aqi_targets"),
    )
    for label, key in report_rows:
        print(f"{label}: {outputs[key].shape}")
|
|