# TerrainExplorer / main.py
# Source: Hugging Face Space upload by gauthamnairy ("Update main.py", revision ef5b01a, verified)
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, PlainTextResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import os
import signal
import re
import threading
import time
import math
import logging
import requests
import rasterio
import numpy as np
from collections import Counter
from typing import Optional, Tuple, Dict, List
import urllib3
import asyncio
import concurrent.futures
from functools import lru_cache
import geopy.distance
from geopy import Point
# Disable SSL verification warnings for testing.
# NOTE(review): TLS verification is disabled globally (see session.verify below)
# and GDAL_HTTP_UNSAFESSL is set — insecure outside a test environment.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
os.environ["GDAL_HTTP_UNSAFESSL"] = "YES"  # lets GDAL/rasterio open HTTPS rasters without cert checks

# Configure root logging: timestamped, level-tagged messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
app = FastAPI(title="Enhanced Terrain Analysis API", version="2.0.0")

# Enable CORS for frontend communication.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec; restrict origins if credentials matter.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Enhanced Pydantic models for request/response
class LocationRequest(BaseModel):
    """Request payload for POST /api/analyze.

    input: free-form location — decimal degrees, DMS, or a place name.
    radius_meters: sampling radius around the point (clamped to 50-500 by the endpoint).
    detailed_analysis: accepted but not read anywhere in this file — presumably reserved.
    """
    input: str
    radius_meters: Optional[int] = 100
    detailed_analysis: Optional[bool] = True
class LandCoverAnalysis(BaseModel):
    """Aggregated WorldCover sampling result for an area around a point."""
    primary_classification: str      # most common terrain label among valid samples
    confidence: float                # % of valid samples agreeing with the primary label
    total_samples: int               # points generated (including ones that failed to sample)
    valid_samples: int               # samples that returned a WorldCover code
    classifications: Dict[str, int]  # terrain label -> sample count
    percentages: Dict[str, float]    # terrain label -> % of valid samples
    radius_meters: int               # sampling radius actually used (after clamping)
    sampling_method: str             # descriptive label of the sampling strategy
class TerrainResponse(BaseModel):
    """Response body for POST /api/analyze."""
    latitude: float
    longitude: float
    elevation: Optional[float] = None            # metres; None when all elevation sources failed
    terrain_type: str                            # final terrain label after elevation overrides
    landcover_analysis: Optional[LandCoverAnalysis] = None
    elevation_confidence: Optional[str] = None   # e.g. "High (srtm30m)", "Medium (mapzen fallback)"
    coastal_detection: Optional[bool] = None
    success: bool
    message: Optional[str] = None
    processing_time: Optional[float] = None      # seconds, rounded to 2 decimals
# Enhanced dataset configuration with bathymetry support.
# select_dataset() picks the covering entry with the smallest 'resolution'
# (approximate metres per sample); list order itself is not significant.
DATASETS = [
    # Regional, high-resolution DEMs
    {'name': 'nzdem8m', 'min_lat': -47.6, 'max_lat': -34.4, 'min_lon': 166.5, 'max_lon': 178.6, 'resolution': 8},
    {'name': 'ned10m', 'min_lat': 19.5, 'max_lat': 49.4, 'min_lon': -160.3,'max_lon': -66.9, 'resolution': 10},
    {'name': 'eudem25m', 'min_lat': 25.0, 'max_lat': 72.0, 'min_lon': -60.0, 'max_lon': 40.0, 'resolution': 25},
    # Global land DEMs
    {'name': 'mapzen', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 30},
    {'name': 'aster30m', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 30},
    {'name': 'srtm30m', 'min_lat': -60.0, 'max_lat': 60.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 30},
    {'name': 'srtm90m', 'min_lat': -60.0, 'max_lat': 60.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 90},
    # Coarse global grids that include bathymetry (negative elevations)
    {'name': 'gebco2020', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 450},
    {'name': 'etopo1', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 1800},
]
# Shared HTTP session used by all elevation/geocoding calls.
session = requests.Session()
session.verify = False  # NOTE(review): TLS verification disabled — test-only setting
# Optimize session for speed: connection pooling with a single retry.
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20,
    max_retries=1
)
session.mount('http://', adapter)
session.mount('https://', adapter)
def timeout_handler(signum, frame):
    """Signal handler that converts an alarm signal into a TimeoutError.

    NOTE(review): not registered anywhere in this file (signal is imported
    but signal.signal is never called) — presumably kept for ad-hoc use.
    """
    raise TimeoutError("Operation timed out")
def periodic_warmup():
    """Background loop that pings both upstream services every 5 minutes so
    pooled connections and caches stay warm. Runs forever; failures are
    logged and ignored."""
    probes = (
        ("elevation", lambda: get_elevation_opentopo_single(0, 0)),
        ("worldcover", lambda: sample_worldcover(0, 0)),
    )
    while True:
        for label, probe in probes:
            try:
                probe()
            except Exception as e:
                logging.warning(f"Periodic warmup {label} failed: {e}")
        # Sleep for 5 minutes between warmup rounds
        time.sleep(300)
