# flood-vulnerability / spatial_queries.py
# adema5051's picture
# Update spatial_queries.py
# 38e1ad2 verified
import ee
import geopandas as gpd
from shapely.geometry import Point
import requests
import numpy as np
from functools import lru_cache
import warnings
import json
from pyproj import CRS, Transformer
import time
from datetime import datetime
# Initialize GEE
from gee_auth import initialize_gee
# Suppress RuntimeWarnings attributed to the shapely.measurement module.
warnings.filterwarnings("ignore", category=RuntimeWarning, module="shapely.measurement")
# LAZY LOADING
# Module-level caches for the Natural Earth layers. They start as None and are
# filled on the first call to get_rivers() / get_lakes().
_RIVERS = None
_LAKES = None
def get_rivers():
    """
    Return the Natural Earth river centerlines, loading them on first use.

    The shapefile is read once, rows whose geometry is invalid are dropped,
    and the result is stored in the module-level ``_RIVERS`` cache so later
    calls return the same GeoDataFrame without re-reading the file.

    Returns:
        The cached GeoDataFrame containing only valid river geometries.
    """
    global _RIVERS
    if _RIVERS is None:
        # The path is relative to the current working directory.
        rivers = gpd.read_file('data/natural_earth/ne_10m_rivers_lake_centerlines.shp')
        # Publish to the cache only after filtering succeeded, so a failure
        # here can never leave an unfiltered frame cached for later callers.
        _RIVERS = rivers[rivers.geometry.is_valid].copy()
        print("✅ Rivers shapefile loaded")
    return _RIVERS
def get_lakes():
    """
    Return the Natural Earth lake polygons, loading them on first use.

    The shapefile is read once, rows whose geometry is invalid are dropped,
    and the result is stored in the module-level ``_LAKES`` cache so later
    calls return the same GeoDataFrame without re-reading the file.

    Returns:
        The cached GeoDataFrame containing only valid lake geometries.
    """
    global _LAKES
    if _LAKES is None:
        # The path is relative to the current working directory.
        lakes = gpd.read_file('data/natural_earth/ne_10m_lakes.shp')
        # Publish to the cache only after filtering succeeded, so a failure
        # here can never leave an unfiltered frame cached for later callers.
        _LAKES = lakes[lakes.geometry.is_valid].copy()
        print("✅ Lakes shapefile loaded")
    return _LAKES
