"""
Disaster Predictors
===================

One predictor class per disaster type, each wrapping a FuzzyNeuralNetwork.

Each class defines:
- Feature schema (what inputs are expected + their valid ranges)
- Preprocessing (normalization, derived features)
- Prediction with uncertainty
- Risk tier classification (LOW / MODERATE / HIGH / CRITICAL)

Disaster types:
    FloodPredictor      → primary, supports lane-level output
    CyclonePredictor    → wind/pressure/track based
    LandslidePredictor  → terrain + rainfall based
    EarthquakePredictor → seismicity + soil based
"""

import os
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch

from src.fuzzy_neural_network import FuzzyNeuralNetwork, load_model, save_model

# ============================================================================
# SHARED TYPES
# ============================================================================


class RiskTier(str, Enum):
    """Qualitative risk band; each value covers the score interval noted."""
    LOW = "LOW"            # 0.00 – 0.25
    MODERATE = "MODERATE"  # 0.25 – 0.50
    HIGH = "HIGH"          # 0.50 – 0.75
    CRITICAL = "CRITICAL"  # 0.75 – 1.00


def score_to_tier(score: float) -> RiskTier:
    """Bucket a continuous risk score in [0, 1] into its RiskTier band."""
    # Checked top band down; each lower boundary is inclusive of the upper
    # tier (0.25 → MODERATE, 0.50 → HIGH, 0.75 → CRITICAL), which matches
    # the strict `<` cutoffs of the ascending formulation.
    if score >= 0.75:
        return RiskTier.CRITICAL
    if score >= 0.50:
        return RiskTier.HIGH
    if score >= 0.25:
        return RiskTier.MODERATE
    return RiskTier.LOW


@dataclass
class PredictionResult:
    """One model prediction plus uncertainty and interpretability extras."""
    risk_score: float                          # [0, 1]
    risk_tier: RiskTier
    uncertainty: float                         # std dev from MC dropout
    confidence_interval: Tuple[float, float]   # (lower, upper) 95%
    feature_memberships: Optional[Dict[str, List[float]]] = None
    metadata: Dict = field(default_factory=dict)


# ============================================================================
# FEATURE SCHEMAS
# ============================================================================
# ── 1. Update FEATURE_RANGES to match exact column names ──────────────────
FLOOD_FEATURES = [
    "rainfall_mm",              # 24-hour cumulative rainfall (mm)
    "elevation_m",              # Terrain elevation (m above sea level)
    "soil_saturation_pct",      # Soil moisture saturation (%)
    "dist_river",               # Distance to nearest river (km)
    "drainage_capacity_index",  # Drainage infrastructure quality [0–1]
    "flow_accumulation",        # Flow accumulation index [0–1]
    "twi",                      # Topographic wetness index (0–20 per FEATURE_RANGES)
]
# NOTE(review): stale comments from renamed/removed flood columns (terrain
# slope, historical flood frequency, normalized population density) were
# corrected above — confirm against the training data's actual column list.

CYCLONE_FEATURES = [
    "wind_speed_kmh",           # Maximum sustained wind speed (km/h)
    "central_pressure_hpa",     # Central pressure (hPa)
    "sea_surface_temp_c",       # SST in track region (°C)
    "track_curvature",          # Track curvature index [0–1]
    "distance_to_coast_km",     # Distance from eye to coast (km)
    "storm_surge_potential",    # Estimated storm surge potential [0–1]
    "atmospheric_moisture",     # Precipitable water [0–1 normalized]
    "shear_index",              # Vertical wind shear index [0–1]
]

LANDSLIDE_FEATURES = [
    "slope_degrees",            # Slope angle (degrees)
    "rainfall_intensity_mmh",   # Rainfall intensity (mm/hour)
    "soil_type_index",          # Soil cohesion index [0–1] (1=stable)
    "vegetation_cover_pct",     # Vegetation cover percentage (%)
    "seismic_activity_index",   # Recent seismic activity [0–1]
    "distance_to_fault_km",     # Distance to nearest fault (km)
    "aspect_index",             # Terrain aspect [0–1]
    "historical_landslide_freq",# Historical occurrence [0–1]
]

