Spaces:
Sleeping
Sleeping
| from pydantic import BaseModel, Field | |
| from typing import List, Dict, Optional, Any | |
| import requests | |
| import os | |
| import gzip | |
| import numpy as np | |
| from PIL import Image | |
| import struct | |
| from pathlib import Path | |
| import rasterio | |
| from rasterio.transform import from_origin | |
| from datetime import datetime, timedelta | |
| from dateutil.relativedelta import relativedelta | |
| import shutil | |
| from tqdm import tqdm | |
| import osmnx as ox | |
| import geopandas as gpd | |
| import pandas as pd | |
| from shapely.geometry import box | |
| from scipy.ndimage import distance_transform_edt | |
| from rasterio.transform import from_bounds | |
| from rasterio.crs import CRS | |
| from geopy.geocoders import Nominatim | |
| from whitebox import WhiteboxTools | |
| from dotenv import load_dotenv | |
# Load KEY=value pairs from a local .env file (if one is found) into os.environ.
load_dotenv()
class State(BaseModel):
    """Enhanced state model for geospatial analysis workflow.

    One instance is passed through the processing steps in this module (DEM
    download, rainfall, hydrology, tidal risk, healthcare distance); each step
    mutates it in place and records its status, outputs and errors here.
    """

    query: str = Field(description="The initial query sent by the user")
    tasks: List[str] = Field(default_factory=list, description="Detailed breakdown of the tasks")
    # Entries hold heterogeneous values (floats, tuples, lists, nested dicts),
    # so the value type must be Any; Dict[str, str] fails re-validation.
    output_files: List[Dict[str, Any]] = Field(default_factory=list, description="Generated files with metadata")
    bbox: Optional[List[float]] = Field(default=None, description="Bounding box [minx, miny, maxx, maxy]")
    place_name: Optional[str] = Field(default=None, description="Place name for analysis")
    working_directory: str = Field(default="output", description="Working directory for outputs")
    analysis_type: Optional[str] = Field(default=None, description="Type of analysis (flood, slope, etc.)")
    parameters: Dict[str, Any] = Field(default_factory=dict, description="Analysis parameters")
    error_log: List[str] = Field(default_factory=list, description="Error messages during processing")
    status: str = Field(default="initialized", description="Current processing status")
# Initialize tools
# Module-level singletons shared by the functions below.
# Geocoder used by get_bbox() to turn a place name into coordinates.
geolocator = Nominatim(user_agent="lulc-retriever")
# WhiteboxTools wrapper used by run_hydrology_generator() for terrain/hydrology tools.
wbt = WhiteboxTools()
wbt.set_verbose_mode(True)  # print each tool's progress output
wbt.set_compress_rasters(False)  # write output rasters uncompressed
def get_bbox(place, buffer=0.1):
    """Geocode *place* and return a square bounding box around it.

    Args:
        place: Free-text place name passed to the module's Nominatim geocoder.
        buffer: Half-width of the box in decimal degrees (default 0.1, about
            11 km of latitude; the east-west ground extent shrinks with
            cos(latitude)).

    Returns:
        Tuple ``(min_lon, min_lat, max_lon, max_lat)`` in degrees.

    Raises:
        ValueError: If *buffer* is not positive or the place cannot be geocoded.
    """
    if buffer <= 0:
        # A non-positive buffer would yield an empty or inverted box.
        raise ValueError(f"buffer must be positive, got {buffer}")
    location = geolocator.geocode(place)
    if location is None:
        raise ValueError(f"Could not geocode location: {place}")
    lat, lon = location.latitude, location.longitude
    return (lon - buffer, lat - buffer, lon + buffer, lat + buffer)
