# server/models/raster_data_model.py
"""
Raster Data Model for HazardGuard System
Extracts 9 geospatial features from COG-optimized raster data sources:
1. Soil type (HWSD2) - 33 soil classifications with database lookup [soil_type.tif]
2. Elevation (WorldClim) - meters above sea level [elevation.tif]
3. Population density (GlobPOP) - persons per km² [population_density.tif]
4. Land cover (Copernicus) - 22 land cover classes [land_cover.tif]
5. NDVI (MODIS/eVIIRS) - Normalized Difference Vegetation Index [ndvi.tif]
6. Annual precipitation (WorldClim) - mm per year [annual_precip.tif]
7. Annual mean temperature (WorldClim) - °C [mean_annual_temp.tif]
8. Mean wind speed (Global Wind Atlas) - m/s [wind_speed.tif]
9. Impervious surface (GHSL) - percentage [impervious_surface.tif]
All rasters are Cloud Optimized GeoTIFF (COG) with ZSTD compression, 256x256 tiles.
Data is 100% lossless — identical pixel values to original sources.
Files are served from GCS bucket (satellite-cog-data-for-shrishti) or local fallback
"""
import pandas as pd
import numpy as np
import os
import rasterio
from rasterio.warp import transform
import pyproj
import logging
from typing import List, Tuple, Dict, Optional, Any
from datetime import datetime
logger = logging.getLogger(__name__)
# Note: PROJ/GDAL environment setup is done in main.py before any imports
class RasterDataModel:
    """Core model for raster data extraction and processing"""

    def __init__(self):
        """Initialize raster data model.

        Sets up the soil-class and land-cover encoding tables. The HWSD2
        SMU table and WRB4 lookup are loaded separately via
        ``load_soil_databases()``.
        """
        self.soil_databases_loaded = False  # flipped to True by load_soil_databases()
        self.smu_df = None       # HWSD2 SMU table, indexed by HWSD2_SMU_ID
        self.wrb4_lookup = None  # WRB4 CODE -> soil class name mapping

        # Soil type classification mapping (0-33). Plural and singular
        # spellings map to the same code so either form resolves.
        self.soil_classes = {
            'Acrisols': 1, 'Alisols': 2, 'Andosols': 3, 'Arenosols': 4, 'Calcisols': 5,
            'Cambisols': 6, 'Chernozems': 7, 'Ferralsols': 8, 'Fluvisols': 9, 'Gleysols': 10,
            'Gypsisols': 11, 'Histosols': 12, 'Kastanozems': 13, 'Leptosols': 14, 'Lixisols': 15,
            'Luvisols': 16, 'Nitisols': 17, 'Phaeozems': 18, 'Planosols': 19, 'Podzols': 20,
            'Regosols': 21, 'Solonchaks': 22, 'Solonetz': 23, 'Vertisols': 24, 'Unknown': 0,
            # Singular forms. 'Solonetz' is its own singular and plural form and
            # is already listed above; the duplicate key it previously had here
            # (same value, flagged by linters) was removed.
            'Acrisol': 1, 'Alisol': 2, 'Andosol': 3, 'Arenosol': 4, 'Calcisol': 5,
            'Cambisol': 6, 'Chernozem': 7, 'Ferralsol': 8, 'Fluvisol': 9, 'Gleysol': 10,
            'Gypsisol': 11, 'Histosol': 12, 'Kastanozem': 13, 'Leptosol': 14, 'Lixisol': 15,
            'Luvisol': 16, 'Nitisol': 17, 'Phaeozem': 18, 'Planosol': 19, 'Podzol': 20,
            'Regosol': 21, 'Solonchak': 22, 'Vertisol': 24,
            # Additional soil types
            # NOTE(review): 'Ferrasols' (28) looks like a misspelling of
            # 'Ferralsols' (8) — confirm against the WRB4 VALUE column.
            'Anthrosols': 25, 'Cryosols': 26, 'Durisols': 27, 'Ferrasols': 28, 'Plinthosols': 29,
            'Retisols': 30, 'Stagnosols': 31, 'Technosols': 32, 'Umbrisols': 33
        }

        # Copernicus land-cover raw code -> compact class index (0-21).
        self.land_cover_classes = {
            0: 0,     # Unknown (NoData)
            20: 1,    # Shrubs
            30: 2,    # Herbaceous vegetation
            40: 3,    # Cropland
            50: 4,    # Urban / built up
            60: 5,    # Bare / sparse vegetation
            70: 6,    # Snow and ice
            80: 7,    # Permanent water bodies
            90: 8,    # Herbaceous wetland
            100: 9,   # Moss and lichen
            111: 10,  # Closed forest, evergreen needle leaf
            112: 11,  # Closed forest, evergreen broad leaf
            113: 12,  # Closed forest, deciduous needle leaf
            114: 13,  # Closed forest, deciduous broad leaf
            115: 14,  # Closed forest, mixed
            116: 15,  # Closed forest, unknown
            121: 16,  # Open forest, evergreen needle leaf
            122: 17,  # Open forest, evergreen broad leaf
            123: 18,  # Open forest, deciduous needle leaf
            124: 19,  # Open forest, deciduous broad leaf
            125: 20,  # Open forest, mixed
            126: 21   # Open forest, unknown
        }