def warm_up_connections():
    """Enhanced warm up with bathymetry services"""
    probe_urls = (
        # elevation API
        "https://api.opentopodata.org/v1/aster30m?locations=40.7589,-73.9851",
        # SRTMGL1 for offshore areas
        "https://api.opentopodata.org/v1/srtmgl1?locations=40.0,-70.0",
        # ETOPO1 for deep bathymetry
        "https://api.opentopodata.org/v1/etopo1?locations=35.0,-75.0",
    )
    try:
        for probe_url in probe_urls:
            session.get(probe_url, timeout=5)
        # WorldCover: a HEAD request is enough to open the connection
        session.head(
            "https://esa-worldcover.s3.eu-central-1.amazonaws.com/v100/2020/map/ESA_WorldCover_10m_2020_v100_N39W075_Map.tif",
            timeout=5,
        )
        logging.info("Enhanced connection warm-up completed successfully")
    except Exception as e:
        logging.warning(f"Connection warm-up failed: {e}")
def select_dataset(lat: float, lon: float) -> str:
    """Return the name of the finest-resolution DATASETS entry whose bounding
    box covers (lat, lon); fall back to the global 'mapzen' dataset."""
    covering = []
    for ds in DATASETS:
        lat_ok = ds['min_lat'] <= lat <= ds['max_lat']
        lon_ok = ds['min_lon'] <= lon <= ds['max_lon']
        if lat_ok and lon_ok:
            covering.append(ds)
    if not covering:
        logging.debug(f"No regional dataset covers ({lat}, {lon}); falling back to 'mapzen'.")
        return 'mapzen'
    best = min(covering, key=lambda d: d['resolution'])
    name = best['name']
    logging.debug(f"Selected OpenTopoData dataset '{name}' for ({lat}, {lon}).")
    return name
