"""Enhanced Terrain Analysis API.

FastAPI service that classifies terrain for a location by combining:

- elevation from OpenTopoData (several regional/global datasets, including
  bathymetry sources gebco2020/etopo1),
- land cover sampled from ESA WorldCover 10 m tiles via rasterio,
- heuristic coastal / onshore-water-body detection.
"""

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, PlainTextResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import os
import signal
import re
import threading
import time
import math
import logging
import requests
import rasterio
import numpy as np
from collections import Counter
from typing import Any, Optional, Tuple, Dict, List
import urllib3
import asyncio
import concurrent.futures
from functools import lru_cache
import geopy.distance
from geopy import Point

# Disable SSL verification warnings for testing
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
os.environ["GDAL_HTTP_UNSAFESSL"] = "YES"

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

app = FastAPI(title="Enhanced Terrain Analysis API", version="2.0.0")

# Enable CORS for frontend communication
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---------------------------------------------------------------------------
# Pydantic models for request/response
# ---------------------------------------------------------------------------

class LocationRequest(BaseModel):
    # Free-form input: decimal coords, DMS coords, or a place name.
    input: str
    radius_meters: Optional[int] = 100
    detailed_analysis: Optional[bool] = True


class LandCoverAnalysis(BaseModel):
    primary_classification: str
    confidence: float
    total_samples: int
    valid_samples: int
    classifications: Dict[str, int]
    percentages: Dict[str, float]
    radius_meters: int
    sampling_method: str


class TerrainResponse(BaseModel):
    latitude: float
    longitude: float
    elevation: Optional[float] = None
    terrain_type: str
    landcover_analysis: Optional[LandCoverAnalysis] = None
    elevation_confidence: Optional[str] = None
    coastal_detection: Optional[bool] = None
    success: bool
    message: Optional[str] = None
    processing_time: Optional[float] = None


# ---------------------------------------------------------------------------
# Dataset configuration (ordered by preference via 'resolution' at lookup time)
# ---------------------------------------------------------------------------

DATASETS = [
    {'name': 'nzdem8m', 'min_lat': -47.6, 'max_lat': -34.4,
     'min_lon': 166.5, 'max_lon': 178.6, 'resolution': 8},
    {'name': 'ned10m', 'min_lat': 19.5, 'max_lat': 49.4,
     'min_lon': -160.3, 'max_lon': -66.9, 'resolution': 10},
    {'name': 'eudem25m', 'min_lat': 25.0, 'max_lat': 72.0,
     'min_lon': -60.0, 'max_lon': 40.0, 'resolution': 25},
    {'name': 'mapzen', 'min_lat': -90.0, 'max_lat': 90.0,
     'min_lon': -180.0, 'max_lon': 180.0, 'resolution': 30},
    {'name': 'aster30m', 'min_lat': -90.0, 'max_lat': 90.0,
     'min_lon': -180.0, 'max_lon': 180.0, 'resolution': 30},
    {'name': 'srtm30m', 'min_lat': -60.0, 'max_lat': 60.0,
     'min_lon': -180.0, 'max_lon': 180.0, 'resolution': 30},
    {'name': 'srtm90m', 'min_lat': -60.0, 'max_lat': 60.0,
     'min_lon': -180.0, 'max_lon': 180.0, 'resolution': 90},
    {'name': 'gebco2020', 'min_lat': -90.0, 'max_lat': 90.0,
     'min_lon': -180.0, 'max_lon': 180.0, 'resolution': 450},
    {'name': 'etopo1', 'min_lat': -90.0, 'max_lat': 90.0,
     'min_lon': -180.0, 'max_lon': 180.0, 'resolution': 1800},
]

# HTTP session with optimized settings (SSL verification disabled for testing)
session = requests.Session()
session.verify = False
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10, pool_maxsize=20, max_retries=1
)
session.mount('http://', adapter)
session.mount('https://', adapter)


def timeout_handler(signum, frame):
    """Handle timeout signals (for use with the `signal` module)."""
    raise TimeoutError("Operation timed out")


def periodic_warmup():
    """Keep upstream connections alive by issuing a cheap request every 5 min.

    Runs forever; intended to be started in a daemon thread.
    """
    while True:
        try:
            get_elevation_opentopo_single(0, 0)
        except Exception as e:
            logging.warning(f"Periodic warmup elevation failed: {e}")
        try:
            sample_worldcover(0, 0)
        except Exception as e:
            logging.warning(f"Periodic warmup worldcover failed: {e}")
        # Sleep for 5 minutes (adjust as needed)
        time.sleep(300)


