Spaces:
Sleeping
Sleeping
| """ | |
| Disaster Predictors | |
| =================== | |
| One predictor class per disaster type, each wrapping a FuzzyNeuralNetwork. | |
| Each class defines: | |
| - Feature schema (what inputs are expected + their valid ranges) | |
| - Preprocessing (normalization, derived features) | |
| - Prediction with uncertainty | |
| - Risk tier classification (LOW / MODERATE / HIGH / CRITICAL) | |
| Disaster types: | |
| FloodPredictor β primary, supports lane-level output | |
| CyclonePredictor β wind/pressure/track based | |
| LandslidePredictor β terrain + rainfall based | |
| EarthquakePredictor β seismicity + soil based | |
| """ | |
| import torch | |
| import numpy as np | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Tuple | |
| from enum import Enum | |
| import os | |
| from src.fuzzy_neural_network import FuzzyNeuralNetwork, load_model, save_model | |
| # ============================================================================ | |
| # SHARED TYPES | |
| # ============================================================================ | |
class RiskTier(str, Enum):
    """Discrete risk tiers derived from a continuous [0, 1] risk score."""
    LOW = "LOW"            # score in [0.00, 0.25)
    MODERATE = "MODERATE"  # score in [0.25, 0.50)
    HIGH = "HIGH"          # score in [0.50, 0.75)
    CRITICAL = "CRITICAL"  # score in [0.75, 1.00]


def score_to_tier(score: float) -> RiskTier:
    """Map a continuous risk score in [0, 1] to its RiskTier bucket."""
    # Upper bounds are exclusive; anything >= 0.75 falls through to CRITICAL.
    buckets = (
        (0.25, RiskTier.LOW),
        (0.50, RiskTier.MODERATE),
        (0.75, RiskTier.HIGH),
    )
    for upper_bound, tier in buckets:
        if score < upper_bound:
            return tier
    return RiskTier.CRITICAL
@dataclass
class PredictionResult:
    """
    Output of a single risk prediction.

    Bug fix: the original class declared annotated fields (including
    ``field(default_factory=dict)``) but was missing the ``@dataclass``
    decorator, so keyword construction like
    ``PredictionResult(risk_score=..., ...)`` used throughout this module
    would raise TypeError.
    """
    risk_score: float                          # calibrated risk score in [0, 1]
    risk_tier: RiskTier                        # tier bucket of risk_score
    uncertainty: float                         # std dev from MC-dropout sampling
    confidence_interval: Tuple[float, float]   # 95% CI (lower, upper), clamped to [0, 1]
    feature_memberships: Optional[Dict[str, List[float]]] = None  # per-feature fuzzy membership degrees (interpretability)
    metadata: Dict = field(default_factory=dict)  # free-form extras; fresh dict per instance
| # ============================================================================ | |
| # FEATURE SCHEMAS | |
| # ============================================================================ | |
# Flood feature schema — keys must match the exact column names keyed in
# FEATURE_RANGES below.
FLOOD_FEATURES = [
    "rainfall_mm",              # 24-hour cumulative rainfall (mm)
    "elevation_m",              # Terrain elevation (m above sea level)
    # NOTE(review): a "terrain slope (degrees)" feature was commented out
    # here, leaving 7 flood features — confirm the trained flood model
    # expects exactly these 7 inputs.
    "soil_saturation_pct",      # Soil moisture saturation (%)
    "dist_river",               # Distance to nearest river (km)
    "drainage_capacity_index",  # Drainage infrastructure quality [0-1]
    "flow_accumulation",        # Flow accumulation, normalized [0-1]
                                # (stale comment said "historical flood
                                # frequency" — presumably renamed; verify)
    "twi",                      # Topographic Wetness Index (range 0-20 per
                                # FEATURE_RANGES; old comment wrongly said
                                # "population density")
]
# Cyclone feature schema — storm characteristics and landfall context.
CYCLONE_FEATURES = [
    "wind_speed_kmh",           # Maximum sustained wind speed (km/h)
    "central_pressure_hpa",     # Central pressure (hPa)
    "sea_surface_temp_c",       # SST in track region (°C)
    "track_curvature",          # Track curvature index [0-1]
    "distance_to_coast_km",     # Distance from eye to coast (km)
    "storm_surge_potential",    # Estimated storm surge potential [0-1]
    "atmospheric_moisture",     # Precipitable water [0-1 normalized]
    "shear_index",              # Vertical wind shear index [0-1]
]

# Landslide feature schema — terrain plus triggering conditions.
LANDSLIDE_FEATURES = [
    "slope_degrees",            # Slope angle (degrees)
    "rainfall_intensity_mmh",   # Rainfall intensity (mm/hour)
    "soil_type_index",          # Soil cohesion index [0-1] (1 = stable)
    "vegetation_cover_pct",     # Vegetation cover percentage (%)
    "seismic_activity_index",   # Recent seismic activity [0-1]
    "distance_to_fault_km",     # Distance to nearest fault (km)
    "aspect_index",             # Terrain aspect [0-1]
    "historical_landslide_freq",  # Historical occurrence [0-1]
]

