# cyclone-pred-api — src/disaster_predictors.py
"""
Disaster Predictors
===================
One predictor class per disaster type, each wrapping a FuzzyNeuralNetwork.
Each class defines:
- Feature schema (what inputs are expected + their valid ranges)
- Preprocessing (normalization, derived features)
- Prediction with uncertainty
- Risk tier classification (LOW / MODERATE / HIGH / CRITICAL)
Disaster types:
FloodPredictor      → primary, supports lane-level output
CyclonePredictor    → wind/pressure/track based
LandslidePredictor  → terrain + rainfall based
EarthquakePredictor → seismicity + soil based
"""
import torch
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from enum import Enum
import os
from src.fuzzy_neural_network import FuzzyNeuralNetwork, load_model, save_model
# ============================================================================
# SHARED TYPES
# ============================================================================
class RiskTier(str, Enum):
    """Qualitative risk buckets over the [0, 1] risk-score range."""
    LOW = "LOW"            # [0.00, 0.25)
    MODERATE = "MODERATE"  # [0.25, 0.50)
    HIGH = "HIGH"          # [0.50, 0.75)
    CRITICAL = "CRITICAL"  # [0.75, 1.00]

def score_to_tier(score: float) -> RiskTier:
    """Bucket a risk score into its RiskTier (thresholds at 0.25 / 0.50 / 0.75)."""
    for upper_bound, tier in (
        (0.25, RiskTier.LOW),
        (0.50, RiskTier.MODERATE),
        (0.75, RiskTier.HIGH),
    ):
        if score < upper_bound:
            return tier
    return RiskTier.CRITICAL
@dataclass
class PredictionResult:
    """Single risk prediction with uncertainty and optional interpretability data."""
    risk_score: float                         # point estimate in [0, 1]
    risk_tier: RiskTier                       # tier bucket derived from risk_score
    uncertainty: float                        # std dev from MC dropout sampling
    confidence_interval: Tuple[float, float]  # (lower, upper) 95% interval, clipped to [0, 1]
    feature_memberships: Optional[Dict[str, List[float]]] = None  # per-feature fuzzy membership degrees
    metadata: Dict = field(default_factory=dict)  # free-form extra context
# ============================================================================
# FEATURE SCHEMAS
# ============================================================================
# ============================================================================
# Input feature schemas — list order defines each model's input vector order,
# and every name must have a matching entry in FEATURE_RANGES below.
# ============================================================================
FLOOD_FEATURES = [
    "rainfall_mm",                 # 24-hour cumulative rainfall (mm)
    "elevation_m",                 # Terrain elevation (m above sea level)
    # NOTE(review): a "Terrain slope (degrees)" comment sat here with no
    # matching feature name — confirm whether a slope feature was dropped
    # intentionally (model input is currently 7-dimensional).
    "soil_saturation_pct",         # Soil moisture saturation (%)
    "dist_river",                  # Distance to nearest river (km)
    "drainage_capacity_index",     # Drainage infrastructure quality [0-1]
    "flow_accumulation",           # Flow accumulation index [0-1]
    "twi",                         # Topographic Wetness Index (typically 0-20)
]
CYCLONE_FEATURES = [
    "wind_speed_kmh",              # Maximum sustained wind speed (km/h)
    "central_pressure_hpa",        # Central pressure (hPa)
    "sea_surface_temp_c",          # SST in track region (°C)
    "track_curvature",             # Track curvature index [0-1]
    "distance_to_coast_km",        # Distance from eye to coast (km)
    "storm_surge_potential",       # Estimated storm surge potential [0-1]
    "atmospheric_moisture",        # Precipitable water [0-1 normalized]
    "shear_index",                 # Vertical wind shear index [0-1]
]
LANDSLIDE_FEATURES = [
    "slope_degrees",               # Slope angle (degrees)
    "rainfall_intensity_mmh",      # Rainfall intensity (mm/hour)
    "soil_type_index",             # Soil cohesion index [0-1] (1 = stable)
    "vegetation_cover_pct",        # Vegetation cover percentage (%)
    "seismic_activity_index",      # Recent seismic activity [0-1]
    "distance_to_fault_km",        # Distance to nearest fault (km)
    "aspect_index",                # Terrain aspect [0-1]
    "historical_landslide_freq",   # Historical occurrence [0-1]
]
EARTHQUAKE_FEATURES = [
    "historical_seismicity",       # Historical earthquake frequency [0-1]
    "distance_to_fault_km",        # Distance to nearest active fault (km)
    "soil_liquefaction_index",     # Soil liquefaction susceptibility [0-1]
    "focal_depth_km",              # Typical focal depth (km)
    "tectonic_stress_index",       # Regional tectonic stress [0-1]
    "building_vulnerability",      # Local structural vulnerability [0-1]
    "population_density_norm",     # Normalized population density [0-1]
    "bedrock_amplification",       # Seismic amplification factor [0-1]
]