def warm_up_connections():
    """One-shot warm-up of elevation, bathymetry and WorldCover endpoints."""
    try:
        # Test elevation API
        test_url = "https://api.opentopodata.org/v1/aster30m?locations=40.7589,-73.9851"
        session.get(test_url, timeout=5)
        # Test SRTMGL1 for offshore
        test_url2 = "https://api.opentopodata.org/v1/srtmgl1?locations=40.0,-70.0"
        session.get(test_url2, timeout=5)
        # Test ETOPO1 for deep bathymetry
        test_url3 = "https://api.opentopodata.org/v1/etopo1?locations=35.0,-75.0"
        session.get(test_url3, timeout=5)
        # Test WorldCover (small HEAD request)
        test_tile_url = "https://esa-worldcover.s3.eu-central-1.amazonaws.com/v100/2020/map/ESA_WorldCover_10m_2020_v100_N39W075_Map.tif"
        session.head(test_tile_url, timeout=5)
        logging.info("Enhanced connection warm-up completed successfully")
    except Exception as e:
        logging.warning(f"Connection warm-up failed: {e}")


def select_dataset(lat: float, lon: float) -> str:
    """Select the finest-resolution dataset whose bounding box covers the point."""
    candidates = [
        ds for ds in DATASETS
        if ds['min_lat'] <= lat <= ds['max_lat'] and ds['min_lon'] <= lon <= ds['max_lon']
    ]
    if not candidates:
        logging.debug(f"No regional dataset covers ({lat}, {lon}); falling back to 'mapzen'.")
        return 'mapzen'
    chosen = min(candidates, key=lambda x: x['resolution'])['name']
    logging.debug(f"Selected OpenTopoData dataset '{chosen}' for ({lat}, {lon}).")
    return chosen


