"""Terrain metrics (Google Earth Engine) and distance-to-water helpers.

The module exposes two public entry points:

* ``get_terrain_metrics`` - elevation / slope / TPI sampled from the best
  available DEM in Earth Engine.
* ``distance_to_water`` - distance in metres to the nearest coastline or
  significant water body (OpenStreetMap first, Natural Earth as fallback).
"""
import json
import time
import warnings
from datetime import datetime
from functools import lru_cache

import ee
import geopandas as gpd
import numpy as np
import requests
from pyproj import CRS, Transformer
from shapely.geometry import LineString, Point, Polygon

# Initialize GEE
from gee_auth import initialize_gee

warnings.filterwarnings("ignore", category=RuntimeWarning, module="shapely.measurement")

# Radius (metres) used for the single automatic "extended" OSM retry.
_OSM_EXTENDED_RADIUS_M = 10000

# LAZY LOADING
_RIVERS = None
_LAKES = None


def get_rivers():
    """Return the Natural Earth rivers GeoDataFrame, loading it on first use.

    Invalid geometries are dropped. The result is kept in a module-level
    global so the shapefile is read only once per process.
    """
    global _RIVERS
    if _RIVERS is None:
        _RIVERS = gpd.read_file('data/natural_earth/ne_10m_rivers_lake_centerlines.shp')
        _RIVERS = _RIVERS[_RIVERS.geometry.is_valid].copy()
        print("✅ Rivers shapefile loaded")
    return _RIVERS


def get_lakes():
    """Return the Natural Earth lakes GeoDataFrame, loading it on first use.

    Invalid geometries are dropped. The result is kept in a module-level
    global so the shapefile is read only once per process.
    """
    global _LAKES
    if _LAKES is None:
        _LAKES = gpd.read_file('data/natural_earth/ne_10m_lakes.shp')
        _LAKES = _LAKES[_LAKES.geometry.is_valid].copy()
        print("✅ Lakes shapefile loaded")
    return _LAKES


def _empty_terrain_result(dem_source=None):
    """Build the all-``None`` terrain result, optionally tagged with a DEM name."""
    return {
        'elevation': None,
        'slope': None,
        'tpi': None,
        'mean_elevation': None,
        'dem_source': dem_source,
    }


