"""Data-retrieval and analysis steps for a geospatial (flood-risk) workflow.

Each ``State -> State`` function is one workflow step: it reads its inputs from
the shared :class:`State`, writes files below ``state.working_directory``,
records what it produced in ``state.output_files`` and reports failures through
``state.error_log`` / ``state.status`` instead of raising.
"""

import gzip
import math
import os
import shutil
import struct
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import geopandas as gpd
import numpy as np
import osmnx as ox
import pandas as pd
import rasterio
import requests
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv
from geopy.geocoders import Nominatim
from PIL import Image
from pydantic import BaseModel, Field
from rasterio.crs import CRS
from rasterio.transform import from_bounds, from_origin
from scipy.ndimage import distance_transform_edt
from shapely.geometry import box
from tqdm import tqdm
from whitebox import WhiteboxTools

load_dotenv()

# (connect, read) timeout in seconds for HTTP downloads, so a stalled server
# cannot hang a workflow step forever.
_HTTP_TIMEOUT = (10, 120)


class State(BaseModel):
    """Enhanced state model for geospatial analysis workflow"""

    query: str = Field(description="The initial query sent by the user")
    tasks: List[str] = Field(default_factory=list, description="Detailed breakdown of the tasks")
    # Values are not only strings: the steps below store floats, lists and
    # tuples (statistics, tile lists, shapes), hence Dict[str, Any].
    output_files: List[Dict[str, Any]] = Field(
        default_factory=list, description="Generated files with metadata"
    )
    bbox: Optional[List[float]] = Field(
        default=None, description="Bounding box [minx, miny, maxx, maxy]"
    )
    place_name: Optional[str] = Field(default=None, description="Place name for analysis")
    working_directory: str = Field(default="output", description="Working directory for outputs")
    analysis_type: Optional[str] = Field(
        default=None, description="Type of analysis (flood, slope, etc.)"
    )
    parameters: Dict[str, Any] = Field(default_factory=dict, description="Analysis parameters")
    error_log: List[str] = Field(
        default_factory=list, description="Error messages during processing"
    )
    status: str = Field(default="initialized", description="Current processing status")


# Initialize tools
geolocator = Nominatim(user_agent="lulc-retriever")
wbt = WhiteboxTools()
wbt.set_verbose_mode(True)
wbt.set_compress_rasters(False)


def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def get_bbox(place):
    """Geocode *place* and return a square box around it.

    Returns:
        ``(west, south, east, north)`` in degrees, extending 0.1 degrees from
        the geocoded point in every direction.

    Raises:
        ValueError: if the geocoder returns no result for *place*.
    """
    location = geolocator.geocode(place)
    if location is None:
        raise ValueError(f"Could not geocode location: {place}")

    lat, lon = location.latitude, location.longitude
    buffer = 0.1  # degrees (~10km)
    return (lon - buffer, lat - buffer, lon + buffer, lat + buffer)