def load_soil_databases(self, hwsd2_path: str, wrb4_path: str) -> bool:
"""Load HWSD2 SMU and WRB4 lookup tables"""
try:
self.smu_df = pd.read_excel(hwsd2_path, index_col='HWSD2_SMU_ID')
wrb4_df = pd.read_excel(wrb4_path)
self.wrb4_lookup = dict(zip(wrb4_df['CODE'], wrb4_df['VALUE']))
self.soil_databases_loaded = True
logger.info(f"Loaded {len(self.smu_df)} SMU records and {len(self.wrb4_lookup)} WRB4 codes")
return True
except Exception as e:
logger.error(f"Error loading soil databases: {e}")
self.soil_databases_loaded = False
return False
def encode_soil_class(self, soil_class_name: str) -> int:
"""Encode soil class name to integer (0-33)"""
return self.soil_classes.get(soil_class_name, 0)
def encode_land_cover(self, lc_value: int) -> int:
"""Encode Copernicus land cover classes (0-21)"""
return self.land_cover_classes.get(lc_value, 0)
def extract_soil_type(self, coords: List[Tuple[float, float]], raster_path: str) -> List[int]:
"""Extract soil type with database lookup"""
if not self.soil_databases_loaded:
logger.error("Soil databases not loaded")
return [0] * len(coords)
try:
with rasterio.open(raster_path) as src:
logger.debug(f"Soil Raster NoData: {src.nodata}")
soil_smus = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), soil_smu in zip(coords, soil_smus):
if soil_smu == 65535 or soil_smu == src.nodata or pd.isna(soil_smu):
results.append(0) # Unknown
logger.debug(f"NoData soil for lat={lat}, lon={lon}")
else:
try:
wrb4_code = self.smu_df.loc[int(soil_smu), 'WRB4']
if pd.isna(wrb4_code) or wrb4_code == '':
soil_class_name = 'Unknown'
else:
soil_class_name = self.wrb4_lookup.get(wrb4_code, 'Unknown')
# Extract main soil class (e.g., "Haplic Acrisols" -> "Acrisols")
soil_main = soil_class_name.split()[-1] if len(soil_class_name.split()) > 1 else soil_class_name
# Try main class first, then full name, then default to 0
soil_class_encoded = self.encode_soil_class(soil_main)
if soil_class_encoded == 0 and soil_main != soil_class_name:
soil_class_encoded = self.encode_soil_class(soil_class_name)
results.append(soil_class_encoded)
logger.debug(f"Got soil type {soil_class_name} (main: {soil_main}, code {soil_class_encoded}) for lat={lat}, lon={lon}")
except (KeyError, ValueError):
results.append(0) # Unknown
logger.debug(f"Missing soil data for SMU {soil_smu} at lat={lat}, lon={lon}")
return results
except Exception as e:
logger.error(f"Error in soil type extraction: {e}")
return [0] * len(coords)
def extract_elevation(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
"""Extract elevation in meters"""
try:
with rasterio.open(raster_path) as src:
logger.debug(f"Elevation Raster NoData: {src.nodata}")
elevations = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), elev in zip(coords, elevations):
if elev == src.nodata or pd.isna(elev):
results.append(-9999.0)
logger.debug(f"NoData elevation for lat={lat}, lon={lon}")
else:
# Convert numpy types to native Python float
elev_val = float(elev) if hasattr(elev, 'item') else float(elev)
results.append(round(elev_val, 2))
logger.debug(f"Got elevation {elev_val:.2f}m for lat={lat}, lon={lon}")
return results
except Exception as e:
logger.error(f"Error in elevation extraction: {e}")
return [-9999.0] * len(coords)
def extract_population_density(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
"""Extract population density in persons/km²"""
try:
with rasterio.open(raster_path) as src:
logger.debug(f"Population Raster NoData: {src.nodata}")
populations = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), pop in zip(coords, populations):
if pop == src.nodata or pd.isna(pop):
results.append(-9999.0)
logger.debug(f"NoData population for lat={lat}, lon={lon}")
else:
# Convert numpy types to native Python float
pop_val = float(pop) if hasattr(pop, 'item') else float(pop)
results.append(round(pop_val, 2))
logger.debug(f"Got population density {pop_val:.2f} persons/km² for lat={lat}, lon={lon}")
return results
except Exception as e:
logger.error(f"Error in population extraction: {e}")
return [-9999.0] * len(coords)
def extract_land_cover(self, coords: List[Tuple[float, float]], raster_path: str) -> List[int]:
"""Extract land cover classification"""
try:
with rasterio.open(raster_path) as src:
logger.debug(f"Land Cover Raster NoData: {src.nodata}")
landcovers = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), lc_code in zip(coords, landcovers):
if lc_code == src.nodata or pd.isna(lc_code) or lc_code not in self.land_cover_classes:
logger.debug(f"NoData or invalid land cover for lat={lat}, lon={lon}")
results.append(0) # Default to 0 (Unknown)
else:
lc_encoded = self.land_cover_classes[int(lc_code)]
logger.debug(f"Got land cover class {lc_encoded} (code: {lc_code}) for lat={lat}, lon={lon}")
results.append(lc_encoded)
return results
except Exception as e:
logger.error(f"Error in land cover extraction: {e}")
return [0] * len(coords)
def extract_ndvi(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
"""Extract NDVI with scaling factor /10000"""
try:
with rasterio.open(raster_path) as src:
logger.debug(f"NDVI Raster NoData: {src.nodata}")
ndvi_values = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), ndvi_val in zip(coords, ndvi_values):
if ndvi_val == -9999.0 or ndvi_val == src.nodata or pd.isna(ndvi_val):
results.append(-9999.0)
logger.debug(f"NoData NDVI for lat={lat}, lon={lon}")
else:
# Convert numpy types to native Python float
ndvi_raw = float(ndvi_val) if hasattr(ndvi_val, 'item') else float(ndvi_val)
scaled_ndvi = ndvi_raw / 10000.0
rounded_ndvi = round(scaled_ndvi, 4)
results.append(rounded_ndvi)
logger.debug(f"Got NDVI {rounded_ndvi} for lat={lat}, lon={lon}")
return results
except Exception as e:
logger.error(f"Error in NDVI extraction: {e}")
return [-9999.0] * len(coords)
def extract_annual_precipitation(self, coords: List[Tuple[float, float]], raster_path: str) -> List[int]:
"""Extract annual precipitation in mm"""
try:
with rasterio.open(raster_path) as src:
logger.debug(f"Precip Raster NoData: {src.nodata}")
precips = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), precip in zip(coords, precips):
if precip == src.nodata or pd.isna(precip):
results.append(-9999)
logger.debug(f"NoData precip for lat={lat}, lon={lon}")
else:
# Convert numpy types to native Python int
precip_val = float(precip) if hasattr(precip, 'item') else float(precip)
rounded_precip = int(round(precip_val, 0))
results.append(rounded_precip)
logger.debug(f"Got annual precip {rounded_precip} mm for lat={lat}, lon={lon}")
return results
except Exception as e:
logger.error(f"Error in precipitation extraction: {e}")
return [-9999] * len(coords)
def extract_annual_temperature(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
"""Extract annual mean temperature in °C"""
try:
with rasterio.open(raster_path) as src:
logger.debug(f"Temp Raster NoData: {src.nodata}")
temps = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), temp in zip(coords, temps):
if temp == src.nodata or pd.isna(temp):
results.append(-9999.0)
logger.debug(f"NoData temp for lat={lat}, lon={lon}")
else:
# Convert numpy types to native Python float
temp_val = float(temp) if hasattr(temp, 'item') else float(temp)
rounded_temp = round(temp_val, 1)
results.append(rounded_temp)
logger.debug(f"Got annual mean temp {rounded_temp} °C for lat={lat}, lon={lon}")
return results
except Exception as e:
logger.error(f"Error in temperature extraction: {e}")
return [-9999.0] * len(coords)
def extract_wind_speed(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
"""Extract mean wind speed in m/s"""
try:
with rasterio.open(raster_path) as src:
logger.debug(f"Wind Raster NoData: {src.nodata}")
winds = [val[0] for val in src.sample(coords)]
results = []
for (lon, lat), wind in zip(coords, winds):
if wind == src.nodata or pd.isna(wind):
results.append(-9999.0)
logger.debug(f"NoData wind for lat={lat}, lon={lon}")
else:
# Convert numpy types to native Python float
wind_val = float(wind) if hasattr(wind, 'item') else float(wind)
rounded_wind = round(wind_val, 2)
results.append(rounded_wind)
logger.debug(f"Got mean wind speed {rounded_wind} m/s for lat={lat}, lon={lon}")
return results
except Exception as e:
logger.error(f"Error in wind speed extraction: {e}")
return [-9999.0] * len(coords)
def extract_impervious_surface(self, coords: List[Tuple[float, float]], raster_path: str) -> List[float]:
"""Extract impervious surface percentage with CRS transformation"""
try:
# Check if raster file exists (skip check for URLs — rasterio handles them)
is_url = raster_path.startswith('http://') or raster_path.startswith('https://')
if not is_url and not os.path.exists(raster_path):
logger.error(f"Impervious surface raster file not found: {raster_path}")
return [-9999.0] * len(coords)
with rasterio.open(raster_path) as src:
logger.info(f"[IMPERVIOUS] Raster CRS: {src.crs}, NoData: {src.nodata}, dtype: {src.dtypes[0]}")
# Transform coordinates from EPSG:4326 to raster's CRS (Mollweide, ESRI:54009)
# Use pyproj.Transformer directly - more reliable than rasterio.warp.transform
# because pyproj manages its own proj.db path independently
lons = [lon for lon, lat in coords]
lats = [lat for lon, lat in coords]
try:
transformer = pyproj.Transformer.from_crs(
'EPSG:4326', src.crs.to_string(), always_xy=True
)
transformed_lons, transformed_lats = transformer.transform(lons, lats)
transformed_coords = list(zip(transformed_lons, transformed_lats))
for i, (lon, lat) in enumerate(coords):
logger.info(f"[IMPERVIOUS] CRS transform: ({lon}, {lat}) -> ({transformed_lons[i]:.1f}, {transformed_lats[i]:.1f})")
except Exception as transform_error:
logger.error(f"Coordinate transformation failed: {transform_error}")
return [-9999.0] * len(coords)
impervs = [val[0] for val in src.sample(transformed_coords)]
results = []
for (lon, lat), imperv in zip(coords, impervs):
if imperv == src.nodata or pd.isna(imperv):
results.append(-9999.0) # Standard NoData for impervious
logger.info(f"[IMPERVIOUS] NoData (={src.nodata}) for lat={lat}, lon={lon}")
else:
# Convert numpy types to native Python float
imperv_val = float(imperv) if hasattr(imperv, 'item') else float(imperv)
# Apply scaling factor for GHSL (divide by 100)
scaled_imperv = imperv_val / 100.0
# Round to 2 decimal places (percentage)
rounded_imperv = round(scaled_imperv, 2)
results.append(rounded_imperv)
logger.info(f"[IMPERVIOUS] lat={lat}, lon={lon} -> raw={int(imperv_val)}, scaled={rounded_imperv}%")
return results
except rasterio.errors.RasterioIOError as io_error:
logger.error(f"Rasterio I/O error in impervious surface extraction: {io_error}")
return [-9999.0] * len(coords)
except Exception as e:
logger.error(f"Error in impervious surface extraction: {e}")
logger.error(f"Raster path: {raster_path}")
return [-9999.0] * len(coords)
def extract_all_features(self, coords: List[Tuple[float, float]], raster_paths: Dict[str, str]) -> Dict[str, List[Any]]:
"""Extract all 9 raster features in a single operation"""
logger.info(f"Extracting all raster features for {len(coords)} coordinates")
# Setup PROJ paths for all raster operations (handles Flask reloader)
# Use environment variables set by main.py, with fallback auto-detection
from pathlib import Path
proj_lib = os.environ.get('PROJ_LIB', '')
gdal_data = os.environ.get('GDAL_DATA', '')
# Fallback: try common locations if env vars are not set (cross-platform)
if not proj_lib:
candidates = []
try:
import rasterio as _rio
candidates.append(Path(_rio.__file__).parent / "proj_data")
except ImportError:
pass
try:
import pyproj as _pp
candidates.append(Path(_pp.datadir.get_data_dir()))
except (ImportError, AttributeError):
pass
candidates.extend([Path("/usr/share/proj"), Path("/usr/local/share/proj")])
for c in candidates:
if c.exists() and (c / "proj.db").exists():
proj_lib = str(c)
break
if not gdal_data:
candidates = []
try:
from osgeo import gdal as _gdal
candidates.append(Path(_gdal.__file__).parent / "data" / "gdal")
candidates.append(Path(_gdal.__file__).parent / "data")
except ImportError:
pass
candidates.extend([Path("/usr/share/gdal"), Path("/usr/local/share/gdal")])
for c in candidates:
if c.exists():
gdal_data = str(c)
break
# Suppress noisy PROJ "Cannot find proj.db" warnings from rasterio/GDAL
# These are harmless for rasters that don't need CRS transformation
rasterio_logger = logging.getLogger('rasterio._env')
original_level = rasterio_logger.level
rasterio_logger.setLevel(logging.CRITICAL)
try:
# Wrap ALL raster operations in PROJ environment
env_kwargs = {'PROJ_IGNORE_CELESTIAL_BODY': '1'}
if proj_lib:
env_kwargs['PROJ_LIB'] = proj_lib
if gdal_data:
env_kwargs['GDAL_DATA'] = gdal_data
with rasterio.Env(**env_kwargs):
return self._extract_all_features_internal(coords, raster_paths)
finally:
rasterio_logger.setLevel(original_level)
def _extract_all_features_internal(self, coords: List[Tuple[float, float]], raster_paths: Dict[str, str]) -> Dict[str, List[Any]]:
"""Internal method for feature extraction (called within PROJ environment)"""
results = {}
# Extract soil type
if 'soil' in raster_paths and self.soil_databases_loaded:
results['soil_type'] = self.extract_soil_type(coords, raster_paths['soil'])
else:
results['soil_type'] = [0] * len(coords)
logger.warning("Soil data not available or databases not loaded")
# Extract elevation
if 'elevation' in raster_paths:
results['elevation_m'] = self.extract_elevation(coords, raster_paths['elevation'])
else:
results['elevation_m'] = [-9999.0] * len(coords)
logger.warning("Elevation data not available")
# Extract population density
if 'population' in raster_paths:
results['pop_density_persqkm'] = self.extract_population_density(coords, raster_paths['population'])
else:
results['pop_density_persqkm'] = [-9999.0] * len(coords)
logger.warning("Population data not available")
# Extract land cover
if 'landcover' in raster_paths:
results['land_cover_class'] = self.extract_land_cover(coords, raster_paths['landcover'])
else:
results['land_cover_class'] = [0] * len(coords)
logger.warning("Land cover data not available")
# Extract NDVI
if 'ndvi' in raster_paths:
results['ndvi'] = self.extract_ndvi(coords, raster_paths['ndvi'])
else:
results['ndvi'] = [-9999.0] * len(coords)
logger.warning("NDVI data not available")
# Extract annual precipitation
if 'precip' in raster_paths:
results['annual_precip_mm'] = self.extract_annual_precipitation(coords, raster_paths['precip'])
else:
results['annual_precip_mm'] = [-9999] * len(coords)
logger.warning("Precipitation data not available")
# Extract annual temperature
if 'temp' in raster_paths:
results['annual_mean_temp_c'] = self.extract_annual_temperature(coords, raster_paths['temp'])
else:
results['annual_mean_temp_c'] = [-9999.0] * len(coords)
logger.warning("Temperature data not available")
# Extract wind speed
if 'wind' in raster_paths:
results['mean_wind_speed_ms'] = self.extract_wind_speed(coords, raster_paths['wind'])
else:
results['mean_wind_speed_ms'] = [-9999.0] * len(coords)
logger.warning("Wind data not available")
# Extract impervious surface
if 'impervious' in raster_paths:
results['impervious_surface_pct'] = self.extract_impervious_surface(coords, raster_paths['impervious'])
else:
results['impervious_surface_pct'] = [-9999.0] * len(coords)
logger.warning("Impervious surface data not available")
logger.info(f"Successfully extracted all raster features for {len(coords)} coordinates")
return results
def validate_coordinates(self, coords: List[Tuple[float, float]]) -> bool:
"""Validate coordinate format and ranges"""
try:
for lon, lat in coords:
# Check if coordinates are numeric
if not isinstance(lon, (int, float)) or not isinstance(lat, (int, float)):
return False
# Check coordinate ranges
if not (-180 <= lon <= 180) or not (-90 <= lat <= 90):
return False
return True
except Exception:
return False
def get_feature_info(self) -> Dict[str, Any]:
"""Get information about available raster features"""
return {
'features': {
'soil_type': {
'description': 'Soil classification (HWSD2)',
'range': '0-33 (encoded classes)',
'classes': len(self.soil_classes),
'unit': 'categorical'
},
'elevation_m': {
'description': 'Elevation above sea level',
'range': 'varies by location',
'unit': 'meters'
},
'pop_density_persqkm': {
'description': 'Population density',
'range': '0-∞',
'unit': 'persons/km²'
},
'land_cover_class': {
'description': 'Land cover classification (Copernicus)',
'range': '0-21 (encoded classes)',
'classes': len(self.land_cover_classes),
'unit': 'categorical'
},
'ndvi': {
'description': 'Normalized Difference Vegetation Index',
'range': '-1.0 to 1.0',
'unit': 'index'
},
'annual_precip_mm': {
'description': 'Annual precipitation',
'range': '0-∞',
'unit': 'mm/year'
},
'annual_mean_temp_c': {
'description': 'Annual mean temperature',
'range': 'varies by location',
'unit': '°C'
},
'mean_wind_speed_ms': {
'description': 'Mean wind speed',
'range': '0-∞',
'unit': 'm/s'
},
'impervious_surface_pct': {
'description': 'Impervious surface coverage',
'range': '0-100',
'unit': 'percentage'
}
},
'total_features': 9,
'nodata_values': {
'numeric': -9999.0,
'categorical': 0
},
'coordinate_system': 'EPSG:4326 (WGS84)',
'soil_databases_loaded': self.soil_databases_loaded
}