Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import FileResponse, PlainTextResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| import os | |
| import signal | |
| import re | |
| import threading | |
| import time | |
| import math | |
| import logging | |
| import requests | |
| import rasterio | |
| import numpy as np | |
| from collections import Counter | |
| from typing import Optional, Tuple, Dict, List | |
| import urllib3 | |
| import asyncio | |
| import concurrent.futures | |
| from functools import lru_cache | |
| import geopy.distance | |
| from geopy import Point | |
# Disable SSL verification warnings for testing
# NOTE(review): this only silences urllib3's InsecureRequestWarning; the
# actual certificate checks are switched off where requests are issued with
# verify=False. Confirm this is not meant to ship to production.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Presumably instructs GDAL (used through rasterio) to skip TLS certificate
# verification as well -- confirm against the GDAL configuration docs.
os.environ["GDAL_HTTP_UNSAFESSL"] = "YES"
# Configure logging: INFO and above, timestamped, on the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
app = FastAPI(title="Enhanced Terrain Analysis API", version="2.0.0")
# Enable CORS for frontend communication
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive policy; confirm that credentialed cross-origin requests
# from arbitrary sites are really intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # Enhanced Pydantic models for request/response | |
# Request payload describing the location to analyse.
class LocationRequest(BaseModel):
    # Free-form location text; presumably coordinates or a place name that is
    # handed to parse_location -- confirm against the endpoint code.
    input: str
    # Sampling radius around the location, in metres (defaults to 100).
    radius_meters: Optional[int] = 100
    # NOTE(review): not referenced in the visible part of this file.
    detailed_analysis: Optional[bool] = True
# Summary of the land-cover sampling performed around a location, as built
# by get_landcover_consensus_async.
class LandCoverAnalysis(BaseModel):
    # Terrain label that occurred most often among the valid samples.
    primary_classification: str
    # Share of valid samples carrying the primary label, in percent.
    confidence: float
    # Number of sample points counted for the analysis.
    total_samples: int
    # Number of sample points that yielded a land-cover code.
    valid_samples: int
    # Terrain label -> number of samples with that label.
    classifications: Dict[str, int]
    # Terrain label -> share of valid samples, in percent.
    percentages: Dict[str, float]
    # Sampling radius that was actually used, in metres.
    radius_meters: int
    # Human-readable name of the sampling strategy.
    sampling_method: str
# Response payload of a terrain lookup.
class TerrainResponse(BaseModel):
    latitude: float
    longitude: float
    # Elevation in metres; None when no elevation source returned a value.
    elevation: Optional[float] = None
    # Final terrain label (e.g. "Woodland", "Deep Ocean", "Unknown").
    terrain_type: str
    # Detailed sampling summary; None when land-cover sampling failed.
    landcover_analysis: Optional[LandCoverAnalysis] = None
    # Text describing the elevation source/quality (e.g. "High (ned10m)").
    elevation_confidence: Optional[str] = None
    # Result of the coastal heuristic for this location.
    coastal_detection: Optional[bool] = None
    success: bool
    message: Optional[str] = None
    # Wall-clock processing time in seconds.
    processing_time: Optional[float] = None
# Enhanced dataset configuration with bathymetry support
# Each entry names an OpenTopoData dataset together with the bounding box
# (degrees) inside which select_dataset will consider it. select_dataset
# picks the candidate with the smallest 'resolution' value (presumably metres
# per pixel -- confirm); on ties the entry listed first wins.
DATASETS = [
    {'name': 'nzdem8m', 'min_lat': -47.6, 'max_lat': -34.4, 'min_lon': 166.5, 'max_lon': 178.6, 'resolution': 8},
    {'name': 'ned10m', 'min_lat': 19.5, 'max_lat': 49.4, 'min_lon': -160.3,'max_lon': -66.9, 'resolution': 10},
    {'name': 'eudem25m', 'min_lat': 25.0, 'max_lat': 72.0, 'min_lon': -60.0, 'max_lon': 40.0, 'resolution': 25},
    {'name': 'mapzen', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 30},
    {'name': 'aster30m', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 30},
    {'name': 'srtm30m', 'min_lat': -60.0, 'max_lat': 60.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 30},
    {'name': 'srtm90m', 'min_lat': -60.0, 'max_lat': 60.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 90},
    {'name': 'gebco2020', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 450},
    {'name': 'etopo1', 'min_lat': -90.0, 'max_lat': 90.0, 'min_lon': -180.0,'max_lon': 180.0, 'resolution': 1800},
]
# HTTP session with optimized settings
# Shared by all OpenTopoData / warm-up requests so connections are reused.
session = requests.Session()
# NOTE(review): TLS certificate verification is disabled for every request
# made through this session.
session.verify = False
# Optimize session for speed
# A single adapter (connection pool + one retry) serves both URL schemes.
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20,
    max_retries=1
)
session.mount('http://', adapter)
session.mount('https://', adapter)
def timeout_handler(signum, frame):
    """Abort the running operation by raising ``TimeoutError``.

    The two arguments follow the signature of a ``signal`` handler callback;
    neither of them is inspected.

    Raises:
        TimeoutError: always.
    """
    message = "Operation timed out"
    raise TimeoutError(message)
def periodic_warmup():
    """Keep upstream connections warm by probing them in an endless loop.

    Every cycle issues one elevation lookup and one WorldCover sample at
    (0, 0), logs (but otherwise ignores) any failure, then sleeps for five
    minutes. The function never returns, so it is meant to run in a
    background thread.
    """
    pause_seconds = 300  # five minutes between warm-up rounds
    while True:
        try:
            get_elevation_opentopo_single(0, 0)
        except Exception as elevation_error:
            logging.warning(f"Periodic warmup elevation failed: {elevation_error}")

        try:
            sample_worldcover(0, 0)
        except Exception as worldcover_error:
            logging.warning(f"Periodic warmup worldcover failed: {worldcover_error}")

        time.sleep(pause_seconds)