def get_terrain_metrics(lat, lon, buffer_m=500, force_dem=None):
    """Extract DEM-based metrics with a hierarchical fallback strategy.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        buffer_m: Radius in metres of the neighbourhood used for the mean
            elevation. Overridden to 100 when ``abs(lat) > 70``.
        force_dem: Optional DEM name understood by ``_get_forced_dem``. When
            given, only that DEM is used instead of the automatic selection.

    Returns:
        A dict with keys ``elevation``, ``slope``, ``tpi``,
        ``mean_elevation`` (floats rounded to 2 decimals, or ``None``) and
        ``dem_source`` (str or ``None``). ``tpi`` is the point elevation
        minus the neighbourhood mean elevation. Any Earth Engine failure
        is reported via ``print`` and yields ``None`` values rather than
        an exception.
    """
    initialize_gee()

    if abs(lat) > 70:
        buffer_m = 100

    try:
        if abs(lat) > 85:
            print(f"Polar region {lat},{lon} - no terrain data")
            return _empty_terrain_result()

        point = ee.Geometry.Point([lon, lat])
        region = point.buffer(buffer_m)

        # Hierarchical DEM selection OR forced DEM for validation
        if force_dem:
            dem, dem_source = _get_forced_dem(lat, lon, force_dem)
            if dem is None:
                # Forced DEM not available at this location
                return _empty_terrain_result()
        else:
            dem, dem_source = _select_best_dem(lat, lon)
            if dem is None:
                print(f"All DEM sources failed for {lat},{lon}")
                return _empty_terrain_result()

        # Point elevation with smaller buffer
        elevation_sample = dem.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=point.buffer(15),
            scale=30,
            maxPixels=1e9,
            bestEffort=True
        )
        elevation = elevation_sample.get('elevation').getInfo()

        if elevation is None:
            print(f"GEE elevation failed for {lat},{lon} using {dem_source}")
            return _empty_terrain_result(dem_source)

        # Neighbourhood mean; a failure here only disables TPI.
        try:
            mean_elevation_sample = dem.reduceRegion(
                reducer=ee.Reducer.mean(),
                geometry=region,
                scale=30,
                maxPixels=1e9,
                bestEffort=True
            )
            mean_elevation = mean_elevation_sample.get('elevation').getInfo()
        except Exception as me_err:
            print(f"GEE mean elev failed for {lat},{lon}: {me_err}")
            mean_elevation = None

        # Slope
        slope_img = ee.Terrain.slope(dem)

        def safe_reduce(reducer_type):
            """Reduce the slope image over a 200 m buffer; ``None`` on failure."""
            try:
                reducer = ee.Reducer.mean() if reducer_type == 'mean' else ee.Reducer.max()
                stats_dict = slope_img.reduceRegion(
                    reducer=reducer,
                    geometry=point.buffer(200),
                    scale=30,
                    maxPixels=1e9,
                    bestEffort=True
                )
                return stats_dict.get('slope').getInfo()
            except Exception as err:
                # "transform edge" errors are silenced; everything else is logged.
                if "transform edge" not in str(err):
                    print(f"GEE slope {reducer_type} failed for {lat},{lon}: {err}")
                return None

        slope_mean = safe_reduce('mean')
        slope_max = safe_reduce('max')

        if slope_max is not None and slope_mean is not None:
            # Prefer the maximum when it clearly dominates the mean.
            slope = slope_max if slope_max >= slope_mean * 1.8 else slope_mean
        elif slope_mean is not None:
            slope = slope_mean
        elif slope_max is not None:
            slope = slope_max
        else:
            slope = None

        # TPI
        tpi = None
        if elevation is not None and mean_elevation is not None:
            try:
                tpi = float(elevation) - float(mean_elevation)
            except (ValueError, TypeError):
                tpi = None

        return {
            'elevation': round(float(elevation), 2) if elevation is not None else None,
            'slope': round(float(slope), 2) if slope is not None else None,
            'tpi': round(float(tpi), 2) if tpi is not None else None,
            'mean_elevation': round(float(mean_elevation), 2) if mean_elevation is not None else None,
            'dem_source': dem_source
        }

    except Exception as e:
        print(f"GEE error for {lat},{lon}: {e}")
        return _empty_terrain_result()