EARTHQUAKE_FEATURES = [
    "historical_seismicity",    # Historical earthquake frequency [0–1]
    "distance_to_fault_km",     # Distance to nearest active fault (km)
    "soil_liquefaction_index",  # Soil liquefaction susceptibility [0–1]
    "focal_depth_km",           # Typical focal depth (km)
    "tectonic_stress_index",    # Regional tectonic stress [0–1]
    "building_vulnerability",   # Local structural vulnerability [0–1]
    "population_density_norm",  # Normalized population density [0–1]
    "bedrock_amplification",    # Seismic amplification factor [0–1]
]
Guard against missing FEATURE_RANGES entries ─────────────────────── def normalize_features(raw: Dict[str, float], feature_list: List[str]) -> np.ndarray: normalized = [] for feat in feature_list: val = raw[feat] if feat not in FEATURE_RANGES: raise KeyError( f"No range defined for '{feat}' — add it to FEATURE_RANGES" ) lo, hi = FEATURE_RANGES[feat] norm = np.clip((val - lo) / (hi - lo + 1e-8), 0.0, 1.0) normalized.append(norm) return np.array(normalized, dtype=np.float32) FEATURE_SCHEMAS = { "flood": FLOOD_FEATURES, "cyclone": CYCLONE_FEATURES, "landslide": LANDSLIDE_FEATURES, "earthquake": EARTHQUAKE_FEATURES, } # Valid input ranges for normalization — (min, max) # ── 1. Update FEATURE_RANGES to match exact column names ────────────────── FEATURE_RANGES = { # Flood "rainfall_mm": (0, 500), "elevation_m": (0, 3000), "soil_saturation_pct": (0, 100), "dist_river": (0, 50), "drainage_capacity_index": (0, 1), "flow_accumulation": (0, 1), "twi": (0, 20), # TWI typically 0–20 # Cyclone — keys match cyclone_tracks_clean column names exactly "wind_speed_kmh": (0, 350), "central_pressure_hpa": (870, 1013), "sea_surface_temp_c": (20, 35), "track_curvature": (0, 1), "distance_to_coast_km": (0, 500), "storm_surge_potential": (0, 1), "atmospheric_moisture": (0, 1), # normalised col from your CSV "shear_index": (0, 1), # normalised col from wind_shear CSV # Landslide "slope_degrees": (0, 90), "rainfall_intensity_mmh": (0, 200), "soil_type_index": (0, 1), "vegetation_cover_pct": (0, 100), "seismic_activity_index": (0, 1), "distance_to_fault_km": (0, 200), "aspect_index": (0, 1), "historical_landslide_freq": (0, 1), # Earthquake "historical_seismicity": (0, 1), "soil_liquefaction_index": (0, 1), "focal_depth_km": (0, 700), "tectonic_stress_index": (0, 1), "building_vulnerability": (0, 1), "population_density_norm": (0, 1), "bedrock_amplification": (0, 1), } # ============================================================================ # BASE PREDICTOR # 
# ============================================================================