def get_terrain_metrics(lat, lon, buffer_m=500, force_dem=None):
    """
    Sample elevation, slope and a TPI value around a point from Earth Engine.

    Args:
        lat, lon: Coordinates of the point in degrees.
        buffer_m: Radius in metres of the neighbourhood used for the mean
            elevation. Overridden to 100 when ``abs(lat) > 70``.
        force_dem: Optional DEM name handed to ``_get_forced_dem``; when
            falsy the DEM is chosen by ``_select_best_dem``.

    Returns:
        dict with keys ``elevation``, ``slope``, ``tpi``, ``mean_elevation``
        (each rounded to 2 decimals, or None) and ``dem_source``. Every
        failure path returns the same keys with None values; nothing is
        raised from inside the guarded section.
    """
    def _blank(source=None):
        # All-None result; only the DEM label may be known on a failure path.
        return {
            'elevation': None,
            'slope': None,
            'tpi': None,
            'mean_elevation': None,
            'dem_source': source,
        }

    def _two_decimals(value):
        return None if value is None else round(float(value), 2)

    initialize_gee()
    if abs(lat) > 70:
        buffer_m = 100
    try:
        if abs(lat) > 85:
            print(f"Polar region {lat},{lon} - no terrain data")
            return _blank()

        point = ee.Geometry.Point([lon, lat])
        region = point.buffer(buffer_m)

        # A forced DEM (validation runs) takes precedence over the hierarchy.
        if force_dem:
            dem, dem_source = _get_forced_dem(lat, lon, force_dem)
            if dem is None:
                return _blank()
        else:
            dem, dem_source = _select_best_dem(lat, lon)
            if dem is None:
                print(f"All DEM sources failed for {lat},{lon}")
                return _blank()

        # Elevation at the point itself: mean over a 15 m buffer.
        spot_stats = dem.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=point.buffer(15),
            scale=30,
            maxPixels=1e9,
            bestEffort=True
        )
        elevation = spot_stats.get('elevation').getInfo()
        if elevation is None:
            print(f"GEE elevation failed for {lat},{lon} using {dem_source}")
            return _blank(dem_source)

        # Neighbourhood mean elevation; a failure here is not fatal.
        try:
            area_stats = dem.reduceRegion(
                reducer=ee.Reducer.mean(),
                geometry=region,
                scale=30,
                maxPixels=1e9,
                bestEffort=True
            )
            mean_elevation = area_stats.get('elevation').getInfo()
        except Exception as me_err:
            print(f"GEE mean elev failed for {lat},{lon}: {me_err}")
            mean_elevation = None

        # Slope: mean and max over a 200 m buffer, each fetched independently.
        slope_img = ee.Terrain.slope(dem)
        slope_stats = {}
        for reducer_type in ('mean', 'max'):
            try:
                if reducer_type == 'mean':
                    reducer = ee.Reducer.mean()
                else:
                    reducer = ee.Reducer.max()
                reduced = slope_img.reduceRegion(
                    reducer=reducer,
                    geometry=point.buffer(200),
                    scale=30,
                    maxPixels=1e9,
                    bestEffort=True
                )
                slope_stats[reducer_type] = reduced.get('slope').getInfo()
            except Exception as err:
                # "transform edge" errors are dropped without a log line.
                if "transform edge" not in str(err):
                    print(f"GEE slope {reducer_type} failed for {lat},{lon}: {err}")
                slope_stats[reducer_type] = None

        slope_mean = slope_stats['mean']
        slope_max = slope_stats['max']
        if slope_mean is None:
            slope = slope_max  # may itself be None
        elif slope_max is None:
            slope = slope_mean
        else:
            # Report the maximum only when it is at least 1.8x the mean.
            slope = slope_max if slope_max >= slope_mean * 1.8 else slope_mean

        # TPI: point elevation minus the neighbourhood mean.
        tpi = None
        if elevation is not None and mean_elevation is not None:
            try:
                tpi = float(elevation) - float(mean_elevation)
            except (ValueError, TypeError):
                tpi = None

        return {
            'elevation': _two_decimals(elevation),
            'slope': _two_decimals(slope),
            'tpi': _two_decimals(tpi),
            'mean_elevation': _two_decimals(mean_elevation),
            'dem_source': dem_source
        }
    except Exception as e:
        print(f"GEE error for {lat},{lon}: {e}")
        return _blank()