def _select_best_dem(lat, lon):
    """Pick the first DEM, in priority order, that has data at the point.

    Regional DEMs are tried first when the point falls inside their bounding
    box, then the global 30 m products. Each candidate is probed with a
    single-pixel ``reduceRegion``; a probe that raises or returns ``None``
    moves on to the next candidate.

    Returns:
        ``(image, source_name)`` where ``image`` has a band named
        ``"elevation"``, or ``(None, None)`` when every candidate fails.
    """
    point = ee.Geometry.Point([lon, lat])

    # Regional high-resolution DEMs
    # 1. USGS 3DEP 10m (USA)
    if -130 < lon < -60 and 20 < lat < 55:
        try:
            usgs_10m = (
                ee.ImageCollection("USGS/3DEP/10m_collection")
                .filterBounds(point)
                .mosaic()
            )
            # Dynamically detect elevation band
            elev_band = usgs_10m.bandNames().getInfo()[0]
            usgs_10m = usgs_10m.select(elev_band).rename("elevation")
            usgs_10m = usgs_10m.reproject(crs="EPSG:4326", scale=10)
            test = usgs_10m.reduceRegion(
                ee.Reducer.first(), point, 10, bestEffort=True
            ).get("elevation").getInfo()
            if test is not None:
                print(f"Using USGS 3DEP 10m for {lat},{lon}")
                return usgs_10m, "USGS_3DEP_10m_collection"
        except Exception:
            pass

    # 2. Netherlands AHN3 / AHN2 (0.5 m)
    if 50 < lat < 54 and 3 < lon < 8:
        # Priority: AHN3 > AHN2
        try:
            # AHN3 (2014-2019)
            ahn3 = ee.ImageCollection("AHN/AHN3").select("DTM").mosaic()
            test = ahn3.reduceRegion(
                ee.Reducer.first(), point, 1, bestEffort=True
            ).get("DTM").getInfo()
            if test is not None:
                print(f"Using AHN3 0.5m DTM for {lat},{lon}")
                return ahn3.rename("elevation"), "AHN3_0.5m"
        except Exception:
            pass

        try:
            # AHN2 (2012)
            ahn2 = ee.Image("AHN/AHN2_05M_INT").select("elevation")
            test = ahn2.reduceRegion(
                ee.Reducer.first(), point, 1, bestEffort=True
            ).get("elevation").getInfo()
            if test is not None:
                print(f"Using AHN2 0.5m DTM for {lat},{lon}")
                return ahn2, "AHN2_0.5m"
        except Exception:
            pass

    # 3. UK Environment Agency Composite DTM/DSM (1m)
    if 49 < lat < 61 and -8 < lon < 3:
        try:
            ea = ee.Image("UK/EA/ENGLAND_1M_TERRAIN/2022")
            # Identify available elevation band
            bands = ea.bandNames().getInfo()
            elev_candidates = [b for b in bands if b.lower() in ["dtm", "elevation", "b1"]]
            if not elev_candidates:
                raise Exception("No valid elevation band found")
            elev_band = elev_candidates[0]

            # Reproject to WGS84 before sampling
            ea_reproj = ea.select(elev_band).reproject(
                crs="EPSG:4326",
                scale=2
            )
            test = ea_reproj.reduceRegion(
                reducer=ee.Reducer.first(),
                geometry=point,
                scale=2,
                bestEffort=True,
                maxPixels=1e9
            ).get(elev_band).getInfo()
            if test is not None:
                print(f"Using UK EA DTM 1m for {lat},{lon}")
                return ea_reproj.rename("elevation"), "EA_UK_1m"
        except Exception as e:
            print(f"EA UK DEM failed for {lat},{lon}: {e}")

    # 4. Australia 5m DEM (LiDAR coastal & urban areas)
    if -45 < lat < -10 and 110 < lon < 155:
        try:
            aus_col = ee.ImageCollection("AU/GA/AUSTRALIA_5M_DEM")
            # Mosaic all tiles that intersect the point
            aus = aus_col.filterBounds(point).mosaic()
            elev_band = "elevation"
            test = aus.select(elev_band).reduceRegion(
                reducer=ee.Reducer.first(),
                geometry=point,
                scale=5,
                bestEffort=True,
                maxPixels=1e9
            ).get(elev_band).getInfo()
            if test is not None:
                print(f"Using Australia 5m DEM for {lat},{lon}")
                return aus.select(elev_band), "Australia_5m"
        except Exception as e:
            print(f"AU DEM failed for {lat},{lon}: {e}")

    # Global 30m DEMs
    # 5. NASADEM
    if -56 <= lat <= 60:
        try:
            nasadem = ee.Image("NASA/NASADEM_HGT/001").select("elevation")
            test = nasadem.reduceRegion(
                ee.Reducer.first(), point, 30, bestEffort=True
            ).get("elevation").getInfo()
            if test is not None:
                print(f"Using NASADEM for {lat},{lon}")
                return nasadem, "NASADEM"
        except Exception:
            pass

    # 6. Copernicus GLO-30
    try:
        cop = ee.ImageCollection("COPERNICUS/DEM/GLO30").mosaic().select("DEM").rename("elevation")
        test = cop.reduceRegion(
            ee.Reducer.first(), point, 30, bestEffort=True
        ).get("elevation").getInfo()
        if test is not None:
            print(f"Using Copernicus GLO-30 for {lat},{lon}")
            return cop, "Copernicus_GLO30"
    except Exception:
        pass

    # 7. ALOS World 3D-30m
    if abs(lat) <= 82:
        try:
            alos = ee.ImageCollection("JAXA/ALOS/AW3D30/V4_1").mosaic().select("AVE").rename("elevation")
            test = alos.reduceRegion(
                ee.Reducer.first(), point, 30, bestEffort=True
            ).get("elevation").getInfo()
            if test is not None:
                print(f"Using ALOS AW3D30 AVE for {lat},{lon}")
                return alos, 'ALOS_AW3D30_AVE'
        except Exception:
            pass

    # 8. SRTM fallback
    if -56 <= lat <= 60:
        try:
            srtm = ee.Image("USGS/SRTMGL1_003").select("elevation")
            test = srtm.reduceRegion(
                ee.Reducer.first(), point, 30, bestEffort=True
            ).get("elevation").getInfo()
            if test is not None:
                print(f"Using SRTM fallback for {lat},{lon}")
                return srtm, "SRTM_v3"
        except Exception:
            pass

    print(f"All DEM sources failed for {lat},{lon}")
    return None, None