def get_elevation_opentopo_single(lat: float, lon: float) -> Tuple[Optional[float], str]:
    """Fetch elevation for a single coordinate from OpenTopoData.

    Returns (elevation_m, confidence_label). Falls back to the global
    'mapzen' dataset if the preferred dataset fails, and finally to
    (None, "No data available").
    """
    dataset = select_dataset(lat, lon)
    url = f"https://api.opentopodata.org/v1/{dataset}?locations={lat},{lon}"
    try:
        resp = session.get(url, timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            if data.get('status') == 'OK':
                elev = data['results'][0].get('elevation')
                if elev is not None:
                    logging.debug(f"Got elevation {elev} m from '{dataset}' for ({lat}, {lon}).")
                    return elev, f"High ({dataset})"
                else:
                    logging.warning(f"Null elevation from '{dataset}' for ({lat}, {lon}).")
            else:
                logging.warning(f"OpenTopoData status '{data.get('status')}' for '{dataset}' at ({lat}, {lon}).")
        else:
            logging.warning(f"HTTP {resp.status_code} from OpenTopoData for '{dataset}' at ({lat}, {lon}).")
    except requests.RequestException as e:
        logging.error(f"OpenTopoData request exception at ({lat}, {lon}): {e}")

    # Fallback to 'mapzen' if primary dataset failed
    if dataset != 'mapzen':
        try:
            url2 = f"https://api.opentopodata.org/v1/mapzen?locations={lat},{lon}"
            resp2 = session.get(url2, timeout=10)
            if resp2.status_code == 200:
                data2 = resp2.json()
                if data2.get('status') == 'OK':
                    elev2 = data2['results'][0].get('elevation')
                    if elev2 is not None:
                        logging.debug(f"Got elevation {elev2} m from fallback 'mapzen' for ({lat}, {lon}).")
                        return elev2, "Medium (mapzen fallback)"
        except requests.RequestException as e2:
            logging.error(f"Fallback OpenTopoData request exception at ({lat}, {lon}): {e2}")

    return None, "No data available"


def is_coastal_location(lat: float, lon: float, elevation: Optional[float] = None) -> bool:
    """Heuristic coastal detection with strict criteria to avoid false positives.

    Three methods, any of which can return True:
    1. Within 25 km of a known major coastal city.
    2. Near the edge of a hand-drawn coastal zone (small buffers).
    3. Inside a known island region with elevation suggesting nearby land.
    """
    # Method 1: distance to known coastal reference points
    coastal_reference_points = [
        # North America - Atlantic Coast
        (40.7589, -73.9851),   # New York Harbor
        (25.7617, -80.1918),   # Miami
        (32.0835, -81.0998),   # Savannah
        (39.2904, -76.6122),   # Baltimore
        (42.3601, -71.0589),   # Boston
        (44.3106, -69.7795),   # Maine Coast
        # North America - Pacific Coast
        (37.7749, -122.4194),  # San Francisco
        (34.0522, -118.2437),  # Los Angeles
        (47.6062, -122.3321),  # Seattle
        (49.2827, -123.1207),  # Vancouver
        (32.7157, -117.1611),  # San Diego
        # North America - Gulf Coast
        (29.9511, -90.0715),   # New Orleans
        (29.7604, -95.3698),   # Houston
        (27.7663, -82.6404),   # Tampa
        # Europe - Atlantic Coast
        (51.5074, -0.1278),    # London Thames
        (53.3498, -6.2603),    # Dublin
        (55.9533, -3.1883),    # Edinburgh
        (59.9139, 10.7522),    # Oslo
        (60.1699, 24.9384),    # Helsinki
        # Europe - Mediterranean
        (41.9028, 12.4964),    # Rome
        (43.2965, 5.3698),     # Marseille
        (41.3851, 2.1734),     # Barcelona
        (36.7213, -4.4214),    # Malaga
        (37.9755, 23.7348),    # Athens
        # Asia - Pacific Coast
        (35.6762, 139.6503),   # Tokyo Bay
        (22.3193, 114.1694),   # Hong Kong
        (1.3521, 103.8198),    # Singapore
        (31.2304, 121.4737),   # Shanghai
        (37.5665, 126.9780),   # Seoul (Incheon)
        # Australia/Oceania
        (-33.8688, 151.2093),  # Sydney
        (-37.8136, 144.9631),  # Melbourne
        (-31.9505, 115.8605),  # Perth
        (-27.4698, 153.0251),  # Brisbane
        # South America
        (-22.9068, -43.1729),  # Rio de Janeiro
        (-34.6118, -58.3960),  # Buenos Aires
        (-33.4489, -70.6693),  # Valparaiso
        (-12.0464, -77.0428),  # Lima
        # Africa
        (-33.9249, 18.4241),   # Cape Town
        (30.0444, 31.2357),    # Cairo (Nile Delta)
        (6.5244, 3.3792),      # Lagos
        (-26.2041, 28.0473),   # Johannesburg (inland reference)
        # Middle East
        (25.2048, 55.2708),    # Dubai
        (26.8206, 30.8025),    # Red Sea Coast
        (31.7683, 35.2137),    # Jerusalem (Mediterranean proximity)
    ]

    # Calculate minimum distance to any coastal reference point
    min_distance_km = float('inf')
    current_point = (lat, lon)
    for coast_lat, coast_lon in coastal_reference_points:
        distance = geopy.distance.geodesic(current_point, (coast_lat, coast_lon)).kilometers
        min_distance_km = min(min_distance_km, distance)

    # STRICTER: only within 25 km of a major coastal city (reduced from 50 km)
    if min_distance_km <= 25:
        return True

    # Method 2: geographic coastal zones with small edge buffers
    coastal_zones = [
        # North America - East Coast (Atlantic)
        {'lat_range': (25, 50), 'lon_range': (-85, -65), 'buffer': 0.5},
        # North America - West Coast (Pacific)
        {'lat_range': (30, 60), 'lon_range': (-130, -115), 'buffer': 0.5},
        # North America - Gulf Coast
        {'lat_range': (25, 35), 'lon_range': (-100, -80), 'buffer': 0.5},
        # Europe - Atlantic Coast
        {'lat_range': (35, 70), 'lon_range': (-15, 5), 'buffer': 0.5},
        # Europe - Mediterranean
        {'lat_range': (30, 46), 'lon_range': (-6, 42), 'buffer': 0.3},
        # Europe - Baltic Sea
        {'lat_range': (53, 66), 'lon_range': (10, 30), 'buffer': 0.3},
        # Asia - East Coast (Pacific)
        {'lat_range': (20, 50), 'lon_range': (120, 145), 'buffer': 0.5},
        # Asia - South/Southeast Coast (Indian Ocean)
        {'lat_range': (-10, 25), 'lon_range': (65, 140), 'buffer': 0.5},
        # Australia - All coasts
        {'lat_range': (-45, -10), 'lon_range': (110, 155), 'buffer': 0.5},
        # South America - East Coast (Atlantic)
        {'lat_range': (-55, 15), 'lon_range': (-70, -30), 'buffer': 0.5},
        # South America - West Coast (Pacific)
        {'lat_range': (-55, 15), 'lon_range': (-85, -70), 'buffer': 0.5},
        # Africa - All coasts
        {'lat_range': (-35, 37), 'lon_range': (-20, 55), 'buffer': 0.5},
        # Arctic coasts
        {'lat_range': (65, 85), 'lon_range': (-180, 180), 'buffer': 1.0},
        # Caribbean and Central America
        {'lat_range': (10, 30), 'lon_range': (-90, -60), 'buffer': 0.3},
        # Middle East
        {'lat_range': (12, 42), 'lon_range': (32, 65), 'buffer': 0.3},
    ]

    for zone in coastal_zones:
        lat_min, lat_max = zone['lat_range']
        lon_min, lon_max = zone['lon_range']
        buffer = zone['buffer']
        if (lat_min - buffer) <= lat <= (lat_max + buffer) and (lon_min - buffer) <= lon <= (lon_max + buffer):
            # Close to zone edges indicates coastline proximity
            edge_distance = min(
                abs(lat - lat_min), abs(lat - lat_max),
                abs(lon - lon_min), abs(lon - lon_max)
            )
            # STRICTER: only if very close to edges
            if edge_distance <= buffer * 0.5:
                return True

    # Method 3: island detection (small land masses surrounded by water)
    island_regions = [
        # Caribbean Islands
        {'lat_range': (10, 28), 'lon_range': (-85, -60)},
        # Pacific Islands (NOTE: lon_range crosses the dateline; the simple
        # min<=lon<=max test below never matches it — kept as in original)
        {'lat_range': (-25, 25), 'lon_range': (120, -120)},
        # Mediterranean Islands
        {'lat_range': (35, 42), 'lon_range': (8, 20)},
        # British Isles
        {'lat_range': (49, 61), 'lon_range': (-11, 2)},
        # Japanese Islands
        {'lat_range': (24, 46), 'lon_range': (123, 146)},
        # Indonesian Islands
        {'lat_range': (-11, 6), 'lon_range': (95, 141)},
    ]

    for island in island_regions:
        lat_min, lat_max = island['lat_range']
        lon_min, lon_max = island['lon_range']
        if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
            # Additional check: only if elevation suggests land nearby
            if elevation is not None and elevation > -50:  # Not deep ocean
                return True

    return False


def is_onshore_water_body(lat: float, lon: float,
                          elevation: Optional[float] = None,
                          landcover_analysis: Optional[LandCoverAnalysis] = None) -> bool:
    """Detect onshore water bodies (lakes, reservoirs, rivers).

    Main criterion: elevation >= -2 m is treated as an onshore water body.
    Borderline depths (-5 .. -2 m) count as onshore if >30% of nearby land
    cover is a land class.
    """
    # MAIN CRITERION: elevation-based detection
    if elevation is not None:
        if elevation >= -2:
            return True

    # Additional context check for borderline cases
    if elevation is not None and -5 <= elevation < -2:
        # For slightly deeper areas, check if there's significant land nearby
        if landcover_analysis and landcover_analysis.classifications:
            land_types = ['Urban', 'Farmland', 'Woodland', 'Grassland',
                          'Desert', 'Shrubland', 'Tundra', 'Arctic/mountain']
            land_percentage = sum(
                landcover_analysis.percentages.get(land_type, 0)
                for land_type in land_types
            )
            # If more than 30% land nearby, likely an onshore water body
            if land_percentage > 30:
                return True

    return False


def map_worldcover_to_terrain_simplified(code: int, lat: float, lon: float,
                                         elevation: Optional[float] = None,
                                         landcover_analysis: Optional[LandCoverAnalysis] = None) -> str:
    """Map an ESA WorldCover class code to a simplified terrain label.

    Water (code 80) splits into "Water Body" (elev >= -2 m),
    "Continental Shelf" (depth <= 200 m) or "Deep Ocean". Coastal
    locations may map several vegetation/wetland codes to "Tidal Zone".
    """
    # WATER CLASSIFICATION (Code 80 - Permanent water bodies)
    if code == 80:
        # Elevation decides marine vs onshore
        if elevation is not None:
            if elevation >= -2:  # Onshore water body threshold
                return "Water Body"
            else:
                depth = -elevation
                if depth <= 200:
                    return "Continental Shelf"
                else:
                    return "Deep Ocean"
        # Fallback: no elevation data — check onshore heuristics
        if is_onshore_water_body(lat, lon, elevation, landcover_analysis):
            return "Water Body"
        # Default fallback for unclear cases
        return "Water Body"

    # COASTAL LOGIC for non-water codes
    is_coastal = is_coastal_location(lat, lon, elevation)
    if is_coastal:
        if code == 60:    # Bare/sparse vegetation
            return "Tidal Zone"
        elif code == 95:  # Mangroves
            return "Tidal Zone"
        elif code == 90:  # Herbaceous wetland
            return "Tidal Zone"
        elif code in [10, 20, 30]:  # Vegetation near coast
            # Low coastal vegetation (<= 20 m) counts as tidal zone
            if elevation is not None and elevation <= 20:
                return "Tidal Zone"

    # STANDARD TERRAIN CLASSIFICATION for non-coastal areas
    if code == 10:  # Tree cover
        if elevation is not None and elevation > 2000:
            return "Arctic/mountain"
        elif -23.5 <= lat <= 23.5:  # Tropical zone
            if elevation is not None and elevation < 500:
                return "Jungle"
            else:
                return "Closed forest"
        else:
            return "Woodland"
    elif code == 20:  # Shrubland
        if elevation is not None and elevation > 2000:
            return "Arctic/mountain"
        else:
            return "Shrubland"
    elif code == 30:  # Grassland
        if lat > 60 or lat < -60:
            return "Tundra"
        else:
            return "Grassland"
    elif code == 40:  # Cropland
        return "Farmland"
    elif code == 50:  # Built-up
        return "Urban"
    elif code == 60:  # Bare/sparse vegetation (non-coastal)
        if lat > 66.5 or lat < -66.5:
            return "Arctic/mountain"
        elif elevation is not None and elevation > 2500:
            return "Arctic/mountain"
        else:
            return "Desert"
    elif code == 70:  # Snow and ice
        return "Arctic/mountain"
    elif code == 90:  # Herbaceous wetland
        return "Swamp"
    elif code == 95:  # Mangroves are always tidal
        return "Tidal Zone"
    elif code == 100:  # Moss and lichen
        return "Tundra"
    else:
        # Unknown codes: infer water classification from elevation
        if elevation is not None:
            if elevation >= -2:
                return "Water Body"
            elif elevation < -2:  # Marine water
                depth = -elevation
                if depth <= 200:
                    return "Continental Shelf"
                else:
                    return "Deep Ocean"
            elif is_coastal_location(lat, lon, elevation) and elevation <= 10:
                return "Tidal Zone"
        # If no elevation data available, try to infer from location
        if is_coastal_location(lat, lon, elevation):
            return "Tidal Zone"
        return "Unknown"


def detect_coastal_area(lat: float, lon: float, elevation: Optional[float],
                        landcover_analysis: Optional[LandCoverAnalysis]) -> bool:
    """Decide whether a point lies in a coastal (tidal) area.

    Requires geographic coastal evidence plus either low elevation
    (-2..10 m) or >20% water classes in the sampled land cover.
    """
    if elevation is None:
        return False

    # Primary indicator for tidal zone (shoreline areas)
    is_low_elevation = -2 <= elevation <= 10

    # Check land cover analysis for water presence
    has_water_nearby = False
    if landcover_analysis and landcover_analysis.classifications:
        water_types = ['Continental Shelf', 'Deep Ocean', 'Water Body', 'Swamp', 'Tidal Zone']
        water_percentage = sum(
            landcover_analysis.percentages.get(water_type, 0)
            for water_type in water_types
        )
        has_water_nearby = water_percentage > 20

    # Geographic coastal detection (strict)
    is_geographically_coastal = is_coastal_location(lat, lon, elevation)

    # Must meet multiple criteria
    return (is_low_elevation and is_geographically_coastal) or (has_water_nearby and is_geographically_coastal)


def generate_sample_points(center_lat: float, center_lon: float,
                           radius_meters: int = 100,
                           num_points: int = 16) -> List[Tuple[float, float]]:
    """Generate the center point plus points on 2 concentric circles.

    `num_points` is overridden based on radius for speed; out-of-range
    coordinates are dropped.
    """
    points = [(center_lat, center_lon)]

    # Fewer points for faster processing
    if radius_meters <= 100:
        num_points = 12
    elif radius_meters <= 300:
        num_points = 20
    else:
        num_points = 30

    # Convert radius from meters to degrees
    lat_factor = 111320.0
    lon_factor = 111320.0 * math.cos(math.radians(center_lat))
    radius_lat = radius_meters / lat_factor
    radius_lon = radius_meters / lon_factor

    # Generate points in 2 concentric circles
    circles = 2
    points_per_circle = (num_points - 1) // circles
    for circle in range(1, circles + 1):
        circle_radius_lat = radius_lat * (circle / circles)
        circle_radius_lon = radius_lon * (circle / circles)
        points_in_this_circle = points_per_circle
        if circle == circles:
            points_in_this_circle += (num_points - 1) % circles
        for i in range(points_in_this_circle):
            angle = 2 * math.pi * i / points_in_this_circle
            lat_offset = circle_radius_lat * math.cos(angle)
            lon_offset = circle_radius_lon * math.sin(angle)
            sample_lat = center_lat + lat_offset
            sample_lon = center_lon + lon_offset
            if -90 <= sample_lat <= 90 and -180 <= sample_lon <= 180:
                points.append((sample_lat, sample_lon))

    return points


@lru_cache(maxsize=500)
def worldcover_tile(lat: float, lon: float) -> str:
    """Return the 3°x3° ESA WorldCover tile label (e.g. 'N39W075') for a point."""
    lat_floor = math.floor(lat / 3) * 3
    lon_floor = math.floor(lon / 3) * 3
    if lat_floor >= 0:
        lat_label = f"N{int(lat_floor):02d}"
    else:
        lat_label = f"S{int(abs(lat_floor)):02d}"
    if lon_floor >= 0:
        lon_label = f"E{int(lon_floor):03d}"
    else:
        lon_label = f"W{int(abs(lon_floor)):03d}"
    return lat_label + lon_label


def sample_worldcover(lat: float, lon: float, year: int = 2020,
                      version: str = "v100") -> Optional[int]:
    """Sample the ESA WorldCover class code at a point; None on failure/nodata."""
    tile = worldcover_tile(lat, lon)
    url = (
        f"https://esa-worldcover.s3.eu-central-1.amazonaws.com/"
        f"{version}/{year}/map/ESA_WorldCover_10m_{year}_{version}_{tile}_Map.tif"
    )
    try:
        with rasterio.Env(GDAL_HTTP_TIMEOUT=10):
            with rasterio.open(url) as ds:
                row, col = ds.index(lon, lat)
                if 0 <= row < ds.height and 0 <= col < ds.width:
                    # Read a single pixel window
                    arr = ds.read(1, window=((row, row + 1), (col, col + 1)))
                    val = arr[0, 0]
                    if ds.nodata is not None and val == ds.nodata:
                        return None
                    return int(val)
                else:
                    return None
    except Exception as e:
        logging.warning(f"WorldCover sampling failed for {tile}: {e}")
        return None


async def get_landcover_consensus_async(lat: float, lon: float,
                                        radius_meters: int = 100) -> Tuple[Optional[int], str, Optional[LandCoverAnalysis]]:
    """Sample WorldCover at several points in parallel and vote on terrain.

    Returns (most_common_code, most_common_terrain, LandCoverAnalysis),
    or (None, "Unknown", None) when no sample succeeds.
    """
    start_time = time.time()

    # Limit radius and reduce sample points for speed
    radius_meters = max(50, min(500, radius_meters))
    if radius_meters <= 100:
        num_points = 12
    elif radius_meters <= 300:
        num_points = 20
    else:
        num_points = 25

    sample_points = generate_sample_points(lat, lon, radius_meters, num_points)

    # Elevation enriches the terrain mapping; run in a worker thread so the
    # blocking HTTP call does not stall the event loop.
    elevation, _ = await asyncio.to_thread(get_elevation_opentopo_single, lat, lon)

    classifications = []
    valid_samples = 0

    # Parallel sampling with an overall timeout
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_point = {
            executor.submit(sample_worldcover, sample_lat, sample_lon): (sample_lat, sample_lon)
            for sample_lat, sample_lon in sample_points[:15]  # Limit to 15 points for speed
        }

        completed_futures = []
        try:
            completed_futures = list(concurrent.futures.as_completed(future_to_point, timeout=12))
        except concurrent.futures.TimeoutError:
            logging.warning("Some futures timed out, processing completed ones")
            completed_futures = [f for f in future_to_point.keys() if f.done()]

        for future in completed_futures:
            try:
                sample_lat, sample_lon = future_to_point[future]
                code = future.result(timeout=1)  # Short timeout for individual results
                if code is not None:
                    terrain_type = map_worldcover_to_terrain_simplified(
                        code, sample_lat, sample_lon, elevation)
                    classifications.append((code, terrain_type))
                    valid_samples += 1
            except Exception as e:
                logging.warning(f"Sample point failed: {e}")
                continue

        # Cancel any remaining futures
        for future in future_to_point.keys():
            if not future.done():
                future.cancel()

    # Ensure we have at least some data: fall back to center point only
    if not classifications:
        try:
            center_code = sample_worldcover(lat, lon)
            if center_code is not None:
                center_terrain = map_worldcover_to_terrain_simplified(
                    center_code, lat, lon, elevation)
                classifications.append((center_code, center_terrain))
                valid_samples = 1
        except Exception as e:
            logging.error(f"Fallback center point sampling failed: {e}")
            return None, "Unknown", None

    terrain_counts = Counter([terrain for _, terrain in classifications])
    code_counts = Counter([code for code, _ in classifications])
    most_common_terrain = terrain_counts.most_common(1)[0][0]
    most_common_code = code_counts.most_common(1)[0][0]
    confidence = (terrain_counts[most_common_terrain] / valid_samples) * 100

    percentages = {}
    for terrain, count in terrain_counts.items():
        percentages[terrain] = round((count / valid_samples) * 100, 1)

    analysis = LandCoverAnalysis(
        primary_classification=most_common_terrain,
        confidence=round(confidence, 1),
        total_samples=len(sample_points),
        valid_samples=valid_samples,
        classifications=dict(terrain_counts),
        percentages=percentages,
        radius_meters=radius_meters,
        sampling_method="Parallel Systematic Sampling"
    )

    processing_time = time.time() - start_time
    logging.info(f"Land cover analysis completed in {processing_time:.2f}s: {most_common_terrain} ({confidence:.1f}% confidence)")
    return most_common_code, most_common_terrain, analysis


async def classify_terrain_enhanced_async(lat: float, lon: float,
                                          radius_meters: int = 100) -> Dict[str, Any]:
    """Run elevation and land-cover analysis concurrently and merge results."""
    start_time = time.time()

    elevation_task = asyncio.create_task(
        asyncio.to_thread(get_elevation_opentopo_single, lat, lon)
    )
    landcover_task = asyncio.create_task(
        get_landcover_consensus_async(lat, lon, radius_meters)
    )

    elev, elev_confidence = None, "Failed"
    code, terrain, landcover_analysis = None, "Unknown", None

    try:
        elev, elev_confidence = await asyncio.wait_for(elevation_task, timeout=15)
    except asyncio.TimeoutError:
        logging.warning("Elevation fetch timed out")
        elev, elev_confidence = None, "Timeout"
    except Exception as e:
        logging.error(f"Elevation fetch failed: {e}")
        elev, elev_confidence = None, "Error"

    try:
        code, terrain, landcover_analysis = await asyncio.wait_for(landcover_task, timeout=18)
    except asyncio.TimeoutError:
        logging.warning("Landcover analysis timed out")
        code, terrain, landcover_analysis = None, "Unknown", None
    except Exception as e:
        logging.error(f"Landcover analysis failed: {e}")
        code, terrain, landcover_analysis = None, "Unknown", None

    # Cancel any remaining tasks
    if not elevation_task.done():
        elevation_task.cancel()
    if not landcover_task.done():
        landcover_task.cancel()

    # Coastal detection (computed once)
    is_coastal = detect_coastal_area(lat, lon, elev, landcover_analysis)

    if elev is None:
        return {
            "elevation": None,
            "terrain_type": terrain or "Unknown",
            "landcover_analysis": landcover_analysis,
            "elevation_confidence": elev_confidence,
            "coastal_detection": is_coastal,
            "processing_time": time.time() - start_time
        }

    # Elevation-only classification when land cover is unknown
    if terrain == "Unknown":
        if elev < -2:  # Marine water
            depth = -elev
            if depth <= 200:
                terrain = "Continental Shelf"
            else:
                terrain = "Deep Ocean"
        elif -2 <= elev <= 0:
            terrain = "Water Body"
        elif is_coastal_location(lat, lon, elev) and elev <= 10:
            terrain = "Tidal Zone"

    # BUGFIX: capture the (possibly reclassified) terrain AFTER the
    # elevation-only fallback above, so it propagates to the response.
    final_terrain = terrain

    # Override water classification purely from elevation thresholds
    if terrain in ["Water Body", "Continental Shelf", "Deep Ocean"]:
        if elev >= -2:  # Onshore water body threshold
            final_terrain = "Water Body"
        else:  # Marine water
            depth = -elev
            if depth <= 200:
                final_terrain = "Continental Shelf"
            else:
                final_terrain = "Deep Ocean"
    elif is_coastal and elev <= 10:
        # Keep tidal zone for low coastal land
        final_terrain = "Tidal Zone"
    elif elev > 3000:
        final_terrain = "Arctic/mountain"

    return {
        "elevation": elev,
        "terrain_type": final_terrain,
        "landcover_analysis": landcover_analysis,
        "elevation_confidence": elev_confidence,
        "coastal_detection": is_coastal,
        "processing_time": time.time() - start_time
    }


def parse_decimal(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse decimal-degree coordinates ("lat, lon") from an input string.

    Returns None when fewer than two numbers are found or they are out of
    range. BUGFIX: the alternation is grouped so the optional sign applies
    to integers as well as decimals (previously "-45, 170" lost the sign).
    """
    float_pattern = r"[-+]?(?:\d*\.\d+|\d+)"
    nums = re.findall(float_pattern, input_str)
    if len(nums) >= 2:
        try:
            lat = float(nums[0])
            lon = float(nums[1])
            if -90 <= lat <= 90 and -180 <= lon <= 180:
                return lat, lon
        except ValueError:
            return None
    return None


def parse_dms(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse DMS (degrees-minutes-seconds) coordinates, e.g. 40°26'46"N.

    BUGFIX: restores the named groups (deg/min/sec/dir) that the pattern's
    m.group(...) calls require; without them re raises re.error.
    """
    dms_pattern = r"""(?P<deg>\d{1,3})[°\s]+(?P<min>\d{1,2})['\s]+(?P<sec>\d{1,2}(?:\.\d+)?)[\"\s]*?(?P<dir>[NSEW])"""
    matches = re.finditer(dms_pattern, input_str, re.IGNORECASE | re.VERBOSE)
    coords = []
    for m in matches:
        deg = float(m.group('deg'))
        minute = float(m.group('min'))
        sec = float(m.group('sec'))
        direction = m.group('dir').upper()
        dec = deg + minute / 60.0 + sec / 3600.0
        if direction in ('S', 'W'):
            dec = -dec
        coords.append(dec)
    if len(coords) >= 2:
        lat, lon = coords[0], coords[1]
        if -90 <= lat <= 90 and -180 <= lon <= 180:
            return lat, lon
    return None


def geocode_place(place: str) -> Optional[Tuple[float, float]]:
    """Geocode a place name via Nominatim; returns (lat, lon) or None."""
    url = "https://nominatim.openstreetmap.org/search"
    params = {'q': place, 'format': 'json', 'limit': 1, 'addressdetails': 1}
    headers = {
        'User-Agent': "enhanced-terrain-tool/2.0 (terrain-analysis@spglobal.com)"
    }
    try:
        resp = requests.get(url, params=params, headers=headers, timeout=5, verify=False)
        if resp.status_code == 200:
            results = resp.json()
            if results:
                result = results[0]
                lat = float(result['lat'])
                lon = float(result['lon'])
                logging.info(f"Geocoded '{place}' to ({lat}, {lon})")
                return lat, lon
    except requests.RequestException as e:
        logging.error(f"Geocoding failed for '{place}': {e}")
    return None


def parse_location(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse a location from decimal degrees, DMS, or a geocodable place name."""
    input_str = input_str.strip()

    # Try decimal degrees first
    decimal_coords = parse_decimal(input_str)
    if decimal_coords:
        return decimal_coords

    # Try DMS format
    dms_coords = parse_dms(input_str)
    if dms_coords:
        return dms_coords

    # Fall back to geocoding
    return geocode_place(input_str)


async def refresh_connections():
    """Periodically ping OpenTopoData to keep pooled connections warm."""
    while True:
        await asyncio.sleep(300)  # Every 5 minutes
        try:
            test_url = "https://api.opentopodata.org/v1/aster30m?locations=0,0"
            session.get(test_url, timeout=3)
        except Exception:
            # Best-effort keepalive; failures are expected and ignorable.
            pass


# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------

@app.get("/", include_in_schema=False)
async def root():
    """Serve the main application page."""
    if not os.path.exists("index.html"):
        return PlainTextResponse("index.html not found", status_code=404)
    return FileResponse("index.html")


@app.on_event("startup")
async def startup_event():
    """Initialize services on startup with connection warming."""
    logging.info("Starting Enhanced Terrain Analysis API v2.0.0 - Simplified Water Bodies")

    def do_warmup():
        try:
            get_elevation_opentopo_single(0, 0)
        except Exception as e:
            logging.warning(f"Startup warmup elevation failed: {e}")
        try:
            sample_worldcover(0, 0)
        except Exception as e:
            logging.warning(f"Startup warmup worldcover failed: {e}")

    threading.Thread(target=do_warmup).start()
    # Periodic warmup runs forever in a daemon thread
    threading.Thread(target=periodic_warmup, daemon=True).start()


@app.on_event("startup")
async def start_background_tasks():
    """Start the async connection-refresh task."""
    asyncio.create_task(refresh_connections())


@app.post("/api/analyze", response_model=TerrainResponse)
async def analyze_terrain(request: LocationRequest):
    """Analyze terrain for the requested location.

    Parses/geocodes the input, clamps the sampling radius to 50-500 m,
    runs the enhanced classification, and returns a TerrainResponse.
    Raises 400 for unparseable input and 500 for unexpected errors.
    """
    request_start_time = time.time()
    try:
        coordinates = parse_location(request.input.strip())
        if coordinates is None:
            raise HTTPException(
                status_code=400,
                detail="Could not parse coordinates or geocode the location. Please check your input format."
            )
        lat, lon = coordinates

        # Validate radius (reduced max for speed)
        radius = max(50, min(500, request.radius_meters or 100))

        result = await classify_terrain_enhanced_async(lat, lon, radius)

        # Minimal rate limiting
        await asyncio.sleep(0.1)

        total_processing_time = time.time() - request_start_time
        response = TerrainResponse(
            latitude=lat,
            longitude=lon,
            elevation=result["elevation"],
            terrain_type=result["terrain_type"],
            landcover_analysis=result.get("landcover_analysis"),
            elevation_confidence=result.get("elevation_confidence"),
            coastal_detection=result.get("coastal_detection"),
            success=True,
            message=f"Enhanced terrain analysis completed with {radius}m radius sampling (Simplified water bodies)",
            processing_time=round(total_processing_time, 2)
        )
        logging.info(f"Analysis completed for ({lat:.6f}, {lon:.6f}) in {total_processing_time:.2f}s")
        return response
    except HTTPException:
        raise
    except Exception as e:
        logging.error(f"Error analyzing terrain: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )


@app.get("/api/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "Enhanced Terrain Analysis API",
        "version": "2.0.0 - Simplified Water Bodies",
        "timestamp": time.time()
    }


@app.get("/api/datasets")
async def get_available_datasets():
    """Get information about available datasets."""
    return {
        "elevation_datasets": DATASETS,
        "landcover_datasets": [
            {
                "name": "ESA WorldCover",
                "resolution": "10m",
                "coverage": "Global",
                "years": [2020, 2021],
                "classes": 11
            }
        ]
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
        log_level="info"
    )