def warm_up_connections():
    """Prime the shared HTTP session by touching every upstream service once.

    Three OpenTopoData endpoints are fetched with GET and one WorldCover
    tile is probed with HEAD, each with a 5 second timeout. The first failure
    aborts the remaining probes and is logged as a warning; nothing is
    raised to the caller.
    """
    # NOTE(review): 'srtmgl1' is not among the dataset names listed in
    # DATASETS -- confirm it is a valid OpenTopoData dataset id.
    elevation_probes = (
        # Land elevation
        "https://api.opentopodata.org/v1/aster30m?locations=40.7589,-73.9851",
        # Offshore
        "https://api.opentopodata.org/v1/srtmgl1?locations=40.0,-70.0",
        # Deep bathymetry
        "https://api.opentopodata.org/v1/etopo1?locations=35.0,-75.0",
    )
    # HEAD keeps the WorldCover probe small: the tile itself is not downloaded.
    worldcover_probe = "https://esa-worldcover.s3.eu-central-1.amazonaws.com/v100/2020/map/ESA_WorldCover_10m_2020_v100_N39W075_Map.tif"

    try:
        for endpoint in elevation_probes:
            session.get(endpoint, timeout=5)
        session.head(worldcover_probe, timeout=5)
        logging.info("Enhanced connection warm-up completed successfully")
    except Exception as e:
        logging.warning(f"Connection warm-up failed: {e}")
def select_dataset(lat: float, lon: float) -> str:
    """Return the name of the finest dataset whose bounding box holds the point.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        The ``name`` of the covering ``DATASETS`` entry with the smallest
        ``resolution`` value (the earliest entry wins a tie), or ``'mapzen'``
        when no entry covers the point.
    """
    best = None
    for ds in DATASETS:
        covers_lat = ds['min_lat'] <= lat <= ds['max_lat']
        covers_lon = ds['min_lon'] <= lon <= ds['max_lon']
        if not (covers_lat and covers_lon):
            continue
        # Strict comparison keeps the first of several equally fine datasets.
        if best is None or ds['resolution'] < best['resolution']:
            best = ds

    if best is None:
        logging.debug(f"No regional dataset covers ({lat}, {lon}); falling back to 'mapzen'.")
        return 'mapzen'

    chosen = best['name']
    logging.debug(f"Selected OpenTopoData dataset '{chosen}' for ({lat}, {lon}).")
    return chosen
def _query_opentopo(dataset: str, lat: float, lon: float) -> Optional[float]:
    """Ask one OpenTopoData dataset for the elevation at a point.

    Returns the elevation, or None when the service answered but gave no
    usable value (non-200 status, non-OK payload, malformed body, null
    elevation). Transport errors (``requests.RequestException``) propagate
    so the caller can log them with the right context.
    """
    url = f"https://api.opentopodata.org/v1/{dataset}?locations={lat},{lon}"
    resp = session.get(url, timeout=10)
    if resp.status_code != 200:
        logging.warning(f"HTTP {resp.status_code} from OpenTopoData for '{dataset}' at ({lat}, {lon}).")
        return None

    try:
        data = resp.json()
        status = data.get('status')
        if status != 'OK':
            logging.warning(f"OpenTopoData status '{status}' for '{dataset}' at ({lat}, {lon}).")
            return None
        elev = data['results'][0].get('elevation')
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        # Invalid JSON or an unexpected payload shape must not abort the
        # lookup: treat it like "no data" so the fallback still runs.
        logging.warning(f"Malformed OpenTopoData response for '{dataset}' at ({lat}, {lon}): {e}")
        return None

    if elev is None:
        logging.warning(f"Null elevation from '{dataset}' for ({lat}, {lon}).")
    return elev


def get_elevation_opentopo_single(lat: float, lon: float) -> Tuple[Optional[float], str]:
    """Fetch elevation for a single coordinate from OpenTopoData with fallback.

    The dataset chosen by ``select_dataset`` is queried first; if it yields
    no value and is not already ``'mapzen'``, the global ``'mapzen'`` dataset
    is tried as a fallback.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        ``(elevation, confidence)`` where ``confidence`` is
        ``"High (<dataset>)"``, ``"Medium (mapzen fallback)"`` or, with an
        elevation of ``None``, ``"No data available"``. Network and payload
        errors are logged, never raised.
    """
    dataset = select_dataset(lat, lon)

    try:
        elev = _query_opentopo(dataset, lat, lon)
    except requests.RequestException as e:
        logging.error(f"OpenTopoData request exception at ({lat}, {lon}): {e}")
        elev = None
    if elev is not None:
        logging.debug(f"Got elevation {elev} m from '{dataset}' for ({lat}, {lon}).")
        return elev, f"High ({dataset})"

    # Fallback to 'mapzen' if primary dataset failed
    if dataset != 'mapzen':
        try:
            elev = _query_opentopo('mapzen', lat, lon)
        except requests.RequestException as e2:
            logging.error(f"Fallback OpenTopoData request exception at ({lat}, {lon}): {e2}")
            elev = None
        if elev is not None:
            logging.debug(f"Got elevation {elev} m from fallback 'mapzen' for ({lat}, {lon}).")
            return elev, "Medium (mapzen fallback)"

    return None, "No data available"