def _get_forced_dem(lat, lon, dem_name):
    """Retrieve one specific DEM by name, for validation studies.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        dem_name: One of the keys of the internal ``dem_map``. These match
            the source names returned by ``_select_best_dem``, plus
            ``'ALOS_AW3D30'``.

    Returns:
        ``(image, dem_name)`` when the DEM has a value at the point,
        otherwise ``(None, None)`` (unknown name, no data, or any Earth
        Engine error).
    """
    point = ee.Geometry.Point([lon, lat])

    def _alos():
        return (
            ee.ImageCollection("JAXA/ALOS/AW3D30/V4_1").mosaic().select("AVE").rename("elevation"),
            30
        )

    # Map DEM names to their retrieval logic: each entry returns (image, probe scale).
    dem_map = {
        'ALOS_AW3D30': _alos,
        # Alias: the name _select_best_dem reports for the same product.
        'ALOS_AW3D30_AVE': _alos,
        'Copernicus_GLO30': lambda: (
            ee.ImageCollection("COPERNICUS/DEM/GLO30").mosaic().select("DEM").rename("elevation"),
            30
        ),
        'NASADEM': lambda: (
            ee.Image("NASA/NASADEM_HGT/001").select("elevation"),
            30
        ),
        'SRTM_v3': lambda: (
            ee.Image("USGS/SRTMGL1_003").select("elevation"),
            30
        ),
        'AHN3_0.5m': lambda: (
            ee.ImageCollection("AHN/AHN3").select("DTM").mosaic().rename("elevation"),
            1
        ),
        'AHN2_0.5m': lambda: (
            ee.Image("AHN/AHN2_05M_INT").select("elevation"),
            1
        ),
        'EA_UK_1m': lambda: (
            ee.Image("UK/EA/ENGLAND_1M_TERRAIN/2022").select("dtm").reproject(crs="EPSG:4326", scale=2).rename("elevation"),
            2
        ),
        'Australia_5m': lambda: (
            ee.ImageCollection("AU/GA/AUSTRALIA_5M_DEM").filterBounds(point).mosaic().select("elevation"),
            5
        ),
        'USGS_3DEP_10m_collection': lambda: (
            ee.ImageCollection("USGS/3DEP/10m_collection").filterBounds(point).mosaic().select("elevation"),
            10
        )
    }

    if dem_name not in dem_map:
        print(f"Unknown DEM name: {dem_name}")
        return None, None

    try:
        dem, scale = dem_map[dem_name]()

        # Test if data exists at this location
        test = dem.reduceRegion(
            ee.Reducer.first(), point, scale, bestEffort=True
        ).get("elevation").getInfo()

        if test is not None:
            print(f"Forced DEM {dem_name} available at {lat},{lon}")
            return dem, dem_name

        print(f"Forced DEM {dem_name} has no data at {lat},{lon}")
        return None, None

    except Exception as e:
        print(f"Failed to get forced DEM {dem_name} at {lat},{lon}: {e}")
        return None, None