class BaseDisasterPredictor:
    """
    Base class shared by all disaster predictors.

    Subclasses set `disaster_type` (used to locate the saved model file)
    and `feature_list` (ordered input schema, normalized through
    normalize_features / FEATURE_RANGES).
    """

    disaster_type: str = ""
    feature_list: List[str] = []  # overridden per subclass; never mutated here

    def __init__(self, model_dir: str = "models"):
        """Remember the model directory and load the model if one exists."""
        self.model_dir = model_dir
        self.model: Optional[FuzzyNeuralNetwork] = None
        self._try_load_model()

    def _model_path(self) -> str:
        # Naming convention shared with train_model.py: fnn_<type>_model.pt
        return os.path.join(self.model_dir, f"fnn_{self.disaster_type}_model.pt")

    def _try_load_model(self):
        """Best-effort load: a missing file just leaves the predictor not-ready."""
        path = self._model_path()
        if os.path.exists(path):
            self.model, _ = load_model(path)
            self.model.eval()

    def is_ready(self) -> bool:
        """True once a trained model has been loaded."""
        return self.model is not None

    def _to_tensor(self, features: Dict[str, float]) -> torch.Tensor:
        """Normalize a feature dict into a (1, n_features) float32 tensor."""
        arr = normalize_features(features, self.feature_list)
        return torch.tensor(arr, dtype=torch.float32).unsqueeze(0)

    @staticmethod
    def _confidence_interval(score: float, uncertainty: float) -> Tuple[float, float]:
        """95% CI (Gaussian assumption on MC-dropout spread), clamped to [0, 1].

        Factored out so predict() and predict_batch() cannot drift apart —
        both previously duplicated the 1.96·σ computation inline.
        """
        return (max(0.0, score - 1.96 * uncertainty),
                min(1.0, score + 1.96 * uncertainty))

    def predict(self, features: Dict[str, float], n_mc_samples: int = 50) -> PredictionResult:
        """
        Predict risk for a single feature dict, with uncertainty and
        per-feature fuzzy membership degrees for interpretability.

        Raises:
            RuntimeError: if no trained model is loaded.
        """
        if not self.is_ready():
            raise RuntimeError(
                f"{self.disaster_type} model not loaded. Run train_model.py first."
            )
        x = self._to_tensor(features)
        with torch.no_grad():
            # NOTE(review): assumes predict_with_uncertainty enables dropout
            # internally for MC sampling despite eval() — confirm in
            # fuzzy_neural_network.
            mean_pred, std_pred = self.model.predict_with_uncertainty(x, n_mc_samples)
        score = float(mean_pred.item())
        uncertainty = float(std_pred.item())
        ci_lower, ci_upper = self._confidence_interval(score, uncertainty)

        # Get membership degrees for interpretability; assumed to index per
        # input feature in schema order.
        raw_memberships = self.model.get_membership_degrees(x)
        memberships = {
            feat: raw_memberships[i].tolist()
            for i, feat in enumerate(self.feature_list)
        }
        return PredictionResult(
            risk_score=round(score, 4),
            risk_tier=score_to_tier(score),
            uncertainty=round(uncertainty, 4),
            confidence_interval=(round(ci_lower, 4), round(ci_upper, 4)),
            feature_memberships=memberships,
        )

    def predict_batch(
        self, feature_list: List[Dict[str, float]], n_mc_samples: int = 30
    ) -> List[PredictionResult]:
        """Efficient batch prediction — used for lane-level flood mapping.

        Memberships are omitted in batch mode to keep results lightweight.
        """
        if not self.is_ready():
            raise RuntimeError(f"{self.disaster_type} model not loaded.")
        arrays = [normalize_features(f, self.feature_list) for f in feature_list]
        X = torch.tensor(np.stack(arrays), dtype=torch.float32)
        with torch.no_grad():
            means, stds = self.model.predict_with_uncertainty(X, n_mc_samples)
        results = []
        for i in range(len(feature_list)):
            score = float(means[i].item())
            uncertainty = float(stds[i].item())
            ci_lower, ci_upper = self._confidence_interval(score, uncertainty)
            results.append(PredictionResult(
                risk_score=round(score, 4),
                risk_tier=score_to_tier(score),
                uncertainty=round(uncertainty, 4),
                confidence_interval=(round(ci_lower, 4), round(ci_upper, 4)),
            ))
        return results

    def validate_input(self, features: Dict[str, float]) -> List[str]:
        """Returns list of validation errors, empty if valid."""
        errors = []
        for feat in self.feature_list:
            if feat not in features:
                errors.append(f"Missing feature: {feat}")
                continue
            if feat not in FEATURE_RANGES:
                # Previously fell back silently to a (0, 1) range; report the
                # schema gap instead, consistent with normalize_features.
                errors.append(f"No range defined for feature: {feat}")
                continue
            lo, hi = FEATURE_RANGES[feat]
            val = features[feat]
            if not (lo <= val <= hi):
                errors.append(
                    f"Feature '{feat}' = {val} out of range [{lo}, {hi}]"
                )
        return errors