def normalize_features(raw: Dict[str, float], feature_list: List[str]) -> np.ndarray:
    """
    Min-max normalize raw feature values into [0, 1], in feature_list order.

    Args:
        raw: Mapping of feature name -> raw (un-normalized) value.
        feature_list: Ordered feature names expected by the model.

    Returns:
        float32 array of shape (len(feature_list),), each value clipped to [0, 1].

    Raises:
        KeyError: If a feature has no entry in FEATURE_RANGES, or is missing
            from `raw` (explicit message instead of an opaque dict lookup error).
    """
    normalized = []
    for feat in feature_list:
        # Validate before touching raw[feat] so callers get actionable errors.
        if feat not in FEATURE_RANGES:
            raise KeyError(
                f"No range defined for '{feat}' - add it to FEATURE_RANGES"
            )
        if feat not in raw:
            raise KeyError(f"Missing input feature '{feat}'")
        lo, hi = FEATURE_RANGES[feat]
        # Epsilon keeps the division safe if a range ever collapses (lo == hi).
        norm = np.clip((raw[feat] - lo) / (hi - lo + 1e-8), 0.0, 1.0)
        normalized.append(norm)
    return np.array(normalized, dtype=np.float32)

# Disaster type -> ordered feature-name schema.
FEATURE_SCHEMAS = {
    "flood": FLOOD_FEATURES,
    "cyclone": CYCLONE_FEATURES,
    "landslide": LANDSLIDE_FEATURES,
    "earthquake": EARTHQUAKE_FEATURES,
}