def is_significant_water_body(element):
    """Decide whether an Overpass element matters for flood-risk assessment.

    Args:
        element: An Overpass JSON element (dict). ``tags`` and ``geometry``
            (a list of ``{'lat', 'lon'}`` dicts) are read when present.

    Returns:
        ``True`` for rivers/streams/canals/drains, named natural water, and
        unnamed natural water outlined by more than 50 nodes. ``False`` for
        fountains, ornamental/pond-type water, very small water bodies and
        anything else.
    """
    tags = element.get('tags', {})
    name = tags.get('name', '')

    # Filter by name - fountains
    if name:
        lowered = name.lower()
        if any(word in lowered for word in ('fuente', 'fountain', 'fonte')):
            return False

    # Filter by water type tag
    if tags.get('water', '') in ('fountain', 'reflecting_pool', 'pond', 'ornamental'):
        return False

    # Filter by amenity tag
    if tags.get('amenity') == 'fountain':
        return False

    # Check if it's a waterway (rivers/streams/canals are significant)
    if tags.get('waterway') in ('river', 'stream', 'canal', 'drain'):
        return True

    is_natural_water = tags.get('natural') == 'water'

    # Reject tiny water bodies using a bounding-box area estimate.
    # NOTE(review): degrees are converted with a flat 111320 m/degree for both
    # axes, so east-west extent is overestimated away from the equator.
    if is_natural_water and 'geometry' in element:
        coords = element.get('geometry', [])
        if len(coords) >= 3:
            lons = [c['lon'] for c in coords]
            lats = [c['lat'] for c in coords]
            width = (max(lons) - min(lons)) * 111320
            height = (max(lats) - min(lats)) * 111320
            approx_area = width * height

            if approx_area < 500:
                return False
            if len(coords) < 10 and approx_area < 2000:
                return False

    # Natural water bodies with names (excluding fountains)
    if is_natural_water and name:
        return True

    # Large unnamed water bodies
    if is_natural_water and not name:
        if len(element.get('geometry', [])) > 50:
            return True

    return False


def distance_to_water_osm(lat, lon, radius_m=5000, timeout=20, retry_count=2):
    """Query OpenStreetMap (Overpass) for the nearest significant water body.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        radius_m: Search radius in metres for the Overpass ``around`` filter.
        timeout: Timeout in seconds, used both in the Overpass query and for
            the HTTP request.
        retry_count: Maximum number of HTTP attempts for this radius.

    Returns:
        Approximate distance in metres to the nearest significant feature
        (see ``is_significant_water_body``), or ``None`` when nothing usable
        was found or the request failed. When elements are returned but none
        is significant and ``radius_m`` is below 10000, a single retry at
        10000 m is made.
    """
    overpass_url = "http://overpass-api.de/api/interpreter"
    query = f"""
    [out:json][timeout:{timeout}];
    (
      way["natural"="water"](around:{radius_m},{lat},{lon});
      way["waterway"="river"](around:{radius_m},{lat},{lon});
      way["waterway"="canal"](around:{radius_m},{lat},{lon});
      way["waterway"="stream"](around:{radius_m},{lat},{lon});
      relation["natural"="water"](around:{radius_m},{lat},{lon});
      way["natural"="bay"](around:{radius_m},{lat},{lon});
    );
    out geom;
    """

    for attempt in range(retry_count):
        try:
            if not (-90 <= lat <= 90 and -180 <= lon <= 180):
                print(f"Invalid coords for OSM: {lat},{lon}")
                return None

            response = requests.post(overpass_url, data={'data': query}, timeout=timeout)

            if response.status_code == 429:
                # Exponential back-off before the next attempt.
                print(f"OSM rate limited for {lat},{lon} - waiting {2 ** attempt}s")
                time.sleep(2 ** attempt)
                continue

            if response.status_code == 400:
                print(f"OSM 400 for {lat},{lon} - bad query")
                return None

            if response.status_code != 200:
                print(f"OSM HTTP {response.status_code} for {lat},{lon}")
                if attempt < retry_count - 1:
                    time.sleep(1)
                    continue
                return None

            if not response.text.strip():
                print(f"OSM empty response for {lat},{lon}")
                return None

            try:
                data = response.json()
            except (json.JSONDecodeError, ValueError) as je:
                print(f"OSM JSON decode failed for {lat},{lon}: {je}")
                return None

            if not data.get('elements'):
                print(f"OSM no elements found for {lat},{lon}")
                return None

            point = Point(lon, lat)
            min_distance = float('inf')

            significant_features = [e for e in data['elements'] if is_significant_water_body(e)]

            # Widen the search once. The guard compares against the extended
            # radius itself so the recursive call can never trigger another
            # extension (the previous `< 12500` guard recursed without bound).
            if not significant_features and radius_m < _OSM_EXTENDED_RADIUS_M:
                print(f"Retrying {lat},{lon} with extended radius...")
                return distance_to_water_osm(
                    lat, lon,
                    radius_m=_OSM_EXTENDED_RADIUS_M,
                    timeout=timeout,
                    retry_count=1
                )

            if not significant_features:
                print(f"OSM only ornamental features for {lat},{lon}")
                return None

            for element in significant_features:
                if 'geometry' not in element or len(element['geometry']) < 2:
                    continue

                coords = [(node['lon'], node['lat']) for node in element['geometry']]

                if element.get('tags', {}).get('waterway'):
                    try:
                        water_geom = LineString(coords)
                    except Exception:
                        continue
                else:
                    # Water areas are polygons; fall back to a line when the
                    # ring cannot be built (e.g. too few points).
                    try:
                        water_geom = Polygon(coords)
                    except Exception:
                        try:
                            water_geom = LineString(coords)
                        except Exception:
                            continue

                if not water_geom.is_valid:
                    continue

                # NOTE(review): planar distance in degrees scaled by a flat
                # 111320 m/degree; this ignores longitude convergence.
                distance = point.distance(water_geom) * 111320
                if not np.isnan(distance):
                    min_distance = min(min_distance, distance)

            result = min_distance if min_distance != float('inf') else None
            if result is not None:
                print(f"OSM success for {lat},{lon}: {result:.1f}m")
            return result

        except requests.exceptions.Timeout:
            print(f"OSM timeout for {lat},{lon} (attempt {attempt + 1}/{retry_count})")
            if attempt < retry_count - 1:
                time.sleep(1)
                continue
            return None
        except Exception as e:
            print(f"OSM exception for {lat},{lon}: {e}")
            if attempt < retry_count - 1:
                time.sleep(1)
                continue
            return None

    return None