# Reference points used as proxies for "close to a coastline" (lat, lon).
_COASTAL_REFERENCE_POINTS = (
    # North America - Atlantic Coast
    (40.7589, -73.9851),  # New York Harbor
    (25.7617, -80.1918),  # Miami
    (32.0835, -81.0998),  # Savannah
    (39.2904, -76.6122),  # Baltimore
    (42.3601, -71.0589),  # Boston
    (44.3106, -69.7795),  # Maine Coast
    # North America - Pacific Coast
    (37.7749, -122.4194),  # San Francisco
    (34.0522, -118.2437),  # Los Angeles
    (47.6062, -122.3321),  # Seattle
    (49.2827, -123.1207),  # Vancouver
    (32.7157, -117.1611),  # San Diego
    # North America - Gulf Coast
    (29.9511, -90.0715),  # New Orleans
    (29.7604, -95.3698),  # Houston
    (27.7663, -82.6404),  # Tampa
    # Europe - Atlantic Coast
    (51.5074, -0.1278),  # London Thames
    (53.3498, -6.2603),  # Dublin
    (55.9533, -3.1883),  # Edinburgh
    (59.9139, 10.7522),  # Oslo
    (60.1699, 24.9384),  # Helsinki
    # Europe - Mediterranean
    (41.9028, 12.4964),  # Rome
    (43.2965, 5.3698),  # Marseille
    (41.3851, 2.1734),  # Barcelona
    (36.7213, -4.4214),  # Malaga
    (37.9755, 23.7348),  # Athens
    # Asia - Pacific Coast
    (35.6762, 139.6503),  # Tokyo Bay
    (22.3193, 114.1694),  # Hong Kong
    (1.3521, 103.8198),  # Singapore
    (31.2304, 121.4737),  # Shanghai
    (37.5665, 126.9780),  # Seoul (Incheon)
    # Australia/Oceania
    (-33.8688, 151.2093),  # Sydney
    (-37.8136, 144.9631),  # Melbourne
    (-31.9505, 115.8605),  # Perth
    (-27.4698, 153.0251),  # Brisbane
    # South America
    (-22.9068, -43.1729),  # Rio de Janeiro
    (-34.6118, -58.3960),  # Buenos Aires
    (-33.4489, -70.6693),  # Valparaiso
    (-12.0464, -77.0428),  # Lima
    # Africa
    (-33.9249, 18.4241),  # Cape Town
    (30.0444, 31.2357),  # Cairo (Nile Delta)
    (6.5244, 3.3792),  # Lagos
    # NOTE(review): labelled "inland reference" yet treated exactly like the
    # coastal entries -- confirm it belongs in this list.
    (-26.2041, 28.0473),  # Johannesburg (inland reference)
    # Middle East
    (25.2048, 55.2708),  # Dubai
    (26.8206, 30.8025),  # Red Sea Coast
    (31.7683, 35.2137),  # Jerusalem (Mediterranean proximity)
)

# A point this close (km) to any reference point counts as coastal.
_COASTAL_REFERENCE_RADIUS_KM = 25

# Rough coastal boxes: ((lat_min, lat_max), (lon_min, lon_max), buffer_deg).
_COASTAL_ZONES = (
    ((25, 50), (-85, -65), 0.5),     # North America - East Coast (Atlantic)
    ((30, 60), (-130, -115), 0.5),   # North America - West Coast (Pacific)
    ((25, 35), (-100, -80), 0.5),    # North America - Gulf Coast
    ((35, 70), (-15, 5), 0.5),       # Europe - Atlantic Coast
    ((30, 46), (-6, 42), 0.3),       # Europe - Mediterranean
    ((53, 66), (10, 30), 0.3),       # Europe - Baltic Sea
    ((20, 50), (120, 145), 0.5),     # Asia - East Coast (Pacific)
    ((-10, 25), (65, 140), 0.5),     # Asia - South/Southeast Coast (Indian Ocean)
    ((-45, -10), (110, 155), 0.5),   # Australia - All coasts
    ((-55, 15), (-70, -30), 0.5),    # South America - East Coast (Atlantic)
    ((-55, 15), (-85, -70), 0.5),    # South America - West Coast (Pacific)
    ((-35, 37), (-20, 55), 0.5),     # Africa - All coasts
    ((65, 85), (-180, 180), 1.0),    # Arctic coasts
    ((10, 30), (-90, -60), 0.3),     # Caribbean and Central America
    ((12, 42), (32, 65), 0.3),       # Middle East
)

# Known island regions: ((lat_min, lat_max), (lon_min, lon_max)). A range
# whose lon_min exceeds lon_max wraps across the antimeridian.
_ISLAND_REGIONS = (
    ((10, 28), (-85, -60)),     # Caribbean Islands
    ((-25, 25), (120, -120)),   # Pacific Islands - crosses dateline
    ((35, 42), (8, 20)),        # Mediterranean Islands
    ((49, 61), (-11, 2)),       # British Isles
    ((24, 46), (123, 146)),     # Japanese Islands
    ((-11, 6), (95, 141)),      # Indonesian Islands
)


def _lon_in_range(lon: float, lon_min: float, lon_max: float) -> bool:
    """Return True if ``lon`` lies in the range, honouring dateline wrap-around."""
    if lon_min <= lon_max:
        return lon_min <= lon <= lon_max
    # Wrapped range, e.g. (120, -120): 120..180 joined with -180..-120.
    return lon >= lon_min or lon <= lon_max


def is_coastal_location(lat: float, lon: float, elevation: Optional[float] = None) -> bool:
    """Heuristically decide whether a point is near a coastline.

    Three checks are applied in order; the first hit returns True:

    1. The point lies within 25 km (geodesic) of a coastal reference point.
    2. The point lies inside a buffered coastal zone box and within half the
       zone's buffer (in degrees) of one of the box edges.
    3. The point lies inside a known island region and ``elevation`` is
       known and above -50 (i.e. not deep ocean).

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        elevation: Elevation in metres, if known; only used by check 3.

    Returns:
        True when any heuristic classifies the point as coastal.
    """
    # Method 1: proximity to a known coastal reference point. any() stops at
    # the first reference within range instead of measuring all of them.
    here = (lat, lon)
    if any(
        geopy.distance.geodesic(here, reference).kilometers <= _COASTAL_REFERENCE_RADIUS_KM
        for reference in _COASTAL_REFERENCE_POINTS
    ):
        return True

    # Method 2: close to the edge of a coarse coastal zone box.
    for (lat_min, lat_max), (lon_min, lon_max), buffer in _COASTAL_ZONES:
        inside_buffered_box = (
            (lat_min - buffer) <= lat <= (lat_max + buffer)
            and (lon_min - buffer) <= lon <= (lon_max + buffer)
        )
        if not inside_buffered_box:
            continue
        # Distance (degrees) to the nearest box edge stands in for the
        # distance to the coastline.
        edge_distance = min(
            abs(lat - lat_min), abs(lat - lat_max),
            abs(lon - lon_min), abs(lon - lon_max),
        )
        if edge_distance <= buffer * 0.5:
            return True

    # Method 3: island regions, only when the elevation rules out deep ocean.
    # The wrap-aware longitude test makes the Pacific Islands entry effective;
    # a plain `lon_min <= lon <= lon_max` can never match (120, -120).
    # NOTE(review): that region is broad and also spans mainland areas --
    # confirm the intended extent.
    if elevation is not None and elevation > -50:
        for (lat_min, lat_max), (lon_min, lon_max) in _ISLAND_REGIONS:
            if lat_min <= lat <= lat_max and _lon_in_range(lon, lon_min, lon_max):
                return True

    return False
