our-era / core /engine.py
msradam's picture
Add full deployment package for HF Spaces
1a5769f
"""
Routing engine for the Emergency Routing Assistant.
This module contains all routing logic, data loading, and processing.
The frontend (app.py) should only handle presentation.
Features inspired by dream-meridian:
- igraph backend for high-performance routing (optional)
- Isochrone generation (reachable area within X minutes)
- Find along route (discover POIs along a computed route)
"""
import os
import re
import json
import networkx as nx
import pandas as pd
import geopandas as gpd
import osmnx as ox
import requests
from typing import Any
from dataclasses import dataclass, field
from scipy.spatial import cKDTree
import numpy as np
# Optional igraph support for high-performance routing
try:
import igraph as ig
HAS_IGRAPH = True
except ImportError:
HAS_IGRAPH = False
# =============================================================================
# Constants
# =============================================================================
WALK_SPEED_M_PER_MIN = 75 # ~4.5 km/h
ELEVATION_PENALTY_FACTOR = 3.0
BROWNSVILLE_CENTER = {"lat": 40.6594, "lon": -73.9126}
BROWNSVILLE_BOUNDS = {
"min_lat": 40.64,
"max_lat": 40.68,
"min_lon": -73.93,
"max_lon": -73.89
}
VALID_RESOURCE_TYPES = [
"pharmacy", "clinic", "hospital", "fire_station", "police",
"school", "library", "community_centre", "place_of_worship"
]
# Route colors for display
ROUTE_COLORS = {
"shortest": "#3b82f6", # Blue
"flattest": "#22c55e", # Green
"balanced": "#f59e0b", # Amber
"safest": "#10b981", # Emerald (climate-safe route)
}
# Isochrone colors by time
ISOCHRONE_COLORS = {
5: "#22c55e", # Green - 5 min
10: "#84cc16", # Lime - 10 min
15: "#f59e0b", # Amber - 15 min
20: "#ef4444", # Red - 20 min
}
# =============================================================================
# POI Marker Styles
# =============================================================================
# This configuration allows UI developers to customize markers by POI type.
# Each entry defines: color, icon, and optional display properties.
#
# Icon options for Folium:
# - Font Awesome icons (prefix "fa"): "fa-hospital", "fa-fire", etc.
# - Glyphicons (prefix "glyphicon"): "glyphicon-home", etc.
# - Bootstrap icons: "heart", "star", "flag", etc.
#
# For custom SVG/image markers, UI devs can extend render_map() to use
# folium.CustomIcon or folium.DivIcon with the 'custom_icon_url' field.
POI_MARKER_STYLES = {
# Emergency Services
"hospital": {
"color": "#dc2626", # Red-600
"fill_color": "#fecaca", # Red-200
"icon": "fa-hospital",
"icon_prefix": "fa",
"radius": 8,
"category": "Emergency Service",
},
"clinic": {
"color": "#ef4444", # Red-500
"fill_color": "#fee2e2", # Red-100
"icon": "fa-stethoscope",
"icon_prefix": "fa",
"radius": 7,
"category": "Emergency Service",
},
"fire_station": {
"color": "#ea580c", # Orange-600
"fill_color": "#ffedd5", # Orange-100
"icon": "fa-fire-extinguisher",
"icon_prefix": "fa",
"radius": 8,
"category": "Emergency Service",
},
"police": {
"color": "#1d4ed8", # Blue-700
"fill_color": "#dbeafe", # Blue-100
"icon": "fa-shield",
"icon_prefix": "fa",
"radius": 8,
"category": "Emergency Service",
},
# Healthcare
"pharmacy": {
"color": "#16a34a", # Green-600
"fill_color": "#dcfce7", # Green-100
"icon": "fa-medkit",
"icon_prefix": "fa",
"radius": 6,
"category": "Healthcare",
},
"doctors": {
"color": "#22c55e", # Green-500
"fill_color": "#bbf7d0", # Green-200
"icon": "fa-user-md",
"icon_prefix": "fa",
"radius": 6,
"category": "Healthcare",
},
# Community Resources
"school": {
"color": "#7c3aed", # Violet-600
"fill_color": "#ede9fe", # Violet-100
"icon": "fa-graduation-cap",
"icon_prefix": "fa",
"radius": 7,
"category": "Community Resource",
},
"library": {
"color": "#8b5cf6", # Violet-500
"fill_color": "#f3e8ff", # Purple-100
"icon": "fa-book",
"icon_prefix": "fa",
"radius": 6,
"category": "Community Resource",
},
"community_centre": {
"color": "#6366f1", # Indigo-500
"fill_color": "#e0e7ff", # Indigo-100
"icon": "fa-users",
"icon_prefix": "fa",
"radius": 7,
"category": "Community Resource",
},
"place_of_worship": {
"color": "#a855f7", # Purple-500
"fill_color": "#f3e8ff", # Purple-100
"icon": "fa-church",
"icon_prefix": "fa",
"radius": 6,
"category": "Community Resource",
},
"youth_center": {
"color": "#ec4899", # Pink-500
"fill_color": "#fce7f3", # Pink-100
"icon": "fa-child",
"icon_prefix": "fa",
"radius": 6,
"category": "Community Resource",
},
"senior_center": {
"color": "#f97316", # Orange-500
"fill_color": "#ffedd5", # Orange-100
"icon": "fa-heart",
"icon_prefix": "fa",
"radius": 6,
"category": "Community Resource",
},
"childcare": {
"color": "#f472b6", # Pink-400
"fill_color": "#fbcfe8", # Pink-200
"icon": "fa-child",
"icon_prefix": "fa",
"radius": 5,
"category": "Community Resource",
},
"nycha_community_center": {
"color": "#0ea5e9", # Sky-500
"fill_color": "#e0f2fe", # Sky-100
"icon": "fa-building",
"icon_prefix": "fa",
"radius": 7,
"category": "Community Resource",
},
# Climate Infrastructure
"park": {
"color": "#16a34a", # Green-600
"fill_color": "#bbf7d0", # Green-200
"icon": "fa-tree",
"icon_prefix": "fa",
"radius": 6,
"category": "Climate Infrastructure",
},
"shelter": {
"color": "#0284c7", # Sky-600
"fill_color": "#bae6fd", # Sky-200
"icon": "fa-home",
"icon_prefix": "fa",
"radius": 8,
"category": "Climate Infrastructure",
},
"drinking_water": {
"color": "#0891b2", # Cyan-600
"fill_color": "#cffafe", # Cyan-100
"icon": "fa-tint",
"icon_prefix": "fa",
"radius": 5,
"category": "Climate Infrastructure",
},
# Local Business
"bodega": {
"color": "#f59e0b", # Amber-500
"fill_color": "#fef3c7", # Amber-100
"icon": "fa-shopping-basket",
"icon_prefix": "fa",
"radius": 5,
"category": "Local Business",
},
"supermarket": {
"color": "#d97706", # Amber-600
"fill_color": "#fef3c7", # Amber-100
"icon": "fa-shopping-cart",
"icon_prefix": "fa",
"radius": 6,
"category": "Local Business",
},
"fast_food": {
"color": "#fbbf24", # Amber-400
"fill_color": "#fef9c3", # Yellow-100
"icon": "fa-cutlery",
"icon_prefix": "fa",
"radius": 5,
"category": "Local Business",
},
"cafe": {
"color": "#92400e", # Amber-800
"fill_color": "#fef3c7", # Amber-100
"icon": "fa-coffee",
"icon_prefix": "fa",
"radius": 5,
"category": "Local Business",
},
"bank": {
"color": "#475569", # Slate-600
"fill_color": "#e2e8f0", # Slate-200
"icon": "fa-university",
"icon_prefix": "fa",
"radius": 5,
"category": "Local Business",
},
# Social Services
"social_facility": {
"color": "#0d9488", # Teal-600
"fill_color": "#ccfbf1", # Teal-100
"icon": "fa-handshake-o",
"icon_prefix": "fa",
"radius": 6,
"category": "Social Services",
},
"snap_center": {
"color": "#14b8a6", # Teal-500
"fill_color": "#ccfbf1", # Teal-100
"icon": "fa-id-card",
"icon_prefix": "fa",
"radius": 6,
"category": "Social Services",
},
# Default / Fallback for unknown types
"facility": {
"color": "#6b7280", # Gray-500
"fill_color": "#e5e7eb", # Gray-200
"icon": "fa-building-o",
"icon_prefix": "fa",
"radius": 5,
"category": "Other",
},
}
# Default style for POI types not explicitly defined
POI_DEFAULT_STYLE = {
"color": "#6b7280", # Gray-500
"fill_color": "#e5e7eb", # Gray-200
"icon": "fa-map-marker",
"icon_prefix": "fa",
"radius": 5,
"category": "Other",
}
def get_poi_marker_style(poi_type: str) -> dict:
"""
Get marker style configuration for a given POI type.
Args:
poi_type: The type of POI (e.g., "hospital", "pharmacy", "school")
Returns:
Dictionary with marker style properties:
- color: Border/stroke color (hex)
- fill_color: Fill color (hex)
- icon: Font Awesome or Glyphicon name
- icon_prefix: Icon library prefix ("fa" or "glyphicon")
- radius: Circle marker radius in pixels
- category: POI category for grouping
Example:
>>> style = get_poi_marker_style("hospital")
>>> style["color"]
'#dc2626'
>>> style["icon"]
'fa-hospital'
"""
return POI_MARKER_STYLES.get(poi_type, POI_DEFAULT_STYLE).copy()
def get_all_poi_styles() -> dict:
"""
Get all POI marker style configurations.
Returns:
Dictionary mapping POI type names to their style configurations.
Useful for UI developers to enumerate all available styles.
Example:
>>> styles = get_all_poi_styles()
>>> list(styles.keys())
['hospital', 'clinic', 'fire_station', ...]
"""
return POI_MARKER_STYLES.copy()
def get_poi_styles_by_category() -> dict[str, list[str]]:
"""
Get POI types grouped by category.
Returns:
Dictionary mapping category names to lists of POI types.
Example:
>>> by_cat = get_poi_styles_by_category()
>>> by_cat["Emergency Service"]
['hospital', 'clinic', 'fire_station', 'police']
"""
categories: dict[str, list[str]] = {}
for poi_type, style in POI_MARKER_STYLES.items():
cat = style.get("category", "Other")
if cat not in categories:
categories[cat] = []
categories[cat].append(poi_type)
return categories
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class RouteMetrics:
elevation_gain_m: float = 0
elevation_loss_m: float = 0
max_elevation_m: float = 0
min_elevation_m: float = 0
avg_grade_pct: float = 0
max_grade_pct: float = 0
difficulty: str = "flat"
def to_dict(self) -> dict:
return {
"elevation_gain_m": self.elevation_gain_m,
"elevation_loss_m": self.elevation_loss_m,
"max_elevation_m": self.max_elevation_m,
"min_elevation_m": self.min_elevation_m,
"avg_grade_pct": self.avg_grade_pct,
"max_grade_pct": self.max_grade_pct,
"difficulty": self.difficulty,
}
@dataclass
class ClimateMetrics:
"""Climate risk metrics for a route.
Climate weights are now computed at RUNTIME using configurable parameters,
allowing the LLM to adjust weights based on user context (e.g., flooding,
heat wave, air quality concerns).
"""
avg_climate_risk: float = 0
max_climate_risk: float = 0
avg_flood_risk: float = 0
max_flood_risk: float = 0
avg_heat_risk: float = 0
max_heat_risk: float = 0
avg_air_quality_risk: float = 0
max_air_quality_risk: float = 0
avg_tree_coverage: float = 0
flood_exposure_m: float = 0 # meters of route where flood_risk > 0.3
def to_dict(self) -> dict:
return {
"avg_climate_risk": self.avg_climate_risk,
"max_climate_risk": self.max_climate_risk,
"avg_flood_risk": self.avg_flood_risk,
"max_flood_risk": self.max_flood_risk,
"avg_heat_risk": self.avg_heat_risk,
"max_heat_risk": self.max_heat_risk,
"avg_air_quality_risk": self.avg_air_quality_risk,
"max_air_quality_risk": self.max_air_quality_risk,
"avg_tree_coverage": self.avg_tree_coverage,
"flood_exposure_m": self.flood_exposure_m,
}
@dataclass
class ClimateWeightParams:
"""Parameters for runtime climate weight calculation.
These parameters are inferred by the LLM based on user context:
- Flooding mentioned: increase flood penalties
- Hot day / shade requested: increase heat_factor and shade_factor
- Respiratory concerns: increase aqi_factor
- Mobility concerns / elderly: increase grade_factor
"""
flood_penalty_deep: float = 5.0 # Multiplier for deep flood zones (flood_risk >= 1.0)
flood_penalty_shallow: float = 2.0 # Multiplier for shallow flood zones (flood_risk >= 0.6)
heat_factor: float = 0.3 # Penalty for high heat vulnerability (0.0-1.0)
shade_factor: float = 0.3 # Benefit from tree coverage (0.0-1.0)
aqi_factor: float = 0.1 # Penalty for poor air quality (0.0-1.0)
grade_factor: float = 0.2 # Penalty for steep grades (0.0-1.0)
def to_dict(self) -> dict:
return {
"flood_penalty_deep": self.flood_penalty_deep,
"flood_penalty_shallow": self.flood_penalty_shallow,
"heat_factor": self.heat_factor,
"shade_factor": self.shade_factor,
"aqi_factor": self.aqi_factor,
"grade_factor": self.grade_factor,
}
@dataclass
class RouteOption:
name: str
label: str
color: str
coords: list[tuple[float, float]]
distance_m: float
time_min: float
metrics: RouteMetrics
route_nodes: list[int] = field(default_factory=list)
climate_metrics: ClimateMetrics = field(default_factory=ClimateMetrics)
def to_dict(self) -> dict:
result = {
"name": self.name,
"label": self.label,
"color": self.color,
"coords": self.coords,
"distance_meters": self.distance_m,
"walking_time_minutes": self.time_min,
"route_metrics": self.metrics.to_dict(),
}
# Include climate metrics if they have data
if self.climate_metrics.avg_climate_risk > 0:
result["climate_metrics"] = self.climate_metrics.to_dict()
return result
@dataclass
class RoutingResult:
success: bool
recommended: str = ""
alternatives: list[RouteOption] = field(default_factory=list)
origin: tuple[float, float] = (0, 0)
destination: tuple[float, float] = (0, 0)
dest_name: str = ""
error: str = ""
def to_dict(self) -> dict:
if not self.success:
return {"error": self.error}
recommended_route = next(
(r for r in self.alternatives if r.name == self.recommended),
self.alternatives[0] if self.alternatives else None
)
return {
"success": True,
"recommended": self.recommended,
"alternatives": [r.to_dict() for r in self.alternatives],
"distance_meters": recommended_route.distance_m if recommended_route else 0,
"walking_time_minutes": recommended_route.time_min if recommended_route else 0,
"origin": {"lat": self.origin[0], "lon": self.origin[1]},
"destination": {"lat": self.destination[0], "lon": self.destination[1], "name": self.dest_name},
"route_metrics": recommended_route.metrics.to_dict() if recommended_route else {},
}
def to_map_data(self) -> dict | None:
if not self.success or not self.alternatives:
return None
recommended_route = next(
(r for r in self.alternatives if r.name == self.recommended),
self.alternatives[0]
)
return {
"routes": [
{"coords": r.coords, "color": r.color, "label": r.label, "name": r.name}
for r in self.alternatives
],
"origin": list(self.origin),
"destination": list(self.destination),
"dest_name": self.dest_name,
"distance": recommended_route.distance_m,
"route_coords": recommended_route.coords, # backwards compat
}
@dataclass
class IsochroneResult:
"""Result of isochrone generation - areas reachable within time limits."""
success: bool
origin: tuple[float, float] = (0, 0)
isochrones: list[dict] = field(default_factory=list) # [{time_min, polygon_coords, color}]
resources_within: list[dict] = field(default_factory=list) # Resources within max isochrone
error: str = ""
def to_dict(self) -> dict:
if not self.success:
return {"error": self.error}
return {
"success": True,
"origin": {"lat": self.origin[0], "lon": self.origin[1]},
"isochrones": self.isochrones,
"resources_within": self.resources_within,
}
def to_map_data(self) -> dict | None:
if not self.success:
return None
return {
"origin": list(self.origin),
"isochrones": self.isochrones,
"resources_within": self.resources_within,
}
@dataclass
class AlongRouteResult:
"""Result of find_along_route - POIs discovered along a route."""
success: bool
route_coords: list[tuple[float, float]] = field(default_factory=list)
pois_found: list[dict] = field(default_factory=list)
origin: tuple[float, float] = (0, 0)
destination: tuple[float, float] = (0, 0)
buffer_meters: float = 100
climate_metrics: ClimateMetrics | None = None
error: str = ""
def to_dict(self) -> dict:
if not self.success:
return {"error": self.error}
result = {
"success": True,
"origin": {"lat": self.origin[0], "lon": self.origin[1]},
"destination": {"lat": self.destination[0], "lon": self.destination[1]},
"buffer_meters": self.buffer_meters,
"pois_found": self.pois_found,
"poi_count": len(self.pois_found),
}
if self.climate_metrics:
result["climate_metrics"] = self.climate_metrics.to_dict()
return result
def to_map_data(self) -> dict | None:
if not self.success:
return None
return {
"route_coords": self.route_coords,
"origin": list(self.origin),
"destination": list(self.destination),
"pois_along_route": self.pois_found,
}
@dataclass
class RouteComparisonResult:
"""Result of comparing shortest vs safest routes."""
success: bool
shortest: RouteOption | None = None
safest: RouteOption | None = None
origin: tuple[float, float] = (0, 0)
destination: tuple[float, float] = (0, 0)
dest_name: str = ""
extra_distance_m: float = 0
extra_distance_pct: float = 0
risk_reduction: float = 0
error: str = ""
def to_dict(self) -> dict:
if not self.success:
return {"error": self.error}
return {
"success": True,
"shortest": self.shortest.to_dict() if self.shortest else None,
"safest": self.safest.to_dict() if self.safest else None,
"origin": {"lat": self.origin[0], "lon": self.origin[1]},
"destination": {"lat": self.destination[0], "lon": self.destination[1], "name": self.dest_name},
"comparison": {
"extra_distance_m": round(self.extra_distance_m, 1),
"extra_distance_pct": round(self.extra_distance_pct, 1),
"risk_reduction": round(self.risk_reduction, 3),
},
}
def to_map_data(self) -> dict | None:
if not self.success:
return None
routes = []
if self.shortest:
routes.append({
"coords": self.shortest.coords,
"color": self.shortest.color,
"label": self.shortest.label,
"name": self.shortest.name,
})
if self.safest and self.safest.coords != (self.shortest.coords if self.shortest else []):
routes.append({
"coords": self.safest.coords,
"color": self.safest.color,
"label": self.safest.label,
"name": self.safest.name,
})
return {
"routes": routes,
"origin": list(self.origin),
"destination": list(self.destination),
"dest_name": self.dest_name,
}
# =============================================================================
# Routing Engine
# =============================================================================
class RoutingEngine:
"""
Core routing engine - handles all graph operations and route computation.
Supports optional igraph backend for high-performance routing.
Features:
- Multi-route computation (shortest, flattest, balanced)
- Isochrone generation (reachable area within X minutes)
- Find along route (POIs near a route corridor)
"""
def __init__(self, use_igraph: bool = True):
self.G: nx.MultiDiGraph | None = None
self.resources_df: pd.DataFrame | None = None
self.known_places: dict[str, dict] = {}
self._loaded = False
# igraph backend (if available and requested)
self.use_igraph = use_igraph and HAS_IGRAPH
self.ig_graph: "ig.Graph | None" = None
self.ig_node_map: dict[int, int] = {} # NetworkX node -> igraph node
self.ig_reverse_map: dict[int, int] = {} # igraph node -> NetworkX node
# Edge attribute indices for igraph (for climate routing)
self.ig_length_idx: int = -1
self.ig_climate_weight_idx: int = -1
# Track if climate data is available
self.has_climate_data: bool = False
# Spatial index for fast nearest-neighbor queries
self._node_coords: np.ndarray | None = None
self._node_ids: list[int] = []
self._kdtree: cKDTree | None = None
self._resource_kdtree: cKDTree | None = None
def load(self) -> bool:
"""Load network and resources from disk.
Tries to load from pre-built cache first (fast), falls back to GraphML (slow).
Run `python build_graph_cache.py` to generate the cache for faster startup.
"""
if self._loaded:
return True
# Data is in project root's data/ directory, not core/data/
data_dir = os.path.join(os.path.dirname(__file__), "..", "data", "brownsville")
try:
# Try loading from cache first (much faster!)
cache_path = os.path.join(data_dir, "graph_cache.pkl")
if os.path.exists(cache_path):
loaded_from_cache = self._load_from_cache(cache_path)
if loaded_from_cache:
# Cache loaded successfully, skip GraphML parsing
pass
else:
# Cache failed, fall back to GraphML
self._load_from_graphml(data_dir)
else:
# No cache, load from GraphML
self._load_from_graphml(data_dir)
# Load resources
resources_path = os.path.join(data_dir, "all_resources.csv")
if os.path.exists(resources_path):
self.resources_df = pd.read_csv(resources_path)
else:
geojson_path = os.path.join(data_dir, "all_resources.geojson")
if os.path.exists(geojson_path):
gdf = gpd.read_file(geojson_path)
self.resources_df = pd.DataFrame({
"name": gdf["name"],
"type": gdf["type"],
"category": gdf["category"],
"lat": gdf.geometry.y,
"lon": gdf.geometry.x
})
# Build resource spatial index
if self.resources_df is not None:
self._build_resource_index()
# Load places for geocoding
self._load_known_places(data_dir)
self._loaded = True
return True
except Exception as e:
print(f"Error loading data: {e}")
return False
def _load_from_cache(self, cache_path: str) -> bool:
"""Load pre-built graph data from pickle cache (fast startup)."""
import pickle
try:
with open(cache_path, 'rb') as f:
cache = pickle.load(f)
# Check cache version (v3 added air_quality_risk)
if cache.get("version", 1) < 3:
print("Cache version outdated, rebuilding from GraphML...")
return False
# Restore all cached data
self.G = cache["nx_graph"]
self.has_climate_data = cache["has_climate_data"]
self._node_ids = cache["node_ids"]
self._node_coords = cache["coords"]
self._kdtree = cache["kdtree"]
# Restore igraph if available
if self.use_igraph and cache.get("ig_graph") is not None:
self.ig_graph = cache["ig_graph"]
self.ig_node_map = cache["ig_node_map"]
self.ig_reverse_map = cache["ig_reverse_map"]
return True
except Exception as e:
print(f"Failed to load cache: {e}")
return False
def _load_from_graphml(self, data_dir: str):
"""Load graph from GraphML file (slow fallback)."""
# Load climate-enhanced network in priority order:
# 1. Real climate data (walking_network_final.graphml)
# 2. Mock climate data (walking_network_climate.graphml)
# 3. Raw network (walking_network.graphml)
final_path = os.path.join(data_dir, "walking_network_final.graphml")
climate_path = os.path.join(data_dir, "walking_network_climate.graphml")
graphml_path = os.path.join(data_dir, "walking_network.graphml")
if os.path.exists(final_path):
self.G = ox.load_graphml(final_path)
self.has_climate_data = True
elif os.path.exists(climate_path):
self.G = ox.load_graphml(climate_path)
self.has_climate_data = True
elif os.path.exists(graphml_path):
self.G = ox.load_graphml(graphml_path)
self.has_climate_data = False
else:
from shapely.geometry import box
brownsville_bbox = box(-73.93, 40.64, -73.89, 40.68)
self.G = ox.graph_from_polygon(brownsville_bbox, network_type='walk', simplify=True)
self.has_climate_data = False
# Convert climate attributes from strings to floats (GraphML stores all as strings)
if self.has_climate_data:
climate_attrs = ['flood_risk', 'heat_risk', 'air_quality_risk', 'climate_risk', 'climate_weight']
for u, v, data in self.G.edges(data=True):
for attr in climate_attrs:
if attr in data and isinstance(data[attr], str):
try:
data[attr] = float(data[attr])
except (ValueError, TypeError):
data[attr] = 0.0
# Verify climate data by checking first edge
if self.has_climate_data:
sample_edge = next(iter(self.G.edges(data=True)), None)
if sample_edge and "climate_weight" not in sample_edge[2]:
self.has_climate_data = False
self.G = ox.project_graph(self.G)
# Build spatial index for fast nearest-node queries
self._build_spatial_index()
# Build igraph graph if available
if self.use_igraph:
self._build_igraph_graph()
def _build_spatial_index(self):
"""Build KD-tree for fast nearest-node lookups."""
if self.G is None:
return
nodes = list(self.G.nodes())
coords = []
for node in nodes:
x = self.G.nodes[node].get("x", 0)
y = self.G.nodes[node].get("y", 0)
coords.append([x, y])
self._node_ids = nodes
self._node_coords = np.array(coords)
self._kdtree = cKDTree(self._node_coords)
def _build_resource_index(self):
"""Build KD-tree for fast resource lookups."""
if self.resources_df is None or len(self.resources_df) == 0:
return
# Convert lat/lon to projected coordinates for consistency
coords = []
if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326":
import pyproj
transformer = pyproj.Transformer.from_crs("EPSG:4326", self.G.graph["crs"], always_xy=True)
for _, row in self.resources_df.iterrows():
x, y = transformer.transform(row["lon"], row["lat"])
coords.append([x, y])
else:
for _, row in self.resources_df.iterrows():
coords.append([row["lon"], row["lat"]])
self._resource_kdtree = cKDTree(np.array(coords))
def _build_igraph_graph(self):
"""Build igraph graph from NetworkX graph for high-performance routing.
Preserves all edge attributes including climate data.
Uses ig.Graph.from_networkx() for automatic attribute transfer.
"""
if not HAS_IGRAPH or self.G is None:
return
# Create node mapping (NetworkX uses arbitrary IDs, igraph uses 0..n-1)
nx_nodes = list(self.G.nodes())
self.ig_node_map = {nx_node: i for i, nx_node in enumerate(nx_nodes)}
self.ig_reverse_map = {i: nx_node for nx_node, i in self.ig_node_map.items()}
# Create igraph graph (directed)
self.ig_graph = ig.Graph(n=len(nx_nodes), directed=True)
# Store vertex osmids for reverse lookup
self.ig_graph.vs["_nx_name"] = nx_nodes
# Add edges with all attributes
edges = []
lengths = []
climate_weights = []
flood_risks = []
heat_risks = []
climate_risks = []
for u, v, data in self.G.edges(data=True):
ig_u = self.ig_node_map[u]
ig_v = self.ig_node_map[v]
edges.append((ig_u, ig_v))
length = data.get("length", 1.0)
if length is None:
length = 1.0
lengths.append(float(length))
# Climate attributes (default to safe values if missing)
flood_risks.append(float(data.get("flood_risk", 0) or 0))
heat_risks.append(float(data.get("heat_risk", 0) or 0))
climate_risks.append(float(data.get("climate_risk", 0) or 0))
# Climate weight for routing (default to length if missing)
cw = data.get("climate_weight")
if cw is None:
cw = length
climate_weights.append(float(cw))
self.ig_graph.add_edges(edges)
self.ig_graph.es["length"] = lengths
self.ig_graph.es["weight"] = lengths # Default weight is length
self.ig_graph.es["climate_weight"] = climate_weights
self.ig_graph.es["flood_risk"] = flood_risks
self.ig_graph.es["heat_risk"] = heat_risks
self.ig_graph.es["climate_risk"] = climate_risks
def _load_known_places(self, data_dir: str):
"""Load known places for geocoding."""
places_path = os.path.join(data_dir, "places.csv")
if os.path.exists(places_path):
try:
df = pd.read_csv(places_path)
for _, row in df.iterrows():
self.known_places[row['name_lower']] = {
"lat": row['lat'],
"lon": row['lon'],
"name": row['name']
}
except Exception:
pass
@property
def is_loaded(self) -> bool:
return self._loaded
@property
def node_count(self) -> int:
return len(self.G.nodes) if self.G else 0
@property
def resource_count(self) -> int:
return len(self.resources_df) if self.resources_df is not None else 0
# -------------------------------------------------------------------------
# Graph utilities
# -------------------------------------------------------------------------
def _get_nearest_node(self, lat: float, lon: float) -> int:
"""Find nearest network node to a point using KD-tree (fast)."""
# Convert to projected coordinates if needed
if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326":
import pyproj
transformer = pyproj.Transformer.from_crs("EPSG:4326", self.G.graph["crs"], always_xy=True)
x, y = transformer.transform(lon, lat)
else:
x, y = lon, lat
# Use KD-tree for O(log n) lookup
if self._kdtree is not None:
_, idx = self._kdtree.query([x, y])
return self._node_ids[idx]
# Fallback to OSMnx
return ox.nearest_nodes(self.G, x, y)
def _get_nearest_node_ig(self, lat: float, lon: float) -> int:
"""Get igraph node ID for a location."""
nx_node = self._get_nearest_node(lat, lon)
return self.ig_node_map.get(nx_node, 0)
def _get_route_coords(self, route: list[int]) -> list[tuple[float, float]]:
"""Extract lat/lon coordinates from route nodes."""
if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326":
import pyproj
transformer = pyproj.Transformer.from_crs(self.G.graph["crs"], "EPSG:4326", always_xy=True)
coords = []
for node in route:
x, y = self.G.nodes[node]["x"], self.G.nodes[node]["y"]
lon, lat = transformer.transform(x, y)
coords.append((lat, lon))
return coords
return [(self.G.nodes[node]["y"], self.G.nodes[node]["x"]) for node in route]
def _apply_elevation_weights(self, penalty_factor: float = ELEVATION_PENALTY_FACTOR) -> nx.MultiDiGraph:
"""Create graph copy with elevation-weighted edges."""
G_weighted = self.G.copy()
for u, v, key, data in G_weighted.edges(keys=True, data=True):
base_length = data.get("length", 1)
elev_u = G_weighted.nodes[u].get("elevation", 0) or 0
elev_v = G_weighted.nodes[v].get("elevation", 0) or 0
elev_diff = elev_v - elev_u
if elev_diff > 0:
data["weighted_length"] = base_length + (elev_diff * penalty_factor)
else:
data["weighted_length"] = base_length
return G_weighted
def _compute_edge_weight(
self,
data: dict,
params: ClimateWeightParams = None
) -> float:
"""
Compute routing weight for an edge using raw risk values and configurable penalties.
This is computed at RUNTIME, allowing the LLM to adjust weights based on
what the user tells us about their situation (flooding, heat, air quality, mobility).
Design:
- Flood: categorical penalty (physical danger)
- Heat: continuous penalty based on HVI (social vulnerability / exposure proxy)
- Tree coverage: REDUCES weight (shade is beneficial)
- AQI: small continuous penalty (background air quality)
- Grade: continuous penalty for uphill segments (mobility / exertion)
Args:
data: Edge attribute dict
params: ClimateWeightParams with penalty/factor values
Returns:
float: Weighted length for Dijkstra's algorithm
"""
if params is None:
params = ClimateWeightParams()
length = float(data.get('length', 100))
# === FLOOD: Categorical penalty (physical danger) ===
flood_risk = float(data.get('flood_risk', 0.1))
if flood_risk >= 1.0:
flood_mult = params.flood_penalty_deep # Deep flooding: major penalty
elif flood_risk >= 0.6:
flood_mult = params.flood_penalty_shallow # Shallow flooding: moderate penalty
else:
flood_mult = 1.0 # No flooding: no penalty
# === HEAT: Continuous penalty based on HVI ===
heat_risk = float(data.get('heat_risk', 0.5))
# At heat_factor=0.3: HVI 1 (0.2) adds 6%, HVI 5 (1.0) adds 30%
heat_mult = 1.0 + params.heat_factor * heat_risk
# === TREE COVERAGE: Reduces weight (shade is good) ===
tree_coverage = float(data.get('tree_coverage', 0.0))
# At shade_factor=0.3: 0% trees = 1.0x, 100% trees = 0.7x
shade_mult = 1.0 - (params.shade_factor * tree_coverage)
# === AIR QUALITY: Small continuous penalty ===
aqi_risk = float(data.get('air_quality_risk', 0.0))
# At aqi_factor=0.1: worst AQI adds 10%
aqi_mult = 1.0 + params.aqi_factor * aqi_risk
# === GRADE: Continuous penalty for steep uphill segments ===
# Grade is stored as decimal (0.05 = 5% grade)
# Normalize: 10% grade (0.1) → grade_norm = 1.0
grade = abs(float(data.get('grade', 0) or 0))
grade_norm = min(grade * 10, 1.0) # Cap at 10% grade
# At grade_factor=0.2: flat = 1.0x, 10% grade = 1.2x
grade_mult = 1.0 + params.grade_factor * grade_norm
# Combine multiplicatively
return length * flood_mult * heat_mult * shade_mult * aqi_mult * grade_mult
def _create_weight_function(self, params: ClimateWeightParams = None):
"""Create a weight function for NetworkX routing with given climate parameters.
Note: For MultiDiGraph, NetworkX passes the edge data as {key: {attrs}} dict,
not the {attrs} dict directly. We extract the first edge's attributes.
"""
if params is None:
params = ClimateWeightParams()
def weight_func(u, v, data):
# For MultiDiGraph, data is {key: {attrs}} - extract first edge's attrs
if isinstance(data, dict) and data:
first_key = next(iter(data))
if isinstance(first_key, int) and isinstance(data[first_key], dict):
data = data[first_key]
return self._compute_edge_weight(data, params)
return weight_func
def _compute_route_metrics(self, route: list[int]) -> RouteMetrics:
"""Compute elevation metrics for a route."""
if not route or len(route) < 2:
return RouteMetrics()
elevations = []
grades = []
elevation_gain = 0
elevation_loss = 0
for i, node in enumerate(route):
elev = self.G.nodes[node].get("elevation", 0) or 0
elevations.append(elev)
if i > 0:
diff = elev - elevations[i-1]
if diff > 0:
elevation_gain += diff
else:
elevation_loss += abs(diff)
edge_data = self.G.get_edge_data(route[i-1], node)
if edge_data:
first_edge = list(edge_data.values())[0] if isinstance(edge_data, dict) else edge_data
grade = abs(first_edge.get("grade", 0)) * 100
grades.append(grade)
max_elev = max(elevations) if elevations else 0
min_elev = min(elevations) if elevations else 0
avg_grade = sum(grades) / len(grades) if grades else 0
max_grade = max(grades) if grades else 0
if elevation_gain < 5 and max_grade < 3:
difficulty = "flat"
elif elevation_gain < 15 or max_grade < 8:
difficulty = "moderate"
else:
difficulty = "hilly"
return RouteMetrics(
elevation_gain_m=round(elevation_gain, 1),
elevation_loss_m=round(elevation_loss, 1),
max_elevation_m=round(max_elev, 1),
min_elevation_m=round(min_elev, 1),
avg_grade_pct=round(avg_grade, 1),
max_grade_pct=round(max_grade, 1),
difficulty=difficulty
)
def _compute_climate_metrics(self, route: list[int]) -> ClimateMetrics:
"""Compute climate risk metrics for a route.
Climate risk is now computed at runtime using a simple weighted formula:
climate_risk = 0.5 * flood_risk + 0.3 * heat_risk + 0.2 * air_quality_risk
Tree coverage is tracked but used separately (reduces routing weight).
"""
if not route or len(route) < 2 or not self.has_climate_data:
return ClimateMetrics()
flood_risks = []
heat_risks = []
air_quality_risks = []
tree_coverages = []
climate_risks = []
flood_exposure = 0.0 # meters in flood risk > 0.3
for i in range(len(route) - 1):
u, v = route[i], route[i + 1]
edge_data = self.G.get_edge_data(u, v)
if not edge_data:
continue
# Get first edge (MultiDiGraph may have multiple)
if isinstance(edge_data, dict) and 0 in edge_data:
data = edge_data[0]
else:
data = next(iter(edge_data.values())) if isinstance(edge_data, dict) else edge_data
flood = float(data.get("flood_risk", 0) or 0)
heat = float(data.get("heat_risk", 0) or 0)
air_quality = float(data.get("air_quality_risk", 0) or 0)
tree_coverage = float(data.get("tree_coverage", 0) or 0)
length = float(data.get("length", 0) or 0)
# Compute combined climate risk at runtime
climate = 0.5 * flood + 0.3 * heat + 0.2 * air_quality
flood_risks.append(flood)
heat_risks.append(heat)
air_quality_risks.append(air_quality)
tree_coverages.append(tree_coverage)
climate_risks.append(climate)
if flood > 0.3:
flood_exposure += length
if not climate_risks:
return ClimateMetrics()
return ClimateMetrics(
avg_climate_risk=round(sum(climate_risks) / len(climate_risks), 3),
max_climate_risk=round(max(climate_risks), 3),
avg_flood_risk=round(sum(flood_risks) / len(flood_risks), 3),
max_flood_risk=round(max(flood_risks), 3),
avg_heat_risk=round(sum(heat_risks) / len(heat_risks), 3),
max_heat_risk=round(max(heat_risks), 3),
avg_air_quality_risk=round(sum(air_quality_risks) / len(air_quality_risks), 3),
max_air_quality_risk=round(max(air_quality_risks), 3),
avg_tree_coverage=round(sum(tree_coverages) / len(tree_coverages), 3),
flood_exposure_m=round(flood_exposure, 1),
)
# -------------------------------------------------------------------------
# Route computation
# -------------------------------------------------------------------------
def _compute_single_route(
self,
G: nx.MultiDiGraph,
origin_node: int,
dest_node: int,
weight_key: str = "length"
) -> RouteOption | None:
"""Compute a single route."""
try:
route = nx.shortest_path(G, origin_node, dest_node, weight=weight_key)
distance = sum(
self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:])
)
return RouteOption(
name="",
label="",
color="",
coords=self._get_route_coords(route),
distance_m=round(distance, 1),
time_min=round(distance / WALK_SPEED_M_PER_MIN, 1),
metrics=self._compute_route_metrics(route),
route_nodes=route,
climate_metrics=self._compute_climate_metrics(route),
)
except (nx.NetworkXNoPath, Exception):
return None
def _compute_single_route_igraph(
self,
origin_node: int,
dest_node: int,
weight_attr: str = "length"
) -> RouteOption | None:
"""Compute a single route using igraph for better performance."""
if not self.use_igraph or self.ig_graph is None:
return None
try:
ig_origin = self.ig_node_map.get(origin_node)
ig_dest = self.ig_node_map.get(dest_node)
if ig_origin is None or ig_dest is None:
return None
# Get shortest path using specified weight
path = self.ig_graph.get_shortest_paths(
ig_origin, to=ig_dest, weights=weight_attr, output="vpath"
)[0]
if not path:
return None
# Convert back to NetworkX node IDs
route = [self.ig_reverse_map[ig_node] for ig_node in path]
# Compute distance using original graph
distance = sum(
self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:])
)
return RouteOption(
name="",
label="",
color="",
coords=self._get_route_coords(route),
distance_m=round(distance, 1),
time_min=round(distance / WALK_SPEED_M_PER_MIN, 1),
metrics=self._compute_route_metrics(route),
route_nodes=route,
climate_metrics=self._compute_climate_metrics(route),
)
except Exception:
return None
def route(
self,
origin_coords: tuple[float, float],
dest_coords: tuple[float, float],
mode: str = "safest",
climate_params: ClimateWeightParams = None
) -> RouteOption | None:
"""
Compute a single route between two points with runtime climate weight calculation.
Climate weights are computed at RUNTIME using configurable parameters,
allowing the LLM to adjust based on user context (flooding, heat, asthma, etc.).
Args:
origin_coords: (lat, lon) tuple for origin
dest_coords: (lat, lon) tuple for destination
mode: 'safest' (use runtime climate weights) or 'fastest' (use length only)
climate_params: ClimateWeightParams for runtime weight calculation.
If None, uses default parameters.
Returns:
RouteOption or None if no path found
"""
if not self._loaded:
return None
origin_lat, origin_lon = origin_coords
dest_lat, dest_lon = dest_coords
try:
origin_node = self._get_nearest_node(origin_lat, origin_lon)
dest_node = self._get_nearest_node(dest_lat, dest_lon)
except Exception:
return None
# For 'fastest' mode, use simple length-based routing
if mode == "fastest" or mode == "fast":
result = self._compute_single_route(self.G, origin_node, dest_node, "length")
if result:
result.name = "fastest"
result.label = "Fastest"
result.color = ROUTE_COLORS.get("shortest", "#3b82f6")
return result
# For 'safest' mode, use runtime climate weight calculation
if climate_params is None:
climate_params = ClimateWeightParams()
# Use NetworkX with callable weight function for runtime calculation
weight_func = self._create_weight_function(climate_params)
result = self._compute_single_route_with_weight_func(
origin_node, dest_node, weight_func
)
if result:
result.name = "safest"
result.label = "Safest"
result.color = ROUTE_COLORS.get("safest", "#10b981")
return result
def _compute_single_route_with_weight_func(
self,
origin_node: int,
dest_node: int,
weight_func
) -> RouteOption | None:
"""Compute a single route using a callable weight function."""
try:
route = nx.shortest_path(self.G, origin_node, dest_node, weight=weight_func)
distance = sum(
self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:])
)
return RouteOption(
name="",
label="",
color="",
coords=self._get_route_coords(route),
distance_m=round(distance, 1),
time_min=round(distance / WALK_SPEED_M_PER_MIN, 1),
metrics=self._compute_route_metrics(route),
route_nodes=route,
climate_metrics=self._compute_climate_metrics(route),
)
except (nx.NetworkXNoPath, Exception):
return None
def compare_routes(
self,
origin_coords: tuple[float, float],
dest_coords: tuple[float, float],
dest_name: str = "Destination",
climate_params: ClimateWeightParams = None
) -> RouteComparisonResult:
"""
Compare shortest and safest routes between two points.
Returns both routes with comparison statistics showing the trade-off
between distance and climate risk.
Args:
origin_coords: (lat, lon) tuple for origin
dest_coords: (lat, lon) tuple for destination
dest_name: Optional name for the destination
climate_params: ClimateWeightParams for runtime weight calculation
Returns:
RouteComparisonResult with both routes and comparison stats
"""
if not self._loaded:
return RouteComparisonResult(success=False, error="Engine not loaded")
if not self.has_climate_data:
return RouteComparisonResult(
success=False,
error="Climate data not available. Load climate-enhanced network first."
)
origin_lat, origin_lon = origin_coords
dest_lat, dest_lon = dest_coords
# Compute both routes - fastest uses length, safest uses climate weights
fastest = self.route(origin_coords, dest_coords, mode="fastest")
safest = self.route(origin_coords, dest_coords, mode="safest", climate_params=climate_params)
if not fastest:
return RouteComparisonResult(success=False, error="No path found for fastest route")
if not safest:
return RouteComparisonResult(success=False, error="No path found for safest route")
# Calculate comparison statistics
extra_distance_m = safest.distance_m - fastest.distance_m
extra_distance_pct = (extra_distance_m / fastest.distance_m * 100) if fastest.distance_m > 0 else 0
# Risk reduction is the difference in average climate risk
risk_reduction = fastest.climate_metrics.avg_climate_risk - safest.climate_metrics.avg_climate_risk
return RouteComparisonResult(
success=True,
shortest=fastest,
safest=safest,
origin=(origin_lat, origin_lon),
destination=(dest_lat, dest_lon),
dest_name=dest_name,
extra_distance_m=extra_distance_m,
extra_distance_pct=extra_distance_pct,
risk_reduction=risk_reduction,
)
def compute_routes(
self,
origin_lat: float,
origin_lon: float,
dest_lat: float,
dest_lon: float,
dest_name: str = "Destination",
climate_params: ClimateWeightParams = None
) -> RoutingResult:
"""Compute multiple route alternatives between two points.
Climate weights are computed at RUNTIME using configurable parameters.
The safest route uses runtime weight calculation based on flood, heat,
tree coverage, and air quality factors.
Args:
origin_lat, origin_lon: Origin coordinates
dest_lat, dest_lon: Destination coordinates
dest_name: Name of destination
climate_params: ClimateWeightParams for runtime weight calculation
"""
if not self._loaded:
return RoutingResult(success=False, error="Engine not loaded")
try:
origin_node = self._get_nearest_node(origin_lat, origin_lon)
dest_node = self._get_nearest_node(dest_lat, dest_lon)
except Exception as e:
return RoutingResult(success=False, error=f"Could not locate points: {e}")
alternatives = []
# Shortest route (length only)
shortest = self._compute_single_route(self.G, origin_node, dest_node, "length")
if shortest:
shortest.name = "shortest"
shortest.label = "Shortest"
shortest.color = ROUTE_COLORS["shortest"]
alternatives.append(shortest)
# Flattest route
G_flat = self._apply_elevation_weights(penalty_factor=5.0)
flattest = self._compute_single_route(G_flat, origin_node, dest_node, "weighted_length")
if flattest and (not shortest or flattest.coords != shortest.coords):
flattest.name = "flattest"
flattest.label = "Flattest"
flattest.color = ROUTE_COLORS["flattest"]
alternatives.append(flattest)
# Balanced route
G_balanced = self._apply_elevation_weights(penalty_factor=2.0)
balanced = self._compute_single_route(G_balanced, origin_node, dest_node, "weighted_length")
if balanced:
existing_coords = [r.coords for r in alternatives]
if balanced.coords not in existing_coords:
balanced.name = "balanced"
balanced.label = "Balanced"
balanced.color = ROUTE_COLORS["balanced"]
alternatives.append(balanced)
# Safest route (climate-aware with runtime weight calculation)
if self.has_climate_data:
if climate_params is None:
climate_params = ClimateWeightParams()
weight_func = self._create_weight_function(climate_params)
safest = self._compute_single_route_with_weight_func(origin_node, dest_node, weight_func)
if safest:
existing_coords = [r.coords for r in alternatives]
if safest.coords not in existing_coords:
# Different route - add as separate option
safest.name = "safest"
safest.label = "Safest (Climate)"
safest.color = ROUTE_COLORS["safest"]
alternatives.append(safest)
else:
# Safest route has same path as an existing route (likely shortest)
# Update the existing route to show it's ALSO the safest option
for route in alternatives:
if route.coords == safest.coords:
if route.name == "shortest":
route.label = "Shortest & Safest (Climate)"
route.name = "safest" # Mark as safest for recommendation
break
if not alternatives:
return RoutingResult(success=False, error="No path found")
# CLIMATE-AWARE BY DEFAULT: Always recommend safest route when climate
# data is available to protect users from flood and heat exposure.
if self.has_climate_data and any(r.name == "safest" for r in alternatives):
recommended = "safest"
elif any(r.name == "flattest" for r in alternatives):
recommended = "flattest"
else:
recommended = "shortest"
return RoutingResult(
success=True,
recommended=recommended,
alternatives=alternatives,
origin=(origin_lat, origin_lon),
destination=(dest_lat, dest_lon),
dest_name=dest_name
)
def find_nearest_resource(
self,
resource_type: str,
origin_lat: float,
origin_lon: float,
prefer_safe: bool = True,
climate_params: ClimateWeightParams = None
) -> tuple[dict[str, Any], dict | None]:
"""
Find nearest resource of a given type with climate-aware route alternatives.
This function:
1. Finds the nearest resource by ROUTED distance (not crow-flies)
2. Computes multiple route alternatives (shortest, flattest, safest)
3. Recommends the safest route when climate data is available
Climate weights are computed at RUNTIME using configurable parameters,
allowing the LLM to adjust based on user context (flooding, heat, asthma, etc.).
Args:
resource_type: Type of resource to find
origin_lat: Origin latitude
origin_lon: Origin longitude
prefer_safe: If True and climate data available, prefer climate-safe routes
climate_params: ClimateWeightParams for runtime weight calculation
"""
if not self._loaded:
return {"error": "Engine not loaded"}, None
if self.resources_df is None:
return {"error": "Resources not loaded"}, None
candidates = self.resources_df[self.resources_df["type"] == resource_type]
if len(candidates) == 0:
return {"error": f"No resources of type '{resource_type}' found"}, None
try:
origin_node = self._get_nearest_node(origin_lat, origin_lon)
except Exception as e:
return {"error": f"Could not find origin: {e}"}, None
# Find the nearest resource by ROUTED distance (not crow-flies)
# Use simple length-based routing to find which resource is nearest
best_resource = None
best_distance = float("inf")
for _, row in candidates.iterrows():
try:
dest_node = self._get_nearest_node(row["lat"], row["lon"])
# Use actual path length to determine nearest
route_length = nx.shortest_path_length(
self.G, origin_node, dest_node, weight="length"
)
if route_length < best_distance:
best_distance = route_length
best_resource = row.to_dict()
except (nx.NetworkXNoPath, Exception):
continue
if best_resource is None:
return {"error": f"Could not find route to any {resource_type}"}, None
# Now compute route alternatives to the nearest resource
# This provides shortest, flattest, and safest (climate-aware) routes
routing_result = self.compute_routes(
origin_lat=origin_lat,
origin_lon=origin_lon,
dest_lat=best_resource["lat"],
dest_lon=best_resource["lon"],
dest_name=best_resource["name"],
climate_params=climate_params
)
if not routing_result.success:
return {"error": routing_result.error or "Could not compute routes"}, None
# Get the recommended route (safest when climate data available)
recommended_route = next(
(r for r in routing_result.alternatives if r.name == routing_result.recommended),
routing_result.alternatives[0] if routing_result.alternatives else None
)
if not recommended_route:
return {"error": "No route alternatives computed"}, None
# Build map data with all route alternatives
map_data = {
"routes": [
{
"coords": alt.coords,
"color": alt.color,
"label": alt.label,
"name": alt.name,
}
for alt in routing_result.alternatives
],
"origin": [origin_lat, origin_lon],
"destination": [best_resource["lat"], best_resource["lon"]],
"dest_name": best_resource["name"],
"recommended": routing_result.recommended,
}
# Build result with resource info and route alternatives
result = {
"found": True,
"name": best_resource["name"],
"type": best_resource["type"],
"category": best_resource.get("category", ""),
"lat": best_resource["lat"],
"lon": best_resource["lon"],
"origin": {"lat": origin_lat, "lon": origin_lon},
# Include all route alternatives
"alternatives": [alt.to_dict() for alt in routing_result.alternatives],
"recommended": routing_result.recommended,
# Recommended route metrics for backward compatibility
"distance_meters": round(recommended_route.distance_m, 1),
"walking_time_minutes": round(recommended_route.time_min, 1),
"route_metrics": recommended_route.metrics.to_dict() if recommended_route.metrics else {},
"climate_metrics": recommended_route.climate_metrics.to_dict() if recommended_route.climate_metrics else {},
"climate_aware": prefer_safe and self.has_climate_data,
}
return result, map_data
def list_resources(self, resource_type: str = "", category: str = "") -> dict[str, Any]:
"""List available resources with optional filtering."""
if self.resources_df is None:
return {"error": "Resources not loaded"}
df = self.resources_df.copy()
if category:
df = df[df["category"] == category]
if resource_type:
df = df[df["type"] == resource_type]
summary = df.groupby("type").agg({"name": "count", "category": "first"}).rename(
columns={"name": "count"}
).to_dict("index")
return {
"total_count": len(df),
"by_type": summary,
"resources": df[["name", "type", "category", "lat", "lon"]].to_dict("records")[:20]
}
# -------------------------------------------------------------------------
# Isochrone Generation (reachable area within X minutes)
# -------------------------------------------------------------------------
def generate_isochrone(
self,
origin_lat: float,
origin_lon: float,
time_limits: list[int] = None,
resource_types: list[str] = None
) -> IsochroneResult:
"""
Generate isochrones showing areas reachable within given time limits.
Uses igraph SSSP if available for better performance, otherwise NetworkX.
Args:
origin_lat: Origin latitude
origin_lon: Origin longitude
time_limits: List of time limits in minutes (default: [5, 10, 15])
resource_types: Optional filter for resources to include
Returns:
IsochroneResult with polygon boundaries and resources within reach
"""
if not self._loaded:
return IsochroneResult(success=False, error="Engine not loaded")
if time_limits is None:
time_limits = [5, 10, 15]
time_limits = sorted(time_limits)
max_time = max(time_limits)
max_distance = max_time * WALK_SPEED_M_PER_MIN
try:
origin_node = self._get_nearest_node(origin_lat, origin_lon)
except Exception as e:
return IsochroneResult(success=False, error=f"Could not locate origin: {e}")
# Compute distances from origin to all reachable nodes
if self.use_igraph and self.ig_graph is not None:
# Use igraph SSSP (faster for large graphs)
node_distances = self._compute_sssp_igraph(origin_node, max_distance)
else:
# Use NetworkX Dijkstra
node_distances = self._compute_sssp_networkx(origin_node, max_distance)
# Build isochrone polygons for each time limit
isochrones = []
for time_min in time_limits:
distance_limit = time_min * WALK_SPEED_M_PER_MIN
reachable_nodes = [n for n, d in node_distances.items() if d <= distance_limit]
if not reachable_nodes:
continue
# Get convex hull of reachable nodes
polygon_coords = self._nodes_to_polygon(reachable_nodes)
isochrones.append({
"time_min": time_min,
"polygon_coords": polygon_coords,
"color": ISOCHRONE_COLORS.get(time_min, "#6366f1"),
"node_count": len(reachable_nodes),
})
# Find resources within the max isochrone
resources_within = []
if self.resources_df is not None:
max_distance_nodes = {n for n, d in node_distances.items() if d <= max_distance}
for _, row in self.resources_df.iterrows():
if resource_types and row["type"] not in resource_types:
continue
try:
resource_node = self._get_nearest_node(row["lat"], row["lon"])
if resource_node in max_distance_nodes:
dist = node_distances.get(resource_node, float("inf"))
time_to_reach = dist / WALK_SPEED_M_PER_MIN
resources_within.append({
"name": row["name"],
"type": row["type"],
"category": row.get("category", ""),
"lat": row["lat"],
"lon": row["lon"],
"distance_meters": round(dist, 1),
"walking_time_minutes": round(time_to_reach, 1),
})
except Exception:
continue
# Sort by distance
resources_within.sort(key=lambda x: x["distance_meters"])
return IsochroneResult(
success=True,
origin=(origin_lat, origin_lon),
isochrones=isochrones,
resources_within=resources_within[:20], # Limit for display
)
def _compute_sssp_igraph(self, origin_node: int, max_distance: float) -> dict[int, float]:
"""Compute single-source shortest paths using igraph."""
ig_origin = self.ig_node_map.get(origin_node)
if ig_origin is None:
return {}
# Run Dijkstra from origin using igraph's shortest_paths
all_distances = self.ig_graph.distances(source=ig_origin, weights="weight", mode="out")[0]
# Convert back to NetworkX node IDs
distances = {}
for ig_node, dist in enumerate(all_distances):
if dist != float("inf") and dist <= max_distance:
nx_node = self.ig_reverse_map.get(ig_node)
if nx_node is not None:
distances[nx_node] = dist
return distances
def _compute_sssp_networkx(self, origin_node: int, max_distance: float) -> dict[int, float]:
"""Compute single-source shortest paths using NetworkX."""
try:
# Use cutoff for efficiency
lengths = nx.single_source_dijkstra_path_length(
self.G, origin_node, cutoff=max_distance, weight="length"
)
return dict(lengths)
except Exception:
return {}
def _nodes_to_polygon(self, nodes: list[int]) -> list[tuple[float, float]]:
"""Convert a set of nodes to a convex hull polygon in lat/lon."""
if len(nodes) < 3:
return []
# Get coordinates
coords = []
for node in nodes:
x = self.G.nodes[node].get("x", 0)
y = self.G.nodes[node].get("y", 0)
coords.append([x, y])
coords = np.array(coords)
# Compute convex hull
try:
from scipy.spatial import ConvexHull
hull = ConvexHull(coords)
hull_points = coords[hull.vertices]
# Convert to lat/lon
if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326":
import pyproj
transformer = pyproj.Transformer.from_crs(
self.G.graph["crs"], "EPSG:4326", always_xy=True
)
result = []
for x, y in hull_points:
lon, lat = transformer.transform(x, y)
result.append((lat, lon))
# Close the polygon
result.append(result[0])
return result
else:
result = [(y, x) for x, y in hull_points]
result.append(result[0])
return result
except Exception:
return []
# -------------------------------------------------------------------------
# Find Along Route (POI discovery along a route corridor)
# -------------------------------------------------------------------------
def find_along_route(
self,
origin_lat: float,
origin_lon: float,
dest_lat: float,
dest_lon: float,
buffer_meters: float = 100,
resource_types: list[str] = None
) -> AlongRouteResult:
"""
Find resources/POIs along a route corridor.
Args:
origin_lat, origin_lon: Route start
dest_lat, dest_lon: Route end
buffer_meters: Width of corridor to search (default 100m)
resource_types: Optional filter for resource types
Returns:
AlongRouteResult with POIs found along the route
"""
if not self._loaded:
return AlongRouteResult(success=False, error="Engine not loaded")
if self.resources_df is None:
return AlongRouteResult(success=False, error="Resources not loaded")
# Compute the route first
routing_result = self.compute_routes(origin_lat, origin_lon, dest_lat, dest_lon)
if not routing_result.success:
return AlongRouteResult(success=False, error=routing_result.error)
# Get the recommended route
recommended = next(
(r for r in routing_result.alternatives if r.name == routing_result.recommended),
routing_result.alternatives[0]
)
route_coords = recommended.coords
if not route_coords:
return AlongRouteResult(success=False, error="No route coordinates")
# Convert route to projected coordinates for distance calculations
if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326":
import pyproj
transformer = pyproj.Transformer.from_crs(
"EPSG:4326", self.G.graph["crs"], always_xy=True
)
projected_route = []
for lat, lon in route_coords:
x, y = transformer.transform(lon, lat)
projected_route.append([x, y])
projected_route = np.array(projected_route)
else:
projected_route = np.array([[lon, lat] for lat, lon in route_coords])
# Find resources within buffer distance of route
pois_found = []
for _, row in self.resources_df.iterrows():
if resource_types and row["type"] not in resource_types:
continue
# Get resource in projected coordinates
if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326":
x, y = transformer.transform(row["lon"], row["lat"])
point = np.array([x, y])
else:
point = np.array([row["lon"], row["lat"]])
# Calculate minimum distance to route
min_dist = self._point_to_polyline_distance(point, projected_route)
if min_dist <= buffer_meters:
# Find which segment of the route it's nearest to (for ordering)
segment_idx = self._nearest_segment_index(point, projected_route)
pois_found.append({
"name": row["name"],
"type": row["type"],
"category": row.get("category", ""),
"lat": row["lat"],
"lon": row["lon"],
"distance_from_route_m": round(min_dist, 1),
"route_segment": segment_idx,
})
# Sort by route segment (so POIs appear in order along the route)
pois_found.sort(key=lambda x: x["route_segment"])
return AlongRouteResult(
success=True,
route_coords=route_coords,
pois_found=pois_found,
origin=(origin_lat, origin_lon),
destination=(dest_lat, dest_lon),
buffer_meters=buffer_meters,
climate_metrics=recommended.climate_metrics,
)
def _point_to_polyline_distance(self, point: np.ndarray, polyline: np.ndarray) -> float:
"""Calculate minimum distance from a point to a polyline."""
min_dist = float("inf")
for i in range(len(polyline) - 1):
seg_start = polyline[i]
seg_end = polyline[i + 1]
# Vector from start to end
seg_vec = seg_end - seg_start
seg_len_sq = np.dot(seg_vec, seg_vec)
if seg_len_sq == 0:
# Segment is a point
dist = np.linalg.norm(point - seg_start)
else:
# Project point onto segment
t = max(0, min(1, np.dot(point - seg_start, seg_vec) / seg_len_sq))
projection = seg_start + t * seg_vec
dist = np.linalg.norm(point - projection)
min_dist = min(min_dist, dist)
return min_dist
def _nearest_segment_index(self, point: np.ndarray, polyline: np.ndarray) -> int:
"""Find which segment of the polyline is nearest to the point."""
min_dist = float("inf")
nearest_idx = 0
for i in range(len(polyline) - 1):
seg_start = polyline[i]
seg_end = polyline[i + 1]
seg_vec = seg_end - seg_start
seg_len_sq = np.dot(seg_vec, seg_vec)
if seg_len_sq == 0:
dist = np.linalg.norm(point - seg_start)
else:
t = max(0, min(1, np.dot(point - seg_start, seg_vec) / seg_len_sq))
projection = seg_start + t * seg_vec
dist = np.linalg.norm(point - projection)
if dist < min_dist:
min_dist = dist
nearest_idx = i
return nearest_idx
# -------------------------------------------------------------------------
# Geocoding
# -------------------------------------------------------------------------
def geocode_query(self, query: str) -> tuple[str, dict]:
"""Resolve place names in query to coordinates."""
geocode_info = {}
modified = query
query_lower = query.lower()
# Sort by name length for greedy matching
sorted_places = sorted(self.known_places.items(), key=lambda x: -len(x[0]))
used_spans = []
for name_lower, info in sorted_places:
pattern = r"\b" + re.escape(name_lower) + r"\b"
for match in re.finditer(pattern, query_lower):
start, end = match.span()
overlaps = any(not (end <= us or start >= ue) for us, ue in used_spans)
if not overlaps:
geocode_info[info["name"]] = {
"lat": info["lat"],
"lon": info["lon"],
"name": info["name"]
}
original_text = query[start:end]
modified = re.compile(re.escape(original_text), re.IGNORECASE).sub(
f"(lat {info['lat']:.6f}, lon {info['lon']:.6f})",
modified,
count=1
)
used_spans.append((start, end))
return modified, geocode_info
# =============================================================================
# Tool Executor (bridges LLM output to engine)
# =============================================================================
def _safe_str(val, default: str = "") -> str:
if val is None:
return default
if isinstance(val, list):
return str(val[0]) if val else default
return str(val)
def _safe_float(val, default: float) -> float:
if val is None:
return default
if isinstance(val, list):
val = val[0] if val else default
try:
return float(val)
except (ValueError, TypeError):
return default
def _parse_climate_params(args: dict) -> ClimateWeightParams:
"""Extract climate weight parameters from tool args.
The LLM can pass these parameters based on user context:
- flood_penalty_deep: 5.0 default, 10.0 for active flooding
- flood_penalty_shallow: 2.0 default, 4.0 for flooding conditions
- heat_factor: 0.3 default, 0.5 for hot days
- shade_factor: 0.3 default, 0.5 for shade-seeking
- aqi_factor: 0.1 default, 0.5 for respiratory concerns
- grade_factor: 0.2 default, 0.5 for elderly/mobility-impaired users
"""
return ClimateWeightParams(
flood_penalty_deep=_safe_float(args.get("flood_penalty_deep"), 5.0),
flood_penalty_shallow=_safe_float(args.get("flood_penalty_shallow"), 2.0),
heat_factor=_safe_float(args.get("heat_factor"), 0.3),
shade_factor=_safe_float(args.get("shade_factor"), 0.3),
aqi_factor=_safe_float(args.get("aqi_factor"), 0.1),
grade_factor=_safe_float(args.get("grade_factor"), 0.2),
)
def execute_tool(
tool_name: str,
args: dict,
engine: RoutingEngine
) -> tuple[dict[str, Any], dict | None]:
"""Execute a tool by name using the routing engine.
Climate weight parameters can be passed in args and will be used for
runtime weight calculation when routing.
"""
# Parse climate parameters from args (LLM can set these based on context)
climate_params = _parse_climate_params(args)
if tool_name == "list_resources":
result = engine.list_resources(
resource_type=_safe_str(args.get("resource_type"), ""),
category=_safe_str(args.get("category"), "")
)
return result, None
elif tool_name == "find_nearest":
lat = args.get("lat") or args.get("origin_lat")
lon = args.get("lon") or args.get("origin_lon")
return engine.find_nearest_resource(
resource_type=_safe_str(args.get("resource_type"), ""),
origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]),
origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"]),
climate_params=climate_params
)
elif tool_name == "calculate_route":
# Check for routing_mode parameter
routing_mode = _safe_str(args.get("routing_mode"), "safe")
result = engine.compute_routes(
origin_lat=_safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]),
origin_lon=_safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]),
dest_lat=_safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]),
dest_lon=_safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]),
dest_name=_safe_str(args.get("dest_name"), "Destination"),
climate_params=climate_params
)
return result.to_dict(), result.to_map_data()
elif tool_name == "generate_isochrone":
# Parse time limits - handle various formats like "10", "10 minutes", "5, 10, 15"
time_limits = args.get("time_limits", [5, 10, 15])
if isinstance(time_limits, str):
# Split by comma and extract numeric values
parsed = []
for x in time_limits.split(","):
# Extract just the numeric part (handles "10 minutes", "15 min", etc.)
import re
match = re.search(r'(\d+)', x.strip())
if match:
parsed.append(int(match.group(1)))
time_limits = parsed if parsed else [5, 10, 15]
elif isinstance(time_limits, (int, float)):
time_limits = [int(time_limits)]
elif isinstance(time_limits, list):
# Ensure all elements are integers (LLM may return strings like ["5", "10"])
time_limits = [int(x) if isinstance(x, (int, float, str)) and str(x).isdigit() else 10 for x in time_limits]
if not time_limits:
time_limits = [5, 10, 15]
# Parse resource types filter
resource_types = args.get("resource_types")
if isinstance(resource_types, str):
resource_types = [x.strip() for x in resource_types.split(",")]
lat = args.get("lat") or args.get("origin_lat")
lon = args.get("lon") or args.get("origin_lon")
result = engine.generate_isochrone(
origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]),
origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"]),
time_limits=time_limits,
resource_types=resource_types
)
return result.to_dict(), result.to_map_data()
elif tool_name == "find_along_route":
# Parse resource types filter
resource_types = args.get("resource_types")
if isinstance(resource_types, str):
resource_types = [x.strip() for x in resource_types.split(",")]
result = engine.find_along_route(
origin_lat=_safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]),
origin_lon=_safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]),
dest_lat=_safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]),
dest_lon=_safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]),
buffer_meters=_safe_float(args.get("buffer_meters"), 100),
resource_types=resource_types
)
return result.to_dict(), result.to_map_data()
elif tool_name == "compare_routes":
# Compare fastest vs safest (climate-aware) routes
origin_lat = _safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"])
origin_lon = _safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"])
dest_lat = _safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"])
dest_lon = _safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"])
result = engine.compare_routes(
origin_coords=(origin_lat, origin_lon),
dest_coords=(dest_lat, dest_lon),
dest_name=_safe_str(args.get("dest_name"), "Destination"),
climate_params=climate_params
)
return result.to_dict(), result.to_map_data()
elif tool_name == "climate_route":
# Single route with mode selection and climate parameters
origin_lat = _safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"])
origin_lon = _safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"])
dest_lat = _safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"])
dest_lon = _safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"])
mode = _safe_str(args.get("mode") or args.get("routing_mode"), "safest")
result = engine.route(
origin_coords=(origin_lat, origin_lon),
dest_coords=(dest_lat, dest_lon),
mode=mode,
climate_params=climate_params
)
if result is None:
return {"error": "No path found"}, None
map_data = {
"routes": [{
"coords": result.coords,
"color": result.color,
"label": result.label,
"name": result.name,
}],
"origin": [origin_lat, origin_lon],
"destination": [dest_lat, dest_lon],
}
return result.to_dict(), map_data
else:
return {"error": f"Unknown tool: {tool_name}"}, None