def distance_to_water_static(lat, lon):
    """Fallback: distance in metres to the nearest Natural Earth river or lake.

    The point and both datasets are projected to the UTM zone derived from
    ``lon`` so that distances come out in metres.

    Returns:
        The smaller of the river and lake distances, or ``None`` when no
        distance could be computed or the dataset handling raised.
    """
    # Clamp to the valid 1..60 range: lon == 180 would otherwise give zone 61.
    utm_zone = min(max(int((lon + 180) / 6) + 1, 1), 60)
    hemisphere = 'north' if lat >= 0 else 'south'
    utm_crs = CRS.from_string(f"+proj=utm +zone={utm_zone} +{hemisphere} +datum=WGS84")

    transformer = Transformer.from_crs("EPSG:4326", utm_crs, always_xy=True)
    point_utm = Point(transformer.transform(lon, lat))

    try:
        # Use lazy-loaded datasets
        rivers_utm = get_rivers().to_crs(utm_crs)
        lakes_utm = get_lakes().to_crs(utm_crs)

        river_distances = rivers_utm.geometry.distance(point_utm)
        river_distances = river_distances[river_distances.notna()]
        min_river_dist = river_distances.min() if len(river_distances) > 0 else np.inf

        lake_distances = lakes_utm.geometry.distance(point_utm)
        lake_distances = lake_distances[lake_distances.notna()]
        min_lake_dist = lake_distances.min() if len(lake_distances) > 0 else np.inf

        min_dist = min(min_river_dist, min_lake_dist)
        result = min_dist if min_dist != np.inf else None

        if result is not None:
            print(f"Static fallback for {lat},{lon}: {result:.1f}m")
        else:
            print(f"Static fallback failed for {lat},{lon}")

        return result
    except Exception as p_err:
        print(f"Static distance error for {lat},{lon}: {p_err}")
        return None