def is_onshore_water_body(lat: float, lon: float, elevation: Optional[float] = None, landcover_analysis: Optional[LandCoverAnalysis] = None) -> bool:
    """Decide whether a water pixel is an onshore water body (lake, river, ...).

    The decision is driven by elevation:

    * elevation of -2 or higher -> onshore water body;
    * elevation in [-5, -2) -> onshore only if more than 30 % of the
      surrounding land-cover samples are land classes;
    * anything else, including unknown elevation -> not onshore.

    ``lat`` and ``lon`` are accepted for interface symmetry but not used.
    """
    if elevation is None:
        return False

    # Main criterion: at or above the -2 threshold.
    if elevation >= -2:
        return True

    # Only the borderline band below the threshold gets a second chance.
    if not (-5 <= elevation < -2):
        return False
    if not (landcover_analysis and landcover_analysis.classifications):
        return False

    land_types = ['Urban', 'Farmland', 'Woodland', 'Grassland', 'Desert', 'Shrubland', 'Tundra', 'Arctic/mountain']
    shares = landcover_analysis.percentages
    land_percentage = sum(shares.get(name, 0) for name in land_types)
    # Plenty of land around a slightly-below-threshold pixel suggests an
    # inland water body rather than open sea.
    return land_percentage > 30
def _marine_terrain(elevation: float) -> str:
    """Classify sea floor by depth: up to 200 m is shelf, deeper is ocean."""
    depth = -elevation  # negative elevation -> positive depth
    return "Continental Shelf" if depth <= 200 else "Deep Ocean"


def map_worldcover_to_terrain_simplified(code: int, lat: float, lon: float, elevation: Optional[float] = None, landcover_analysis: Optional[LandCoverAnalysis] = None) -> str:
    """Translate an ESA WorldCover class code into a terrain label.

    Args:
        code: WorldCover class code (10, 20, ... 100).
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        elevation: Elevation in metres, or None if unknown.
        landcover_analysis: Accepted for interface compatibility; it does
            not influence the result.

    Returns:
        A terrain label such as "Woodland", "Water Body", "Deep Ocean" or
        "Unknown".

    The coastal heuristic (``is_coastal_location``) is comparatively
    expensive, so it is only evaluated for codes whose label can actually
    depend on it.
    """
    # Permanent water bodies: elevation separates onshore from marine water.
    # Without elevation data the answer is always "Water Body".
    if code == 80:
        if elevation is None or elevation >= -2:
            return "Water Body"
        return _marine_terrain(elevation)

    # Mangroves are tidal whether or not the location is flagged as coastal.
    if code == 95:
        return "Tidal Zone"

    # Coastal overrides: bare ground (60) and herbaceous wetland (90) become
    # tidal on any coast; vegetation (10/20/30) only at or below 20 m.
    if code in (60, 90):
        if is_coastal_location(lat, lon, elevation):
            return "Tidal Zone"
    elif code in (10, 20, 30):
        if elevation is not None and elevation <= 20 and is_coastal_location(lat, lon, elevation):
            return "Tidal Zone"

    # Standard (non-coastal) classification.
    if code == 10:  # Tree cover
        if elevation is not None and elevation > 2000:
            return "Arctic/mountain"
        if -23.5 <= lat <= 23.5:  # Tropical zone
            if elevation is not None and elevation < 500:
                return "Jungle"
            return "Closed forest"
        return "Woodland"
    if code == 20:  # Shrubland
        if elevation is not None and elevation > 2000:
            return "Arctic/mountain"
        return "Shrubland"
    if code == 30:  # Grassland
        return "Tundra" if (lat > 60 or lat < -60) else "Grassland"
    if code == 40:  # Cropland
        return "Farmland"
    if code == 50:  # Built-up
        return "Urban"
    if code == 60:  # Bare/sparse vegetation
        if lat > 66.5 or lat < -66.5:
            return "Arctic/mountain"
        if elevation is not None and elevation > 2500:
            return "Arctic/mountain"
        return "Desert"
    if code == 70:  # Snow and ice
        return "Arctic/mountain"
    if code == 90:  # Herbaceous wetland
        return "Swamp"
    if code == 100:  # Moss and lichen
        return "Tundra"

    # Unknown code: fall back to an elevation-based water classification.
    # NOTE(review): any known elevation of -2 or above yields "Water Body",
    # even well above sea level -- confirm this is intended.
    if elevation is not None:
        if elevation >= -2:
            return "Water Body"
        if elevation < -2:
            return _marine_terrain(elevation)
    # No usable elevation: infer from the location alone.
    if is_coastal_location(lat, lon, elevation):
        return "Tidal Zone"
    return "Unknown"
def detect_coastal_area(lat: float, lon: float, elevation: Optional[float], landcover_analysis: Optional[LandCoverAnalysis]) -> bool:
    """Decide whether a location should be treated as a coastal/tidal area.

    A location qualifies when it is geographically coastal (according to
    ``is_coastal_location``) AND at least one of the following holds:

    * its elevation lies between -2 and 10 (inclusive), or
    * more than 20 % of the sampled land cover is water-like.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        elevation: Elevation in metres; None yields False immediately.
        landcover_analysis: Optional sampling summary used for the
            "water nearby" criterion.

    Returns:
        True when the location meets the criteria above.
    """
    if elevation is None:
        return False

    # Primary indicator for tidal zones: close to sea level.
    is_low_elevation = -2 <= elevation <= 10

    # Secondary indicator: a notable share of water-like samples nearby.
    has_water_nearby = False
    if landcover_analysis and landcover_analysis.classifications:
        water_types = ['Continental Shelf', 'Deep Ocean', 'Water Body', 'Swamp', 'Tidal Zone']
        water_percentage = sum(
            landcover_analysis.percentages.get(water_type, 0)
            for water_type in water_types
        )
        has_water_nearby = water_percentage > 20

    # Both accepted combinations require the geographic check, so skip that
    # (comparatively expensive) lookup when neither indicator is present.
    if not (is_low_elevation or has_water_nearby):
        return False
    return is_coastal_location(lat, lon, elevation)
