| """ |
| Routing engine for the Emergency Routing Assistant. |
| |
| This module contains all routing logic, data loading, and processing. |
| The frontend (app.py) should only handle presentation. |
| |
| Features inspired by dream-meridian: |
| - igraph backend for high-performance routing (optional) |
| - Isochrone generation (reachable area within X minutes) |
| - Find along route (discover POIs along a computed route) |
| """ |
|
|
| import os |
| import re |
| import json |
| import networkx as nx |
| import pandas as pd |
| import geopandas as gpd |
| import osmnx as ox |
| import requests |
| from typing import Any |
| from dataclasses import dataclass, field |
| from scipy.spatial import cKDTree |
| import numpy as np |
|
|
| |
| try: |
| import igraph as ig |
| HAS_IGRAPH = True |
| except ImportError: |
| HAS_IGRAPH = False |
|
|
| |
| |
| |
|
|
| WALK_SPEED_M_PER_MIN = 75 |
| ELEVATION_PENALTY_FACTOR = 3.0 |
|
|
| BROWNSVILLE_CENTER = {"lat": 40.6594, "lon": -73.9126} |
| BROWNSVILLE_BOUNDS = { |
| "min_lat": 40.64, |
| "max_lat": 40.68, |
| "min_lon": -73.93, |
| "max_lon": -73.89 |
| } |
|
|
| VALID_RESOURCE_TYPES = [ |
| "pharmacy", "clinic", "hospital", "fire_station", "police", |
| "school", "library", "community_centre", "place_of_worship" |
| ] |
|
|
| |
| ROUTE_COLORS = { |
| "shortest": "#3b82f6", |
| "flattest": "#22c55e", |
| "balanced": "#f59e0b", |
| "safest": "#10b981", |
| } |
|
|
| |
| ISOCHRONE_COLORS = { |
| 5: "#22c55e", |
| 10: "#84cc16", |
| 15: "#f59e0b", |
| 20: "#ef4444", |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| POI_MARKER_STYLES = { |
| |
| "hospital": { |
| "color": "#dc2626", |
| "fill_color": "#fecaca", |
| "icon": "fa-hospital", |
| "icon_prefix": "fa", |
| "radius": 8, |
| "category": "Emergency Service", |
| }, |
| "clinic": { |
| "color": "#ef4444", |
| "fill_color": "#fee2e2", |
| "icon": "fa-stethoscope", |
| "icon_prefix": "fa", |
| "radius": 7, |
| "category": "Emergency Service", |
| }, |
| "fire_station": { |
| "color": "#ea580c", |
| "fill_color": "#ffedd5", |
| "icon": "fa-fire-extinguisher", |
| "icon_prefix": "fa", |
| "radius": 8, |
| "category": "Emergency Service", |
| }, |
| "police": { |
| "color": "#1d4ed8", |
| "fill_color": "#dbeafe", |
| "icon": "fa-shield", |
| "icon_prefix": "fa", |
| "radius": 8, |
| "category": "Emergency Service", |
| }, |
|
|
| |
| "pharmacy": { |
| "color": "#16a34a", |
| "fill_color": "#dcfce7", |
| "icon": "fa-medkit", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Healthcare", |
| }, |
| "doctors": { |
| "color": "#22c55e", |
| "fill_color": "#bbf7d0", |
| "icon": "fa-user-md", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Healthcare", |
| }, |
|
|
| |
| "school": { |
| "color": "#7c3aed", |
| "fill_color": "#ede9fe", |
| "icon": "fa-graduation-cap", |
| "icon_prefix": "fa", |
| "radius": 7, |
| "category": "Community Resource", |
| }, |
| "library": { |
| "color": "#8b5cf6", |
| "fill_color": "#f3e8ff", |
| "icon": "fa-book", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Community Resource", |
| }, |
| "community_centre": { |
| "color": "#6366f1", |
| "fill_color": "#e0e7ff", |
| "icon": "fa-users", |
| "icon_prefix": "fa", |
| "radius": 7, |
| "category": "Community Resource", |
| }, |
| "place_of_worship": { |
| "color": "#a855f7", |
| "fill_color": "#f3e8ff", |
| "icon": "fa-church", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Community Resource", |
| }, |
| "youth_center": { |
| "color": "#ec4899", |
| "fill_color": "#fce7f3", |
| "icon": "fa-child", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Community Resource", |
| }, |
| "senior_center": { |
| "color": "#f97316", |
| "fill_color": "#ffedd5", |
| "icon": "fa-heart", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Community Resource", |
| }, |
| "childcare": { |
| "color": "#f472b6", |
| "fill_color": "#fbcfe8", |
| "icon": "fa-child", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Community Resource", |
| }, |
| "nycha_community_center": { |
| "color": "#0ea5e9", |
| "fill_color": "#e0f2fe", |
| "icon": "fa-building", |
| "icon_prefix": "fa", |
| "radius": 7, |
| "category": "Community Resource", |
| }, |
|
|
| |
| "park": { |
| "color": "#16a34a", |
| "fill_color": "#bbf7d0", |
| "icon": "fa-tree", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Climate Infrastructure", |
| }, |
| "shelter": { |
| "color": "#0284c7", |
| "fill_color": "#bae6fd", |
| "icon": "fa-home", |
| "icon_prefix": "fa", |
| "radius": 8, |
| "category": "Climate Infrastructure", |
| }, |
| "drinking_water": { |
| "color": "#0891b2", |
| "fill_color": "#cffafe", |
| "icon": "fa-tint", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Climate Infrastructure", |
| }, |
|
|
| |
| "bodega": { |
| "color": "#f59e0b", |
| "fill_color": "#fef3c7", |
| "icon": "fa-shopping-basket", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Local Business", |
| }, |
| "supermarket": { |
| "color": "#d97706", |
| "fill_color": "#fef3c7", |
| "icon": "fa-shopping-cart", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Local Business", |
| }, |
| "fast_food": { |
| "color": "#fbbf24", |
| "fill_color": "#fef9c3", |
| "icon": "fa-cutlery", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Local Business", |
| }, |
| "cafe": { |
| "color": "#92400e", |
| "fill_color": "#fef3c7", |
| "icon": "fa-coffee", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Local Business", |
| }, |
| "bank": { |
| "color": "#475569", |
| "fill_color": "#e2e8f0", |
| "icon": "fa-university", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Local Business", |
| }, |
|
|
| |
| "social_facility": { |
| "color": "#0d9488", |
| "fill_color": "#ccfbf1", |
| "icon": "fa-handshake-o", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Social Services", |
| }, |
| "snap_center": { |
| "color": "#14b8a6", |
| "fill_color": "#ccfbf1", |
| "icon": "fa-id-card", |
| "icon_prefix": "fa", |
| "radius": 6, |
| "category": "Social Services", |
| }, |
|
|
| |
| "facility": { |
| "color": "#6b7280", |
| "fill_color": "#e5e7eb", |
| "icon": "fa-building-o", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Other", |
| }, |
| } |
|
|
| |
| POI_DEFAULT_STYLE = { |
| "color": "#6b7280", |
| "fill_color": "#e5e7eb", |
| "icon": "fa-map-marker", |
| "icon_prefix": "fa", |
| "radius": 5, |
| "category": "Other", |
| } |
|
|
|
|
| def get_poi_marker_style(poi_type: str) -> dict: |
| """ |
| Get marker style configuration for a given POI type. |
| |
| Args: |
| poi_type: The type of POI (e.g., "hospital", "pharmacy", "school") |
| |
| Returns: |
| Dictionary with marker style properties: |
| - color: Border/stroke color (hex) |
| - fill_color: Fill color (hex) |
| - icon: Font Awesome or Glyphicon name |
| - icon_prefix: Icon library prefix ("fa" or "glyphicon") |
| - radius: Circle marker radius in pixels |
| - category: POI category for grouping |
| |
| Example: |
| >>> style = get_poi_marker_style("hospital") |
| >>> style["color"] |
| '#dc2626' |
| >>> style["icon"] |
| 'fa-hospital' |
| """ |
| return POI_MARKER_STYLES.get(poi_type, POI_DEFAULT_STYLE).copy() |
|
|
|
|
| def get_all_poi_styles() -> dict: |
| """ |
| Get all POI marker style configurations. |
| |
| Returns: |
| Dictionary mapping POI type names to their style configurations. |
| Useful for UI developers to enumerate all available styles. |
| |
| Example: |
| >>> styles = get_all_poi_styles() |
| >>> list(styles.keys()) |
| ['hospital', 'clinic', 'fire_station', ...] |
| """ |
| return POI_MARKER_STYLES.copy() |
|
|
|
|
| def get_poi_styles_by_category() -> dict[str, list[str]]: |
| """ |
| Get POI types grouped by category. |
| |
| Returns: |
| Dictionary mapping category names to lists of POI types. |
| |
| Example: |
| >>> by_cat = get_poi_styles_by_category() |
| >>> by_cat["Emergency Service"] |
| ['hospital', 'clinic', 'fire_station', 'police'] |
| """ |
| categories: dict[str, list[str]] = {} |
| for poi_type, style in POI_MARKER_STYLES.items(): |
| cat = style.get("category", "Other") |
| if cat not in categories: |
| categories[cat] = [] |
| categories[cat].append(poi_type) |
| return categories |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class RouteMetrics: |
| elevation_gain_m: float = 0 |
| elevation_loss_m: float = 0 |
| max_elevation_m: float = 0 |
| min_elevation_m: float = 0 |
| avg_grade_pct: float = 0 |
| max_grade_pct: float = 0 |
| difficulty: str = "flat" |
|
|
| def to_dict(self) -> dict: |
| return { |
| "elevation_gain_m": self.elevation_gain_m, |
| "elevation_loss_m": self.elevation_loss_m, |
| "max_elevation_m": self.max_elevation_m, |
| "min_elevation_m": self.min_elevation_m, |
| "avg_grade_pct": self.avg_grade_pct, |
| "max_grade_pct": self.max_grade_pct, |
| "difficulty": self.difficulty, |
| } |
|
|
|
|
| @dataclass |
| class ClimateMetrics: |
| """Climate risk metrics for a route. |
| |
| Climate weights are now computed at RUNTIME using configurable parameters, |
| allowing the LLM to adjust weights based on user context (e.g., flooding, |
| heat wave, air quality concerns). |
| """ |
| avg_climate_risk: float = 0 |
| max_climate_risk: float = 0 |
| avg_flood_risk: float = 0 |
| max_flood_risk: float = 0 |
| avg_heat_risk: float = 0 |
| max_heat_risk: float = 0 |
| avg_air_quality_risk: float = 0 |
| max_air_quality_risk: float = 0 |
| avg_tree_coverage: float = 0 |
| flood_exposure_m: float = 0 |
|
|
| def to_dict(self) -> dict: |
| return { |
| "avg_climate_risk": self.avg_climate_risk, |
| "max_climate_risk": self.max_climate_risk, |
| "avg_flood_risk": self.avg_flood_risk, |
| "max_flood_risk": self.max_flood_risk, |
| "avg_heat_risk": self.avg_heat_risk, |
| "max_heat_risk": self.max_heat_risk, |
| "avg_air_quality_risk": self.avg_air_quality_risk, |
| "max_air_quality_risk": self.max_air_quality_risk, |
| "avg_tree_coverage": self.avg_tree_coverage, |
| "flood_exposure_m": self.flood_exposure_m, |
| } |
|
|
|
|
| @dataclass |
| class ClimateWeightParams: |
| """Parameters for runtime climate weight calculation. |
| |
| These parameters are inferred by the LLM based on user context: |
| - Flooding mentioned: increase flood penalties |
| - Hot day / shade requested: increase heat_factor and shade_factor |
| - Respiratory concerns: increase aqi_factor |
| - Mobility concerns / elderly: increase grade_factor |
| """ |
| flood_penalty_deep: float = 5.0 |
| flood_penalty_shallow: float = 2.0 |
| heat_factor: float = 0.3 |
| shade_factor: float = 0.3 |
| aqi_factor: float = 0.1 |
| grade_factor: float = 0.2 |
|
|
| def to_dict(self) -> dict: |
| return { |
| "flood_penalty_deep": self.flood_penalty_deep, |
| "flood_penalty_shallow": self.flood_penalty_shallow, |
| "heat_factor": self.heat_factor, |
| "shade_factor": self.shade_factor, |
| "aqi_factor": self.aqi_factor, |
| "grade_factor": self.grade_factor, |
| } |
|
|
|
|
| @dataclass |
| class RouteOption: |
| name: str |
| label: str |
| color: str |
| coords: list[tuple[float, float]] |
| distance_m: float |
| time_min: float |
| metrics: RouteMetrics |
| route_nodes: list[int] = field(default_factory=list) |
| climate_metrics: ClimateMetrics = field(default_factory=ClimateMetrics) |
|
|
| def to_dict(self) -> dict: |
| result = { |
| "name": self.name, |
| "label": self.label, |
| "color": self.color, |
| "coords": self.coords, |
| "distance_meters": self.distance_m, |
| "walking_time_minutes": self.time_min, |
| "route_metrics": self.metrics.to_dict(), |
| } |
| |
| if self.climate_metrics.avg_climate_risk > 0: |
| result["climate_metrics"] = self.climate_metrics.to_dict() |
| return result |
|
|
|
|
| @dataclass |
| class RoutingResult: |
| success: bool |
| recommended: str = "" |
| alternatives: list[RouteOption] = field(default_factory=list) |
| origin: tuple[float, float] = (0, 0) |
| destination: tuple[float, float] = (0, 0) |
| dest_name: str = "" |
| error: str = "" |
|
|
| def to_dict(self) -> dict: |
| if not self.success: |
| return {"error": self.error} |
|
|
| recommended_route = next( |
| (r for r in self.alternatives if r.name == self.recommended), |
| self.alternatives[0] if self.alternatives else None |
| ) |
|
|
| return { |
| "success": True, |
| "recommended": self.recommended, |
| "alternatives": [r.to_dict() for r in self.alternatives], |
| "distance_meters": recommended_route.distance_m if recommended_route else 0, |
| "walking_time_minutes": recommended_route.time_min if recommended_route else 0, |
| "origin": {"lat": self.origin[0], "lon": self.origin[1]}, |
| "destination": {"lat": self.destination[0], "lon": self.destination[1], "name": self.dest_name}, |
| "route_metrics": recommended_route.metrics.to_dict() if recommended_route else {}, |
| } |
|
|
| def to_map_data(self) -> dict | None: |
| if not self.success or not self.alternatives: |
| return None |
|
|
| recommended_route = next( |
| (r for r in self.alternatives if r.name == self.recommended), |
| self.alternatives[0] |
| ) |
|
|
| return { |
| "routes": [ |
| {"coords": r.coords, "color": r.color, "label": r.label, "name": r.name} |
| for r in self.alternatives |
| ], |
| "origin": list(self.origin), |
| "destination": list(self.destination), |
| "dest_name": self.dest_name, |
| "distance": recommended_route.distance_m, |
| "route_coords": recommended_route.coords, |
| } |
|
|
|
|
| @dataclass |
| class IsochroneResult: |
| """Result of isochrone generation - areas reachable within time limits.""" |
| success: bool |
| origin: tuple[float, float] = (0, 0) |
| isochrones: list[dict] = field(default_factory=list) |
| resources_within: list[dict] = field(default_factory=list) |
| error: str = "" |
|
|
| def to_dict(self) -> dict: |
| if not self.success: |
| return {"error": self.error} |
| return { |
| "success": True, |
| "origin": {"lat": self.origin[0], "lon": self.origin[1]}, |
| "isochrones": self.isochrones, |
| "resources_within": self.resources_within, |
| } |
|
|
| def to_map_data(self) -> dict | None: |
| if not self.success: |
| return None |
| return { |
| "origin": list(self.origin), |
| "isochrones": self.isochrones, |
| "resources_within": self.resources_within, |
| } |
|
|
|
|
| @dataclass |
| class AlongRouteResult: |
| """Result of find_along_route - POIs discovered along a route.""" |
| success: bool |
| route_coords: list[tuple[float, float]] = field(default_factory=list) |
| pois_found: list[dict] = field(default_factory=list) |
| origin: tuple[float, float] = (0, 0) |
| destination: tuple[float, float] = (0, 0) |
| buffer_meters: float = 100 |
| climate_metrics: ClimateMetrics | None = None |
| error: str = "" |
|
|
| def to_dict(self) -> dict: |
| if not self.success: |
| return {"error": self.error} |
| result = { |
| "success": True, |
| "origin": {"lat": self.origin[0], "lon": self.origin[1]}, |
| "destination": {"lat": self.destination[0], "lon": self.destination[1]}, |
| "buffer_meters": self.buffer_meters, |
| "pois_found": self.pois_found, |
| "poi_count": len(self.pois_found), |
| } |
| if self.climate_metrics: |
| result["climate_metrics"] = self.climate_metrics.to_dict() |
| return result |
|
|
| def to_map_data(self) -> dict | None: |
| if not self.success: |
| return None |
| return { |
| "route_coords": self.route_coords, |
| "origin": list(self.origin), |
| "destination": list(self.destination), |
| "pois_along_route": self.pois_found, |
| } |
|
|
|
|
| @dataclass |
| class RouteComparisonResult: |
| """Result of comparing shortest vs safest routes.""" |
| success: bool |
| shortest: RouteOption | None = None |
| safest: RouteOption | None = None |
| origin: tuple[float, float] = (0, 0) |
| destination: tuple[float, float] = (0, 0) |
| dest_name: str = "" |
| extra_distance_m: float = 0 |
| extra_distance_pct: float = 0 |
| risk_reduction: float = 0 |
| error: str = "" |
|
|
| def to_dict(self) -> dict: |
| if not self.success: |
| return {"error": self.error} |
| return { |
| "success": True, |
| "shortest": self.shortest.to_dict() if self.shortest else None, |
| "safest": self.safest.to_dict() if self.safest else None, |
| "origin": {"lat": self.origin[0], "lon": self.origin[1]}, |
| "destination": {"lat": self.destination[0], "lon": self.destination[1], "name": self.dest_name}, |
| "comparison": { |
| "extra_distance_m": round(self.extra_distance_m, 1), |
| "extra_distance_pct": round(self.extra_distance_pct, 1), |
| "risk_reduction": round(self.risk_reduction, 3), |
| }, |
| } |
|
|
| def to_map_data(self) -> dict | None: |
| if not self.success: |
| return None |
| routes = [] |
| if self.shortest: |
| routes.append({ |
| "coords": self.shortest.coords, |
| "color": self.shortest.color, |
| "label": self.shortest.label, |
| "name": self.shortest.name, |
| }) |
| if self.safest and self.safest.coords != (self.shortest.coords if self.shortest else []): |
| routes.append({ |
| "coords": self.safest.coords, |
| "color": self.safest.color, |
| "label": self.safest.label, |
| "name": self.safest.name, |
| }) |
| return { |
| "routes": routes, |
| "origin": list(self.origin), |
| "destination": list(self.destination), |
| "dest_name": self.dest_name, |
| } |
|
|
|
|
| |
| |
| |
|
|
| class RoutingEngine: |
| """ |
| Core routing engine - handles all graph operations and route computation. |
| |
| Supports optional igraph backend for high-performance routing. |
| Features: |
| - Multi-route computation (shortest, flattest, balanced) |
| - Isochrone generation (reachable area within X minutes) |
| - Find along route (POIs near a route corridor) |
| """ |
|
|
| def __init__(self, use_igraph: bool = True): |
| self.G: nx.MultiDiGraph | None = None |
| self.resources_df: pd.DataFrame | None = None |
| self.known_places: dict[str, dict] = {} |
| self._loaded = False |
|
|
| |
| self.use_igraph = use_igraph and HAS_IGRAPH |
| self.ig_graph: "ig.Graph | None" = None |
| self.ig_node_map: dict[int, int] = {} |
| self.ig_reverse_map: dict[int, int] = {} |
|
|
| |
| self.ig_length_idx: int = -1 |
| self.ig_climate_weight_idx: int = -1 |
|
|
| |
| self.has_climate_data: bool = False |
|
|
| |
| self._node_coords: np.ndarray | None = None |
| self._node_ids: list[int] = [] |
| self._kdtree: cKDTree | None = None |
| self._resource_kdtree: cKDTree | None = None |
|
|
| def load(self) -> bool: |
| """Load network and resources from disk. |
| |
| Tries to load from pre-built cache first (fast), falls back to GraphML (slow). |
| Run `python build_graph_cache.py` to generate the cache for faster startup. |
| """ |
| if self._loaded: |
| return True |
|
|
| |
| data_dir = os.path.join(os.path.dirname(__file__), "..", "data", "brownsville") |
|
|
| try: |
| |
| cache_path = os.path.join(data_dir, "graph_cache.pkl") |
| if os.path.exists(cache_path): |
| loaded_from_cache = self._load_from_cache(cache_path) |
| if loaded_from_cache: |
| |
| pass |
| else: |
| |
| self._load_from_graphml(data_dir) |
| else: |
| |
| self._load_from_graphml(data_dir) |
|
|
| |
| resources_path = os.path.join(data_dir, "all_resources.csv") |
| if os.path.exists(resources_path): |
| self.resources_df = pd.read_csv(resources_path) |
| else: |
| geojson_path = os.path.join(data_dir, "all_resources.geojson") |
| if os.path.exists(geojson_path): |
| gdf = gpd.read_file(geojson_path) |
| self.resources_df = pd.DataFrame({ |
| "name": gdf["name"], |
| "type": gdf["type"], |
| "category": gdf["category"], |
| "lat": gdf.geometry.y, |
| "lon": gdf.geometry.x |
| }) |
|
|
| |
| if self.resources_df is not None: |
| self._build_resource_index() |
|
|
| |
| self._load_known_places(data_dir) |
|
|
| self._loaded = True |
| return True |
|
|
| except Exception as e: |
| print(f"Error loading data: {e}") |
| return False |
|
|
| def _load_from_cache(self, cache_path: str) -> bool: |
| """Load pre-built graph data from pickle cache (fast startup).""" |
| import pickle |
| try: |
| with open(cache_path, 'rb') as f: |
| cache = pickle.load(f) |
|
|
| |
| if cache.get("version", 1) < 3: |
| print("Cache version outdated, rebuilding from GraphML...") |
| return False |
|
|
| |
| self.G = cache["nx_graph"] |
| self.has_climate_data = cache["has_climate_data"] |
| self._node_ids = cache["node_ids"] |
| self._node_coords = cache["coords"] |
| self._kdtree = cache["kdtree"] |
|
|
| |
| if self.use_igraph and cache.get("ig_graph") is not None: |
| self.ig_graph = cache["ig_graph"] |
| self.ig_node_map = cache["ig_node_map"] |
| self.ig_reverse_map = cache["ig_reverse_map"] |
|
|
| return True |
| except Exception as e: |
| print(f"Failed to load cache: {e}") |
| return False |
|
|
| def _load_from_graphml(self, data_dir: str): |
| """Load graph from GraphML file (slow fallback).""" |
| |
| |
| |
| |
| final_path = os.path.join(data_dir, "walking_network_final.graphml") |
| climate_path = os.path.join(data_dir, "walking_network_climate.graphml") |
| graphml_path = os.path.join(data_dir, "walking_network.graphml") |
|
|
| if os.path.exists(final_path): |
| self.G = ox.load_graphml(final_path) |
| self.has_climate_data = True |
| elif os.path.exists(climate_path): |
| self.G = ox.load_graphml(climate_path) |
| self.has_climate_data = True |
| elif os.path.exists(graphml_path): |
| self.G = ox.load_graphml(graphml_path) |
| self.has_climate_data = False |
| else: |
| from shapely.geometry import box |
| brownsville_bbox = box(-73.93, 40.64, -73.89, 40.68) |
| self.G = ox.graph_from_polygon(brownsville_bbox, network_type='walk', simplify=True) |
| self.has_climate_data = False |
|
|
| |
| if self.has_climate_data: |
| climate_attrs = ['flood_risk', 'heat_risk', 'air_quality_risk', 'climate_risk', 'climate_weight'] |
| for u, v, data in self.G.edges(data=True): |
| for attr in climate_attrs: |
| if attr in data and isinstance(data[attr], str): |
| try: |
| data[attr] = float(data[attr]) |
| except (ValueError, TypeError): |
| data[attr] = 0.0 |
|
|
| |
| if self.has_climate_data: |
| sample_edge = next(iter(self.G.edges(data=True)), None) |
| if sample_edge and "climate_weight" not in sample_edge[2]: |
| self.has_climate_data = False |
|
|
| self.G = ox.project_graph(self.G) |
|
|
| |
| self._build_spatial_index() |
|
|
| |
| if self.use_igraph: |
| self._build_igraph_graph() |
|
|
| def _build_spatial_index(self): |
| """Build KD-tree for fast nearest-node lookups.""" |
| if self.G is None: |
| return |
|
|
| nodes = list(self.G.nodes()) |
| coords = [] |
| for node in nodes: |
| x = self.G.nodes[node].get("x", 0) |
| y = self.G.nodes[node].get("y", 0) |
| coords.append([x, y]) |
|
|
| self._node_ids = nodes |
| self._node_coords = np.array(coords) |
| self._kdtree = cKDTree(self._node_coords) |
|
|
| def _build_resource_index(self): |
| """Build KD-tree for fast resource lookups.""" |
| if self.resources_df is None or len(self.resources_df) == 0: |
| return |
|
|
| |
| coords = [] |
| if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": |
| import pyproj |
| transformer = pyproj.Transformer.from_crs("EPSG:4326", self.G.graph["crs"], always_xy=True) |
| for _, row in self.resources_df.iterrows(): |
| x, y = transformer.transform(row["lon"], row["lat"]) |
| coords.append([x, y]) |
| else: |
| for _, row in self.resources_df.iterrows(): |
| coords.append([row["lon"], row["lat"]]) |
|
|
| self._resource_kdtree = cKDTree(np.array(coords)) |
|
|
| def _build_igraph_graph(self): |
| """Build igraph graph from NetworkX graph for high-performance routing. |
| |
| Preserves all edge attributes including climate data. |
| Uses ig.Graph.from_networkx() for automatic attribute transfer. |
| """ |
| if not HAS_IGRAPH or self.G is None: |
| return |
|
|
| |
| nx_nodes = list(self.G.nodes()) |
| self.ig_node_map = {nx_node: i for i, nx_node in enumerate(nx_nodes)} |
| self.ig_reverse_map = {i: nx_node for nx_node, i in self.ig_node_map.items()} |
|
|
| |
| self.ig_graph = ig.Graph(n=len(nx_nodes), directed=True) |
|
|
| |
| self.ig_graph.vs["_nx_name"] = nx_nodes |
|
|
| |
| edges = [] |
| lengths = [] |
| climate_weights = [] |
| flood_risks = [] |
| heat_risks = [] |
| climate_risks = [] |
|
|
| for u, v, data in self.G.edges(data=True): |
| ig_u = self.ig_node_map[u] |
| ig_v = self.ig_node_map[v] |
| edges.append((ig_u, ig_v)) |
|
|
| length = data.get("length", 1.0) |
| if length is None: |
| length = 1.0 |
| lengths.append(float(length)) |
|
|
| |
| flood_risks.append(float(data.get("flood_risk", 0) or 0)) |
| heat_risks.append(float(data.get("heat_risk", 0) or 0)) |
| climate_risks.append(float(data.get("climate_risk", 0) or 0)) |
|
|
| |
| cw = data.get("climate_weight") |
| if cw is None: |
| cw = length |
| climate_weights.append(float(cw)) |
|
|
| self.ig_graph.add_edges(edges) |
| self.ig_graph.es["length"] = lengths |
| self.ig_graph.es["weight"] = lengths |
| self.ig_graph.es["climate_weight"] = climate_weights |
| self.ig_graph.es["flood_risk"] = flood_risks |
| self.ig_graph.es["heat_risk"] = heat_risks |
| self.ig_graph.es["climate_risk"] = climate_risks |
|
|
| def _load_known_places(self, data_dir: str): |
| """Load known places for geocoding.""" |
| places_path = os.path.join(data_dir, "places.csv") |
| if os.path.exists(places_path): |
| try: |
| df = pd.read_csv(places_path) |
| for _, row in df.iterrows(): |
| self.known_places[row['name_lower']] = { |
| "lat": row['lat'], |
| "lon": row['lon'], |
| "name": row['name'] |
| } |
| except Exception: |
| pass |
|
|
| @property |
| def is_loaded(self) -> bool: |
| return self._loaded |
|
|
| @property |
| def node_count(self) -> int: |
| return len(self.G.nodes) if self.G else 0 |
|
|
| @property |
| def resource_count(self) -> int: |
| return len(self.resources_df) if self.resources_df is not None else 0 |
|
|
| |
| |
| |
|
|
| def _get_nearest_node(self, lat: float, lon: float) -> int: |
| """Find nearest network node to a point using KD-tree (fast).""" |
| |
| if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": |
| import pyproj |
| transformer = pyproj.Transformer.from_crs("EPSG:4326", self.G.graph["crs"], always_xy=True) |
| x, y = transformer.transform(lon, lat) |
| else: |
| x, y = lon, lat |
|
|
| |
| if self._kdtree is not None: |
| _, idx = self._kdtree.query([x, y]) |
| return self._node_ids[idx] |
|
|
| |
| return ox.nearest_nodes(self.G, x, y) |
|
|
| def _get_nearest_node_ig(self, lat: float, lon: float) -> int: |
| """Get igraph node ID for a location.""" |
| nx_node = self._get_nearest_node(lat, lon) |
| return self.ig_node_map.get(nx_node, 0) |
|
|
| def _get_route_coords(self, route: list[int]) -> list[tuple[float, float]]: |
| """Extract lat/lon coordinates from route nodes.""" |
| if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": |
| import pyproj |
| transformer = pyproj.Transformer.from_crs(self.G.graph["crs"], "EPSG:4326", always_xy=True) |
| coords = [] |
| for node in route: |
| x, y = self.G.nodes[node]["x"], self.G.nodes[node]["y"] |
| lon, lat = transformer.transform(x, y) |
| coords.append((lat, lon)) |
| return coords |
| return [(self.G.nodes[node]["y"], self.G.nodes[node]["x"]) for node in route] |
|
|
| def _apply_elevation_weights(self, penalty_factor: float = ELEVATION_PENALTY_FACTOR) -> nx.MultiDiGraph: |
| """Create graph copy with elevation-weighted edges.""" |
| G_weighted = self.G.copy() |
| for u, v, key, data in G_weighted.edges(keys=True, data=True): |
| base_length = data.get("length", 1) |
| elev_u = G_weighted.nodes[u].get("elevation", 0) or 0 |
| elev_v = G_weighted.nodes[v].get("elevation", 0) or 0 |
| elev_diff = elev_v - elev_u |
|
|
| if elev_diff > 0: |
| data["weighted_length"] = base_length + (elev_diff * penalty_factor) |
| else: |
| data["weighted_length"] = base_length |
| return G_weighted |
|
|
| def _compute_edge_weight( |
| self, |
| data: dict, |
| params: ClimateWeightParams = None |
| ) -> float: |
| """ |
| Compute routing weight for an edge using raw risk values and configurable penalties. |
| |
| This is computed at RUNTIME, allowing the LLM to adjust weights based on |
| what the user tells us about their situation (flooding, heat, air quality, mobility). |
| |
| Design: |
| - Flood: categorical penalty (physical danger) |
| - Heat: continuous penalty based on HVI (social vulnerability / exposure proxy) |
| - Tree coverage: REDUCES weight (shade is beneficial) |
| - AQI: small continuous penalty (background air quality) |
| - Grade: continuous penalty for uphill segments (mobility / exertion) |
| |
| Args: |
| data: Edge attribute dict |
| params: ClimateWeightParams with penalty/factor values |
| |
| Returns: |
| float: Weighted length for Dijkstra's algorithm |
| """ |
| if params is None: |
| params = ClimateWeightParams() |
|
|
| length = float(data.get('length', 100)) |
|
|
| |
| flood_risk = float(data.get('flood_risk', 0.1)) |
| if flood_risk >= 1.0: |
| flood_mult = params.flood_penalty_deep |
| elif flood_risk >= 0.6: |
| flood_mult = params.flood_penalty_shallow |
| else: |
| flood_mult = 1.0 |
|
|
| |
| heat_risk = float(data.get('heat_risk', 0.5)) |
| |
| heat_mult = 1.0 + params.heat_factor * heat_risk |
|
|
| |
| tree_coverage = float(data.get('tree_coverage', 0.0)) |
| |
| shade_mult = 1.0 - (params.shade_factor * tree_coverage) |
|
|
| |
| aqi_risk = float(data.get('air_quality_risk', 0.0)) |
| |
| aqi_mult = 1.0 + params.aqi_factor * aqi_risk |
|
|
| |
| |
| |
| grade = abs(float(data.get('grade', 0) or 0)) |
| grade_norm = min(grade * 10, 1.0) |
| |
| grade_mult = 1.0 + params.grade_factor * grade_norm |
|
|
| |
| return length * flood_mult * heat_mult * shade_mult * aqi_mult * grade_mult |
|
|
| def _create_weight_function(self, params: ClimateWeightParams = None): |
| """Create a weight function for NetworkX routing with given climate parameters. |
| |
| Note: For MultiDiGraph, NetworkX passes the edge data as {key: {attrs}} dict, |
| not the {attrs} dict directly. We extract the first edge's attributes. |
| """ |
| if params is None: |
| params = ClimateWeightParams() |
|
|
| def weight_func(u, v, data): |
| |
| if isinstance(data, dict) and data: |
| first_key = next(iter(data)) |
| if isinstance(first_key, int) and isinstance(data[first_key], dict): |
| data = data[first_key] |
| return self._compute_edge_weight(data, params) |
|
|
| return weight_func |
|
|
| def _compute_route_metrics(self, route: list[int]) -> RouteMetrics: |
| """Compute elevation metrics for a route.""" |
| if not route or len(route) < 2: |
| return RouteMetrics() |
|
|
| elevations = [] |
| grades = [] |
| elevation_gain = 0 |
| elevation_loss = 0 |
|
|
| for i, node in enumerate(route): |
| elev = self.G.nodes[node].get("elevation", 0) or 0 |
| elevations.append(elev) |
|
|
| if i > 0: |
| diff = elev - elevations[i-1] |
| if diff > 0: |
| elevation_gain += diff |
| else: |
| elevation_loss += abs(diff) |
|
|
| edge_data = self.G.get_edge_data(route[i-1], node) |
| if edge_data: |
| first_edge = list(edge_data.values())[0] if isinstance(edge_data, dict) else edge_data |
| grade = abs(first_edge.get("grade", 0)) * 100 |
| grades.append(grade) |
|
|
| max_elev = max(elevations) if elevations else 0 |
| min_elev = min(elevations) if elevations else 0 |
| avg_grade = sum(grades) / len(grades) if grades else 0 |
| max_grade = max(grades) if grades else 0 |
|
|
| if elevation_gain < 5 and max_grade < 3: |
| difficulty = "flat" |
| elif elevation_gain < 15 or max_grade < 8: |
| difficulty = "moderate" |
| else: |
| difficulty = "hilly" |
|
|
| return RouteMetrics( |
| elevation_gain_m=round(elevation_gain, 1), |
| elevation_loss_m=round(elevation_loss, 1), |
| max_elevation_m=round(max_elev, 1), |
| min_elevation_m=round(min_elev, 1), |
| avg_grade_pct=round(avg_grade, 1), |
| max_grade_pct=round(max_grade, 1), |
| difficulty=difficulty |
| ) |
|
|
| def _compute_climate_metrics(self, route: list[int]) -> ClimateMetrics: |
| """Compute climate risk metrics for a route. |
| |
| Climate risk is now computed at runtime using a simple weighted formula: |
| climate_risk = 0.5 * flood_risk + 0.3 * heat_risk + 0.2 * air_quality_risk |
| |
| Tree coverage is tracked but used separately (reduces routing weight). |
| """ |
| if not route or len(route) < 2 or not self.has_climate_data: |
| return ClimateMetrics() |
|
|
| flood_risks = [] |
| heat_risks = [] |
| air_quality_risks = [] |
| tree_coverages = [] |
| climate_risks = [] |
| flood_exposure = 0.0 |
|
|
| for i in range(len(route) - 1): |
| u, v = route[i], route[i + 1] |
| edge_data = self.G.get_edge_data(u, v) |
| if not edge_data: |
| continue |
|
|
| |
| if isinstance(edge_data, dict) and 0 in edge_data: |
| data = edge_data[0] |
| else: |
| data = next(iter(edge_data.values())) if isinstance(edge_data, dict) else edge_data |
|
|
| flood = float(data.get("flood_risk", 0) or 0) |
| heat = float(data.get("heat_risk", 0) or 0) |
| air_quality = float(data.get("air_quality_risk", 0) or 0) |
| tree_coverage = float(data.get("tree_coverage", 0) or 0) |
| length = float(data.get("length", 0) or 0) |
|
|
| |
| climate = 0.5 * flood + 0.3 * heat + 0.2 * air_quality |
|
|
| flood_risks.append(flood) |
| heat_risks.append(heat) |
| air_quality_risks.append(air_quality) |
| tree_coverages.append(tree_coverage) |
| climate_risks.append(climate) |
|
|
| if flood > 0.3: |
| flood_exposure += length |
|
|
| if not climate_risks: |
| return ClimateMetrics() |
|
|
| return ClimateMetrics( |
| avg_climate_risk=round(sum(climate_risks) / len(climate_risks), 3), |
| max_climate_risk=round(max(climate_risks), 3), |
| avg_flood_risk=round(sum(flood_risks) / len(flood_risks), 3), |
| max_flood_risk=round(max(flood_risks), 3), |
| avg_heat_risk=round(sum(heat_risks) / len(heat_risks), 3), |
| max_heat_risk=round(max(heat_risks), 3), |
| avg_air_quality_risk=round(sum(air_quality_risks) / len(air_quality_risks), 3), |
| max_air_quality_risk=round(max(air_quality_risks), 3), |
| avg_tree_coverage=round(sum(tree_coverages) / len(tree_coverages), 3), |
| flood_exposure_m=round(flood_exposure, 1), |
| ) |
|
|
| |
| |
| |
|
|
| def _compute_single_route( |
| self, |
| G: nx.MultiDiGraph, |
| origin_node: int, |
| dest_node: int, |
| weight_key: str = "length" |
| ) -> RouteOption | None: |
| """Compute a single route.""" |
| try: |
| route = nx.shortest_path(G, origin_node, dest_node, weight=weight_key) |
| distance = sum( |
| self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:]) |
| ) |
| return RouteOption( |
| name="", |
| label="", |
| color="", |
| coords=self._get_route_coords(route), |
| distance_m=round(distance, 1), |
| time_min=round(distance / WALK_SPEED_M_PER_MIN, 1), |
| metrics=self._compute_route_metrics(route), |
| route_nodes=route, |
| climate_metrics=self._compute_climate_metrics(route), |
| ) |
| except (nx.NetworkXNoPath, Exception): |
| return None |
|
|
| def _compute_single_route_igraph( |
| self, |
| origin_node: int, |
| dest_node: int, |
| weight_attr: str = "length" |
| ) -> RouteOption | None: |
| """Compute a single route using igraph for better performance.""" |
| if not self.use_igraph or self.ig_graph is None: |
| return None |
|
|
| try: |
| ig_origin = self.ig_node_map.get(origin_node) |
| ig_dest = self.ig_node_map.get(dest_node) |
|
|
| if ig_origin is None or ig_dest is None: |
| return None |
|
|
| |
| path = self.ig_graph.get_shortest_paths( |
| ig_origin, to=ig_dest, weights=weight_attr, output="vpath" |
| )[0] |
|
|
| if not path: |
| return None |
|
|
| |
| route = [self.ig_reverse_map[ig_node] for ig_node in path] |
|
|
| |
| distance = sum( |
| self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:]) |
| ) |
|
|
| return RouteOption( |
| name="", |
| label="", |
| color="", |
| coords=self._get_route_coords(route), |
| distance_m=round(distance, 1), |
| time_min=round(distance / WALK_SPEED_M_PER_MIN, 1), |
| metrics=self._compute_route_metrics(route), |
| route_nodes=route, |
| climate_metrics=self._compute_climate_metrics(route), |
| ) |
| except Exception: |
| return None |
|
|
| def route( |
| self, |
| origin_coords: tuple[float, float], |
| dest_coords: tuple[float, float], |
| mode: str = "safest", |
| climate_params: ClimateWeightParams = None |
| ) -> RouteOption | None: |
| """ |
| Compute a single route between two points with runtime climate weight calculation. |
| |
| Climate weights are computed at RUNTIME using configurable parameters, |
| allowing the LLM to adjust based on user context (flooding, heat, asthma, etc.). |
| |
| Args: |
| origin_coords: (lat, lon) tuple for origin |
| dest_coords: (lat, lon) tuple for destination |
| mode: 'safest' (use runtime climate weights) or 'fastest' (use length only) |
| climate_params: ClimateWeightParams for runtime weight calculation. |
| If None, uses default parameters. |
| |
| Returns: |
| RouteOption or None if no path found |
| """ |
| if not self._loaded: |
| return None |
|
|
| origin_lat, origin_lon = origin_coords |
| dest_lat, dest_lon = dest_coords |
|
|
| try: |
| origin_node = self._get_nearest_node(origin_lat, origin_lon) |
| dest_node = self._get_nearest_node(dest_lat, dest_lon) |
| except Exception: |
| return None |
|
|
| |
| if mode == "fastest" or mode == "fast": |
| result = self._compute_single_route(self.G, origin_node, dest_node, "length") |
| if result: |
| result.name = "fastest" |
| result.label = "Fastest" |
| result.color = ROUTE_COLORS.get("shortest", "#3b82f6") |
| return result |
|
|
| |
| if climate_params is None: |
| climate_params = ClimateWeightParams() |
|
|
| |
| weight_func = self._create_weight_function(climate_params) |
| result = self._compute_single_route_with_weight_func( |
| origin_node, dest_node, weight_func |
| ) |
|
|
| if result: |
| result.name = "safest" |
| result.label = "Safest" |
| result.color = ROUTE_COLORS.get("safest", "#10b981") |
| return result |
|
|
| def _compute_single_route_with_weight_func( |
| self, |
| origin_node: int, |
| dest_node: int, |
| weight_func |
| ) -> RouteOption | None: |
| """Compute a single route using a callable weight function.""" |
| try: |
| route = nx.shortest_path(self.G, origin_node, dest_node, weight=weight_func) |
| distance = sum( |
| self.G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:]) |
| ) |
| return RouteOption( |
| name="", |
| label="", |
| color="", |
| coords=self._get_route_coords(route), |
| distance_m=round(distance, 1), |
| time_min=round(distance / WALK_SPEED_M_PER_MIN, 1), |
| metrics=self._compute_route_metrics(route), |
| route_nodes=route, |
| climate_metrics=self._compute_climate_metrics(route), |
| ) |
| except (nx.NetworkXNoPath, Exception): |
| return None |
|
|
| def compare_routes( |
| self, |
| origin_coords: tuple[float, float], |
| dest_coords: tuple[float, float], |
| dest_name: str = "Destination", |
| climate_params: ClimateWeightParams = None |
| ) -> RouteComparisonResult: |
| """ |
| Compare shortest and safest routes between two points. |
| |
| Returns both routes with comparison statistics showing the trade-off |
| between distance and climate risk. |
| |
| Args: |
| origin_coords: (lat, lon) tuple for origin |
| dest_coords: (lat, lon) tuple for destination |
| dest_name: Optional name for the destination |
| climate_params: ClimateWeightParams for runtime weight calculation |
| |
| Returns: |
| RouteComparisonResult with both routes and comparison stats |
| """ |
| if not self._loaded: |
| return RouteComparisonResult(success=False, error="Engine not loaded") |
|
|
| if not self.has_climate_data: |
| return RouteComparisonResult( |
| success=False, |
| error="Climate data not available. Load climate-enhanced network first." |
| ) |
|
|
| origin_lat, origin_lon = origin_coords |
| dest_lat, dest_lon = dest_coords |
|
|
| |
| fastest = self.route(origin_coords, dest_coords, mode="fastest") |
| safest = self.route(origin_coords, dest_coords, mode="safest", climate_params=climate_params) |
|
|
| if not fastest: |
| return RouteComparisonResult(success=False, error="No path found for fastest route") |
| if not safest: |
| return RouteComparisonResult(success=False, error="No path found for safest route") |
|
|
| |
| extra_distance_m = safest.distance_m - fastest.distance_m |
| extra_distance_pct = (extra_distance_m / fastest.distance_m * 100) if fastest.distance_m > 0 else 0 |
|
|
| |
| risk_reduction = fastest.climate_metrics.avg_climate_risk - safest.climate_metrics.avg_climate_risk |
|
|
| return RouteComparisonResult( |
| success=True, |
| shortest=fastest, |
| safest=safest, |
| origin=(origin_lat, origin_lon), |
| destination=(dest_lat, dest_lon), |
| dest_name=dest_name, |
| extra_distance_m=extra_distance_m, |
| extra_distance_pct=extra_distance_pct, |
| risk_reduction=risk_reduction, |
| ) |
|
|
| def compute_routes( |
| self, |
| origin_lat: float, |
| origin_lon: float, |
| dest_lat: float, |
| dest_lon: float, |
| dest_name: str = "Destination", |
| climate_params: ClimateWeightParams = None |
| ) -> RoutingResult: |
| """Compute multiple route alternatives between two points. |
| |
| Climate weights are computed at RUNTIME using configurable parameters. |
| The safest route uses runtime weight calculation based on flood, heat, |
| tree coverage, and air quality factors. |
| |
| Args: |
| origin_lat, origin_lon: Origin coordinates |
| dest_lat, dest_lon: Destination coordinates |
| dest_name: Name of destination |
| climate_params: ClimateWeightParams for runtime weight calculation |
| """ |
| if not self._loaded: |
| return RoutingResult(success=False, error="Engine not loaded") |
|
|
| try: |
| origin_node = self._get_nearest_node(origin_lat, origin_lon) |
| dest_node = self._get_nearest_node(dest_lat, dest_lon) |
| except Exception as e: |
| return RoutingResult(success=False, error=f"Could not locate points: {e}") |
|
|
| alternatives = [] |
|
|
| |
| shortest = self._compute_single_route(self.G, origin_node, dest_node, "length") |
| if shortest: |
| shortest.name = "shortest" |
| shortest.label = "Shortest" |
| shortest.color = ROUTE_COLORS["shortest"] |
| alternatives.append(shortest) |
|
|
| |
| G_flat = self._apply_elevation_weights(penalty_factor=5.0) |
| flattest = self._compute_single_route(G_flat, origin_node, dest_node, "weighted_length") |
| if flattest and (not shortest or flattest.coords != shortest.coords): |
| flattest.name = "flattest" |
| flattest.label = "Flattest" |
| flattest.color = ROUTE_COLORS["flattest"] |
| alternatives.append(flattest) |
|
|
| |
| G_balanced = self._apply_elevation_weights(penalty_factor=2.0) |
| balanced = self._compute_single_route(G_balanced, origin_node, dest_node, "weighted_length") |
| if balanced: |
| existing_coords = [r.coords for r in alternatives] |
| if balanced.coords not in existing_coords: |
| balanced.name = "balanced" |
| balanced.label = "Balanced" |
| balanced.color = ROUTE_COLORS["balanced"] |
| alternatives.append(balanced) |
|
|
| |
| if self.has_climate_data: |
| if climate_params is None: |
| climate_params = ClimateWeightParams() |
|
|
| weight_func = self._create_weight_function(climate_params) |
| safest = self._compute_single_route_with_weight_func(origin_node, dest_node, weight_func) |
| if safest: |
| existing_coords = [r.coords for r in alternatives] |
| if safest.coords not in existing_coords: |
| |
| safest.name = "safest" |
| safest.label = "Safest (Climate)" |
| safest.color = ROUTE_COLORS["safest"] |
| alternatives.append(safest) |
| else: |
| |
| |
| for route in alternatives: |
| if route.coords == safest.coords: |
| if route.name == "shortest": |
| route.label = "Shortest & Safest (Climate)" |
| route.name = "safest" |
| break |
|
|
| if not alternatives: |
| return RoutingResult(success=False, error="No path found") |
|
|
| |
| |
| if self.has_climate_data and any(r.name == "safest" for r in alternatives): |
| recommended = "safest" |
| elif any(r.name == "flattest" for r in alternatives): |
| recommended = "flattest" |
| else: |
| recommended = "shortest" |
|
|
| return RoutingResult( |
| success=True, |
| recommended=recommended, |
| alternatives=alternatives, |
| origin=(origin_lat, origin_lon), |
| destination=(dest_lat, dest_lon), |
| dest_name=dest_name |
| ) |
|
|
| def find_nearest_resource( |
| self, |
| resource_type: str, |
| origin_lat: float, |
| origin_lon: float, |
| prefer_safe: bool = True, |
| climate_params: ClimateWeightParams = None |
| ) -> tuple[dict[str, Any], dict | None]: |
| """ |
| Find nearest resource of a given type with climate-aware route alternatives. |
| |
| This function: |
| 1. Finds the nearest resource by ROUTED distance (not crow-flies) |
| 2. Computes multiple route alternatives (shortest, flattest, safest) |
| 3. Recommends the safest route when climate data is available |
| |
| Climate weights are computed at RUNTIME using configurable parameters, |
| allowing the LLM to adjust based on user context (flooding, heat, asthma, etc.). |
| |
| Args: |
| resource_type: Type of resource to find |
| origin_lat: Origin latitude |
| origin_lon: Origin longitude |
| prefer_safe: If True and climate data available, prefer climate-safe routes |
| climate_params: ClimateWeightParams for runtime weight calculation |
| """ |
| if not self._loaded: |
| return {"error": "Engine not loaded"}, None |
|
|
| if self.resources_df is None: |
| return {"error": "Resources not loaded"}, None |
|
|
| candidates = self.resources_df[self.resources_df["type"] == resource_type] |
| if len(candidates) == 0: |
| return {"error": f"No resources of type '{resource_type}' found"}, None |
|
|
| try: |
| origin_node = self._get_nearest_node(origin_lat, origin_lon) |
| except Exception as e: |
| return {"error": f"Could not find origin: {e}"}, None |
|
|
| |
| |
| best_resource = None |
| best_distance = float("inf") |
|
|
| for _, row in candidates.iterrows(): |
| try: |
| dest_node = self._get_nearest_node(row["lat"], row["lon"]) |
| |
| route_length = nx.shortest_path_length( |
| self.G, origin_node, dest_node, weight="length" |
| ) |
| if route_length < best_distance: |
| best_distance = route_length |
| best_resource = row.to_dict() |
| except (nx.NetworkXNoPath, Exception): |
| continue |
|
|
| if best_resource is None: |
| return {"error": f"Could not find route to any {resource_type}"}, None |
|
|
| |
| |
| routing_result = self.compute_routes( |
| origin_lat=origin_lat, |
| origin_lon=origin_lon, |
| dest_lat=best_resource["lat"], |
| dest_lon=best_resource["lon"], |
| dest_name=best_resource["name"], |
| climate_params=climate_params |
| ) |
|
|
| if not routing_result.success: |
| return {"error": routing_result.error or "Could not compute routes"}, None |
|
|
| |
| recommended_route = next( |
| (r for r in routing_result.alternatives if r.name == routing_result.recommended), |
| routing_result.alternatives[0] if routing_result.alternatives else None |
| ) |
| if not recommended_route: |
| return {"error": "No route alternatives computed"}, None |
|
|
| |
| map_data = { |
| "routes": [ |
| { |
| "coords": alt.coords, |
| "color": alt.color, |
| "label": alt.label, |
| "name": alt.name, |
| } |
| for alt in routing_result.alternatives |
| ], |
| "origin": [origin_lat, origin_lon], |
| "destination": [best_resource["lat"], best_resource["lon"]], |
| "dest_name": best_resource["name"], |
| "recommended": routing_result.recommended, |
| } |
|
|
| |
| result = { |
| "found": True, |
| "name": best_resource["name"], |
| "type": best_resource["type"], |
| "category": best_resource.get("category", ""), |
| "lat": best_resource["lat"], |
| "lon": best_resource["lon"], |
| "origin": {"lat": origin_lat, "lon": origin_lon}, |
| |
| "alternatives": [alt.to_dict() for alt in routing_result.alternatives], |
| "recommended": routing_result.recommended, |
| |
| "distance_meters": round(recommended_route.distance_m, 1), |
| "walking_time_minutes": round(recommended_route.time_min, 1), |
| "route_metrics": recommended_route.metrics.to_dict() if recommended_route.metrics else {}, |
| "climate_metrics": recommended_route.climate_metrics.to_dict() if recommended_route.climate_metrics else {}, |
| "climate_aware": prefer_safe and self.has_climate_data, |
| } |
|
|
| return result, map_data |
|
|
| def list_resources(self, resource_type: str = "", category: str = "") -> dict[str, Any]: |
| """List available resources with optional filtering.""" |
| if self.resources_df is None: |
| return {"error": "Resources not loaded"} |
|
|
| df = self.resources_df.copy() |
| if category: |
| df = df[df["category"] == category] |
| if resource_type: |
| df = df[df["type"] == resource_type] |
|
|
| summary = df.groupby("type").agg({"name": "count", "category": "first"}).rename( |
| columns={"name": "count"} |
| ).to_dict("index") |
|
|
| return { |
| "total_count": len(df), |
| "by_type": summary, |
| "resources": df[["name", "type", "category", "lat", "lon"]].to_dict("records")[:20] |
| } |
|
|
| |
| |
| |
|
|
| def generate_isochrone( |
| self, |
| origin_lat: float, |
| origin_lon: float, |
| time_limits: list[int] = None, |
| resource_types: list[str] = None |
| ) -> IsochroneResult: |
| """ |
| Generate isochrones showing areas reachable within given time limits. |
| |
| Uses igraph SSSP if available for better performance, otherwise NetworkX. |
| |
| Args: |
| origin_lat: Origin latitude |
| origin_lon: Origin longitude |
| time_limits: List of time limits in minutes (default: [5, 10, 15]) |
| resource_types: Optional filter for resources to include |
| |
| Returns: |
| IsochroneResult with polygon boundaries and resources within reach |
| """ |
| if not self._loaded: |
| return IsochroneResult(success=False, error="Engine not loaded") |
|
|
| if time_limits is None: |
| time_limits = [5, 10, 15] |
|
|
| time_limits = sorted(time_limits) |
| max_time = max(time_limits) |
| max_distance = max_time * WALK_SPEED_M_PER_MIN |
|
|
| try: |
| origin_node = self._get_nearest_node(origin_lat, origin_lon) |
| except Exception as e: |
| return IsochroneResult(success=False, error=f"Could not locate origin: {e}") |
|
|
| |
| if self.use_igraph and self.ig_graph is not None: |
| |
| node_distances = self._compute_sssp_igraph(origin_node, max_distance) |
| else: |
| |
| node_distances = self._compute_sssp_networkx(origin_node, max_distance) |
|
|
| |
| isochrones = [] |
| for time_min in time_limits: |
| distance_limit = time_min * WALK_SPEED_M_PER_MIN |
| reachable_nodes = [n for n, d in node_distances.items() if d <= distance_limit] |
|
|
| if not reachable_nodes: |
| continue |
|
|
| |
| polygon_coords = self._nodes_to_polygon(reachable_nodes) |
|
|
| isochrones.append({ |
| "time_min": time_min, |
| "polygon_coords": polygon_coords, |
| "color": ISOCHRONE_COLORS.get(time_min, "#6366f1"), |
| "node_count": len(reachable_nodes), |
| }) |
|
|
| |
| resources_within = [] |
| if self.resources_df is not None: |
| max_distance_nodes = {n for n, d in node_distances.items() if d <= max_distance} |
|
|
| for _, row in self.resources_df.iterrows(): |
| if resource_types and row["type"] not in resource_types: |
| continue |
|
|
| try: |
| resource_node = self._get_nearest_node(row["lat"], row["lon"]) |
| if resource_node in max_distance_nodes: |
| dist = node_distances.get(resource_node, float("inf")) |
| time_to_reach = dist / WALK_SPEED_M_PER_MIN |
| resources_within.append({ |
| "name": row["name"], |
| "type": row["type"], |
| "category": row.get("category", ""), |
| "lat": row["lat"], |
| "lon": row["lon"], |
| "distance_meters": round(dist, 1), |
| "walking_time_minutes": round(time_to_reach, 1), |
| }) |
| except Exception: |
| continue |
|
|
| |
| resources_within.sort(key=lambda x: x["distance_meters"]) |
|
|
| return IsochroneResult( |
| success=True, |
| origin=(origin_lat, origin_lon), |
| isochrones=isochrones, |
| resources_within=resources_within[:20], |
| ) |
|
|
| def _compute_sssp_igraph(self, origin_node: int, max_distance: float) -> dict[int, float]: |
| """Compute single-source shortest paths using igraph.""" |
| ig_origin = self.ig_node_map.get(origin_node) |
| if ig_origin is None: |
| return {} |
|
|
| |
| all_distances = self.ig_graph.distances(source=ig_origin, weights="weight", mode="out")[0] |
|
|
| |
| distances = {} |
| for ig_node, dist in enumerate(all_distances): |
| if dist != float("inf") and dist <= max_distance: |
| nx_node = self.ig_reverse_map.get(ig_node) |
| if nx_node is not None: |
| distances[nx_node] = dist |
|
|
| return distances |
|
|
| def _compute_sssp_networkx(self, origin_node: int, max_distance: float) -> dict[int, float]: |
| """Compute single-source shortest paths using NetworkX.""" |
| try: |
| |
| lengths = nx.single_source_dijkstra_path_length( |
| self.G, origin_node, cutoff=max_distance, weight="length" |
| ) |
| return dict(lengths) |
| except Exception: |
| return {} |
|
|
| def _nodes_to_polygon(self, nodes: list[int]) -> list[tuple[float, float]]: |
| """Convert a set of nodes to a convex hull polygon in lat/lon.""" |
| if len(nodes) < 3: |
| return [] |
|
|
| |
| coords = [] |
| for node in nodes: |
| x = self.G.nodes[node].get("x", 0) |
| y = self.G.nodes[node].get("y", 0) |
| coords.append([x, y]) |
|
|
| coords = np.array(coords) |
|
|
| |
| try: |
| from scipy.spatial import ConvexHull |
| hull = ConvexHull(coords) |
| hull_points = coords[hull.vertices] |
|
|
| |
| if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": |
| import pyproj |
| transformer = pyproj.Transformer.from_crs( |
| self.G.graph["crs"], "EPSG:4326", always_xy=True |
| ) |
| result = [] |
| for x, y in hull_points: |
| lon, lat = transformer.transform(x, y) |
| result.append((lat, lon)) |
| |
| result.append(result[0]) |
| return result |
| else: |
| result = [(y, x) for x, y in hull_points] |
| result.append(result[0]) |
| return result |
| except Exception: |
| return [] |
|
|
| |
| |
| |
|
|
| def find_along_route( |
| self, |
| origin_lat: float, |
| origin_lon: float, |
| dest_lat: float, |
| dest_lon: float, |
| buffer_meters: float = 100, |
| resource_types: list[str] = None |
| ) -> AlongRouteResult: |
| """ |
| Find resources/POIs along a route corridor. |
| |
| Args: |
| origin_lat, origin_lon: Route start |
| dest_lat, dest_lon: Route end |
| buffer_meters: Width of corridor to search (default 100m) |
| resource_types: Optional filter for resource types |
| |
| Returns: |
| AlongRouteResult with POIs found along the route |
| """ |
| if not self._loaded: |
| return AlongRouteResult(success=False, error="Engine not loaded") |
|
|
| if self.resources_df is None: |
| return AlongRouteResult(success=False, error="Resources not loaded") |
|
|
| |
| routing_result = self.compute_routes(origin_lat, origin_lon, dest_lat, dest_lon) |
| if not routing_result.success: |
| return AlongRouteResult(success=False, error=routing_result.error) |
|
|
| |
| recommended = next( |
| (r for r in routing_result.alternatives if r.name == routing_result.recommended), |
| routing_result.alternatives[0] |
| ) |
|
|
| route_coords = recommended.coords |
| if not route_coords: |
| return AlongRouteResult(success=False, error="No route coordinates") |
|
|
| |
| if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": |
| import pyproj |
| transformer = pyproj.Transformer.from_crs( |
| "EPSG:4326", self.G.graph["crs"], always_xy=True |
| ) |
| projected_route = [] |
| for lat, lon in route_coords: |
| x, y = transformer.transform(lon, lat) |
| projected_route.append([x, y]) |
| projected_route = np.array(projected_route) |
| else: |
| projected_route = np.array([[lon, lat] for lat, lon in route_coords]) |
|
|
| |
| pois_found = [] |
|
|
| for _, row in self.resources_df.iterrows(): |
| if resource_types and row["type"] not in resource_types: |
| continue |
|
|
| |
| if "crs" in self.G.graph and self.G.graph["crs"] != "EPSG:4326": |
| x, y = transformer.transform(row["lon"], row["lat"]) |
| point = np.array([x, y]) |
| else: |
| point = np.array([row["lon"], row["lat"]]) |
|
|
| |
| min_dist = self._point_to_polyline_distance(point, projected_route) |
|
|
| if min_dist <= buffer_meters: |
| |
| segment_idx = self._nearest_segment_index(point, projected_route) |
|
|
| pois_found.append({ |
| "name": row["name"], |
| "type": row["type"], |
| "category": row.get("category", ""), |
| "lat": row["lat"], |
| "lon": row["lon"], |
| "distance_from_route_m": round(min_dist, 1), |
| "route_segment": segment_idx, |
| }) |
|
|
| |
| pois_found.sort(key=lambda x: x["route_segment"]) |
|
|
| return AlongRouteResult( |
| success=True, |
| route_coords=route_coords, |
| pois_found=pois_found, |
| origin=(origin_lat, origin_lon), |
| destination=(dest_lat, dest_lon), |
| buffer_meters=buffer_meters, |
| climate_metrics=recommended.climate_metrics, |
| ) |
|
|
| def _point_to_polyline_distance(self, point: np.ndarray, polyline: np.ndarray) -> float: |
| """Calculate minimum distance from a point to a polyline.""" |
| min_dist = float("inf") |
|
|
| for i in range(len(polyline) - 1): |
| seg_start = polyline[i] |
| seg_end = polyline[i + 1] |
|
|
| |
| seg_vec = seg_end - seg_start |
| seg_len_sq = np.dot(seg_vec, seg_vec) |
|
|
| if seg_len_sq == 0: |
| |
| dist = np.linalg.norm(point - seg_start) |
| else: |
| |
| t = max(0, min(1, np.dot(point - seg_start, seg_vec) / seg_len_sq)) |
| projection = seg_start + t * seg_vec |
| dist = np.linalg.norm(point - projection) |
|
|
| min_dist = min(min_dist, dist) |
|
|
| return min_dist |
|
|
| def _nearest_segment_index(self, point: np.ndarray, polyline: np.ndarray) -> int: |
| """Find which segment of the polyline is nearest to the point.""" |
| min_dist = float("inf") |
| nearest_idx = 0 |
|
|
| for i in range(len(polyline) - 1): |
| seg_start = polyline[i] |
| seg_end = polyline[i + 1] |
| seg_vec = seg_end - seg_start |
| seg_len_sq = np.dot(seg_vec, seg_vec) |
|
|
| if seg_len_sq == 0: |
| dist = np.linalg.norm(point - seg_start) |
| else: |
| t = max(0, min(1, np.dot(point - seg_start, seg_vec) / seg_len_sq)) |
| projection = seg_start + t * seg_vec |
| dist = np.linalg.norm(point - projection) |
|
|
| if dist < min_dist: |
| min_dist = dist |
| nearest_idx = i |
|
|
| return nearest_idx |
|
|
| |
| |
| |
|
|
| def geocode_query(self, query: str) -> tuple[str, dict]: |
| """Resolve place names in query to coordinates.""" |
| geocode_info = {} |
| modified = query |
| query_lower = query.lower() |
|
|
| |
| sorted_places = sorted(self.known_places.items(), key=lambda x: -len(x[0])) |
| used_spans = [] |
|
|
| for name_lower, info in sorted_places: |
| pattern = r"\b" + re.escape(name_lower) + r"\b" |
| for match in re.finditer(pattern, query_lower): |
| start, end = match.span() |
| overlaps = any(not (end <= us or start >= ue) for us, ue in used_spans) |
| if not overlaps: |
| geocode_info[info["name"]] = { |
| "lat": info["lat"], |
| "lon": info["lon"], |
| "name": info["name"] |
| } |
| original_text = query[start:end] |
| modified = re.compile(re.escape(original_text), re.IGNORECASE).sub( |
| f"(lat {info['lat']:.6f}, lon {info['lon']:.6f})", |
| modified, |
| count=1 |
| ) |
| used_spans.append((start, end)) |
|
|
| return modified, geocode_info |
|
|
|
|
| |
| |
| |
|
|
| def _safe_str(val, default: str = "") -> str: |
| if val is None: |
| return default |
| if isinstance(val, list): |
| return str(val[0]) if val else default |
| return str(val) |
|
|
|
|
| def _safe_float(val, default: float) -> float: |
| if val is None: |
| return default |
| if isinstance(val, list): |
| val = val[0] if val else default |
| try: |
| return float(val) |
| except (ValueError, TypeError): |
| return default |
|
|
|
|
| def _parse_climate_params(args: dict) -> ClimateWeightParams: |
| """Extract climate weight parameters from tool args. |
| |
| The LLM can pass these parameters based on user context: |
| - flood_penalty_deep: 5.0 default, 10.0 for active flooding |
| - flood_penalty_shallow: 2.0 default, 4.0 for flooding conditions |
| - heat_factor: 0.3 default, 0.5 for hot days |
| - shade_factor: 0.3 default, 0.5 for shade-seeking |
| - aqi_factor: 0.1 default, 0.5 for respiratory concerns |
| - grade_factor: 0.2 default, 0.5 for elderly/mobility-impaired users |
| """ |
| return ClimateWeightParams( |
| flood_penalty_deep=_safe_float(args.get("flood_penalty_deep"), 5.0), |
| flood_penalty_shallow=_safe_float(args.get("flood_penalty_shallow"), 2.0), |
| heat_factor=_safe_float(args.get("heat_factor"), 0.3), |
| shade_factor=_safe_float(args.get("shade_factor"), 0.3), |
| aqi_factor=_safe_float(args.get("aqi_factor"), 0.1), |
| grade_factor=_safe_float(args.get("grade_factor"), 0.2), |
| ) |
|
|
|
|
| def execute_tool( |
| tool_name: str, |
| args: dict, |
| engine: RoutingEngine |
| ) -> tuple[dict[str, Any], dict | None]: |
| """Execute a tool by name using the routing engine. |
| |
| Climate weight parameters can be passed in args and will be used for |
| runtime weight calculation when routing. |
| """ |
| |
| climate_params = _parse_climate_params(args) |
|
|
| if tool_name == "list_resources": |
| result = engine.list_resources( |
| resource_type=_safe_str(args.get("resource_type"), ""), |
| category=_safe_str(args.get("category"), "") |
| ) |
| return result, None |
|
|
| elif tool_name == "find_nearest": |
| lat = args.get("lat") or args.get("origin_lat") |
| lon = args.get("lon") or args.get("origin_lon") |
| return engine.find_nearest_resource( |
| resource_type=_safe_str(args.get("resource_type"), ""), |
| origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]), |
| origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"]), |
| climate_params=climate_params |
| ) |
|
|
| elif tool_name == "calculate_route": |
| |
| routing_mode = _safe_str(args.get("routing_mode"), "safe") |
|
|
| result = engine.compute_routes( |
| origin_lat=_safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]), |
| origin_lon=_safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]), |
| dest_lat=_safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]), |
| dest_lon=_safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]), |
| dest_name=_safe_str(args.get("dest_name"), "Destination"), |
| climate_params=climate_params |
| ) |
| return result.to_dict(), result.to_map_data() |
|
|
| elif tool_name == "generate_isochrone": |
| |
| time_limits = args.get("time_limits", [5, 10, 15]) |
| if isinstance(time_limits, str): |
| |
| parsed = [] |
| for x in time_limits.split(","): |
| |
| import re |
| match = re.search(r'(\d+)', x.strip()) |
| if match: |
| parsed.append(int(match.group(1))) |
| time_limits = parsed if parsed else [5, 10, 15] |
| elif isinstance(time_limits, (int, float)): |
| time_limits = [int(time_limits)] |
| elif isinstance(time_limits, list): |
| |
| time_limits = [int(x) if isinstance(x, (int, float, str)) and str(x).isdigit() else 10 for x in time_limits] |
| if not time_limits: |
| time_limits = [5, 10, 15] |
|
|
| |
| resource_types = args.get("resource_types") |
| if isinstance(resource_types, str): |
| resource_types = [x.strip() for x in resource_types.split(",")] |
|
|
| lat = args.get("lat") or args.get("origin_lat") |
| lon = args.get("lon") or args.get("origin_lon") |
|
|
| result = engine.generate_isochrone( |
| origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]), |
| origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"]), |
| time_limits=time_limits, |
| resource_types=resource_types |
| ) |
| return result.to_dict(), result.to_map_data() |
|
|
| elif tool_name == "find_along_route": |
| |
| resource_types = args.get("resource_types") |
| if isinstance(resource_types, str): |
| resource_types = [x.strip() for x in resource_types.split(",")] |
|
|
| result = engine.find_along_route( |
| origin_lat=_safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]), |
| origin_lon=_safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]), |
| dest_lat=_safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]), |
| dest_lon=_safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]), |
| buffer_meters=_safe_float(args.get("buffer_meters"), 100), |
| resource_types=resource_types |
| ) |
| return result.to_dict(), result.to_map_data() |
|
|
| elif tool_name == "compare_routes": |
| |
| origin_lat = _safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]) |
| origin_lon = _safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]) |
| dest_lat = _safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]) |
| dest_lon = _safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]) |
|
|
| result = engine.compare_routes( |
| origin_coords=(origin_lat, origin_lon), |
| dest_coords=(dest_lat, dest_lon), |
| dest_name=_safe_str(args.get("dest_name"), "Destination"), |
| climate_params=climate_params |
| ) |
| return result.to_dict(), result.to_map_data() |
|
|
| elif tool_name == "climate_route": |
| |
| origin_lat = _safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]) |
| origin_lon = _safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]) |
| dest_lat = _safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]) |
| dest_lon = _safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]) |
| mode = _safe_str(args.get("mode") or args.get("routing_mode"), "safest") |
|
|
| result = engine.route( |
| origin_coords=(origin_lat, origin_lon), |
| dest_coords=(dest_lat, dest_lon), |
| mode=mode, |
| climate_params=climate_params |
| ) |
|
|
| if result is None: |
| return {"error": "No path found"}, None |
|
|
| map_data = { |
| "routes": [{ |
| "coords": result.coords, |
| "color": result.color, |
| "label": result.label, |
| "name": result.name, |
| }], |
| "origin": [origin_lat, origin_lon], |
| "destination": [dest_lat, dest_lon], |
| } |
|
|
| return result.to_dict(), map_data |
|
|
| else: |
| return {"error": f"Unknown tool: {tool_name}"}, None |
|
|