def download_srtm_hgt(lat, lon, output_dir):
    """Download and unpack one 1x1 degree SRTM ``.hgt`` tile.

    Args:
        lat: Integer latitude of the tile's south-west corner.
        lon: Integer longitude of the tile's south-west corner.
        output_dir: Directory for the tile (created if missing).

    Returns:
        Path of the unpacked ``.hgt`` file (an already present file is reused),
        or ``None`` when the download or extraction failed.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Format tile name
    lat_str = f"N{lat:02d}" if lat >= 0 else f"S{abs(lat):02d}"
    lon_str = f"E{lon:03d}" if lon >= 0 else f"W{abs(lon):03d}"
    tile_name = f"{lat_str}{lon_str}.hgt"

    url = f"https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_str}/{tile_name}.gz"
    output_path = os.path.join(output_dir, tile_name)
    if os.path.exists(output_path):
        return output_path

    gz_path = output_path + ".gz"
    try:
        print(f"Downloading {tile_name}...")
        with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT) as response:
            response.raise_for_status()
            with open(gz_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Stream the decompression instead of loading the whole tile in memory.
        with gzip.open(gz_path, 'rb') as f_in, open(output_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

        print(f"โœ… Downloaded: {tile_name}")
        return output_path
    except Exception as e:
        # A half-written tile would be mistaken for a cached one next time.
        _remove_if_exists(output_path)
        print(f"โŒ Failed to download {tile_name}: {e}")
        return None
    finally:
        _remove_if_exists(gz_path)


def read_hgt_file(hgt_file):
    """Read an ``.hgt`` tile and derive its georeferencing from the file name.

    Args:
        hgt_file: Path whose base name starts with the tile id, e.g.
            ``N13E080.hgt``.

    Returns:
        ``(elevation_data, geotransform, size)`` where ``elevation_data`` is a
        ``size x size`` big-endian int16 array and ``geotransform`` is the
        six-element list ``[x0, dx, 0, y0, 0, -dy]`` for the tile's
        top-left corner.

    Raises:
        ValueError: if the file is not a square grid of 16-bit samples.
    """
    # Get file size to determine format
    file_size = os.path.getsize(hgt_file)
    if file_size == 1201 * 1201 * 2:  # 3 arc-second tile (SRTM3)
        size = 1201
    elif file_size == 3601 * 3601 * 2:  # 1 arc-second tile (SRTM1)
        size = 3601
    else:
        # Calculate size
        pixels = file_size // 2
        size = int(np.sqrt(pixels))
        print(f"Auto-detected size: {size}x{size}")

    if size < 2 or size * size * 2 != file_size:
        raise ValueError(
            f"{hgt_file} is not a square 16-bit HGT grid ({file_size} bytes)"
        )

    # Extract coordinates from filename
    basename = os.path.basename(hgt_file)
    lat_str = basename[:3]
    lon_str = basename[3:7]

    lat = int(lat_str[1:]) if lat_str.startswith('N') else -int(lat_str[1:])
    lon = int(lon_str[1:]) if lon_str.startswith('E') else -int(lon_str[1:])

    # Read elevation data
    with open(hgt_file, 'rb') as f:
        data = f.read()

    # Convert to numpy array (big-endian signed 16-bit)
    elevation_data = np.frombuffer(data, dtype='>i2').reshape(size, size)

    # The grid has `size` samples across one degree, edges included.
    pixel_size = 1.0 / (size - 1)

    # Georeferencing info
    geotransform = [
        lon,          # Top-left X
        pixel_size,   # X pixel size
        0,            # X rotation
        lat + 1,      # Top-left Y
        0,            # Y rotation
        -pixel_size,  # Y pixel size (negative because Y decreases)
    ]

    return elevation_data, geotransform, size


def clip_elevation_data(elevation_data, geotransform, size, bbox):
    """Clip a tile to a bounding box.

    Args:
        elevation_data: ``size x size`` elevation array.
        geotransform: Six-element transform as returned by ``read_hgt_file``.
        size: Edge length of the tile in pixels.
        bbox: ``(west, south, east, north)`` in degrees.

    Returns:
        ``(clipped_data, new_geotransform)``. Pixel indices are clamped to the
        tile, so a box lying outside the tile collapses to a sliver along the
        nearest edge rather than raising.
    """
    west, south, east, north = bbox

    top_left_x = geotransform[0]
    top_left_y = geotransform[3]
    pixel_size_x = geotransform[1]
    pixel_size_y = geotransform[5]  # This is negative

    def clamp(index):
        return max(0, min(index, size - 1))

    # Convert geographic coordinates to pixel coordinates, kept within bounds
    x1 = clamp(int((west - top_left_x) / pixel_size_x))
    y1 = clamp(int((top_left_y - north) / abs(pixel_size_y)))
    x2 = clamp(int((east - top_left_x) / pixel_size_x))
    y2 = clamp(int((top_left_y - south) / abs(pixel_size_y)))

    # Clip the data
    clipped_data = elevation_data[y1:y2 + 1, x1:x2 + 1]

    # Update geotransform for clipped data
    new_geotransform = [
        top_left_x + x1 * pixel_size_x,  # New top-left X
        pixel_size_x,                    # X pixel size
        0,                               # X rotation
        top_left_y + y1 * pixel_size_y,  # New top-left Y
        0,                               # Y rotation
        pixel_size_y,                    # Y pixel size
    ]

    return clipped_data, new_geotransform


def save_as_geotiff_basic(elevation_data, geotransform, output_file):
    """Write elevation data as a 16-bit TIFF plus a text metadata side-car.

    Negative elevations are shifted up by ``-min`` so the data fits an unsigned
    16-bit image; the shift is recorded in the metadata file.

    Args:
        elevation_data: 2-D integer elevation array.
        geotransform: Transform written to the metadata file only.
        output_file: Destination path (string).

    Returns:
        ``(output_file, metadata_file)``.
    """
    min_val = np.min(elevation_data)

    # Work in int32: in int16 both abs(-32768) and the shifted sum overflow.
    offset = -int(min_val) if min_val < 0 else 0
    adjusted_data = (elevation_data.astype(np.int32) + offset).astype(np.uint16)

    # Save as TIFF
    image = Image.fromarray(adjusted_data, mode='I;16')
    image.save(output_file)

    # Save metadata separately; swap only the extension, not every ".tif"
    # that happens to occur in the path.
    metadata_file = os.path.splitext(output_file)[0] + '_metadata.txt'
    with open(metadata_file, 'w') as f:
        f.write(f"GeoTransform: {geotransform}\n")
        f.write(f"Offset: {offset}\n")
        f.write(f"Original min value: {min_val}\n")
        f.write(f"Size: {adjusted_data.shape}\n")

    return output_file, metadata_file


def _tile_indices(low, high):
    """Return the integer tile indices covering the interval ``[low, high]``.

    Tiles are named after their south-west corner, so the index is the floor
    of the coordinate (plain ``int()`` truncates toward zero and selects the
    wrong tile for negative coordinates). A tile that the interval only
    touches at its upper edge is left out.
    """
    first = math.floor(low)
    last = max(first, math.ceil(high) - 1)
    return range(first, last + 1)


def get_dem_elevation_tif(state: State) -> State:
    """Download the DEM for ``state.bbox`` and save it as a GeoTIFF.

    Tiles are stored in ``<working_directory>/dem_files/dem_tiles`` and the
    result in ``<working_directory>/dem_files/<place>_dem.tif``. Requires
    ``state.bbox`` and ``state.place_name``. When the box spans several tiles
    only the first one is used (no mosaicking).

    Returns:
        The same state with a ``"dem"`` entry appended to ``output_files`` and
        status ``"dem_downloaded"``, or status ``"error"`` with the reason in
        ``error_log``.
    """
    try:
        state.status = "downloading_dem"

        # Validate required fields
        if not state.bbox:
            state.error_log.append("Bounding box is required for DEM download")
            state.status = "error"
            return state
        if not state.place_name:
            state.error_log.append("Place name is required for DEM download")
            state.status = "error"
            return state

        # Create working & sub-directories
        working_dir = Path(state.working_directory)
        dem_files_dir = working_dir / "dem_files"
        dem_tiles_dir = dem_files_dir / "dem_tiles"
        for directory in (working_dir, dem_tiles_dir, dem_files_dir):
            directory.mkdir(parents=True, exist_ok=True)

        state.parameters["dem_directory"] = str(dem_files_dir.resolve())

        west, south, east, north = state.bbox
        place_safe = state.place_name.replace(" ", "_").replace(",", "").replace(".", "")
        output_file = dem_files_dir / f"{place_safe}_dem.tif"

        print(f"๐Ÿš€ Starting DEM download for {state.place_name}...")
        print(f"๐Ÿ“ Bounding box: {state.bbox}")
        print(f"๐Ÿ“ Output directory: {dem_files_dir}")

        all_elevation_data = []
        all_geotransforms = []
        downloaded_tiles = []

        for lat in _tile_indices(south, north):
            for lon in _tile_indices(west, east):
                hgt_file = download_srtm_hgt(lat, lon, str(dem_tiles_dir))
                if not hgt_file:
                    continue
                try:
                    elevation_data, geotransform, size = read_hgt_file(hgt_file)
                    clipped_data, clipped_geotransform = clip_elevation_data(
                        elevation_data, geotransform, size, state.bbox
                    )
                    all_elevation_data.append(clipped_data)
                    all_geotransforms.append(clipped_geotransform)
                    downloaded_tiles.append(os.path.basename(hgt_file))
                    print(f"โœ… Processed {os.path.basename(hgt_file)}: {clipped_data.shape}")
                except Exception as e:
                    err = f"Error processing {hgt_file}: {e}"
                    state.error_log.append(err)
                    print(f"โŒ {err}")

        if not all_elevation_data:
            state.error_log.append("No elevation data processed successfully")
            state.status = "error"
            return state

        print(f"\n๐Ÿ”„ Processing {len(all_elevation_data)} elevation tiles...")
        if len(all_elevation_data) > 1:
            print("โš ๏ธ Multiple tiles detected. Using first tile only (mosaicking not implemented).")

        final_data = all_elevation_data[0]
        final_geotransform = all_geotransforms[0]

        tif_file, metadata_file = save_as_geotiff_basic(
            final_data, final_geotransform, str(output_file)
        )

        # Stamp CRS and the transform of the data that was actually written.
        update_dem(tif_file, state, final_geotransform)

        min_elev = float(np.min(final_data))
        max_elev = float(np.max(final_data))
        mean_elev = float(np.mean(final_data))
        shape = final_data.shape

        state.output_files.append({
            "type": "dem",
            "format": "geotiff",
            "file_path": str(tif_file),
            "metadata_file": str(metadata_file),
            "min_elevation": min_elev,
            "max_elevation": max_elev,
            "mean_elevation": mean_elev,
            "data_shape": shape,
            "downloaded_tiles": downloaded_tiles,
            "bbox": state.bbox,
            "geotransform": final_geotransform,
        })

        state.status = "dem_downloaded"
        print(f"\n๐ŸŽฏ Success! DEM saved to: {tif_file}")
        print(f"๐Ÿ“Š Elevation stats: Min={min_elev}, Max={max_elev}, Mean={mean_elev:.1f} m")
        print(f"๐Ÿ“ Data size: {shape}")
        return state

    except Exception as e:
        state.error_log.append(f"Unhandled error during DEM download: {e}")
        state.status = "error"
        print(f"โŒ {e}")
        return state


def update_dem(filepath, state, geotransform=None):
    """Rewrite a TIFF in place with an EPSG:4326 CRS and an affine transform.

    Args:
        filepath: TIFF to read and overwrite.
        state: Workflow state; ``state.bbox`` supplies the origin when no
            geotransform is given.
        geotransform: Optional six-element list ``[x0, dx, 0, y0, 0, -dy]``
            describing the raster. When omitted, the previous behaviour is
            kept: origin at the bbox's north-west corner with a fixed pixel
            size of 0.0008333 degrees (3 arc-seconds).
    """
    crs = "EPSG:4326"  # WGS84 Latitude/Longitude

    if geotransform is not None:
        transform = from_origin(
            geotransform[0],
            geotransform[3],
            geotransform[1],
            abs(geotransform[5]),
        )
    else:
        transform = from_origin(
            state.bbox[0],
            state.bbox[3],  # Use north boundary
            0.0008333,      # pixel width
            0.0008333,      # pixel height
        )

    # Read everything before reopening the same path for writing.
    with rasterio.open(filepath) as src:
        profile = src.profile
        data = src.read(1)

    profile.update({'crs': crs, 'transform': transform})

    with rasterio.open(filepath, 'w', **profile) as dst:
        dst.write(data, 1)


def download_chirps_tif(date: datetime, out_dir):
    """Download and unpack the CHIRPS daily precipitation GeoTIFF for *date*.

    Args:
        date: Day to fetch.
        out_dir: Destination directory (created if missing).

    Returns:
        Path of the ``.tif`` (an existing file is reused), or ``None`` when
        the server did not answer with HTTP 200.
    """
    y, m, d = date.strftime("%Y"), date.strftime("%m"), date.strftime("%d")
    filename = f"chirps-v2.0.{y}.{m}.{d}.tif"
    url = f"https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/tifs/p25/{y}/{filename}.gz"
    gz_path = os.path.join(out_dir, filename + ".gz")
    tif_path = os.path.join(out_dir, filename)

    if os.path.exists(tif_path):
        print(f"โœ… Already downloaded: {filename}")
        return tif_path

    os.makedirs(out_dir, exist_ok=True)

    try:
        with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT) as r:
            if r.status_code != 200:
                print(f"โŒ Failed: {url}")
                return None

            with open(gz_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)

        with gzip.open(gz_path, "rb") as f_in, open(tif_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
    except Exception:
        # Do not leave a truncated .tif behind; it would be reused as a cache hit.
        _remove_if_exists(tif_path)
        raise
    finally:
        _remove_if_exists(gz_path)

    print(f"โœ… Downloaded and extracted: {tif_path}")
    return tif_path


def batch_download_chirps(start_date: str, end_date: str, out_dir):
    """Download CHIRPS rasters for every day from *start_date* to *end_date*.

    Both dates are ``YYYY-MM-DD`` strings and inclusive. Days later than three
    days before today (UTC) are skipped.
    """
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    today = datetime.now(timezone.utc).date()
    max_available = today - timedelta(days=3)

    current = start
    while current <= end:
        if current.date() > max_available:
            print(f"โš ๏ธ Skipping future/unavailable date: {current.strftime('%Y-%m-%d')}")
        else:
            download_chirps_tif(current, out_dir)
        current += timedelta(days=1)


def get_rainfall_data(state: State) -> State:
    """Download one week of rainfall rasters from the same period last year.

    The window runs from (today - 1 year - 7 days) to (today - 1 year); files
    go to ``<working_directory>/rainfall_data``.

    Returns:
        The same state with a ``"rainfall"`` entry listing every ``*.tif`` in
        that directory and status ``"rainfall_downloaded"``, or status
        ``"error"`` with the reason in ``error_log``.
    """
    try:
        state.status = "downloading_rainfall"

        # Create rainfall data directory
        rainfall_dir = Path(state.working_directory) / "rainfall_data"
        rainfall_dir.mkdir(parents=True, exist_ok=True)

        print("๐ŸŒง๏ธ Fetching rainfall data from same timeframe last year...")

        today = datetime.today()
        end_dt = today - relativedelta(years=1)
        start_dt = end_dt - timedelta(days=7)

        start_date = start_dt.strftime('%Y-%m-%d')
        end_date = end_dt.strftime('%Y-%m-%d')

        print(f"๐Ÿ“… Start Date: {start_date}")
        print(f"๐Ÿ“… End Date: {end_date}")

        batch_download_chirps(start_date, end_date, str(rainfall_dir))

        # Count downloaded files
        downloaded_files = list(rainfall_dir.glob("*.tif"))

        state.output_files.append({
            "type": "rainfall",
            "format": "geotiff",
            "directory": str(rainfall_dir),
            "file_count": len(downloaded_files),
            "date_range": f"{start_date} to {end_date}",
            "files": [str(f) for f in downloaded_files],
        })

        state.status = "rainfall_downloaded"
        print(f"โœ… Downloaded {len(downloaded_files)} rainfall files to {rainfall_dir}")
        return state

    except Exception as e:
        state.error_log.append(f"Error downloading rainfall data: {e}")
        state.status = "error"
        print(f"โŒ {e}")
        return state


def _require_output(path, message):
    """Raise with *message* when a tool did not produce *path*."""
    if not path.exists():
        raise Exception(message)


def run_hydrology_generator(state: State) -> State:
    """Derive slope, aspect, filled DEM, flow direction, flow accumulation and streams.

    Uses the first ``"dem"`` entry in ``state.output_files`` as input and
    writes the rasters to ``<working_directory>/hydrology_outputs``.

    Returns:
        The same state with a ``"hydrology"`` entry and status
        ``"hydrology_completed"``, or status ``"error"`` with the reason in
        ``error_log``.
    """
    try:
        state.status = "running_hydrology"

        # Get DEM file from state
        dem_file = next(
            (
                output.get("file_path")
                for output in state.output_files
                if output.get("type") == "dem"
            ),
            None,
        )
        if not dem_file:
            state.error_log.append("No DEM file found in state for hydrology analysis")
            state.status = "error"
            return state

        # NOTE(review): the WhiteboxTools wrapper is documented to need full
        # paths unless a working directory is set, so every path is made
        # absolute before the first tool call - confirm against the installed
        # whitebox version.
        hydrology_dir = (Path(state.working_directory) / "hydrology_outputs").resolve()
        hydrology_dir.mkdir(parents=True, exist_ok=True)

        # Ensure DEM exists
        dem_path = Path(dem_file).resolve()
        if not dem_path.exists():
            state.error_log.append(f"DEM file not found at {dem_path}")
            state.status = "error"
            return state

        # Define output paths
        filled_dem = hydrology_dir / "dem_filled.tif"
        flow_pointer = hydrology_dir / "flow_dir.tif"
        flow_accum = hydrology_dir / "flow_acc.tif"
        stream_raster = hydrology_dir / "streams.tif"
        slope_path = hydrology_dir / "slope.tif"
        aspect_path = hydrology_dir / "aspect.tif"

        print(f"๐Ÿ“ Hydrology output directory: {hydrology_dir}")
        print("๐Ÿš€ Starting hydrological analysis...")

        print("๐Ÿ“ Generating Slope...")
        wbt.slope(dem=str(dem_path), output=str(slope_path), zfactor=1.0)
        _require_output(slope_path, "Slope file not generated")

        print("๐Ÿงญ Generating Aspect...")
        wbt.aspect(dem=str(dem_path), output=str(aspect_path))
        _require_output(aspect_path, "Aspect file not generated")

        print("๐Ÿ“ฅ Running Fill Depressions...")
        wbt.fill_depressions(dem=str(dem_path), output=str(filled_dem))
        _require_output(filled_dem, "Filled DEM not generated")

        print("๐Ÿ“ˆ Calculating Flow Direction...")
        wbt.d8_pointer(dem=str(filled_dem), output=str(flow_pointer))
        _require_output(flow_pointer, "Flow direction file not generated")

        print("๐ŸŒŠ Flow Accumulation...")
        wbt.d8_flow_accumulation(i=str(filled_dem), output=str(flow_accum), out_type="cells")
        _require_output(flow_accum, "Flow accumulation file not generated")

        print("๐Ÿงต Extracting Streams...")
        wbt.extract_streams(flow_accum=str(flow_accum), output=str(stream_raster), threshold=100)
        _require_output(stream_raster, "Stream raster not generated")

        hydrology_outputs = {
            "filled_dem": str(filled_dem),
            "flow_dir": str(flow_pointer),
            "flow_acc": str(flow_accum),
            "streams": str(stream_raster),
            "slope": str(slope_path),
            "aspect": str(aspect_path),
        }

        state.output_files.append({
            "type": "hydrology",
            "format": "geotiff",
            "directory": str(hydrology_dir),
            "outputs": hydrology_outputs,
        })

        state.status = "hydrology_completed"
        print("โœ… All hydrological outputs generated successfully.")
        return state

    except Exception as e:
        state.error_log.append(f"Error in hydrology analysis: {e}")
        state.status = "error"
        print(f"โŒ {e}")
        return state


def tidal_risk_from_osm(state: State) -> State:
    """Buffer OSM coastline and water features into a tidal-risk polygon.

    Features tagged ``natural=coastline`` or ``natural=water`` for
    ``state.place_name`` are buffered by 1000 m (in EPSG:3857), dissolved and
    written as GeoJSON to ``<working_directory>/tidal_risk``.

    Returns:
        The same state with a ``"tidal_risk"`` entry and status
        ``"tidal_risk_completed"``, or status ``"error"`` with the reason in
        ``error_log``.
    """
    try:
        state.status = "generating_tidal_risk"

        place = state.place_name
        if not place:
            state.error_log.append("Place name is required for tidal risk analysis")
            state.status = "error"
            return state

        # Create tidal risk directory
        tidal_dir = Path(state.working_directory) / "tidal_risk"
        tidal_dir.mkdir(parents=True, exist_ok=True)

        buffer_dist = 1000  # meters
        output_geojson = tidal_dir / f"{place.replace(' ', '_').replace(',', '')}_tidal_risk.geojson"

        print(f"๐ŸŒ Fetching OSM water + coastline for {place}")

        # One query for both tag values: a place that has water bodies but no
        # coastline (or the reverse) no longer fails on the empty half.
        features = ox.features_from_place(
            place, tags={"natural": ["coastline", "water"]}
        )

        # Buffer in a metric CRS so the distance is in meters.
        combined = features.to_crs("EPSG:3857")
        print(f"๐Ÿงฑ Found {len(combined)} features. Buffering...")

        risk_zone = combined.buffer(buffer_dist)
        risk_gdf = gpd.GeoDataFrame(geometry=risk_zone, crs="EPSG:3857").dissolve()
        risk_gdf = risk_gdf.to_crs("EPSG:4326")

        # Save as GeoJSON
        risk_gdf.to_file(output_geojson, driver="GeoJSON")

        state.output_files.append({
            "type": "tidal_risk",
            "format": "geojson",
            "file_path": str(output_geojson),
            "buffer_distance": buffer_dist,
            "feature_count": len(combined),
        })

        state.status = "tidal_risk_completed"
        print(f"โœ… Saved Tidal Risk GeoJSON: {output_geojson}")
        return state

    except Exception as e:
        state.error_log.append(f"Error generating tidal risk: {e}")
        state.status = "error"
        print(f"โŒ {e}")
        return state


def get_healthcare_data(bbox, tags):
    """Fetch OSM features matching *tags* inside *bbox*, reduced to centroids.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        tags: OSM tag filter passed to ``ox.features_from_polygon``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry column holds the centroids.
    """
    minx, miny, maxx, maxy = bbox
    polygon = box(minx, miny, maxx, maxy)
    gdf = ox.features_from_polygon(polygon, tags=tags)
    gdf = gdf.to_crs("EPSG:4326")
    gdf["geometry"] = gdf.centroid
    return gdf


def rasterize_healthcare_points(bbox, points_gdf, pixel_size=0.0005):
    """Rasterize healthcare points over a bounding box.

    Same as ``rasterize_points`` with the first two arguments swapped.

    Returns:
        ``(raster, transform)``: a uint8 grid with 1 in every cell containing
        a point, and the grid's affine transform.
    """
    return rasterize_points(points_gdf, bbox, pixel_size)


def compute_distance_transform(binary_raster, pixel_size_deg):
    """Return each cell's distance to the nearest non-zero cell, in meters.

    Pixel distances are scaled by ``111000 * pixel_size_deg``, i.e. one degree
    is treated as 111 km in both directions.
    """
    binary_mask = (binary_raster == 0).astype(np.uint8)
    distance_pixels = distance_transform_edt(binary_mask)
    return distance_pixels * (111000 * pixel_size_deg)


def save_distance_raster(distance_raster, transform, output_path, crs="EPSG:4326"):
    """Save distance raster to a single-band GeoTIFF."""
    save_raster(distance_raster, transform, output_path, crs)


def generate_distance_to_healthcare(state: State) -> State:
    """Create a raster of distance to the nearest healthcare facility.

    Hospitals, clinics, doctors and pharmacies inside ``state.bbox`` are
    fetched from OSM, rasterized at 0.0005 degrees and turned into a distance
    raster saved as
    ``<working_directory>/healthcare_analysis/distance_to_healthcare.tif``.

    Returns:
        The same state with a ``"healthcare_distance"`` entry and status
        ``"healthcare_distance_completed"``, or status ``"error"`` with the
        reason in ``error_log``.
    """
    try:
        state.status = "generating_healthcare_distance"

        if not state.bbox:
            state.error_log.append("Bounding box is required for healthcare distance analysis")
            state.status = "error"
            return state

        # Create healthcare analysis directory
        healthcare_dir = Path(state.working_directory) / "healthcare_analysis"
        healthcare_dir.mkdir(parents=True, exist_ok=True)

        output_path = healthcare_dir / "distance_to_healthcare.tif"

        print("๐Ÿ” Fetching healthcare data from OpenStreetMap...")
        tags = {"amenity": ["hospital", "clinic", "doctors", "pharmacy"]}
        healthcare_gdf = get_healthcare_data(state.bbox, tags)

        print(f"๐Ÿ—บ Rasterizing {len(healthcare_gdf)} healthcare points...")
        pixel_size = 0.0005
        binary_raster, transform = rasterize_healthcare_points(
            state.bbox, healthcare_gdf, pixel_size
        )

        print("๐Ÿ“ Computing distance transform...")
        distance_raster = compute_distance_transform(binary_raster, pixel_size)

        print(f"๐Ÿ’พ Saving to {output_path}...")
        save_distance_raster(distance_raster, transform, str(output_path))

        state.output_files.append({
            "type": "healthcare_distance",
            "format": "geotiff",
            "file_path": str(output_path),
            "healthcare_count": len(healthcare_gdf),
            "pixel_size": pixel_size,
        })

        state.status = "healthcare_distance_completed"
        print("โœ… Done! Distance raster generated.")
        return state

    except Exception as e:
        state.error_log.append(f"Error generating healthcare distance: {e}")
        state.status = "error"
        print(f"โŒ {e}")
        return state


def get_infrastructure_gdf(bbox, tags):
    """Fetch infrastructure data using OSM, reduced to centroids.

    Side effect: points OSMnx at the kumi.systems Overpass mirror and sets a
    60 second timeout for the rest of the process.
    """
    ox.settings.overpass_endpoint = "https://overpass.kumi.systems/api/interpreter"
    ox.settings.timeout = 60
    # NOTE(review): newer OSMnx releases appear to name this setting
    # `requests_timeout`; both are set so either version honours it - confirm.
    ox.settings.requests_timeout = 60

    polygon = box(*bbox)
    gdf = ox.features_from_polygon(polygon, tags=tags)
    gdf = gdf.to_crs("EPSG:4326")
    gdf["geometry"] = gdf.centroid
    return gdf


def rasterize_points(gdf, bbox, pixel_size=0.0005):
    """Burn point geometries into a binary grid covering *bbox*.

    Args:
        gdf: GeoDataFrame of point geometries in the bbox's coordinates.
        bbox: ``(minx, miny, maxx, maxy)``.
        pixel_size: Cell size in the same units as *bbox*.

    Returns:
        ``(raster, transform)``: a uint8 array (1 where a point falls, else 0)
        and the affine transform of the grid. Points outside the box and
        missing/empty geometries are ignored.
    """
    minx, miny, maxx, maxy = bbox
    # At least one cell, otherwise from_bounds divides by zero for tiny boxes.
    width = max(1, int((maxx - minx) / pixel_size))
    height = max(1, int((maxy - miny) / pixel_size))
    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    to_pixel = ~transform

    raster = np.zeros((height, width), dtype=np.uint8)
    for point in gdf.geometry:
        if point is None or point.is_empty:
            continue
        col, row = to_pixel * (point.x, point.y)
        # floor, not int(): truncation would map (-1, 0) onto cell 0 and pull
        # points just outside the west/north edge into the grid.
        col, row = math.floor(col), math.floor(row)
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1
    return raster, transform


def save_raster(raster, transform, output_path, crs="EPSG:4326"):
    """Write a 2-D array as a single-band GeoTIFF with the given transform and CRS."""
    with rasterio.open(
        output_path,
        "w",
        driver="GTiff",
        height=raster.shape[0],
        width=raster.shape[1],
        count=1,
        dtype=raster.dtype,
        crs=CRS.from_string(crs),
        transform=transform,
    ) as dst:
        dst.write(raster, 1)


def generate_infrastructure_tif(bbox, output_path="infrastructure.tif", pixel_size=0.0005, distance=False):
    """Generate a binary or distance-based infrastructure raster.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        output_path: Destination GeoTIFF.
        pixel_size: Cell size in degrees.
        distance: When true, store the distance in meters to the nearest
            infrastructure cell instead of the 0/1 mask.
    """
    # Define infrastructure tags to fetch
    tags = {
        "highway": True,
        "building": True,
        "bridge": True,
        "railway": True,
    }

    print("๐Ÿ” Fetching infrastructure data...")
    gdf = get_infrastructure_gdf(bbox, tags)

    print(f"๐Ÿ—บ Rasterizing {len(gdf)} points...")
    raster, transform = rasterize_points(gdf, bbox, pixel_size)

    if distance:
        print("๐Ÿ“ Computing distance transform...")
        raster = compute_distance_transform(raster, pixel_size)  # meters

    print(f"๐Ÿ’พ Saving raster to {output_path}...")
    save_raster(raster, transform, output_path)
    print("โœ… Done.")


def get_infrastructure(state: State):
    """Generate ``infrastructure.tif`` for ``state.bbox`` and record it.

    The raster is written to the default location of
    ``generate_infrastructure_tif`` (the current directory). Errors propagate
    to the caller.

    Returns:
        The same state with an ``"infrastructure"`` entry in ``output_files``.
    """
    output_path = "infrastructure.tif"
    generate_infrastructure_tif(state.bbox, output_path)
    state.output_files.append({
        "type": "infrastructure",
        "format": "geotiff",
        "file_path": output_path,
    })
    return state


# NOTE(review): the prompt text below is kept exactly as found (one line); its
# original line breaks appear to have been lost - confirm against the source
# prompt before reflowing.
section_breakdown_template = '''You are a Geospatial AI Agent with expertise in environmental modeling, GIS, and spatial data processing. Your job is to break down high-level geospatial analysis queries into a structured list of tasks, with clear descriptions and suggested tools (if any). Each task must be: Self-contained and descriptive Ordered for execution Mapped to an appropriate tool (if known) Ready to be passed to a task executor agent ๐Ÿ—‚๏ธ Input Example User Goal: Analyze flood vulnerability for Chennai using DEM, rainfall, and infrastructure data. Tools available: get_dem_elevation_tif: Downloads DEM data which is base for any task run_hydrolysis_tool: Computes slope, flow direction, stream network get_rainfall_data: Retrieves rainfall from satellite or IMD data OSM_retriever: Downloads Infrastructure,road data for the given place visualize_geospatial_file: Creates maps from raster/vector layers llm: Used for reasoning, summarization, or decision-making '''