"""Geospatial data retrieval helpers.

Covers SRTM elevation tiles, CHIRPS daily rainfall, WhiteboxTools hydrology
derivatives, OpenStreetMap infrastructure layers and quick-look plotting.
"""

import gzip
import math
import os
import shutil
import struct
from datetime import datetime, timedelta, timezone
from pathlib import Path

import contextily as ctx
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import osmnx as ox
import pandas as pd
import rasterio
import requests
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv
from geopy.geocoders import Nominatim
from PIL import Image
from rasterio.crs import CRS
from rasterio.plot import show
from rasterio.transform import from_bounds, from_origin
from scipy.ndimage import distance_transform_edt
from shapely.geometry import box
from tqdm import tqdm
from whitebox import WhiteboxTools

from state import State

# Seconds to wait on any HTTP request before giving up; without a timeout a
# stalled connection would block the pipeline forever.
_HTTP_TIMEOUT = 60

# Rough length of one degree in metres, used to convert pixel distances.
_METERS_PER_DEGREE = 111000

geolocator = Nominatim(user_agent="lulc-retriever")

load_dotenv()

wbt = WhiteboxTools()
wbt.set_verbose_mode(True)
wbt.set_compress_rasters(False)


def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def get_bbox(place, buffer=0.1):
    """Return a bounding box around a geocoded place name.

    Args:
        place: Free-text place name passed to the Nominatim geocoder.
        buffer: Half-width of the box in degrees (0.1 is roughly 10 km).

    Returns:
        Tuple ``(west, south, east, north)`` in degrees.

    Raises:
        ValueError: If the place cannot be geocoded.
    """
    location = geolocator.geocode(place)
    if location is None:
        raise ValueError(f"Could not geocode location: {place}")

    lat, lon = location.latitude, location.longitude
    return (lon - buffer, lat - buffer, lon + buffer, lat + buffer)