# Earthquake feature schema — spatial/structural hazard inputs.
EARTHQUAKE_FEATURES = [
    "historical_seismicity",    # Historical earthquake frequency [0-1]
    "distance_to_fault_km",     # Distance to nearest active fault (km)
    "soil_liquefaction_index",  # Soil liquefaction susceptibility [0-1]
    "focal_depth_km",           # Typical focal depth (km)
    "tectonic_stress_index",    # Regional tectonic stress [0-1]
    "building_vulnerability",   # Local structural vulnerability [0-1]
    "population_density_norm",  # Normalized population density [0-1]
    "bedrock_amplification",    # Seismic amplification factor [0-1]
]
def normalize_features(raw: Dict[str, float], feature_list: List[str]) -> np.ndarray:
    """
    Min-max normalize raw feature values into [0, 1] in feature_list order.

    Args:
        raw: mapping of feature name -> raw (unnormalized) value.
        feature_list: ordered feature names the target model expects.

    Returns:
        float32 array of shape (len(feature_list),), each value clipped
        to [0, 1] after scaling by the (lo, hi) range in FEATURE_RANGES.

    Raises:
        KeyError: if a feature is missing from ``raw`` (fix: previously this
            surfaced as a bare, message-less KeyError from the dict lookup)
            or has no entry in FEATURE_RANGES.
    """
    normalized = []
    for feat in feature_list:
        if feat not in raw:
            raise KeyError(
                f"Missing input feature '{feat}' - required by this model"
            )
        if feat not in FEATURE_RANGES:
            raise KeyError(
                f"No range defined for '{feat}' - add it to FEATURE_RANGES"
            )
        lo, hi = FEATURE_RANGES[feat]
        # Epsilon guards against division by zero for degenerate ranges.
        norm = np.clip((raw[feat] - lo) / (hi - lo + 1e-8), 0.0, 1.0)
        normalized.append(norm)
    return np.array(normalized, dtype=np.float32)
