flood-vulnerability / spatial_queries.py
adema5051's picture
Upload 10 files
a359779 verified
raw
history blame
26.6 kB
import ee
import geopandas as gpd
from shapely.geometry import Point
import requests
import numpy as np
from functools import lru_cache
import warnings
import json
from pyproj import CRS, Transformer
import time
from datetime import datetime
# Initialize GEE
from gee_auth import initialize_gee
# Suppress shapely distance warnings
warnings.filterwarnings("ignore", category=RuntimeWarning, module="shapely.measurement")
# LAZY LOADING
_RIVERS = None
_LAKES = None
def get_rivers():
"""Lazy load rivers dataset"""
global _RIVERS
if _RIVERS is None:
_RIVERS = gpd.read_file('data/natural_earth/ne_10m_rivers_lake_centerlines.shp')
_RIVERS = _RIVERS[_RIVERS.geometry.is_valid].copy()
print("✅ Rivers shapefile loaded")
return _RIVERS
def get_lakes():
"""Lazy load lakes dataset"""
global _LAKES
if _LAKES is None:
_LAKES = gpd.read_file('data/natural_earth/ne_10m_lakes.shp')
_LAKES = _LAKES[_LAKES.geometry.is_valid].copy()
print("✅ Lakes shapefile loaded")
return _LAKES
def get_terrain_metrics(lat, lon, buffer_m=500, force_dem=None):
"""
Extract DEM-based metrics with hierarchical fallback strategy.
"""
initialize_gee()
if abs(lat) > 70:
buffer_m = 100
try:
if abs(lat) > 85:
print(f"Polar region {lat},{lon} - no terrain data")
return {'elevation': None, 'slope': None, 'tpi': None, 'mean_elevation': None, 'dem_source': None}
point = ee.Geometry.Point([lon, lat])
region = point.buffer(buffer_m)
# Hierarchical DEM selection OR forced DEM for validation
if force_dem:
dem, dem_source = _get_forced_dem(lat, lon, force_dem)
if dem is None:
# Forced DEM not available at this location
return {'elevation': None, 'slope': None, 'tpi': None, 'mean_elevation': None, 'dem_source': None}
else:
dem, dem_source = _select_best_dem(lat, lon)
if dem is None:
print(f"All DEM sources failed for {lat},{lon}")
return {'elevation': None, 'slope': None, 'tpi': None, 'mean_elevation': None, 'dem_source': None}
# Point elevation with smaller buffer
elevation_sample = dem.reduceRegion(
reducer=ee.Reducer.mean(),
geometry=point.buffer(15),
scale=30,
maxPixels=1e9,
bestEffort=True
)
elevation = elevation_sample.get('elevation').getInfo()
if elevation is None:
print(f"GEE elevation failed for {lat},{lon} using {dem_source}")
return {'elevation': None, 'slope': None, 'tpi': None, 'mean_elevation': None, 'dem_source': dem_source}
try:
mean_elevation_sample = dem.reduceRegion(
reducer=ee.Reducer.mean(),
geometry=region,
scale=30,
maxPixels=1e9,
bestEffort=True
)
mean_elevation = mean_elevation_sample.get('elevation').getInfo()
except Exception as me_err:
print(f"GEE mean elev failed for {lat},{lon}: {me_err}")
mean_elevation = None
# Slope
slope_img = ee.Terrain.slope(dem)
slope_mean = None
slope_max = None
def safe_reduce(reducer_type):
try:
reducer = ee.Reducer.mean() if reducer_type == 'mean' else ee.Reducer.max()
stats_dict = slope_img.reduceRegion(
reducer=reducer,
geometry=point.buffer(200),
scale=30,
maxPixels=1e9,
bestEffort=True
)
return stats_dict.get('slope').getInfo()
except Exception as err:
if "transform edge" not in str(err):
print(f"GEE slope {reducer_type} failed for {lat},{lon}: {err}")
return None
slope_mean = safe_reduce('mean')
slope_max = safe_reduce('max')
if slope_max is not None and slope_mean is not None:
if slope_max >= slope_mean * 1.8:
slope = slope_max
else:
slope = slope_mean
elif slope_mean is not None:
slope = slope_mean
elif slope_max is not None:
slope = slope_max
else:
slope = None
# TPI
tpi = None
if elevation is not None and mean_elevation is not None:
try:
tpi = float(elevation) - float(mean_elevation)
except (ValueError, TypeError):
tpi = None
return {
'elevation': round(float(elevation), 2) if elevation is not None else None,
'slope': round(float(slope), 2) if slope is not None else None,
'tpi': round(float(tpi), 2) if tpi is not None else None,
'mean_elevation': round(float(mean_elevation), 2) if mean_elevation is not None else None,
'dem_source': dem_source
}
except Exception as e:
print(f"GEE error for {lat},{lon}: {e}")
return {
'elevation': None,
'slope': None,
'tpi': None,
'mean_elevation': None,
'dem_source': None
}
def _select_best_dem(lat, lon):
"""
Hierarchical DEM selection: prioritize highest-resolution DEM available.
"""
point = ee.Geometry.Point([lon, lat])
# Regional high-resolution DEMs
# 1. USGS 3DEP 10m (USA)
if -130 < lon < -60 and 20 < lat < 55:
try:
usgs_10m = (
ee.ImageCollection("USGS/3DEP/10m_collection")
.filterBounds(point)
.mosaic()
)
# Dynamically detect elevation band
elev_band = usgs_10m.bandNames().getInfo()[0]
usgs_10m = usgs_10m.select(elev_band).rename("elevation")
usgs_10m = usgs_10m.reproject(crs="EPSG:4326", scale=10)
test = usgs_10m.reduceRegion(
ee.Reducer.first(),
point,
10,
bestEffort=True
).get("elevation").getInfo()
if test is not None:
print(f"Using USGS 3DEP 10m for {lat},{lon}")
return usgs_10m, "USGS_3DEP_10m_collection"
except Exception:
pass
# Netherlands AHN2/3/ (0.5 m – best national DEM globally)
if 50 < lat < 54 and 3 < lon < 8:
# Priority: AHN3 > AHN2
try:
# AHN3 (2014–2019)
ahn3 = ee.ImageCollection("AHN/AHN3").select("DTM").mosaic()
test = ahn3.reduceRegion(
ee.Reducer.first(), point, 1, bestEffort=True
).get("DTM").getInfo()
if test is not None:
print(f"Using AHN3 0.5m DTM for {lat},{lon}")
return ahn3.rename("elevation"), "AHN3_0.5m"
except:
pass
try:
# AHN2 (2012)
ahn2 = ee.Image("AHN/AHN2_05M_INT").select("elevation")
test = ahn2.reduceRegion(
ee.Reducer.first(), point, 1, bestEffort=True
).get("elevation").getInfo()
if test is not None:
print(f"Using AHN2 0.5m DTM for {lat},{lon}")
return ahn2, "AHN2_0.5m"
except:
pass
# 3. UK Environment Agency Composite DTM/DSM (1m)
if 49 < lat < 61 and -8 < lon < 3:
try:
ea = ee.Image("UK/EA/ENGLAND_1M_TERRAIN/2022")
# Identify available elevation band
bands = ea.bandNames().getInfo()
elev_candidates = [b for b in bands if b.lower() in ["dtm", "elevation", "b1"]]
if not elev_candidates:
raise Exception("No valid elevation band found")
elev_band = elev_candidates[0]
# Reproject to WGS84 before sampling
ea_reproj = ea.select(elev_band).reproject(
crs="EPSG:4326",
scale=2
)
test = ea_reproj.reduceRegion(
reducer=ee.Reducer.first(),
geometry=point,
scale=2,
bestEffort=True,
maxPixels=1e9
).get(elev_band).getInfo()
if test is not None:
print(f"Using UK EA DTM 1m for {lat},{lon}")
return ea_reproj.rename("elevation"), "EA_UK_1m"
except Exception as e:
print(f"EA UK DEM failed for {lat},{lon}: {e}")
pass
# 4. Australia 5m DEM (LiDAR coastal & urban areas)
if -45 < lat < -10 and 110 < lon < 155:
try:
aus_col = ee.ImageCollection("AU/GA/AUSTRALIA_5M_DEM")
# Mosaic all tiles that intersect the point
aus = aus_col.filterBounds(point).mosaic()
elev_band = "elevation"
test = aus.select(elev_band).reduceRegion(
reducer=ee.Reducer.first(),
geometry=point,
scale=5,
bestEffort=True,
maxPixels=1e9
).get(elev_band).getInfo()
if test is not None:
print(f"Using Australia 5m DEM for {lat},{lon}")
return aus.select(elev_band), "Australia_5m"
except Exception as e:
print(f"AU DEM failed for {lat},{lon}: {e}")
pass
# Global 30m DEMs
# 5. NASADEM
if -56 <= lat <= 60:
try:
nasadem = ee.Image("NASA/NASADEM_HGT/001").select("elevation")
test = nasadem.reduceRegion(
ee.Reducer.first(), point, 30, bestEffort=True
).get("elevation").getInfo()
if test is not None:
print(f"Using NASADEM for {lat},{lon}")
return nasadem, "NASADEM"
except Exception:
pass
# 6. Copernicus GLO-30
try:
cop = ee.ImageCollection("COPERNICUS/DEM/GLO30").mosaic().select("DEM").rename("elevation")
test = cop.reduceRegion(
ee.Reducer.first(), point, 30, bestEffort=True
).get("elevation").getInfo()
if test is not None:
print(f"Using Copernicus GLO-30 for {lat},{lon}")
return cop, "Copernicus_GLO30"
except Exception:
pass
# 7. ALOS World 3D-30m
if abs(lat) <= 82:
try:
alos = ee.ImageCollection("JAXA/ALOS/AW3D30/V4_1").mosaic().select("AVE").rename("elevation")
test = alos.reduceRegion(
ee.Reducer.first(), point, 30, bestEffort=True
).get("elevation").getInfo()
if test is not None:
print(f"Using ALOS AW3D30 AVE for {lat},{lon}")
return alos, 'ALOS_AW3D30_AVE'
except Exception:
pass
# 8. SRTM fallback
if -56 <= lat <= 60:
try:
srtm = ee.Image("USGS/SRTMGL1_003").select("elevation")
test = srtm.reduceRegion(
ee.Reducer.first(), point, 30, bestEffort=True
).get("elevation").getInfo()
if test is not None:
print(f"Using SRTM fallback for {lat},{lon}")
return srtm, "SRTM_v3"
except Exception:
pass
print(f"All DEM sources failed for {lat},{lon}")
return None, None
def _get_forced_dem(lat, lon, dem_name):
"""
Force specific DEM retrieval for validation studies.
Returns None if DEM unavailable at location.
"""
point = ee.Geometry.Point([lon, lat])
# Map DEM names to their retrieval logic
dem_map = {
'ALOS_AW3D30': lambda: (
ee.ImageCollection("JAXA/ALOS/AW3D30/V4_1").mosaic().select("AVE").rename("elevation"),
30
),
'Copernicus_GLO30': lambda: (
ee.ImageCollection("COPERNICUS/DEM/GLO30").mosaic().select("DEM").rename("elevation"),
30
),
'NASADEM': lambda: (
ee.Image("NASA/NASADEM_HGT/001").select("elevation"),
30
),
'SRTM_v3': lambda: (
ee.Image("USGS/SRTMGL1_003").select("elevation"),
30
),
'AHN3_0.5m': lambda: (
ee.ImageCollection("AHN/AHN3").select("DTM").mosaic().rename("elevation"),
1
),
'AHN2_0.5m': lambda: (
ee.Image("AHN/AHN2_05M_INT").select("elevation"),
1
),
'EA_UK_1m': lambda: (
ee.Image("UK/EA/ENGLAND_1M_TERRAIN/2022").select("dtm").reproject(crs="EPSG:4326", scale=2).rename("elevation"),
2
),
'Australia_5m': lambda: (
ee.ImageCollection("AU/GA/AUSTRALIA_5M_DEM").filterBounds(point).mosaic().select("elevation"),
5
),
'USGS_3DEP_10m_collection': lambda: (
ee.ImageCollection("USGS/3DEP/10m_collection").filterBounds(point).mosaic().select("elevation"),
10
)
}
if dem_name not in dem_map:
print(f"Unknown DEM name: {dem_name}")
return None, None
try:
dem, scale = dem_map[dem_name]()
# Test if data exists at this location
test = dem.reduceRegion(
ee.Reducer.first(),
point,
scale,
bestEffort=True
).get("elevation").getInfo()
if test is not None:
print(f"Forced DEM {dem_name} available at {lat},{lon}")
return dem, dem_name
else:
print(f"Forced DEM {dem_name} has no data at {lat},{lon}")
return None, None
except Exception as e:
print(f"Failed to get forced DEM {dem_name} at {lat},{lon}: {e}")
return None, None
def is_significant_water_body(element):
"""
Determine if water feature is significant for flood risk assessment
"""
tags = element.get('tags', {})
name = tags.get('name', '')
# Filter by name - fountains
if name and ('fuente' in name.lower() or 'fountain' in name.lower() or
'fonte' in name.lower()):
return False
# Filter by water type tag
water_type = tags.get('water', '')
if water_type in ['fountain', 'reflecting_pool', 'pond', 'ornamental']:
return False
# Filter by amenity tag
if tags.get('amenity') == 'fountain':
return False
# Check if it's a waterway (rivers/streams/canals are significant)
if tags.get('waterway') in ['river', 'stream', 'canal', 'drain']:
return True
# Calculate approximate area for unnamed water bodies
if tags.get('natural') == 'water' and 'geometry' in element:
coords = element.get('geometry', [])
if len(coords) >= 3:
lons = [c['lon'] for c in coords]
lats = [c['lat'] for c in coords]
width = (max(lons) - min(lons)) * 111320
height = (max(lats) - min(lats)) * 111320
approx_area = width * height
if approx_area < 500:
return False
if len(coords) < 10 and approx_area < 2000:
return False
# Natural water bodies with names (excluding fountains)
if tags.get('natural') == 'water' and name:
return True
# Large unnamed water bodies
if tags.get('natural') == 'water' and not name:
coords = element.get('geometry', [])
if len(coords) > 50:
return True
return False
def distance_to_water_osm(lat, lon, radius_m=5000, timeout=20, retry_count=2):
"""
Query OpenStreetMap for nearby SIGNIFICANT water bodies with retry logic
"""
overpass_url = "http://overpass-api.de/api/interpreter"
query = f"""
[out:json][timeout:{timeout}];
(
way["natural"="water"](around:{radius_m},{lat},{lon});
way["waterway"="river"](around:{radius_m},{lat},{lon});
way["waterway"="canal"](around:{radius_m},{lat},{lon});
way["waterway"="stream"](around:{radius_m},{lat},{lon});
relation["natural"="water"](around:{radius_m},{lat},{lon});
way["natural"="bay"](around:{radius_m},{lat},{lon});
);
out geom;
"""
for attempt in range(retry_count):
try:
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
print(f"Invalid coords for OSM: {lat},{lon}")
return None
response = requests.post(overpass_url, data={'data': query}, timeout=timeout)
if response.status_code == 429:
print(f"OSM rate limited for {lat},{lon} - waiting {2 ** attempt}s")
time.sleep(2 ** attempt)
continue
if response.status_code == 400:
print(f"OSM 400 for {lat},{lon} - bad query")
return None
if response.status_code != 200:
print(f"OSM HTTP {response.status_code} for {lat},{lon}")
if attempt < retry_count - 1:
time.sleep(1)
continue
return None
if not response.text.strip():
print(f"OSM empty response for {lat},{lon}")
return None
try:
data = response.json()
except (json.JSONDecodeError, ValueError) as je:
print(f"OSM JSON decode failed for {lat},{lon}: {je}")
return None
if not data.get('elements'):
print(f"OSM no elements found for {lat},{lon}")
return None
point = Point(lon, lat)
min_distance = float('inf')
significant_features = [e for e in data['elements'] if is_significant_water_body(e)]
if not significant_features and radius_m < 12500:
print(f"Retrying {lat},{lon} with extended radius...")
return distance_to_water_osm(lat, lon, radius_m=10000, timeout=timeout, retry_count=1)
if not significant_features:
print(f"OSM only ornamental features for {lat},{lon}")
return None
from shapely.geometry import LineString, Polygon
for element in significant_features:
if 'geometry' in element and len(element['geometry']) >= 2:
coords = [(node['lon'], node['lat']) for node in element['geometry']]
if element.get('tags', {}).get('waterway'):
try:
water_geom = LineString(coords)
except Exception:
continue
else:
try:
water_geom = Polygon(coords)
except:
try:
water_geom = LineString(coords)
except:
continue
if not water_geom.is_valid:
continue
distance = point.distance(water_geom) * 111320
if not np.isnan(distance):
min_distance = min(min_distance, distance)
result = min_distance if min_distance != float('inf') else None
if result is not None:
print(f"OSM success for {lat},{lon}: {result:.1f}m")
return result
except requests.exceptions.Timeout:
print(f"OSM timeout for {lat},{lon} (attempt {attempt + 1}/{retry_count})")
if attempt < retry_count - 1:
time.sleep(1)
continue
return None
except Exception as e:
print(f"OSM exception for {lat},{lon}: {e}")
if attempt < retry_count - 1:
time.sleep(1)
continue
return None
return None
def distance_to_water_static(lat, lon):
"""
Fallback: calculate distance to Natural Earth water bodies
"""
point = Point(lon, lat)
utm_zone = int((lon + 180) / 6) + 1
hemisphere = 'north' if lat >= 0 else 'south'
utm_crs = CRS.from_string(f"+proj=utm +zone={utm_zone} +{hemisphere} +datum=WGS84")
transformer = Transformer.from_crs("EPSG:4326", utm_crs, always_xy=True)
point_utm_coords = transformer.transform(lon, lat)
point_utm = Point(point_utm_coords)
try:
# Use lazy-loaded datasets
rivers_utm = get_rivers().to_crs(utm_crs)
lakes_utm = get_lakes().to_crs(utm_crs)
river_distances = rivers_utm.geometry.distance(point_utm)
river_distances = river_distances[river_distances.notna()]
min_river_dist = river_distances.min() if len(river_distances) > 0 else np.inf
lake_distances = lakes_utm.geometry.distance(point_utm)
lake_distances = lake_distances[lake_distances.notna()]
min_lake_dist = lake_distances.min() if len(lake_distances) > 0 else np.inf
min_dist = min(min_river_dist, min_lake_dist)
result = min_dist if min_dist != np.inf else None
if result is not None:
print(f"Static fallback for {lat},{lon}: {result:.1f}m")
else:
print(f"Static fallback failed for {lat},{lon}")
return result
except Exception as p_err:
print(f"Static distance error for {lat},{lon}: {p_err}")
return None
def check_coastal(lat, lon, timeout=15):
"""
Adaptive coastal detection: expands search radius until coastline is found.
"""
overpass_url = "http://overpass-api.de/api/interpreter"
point = Point(lon, lat)
# Sweep radii from 1 km to 5 km
radii = [1000, 2000, 5000]
print(f"[Coastal] Starting coastal search for {lat},{lon} ...")
for r in radii:
query = f"""
[out:json][timeout:{timeout}];
(
way["natural"="coastline"](around:{r},{lat},{lon});
);
out geom;
"""
try:
response = requests.post(overpass_url, data={'data': query}, timeout=timeout)
if not response.text.strip():
continue
try:
data = response.json()
except:
continue
if not data.get('elements'):
print(f"[Coastal] No coastline found at {r} m")
continue
min_distance = float('inf')
from shapely.geometry import LineString
for element in data['elements']:
if 'geometry' in element and len(element['geometry']) >= 2:
coords = [(node['lon'], node['lat']) for node in element['geometry']]
coastline = LineString(coords)
distance = point.distance(coastline) * 111320
min_distance = min(min_distance, distance)
if min_distance != float('inf'):
print(f"Coastal detected for {lat},{lon}: {min_distance:.1f}m (radius={r})")
return True, min_distance
except Exception as e:
print(f"[Coastal] Error at radius {r}: {e}")
continue
# If nothing is found
print(f"[Coastal] No coastline detected for {lat},{lon}. Continuing with OSM water search.")
return False, None
@lru_cache(maxsize=1000)
def distance_to_water(lat, lon):
"""
Combined water distance with caching for batch efficiency.
Uses OSM first, then Natural Earth fallback.
"""
lat, lon = round(float(lat), 6), round(float(lon), 6)
print(f"--- Water distance query for {lat},{lon} ---")
# 1. Check coastal proximity
try:
is_coastal, coast_distance = check_coastal(lat, lon)
if is_coastal and coast_distance is not None:
print(f"Coastal detected for {lat},{lon}: {coast_distance:.1f} m")
return coast_distance
except Exception as e:
print(f"Coastal check failed for {lat},{lon}: {e}")
# 2. Try OSM query with retries
for radius in [3000, 5000, 8000]:
for attempt in range(3):
try:
print(f"OSM attempt {attempt + 1}/3 at radius {radius} m for {lat},{lon}")
d = distance_to_water_osm(lat, lon, radius_m=radius)
if d is not None:
print(f"OSM success for {lat},{lon}: {d:.1f} m (radius={radius})")
return d
except Exception as e:
print(f"OSM exception on attempt {attempt + 1} for {lat},{lon}: {e}")
time.sleep(1.5)
time.sleep(1.5)
# 3. Static fallback
try:
d_static = distance_to_water_static(lat, lon)
if d_static is not None:
corrected = d_static * 0.7
print(f"Static fallback for {lat},{lon}: raw={d_static:.1f} m, corrected={corrected:.1f} m")
return corrected
else:
print(f"Static fallback failed for {lat},{lon}")
except Exception as e:
print(f"Static distance error for {lat},{lon}: {e}")
print(f"All water distance queries failed for {lat},{lon}")
return None