def _select_best_dem(lat, lon):
    """
    Pick the first DEM in a fixed priority order that has data at the point.

    Regional high-resolution DEMs are tried first when the point lies in
    their bounding box (USGS 3DEP, AHN3/AHN2, UK EA, Australia 5 m),
    followed by the global DEMs (NASADEM, Copernicus GLO-30, ALOS AW3D30,
    SRTM). A candidate is accepted when sampling it at the point returns a
    non-None value.

    Args:
        lat, lon: Coordinates of the point in degrees.

    Returns:
        ``(image, source_name)`` where ``image`` has a single band named
        ``elevation``, or ``(None, None)`` when every candidate failed.
        Errors from individual candidates are logged and never raised.
    """
    point = ee.Geometry.Point([lon, lat])

    def sample(image, band, scale):
        # First pixel value of `band` at the point, or None where masked.
        return image.reduceRegion(
            reducer=ee.Reducer.first(),
            geometry=point,
            scale=scale,
            bestEffort=True,
            maxPixels=1e9
        ).get(band).getInfo()

    # Regional high-resolution DEMs
    # 1. USGS 3DEP 10m (USA)
    if -130 < lon < -60 and 20 < lat < 55:
        try:
            usgs_10m = (
                ee.ImageCollection("USGS/3DEP/10m_collection")
                .filterBounds(point)
                .mosaic()
            )
            # Dynamically detect the elevation band. A mosaic of an empty
            # collection has no bands, which simply means "no coverage".
            bands = usgs_10m.bandNames().getInfo()
            if bands:
                usgs_10m = usgs_10m.select(bands[0]).rename("elevation")
                usgs_10m = usgs_10m.reproject(crs="EPSG:4326", scale=10)
                if sample(usgs_10m, "elevation", 10) is not None:
                    print(f"Using USGS 3DEP 10m for {lat},{lon}")
                    return usgs_10m, "USGS_3DEP_10m_collection"
        except Exception as e:
            print(f"USGS 3DEP DEM failed for {lat},{lon}: {e}")

    # 2. Netherlands AHN3/AHN2 (0.5 m), priority: AHN3 > AHN2
    if 50 < lat < 54 and 3 < lon < 8:
        try:
            # AHN3 (2014-2019)
            # NOTE(review): confirm the band name/case ("DTM") against the
            # Earth Engine catalog; a mismatch now shows up in the log
            # instead of being swallowed.
            ahn3 = ee.ImageCollection("AHN/AHN3").select("DTM").mosaic()
            if sample(ahn3, "DTM", 1) is not None:
                print(f"Using AHN3 0.5m DTM for {lat},{lon}")
                return ahn3.rename("elevation"), "AHN3_0.5m"
        except Exception as e:
            print(f"AHN3 DEM failed for {lat},{lon}: {e}")
        try:
            # AHN2 (2012)
            ahn2 = ee.Image("AHN/AHN2_05M_INT").select("elevation")
            if sample(ahn2, "elevation", 1) is not None:
                print(f"Using AHN2 0.5m DTM for {lat},{lon}")
                return ahn2, "AHN2_0.5m"
        except Exception as e:
            print(f"AHN2 DEM failed for {lat},{lon}: {e}")

    # 3. UK Environment Agency composite DTM (1 m)
    if 49 < lat < 61 and -8 < lon < 3:
        try:
            ea = ee.Image("UK/EA/ENGLAND_1M_TERRAIN/2022")
            # Identify the available elevation band
            bands = ea.bandNames().getInfo()
            elev_candidates = [b for b in bands if b.lower() in ("dtm", "elevation", "b1")]
            if not elev_candidates:
                raise ValueError("No valid elevation band found")
            elev_band = elev_candidates[0]
            # Reproject to WGS84 before sampling
            ea_reproj = ea.select(elev_band).reproject(crs="EPSG:4326", scale=2)
            if sample(ea_reproj, elev_band, 2) is not None:
                print(f"Using UK EA DTM 1m for {lat},{lon}")
                return ea_reproj.rename("elevation"), "EA_UK_1m"
        except Exception as e:
            print(f"EA UK DEM failed for {lat},{lon}: {e}")

    # 4. Australia 5m DEM (LiDAR, coastal and urban areas)
    if -45 < lat < -10 and 110 < lon < 155:
        try:
            # Mosaic all tiles that intersect the point
            aus = (
                ee.ImageCollection("AU/GA/AUSTRALIA_5M_DEM")
                .filterBounds(point)
                .mosaic()
                .select("elevation")
            )
            if sample(aus, "elevation", 5) is not None:
                print(f"Using Australia 5m DEM for {lat},{lon}")
                return aus, "Australia_5m"
        except Exception as e:
            print(f"AU DEM failed for {lat},{lon}: {e}")

    # Global 30m DEMs
    # 5. NASADEM
    if -56 <= lat <= 60:
        try:
            nasadem = ee.Image("NASA/NASADEM_HGT/001").select("elevation")
            if sample(nasadem, "elevation", 30) is not None:
                print(f"Using NASADEM for {lat},{lon}")
                return nasadem, "NASADEM"
        except Exception as e:
            print(f"NASADEM failed for {lat},{lon}: {e}")

    # 6. Copernicus GLO-30
    try:
        cop = ee.ImageCollection("COPERNICUS/DEM/GLO30").mosaic().select("DEM").rename("elevation")
        if sample(cop, "elevation", 30) is not None:
            print(f"Using Copernicus GLO-30 for {lat},{lon}")
            return cop, "Copernicus_GLO30"
    except Exception as e:
        print(f"Copernicus GLO-30 failed for {lat},{lon}: {e}")

    # 7. ALOS World 3D-30m
    if abs(lat) <= 82:
        try:
            alos = ee.ImageCollection("JAXA/ALOS/AW3D30/V4_1").mosaic().select("AVE").rename("elevation")
            if sample(alos, "elevation", 30) is not None:
                print(f"Using ALOS AW3D30 AVE for {lat},{lon}")
                # NOTE(review): this label differs from the 'ALOS_AW3D30' key
                # accepted by _get_forced_dem - confirm that is intended.
                return alos, 'ALOS_AW3D30_AVE'
        except Exception as e:
            print(f"ALOS AW3D30 failed for {lat},{lon}: {e}")

    # 8. SRTM fallback
    if -56 <= lat <= 60:
        try:
            srtm = ee.Image("USGS/SRTMGL1_003").select("elevation")
            if sample(srtm, "elevation", 30) is not None:
                print(f"Using SRTM fallback for {lat},{lon}")
                return srtm, "SRTM_v3"
        except Exception as e:
            print(f"SRTM failed for {lat},{lon}: {e}")

    print(f"All DEM sources failed for {lat},{lon}")
    return None, None