# Maps disaster-type key -> ordered feature list for that model; keys match
# the BaseDisasterPredictor.disaster_type values used by the predictors below.
FEATURE_SCHEMAS = {
    "flood": FLOOD_FEATURES,
    "cyclone": CYCLONE_FEATURES,
    "landslide": LANDSLIDE_FEATURES,
    "earthquake": EARTHQUAKE_FEATURES,
}
# Valid input ranges for normalization — feature name -> (min, max).
# Shared flat namespace: keys used by multiple disaster types (e.g.
# "distance_to_fault_km", listed under Landslide but also referenced by
# EARTHQUAKE_FEATURES) appear only once.
FEATURE_RANGES = {
    # Flood
    "rainfall_mm": (0, 500),
    "elevation_m": (0, 3000),
    "soil_saturation_pct": (0, 100),
    "dist_river": (0, 50),
    "drainage_capacity_index": (0, 1),
    "flow_accumulation": (0, 1),
    "twi": (0, 20),                     # TWI typically 0-20
    # Cyclone — keys match cyclone_tracks_clean column names exactly
    "wind_speed_kmh": (0, 350),
    "central_pressure_hpa": (870, 1013),
    "sea_surface_temp_c": (20, 35),
    "track_curvature": (0, 1),
    "distance_to_coast_km": (0, 500),
    "storm_surge_potential": (0, 1),
    "atmospheric_moisture": (0, 1),     # normalized column from source CSV
    "shear_index": (0, 1),              # normalized column from wind_shear CSV
    # Landslide
    "slope_degrees": (0, 90),
    "rainfall_intensity_mmh": (0, 200),
    "soil_type_index": (0, 1),
    "vegetation_cover_pct": (0, 100),
    "seismic_activity_index": (0, 1),
    "distance_to_fault_km": (0, 200),
    "aspect_index": (0, 1),
    "historical_landslide_freq": (0, 1),
    # Earthquake (distance_to_fault_km reused from the Landslide section)
    "historical_seismicity": (0, 1),
    "soil_liquefaction_index": (0, 1),
    "focal_depth_km": (0, 700),
    "tectonic_stress_index": (0, 1),
    "building_vulnerability": (0, 1),
    "population_density_norm": (0, 1),
    "bedrock_amplification": (0, 1),
}
| # ============================================================================ | |
| # BASE PREDICTOR | |
| # ============================================================================ | |
class BaseDisasterPredictor:
    """
    Base class shared by all disaster predictors.

    Subclasses define:
        disaster_type: short key used for model filenames (e.g. "flood").
        feature_list: ordered feature names the wrapped model expects.

    The trained FuzzyNeuralNetwork is loaded lazily from
    ``{model_dir}/fnn_{disaster_type}_model.pt`` when that file exists;
    otherwise the predictor stays not-ready and predict() raises.

    Refactor: the result/CI construction previously duplicated between
    predict() and predict_batch() is centralized in _make_result().
    """

    disaster_type: str = ""
    feature_list: List[str] = []

    def __init__(self, model_dir: str = "models"):
        """
        Args:
            model_dir: directory expected to contain fnn_*_model.pt files.
        """
        self.model_dir = model_dir
        self.model: Optional[FuzzyNeuralNetwork] = None
        self._try_load_model()

    def _model_path(self) -> str:
        """Checkpoint path for this disaster type's trained model."""
        return os.path.join(self.model_dir, f"fnn_{self.disaster_type}_model.pt")

    def _try_load_model(self):
        """Load the checkpoint if present; silently stay unloaded otherwise."""
        path = self._model_path()
        if os.path.exists(path):
            self.model, _ = load_model(path)
            # NOTE(review): eval() here; predict_with_uncertainty presumably
            # re-enables dropout internally for MC sampling — confirm in
            # FuzzyNeuralNetwork.
            self.model.eval()

    def is_ready(self) -> bool:
        """True once a trained model has been loaded from disk."""
        return self.model is not None

    def _to_tensor(self, features: Dict[str, float]) -> torch.Tensor:
        """Normalize a feature dict and shape it as a (1, n_features) tensor."""
        arr = normalize_features(features, self.feature_list)
        return torch.tensor(arr, dtype=torch.float32).unsqueeze(0)

    def _make_result(
        self,
        score: float,
        uncertainty: float,
        memberships: Optional[Dict[str, List[float]]] = None,
    ) -> PredictionResult:
        """Package a (score, std) pair into a PredictionResult with a 95% CI
        clamped to [0, 1]; values rounded to 4 decimals."""
        return PredictionResult(
            risk_score=round(score, 4),
            risk_tier=score_to_tier(score),
            uncertainty=round(uncertainty, 4),
            confidence_interval=(
                round(max(0.0, score - 1.96 * uncertainty), 4),
                round(min(1.0, score + 1.96 * uncertainty), 4),
            ),
            feature_memberships=memberships,
        )

    def predict(self, features: Dict[str, float], n_mc_samples: int = 50) -> PredictionResult:
        """
        Predict risk for a single feature dict.

        Args:
            features: raw feature values keyed by feature_list names.
            n_mc_samples: MC-dropout samples for uncertainty estimation.

        Returns:
            PredictionResult including per-feature fuzzy membership degrees.

        Raises:
            RuntimeError: if no trained model is loaded.
        """
        if not self.is_ready():
            raise RuntimeError(
                f"{self.disaster_type} model not loaded. Run train_model.py first."
            )
        x = self._to_tensor(features)
        with torch.no_grad():
            mean_pred, std_pred = self.model.predict_with_uncertainty(x, n_mc_samples)
        score = float(mean_pred.item())
        uncertainty = float(std_pred.item())
        # Membership degrees expose which fuzzy sets each input activated —
        # used for interpretability in downstream UIs.
        raw_memberships = self.model.get_membership_degrees(x)
        memberships = {
            feat: raw_memberships[i].tolist()
            for i, feat in enumerate(self.feature_list)
        }
        return self._make_result(score, uncertainty, memberships)

    def predict_batch(
        self,
        feature_list: List[Dict[str, float]],
        n_mc_samples: int = 30
    ) -> List[PredictionResult]:
        """
        Efficient batch prediction — used for lane-level flood mapping.

        Note: unlike predict(), batch results carry no membership degrees.

        Raises:
            RuntimeError: if no trained model is loaded.
        """
        if not self.is_ready():
            raise RuntimeError(f"{self.disaster_type} model not loaded.")
        arrays = [normalize_features(f, self.feature_list) for f in feature_list]
        X = torch.tensor(np.stack(arrays), dtype=torch.float32)
        with torch.no_grad():
            means, stds = self.model.predict_with_uncertainty(X, n_mc_samples)
        return [
            self._make_result(float(means[i].item()), float(stds[i].item()))
            for i in range(len(feature_list))
        ]

    def validate_input(self, features: Dict[str, float]) -> List[str]:
        """Return a list of validation errors; empty list means valid."""
        errors = []
        for feat in self.feature_list:
            if feat not in features:
                errors.append(f"Missing feature: {feat}")
            else:
                # Features without a declared range default to [0, 1];
                # normalize_features is stricter and raises for them.
                lo, hi = FEATURE_RANGES.get(feat, (0, 1))
                val = features[feat]
                if not (lo <= val <= hi):
                    errors.append(
                        f"Feature '{feat}' = {val} out of range [{lo}, {hi}]"
                    )
        return errors
