# Source: uploaded by shreyankisiri ("Upload 7 files", revision 9bcc127, verified).
import requests
import os
import gzip
import numpy as np
from PIL import Image
import struct
from pathlib import Path
import rasterio
from rasterio.transform import from_origin
from geopy.geocoders import Nominatim
from state import State
# Shared geopy Nominatim geocoder, created once at import time and used by
# get_bbox() to turn a place name into coordinates.
geolocator = Nominatim(user_agent="lulc-retriever")
def get_bbox(place, buffer=0.1):
    """Return a square bounding box centred on a geocoded place.

    Args:
        place: Free-text place name handed to the module-level ``geolocator``.
        buffer: Margin in degrees added on every side of the geocoded point.
            The default of 0.1 (roughly 10 km of latitude) preserves the
            previous hard-coded behaviour.

    Returns:
        Tuple ``(west, south, east, north)`` in decimal degrees.

    Raises:
        ValueError: If the geocoder finds no match for ``place``.
    """
    location = geolocator.geocode(place)
    if location is None:
        raise ValueError(f"Could not geocode location: {place}")
    lat, lon = location.latitude, location.longitude
    return (lon - buffer, lat - buffer, lon + buffer, lat + buffer)
def download_srtm_hgt(lat, lon, output_dir="dem_tiles", timeout=60):
    """Download and unpack one SRTM ``.hgt`` tile, reusing a cached copy if present.

    Args:
        lat: Integer latitude of the tile's south-west corner (negative = south).
        lon: Integer longitude of the tile's south-west corner (negative = west).
        output_dir: Directory where the unpacked tile is stored (created if missing).
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        Path of the unpacked ``.hgt`` file, or ``None`` when the download or
        decompression failed (the error is printed, not raised).
    """
    import shutil  # local import keeps this block self-contained

    os.makedirs(output_dir, exist_ok=True)

    # Tile names look like "N12E077": hemisphere letter plus zero-padded
    # absolute degrees.
    lat_str = f"N{lat:02d}" if lat >= 0 else f"S{abs(lat):02d}"
    lon_str = f"E{lon:03d}" if lon >= 0 else f"W{abs(lon):03d}"
    tile_name = f"{lat_str}{lon_str}.hgt"
    url = f"https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_str}/{tile_name}.gz"
    output_path = os.path.join(output_dir, tile_name)
    if os.path.exists(output_path):
        return output_path

    gz_path = output_path + ".gz"
    # Decompress into a scratch file and rename at the end, so an interrupted
    # run can never leave a truncated tile that the cache check above would
    # later accept as valid.
    tmp_path = output_path + ".part"
    try:
        print(f"Downloading {tile_name}...")
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(gz_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Stream the decompression instead of loading the whole tile in memory.
        with gzip.open(gz_path, 'rb') as f_in, open(tmp_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(tmp_path, output_path)
        print(f"βœ… Downloaded: {tile_name}")
        return output_path
    except Exception as e:
        # Deliberate best-effort: callers treat ``None`` as "tile unavailable".
        print(f"❌ Failed to download {tile_name}: {e}")
        return None
    finally:
        for leftover in (gz_path, tmp_path):
            if os.path.exists(leftover):
                os.remove(leftover)
def read_hgt_file(hgt_file):
    """Read an SRTM ``.hgt`` tile and derive its georeferencing from the file name.

    Args:
        hgt_file: Path to a file named like ``N12E077.hgt`` holding a square
            grid of big-endian signed 16-bit samples.

    Returns:
        Tuple ``(elevation_data, geotransform, size)`` where ``elevation_data``
        is a ``(size, size)`` array, ``geotransform`` is the six-element list
        ``[left, x_step, 0, top, 0, -y_step]`` in degrees, and ``size`` is the
        number of samples per side.

    Raises:
        ValueError: If the file does not hold a square grid of at least 2x2
            samples, or the coordinates in the file name cannot be parsed.
    """
    # The sample count per side follows from the file size (2 bytes/sample).
    file_size = os.path.getsize(hgt_file)
    if file_size == 1201 * 1201 * 2:  # 3 arc-second tile (SRTM3)
        size = 1201
    elif file_size == 3601 * 3601 * 2:  # 1 arc-second tile (SRTM1)
        size = 3601
    else:
        pixels = file_size // 2
        size = int(np.sqrt(pixels))
        print(f"Auto-detected size: {size}x{size}")
    # Fail early with a clear message instead of a cryptic reshape error or a
    # division by zero in the pixel-size computation below.
    if size < 2 or size * size * 2 != file_size:
        raise ValueError(
            f"{hgt_file} is not a square 16-bit grid (size {file_size} bytes)"
        )

    # The file name encodes the south-west corner, e.g. "N12E077".
    basename = os.path.basename(hgt_file)
    lat_str = basename[:3]
    lon_str = basename[3:7]
    lat = int(lat_str[1:])
    if not lat_str.upper().startswith('N'):
        lat = -lat
    lon = int(lon_str[1:])
    if not lon_str.upper().startswith('E'):
        lon = -lon

    with open(hgt_file, 'rb') as f:
        data = f.read()
    # Big-endian signed 16-bit samples, one row after another.
    elevation_data = np.frombuffer(data, dtype='>i2').reshape(size, size)

    # The grid spans exactly one degree, with samples on both edges.
    pixel_size = 1.0 / (size - 1)
    geotransform = [
        lon,          # Top-left X
        pixel_size,   # X pixel size
        0,            # X rotation
        lat + 1,      # Top-left Y (tile is named after its southern edge)
        0,            # Y rotation
        -pixel_size,  # Y pixel size (negative because Y decreases downward)
    ]
    return elevation_data, geotransform, size
def clip_elevation_data(elevation_data, geotransform, size, bbox):
    """Crop a square elevation grid to the window covered by ``bbox``.

    Args:
        elevation_data: 2-D array indexed as ``[row, col]``.
        geotransform: Six-element list ``[left, x_step, _, top, _, y_step]``
            where ``y_step`` is negative.
        size: Number of samples per side of ``elevation_data``.
        bbox: ``(west, south, east, north)`` in the same units as the
            geotransform.

    Returns:
        Tuple ``(clipped_data, new_geotransform)``; the new geotransform has
        its origin moved to the top-left corner of the clipped window.
    """
    west, south, east, north = bbox
    origin_x = geotransform[0]
    step_x = geotransform[1]
    origin_y = geotransform[3]
    step_y = geotransform[5]  # negative: rows run north to south
    row_step = abs(step_y)

    def to_index(offset, step):
        # Truncate to a whole pixel, then clamp into [0, size - 1].
        return max(0, min(int(offset / step), size - 1))

    col_start = to_index(west - origin_x, step_x)
    row_start = to_index(origin_y - north, row_step)
    col_stop = to_index(east - origin_x, step_x)
    row_stop = to_index(origin_y - south, row_step)

    # Both end indices are inclusive, hence the +1 on the slice bounds.
    window = elevation_data[row_start:row_stop + 1, col_start:col_stop + 1]

    shifted_transform = [
        origin_x + col_start * step_x,
        step_x,
        0,
        origin_y + row_start * step_y,
        0,
        step_y,
    ]
    return window, shifted_transform
def save_as_geotiff_basic(elevation_data, geotransform, output_file):
    """Write elevation data as a 16-bit TIFF plus a sidecar metadata text file.

    The image is written with PIL and carries no georeferencing itself; the
    geotransform and the value offset are stored in ``<name>_metadata.txt``
    next to the image.

    Args:
        elevation_data: 2-D array of elevations (may contain negative values).
        geotransform: Georeferencing list recorded verbatim in the metadata.
        output_file: Destination path of the TIFF.

    Returns:
        Tuple ``(output_file, metadata_file)``.
    """
    data = np.asarray(elevation_data)
    min_val = np.min(data)
    if min_val < 0:
        if np.issubdtype(data.dtype, np.integer):
            # Shift in a wider integer type: with int16 input (the dtype
            # read_hgt_file yields) ``abs(-32768)`` and ``data + offset``
            # both overflow and silently corrupt the output.
            data = data.astype(np.int64)
            offset = -int(min_val)
        else:
            offset = abs(min_val)
        adjusted_data = data + offset
    else:
        offset = 0
        adjusted_data = data
    adjusted_data = adjusted_data.astype(np.uint16)

    image = Image.fromarray(adjusted_data, mode='I;16')
    image.save(output_file)

    # Derive the sidecar name from the extension only; ``str.replace`` would
    # also rewrite ".tif" inside directory names and, for a path without
    # ".tif", return the image path itself and overwrite the image.
    metadata_file = os.path.splitext(str(output_file))[0] + '_metadata.txt'
    with open(metadata_file, 'w') as f:
        f.write(f"GeoTransform: {geotransform}\n")
        f.write(f"Offset: {offset}\n")
        f.write(f"Original min value: {min_val}\n")
        f.write(f"Size: {adjusted_data.shape}\n")
    return output_file, metadata_file
def get_dem_elevation_tif(state: State) -> State:
    """
    Download SRTM elevation for ``state.bbox`` and save it as a TIFF in ``dem_files``.

    Tiles are cached in ``<working_directory>/dem_tiles`` and the clipped
    result goes to ``<working_directory>/dem_files/<place>_dem.tif``. When the
    bounding box spans several tiles only the first processed tile is used
    (no mosaicking).

    Args:
        state: State object providing ``bbox`` as (west, south, east, north),
            ``place_name`` and ``working_directory``.

    Returns:
        The same State object. On success ``status`` is ``"dem_downloaded"``
        and a DEM record is appended to ``output_files``; on failure
        ``status`` is ``"error"`` and the reason is appended to ``error_log``.
    """
    try:
        state.status = "downloading_dem"

        # Validate required fields
        if not state.bbox:
            state.error_log.append("Bounding box is required for DEM download")
            state.status = "error"
            return state
        if not state.place_name:
            state.error_log.append("Place name is required for DEM download")
            state.status = "error"
            return state

        # Create working & sub-directories
        working_dir = Path(state.working_directory)
        dem_tiles_dir = working_dir / "dem_tiles"
        dem_files_dir = working_dir / "dem_files"
        working_dir.mkdir(parents=True, exist_ok=True)
        dem_tiles_dir.mkdir(parents=True, exist_ok=True)
        dem_files_dir.mkdir(parents=True, exist_ok=True)
        state.parameters["dem_directory"] = str(dem_files_dir.resolve())

        west, south, east, north = state.bbox
        place_safe = state.place_name.replace(" ", "_").replace(",", "").replace(".", "")
        output_file = dem_files_dir / f"{place_safe}_dem.tif"
        print(f"πŸš€ Starting DEM download for {state.place_name}...")
        print(f"πŸ“ Bounding box: {state.bbox}")
        print(f"πŸ“ Output directory: {dem_files_dir}")

        # Tiles are named after their south-west corner, so the tile holding a
        # coordinate is its floor. ``int()`` truncates toward zero and would
        # pick the wrong tile for negative latitudes/longitudes.
        lat_range = range(int(np.floor(south)), int(np.floor(north)) + 1)
        lon_range = range(int(np.floor(west)), int(np.floor(east)) + 1)

        all_elevation_data = []
        all_geotransforms = []
        downloaded_tiles = []
        for lat in lat_range:
            for lon in lon_range:
                hgt_file = download_srtm_hgt(lat, lon, str(dem_tiles_dir))
                if not hgt_file:
                    continue
                try:
                    elevation_data, geotransform, size = read_hgt_file(hgt_file)
                    clipped_data, clipped_geotransform = clip_elevation_data(
                        elevation_data, geotransform, size, state.bbox
                    )
                    all_elevation_data.append(clipped_data)
                    all_geotransforms.append(clipped_geotransform)
                    downloaded_tiles.append(os.path.basename(hgt_file))
                    print(f"βœ… Processed {os.path.basename(hgt_file)}: {clipped_data.shape}")
                except Exception as e:
                    # One bad tile must not abort the others; record and go on.
                    err = f"Error processing {hgt_file}: {e}"
                    state.error_log.append(err)
                    print(f"❌ {err}")

        if not all_elevation_data:
            state.error_log.append("No elevation data processed successfully")
            state.status = "error"
            return state

        print(f"\nπŸ”„ Processing {len(all_elevation_data)} elevation tiles...")
        if len(all_elevation_data) > 1:
            print("⚠️ Multiple tiles detected. Using first tile only (mosaicking not implemented).")
        final_data = all_elevation_data[0]
        final_geotransform = all_geotransforms[0]

        tif_file, metadata_file = save_as_geotiff_basic(
            final_data, final_geotransform, str(output_file)
        )

        min_elev = float(np.min(final_data))
        max_elev = float(np.max(final_data))
        mean_elev = float(np.mean(final_data))
        shape = final_data.shape
        state.output_files.append({
            "type": "dem",
            "format": "geotiff",
            "file_path": str(tif_file),
            "metadata_file": str(metadata_file),
            "min_elevation": min_elev,
            "max_elevation": max_elev,
            "mean_elevation": mean_elev,
            "data_shape": shape,
            "downloaded_tiles": downloaded_tiles,
            "bbox": state.bbox,
            "geotransform": final_geotransform
        })
        state.status = "dem_downloaded"
        print(f"\n🎯 Success! DEM saved to: {tif_file}")
        print(f"πŸ“Š Elevation stats: Min={min_elev}, Max={max_elev}, Mean={mean_elev:.1f} m")
        print(f"πŸ“ Data size: {shape}")
        return state
    except Exception as e:
        # Top-level boundary: report through the state object instead of raising.
        state.error_log.append(f"Unhandled error during DEM download: {e}")
        state.status = "error"
        print(f"❌ {e}")
        return state
def update_dem(filepath, state, pixel_size=0.0008333):
    """Rewrite a raster in place with an EPSG:4326 CRS and a north-up transform.

    Args:
        filepath: Path of the raster to read and then overwrite.
        state: Object whose ``bbox`` is ``(west, south, east, north)`` in degrees.
        pixel_size: Pixel width and height in degrees. The default keeps the
            previous hard-coded value (0.0008333 deg is 3 arc-seconds, about
            90 m at the equator).
            NOTE(review): confirm this matches the actual grid spacing of the DEM.
    """
    crs = "EPSG:4326"  # WGS84 latitude/longitude

    # from_origin() expects the upper-left corner (west, north). The previous
    # code passed bbox[1], the southern edge, which shifted the raster one
    # bbox height too far south.
    west = state.bbox[0]
    north = state.bbox[3]
    transform = from_origin(west, north, pixel_size, pixel_size)

    # Read everything and close the source before reopening the same path
    # for writing.
    with rasterio.open(filepath) as src:
        profile = src.profile
        data = src.read(1)

    profile.update({
        'crs': crs,
        'transform': transform
    })
    with rasterio.open(filepath, 'w', **profile) as dst:
        dst.write(data, 1)
import os
import requests
import gzip
import shutil
from datetime import datetime, timedelta
from tqdm import tqdm
def download_chirps_tif(date: datetime, out_dir="chirps_tifs", timeout=60):
    """Download and unpack the CHIRPS v2.0 daily GeoTIFF for one date.

    Args:
        date: Day to fetch; only its year, month and day are used.
        out_dir: Directory for the unpacked ``.tif`` (created if missing).
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        Path of the ``.tif`` (already cached or freshly downloaded), or
        ``None`` when the server did not answer with HTTP 200. Network and
        decompression errors propagate to the caller.
    """
    y, m, d = date.strftime("%Y"), date.strftime("%m"), date.strftime("%d")
    filename = f"chirps-v2.0.{y}.{m}.{d}.tif"
    url = f"https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/tifs/p25/{y}/{filename}.gz"
    gz_path = os.path.join(out_dir, filename + ".gz")
    tif_path = os.path.join(out_dir, filename)
    if os.path.exists(tif_path):
        print(f"βœ… Already downloaded: {filename}")
        return tif_path

    os.makedirs(out_dir, exist_ok=True)
    # Unpack to a scratch file and rename at the end, so an interrupted run
    # cannot leave a truncated .tif that the cache check would later accept.
    tmp_path = tif_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            if r.status_code != 200:
                print(f"❌ Failed: {url}")
                return None
            with open(gz_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        with gzip.open(gz_path, "rb") as f_in, open(tmp_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(tmp_path, tif_path)
    finally:
        # Drop the archive and any partial output on success and on failure.
        for leftover in (gz_path, tmp_path):
            if os.path.exists(leftover):
                os.remove(leftover)
    print(f"βœ… Downloaded and extracted: {tif_path}")
    return tif_path
def batch_download_chirps(start_date: str, end_date: str, out_dir="chirps_tifs"):
    """Download CHIRPS daily rasters for every day in an inclusive date range.

    Days later than three days before today's UTC date are reported and
    skipped rather than requested.

    Args:
        start_date: First day, formatted ``YYYY-MM-DD``.
        end_date: Last day (inclusive), formatted ``YYYY-MM-DD``.
        out_dir: Directory passed on to ``download_chirps_tif``.

    Raises:
        ValueError: If either date string does not match ``YYYY-MM-DD``.
    """
    from datetime import timezone  # local: the file imports only datetime/timedelta

    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    # ``datetime.utcnow()`` is deprecated; an aware "now" gives the same UTC date.
    today = datetime.now(timezone.utc).date()
    max_available = today - timedelta(days=3)

    current = start
    while current <= end:
        if current.date() > max_available:
            print(f"⚠️ Skipping future/unavailable date: {current.strftime('%Y-%m-%d')}")
        else:
            download_chirps_tif(current, out_dir)
        current += timedelta(days=1)
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
def get_rainfall_data(state: State):
    """Download CHIRPS rainfall for the eight-day window ending one year ago today.

    The window runs from (today - 1 year - 7 days) to (today - 1 year),
    inclusive, and the rasters are stored under
    ``<state.working_directory>/rainfall_data``.

    Args:
        state: State object providing ``working_directory``.

    Returns:
        The same ``state`` object, unmodified.
    """
    print("Fetching rainfall data from same timeframe last year...")

    window_end = datetime.today() - relativedelta(years=1)
    window_start = window_end - timedelta(days=7)

    date_format = '%Y-%m-%d'
    start_date = window_start.strftime(date_format)
    end_date = window_end.strftime(date_format)
    print("Start Date:", start_date)
    print("End Date:", end_date)

    target_dir = state.working_directory + "/rainfall_data"
    batch_download_chirps(start_date, end_date, target_dir)
    return state
from whitebox import WhiteboxTools
from pathlib import Path
from dotenv import load_dotenv
import os
# Load environment variables from a .env file at import time.
load_dotenv()
# One shared WhiteboxTools instance for the whole module; it is used by
# run_hydrology_generator().
wbt = WhiteboxTools()
# NOTE(review): presumably makes each tool print its progress output — confirm.
wbt.set_verbose_mode(True)
# NOTE(review): presumably writes output rasters uncompressed — confirm.
wbt.set_compress_rasters(False)
def _require_exists(path, message):
    """Raise ``AssertionError(message)`` when ``path`` does not exist."""
    # An explicit raise instead of ``assert`` keeps the check active under
    # ``python -O``; AssertionError is retained so callers see the same type.
    if not path.exists():
        raise AssertionError(message)


def run_hydrology_generator(dem_path, output_dir=None):
    """Derive slope, aspect and D8 hydrology rasters from a DEM with WhiteboxTools.

    Args:
        dem_path: Path to the input DEM raster.
        output_dir: Directory for the generated rasters; ``None`` or a blank
            value falls back to ``"output"``. Created if missing.

    Returns:
        Dict mapping ``filled_dem``, ``flow_dir``, ``flow_acc``, ``streams``,
        ``slope`` and ``aspect`` to the absolute paths of the written files.

    Raises:
        AssertionError: If the DEM is missing or a tool produced no output file.
    """
    # ``str()`` lets callers pass a Path as well as a string.
    if not output_dir or str(output_dir).strip() == "":
        output_dir = "output"
    output_dir = Path(output_dir).resolve()
    output_dir.mkdir(exist_ok=True, parents=True)

    # Resolve the DEM as well so every path handed to WhiteboxTools is absolute.
    dem_path = Path(dem_path).resolve()
    _require_exists(dem_path, f"❌ DEM not found at {dem_path}")

    filled_dem = output_dir / "dem_filled.tif"
    flow_pointer = output_dir / "flow_dir.tif"
    flow_accum = output_dir / "flow_acc.tif"
    stream_raster = output_dir / "streams.tif"
    slope_path = output_dir / "slope.tif"
    aspect_path = output_dir / "aspect.tif"
    print(f"πŸ“ Output directory: {output_dir}")
    print(f"πŸ“ Output file will be: {filled_dem}")

    print("πŸ“ Generating Slope...")
    wbt.slope(dem=str(dem_path), output=str(slope_path), zfactor=1.0)
    _require_exists(slope_path, "❌ Slope file not generated")

    print("🧭 Generating Aspect...")
    wbt.aspect(dem=str(dem_path), output=str(aspect_path))
    _require_exists(aspect_path, "❌ Aspect file not generated")

    # The flow products below are computed from the depression-filled DEM.
    print("πŸ“₯ Running Fill Depressions...")
    wbt.fill_depressions(dem=str(dem_path), output=str(filled_dem))
    _require_exists(filled_dem, "❌ Filled DEM not generated.")

    print("πŸ“ˆ Calculating Flow Direction...")
    wbt.d8_pointer(dem=str(filled_dem), output=str(flow_pointer))
    _require_exists(flow_pointer, "❌ Flow direction file not generated.")

    print("🌊 Flow Accumulation...")
    wbt.d8_flow_accumulation(i=str(filled_dem), output=str(flow_accum), out_type="cells")
    _require_exists(flow_accum, "❌ Flow accumulation file not generated.")

    print("🧡 Extracting Streams...")
    wbt.extract_streams(flow_accum=str(flow_accum), output=str(stream_raster), threshold=100)
    _require_exists(stream_raster, "❌ Stream raster not generated.")

    print("βœ… All hydrological outputs generated successfully.")
    return {
        "filled_dem": str(filled_dem),
        "flow_dir": str(flow_pointer),
        "flow_acc": str(flow_accum),
        "streams": str(stream_raster),
        "slope": str(slope_path),
        "aspect": str(aspect_path)
    }
import os
import osmnx as ox
import geopandas as gpd
import os
import osmnx as ox
import geopandas as gpd
import pandas as pd
from datetime import datetime
def fetch_osm_infrastructure(place: str, save_path: str):
    """
    Fetch roads, buildings, schools and hospitals from OSM and save one GeoJSON.

    Each category is fetched separately; a category that fails is reported
    and skipped. Every row is tagged with its category in a ``feature_type``
    column.

    Parameters:
    - place: str - place name, e.g. "Bangalore, India"
    - save_path: str - output GeoJSON path

    Returns: the combined GeoDataFrame.

    Raises:
    - RuntimeError: if no category could be fetched at all.
    """
    start = datetime.now()
    print(f"πŸ” Fetching combined OSM infrastructure for: {place}")

    # A bare file name has no directory part, and os.makedirs("") raises.
    out_dir = os.path.dirname(save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    feature_tags = {
        "roads": {"highway": True},
        "buildings": {"building": True},
        "schools": {"amenity": "school"},
        "hospitals": {"amenity": "hospital"}
    }
    all_gdfs = []
    for name, tags in feature_tags.items():
        print(f"➑️ Fetching {name}")
        try:
            gdf = ox.features_from_place(place, tags=tags)
            gdf["feature_type"] = name  # records which category a row came from
            all_gdfs.append(gdf)
        except Exception as e:
            # Best effort: one missing category must not abort the others.
            print(f"⚠️ Failed to fetch {name}: {e}")

    if not all_gdfs:
        raise RuntimeError("No OSM data was fetched.")

    combined_gdf = pd.concat(all_gdfs, ignore_index=True)
    combined_gdf = gpd.GeoDataFrame(combined_gdf, geometry="geometry", crs="EPSG:4326")
    combined_gdf.to_file(save_path, driver="GeoJSON")
    print(f"βœ… Combined GeoJSON saved to: {save_path}")

    # The old code printed ``timedelta * 1000`` (a duration 1000x too long);
    # report the elapsed time in milliseconds instead.
    elapsed_ms = (datetime.now() - start).total_seconds() * 1000
    print(f"Elapsed: {elapsed_ms:.0f} ms")
    return combined_gdf
def get_osm_infrastructure(state):
    """Fetch OSM infrastructure for ``state.place_name`` into the working directory.

    The combined GeoJSON is written to
    ``<state.working_directory>/OSM_infrastructure/OSM.geojson``.
    Returns nothing.
    """
    geojson_path = os.path.join(
        state.working_directory, "OSM_infrastructure", "OSM.geojson"
    )
    fetch_osm_infrastructure(state.place_name, geojson_path)
import osmnx as ox
import geopandas as gpd
import pandas as pd
def tidal_risk_from_osm(place, buffer_dist=1000, output_geojson="tidal_risk_osm.geojson"):
    """Build a buffered zone around OSM coastline and water features.

    Features tagged ``natural=coastline`` and ``natural=water`` are fetched
    for ``place``, buffered by ``buffer_dist`` in EPSG:3857 units, dissolved
    into one geometry and written as GeoJSON in EPSG:4326.

    Args:
        place: Place name understood by OSMnx.
        buffer_dist: Buffer distance in EPSG:3857 map units.
        output_geojson: Destination GeoJSON path.

    Returns:
        ``output_geojson``.
    """
    print(f"🌍 Fetching OSM water + coastline for {place}")

    # Fetch both layers and move them to a projected CRS so the buffer is in
    # map units rather than degrees.
    projected_layers = [
        ox.features_from_place(place, tags={"natural": kind}).to_crs("EPSG:3857")
        for kind in ("coastline", "water")
    ]
    merged = pd.concat(projected_layers, ignore_index=True)
    combined = gpd.GeoDataFrame(merged, crs=projected_layers[0].crs)
    print(f"🧱 Found {len(combined)} features. Buffering...")

    buffered = combined.buffer(buffer_dist)
    risk_gdf = (
        gpd.GeoDataFrame(geometry=buffered, crs="EPSG:3857")
        .dissolve()
        .to_crs("EPSG:4326")
    )

    risk_gdf.to_file(output_geojson, driver="GeoJSON")
    print(f"βœ… Saved Tidal Risk GeoJSON: {output_geojson}")
    return output_geojson
import os
import numpy as np
import rasterio
from rasterio.transform import from_bounds
from rasterio.crs import CRS
import osmnx as ox
import geopandas as gpd
from shapely.geometry import box
from scipy.ndimage import distance_transform_edt
def get_healthcare_data(bbox, tags):
    """Fetch OSM features matching ``tags`` inside ``bbox`` as centroid points.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` of the search area.
        tags: OSM tag filter passed to ``ox.features_from_polygon``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry column holds each feature's
        centroid.
    """
    search_area = box(*bbox)
    features = ox.features_from_polygon(search_area, tags=tags).to_crs("EPSG:4326")
    # Collapse every feature to a single point.
    features["geometry"] = features.centroid
    return features
def rasterize_healthcare_points(bbox, points_gdf, pixel_size=0.0005):
    """Rasterize healthcare points over a bounding box.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` of the output grid.
        points_gdf: GeoDataFrame whose geometries are points in the bbox's
            coordinate system.
        pixel_size: Cell size in the same units as ``bbox``.

    Returns:
        Tuple ``(raster, transform)``: a ``uint8`` grid with 1 in every cell
        containing at least one point and 0 elsewhere, plus the affine
        transform that maps the grid onto ``bbox``.
    """
    minx, miny, maxx, maxy = bbox
    width = int((maxx - minx) / pixel_size)
    height = int((maxy - miny) / pixel_size)
    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    raster = np.zeros((height, width), dtype=np.uint8)
    for point in points_gdf.geometry:
        # Missing or empty geometries have no coordinates to rasterize.
        if point is None or point.is_empty:
            continue
        col, row = ~transform * (point.x, point.y)
        # Floor, not int(): truncation toward zero would fold points lying
        # just outside the west/north edge (index in (-1, 0)) into cell 0.
        col, row = int(np.floor(col)), int(np.floor(row))
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1
    return raster, transform
def compute_distance_transform(binary_raster, pixel_size_deg):
    """Return, per cell, the approximate distance in metres to the nearest marked cell.

    Cells equal to 0 are measured against the nearest non-zero cell using a
    Euclidean distance transform; pixel distances are scaled by
    ``111000 * pixel_size_deg`` (about 111 km per degree).

    Args:
        binary_raster: 2-D array, non-zero where a facility is present.
        pixel_size_deg: Cell size in degrees.

    Returns:
        Float array of the same shape holding distances in metres.
    """
    metres_per_pixel = 111000 * pixel_size_deg
    # The transform measures distance to the nearest zero, so facility cells
    # must become 0 and everything else 1.
    unoccupied = (binary_raster == 0).astype(np.uint8)
    return distance_transform_edt(unoccupied) * metres_per_pixel
def save_distance_raster(distance_raster, transform, output_path, crs="EPSG:4326"):
    """Write a 2-D distance array to a single-band GeoTIFF.

    Args:
        distance_raster: 2-D array to store; its dtype becomes the file dtype.
        transform: Affine transform georeferencing the array.
        output_path: Destination file path.
        crs: Coordinate reference system string for the output.
    """
    n_rows, n_cols = distance_raster.shape[0], distance_raster.shape[1]
    profile = {
        "driver": "GTiff",
        "height": n_rows,
        "width": n_cols,
        "count": 1,
        "dtype": distance_raster.dtype,
        "crs": CRS.from_string(crs),
        "transform": transform,
    }
    with rasterio.open(output_path, "w", **profile) as dst:
        dst.write(distance_raster, 1)
def generate_distance_to_healthcare(bbox, output_path="distance_to_healthcare.tif"):
    """
    Build a GeoTIFF of distance to the nearest healthcare facility.

    Hospitals, clinics, doctors and pharmacies are fetched from OpenStreetMap,
    rasterized onto a 0.0005-degree grid over ``bbox``, converted to a
    distance raster and saved.

    Parameters:
    - bbox: [minx, miny, maxx, maxy] for the area of interest
    - output_path: output GeoTIFF path
    """
    cell_size = 0.0005
    healthcare_tags = {"amenity": ["hospital", "clinic", "doctors", "pharmacy"]}

    print("πŸ” Fetching healthcare data from OpenStreetMap...")
    facilities = get_healthcare_data(bbox, healthcare_tags)

    print(f"πŸ—Ί Rasterizing {len(facilities)} healthcare points...")
    presence, grid_transform = rasterize_healthcare_points(bbox, facilities, cell_size)

    print("πŸ“ Computing distance transform...")
    distances = compute_distance_transform(presence, cell_size)

    print(f"πŸ’Ύ Saving to {output_path}...")
    save_distance_raster(distances, grid_transform, output_path)
    print("βœ… Done! Distance raster generated.")
import os
import geopandas as gpd
import rasterio
import matplotlib.pyplot as plt
from rasterio.plot import show
from shapely.geometry import box
import contextily as ctx
def visualize_geospatial_file(file_path: str, output_path: str = "output_map.png"):
    """
    Render a raster or vector geospatial file and save the preview as an image.

    Args:
        file_path (str): Path to a GeoTIFF (.tif/.tiff) or a vector file
            (.geojson, .shp, .gpkg).
        output_path (str): Path of the image to write (.png).

    Returns:
        str: ``output_path``.

    Raises:
        ValueError: If the file extension is not one of the supported types.
    """
    ext = os.path.splitext(file_path)[1].lower()

    if ext in (".tif", ".tiff"):
        with rasterio.open(file_path) as src:
            fig, ax = plt.subplots(figsize=(10, 10))
            try:
                show(src, ax=ax, title="Raster Preview")
                ax.set_axis_off()
                # The original never wrote the figure, although the function
                # returns ``output_path`` as if it had.
                fig.savefig(output_path, bbox_inches="tight")
            finally:
                plt.close(fig)  # release the figure instead of accumulating them
        return output_path

    if ext in (".geojson", ".shp", ".gpkg"):
        gdf = gpd.read_file(file_path)
        # Reproject BEFORE plotting: the basemap below is added without a CRS
        # argument, so the data must already be in EPSG:3857 to line up.
        wants_basemap = gdf.crs is not None and gdf.crs.to_epsg() == 4326
        if wants_basemap:
            gdf = gdf.to_crs(epsg=3857)

        fig, ax = plt.subplots(figsize=(10, 10))
        try:
            gdf.plot(ax=ax, edgecolor='black', linewidth=0.8, alpha=0.6, color='orange')
            if wants_basemap:
                try:
                    ctx.add_basemap(ax, source=ctx.providers.Stamen.TonerLite)
                except Exception as e:
                    # The basemap is decorative and needs network access and a
                    # provider that may be unavailable; keep the preview anyway.
                    print(f"Basemap unavailable, continuing without it: {e}")
            ax.set_title("Vector Preview")
            ax.set_axis_off()
            fig.savefig(output_path, bbox_inches="tight")
        finally:
            plt.close(fig)
        return output_path

    raise ValueError(f"Unsupported file type: {ext}")
import osmnx as ox
import geopandas as gpd
from shapely.geometry import box
import numpy as np
import rasterio
from rasterio.transform import from_bounds
from rasterio.crs import CRS
from scipy.ndimage import distance_transform_edt
def get_infrastructure_gdf(bbox, tags):
    """Fetch OSM features matching ``tags`` inside ``bbox`` as centroid points.

    Note: this sets ``ox.settings.overpass_endpoint`` and
    ``ox.settings.timeout`` globally for OSMnx.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` of the search area.
        tags: OSM tag filter passed to ``ox.features_from_polygon``.

    Returns:
        GeoDataFrame in EPSG:4326 whose geometry column holds each feature's
        centroid.
    """
    ox.settings.overpass_endpoint = "https://overpass.kumi.systems/api/interpreter"
    ox.settings.timeout = 60

    minx, miny, maxx, maxy = bbox
    search_area = box(minx, miny, maxx, maxy)
    features = ox.features_from_polygon(search_area, tags=tags).to_crs("EPSG:4326")
    # Collapse every feature to a single point.
    features["geometry"] = features.centroid
    return features
def rasterize_points(gdf, bbox, pixel_size=0.0005):
    """Burn point geometries into a binary grid covering ``bbox``.

    Args:
        gdf: GeoDataFrame whose geometries are points in the bbox's
            coordinate system.
        bbox: ``(minx, miny, maxx, maxy)`` of the output grid.
        pixel_size: Cell size in the same units as ``bbox``.

    Returns:
        Tuple ``(raster, transform)``: a ``uint8`` grid with 1 in every cell
        containing at least one point and 0 elsewhere, plus the affine
        transform that maps the grid onto ``bbox``.
    """
    minx, miny, maxx, maxy = bbox
    width = int((maxx - minx) / pixel_size)
    height = int((maxy - miny) / pixel_size)
    transform = from_bounds(minx, miny, maxx, maxy, width, height)
    raster = np.zeros((height, width), dtype=np.uint8)
    for point in gdf.geometry:
        # Missing or empty geometries have no coordinates to rasterize.
        if point is None or point.is_empty:
            continue
        col, row = ~transform * (point.x, point.y)
        # Floor, not int(): truncation toward zero would fold points lying
        # just outside the west/north edge (index in (-1, 0)) into cell 0.
        col, row = int(np.floor(col)), int(np.floor(row))
        if 0 <= row < height and 0 <= col < width:
            raster[row, col] = 1
    return raster, transform
def save_raster(raster, transform, output_path, crs="EPSG:4326"):
    """Write a 2-D array to a single-band GeoTIFF.

    Args:
        raster: 2-D array to store; its dtype becomes the file dtype.
        transform: Affine transform georeferencing the array.
        output_path: Destination file path.
        crs: Coordinate reference system string for the output.
    """
    n_rows, n_cols = raster.shape[0], raster.shape[1]
    profile = {
        "driver": "GTiff",
        "height": n_rows,
        "width": n_cols,
        "count": 1,
        "dtype": raster.dtype,
        "crs": CRS.from_string(crs),
        "transform": transform,
    }
    with rasterio.open(output_path, "w", **profile) as dst:
        dst.write(raster, 1)
def generate_infrastructure_tif(bbox, output_path="infrastructure.tif", pixel_size=0.0005, distance=False):
    """
    Generate a binary or distance-based infrastructure raster.

    OSM features tagged highway, building, bridge or railway are fetched for
    ``bbox``, reduced to centroid points and burned into a grid. With
    ``distance=True`` the grid is replaced by the approximate distance in
    metres (111000 m per degree) to the nearest marked cell.

    Args:
        bbox: ``(minx, miny, maxx, maxy)`` of the area of interest.
        output_path: Destination GeoTIFF path.
        pixel_size: Cell size in degrees.
        distance: Write a distance raster instead of the binary one.
    """
    infrastructure_tags = {
        key: True for key in ("highway", "building", "bridge", "railway")
    }

    print("πŸ” Fetching infrastructure data...")
    features = get_infrastructure_gdf(bbox, infrastructure_tags)

    print(f"πŸ—Ί Rasterizing {len(features)} points...")
    grid, grid_transform = rasterize_points(features, bbox, pixel_size)

    if distance:
        print("πŸ“ Computing distance transform...")
        # Distance is measured to the nearest zero, so marked cells become 0.
        unoccupied = (grid == 0).astype(np.uint8)
        grid = distance_transform_edt(unoccupied) * (111000 * pixel_size)  # meters

    print(f"πŸ’Ύ Saving raster to {output_path}...")
    save_raster(grid, grid_transform, output_path)
    print("βœ… Done.")
def get_infrastructure(state:State):
    """Generate the infrastructure raster for ``state.bbox`` with default settings.

    The output path, pixel size and binary mode are the defaults of
    ``generate_infrastructure_tif``. Returns nothing.
    """
    area_of_interest = state.bbox
    generate_infrastructure_tif(area_of_interest)