""" Routing engine for the Emergency Routing Assistant. This module contains all routing logic, data loading, and processing. The frontend (app.py) should only handle presentation. Features inspired by dream-meridian: - igraph backend for high-performance routing (optional) - Isochrone generation (reachable area within X minutes) - Find along route (discover POIs along a computed route) """ import os import re import json import networkx as nx import pandas as pd import geopandas as gpd import osmnx as ox import requests from typing import Any from dataclasses import dataclass, field from scipy.spatial import cKDTree import numpy as np # Optional igraph support for high-performance routing try: import igraph as ig HAS_IGRAPH = True except ImportError: HAS_IGRAPH = False # ============================================================================= # Constants # ============================================================================= WALK_SPEED_M_PER_MIN = 75 # ~4.5 km/h ELEVATION_PENALTY_FACTOR = 3.0 BROWNSVILLE_CENTER = {"lat": 40.6594, "lon": -73.9126} BROWNSVILLE_BOUNDS = { "min_lat": 40.64, "max_lat": 40.68, "min_lon": -73.93, "max_lon": -73.89 } VALID_RESOURCE_TYPES = [ "pharmacy", "clinic", "hospital", "fire_station", "police", "school", "library", "community_centre", "place_of_worship" ] # Route colors for display ROUTE_COLORS = { "shortest": "#3b82f6", # Blue "flattest": "#22c55e", # Green "balanced": "#f59e0b", # Amber "safest": "#10b981", # Emerald (climate-safe route) } # Isochrone colors by time ISOCHRONE_COLORS = { 5: "#22c55e", # Green - 5 min 10: "#84cc16", # Lime - 10 min 15: "#f59e0b", # Amber - 15 min 20: "#ef4444", # Red - 20 min } # ============================================================================= # POI Marker Styles # ============================================================================= # This configuration allows UI developers to customize markers by POI type. # Each entry defines: color, icon, and optional display properties. # # Icon options for Folium: # - Font Awesome icons (prefix "fa"): "fa-hospital", "fa-fire", etc. # - Glyphicons (prefix "glyphicon"): "glyphicon-home", etc. # - Bootstrap icons: "heart", "star", "flag", etc. # # For custom SVG/image markers, UI devs can extend render_map() to use # folium.CustomIcon or folium.DivIcon with the 'custom_icon_url' field. POI_MARKER_STYLES = { # Emergency Services "hospital": { "color": "#dc2626", # Red-600 "fill_color": "#fecaca", # Red-200 "icon": "fa-hospital", "icon_prefix": "fa", "radius": 8, "category": "Emergency Service", }, "clinic": { "color": "#ef4444", # Red-500 "fill_color": "#fee2e2", # Red-100 "icon": "fa-stethoscope", "icon_prefix": "fa", "radius": 7, "category": "Emergency Service", }, "fire_station": { "color": "#ea580c", # Orange-600 "fill_color": "#ffedd5", # Orange-100 "icon": "fa-fire-extinguisher", "icon_prefix": "fa", "radius": 8, "category": "Emergency Service", }, "police": { "color": "#1d4ed8", # Blue-700 "fill_color": "#dbeafe", # Blue-100 "icon": "fa-shield", "icon_prefix": "fa", "radius": 8, "category": "Emergency Service", }, # Healthcare "pharmacy": { "color": "#16a34a", # Green-600 "fill_color": "#dcfce7", # Green-100 "icon": "fa-medkit", "icon_prefix": "fa", "radius": 6, "category": "Healthcare", }, "doctors": { "color": "#22c55e", # Green-500 "fill_color": "#bbf7d0", # Green-200 "icon": "fa-user-md", "icon_prefix": "fa", "radius": 6, "category": "Healthcare", }, # Community Resources "school": { "color": "#7c3aed", # Violet-600 "fill_color": "#ede9fe", # Violet-100 "icon": "fa-graduation-cap", "icon_prefix": "fa", "radius": 7, "category": "Community Resource", }, "library": { "color": "#8b5cf6", # Violet-500 "fill_color": "#f3e8ff", # Purple-100 "icon": "fa-book", "icon_prefix": "fa", "radius": 6, "category": "Community Resource", }, "community_centre": { "color": "#6366f1", # Indigo-500 "fill_color": "#e0e7ff", # Indigo-100 "icon": "fa-users", "icon_prefix": "fa", "radius": 7, "category": "Community Resource", }, "place_of_worship": { "color": "#a855f7", # Purple-500 "fill_color": "#f3e8ff", # Purple-100 "icon": "fa-church", "icon_prefix": "fa", "radius": 6, "category": "Community Resource", }, "youth_center": { "color": "#ec4899", # Pink-500 "fill_color": "#fce7f3", # Pink-100 "icon": "fa-child", "icon_prefix": "fa", "radius": 6, "category": "Community Resource", }, "senior_center": { "color": "#f97316", # Orange-500 "fill_color": "#ffedd5", # Orange-100 "icon": "fa-heart", "icon_prefix": "fa", "radius": 6, "category": "Community Resource", }, "childcare": { "color": "#f472b6", # Pink-400 "fill_color": "#fbcfe8", # Pink-200 "icon": "fa-child", "icon_prefix": "fa", "radius": 5, "category": "Community Resource", }, "nycha_community_center": { "color": "#0ea5e9", # Sky-500 "fill_color": "#e0f2fe", # Sky-100 "icon": "fa-building", "icon_prefix": "fa", "radius": 7, "category": "Community Resource", }, # Climate Infrastructure "park": { "color": "#16a34a", # Green-600 "fill_color": "#bbf7d0", # Green-200 "icon": "fa-tree", "icon_prefix": "fa", "radius": 6, "category": "Climate Infrastructure", }, "shelter": { "color": "#0284c7", # Sky-600 "fill_color": "#bae6fd", # Sky-200 "icon": "fa-home", "icon_prefix": "fa", "radius": 8, "category": "Climate Infrastructure", }, "drinking_water": { "color": "#0891b2", # Cyan-600 "fill_color": "#cffafe", # Cyan-100 "icon": "fa-tint", "icon_prefix": "fa", "radius": 5, "category": "Climate Infrastructure", }, # Local Business "bodega": { "color": "#f59e0b", # Amber-500 "fill_color": "#fef3c7", # Amber-100 "icon": "fa-shopping-basket", "icon_prefix": "fa", "radius": 5, "category": "Local Business", }, "supermarket": { "color": "#d97706", # Amber-600 "fill_color": "#fef3c7", # Amber-100 "icon": "fa-shopping-cart", "icon_prefix": "fa", "radius": 6, "category": "Local Business", }, "fast_food": { "color": "#fbbf24", # Amber-400 "fill_color": "#fef9c3", # Yellow-100 "icon": "fa-cutlery", "icon_prefix": "fa", "radius": 5, "category": "Local Business", }, "cafe": { "color": "#92400e", # Amber-800 "fill_color": "#fef3c7", # Amber-100 "icon": "fa-coffee", "icon_prefix": "fa", "radius": 5, "category": "Local Business", }, "bank": { "color": "#475569", # Slate-600 "fill_color": "#e2e8f0", # Slate-200 "icon": "fa-university", "icon_prefix": "fa", "radius": 5, "category": "Local Business", }, # Social Services "social_facility": { "color": "#0d9488", # Teal-600 "fill_color": "#ccfbf1", # Teal-100 "icon": "fa-handshake-o", "icon_prefix": "fa", "radius": 6, "category": "Social Services", }, "snap_center": { "color": "#14b8a6", # Teal-500 "fill_color": "#ccfbf1", # Teal-100 "icon": "fa-id-card", "icon_prefix": "fa", "radius": 6, "category": "Social Services", }, # Default / Fallback for unknown types "facility": { "color": "#6b7280", # Gray-500 "fill_color": "#e5e7eb", # Gray-200 "icon": "fa-building-o", "icon_prefix": "fa", "radius": 5, "category": "Other", }, } # Default style for POI types not explicitly defined POI_DEFAULT_STYLE = { "color": "#6b7280", # Gray-500 "fill_color": "#e5e7eb", # Gray-200 "icon": "fa-map-marker", "icon_prefix": "fa", "radius": 5, "category": "Other", } def get_poi_marker_style(poi_type: str) -> dict: """ Get marker style configuration for a given POI type. Args: poi_type: The type of POI (e.g., "hospital", "pharmacy", "school") Returns: Dictionary with marker style properties: - color: Border/stroke color (hex) - fill_color: Fill color (hex) - icon: Font Awesome or Glyphicon name - icon_prefix: Icon library prefix ("fa" or "glyphicon") - radius: Circle marker radius in pixels - category: POI category for grouping Example: >>> style = get_poi_marker_style("hospital") >>> style["color"] '#dc2626' >>> style["icon"] 'fa-hospital' """ return POI_MARKER_STYLES.get(poi_type, POI_DEFAULT_STYLE).copy() def get_all_poi_styles() -> dict: """ Get all POI marker style configurations. Returns: Dictionary mapping POI type names to their style configurations. Useful for UI developers to enumerate all available styles. Example: >>> styles = get_all_poi_styles() >>> list(styles.keys()) ['hospital', 'clinic', 'fire_station', ...] """ return POI_MARKER_STYLES.copy() def get_poi_styles_by_category() -> dict[str, list[str]]: """ Get POI types grouped by category. Returns: Dictionary mapping category names to lists of POI types. Example: >>> by_cat = get_poi_styles_by_category() >>> by_cat["Emergency Service"] ['hospital', 'clinic', 'fire_station', 'police'] """ categories: dict[str, list[str]] = {} for poi_type, style in POI_MARKER_STYLES.items(): cat = style.get("category", "Other") if cat not in categories: categories[cat] = [] categories[cat].append(poi_type) return categories # ============================================================================= # Data Classes # ============================================================================= @dataclass class RouteMetrics: elevation_gain_m: float = 0 elevation_loss_m: float = 0 max_elevation_m: float = 0 min_elevation_m: float = 0 avg_grade_pct: float = 0 max_grade_pct: float = 0 difficulty: str = "flat" def to_dict(self) -> dict: return { "elevation_gain_m": self.elevation_gain_m, "elevation_loss_m": self.elevation_loss_m, "max_elevation_m": self.max_elevation_m, "min_elevation_m": self.min_elevation_m, "avg_grade_pct": self.avg_grade_pct, "max_grade_pct": self.max_grade_pct, "difficulty": self.difficulty, } @dataclass class ClimateMetrics: """Climate risk metrics for a route. Climate weights are now computed at RUNTIME using configurable parameters, allowing the LLM to adjust weights based on user context (e.g., flooding, heat wave, air quality concerns). """ avg_climate_risk: float = 0 max_climate_risk: float = 0 avg_flood_risk: float = 0 max_flood_risk: float = 0 avg_heat_risk: float = 0 max_heat_risk: float = 0 avg_air_quality_risk: float = 0 max_air_quality_risk: float = 0 avg_tree_coverage: float = 0 flood_exposure_m: float = 0 # meters of route where flood_risk > 0.3 def to_dict(self) -> dict: return { "avg_climate_risk": self.avg_climate_risk, "max_climate_risk": self.max_climate_risk, "avg_flood_risk": self.avg_flood_risk, "max_flood_risk": self.max_flood_risk, "avg_heat_risk": self.avg_heat_risk, "max_heat_risk": self.max_heat_risk, "avg_air_quality_risk": self.avg_air_quality_risk, "max_air_quality_risk": self.max_air_quality_risk, "avg_tree_coverage": self.avg_tree_coverage, "flood_exposure_m": self.flood_exposure_m, } @dataclass class ClimateWeightParams: """Parameters for runtime climate weight calculation. These parameters are inferred by the LLM based on user context: - Flooding mentioned: increase flood penalties - Hot day / shade requested: increase heat_factor and shade_factor - Respiratory concerns: increase aqi_factor - Mobility concerns / elderly: increase grade_factor """ flood_penalty_deep: float = 5.0 # Multiplier for deep flood zones (flood_risk >= 1.0) flood_penalty_shallow: float = 2.0 # Multiplier for shallow flood zones (flood_risk >= 0.6) heat_factor: float = 0.3 # Penalty for high heat vulnerability (0.0-1.0) shade_factor: float = 0.3 # Benefit from tree coverage (0.0-1.0) aqi_factor: float = 0.1 # Penalty for poor air quality (0.0-1.0) grade_factor: float = 0.2 # Penalty for steep grades (0.0-1.0) def to_dict(self) -> dict: return { "flood_penalty_deep": self.flood_penalty_deep, "flood_penalty_shallow": self.flood_penalty_shallow, "heat_factor": self.heat_factor, "shade_factor": self.shade_factor, "aqi_factor": self.aqi_factor, "grade_factor": self.grade_factor, } @dataclass class RouteOption: name: str label: str color: str coords: list[tuple[float, float]] distance_m: float time_min: float metrics: RouteMetrics route_nodes: list[int] = field(default_factory=list) climate_metrics: ClimateMetrics = field(default_factory=ClimateMetrics) def to_dict(self) -> dict: result = { "name": self.name, "label": self.label, "color": self.color, "coords": self.coords, "distance_meters": self.distance_m, "walking_time_minutes": self.time_min, "route_metrics": self.metrics.to_dict(), } # Include climate metrics if they have data if self.climate_metrics.avg_climate_risk > 0: result["climate_metrics"] = self.climate_metrics.to_dict() return result @dataclass class RoutingResult: success: bool recommended: str = "" alternatives: list[RouteOption] = field(default_factory=list) origin: tuple[float, float] = (0, 0) destination: tuple[float, float] = (0, 0) dest_name: str = "" error: str = "" def to_dict(self) -> dict: if not self.success: return {"error": self.error} recommended_route = next( (r for r in self.alternatives if r.name == self.recommended), self.alternatives[0] if self.alternatives else None ) return { "success": True, "recommended": self.recommended, "alternatives": [r.to_dict() for r in self.alternatives], "distance_meters": recommended_route.distance_m if recommended_route else 0, "walking_time_minutes": recommended_route.time_min if recommended_route else 0, "origin": {"lat": self.origin[0], "lon": self.origin[1]}, "destination": {"lat": self.destination[0], "lon": self.destination[1], "name": self.dest_name}, "route_metrics": recommended_route.metrics.to_dict() if recommended_route else {}, } def to_map_data(self) -> dict | None: if not self.success or not self.alternatives: return None recommended_route = next( (r for r in self.alternatives if r.name == self.recommended), self.alternatives[0] ) return { "routes": [ {"coords": r.coords, "color": r.color, "label": r.label, "name": r.name} for r in self.alternatives ], "origin": list(self.origin), "destination": list(self.destination), "dest_name": self.dest_name, "distance": recommended_route.distance_m, "route_coords": recommended_route.coords, # backwards compat } @dataclass class IsochroneResult: """Result of isochrone generation - areas reachable within time limits.""" success: bool origin: tuple[float, float] = (0, 0) isochrones: list[dict] = field(default_factory=list) # [{time_min, polygon_coords, color}] resources_within: list[dict] = field(default_factory=list) # Resources within max isochrone error: str = "" def to_dict(self) -> dict: if not self.success: return {"error": self.error} return { "success": True, "origin": {"lat": self.origin[0], "lon": self.origin[1]}, "isochrones": self.isochrones, "resources_within": self.resources_within, } def to_map_data(self) -> dict | None: if not self.success: return None return { "origin": list(self.origin), "isochrones": self.isochrones, "resources_within": self.resources_within, } @dataclass class AlongRouteResult: """Result of find_along_route - POIs discovered along a route.""" success: bool route_coords: list[tuple[float, float]] = field(default_factory=list) pois_found: list[dict] = field(default_factory=list) origin: tuple[float, float] = (0, 0) destination: tuple[float, float] = (0, 0) buffer_meters: float = 100 climate_metrics: ClimateMetrics | None = None error: str = "" def to_dict(self) -> dict: if not self.success: return {"error": self.error} result = { "success": True, "origin": {"lat": self.origin[0], "lon": self.origin[1]}, "destination": {"lat": self.destination[0], "lon": self.destination[1]}, "buffer_meters": self.buffer_meters, "pois_found": self.pois_found, "poi_count": len(self.pois_found), } if self.climate_metrics: result["climate_metrics"] = self.climate_metrics.to_dict() return result def to_map_data(self) -> dict | None: if not self.success: return None return { "route_coords": self.route_coords, "origin": list(self.origin), "destination": list(self.destination), "pois_along_route": self.pois_found, } @dataclass class RouteComparisonResult: """Result of comparing shortest vs safest routes.""" success: bool shortest: RouteOption | None = None safest: RouteOption | None = None origin: tuple[float, float] = (0, 0) destination: tuple[float, float] = (0, 0) dest_name: str = "" extra_distance_m: float = 0 extra_distance_pct: float = 0 risk_reduction: float = 0 error: str = "" def to_dict(self) -> dict: if not self.success: return {"error": self.error} return { "success": True, "shortest": self.shortest.to_dict() if self.shortest else None, "safest": self.safest.to_dict() if self.safest else None, "origin": {"lat": self.origin[0], "lon": self.origin[1]}, "destination": {"lat": self.destination[0], "lon": self.destination[1], "name": self.dest_name}, "comparison": { "extra_distance_m": round(self.extra_distance_m, 1), "extra_distance_pct": round(self.extra_distance_pct, 1), "risk_reduction": round(self.risk_reduction, 3), }, } def to_map_data(self) -> dict | None: if not self.success: return None routes = [] if self.shortest: routes.append({ "coords": self.shortest.coords, "color": self.shortest.color, "label": self.shortest.label, "name": self.shortest.name, }) if self.safest and self.safest.coords != (self.shortest.coords if self.shortest else []): routes.append({ "coords": self.safest.coords, "color": self.safest.color, "label": self.safest.label, "name": self.safest.name, }) return { "routes": routes, "origin": list(self.origin), "destination": list(self.destination), "dest_name": self.dest_name, } # ============================================================================= # Routing Engine # ============================================================================= class RoutingEngine: """ Core routing engine - handles all graph operations and route computation. Supports optional igraph backend for high-performance routing. Features: - Multi-route computation (shortest, flattest, balanced) - Isochrone generation (reachable area within X minutes) - Find along route (POIs near a route corridor) """ def __init__(self, use_igraph: bool = True): self.G: nx.MultiDiGraph | None = None self.resources_df: pd.DataFrame | None = None self.known_places: dict[str, dict] = {} self._loaded = False # igraph backend (if available and requested) self.use_igraph = use_igraph and HAS_IGRAPH self.ig_graph: "ig.Graph | None" = None self.ig_node_map: dict[int, int] = {} # NetworkX node -> igraph node self.ig_reverse_map: dict[int, int] = {} # igraph node -> NetworkX node # Edge attribute indices for igraph (for climate routing) self.ig_length_idx: int = -1 self.ig_climate_weight_idx: int = -1 # Track if climate data is available self.has_climate_data: bool = False # Spatial index for fast nearest-neighbor queries self._node_coords: np.ndarray | None = None self._node_ids: list[int] = [] self._kdtree: cKDTree | None = None self._resource_kdtree: cKDTree | None = None def load(self) -> bool: """Load network and resources from disk. Tries to load from pre-built cache first (fast), falls back to GraphML (slow). Run `python build_graph_cache.py` to generate the cache for faster startup. """ if self._loaded: return True # Data is in project root's data/ directory, not core/data/ data_dir = os.path.join(os.path.dirname(__file__), "..", "data", "brownsville") try: # Try loading from cache first (much faster!) cache_path = os.path.join(data_dir, "graph_cache.pkl") if os.path.exists(cache_path): loaded_from_cache = self._load_from_cache(cache_path) if loaded_from_cache: # Cache loaded successfully, skip GraphML parsing pass else: # Cache failed, fall back to GraphML self._load_from_graphml(data_dir) else: # No cache, load from GraphML self._load_from_graphml(data_dir) # Load resources resources_path = os.path.join(data_dir, "all_resources.csv") if os.path.exists(resources_path): self.resources_df = pd.read_csv(resources_path) else: geojson_path = os.path.join(data_dir, "all_resources.geojson") if os.path.exists(geojson_path): gdf = gpd.read_file(geojson_path) self.resources_df = pd.DataFrame({ "name": gdf["name"], "type": gdf["type"], "category": gdf["category"], "lat": gdf.geometry.y, "lon": gdf.geometry.x }) # Build resource spatial index if self.resources_df is not None: self._build_resource_index() # Load places for geocoding self._load_known_places(data_dir) self._loaded = True return True except Exception as e: print(f"Error loading data: {e}") return False def _load_from_cache(self, cache_path: str) -> bool: """Load pre-built graph data from pickle cache (fast startup).""" import pickle try: with open(cache_path, 'rb') as f: cache = pickle.load(f) # Check cache version (v3 added air_quality_risk) if cache.get("version", 1) < 3: print("Cache version outdated, rebuilding from GraphML...") return False # Restore all cached data self.G = cache["nx_graph"] self.has_climate_data = cache["has_climate_data"] self._node_ids = cache["node_ids"] self._node_coords = cache["coords"] self._kdtree = cache["kdtree"] # Restore igraph if available if self.use_igraph and cache.get("ig_graph") is not None: self.ig_graph = cache["ig_graph"] self.ig_node_map = cache["ig_node_map"] self.ig_reverse_map = cache["ig_reverse_map"] return True except Exception as e: print(f"Failed to load cache: {e}") return False def _load_from_graphml(self, data_dir: str): """Load graph from GraphML file (slow fallback).""" # Load climate-enhanced network in priority order: # 1. Real climate data (walking_network_final.graphml) # 2. Mock climate data (walking_network_climate.graphml) # 3. Raw network (walking_network.graphml) final_path = os.path.join(data_dir, "walking_network_final.graphml") climate_path = os.path.join(data_dir, "walking_network_climate.graphml") graphml_path = os.path.join(data_dir, "walking_network.graphml") if os.path.exists(final_path): self.G = ox.load_graphml(final_path) self.has_climate_data = True elif os.path.exists(climate_path): self.G = ox.load_graphml(climate_path) self.has_climate_data = True elif os.path.exists(graphml_path): self.G = ox.load_graphml(graphml_path) self.has_climate_data = False else: from shapely.geometry import box brownsville_bbox = box(-73.93, 40.64, -73.89, 40.68) self.G = ox.graph_from_polygon(brownsville_bbox, network_type='walk', simplify=True) self.has_climate_data = False # Convert climate attributes from strings to floats (GraphML stores all as strings) if self.has_climate_data: climate_attrs = ['flood_risk', 'heat_risk', 'air_quality_risk', 'climate_risk', 'climate_weight'] for u, v, data in self.G.edges(data=True): for attr in climate_attrs: if attr in data and isinstance(data[attr], str): try: data[attr] = float(data[attr]) except (ValueError, TypeError): data[attr] = 0.0 # Verify climate data by checking first edge if self.has_climate_data: sample_edge = next(iter(self.G.edges(data=True)), None) if sample_edge and "climate_weight" not in sample_edge[2]: self.has_climate_data = False self.G = ox.project_graph(self.G) # Build spatial index for fast nearest-node queries self._build_spatial_index() # Build igraph graph if available if self.use_igraph: self._build_igraph_graph() def _build_spatial_index(self): """Build KD-tree for fast nearest-node lookups.""" if self.G is None: return nodes = list(self.G.nodes()) coords = [] for node in nodes: x = self.G.nodes[node].get("x", 0) y = self.G.nodes[node].get("y", 0) coords.append([x, y]) self._node_ids = nodes self._node_coords = np.array(coords) self._kdtree = cKDTree(self._node_coords) def _build_resource_index(self): """Build KD-tree for fast resource lookups.""" if self.resources_df is None or len(self.resources_df) == 0: return # Convert lat/lon to projected coordinates for consistency coords = [] if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": import pyproj transformer = pyproj.Transformer.from_crs("EPSG:4326", self.G.graph["crs"], always_xy=True) for _, row in self.resources_df.iterrows(): x, y = transformer.transform(row["lon"], row["lat"]) coords.append([x, y]) else: for _, row in self.resources_df.iterrows(): coords.append([row["lon"], row["lat"]]) self._resource_kdtree = cKDTree(np.array(coords)) def _build_igraph_graph(self): """Build igraph graph from NetworkX graph for high-performance routing. Preserves all edge attributes including climate data. Uses ig.Graph.from_networkx() for automatic attribute transfer. """ if not HAS_IGRAPH or self.G is None: return # Create node mapping (NetworkX uses arbitrary IDs, igraph uses 0..n-1) nx_nodes = list(self.G.nodes()) self.ig_node_map = {nx_node: i for i, nx_node in enumerate(nx_nodes)} self.ig_reverse_map = {i: nx_node for nx_node, i in self.ig_node_map.items()} # Create igraph graph (directed) self.ig_graph = ig.Graph(n=len(nx_nodes), directed=True) # Store vertex osmids for reverse lookup self.ig_graph.vs["_nx_name"] = nx_nodes # Add edges with all attributes edges = [] lengths = [] climate_weights = [] flood_risks = [] heat_risks = [] climate_risks = [] for u, v, data in self.G.edges(data=True): ig_u = self.ig_node_map[u] ig_v = self.ig_node_map[v] edges.append((ig_u, ig_v)) length = data.get("length", 1.0) if length is None: length = 1.0 lengths.append(float(length)) # Climate attributes (default to safe values if missing) flood_risks.append(float(data.get("flood_risk", 0) or 0)) heat_risks.append(float(data.get("heat_risk", 0) or 0)) climate_risks.append(float(data.get("climate_risk", 0) or 0)) # Climate weight for routing (default to length if missing) cw = data.get("climate_weight") if cw is None: cw = length climate_weights.append(float(cw)) self.ig_graph.add_edges(edges) self.ig_graph.es["length"] = lengths self.ig_graph.es["weight"] = lengths # Default weight is length self.ig_graph.es["climate_weight"] = climate_weights self.ig_graph.es["flood_risk"] = flood_risks self.ig_graph.es["heat_risk"] = heat_risks self.ig_graph.es["climate_risk"] = climate_risks def _load_known_places(self, data_dir: str): """Load known places for geocoding.""" places_path = os.path.join(data_dir, "places.csv") if os.path.exists(places_path): try: df = pd.read_csv(places_path) for _, row in df.iterrows(): self.known_places[row['name_lower']] = { "lat": row['lat'], "lon": row['lon'], "name": row['name'] } except Exception: pass @property def is_loaded(self) -> bool: return self._loaded @property def node_count(self) -> int: return len(self.G.nodes) if self.G else 0 @property def resource_count(self) -> int: return len(self.resources_df) if self.resources_df is not None else 0 # ------------------------------------------------------------------------- # Graph utilities # ------------------------------------------------------------------------- def _get_nearest_node(self, lat: float, lon: float) -> int: """Find nearest network node to a point using KD-tree (fast).""" # Convert to projected coordinates if needed if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": import pyproj transformer = pyproj.Transformer.from_crs("EPSG:4326", self.G.graph["crs"], always_xy=True) x, y = transformer.transform(lon, lat) else: x, y = lon, lat # Use KD-tree for O(log n) lookup if self._kdtree is not None: _, idx = self._kdtree.query([x, y]) return self._node_ids[idx] # Fallback to OSMnx return ox.nearest_nodes(self.G, x, y) def _get_nearest_node_ig(self, lat: float, lon: float) -> int: """Get igraph node ID for a location.""" nx_node = self._get_nearest_node(lat, lon) return self.ig_node_map.get(nx_node, 0) def _get_route_coords(self, route: list[int]) -> list[tuple[float, float]]: """Extract lat/lon coordinates from route nodes.""" if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": import pyproj transformer = pyproj.Transformer.from_crs(self.G.graph["crs"], "EPSG:4326", always_xy=True) coords = [] for node in route: x, y = self.G.nodes[node]["x"], self.G.nodes[node]["y"] lon, lat = transformer.transform(x, y) coords.append((lat, lon)) return coords return [(self.G.nodes[node]["y"], self.G.nodes[node]["x"]) for node in route] def _apply_elevation_weights(self, penalty_factor: float = ELEVATION_PENALTY_FACTOR) -> nx.MultiDiGraph: """Create graph copy with elevation-weighted edges.""" G_weighted = self.G.copy() for u, v, key, data in G_weighted.edges(keys=True, data=True): base_length = data.get("length", 1) elev_u = G_weighted.nodes[u].get("elevation", 0) or 0 elev_v = G_weighted.nodes[v].get("elevation", 0) or 0 elev_diff = elev_v - elev_u if elev_diff > 0: data["weighted_length"] = base_length + (elev_diff * penalty_factor) else: data["weighted_length"] = base_length return G_weighted def _compute_edge_weight( self, data: dict, params: ClimateWeightParams = None ) -> float: """ Compute routing weight for an edge using raw risk values and configurable penalties. This is computed at RUNTIME, allowing the LLM to adjust weights based on what the user tells us about their situation (flooding, heat, air quality, mobility). Design: - Flood: categorical penalty (physical danger) - Heat: continuous penalty based on HVI (social vulnerability / exposure proxy) - Tree coverage: REDUCES weight (shade is beneficial) - AQI: small continuous penalty (background air quality) - Grade: continuous penalty for uphill segments (mobility / exertion) Args: data: Edge attribute dict params: ClimateWeightParams with penalty/factor values Returns: float: Weighted length for Dijkstra's algorithm """ if params is None: params = ClimateWeightParams() length = float(data.get('length', 100)) # === FLOOD: Categorical penalty (physical danger) === flood_risk = float(data.get('flood_risk', 0.1)) if flood_risk >= 1.0: flood_mult = params.flood_penalty_deep # Deep flooding: major penalty elif flood_risk >= 0.6: flood_mult = params.flood_penalty_shallow # Shallow flooding: moderate penalty else: flood_mult = 1.0 # No flooding: no penalty # === HEAT: Continuous penalty based on HVI === heat_risk = float(data.get('heat_risk', 0.5)) # At heat_factor=0.3: HVI 1 (0.2) adds 6%, HVI 5 (1.0) adds 30% heat_mult = 1.0 + params.heat_factor * heat_risk # === TREE COVERAGE: Reduces weight (shade is good) === tree_coverage = float(data.get('tree_coverage', 0.0)) # At shade_factor=0.3: 0% trees = 1.0x, 100% trees = 0.7x shade_mult = 1.0 - (params.shade_factor * tree_coverage) # === AIR QUALITY: Small continuous penalty === aqi_risk = float(data.get('air_quality_risk', 0.0)) # At aqi_factor=0.1: worst AQI adds 10% aqi_mult = 1.0 + params.aqi_factor * aqi_risk # === GRADE: Continuous penalty for steep uphill segments === # Grade is stored as decimal (0.05 = 5% grade) # Normalize: 10% grade (0.1) → grade_norm = 1.0 grade = abs(float(data.get('grade', 0) or 0)) grade_norm = min(grade * 10, 1.0) # Cap at 10% grade # At grade_factor=0.2: flat = 1.0x, 10% grade = 1.2x grade_mult = 1.0 + params.grade_factor * grade_norm # Combine multiplicatively return length * flood_mult * heat_mult * shade_mult * aqi_mult * grade_mult def _create_weight_function(self, params: ClimateWeightParams = None): """Create a weight function for NetworkX routing with given climate parameters. Note: For MultiDiGraph, NetworkX passes the edge data as {key: {attrs}} dict, not the {attrs} dict directly. We extract the first edge's attributes. """ if params is None: params = ClimateWeightParams() def weight_func(u, v, data): # For MultiDiGraph, data is {key: {attrs}} - extract first edge's attrs if isinstance(data, dict) and data: first_key = next(iter(data)) if isinstance(first_key, int) and isinstance(data[first_key], dict): data = data[first_key] return self._compute_edge_weight(data, params) return weight_func def _compute_route_metrics(self, route: list[int]) -> RouteMetrics: """Compute elevation metrics for a route.""" if not route or len(route) < 2: return RouteMetrics() elevations = [] grades = [] elevation_gain = 0 elevation_loss = 0 for i, node in enumerate(route): elev = self.G.nodes[node].get("elevation", 0) or 0 elevations.append(elev) if i > 0: diff = elev - elevations[i-1] if diff > 0: elevation_gain += diff else: elevation_loss += abs(diff) edge_data = self.G.get_edge_data(route[i-1], node) if edge_data: first_edge = list(edge_data.values())[0] if isinstance(edge_data, dict) else edge_data grade = abs(first_edge.get("grade", 0)) * 100 grades.append(grade) max_elev = max(elevations) if elevations else 0 min_elev = min(elevations) if elevations else 0 avg_grade = sum(grades) / len(grades) if grades else 0 max_grade = max(grades) if grades else 0 if elevation_gain < 5 and max_grade < 3: difficulty = "flat" elif elevation_gain < 15 or max_grade < 8: difficulty = "moderate" else: difficulty = "hilly" return RouteMetrics( elevation_gain_m=round(elevation_gain, 1), elevation_loss_m=round(elevation_loss, 1), max_elevation_m=round(max_elev, 1), min_elevation_m=round(min_elev, 1), avg_grade_pct=round(avg_grade, 1), max_grade_pct=round(max_grade, 1), difficulty=difficulty ) def _compute_climate_metrics(self, route: list[int]) -> ClimateMetrics: """Compute climate risk metrics for a route. Climate risk is now computed at runtime using a simple weighted formula: climate_risk = 0.5 * flood_risk + 0.3 * heat_risk + 0.2 * air_quality_risk Tree coverage is tracked but used separately (reduces routing weight). """ if not route or len(route) < 2 or not self.has_climate_data: return ClimateMetrics() flood_risks = [] heat_risks = [] air_quality_risks = [] tree_coverages = [] climate_risks = [] flood_exposure = 0.0 # meters in flood risk > 0.3 for i in range(len(route) - 1): u, v = route[i], route[i + 1] edge_data = self.G.get_edge_data(u, v) if not edge_data: continue # Get first edge (MultiDiGraph may have multiple) if isinstance(edge_data, dict) and 0 in edge_data: data = edge_data[0] else: data = next(iter(edge_data.values())) if isinstance(edge_data, dict) else edge_data flood = float(data.get("flood_risk", 0) or 0) heat = float(data.get("heat_risk", 0) or 0) air_quality = float(data.get("air_quality_risk", 0) or 0) tree_coverage = float(data.get("tree_coverage", 0) or 0) length = float(data.get("length", 0) or 0) # Compute combined climate risk at runtime climate = 0.5 * flood + 0.3 * heat + 0.2 * air_quality flood_risks.append(flood) heat_risks.append(heat) air_quality_risks.append(air_quality) tree_coverages.append(tree_coverage) climate_risks.append(climate) if flood > 0.3: flood_exposure += length if not climate_risks: return ClimateMetrics() return ClimateMetrics( avg_climate_risk=round(sum(climate_risks) / len(climate_risks), 3), max_climate_risk=round(max(climate_risks), 3), avg_flood_risk=round(sum(flood_risks) / len(flood_risks), 3), max_flood_risk=round(max(flood_risks), 3), avg_heat_risk=round(sum(heat_risks) / len(heat_risks), 3), max_heat_risk=round(max(heat_risks), 3), avg_air_quality_risk=round(sum(air_quality_risks) / len(air_quality_risks), 3), max_air_quality_risk=round(max(air_quality_risks), 3), avg_tree_coverage=round(sum(tree_coverages) / len(tree_coverages), 3), flood_exposure_m=round(flood_exposure, 1), ) # ------------------------------------------------------------------------- # Route computation # ------------------------------------------------------------------------- def _compute_single_route( self, G: nx.MultiDiGraph, origin_node: int, dest_node: int, weight_key: str = "length" ) -> RouteOption | None: """Compute a single route.""" try: route = nx.shortest_path(G, origin_node, dest_node, weight=weight_key) distance = sum( self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:]) ) return RouteOption( name="", label="", color="", coords=self._get_route_coords(route), distance_m=round(distance, 1), time_min=round(distance / WALK_SPEED_M_PER_MIN, 1), metrics=self._compute_route_metrics(route), route_nodes=route, climate_metrics=self._compute_climate_metrics(route), ) except (nx.NetworkXNoPath, Exception): return None def _compute_single_route_igraph( self, origin_node: int, dest_node: int, weight_attr: str = "length" ) -> RouteOption | None: """Compute a single route using igraph for better performance.""" if not self.use_igraph or self.ig_graph is None: return None try: ig_origin = self.ig_node_map.get(origin_node) ig_dest = self.ig_node_map.get(dest_node) if ig_origin is None or ig_dest is None: return None # Get shortest path using specified weight path = self.ig_graph.get_shortest_paths( ig_origin, to=ig_dest, weights=weight_attr, output="vpath" )[0] if not path: return None # Convert back to NetworkX node IDs route = [self.ig_reverse_map[ig_node] for ig_node in path] # Compute distance using original graph distance = sum( self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:]) ) return RouteOption( name="", label="", color="", coords=self._get_route_coords(route), distance_m=round(distance, 1), time_min=round(distance / WALK_SPEED_M_PER_MIN, 1), metrics=self._compute_route_metrics(route), route_nodes=route, climate_metrics=self._compute_climate_metrics(route), ) except Exception: return None def route( self, origin_coords: tuple[float, float], dest_coords: tuple[float, float], mode: str = "safest", climate_params: ClimateWeightParams = None ) -> RouteOption | None: """ Compute a single route between two points with runtime climate weight calculation. Climate weights are computed at RUNTIME using configurable parameters, allowing the LLM to adjust based on user context (flooding, heat, asthma, etc.). Args: origin_coords: (lat, lon) tuple for origin dest_coords: (lat, lon) tuple for destination mode: 'safest' (use runtime climate weights) or 'fastest' (use length only) climate_params: ClimateWeightParams for runtime weight calculation. If None, uses default parameters. Returns: RouteOption or None if no path found """ if not self._loaded: return None origin_lat, origin_lon = origin_coords dest_lat, dest_lon = dest_coords try: origin_node = self._get_nearest_node(origin_lat, origin_lon) dest_node = self._get_nearest_node(dest_lat, dest_lon) except Exception: return None # For 'fastest' mode, use simple length-based routing if mode == "fastest" or mode == "fast": result = self._compute_single_route(self.G, origin_node, dest_node, "length") if result: result.name = "fastest" result.label = "Fastest" result.color = ROUTE_COLORS.get("shortest", "#3b82f6") return result # For 'safest' mode, use runtime climate weight calculation if climate_params is None: climate_params = ClimateWeightParams() # Use NetworkX with callable weight function for runtime calculation weight_func = self._create_weight_function(climate_params) result = self._compute_single_route_with_weight_func( origin_node, dest_node, weight_func ) if result: result.name = "safest" result.label = "Safest" result.color = ROUTE_COLORS.get("safest", "#10b981") return result def _compute_single_route_with_weight_func( self, origin_node: int, dest_node: int, weight_func ) -> RouteOption | None: """Compute a single route using a callable weight function.""" try: route = nx.shortest_path(self.G, origin_node, dest_node, weight=weight_func) distance = sum( self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:]) ) return RouteOption( name="", label="", color="", coords=self._get_route_coords(route), distance_m=round(distance, 1), time_min=round(distance / WALK_SPEED_M_PER_MIN, 1), metrics=self._compute_route_metrics(route), route_nodes=route, climate_metrics=self._compute_climate_metrics(route), ) except (nx.NetworkXNoPath, Exception): return None def compare_routes( self, origin_coords: tuple[float, float], dest_coords: tuple[float, float], dest_name: str = "Destination", climate_params: ClimateWeightParams = None ) -> RouteComparisonResult: """ Compare shortest and safest routes between two points. Returns both routes with comparison statistics showing the trade-off between distance and climate risk. Args: origin_coords: (lat, lon) tuple for origin dest_coords: (lat, lon) tuple for destination dest_name: Optional name for the destination climate_params: ClimateWeightParams for runtime weight calculation Returns: RouteComparisonResult with both routes and comparison stats """ if not self._loaded: return RouteComparisonResult(success=False, error="Engine not loaded") if not self.has_climate_data: return RouteComparisonResult( success=False, error="Climate data not available. Load climate-enhanced network first." ) origin_lat, origin_lon = origin_coords dest_lat, dest_lon = dest_coords # Compute both routes - fastest uses length, safest uses climate weights fastest = self.route(origin_coords, dest_coords, mode="fastest") safest = self.route(origin_coords, dest_coords, mode="safest", climate_params=climate_params) if not fastest: return RouteComparisonResult(success=False, error="No path found for fastest route") if not safest: return RouteComparisonResult(success=False, error="No path found for safest route") # Calculate comparison statistics extra_distance_m = safest.distance_m - fastest.distance_m extra_distance_pct = (extra_distance_m / fastest.distance_m * 100) if fastest.distance_m > 0 else 0 # Risk reduction is the difference in average climate risk risk_reduction = fastest.climate_metrics.avg_climate_risk - safest.climate_metrics.avg_climate_risk return RouteComparisonResult( success=True, shortest=fastest, safest=safest, origin=(origin_lat, origin_lon), destination=(dest_lat, dest_lon), dest_name=dest_name, extra_distance_m=extra_distance_m, extra_distance_pct=extra_distance_pct, risk_reduction=risk_reduction, ) def compute_routes( self, origin_lat: float, origin_lon: float, dest_lat: float, dest_lon: float, dest_name: str = "Destination", climate_params: ClimateWeightParams = None ) -> RoutingResult: """Compute multiple route alternatives between two points. Climate weights are computed at RUNTIME using configurable parameters. The safest route uses runtime weight calculation based on flood, heat, tree coverage, and air quality factors. Args: origin_lat, origin_lon: Origin coordinates dest_lat, dest_lon: Destination coordinates dest_name: Name of destination climate_params: ClimateWeightParams for runtime weight calculation """ if not self._loaded: return RoutingResult(success=False, error="Engine not loaded") try: origin_node = self._get_nearest_node(origin_lat, origin_lon) dest_node = self._get_nearest_node(dest_lat, dest_lon) except Exception as e: return RoutingResult(success=False, error=f"Could not locate points: {e}") alternatives = [] # Shortest route (length only) shortest = self._compute_single_route(self.G, origin_node, dest_node, "length") if shortest: shortest.name = "shortest" shortest.label = "Shortest" shortest.color = ROUTE_COLORS["shortest"] alternatives.append(shortest) # Flattest route G_flat = self._apply_elevation_weights(penalty_factor=5.0) flattest = self._compute_single_route(G_flat, origin_node, dest_node, "weighted_length") if flattest and (not shortest or flattest.coords != shortest.coords): flattest.name = "flattest" flattest.label = "Flattest" flattest.color = ROUTE_COLORS["flattest"] alternatives.append(flattest) # Balanced route G_balanced = self._apply_elevation_weights(penalty_factor=2.0) balanced = self._compute_single_route(G_balanced, origin_node, dest_node, "weighted_length") if balanced: existing_coords = [r.coords for r in alternatives] if balanced.coords not in existing_coords: balanced.name = "balanced" balanced.label = "Balanced" balanced.color = ROUTE_COLORS["balanced"] alternatives.append(balanced) # Safest route (climate-aware with runtime weight calculation) if self.has_climate_data: if climate_params is None: climate_params = ClimateWeightParams() weight_func = self._create_weight_function(climate_params) safest = self._compute_single_route_with_weight_func(origin_node, dest_node, weight_func) if safest: existing_coords = [r.coords for r in alternatives] if safest.coords not in existing_coords: # Different route - add as separate option safest.name = "safest" safest.label = "Safest (Climate)" safest.color = ROUTE_COLORS["safest"] alternatives.append(safest) else: # Safest route has same path as an existing route (likely shortest) # Update the existing route to show it's ALSO the safest option for route in alternatives: if route.coords == safest.coords: if route.name == "shortest": route.label = "Shortest & Safest (Climate)" route.name = "safest" # Mark as safest for recommendation break if not alternatives: return RoutingResult(success=False, error="No path found") # CLIMATE-AWARE BY DEFAULT: Always recommend safest route when climate # data is available to protect users from flood and heat exposure. if self.has_climate_data and any(r.name == "safest" for r in alternatives): recommended = "safest" elif any(r.name == "flattest" for r in alternatives): recommended = "flattest" else: recommended = "shortest" return RoutingResult( success=True, recommended=recommended, alternatives=alternatives, origin=(origin_lat, origin_lon), destination=(dest_lat, dest_lon), dest_name=dest_name ) def find_nearest_resource( self, resource_type: str, origin_lat: float, origin_lon: float, prefer_safe: bool = True, climate_params: ClimateWeightParams = None ) -> tuple[dict[str, Any], dict | None]: """ Find nearest resource of a given type with climate-aware route alternatives. This function: 1. Finds the nearest resource by ROUTED distance (not crow-flies) 2. Computes multiple route alternatives (shortest, flattest, safest) 3. Recommends the safest route when climate data is available Climate weights are computed at RUNTIME using configurable parameters, allowing the LLM to adjust based on user context (flooding, heat, asthma, etc.). Args: resource_type: Type of resource to find origin_lat: Origin latitude origin_lon: Origin longitude prefer_safe: If True and climate data available, prefer climate-safe routes climate_params: ClimateWeightParams for runtime weight calculation """ if not self._loaded: return {"error": "Engine not loaded"}, None if self.resources_df is None: return {"error": "Resources not loaded"}, None candidates = self.resources_df[self.resources_df["type"] == resource_type] if len(candidates) == 0: return {"error": f"No resources of type '{resource_type}' found"}, None try: origin_node = self._get_nearest_node(origin_lat, origin_lon) except Exception as e: return {"error": f"Could not find origin: {e}"}, None # Find the nearest resource by ROUTED distance (not crow-flies) # Use simple length-based routing to find which resource is nearest best_resource = None best_distance = float("inf") for _, row in candidates.iterrows(): try: dest_node = self._get_nearest_node(row["lat"], row["lon"]) # Use actual path length to determine nearest route_length = nx.shortest_path_length( self.G, origin_node, dest_node, weight="length" ) if route_length < best_distance: best_distance = route_length best_resource = row.to_dict() except (nx.NetworkXNoPath, Exception): continue if best_resource is None: return {"error": f"Could not find route to any {resource_type}"}, None # Now compute route alternatives to the nearest resource # This provides shortest, flattest, and safest (climate-aware) routes routing_result = self.compute_routes( origin_lat=origin_lat, origin_lon=origin_lon, dest_lat=best_resource["lat"], dest_lon=best_resource["lon"], dest_name=best_resource["name"], climate_params=climate_params ) if not routing_result.success: return {"error": routing_result.error or "Could not compute routes"}, None # Get the recommended route (safest when climate data available) recommended_route = next( (r for r in routing_result.alternatives if r.name == routing_result.recommended), routing_result.alternatives[0] if routing_result.alternatives else None ) if not recommended_route: return {"error": "No route alternatives computed"}, None # Build map data with all route alternatives map_data = { "routes": [ { "coords": alt.coords, "color": alt.color, "label": alt.label, "name": alt.name, } for alt in routing_result.alternatives ], "origin": [origin_lat, origin_lon], "destination": [best_resource["lat"], best_resource["lon"]], "dest_name": best_resource["name"], "recommended": routing_result.recommended, } # Build result with resource info and route alternatives result = { "found": True, "name": best_resource["name"], "type": best_resource["type"], "category": best_resource.get("category", ""), "lat": best_resource["lat"], "lon": best_resource["lon"], "origin": {"lat": origin_lat, "lon": origin_lon}, # Include all route alternatives "alternatives": [alt.to_dict() for alt in routing_result.alternatives], "recommended": routing_result.recommended, # Recommended route metrics for backward compatibility "distance_meters": round(recommended_route.distance_m, 1), "walking_time_minutes": round(recommended_route.time_min, 1), "route_metrics": recommended_route.metrics.to_dict() if recommended_route.metrics else {}, "climate_metrics": recommended_route.climate_metrics.to_dict() if recommended_route.climate_metrics else {}, "climate_aware": prefer_safe and self.has_climate_data, } return result, map_data def list_resources(self, resource_type: str = "", category: str = "") -> dict[str, Any]: """List available resources with optional filtering.""" if self.resources_df is None: return {"error": "Resources not loaded"} df = self.resources_df.copy() if category: df = df[df["category"] == category] if resource_type: df = df[df["type"] == resource_type] summary = df.groupby("type").agg({"name": "count", "category": "first"}).rename( columns={"name": "count"} ).to_dict("index") return { "total_count": len(df), "by_type": summary, "resources": df[["name", "type", "category", "lat", "lon"]].to_dict("records")[:20] } # ------------------------------------------------------------------------- # Isochrone Generation (reachable area within X minutes) # ------------------------------------------------------------------------- def generate_isochrone( self, origin_lat: float, origin_lon: float, time_limits: list[int] = None, resource_types: list[str] = None ) -> IsochroneResult: """ Generate isochrones showing areas reachable within given time limits. Uses igraph SSSP if available for better performance, otherwise NetworkX. Args: origin_lat: Origin latitude origin_lon: Origin longitude time_limits: List of time limits in minutes (default: [5, 10, 15]) resource_types: Optional filter for resources to include Returns: IsochroneResult with polygon boundaries and resources within reach """ if not self._loaded: return IsochroneResult(success=False, error="Engine not loaded") if time_limits is None: time_limits = [5, 10, 15] time_limits = sorted(time_limits) max_time = max(time_limits) max_distance = max_time * WALK_SPEED_M_PER_MIN try: origin_node = self._get_nearest_node(origin_lat, origin_lon) except Exception as e: return IsochroneResult(success=False, error=f"Could not locate origin: {e}") # Compute distances from origin to all reachable nodes if self.use_igraph and self.ig_graph is not None: # Use igraph SSSP (faster for large graphs) node_distances = self._compute_sssp_igraph(origin_node, max_distance) else: # Use NetworkX Dijkstra node_distances = self._compute_sssp_networkx(origin_node, max_distance) # Build isochrone polygons for each time limit isochrones = [] for time_min in time_limits: distance_limit = time_min * WALK_SPEED_M_PER_MIN reachable_nodes = [n for n, d in node_distances.items() if d <= distance_limit] if not reachable_nodes: continue # Get convex hull of reachable nodes polygon_coords = self._nodes_to_polygon(reachable_nodes) isochrones.append({ "time_min": time_min, "polygon_coords": polygon_coords, "color": ISOCHRONE_COLORS.get(time_min, "#6366f1"), "node_count": len(reachable_nodes), }) # Find resources within the max isochrone resources_within = [] if self.resources_df is not None: max_distance_nodes = {n for n, d in node_distances.items() if d <= max_distance} for _, row in self.resources_df.iterrows(): if resource_types and row["type"] not in resource_types: continue try: resource_node = self._get_nearest_node(row["lat"], row["lon"]) if resource_node in max_distance_nodes: dist = node_distances.get(resource_node, float("inf")) time_to_reach = dist / WALK_SPEED_M_PER_MIN resources_within.append({ "name": row["name"], "type": row["type"], "category": row.get("category", ""), "lat": row["lat"], "lon": row["lon"], "distance_meters": round(dist, 1), "walking_time_minutes": round(time_to_reach, 1), }) except Exception: continue # Sort by distance resources_within.sort(key=lambda x: x["distance_meters"]) return IsochroneResult( success=True, origin=(origin_lat, origin_lon), isochrones=isochrones, resources_within=resources_within[:20], # Limit for display ) def _compute_sssp_igraph(self, origin_node: int, max_distance: float) -> dict[int, float]: """Compute single-source shortest paths using igraph.""" ig_origin = self.ig_node_map.get(origin_node) if ig_origin is None: return {} # Run Dijkstra from origin using igraph's shortest_paths all_distances = self.ig_graph.distances(source=ig_origin, weights="weight", mode="out")[0] # Convert back to NetworkX node IDs distances = {} for ig_node, dist in enumerate(all_distances): if dist != float("inf") and dist <= max_distance: nx_node = self.ig_reverse_map.get(ig_node) if nx_node is not None: distances[nx_node] = dist return distances def _compute_sssp_networkx(self, origin_node: int, max_distance: float) -> dict[int, float]: """Compute single-source shortest paths using NetworkX.""" try: # Use cutoff for efficiency lengths = nx.single_source_dijkstra_path_length( self.G, origin_node, cutoff=max_distance, weight="length" ) return dict(lengths) except Exception: return {} def _nodes_to_polygon(self, nodes: list[int]) -> list[tuple[float, float]]: """Convert a set of nodes to a convex hull polygon in lat/lon.""" if len(nodes) < 3: return [] # Get coordinates coords = [] for node in nodes: x = self.G.nodes[node].get("x", 0) y = self.G.nodes[node].get("y", 0) coords.append([x, y]) coords = np.array(coords) # Compute convex hull try: from scipy.spatial import ConvexHull hull = ConvexHull(coords) hull_points = coords[hull.vertices] # Convert to lat/lon if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": import pyproj transformer = pyproj.Transformer.from_crs( self.G.graph["crs"], "EPSG:4326", always_xy=True ) result = [] for x, y in hull_points: lon, lat = transformer.transform(x, y) result.append((lat, lon)) # Close the polygon result.append(result[0]) return result else: result = [(y, x) for x, y in hull_points] result.append(result[0]) return result except Exception: return [] # ------------------------------------------------------------------------- # Find Along Route (POI discovery along a route corridor) # ------------------------------------------------------------------------- def find_along_route( self, origin_lat: float, origin_lon: float, dest_lat: float, dest_lon: float, buffer_meters: float = 100, resource_types: list[str] = None ) -> AlongRouteResult: """ Find resources/POIs along a route corridor. Args: origin_lat, origin_lon: Route start dest_lat, dest_lon: Route end buffer_meters: Width of corridor to search (default 100m) resource_types: Optional filter for resource types Returns: AlongRouteResult with POIs found along the route """ if not self._loaded: return AlongRouteResult(success=False, error="Engine not loaded") if self.resources_df is None: return AlongRouteResult(success=False, error="Resources not loaded") # Compute the route first routing_result = self.compute_routes(origin_lat, origin_lon, dest_lat, dest_lon) if not routing_result.success: return AlongRouteResult(success=False, error=routing_result.error) # Get the recommended route recommended = next( (r for r in routing_result.alternatives if r.name == routing_result.recommended), routing_result.alternatives[0] ) route_coords = recommended.coords if not route_coords: return AlongRouteResult(success=False, error="No route coordinates") # Convert route to projected coordinates for distance calculations if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": import pyproj transformer = pyproj.Transformer.from_crs( "EPSG:4326", self.G.graph["crs"], always_xy=True ) projected_route = [] for lat, lon in route_coords: x, y = transformer.transform(lon, lat) projected_route.append([x, y]) projected_route = np.array(projected_route) else: projected_route = np.array([[lon, lat] for lat, lon in route_coords]) # Find resources within buffer distance of route pois_found = [] for _, row in self.resources_df.iterrows(): if resource_types and row["type"] not in resource_types: continue # Get resource in projected coordinates if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": x, y = transformer.transform(row["lon"], row["lat"]) point = np.array([x, y]) else: point = np.array([row["lon"], row["lat"]]) # Calculate minimum distance to route min_dist = self._point_to_polyline_distance(point, projected_route) if min_dist <= buffer_meters: # Find which segment of the route it's nearest to (for ordering) segment_idx = self._nearest_segment_index(point, projected_route) pois_found.append({ "name": row["name"], "type": row["type"], "category": row.get("category", ""), "lat": row["lat"], "lon": row["lon"], "distance_from_route_m": round(min_dist, 1), "route_segment": segment_idx, }) # Sort by route segment (so POIs appear in order along the route) pois_found.sort(key=lambda x: x["route_segment"]) return AlongRouteResult( success=True, route_coords=route_coords, pois_found=pois_found, origin=(origin_lat, origin_lon), destination=(dest_lat, dest_lon), buffer_meters=buffer_meters, climate_metrics=recommended.climate_metrics, ) def _point_to_polyline_distance(self, point: np.ndarray, polyline: np.ndarray) -> float: """Calculate minimum distance from a point to a polyline.""" min_dist = float("inf") for i in range(len(polyline) - 1): seg_start = polyline[i] seg_end = polyline[i + 1] # Vector from start to end seg_vec = seg_end - seg_start seg_len_sq = np.dot(seg_vec, seg_vec) if seg_len_sq == 0: # Segment is a point dist = np.linalg.norm(point - seg_start) else: # Project point onto segment t = max(0, min(1, np.dot(point - seg_start, seg_vec) / seg_len_sq)) projection = seg_start + t * seg_vec dist = np.linalg.norm(point - projection) min_dist = min(min_dist, dist) return min_dist def _nearest_segment_index(self, point: np.ndarray, polyline: np.ndarray) -> int: """Find which segment of the polyline is nearest to the point.""" min_dist = float("inf") nearest_idx = 0 for i in range(len(polyline) - 1): seg_start = polyline[i] seg_end = polyline[i + 1] seg_vec = seg_end - seg_start seg_len_sq = np.dot(seg_vec, seg_vec) if seg_len_sq == 0: dist = np.linalg.norm(point - seg_start) else: t = max(0, min(1, np.dot(point - seg_start, seg_vec) / seg_len_sq)) projection = seg_start + t * seg_vec dist = np.linalg.norm(point - projection) if dist < min_dist: min_dist = dist nearest_idx = i return nearest_idx # ------------------------------------------------------------------------- # Geocoding # ------------------------------------------------------------------------- def geocode_query(self, query: str) -> tuple[str, dict]: """Resolve place names in query to coordinates.""" geocode_info = {} modified = query query_lower = query.lower() # Sort by name length for greedy matching sorted_places = sorted(self.known_places.items(), key=lambda x: -len(x[0])) used_spans = [] for name_lower, info in sorted_places: pattern = r"\b" + re.escape(name_lower) + r"\b" for match in re.finditer(pattern, query_lower): start, end = match.span() overlaps = any(not (end <= us or start >= ue) for us, ue in used_spans) if not overlaps: geocode_info[info["name"]] = { "lat": info["lat"], "lon": info["lon"], "name": info["name"] } original_text = query[start:end] modified = re.compile(re.escape(original_text), re.IGNORECASE).sub( f"(lat {info['lat']:.6f}, lon {info['lon']:.6f})", modified, count=1 ) used_spans.append((start, end)) return modified, geocode_info # ============================================================================= # Tool Executor (bridges LLM output to engine) # ============================================================================= def _safe_str(val, default: str = "") -> str: if val is None: return default if isinstance(val, list): return str(val[0]) if val else default return str(val) def _safe_float(val, default: float) -> float: if val is None: return default if isinstance(val, list): val = val[0] if val else default try: return float(val) except (ValueError, TypeError): return default def _parse_climate_params(args: dict) -> ClimateWeightParams: """Extract climate weight parameters from tool args. The LLM can pass these parameters based on user context: - flood_penalty_deep: 5.0 default, 10.0 for active flooding - flood_penalty_shallow: 2.0 default, 4.0 for flooding conditions - heat_factor: 0.3 default, 0.5 for hot days - shade_factor: 0.3 default, 0.5 for shade-seeking - aqi_factor: 0.1 default, 0.5 for respiratory concerns - grade_factor: 0.2 default, 0.5 for elderly/mobility-impaired users """ return ClimateWeightParams( flood_penalty_deep=_safe_float(args.get("flood_penalty_deep"), 5.0), flood_penalty_shallow=_safe_float(args.get("flood_penalty_shallow"), 2.0), heat_factor=_safe_float(args.get("heat_factor"), 0.3), shade_factor=_safe_float(args.get("shade_factor"), 0.3), aqi_factor=_safe_float(args.get("aqi_factor"), 0.1), grade_factor=_safe_float(args.get("grade_factor"), 0.2), ) def execute_tool( tool_name: str, args: dict, engine: RoutingEngine ) -> tuple[dict[str, Any], dict | None]: """Execute a tool by name using the routing engine. Climate weight parameters can be passed in args and will be used for runtime weight calculation when routing. """ # Parse climate parameters from args (LLM can set these based on context) climate_params = _parse_climate_params(args) if tool_name == "list_resources": result = engine.list_resources( resource_type=_safe_str(args.get("resource_type"), ""), category=_safe_str(args.get("category"), "") ) return result, None elif tool_name == "find_nearest": lat = args.get("lat") or args.get("origin_lat") lon = args.get("lon") or args.get("origin_lon") return engine.find_nearest_resource( resource_type=_safe_str(args.get("resource_type"), ""), origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]), origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"]), climate_params=climate_params ) elif tool_name == "calculate_route": # Check for routing_mode parameter routing_mode = _safe_str(args.get("routing_mode"), "safe") result = engine.compute_routes( origin_lat=_safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]), origin_lon=_safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]), dest_lat=_safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]), dest_lon=_safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]), dest_name=_safe_str(args.get("dest_name"), "Destination"), climate_params=climate_params ) return result.to_dict(), result.to_map_data() elif tool_name == "generate_isochrone": # Parse time limits - handle various formats like "10", "10 minutes", "5, 10, 15" time_limits = args.get("time_limits", [5, 10, 15]) if isinstance(time_limits, str): # Split by comma and extract numeric values parsed = [] for x in time_limits.split(","): # Extract just the numeric part (handles "10 minutes", "15 min", etc.) import re match = re.search(r'(\d+)', x.strip()) if match: parsed.append(int(match.group(1))) time_limits = parsed if parsed else [5, 10, 15] elif isinstance(time_limits, (int, float)): time_limits = [int(time_limits)] elif isinstance(time_limits, list): # Ensure all elements are integers (LLM may return strings like ["5", "10"]) time_limits = [int(x) if isinstance(x, (int, float, str)) and str(x).isdigit() else 10 for x in time_limits] if not time_limits: time_limits = [5, 10, 15] # Parse resource types filter resource_types = args.get("resource_types") if isinstance(resource_types, str): resource_types = [x.strip() for x in resource_types.split(",")] lat = args.get("lat") or args.get("origin_lat") lon = args.get("lon") or args.get("origin_lon") result = engine.generate_isochrone( origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]), origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"]), time_limits=time_limits, resource_types=resource_types ) return result.to_dict(), result.to_map_data() elif tool_name == "find_along_route": # Parse resource types filter resource_types = args.get("resource_types") if isinstance(resource_types, str): resource_types = [x.strip() for x in resource_types.split(",")] result = engine.find_along_route( origin_lat=_safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]), origin_lon=_safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]), dest_lat=_safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]), dest_lon=_safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]), buffer_meters=_safe_float(args.get("buffer_meters"), 100), resource_types=resource_types ) return result.to_dict(), result.to_map_data() elif tool_name == "compare_routes": # Compare fastest vs safest (climate-aware) routes origin_lat = _safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]) origin_lon = _safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]) dest_lat = _safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]) dest_lon = _safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]) result = engine.compare_routes( origin_coords=(origin_lat, origin_lon), dest_coords=(dest_lat, dest_lon), dest_name=_safe_str(args.get("dest_name"), "Destination"), climate_params=climate_params ) return result.to_dict(), result.to_map_data() elif tool_name == "climate_route": # Single route with mode selection and climate parameters origin_lat = _safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]) origin_lon = _safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]) dest_lat = _safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]) dest_lon = _safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]) mode = _safe_str(args.get("mode") or args.get("routing_mode"), "safest") result = engine.route( origin_coords=(origin_lat, origin_lon), dest_coords=(dest_lat, dest_lon), mode=mode, climate_params=climate_params ) if result is None: return {"error": "No path found"}, None map_data = { "routes": [{ "coords": result.coords, "color": result.color, "label": result.label, "name": result.name, }], "origin": [origin_lat, origin_lon], "destination": [dest_lat, dest_lon], } return result.to_dict(), map_data else: return {"error": f"Unknown tool: {tool_name}"}, None