def get_elevation_opentopo_single(lat: float, lon: float) -> Tuple[Optional[float], str]:
    """Fetch elevation for a single coordinate from OpenTopoData with enhanced fallback.

    Returns (elevation_in_metres, confidence_label). Tries the best regional
    dataset first, then falls back to the global 'mapzen' dataset; returns
    (None, "No data available") when both fail.
    """
    dataset = select_dataset(lat, lon)
    url = f"https://api.opentopodata.org/v1/{dataset}?locations={lat},{lon}"
    try:
        resp = session.get(url, timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            if data.get('status') == 'OK':
                elev = data['results'][0].get('elevation')
                if elev is not None:
                    logging.debug(f"Got elevation {elev} m from '{dataset}' for ({lat}, {lon}).")
                    return elev, f"High ({dataset})"
                else:
                    # API answered OK but has no value at this point (e.g. ocean for land-only DEMs)
                    logging.warning(f"Null elevation from '{dataset}' for ({lat}, {lon}).")
            else:
                logging.warning(f"OpenTopoData status '{data.get('status')}' for '{dataset}' at ({lat}, {lon}).")
        else:
            logging.warning(f"HTTP {resp.status_code} from OpenTopoData for '{dataset}' at ({lat}, {lon}).")
    except requests.RequestException as e:
        logging.error(f"OpenTopoData request exception at ({lat}, {lon}): {e}")
    # Fallback to 'mapzen' if primary dataset failed (skipped when it WAS the primary)
    if dataset != 'mapzen':
        try:
            url2 = f"https://api.opentopodata.org/v1/mapzen?locations={lat},{lon}"
            resp2 = session.get(url2, timeout=10)
            if resp2.status_code == 200:
                data2 = resp2.json()
                if data2.get('status') == 'OK':
                    elev2 = data2['results'][0].get('elevation')
                    if elev2 is not None:
                        logging.debug(f"Got elevation {elev2} m from fallback 'mapzen' for ({lat}, {lon}).")
                        return elev2, "Medium (mapzen fallback)"
        except requests.RequestException as e2:
            logging.error(f"Fallback OpenTopoData request exception at ({lat}, {lon}): {e2}")
    return None, "No data available"
def _lon_in_range(lon: float, lon_min: float, lon_max: float) -> bool:
    """True when lon lies in [lon_min, lon_max], including ranges that wrap
    the antimeridian (lon_min > lon_max, e.g. the Pacific span (120, -120))."""
    if lon_min <= lon_max:
        return lon_min <= lon <= lon_max
    return lon >= lon_min or lon <= lon_max

def is_coastal_location(lat: float, lon: float, elevation: Optional[float] = None) -> bool:
    """Enhanced coastal detection with stricter criteria to avoid false positives.

    Three heuristics; any match marks the point coastal:
      1. within 25 km (geodesic) of a known coastal reference city,
      2. near the edge of a hand-drawn coastal zone rectangle,
      3. inside a known island region, provided elevation > -50 m (not deep ocean).

    BUG FIX: the island check previously used a plain lon_min <= lon <= lon_max
    test, which could never match the Pacific Islands region whose lon_range
    (120, -120) crosses the dateline; _lon_in_range handles the wrap-around.
    """
    # Method 1: distance to nearest known coastal reference point.
    # Major coastal reference points (same as before)
    coastal_reference_points = [
        # North America - Atlantic Coast
        (40.7589, -73.9851),   # New York Harbor
        (25.7617, -80.1918),   # Miami
        (32.0835, -81.0998),   # Savannah
        (39.2904, -76.6122),   # Baltimore
        (42.3601, -71.0589),   # Boston
        (44.3106, -69.7795),   # Maine Coast
        # North America - Pacific Coast
        (37.7749, -122.4194),  # San Francisco
        (34.0522, -118.2437),  # Los Angeles
        (47.6062, -122.3321),  # Seattle
        (49.2827, -123.1207),  # Vancouver
        (32.7157, -117.1611),  # San Diego
        # North America - Gulf Coast
        (29.9511, -90.0715),   # New Orleans
        (29.7604, -95.3698),   # Houston
        (27.7663, -82.6404),   # Tampa
        # Europe - Atlantic Coast
        (51.5074, -0.1278),    # London Thames
        (53.3498, -6.2603),    # Dublin
        (55.9533, -3.1883),    # Edinburgh
        (59.9139, 10.7522),    # Oslo
        (60.1699, 24.9384),    # Helsinki
        # Europe - Mediterranean
        (41.9028, 12.4964),    # Rome
        (43.2965, 5.3698),     # Marseille
        (41.3851, 2.1734),     # Barcelona
        (36.7213, -4.4214),    # Malaga
        (37.9755, 23.7348),    # Athens
        # Asia - Pacific Coast
        (35.6762, 139.6503),   # Tokyo Bay
        (22.3193, 114.1694),   # Hong Kong
        (1.3521, 103.8198),    # Singapore
        (31.2304, 121.4737),   # Shanghai
        (37.5665, 126.9780),   # Seoul (Incheon)
        # Australia/Oceania
        (-33.8688, 151.2093),  # Sydney
        (-37.8136, 144.9631),  # Melbourne
        (-31.9505, 115.8605),  # Perth
        (-27.4698, 153.0251),  # Brisbane
        # South America
        (-22.9068, -43.1729),  # Rio de Janeiro
        (-34.6118, -58.3960),  # Buenos Aires
        (-33.4489, -70.6693),  # Valparaiso
        (-12.0464, -77.0428),  # Lima
        # Africa
        (-33.9249, 18.4241),   # Cape Town
        (30.0444, 31.2357),    # Cairo (Nile Delta)
        (6.5244, 3.3792),      # Lagos
        (-26.2041, 28.0473),   # Johannesburg (inland reference)
        # Middle East
        (25.2048, 55.2708),    # Dubai
        (26.8206, 30.8025),    # Red Sea Coast
        (31.7683, 35.2137),    # Jerusalem (Mediterranean proximity)
    ]
    # Calculate minimum geodesic distance to any coastal reference point
    min_distance_km = float('inf')
    current_point = (lat, lon)
    for coast_lat, coast_lon in coastal_reference_points:
        distance = geopy.distance.geodesic(current_point, (coast_lat, coast_lon)).kilometers
        min_distance_km = min(min_distance_km, distance)
    # STRICTER: only within 25 km of a major coastal city (reduced from 50 km)
    if min_distance_km <= 25:
        return True
    # Method 2: geographic coastal zones with small buffers; a point counts
    # only when it sits near the edge of a zone (i.e. near the coastline).
    coastal_zones = [
        {'lat_range': (25, 50), 'lon_range': (-85, -65), 'buffer': 0.5},    # NA East Coast
        {'lat_range': (30, 60), 'lon_range': (-130, -115), 'buffer': 0.5},  # NA West Coast
        {'lat_range': (25, 35), 'lon_range': (-100, -80), 'buffer': 0.5},   # NA Gulf Coast
        {'lat_range': (35, 70), 'lon_range': (-15, 5), 'buffer': 0.5},      # Europe Atlantic
        {'lat_range': (30, 46), 'lon_range': (-6, 42), 'buffer': 0.3},      # Mediterranean
        {'lat_range': (53, 66), 'lon_range': (10, 30), 'buffer': 0.3},      # Baltic Sea
        {'lat_range': (20, 50), 'lon_range': (120, 145), 'buffer': 0.5},    # Asia Pacific
        {'lat_range': (-10, 25), 'lon_range': (65, 140), 'buffer': 0.5},    # Asia Indian Ocean
        {'lat_range': (-45, -10), 'lon_range': (110, 155), 'buffer': 0.5},  # Australia
        {'lat_range': (-55, 15), 'lon_range': (-70, -30), 'buffer': 0.5},   # SA East Coast
        {'lat_range': (-55, 15), 'lon_range': (-85, -70), 'buffer': 0.5},   # SA West Coast
        {'lat_range': (-35, 37), 'lon_range': (-20, 55), 'buffer': 0.5},    # Africa
        {'lat_range': (65, 85), 'lon_range': (-180, 180), 'buffer': 1.0},   # Arctic coasts
        {'lat_range': (10, 30), 'lon_range': (-90, -60), 'buffer': 0.3},    # Caribbean/Central America
        {'lat_range': (12, 42), 'lon_range': (32, 65), 'buffer': 0.3},      # Middle East
    ]
    for zone in coastal_zones:
        lat_min, lat_max = zone['lat_range']
        lon_min, lon_max = zone['lon_range']
        buffer = zone['buffer']
        if (lat_min - buffer) <= lat <= (lat_max + buffer) and (lon_min - buffer) <= lon <= (lon_max + buffer):
            # Close to a zone edge implies coastline proximity
            edge_distance = min(
                abs(lat - lat_min), abs(lat - lat_max),
                abs(lon - lon_min), abs(lon - lon_max)
            )
            # STRICTER: only if very close to the edge
            if edge_distance <= buffer * 0.5:
                return True
    # Method 3: island detection (small land masses surrounded by water)
    island_regions = [
        {'lat_range': (10, 28), 'lon_range': (-85, -60)},    # Caribbean Islands
        {'lat_range': (-25, 25), 'lon_range': (120, -120)},  # Pacific Islands (crosses dateline)
        {'lat_range': (35, 42), 'lon_range': (8, 20)},       # Mediterranean Islands
        {'lat_range': (49, 61), 'lon_range': (-11, 2)},      # British Isles
        {'lat_range': (24, 46), 'lon_range': (123, 146)},    # Japanese Islands
        {'lat_range': (-11, 6), 'lon_range': (95, 141)},     # Indonesian Islands
    ]
    for island in island_regions:
        lat_min, lat_max = island['lat_range']
        lon_min, lon_max = island['lon_range']
        if lat_min <= lat <= lat_max and _lon_in_range(lon, lon_min, lon_max):
            # Only when elevation suggests land nearby (not deep ocean)
            if elevation is not None and elevation > -50:
                return True
    return False
def is_onshore_water_body(lat: float, lon: float, elevation: Optional[float] = None, landcover_analysis: Optional[LandCoverAnalysis] = None) -> bool:
    """
    SIMPLIFIED: Detect onshore water bodies (lakes, reservoirs, rivers).
    Rule: elevation >= -2 m counts as onshore water; between -5 and -2 m the
    point still counts when more than 30% of the surrounding samples are land.
    """
    if elevation is None:
        return False
    # Main criterion: positive or very shallow elevation is onshore water.
    if elevation >= -2:
        return True
    # Borderline band: slightly deeper points need significant land nearby.
    if -5 <= elevation < -2 and landcover_analysis and landcover_analysis.classifications:
        land_types = ['Urban', 'Farmland', 'Woodland', 'Grassland', 'Desert', 'Shrubland', 'Tundra', 'Arctic/mountain']
        land_share = sum(
            landcover_analysis.percentages.get(land_type, 0)
            for land_type in land_types
        )
        return land_share > 30
    return False
def map_worldcover_to_terrain_simplified(code: int, lat: float, lon: float, elevation: Optional[float] = None, landcover_analysis: Optional[LandCoverAnalysis] = None) -> str:
    """SIMPLIFIED terrain mapping with new water body classification.

    Maps an ESA WorldCover class code to one of this app's terrain labels.
    Elevation splits water into onshore "Water Body" (>= -2 m) vs marine
    "Continental Shelf" (depth <= 200 m) / "Deep Ocean"; coastal context
    promotes low-lying classes to "Tidal Zone".
    """
    # WATER CLASSIFICATION (Code 80 - Permanent water bodies)
    if code == 80:  # Permanent water bodies
        # First check elevation for marine vs onshore
        if elevation is not None:
            if elevation >= -2:  # Onshore water body threshold
                return "Water Body"
            else:  # Marine water (elevation < -2)
                depth = -elevation
                if depth <= 200:
                    return "Continental Shelf"
                else:
                    return "Deep Ocean"
        # Fallback: no elevation data — check onshore water body heuristics
        if is_onshore_water_body(lat, lon, elevation, landcover_analysis):
            return "Water Body"
        # Default fallback for unclear cases
        return "Water Body"
    # COASTAL LOGIC for non-water codes
    is_coastal = is_coastal_location(lat, lon, elevation)
    if is_coastal:
        if code == 60:  # Bare/sparse vegetation
            return "Tidal Zone"
        elif code == 95:  # Mangroves
            return "Tidal Zone"
        elif code == 90:  # Herbaceous wetland
            return "Tidal Zone"
        elif code in [10, 20, 30]:  # Vegetation near coast
            # Low coastal vegetation counts as tidal; higher ground falls
            # through to the standard classification below.
            if elevation is not None and elevation <= 20:
                return "Tidal Zone"
    # STANDARD TERRAIN CLASSIFICATION for non-coastal areas
    if code == 10:  # Tree cover
        if elevation is not None and elevation > 2000:
            return "Arctic/mountain"
        elif -23.5 <= lat <= 23.5:  # Tropical zone
            if elevation is not None and elevation < 500:
                return "Jungle"
            else:
                return "Closed forest"
        else:
            return "Woodland"
    elif code == 20:  # Shrubland
        if elevation is not None and elevation > 2000:
            return "Arctic/mountain"
        else:
            return "Shrubland"
    elif code == 30:  # Grassland
        if lat > 60 or lat < -60:
            return "Tundra"
        else:
            return "Grassland"
    elif code == 40:  # Cropland
        return "Farmland"
    elif code == 50:  # Built-up
        return "Urban"
    elif code == 60:  # Bare/sparse vegetation
        # Non-coastal bare areas: polar or high-altitude means mountain/arctic
        if lat > 66.5 or lat < -66.5:
            return "Arctic/mountain"
        elif elevation is not None and elevation > 2500:
            return "Arctic/mountain"
        else:
            return "Desert"
    elif code == 70:  # Snow and ice
        return "Arctic/mountain"
    elif code == 90:  # Herbaceous wetland
        return "Swamp"
    elif code == 95:  # Mangroves
        return "Tidal Zone"  # Mangroves are always tidal
    elif code == 100:  # Moss and lichen
        return "Tundra"
    else:
        # Unknown codes: fall back to elevation-based water classification
        if elevation is not None:
            if elevation >= -2:  # Onshore water body threshold
                return "Water Body"
            elif elevation < -2:  # Marine water
                depth = -elevation  # convert negative elevation to positive depth
                if depth <= 200:
                    return "Continental Shelf"
                else:
                    return "Deep Ocean"
            elif is_coastal_location(lat, lon, elevation) and elevation <= 10:
                # NOTE(review): unreachable — with elevation not None, one of
                # the two branches above always matches first.
                return "Tidal Zone"
        # No elevation data available: infer from location only
        if is_coastal_location(lat, lon, elevation):
            return "Tidal Zone"
        return "Unknown"
def detect_coastal_area(lat: float, lon: float, elevation: Optional[float], landcover_analysis: Optional[LandCoverAnalysis]) -> bool:
    """IMPROVED coastal area detection with stricter criteria.

    A point is coastal only when it is geographically coastal AND it either
    sits at tidal-zone elevation (-2..10 m) or has more than 20% water among
    the surrounding land-cover samples.
    """
    if elevation is None:
        return False
    # Elevation band typical of shoreline / tidal areas
    near_tide_level = -2 <= elevation <= 10
    # Share of water-like classes among nearby samples
    water_share = 0
    if landcover_analysis and landcover_analysis.classifications:
        water_types = ['Continental Shelf', 'Deep Ocean', 'Water Body', 'Swamp', 'Tidal Zone']
        water_share = sum(
            landcover_analysis.percentages.get(water_type, 0)
            for water_type in water_types
        )
    # Geographic gate: nothing counts as coastal unless the location is
    if not is_coastal_location(lat, lon, elevation):
        return False
    return near_tide_level or water_share > 20
def generate_sample_points(center_lat: float, center_lon: float, radius_meters: int = 100, num_points: int = 16) -> List[Tuple[float, float]]:
    """Return the centre plus two concentric rings of sample coordinates
    within radius_meters of (center_lat, center_lon).

    The effective point count is derived from the radius (12/20/30); the
    num_points argument is kept only for interface compatibility.
    """
    # Effective sample count is tied to the radius for speed.
    if radius_meters <= 100:
        num_points = 12
    elif radius_meters <= 300:
        num_points = 20
    else:
        num_points = 30
    # Metres-per-degree conversion; longitude degrees shrink with latitude.
    lat_factor = 111320.0
    lon_factor = 111320.0 * math.cos(math.radians(center_lat))
    radius_lat = radius_meters / lat_factor
    radius_lon = radius_meters / lon_factor
    samples = [(center_lat, center_lon)]
    circles = 2
    points_per_circle = (num_points - 1) // circles
    for circle in range(1, circles + 1):
        circle_radius_lat = radius_lat * (circle / circles)
        circle_radius_lon = radius_lon * (circle / circles)
        ring_count = points_per_circle
        if circle == circles:
            ring_count += (num_points - 1) % circles  # remainder goes on the outer ring
        for i in range(ring_count):
            angle = 2 * math.pi * i / ring_count
            candidate_lat = center_lat + circle_radius_lat * math.cos(angle)
            candidate_lon = center_lon + circle_radius_lon * math.sin(angle)
            # Skip points that would leave the valid lat/lon domain
            if -90 <= candidate_lat <= 90 and -180 <= candidate_lon <= 180:
                samples.append((candidate_lat, candidate_lon))
    return samples
@lru_cache(maxsize=500)
def worldcover_tile(lat: float, lon: float) -> str:
    """Return the label of the ESA WorldCover 3x3-degree tile containing the
    point, e.g. 'N39W075'. Cached, since many sample points share a tile."""
    # Snap down to the tile's south-west corner (multiples of 3 degrees).
    lat_base = math.floor(lat / 3) * 3
    lon_base = math.floor(lon / 3) * 3
    hemi_ns = "N" if lat_base >= 0 else "S"
    hemi_ew = "E" if lon_base >= 0 else "W"
    return f"{hemi_ns}{abs(int(lat_base)):02d}{hemi_ew}{abs(int(lon_base)):03d}"
def sample_worldcover(lat: float, lon: float, year: int = 2020, version: str = "v100") -> Optional[int]:
    """Optimized WorldCover sampling with faster timeout.

    Reads the single ESA WorldCover class code at (lat, lon) directly from
    the public cloud-optimized GeoTIFF on S3. Returns None for nodata,
    points outside the tile, or any read failure.
    """
    tile = worldcover_tile(lat, lon)
    url = (
        f"https://esa-worldcover.s3.eu-central-1.amazonaws.com/"
        f"{version}/{year}/map/ESA_WorldCover_10m_{year}_{version}_{tile}_Map.tif"
    )
    try:
        with rasterio.Env(GDAL_HTTP_TIMEOUT=10):  # reduced timeout for speed
            with rasterio.open(url) as ds:
                row, col = ds.index(lon, lat)  # rasterio.index takes (x=lon, y=lat)
                if 0 <= row < ds.height and 0 <= col < ds.width:
                    # 1x1 window: only the needed pixel range is fetched over HTTP
                    arr = ds.read(1, window=((row, row+1), (col, col+1)))
                    val = arr[0, 0]
                    if ds.nodata is not None and val == ds.nodata:
                        return None
                    return int(val)
                else:
                    return None
    except Exception as e:
        logging.warning(f"WorldCover sampling failed for {tile}: {e}")
        return None
async def get_landcover_consensus_async(lat: float, lon: float, radius_meters: int = 100) -> Tuple[Optional[int], str, Optional[LandCoverAnalysis]]:
    """Async land cover classification with parallel processing.

    Samples WorldCover at up to 15 points around (lat, lon) in a thread
    pool, maps each code to a terrain label, and returns
    (most_common_code, most_common_terrain, LandCoverAnalysis).
    Returns (None, "Unknown", None) when no sample yields any data.
    """
    start_time = time.time()
    # Limit radius and reduce sample points for speed
    radius_meters = max(50, min(500, radius_meters))
    if radius_meters <= 100:
        num_points = 12
    elif radius_meters <= 300:
        num_points = 20
    else:
        num_points = 25
    sample_points = generate_sample_points(lat, lon, radius_meters, num_points)
    # Elevation at the centre is reused for every sample's terrain mapping.
    # NOTE(review): this is a blocking HTTP call inside an async function;
    # consider asyncio.to_thread as done in classify_terrain_enhanced_async.
    elevation, _ = get_elevation_opentopo_single(lat, lon)
    classifications = []
    valid_samples = 0
    # Parallel sampling with a bounded worker count and an overall deadline
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_point = {
            executor.submit(sample_worldcover, sample_lat, sample_lon): (sample_lat, sample_lon)
            for sample_lat, sample_lon in sample_points[:15]  # cap at 15 points for speed
        }
        completed_futures = []
        try:
            completed_futures = list(concurrent.futures.as_completed(future_to_point, timeout=12))
        except concurrent.futures.TimeoutError:
            logging.warning("Some futures timed out, processing completed ones")
            completed_futures = [f for f in future_to_point.keys() if f.done()]
        for future in completed_futures:
            try:
                sample_lat, sample_lon = future_to_point[future]
                code = future.result(timeout=1)  # short timeout for individual results
                if code is not None:
                    terrain_type = map_worldcover_to_terrain_simplified(code, sample_lat, sample_lon, elevation)
                    classifications.append((code, terrain_type))
                    valid_samples += 1
            except (concurrent.futures.TimeoutError, Exception) as e:
                logging.warning(f"Sample point failed: {e}")
                continue
        # Cancel anything still pending so the executor can shut down promptly
        for future in future_to_point.keys():
            if not future.done():
                future.cancel()
    # Fallback: single centre-point sample when the parallel pass got nothing
    if not classifications:
        try:
            center_code = sample_worldcover(lat, lon)
            if center_code is not None:
                center_terrain = map_worldcover_to_terrain_simplified(center_code, lat, lon, elevation)
                classifications.append((center_code, center_terrain))
                valid_samples = 1
        except Exception as e:
            logging.error(f"Fallback center point sampling failed: {e}")
            return None, "Unknown", None
    # BUG FIX: if even the fallback produced no data (sample_worldcover
    # returned None without raising), the Counter logic below would raise
    # IndexError on most_common(1)[0]. Bail out explicitly instead.
    if not classifications:
        return None, "Unknown", None
    # Consensus: majority vote over terrain labels (and raw codes)
    terrain_counts = Counter([terrain for _, terrain in classifications])
    code_counts = Counter([code for code, _ in classifications])
    most_common_terrain = terrain_counts.most_common(1)[0][0]
    most_common_code = code_counts.most_common(1)[0][0]
    confidence = (terrain_counts[most_common_terrain] / valid_samples) * 100
    percentages = {}
    for terrain, count in terrain_counts.items():
        percentages[terrain] = round((count / valid_samples) * 100, 1)
    analysis = LandCoverAnalysis(
        primary_classification=most_common_terrain,
        confidence=round(confidence, 1),
        total_samples=len(sample_points),
        valid_samples=valid_samples,
        classifications=dict(terrain_counts),
        percentages=percentages,
        radius_meters=radius_meters,
        sampling_method="Parallel Systematic Sampling"
    )
    processing_time = time.time() - start_time
    logging.info(f"Land cover analysis completed in {processing_time:.2f}s: {most_common_terrain} ({confidence:.1f}% confidence)")
    return most_common_code, most_common_terrain, analysis
async def classify_terrain_enhanced_async(lat: float, lon: float, radius_meters: int = 100) -> Dict[str, any]:
    """Async enhanced terrain classification with simplified water body logic.

    Fetches elevation and the land-cover consensus concurrently, then applies
    elevation-based overrides for water / tidal / high-mountain terrain.
    NOTE(review): the return annotation uses builtin `any`; `typing.Any` is
    presumably intended (typing.Any is not imported in this file) — confirm.
    """
    start_time = time.time()
    # Get elevation and land cover in parallel with better timeout handling
    elevation_task = asyncio.create_task(
        asyncio.to_thread(get_elevation_opentopo_single, lat, lon)
    )
    landcover_task = asyncio.create_task(
        get_landcover_consensus_async(lat, lon, radius_meters)
    )
    # Defaults used when a task fails or times out
    elev, elev_confidence = None, "Failed"
    code, terrain, landcover_analysis = None, "Unknown", None
    try:
        elev, elev_confidence = await asyncio.wait_for(elevation_task, timeout=15)
    except asyncio.TimeoutError:
        logging.warning("Elevation fetch timed out")
        elev, elev_confidence = None, "Timeout"
    except Exception as e:
        logging.error(f"Elevation fetch failed: {e}")
        elev, elev_confidence = None, "Error"
    try:
        code, terrain, landcover_analysis = await asyncio.wait_for(landcover_task, timeout=18)
    except asyncio.TimeoutError:
        logging.warning("Landcover analysis timed out")
        code, terrain, landcover_analysis = None, "Unknown", None
    except Exception as e:
        logging.error(f"Landcover analysis failed: {e}")
        code, terrain, landcover_analysis = None, "Unknown", None
    # Cancel any remaining tasks
    if not elevation_task.done():
        elevation_task.cancel()
    if not landcover_task.done():
        landcover_task.cancel()
    # Coastal detection
    is_coastal = detect_coastal_area(lat, lon, elev, landcover_analysis)
    if elev is None:
        # Without elevation we can only report the land-cover verdict as-is.
        return {
            "elevation": None,
            "terrain_type": terrain or "Unknown",
            "landcover_analysis": landcover_analysis,
            "elevation_confidence": elev_confidence,
            "coastal_detection": is_coastal,
            "processing_time": time.time() - start_time
        }
    # SIMPLIFIED terrain classification logic
    final_terrain = terrain
    if terrain == "Unknown" and elev is not None:
        # Use elevation-only classification for ocean areas
        if elev < -2:  # Marine water
            depth = -elev
            if depth <= 200:
                terrain = "Continental Shelf"
            else:
                terrain = "Deep Ocean"
        elif elev >= -2 and elev <= 0:
            terrain = "Water Body"
        elif is_coastal_location(lat, lon, elev) and elev <= 10:
            # NOTE(review): final_terrain still holds "Unknown" here; it is
            # only corrected below when detect_coastal_area() also fires —
            # verify this path is intended.
            terrain = "Tidal Zone"
    # Coastal detection (recomputed with the same inputs as above)
    is_coastal = detect_coastal_area(lat, lon, elev, landcover_analysis)
    # Override terrain classification based on elevation for water bodies
    if elev is not None and terrain in ["Water Body", "Continental Shelf", "Deep Ocean"]:
        if elev >= -2:  # Onshore water body threshold
            final_terrain = "Water Body"
        elif elev < -2:  # Marine water
            depth = -elev
            if depth <= 200:
                final_terrain = "Continental Shelf"
            else:
                final_terrain = "Deep Ocean"
    elif is_coastal and elev is not None and elev <= 10:
        # Keep tidal zone for coastal areas (but never demote water classes)
        if terrain not in ["Water Body", "Continental Shelf", "Deep Ocean"]:
            final_terrain = "Tidal Zone"
    elif elev is not None and elev > 3000:
        final_terrain = "Arctic/mountain"
    return {
        "elevation": elev,
        "terrain_type": final_terrain,
        "landcover_analysis": landcover_analysis,
        "elevation_confidence": elev_confidence,
        "coastal_detection": is_coastal,
        "processing_time": time.time() - start_time
    }
def parse_decimal(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse "lat, lon" in decimal degrees from free-form text.

    Returns (lat, lon) when the first two numbers found form a valid
    coordinate pair, otherwise None.
    """
    # BUG FIX: the old pattern r"[-+]?\d*\.\d+|\d+" only applied the sign to
    # the decimal alternative, so integer inputs like "-33, 151" silently
    # lost the minus sign. The sign now covers both forms.
    float_pattern = r"[-+]?(?:\d*\.\d+|\d+)"
    nums = re.findall(float_pattern, input_str)
    if len(nums) >= 2:
        try:
            lat = float(nums[0])
            lon = float(nums[1])
        except ValueError:
            return None
        if -90 <= lat <= 90 and -180 <= lon <= 180:
            return lat, lon
    return None
def parse_dms(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse DMS (degrees-minutes-seconds) coordinates.

    Expects at least two groups like 40° 45' 12" N; S/W directions negate
    the value. Returns (lat, lon) or None when no valid pair is found.
    """
    dms_pattern = r"""(?P<deg>\d{1,3})[°\s]+(?P<min>\d{1,2})['\s]+(?P<sec>\d{1,2}(?:\.\d+)?)[\"\s]*?(?P<dir>[NSEW])"""
    coords = []
    for match in re.finditer(dms_pattern, input_str, re.IGNORECASE | re.VERBOSE):
        magnitude = float(match.group('deg')) + float(match.group('min')) / 60.0 + float(match.group('sec')) / 3600.0
        if match.group('dir').upper() in ('S', 'W'):
            magnitude = -magnitude
        coords.append(magnitude)
    if len(coords) >= 2:
        lat, lon = coords[:2]
        if -90 <= lat <= 90 and -180 <= lon <= 180:
            return lat, lon
    return None
def geocode_place(place: str) -> Optional[Tuple[float, float]]:
    """Resolve a free-form place name to (lat, lon) via Nominatim.

    Returns None on HTTP failure, timeout, or when no result is found.
    """
    url = "https://nominatim.openstreetmap.org/search"
    params = {
        'q': place,
        'format': 'json',
        'limit': 1,
        'addressdetails': 1
    }
    headers = {
        'User-Agent': "enhanced-terrain-tool/2.0 (terrain-analysis@spglobal.com)"
    }
    try:
        # CONSISTENCY FIX: use the module-level pooled session (verify=False
        # is already configured there) instead of a one-off requests.get call,
        # matching every other HTTP call in this file.
        resp = session.get(url, params=params, headers=headers, timeout=5)
        if resp.status_code == 200:
            results = resp.json()
            if results:
                top = results[0]
                lat = float(top['lat'])
                lon = float(top['lon'])
                logging.info(f"Geocoded '{place}' to ({lat}, {lon})")
                return lat, lon
    except requests.RequestException as e:
        logging.error(f"Geocoding failed for '{place}': {e}")
    return None
def parse_location(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse coordinates from DMS or decimal text, else geocode a place name.

    BUG FIX: DMS is now tried BEFORE decimal degrees. The decimal parser
    simply grabs the first two numbers it finds, so a DMS string such as
    40° 45' 12" N ... used to be mis-read as lat=40, lon=45. The DMS
    pattern is strict (requires direction letters), so plain decimal input
    still falls through to the decimal parser unchanged.
    """
    input_str = input_str.strip()
    # Try the strict DMS format first
    dms_coords = parse_dms(input_str)
    if dms_coords:
        return dms_coords
    # Then decimal degrees
    decimal_coords = parse_decimal(input_str)
    if decimal_coords:
        return decimal_coords
    # Fall back to geocoding a place name
    return geocode_place(input_str)
# Add periodic connection refresh
async def refresh_connections():
    """Periodically refresh connections to prevent timeouts.

    Runs forever as a background task: every 5 minutes it issues one cheap
    request so upstream keep-alive connections stay warm. Failures are
    logged at debug level and ignored.
    """
    while True:
        await asyncio.sleep(300)  # every 5 minutes
        try:
            # Quick test request to keep connections alive
            test_url = "https://api.opentopodata.org/v1/aster30m?locations=0,0"
            session.get(test_url, timeout=3)
        except Exception as e:
            # BUG FIX: was a bare `except:`, which also swallows
            # asyncio.CancelledError / KeyboardInterrupt and can keep the
            # task alive after cancellation. `Exception` does not catch
            # CancelledError on Python 3.8+.
            logging.debug(f"Connection refresh ping failed: {e}")
# API Endpoints
@app.get("/", include_in_schema=False)
async def root():
    """Serve the main application page"""
    # Serve the SPA entry point when present; otherwise a plain 404.
    if os.path.exists("index.html"):
        return FileResponse("index.html")
    return PlainTextResponse("index.html not found", status_code=404)
@app.on_event("startup")
async def startup_event():
    """Initialize services on startup with connection warming"""
    logging.info("Starting Enhanced Terrain Analysis API v2.0.0 - Simplified Water Bodies")

    def do_warmup():
        # One-shot warmup of both upstream services; failures are non-fatal.
        for label, probe in (
            ("elevation", lambda: get_elevation_opentopo_single(0, 0)),
            ("worldcover", lambda: sample_worldcover(0, 0)),
        ):
            try:
                probe()
            except Exception as e:
                logging.warning(f"Startup warmup {label} failed: {e}")

    threading.Thread(target=do_warmup).start()
    # Keep connections warm for the lifetime of the process.
    threading.Thread(target=periodic_warmup, daemon=True).start()
# Start the connection-refresh background task on the running event loop.
# NOTE(review): @app.on_event is deprecated in newer FastAPI in favour of
# lifespan handlers; multiple startup handlers are supported, so this runs
# in addition to startup_event above.
@app.on_event("startup")
async def start_background_tasks():
    asyncio.create_task(refresh_connections())
@app.post("/api/analyze", response_model=TerrainResponse)
async def analyze_terrain(request: LocationRequest):
    """Optimized terrain analysis with simplified water body classification.

    Parses request.input (decimal, DMS, or place name), runs the combined
    elevation + land-cover classification, and returns a TerrainResponse.
    Raises HTTP 400 for unparseable input and HTTP 500 on unexpected errors.
    """
    request_start_time = time.time()
    try:
        # Validate and parse input
        coordinates = parse_location(request.input.strip())
        if coordinates is None:
            raise HTTPException(
                status_code=400,
                detail="Could not parse coordinates or geocode the location. Please check your input format."
            )
        lat, lon = coordinates
        # Clamp radius to 50-500 m (max reduced for speed)
        radius = max(50, min(500, request.radius_meters or 100))
        # Perform enhanced terrain classification
        result = await classify_terrain_enhanced_async(lat, lon, radius)
        # Minimal rate limiting
        await asyncio.sleep(0.1)
        total_processing_time = time.time() - request_start_time
        response = TerrainResponse(
            latitude=lat,
            longitude=lon,
            elevation=result["elevation"],
            terrain_type=result["terrain_type"],
            landcover_analysis=result.get("landcover_analysis"),
            elevation_confidence=result.get("elevation_confidence"),
            coastal_detection=result.get("coastal_detection"),
            success=True,
            message=f"Enhanced terrain analysis completed with {radius}m radius sampling (Simplified water bodies)",
            processing_time=round(total_processing_time, 2)
        )
        logging.info(f"Analysis completed for ({lat:.6f}, {lon:.6f}) in {total_processing_time:.2f}s")
        return response
    except HTTPException:
        # Re-raise client errors untouched so FastAPI serialises them as-is
        raise
    except Exception as e:
        logging.error(f"Error analyzing terrain: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )
@app.get("/api/health")
async def health_check():
    """Health check endpoint"""
    payload = {
        "status": "healthy",
        "service": "Enhanced Terrain Analysis API",
        "version": "2.0.0 - Simplified Water Bodies",
        "timestamp": time.time(),
    }
    return payload
@app.get("/api/datasets")
async def get_available_datasets():
    """Get information about available datasets"""
    worldcover_info = {
        "name": "ESA WorldCover",
        "resolution": "10m",
        "coverage": "Global",
        "years": [2020, 2021],
        "classes": 11,
    }
    return {
        "elevation_datasets": DATASETS,
        "landcover_datasets": [worldcover_info],
    }
if __name__ == "__main__":
    # Local/dev entry point; port 7860 is the conventional Hugging Face
    # Spaces port. In production the app may be served by an external ASGI
    # server instead.
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
        log_level="info"
    )