# Valid input ranges for normalization - (min, max) per feature name.
# Keys must match the dataset column names used by the schemas above.
FEATURE_RANGES = {
    # Flood
    "rainfall_mm": (0, 500),
    "elevation_m": (0, 3000),
    "soil_saturation_pct": (0, 100),
    "dist_river": (0, 50),
    "drainage_capacity_index": (0, 1),
    "flow_accumulation": (0, 1),
    "twi": (0, 20),                   # TWI typically 0-20
    # Cyclone — keys match cyclone_tracks_clean column names exactly
    "wind_speed_kmh": (0, 350),
    "central_pressure_hpa": (870, 1013),
    "sea_surface_temp_c": (20, 35),
    "track_curvature": (0, 1),
    "distance_to_coast_km": (0, 500),
    "storm_surge_potential": (0, 1),
    "atmospheric_moisture": (0, 1),   # normalized col from the source CSV
    "shear_index": (0, 1),            # normalized col from wind_shear CSV
    # Landslide
    "slope_degrees": (0, 90),
    "rainfall_intensity_mmh": (0, 200),
    "soil_type_index": (0, 1),
    "vegetation_cover_pct": (0, 100),
    "seismic_activity_index": (0, 1),
    "distance_to_fault_km": (0, 200),  # shared with EARTHQUAKE_FEATURES
    "aspect_index": (0, 1),
    "historical_landslide_freq": (0, 1),
    # Earthquake
    "historical_seismicity": (0, 1),
    "soil_liquefaction_index": (0, 1),
    "focal_depth_km": (0, 700),
    "tectonic_stress_index": (0, 1),
    "building_vulnerability": (0, 1),
    "population_density_norm": (0, 1),
    "bedrock_amplification": (0, 1),
}
# ============================================================================
# BASE PREDICTOR
# ============================================================================
class BaseDisasterPredictor:
    """
    Base class shared by all disaster predictors.

    Subclasses must set `disaster_type` (used to locate the saved model file)
    and `feature_list` (ordered input feature names).
    """
    disaster_type: str = ""
    feature_list: List[str] = []

    def __init__(self, model_dir: str = "models"):
        """Remember the model directory and eagerly load the model if present."""
        self.model_dir = model_dir
        self.model: Optional[FuzzyNeuralNetwork] = None
        self._try_load_model()

    def _model_path(self) -> str:
        """Filesystem path of the serialized model for this disaster type."""
        return os.path.join(self.model_dir, f"fnn_{self.disaster_type}_model.pt")

    def _try_load_model(self):
        """Load the checkpoint if it exists; otherwise stay unloaded (not ready)."""
        path = self._model_path()
        if os.path.exists(path):
            self.model, _ = load_model(path)
            self.model.eval()

    def is_ready(self) -> bool:
        """True once a trained model has been loaded."""
        return self.model is not None

    def _to_tensor(self, features: Dict[str, float]) -> torch.Tensor:
        """Normalize a feature dict and shape it as a (1, n_features) tensor."""
        arr = normalize_features(features, self.feature_list)
        return torch.tensor(arr, dtype=torch.float32).unsqueeze(0)

    @staticmethod
    def _build_result(
        score: float,
        uncertainty: float,
        memberships: Optional[Dict[str, List[float]]] = None,
    ) -> PredictionResult:
        """Assemble a PredictionResult with a 95% CI clipped to [0, 1].

        Shared by predict() and predict_batch() so the rounding and CI
        conventions cannot drift apart.
        """
        ci_lower = max(0.0, score - 1.96 * uncertainty)
        ci_upper = min(1.0, score + 1.96 * uncertainty)
        return PredictionResult(
            risk_score=round(score, 4),
            risk_tier=score_to_tier(score),
            uncertainty=round(uncertainty, 4),
            confidence_interval=(round(ci_lower, 4), round(ci_upper, 4)),
            feature_memberships=memberships,
        )

    def predict(self, features: Dict[str, float], n_mc_samples: int = 50) -> PredictionResult:
        """
        Predict risk for one feature dict, with MC-dropout uncertainty.

        Args:
            features: Raw (un-normalized) values for every name in feature_list.
            n_mc_samples: Number of MC-dropout forward passes.

        Raises:
            RuntimeError: If no trained model has been loaded.
        """
        if not self.is_ready():
            raise RuntimeError(
                f"{self.disaster_type} model not loaded. Run train_model.py first."
            )
        x = self._to_tensor(features)
        with torch.no_grad():
            mean_pred, std_pred = self.model.predict_with_uncertainty(x, n_mc_samples)
        score = float(mean_pred.item())
        uncertainty = float(std_pred.item())
        # Membership degrees expose how each input mapped onto the fuzzy sets.
        raw_memberships = self.model.get_membership_degrees(x)
        memberships = {
            feat: raw_memberships[i].tolist()
            for i, feat in enumerate(self.feature_list)
        }
        return self._build_result(score, uncertainty, memberships)

    def predict_batch(
        self,
        feature_list: List[Dict[str, float]],
        n_mc_samples: int = 30
    ) -> List[PredictionResult]:
        """Efficient batch prediction — used for lane-level flood mapping.

        Raises:
            RuntimeError: If no trained model has been loaded.
        """
        if not self.is_ready():
            raise RuntimeError(f"{self.disaster_type} model not loaded.")
        arrays = [normalize_features(f, self.feature_list) for f in feature_list]
        X = torch.tensor(np.stack(arrays), dtype=torch.float32)
        with torch.no_grad():
            means, stds = self.model.predict_with_uncertainty(X, n_mc_samples)
        # No per-item memberships here: batch mode keeps the payload small.
        return [
            self._build_result(float(means[i].item()), float(stds[i].item()))
            for i in range(len(feature_list))
        ]

    def validate_input(self, features: Dict[str, float]) -> List[str]:
        """Return a list of validation error messages; empty when input is valid."""
        errors = []
        for feat in self.feature_list:
            if feat not in features:
                errors.append(f"Missing feature: {feat}")
            else:
                # Unknown features fall back to a [0, 1] range rather than crashing.
                lo, hi = FEATURE_RANGES.get(feat, (0, 1))
                val = features[feat]
                if not (lo <= val <= hi):
                    errors.append(
                        f"Feature '{feat}' = {val} out of range [{lo}, {hi}]"
                    )
        return errors