def generate_sample_points(center_lat: float, center_lon: float, radius_meters: int = 100, num_points: Optional[int] = None) -> List[Tuple[float, float]]:
    """Generate sample coordinates on two concentric rings around a centre.

    Args:
        center_lat: Latitude of the centre in decimal degrees.
        center_lon: Longitude of the centre in decimal degrees.
        radius_meters: Radius of the outer ring in metres.
        num_points: Total number of points including the centre. When
            omitted it is derived from the radius: 12 (<= 100 m),
            20 (<= 300 m) or 30 (larger). An explicit value is honoured
            as given.

    Returns:
        A list of ``(lat, lon)`` tuples starting with the centre, followed by
        the inner ring (half the radius) and the outer ring. Points that
        would fall outside valid latitude/longitude bounds are dropped, so
        the list can be shorter than ``num_points``.
    """
    if num_points is None:
        if radius_meters <= 100:
            num_points = 12
        elif radius_meters <= 300:
            num_points = 20
        else:
            num_points = 30

    points = [(center_lat, center_lon)]

    # Convert the radius from metres to degrees; a degree of longitude
    # shrinks with the cosine of the latitude.
    lat_factor = 111320.0
    lon_factor = 111320.0 * math.cos(math.radians(center_lat))
    radius_lat = radius_meters / lat_factor
    radius_lon = radius_meters / lon_factor

    circles = 2
    ring_total = max(num_points - 1, 0)  # the centre takes one slot
    points_per_circle = ring_total // circles

    for circle in range(1, circles + 1):
        circle_radius_lat = radius_lat * (circle / circles)
        circle_radius_lon = radius_lon * (circle / circles)

        points_in_this_circle = points_per_circle
        if circle == circles:
            # The outermost ring absorbs the remainder of the division.
            points_in_this_circle += ring_total % circles

        for i in range(points_in_this_circle):
            angle = 2 * math.pi * i / points_in_this_circle
            sample_lat = center_lat + circle_radius_lat * math.cos(angle)
            sample_lon = center_lon + circle_radius_lon * math.sin(angle)
            if -90 <= sample_lat <= 90 and -180 <= sample_lon <= 180:
                points.append((sample_lat, sample_lon))

    return points
def worldcover_tile(lat: float, lon: float) -> str:
    """Return the label of the 3x3 degree WorldCover tile holding a point.

    The label names the tile's south-west corner, e.g. ``"N39W075"``:
    hemisphere letter plus two-digit latitude, then hemisphere letter plus
    three-digit longitude.
    """
    south_edge = 3 * math.floor(lat / 3)
    west_edge = 3 * math.floor(lon / 3)

    lat_hemisphere = "N" if south_edge >= 0 else "S"
    lon_hemisphere = "E" if west_edge >= 0 else "W"

    return (
        f"{lat_hemisphere}{int(abs(south_edge)):02d}"
        f"{lon_hemisphere}{int(abs(west_edge)):03d}"
    )
def sample_worldcover(lat: float, lon: float, year: int = 2020, version: str = "v100") -> Optional[int]:
    """Read the ESA WorldCover class code at a single coordinate.

    The tile containing the point is opened remotely with rasterio and a
    one-pixel window is read from band 1.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        year: WorldCover product year used in the tile URL.
        version: WorldCover product version used in the tile URL.

    Returns:
        The class code as ``int``, or ``None`` when the point falls outside
        the raster, the pixel is nodata, or reading fails for any reason
        (failures are logged as warnings, never raised).
    """
    tile = worldcover_tile(lat, lon)
    url = (
        f"https://esa-worldcover.s3.eu-central-1.amazonaws.com/"
        f"{version}/{year}/map/ESA_WorldCover_10m_{year}_{version}_{tile}_Map.tif"
    )
    try:
        with rasterio.Env(GDAL_HTTP_TIMEOUT=10), rasterio.open(url) as ds:
            row, col = ds.index(lon, lat)
            inside_raster = 0 <= row < ds.height and 0 <= col < ds.width
            if not inside_raster:
                return None
            # Read just the single pixel rather than the whole tile.
            pixel = ds.read(1, window=((row, row+1), (col, col+1)))
            val = pixel[0, 0]
            if ds.nodata is not None and val == ds.nodata:
                return None
            return int(val)
    except Exception as e:
        logging.warning(f"WorldCover sampling failed for {tile}: {e}")
        return None