def _get_forced_dem(lat, lon, dem_name):
    """
    Build one named DEM and confirm it has data at the point.

    Intended for validation runs that compare specific DEMs.

    Args:
        lat, lon: Coordinates of the point in degrees.
        dem_name: One of the keys of the catalog below.

    Returns:
        ``(image, dem_name)`` when the DEM yields a value at the point,
        otherwise ``(None, None)`` (unknown name, no data, or any error
        while building or sampling the image).
    """
    point = ee.Geometry.Point([lon, lat])

    # name -> (sampling scale in metres, zero-argument image builder).
    # Builders are only invoked for the requested name.
    catalog = {
        'ALOS_AW3D30': (
            30,
            lambda: ee.ImageCollection("JAXA/ALOS/AW3D30/V4_1").mosaic().select("AVE").rename("elevation"),
        ),
        'Copernicus_GLO30': (
            30,
            lambda: ee.ImageCollection("COPERNICUS/DEM/GLO30").mosaic().select("DEM").rename("elevation"),
        ),
        'NASADEM': (
            30,
            lambda: ee.Image("NASA/NASADEM_HGT/001").select("elevation"),
        ),
        'SRTM_v3': (
            30,
            lambda: ee.Image("USGS/SRTMGL1_003").select("elevation"),
        ),
        'AHN3_0.5m': (
            1,
            lambda: ee.ImageCollection("AHN/AHN3").select("DTM").mosaic().rename("elevation"),
        ),
        'AHN2_0.5m': (
            1,
            lambda: ee.Image("AHN/AHN2_05M_INT").select("elevation"),
        ),
        'EA_UK_1m': (
            2,
            lambda: ee.Image("UK/EA/ENGLAND_1M_TERRAIN/2022").select("dtm").reproject(crs="EPSG:4326", scale=2).rename("elevation"),
        ),
        'Australia_5m': (
            5,
            lambda: ee.ImageCollection("AU/GA/AUSTRALIA_5M_DEM").filterBounds(point).mosaic().select("elevation"),
        ),
        'USGS_3DEP_10m_collection': (
            10,
            lambda: ee.ImageCollection("USGS/3DEP/10m_collection").filterBounds(point).mosaic().select("elevation"),
        ),
    }

    entry = catalog.get(dem_name)
    if entry is None:
        print(f"Unknown DEM name: {dem_name}")
        return None, None

    scale, build = entry
    try:
        dem = build()
        # Probe a single pixel to see whether the DEM covers this location.
        probe = dem.reduceRegion(
            ee.Reducer.first(),
            point,
            scale,
            bestEffort=True
        )
        value = probe.get("elevation").getInfo()
        if value is None:
            print(f"Forced DEM {dem_name} has no data at {lat},{lon}")
            return None, None
        print(f"Forced DEM {dem_name} available at {lat},{lon}")
        return dem, dem_name
    except Exception as e:
        print(f"Failed to get forced DEM {dem_name} at {lat},{lon}: {e}")
        return None, None
