Spaces:
Sleeping
Sleeping
| """ | |
| Raster Data Model for HazardGuard System | |
| Extracts 9 geospatial features from COG-optimized raster data sources: | |
| 1. Soil type (HWSD2) - 33 soil classifications with database lookup [soil_type.tif] | |
| 2. Elevation (WorldClim) - meters above sea level [elevation.tif] | |
| 3. Population density (GlobPOP) - persons per km² [population_density.tif] | |
| 4. Land cover (Copernicus) - 22 land cover classes [land_cover.tif] | |
| 5. NDVI (MODIS/eVIIRS) - Normalized Difference Vegetation Index [ndvi.tif] | |
| 6. Annual precipitation (WorldClim) - mm per year [annual_precip.tif] | |
| 7. Annual mean temperature (WorldClim) - °C [mean_annual_temp.tif] | |
| 8. Mean wind speed (Global Wind Atlas) - m/s [wind_speed.tif] | |
| 9. Impervious surface (GHSL) - percentage [impervious_surface.tif] | |
| All rasters are Cloud Optimized GeoTIFF (COG) with ZSTD compression, 256x256 tiles. | |
| Data is 100% lossless — identical pixel values to original sources. | |
| Files are served from a GCS bucket (satellite-cog-data-for-shrishti), with a local fallback. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import rasterio | |
| from rasterio.warp import transform | |
| import pyproj | |
| import logging | |
| from typing import List, Tuple, Dict, Optional, Any | |
| from datetime import datetime | |
logger = logging.getLogger(__name__)

# Note: PROJ/GDAL environment setup is done in main.py before any imports


class RasterDataModel:
    """Core model for raster data extraction and processing.

    Samples single-band rasters at point locations and converts the raw
    pixel values into model features.  Coordinates are always passed as
    ``(lon, lat)`` tuples.  Every ``extract_*`` method returns one value per
    input coordinate and never raises: on failure it logs the error and
    returns the feature's NoData placeholder (``0`` for categorical features,
    ``-9999`` / ``-9999.0`` for numeric ones).
    """

    def __init__(self):
        """Initialize raster data model with empty soil tables and the class encodings."""
        self.soil_databases_loaded = False
        self.smu_df = None
        self.wrb4_lookup = None

        # Soil type classification mapping (0-33).  Plural and singular
        # spellings are aliases that map to the same integer code.
        # NOTE(review): 'Ferrasols' (28) looks like a spelling variant of
        # 'Ferralsols' (8) -- confirm against the consumer of these codes
        # before changing it.
        self.soil_classes = {
            'Acrisols': 1, 'Alisols': 2, 'Andosols': 3, 'Arenosols': 4, 'Calcisols': 5,
            'Cambisols': 6, 'Chernozems': 7, 'Ferralsols': 8, 'Fluvisols': 9, 'Gleysols': 10,
            'Gypsisols': 11, 'Histosols': 12, 'Kastanozems': 13, 'Leptosols': 14, 'Lixisols': 15,
            'Luvisols': 16, 'Nitisols': 17, 'Phaeozems': 18, 'Planosols': 19, 'Podzols': 20,
            'Regosols': 21, 'Solonchaks': 22, 'Solonetz': 23, 'Vertisols': 24, 'Unknown': 0,
            # Singular forms
            'Acrisol': 1, 'Alisol': 2, 'Andosol': 3, 'Arenosol': 4, 'Calcisol': 5,
            'Cambisol': 6, 'Chernozem': 7, 'Ferralsol': 8, 'Fluvisol': 9, 'Gleysol': 10,
            'Gypsisol': 11, 'Histosol': 12, 'Kastanozem': 13, 'Leptosol': 14, 'Lixisol': 15,
            'Luvisol': 16, 'Nitisol': 17, 'Phaeozem': 18, 'Planosol': 19, 'Podzol': 20,
            'Regosol': 21, 'Solonchak': 22, 'Vertisol': 24,
            # Additional soil types
            'Anthrosols': 25, 'Cryosols': 26, 'Durisols': 27, 'Ferrasols': 28, 'Plinthosols': 29,
            'Retisols': 30, 'Stagnosols': 31, 'Technosols': 32, 'Umbrisols': 33,
        }

        # Land cover classification mapping (0-21): raw raster code -> encoded class
        self.land_cover_classes = {
            0: 0,     # Unknown (NoData)
            20: 1,    # Shrubs
            30: 2,    # Herbaceous vegetation
            40: 3,    # Cropland
            50: 4,    # Urban / built up
            60: 5,    # Bare / sparse vegetation
            70: 6,    # Snow and ice
            80: 7,    # Permanent water bodies
            90: 8,    # Herbaceous wetland
            100: 9,   # Moss and lichen
            111: 10,  # Closed forest, evergreen needle leaf
            112: 11,  # Closed forest, evergreen broad leaf
            113: 12,  # Closed forest, deciduous needle leaf
            114: 13,  # Closed forest, deciduous broad leaf
            115: 14,  # Closed forest, mixed
            116: 15,  # Closed forest, unknown
            121: 16,  # Open forest, evergreen needle leaf
            122: 17,  # Open forest, evergreen broad leaf
            123: 18,  # Open forest, deciduous needle leaf
            124: 19,  # Open forest, deciduous broad leaf
            125: 20,  # Open forest, mixed
            126: 21,  # Open forest, unknown
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_nodata(value: Any, nodata: Optional[float]) -> bool:
        """Return True when a sampled pixel equals the raster's NoData value or is NaN."""
        if nodata is not None and value == nodata:
            return True
        return bool(pd.isna(value))

    @staticmethod
    def _sample_first_band(raster_path: str, coords: List[Tuple[float, float]],
                           label: str) -> Tuple[List[Any], Optional[float]]:
        """Open a raster, sample band 1 at ``coords`` and return ``(values, nodata)``.

        The NoData value is read once here so callers do not query the
        dataset again for every point.  ``label`` only prefixes the debug log.
        """
        with rasterio.open(raster_path) as src:
            nodata = src.nodata
            logger.debug(f"{label} Raster NoData: {nodata}")
            values = [val[0] for val in src.sample(coords)]
        return values, nodata

    @staticmethod
    def _find_proj_data_dir() -> str:
        """Return the first candidate directory that contains ``proj.db``, or ''."""
        from pathlib import Path
        candidates = []
        try:
            import rasterio as _rio
            candidates.append(Path(_rio.__file__).parent / "proj_data")
        except ImportError:
            pass
        try:
            import pyproj as _pp
            candidates.append(Path(_pp.datadir.get_data_dir()))
        except (ImportError, AttributeError):
            pass
        candidates.extend([Path("/usr/share/proj"), Path("/usr/local/share/proj")])
        for candidate in candidates:
            if (candidate / "proj.db").exists():
                return str(candidate)
        return ''

    @staticmethod
    def _find_gdal_data_dir() -> str:
        """Return the first existing candidate GDAL data directory, or ''."""
        from pathlib import Path
        candidates = []
        try:
            from osgeo import gdal as _gdal
            gdal_pkg = Path(_gdal.__file__).parent
            candidates.append(gdal_pkg / "data" / "gdal")
            candidates.append(gdal_pkg / "data")
        except ImportError:
            pass
        candidates.extend([Path("/usr/share/gdal"), Path("/usr/local/share/gdal")])
        for candidate in candidates:
            if candidate.exists():
                return str(candidate)
        return ''

    # ------------------------------------------------------------------
    # Soil database and encodings
    # ------------------------------------------------------------------
    def load_soil_databases(self, hwsd2_path: str, wrb4_path: str) -> bool:
        """Load HWSD2 SMU and WRB4 lookup tables.

        Args:
            hwsd2_path: Excel file read with ``HWSD2_SMU_ID`` as the index.
            wrb4_path: Excel file with ``CODE`` and ``VALUE`` columns.

        Returns:
            True when both tables were loaded, False otherwise (the error is
            logged and ``soil_databases_loaded`` is reset to False).
        """
        try:
            self.smu_df = pd.read_excel(hwsd2_path, index_col='HWSD2_SMU_ID')
            wrb4_df = pd.read_excel(wrb4_path)
            self.wrb4_lookup = dict(zip(wrb4_df['CODE'], wrb4_df['VALUE']))
            self.soil_databases_loaded = True
            logger.info(f"Loaded {len(self.smu_df)} SMU records and {len(self.wrb4_lookup)} WRB4 codes")
            return True
        except Exception as e:
            logger.error(f"Error loading soil databases: {e}")
            self.soil_databases_loaded = False
            return False

    def encode_soil_class(self, soil_class_name: str) -> int:
        """Encode soil class name to integer (0-33); unknown names map to 0."""
        return self.soil_classes.get(soil_class_name, 0)

    def encode_land_cover(self, lc_value: int) -> int:
        """Encode Copernicus land cover classes (0-21); unknown codes map to 0."""
        return self.land_cover_classes.get(lc_value, 0)

    # ------------------------------------------------------------------
    # Per-feature extraction
    # ------------------------------------------------------------------
    def extract_soil_type(self, coords: List[Tuple[float, float]], raster_path: str) -> List[int]:
        """Extract soil type with database lookup.

        The raster stores SMU ids; each id is resolved to a WRB4 code via
        ``smu_df`` and then to a class name via ``wrb4_lookup``.

        Returns:
            One encoded soil class (0-33) per coordinate; 0 means unknown.
            All zeros when the databases are not loaded or the raster cannot
            be read.
        """
        if not self.soil_databases_loaded:
            logger.error("Soil databases not loaded")
            return [0] * len(coords)
        try:
            soil_smus, nodata = self._sample_first_band(raster_path, coords, "Soil")
            results = []
            for (lon, lat), soil_smu in zip(coords, soil_smus):
                if soil_smu == 65535 or self._is_nodata(soil_smu, nodata):
                    results.append(0)  # Unknown
                    logger.debug(f"NoData soil for lat={lat}, lon={lon}")
                    continue
                try:
                    wrb4_code = self.smu_df.loc[int(soil_smu), 'WRB4']
                    if pd.isna(wrb4_code) or wrb4_code == '':
                        soil_class_name = 'Unknown'
                    else:
                        soil_class_name = self.wrb4_lookup.get(wrb4_code, 'Unknown')
                    # Extract main soil class (e.g., "Haplic Acrisols" -> "Acrisols")
                    name_parts = soil_class_name.split()
                    soil_main = name_parts[-1] if len(name_parts) > 1 else soil_class_name
                    # Try main class first, then full name, then default to 0
                    soil_class_encoded = self.encode_soil_class(soil_main)
                    if soil_class_encoded == 0 and soil_main != soil_class_name:
                        soil_class_encoded = self.encode_soil_class(soil_class_name)
                    results.append(soil_class_encoded)
                    logger.debug(f"Got soil type {soil_class_name} (main: {soil_main}, code {soil_class_encoded}) for lat={lat}, lon={lon}")
                except (KeyError, ValueError, TypeError, AttributeError):
                    # A missing SMU id or a malformed lookup row (for example a
                    # non-string class name) affects this point only, not the batch.
                    results.append(0)  # Unknown
                    logger.debug(f"Missing soil data for SMU {soil_smu} at lat={lat}, lon={lon}")
            return results
        except Exception as e:
            logger.error(f"Error in soil type extraction: {e}")
            return [0] * len(coords)

    def extract_elevation(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
        """Extract elevation in meters, rounded to 2 decimals (-9999.0 for NoData)."""
        try:
            elevations, nodata = self._sample_first_band(raster_path, coords, "Elevation")
            results = []
            for (lon, lat), elev in zip(coords, elevations):
                if self._is_nodata(elev, nodata):
                    results.append(-9999.0)
                    logger.debug(f"NoData elevation for lat={lat}, lon={lon}")
                else:
                    elev_val = float(elev)  # numpy scalar -> native Python float
                    results.append(round(elev_val, 2))
                    logger.debug(f"Got elevation {elev_val:.2f}m for lat={lat}, lon={lon}")
            return results
        except Exception as e:
            logger.error(f"Error in elevation extraction: {e}")
            return [-9999.0] * len(coords)

    def extract_population_density(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
        """Extract population density in persons/km², rounded to 2 decimals (-9999.0 for NoData)."""
        try:
            populations, nodata = self._sample_first_band(raster_path, coords, "Population")
            results = []
            for (lon, lat), pop in zip(coords, populations):
                if self._is_nodata(pop, nodata):
                    results.append(-9999.0)
                    logger.debug(f"NoData population for lat={lat}, lon={lon}")
                else:
                    pop_val = float(pop)  # numpy scalar -> native Python float
                    results.append(round(pop_val, 2))
                    logger.debug(f"Got population density {pop_val:.2f} persons/km² for lat={lat}, lon={lon}")
            return results
        except Exception as e:
            logger.error(f"Error in population extraction: {e}")
            return [-9999.0] * len(coords)

    def extract_land_cover(self, coords: List[Tuple[float, float]], raster_path: str) -> List[int]:
        """Extract land cover classification (encoded 0-21; 0 for NoData or unmapped codes)."""
        try:
            landcovers, nodata = self._sample_first_band(raster_path, coords, "Land Cover")
            results = []
            for (lon, lat), lc_code in zip(coords, landcovers):
                if self._is_nodata(lc_code, nodata) or lc_code not in self.land_cover_classes:
                    logger.debug(f"NoData or invalid land cover for lat={lat}, lon={lon}")
                    results.append(0)  # Default to 0 (Unknown)
                else:
                    lc_encoded = self.land_cover_classes[int(lc_code)]
                    logger.debug(f"Got land cover class {lc_encoded} (code: {lc_code}) for lat={lat}, lon={lon}")
                    results.append(lc_encoded)
            return results
        except Exception as e:
            logger.error(f"Error in land cover extraction: {e}")
            return [0] * len(coords)

    def extract_ndvi(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
        """Extract NDVI with scaling factor /10000, rounded to 4 decimals (-9999.0 for NoData)."""
        try:
            ndvi_values, nodata = self._sample_first_band(raster_path, coords, "NDVI")
            results = []
            for (lon, lat), ndvi_val in zip(coords, ndvi_values):
                # -9999 is treated as NoData even if the raster does not declare it.
                if ndvi_val == -9999.0 or self._is_nodata(ndvi_val, nodata):
                    results.append(-9999.0)
                    logger.debug(f"NoData NDVI for lat={lat}, lon={lon}")
                else:
                    rounded_ndvi = round(float(ndvi_val) / 10000.0, 4)
                    results.append(rounded_ndvi)
                    logger.debug(f"Got NDVI {rounded_ndvi} for lat={lat}, lon={lon}")
            return results
        except Exception as e:
            logger.error(f"Error in NDVI extraction: {e}")
            return [-9999.0] * len(coords)

    def extract_annual_precipitation(self, coords: List[Tuple[float, float]], raster_path: str) -> List[int]:
        """Extract annual precipitation in mm, rounded to an integer (-9999 for NoData)."""
        try:
            precips, nodata = self._sample_first_band(raster_path, coords, "Precip")
            results = []
            for (lon, lat), precip in zip(coords, precips):
                if self._is_nodata(precip, nodata):
                    results.append(-9999)
                    logger.debug(f"NoData precip for lat={lat}, lon={lon}")
                else:
                    rounded_precip = int(round(float(precip), 0))
                    results.append(rounded_precip)
                    logger.debug(f"Got annual precip {rounded_precip} mm for lat={lat}, lon={lon}")
            return results
        except Exception as e:
            logger.error(f"Error in precipitation extraction: {e}")
            return [-9999] * len(coords)

    def extract_annual_temperature(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
        """Extract annual mean temperature in °C, rounded to 1 decimal (-9999.0 for NoData)."""
        try:
            temps, nodata = self._sample_first_band(raster_path, coords, "Temp")
            results = []
            for (lon, lat), temp in zip(coords, temps):
                if self._is_nodata(temp, nodata):
                    results.append(-9999.0)
                    logger.debug(f"NoData temp for lat={lat}, lon={lon}")
                else:
                    rounded_temp = round(float(temp), 1)
                    results.append(rounded_temp)
                    logger.debug(f"Got annual mean temp {rounded_temp} °C for lat={lat}, lon={lon}")
            return results
        except Exception as e:
            logger.error(f"Error in temperature extraction: {e}")
            return [-9999.0] * len(coords)

    def extract_wind_speed(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
        """Extract mean wind speed in m/s, rounded to 2 decimals (-9999.0 for NoData)."""
        try:
            winds, nodata = self._sample_first_band(raster_path, coords, "Wind")
            results = []
            for (lon, lat), wind in zip(coords, winds):
                if self._is_nodata(wind, nodata):
                    results.append(-9999.0)
                    logger.debug(f"NoData wind for lat={lat}, lon={lon}")
                else:
                    rounded_wind = round(float(wind), 2)
                    results.append(rounded_wind)
                    logger.debug(f"Got mean wind speed {rounded_wind} m/s for lat={lat}, lon={lon}")
            return results
        except Exception as e:
            logger.error(f"Error in wind speed extraction: {e}")
            return [-9999.0] * len(coords)

    def extract_impervious_surface(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
        """Extract impervious surface percentage with CRS transformation.

        Input coordinates are EPSG:4326 ``(lon, lat)`` and are reprojected to
        the raster's own CRS before sampling.  Raw values are divided by 100
        and rounded to 2 decimals.

        Returns:
            One percentage per coordinate, or -9999.0 for NoData / any failure
            (missing file, failed reprojection, I/O error).
        """
        try:
            # Only plain local paths can be checked with os.path.exists; URLs
            # ("https://", "gs://", "s3://", ...) and GDAL virtual paths
            # ("/vsicurl/", "/vsigs/", ...) are left for rasterio to resolve.
            is_remote = '://' in raster_path or raster_path.startswith('/vsi')
            if not is_remote and not os.path.exists(raster_path):
                logger.error(f"Impervious surface raster file not found: {raster_path}")
                return [-9999.0] * len(coords)

            with rasterio.open(raster_path) as src:
                nodata = src.nodata
                logger.info(f"[IMPERVIOUS] Raster CRS: {src.crs}, NoData: {nodata}, dtype: {src.dtypes[0]}")

                # Transform coordinates from EPSG:4326 to raster's CRS (Mollweide, ESRI:54009)
                # Use pyproj.Transformer directly - more reliable than rasterio.warp.transform
                # because pyproj manages its own proj.db path independently
                lons = [lon for lon, _ in coords]
                lats = [lat for _, lat in coords]
                try:
                    transformer = pyproj.Transformer.from_crs(
                        'EPSG:4326', src.crs.to_string(), always_xy=True
                    )
                    xs, ys = transformer.transform(lons, lats)
                    transformed_coords = list(zip(xs, ys))
                    for (lon, lat), (x, y) in zip(coords, transformed_coords):
                        logger.info(f"[IMPERVIOUS] CRS transform: ({lon}, {lat}) -> ({x:.1f}, {y:.1f})")
                except Exception as transform_error:
                    logger.error(f"Coordinate transformation failed: {transform_error}")
                    return [-9999.0] * len(coords)

                impervs = [val[0] for val in src.sample(transformed_coords)]

            results = []
            for (lon, lat), imperv in zip(coords, impervs):
                if self._is_nodata(imperv, nodata):
                    results.append(-9999.0)  # Standard NoData for impervious
                    logger.info(f"[IMPERVIOUS] NoData (={nodata}) for lat={lat}, lon={lon}")
                else:
                    imperv_val = float(imperv)  # numpy scalar -> native Python float
                    # Apply scaling factor for GHSL (divide by 100), 2 decimal places
                    rounded_imperv = round(imperv_val / 100.0, 2)
                    results.append(rounded_imperv)
                    logger.info(f"[IMPERVIOUS] lat={lat}, lon={lon} -> raw={int(imperv_val)}, scaled={rounded_imperv}%")
            return results
        except rasterio.errors.RasterioIOError as io_error:
            logger.error(f"Rasterio I/O error in impervious surface extraction: {io_error}")
            return [-9999.0] * len(coords)
        except Exception as e:
            logger.error(f"Error in impervious surface extraction: {e}")
            logger.error(f"Raster path: {raster_path}")
            return [-9999.0] * len(coords)

    # ------------------------------------------------------------------
    # Batch extraction
    # ------------------------------------------------------------------
    def extract_all_features(self, coords: List[Tuple[float, float]], raster_paths: Dict[str, str]) -> Dict[str, List[Any]]:
        """Extract all 9 raster features in a single operation.

        Runs the extraction inside a ``rasterio.Env`` that carries the PROJ /
        GDAL data directories (from the ``PROJ_LIB`` / ``GDAL_DATA``
        environment variables, or auto-detected when they are unset).

        Args:
            coords: ``(lon, lat)`` tuples.
            raster_paths: Raster locations keyed by ``soil``, ``elevation``,
                ``population``, ``landcover``, ``ndvi``, ``precip``, ``temp``,
                ``wind`` and ``impervious``; missing keys yield NoData values.

        Returns:
            Mapping of feature name to a list with one value per coordinate.
        """
        logger.info(f"Extracting all raster features for {len(coords)} coordinates")

        # Use environment variables set by main.py, with fallback auto-detection
        # (handles the Flask reloader and cross-platform installs).
        proj_lib = os.environ.get('PROJ_LIB', '') or self._find_proj_data_dir()
        gdal_data = os.environ.get('GDAL_DATA', '') or self._find_gdal_data_dir()

        # Suppress noisy PROJ "Cannot find proj.db" warnings from rasterio/GDAL
        # These are harmless for rasters that don't need CRS transformation
        rasterio_logger = logging.getLogger('rasterio._env')
        original_level = rasterio_logger.level
        rasterio_logger.setLevel(logging.CRITICAL)
        try:
            # Wrap ALL raster operations in PROJ environment
            env_kwargs = {'PROJ_IGNORE_CELESTIAL_BODY': '1'}
            if proj_lib:
                env_kwargs['PROJ_LIB'] = proj_lib
            if gdal_data:
                env_kwargs['GDAL_DATA'] = gdal_data
            with rasterio.Env(**env_kwargs):
                return self._extract_all_features_internal(coords, raster_paths)
        finally:
            # Always restore the logger level, even when extraction raises.
            rasterio_logger.setLevel(original_level)

    def _extract_all_features_internal(self, coords: List[Tuple[float, float]], raster_paths: Dict[str, str]) -> Dict[str, List[Any]]:
        """Internal method for feature extraction (called within PROJ environment).

        Each feature is extracted when its path key is present; otherwise the
        feature is filled with its NoData value and a warning is logged.
        """
        count = len(coords)
        results: Dict[str, List[Any]] = {}

        # Soil additionally requires the lookup databases to be loaded.
        if 'soil' in raster_paths and self.soil_databases_loaded:
            results['soil_type'] = self.extract_soil_type(coords, raster_paths['soil'])
        else:
            results['soil_type'] = [0] * count
            logger.warning("Soil data not available or databases not loaded")

        # (path key, output feature name, extractor, NoData fill, warning when missing)
        feature_specs = [
            ('elevation', 'elevation_m', self.extract_elevation, -9999.0,
             "Elevation data not available"),
            ('population', 'pop_density_persqkm', self.extract_population_density, -9999.0,
             "Population data not available"),
            ('landcover', 'land_cover_class', self.extract_land_cover, 0,
             "Land cover data not available"),
            ('ndvi', 'ndvi', self.extract_ndvi, -9999.0,
             "NDVI data not available"),
            ('precip', 'annual_precip_mm', self.extract_annual_precipitation, -9999,
             "Precipitation data not available"),
            ('temp', 'annual_mean_temp_c', self.extract_annual_temperature, -9999.0,
             "Temperature data not available"),
            ('wind', 'mean_wind_speed_ms', self.extract_wind_speed, -9999.0,
             "Wind data not available"),
            ('impervious', 'impervious_surface_pct', self.extract_impervious_surface, -9999.0,
             "Impervious surface data not available"),
        ]
        for path_key, feature_name, extractor, fill_value, missing_msg in feature_specs:
            if path_key in raster_paths:
                results[feature_name] = extractor(coords, raster_paths[path_key])
            else:
                results[feature_name] = [fill_value] * count
                logger.warning(missing_msg)

        logger.info(f"Successfully extracted all raster features for {len(coords)} coordinates")
        return results

    # ------------------------------------------------------------------
    # Validation and metadata
    # ------------------------------------------------------------------
    def validate_coordinates(self, coords: List[Tuple[float, float]]) -> bool:
        """Validate coordinate format and ranges.

        Accepts native ``int`` / ``float`` values as well as NumPy integer and
        floating scalars (e.g. values taken from an array or DataFrame).

        Returns:
            True when every item is a ``(lon, lat)`` pair of numbers with
            ``-180 <= lon <= 180`` and ``-90 <= lat <= 90`` (NaN fails the
            range check); False otherwise, including malformed input.  An
            empty list is considered valid.
        """
        numeric_types = (int, float, np.integer, np.floating)
        try:
            for lon, lat in coords:
                # Check if coordinates are numeric
                if not isinstance(lon, numeric_types) or not isinstance(lat, numeric_types):
                    return False
                # Check coordinate ranges
                if not (-180 <= lon <= 180) or not (-90 <= lat <= 90):
                    return False
            return True
        except Exception:
            # Malformed input (not iterable, wrong tuple length, ...) is invalid, not an error.
            return False

    def get_feature_info(self) -> Dict[str, Any]:
        """Get information about available raster features.

        ``classes`` reports the number of distinct encoded codes (including
        0 = unknown), not the number of name aliases in the mapping.
        """
        soil_class_count = len(set(self.soil_classes.values()))
        land_cover_class_count = len(set(self.land_cover_classes.values()))
        return {
            'features': {
                'soil_type': {
                    'description': 'Soil classification (HWSD2)',
                    'range': '0-33 (encoded classes)',
                    'classes': soil_class_count,
                    'unit': 'categorical'
                },
                'elevation_m': {
                    'description': 'Elevation above sea level',
                    'range': 'varies by location',
                    'unit': 'meters'
                },
                'pop_density_persqkm': {
                    'description': 'Population density',
                    'range': '0-∞',
                    'unit': 'persons/km²'
                },
                'land_cover_class': {
                    'description': 'Land cover classification (Copernicus)',
                    'range': '0-21 (encoded classes)',
                    'classes': land_cover_class_count,
                    'unit': 'categorical'
                },
                'ndvi': {
                    'description': 'Normalized Difference Vegetation Index',
                    'range': '-1.0 to 1.0',
                    'unit': 'index'
                },
                'annual_precip_mm': {
                    'description': 'Annual precipitation',
                    'range': '0-∞',
                    'unit': 'mm/year'
                },
                'annual_mean_temp_c': {
                    'description': 'Annual mean temperature',
                    'range': 'varies by location',
                    'unit': '°C'
                },
                'mean_wind_speed_ms': {
                    'description': 'Mean wind speed',
                    'range': '0-∞',
                    'unit': 'm/s'
                },
                'impervious_surface_pct': {
                    'description': 'Impervious surface coverage',
                    'range': '0-100',
                    'unit': 'percentage'
                }
            },
            'total_features': 9,
            'nodata_values': {
                'numeric': -9999.0,
                'categorical': 0
            },
            'coordinate_system': 'EPSG:4326 (WGS84)',
            'soil_databases_loaded': self.soil_databases_loaded
        }