# main.py - FastAPI application for Flood Vulnerability Assessment from fastapi import FastAPI, File, UploadFile, HTTPException, Request from fastapi.responses import StreamingResponse, HTMLResponse from fastapi.templating import Jinja2Templates from pydantic import BaseModel, field_validator from typing import Optional, Dict import pandas as pd import io import asyncio from concurrent.futures import ThreadPoolExecutor from spatial_queries import get_terrain_metrics, distance_to_water from vulnerability import calculate_vulnerability_index from gee_auth import initialize_gee from height_predictor.inference import get_predictor from height_predictor.get_height_gba import GlobalBuildingAtlasHeight # SHAP Explainer Initialization try: from explainability import VulnerabilityExplainer explainer = VulnerabilityExplainer() # Automatically loads rf_explainer.pkl if present print("✅ SHAP model initialized successfully.") except Exception as e: print(f"⚠️ SHAP explainer not available: {e}") explainer = None # Initialize GEE once at startup try: initialize_gee() print("✅ GEE initialized once at startup.") except Exception as e: print(f"⚠️ GEE initialization failed at startup: {e}") # APP INITIALIZATION app = FastAPI(title="Flood Vulnerability Assessment API", version="1.0") # Frontend templates setup templates = Jinja2Templates(directory="templates") # Thread pool for batch processing executor = ThreadPoolExecutor(max_workers=10) gba_getter = GlobalBuildingAtlasHeight() # DATA MODEL class SingleAssessment(BaseModel): latitude: float longitude: float height: Optional[float] = 0.0 basement: Optional[float] = 0.0 @field_validator('latitude') @classmethod def check_lat(cls, v: float) -> float: if not -90 <= v <= 90: raise ValueError('Latitude must be between -90 and 90') return v @field_validator('longitude') @classmethod def check_lon(cls, v: float) -> float: if not -180 <= v <= 180: raise ValueError('Longitude must be between -180 and 180') return v @field_validator('basement') @classmethod def check_basement(cls, v: float) -> float: if v > 0: raise ValueError('Basement height must be 0 or negative (e.g., -1, -2, -3)') return v # FRONTEND ROUTE @app.get("/", response_class=HTMLResponse) async def home(request: Request): """Serve the main web interface""" return templates.TemplateResponse("index.html", {"request": request}) # API ROUTES @app.get("/api") async def root() -> Dict: """API info endpoint""" return { "service": "Flood Vulnerability Assessment API", "version": "1.0", "endpoints": { "POST /assess": "Assess single location", "POST /assess_batch": "Assess batch from CSV file", "GET /health": "Health check" } } @app.post("/assess") async def assess_single(data: SingleAssessment) -> Dict: """Assess flood vulnerability for a single location (non-blocking).""" loop = asyncio.get_event_loop() try: # Run slow terrain + water queries in a background thread terrain, water_dist = await loop.run_in_executor( None, lambda: ( get_terrain_metrics(data.latitude, data.longitude), distance_to_water(data.latitude, data.longitude) ) ) # Calculate vulnerability after terrain + water distance retrieved result = calculate_vulnerability_index( lat=data.latitude, lon=data.longitude, height=data.height, basement=data.basement, terrain_metrics=terrain, water_distance=water_dist ) return { "status": "success", "input": data.dict(), "assessment": result } except Exception as e: raise HTTPException(status_code=500, detail=f"Assessment failed: {e}") @app.post("/predict_height") async def predict_height(data: SingleAssessment) -> Dict: try: predictor = get_predictor() result = predictor.predict_from_coordinates(data.latitude, data.longitude) if result['status'] == 'error': raise HTTPException(status_code=500, detail=result['error']) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) def process_single_row(row, use_predicted_height=False, use_gba_height=False): """Process a single row from CSV - used for parallel processing.""" try: lat = row['latitude'] lon = row['longitude'] height = row.get('height', 0.0) basement = row.get('basement', 0.0) if use_gba_height: try: result = gba_getter.get_height_m(lat, lon, buffer_m=5.0) if result.get('status') == 'success' and result.get('predicted_height') is not None: h = result['predicted_height'] if h >= 0: # Only use valid positive heights height = h except Exception as e: print(f"GBA height failed for {lat},{lon}: {e}") elif use_predicted_height: try: predictor = get_predictor() pred = predictor.predict_from_coordinates(lat, lon) if pred['status'] == 'success' and pred['predicted_height'] is not None: height = pred['predicted_height'] except Exception as e: print(f"Height prediction failed for {lat},{lon}: {e}") terrain = get_terrain_metrics(lat, lon) water_dist = distance_to_water(lat, lon) result = calculate_vulnerability_index( lat=lat, lon=lon, height=height, basement=basement, terrain_metrics=terrain, water_distance=water_dist ) # CSV output - essential columns return { 'latitude': lat, 'longitude': lon, 'height': height, 'basement': basement, 'vulnerability_index': result['vulnerability_index'], 'ci_lower_95': result['confidence_interval']['lower_bound_95'], 'ci_upper_95': result['confidence_interval']['upper_bound_95'], 'vulnerability_level': result['risk_level'], 'confidence': result['uncertainty_analysis']['confidence'], 'confidence_interpretation': result['uncertainty_analysis']['interpretation'], 'elevation_m': result['elevation_m'], 'tpi_m': result['relative_elevation_m'], 'slope_degrees': result['slope_degrees'], 'distance_to_water_m': result['distance_to_water_m'], 'quality_flags': ','.join(result['uncertainty_analysis']['data_quality_flags']) if result['uncertainty_analysis']['data_quality_flags'] else '' } except Exception as e: return { 'latitude': row.get('latitude'), 'longitude': row.get('longitude'), 'height': row.get('height', 0.0), 'basement': row.get('basement', 0.0), 'error': str(e), 'vulnerability_index': None, 'ci_lower_95': None, 'ci_upper_95': None, 'risk_level': None, 'confidence': None, 'confidence_interpretation': None, 'elevation_m': None, 'tpi_m': None, 'slope_degrees': None, 'distance_to_water_m': None, 'quality_flags': '' } @app.post("/assess_batch") async def assess_batch(file: UploadFile = File(...), use_predicted_height:bool=False, use_gba_height:bool=False) -> StreamingResponse: """Assess flood vulnerability for multiple locations from a CSV file.""" try: contents = await file.read() df = pd.read_csv(io.StringIO(contents.decode('utf-8'))) if 'latitude' not in df.columns or 'longitude' not in df.columns: raise HTTPException( status_code=400, detail="CSV must contain 'latitude' and 'longitude' columns" ) import numpy as np df = df[(np.abs(df['latitude']) <= 90) & (np.abs(df['longitude']) <= 180)] if len(df) == 0: raise HTTPException(status_code=400, detail="No valid coordinates in CSV (lat -90..90, lon -180..180)") # Set defaults for optional columns if 'height' not in df.columns: df['height'] = 0.0 if 'basement' not in df.columns: df['basement'] = 0.0 loop = asyncio.get_event_loop() results = await loop.run_in_executor( executor, lambda: [process_single_row(row, use_predicted_height, use_gba_height) for _, row in df.iterrows()] ) results_df = pd.DataFrame(results) output = io.StringIO() results_df.to_csv(output, index=False) output.seek(0) return StreamingResponse( io.BytesIO(output.getvalue().encode('utf-8')), media_type="text/csv", headers={ "Content-Disposition": ( "attachment; filename=vulnerability_results.csv; " "filename*=UTF-8''vulnerability_results.csv" ) } ) except Exception as e: raise HTTPException(status_code=500, detail=f"Batch processing failed: {str(e)}") @app.post("/assess_batch_multihazard") async def assess_batch_multihazard(file: UploadFile = File(...), use_predicted_height: bool = False, use_gba_height: bool = False) -> StreamingResponse: try: contents = await file.read() df = pd.read_csv(io.StringIO(contents.decode('utf-8'))) if 'latitude' not in df.columns or 'longitude' not in df.columns: raise HTTPException( status_code=400, detail="CSV must contain 'latitude' and 'longitude' columns" ) loop = asyncio.get_event_loop() from vulnerability import calculate_multi_hazard_vulnerability results = await loop.run_in_executor( executor, lambda: [process_single_row_multihazard(row, use_predicted_height, use_gba_height) for _, row in df.iterrows()] ) results_df = pd.DataFrame(results) output = io.StringIO() results_df.to_csv(output, index=False) output.seek(0) return StreamingResponse( io.BytesIO(output.getvalue().encode('utf-8')), media_type="text/csv", headers={ "Content-Disposition": ( "attachment; filename=multihazard_results.csv; " "filename*=UTF-8''multihazard_results.csv" ) } ) except Exception as e: raise HTTPException(status_code=500, detail=f"Batch multihazard failed: {str(e)}") @app.post("/explain") async def explain_assessment(data: SingleAssessment) -> Dict: """Assess vulnerability with SHAP explanation""" loop = asyncio.get_event_loop() try: # Run slow terrain + water queries in a background thread terrain, water_dist = await loop.run_in_executor( None, lambda: ( get_terrain_metrics(data.latitude, data.longitude), distance_to_water(data.latitude, data.longitude) ) ) result = calculate_vulnerability_index( lat=data.latitude, lon=data.longitude, height=data.height, basement=data.basement, terrain_metrics=terrain, water_distance=water_dist ) # Generate explanation if explainer available explanation = None if explainer: try: explanation = explainer.explain(result['components']) except Exception as e: print(f"SHAP explanation failed: {e}") return { "status": "success", "input": data.dict(), "assessment": result, "explanation": explanation } except Exception as e: raise HTTPException(status_code=500, detail=f"Assessment failed: {e}") def process_single_row_multihazard(row, use_predicted_height=False, use_gba_height=False): """Process a single row with multi-hazard assessment.""" try: from vulnerability import calculate_multi_hazard_vulnerability lat = row['latitude'] lon = row['longitude'] height = row.get('height', 0.0) basement = row.get('basement', 0.0) if use_gba_height: try: result = gba_getter.get_height_m(lat, lon, buffer_m=5.0) if result.get('status') == 'success' and result.get('predicted_height') is not None: h = result['predicted_height'] if h >= 0: # Only use valid positive heights height = h except Exception as e: print(f"GBA height failed for {lat},{lon}: {e}") elif use_predicted_height: try: predictor = get_predictor() pred = predictor.predict_from_coordinates(lat, lon) if pred['status'] == 'success' and pred['predicted_height'] is not None: height = pred['predicted_height'] except Exception as e: print(f"Height prediction failed for {lat},{lon}: {e}") terrain = get_terrain_metrics(lat, lon) water_dist = distance_to_water(lat, lon) result = calculate_multi_hazard_vulnerability( lat=lat, lon=lon, height=height, basement=basement, terrain_metrics=terrain, water_distance=water_dist ) return { 'latitude': lat, 'longitude': lon, 'height': height, 'basement': basement, 'vulnerability_index': result['vulnerability_index'], 'ci_lower_95': result['confidence_interval']['lower_bound_95'], 'ci_upper_95': result['confidence_interval']['upper_bound_95'], 'vulnerability_level': result['risk_level'], 'confidence': result['uncertainty_analysis']['confidence'], 'confidence_interpretation': result['uncertainty_analysis']['interpretation'], 'elevation_m': result['elevation_m'], 'tpi_m': result['relative_elevation_m'], 'slope_degrees': result['slope_degrees'], 'distance_to_water_m': result['distance_to_water_m'], 'dominant_hazard': result['dominant_hazard'], 'fluvial_risk': result['hazard_breakdown']['fluvial_riverine'], 'coastal_risk': result['hazard_breakdown']['coastal_surge'], 'pluvial_risk': result['hazard_breakdown']['pluvial_drainage'], 'combined_risk': result['hazard_breakdown']['combined_index'], 'quality_flags': ','.join(result['uncertainty_analysis']['data_quality_flags']) if result['uncertainty_analysis']['data_quality_flags'] else '' } except Exception as e: return { 'latitude': row.get('latitude'), 'longitude': row.get('longitude'), 'height': row.get('height', 0.0), 'basement': row.get('basement', 0.0), 'error': str(e), 'vulnerability_index': None } @app.post("/assess_multihazard") async def assess_multihazard(data: SingleAssessment) -> Dict: """Multi-hazard assessment (fluvial + coastal + pluvial)""" loop = asyncio.get_event_loop() try: from vulnerability import calculate_multi_hazard_vulnerability # Run slow terrain + water queries in a background thread terrain, water_dist = await loop.run_in_executor( None, lambda: ( get_terrain_metrics(data.latitude, data.longitude), distance_to_water(data.latitude, data.longitude) ) ) result = calculate_multi_hazard_vulnerability( lat=data.latitude, lon=data.longitude, height=data.height, basement=data.basement, terrain_metrics=terrain, water_distance=water_dist ) return { "status": "success", "input": data.dict(), "assessment": result } except Exception as e: raise HTTPException(status_code=500, detail=f"Assessment failed: {e}") @app.post("/get_height_gba") async def get_height_gba(data: SingleAssessment): try: result = gba_getter.get_height_m(data.latitude, data.longitude, buffer_m=5.0) if result.get("status") != "success": raise HTTPException(status_code=404, detail="GBA height not found for this location. Please try predicting the height.") return result except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/health") async def health_check() -> Dict: """Health check endpoint.""" return {"status": "healthy", "gee_initialized": True}