def is_significant_water_body(element):
    """
    Decide whether an Overpass element is a water feature worth measuring.

    Rules, applied in order:
      1. Names containing "fuente", "fountain" or "fonte" are rejected.
      2. ``water`` = fountain / reflecting_pool / pond / ornamental is rejected.
      3. ``amenity=fountain`` is rejected.
      4. ``waterway`` = river / stream / canal / drain is accepted.
      5. ``natural=water`` with at least 3 geometry nodes is rejected when
         its bounding-box area is below 500 m^2, or below 2000 m^2 with
         fewer than 10 nodes.
      6. Remaining ``natural=water`` elements are accepted when named, or
         when unnamed with more than 50 geometry nodes.
    Everything else is rejected.

    Args:
        element: Overpass element dict; ``tags`` and ``geometry`` (a list of
            ``{'lat': ..., 'lon': ...}`` nodes) are read when present.

    Returns:
        bool: True when the element counts as significant.
    """
    import math  # function-scope import keeps the block self-contained

    metres_per_degree = 111320

    tags = element.get('tags', {})
    name = tags.get('name', '')

    # Filter by name - fountains
    if name:
        lowered = name.lower()
        if any(word in lowered for word in ('fuente', 'fountain', 'fonte')):
            return False

    # Filter by water type tag
    if tags.get('water', '') in ('fountain', 'reflecting_pool', 'pond', 'ornamental'):
        return False

    # Filter by amenity tag
    if tags.get('amenity') == 'fountain':
        return False

    # Rivers, streams, canals and drains are always significant
    if tags.get('waterway') in ('river', 'stream', 'canal', 'drain'):
        return True

    if tags.get('natural') != 'water':
        return False

    # A missing or null geometry is treated as "no nodes".
    coords = element.get('geometry') or []

    if len(coords) >= 3:
        lons = [c['lon'] for c in coords]
        lats = [c['lat'] for c in coords]
        # A degree of longitude shrinks with cos(latitude); without this
        # factor the east-west extent (and so the area) is overestimated
        # away from the equator - by 2x at 60 degrees.
        mid_lat = (max(lats) + min(lats)) / 2.0
        width = (max(lons) - min(lons)) * metres_per_degree * math.cos(math.radians(mid_lat))
        height = (max(lats) - min(lats)) * metres_per_degree
        approx_area = width * height
        if approx_area < 500:
            return False
        if len(coords) < 10 and approx_area < 2000:
            return False

    # Named natural water bodies pass; unnamed ones need a detailed outline.
    if name:
        return True
    return len(coords) > 50