def download_srtm_hgt(lat, lon, output_dir, timeout=60):
    """Download and decompress one 1x1 degree SRTM HGT tile (AWS "skadi" bucket).

    Args:
        lat: Integer latitude of the tile's south edge (negative = south).
        lon: Integer longitude of the tile's west edge (negative = west).
        output_dir: Directory that receives ``<tile>.hgt``; created if missing.
        timeout: Seconds to wait on the HTTP connection/read before failing.

    Returns:
        Path to the ``.hgt`` file, or None if downloading or decompression
        failed. An existing file is returned without re-downloading.
    """
    os.makedirs(output_dir, exist_ok=True)
    # Tile names encode the south-west corner, e.g. N12E077 / S05W070.
    lat_str = f"N{lat:02d}" if lat >= 0 else f"S{abs(lat):02d}"
    lon_str = f"E{lon:03d}" if lon >= 0 else f"W{abs(lon):03d}"
    tile_name = f"{lat_str}{lon_str}.hgt"
    url = f"https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_str}/{tile_name}.gz"
    output_path = os.path.join(output_dir, tile_name)
    if os.path.exists(output_path):
        return output_path
    gz_path = output_path + ".gz"
    # Decompress to a temporary name and rename only on success, so an
    # interrupted run never leaves a truncated .hgt that the exists() check
    # above would keep reusing.
    tmp_path = output_path + ".part"
    try:
        print(f"Downloading {tile_name}...")
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(gz_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Stream the decompression instead of loading the whole tile into memory.
        with gzip.open(gz_path, 'rb') as f_in, open(tmp_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(tmp_path, output_path)
        print(f"β Downloaded: {tile_name}")
        return output_path
    except Exception as e:
        print(f"β Failed to download {tile_name}: {e}")
        return None
    finally:
        # Remove intermediates on both success and failure.
        for leftover in (gz_path, tmp_path):
            if os.path.exists(leftover):
                os.remove(leftover)
def read_hgt_file(hgt_file):
    """Read an SRTM ``.hgt`` tile and return elevation data with georeferencing.

    HGT files are headerless square grids of big-endian signed 16-bit samples.
    The tile's south-west integer coordinate is taken from the file name
    (e.g. ``N12E077.hgt``, ``S05W070.hgt``).

    Args:
        hgt_file: Path to the ``.hgt`` file.

    Returns:
        ``(elevation_data, geotransform, size)``:
        ``elevation_data`` is a ``(size, size)`` big-endian int16 array with
        row 0 at the north edge. ``geotransform`` is a GDAL-style
        ``[top_left_x, pixel_w, 0, top_left_y, 0, -pixel_h]`` list describing
        the outer pixel edges. ``size`` is the number of samples per side.

    Raises:
        ValueError: If the file size is not a square grid of 16-bit samples.

    NOTE: no nodata masking is done; SRTM void samples (conventionally -32768)
    pass through unchanged.
    """
    file_size = os.path.getsize(hgt_file)
    if file_size == 1201 * 1201 * 2:  # SRTM3 (3 arc-second)
        size = 1201
    elif file_size == 3601 * 3601 * 2:  # SRTM1 (1 arc-second)
        size = 3601
    else:
        pixels = file_size // 2
        size = int(np.sqrt(pixels))
        print(f"Auto-detected size: {size}x{size}")
    if size < 2 or size * size * 2 != file_size:
        raise ValueError(
            f"{hgt_file}: {file_size} bytes is not a square grid of 16-bit samples"
        )

    # Parse the south-west corner from the name, e.g. "S05W070.hgt" -> (-5, -70).
    basename = os.path.basename(hgt_file)
    lat = int(basename[1:3])
    if basename[0].upper() != 'N':
        lat = -lat
    lon = int(basename[4:7])
    if basename[3].upper() != 'E':
        lon = -lon

    elevation_data = np.fromfile(hgt_file, dtype='>i2').reshape(size, size)

    # Samples sit on a grid whose outermost *centres* are the integer degree
    # lines, so consecutive samples are 1/(size-1) degrees apart.
    pixel_size = 1.0 / (size - 1)
    # The GDAL-style origin is the outer edge of the top-left pixel, half a
    # pixel west/north of its centre at (lon, lat + 1).
    half = pixel_size / 2.0
    geotransform = [
        lon - half,       # Top-left X (pixel edge)
        pixel_size,       # X pixel size
        0,                # X rotation
        lat + 1 + half,   # Top-left Y (pixel edge)
        0,                # Y rotation
        -pixel_size,      # Y pixel size (negative: rows run north to south)
    ]
    return elevation_data, geotransform, size
def clip_elevation_data(elevation_data, geotransform, size, bbox):
    """Cut out the window of a square elevation tile that covers *bbox*.

    Args:
        elevation_data: 2-D array indexed ``[row, col]``, row 0 at the top.
        geotransform: GDAL-style list; items 0/1 are origin X / pixel width,
            items 3/5 are origin Y / (negative) pixel height.
        size: Rows/columns in the tile; indices are clamped to ``[0, size - 1]``.
        bbox: ``(west, south, east, north)`` in the geotransform's units.

    Returns:
        ``(clipped_data, new_geotransform)``. The new geotransform keeps the
        pixel sizes and moves the origin to the first kept pixel.

    NOTE: indices are clamped rather than rejected, so a bbox that does not
    overlap the tile still yields a one-pixel-wide edge strip, never an empty
    array.
    """
    west, south, east, north = bbox
    origin_x, step_x = geotransform[0], geotransform[1]
    origin_y, step_y = geotransform[3], geotransform[5]  # step_y < 0 (north-up)
    row_step = abs(step_y)

    def clamp(index):
        return max(0, min(index, size - 1))

    col_start = clamp(int((west - origin_x) / step_x))
    row_start = clamp(int((origin_y - north) / row_step))
    col_stop = clamp(int((east - origin_x) / step_x))
    row_stop = clamp(int((origin_y - south) / row_step))

    window = elevation_data[row_start:row_stop + 1, col_start:col_stop + 1]
    shifted = [
        origin_x + col_start * step_x,
        step_x,
        0,
        origin_y + row_start * step_y,
        0,
        step_y,
    ]
    return window, shifted
def save_as_geotiff_basic(elevation_data, geotransform, output_file):
    """Save elevation data as an unsigned 16-bit TIFF plus a sidecar metadata file.

    PIL writes no georeferencing tags. The geotransform and the value offset
    go to ``<output stem>_metadata.txt`` instead. Negative data are shifted up
    by ``-min`` so every value fits in uint16; subtract the recorded
    ``Offset`` to recover the original values.

    Args:
        elevation_data: Non-empty 2-D integer array of elevations.
        geotransform: Georeferencing list, written verbatim to the metadata file.
        output_file: Destination path of the TIFF.

    Returns:
        ``(output_file, metadata_file)``.

    Raises:
        ValueError: If the shifted value range does not fit in 16 bits.
    """
    min_val = int(np.min(elevation_data))
    offset = -min_val if min_val < 0 else 0
    # Widen before shifting: adding the offset in int16 wraps around, and for
    # SRTM voids abs(int16(-32768)) itself overflows back to -32768.
    shifted = np.asarray(elevation_data, dtype=np.int64) + offset
    max_shifted = int(shifted.max())
    if max_shifted > np.iinfo(np.uint16).max:
        raise ValueError(
            f"Value range {min_val}..{max_shifted - offset} does not fit in 16 bits"
        )
    adjusted_data = shifted.astype(np.uint16)

    image = Image.fromarray(adjusted_data, mode='I;16')
    image.save(output_file)

    # Derive the sidecar name from the stem only. str.replace('.tif', ...) also
    # rewrote directory names and, for paths without '.tif', produced the TIFF
    # path itself, so the metadata overwrote the image.
    metadata_file = os.path.splitext(output_file)[0] + '_metadata.txt'
    with open(metadata_file, 'w') as f:
        f.write(f"GeoTransform: {geotransform}\n")
        f.write(f"Offset: {offset}\n")
        f.write(f"Original min value: {min_val}\n")
        f.write(f"Size: {adjusted_data.shape}\n")
    return output_file, metadata_file
def get_dem_elevation_tif(state: State) -> State:
    """Download SRTM tiles covering ``state.bbox`` and save a clipped DEM GeoTIFF.

    Tiles are cached in ``<working_directory>/dem_files/dem_tiles``. The DEM is
    written to ``<working_directory>/dem_files/<place>_dem.tif``.

    Args:
        state: Workflow state; requires ``bbox`` as ``(west, south, east,
            north)`` in degrees, and ``place_name``.

    Returns:
        The same state object. On success ``status`` is ``"dem_downloaded"``
        and a ``"dem"`` entry is appended to ``output_files``. On failure
        ``status`` is ``"error"`` and the reason is appended to ``error_log``.
    """
    try:
        state.status = "downloading_dem"
        # Validate required fields
        if not state.bbox:
            state.error_log.append("Bounding box is required for DEM download")
            state.status = "error"
            return state
        if not state.place_name:
            state.error_log.append("Place name is required for DEM download")
            state.status = "error"
            return state

        # Create working & sub-directories (parents=True creates all levels).
        working_dir = Path(state.working_directory)
        dem_files_dir = working_dir / "dem_files"
        dem_tiles_dir = dem_files_dir / "dem_tiles"
        dem_tiles_dir.mkdir(parents=True, exist_ok=True)
        state.parameters["dem_directory"] = str(dem_files_dir.resolve())

        west, south, east, north = state.bbox
        place_safe = state.place_name.replace(" ", "_").replace(",", "").replace(".", "")
        output_file = dem_files_dir / f"{place_safe}_dem.tif"
        print(f"π Starting DEM download for {state.place_name}...")
        print(f"π Bounding box: {state.bbox}")
        print(f"π Output directory: {dem_files_dir}")

        # SRTM tiles are named after their south-west corner, i.e. floor() of the
        # coordinate. int() truncates toward zero, which picks the wrong tile for
        # southern/western coordinates (e.g. lat -12.3 lies in tile S13, not S12).
        lat_range = range(int(np.floor(south)), int(np.floor(north)) + 1)
        lon_range = range(int(np.floor(west)), int(np.floor(east)) + 1)

        all_elevation_data = []
        all_geotransforms = []
        downloaded_tiles = []
        for lat in lat_range:
            for lon in lon_range:
                hgt_file = download_srtm_hgt(lat, lon, str(dem_tiles_dir))
                if not hgt_file:
                    continue
                try:
                    elevation_data, geotransform, size = read_hgt_file(hgt_file)
                    clipped_data, clipped_geotransform = clip_elevation_data(
                        elevation_data, geotransform, size, state.bbox
                    )
                    all_elevation_data.append(clipped_data)
                    all_geotransforms.append(clipped_geotransform)
                    downloaded_tiles.append(os.path.basename(hgt_file))
                    print(f"β Processed {os.path.basename(hgt_file)}: {clipped_data.shape}")
                except Exception as e:
                    err = f"Error processing {hgt_file}: {e}"
                    state.error_log.append(err)
                    print(f"β {err}")

        if not all_elevation_data:
            state.error_log.append("No elevation data processed successfully")
            state.status = "error"
            return state

        print(f"\nπ Processing {len(all_elevation_data)} elevation tiles...")
        if len(all_elevation_data) > 1:
            # NOTE: tiles are visited south-to-north, west-to-east, so only the
            # clip of the south-west-most tile is kept below.
            print("β οΈ Multiple tiles detected. Using first tile only (mosaicking not implemented).")
        final_data = all_elevation_data[0]
        final_geotransform = all_geotransforms[0]

        tif_file, metadata_file = save_as_geotiff_basic(
            final_data, final_geotransform, str(output_file)
        )
        # Update DEM with proper CRS
        update_dem(tif_file, state)

        # Stats come from the raw clip, before any uint16 offset applied on save.
        # SRTM voids, if present, are included.
        min_elev = float(np.min(final_data))
        max_elev = float(np.max(final_data))
        mean_elev = float(np.mean(final_data))
        shape = final_data.shape

        state.output_files.append({
            "type": "dem",
            "format": "geotiff",
            "file_path": str(tif_file),
            "metadata_file": str(metadata_file),
            "min_elevation": min_elev,
            "max_elevation": max_elev,
            "mean_elevation": mean_elev,
            "data_shape": shape,
            "downloaded_tiles": downloaded_tiles,
            "bbox": state.bbox,
            "geotransform": final_geotransform
        })
        state.status = "dem_downloaded"
        print(f"\nπ― Success! DEM saved to: {tif_file}")
        print(f"π Elevation stats: Min={min_elev}, Max={max_elev}, Mean={mean_elev:.1f} m")
        print(f"π Data size: {shape}")
        return state
    except Exception as e:
        state.error_log.append(f"Unhandled error during DEM download: {e}")
        state.status = "error"
        print(f"β {e}")
        return state
def update_dem(filepath, state, geotransform=None):
    """Rewrite a DEM TIFF in place with an EPSG:4326 CRS and an affine transform.

    Args:
        filepath: TIFF to update; it is read fully, then overwritten.
        state: Workflow state. ``state.bbox`` ``(west, south, east, north)``
            is used when *geotransform* is not given.
        geotransform: Optional GDAL-style ``[x0, px, 0, y0, 0, py]`` list with
            ``py`` negative. When given, it defines the transform exactly.

    Without *geotransform*, the raster's own width/height are stretched over
    ``state.bbox``, so the pixel size follows the source data. The former
    fixed 0.0008333-degree pixel is 3 arc-seconds (~90 m, not ~30 m). It
    mis-scaled rasters cut from 1 arc-second (3601x3601) tiles by a factor of 3.
    """
    crs = "EPSG:4326"  # WGS84 Latitude/Longitude
    with rasterio.open(filepath) as src:
        profile = src.profile
        data = src.read(1)

    if geotransform is not None:
        # Rotation terms (items 2 and 4) are assumed to be zero.
        transform = from_origin(geotransform[0], geotransform[3], geotransform[1], -geotransform[5])
    else:
        west, south, east, north = state.bbox
        transform = from_bounds(west, south, east, north, profile['width'], profile['height'])

    profile.update({
        'crs': crs,
        'transform': transform
    })
    with rasterio.open(filepath, 'w', **profile) as dst:
        dst.write(data, 1)
def download_chirps_tif(date: datetime, out_dir, timeout=60):
    """Download and decompress one CHIRPS v2.0 global daily precipitation GeoTIFF.

    Uses the ``p25`` product directory (presumably 0.25-degree resolution).

    Args:
        date: Day to fetch.
        out_dir: Destination directory; created if missing.
        timeout: Seconds to wait on the HTTP connection/read.

    Returns:
        Path to ``chirps-v2.0.YYYY.MM.DD.tif``, or None if the server did not
        answer 200. An existing file is reused. Network errors propagate.
    """
    y, m, d = date.strftime("%Y"), date.strftime("%m"), date.strftime("%d")
    filename = f"chirps-v2.0.{y}.{m}.{d}.tif"
    url = f"https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/tifs/p25/{y}/{filename}.gz"
    gz_path = os.path.join(out_dir, filename + ".gz")
    tif_path = os.path.join(out_dir, filename)
    if os.path.exists(tif_path):
        print(f"β Already downloaded: {filename}")
        return tif_path
    os.makedirs(out_dir, exist_ok=True)
    # Extract to a temporary name and rename on success, so a failed run never
    # leaves a partial .tif that the exists() check above would accept.
    tmp_path = tif_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            if r.status_code != 200:
                print(f"β Failed: {url}")
                return None
            with open(gz_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        with gzip.open(gz_path, "rb") as f_in, open(tmp_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(tmp_path, tif_path)
    finally:
        for leftover in (gz_path, tmp_path):
            if os.path.exists(leftover):
                os.remove(leftover)
    print(f"β Downloaded and extracted: {tif_path}")
    return tif_path
def batch_download_chirps(start_date: str, end_date: str, out_dir):
    """Download CHIRPS daily GeoTIFFs for every day in ``[start_date, end_date]``.

    Dates are ``YYYY-MM-DD`` strings. Days after (today UTC - 3 days) are
    skipped with a message; that 3-day margin is this function's assumed
    publication lag.

    Returns:
        List of file paths obtained, in date order. Days that failed or were
        skipped are omitted. The previous version returned None.
    """
    from datetime import timezone  # local: only timezone is needed here

    day = datetime.strptime(start_date, "%Y-%m-%d")
    last = datetime.strptime(end_date, "%Y-%m-%d")
    # datetime.utcnow() is deprecated since Python 3.12; use an aware "now".
    max_available = datetime.now(timezone.utc).date() - timedelta(days=3)
    paths = []
    while day <= last:
        if day.date() > max_available:
            print(f"β οΈ Skipping future/unavailable date: {day.strftime('%Y-%m-%d')}")
        else:
            path = download_chirps_tif(day, out_dir)
            if path:
                paths.append(path)
        day += timedelta(days=1)
    return paths
def get_rainfall_data(state: State) -> State:
    """Download a week of CHIRPS rainfall from the same dates one year ago.

    The window is [today - 1 year - 7 days, today - 1 year]. Files go to
    ``<working_directory>/rainfall_data``.

    Returns:
        The same state. On success ``status`` is ``"rainfall_downloaded"`` and a
        ``"rainfall"`` entry lists the GeoTIFFs present for that date window.
        On failure ``status`` is ``"error"`` and the reason is in ``error_log``.
    """
    try:
        state.status = "downloading_rainfall"
        # Create rainfall data directory
        working_dir = Path(state.working_directory)
        rainfall_dir = working_dir / "rainfall_data"
        rainfall_dir.mkdir(parents=True, exist_ok=True)
        print("π§οΈ Fetching rainfall data from same timeframe last year...")

        today = datetime.today()
        end_dt = today - relativedelta(years=1)
        start_dt = end_dt - timedelta(days=7)
        start_date = start_dt.strftime('%Y-%m-%d')
        end_date = end_dt.strftime('%Y-%m-%d')
        print(f"π Start Date: {start_date}")
        print(f"π End Date: {end_date}")
        batch_download_chirps(start_date, end_date, str(rainfall_dir))

        # Report only files for the requested window. A bare *.tif glob also
        # counted GeoTIFFs left by earlier runs for other dates. Names follow
        # download_chirps_tif's "chirps-v2.0.YYYY.MM.DD.tif" pattern.
        downloaded_files = []
        day = start_dt
        while day.date() <= end_dt.date():
            candidate = rainfall_dir / f"chirps-v2.0.{day:%Y.%m.%d}.tif"
            if candidate.exists():
                downloaded_files.append(candidate)
            day += timedelta(days=1)

        state.output_files.append({
            "type": "rainfall",
            "format": "geotiff",
            "directory": str(rainfall_dir),
            "file_count": len(downloaded_files),
            "date_range": f"{start_date} to {end_date}",
            "files": [str(f) for f in downloaded_files]
        })
        state.status = "rainfall_downloaded"
        print(f"β Downloaded {len(downloaded_files)} rainfall files to {rainfall_dir}")
        return state
    except Exception as e:
        state.error_log.append(f"Error downloading rainfall data: {e}")
        state.status = "error"
        print(f"β {e}")
        return state
def run_hydrology_generator(state: State) -> State:
    """Run the WhiteboxTools terrain/hydrology chain on the DEM recorded in *state*.

    The chain writes slope, aspect, a depression-filled DEM, D8 flow
    direction, D8 flow accumulation (in cells) and a stream raster to
    ``<working_directory>/hydrology_outputs``. The stream threshold, in
    accumulated cells, comes from ``state.parameters["stream_threshold"]``
    and defaults to 100.

    Returns:
        The same state. On success ``status`` is ``"hydrology_completed"`` and a
        ``"hydrology"`` entry is appended to ``output_files``. Otherwise
        ``status`` is ``"error"`` and the reason is in ``error_log``.
    """
    try:
        state.status = "running_hydrology"
        # Use the first DEM recorded by an earlier step.
        dem_file = next(
            (out.get("file_path") for out in state.output_files if out.get("type") == "dem"),
            None,
        )
        if not dem_file:
            state.error_log.append("No DEM file found in state for hydrology analysis")
            state.status = "error"
            return state

        working_dir = Path(state.working_directory)
        hydrology_dir = working_dir / "hydrology_outputs"
        hydrology_dir.mkdir(parents=True, exist_ok=True)

        dem_path = Path(dem_file)
        if not dem_path.exists():
            state.error_log.append(f"DEM file not found at {dem_path}")
            state.status = "error"
            return state

        filled_dem = hydrology_dir / "dem_filled.tif"
        flow_pointer = hydrology_dir / "flow_dir.tif"
        flow_accum = hydrology_dir / "flow_acc.tif"
        stream_raster = hydrology_dir / "streams.tif"
        slope_path = hydrology_dir / "slope.tif"
        aspect_path = hydrology_dir / "aspect.tif"
        threshold = state.parameters.get("stream_threshold", 100)

        print(f"π Hydrology output directory: {hydrology_dir}")
        print("π Starting hydrological analysis...")

        # (progress message, output raster, failure message, tool call), in run
        # order: later steps consume earlier outputs.
        steps = [
            ("π Generating Slope...", slope_path, "Slope file not generated",
             lambda: wbt.slope(dem=str(dem_path), output=str(slope_path), zfactor=1.0)),
            ("π§ Generating Aspect...", aspect_path, "Aspect file not generated",
             lambda: wbt.aspect(dem=str(dem_path), output=str(aspect_path))),
            ("π₯ Running Fill Depressions...", filled_dem, "Filled DEM not generated",
             lambda: wbt.fill_depressions(dem=str(dem_path), output=str(filled_dem))),
            ("π Calculating Flow Direction...", flow_pointer, "Flow direction file not generated",
             lambda: wbt.d8_pointer(dem=str(filled_dem), output=str(flow_pointer))),
            ("π Flow Accumulation...", flow_accum, "Flow accumulation file not generated",
             lambda: wbt.d8_flow_accumulation(i=str(filled_dem), output=str(flow_accum), out_type="cells")),
            ("π§΅ Extracting Streams...", stream_raster, "Stream raster not generated",
             lambda: wbt.extract_streams(flow_accum=str(flow_accum), output=str(stream_raster), threshold=threshold)),
        ]
        for message, out_path, failure, run_tool in steps:
            print(message)
            # Delete any output left by a previous run so the existence check
            # below proves *this* run produced the file.
            out_path.unlink(missing_ok=True)
            run_tool()
            if not out_path.exists():
                raise Exception(failure)

        hydrology_outputs = {
            "filled_dem": str(filled_dem),
            "flow_dir": str(flow_pointer),
            "flow_acc": str(flow_accum),
            "streams": str(stream_raster),
            "slope": str(slope_path),
            "aspect": str(aspect_path)
        }
        state.output_files.append({
            "type": "hydrology",
            "format": "geotiff",
            "directory": str(hydrology_dir),
            "outputs": hydrology_outputs,
            "stream_threshold": threshold
        })
        state.status = "hydrology_completed"
        print("β All hydrological outputs generated successfully.")
        return state
    except Exception as e:
        state.error_log.append(f"Error in hydrology analysis: {e}")
        state.status = "error"
        print(f"β {e}")
        return state
def tidal_risk_from_osm(state: State) -> State:
    """Build a tidal/flood risk zone by buffering OSM coastlines and water bodies.

    Features tagged ``natural=coastline`` and ``natural=water`` for
    ``state.place_name`` are buffered by
    ``state.parameters["tidal_buffer_m"]`` metres (default 1000). The buffers
    are dissolved and written in EPSG:4326 to
    ``<working_directory>/tidal_risk/<place>_tidal_risk.geojson``.

    Either layer may be missing; an inland place has no coastline. The step
    fails only when neither layer yields features.

    Returns:
        The same state. On success ``status`` is ``"tidal_risk_completed"`` with
        a ``"tidal_risk"`` output entry. Otherwise ``status`` is ``"error"``.
    """
    try:
        state.status = "generating_tidal_risk"
        if not state.place_name:
            state.error_log.append("Place name is required for tidal risk analysis")
            state.status = "error"
            return state

        working_dir = Path(state.working_directory)
        tidal_dir = working_dir / "tidal_risk"
        tidal_dir.mkdir(parents=True, exist_ok=True)

        place = state.place_name
        buffer_dist = state.parameters.get("tidal_buffer_m", 1000)  # meters
        output_geojson = tidal_dir / f"{place.replace(' ', '_').replace(',', '')}_tidal_risk.geojson"
        print(f"π Fetching OSM water + coastline for {place}")

        layers = []
        for label, tags in (("coastline", {"natural": "coastline"}), ("water", {"natural": "water"})):
            try:
                layer = ox.features_from_place(place, tags=tags)
            except Exception as e:
                # osmnx raises when a query matches nothing (e.g. no coastline
                # inland). Record it and continue with the other layer.
                state.error_log.append(f"Could not fetch OSM {label} features for {place}: {e}")
                continue
            if not layer.empty:
                layers.append(layer.to_crs("EPSG:4326"))
        if not layers:
            state.error_log.append(f"No coastline or water features found for {place}")
            state.status = "error"
            return state

        combined = gpd.GeoDataFrame(pd.concat(layers, ignore_index=True), crs="EPSG:4326")
        print(f"π§± Found {len(combined)} features. Buffering...")
        # Buffer in a local UTM zone. Web Mercator (EPSG:3857) units are
        # stretched by 1/cos(latitude), so a "1000 m" buffer there is much
        # smaller on the ground away from the equator.
        metric_crs = combined.estimate_utm_crs()
        risk_zone = combined.to_crs(metric_crs).buffer(buffer_dist)
        risk_gdf = gpd.GeoDataFrame(geometry=risk_zone, crs=metric_crs).dissolve()
        risk_gdf = risk_gdf.to_crs("EPSG:4326")

        risk_gdf.to_file(output_geojson, driver="GeoJSON")
        state.output_files.append({
            "type": "tidal_risk",
            "format": "geojson",
            "file_path": str(output_geojson),
            "buffer_distance": buffer_dist,
            "feature_count": len(combined)
        })
        state.status = "tidal_risk_completed"
        print(f"β Saved Tidal Risk GeoJSON: {output_geojson}")
        return state
    except Exception as e:
        state.error_log.append(f"Error generating tidal risk: {e}")
        state.status = "error"
        print(f"β {e}")
        return state
def get_healthcare_data(bbox, tags):
    """Fetch OSM features matching *tags* inside *bbox* and reduce each to a point.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in WGS84 degrees.
        tags: osmnx tag filter, e.g. ``{"amenity": ["hospital", "clinic"]}``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry is each feature's centroid.
    """
    minx, miny, maxx, maxy = bbox
    polygon = box(minx, miny, maxx, maxy)
    gdf = ox.features_from_polygon(polygon, tags=tags)
    gdf = gdf.to_crs("EPSG:4326")
    if gdf.empty:
        return gdf
    # Centroids computed on lon/lat coordinates are distorted (geopandas warns).
    # Compute them in a local metric CRS, then convert back to WGS84.
    metric_crs = gdf.estimate_utm_crs()
    gdf["geometry"] = gdf.to_crs(metric_crs).centroid.to_crs("EPSG:4326")
    return gdf
def rasterize_healthcare_points(bbox, points_gdf, pixel_size=0.0005):
    """Burn point locations into a north-up uint8 presence mask over *bbox*.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        points_gdf: GeoDataFrame whose geometries are points (``.x``/``.y``).
        pixel_size: Target cell size in degrees. The grid is stretched
            slightly so an integer number of cells spans *bbox* exactly.

    Returns:
        ``(raster, transform)``. ``raster[row, col]`` is 1 where a point
        falls; points outside *bbox* are ignored.
    """
    minx, miny, maxx, maxy = bbox
    # At least one cell per axis: a bbox narrower than pixel_size would
    # otherwise give a zero dimension and a division by zero in from_bounds.
    width = max(1, int((maxx - minx) / pixel_size))
    height = max(1, int((maxy - miny) / pixel_size))
    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    raster = np.zeros((height, width), dtype=np.uint8)
    inverse = ~transform  # world -> fractional (col, row); loop-invariant
    for point in points_gdf.geometry:
        col, row = inverse * (point.x, point.y)
        # floor, not int(): int() truncates toward zero, so points up to one
        # cell west/north of the bbox landed in column/row 0.
        col, row = int(np.floor(col)), int(np.floor(row))
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1
    return raster, transform
def compute_distance_transform(binary_raster, pixel_size_deg, latitude=None):
    """Approximate distance in metres from each cell to the nearest non-zero cell.

    Args:
        binary_raster: 2-D array; non-zero cells are targets (distance 0).
        pixel_size_deg: Cell edge length in degrees, the same on both axes.
        latitude: Optional representative latitude of the raster, in degrees.
            When given, the east-west cell size is scaled by cos(latitude), so
            distances are not overestimated away from the equator. When
            omitted, both axes use 111 km per degree, as before.

    Returns:
        Float array of the input's shape with distances in metres.

    NOTE(review): with no non-zero cells there is nothing to measure to, and
    the result is not meaningful. Confirm callers guard against that.
    """
    binary_mask = (binary_raster == 0).astype(np.uint8)
    metres_per_cell = 111000 * pixel_size_deg
    if latitude is None:
        distance_pixels = distance_transform_edt(binary_mask)
        return distance_pixels * metres_per_cell
    # Anisotropic cells: rows span full degrees of latitude; columns shrink
    # with cos(latitude).
    col_metres = metres_per_cell * np.cos(np.radians(latitude))
    return distance_transform_edt(binary_mask, sampling=(metres_per_cell, col_metres))
def save_distance_raster(distance_raster, transform, output_path, crs="EPSG:4326"):
    """Write a 2-D array to *output_path* as a single-band GeoTIFF.

    The band dtype follows ``distance_raster.dtype``. *transform* and *crs*
    (any string accepted by ``CRS.from_string``) give the georeferencing.
    """
    n_rows = distance_raster.shape[0]
    n_cols = distance_raster.shape[1]
    profile = dict(
        driver="GTiff",
        height=n_rows,
        width=n_cols,
        count=1,
        dtype=distance_raster.dtype,
        crs=CRS.from_string(crs),
        transform=transform,
    )
    with rasterio.open(output_path, "w", **profile) as dataset:
        dataset.write(distance_raster, 1)
def generate_distance_to_healthcare(state: State) -> State:
    """Save a raster of approximate distance to the nearest OSM healthcare facility.

    Hospitals, clinics, doctors and pharmacies inside ``state.bbox`` are
    reduced to points and burned into a 0.0005-degree grid. The grid is
    converted to distance in metres and written to
    ``<working_directory>/healthcare_analysis/distance_to_healthcare.tif``.

    Returns:
        The same state. On success ``status`` is
        ``"healthcare_distance_completed"`` with a ``"healthcare_distance"``
        output entry. Otherwise ``status`` is ``"error"`` and the reason is in
        ``error_log``.
    """
    try:
        state.status = "generating_healthcare_distance"
        # Fail with a clear message instead of an unpacking TypeError further down.
        if not state.bbox:
            state.error_log.append("Bounding box is required for healthcare distance analysis")
            state.status = "error"
            return state

        working_dir = Path(state.working_directory)
        healthcare_dir = working_dir / "healthcare_analysis"
        healthcare_dir.mkdir(parents=True, exist_ok=True)
        output_path = healthcare_dir / "distance_to_healthcare.tif"

        print("π Fetching healthcare data from OpenStreetMap...")
        tags = {"amenity": ["hospital", "clinic", "doctors", "pharmacy"]}
        healthcare_gdf = get_healthcare_data(state.bbox, tags)

        print(f"πΊ Rasterizing {len(healthcare_gdf)} healthcare points...")
        pixel_size = 0.0005
        binary_raster, transform = rasterize_healthcare_points(state.bbox, healthcare_gdf, pixel_size)

        print("π Computing distance transform...")
        distance_raster = compute_distance_transform(binary_raster, pixel_size)

        print(f"πΎ Saving to {output_path}...")
        save_distance_raster(distance_raster, transform, str(output_path))

        state.output_files.append({
            "type": "healthcare_distance",
            "format": "geotiff",
            "file_path": str(output_path),
            "healthcare_count": len(healthcare_gdf),
            "pixel_size": pixel_size
        })
        state.status = "healthcare_distance_completed"
        print("β Done! Distance raster generated.")
        return state
    except Exception as e:
        state.error_log.append(f"Error generating healthcare distance: {e}")
        state.status = "error"
        print(f"β {e}")
        return state
| import osmnx as ox | |
| import geopandas as gpd | |
| from shapely.geometry import box | |
| import numpy as np | |
| import rasterio | |
| from rasterio.transform import from_bounds | |
| from rasterio.crs import CRS | |
| from scipy.ndimage import distance_transform_edt | |
def get_infrastructure_gdf(bbox, tags):
    """Fetch OSM infrastructure features inside *bbox* and reduce each to a point.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in WGS84 degrees.
        tags: osmnx tag filter, e.g. ``{"highway": True}``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry is each feature's centroid.

    NOTE: this assigns ``ox.settings.overpass_endpoint`` and
    ``ox.settings.timeout`` on the shared osmnx settings module, so the
    values persist for later osmnx calls in the process.
    NOTE(review): newer osmnx releases renamed these settings
    (``overpass_url`` / ``requests_timeout``). Confirm they take effect with
    the installed version.
    """
    ox.settings.overpass_endpoint = "https://overpass.kumi.systems/api/interpreter"
    ox.settings.timeout = 60
    polygon = box(*bbox)
    gdf = ox.features_from_polygon(polygon, tags=tags)
    gdf = gdf.to_crs("EPSG:4326")
    if gdf.empty:
        return gdf
    # Centroids on lon/lat coordinates are distorted; compute them in a local
    # metric CRS and convert back to WGS84.
    metric_crs = gdf.estimate_utm_crs()
    gdf["geometry"] = gdf.to_crs(metric_crs).centroid.to_crs("EPSG:4326")
    return gdf
def rasterize_points(gdf, bbox, pixel_size=0.0005):
    """Burn point geometries into a north-up uint8 presence mask over *bbox*.

    Args:
        gdf: GeoDataFrame whose geometries are points (``.x``/``.y``).
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        pixel_size: Target cell size in degrees. The grid is stretched
            slightly so an integer number of cells spans *bbox* exactly.

    Returns:
        ``(raster, transform)``. ``raster[row, col]`` is 1 where a point
        falls; points outside *bbox* are ignored.
    """
    minx, miny, maxx, maxy = bbox
    # At least one cell per axis, so tiny bboxes do not divide by zero in from_bounds.
    width = max(1, int((maxx - minx) / pixel_size))
    height = max(1, int((maxy - miny) / pixel_size))
    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    raster = np.zeros((height, width), dtype=np.uint8)
    inverse = ~transform  # world -> fractional (col, row); loop-invariant
    for point in gdf.geometry:
        col, row = inverse * (point.x, point.y)
        # floor, not int(): truncation toward zero pulled points just outside
        # the west/north edge into column/row 0.
        col, row = int(np.floor(col)), int(np.floor(row))
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1
    return raster, transform
def save_raster(raster, transform, output_path, crs="EPSG:4326"):
    """Persist a 2-D array as a one-band GeoTIFF using *transform* and *crs*.

    The band dtype matches ``raster.dtype``.
    """
    height, width = raster.shape[0], raster.shape[1]
    band_dtype = raster.dtype
    spatial_ref = CRS.from_string(crs)
    with rasterio.open(
        output_path, "w",
        driver="GTiff", height=height, width=width, count=1,
        dtype=band_dtype, crs=spatial_ref, transform=transform,
    ) as sink:
        sink.write(raster, 1)
def generate_infrastructure_tif(bbox, output_path="infrastructure.tif", pixel_size=0.0005, distance=False):
    """Generate a binary or distance-based infrastructure raster.

    OSM features tagged highway/building/bridge/railway inside *bbox* are
    reduced to centroids. The centroids are burned into a uint8 presence mask
    with *pixel_size*-degree cells. With ``distance=True`` the mask is replaced
    by the approximate distance in metres to the nearest marked cell, using
    111 km per degree on both axes.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        output_path: Destination GeoTIFF; missing parent directories are created.
        pixel_size: Cell size in degrees.
        distance: Write distances instead of the binary mask.

    Returns:
        *output_path*. The previous version returned None.
    """
    tags = {
        "highway": True,
        "building": True,
        "bridge": True,
        "railway": True
    }
    print("π Fetching infrastructure data...")
    gdf = get_infrastructure_gdf(bbox, tags)
    print(f"πΊ Rasterizing {len(gdf)} points...")
    raster, transform = rasterize_points(gdf, bbox, pixel_size)
    if distance:
        print("π Computing distance transform...")
        mask = (raster == 0).astype(np.uint8)
        raster = distance_transform_edt(mask) * (111000 * pixel_size)  # meters
    # Allow nested destinations (e.g. <working_dir>/infrastructure/...).
    parent = os.path.dirname(str(output_path))
    if parent:
        os.makedirs(parent, exist_ok=True)
    print(f"πΎ Saving raster to {output_path}...")
    save_raster(raster, transform, output_path)
    print("β Done.")
    return output_path
def get_infrastructure(state: State) -> State:
    """Rasterize OSM infrastructure for ``state.bbox`` into the working directory.

    Writes ``<working_directory>/infrastructure/infrastructure.tif``, records
    it in ``state.output_files`` and returns the state, like the other
    pipeline steps. The previous version wrote to the current directory and
    returned None, which dropped the state for any caller that chains steps.

    Returns:
        The same state. On success ``status`` is ``"infrastructure_completed"``;
        on failure ``status`` is ``"error"`` and the reason is in ``error_log``.
    """
    try:
        state.status = "generating_infrastructure"
        if not state.bbox:
            state.error_log.append("Bounding box is required for infrastructure analysis")
            state.status = "error"
            return state
        infra_dir = Path(state.working_directory) / "infrastructure"
        infra_dir.mkdir(parents=True, exist_ok=True)
        output_path = infra_dir / "infrastructure.tif"
        generate_infrastructure_tif(state.bbox, output_path=str(output_path))
        state.output_files.append({
            "type": "infrastructure",
            "format": "geotiff",
            "file_path": str(output_path)
        })
        state.status = "infrastructure_completed"
        return state
    except Exception as e:
        state.error_log.append(f"Error generating infrastructure raster: {e}")
        state.status = "error"
        print(f"Error: {e}")
        return state
# Prompt text instructing an LLM to decompose a geospatial query into ordered,
# self-contained tasks, each mapped to one of the listed tools.
# NOTE(review): several advertised tool names do not match functions in this
# file ("run_hydrolysis_tool" vs run_hydrology_generator, "OSM_retriever" vs
# get_infrastructure, "visualize_geospatial_file" has no definition here).
# Confirm against whatever registry maps these names to callables.
# NOTE(review): "ποΈ" before "Input Example" looks like a mis-encoded emoji.
section_breakdown_template = '''You are a Geospatial AI Agent with expertise in environmental modeling, GIS, and spatial data processing.
Your job is to break down high-level geospatial analysis queries into a structured list of tasks, with clear descriptions and suggested tools (if any).
Each task must be:
Self-contained and descriptive
Ordered for execution
Mapped to an appropriate tool (if known)
Ready to be passed to a task executor agent
ποΈ Input Example
User Goal:
Analyze flood vulnerability for Chennai using DEM, rainfall, and infrastructure data.
Tools available:
get_dem_elevation_tif: Downloads DEM data which is base for any task
run_hydrolysis_tool: Computes slope, flow direction, stream network
get_rainfall_data: Retrieves rainfall from satellite or IMD data
OSM_retriever: Downloads Infrastructure,road data for the given place
visualize_geospatial_file: Creates maps from raster/vector layers
llm: Used for reasoning, summarization, or decision-making
'''