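# NOTE: the lines below are residue from the repository web viewer, kept as
# comments so that the module parses.
# Uploader: krupal02
# Commit d5b0af1: Deploy Multi-Hazard Warning System - MTL model for wildfire risk + AQI forecasting
# Viewer actions: Raw | History | Blame | Contribute | Delete
# File size: 11.6 kB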
"""
preprocess.py — Data normalization, cloud masking, and patching utilities.
Handles:
- Per-channel mean/std normalization for satellite images
- MinMaxScaler on time-series (fit on train split only)
- Cloud masking via NDVI threshold
- Saving as .npy for fast DataLoader access
"""
import logging
from pathlib import Path
from typing import Dict, Optional, Tuple
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from src.training.config import (
PATCH_SIZE, IMAGE_CHANNELS, TIMESERIES_WINDOW, TIMESERIES_FEATURES,
IMAGE_PATCHES_DIR, TIMESERIES_DIR, PROCESSED_DIR,
)
logger = logging.getLogger(__name__)
class ImageNormalizer:
    """
    Per-channel mean/std normalization for batches of satellite images.

    Statistics are computed over the sample and spatial axes of an
    (N, C, H, W) array, giving one mean/std pair per channel.
    Fit on the training split only, then apply to val/test to prevent
    data leakage.
    """

    def __init__(self):
        # Both stay None until fit() or load() populates them.
        self.channel_means: Optional[np.ndarray] = None
        self.channel_stds: Optional[np.ndarray] = None

    def _require_fitted(self) -> None:
        """Raise RuntimeError unless both statistics are available."""
        if self.channel_means is None or self.channel_stds is None:
            raise RuntimeError("ImageNormalizer not fitted. Call .fit() first.")

    def fit(self, images: np.ndarray) -> "ImageNormalizer":
        """
        Compute per-channel statistics from training images.

        Args:
            images: Shape (N, C, H, W).

        Returns:
            self, so calls can be chained.

        Raises:
            ValueError: If ``images`` is not 4-D or contains no elements.
        """
        images = np.asarray(images)
        if images.ndim != 4:
            raise ValueError(
                f"Expected images of shape (N, C, H, W), got {images.shape}"
            )
        if images.size == 0:
            # mean/std of an empty array would silently produce NaN statistics.
            raise ValueError("Cannot fit ImageNormalizer on an empty array.")

        # Reduce over samples and both spatial axes, leaving shape (C,).
        self.channel_means = images.mean(axis=(0, 2, 3))
        channel_stds = images.std(axis=(0, 2, 3))
        # A constant channel has std ~0; use 1.0 so transform() does not
        # divide by zero.
        self.channel_stds = np.where(channel_stds < 1e-8, 1.0, channel_stds)

        logger.info(f"Image normalizer fit — means: {self.channel_means}, "
                    f"stds: {self.channel_stds}")
        return self

    def transform(self, images: np.ndarray) -> np.ndarray:
        """
        Normalize images using the fitted statistics.

        Args:
            images: Array whose axis 1 is the channel axis, e.g. (N, C, H, W).

        Returns:
            A new array of the same shape. Floating-point inputs keep their
            dtype; any other dtype (e.g. uint8) is returned as float32 so
            the normalized values are not truncated.

        Raises:
            RuntimeError: If the normalizer has not been fitted or loaded.
            ValueError: If the channel count differs from the fitted one.
        """
        self._require_fitted()
        images = np.asarray(images)
        if images.ndim < 2:
            raise ValueError(
                f"Expected images with a channel axis at position 1, "
                f"got shape {images.shape}"
            )
        num_channels = images.shape[1]
        if num_channels != len(self.channel_means):
            raise ValueError(
                f"Channel count mismatch: images have {num_channels}, "
                f"normalizer was fitted on {len(self.channel_means)}"
            )

        if np.issubdtype(images.dtype, np.floating):
            out_dtype = images.dtype
        else:
            out_dtype = np.float32
        normalized = np.empty(images.shape, dtype=out_dtype)
        # Per-channel loop keeps temporaries to one channel at a time.
        for c in range(num_channels):
            normalized[:, c] = (
                (images[:, c] - self.channel_means[c]) / self.channel_stds[c]
            )
        return normalized

    def fit_transform(self, images: np.ndarray) -> np.ndarray:
        """Fit on ``images`` and return them normalized."""
        return self.fit(images).transform(images)

    def save(self, path: Path) -> None:
        """
        Save normalizer statistics to an ``.npz`` archive.

        Raises:
            RuntimeError: If the normalizer has not been fitted or loaded.
        """
        # Without this check np.savez would store None as object arrays,
        # which load() cannot read back.
        self._require_fitted()
        np.savez(path, means=self.channel_means, stds=self.channel_stds)

    def load(self, path: Path) -> "ImageNormalizer":
        """Load normalizer statistics written by :meth:`save`; returns self."""
        # NpzFile holds an open file handle; the context manager closes it
        # once both arrays have been read into memory.
        with np.load(path) as data:
            self.channel_means = data["means"]
            self.channel_stds = data["stds"]
        return self
class TimeSeriesScaler:
"""
MinMaxScaler for weather + AQI time series data.
Fit on training split only to prevent data leakage.
"""
def __init__(self):
self.scaler = MinMaxScaler()
self._is_fitted = False
def fit(self, data: np.ndarray) -> "TimeSeriesScaler":
"""
Fit scaler on training data.
Args:
data: Shape (N, T, F) — N samples, T timesteps, F features.
"""
# Reshape to 2D for sklearn: (N*T, F)
n, t, f = data.shape
flat = data.reshape(-1, f)
self.scaler.fit(flat)
self._is_fitted = True
logger.info(f"TimeSeriesScaler fit on {n} samples with {f} features.")
return self
def transform(self, data: np.ndarray) -> np.ndarray:
"""Scale time series to [0, 1]."""
if not self._is_fitted:
raise RuntimeError("TimeSeriesScaler not fitted. Call .fit() first.")
n, t, f = data.shape
flat = data.reshape(-1, f)
scaled = self.scaler.transform(flat)
return scaled.reshape(n, t, f).astype(np.float32)
def fit_transform(self, data: np.ndarray) -> np.ndarray:
"""Fit and transform in one step."""
return self.fit(data).transform(data)
def inverse_transform(self, data: np.ndarray) -> np.ndarray:
"""Inverse scale back to original range."""
n, t, f = data.shape
flat = data.reshape(-1, f)
original = self.scaler.inverse_transform(flat)
return original.reshape(n, t, f).astype(np.float32)
def save(self, path: Path) -> None:
"""Save scaler parameters."""
np.savez(
path,
data_min=self.scaler.data_min_,
data_max=self.scaler.data_max_,
scale=self.scaler.scale_,
min_val=self.scaler.min_,
)
def load(self, path: Path) -> "TimeSeriesScaler":
"""Load scaler parameters."""
data = np.load(path)
self.scaler.data_min_ = data["data_min"]
self.scaler.data_max_ = data["data_max"]
self.scaler.scale_ = data["scale"]
self.scaler.min_ = data["min_val"]
self.scaler.n_features_in_ = len(data["data_min"])
self._is_fitted = True
return self
def apply_cloud_mask(
    images: np.ndarray,
    ndvi_threshold: float = 0.1,
    red_channel: int = 0,
    nir_channel: int = 3,
) -> np.ndarray:
    """
    Apply cloud masking using an NDVI threshold.

    Pixels where NDVI is below the threshold (treated as likely cloud or
    snow) are zeroed out in every channel.
    NDVI = (NIR - Red) / (NIR + Red + epsilon).

    Args:
        images: Shape (N, C, H, W); by default channels are [R, G, B, NIR].
        ndvi_threshold: NDVI values below this are masked.
        red_channel: Index of the red band along axis 1.
        nir_channel: Index of the near-infrared band along axis 1.

    Returns:
        Cloud-masked copy of ``images`` with the same shape and dtype.
        The input array is not modified.
    """
    images = np.asarray(images)

    # NDVI must be computed in floating point: with unsigned integer images,
    # `nir - red` and `nir + red` would wrap around and give wrong values.
    # Floating inputs keep their own precision.
    if np.issubdtype(images.dtype, np.floating):
        calc_dtype = images.dtype
    else:
        calc_dtype = np.float64
    red = images[:, red_channel].astype(calc_dtype, copy=False)
    nir = images[:, nir_channel].astype(calc_dtype, copy=False)

    # Epsilon keeps the denominator non-zero for all-dark pixels.
    epsilon = 1e-8
    ndvi = (nir - red) / (nir + red + epsilon)

    # True where likely cloud (low NDVI); one value per pixel, no channel axis.
    cloud_mask = ndvi < ndvi_threshold

    # Broadcast the per-pixel mask across the channel axis and zero all bands.
    masked = images.copy()
    masked[np.broadcast_to(cloud_mask[:, np.newaxis], masked.shape)] = 0

    num_masked = int(cloud_mask.sum())
    total_pixels = cloud_mask.size
    # Guard the percentage against an empty batch.
    percent_masked = 100 * num_masked / total_pixels if total_pixels else 0.0
    logger.info(f"Cloud masking: {num_masked}/{total_pixels} pixels masked "
                f"({percent_masked:.1f}%)")
    return masked
def combine_weather_aqi(
    weather: np.ndarray,
    aqi_history: np.ndarray,
) -> np.ndarray:
    """
    Combine weather features with PM2.5 into a single time-series tensor.

    Args:
        weather: Shape (N, T, 5) — [temp, humidity, wind_speed, wind_dir, precip]
        aqi_history: Shape (N, T, 1) — daily PM2.5. A 2-D (N, T) array is
            also accepted and treated as a single feature.

    Returns:
        Combined array of shape (N, T, 6): the weather features followed by
        the AQI feature along the last axis.

    Raises:
        ValueError: If either input has the wrong rank, or the sample or
            timestep counts of the two inputs differ.
    """
    weather = np.asarray(weather)
    aqi_history = np.asarray(aqi_history)

    # Allow a plain (N, T) PM2.5 series by adding the feature axis.
    if aqi_history.ndim == 2:
        aqi_history = aqi_history[..., np.newaxis]

    if weather.ndim != 3 or aqi_history.ndim != 3:
        raise ValueError(
            f"Expected 3-D inputs (N, T, F): weather={weather.shape}, "
            f"aqi={aqi_history.shape}"
        )
    # Explicit exceptions rather than `assert`, which is stripped when
    # Python runs with -O.
    if weather.shape[0] != aqi_history.shape[0]:
        raise ValueError(
            f"Sample count mismatch: weather={weather.shape[0]}, aqi={aqi_history.shape[0]}"
        )
    if weather.shape[1] != aqi_history.shape[1]:
        raise ValueError(
            f"Timestep mismatch: weather={weather.shape[1]}, aqi={aqi_history.shape[1]}"
        )

    combined = np.concatenate([weather, aqi_history], axis=2)
    logger.info(f"Combined time series: {combined.shape}")
    return combined
def create_geographic_splits(
    num_samples: int,
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
    seed: int = 42,
    num_blocks: int = 10,
) -> Dict[str, np.ndarray]:
    """
    Create train/val/test split indices simulating geographic splitting.

    Instead of shuffling individual samples (which causes spatial data
    leakage), contiguous blocks of indices are shuffled as units to stand
    in for geographic regions. The shuffled sequence is then cut by ratio,
    so a cut may fall inside a block.

    Args:
        num_samples: Total number of samples.
        train_ratio: Fraction for training.
        val_ratio: Fraction for validation; the remainder becomes test.
        seed: Random seed for reproducibility.
        num_blocks: Target number of contiguous blocks. When
            ``num_samples`` is not divisible by it, one extra shorter
            block holds the leftover samples.

    Returns:
        Dict with 'train', 'val', 'test' arrays of indices. Together they
        contain every index in ``range(num_samples)`` exactly once.

    Raises:
        ValueError: If ``num_samples`` is negative, ``num_blocks`` is less
            than 1, or the ratios are negative or sum to more than 1.
    """
    if num_samples < 0:
        raise ValueError(f"num_samples must be non-negative, got {num_samples}")
    if num_blocks < 1:
        raise ValueError(f"num_blocks must be at least 1, got {num_blocks}")
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(
            f"Invalid split ratios: train={train_ratio}, val={val_ratio}"
        )

    # A private RandomState yields the same permutation as
    # np.random.seed(seed) + np.random.shuffle, but leaves the caller's
    # global RNG state untouched.
    rng = np.random.RandomState(seed)

    # At least one sample per block; a zero block size (fewer samples than
    # blocks) would make every block, and thus every split, empty.
    block_size = max(num_samples // num_blocks, 1)

    # Shuffle blocks (not individual samples).
    block_starts = np.arange(0, num_samples, block_size)
    rng.shuffle(block_starts)

    # Reassemble the shuffled blocks into one index sequence.
    if len(block_starts) > 0:
        shuffled = np.concatenate([
            np.arange(start, min(start + block_size, num_samples))
            for start in block_starts
        ]).astype(np.int64)
    else:
        # Zero samples: keep an integer dtype so the result is still a
        # valid index array.
        shuffled = np.empty(0, dtype=np.int64)

    # Split by ratio.
    n_train = int(num_samples * train_ratio)
    n_val = int(num_samples * val_ratio)
    splits = {
        "train": shuffled[:n_train],
        "val": shuffled[n_train:n_train + n_val],
        "test": shuffled[n_train + n_val:],
    }
    logger.info(f"Data splits — train: {len(splits['train'])}, "
                f"val: {len(splits['val'])}, test: {len(splits['test'])}")
    return splits
def preprocess_pipeline(
    images: np.ndarray,
    weather: np.ndarray,
    aqi_history: np.ndarray,
    aqi_targets: np.ndarray,
    fire_masks: np.ndarray,
    save: bool = True,
) -> Dict:
    """
    Complete preprocessing pipeline.

    1. Apply cloud masking to images
    2. Create geographic splits
    3. Fit normalizers on training data only (weather + AQI are combined
       into one time-series tensor before the scaler is fitted)
    4. Transform all samples with the fitted normalizers
    5. Save processed data (only when ``save`` is True)

    Args:
        images: Image batch passed to ``apply_cloud_mask``; indexed along
            axis 0 by the split indices.
        weather: Weather time series passed to ``combine_weather_aqi``.
        aqi_history: AQI history passed to ``combine_weather_aqi``.
        aqi_targets: Returned and saved unchanged.
        fire_masks: Returned and saved unchanged.
        save: When True, write arrays, split indices and fitted
            normalizer/scaler parameters to the configured directories.

    Returns:
        Dict with keys 'images', 'timeseries', 'fire_masks', 'aqi_targets',
        'splits', 'img_normalizer' and 'ts_scaler'.

    Raises:
        ValueError: If ``images`` and ``weather`` have different sample
            counts.
    """
    logger.info("=" * 60)
    logger.info("Starting preprocessing pipeline...")

    # The same split indices are applied to images and to the time series,
    # so their sample axes must line up.
    if images.shape[0] != weather.shape[0]:
        raise ValueError(
            f"Sample count mismatch: images={images.shape[0]}, "
            f"weather={weather.shape[0]}"
        )

    # Step 1: Cloud masking
    images = apply_cloud_mask(images)

    # Step 2: Geographic splits
    n = images.shape[0]
    splits = create_geographic_splits(n)

    # Step 3: Fit normalizers on training data only
    train_idx = splits["train"]
    img_normalizer = ImageNormalizer()
    img_normalizer.fit(images[train_idx])

    # Combine weather + AQI so a single scaler covers all features.
    timeseries = combine_weather_aqi(weather, aqi_history)
    ts_scaler = TimeSeriesScaler()
    ts_scaler.fit(timeseries[train_idx])

    # Step 4: Transform all data with the train-fitted statistics
    images_norm = img_normalizer.transform(images)
    timeseries_norm = ts_scaler.transform(timeseries)

    # Step 5: Save processed data
    if save:
        splits_dir = Path(PROCESSED_DIR).parent / "splits"
        # np.save does not create missing parent directories.
        for out_dir in (IMAGE_PATCHES_DIR, TIMESERIES_DIR, PROCESSED_DIR, splits_dir):
            Path(out_dir).mkdir(parents=True, exist_ok=True)

        np.save(IMAGE_PATCHES_DIR / "images_normalized.npy", images_norm)
        np.save(IMAGE_PATCHES_DIR / "fire_masks.npy", fire_masks)
        np.save(TIMESERIES_DIR / "timeseries_normalized.npy", timeseries_norm)
        np.save(TIMESERIES_DIR / "aqi_targets.npy", aqi_targets)
        for split_name, idx in splits.items():
            np.save(splits_dir / f"{split_name}_indices.npy", idx)
        img_normalizer.save(PROCESSED_DIR / "image_normalizer.npz")
        ts_scaler.save(PROCESSED_DIR / "timeseries_scaler.npz")
        logger.info(f"Saved all processed data to {PROCESSED_DIR}")

    logger.info("Preprocessing pipeline complete.")
    logger.info("=" * 60)
    return {
        "images": images_norm,
        "timeseries": timeseries_norm,
        "fire_masks": fire_masks,
        "aqi_targets": aqi_targets,
        "splits": splits,
        "img_normalizer": img_normalizer,
        "ts_scaler": ts_scaler,
    }
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Smoke test: run the full pipeline on synthetic data and report the
    # shapes of the processed arrays.
    from src.data.fetch_firms import generate_synthetic_fire_data
    from src.data.fetch_weather import generate_synthetic_weather
    from src.data.fetch_aqi import generate_synthetic_aqi

    sample_count = 100
    synthetic_images, synthetic_masks = generate_synthetic_fire_data(sample_count)
    synthetic_weather = generate_synthetic_weather(sample_count)
    synthetic_aqi_history, synthetic_aqi_targets = generate_synthetic_aqi(sample_count)

    processed = preprocess_pipeline(
        synthetic_images,
        synthetic_weather,
        synthetic_aqi_history,
        synthetic_aqi_targets,
        synthetic_masks,
    )

    print(f"Images: {processed['images'].shape}")
    print(f"Timeseries: {processed['timeseries'].shape}")
    print(f"Fire masks: {processed['fire_masks'].shape}")
    print(f"AQI targets: {processed['aqi_targets'].shape}")