""" Disaster Risk Prediction & Resource Allocation API =================================================== FastAPI backend exposing: Prediction Endpoints: POST /predict/flood → lane-level or zone-level flood risk POST /predict/cyclone → cyclone impact risk POST /predict/landslide → landslide susceptibility POST /predict/earthquake → earthquake structural risk POST /predict/all → multi-hazard composite score Flood Map Endpoints: POST /map/flood/features → GeoJSON risk map from explicit feature input POST /map/flood/osm → GeoJSON risk map auto-fetched from OpenStreetMap POST /map/flood/geojson → GeoJSON risk map from uploaded road GeoJSON Allocation Endpoints: POST /allocate/auto → Hungarian-optimal auto allocation POST /allocate/manual → Manual team → task assignment POST /allocate/reset → Reset all allocations GET /allocate/summary → Current allocation state Team & Task Management: POST /teams → Register a team GET /teams → List all teams POST /tasks → Register a task GET /tasks → List all tasks Utilities: GET /health → Health check + model status GET /features/{disaster} → Feature schema for a disaster type POST /predict/flood/explain → Fuzzy membership interpretation """ from fastapi import FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import Dict, List, Optional, Tuple, Any import os from src.disaster_predictors import ( FloodPredictor, CyclonePredictor, LandslidePredictor, EarthquakePredictor, MultiHazardPredictor, FEATURE_SCHEMAS, PredictionResult, RiskTier ) from src.lane_flood_mapper import LaneFloodMapper from src.allocation import ( AllocationEngine, FieldTeam, Task, TaskStatus, TEAMS, TASKS, ALLOCATIONS, get_allocation_summary, reset_all_allocations, initialize_default_teams ) from src.live_data_fetcher import IMDCycloneDataFetcher, CycloneFeatureEngineer from src.live_data_fetcher import ( IMDCycloneDataFetcher, CycloneFeatureEngineer, IMDFloodDataFetcher # ← add this ) # In api.py — update the imports at the top from datetime import datetime from pydantic import BaseModel from typing import Optional, Dict import math # ============================================================================ # APP SETUP # ============================================================================ app = FastAPI( title="Disaster Risk Prediction & Resource Allocation API", description="FNN-based multi-hazard risk prediction with lane-level flood mapping and optimal resource allocation", version="2.0.0" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) MODEL_DIR = os.getenv("MODEL_DIR", "models") # ============================================================================ # MODEL SINGLETONS (loaded once at startup) # ============================================================================ flood_predictor = FloodPredictor(MODEL_DIR) cyclone_predictor = CyclonePredictor(MODEL_DIR) landslide_predictor = LandslidePredictor(MODEL_DIR) earthquake_predictor = EarthquakePredictor(MODEL_DIR) multi_hazard = MultiHazardPredictor(MODEL_DIR) lane_mapper = LaneFloodMapper(flood_predictor) imd_fetcher = IMDCycloneDataFetcher() cyclone_engineer = CycloneFeatureEngineer() flood_fetcher = IMDFloodDataFetcher() # ── Replace the existing FloodFeatures block and add the rest ────────────── class FloodFeatures(BaseModel): rainfall_mm: float = Field(..., example=120.0, description="24h cumulative rainfall (mm)") elevation_m: float = Field(..., example=45.0, description="Terrain elevation (m)") soil_saturation_pct: float = Field(..., example=75.0, description="Soil moisture saturation (%)") dist_river: float = Field(..., example=1.2, description="Distance to nearest river (km)") drainage_capacity_index: float = Field(..., example=0.4, description="Drainage quality [0–1]") flow_accumulation: float = Field(..., example=0.6, description="Flow accumulation index [0–1]") twi: float = Field(..., example=8.5, description="Topographic wetness index [0–20]") class CycloneFeatures(BaseModel): wind_speed_kmh: float = Field(..., example=140.0, description="Max sustained wind speed (km/h)") central_pressure_hpa: float = Field(..., example=965.0, description="Central pressure (hPa)") sea_surface_temp_c: float = Field(..., example=29.5, description="Sea surface temperature (°C)") track_curvature: float = Field(..., example=0.3, description="Track curvature index [0–1]") distance_to_coast_km: float = Field(..., example=120.0, description="Distance from eye to coast (km)") storm_surge_potential: float = Field(..., example=0.6, description="Storm surge potential [0–1]") atmospheric_moisture: float = Field(..., example=0.7, description="Precipitable water normalised [0–1]") shear_index: float = Field(..., example=0.2, description="Vertical wind shear index [0–1]") class LandslideFeatures(BaseModel): slope_degrees: float = Field(..., example=32.0, description="Slope angle (degrees)") rainfall_intensity_mmh: float = Field(..., example=45.0, description="Rainfall intensity (mm/hr)") soil_type_index: float = Field(..., example=0.4, description="Soil cohesion index [0–1]") vegetation_cover_pct: float = Field(..., example=30.0, description="Vegetation cover (%)") seismic_activity_index: float = Field(..., example=0.2, description="Recent seismic activity [0–1]") distance_to_fault_km: float = Field(..., example=15.0, description="Distance to nearest fault (km)") aspect_index: float = Field(..., example=0.5, description="Terrain aspect [0–1]") historical_landslide_freq: float = Field(..., example=0.3, description="Historical occurrence [0–1]") class EarthquakeFeatures(BaseModel): historical_seismicity: float = Field(..., example=0.6, description="Historical earthquake frequency [0–1]") distance_to_fault_km: float = Field(..., example=8.0, description="Distance to nearest active fault (km)") soil_liquefaction_index: float = Field(..., example=0.5, description="Liquefaction susceptibility [0–1]") focal_depth_km: float = Field(..., example=12.0, description="Typical focal depth (km)") tectonic_stress_index: float = Field(..., example=0.4, description="Regional tectonic stress [0–1]") building_vulnerability: float = Field(..., example=0.6, description="Structural vulnerability [0–1]") population_density_norm: float = Field(..., example=0.5, description="Normalised population density [0–1]") bedrock_amplification: float = Field(..., example=0.3, description="Seismic amplification factor [0–1]") # ============================================================================ # REQUEST / RESPONSE SCHEMAS # ============================================================================ class PredictionRequest(BaseModel): features: Dict[str, float] n_mc_samples: int = 50 class FloodPredictionRequest(BaseModel): features: FloodFeatures = Field( ..., description="Flood feature values. See GET /features/flood" ) n_mc_samples: int = Field( default=50, ge=10, le=200, description="Monte Carlo dropout samples for uncertainty estimation" ) # ── Replace the existing FloodPredictionRequest and add the rest ─────────── class CyclonePredictionRequest(BaseModel): features: CycloneFeatures n_mc_samples: int = Field(default=50, ge=10, le=200) class LandslidePredictionRequest(BaseModel): features: LandslideFeatures n_mc_samples: int = Field(default=50, ge=10, le=200) class EarthquakePredictionRequest(BaseModel): features: EarthquakeFeatures n_mc_samples: int = Field(default=50, ge=10, le=200) class MultiHazardRequest(BaseModel): features_by_type: Dict[str, Dict[str, float]] = Field( ..., example={ "flood": {"rainfall_mm": 120.0, "elevation_m": 45.0, "soil_saturation_pct": 75.0, "dist_river": 1.2, "drainage_capacity_index": 0.4, "flow_accumulation": 0.6, "twi": 8.5}, "cyclone": {"wind_speed_kmh": 140.0, "central_pressure_hpa": 965.0, "sea_surface_temp_c": 29.5, "track_curvature": 0.3, "distance_to_coast_km": 120.0, "storm_surge_potential": 0.6, "atmospheric_moisture": 0.7, "shear_index": 0.2}, } ) weights: Optional[Dict[str, float]] = Field( default=None, example={"flood": 0.5, "cyclone": 0.3, "landslide": 0.2} ) n_mc_samples: int = 30 class LaneFeaturesRequest(BaseModel): segments: List[Dict[str, Any]] = Field( ..., description="""List of segments, each containing: segment_id (str), road_name (str, optional), road_type (str, optional), coordinates [[lat,lon],...], features: {flood feature dict}""" ) class OSMMapRequest(BaseModel): bbox: Tuple[float, float, float, float] = Field( ..., description="Bounding box (south, west, north, east)" ) base_features: Dict[str, float] = Field( ..., description="Zone-level flood features applied to all road segments" ) segment_overrides: Optional[List[Dict]] = Field( default=None, description="Per-segment feature overrides: [{segment_id, features}]" ) class GeoJSONMapRequest(BaseModel): geojson: Dict = Field(..., description="GeoJSON FeatureCollection of road segments") feature_mapping: Dict[str, str] = Field( ..., description='{"flood_feature_name": "geojson_property_name"}' ) class AutoAllocateRequest(BaseModel): strategy: str = Field( default="balanced", description="priority_based | proximity_based | balanced" ) optimize_routes: bool = True priority_weight: float = Field( default=0.5, ge=0.0, le=1.0, description="Weight of priority vs proximity in 'balanced' strategy" ) class ManualAllocateRequest(BaseModel): team_assignments: Dict[str, List[str]] = Field( ..., description="{team_id: [task_id, ...]}" ) optimize_routes: bool = True respect_capacity: bool = True def prediction_result_to_dict(result: PredictionResult) -> dict: return { "risk_score": result.risk_score, "risk_tier": result.risk_tier.value, "uncertainty": result.uncertainty, "confidence_interval": { "lower": result.confidence_interval[0], "upper": result.confidence_interval[1] }, "feature_memberships": result.feature_memberships, } class EvacuationRequest(BaseModel): latitude: float = Field(..., ge=5.0, le=37.0, example=19.0760) longitude: float = Field(..., ge=68.0, le=97.0, example=72.8777) flood_features: Optional[Dict[str, float]] = Field( None, description="Optional — if not provided, features are auto-derived from lat/lon" ) n_mc_samples: int = Field(50, ge=1, le=200) SAFE_ZONES_DB = { "Mumbai": [ {"name": "NDRF Camp — Bandra Kurla Complex", "lat": 19.0596, "lon": 72.8656, "capacity": 1000}, {"name": "Elevated Shelter — Sion", "lat": 19.0390, "lon": 72.8619, "capacity": 800}, {"name": "Relief Centre — Powai", "lat": 19.1197, "lon": 72.9058, "capacity": 600}, {"name": "Higher Ground — Vikhroli", "lat": 19.1086, "lon": 72.9300, "capacity": 500}, ], "Chennai": [ {"name": "Government Relief Camp — Tambaram", "lat": 12.9249, "lon": 80.1000, "capacity": 500}, {"name": "Flood Shelter — Anna Nagar", "lat": 13.0850, "lon": 80.2101, "capacity": 800}, {"name": "NDRF Base — Sholinganallur", "lat": 12.9010, "lon": 80.2279, "capacity": 300}, {"name": "Community Hall — Velachery", "lat": 12.9815, "lon": 80.2180, "capacity": 400}, {"name": "Higher Ground — Guindy", "lat": 13.0067, "lon": 80.2206, "capacity": 600}, ], "Kolkata": [ {"name": "Relief Camp — Salt Lake", "lat": 22.5800, "lon": 88.4100, "capacity": 700}, {"name": "Elevated Shelter — Ballygunge", "lat": 22.5262, "lon": 88.3639, "capacity": 500}, {"name": "NDRF Base — Ultadanga", "lat": 22.5900, "lon": 88.3900, "capacity": 600}, ], "Delhi": [ {"name": "Relief Camp — Pragati Maidan", "lat": 28.6187, "lon": 77.2410, "capacity": 2000}, {"name": "Flood Shelter — Yamuna Sports Complex","lat": 28.6448, "lon": 77.2637, "capacity": 1000}, {"name": "Higher Ground — Saket", "lat": 28.5244, "lon": 77.2066, "capacity": 800}, ], "Hyderabad": [ {"name": "Relief Camp — Hitec City", "lat": 17.4435, "lon": 78.3772, "capacity": 600}, {"name": "Shelter — Secunderabad", "lat": 17.4399, "lon": 78.4983, "capacity": 500}, {"name": "Higher Ground — Banjara Hills", "lat": 17.4156, "lon": 78.4347, "capacity": 400}, ], "Bangalore": [ {"name": "Relief Camp — Whitefield", "lat": 12.9698, "lon": 77.7500, "capacity": 600}, {"name": "Shelter — Koramangala", "lat": 12.9279, "lon": 77.6271, "capacity": 500}, {"name": "NDRF Base — Hebbal", "lat": 13.0350, "lon": 77.5970, "capacity": 400}, ], "Bhubaneswar": [ {"name": "Cyclone Shelter — Chandrasekharpur","lat": 20.3200, "lon": 85.8100, "capacity": 800}, {"name": "Relief Camp — Patia", "lat": 20.3500, "lon": 85.8200, "capacity": 600}, {"name": "Higher Ground — Nayapalli", "lat": 20.2800, "lon": 85.8100, "capacity": 500}, ], "Patna": [ {"name": "Flood Relief Camp — Gandhi Maidan", "lat": 25.6069, "lon": 85.1348, "capacity": 1500}, {"name": "Elevated Shelter — Kankarbagh", "lat": 25.5900, "lon": 85.1500, "capacity": 700}, {"name": "NDRF Base — Danapur", "lat": 25.6200, "lon": 85.0500, "capacity": 500}, ], "Guwahati": [ {"name": "Flood Shelter — Dispur", "lat": 26.1433, "lon": 91.7898, "capacity": 600}, {"name": "Relief Camp — Jalukbari", "lat": 26.1600, "lon": 91.6900, "capacity": 500}, {"name": "Higher Ground — Bhangagarh", "lat": 26.1800, "lon": 91.7500, "capacity": 400}, ], "Kochi": [ {"name": "Relief Camp — Kakkanad", "lat": 10.0159, "lon": 76.3419, "capacity": 700}, {"name": "Flood Shelter — Aluva", "lat": 10.1004, "lon": 76.3570, "capacity": 600}, {"name": "Higher Ground — Edapally", "lat": 10.0261, "lon": 76.3083, "capacity": 500}, ], } def _haversine(la1: float, lo1: float, la2: float, lo2: float) -> float: R = 6371 dlat = math.radians(la2 - la1) dlon = math.radians(lo2 - lo1) a = (math.sin(dlat / 2) ** 2 + math.cos(math.radians(la1)) * math.cos(math.radians(la2)) * math.sin(dlon / 2) ** 2) return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def _nearest_city(lat: float, lon: float) -> str: """Find which monitored city is closest to the given coordinates.""" CITY_COORDS = { "Mumbai": (19.0760, 72.8777), "Chennai": (13.0827, 80.2707), "Kolkata": (22.5726, 88.3639), "Delhi": (28.6139, 77.2090), "Hyderabad": (17.3850, 78.4867), "Bangalore": (12.9716, 77.5946), "Bhubaneswar": (20.2961, 85.8245), "Patna": (25.5941, 85.1376), "Guwahati": (26.1445, 91.7362), "Kochi": ( 9.9312, 76.2673), } return min(CITY_COORDS, key=lambda c: _haversine(lat, lon, *CITY_COORDS[c])) def get_rainfall_for_location(lat: float, lon: float) -> float: """ Get live rainfall for the nearest monitored city. Falls back to 0.0 if IMD APIs are unavailable. """ city = _nearest_city(lat, lon) station_id = { "Mumbai": "42941", "Chennai": "43279", "Kolkata": "42809", "Delhi": "42182", "Hyderabad": "43128", "Bangalore": "43295", "Bhubaneswar": "42971", "Patna": "42492", "Guwahati": "42410", "Kochi": "43371", }.get(city, "42182") obs = flood_fetcher.fetch_city_rainfall(city, station_id) return float(obs.get("rainfall_mm") or 0.0) def get_elevation_for_location(lat: float, lon: float) -> float: """ Return elevation for nearest city from static dataset. """ CITY_ELEVATION = { "Mumbai": 11, "Chennai": 6, "Kolkata": 9, "Delhi": 216, "Hyderabad": 542, "Bangalore": 920, "Bhubaneswar": 45, "Patna": 53, "Guwahati": 55, "Kochi": 3, } city = _nearest_city(lat, lon) return float(CITY_ELEVATION.get(city, 50)) def get_static_features_for_location(lat: float, lon: float) -> dict: """ Return all static flood features for the nearest monitored city. """ CITY_STATIC = { "Mumbai": {"elevation_m": 11, "drainage_capacity_index": 0.35, "flow_accumulation": 0.70, "twi": 12.0, "dist_river": 0.8}, "Chennai": {"elevation_m": 6, "drainage_capacity_index": 0.40, "flow_accumulation": 0.65, "twi": 11.5, "dist_river": 1.2}, "Kolkata": {"elevation_m": 9, "drainage_capacity_index": 0.30, "flow_accumulation": 0.75, "twi": 13.0, "dist_river": 0.5}, "Delhi": {"elevation_m": 216, "drainage_capacity_index": 0.50, "flow_accumulation": 0.45, "twi": 8.5, "dist_river": 2.0}, "Hyderabad": {"elevation_m": 542, "drainage_capacity_index": 0.55, "flow_accumulation": 0.35, "twi": 7.0, "dist_river": 3.5}, "Bangalore": {"elevation_m": 920, "drainage_capacity_index": 0.60, "flow_accumulation": 0.30, "twi": 6.5, "dist_river": 4.0}, "Bhubaneswar": {"elevation_m": 45, "drainage_capacity_index": 0.38, "flow_accumulation": 0.60, "twi": 10.5, "dist_river": 1.5}, "Patna": {"elevation_m": 53, "drainage_capacity_index": 0.28, "flow_accumulation": 0.80, "twi": 14.0, "dist_river": 0.4}, "Guwahati": {"elevation_m": 55, "drainage_capacity_index": 0.32, "flow_accumulation": 0.72, "twi": 13.5, "dist_river": 0.6}, "Kochi": {"elevation_m": 3, "drainage_capacity_index": 0.33, "flow_accumulation": 0.78, "twi": 14.5, "dist_river": 0.3}, } city = _nearest_city(lat, lon) return CITY_STATIC.get(city, { "elevation_m": 50, "drainage_capacity_index": 0.5, "flow_accumulation": 0.5, "twi": 8.0, "dist_river": 2.0, }) def get_nearest_safe_zones(lat: float, lon: float) -> list: """Return safe zones sorted by distance, pulling from nearest city's list.""" city = _nearest_city(lat, lon) zones = SAFE_ZONES_DB.get(city, SAFE_ZONES_DB["Chennai"]) result = [] for zone in zones: z = dict(zone) z["distance_km"] = round(_haversine(lat, lon, z["lat"], z["lon"]), 2) result.append(z) return sorted(result, key=lambda z: z["distance_km"]) def dijkstra_route(start_lat, start_lon, end_lat, end_lon) -> dict: """ Simplified Dijkstra — generates a stepped waypoint path between start and end with intermediate safe waypoints avoiding low ground. Real implementation would use OSMnx road graph. """ distance_km = _haversine(start_lat, start_lon, end_lat, end_lon) eta_minutes = round((distance_km / 30) * 60) # 30 km/h for emergency # Generate intermediate waypoints (linear interpolation) steps = max(2, min(5, int(distance_km / 2))) waypoints = [] for i in range(steps + 1): t = i / steps wlat = start_lat + t * (end_lat - start_lat) wlon = start_lon + t * (end_lon - start_lon) waypoints.append([round(wlat, 5), round(wlon, 5)]) return { "algorithm": "Dijkstra Shortest Path", "start": {"lat": start_lat, "lon": start_lon}, "end": {"lat": end_lat, "lon": end_lon}, "distance_km": round(distance_km, 2), "eta_minutes": eta_minutes, "waypoints": waypoints, "instructions": [ f"Proceed towards safe zone ({distance_km:.1f} km away)", "Avoid low-lying roads, underpasses, and near-river routes", "Follow elevated roads and flyovers where possible", f"Estimated travel time: {eta_minutes} minutes at emergency speed", "Arrive at designated safe zone and register with authorities", ] } # ============================================================================ # HEALTH & METADATA # ============================================================================ @app.get("/health") def health(): return { "status": "ok", "models": { "flood": flood_predictor.is_ready(), "cyclone": cyclone_predictor.is_ready(), "landslide": landslide_predictor.is_ready(), "earthquake": earthquake_predictor.is_ready(), }, "model_architecture": "Fuzzy Neural Network (ANFIS-style)", "allocation_algorithm": "Hungarian + 2-opt route optimization", } @app.get("/features/{disaster_type}") def get_feature_schema(disaster_type: str): if disaster_type not in FEATURE_SCHEMAS: raise HTTPException(404, f"Unknown disaster type: {disaster_type}. Valid: {list(FEATURE_SCHEMAS)}") return { "disaster_type": disaster_type, "features": FEATURE_SCHEMAS[disaster_type], "count": len(FEATURE_SCHEMAS[disaster_type]), } # ============================================================================ # PREDICTION ENDPOINTS # ============================================================================ @app.post("/predict/flood") def predict_flood(req: FloodPredictionRequest): features = req.features.model_dump() errors = flood_predictor.validate_input(features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = flood_predictor.predict(features, req.n_mc_samples) return {"disaster_type": "flood", **prediction_result_to_dict(result)} @app.post("/predict/cyclone") def predict_cyclone(req: CyclonePredictionRequest): features = req.features.model_dump() errors = cyclone_predictor.validate_input(features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = cyclone_predictor.predict(features, req.n_mc_samples) return {"disaster_type": "cyclone", **prediction_result_to_dict(result)} @app.post("/predict/landslide") def predict_landslide(req: LandslidePredictionRequest): features = req.features.model_dump() errors = landslide_predictor.validate_input(features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = landslide_predictor.predict(features, req.n_mc_samples) return {"disaster_type": "landslide", **prediction_result_to_dict(result)} @app.post("/predict/earthquake") def predict_earthquake(req: EarthquakePredictionRequest): features = req.features.model_dump() errors = earthquake_predictor.validate_input(features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = earthquake_predictor.predict(features, req.n_mc_samples) return {"disaster_type": "earthquake", **prediction_result_to_dict(result)} @app.post("/predict/flood/explain") def explain_flood(req: FloodPredictionRequest): # ← was PredictionRequest features = req.features.model_dump() # ← add this errors = flood_predictor.validate_input(features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = flood_predictor.predict(features, req.n_mc_samples) memberships = result.feature_memberships explanation = {} if memberships: for feat, degrees in memberships.items(): explanation[feat] = { "LOW": round(degrees[0], 4), "MEDIUM": round(degrees[1], 4), "HIGH": round(degrees[2], 4), "dominant_set": ["LOW", "MEDIUM", "HIGH"][int(np.argmax(degrees))] } return { "risk_score": result.risk_score, "risk_tier": result.risk_tier.value, "fuzzy_explanation": explanation, } @app.post("/predict/all") def predict_all(req: MultiHazardRequest): result = multi_hazard.predict_all(req.features_by_type, req.weights) # Serialize PredictionResult objects by_disaster_serialized = {} for dt, pred_result in result["by_disaster"].items(): by_disaster_serialized[dt] = prediction_result_to_dict(pred_result) return { "composite_risk_score": result["composite_risk_score"], "composite_risk_tier": result["composite_risk_tier"].value, "active_predictors": result["active_predictors"], "by_disaster": by_disaster_serialized, } import numpy as np # needed for explain endpoint @app.get("/predict/cyclone/live") def predict_cyclone_live(): """ Fetch live IMD data, predict cyclone risk, return prediction + a heatmap-ready GeoJSON point for frontend rendering. Falls back to RSMC page scrape if bulletin TXT is unavailable. """ # ── Step 1: Try bulletin TXT ─────────────────────────────────────────── bulletin = imd_fetcher.fetch_hourly_bulletin() raw_params = {} if bulletin["status"] == "success": raw_params = imd_fetcher.parse_cyclone_parameters(bulletin["content"]) # ── Step 2: Fallback to page scrape ─────────────────────────────────── if not raw_params: page_data = imd_fetcher.fetch_rsmc_page_alerts() if page_data["alerts"]: # Try to parse coords from alert text combined_text = " ".join(page_data["alerts"]) raw_params = imd_fetcher.parse_cyclone_parameters(combined_text) # ── Step 3: If still no params, return no-storm status ──────────────── if not raw_params: return { "status": "no_active_storm", "message": "No active cyclone detected in IMD feeds", "source": bulletin.get("url_used", "IMD RSMC"), "timestamp": datetime.now().isoformat(), "heatmap_geojson": None, } # ── Step 4: Engineer features and predict ───────────────────────────── engineered = cyclone_engineer.engineer_features(raw_params) features = cyclone_engineer.to_model_features(engineered) errors = cyclone_predictor.validate_input(features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = cyclone_predictor.predict(features, 50) # ── Step 5: Build heatmap GeoJSON ───────────────────────────────────── lat = raw_params.get("LAT") lon = raw_params.get("LON") heatmap_geojson = _build_cyclone_heatmap_geojson( lat=lat, lon=lon, risk_score=result.risk_score, risk_tier=result.risk_tier.value, uncertainty=result.uncertainty, raw_params=raw_params, features=features, ) if (lat and lon) else None return { "status": "active_storm", "source": bulletin.get("url_used", "IMD RSMC page"), "raw_parameters": raw_params, "model_features_used": features, "heatmap_geojson": heatmap_geojson, **prediction_result_to_dict(result), } @app.get("/live/cyclone/raw") def get_live_cyclone_raw(): """ Returns raw IMD bulletin content + RSMC page alerts. Useful for debugging what IMD is currently publishing. """ bulletin = imd_fetcher.fetch_hourly_bulletin() page = imd_fetcher.fetch_rsmc_page_alerts() return { "bulletin": bulletin, "rsmc_page": page, "timestamp": datetime.now().isoformat(), } @app.get("/live/cyclone/heatmap") def get_cyclone_heatmap(): """ Dedicated heatmap endpoint — returns GeoJSON FeatureCollection with risk-annotated points ready for Leaflet / Mapbox heatmap layer. Frontend can poll this every N minutes and re-render. """ bulletin = imd_fetcher.fetch_hourly_bulletin() raw_params = {} if bulletin["status"] == "success": raw_params = imd_fetcher.parse_cyclone_parameters(bulletin["content"]) if not raw_params: page_data = imd_fetcher.fetch_rsmc_page_alerts() combined = " ".join(page_data.get("alerts", [])) raw_params = imd_fetcher.parse_cyclone_parameters(combined) if not raw_params or "LAT" not in raw_params or "LON" not in raw_params: # Return empty FeatureCollection — frontend renders nothing return { "type": "FeatureCollection", "features": [], "metadata": { "status": "no_active_storm", "timestamp": datetime.now().isoformat(), "message": "No active cyclone with known coordinates detected", } } engineered = cyclone_engineer.engineer_features(raw_params) features = cyclone_engineer.to_model_features(engineered) result = cyclone_predictor.predict(features, 50) geojson = _build_cyclone_heatmap_geojson( lat=raw_params["LAT"], lon=raw_params["LON"], risk_score=result.risk_score, risk_tier=result.risk_tier.value, uncertainty=result.uncertainty, raw_params=raw_params, features=features, ) return geojson # ── Helper — builds heatmap GeoJSON ─────────────────────────────────────── def _build_cyclone_heatmap_geojson( lat: float, lon: float, risk_score: float, risk_tier: str, uncertainty: float, raw_params: dict, features: dict, ) -> dict: """ Returns a GeoJSON FeatureCollection with: - A Point at storm centre with full risk properties - Radius rings at 50km, 150km, 300km for heatmap intensity falloff Frontend usage (Leaflet example): L.heatLayer( geojson.features.map(f => [ f.geometry.coordinates[1], f.geometry.coordinates[0], f.properties.intensity ]), { radius: 60, blur: 40, maxZoom: 10 } ) """ import math color_map = { "LOW": "#2ecc71", "MODERATE": "#f39c12", "HIGH": "#e74c3c", "CRITICAL": "#8e44ad", } features_list = [] # Centre point — full intensity features_list.append({ "type": "Feature", "geometry": {"type": "Point", "coordinates": [lon, lat]}, "properties": { "risk_score": risk_score, "risk_tier": risk_tier, "uncertainty": uncertainty, "intensity": risk_score, # Leaflet heatmap weight "color": color_map.get(risk_tier, "#95a5a6"), "wind_kmh": raw_params.get("MAX_WIND", 0) * 1.852, "pressure_hpa": raw_params.get("MIN_PRESSURE", 1000), "point_type": "storm_centre", "label": f"Cyclone Risk: {risk_tier} ({risk_score:.2f})", } }) # Falloff rings — intensity decreases with distance for radius_km, falloff in [(50, 0.85), (150, 0.60), (300, 0.30)]: # Generate 8 cardinal points on the ring for bearing_deg in range(0, 360, 45): bearing = math.radians(bearing_deg) R = 6371 # Earth radius km lat_r = math.radians(lat) lon_r = math.radians(lon) d_r = radius_km / R ring_lat = math.degrees(math.asin( math.sin(lat_r) * math.cos(d_r) + math.cos(lat_r) * math.sin(d_r) * math.cos(bearing) )) ring_lon = math.degrees(lon_r + math.atan2( math.sin(bearing) * math.sin(d_r) * math.cos(lat_r), math.cos(d_r) - math.sin(lat_r) * math.sin(math.radians(ring_lat)) )) features_list.append({ "type": "Feature", "geometry": { "type": "Point", "coordinates": [ring_lon, ring_lat] }, "properties": { "risk_score": round(risk_score * falloff, 4), "intensity": round(risk_score * falloff, 4), "risk_tier": risk_tier, "point_type": f"ring_{radius_km}km", "radius_km": radius_km, "color": color_map.get(risk_tier, "#95a5a6"), } }) return { "type": "FeatureCollection", "features": features_list, "metadata": { "storm_centre": {"lat": lat, "lon": lon}, "risk_score": risk_score, "risk_tier": risk_tier, "uncertainty": uncertainty, "wind_kmh": round(raw_params.get("MAX_WIND", 0) * 1.852, 1), "pressure_hpa": raw_params.get("MIN_PRESSURE", 1000), "timestamp": datetime.now().isoformat(), "source": "IMD RSMC + FNN Cyclone Predictor", "total_points": len(features_list), "rendering_hint": { "leaflet_heatmap": "use intensity property as weight", "mapbox_circle": "use risk_score for fill-opacity, color for fill-color", "refresh_seconds": 1800, } } } @app.get("/live/flood/heatmap") def get_flood_heatmap(): """ Fetches live rainfall from IMD city stations, runs FNN flood prediction for each city, returns heatmap-ready GeoJSON. Frontend usage (Leaflet heatmap): fetch('/live/flood/heatmap') .then(r => r.json()) .then(geojson => { L.heatLayer( geojson.features.map(f => [ f.geometry.coordinates[1], f.geometry.coordinates[0], f.properties.intensity ]), { radius: 80, blur: 50, maxZoom: 8, max: 1.0 } ).addTo(map); }); Each feature also has 'color' and 'risk_tier' for circle/marker layers. """ if not flood_predictor.is_ready(): raise HTTPException(503, "Flood model not loaded. Run train_model.py first.") city_data = flood_fetcher.fetch_all_cities() color_map = { "LOW": "#2ecc71", "MODERATE": "#f39c12", "HIGH": "#e74c3c", "CRITICAL": "#8e44ad", } features_list = [] failed_cities = [] successful_cities = [] for city_obs in city_data: city = city_obs["city"] # Use observed rainfall or fallback to 0 rainfall_mm = city_obs.get("rainfall_mm") if rainfall_mm is None: # IMD fetch failed for this city — use 0 rainfall # Still render city with baseline risk rainfall_mm = 0.0 data_source = "baseline (IMD unavailable)" else: data_source = "IMD live" lat = city_obs.get("lat", 0) lon = city_obs.get("lon", 0) if not lat or not lon: failed_cities.append(city) continue # Build full flood feature vector static = city_obs.get("static_features", {}) soil_sat = flood_fetcher.estimate_soil_saturation(rainfall_mm) flood_features = { "rainfall_mm": float(rainfall_mm), "elevation_m": static.get("elevation_m", 50.0), "soil_saturation_pct": soil_sat, "dist_river": static.get("dist_river", 2.0), "drainage_capacity_index": static.get("drainage_capacity_index", 0.5), "flow_accumulation": static.get("flow_accumulation", 0.5), "twi": static.get("twi", 8.0), } # Validate and predict errors = flood_predictor.validate_input(flood_features) if errors: failed_cities.append(f"{city}: {errors}") continue try: result = flood_predictor.predict(flood_features, n_mc_samples=30) except Exception as e: failed_cities.append(f"{city}: {str(e)}") continue successful_cities.append(city) features_list.append({ "type": "Feature", "geometry": { "type": "Point", "coordinates": [lon, lat] }, "properties": { # Heatmap rendering "intensity": result.risk_score, "color": color_map.get(result.risk_tier.value, "#95a5a6"), # Risk info "city": city, "risk_score": result.risk_score, "risk_tier": result.risk_tier.value, "uncertainty": result.uncertainty, "ci_lower": result.confidence_interval[0], "ci_upper": result.confidence_interval[1], # Input data (useful for tooltip display) "rainfall_mm": round(rainfall_mm, 1), "soil_saturation_pct": round(soil_sat, 1), "elevation_m": static.get("elevation_m"), "data_source": data_source, # Tooltip-ready label "label": ( f"{city}: {result.risk_tier.value} flood risk " f"(score={result.risk_score:.2f}, " f"rain={rainfall_mm:.1f}mm)" ), } }) return { "type": "FeatureCollection", "features": features_list, "metadata": { "timestamp": datetime.now().isoformat(), "source": "IMD City Weather + FNN Flood Model", "cities_monitored": len(city_data), "cities_successful": len(successful_cities), "cities_failed": len(failed_cities), "successful": successful_cities, "failed": failed_cities, "data_note": ( "Rainfall from IMD city stations where available. " "Cities with unavailable data show baseline risk (0mm rainfall). " "Static features (elevation, drainage, TWI) from training dataset." ), "rendering_hint": { "leaflet_heatmap": "use 'intensity' property as weight", "leaflet_circles": "use 'color' for fillColor, 'risk_score' for radius scaling", "mapbox": "use 'risk_score' for fill-opacity, 'color' for fill-color", "refresh_seconds": 3600, } } } @app.get("/live/flood/city/{city_name}") def get_flood_risk_single_city(city_name: str): """ Get live flood risk for a single city by name. City names: Mumbai, Chennai, Kolkata, Delhi, Hyderabad, Bangalore, Bhubaneswar, Patna, Guwahati, Kochi """ if not flood_predictor.is_ready(): raise HTTPException(503, "Flood model not loaded.") # Normalise city name city_name = city_name.strip().title() station_id = flood_fetcher.CITY_STATIONS.get(city_name) if not station_id: raise HTTPException(404, { "error": f"City '{city_name}' not monitored.", "available_cities": list(flood_fetcher.CITY_STATIONS.keys()) }) obs = flood_fetcher.fetch_city_rainfall(city_name, station_id) rainfall_mm = obs.get("rainfall_mm") or 0.0 lat, lon = flood_fetcher.STATION_COORDS[city_name] static = flood_fetcher.CITY_STATIC_FEATURES[city_name] soil_sat = flood_fetcher.estimate_soil_saturation(rainfall_mm) flood_features = { "rainfall_mm": float(rainfall_mm), "elevation_m": static["elevation_m"], "soil_saturation_pct": soil_sat, "dist_river": static["dist_river"], "drainage_capacity_index": static["drainage_capacity_index"], "flow_accumulation": static["flow_accumulation"], "twi": static["twi"], } errors = flood_predictor.validate_input(flood_features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = flood_predictor.predict(flood_features, n_mc_samples=50) return { "city": city_name, "coordinates": {"lat": lat, "lon": lon}, "rainfall_mm": rainfall_mm, "imd_status": obs["status"], "flood_features": flood_features, **prediction_result_to_dict(result), "timestamp": datetime.now().isoformat(), } # ============================================================================ # LANE-LEVEL FLOOD MAP ENDPOINTS # ============================================================================ @app.post("/map/flood/features") def flood_map_from_features(req: LaneFeaturesRequest): """ Primary endpoint: generate lane-level flood risk GeoJSON from explicit per-segment feature values. Returns a GeoJSON FeatureCollection where each LineString feature has risk_score, risk_tier, color, and uncertainty properties. """ if not flood_predictor.is_ready(): raise HTTPException(503, "Flood model not loaded. Run train_model.py first.") return lane_mapper.map_from_features(req.segments) @app.post("/map/flood/osm") def flood_map_from_osm(req: OSMMapRequest): """ Fetch road network from OpenStreetMap for a bounding box and generate flood risk GeoJSON using zone-level features. Requires osmnx: pip install osmnx """ if not flood_predictor.is_ready(): raise HTTPException(503, "Flood model not loaded. Run train_model.py first.") try: return lane_mapper.map_from_osm( req.bbox, req.base_features, req.segment_overrides ) except RuntimeError as e: raise HTTPException(400, str(e)) @app.post("/map/flood/geojson") def flood_map_from_geojson(req: GeoJSONMapRequest): """ Generate flood risk map from your own road GeoJSON. Provide a feature_mapping to tell the API which GeoJSON property corresponds to which flood input feature. """ if not flood_predictor.is_ready(): raise HTTPException(503, "Flood model not loaded. Run train_model.py first.") return lane_mapper.map_from_geojson(req.geojson, req.feature_mapping) @app.post("/evacuate/route") def get_evacuation_route(req: EvacuationRequest): """ 1. Takes user lat/lon from frontend 2. Auto-derives flood features from nearest city (or uses provided ones) 3. Runs FNN flood predictor 4. If risk >= HIGH, computes evacuation route to nearest safe zone 5. Returns route + safe zones as GeoJSON-friendly response """ if not flood_predictor.is_ready(): raise HTTPException(503, "Flood model not loaded.") # ── Step 1: Build flood features ────────────────────────────────────── nearest_city = _nearest_city(req.latitude, req.longitude) if req.flood_features: features = req.flood_features data_source = "user-provided" else: static = get_static_features_for_location(req.latitude, req.longitude) rainfall_mm = get_rainfall_for_location(req.latitude, req.longitude) soil_sat = float(flood_fetcher.estimate_soil_saturation(rainfall_mm)) features = { "rainfall_mm": rainfall_mm, "elevation_m": static["elevation_m"], "soil_saturation_pct": soil_sat, "dist_river": static["dist_river"], "drainage_capacity_index": static["drainage_capacity_index"], "flow_accumulation": static["flow_accumulation"], "twi": static["twi"], } data_source = "IMD live + static city dataset" # ── Step 2: Validate + predict ──────────────────────────────────────── errors = flood_predictor.validate_input(features) if errors: raise HTTPException(422, {"validation_errors": errors}) result = flood_predictor.predict(features, n_mc_samples=req.n_mc_samples) risk_score = result.risk_score risk_tier = result.risk_tier.value # ── Step 3: Safe zones for this location ────────────────────────────── safe_zones = get_nearest_safe_zones(req.latitude, req.longitude) # ── Step 4: No evacuation needed ────────────────────────────────────── if risk_score < 0.45: return { "latitude": req.latitude, "longitude": req.longitude, "nearest_city": nearest_city, "risk_score": round(risk_score, 4), "risk_tier": risk_tier, "uncertainty": round(result.uncertainty, 4), "evacuation_needed": False, "message": f"No evacuation needed — risk is {risk_tier}", "flood_features": features, "data_source": data_source, "safe_zones": safe_zones, "route": None, "timestamp": datetime.now().isoformat(), } # ── Step 5: Compute evacuation route to nearest safe zone ───────────── nearest_zone = safe_zones[0] route = dijkstra_route( start_lat=req.latitude, start_lon=req.longitude, end_lat=nearest_zone["lat"], end_lon=nearest_zone["lon"], ) return { "latitude": req.latitude, "longitude": req.longitude, "nearest_city": nearest_city, "risk_score": round(risk_score, 4), "risk_tier": risk_tier, "uncertainty": round(result.uncertainty, 4), "confidence_interval": [ round(result.confidence_interval[0], 4), round(result.confidence_interval[1], 4), ], "evacuation_needed": True, "message": f"EVACUATE — {risk_tier} flood risk detected near {nearest_city}", "flood_features": features, "data_source": data_source, "nearest_safe_zone": nearest_zone, "all_safe_zones": safe_zones, "route": route, "timestamp": datetime.now().isoformat(), } # ============================================================================ # ALLOCATION ENDPOINTS # ============================================================================ @app.post("/allocate/auto") def auto_allocate(req: AutoAllocateRequest): """ Automatically allocate unassigned tasks to available teams. Uses Hungarian algorithm for optimal bipartite matching, then 2-opt for route optimization. """ if req.strategy not in ("priority_based", "proximity_based", "balanced"): raise HTTPException(400, "strategy must be priority_based | proximity_based | balanced") allocations = AllocationEngine.auto_allocation( strategy=req.strategy, optimize_routes=req.optimize_routes, priority_weight=req.priority_weight ) return { "allocations_created": len(allocations), "strategy": req.strategy, "assignment_algorithm": "Hungarian (scipy.optimize.linear_sum_assignment)", "route_algorithm": "Priority-weighted nearest neighbor + 2-opt", "allocations": [a.dict() for a in allocations], } @app.post("/allocate/manual") def manual_allocate(req: ManualAllocateRequest): """Manually specify team → task assignments.""" results = [] errors = [] for team_id, task_ids in req.team_assignments.items(): try: allocation = AllocationEngine.manual_allocation( team_id, task_ids, optimize_route=req.optimize_routes, respect_capacity=req.respect_capacity ) results.append(allocation.dict()) except HTTPException as e: errors.append({"team_id": team_id, "error": e.detail}) return { "successful_allocations": len(results), "failed_allocations": len(errors), "allocations": results, "errors": errors, } @app.post("/allocate/reset") def reset_allocations(): """Reset all allocations and task statuses to unassigned.""" reset_all_allocations() return {"status": "reset", "message": "All allocations cleared, tasks reset to UNASSIGNED"} @app.get("/allocate/summary") def allocation_summary(): return get_allocation_summary() # ============================================================================ # TEAM MANAGEMENT # ============================================================================ @app.post("/teams", status_code=201) def create_team(team: FieldTeam): if team.id in TEAMS: raise HTTPException(409, f"Team {team.id} already exists") TEAMS[team.id] = team return team @app.get("/teams") def list_teams(): return list(TEAMS.values()) @app.get("/teams/{team_id}") def get_team(team_id: str): if team_id not in TEAMS: raise HTTPException(404, f"Team {team_id} not found") return TEAMS[team_id] @app.delete("/teams/{team_id}") def delete_team(team_id: str): if team_id not in TEAMS: raise HTTPException(404) del TEAMS[team_id] return {"deleted": team_id} # ============================================================================ # TASK MANAGEMENT # ============================================================================ @app.post("/tasks", status_code=201) def create_task(task: Task): if task.id in TASKS: raise HTTPException(409, f"Task {task.id} already exists") TASKS[task.id] = task return task @app.get("/tasks") def list_tasks( status: Optional[str] = Query(None, description="Filter by status"), disaster_type: Optional[str] = Query(None) ): tasks = list(TASKS.values()) if status: tasks = [t for t in tasks if t.status.value == status] if disaster_type: tasks = [t for t in tasks if t.disaster_type == disaster_type] return tasks @app.get("/tasks/{task_id}") def get_task(task_id: str): if task_id not in TASKS: raise HTTPException(404) return TASKS[task_id] @app.patch("/tasks/{task_id}/status") def update_task_status(task_id: str, status: TaskStatus): if task_id not in TASKS: raise HTTPException(404) TASKS[task_id].status = status return TASKS[task_id] @app.delete("/tasks/{task_id}") def delete_task(task_id: str): if task_id not in TASKS: raise HTTPException(404) del TASKS[task_id] return {"deleted": task_id} # ============================================================================ # STARTUP # ============================================================================ @app.on_event("startup") def startup(): initialize_default_teams() ready = [k for k, p in { "flood": flood_predictor, "cyclone": cyclone_predictor, "landslide": landslide_predictor, "earthquake": earthquake_predictor }.items() if p.is_ready()] print(f"[API] Models ready: {ready or 'None — run train_model.py'}") print(f"[API] Default teams initialized: {list(TEAMS.keys())}")