# ============================================================================
# CONCRETE PREDICTORS
# ============================================================================


class FloodPredictor(BaseDisasterPredictor):
    """
    Flood risk predictor.
    Its main job is producing the lane-level flood risk map via predict_batch().
    """
    disaster_type = "flood"
    feature_list = FLOOD_FEATURES

    def predict_lane_risks(
        self, lane_features: List[Dict[str, float]]
    ) -> List[PredictionResult]:
        """
        Score flood risk for every road lane/segment in `lane_features`.
        Each dict must supply all FLOOD_FEATURES. The per-lane
        PredictionResults are attached to GeoJSON in lane_flood_mapper.
        """
        return self.predict_batch(lane_features, n_mc_samples=30)


class CyclonePredictor(BaseDisasterPredictor):
    """
    Cyclone impact risk predictor.
    Inputs capture storm characteristics plus landfall context.
    """
    disaster_type = "cyclone"
    feature_list = CYCLONE_FEATURES

    def predict_with_track(
        self, track_points: List[Dict[str, float]]
    ) -> List[PredictionResult]:
        """
        Score risk at every point along a cyclone track, yielding a
        time-series of scores suitable for animation.
        """
        return self.predict_batch(track_points, n_mc_samples=30)


class LandslidePredictor(BaseDisasterPredictor):
    """
    Landslide susceptibility predictor.
    Inputs capture terrain plus triggering conditions.
    """
    disaster_type = "landslide"
    feature_list = LANDSLIDE_FEATURES


class EarthquakePredictor(BaseDisasterPredictor):
    """
    Earthquake impact risk predictor.
    Earthquakes are not directly predictable in time; this instead models
    structural/spatial risk given seismic hazard inputs.
    """
    disaster_type = "earthquake"
    feature_list = EARTHQUAKE_FEATURES


# ============================================================================
# MULTI-HAZARD COMPOSITE
# ============================================================================

# Domain-informed default weights; callers may override via predict_all().
DISASTER_WEIGHTS = {
    "flood": 0.35,
    "cyclone": 0.30,
    "landslide": 0.20,
    "earthquake": 0.15,
}


class MultiHazardPredictor:
    """
    Bundles all four predictors behind one composite risk score.
    Weights are domain-informed and adjustable per call.
    """

    def __init__(self, model_dir: str = "models"):
        predictor_classes = {
            "flood": FloodPredictor,
            "cyclone": CyclonePredictor,
            "landslide": LandslidePredictor,
            "earthquake": EarthquakePredictor,
        }
        self.predictors = {
            name: cls(model_dir) for name, cls in predictor_classes.items()
        }

    def ready_predictors(self) -> List[str]:
        """Names of the predictors whose models loaded successfully."""
        return [name for name, p in self.predictors.items() if p.is_ready()]

    def predict_all(
        self,
        features_by_type: Dict[str, Dict[str, float]],
        weights: Optional[Dict[str, float]] = None,
    ) -> Dict:
        """
        features_by_type: {"flood": {...}, "cyclone": {...}, ...}
        Returns per-type results + weighted composite score.
        """
        weights = weights or DISASTER_WEIGHTS
        results = {}
        weighted_sum = 0.0
        weight_total = 0.0

        for name, feats in features_by_type.items():
            predictor = self.predictors.get(name)
            if predictor is None or not predictor.is_ready():
                continue  # unknown type or model missing — skip silently
            outcome = predictor.predict(feats)
            results[name] = outcome
            w = weights.get(name, 0.25)  # unweighted types get a neutral 0.25
            weighted_sum += outcome.risk_score * w
            weight_total += w

        # Normalize by the weights actually used; 0.0 when nothing was ready.
        composite_score = weighted_sum / weight_total if weight_total > 0 else 0.0

        return {
            "composite_risk_score": round(composite_score, 4),
            "composite_risk_tier": score_to_tier(composite_score),
            "by_disaster": results,
            "active_predictors": list(results.keys()),
        }