def download_srtm_hgt(lat, lon, output_dir="dem_tiles"):
    """Download and decompress one SRTM ``.hgt`` tile.

    Args:
        lat: Integer latitude of the tile's south-west corner.
        lon: Integer longitude of the tile's south-west corner.
        output_dir: Directory where the tile is cached.

    Returns:
        Path to the ``.hgt`` file, or ``None`` if the download failed
        (failures are reported on stdout, not raised).
    """
    os.makedirs(output_dir, exist_ok=True)

    # Tile names follow the <N|S>dd<E|W>ddd convention.
    lat_str = f"N{lat:02d}" if lat >= 0 else f"S{abs(lat):02d}"
    lon_str = f"E{lon:03d}" if lon >= 0 else f"W{abs(lon):03d}"
    tile_name = f"{lat_str}{lon_str}.hgt"

    url = f"https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_str}/{tile_name}.gz"
    output_path = os.path.join(output_dir, tile_name)

    if os.path.exists(output_path):
        return output_path

    gz_path = output_path + ".gz"
    try:
        print(f"Downloading {tile_name}...")
        with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT) as response:
            response.raise_for_status()
            with open(gz_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Stream the decompression so a full tile is never held in memory.
        with gzip.open(gz_path, "rb") as f_in, open(output_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)

        print(f"โœ… Downloaded: {tile_name}")
        return output_path
    except Exception as e:
        # A half-written tile would be mistaken for a cached one next time.
        _remove_if_exists(output_path)
        print(f"โŒ Failed to download {tile_name}: {e}")
        return None
    finally:
        _remove_if_exists(gz_path)


def read_hgt_file(hgt_file):
    """Read an SRTM ``.hgt`` file into an array with georeferencing.

    The tile's south-west corner is parsed from the file name
    (for example ``N13E080.hgt``).

    Args:
        hgt_file: Path to the uncompressed ``.hgt`` file.

    Returns:
        Tuple ``(elevation_data, geotransform, size)`` where
        ``elevation_data`` is a ``size x size`` big-endian int16 array and
        ``geotransform`` is the six-element list
        ``[x_origin, x_pixel, 0, y_origin, 0, -y_pixel]``.

    Raises:
        ValueError: If the file size does not describe a square grid.
    """
    file_size = os.path.getsize(hgt_file)

    if file_size == 1201 * 1201 * 2:  # SRTM3 (3 arc-second)
        size = 1201
    elif file_size == 3601 * 3601 * 2:  # SRTM1 (1 arc-second)
        size = 3601
    else:
        # Unknown variant: accept it only if it is a square grid of int16.
        size = math.isqrt(file_size // 2)
        if size < 2 or size * size * 2 != file_size:
            raise ValueError(
                f"Unexpected HGT file size {file_size} bytes for {hgt_file}"
            )
        print(f"Auto-detected size: {size}x{size}")

    basename = os.path.basename(hgt_file)
    lat_str = basename[:3]
    lon_str = basename[3:7]

    lat = int(lat_str[1:]) if lat_str.startswith('N') else -int(lat_str[1:])
    lon = int(lon_str[1:]) if lon_str.startswith('E') else -int(lon_str[1:])

    with open(hgt_file, 'rb') as f:
        data = f.read()

    # Samples are big-endian signed 16-bit integers.
    elevation_data = np.frombuffer(data, dtype='>i2').reshape(size, size)

    # A tile spans one degree with samples on both edges.
    pixel_size = 1.0 / (size - 1)

    geotransform = [
        lon,          # Top-left X
        pixel_size,   # X pixel size
        0,            # X rotation
        lat + 1,      # Top-left Y
        0,            # Y rotation
        -pixel_size,  # Y pixel size (negative because Y decreases)
    ]

    return elevation_data, geotransform, size


def clip_elevation_data(elevation_data, geotransform, size, bbox):
    """Clip a tile to a bounding box.

    Pixel indices are clamped to the tile, so a box reaching beyond the tile
    is cut at the tile edge.

    Args:
        elevation_data: 2-D elevation array of the tile.
        geotransform: Six-element geotransform of the tile.
        size: Number of rows/columns of the tile.
        bbox: ``(west, south, east, north)`` in degrees.

    Returns:
        Tuple ``(clipped_data, new_geotransform)``.
    """
    west, south, east, north = bbox

    top_left_x = geotransform[0]
    top_left_y = geotransform[3]
    pixel_size_x = geotransform[1]
    pixel_size_y = geotransform[5]  # negative

    def clamp(index):
        return max(0, min(index, size - 1))

    x1 = clamp(int((west - top_left_x) / pixel_size_x))
    y1 = clamp(int((top_left_y - north) / abs(pixel_size_y)))
    x2 = clamp(int((east - top_left_x) / pixel_size_x))
    y2 = clamp(int((top_left_y - south) / abs(pixel_size_y)))

    clipped_data = elevation_data[y1:y2 + 1, x1:x2 + 1]

    new_geotransform = [
        top_left_x + x1 * pixel_size_x,  # New top-left X
        pixel_size_x,                    # X pixel size
        0,                               # X rotation
        top_left_y + y1 * pixel_size_y,  # New top-left Y
        0,                               # Y rotation
        pixel_size_y,                    # Y pixel size
    ]

    return clipped_data, new_geotransform


def save_as_geotiff_basic(elevation_data, geotransform, output_file):
    """Save elevation data as a 16-bit TIFF plus a sidecar metadata file.

    Negative elevations are shifted so the minimum becomes zero; the shift
    is recorded in the metadata file. The TIFF itself carries no
    georeferencing.

    Args:
        elevation_data: 2-D elevation array.
        geotransform: Geotransform written to the metadata file.
        output_file: Destination path of the TIFF (string).

    Returns:
        Tuple ``(output_file, metadata_file)``.
    """
    data = np.asarray(elevation_data)
    if np.issubdtype(data.dtype, np.integer):
        # Widen first: adding the offset in int16 overflows (the SRTM void
        # value -32768 cannot even be negated in int16).
        data = data.astype(np.int64)

    min_val = data.min()
    offset = -min_val if min_val < 0 else 0
    adjusted_data = (data + offset).astype(np.uint16)

    image = Image.fromarray(adjusted_data, mode='I;16')
    image.save(output_file)

    # Derive the sidecar name from the stem so it can never equal the image
    # path (str.replace left the path unchanged when '.tif' was absent).
    stem, _ = os.path.splitext(output_file)
    metadata_file = stem + '_metadata.txt'
    with open(metadata_file, 'w') as f:
        f.write(f"GeoTransform: {geotransform}\n")
        f.write(f"Offset: {offset}\n")
        f.write(f"Original min value: {min_val}\n")
        f.write(f"Size: {adjusted_data.shape}\n")

    return output_file, metadata_file


def get_dem_elevation_tif(state: State) -> State:
    """Download DEM data and save it as a TIF in the ``dem_files`` subdirectory.

    Only the first successfully processed tile is written; mosaicking of
    several tiles is not implemented.

    Args:
        state: State object containing bbox, place_name and working_directory.

    Returns:
        The same State object; on success ``output_files`` gains a DEM entry
        and ``status`` is ``"dem_downloaded"``, otherwise ``status`` is
        ``"error"`` and ``error_log`` describes the problem.
    """
    try:
        state.status = "downloading_dem"

        if not state.bbox:
            state.error_log.append("Bounding box is required for DEM download")
            state.status = "error"
            return state

        if not state.place_name:
            state.error_log.append("Place name is required for DEM download")
            state.status = "error"
            return state

        working_dir = Path(state.working_directory)
        dem_tiles_dir = working_dir / "dem_tiles"
        dem_files_dir = working_dir / "dem_files"

        working_dir.mkdir(parents=True, exist_ok=True)
        dem_tiles_dir.mkdir(parents=True, exist_ok=True)
        dem_files_dir.mkdir(parents=True, exist_ok=True)

        state.parameters["dem_directory"] = str(dem_files_dir.resolve())

        west, south, east, north = state.bbox
        place_safe = state.place_name.replace(" ", "_").replace(",", "").replace(".", "")
        output_file = dem_files_dir / f"{place_safe}_dem.tif"

        print(f"๐Ÿš€ Starting DEM download for {state.place_name}...")
        print(f"๐Ÿ“ Bounding box: {state.bbox}")
        print(f"๐Ÿ“ Output directory: {dem_files_dir}")

        # Tiles are named after their south-west corner, so floor is needed:
        # int() truncates toward zero and picks the wrong tile for negative
        # coordinates (-0.5 must map to tile -1, not 0).
        lat_range = range(math.floor(south), math.floor(north) + 1)
        lon_range = range(math.floor(west), math.floor(east) + 1)

        all_elevation_data = []
        all_geotransforms = []
        downloaded_tiles = []

        for lat in lat_range:
            for lon in lon_range:
                hgt_file = download_srtm_hgt(lat, lon, str(dem_tiles_dir))
                if not hgt_file:
                    continue
                try:
                    elevation_data, geotransform, size = read_hgt_file(hgt_file)
                    clipped_data, clipped_geotransform = clip_elevation_data(
                        elevation_data, geotransform, size, state.bbox
                    )
                    all_elevation_data.append(clipped_data)
                    all_geotransforms.append(clipped_geotransform)
                    downloaded_tiles.append(os.path.basename(hgt_file))
                    print(f"โœ… Processed {os.path.basename(hgt_file)}: {clipped_data.shape}")
                except Exception as e:
                    err = f"Error processing {hgt_file}: {e}"
                    state.error_log.append(err)
                    print(f"โŒ {err}")

        if not all_elevation_data:
            state.error_log.append("No elevation data processed successfully")
            state.status = "error"
            return state

        print(f"\n๐Ÿ”„ Processing {len(all_elevation_data)} elevation tiles...")

        if len(all_elevation_data) > 1:
            print("โš ๏ธ Multiple tiles detected. Using first tile only (mosaicking not implemented).")

        final_data = all_elevation_data[0]
        final_geotransform = all_geotransforms[0]

        tif_file, metadata_file = save_as_geotiff_basic(
            final_data, final_geotransform, str(output_file)
        )

        min_elev = float(np.min(final_data))
        max_elev = float(np.max(final_data))
        mean_elev = float(np.mean(final_data))
        shape = final_data.shape

        state.output_files.append({
            "type": "dem",
            "format": "geotiff",
            "file_path": str(tif_file),
            "metadata_file": str(metadata_file),
            "min_elevation": min_elev,
            "max_elevation": max_elev,
            "mean_elevation": mean_elev,
            "data_shape": shape,
            "downloaded_tiles": downloaded_tiles,
            "bbox": state.bbox,
            "geotransform": final_geotransform
        })

        state.status = "dem_downloaded"

        print(f"\n๐ŸŽฏ Success! DEM saved to: {tif_file}")
        print(f"๐Ÿ“Š Elevation stats: Min={min_elev}, Max={max_elev}, Mean={mean_elev:.1f} m")
        print(f"๐Ÿ“ Data size: {shape}")

        return state

    except Exception as e:
        # Pipeline boundary: record the failure on the state instead of raising.
        state.error_log.append(f"Unhandled error during DEM download: {e}")
        state.status = "error"
        print(f"โŒ {e}")
        return state


def update_dem(filepath, state):
    """Rewrite a DEM TIFF in place with a CRS and affine transform.

    Args:
        filepath: Path of the TIFF to read and overwrite.
        state: State whose ``bbox`` is ``(west, south, east, north)``.
    """
    input_path = filepath
    output_path = filepath

    # Replace with correct values for Chennai SRTM if known
    crs = "EPSG:4326"  # WGS84 Latitude/Longitude
    west, _south, _east, north = state.bbox
    # from_origin takes the top-left corner, i.e. (west, north).
    transform = from_origin(
        west,
        north,
        0.0008333,  # pixel width: 1/1200 degree (3 arc-seconds, about 90 m)
        0.0008333,  # pixel height
    )

    # Read everything before reopening the same path for writing.
    with rasterio.open(input_path) as src:
        profile = src.profile
        data = src.read(1)

    profile.update({
        'crs': crs,
        'transform': transform
    })

    with rasterio.open(output_path, 'w', **profile) as dst:
        dst.write(data, 1)


def download_chirps_tif(date: datetime, out_dir="chirps_tifs"):
    """Download and decompress the CHIRPS daily rainfall TIFF for one date.

    Args:
        date: Day to download.
        out_dir: Directory where the TIFF is cached.

    Returns:
        Path to the TIFF, or ``None`` when the server does not answer 200.
    """
    y, m, d = date.strftime("%Y"), date.strftime("%m"), date.strftime("%d")
    filename = f"chirps-v2.0.{y}.{m}.{d}.tif"
    url = f"https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/tifs/p25/{y}/{filename}.gz"
    gz_path = os.path.join(out_dir, filename + ".gz")
    tif_path = os.path.join(out_dir, filename)

    if os.path.exists(tif_path):
        print(f"โœ… Already downloaded: {filename}")
        return tif_path

    os.makedirs(out_dir, exist_ok=True)

    try:
        with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT) as r:
            if r.status_code != 200:
                print(f"โŒ Failed: {url}")
                return None

            with open(gz_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        with gzip.open(gz_path, "rb") as f_in, open(tif_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
    except Exception:
        # Do not leave a truncated TIFF that would look cached next time.
        _remove_if_exists(tif_path)
        raise
    finally:
        _remove_if_exists(gz_path)

    print(f"โœ… Downloaded and extracted: {tif_path}")
    return tif_path


def batch_download_chirps(start_date: str, end_date: str, out_dir="chirps_tifs"):
    """Download CHIRPS TIFFs for every day in an inclusive date range.

    Days later than three days before today (UTC) are skipped.

    Args:
        start_date: First day, formatted ``YYYY-MM-DD``.
        end_date: Last day, formatted ``YYYY-MM-DD``.
        out_dir: Directory where the TIFFs are stored.

    Returns:
        List of paths of the TIFFs that are available locally afterwards.
    """
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    today = datetime.now(timezone.utc).date()
    max_available = today - timedelta(days=3)

    downloaded = []
    current = start
    while current <= end:
        if current.date() > max_available:
            print(f"โš ๏ธ Skipping future/unavailable date: {current.strftime('%Y-%m-%d')}")
        else:
            tif_path = download_chirps_tif(current, out_dir)
            if tif_path:
                downloaded.append(tif_path)
        current += timedelta(days=1)
    return downloaded


def get_rainfall_data(state: State):
    """Download CHIRPS rainfall for the week ending one year ago today.

    Files go to ``<working_directory>/rainfall_data``.

    Args:
        state: State providing ``working_directory``.

    Returns:
        The same State object.
    """
    print("Fetching rainfall data from same timeframe last year...")
    today = datetime.today()

    end_dt = today - relativedelta(years=1)
    start_dt = end_dt - timedelta(days=7)

    start_date = start_dt.strftime('%Y-%m-%d')
    end_date = end_dt.strftime('%Y-%m-%d')

    print("Start Date:", start_date)
    print("End Date:", end_date)
    batch_download_chirps(
        start_date,
        end_date,
        os.path.join(str(state.working_directory), "rainfall_data"),
    )
    return state


def _ensure_exists(path, message):
    """Raise ``AssertionError(message)`` when *path* does not exist.

    Raised explicitly (rather than via ``assert``) so the check still runs
    under ``python -O`` while callers keep seeing the same exception type.
    """
    if not Path(path).exists():
        raise AssertionError(message)


def run_hydrology_generator(dem_path, output_dir=None):
    """Derive slope, aspect, filled DEM, flow and stream rasters from a DEM.

    Args:
        dem_path: Path to the input DEM raster.
        output_dir: Directory for the outputs; defaults to ``"output"``.

    Returns:
        Dict mapping ``filled_dem``, ``flow_dir``, ``flow_acc``, ``streams``,
        ``slope`` and ``aspect`` to the generated file paths.

    Raises:
        AssertionError: If the DEM is missing or a tool produced no output.
    """
    if output_dir is None or not str(output_dir).strip():
        output_dir = "output"

    output_dir = Path(output_dir).resolve()
    output_dir.mkdir(exist_ok=True, parents=True)

    dem_path = Path(dem_path)
    _ensure_exists(dem_path, f"โŒ DEM not found at {dem_path}")
    # Absolute path, like the outputs, so the tools do not depend on the
    # current working directory.
    dem_path = dem_path.resolve()

    filled_dem = output_dir / "dem_filled.tif"
    flow_pointer = output_dir / "flow_dir.tif"
    flow_accum = output_dir / "flow_acc.tif"
    stream_raster = output_dir / "streams.tif"
    slope_path = output_dir / "slope.tif"
    aspect_path = output_dir / "aspect.tif"

    print(f"๐Ÿ“ Output directory: {output_dir}")
    print(f"๐Ÿ“ Output file will be: {filled_dem}")

    print("๐Ÿ“ Generating Slope...")
    wbt.slope(dem=str(dem_path), output=str(slope_path), zfactor=1.0)
    _ensure_exists(slope_path, "โŒ Slope file not generated")

    print("๐Ÿงญ Generating Aspect...")
    wbt.aspect(dem=str(dem_path), output=str(aspect_path))
    _ensure_exists(aspect_path, "โŒ Aspect file not generated")

    print("๐Ÿ“ฅ Running Fill Depressions...")
    wbt.fill_depressions(dem=str(dem_path), output=str(filled_dem))
    _ensure_exists(filled_dem, "โŒ Filled DEM not generated.")

    print("๐Ÿ“ˆ Calculating Flow Direction...")
    wbt.d8_pointer(dem=str(filled_dem), output=str(flow_pointer))
    _ensure_exists(flow_pointer, "โŒ Flow direction file not generated.")

    print("๐ŸŒŠ Flow Accumulation...")
    wbt.d8_flow_accumulation(i=str(filled_dem), output=str(flow_accum), out_type="cells")
    _ensure_exists(flow_accum, "โŒ Flow accumulation file not generated.")

    print("๐Ÿงต Extracting Streams...")
    wbt.extract_streams(flow_accum=str(flow_accum), output=str(stream_raster), threshold=100)
    _ensure_exists(stream_raster, "โŒ Stream raster not generated.")

    print("โœ… All hydrological outputs generated successfully.")

    return {
        "filled_dem": str(filled_dem),
        "flow_dir": str(flow_pointer),
        "flow_acc": str(flow_accum),
        "streams": str(stream_raster),
        "slope": str(slope_path),
        "aspect": str(aspect_path)
    }


def fetch_osm_infrastructure(place: str, save_path: str):
    """Fetch roads, buildings, schools and hospitals from OSM as one GeoJSON.

    A category that fails to download is reported and skipped.

    Args:
        place: Place name, e.g. "Bangalore, India".
        save_path: Output GeoJSON path.

    Returns:
        Combined GeoDataFrame with a ``feature_type`` column.

    Raises:
        RuntimeError: If no category could be fetched.
    """
    start = datetime.now()
    print(f"๐Ÿ” Fetching combined OSM infrastructure for: {place}")

    # dirname is "" for a bare file name, and os.makedirs("") raises.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    feature_tags = {
        "roads": {"highway": True},
        "buildings": {"building": True},
        "schools": {"amenity": "school"},
        "hospitals": {"amenity": "hospital"}
    }

    all_gdfs = []
    for name, tags in feature_tags.items():
        print(f"โžก๏ธ Fetching {name}")
        try:
            gdf = ox.features_from_place(place, tags=tags)
            gdf["feature_type"] = name  # remember which category a row came from
            all_gdfs.append(gdf)
        except Exception as e:
            # Best effort: one missing category must not abort the others.
            print(f"โš ๏ธ Failed to fetch {name}: {e}")

    if not all_gdfs:
        raise RuntimeError("No OSM data was fetched.")

    combined_gdf = pd.concat(all_gdfs, ignore_index=True)
    combined_gdf = gpd.GeoDataFrame(combined_gdf, geometry="geometry", crs="EPSG:4326")

    combined_gdf.to_file(save_path, driver="GeoJSON")
    print(f"โœ… Combined GeoJSON saved to: {save_path}")

    # Elapsed time in milliseconds (multiplying the timedelta by 1000
    # printed a scaled duration, not milliseconds).
    elapsed_ms = (datetime.now() - start).total_seconds() * 1000
    print(f"{elapsed_ms:.0f} ms")
    return combined_gdf


def get_osm_infrastructure(state):
    """Save OSM infrastructure for ``state.place_name`` under the working dir.

    Output goes to ``<working_directory>/OSM_infrastructure/OSM.geojson``.

    Args:
        state: State providing ``working_directory`` and ``place_name``.

    Returns:
        The same State object.
    """
    base_dir = os.path.join(state.working_directory, "OSM_infrastructure")
    fetch_osm_infrastructure(
        state.place_name,
        os.path.join(base_dir, "OSM.geojson")
    )
    return state


def tidal_risk_from_osm(place, buffer_dist=1000, output_geojson="tidal_risk_osm.geojson"):
    """Build a buffer zone around OSM coastline and water features.

    Args:
        place: Place name understood by OSM.
        buffer_dist: Buffer distance in EPSG:3857 units (metres).
        output_geojson: Output GeoJSON path.

    Returns:
        The output GeoJSON path.
    """
    print(f"๐ŸŒ Fetching OSM water + coastline for {place}")

    # NOTE(review): these calls presumably raise when a place has no
    # coastline or water features - confirm against the osmnx version used.
    coast = ox.features_from_place(place, tags={"natural": "coastline"})
    water = ox.features_from_place(place, tags={"natural": "water"})

    # Buffer in a metric CRS so buffer_dist is in metres.
    coast = coast.to_crs("EPSG:3857")
    water = water.to_crs("EPSG:3857")
    combined = gpd.GeoDataFrame(pd.concat([coast, water], ignore_index=True), crs=coast.crs)

    print(f"๐Ÿงฑ Found {len(combined)} features. Buffering...")
    risk_zone = combined.buffer(buffer_dist)
    risk_gdf = gpd.GeoDataFrame(geometry=risk_zone, crs="EPSG:3857").dissolve()
    risk_gdf = risk_gdf.to_crs("EPSG:4326")

    risk_gdf.to_file(output_geojson, driver="GeoJSON")
    print(f"โœ… Saved Tidal Risk GeoJSON: {output_geojson}")

    return output_geojson


def get_healthcare_data(bbox, tags):
    """Fetch OSM features matching *tags* in a bbox, reduced to centroids.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        tags: OSM tag filter passed to ``features_from_polygon``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry column holds centroids.
    """
    minx, miny, maxx, maxy = bbox
    polygon = box(minx, miny, maxx, maxy)

    gdf = ox.features_from_polygon(polygon, tags=tags)
    gdf = gdf.to_crs("EPSG:4326")
    gdf["geometry"] = gdf.centroid
    return gdf


def _rasterize_point_geometries(geometries, bbox, pixel_size):
    """Burn point geometries into a uint8 grid covering *bbox*.

    Returns ``(raster, transform)``; cells containing a point are 1.
    """
    minx, miny, maxx, maxy = bbox
    # At least one cell, otherwise from_bounds divides by zero.
    width = max(1, int((maxx - minx) / pixel_size))
    height = max(1, int((maxy - miny) / pixel_size))

    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    raster = np.zeros((height, width), dtype=np.uint8)

    to_pixel = ~transform
    for point in geometries:
        if point is None or point.is_empty:
            continue
        col, row = to_pixel * (point.x, point.y)
        # floor, not int(): truncation would fold points just outside the
        # west/north edge (index in (-1, 0)) into row/column 0.
        col, row = math.floor(col), math.floor(row)
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1

    return raster, transform


def rasterize_healthcare_points(bbox, points_gdf, pixel_size=0.0005):
    """Rasterize healthcare points over a bounding box.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        points_gdf: GeoDataFrame with point geometries.
        pixel_size: Cell size in degrees.

    Returns:
        Tuple ``(raster, transform)``.
    """
    return _rasterize_point_geometries(points_gdf.geometry, bbox, pixel_size)


def compute_distance_transform(binary_raster, pixel_size_deg):
    """Compute the approximate distance in metres to the nearest set cell.

    Degrees are converted with a flat 111 km per degree.

    Args:
        binary_raster: Array whose non-zero cells mark source locations.
        pixel_size_deg: Cell size in degrees.

    Returns:
        Float array of distances; all ``inf`` when there is no source cell.
    """
    binary_raster = np.asarray(binary_raster)
    if not np.any(binary_raster):
        # Without a source cell there is nothing to measure from.
        return np.full(binary_raster.shape, np.inf)

    binary_mask = (binary_raster == 0).astype(np.uint8)
    distance_pixels = distance_transform_edt(binary_mask)
    return distance_pixels * (_METERS_PER_DEGREE * pixel_size_deg)


def _write_single_band_geotiff(raster, transform, output_path, crs):
    """Write a 2-D array as a single-band GeoTIFF."""
    with rasterio.open(
        output_path,
        "w",
        driver="GTiff",
        height=raster.shape[0],
        width=raster.shape[1],
        count=1,
        dtype=raster.dtype,
        crs=CRS.from_string(crs),
        transform=transform,
    ) as dst:
        dst.write(raster, 1)


def save_distance_raster(distance_raster, transform, output_path, crs="EPSG:4326"):
    """Save distance raster to GeoTIFF.

    Args:
        distance_raster: 2-D array to write.
        transform: Affine transform of the raster.
        output_path: Destination path.
        crs: CRS string of the raster.
    """
    _write_single_band_geotiff(distance_raster, transform, output_path, crs)


def generate_distance_to_healthcare(bbox, output_path="distance_to_healthcare.tif"):
    """Generate a raster of distance to healthcare facilities from OSM.

    Args:
        bbox: ``[minx, miny, maxx, maxy]`` for the area of interest.
        output_path: Output GeoTIFF path.

    Returns:
        The output GeoTIFF path.
    """
    print("๐Ÿ” Fetching healthcare data from OpenStreetMap...")
    tags = {"amenity": ["hospital", "clinic", "doctors", "pharmacy"]}
    healthcare_gdf = get_healthcare_data(bbox, tags)

    print(f"๐Ÿ—บ Rasterizing {len(healthcare_gdf)} healthcare points...")
    pixel_size = 0.0005
    binary_raster, transform = rasterize_healthcare_points(bbox, healthcare_gdf, pixel_size)

    print("๐Ÿ“ Computing distance transform...")
    distance_raster = compute_distance_transform(binary_raster, pixel_size)

    print(f"๐Ÿ’พ Saving to {output_path}...")
    save_distance_raster(distance_raster, transform, output_path)
    print("โœ… Done! Distance raster generated.")
    return output_path


def visualize_geospatial_file(file_path: str, output_path: str = "output_map.png"):
    """Render a raster or vector geospatial file to an image.

    Args:
        file_path: Path to a GeoTIFF (.tif/.tiff) or a vector file
            (.geojson, .shp, .gpkg).
        output_path: Path of the image to write (.png).

    Returns:
        ``output_path``.

    Raises:
        ValueError: If the file extension is not supported.
    """
    ext = os.path.splitext(file_path)[1].lower()

    if ext in [".tif", ".tiff"]:
        fig, ax = plt.subplots(figsize=(10, 10))
        try:
            with rasterio.open(file_path) as src:
                show(src, ax=ax, title="Raster Preview")
            ax.set_axis_off()
            fig.savefig(output_path, bbox_inches="tight")
        finally:
            plt.close(fig)  # figures stay in memory until closed
        return output_path

    if ext in [".geojson", ".shp", ".gpkg"]:
        gdf = gpd.read_file(file_path)

        # Web-map tiles are in EPSG:3857, so reproject before plotting;
        # reprojecting afterwards leaves layer and basemap misaligned.
        use_basemap = bool(gdf.crs) and gdf.crs.to_epsg() == 4326
        if use_basemap:
            gdf = gdf.to_crs(epsg=3857)

        fig, ax = plt.subplots(figsize=(10, 10))
        try:
            gdf.plot(ax=ax, edgecolor='black', linewidth=0.8, alpha=0.6, color='orange')

            if use_basemap:
                # The basemap is decoration and needs network access; the
                # Stamen provider may be absent from the installed provider
                # list, in which case the library default is used.
                stamen = getattr(ctx.providers, "Stamen", None)
                source = getattr(stamen, "TonerLite", None) if stamen is not None else None
                basemap_kwargs = {"source": source} if source is not None else {}
                try:
                    ctx.add_basemap(ax, **basemap_kwargs)
                except Exception as e:
                    print(f"Basemap unavailable, continuing without it: {e}")

            ax.set_title("Vector Preview")
            ax.set_axis_off()
            fig.savefig(output_path, bbox_inches="tight")
        finally:
            plt.close(fig)
        return output_path

    raise ValueError(f"Unsupported file type: {ext}")


def get_infrastructure_gdf(bbox, tags):
    """Fetch infrastructure data using OSM, reduced to centroids.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        tags: OSM tag filter passed to ``features_from_polygon``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry column holds centroids.
    """
    # NOTE(review): these setting names and the endpoint format depend on the
    # installed osmnx version - confirm they are still honoured.
    ox.settings.overpass_endpoint = "https://overpass.kumi.systems/api/interpreter"
    ox.settings.timeout = 60

    polygon = box(*bbox)
    gdf = ox.features_from_polygon(polygon, tags=tags)
    gdf = gdf.to_crs("EPSG:4326")
    gdf["geometry"] = gdf.centroid
    return gdf


def rasterize_points(gdf, bbox, pixel_size=0.0005):
    """Rasterize the point geometries of *gdf* over a bounding box.

    Args:
        gdf: GeoDataFrame with point geometries.
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        pixel_size: Cell size in degrees.

    Returns:
        Tuple ``(raster, transform)``.
    """
    return _rasterize_point_geometries(gdf.geometry, bbox, pixel_size)


def save_raster(raster, transform, output_path, crs="EPSG:4326"):
    """Save a 2-D raster to a single-band GeoTIFF.

    Args:
        raster: 2-D array to write.
        transform: Affine transform of the raster.
        output_path: Destination path.
        crs: CRS string of the raster.
    """
    _write_single_band_geotiff(raster, transform, output_path, crs)


def generate_infrastructure_tif(bbox, output_path="infrastructure.tif", pixel_size=0.0005, distance=False):
    """Generate a binary or distance-based infrastructure raster.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` in degrees.
        output_path: Output GeoTIFF path.
        pixel_size: Cell size in degrees.
        distance: When true, write distance in metres to the nearest
            feature instead of the binary presence grid.

    Returns:
        The output GeoTIFF path.
    """
    tags = {
        "highway": True,
        "building": True,
        "bridge": True,
        "railway": True
    }

    print("๐Ÿ” Fetching infrastructure data...")
    gdf = get_infrastructure_gdf(bbox, tags)

    print(f"๐Ÿ—บ Rasterizing {len(gdf)} points...")
    raster, transform = rasterize_points(gdf, bbox, pixel_size)

    if distance:
        print("๐Ÿ“ Computing distance transform...")
        raster = compute_distance_transform(raster, pixel_size)  # metres

    print(f"๐Ÿ’พ Saving raster to {output_path}...")
    save_raster(raster, transform, output_path)
    print("โœ… Done.")
    return output_path


def get_infrastructure(state: State):
    """Generate the infrastructure raster for ``state.bbox``.

    The raster is written to the default output path of
    ``generate_infrastructure_tif``.

    Args:
        state: State providing ``bbox``.

    Returns:
        The same State object.
    """
    generate_infrastructure_tif(state.bbox)
    return state