def check_coastal(lat, lon, timeout=15):
    """Adaptive coastal detection: widen the search until a coastline is found.

    Overpass is queried for ``natural=coastline`` ways at 1000, 2000 and
    5000 m in turn, stopping at the first radius that yields a distance.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        timeout: Timeout in seconds for the Overpass query and HTTP request.

    Returns:
        ``(True, distance_m)`` when a coastline is found, otherwise
        ``(False, None)``. Request or parsing errors at one radius are
        logged and the next radius is tried.
    """
    overpass_url = "http://overpass-api.de/api/interpreter"
    point = Point(lon, lat)

    # Sweep radii from 1 km to 5 km
    radii = [1000, 2000, 5000]

    print(f"[Coastal] Starting coastal search for {lat},{lon} ...")

    for r in radii:
        query = f"""
        [out:json][timeout:{timeout}];
        (
          way["natural"="coastline"](around:{r},{lat},{lon});
        );
        out geom;
        """

        try:
            response = requests.post(overpass_url, data={'data': query}, timeout=timeout)

            if not response.text.strip():
                continue

            try:
                data = response.json()
            except ValueError:
                # Non-JSON body (e.g. an HTML error page): try the next radius.
                continue

            if not data.get('elements'):
                print(f"[Coastal] No coastline found at {r} m")
                continue

            min_distance = float('inf')

            for element in data['elements']:
                if 'geometry' in element and len(element['geometry']) >= 2:
                    coords = [(node['lon'], node['lat']) for node in element['geometry']]
                    coastline = LineString(coords)
                    # NOTE(review): same flat degree-to-metre factor as the OSM
                    # water search.
                    distance = point.distance(coastline) * 111320
                    min_distance = min(min_distance, distance)

            if min_distance != float('inf'):
                print(f"Coastal detected for {lat},{lon}: {min_distance:.1f}m (radius={r})")
                return True, min_distance

        except Exception as e:
            print(f"[Coastal] Error at radius {r}: {e}")
            continue

    # If nothing is found
    print(f"[Coastal] No coastline detected for {lat},{lon}. Continuing with OSM water search.")
    return False, None


@lru_cache(maxsize=1000)
def _distance_to_water_cached(lat, lon):
    """Run the water-distance pipeline for already-rounded coordinates.

    Order of strategies: coastline check, OSM water search at 3000, 5000 and
    8000 m (three attempts each), then the Natural Earth fallback, whose
    result is multiplied by 0.7 before being returned.
    """
    print(f"--- Water distance query for {lat},{lon} ---")

    # 1. Check coastal proximity
    try:
        is_coastal, coast_distance = check_coastal(lat, lon)
        if is_coastal and coast_distance is not None:
            print(f"Coastal detected for {lat},{lon}: {coast_distance:.1f} m")
            return coast_distance
    except Exception as e:
        print(f"Coastal check failed for {lat},{lon}: {e}")

    # 2. Try OSM query with retries
    for radius in [3000, 5000, 8000]:
        for attempt in range(3):
            try:
                print(f"OSM attempt {attempt + 1}/3 at radius {radius} m for {lat},{lon}")
                d = distance_to_water_osm(lat, lon, radius_m=radius)
                if d is not None:
                    print(f"OSM success for {lat},{lon}: {d:.1f} m (radius={radius})")
                    return d
            except Exception as e:
                print(f"OSM exception on attempt {attempt + 1} for {lat},{lon}: {e}")
            time.sleep(1.5)
        time.sleep(1.5)

    # 3. Static fallback
    try:
        d_static = distance_to_water_static(lat, lon)
        if d_static is not None:
            corrected = d_static * 0.7
            print(f"Static fallback for {lat},{lon}: raw={d_static:.1f} m, corrected={corrected:.1f} m")
            return corrected
        print(f"Static fallback failed for {lat},{lon}")
    except Exception as e:
        print(f"Static distance error for {lat},{lon}: {e}")

    print(f"All water distance queries failed for {lat},{lon}")
    return None


def distance_to_water(lat, lon):
    """Combined water distance with caching for batch efficiency.

    Coordinates are rounded to 6 decimals *before* the cache lookup, so
    inputs that differ only beyond that precision share one cache entry.
    Uses the coastline check first, then OSM, then the Natural Earth
    fallback.

    Args:
        lat: Latitude, anything ``float()`` accepts.
        lon: Longitude, anything ``float()`` accepts.

    Returns:
        Distance in metres, or ``None`` when every strategy failed. ``None``
        results are cached like any other value.
    """
    return _distance_to_water_cached(round(float(lat), 6), round(float(lon), 6))


# Keep the lru_cache management API available on the public name.
distance_to_water.cache_info = _distance_to_water_cached.cache_info
distance_to_water.cache_clear = _distance_to_water_cached.cache_clear