def _landcover_consensus_blocking(lat: float, lon: float, radius_meters: int) -> Tuple[Optional[int], str, Optional[LandCoverAnalysis]]:
    """Blocking implementation behind ``get_landcover_consensus_async``."""
    start_time = time.time()
    max_samples = 15        # upper bound on WorldCover reads per request
    sampling_deadline = 12  # seconds allowed for the parallel reads

    # Limit radius and reduce sample points for speed
    radius_meters = max(50, min(500, radius_meters))
    if radius_meters <= 100:
        num_points = 12
    elif radius_meters <= 300:
        num_points = 20
    else:
        num_points = 25
    # Request no more points than will be sampled so they stay evenly spread
    # over both rings; the slice guards against a generator returning more.
    sample_points = generate_sample_points(
        lat, lon, radius_meters, min(num_points, max_samples)
    )[:max_samples]

    # One elevation value (centre point) is shared by all samples.
    elevation, _ = get_elevation_opentopo_single(lat, lon)

    # No `with` block: its implicit shutdown(wait=True) would wait for slow
    # reads and defeat the deadline. Queued work is cancelled instead and
    # reads already running are left to finish in the background.
    # (cancel_futures requires Python 3.9+, as does asyncio.to_thread.)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    try:
        future_to_point = {
            executor.submit(sample_worldcover, sample_lat, sample_lon): (sample_lat, sample_lon)
            for sample_lat, sample_lon in sample_points
        }
        done, pending = concurrent.futures.wait(future_to_point, timeout=sampling_deadline)
    finally:
        executor.shutdown(wait=False, cancel_futures=True)
    if pending:
        logging.warning("Some futures timed out, processing completed ones")

    classifications = []
    # Walk in submission order so tie-breaking between equally common
    # labels does not depend on thread completion order.
    for future, (sample_lat, sample_lon) in future_to_point.items():
        if future not in done:
            continue
        try:
            code = future.result()
            if code is not None:
                terrain_type = map_worldcover_to_terrain_simplified(code, sample_lat, sample_lon, elevation)
                classifications.append((code, terrain_type))
        except Exception as e:
            logging.warning(f"Sample point failed: {e}")

    # Ensure we have at least some data: retry the centre point on its own.
    if not classifications:
        try:
            center_code = sample_worldcover(lat, lon)
            if center_code is not None:
                center_terrain = map_worldcover_to_terrain_simplified(center_code, lat, lon, elevation)
                classifications.append((center_code, center_terrain))
        except Exception as e:
            logging.error(f"Fallback center point sampling failed: {e}")

    # Still nothing: report "Unknown" instead of failing on empty counters.
    if not classifications:
        return None, "Unknown", None

    valid_samples = len(classifications)
    terrain_counts = Counter(terrain for _, terrain in classifications)
    code_counts = Counter(code for code, _ in classifications)
    most_common_terrain = terrain_counts.most_common(1)[0][0]
    most_common_code = code_counts.most_common(1)[0][0]
    confidence = (terrain_counts[most_common_terrain] / valid_samples) * 100
    percentages = {
        terrain: round((count / valid_samples) * 100, 1)
        for terrain, count in terrain_counts.items()
    }

    analysis = LandCoverAnalysis(
        primary_classification=most_common_terrain,
        confidence=round(confidence, 1),
        total_samples=len(sample_points),
        valid_samples=valid_samples,
        classifications=dict(terrain_counts),
        percentages=percentages,
        radius_meters=radius_meters,
        sampling_method="Parallel Systematic Sampling"
    )
    processing_time = time.time() - start_time
    logging.info(f"Land cover analysis completed in {processing_time:.2f}s: {most_common_terrain} ({confidence:.1f}% confidence)")
    return most_common_code, most_common_terrain, analysis


async def get_landcover_consensus_async(lat: float, lon: float, radius_meters: int = 100) -> Tuple[Optional[int], str, Optional[LandCoverAnalysis]]:
    """Classify land cover around a point from a consensus of samples.

    Up to 15 points on two rings around the location are sampled from
    WorldCover in parallel, each code is mapped to a terrain label, and the
    most frequent label wins.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        radius_meters: Sampling radius; clamped to the range 50..500.

    Returns:
        ``(code, terrain, analysis)``: the most common WorldCover code, the
        most common terrain label and a ``LandCoverAnalysis`` summary, or
        ``(None, "Unknown", None)`` when no sample produced data.
    """
    # All the work is blocking (HTTP, raster reads, waiting on a thread
    # pool); run it off the event loop so other coroutines keep running and
    # asyncio timeouts around this call can take effect.
    return await asyncio.to_thread(_landcover_consensus_blocking, lat, lon, radius_meters)
async def classify_terrain_enhanced_async(lat: float, lon: float, radius_meters: int = 100) -> Dict[str, object]:
    """Classify the terrain at a location from elevation and land cover.

    Elevation lookup and land-cover consensus are started concurrently and
    awaited with timeouts of 15 s and 18 s; a failure of either degrades
    the result instead of raising.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        radius_meters: Sampling radius for the land-cover analysis.

    Returns:
        A dict with the keys ``elevation``, ``terrain_type``,
        ``landcover_analysis``, ``elevation_confidence``,
        ``coastal_detection`` and ``processing_time`` (seconds).
    """
    start_time = time.time()

    elevation_task = asyncio.create_task(
        asyncio.to_thread(get_elevation_opentopo_single, lat, lon)
    )
    landcover_task = asyncio.create_task(
        get_landcover_consensus_async(lat, lon, radius_meters)
    )

    # wait_for cancels the awaited task itself when the timeout expires, so
    # no explicit clean-up is needed afterwards.
    try:
        elev, elev_confidence = await asyncio.wait_for(elevation_task, timeout=15)
    except asyncio.TimeoutError:
        logging.warning("Elevation fetch timed out")
        elev, elev_confidence = None, "Timeout"
    except Exception as e:
        logging.error(f"Elevation fetch failed: {e}")
        elev, elev_confidence = None, "Error"

    try:
        code, terrain, landcover_analysis = await asyncio.wait_for(landcover_task, timeout=18)
    except asyncio.TimeoutError:
        logging.warning("Landcover analysis timed out")
        code, terrain, landcover_analysis = None, "Unknown", None
    except Exception as e:
        logging.error(f"Landcover analysis failed: {e}")
        code, terrain, landcover_analysis = None, "Unknown", None

    # Computed once; none of its inputs change below.
    is_coastal = detect_coastal_area(lat, lon, elev, landcover_analysis)

    if elev is None:
        return {
            "elevation": None,
            "terrain_type": terrain or "Unknown",
            "landcover_analysis": landcover_analysis,
            "elevation_confidence": elev_confidence,
            "coastal_detection": is_coastal,
            "processing_time": time.time() - start_time
        }

    water_terrains = ("Water Body", "Continental Shelf", "Deep Ocean")

    # Without a land-cover label, anything at or below sea level is water;
    # the elevation rules below choose the exact label. Low-lying coastal
    # land without a label is handled by the tidal-zone rule.
    if terrain == "Unknown" and elev <= 0:
        terrain = "Water Body"

    final_terrain = terrain
    if terrain in water_terrains:
        # Elevation decides between onshore water and marine depth classes.
        if elev >= -2:
            final_terrain = "Water Body"
        elif elev < -2:
            depth = -elev
            final_terrain = "Continental Shelf" if depth <= 200 else "Deep Ocean"
    elif is_coastal and elev <= 10:
        # Low-lying coastal land is reported as tidal zone.
        final_terrain = "Tidal Zone"
    elif elev > 3000:
        final_terrain = "Arctic/mountain"

    return {
        "elevation": elev,
        "terrain_type": final_terrain,
        "landcover_analysis": landcover_analysis,
        "elevation_confidence": elev_confidence,
        "coastal_detection": is_coastal,
        "processing_time": time.time() - start_time
    }
