# vulnerability.py import numpy as np def normalize_component(value, max_value, inverse=False): """ Normalize to 0-1 range """ if value is None: return 0.5 if inverse: normalized = min(1.0, abs(value) / max_value) else: normalized = max(0.0, 1.0 - (abs(value) / max_value)) return normalized def assess_flood_context(elevation, tpi, water_distance): # Context 1: Coastal (<10m) if elevation < 10: if water_distance is not None and water_distance < 500: return 'very_high', 1.0 elif water_distance is not None and water_distance < 2000: return 'very_high' if tpi < -3 else 'very high', 1.0 if tpi < -3 else 0.98 elif water_distance is not None and water_distance < 5000: return 'high' if tpi < -3 else 'moderate', 0.9 if tpi < -3 else 0.75 else: return 'moderate', 0.7 if tpi < -5 else 0.6 # Context 2: High plateau (>600m) elif elevation > 600: if tpi < -15 and water_distance is not None and water_distance < 100: return 'moderate', 0.65 elif tpi < -10: return 'low', 0.55 else: return 'low', 0.50 # Context 3: Mountain (300–600m) elif elevation > 300: if water_distance is not None and water_distance < 200 and tpi < -10: return 'moderate', 0.75 elif water_distance is not None and water_distance < 500: return 'low', 0.65 else: return 'low', 0.55 # Context 4: River valley (100–300m) elif 100 < elevation < 300: if water_distance is not None and water_distance < 300 and tpi < -5: return 'high', 1.0 elif water_distance is not None and water_distance < 500: return 'moderate', 0.85 else: return 'moderate', 0.7 # Context 5: Low inland (10–100m) else: if water_distance is None: return 'moderate', 0.7 elif water_distance < 200: if tpi < -8: return 'very_high', 1.0 elif tpi < -5: return 'high', 0.95 else: return 'high', 0.85 elif water_distance < 500: return 'high' if tpi < -5 else 'moderate', 0.85 if tpi < -5 else 0.75 elif water_distance < 1000: return 'moderate', 0.70 if tpi < -5 else 0.65 else: if tpi < -8: return 'moderate', 0.65 elif tpi < -5: return 'low', 0.60 else: return 'low', 0.55 def calculate_vulnerability_index(lat, lon, height, basement, terrain_metrics, water_distance): """ Calculate flood vulnerability index with basement consideration """ elevation = terrain_metrics.get('elevation') or 0 tpi = terrain_metrics.get('tpi') or 0 slope = terrain_metrics.get('slope') or 0 # GET FLOOD CONTEXT try: context_risk_level, context_factor = assess_flood_context(elevation, tpi, water_distance) except (TypeError, ValueError) as te: print(f"Context failed for {lat},{lon}: {te} - default moderate") context_risk_level, context_factor = 'moderate', 0.8 # Apply elevation penalty for high-altitude locations if elevation > 500: elevation_factor = max(0.3, 1.0 - (elevation - 500) / 1000) else: elevation_factor = 1.0 # Component 1: Proximity if water_distance is None: proximity_score = 0.5 elif water_distance < 100: proximity_score = 1.0 * elevation_factor elif water_distance < 500: proximity_score = (0.9 - ((water_distance - 100) / 400) * 0.5) * elevation_factor elif water_distance < 2000: proximity_score = (0.4 - ((water_distance - 500) / 1500) * 0.3) * elevation_factor elif water_distance < 5000: proximity_score = max(0.0, 0.1 - ((water_distance - 2000) / 3000) * 0.1) * elevation_factor else: proximity_score = 0.001 # Component 2: TPI (Topographic Position Index) if tpi is not None: if tpi < -5: tpi_score = min(1.0, 0.7 + abs(tpi + 5) / 30) elif tpi > 5: tpi_score = max(0.0, 0.3 - (tpi - 5) / 50) else: tpi_score = 0.5 - (tpi / 20) else: tpi_score = 0.5 tpi_score = max(0.0, min(1.0, tpi_score)) if elevation > 500: tpi_score = tpi_score * elevation_factor # Component 3: Slope if slope < 0.5: slope_score = 0.9 elif slope < 2: slope_score = 0.8 - ((slope - 0.5) / 1.5) * 0.3 elif slope < 6: slope_score = 0.5 - ((slope - 2) / 4) * 0.3 else: slope_score = max(0.05, 0.2 - (slope - 6) / 20) # Component 4: Building protection factor net_protection = height + abs(basement) # Height protection calculation (without basement penalty) if net_protection <= 0: height_score = 0.9 elif net_protection < 3: height_score = 0.8 - (net_protection / 3) * 0.3 elif net_protection < 8: height_score = 0.5 - ((net_protection - 3) / 5) * 0.3 else: height_score = max(0.1, 0.2 - ((net_protection - 8) / 15) * 0.15) height_score = max(0.0, min(1.0, height_score)) # Increase weight for building characteristics when basement present if basement < 0: weights = { 'proximity': 0.25, 'tpi': 0.30, 'slope': 0.15, 'height': 0.30 } else: weights = { 'proximity': 0.30, 'tpi': 0.35, 'slope': 0.20, 'height': 0.15 } # Base vulnerability base_vulnerability = ( weights['proximity'] * proximity_score + weights['tpi'] * tpi_score + weights['slope'] * slope_score + weights['height'] * height_score ) # Basement as multiplier if basement < 0: basement_multiplier = 1.0 + (abs(basement) * 0.15) base_vulnerability = min(1.0, base_vulnerability * basement_multiplier) # Apply context adjustment vulnerability_index = base_vulnerability * context_factor # Risk level based on final vulnerability_index with threshold mapping if vulnerability_index >= 0.80: final_risk = 'very_high' elif vulnerability_index >= 0.65: final_risk = 'high' elif vulnerability_index >= 0.40: final_risk = 'moderate' elif vulnerability_index >= 0.20: final_risk = 'low' else: final_risk = 'very_low' # Keep context-based label if more severe risk_levels_order = ['very_low', 'low', 'moderate', 'high', 'very_high'] context_severity = risk_levels_order.index(context_risk_level) if context_risk_level in risk_levels_order else 2 final_severity = risk_levels_order.index(final_risk) risk_level = risk_levels_order[max(context_severity, final_severity)] # Track component scores for SHAP components = { 'proximity_score': proximity_score, 'tpi_score': tpi_score, 'slope_score': slope_score, 'height_score': height_score, 'elevation': elevation } # Calculate uncertainty uncertainty_analysis = calculate_uncertainty( terrain_metrics, water_distance, context_factor, lat, lon ) # Calculate confidence interval confidence_interval = calculate_confidence_interval( vulnerability_index, uncertainty_analysis['uncertainty'] ) return { 'vulnerability_index': round(vulnerability_index, 3), 'confidence_interval': confidence_interval, 'risk_level': risk_level, 'distance_to_water_m': round(water_distance, 1) if water_distance else None, 'elevation_m': elevation, 'relative_elevation_m': round(tpi, 2) if tpi is not None else None, 'slope_degrees': round(slope, 2) if slope is not None else None, 'uncertainty_analysis': uncertainty_analysis, 'components': components } def calculate_uncertainty(terrain_metrics, water_distance, context_factor, lat, lon): """ Physically-based uncertainty quantification - FIXED scaling """ uncertainties = {} # 1. ELEVATION UNCERTAINTY elevation = terrain_metrics.get('elevation') slope = terrain_metrics.get('slope') or 0 if elevation is None: uncertainties['elevation'] = 0.15 else: # Base DEM error in meters if abs(lat) < 60: base_error_m = 2.5 else: base_error_m = 4.0 # Slope increases error if slope > 15: slope_multiplier = 1 + (slope - 15) / 30 base_error_m *= slope_multiplier # Convert to normalized uncertainty if elevation < 10: uncertainties['elevation'] = 0.08 # coastal - elevation matters a lot elif elevation < 100: uncertainties['elevation'] = 0.06 # low inland else: uncertainties['elevation'] = 0.03 # elevated - less critical # 2. TPI UNCERTAINTY tpi = terrain_metrics.get('tpi') if tpi is None: uncertainties['tpi'] = 0.12 else: # TPI uncertainty affects the depression detection if abs(tpi) < 2: uncertainties['tpi'] = 0.10 # near-flat, hard to classify elif abs(tpi) < 5: uncertainties['tpi'] = 0.06 else: uncertainties['tpi'] = 0.04 # clear depression/ridge # 3. SLOPE UNCERTAINTY if slope is None: uncertainties['slope'] = 0.10 else: if slope < 2: uncertainties['slope'] = 0.08 # very flat = uncertain elif slope < 10: uncertainties['slope'] = 0.04 else: uncertainties['slope'] = 0.03 # steep = clear signal # 4. WATER DISTANCE UNCERTAINTY if water_distance is None: uncertainties['water_proximity'] = 0.20 elif water_distance < 50: uncertainties['water_proximity'] = 0.03 elif water_distance < 500: uncertainties['water_proximity'] = 0.06 elif water_distance < 2000: uncertainties['water_proximity'] = 0.10 else: uncertainties['water_proximity'] = 0.15 # 5. CONTEXT UNCERTAINTY if context_factor < 0.7: uncertainties['context'] = 0.04 elif context_factor > 0.95: uncertainties['context'] = 0.06 else: uncertainties['context'] = 0.03 # 6. MODEL STRUCTURAL UNCERTAINTY uncertainties['model'] = 0.08 # Weight by component importance in vulnerability calculation weights = { 'elevation': 0.20, 'tpi': 0.30, 'slope': 0.15, 'water_proximity': 0.25, 'context': 0.05, 'model': 0.05 } # Weighted root-sum-of-squares weighted_variance = sum(weights[k] * (v ** 2) for k, v in uncertainties.items()) total_uncertainty = np.sqrt(weighted_variance) # Additional damping factor total_uncertainty *= 0.7 # empirical adjustment confidence = max(0.0, min(1.0, 1.0 - total_uncertainty)) # Get dominant error sources sorted_uncertainties = sorted(uncertainties.items(), key=lambda x: x[1], reverse=True) dominant_sources = sorted_uncertainties[:3] return { 'confidence': round(confidence, 3), 'uncertainty': round(total_uncertainty, 3), 'components': {k: round(v, 3) for k, v in uncertainties.items()}, 'interpretation': interpret_confidence(confidence), 'data_quality_flags': get_quality_flags(terrain_metrics, water_distance), 'dominant_error_sources': dominant_sources } def get_quality_flags(terrain_metrics, water_distance): """ Identify specific data quality issues """ flags = [] if terrain_metrics.get('elevation') is None: flags.append('missing_elevation') if terrain_metrics.get('tpi') is None: flags.append('missing_tpi') if terrain_metrics.get('slope') is None: flags.append('missing_slope') if water_distance is None: flags.append('water_distance_unknown') elif water_distance > 5000: flags.append('far_from_water_search_limited') elevation = terrain_metrics.get('elevation') or 0 slope = terrain_metrics.get('slope') or 0 if slope > 20: flags.append('steep_terrain_dem_error_high') if elevation < 1 and water_distance is not None and water_distance < 100: flags.append('coastal_surge_risk_not_modeled') return flags def interpret_confidence(confidence): """ Realistic confidence interpretation """ if confidence >= 0.85: return "High confidence - complete terrain data with low uncertainty" elif confidence >= 0.75: return "Good confidence - reliable data sources available" elif confidence >= 0.65: return "Moderate confidence - some data limitations present" elif confidence >= 0.50: return "Fair confidence - significant data gaps or measurement uncertainty" else: return "Low confidence - substantial missing data, use with caution" def calculate_confidence_interval(vulnerability_index, uncertainty): """ Calculate 95% confidence interval with proper bounds """ margin = 1.96 * uncertainty # Clip to valid 0-1 range lower = max(0.0, vulnerability_index - margin) upper = min(1.0, vulnerability_index + margin) return { 'point_estimate': round(vulnerability_index, 3), 'lower_bound_95': round(lower, 3), 'upper_bound_95': round(upper, 3), 'margin_of_error': round(margin, 3) } def calculate_multi_hazard_vulnerability(lat, lon, height, basement, terrain_metrics, water_distance): """ Multi-hazard assessment """ # Base assessment base_result = calculate_vulnerability_index( lat, lon, height, basement, terrain_metrics, water_distance ) elevation = terrain_metrics.get('elevation') or 0 # Coastal surge risk from spatial_queries import check_coastal is_coastal, coast_distance = check_coastal(lat, lon) # Guards against odd inputs if coast_distance is None or coast_distance < 0: coast_distance = 0.0 if elevation is None: raise ValueError("elevation is required") if elevation < 0: elevation = 0.0 if coast_distance < 5000: # Near coast — elevation governs risk if elevation < 2: coastal_risk = 0.99 elif elevation < 10: # Linear decline from 0.99 at 2 m coastal_risk = max(0.05, 0.99 + ((0.15 - 0.99) / 8.0) * (elevation - 2.0)) else: coastal_risk = 0.15 # Residual surge elif coast_distance < 20000: # Distance decay factor decay_factor = (coast_distance - 5000.0) / 15000.0 decay_factor = min(max(decay_factor, 0.0), 1.0) # Base residual distance_risk = 0.15 * (1.0 - decay_factor) # Elevation modifier elev_multiplier = 1.0 - (elevation / 10.0) elev_multiplier = min(max(elev_multiplier, 0.3), 1.0) coastal_risk = max(0.01, distance_risk * elev_multiplier) else: coastal_risk = 0.01 # Minimal residual background # Safety clamp coastal_risk = min(max(coastal_risk, 0.0), 1.0) # Pluvial risk – global-friendly (refined) tpi = terrain_metrics.get('tpi') or 0 slope = terrain_metrics.get('slope') or 0 elev = elevation # Clamp inputs tpi_clamped = max(min(tpi, 10), -10) slope_clamped = max(min(slope, 10), 0) # TPI factor: -10 (deep depression) # Mild convexity topo_linear = 1.0 - (tpi_clamped + 10) / 20.0 topo_factor = max(0.0, min(1.0, topo_linear**0.9)) # Nonlinear drop slope_fraction = 1.0 - (slope_clamped / 10.0) slope_factor = max(0.0, min(1.0, slope_fraction**1.2)) # Elevation decay: if elev <= 200: elevation_decay = 1.0 elif elev <= 1000: # linear to 0.1 across 800 m elevation_decay = 1.0 - ((elev - 200) / 800.0) * 0.9 else: elevation_decay = 0.1 # Combine (weights are tunable) pluvial_risk = (topo_factor * 0.6 + slope_factor * 0.4) * elevation_decay # Clamp final risk pluvial_risk = min(max(pluvial_risk, 0.0), 1.0) # Combined hazard with adaptive weights if elevation < 10: # Coastal zone weights = {'fluvial': 0.3, 'coastal': 0.5, 'pluvial': 0.2} elif elevation < 100: # Low inland weights = {'fluvial': 0.5, 'coastal': 0.1, 'pluvial': 0.4} else: # Elevated weights = {'fluvial': 0.6, 'coastal': 0.0, 'pluvial': 0.4} combined = (base_result['vulnerability_index'] * weights['fluvial'] + coastal_risk * weights['coastal'] + pluvial_risk * weights['pluvial']) # Identify dominant hazard hazards = { 'fluvial_riverine': base_result['vulnerability_index'], 'coastal_surge': coastal_risk, 'pluvial_drainage': pluvial_risk } dominant = max(hazards, key=hazards.get) return { **base_result, 'hazard_breakdown': { 'fluvial_riverine': round(base_result['vulnerability_index'], 3), 'coastal_surge': round(coastal_risk, 3), 'pluvial_drainage': round(pluvial_risk, 3), 'combined_index': round(combined, 3) }, 'dominant_hazard': dominant }