def _osm_min_distance_m(lat, lon, elements):
    """Return the smallest distance in metres from (lat, lon) to any element, or None."""
    from shapely.geometry import LineString, Polygon

    metres_per_degree = 111320
    # Work in a local metric frame centred on the query point: x is scaled
    # by cos(latitude) because a degree of longitude shrinks towards the
    # poles. Measuring in raw degrees overestimates east-west distances.
    cos_lat = float(np.cos(np.radians(lat)))
    origin = Point(0.0, 0.0)

    def to_local(nodes):
        # The modulo keeps longitude differences in [-180, 180) so features
        # across the antimeridian are not placed a full turn away.
        return [
            (
                ((node['lon'] - lon + 180.0) % 360.0 - 180.0) * metres_per_degree * cos_lat,
                (node['lat'] - lat) * metres_per_degree,
            )
            for node in nodes
            if node
        ]

    best = float('inf')
    for element in elements:
        is_waterway = bool(element.get('tags', {}).get('waterway'))
        if element.get('geometry'):
            # Ways carry their node list directly; non-waterways are areas.
            parts = [(element['geometry'], not is_waterway)]
        else:
            # Relations returned by "out geom" carry geometry per member way.
            # Each member is only a piece of the outline, so it is measured
            # as a line (distance to the shoreline).
            parts = [
                (member['geometry'], False)
                for member in element.get('members', [])
                if member.get('geometry')
            ]

        for nodes, prefer_polygon in parts:
            xy = to_local(nodes)
            if len(xy) < 2:
                continue
            geom = None
            if prefer_polygon:
                try:
                    geom = Polygon(xy)
                except Exception:
                    geom = None  # too few nodes for a ring: fall back to a line
            if geom is None:
                try:
                    geom = LineString(xy)
                except Exception:
                    continue
            if not geom.is_valid:
                continue
            distance = origin.distance(geom)
            if not np.isnan(distance):
                best = min(best, distance)

    return None if best == float('inf') else best


def distance_to_water_osm(lat, lon, radius_m=5000, timeout=20, retry_count=2):
    """
    Query OpenStreetMap (Overpass) for the nearest SIGNIFICANT water body.

    Features are filtered with ``is_significant_water_body``. When the
    response contains only insignificant features and ``radius_m`` is below
    10 km, the search is repeated once with a 10 km radius.

    Args:
        lat, lon: Coordinates of the point in degrees.
        radius_m: Search radius in metres.
        timeout: Timeout in seconds, used for both the Overpass query and
            the HTTP request.
        retry_count: Number of attempts on rate limiting, HTTP errors,
            timeouts or unexpected exceptions.

    Returns:
        Distance in metres to the nearest significant feature, or None when
        nothing usable was found or every attempt failed. Never raises for
        network or parsing errors.
    """
    overpass_url = "http://overpass-api.de/api/interpreter"
    extended_radius_m = 10000
    query = f"""
    [out:json][timeout:{timeout}];
    (
    way["natural"="water"](around:{radius_m},{lat},{lon});
    way["waterway"="river"](around:{radius_m},{lat},{lon});
    way["waterway"="canal"](around:{radius_m},{lat},{lon});
    way["waterway"="stream"](around:{radius_m},{lat},{lon});
    relation["natural"="water"](around:{radius_m},{lat},{lon});
    way["natural"="bay"](around:{radius_m},{lat},{lon});
    );
    out geom;
    """
    for attempt in range(retry_count):
        can_retry = attempt < retry_count - 1
        try:
            if not (-90 <= lat <= 90 and -180 <= lon <= 180):
                print(f"Invalid coords for OSM: {lat},{lon}")
                return None

            response = requests.post(overpass_url, data={'data': query}, timeout=timeout)

            if response.status_code == 429:
                print(f"OSM rate limited for {lat},{lon} - waiting {2 ** attempt}s")
                time.sleep(2 ** attempt)
                continue
            if response.status_code == 400:
                print(f"OSM 400 for {lat},{lon} - bad query")
                return None
            if response.status_code != 200:
                print(f"OSM HTTP {response.status_code} for {lat},{lon}")
                if can_retry:
                    time.sleep(1)
                    continue
                return None

            if not response.text.strip():
                print(f"OSM empty response for {lat},{lon}")
                return None
            try:
                data = response.json()
            except (json.JSONDecodeError, ValueError) as je:
                print(f"OSM JSON decode failed for {lat},{lon}: {je}")
                return None
            if not data.get('elements'):
                print(f"OSM no elements found for {lat},{lon}")
                return None

            significant_features = [e for e in data['elements'] if is_significant_water_body(e)]
            if not significant_features:
                # Widen the search once. The guard must compare against the
                # radius actually used for the retry, otherwise the retry
                # would call itself again with the same radius forever.
                if radius_m < extended_radius_m:
                    print(f"Retrying {lat},{lon} with extended radius...")
                    return distance_to_water_osm(
                        lat, lon, radius_m=extended_radius_m, timeout=timeout, retry_count=1
                    )
                print(f"OSM only ornamental features for {lat},{lon}")
                return None

            result = _osm_min_distance_m(lat, lon, significant_features)
            if result is not None:
                print(f"OSM success for {lat},{lon}: {result:.1f}m")
            return result
        except requests.exceptions.Timeout:
            print(f"OSM timeout for {lat},{lon} (attempt {attempt + 1}/{retry_count})")
            if can_retry:
                time.sleep(1)
                continue
            return None
        except Exception as e:
            print(f"OSM exception for {lat},{lon}: {e}")
            if can_retry:
                time.sleep(1)
                continue
            return None
    return None