| # ============================================================================ | |
| # CONCRETE PREDICTORS | |
| # ============================================================================ | |
class FloodPredictor(BaseDisasterPredictor):
    """
    Flood risk predictor.

    Its primary role is producing the lane-level flood risk map through
    batched prediction.
    """

    disaster_type = "flood"
    feature_list = FLOOD_FEATURES

    def predict_lane_risks(
        self,
        lane_features: List[Dict[str, float]]
    ) -> List[PredictionResult]:
        """
        Predict flood risk for a collection of road lanes/segments.

        Every dict in lane_features must supply all FLOOD_FEATURES.
        Yields one PredictionResult per lane; callers attach these to
        GeoJSON in lane_flood_mapper.
        """
        lane_results = self.predict_batch(lane_features, n_mc_samples=30)
        return lane_results
class CyclonePredictor(BaseDisasterPredictor):
    """
    Cyclone impact risk predictor.

    Input features describe storm characteristics plus landfall context.
    """

    disaster_type = "cyclone"
    feature_list = CYCLONE_FEATURES

    def predict_with_track(
        self,
        track_points: List[Dict[str, float]]
    ) -> List[PredictionResult]:
        """
        Predict risk at each point along a cyclone track.

        The returned list forms a time series of risk scores, which is
        useful for animation.
        """
        per_point_results = self.predict_batch(track_points, n_mc_samples=30)
        return per_point_results
class LandslidePredictor(BaseDisasterPredictor):
    """
    Landslide susceptibility predictor.

    Inputs (LANDSLIDE_FEATURES) describe terrain and triggering
    conditions; all prediction behavior is inherited from
    BaseDisasterPredictor.
    """
    disaster_type = "landslide"
    feature_list = LANDSLIDE_FEATURES
class EarthquakePredictor(BaseDisasterPredictor):
    """
    Earthquake impact risk predictor.

    Note: earthquakes are not directly predictable in time, but this
    models structural/spatial risk given seismic hazard inputs
    (EARTHQUAKE_FEATURES). Prediction behavior is inherited from
    BaseDisasterPredictor.
    """
    disaster_type = "earthquake"
    feature_list = EARTHQUAKE_FEATURES
# ============================================================================
# MULTI-HAZARD COMPOSITE
# ============================================================================
# Default composite weights (sum to 1.0). Disaster types absent from a
# caller-supplied weights dict fall back to 0.25 in predict_all().
DISASTER_WEIGHTS = {
    "flood": 0.35,
    "cyclone": 0.30,
    "landslide": 0.20,
    "earthquake": 0.15,
}
class MultiHazardPredictor:
    """
    Combines all four predictors into a single composite risk score.

    Weights are domain-informed and adjustable per call via the
    ``weights`` parameter of predict_all().
    """

    def __init__(self, model_dir: str = "models"):
        """Instantiate one predictor per disaster type from model_dir."""
        predictor_classes = {
            "flood": FloodPredictor,
            "cyclone": CyclonePredictor,
            "landslide": LandslidePredictor,
            "earthquake": EarthquakePredictor,
        }
        self.predictors = {
            name: cls(model_dir) for name, cls in predictor_classes.items()
        }

    def ready_predictors(self) -> List[str]:
        """Names of disaster types whose trained models are loaded."""
        ready = []
        for name, predictor in self.predictors.items():
            if predictor.is_ready():
                ready.append(name)
        return ready

    def predict_all(
        self,
        features_by_type: Dict[str, Dict[str, float]],
        weights: Optional[Dict[str, float]] = None,
    ) -> Dict:
        """
        Run every applicable predictor and combine the scores.

        Args:
            features_by_type: {"flood": {...}, "cyclone": {...}, ...}
            weights: optional per-type weights; defaults to DISASTER_WEIGHTS.

        Returns:
            Dict with per-type results plus the weighted composite score
            (0.0 composite when no predictor was ready).
        """
        active_weights = weights or DISASTER_WEIGHTS
        per_type = {}
        weighted_sum = 0.0
        weight_total = 0.0
        for hazard, hazard_features in features_by_type.items():
            predictor = self.predictors.get(hazard)
            # Types without a ready model are skipped silently; callers can
            # detect this via the "active_predictors" field.
            if predictor is None or not predictor.is_ready():
                continue
            outcome = predictor.predict(hazard_features)
            per_type[hazard] = outcome
            w = active_weights.get(hazard, 0.25)
            weighted_sum += outcome.risk_score * w
            weight_total += w
        composite = weighted_sum / weight_total if weight_total > 0 else 0.0
        return {
            "composite_risk_score": round(composite, 4),
            "composite_risk_tier": score_to_tier(composite),
            "by_disaster": per_type,
            "active_predictors": list(per_type.keys()),
        }