# ============================================================================
# CONCRETE PREDICTORS
# ============================================================================
class FloodPredictor(BaseDisasterPredictor):
    """
    Flood risk predictor.

    Primary use: lane-level flood risk map via predict_batch().
    """
    disaster_type = "flood"
    feature_list = FLOOD_FEATURES

    def predict_lane_risks(
        self,
        lane_features: List[Dict[str, float]]
    ) -> List[PredictionResult]:
        """
        Predict flood risk for a list of road lanes/segments.

        Each dict in lane_features must contain all FLOOD_FEATURES.
        Returns one PredictionResult per lane — attach to GeoJSON in lane_flood_mapper.
        """
        return self.predict_batch(lane_features, n_mc_samples=30)
class CyclonePredictor(BaseDisasterPredictor):
    """
    Cyclone impact risk predictor.

    Inputs describe storm characteristics and landfall context.
    """
    disaster_type = "cyclone"
    feature_list = CYCLONE_FEATURES

    def predict_with_track(
        self,
        track_points: List[Dict[str, float]]
    ) -> List[PredictionResult]:
        """
        Predict risk at each point along a cyclone track.

        Each dict in track_points must contain all CYCLONE_FEATURES.
        Returns a time-series of risk scores, useful for animation.
        """
        return self.predict_batch(track_points, n_mc_samples=30)
class LandslidePredictor(BaseDisasterPredictor):
    """
    Landslide susceptibility predictor.

    Inputs describe terrain and triggering conditions (see LANDSLIDE_FEATURES).
    """
    disaster_type = "landslide"
    feature_list = LANDSLIDE_FEATURES
class EarthquakePredictor(BaseDisasterPredictor):
    """
    Earthquake impact risk predictor.

    Note: earthquakes are not directly predictable in time, but
    this models structural/spatial risk given seismic hazard inputs
    (see EARTHQUAKE_FEATURES).
    """
    disaster_type = "earthquake"
    feature_list = EARTHQUAKE_FEATURES
# ============================================================================
# MULTI-HAZARD COMPOSITE
# ============================================================================
# Default blend weights for the composite multi-hazard score.
# Values sum to 1.0; callers may override them via the `weights`
# parameter of MultiHazardPredictor.predict_all().
DISASTER_WEIGHTS = {
    "flood": 0.35,
    "cyclone": 0.30,
    "landslide": 0.20,
    "earthquake": 0.15,
}
class MultiHazardPredictor:
    """
    Aggregates all four disaster predictors into one composite risk score.

    Blend weights are domain-informed defaults (adjustable via API parameter).
    """
    def __init__(self, model_dir: str = "models"):
        """Instantiate one predictor per hazard, all loading from model_dir."""
        self.predictors = {
            "flood": FloodPredictor(model_dir),
            "cyclone": CyclonePredictor(model_dir),
            "landslide": LandslidePredictor(model_dir),
            "earthquake": EarthquakePredictor(model_dir),
        }

    def ready_predictors(self) -> List[str]:
        """Names of the predictors whose models are loaded."""
        ready = []
        for name, predictor in self.predictors.items():
            if predictor.is_ready():
                ready.append(name)
        return ready

    def predict_all(
        self,
        features_by_type: Dict[str, Dict[str, float]],
        weights: Optional[Dict[str, float]] = None,
    ) -> Dict:
        """
        Run every applicable predictor and blend the scores.

        features_by_type: {"flood": {...}, "cyclone": {...}, ...}
        Returns per-type results plus the weighted composite score.
        """
        # Empty/None weights fall back to the module defaults.
        active_weights = weights if weights else DISASTER_WEIGHTS
        results = {}
        weighted_sum = 0.0
        weight_total = 0.0
        for hazard, hazard_features in features_by_type.items():
            predictor = self.predictors.get(hazard)
            if predictor is None or not predictor.is_ready():
                continue  # skip unknown or untrained hazards
            outcome = predictor.predict(hazard_features)
            results[hazard] = outcome
            w = active_weights.get(hazard, 0.25)  # unlisted hazards get a neutral weight
            weighted_sum += outcome.risk_score * w
            weight_total += w
        composite = weighted_sum / weight_total if weight_total > 0 else 0.0
        return {
            "composite_risk_score": round(composite, 4),
            "composite_risk_tier": score_to_tier(composite),
            "by_disaster": results,
            "active_predictors": list(results.keys()),
        }