def parse_decimal(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse decimal-degree coordinates from an input string.

    The first two numbers found in the text are taken as latitude and
    longitude. An optional sign is honoured on integers as well as on
    decimals.

    Input containing minute or second marks (``'``, ``"`` or their
    typographic forms) is rejected, because it is degrees-minutes-seconds
    notation and its first two numbers are degrees and minutes, not a
    coordinate pair.

    Args:
        input_str: Text such as ``"40.7128, -74.0060"`` or ``"-33 151"``.

    Returns:
        ``(lat, lon)`` when two numbers within the valid ranges are found,
        otherwise ``None``.
    """
    if any(mark in input_str for mark in ("'", '"', "\u2032", "\u2033")):
        return None

    # The sign must bind to both alternatives; otherwise "-33" loses its
    # minus and lands in the wrong hemisphere.
    float_pattern = r"[-+]?(?:\d+(?:\.\d*)?|\.\d+)"
    nums = re.findall(float_pattern, input_str)
    if len(nums) < 2:
        return None

    try:
        lat = float(nums[0])
        lon = float(nums[1])
    except ValueError:
        return None

    if -90 <= lat <= 90 and -180 <= lon <= 180:
        return lat, lon
    return None
def parse_dms(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse DMS (degrees-minutes-seconds) coordinates.

    Each coordinate must consist of degrees, minutes, seconds and a
    hemisphere letter (N, S, E or W, case-insensitive). The first two
    coordinates found are used; S and W produce negative values.

    Args:
        input_str: Text such as ``40°26'46"N 79°58'56"W``.

    Returns:
        ``(lat, lon)`` in decimal degrees when two coordinates are found and
        fall within ``[-90, 90]`` / ``[-180, 180]``; otherwise ``None``.
    """
    dms_pattern = r"""(?P<deg>\d{1,3})[°\s]+(?P<min>\d{1,2})['\s]+(?P<sec>\d{1,2}(?:\.\d+)?)[\"\s]*?(?P<dir>[NSEW])"""

    parsed = []
    for m in re.finditer(dms_pattern, input_str, re.IGNORECASE | re.VERBOSE):
        hemisphere = m.group('dir').upper()
        value = (
            float(m.group('deg'))
            + float(m.group('min')) / 60.0
            + float(m.group('sec')) / 3600.0
        )
        if hemisphere in ('S', 'W'):
            value = -value
        parsed.append((value, hemisphere))

    if len(parsed) < 2:
        return None

    (first, first_dir), (second, second_dir) = parsed[0], parsed[1]
    # Use the hemisphere letters to decide which value is the latitude: a
    # pair written longitude-first (E/W then N/S) is swapped. Any other
    # combination keeps the positional lat, lon order.
    if first_dir in ('E', 'W') and second_dir in ('N', 'S'):
        lat, lon = second, first
    else:
        lat, lon = first, second

    if -90 <= lat <= 90 and -180 <= lon <= 180:
        return lat, lon
    return None
def geocode_place(place: str) -> Optional[Tuple[float, float]]:
    """Geocode a place name through the Nominatim search endpoint.

    Sends a single GET request (5 second timeout) asking for at most one
    result and reads the ``lat`` / ``lon`` fields of the first hit.

    Args:
        place: Free-form place name or address.

    Returns:
        ``(lat, lon)`` of the first result, or ``None`` when ``place`` is
        blank, the request fails, the status is not 200, there are no
        results, or the response cannot be interpreted.
    """
    # Nothing to look up; avoid a pointless network round trip.
    if not place or not place.strip():
        return None

    url = "https://nominatim.openstreetmap.org/search"
    params = {
        'q': place,
        'format': 'json',
        'limit': 1,
        'addressdetails': 1
    }
    headers = {
        'User-Agent': "enhanced-terrain-tool/2.0 (terrain-analysis@spglobal.com)"
    }
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as in the original; confirm whether this is still required.
        resp = requests.get(url, params=params, headers=headers, timeout=5, verify=False)
        if resp.status_code != 200:
            logging.warning(f"Geocoding for '{place}' returned HTTP {resp.status_code}")
            return None
        results = resp.json()
        if not results:
            return None
        top_hit = results[0]
        lat = float(top_hit['lat'])
        lon = float(top_hit['lon'])
    except requests.RequestException as e:
        logging.error(f"Geocoding failed for '{place}': {e}")
        return None
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # Body was not valid JSON or lacked usable 'lat'/'lon' fields.
        logging.error(f"Geocoding returned an unexpected response for '{place}': {e}")
        return None

    logging.info(f"Geocoded '{place}' to ({lat}, {lon})")
    return lat, lon
def parse_location(input_str: str) -> Optional[Tuple[float, float]]:
    """Parse location from various input formats or geocode place name.

    Tries, in order: DMS coordinates, decimal-degree coordinates, and
    finally a geocoding lookup of the text as a place name.

    Args:
        input_str: Coordinates in DMS or decimal form, or a place name.

    Returns:
        ``(lat, lon)`` from the first strategy that succeeds, or ``None``
        when the input is blank or every strategy fails.
    """
    input_str = input_str.strip()
    if not input_str:
        return None

    # DMS must be tried before decimal degrees: the decimal parser accepts
    # the first two numbers of any string, so a DMS input would otherwise be
    # misread as (degrees, minutes) and never reach the DMS parser.
    dms_coords = parse_dms(input_str)
    if dms_coords:
        return dms_coords

    decimal_coords = parse_decimal(input_str)
    if decimal_coords:
        return decimal_coords

    # Fall back to geocoding
    return geocode_place(input_str)
# Add periodic connection refresh
async def refresh_connections():
    """Periodically refresh connections to prevent timeouts.

    Loops forever: sleeps 300 seconds, then issues one small GET request
    through the module-level ``session``. Failures of that request are
    logged at debug level and otherwise ignored, so the loop keeps running.
    """
    test_url = "https://api.opentopodata.org/v1/aster30m?locations=0,0"
    loop = asyncio.get_running_loop()
    while True:
        await asyncio.sleep(300)  # Every 5 minutes
        try:
            # session.get is a blocking call (presumably a requests.Session —
            # confirm against its definition); run it in the default executor
            # so the event loop is not stalled for up to the 3 s timeout.
            await loop.run_in_executor(
                None, lambda: session.get(test_url, timeout=3)
            )
        except Exception as e:
            # Best-effort keep-alive: never let a failed ping end the loop.
            # Cancellation is not an Exception subclass and still propagates.
            logging.debug(f"Connection refresh request failed: {e}")
# API Endpoints
async def root():
    """Return index.html from the working directory, or a plain-text 404."""
    if os.path.exists("index.html"):
        return FileResponse("index.html")
    # File is missing: answer with a plain-text "not found" response.
    return PlainTextResponse("index.html not found", status_code=404)
async def startup_event():
    """Initialize services on startup with connection warming.

    Logs a start-up banner, then launches two background threads: a
    one-shot warm-up that makes one elevation lookup and one WorldCover
    sample at (0, 0), and the ``periodic_warmup`` loop.

    NOTE(review): presumably registered as an application startup hook; the
    registration is not visible in this part of the file.
    """
    logging.info("Starting Enhanced Terrain Analysis API v2.0.0 - Simplified Water Bodies")

    # Initial warmup
    def do_warmup():
        # Each call is wrapped separately so one failing service does not
        # prevent warming the other.
        try:
            get_elevation_opentopo_single(0, 0)
        except Exception as e:
            logging.warning(f"Startup warmup elevation failed: {e}")
        try:
            sample_worldcover(0, 0)
        except Exception as e:
            logging.warning(f"Startup warmup worldcover failed: {e}")

    # Daemon thread, like the periodic warm-up below, so an in-flight
    # warm-up request cannot delay interpreter shutdown.
    threading.Thread(target=do_warmup, daemon=True).start()
    # Start periodic warmup in a background thread
    threading.Thread(target=periodic_warmup, daemon=True).start()
# Start connection refresh task
# Strong references to running background tasks. The event loop only keeps
# weak references, so a task nobody else references may be garbage-collected
# before it finishes.
_background_tasks = set()


async def start_background_tasks():
    """Schedule the ``refresh_connections`` coroutine as a background task."""
    task = asyncio.create_task(refresh_connections())
    _background_tasks.add(task)
    # Drop the reference once the task completes or is cancelled.
    task.add_done_callback(_background_tasks.discard)
async def analyze_terrain(request: LocationRequest):
    """Optimized terrain analysis with simplified water body classification.

    Resolves ``request.input`` to coordinates, clamps the sampling radius to
    the range 50-500 m (default 100), runs the asynchronous terrain
    classification and wraps its result in a ``TerrainResponse``.

    Args:
        request: Location text plus optional sampling radius.

    Returns:
        A ``TerrainResponse`` with ``success=True`` and the total processing
        time rounded to two decimals.

    Raises:
        HTTPException: 400 when the location cannot be parsed or geocoded;
            500 for any other failure during analysis.
    """
    request_start_time = time.time()
    try:
        # Validate and parse input. parse_location is synchronous and may
        # perform an HTTP geocoding request, so run it in the default
        # executor instead of blocking the event loop.
        loop = asyncio.get_running_loop()
        coordinates = await loop.run_in_executor(
            None, parse_location, request.input.strip()
        )
        if coordinates is None:
            raise HTTPException(
                status_code=400,
                detail="Could not parse coordinates or geocode the location. Please check your input format."
            )
        lat, lon = coordinates

        # Validate radius (reduced max for speed)
        radius = max(50, min(500, request.radius_meters or 100))

        # Perform enhanced terrain classification
        result = await classify_terrain_enhanced_async(lat, lon, radius)

        # Minimal rate limiting
        await asyncio.sleep(0.1)

        total_processing_time = time.time() - request_start_time
        response = TerrainResponse(
            latitude=lat,
            longitude=lon,
            elevation=result["elevation"],
            terrain_type=result["terrain_type"],
            landcover_analysis=result.get("landcover_analysis"),
            elevation_confidence=result.get("elevation_confidence"),
            coastal_detection=result.get("coastal_detection"),
            success=True,
            message=f"Enhanced terrain analysis completed with {radius}m radius sampling (Simplified water bodies)",
            processing_time=round(total_processing_time, 2)
        )
        logging.info(f"Analysis completed for ({lat:.6f}, {lon:.6f}) in {total_processing_time:.2f}s")
        return response
    except HTTPException:
        # Already a well-formed HTTP error (e.g. the 400 above); pass through.
        raise
    except Exception as e:
        # logging.exception records the traceback along with the message.
        logging.exception(f"Error analyzing terrain: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        ) from e
async def health_check():
    """Report service liveness: status, service name, version and current time."""
    payload = dict(
        status="healthy",
        service="Enhanced Terrain Analysis API",
        version="2.0.0 - Simplified Water Bodies",
    )
    # Seconds since the epoch at the moment the check is answered.
    payload["timestamp"] = time.time()
    return payload
async def get_available_datasets():
    """Describe the elevation and land-cover datasets this service uses.

    Elevation entries come from the module-level ``DATASETS``; the
    land-cover list is a fixed single-entry description.
    """
    worldcover_info = {
        "name": "ESA WorldCover",
        "resolution": "10m",
        "coverage": "Global",
        "years": [2020, 2021],
        "classes": 11,
    }
    catalog = {"elevation_datasets": DATASETS}
    catalog["landcover_datasets"] = [worldcover_info]
    return catalog
if __name__ == "__main__":
    # Script entry point: serve the application with uvicorn.
    import uvicorn

    # Listen on all interfaces, port 7860, without auto-reload.
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "reload": False,
        "log_level": "info",
    }
    uvicorn.run("main:app", **server_options)