def distance_to_water_static(lat, lon):
    """
    Fallback: distance to the nearest Natural Earth river or lake.

    The point and both layers are projected to the UTM zone derived from
    ``lon``/``lat`` and the minimum planar distance is taken.

    Args:
        lat, lon: Coordinates of the point in degrees.

    Returns:
        Distance in the projected CRS units (metres for UTM), or None when
        no distance could be computed or loading/projecting a layer failed.
    """
    # lon == 180 would otherwise produce zone 61, which does not exist.
    utm_zone = max(1, min(60, int((lon + 180) / 6) + 1))
    hemisphere = 'north' if lat >= 0 else 'south'
    utm_crs = CRS.from_string(f"+proj=utm +zone={utm_zone} +{hemisphere} +datum=WGS84")
    transformer = Transformer.from_crs("EPSG:4326", utm_crs, always_xy=True)
    point_utm = Point(transformer.transform(lon, lat))

    def nearest(layer):
        # Smallest non-NaN distance from the point to any feature in `layer`.
        distances = layer.to_crs(utm_crs).geometry.distance(point_utm)
        distances = distances[distances.notna()]
        return distances.min() if len(distances) > 0 else np.inf

    try:
        # Use lazy-loaded datasets
        min_dist = min(nearest(get_rivers()), nearest(get_lakes()))
        result = min_dist if min_dist != np.inf else None
        if result is not None:
            print(f"Static fallback for {lat},{lon}: {result:.1f}m")
        else:
            print(f"Static fallback failed for {lat},{lon}")
        return result
    except Exception as p_err:
        print(f"Static distance error for {lat},{lon}: {p_err}")
        return None
def check_coastal(lat, lon, timeout=15):
    """
    Adaptive coastal detection: widen the search until a coastline is found.

    Overpass is queried for ``natural=coastline`` ways within 1 km, 2 km and
    5 km of the point, stopping at the first radius that yields a
    measurable coastline.

    Args:
        lat, lon: Coordinates of the point in degrees.
        timeout: Timeout in seconds, used for both the Overpass query and
            the HTTP request.

    Returns:
        ``(True, distance_m)`` when a coastline was found, otherwise
        ``(False, None)``. Request and parsing errors at one radius are
        logged and the next radius is tried.
    """
    from shapely.geometry import LineString

    overpass_url = "http://overpass-api.de/api/interpreter"
    metres_per_degree = 111320
    # Measure in a local metric frame centred on the point: a degree of
    # longitude shrinks with cos(latitude), so distances taken in raw
    # degrees overestimate east-west offsets.
    cos_lat = float(np.cos(np.radians(lat)))
    origin = Point(0.0, 0.0)

    # Sweep radii from 1 km to 5 km
    radii = [1000, 2000, 5000]
    print(f"[Coastal] Starting coastal search for {lat},{lon} ...")
    for r in radii:
        query = f"""
        [out:json][timeout:{timeout}];
        (
        way["natural"="coastline"](around:{r},{lat},{lon});
        );
        out geom;
        """
        try:
            response = requests.post(overpass_url, data={'data': query}, timeout=timeout)
            if response.status_code != 200:
                print(f"[Coastal] HTTP {response.status_code} at radius {r}")
                continue
            if not response.text.strip():
                continue
            try:
                data = response.json()
            except ValueError:
                # JSON decode errors are ValueError subclasses; anything
                # else is reported by the outer handler.
                continue
            if not data.get('elements'):
                print(f"[Coastal] No coastline found at {r} m")
                continue

            min_distance = float('inf')
            for element in data['elements']:
                nodes = element.get('geometry') or []
                if len(nodes) < 2:
                    continue
                xy = [
                    (
                        ((node['lon'] - lon + 180.0) % 360.0 - 180.0) * metres_per_degree * cos_lat,
                        (node['lat'] - lat) * metres_per_degree,
                    )
                    for node in nodes
                ]
                distance = origin.distance(LineString(xy))
                min_distance = min(min_distance, distance)

            if min_distance != float('inf'):
                print(f"Coastal detected for {lat},{lon}: {min_distance:.1f}m (radius={r})")
                return True, min_distance
        except Exception as e:
            print(f"[Coastal] Error at radius {r}: {e}")
            continue

    # If nothing is found
    print(f"[Coastal] No coastline detected for {lat},{lon}. Continuing with OSM water search.")
    return False, None
