Spaces:
Sleeping
Sleeping
| import requests | |
| import os | |
| import gzip | |
| import numpy as np | |
| from PIL import Image | |
| import struct | |
| from pathlib import Path | |
| import rasterio | |
| from rasterio.transform import from_origin | |
| from geopy.geocoders import Nominatim | |
| from state import State | |
# Module-level Nominatim geocoder client; get_bbox() uses it to resolve place names.
geolocator = Nominatim(user_agent="lulc-retriever")
def get_bbox(place):
    """Return a bounding box around the geocoded position of ``place``.

    The name is resolved with the module-level ``geolocator`` and the
    resulting point is padded by 0.1 degrees on every side.

    Args:
        place: Free-text place name passed to the geocoder.

    Returns:
        Tuple ``(west, south, east, north)`` in decimal degrees.

    Raises:
        ValueError: If the geocoder returns no result for ``place``.
    """
    hit = geolocator.geocode(place)
    if hit is None:
        raise ValueError(f"Could not geocode location: {place}")

    pad = 0.1  # degrees (~10km)
    centre_lat = hit.latitude
    centre_lon = hit.longitude
    west, east = centre_lon - pad, centre_lon + pad
    south, north = centre_lat - pad, centre_lat + pad
    return (west, south, east, north)
def download_srtm_hgt(lat, lon, output_dir="dem_tiles"):
    """Download one SRTM ``.hgt`` tile and return the path of the local copy.

    A tile that already exists in ``output_dir`` is reused without any
    network access. The download is decompressed into a temporary file and
    only renamed to the final name once complete, so an interrupted run can
    never leave a truncated tile that a later call would mistake for a
    valid cached one.

    Args:
        lat: Integer latitude used in the tile name (negative means south).
        lon: Integer longitude used in the tile name (negative means west).
        output_dir: Directory where tiles are cached; created if missing.

    Returns:
        Path of the uncompressed ``.hgt`` file, or ``None`` when the download
        or decompression fails (the error is printed, not raised).
    """
    os.makedirs(output_dir, exist_ok=True)

    # Format tile name
    lat_str = f"N{lat:02d}" if lat >= 0 else f"S{abs(lat):02d}"
    lon_str = f"E{lon:03d}" if lon >= 0 else f"W{abs(lon):03d}"
    tile_name = f"{lat_str}{lon_str}.hgt"
    url = f"https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_str}/{tile_name}.gz"

    output_path = os.path.join(output_dir, tile_name)
    if os.path.exists(output_path):
        return output_path

    gz_path = output_path + ".gz"
    part_path = output_path + ".part"
    try:
        print(f"Downloading {tile_name}...")
        # The timeout keeps a stalled connection from hanging the pipeline;
        # the context manager releases the connection in every case.
        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(gz_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Stream the decompression instead of holding the whole tile in memory.
        with gzip.open(gz_path, 'rb') as f_in, open(part_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(part_path, output_path)

        print(f"β Downloaded: {tile_name}")
        return output_path
    except Exception as e:
        # Best-effort by design: callers skip tiles for which None is returned.
        print(f"β Failed to download {tile_name}: {e}")
        return None
    finally:
        # Remove intermediates on success and on failure alike.
        for leftover in (gz_path, part_path):
            if os.path.exists(leftover):
                os.remove(leftover)
def read_hgt_file(hgt_file):
    """Read HGT file and return elevation data with georeferencing.

    The file is interpreted as a square grid of big-endian signed 16-bit
    samples. The grid's position is taken from the file name, whose first
    seven characters must look like ``N12E077`` / ``S05W070``.

    Args:
        hgt_file: Path to an uncompressed ``.hgt`` file.

    Returns:
        Tuple ``(elevation_data, geotransform, size)`` where
        ``elevation_data`` is a ``size`` x ``size`` array, ``geotransform`` is
        the 6-element list ``[left, x_step, 0, top, 0, -y_step]`` and
        ``size`` is the number of samples per side.

    Raises:
        ValueError: If the file size does not describe a square grid of
            16-bit samples with at least 2 samples per side, or the file
            name does not carry parsable coordinates.
    """
    # Get file size to determine format
    file_size = os.path.getsize(hgt_file)
    if file_size == 1201 * 1201 * 2:  # 3 arc-second tile (SRTM3)
        size = 1201
    elif file_size == 3601 * 3601 * 2:  # 1 arc-second tile (SRTM1)
        size = 3601
    else:
        # Unknown layout: accept it only if it is an exact square of 16-bit
        # samples, otherwise the reshape below would fail with an opaque error.
        pixels, remainder = divmod(file_size, 2)
        size = int(round(np.sqrt(pixels)))
        if remainder or size < 2 or size * size != pixels:
            raise ValueError(
                f"{hgt_file}: {file_size} bytes is not a square grid of 16-bit samples"
            )
        print(f"Auto-detected size: {size}x{size}")

    # Extract coordinates from filename
    basename = os.path.basename(hgt_file)
    lat_str = basename[:3]
    lon_str = basename[3:7]
    lat = int(lat_str[1:]) if lat_str.startswith('N') else -int(lat_str[1:])
    lon = int(lon_str[1:]) if lon_str.startswith('E') else -int(lon_str[1:])

    # Read elevation data
    with open(hgt_file, 'rb') as f:
        data = f.read()

    # Convert to numpy array (big-endian signed 16-bit)
    elevation_data = np.frombuffer(data, dtype='>i2').reshape(size, size)

    # The tile spans one degree and the first and last samples sit on its
    # edges, hence (size - 1) intervals per degree.
    pixel_size = 1.0 / (size - 1)

    # Georeferencing info
    geotransform = [
        lon,          # Top-left X
        pixel_size,   # X pixel size
        0,            # X rotation
        lat + 1,      # Top-left Y
        0,            # Y rotation
        -pixel_size,  # Y pixel size (negative because Y decreases)
    ]
    return elevation_data, geotransform, size
def clip_elevation_data(elevation_data, geotransform, size, bbox):
    """Cut the part of a square elevation grid that falls inside ``bbox``.

    Args:
        elevation_data: 2-D array indexed as ``[row, column]``.
        geotransform: 6-element sequence ``[left, x_step, _, top, _, y_step]``
            where ``y_step`` is negative.
        size: Number of samples per side of ``elevation_data``.
        bbox: ``(west, south, east, north)`` in the geotransform's units.

    Returns:
        Tuple ``(clipped_data, new_geotransform)``; the new geotransform has
        its origin moved to the first retained row/column.
    """
    west, south, east, north = bbox
    origin_x = geotransform[0]
    step_x = geotransform[1]
    origin_y = geotransform[3]
    step_y = geotransform[5]  # negative: rows run north to south
    row_step = abs(step_y)
    last = size - 1

    def to_index(offset, step):
        # Truncate to a pixel index, then clamp into [0, size - 1].
        return max(0, min(int(offset / step), last))

    col_start = to_index(west - origin_x, step_x)
    row_start = to_index(origin_y - north, row_step)
    col_stop = to_index(east - origin_x, step_x)
    row_stop = to_index(origin_y - south, row_step)

    # Both end indices are inclusive.
    window = elevation_data[row_start:row_stop + 1, col_start:col_stop + 1]

    clipped_transform = [
        origin_x + col_start * step_x,  # New top-left X
        step_x,                         # X pixel size
        0,                              # X rotation
        origin_y + row_start * step_y,  # New top-left Y
        0,                              # Y rotation
        step_y,                         # Y pixel size
    ]
    return window, clipped_transform
def save_as_geotiff_basic(elevation_data, geotransform, output_file):
    """Save elevation data as a 16-bit TIFF plus a text sidecar (requires PIL).

    The image itself carries no georeferencing; the geotransform and the
    offset that was added to make all values non-negative are written to a
    ``<name>_metadata.txt`` file next to it.

    Args:
        elevation_data: 2-D array of elevations.
        geotransform: Value written verbatim into the metadata file.
        output_file: Destination path of the TIFF.

    Returns:
        Tuple ``(output_file, metadata_file)``.
    """
    data = np.asarray(elevation_data)
    if np.issubdtype(data.dtype, np.integer):
        # Widen before shifting: in int16, abs(-32768) and "value + offset"
        # both overflow and silently corrupt the output.
        data = data.astype(np.int64)

    min_val = np.min(data)
    # Add offset to make all values positive
    offset = -min_val if min_val < 0 else 0
    adjusted_data = (data + offset).astype(np.uint16)

    # Save as TIFF
    image = Image.fromarray(adjusted_data, mode='I;16')
    image.save(output_file)

    # Derive the sidecar name from the extension only. A plain
    # str.replace('.tif', ...) would also rewrite '.tif' inside directory
    # names and, for a path without '.tif', make the sidecar overwrite the image.
    root, _ext = os.path.splitext(os.fspath(output_file))
    metadata_file = root + "_metadata.txt"
    with open(metadata_file, 'w') as f:
        f.write(f"GeoTransform: {geotransform}\n")
        f.write(f"Offset: {offset}\n")
        f.write(f"Original min value: {min_val}\n")
        f.write(f"Size: {adjusted_data.shape}\n")
    return output_file, metadata_file
def get_dem_elevation_tif(state: State) -> State:
    """
    Download DEM data and save as TIF format in a subdirectory `dem_files`

    Every SRTM tile touched by ``state.bbox`` is downloaded into
    ``<working_directory>/dem_tiles`` and clipped to the bounding box. Only
    the first successfully processed tile is written out (no mosaicking).
    Failures are recorded in ``state.error_log`` and signalled through
    ``state.status == "error"`` rather than raised.

    Args:
        state: State object containing bbox, place_name, and working_directory
    Returns:
        Updated State object with DEM file info
    """
    try:
        state.status = "downloading_dem"

        # Validate required fields
        if not state.bbox:
            state.error_log.append("Bounding box is required for DEM download")
            state.status = "error"
            return state
        if not state.place_name:
            state.error_log.append("Place name is required for DEM download")
            state.status = "error"
            return state

        # Create working & sub-directories
        working_dir = Path(state.working_directory)
        dem_tiles_dir = working_dir / "dem_tiles"
        dem_files_dir = working_dir / "dem_files"
        working_dir.mkdir(parents=True, exist_ok=True)
        dem_tiles_dir.mkdir(parents=True, exist_ok=True)
        dem_files_dir.mkdir(parents=True, exist_ok=True)
        state.parameters["dem_directory"] = str(dem_files_dir.resolve())

        west, south, east, north = state.bbox
        place_safe = state.place_name.replace(" ", "_").replace(",", "").replace(".", "")
        output_file = dem_files_dir / f"{place_safe}_dem.tif"

        print(f"π Starting DEM download for {state.place_name}...")
        print(f"π Bounding box: {state.bbox}")
        print(f"π Output directory: {dem_files_dir}")

        # A tile is identified by the integer at its lower-left corner, i.e.
        # the floor of the coordinate. int() truncates toward zero and would
        # select the wrong tile for negative (southern/western) coordinates.
        lat_range = range(int(np.floor(south)), int(np.floor(north)) + 1)
        lon_range = range(int(np.floor(west)), int(np.floor(east)) + 1)

        all_elevation_data = []
        all_geotransforms = []
        downloaded_tiles = []
        for lat in lat_range:
            for lon in lon_range:
                hgt_file = download_srtm_hgt(lat, lon, str(dem_tiles_dir))
                if not hgt_file:
                    # The download helper already printed the failure.
                    continue
                try:
                    elevation_data, geotransform, size = read_hgt_file(hgt_file)
                    clipped_data, clipped_geotransform = clip_elevation_data(
                        elevation_data, geotransform, size, state.bbox
                    )
                    all_elevation_data.append(clipped_data)
                    all_geotransforms.append(clipped_geotransform)
                    downloaded_tiles.append(os.path.basename(hgt_file))
                    print(f"β Processed {os.path.basename(hgt_file)}: {clipped_data.shape}")
                except Exception as e:
                    err = f"Error processing {hgt_file}: {e}"
                    state.error_log.append(err)
                    print(f"β {err}")

        if not all_elevation_data:
            state.error_log.append("No elevation data processed successfully")
            state.status = "error"
            return state

        print(f"\nπ Processing {len(all_elevation_data)} elevation tiles...")
        if len(all_elevation_data) > 1:
            print("β οΈ Multiple tiles detected. Using first tile only (mosaicking not implemented).")
        final_data = all_elevation_data[0]
        final_geotransform = all_geotransforms[0]

        tif_file, metadata_file = save_as_geotiff_basic(
            final_data, final_geotransform, str(output_file)
        )

        min_elev = float(np.min(final_data))
        max_elev = float(np.max(final_data))
        mean_elev = float(np.mean(final_data))
        shape = final_data.shape

        state.output_files.append({
            "type": "dem",
            "format": "geotiff",
            "file_path": str(tif_file),
            "metadata_file": str(metadata_file),
            "min_elevation": min_elev,
            "max_elevation": max_elev,
            "mean_elevation": mean_elev,
            "data_shape": shape,
            "downloaded_tiles": downloaded_tiles,
            "bbox": state.bbox,
            "geotransform": final_geotransform
        })
        state.status = "dem_downloaded"

        print(f"\nπ― Success! DEM saved to: {tif_file}")
        print(f"π Elevation stats: Min={min_elev}, Max={max_elev}, Mean={mean_elev:.1f} m")
        print(f"π Data size: {shape}")
        return state
    except Exception as e:
        # Top-level boundary of this pipeline step: report through the state.
        state.error_log.append(f"Unhandled error during DEM download: {e}")
        state.status = "error"
        print(f"β {e}")
        return state
def update_dem(filepath,state):
    """Rewrite the raster at ``filepath`` in place with a CRS and transform.

    The first band is read, the profile is given an EPSG:4326 CRS and an
    affine transform anchored at the upper-left corner of ``state.bbox``,
    and the file is written back to the same path.

    Args:
        filepath: Raster file to update; it is overwritten.
        state: Object whose ``bbox`` is ``(west, south, east, north)``.
    """
    input_path = filepath
    output_path = filepath

    crs = "EPSG:4326"  # WGS84 Latitude/Longitude

    # from_origin() takes the upper-left corner, i.e. (west, north).
    # Passing bbox[1] (south) would place the raster one bbox-height too far south.
    # NOTE(review): 0.0008333 deg is about 1/1200 deg (3 arc-seconds, roughly
    # 90 m), not 30 m. Confirm it matches the resolution of the tile that was
    # actually written, otherwise the raster's extent will be wrong.
    transform = from_origin(
        state.bbox[0],
        state.bbox[3],
        0.0008333,  # pixel width in degrees
        0.0008333,  # pixel height in degrees
    )

    # Read everything before reopening the same path for writing.
    with rasterio.open(input_path) as src:
        profile = src.profile
        data = src.read(1)

    profile.update({
        'crs': crs,
        'transform': transform
    })

    with rasterio.open(output_path, 'w', **profile) as dst:
        dst.write(data, 1)
| import os | |
| import requests | |
| import gzip | |
| import shutil | |
| from datetime import datetime, timedelta | |
| from tqdm import tqdm | |
def download_chirps_tif(date: datetime, out_dir="chirps_tifs"):
    """Download and unpack the CHIRPS daily rainfall GeoTIFF for ``date``.

    A file already present in ``out_dir`` is reused without network access.
    The archive is unpacked into a temporary file that is renamed only when
    complete, so an interrupted run cannot leave a truncated ``.tif`` that a
    later call would report as already downloaded.

    Args:
        date: Day to fetch.
        out_dir: Directory for the extracted ``.tif``; created if missing.

    Returns:
        Path of the extracted ``.tif``, or ``None`` if the server did not
        answer with HTTP 200.

    Raises:
        requests.RequestException: On connection errors or timeouts.
        OSError: If the archive cannot be written or unpacked.
    """
    y = date.strftime("%Y")
    filename = f"chirps-v2.0.{date.strftime('%Y.%m.%d')}.tif"
    url = f"https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/tifs/p25/{y}/{filename}.gz"
    gz_path = os.path.join(out_dir, filename + ".gz")
    tif_path = os.path.join(out_dir, filename)
    part_path = tif_path + ".part"

    if os.path.exists(tif_path):
        print(f"β Already downloaded: {filename}")
        return tif_path

    os.makedirs(out_dir, exist_ok=True)
    try:
        # The timeout keeps a stalled connection from hanging the batch; the
        # context manager releases the connection on every path.
        with requests.get(url, stream=True, timeout=60) as r:
            if r.status_code != 200:
                print(f"β Failed: {url}")
                return None
            with open(gz_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        with gzip.open(gz_path, "rb") as f_in, open(part_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(part_path, tif_path)
    finally:
        # Remove intermediates on success and on failure alike.
        for leftover in (gz_path, part_path):
            if os.path.exists(leftover):
                os.remove(leftover)

    print(f"β Downloaded and extracted: {tif_path}")
    return tif_path
def batch_download_chirps(start_date: str, end_date: str, out_dir="chirps_tifs"):
    """Download the CHIRPS daily rasters for every day in an inclusive range.

    Days later than three days before the current UTC date are skipped with
    a message instead of being requested.

    Args:
        start_date: First day, formatted ``YYYY-MM-DD``.
        end_date: Last day (inclusive), formatted ``YYYY-MM-DD``.
        out_dir: Directory handed to ``download_chirps_tif``.

    Returns:
        List of paths returned by ``download_chirps_tif`` for the days that
        were fetched; empty when nothing was downloaded.

    Raises:
        ValueError: If either date string does not match ``YYYY-MM-DD``.
    """
    # Local import: timezone is not among the names this file imports at
    # module level.
    from datetime import timezone

    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    # datetime.utcnow() is deprecated; an aware "now" yields the same UTC date.
    today = datetime.now(timezone.utc).date()
    max_available = today - timedelta(days=3)

    downloaded = []
    current = start
    while current <= end:
        if current.date() > max_available:
            print(f"β οΈ Skipping future/unavailable date: {current.strftime('%Y-%m-%d')}")
        else:
            path = download_chirps_tif(current, out_dir)
            if path is not None:
                downloaded.append(path)
        current += timedelta(days=1)
    return downloaded
| from datetime import datetime, timedelta | |
| from dateutil.relativedelta import relativedelta | |
def get_rainfall_data(state: State):
    """Download CHIRPS rainfall rasters for the week ending one year ago today.

    The window runs from (today - 1 year - 7 days) to (today - 1 year),
    both inclusive, and files go to ``<working_directory>/rainfall_data``.

    Args:
        state: Object providing ``working_directory``.

    Returns:
        The same ``state`` object, unmodified.
    """
    print("Fetching rainfall data from same timeframe last year...")

    # End: (today - 1 year); Start: seven days before that.
    end_dt = datetime.today() - relativedelta(years=1)
    start_dt = end_dt - timedelta(days=7)

    # Format as strings
    start_date = start_dt.strftime('%Y-%m-%d')
    end_date = end_dt.strftime('%Y-%m-%d')
    print("Start Date:", start_date)
    print("End Date:", end_date)

    # os.path.join accepts both str and Path working directories and does not
    # turn an empty working directory into the filesystem root.
    rainfall_dir = os.path.join(state.working_directory, "rainfall_data")
    batch_download_chirps(start_date, end_date, rainfall_dir)
    return state
| from whitebox import WhiteboxTools | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| import os | |
# Read a .env file (if any) into the process environment at import time.
load_dotenv()
# Module-level WhiteboxTools handle used by run_hydrology_generator().
wbt = WhiteboxTools()
wbt.set_verbose_mode(True)
wbt.set_compress_rasters(False)
def run_hydrology_generator(dem_path, output_dir=None):
    """Derive slope, aspect and D8 hydrology rasters from a DEM.

    The steps run in order: slope, aspect, depression filling, D8 flow
    direction, D8 flow accumulation and stream extraction (threshold 100).
    Each step must produce its output file before the next one starts.

    Args:
        dem_path: Path to the input DEM raster.
        output_dir: Directory for the generated rasters; defaults to
            ``output`` when omitted or blank. Created if missing.

    Returns:
        Dict mapping ``filled_dem``, ``flow_dir``, ``flow_acc``, ``streams``,
        ``slope`` and ``aspect`` to the absolute path of each raster.

    Raises:
        FileNotFoundError: If ``dem_path`` does not exist.
        RuntimeError: If a step finishes without producing its output file.
    """
    # Default to a folder named 'output' if none provided
    if output_dir is None or not str(output_dir).strip():
        output_dir = "output"

    # Explicit raises instead of assert: assertions vanish under "python -O",
    # which would let a missing input or output go unnoticed.
    dem_path = Path(dem_path).resolve()  # absolute, like every output path
    if not dem_path.exists():
        raise FileNotFoundError(f"β DEM not found at {dem_path}")

    output_dir = Path(output_dir).resolve()  # Get absolute path
    output_dir.mkdir(exist_ok=True, parents=True)

    filled_dem = output_dir / "dem_filled.tif"
    flow_pointer = output_dir / "flow_dir.tif"
    flow_accum = output_dir / "flow_acc.tif"
    stream_raster = output_dir / "streams.tif"
    slope_path = output_dir / "slope.tif"
    aspect_path = output_dir / "aspect.tif"

    print(f"π Output directory: {output_dir}")
    print(f"π Output file will be: {filled_dem}")

    dem = str(dem_path)
    # (progress message, tool invocation, expected output, failure message)
    steps = [
        (
            "π Generating Slope...",
            lambda: wbt.slope(dem=dem, output=str(slope_path), zfactor=1.0),
            slope_path,
            "β Slope file not generated",
        ),
        (
            "π§ Generating Aspect...",
            lambda: wbt.aspect(dem=dem, output=str(aspect_path)),
            aspect_path,
            "β Aspect file not generated",
        ),
        (
            "π₯ Running Fill Depressions...",
            lambda: wbt.fill_depressions(dem=dem, output=str(filled_dem)),
            filled_dem,
            "β Filled DEM not generated.",
        ),
        (
            "π Calculating Flow Direction...",
            lambda: wbt.d8_pointer(dem=str(filled_dem), output=str(flow_pointer)),
            flow_pointer,
            "β Flow direction file not generated.",
        ),
        (
            "π Flow Accumulation...",
            lambda: wbt.d8_flow_accumulation(
                i=str(filled_dem), output=str(flow_accum), out_type="cells"
            ),
            flow_accum,
            "β Flow accumulation file not generated.",
        ),
        (
            "π§΅ Extracting Streams...",
            lambda: wbt.extract_streams(
                flow_accum=str(flow_accum), output=str(stream_raster), threshold=100
            ),
            stream_raster,
            "β Stream raster not generated.",
        ),
    ]
    for banner, run_tool, product, failure in steps:
        print(banner)
        run_tool()
        if not product.exists():
            raise RuntimeError(failure)

    print("β All hydrological outputs generated successfully.")
    return {
        "filled_dem": str(filled_dem),
        "flow_dir": str(flow_pointer),
        "flow_acc": str(flow_accum),
        "streams": str(stream_raster),
        "slope": str(slope_path),
        "aspect": str(aspect_path)
    }
| import os | |
| import osmnx as ox | |
| import geopandas as gpd | |
| import os | |
| import osmnx as ox | |
| import geopandas as gpd | |
| import pandas as pd | |
| from datetime import datetime | |
def fetch_osm_infrastructure(place: str, save_path: str):
    """
    Fetch roads, buildings, schools, hospitals from OSM and save as one GeoJSON.

    Each category is fetched separately and tagged in a ``feature_type``
    column. A category that fails to download is reported and skipped.

    Parameters:
    - place: str - e.g., "Bangalore, India"
    - save_path: str - Output GeoJSON path; its directory is created if needed

    Returns: Combined GeoDataFrame

    Raises:
        RuntimeError: If none of the categories could be fetched.
    """
    started = datetime.now()
    print(f"π Fetching combined OSM infrastructure for: {place}")

    # A bare file name has no directory part, and os.makedirs("") raises.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    all_gdfs = []
    feature_tags = {
        "roads": {"highway": True},
        "buildings": {"building": True},
        "schools": {"amenity": "school"},
        "hospitals": {"amenity": "hospital"}
    }
    for name, tags in feature_tags.items():
        print(f"β‘οΈ Fetching {name}")
        try:
            gdf = ox.features_from_place(place, tags=tags)
            gdf["feature_type"] = name  # add a column to indicate the type
            all_gdfs.append(gdf)
        except Exception as e:
            # Best-effort per category: keep whatever else can be fetched.
            print(f"β οΈ Failed to fetch {name}: {e}")

    if not all_gdfs:
        raise RuntimeError("No OSM data was fetched.")

    combined_gdf = pd.concat(all_gdfs, ignore_index=True)
    combined_gdf = gpd.GeoDataFrame(combined_gdf, geometry="geometry", crs="EPSG:4326")
    combined_gdf.to_file(save_path, driver="GeoJSON")
    print(f"β Combined GeoJSON saved to: {save_path}")

    # Report real milliseconds; multiplying the timedelta itself by 1000
    # printed a duration a thousand times too long.
    elapsed_ms = (datetime.now() - started).total_seconds() * 1000
    print(f"Elapsed: {elapsed_ms:.0f} ms")
    return combined_gdf
def get_osm_infrastructure(state):
    """Fetch OSM infrastructure for ``state.place_name`` into the working directory.

    The combined GeoJSON is written to
    ``<working_directory>/OSM_infrastructure/OSM.geojson``.

    Args:
        state: Object providing ``working_directory`` and ``place_name``.

    Returns:
        The same ``state`` object, so this step can be chained like the other
        state-based steps in this file that return their state.
    """
    base_dir = os.path.join(state.working_directory, "OSM_infrastructure")
    fetch_osm_infrastructure(
        state.place_name,
        os.path.join(base_dir, "OSM.geojson")
    )
    return state
| import osmnx as ox | |
| import geopandas as gpd | |
| import pandas as pd | |
def tidal_risk_from_osm(place, buffer_dist=1000, output_geojson="tidal_risk_osm.geojson"):
    """Buffer OSM coastline and water features into a single risk-zone GeoJSON.

    Coastline and water features are fetched separately. A layer that cannot
    be fetched (for example because the place has no coastline) is reported
    and skipped, so inland places still produce a result from the remaining
    layer.

    Args:
        place: Place name understood by OSMnx.
        buffer_dist: Buffer distance in EPSG:3857 units.
        output_geojson: Destination GeoJSON path.

    Returns:
        ``output_geojson``.

    Raises:
        RuntimeError: If neither coastline nor water features could be fetched.
    """
    print(f"π Fetching OSM water + coastline for {place}")

    # 1. Get coastlines and water
    layer_tags = (
        ("coastline", {"natural": "coastline"}),
        ("water", {"natural": "water"}),
    )
    layers = []
    for label, tags in layer_tags:
        try:
            layer = ox.features_from_place(place, tags=tags)
        except Exception as exc:
            # Same best-effort policy as fetch_osm_infrastructure in this file.
            print(f"Warning: no OSM {label} features fetched for {place}: {exc}")
            continue
        layers.append(layer.to_crs("EPSG:3857"))

    if not layers:
        raise RuntimeError(f"No coastline or water features found for {place}")

    # 2. Combine and buffer
    # NOTE(review): EPSG:3857 units are only approximately metres away from
    # the equator, so the effective buffer shrinks with latitude - confirm
    # whether a local projected CRS is wanted here.
    combined = gpd.GeoDataFrame(pd.concat(layers, ignore_index=True), crs="EPSG:3857")
    print(f"π§± Found {len(combined)} features. Buffering...")
    risk_zone = combined.buffer(buffer_dist)
    risk_gdf = gpd.GeoDataFrame(geometry=risk_zone, crs="EPSG:3857").dissolve()
    risk_gdf = risk_gdf.to_crs("EPSG:4326")

    # 3. Save as GeoJSON
    risk_gdf.to_file(output_geojson, driver="GeoJSON")
    print(f"β Saved Tidal Risk GeoJSON: {output_geojson}")
    return output_geojson
| import os | |
| import numpy as np | |
| import rasterio | |
| from rasterio.transform import from_bounds | |
| from rasterio.crs import CRS | |
| import osmnx as ox | |
| import geopandas as gpd | |
| from shapely.geometry import box | |
| from scipy.ndimage import distance_transform_edt | |
def get_healthcare_data(bbox, tags):
    """Fetch OSM features inside ``bbox`` and reduce each one to its centroid.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` of the search rectangle.
        tags: OSM tag filter passed to ``ox.features_from_polygon``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry column holds centroids.
    """
    west, south, east, north = bbox
    search_area = box(west, south, east, north)
    features = ox.features_from_polygon(search_area, tags=tags)
    features = features.to_crs("EPSG:4326")
    # Collapse every geometry to a single representative point.
    features["geometry"] = features.centroid
    return features
def rasterize_healthcare_points(bbox, points_gdf, pixel_size=0.0005):
    """Rasterize healthcare points over a bounding box.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` extent of the raster.
        points_gdf: GeoDataFrame whose geometries expose ``.x`` and ``.y``.
        pixel_size: Cell size in the units of ``bbox``.

    Returns:
        Tuple ``(raster, transform)``: a ``uint8`` array holding 1 in every
        cell that contains at least one point and 0 elsewhere, plus the
        affine transform of that grid.
    """
    minx, miny, maxx, maxy = bbox
    width = int((maxx - minx) / pixel_size)
    height = int((maxy - miny) / pixel_size)
    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    raster = np.zeros((height, width), dtype=np.uint8)

    to_pixel = ~transform  # invert once instead of once per point
    for point in points_gdf.geometry:
        col, row = to_pixel * (point.x, point.y)
        # Floor, not int(): truncation toward zero would fold points lying up
        # to one pixel outside the west/north edge into row/column 0.
        col, row = int(np.floor(col)), int(np.floor(row))
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1
    return raster, transform
def compute_distance_transform(binary_raster, pixel_size_deg):
    """Compute Euclidean distance in meters from healthcare locations.

    Cells equal to 0 in ``binary_raster`` receive their pixel distance to the
    nearest non-zero cell, scaled by ``111000 * pixel_size_deg``.

    Args:
        binary_raster: Array in which non-zero cells mark locations.
        pixel_size_deg: Cell size in degrees.

    Returns:
        Float array of distances with the same shape as ``binary_raster``.
    """
    metres_per_pixel = 111000 * pixel_size_deg
    # The transform measures from non-zero cells to the nearest zero cell, so
    # the mask is 1 where there is no facility and 0 where there is one.
    no_facility = np.equal(binary_raster, 0).astype(np.uint8)
    return distance_transform_edt(no_facility) * metres_per_pixel
def save_distance_raster(distance_raster, transform, output_path, crs="EPSG:4326"):
    """Save distance raster to GeoTIFF.

    Args:
        distance_raster: 2-D array written as band 1.
        transform: Affine transform of the raster.
        output_path: Destination file path.
        crs: CRS string parsed with ``CRS.from_string``.
    """
    profile = {
        "driver": "GTiff",
        "height": distance_raster.shape[0],
        "width": distance_raster.shape[1],
        "count": 1,  # single band
        "dtype": distance_raster.dtype,
        "crs": CRS.from_string(crs),
        "transform": transform,
    }
    with rasterio.open(output_path, "w", **profile) as dst:
        dst.write(distance_raster, 1)
def generate_distance_to_healthcare(bbox, output_path="distance_to_healthcare.tif", pixel_size=0.0005):
    """
    Complete tool to generate distance raster to healthcare facilities.

    Hospitals, clinics, doctors and pharmacies are fetched from
    OpenStreetMap, burned into a grid and converted to a distance raster.

    Parameters:
    - bbox: [minx, miny, maxx, maxy] for the area of interest
    - output_path: output GeoTIFF path
    - pixel_size: grid cell size in degrees (default 0.0005, the value that
      was previously fixed inside this function)
    """
    print("π Fetching healthcare data from OpenStreetMap...")
    tags = {"amenity": ["hospital", "clinic", "doctors", "pharmacy"]}
    healthcare_gdf = get_healthcare_data(bbox, tags)

    print(f"πΊ Rasterizing {len(healthcare_gdf)} healthcare points...")
    binary_raster, transform = rasterize_healthcare_points(bbox, healthcare_gdf, pixel_size)

    print("π Computing distance transform...")
    # The same pixel size must be used for the grid and the metre conversion.
    distance_raster = compute_distance_transform(binary_raster, pixel_size)

    print(f"πΎ Saving to {output_path}...")
    save_distance_raster(distance_raster, transform, output_path)
    print("β Done! Distance raster generated.")
| import os | |
| import geopandas as gpd | |
| import rasterio | |
| import matplotlib.pyplot as plt | |
| from rasterio.plot import show | |
| from shapely.geometry import box | |
| import contextily as ctx | |
def visualize_geospatial_file(file_path: str, output_path: str = "output_map.png"):
    """
    Visualizes raster or vector geospatial files and saves the output as an image.

    Args:
        file_path (str): Path to the GeoTIFF (.tif), GeoJSON, Shapefile, etc.
        output_path (str): Path to save the output image (.png)

    Returns:
        ``output_path`` once the image has been written.

    Raises:
        ValueError: If the file extension is not one of .tif, .tiff,
            .geojson, .shp or .gpkg.
    """
    ext = os.path.splitext(file_path)[1].lower()

    if ext in (".tif", ".tiff"):
        with rasterio.open(file_path) as src:
            fig, ax = plt.subplots(figsize=(10, 10))
            try:
                show(src, ax=ax, title="Raster Preview")
                ax.set_axis_off()
                # Actually write the image; plt.plot() draws nothing and
                # never produced the file this function reports.
                fig.savefig(output_path)
            finally:
                plt.close(fig)  # release the figure on every path
        return output_path

    if ext in (".geojson", ".shp", ".gpkg"):
        gdf = gpd.read_file(file_path)

        # Reproject before plotting: the basemap tiles are Web Mercator, so
        # data drawn in EPSG:4326 would not line up with them.
        wants_basemap = gdf.crs is not None and gdf.crs.to_epsg() == 4326
        if wants_basemap:
            gdf = gdf.to_crs(epsg=3857)

        fig, ax = plt.subplots(figsize=(10, 10))
        try:
            gdf.plot(ax=ax, edgecolor='black', linewidth=0.8, alpha=0.6, color='orange')
            if wants_basemap:
                try:
                    ctx.add_basemap(ax, source=ctx.providers.Stamen.TonerLite)
                except Exception as exc:
                    # The basemap is decoration only; a missing provider or
                    # failed tile download must not abort the preview.
                    print(f"Basemap unavailable, continuing without it: {exc}")
            ax.set_title("Vector Preview")
            ax.set_axis_off()
            fig.savefig(output_path)
        finally:
            plt.close(fig)
        return output_path

    raise ValueError(f"Unsupported file type: {ext}")
| import osmnx as ox | |
| import geopandas as gpd | |
| from shapely.geometry import box | |
| import numpy as np | |
| import rasterio | |
| from rasterio.transform import from_bounds | |
| from rasterio.crs import CRS | |
| from scipy.ndimage import distance_transform_edt | |
def get_infrastructure_gdf(bbox, tags):
    """Fetch infrastructure data using OSM.

    Sets ``ox.settings.overpass_endpoint`` and ``ox.settings.timeout`` on the
    shared OSMnx settings module before querying, so those assignments
    persist for later OSMnx calls in the same process.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` of the search rectangle.
        tags: OSM tag filter passed to ``ox.features_from_polygon``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry column holds centroids.
    """
    # NOTE(review): confirm these setting names exist in the installed OSMnx
    # version; assigning an unknown attribute on the module has no effect.
    ox.settings.overpass_endpoint = "https://overpass.kumi.systems/api/interpreter"
    ox.settings.timeout = 60

    search_area = box(*bbox)
    features = ox.features_from_polygon(search_area, tags=tags).to_crs("EPSG:4326")
    # Collapse every geometry to a single representative point.
    features["geometry"] = features.centroid
    return features
def rasterize_points(gdf, bbox, pixel_size=0.0005):
    """Burn point geometries into a binary grid covering ``bbox``.

    Args:
        gdf: GeoDataFrame whose geometries expose ``.x`` and ``.y``.
        bbox: ``(minx, miny, maxx, maxy)`` extent of the raster.
        pixel_size: Cell size in the units of ``bbox``.

    Returns:
        Tuple ``(raster, transform)``: a ``uint8`` array holding 1 in every
        cell that contains at least one point and 0 elsewhere, plus the
        affine transform of that grid.
    """
    minx, miny, maxx, maxy = bbox
    width = int((maxx - minx) / pixel_size)
    height = int((maxy - miny) / pixel_size)
    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    raster = np.zeros((height, width), dtype=np.uint8)

    to_pixel = ~transform  # invert once instead of once per point
    for point in gdf.geometry:
        col, row = to_pixel * (point.x, point.y)
        # Floor, not int(): truncation toward zero would fold points lying up
        # to one pixel outside the west/north edge into row/column 0.
        col, row = int(np.floor(col)), int(np.floor(row))
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1
    return raster, transform
def save_raster(raster, transform, output_path, crs="EPSG:4326"):
    """Write a single-band array to a GeoTIFF.

    Args:
        raster: 2-D array written as band 1; its dtype becomes the file dtype.
        transform: Affine transform of the raster.
        output_path: Destination file path.
        crs: CRS string parsed with ``CRS.from_string``.
    """
    profile = {
        "driver": "GTiff",
        "height": raster.shape[0],
        "width": raster.shape[1],
        "count": 1,  # single band
        "dtype": raster.dtype,
        "crs": CRS.from_string(crs),
        "transform": transform,
    }
    with rasterio.open(output_path, "w", **profile) as dst:
        dst.write(raster, 1)
def generate_infrastructure_tif(bbox, output_path="infrastructure.tif", pixel_size=0.0005, distance=False):
    """
    Generate a binary or distance-based infrastructure raster.

    Highway, building, bridge and railway features are fetched for ``bbox``
    and burned into a grid. With ``distance=True`` the grid is replaced by
    the distance to the nearest occupied cell, scaled by
    ``111000 * pixel_size``.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` area of interest.
        output_path: Destination GeoTIFF path.
        pixel_size: Grid cell size in degrees.
        distance: Write distances instead of the binary presence grid.
    """
    # Infrastructure tags to fetch; True matches any value of the key.
    osm_tags = dict.fromkeys(("highway", "building", "bridge", "railway"), True)

    print("π Fetching infrastructure data...")
    features = get_infrastructure_gdf(bbox, osm_tags)

    print(f"πΊ Rasterizing {len(features)} points...")
    grid, affine = rasterize_points(features, bbox, pixel_size)

    if distance:
        print("π Computing distance transform...")
        empty_cells = (grid == 0).astype(np.uint8)
        metres_per_pixel = 111000 * pixel_size
        grid = distance_transform_edt(empty_cells) * metres_per_pixel  # meters

    print(f"πΎ Saving raster to {output_path}...")
    save_raster(grid, affine, output_path)
    print("β Done.")
def get_infrastructure(state:State):
    """Generate the infrastructure raster for ``state.bbox``.

    The raster is written with ``generate_infrastructure_tif``'s defaults,
    i.e. to ``infrastructure.tif`` relative to the current working directory.

    Args:
        state: Object providing ``bbox``.

    Returns:
        The same ``state`` object, so this step can be chained like the other
        state-based steps in this file that return their state.
    """
    # NOTE(review): unlike the OSM and rainfall steps, the output does not go
    # under state.working_directory - confirm whether that is intended.
    generate_infrastructure_tif(state.bbox)
    return state