# NOTE: lru_cache keys on the arguments exactly as passed, i.e. before the
# rounding performed inside the body, and it stores None results as well.
@lru_cache(maxsize=1000)
def distance_to_water(lat, lon):
    """
    Combined water distance with caching for batch efficiency.

    Order of sources:
      1. Coastline via ``check_coastal``; a hit is returned immediately.
      2. OpenStreetMap via ``distance_to_water_osm`` at radii of 3000, 5000
         and 8000 m, up to three attempts per radius.
      3. Natural Earth via ``distance_to_water_static``, scaled by 0.7.

    Args:
        lat, lon: Coordinates in degrees; converted to float and rounded to
            6 decimals before use.

    Returns:
        Distance in metres, or None when every source failed.
    """
    lat, lon = round(float(lat), 6), round(float(lon), 6)
    print(f"--- Water distance query for {lat},{lon} ---")
    # 1. Check coastal proximity
    try:
        is_coastal, coast_distance = check_coastal(lat, lon)
        if is_coastal and coast_distance is not None:
            print(f"Coastal detected for {lat},{lon}: {coast_distance:.1f} m")
            return coast_distance
    except Exception as e:
        print(f"Coastal check failed for {lat},{lon}: {e}")
    # 2. Try OSM query with retries
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the nesting of the two sleeps below (one per attempt, one
    # per radius) is the most plausible reading - confirm against the repo.
    for radius in [3000, 5000, 8000]:
        for attempt in range(3):
            try:
                print(f"OSM attempt {attempt + 1}/3 at radius {radius} m for {lat},{lon}")
                d = distance_to_water_osm(lat, lon, radius_m=radius)
                if d is not None:
                    print(f"OSM success for {lat},{lon}: {d:.1f} m (radius={radius})")
                    return d
            except Exception as e:
                print(f"OSM exception on attempt {attempt + 1} for {lat},{lon}: {e}")
            time.sleep(1.5)
        time.sleep(1.5)
    # 3. Static fallback
    try:
        d_static = distance_to_water_static(lat, lon)
        if d_static is not None:
            # NOTE(review): the 0.7 factor is not explained anywhere in this
            # file - presumably an empirical correction for the coarse
            # Natural Earth data; confirm with the author.
            corrected = d_static * 0.7
            print(f"Static fallback for {lat},{lon}: raw={d_static:.1f} m, corrected={corrected:.1f} m")
            return corrected
        else:
            print(f"Static fallback failed for {lat},{lon}")
    except Exception as e:
        print(f"Static distance error for {lat},{lon}: {e}")
    print(f"All water distance queries failed for {lat},{lon}")
    return None