| """ |
| Routing tools for the Emergency Routing Assistant. |
| |
| Uses dream-meridian pattern: |
| 1. Geocode place names BEFORE sending to LLM |
| 2. LLM outputs simple JSON tool call |
| 3. Execute tool and return result |
| """ |
|
|
| import os |
| import re |
| import networkx as nx |
| import pandas as pd |
| import geopandas as gpd |
| import osmnx as ox |
| import requests |
| from typing import Any |
|
|
| |
# Walking pace in meters per minute; route lengths are divided by this to
# obtain walking time in minutes.
WALK_SPEED_M_PER_MIN = 75

# Default multiplier applied to every meter climbed when uphill edges are
# penalized during routing.
ELEVATION_PENALTY_FACTOR = 3.0

# Fallback origin and the bounding box of the study area.
BROWNSVILLE_CENTER = dict(lat=40.6594, lon=-73.9126)
BROWNSVILLE_BOUNDS = dict(
    min_lat=40.64,
    max_lat=40.68,
    min_lon=-73.93,
    max_lon=-73.89,
)
|
|
| |
| |
| KNOWN_PLACES = {} |
|
|
|
|
| def load_known_places(): |
| """Load known places from the places.csv file.""" |
| global KNOWN_PLACES |
|
|
| |
| places_path = os.path.join(os.path.dirname(__file__), "..", "data", "brownsville", "places.csv") |
|
|
| if os.path.exists(places_path): |
| try: |
| df = pd.read_csv(places_path) |
| for _, row in df.iterrows(): |
| name_lower = row['name_lower'] |
| KNOWN_PLACES[name_lower] = { |
| "lat": row['lat'], |
| "lon": row['lon'], |
| "name": row['name'] |
| } |
| print(f"Loaded {len(KNOWN_PLACES)} places for geocoding") |
| except Exception as e: |
| print(f"Warning: Could not load places: {e}") |
| else: |
| print(f"Warning: places.csv not found at {places_path}") |
|
|
|
|
| |
| load_known_places() |
|
|
| |
# Matches "<street> and|&|at <street>" (case-insensitive), where each street
# name is one or two words; the two names are captured as groups 1 and 2.
INTERSECTION_PATTERN = re.compile(
    r"(\w+(?:\s+\w+)?)\s+(?:and|&|at)\s+(\w+(?:\s+\w+)?)", flags=re.I
)
|
|
| |
| |
| |
|
|
| def find_place_in_query(query: str) -> list[tuple[str, dict]]: |
| """ |
| Find known place names in a query. |
| Returns list of (matched_text, place_info) tuples. |
| Matches longest places first. |
| """ |
| query_lower = query.lower() |
| matches = [] |
| used_spans = [] |
|
|
| |
| sorted_places = sorted(KNOWN_PLACES.items(), key=lambda x: -len(x[0])) |
|
|
| for name_lower, info in sorted_places: |
| |
| pattern = r"\b" + re.escape(name_lower) + r"\b" |
| for match in re.finditer(pattern, query_lower): |
| start, end = match.span() |
|
|
| |
| overlaps = any( |
| not (end <= us or start >= ue) for us, ue in used_spans |
| ) |
|
|
| if not overlaps: |
| original_text = query[start:end] |
| matches.append((original_text, info)) |
| used_spans.append((start, end)) |
|
|
| return matches |
|
|
|
|
def geocode_nominatim(place_name: str) -> dict | None:
    """
    Fallback: geocode a place name using the Nominatim search API.

    The name is searched with ", Brownsville, Brooklyn, NY" appended and the
    Brownsville bounding box passed as a (non-binding) viewbox. The first hit
    lying within the bounding box widened by 0.02 degrees is preferred;
    otherwise the first usable hit is returned.

    Args:
        place_name: Name to look up.

    Returns:
        Dict with "name" (the input name), "lat" and "lon", or None when the
        request fails or yields no usable result. Failures are reported with
        a printed warning and never raised.
    """
    bounds = BROWNSVILLE_BOUNDS
    url = "https://nominatim.openstreetmap.org/search"
    params = {
        "q": f"{place_name}, Brownsville, Brooklyn, NY",
        "format": "json",
        "limit": 3,
        "viewbox": f"{bounds['min_lon']},{bounds['max_lat']},{bounds['max_lon']},{bounds['min_lat']}",
        "bounded": 0
    }
    headers = {"User-Agent": "BrownsvilleEmergencyApp/1.0"}

    try:
        response = requests.get(url, params=params, headers=headers, timeout=5)
        # Error statuses carry an error document rather than a result list.
        response.raise_for_status()
        results = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"Warning: Nominatim lookup failed for '{place_name}': {e}")
        return None

    if not isinstance(results, list):
        return None

    # Keep only hits with parseable coordinates, preserving result order.
    hits = []
    for entry in results:
        try:
            hits.append((float(entry["lat"]), float(entry["lon"])))
        except (KeyError, TypeError, ValueError):
            continue
    if not hits:
        return None

    margin = 0.02
    for lat, lon in hits:
        in_lat = bounds["min_lat"] - margin <= lat <= bounds["max_lat"] + margin
        in_lon = bounds["min_lon"] - margin <= lon <= bounds["max_lon"] + margin
        if in_lat and in_lon:
            return {"name": place_name, "lat": lat, "lon": lon}

    # Nothing near the study area: fall back to the top-ranked hit.
    lat, lon = hits[0]
    return {"name": place_name, "lat": lat, "lon": lon}
|
|
|
|
def geocode_query(query: str, resources_df: pd.DataFrame = None) -> tuple[str, dict]:
    """
    Process query to resolve place names to coordinates BEFORE sending to LLM.

    Known places found by find_place_in_query() are replaced in the text by
    "(lat ..., lon ...)". Only when no known place is found, the text after
    "near", "to" or "at" is sent to geocode_nominatim() as a fallback.

    Args:
        query: Free-text user query.
        resources_df: Accepted for interface compatibility; not used here.

    Returns:
        tuple: (modified_query, geocode_info)
            - modified_query: Query with place names replaced by coordinates
            - geocode_info: Dict of resolved places keyed by place name
    """
    geocode_info = {}
    modified = query

    matches = find_place_in_query(query)
    for original_text, info in matches:
        geocode_info[info["name"]] = {
            "lat": info["lat"],
            "lon": info["lon"],
            "name": info["name"]
        }
        # Replace whole words only: without the boundaries a place such as
        # "park" would be substituted inside an earlier "parking".
        pattern = re.compile(r"\b" + re.escape(original_text) + r"\b", re.IGNORECASE)
        modified = pattern.sub(
            f"(lat {info['lat']:.6f}, lon {info['lon']:.6f})",
            modified,
            count=1
        )

    if matches:
        return modified, geocode_info

    # Fallback: pull a free-text location out of the query. The leading \b
    # keeps "to"/"at" from matching at the end of words such as "photo".
    location_patterns = [
        r"\bnear\s+([A-Za-z][A-Za-z\s]+?)(?:\s+(?:and|&)\s+|\s*$)",
        r"\bto\s+([A-Za-z][A-Za-z\s]+?)(?:\s+(?:and|&)\s+|\s*$)",
        r"\bat\s+([A-Za-z][A-Za-z\s]+?)(?:\s+(?:and|&)\s+|\s*$)",
    ]
    # Generic resource words are requests for a resource type, not places.
    generic_terms = ["pharmacy", "clinic", "hospital", "school", "library", "fire station", "police"]

    for pattern in location_patterns:
        match = re.search(pattern, query, re.IGNORECASE)
        if not match:
            continue
        place_name = match.group(1).strip()
        if place_name.lower() in generic_terms:
            continue
        result = geocode_nominatim(place_name)
        if result:
            geocode_info[result["name"]] = result
            modified = query.replace(
                match.group(0),
                f"near (lat {result['lat']:.6f}, lon {result['lon']:.6f}) "
            )
            # Stop at the first location that could be resolved.
            break

    return modified, geocode_info
|
|
|
|
| def _convert_climate_attrs_to_float(G: nx.MultiDiGraph) -> None: |
| """Convert climate attributes from strings to floats (GraphML stores all as strings).""" |
| climate_attrs = ['flood_risk', 'heat_risk', 'climate_risk', 'climate_weight'] |
|
|
| for u, v, data in G.edges(data=True): |
| for attr in climate_attrs: |
| if attr in data and isinstance(data[attr], str): |
| try: |
| data[attr] = float(data[attr]) |
| except (ValueError, TypeError): |
| data[attr] = 0.0 |
|
|
|
|
def load_network_and_resources() -> tuple[nx.MultiDiGraph | None, pd.DataFrame | None, tuple[float, float]]:
    """
    Load the walking network and resources from saved files.

    The network is read from ``walking_network_final.graphml`` when present,
    otherwise downloaded for the Brownsville bounding box. Climate attributes
    are converted to floats and the graph is projected. Resources come from
    ``all_resources.csv``, falling back to ``all_resources.geojson``.

    Returns:
        (graph, resources_df, center) where center is a (lat, lon) tuple.
        resources_df is None when neither resource file exists. On any
        failure the error is printed and (None, None, center) is returned.
    """
    # NOTE(review): load_known_places() reads from "../data/brownsville"
    # relative to this module - confirm which location is intended.
    data_dir = os.path.join(os.path.dirname(__file__), "data", "brownsville")

    # Reuse the module-level constants instead of repeating their values.
    center = (BROWNSVILLE_CENTER["lat"], BROWNSVILLE_CENTER["lon"])

    try:
        graphml_path = os.path.join(data_dir, "walking_network_final.graphml")
        if os.path.exists(graphml_path):
            G_walk = ox.load_graphml(graphml_path)
        else:
            # No saved network: build one for the study-area bounding box.
            from shapely.geometry import box
            bounds = BROWNSVILLE_BOUNDS
            brownsville_bbox = box(
                bounds["min_lon"], bounds["min_lat"],
                bounds["max_lon"], bounds["max_lat"]
            )
            G_walk = ox.graph_from_polygon(brownsville_bbox, network_type='walk', simplify=True)

        # Must run before routing: GraphML hands attributes back as strings.
        _convert_climate_attrs_to_float(G_walk)

        G_walk = ox.project_graph(G_walk)

        resources_df = None
        resources_path = os.path.join(data_dir, "all_resources.csv")
        geojson_path = os.path.join(data_dir, "all_resources.geojson")
        if os.path.exists(resources_path):
            resources_df = pd.read_csv(resources_path)
        elif os.path.exists(geojson_path):
            gdf = gpd.read_file(geojson_path)
            resources_df = pd.DataFrame({
                "name": gdf["name"],
                "type": gdf["type"],
                "category": gdf["category"],
                "lat": gdf.geometry.y,
                "lon": gdf.geometry.x
            })

        return G_walk, resources_df, center

    except Exception as e:
        # Deliberately broad: callers receive (None, None, center) instead of a crash.
        print(f"Error loading data: {e}")
        return None, None, center
|
|
|
|
def get_nearest_node(G: nx.MultiDiGraph, lat: float, lon: float) -> int:
    """
    Return the network node closest to a lat/lon point.

    When the graph declares a CRS other than "EPSG:4326", the point is first
    transformed from EPSG:4326 into the graph's CRS.
    """
    x, y = lon, lat
    if "crs" in G.graph and G.graph["crs"] != "EPSG:4326":
        import pyproj
        to_graph_crs = pyproj.Transformer.from_crs("EPSG:4326", G.graph["crs"], always_xy=True)
        x, y = to_graph_crs.transform(lon, lat)
    return ox.nearest_nodes(G, x, y)
|
|
|
|
def get_route_coords(G: nx.MultiDiGraph, route: list) -> list[tuple[float, float]]:
    """
    Return the (lat, lon) pair of every node along a route.

    When the graph declares a CRS other than "EPSG:4326", node x/y values are
    transformed from the graph's CRS back to EPSG:4326 first.
    """
    projected = "crs" in G.graph and G.graph["crs"] != "EPSG:4326"
    if not projected:
        # Node "y" holds latitude and "x" longitude in this case.
        return [(G.nodes[n]["y"], G.nodes[n]["x"]) for n in route]

    import pyproj
    to_wgs84 = pyproj.Transformer.from_crs(G.graph["crs"], "EPSG:4326", always_xy=True)

    latlon = []
    for n in route:
        x, y = G.nodes[n]["x"], G.nodes[n]["y"]
        lon, lat = to_wgs84.transform(x, y)
        latlon.append((lat, lon))
    return latlon
|
|
|
|
def compute_route_metrics(G: nx.MultiDiGraph, route: list) -> dict:
    """
    Summarize the elevation profile of a route.

    Node "elevation" values (missing or None counts as 0) give the climb and
    descent between consecutive nodes; the "grade" attribute of the first
    edge found between consecutive nodes gives the slope figures.

    Returns dict with:
        - elevation_gain_m: Total meters climbed
        - elevation_loss_m: Total meters descended
        - max_elevation_m: Highest point on route
        - min_elevation_m: Lowest point on route
        - avg_grade_pct: Average slope percentage
        - max_grade_pct: Steepest segment
        - difficulty: "flat", "moderate", or "hilly"

    Routes with fewer than two nodes yield all zeros and "flat".
    """
    if not route or len(route) < 2:
        return {
            "elevation_gain_m": 0, "elevation_loss_m": 0,
            "max_elevation_m": 0, "min_elevation_m": 0,
            "avg_grade_pct": 0, "max_grade_pct": 0,
            "difficulty": "flat"
        }

    # Elevation of every node, in route order.
    profile = []
    for node in route:
        height = G.nodes[node].get("elevation", 0)
        profile.append(0 if height is None else height)

    climbed = 0
    descended = 0
    segment_grades = []
    for idx in range(1, len(route)):
        delta = profile[idx] - profile[idx - 1]
        if delta > 0:
            climbed += delta
        else:
            descended += abs(delta)

        edge_data = G.get_edge_data(route[idx - 1], route[idx])
        if not edge_data:
            continue
        # A multigraph returns {key: attrs}; take the first parallel edge.
        if isinstance(edge_data, dict):
            edge = next(iter(edge_data.values()))
        else:
            edge = edge_data
        # Grade is stored as a fraction; report it as a percentage.
        segment_grades.append(abs(edge.get("grade", 0)) * 100)

    if segment_grades:
        mean_grade = sum(segment_grades) / len(segment_grades)
        steepest = max(segment_grades)
    else:
        mean_grade = 0
        steepest = 0

    if climbed < 5 and steepest < 3:
        difficulty = "flat"
    elif climbed < 15 or steepest < 8:
        difficulty = "moderate"
    else:
        difficulty = "hilly"

    return {
        "elevation_gain_m": round(climbed, 1),
        "elevation_loss_m": round(descended, 1),
        "max_elevation_m": round(max(profile), 1),
        "min_elevation_m": round(min(profile), 1),
        "avg_grade_pct": round(mean_grade, 1),
        "max_grade_pct": round(steepest, 1),
        "difficulty": difficulty
    }
|
|
|
|
def has_climate_data(G: nx.MultiDiGraph) -> bool:
    """
    Report whether the graph carries climate data.

    Only the first edge is inspected: the answer is True when that edge has
    a "climate_weight" attribute, and False for a graph without edges.
    """
    for _u, _v, attrs in G.edges(data=True):
        return "climate_weight" in attrs
    return False
|
|
|
|
def compute_climate_metrics(G: nx.MultiDiGraph, route: list) -> dict:
    """
    Summarize climate risk along a route.

    For each pair of consecutive nodes one edge is sampled (key 0 when it
    exists, otherwise the first parallel edge); pairs without an edge are
    skipped. Missing or empty attribute values count as 0.

    Returns dict with:
        - avg_flood_risk: Average flood risk along route (0-1)
        - max_flood_risk: Maximum flood risk encountered
        - avg_heat_risk: Average heat risk along route (0-1)
        - max_heat_risk: Maximum heat risk encountered
        - avg_climate_risk: Combined climate risk (0-1)
        - flood_exposure_m: Meters of route in high flood risk areas (>0.3)

    All values are 0 when the route has fewer than two nodes or no edge
    could be sampled.
    """
    def _no_data() -> dict:
        return {
            "avg_flood_risk": 0, "max_flood_risk": 0,
            "avg_heat_risk": 0, "max_heat_risk": 0,
            "avg_climate_risk": 0, "flood_exposure_m": 0
        }

    if not route or len(route) < 2:
        return _no_data()

    flood_values = []
    heat_values = []
    combined_values = []
    exposed_length = 0.0

    for u, v in zip(route[:-1], route[1:]):
        edge_data = G.get_edge_data(u, v)
        if not edge_data:
            continue

        if not isinstance(edge_data, dict):
            attrs = edge_data
        elif 0 in edge_data:
            attrs = edge_data[0]
        else:
            attrs = next(iter(edge_data.values()))

        flood = float(attrs.get("flood_risk", 0) or 0)
        heat = float(attrs.get("heat_risk", 0) or 0)
        combined = float(attrs.get("climate_risk", 0) or 0)
        length = float(attrs.get("length", 0) or 0)

        flood_values.append(flood)
        heat_values.append(heat)
        combined_values.append(combined)

        # Only segments above the 0.3 flood-risk threshold count as exposed.
        if flood > 0.3:
            exposed_length += length

    if not combined_values:
        return _no_data()

    sampled = len(combined_values)
    return {
        "avg_flood_risk": round(sum(flood_values) / sampled, 3),
        "max_flood_risk": round(max(flood_values), 3),
        "avg_heat_risk": round(sum(heat_values) / sampled, 3),
        "max_heat_risk": round(max(heat_values), 3),
        "avg_climate_risk": round(sum(combined_values) / sampled, 3),
        "flood_exposure_m": round(exposed_length, 1)
    }
|
|
|
|
def apply_elevation_weights(G: nx.MultiDiGraph, penalty_factor: float = ELEVATION_PENALTY_FACTOR) -> nx.MultiDiGraph:
    """
    Return a copy of the graph whose edges carry a "weighted_length".

    For an edge that climbs (end node higher than start node) the weight is
    its "length" plus the elevation difference times ``penalty_factor``;
    for every other edge it is just the "length". A missing length counts
    as 1 and a missing or empty elevation as 0. The input graph is left
    untouched.
    """
    weighted = G.copy()
    node_attrs = weighted.nodes

    for u, v, _key, attrs in weighted.edges(keys=True, data=True):
        base = attrs.get("length", 1)

        start_height = node_attrs[u].get("elevation", 0) or 0
        end_height = node_attrs[v].get("elevation", 0) or 0
        climb = end_height - start_height

        # Only uphill travel is penalized; flat and downhill keep their length.
        attrs["weighted_length"] = (base + climb * penalty_factor) if climb > 0 else base

    return weighted
|
|
|
|
|
|
|
|
def list_resources(
    resources_df: pd.DataFrame,
    category: str = "",
    resource_type: str = ""
) -> dict[str, Any]:
    """
    List available resources, optionally filtered by category and/or type.

    A list passed for either filter is reduced to its first element; an
    empty filter means "no filtering".

    Returns:
        {"error": ...} when resources_df is None, otherwise a dict with
        "total_count", a per-type summary "by_type" (count and category),
        and "resources" holding at most the first 20 matching records.
    """
    if resources_df is None:
        return {"error": "Resources not loaded"}

    def _as_filter(value) -> str:
        # Tool arguments may arrive as lists; keep the first entry only.
        if isinstance(value, list):
            value = value[0] if value else ""
        return str(value) if value else ""

    category = _as_filter(category)
    resource_type = _as_filter(resource_type)

    df = resources_df.copy()
    for column, wanted in (("category", category), ("type", resource_type)):
        if wanted:
            df = df[df[column] == wanted]

    per_type = df.groupby("type").agg({
        "name": "count",
        "category": "first"
    })
    summary = per_type.rename(columns={"name": "count"}).to_dict("index")

    records = df[["name", "type", "category", "lat", "lon"]].to_dict("records")

    return {
        "total_count": len(df),
        "by_type": summary,
        "resources": records[:20]
    }
|
|
|
|
def _route_length_m(G: nx.MultiDiGraph, route: list, weight_key: str) -> float:
    """Sum "length" along a route, using per hop the parallel edge with the lowest ``weight_key``."""
    total = 0.0
    for u, v in zip(route[:-1], route[1:]):
        parallel = G.get_edge_data(u, v)
        if not parallel:
            continue
        # Weighted routing on a multigraph uses the cheapest parallel edge;
        # a missing weight counts as 1.
        chosen = min(parallel.values(), key=lambda attrs: attrs.get(weight_key, 1))
        total += float(chosen.get("length", 0) or 0)
    return total


def find_nearest(
    G: nx.MultiDiGraph,
    resources_df: pd.DataFrame,
    resource_type: str,
    origin_lat: float = 40.6594,
    origin_lon: float = -73.9126,
    prefer_flat: bool = True,
    prefer_safe: bool = True
) -> tuple[dict[str, Any], dict | None]:
    """
    Find the nearest resource of a given type with climate and elevation-aware routing.

    "Nearest" is judged by the routing weight in use: "climate_weight" when
    prefer_safe is set and the graph has climate data, otherwise an
    uphill-penalized length when prefer_flat is set, otherwise plain length.
    The reported distance and walking time describe the route that is
    actually returned.

    Args:
        G: Walking network graph
        resources_df: DataFrame of resources
        resource_type: Type of resource to find (a list is reduced to its first element)
        origin_lat: Origin latitude
        origin_lon: Origin longitude
        prefer_flat: If True, prefer routes with less elevation gain
        prefer_safe: If True and climate data available, prefer climate-safe routes

    Returns:
        (result, map_data). On failure result is {"error": ...} and map_data
        is None; otherwise result describes the resource and route, and
        map_data carries the coordinates needed to draw it.
    """
    if resources_df is None:
        return {"error": "Resources not loaded"}, None

    # Tool arguments may arrive as lists; keep the first entry only.
    if isinstance(resource_type, list):
        resource_type = resource_type[0] if resource_type else ""
    resource_type = str(resource_type) if resource_type else ""

    if not resource_type:
        return {"error": "resource_type is required"}, None

    candidates = resources_df[resources_df["type"] == resource_type]
    if len(candidates) == 0:
        return {"error": f"No resources of type '{resource_type}' found"}, None

    try:
        origin_node = get_nearest_node(G, origin_lat, origin_lon)
    except Exception as e:
        return {"error": f"Could not find origin on network: {e}"}, None

    graph_has_climate = has_climate_data(G)

    if prefer_safe and graph_has_climate:
        G_routing = G
        weight_key = "climate_weight"
    elif prefer_flat:
        G_routing = apply_elevation_weights(G)
        weight_key = "weighted_length"
    else:
        G_routing = G
        weight_key = "length"

    # One single-source search covers every candidate, instead of running a
    # separate shortest-path search per resource.
    try:
        costs, paths = nx.single_source_dijkstra(G_routing, origin_node, weight=weight_key)
    except nx.NodeNotFound:
        return {"error": f"Could not find a route to any {resource_type}"}, None

    best_resource = None
    best_cost = float("inf")
    best_route = None

    for _, row in candidates.iterrows():
        try:
            dest_node = get_nearest_node(G, row["lat"], row["lon"])
        except Exception:
            # Best effort: a resource that cannot be snapped to the network is skipped.
            continue

        cost = costs.get(dest_node)  # None when the node is unreachable
        if cost is not None and cost < best_cost:
            best_cost = cost
            best_resource = row.to_dict()
            best_route = paths[dest_node]

    if best_resource is None:
        return {"error": f"Could not find a route to any {resource_type}"}, None

    # Measure the chosen route itself, so distance and time match what is drawn.
    best_actual_distance = _route_length_m(G_routing, best_route, weight_key)
    walk_time = best_actual_distance / WALK_SPEED_M_PER_MIN

    route_metrics = compute_route_metrics(G, best_route)
    route_coords = get_route_coords(G, best_route)

    map_data = {
        "route_coords": route_coords,
        "origin": [origin_lat, origin_lon],
        "destination": [best_resource["lat"], best_resource["lon"]],
        "dest_name": best_resource["name"],
        "distance": best_actual_distance
    }

    result = {
        "found": True,
        "name": best_resource["name"],
        "type": best_resource["type"],
        "category": best_resource["category"],
        "lat": best_resource["lat"],
        "lon": best_resource["lon"],
        "distance_meters": round(best_actual_distance, 1),
        "walking_time_minutes": round(walk_time, 1),
        "origin": {"lat": origin_lat, "lon": origin_lon},
        "route_metrics": route_metrics
    }

    if graph_has_climate:
        result["climate_metrics"] = compute_climate_metrics(G, best_route)
        result["climate_aware"] = prefer_safe

    return result, map_data
|
|
|
|
def compute_single_route(
    G: nx.MultiDiGraph,
    origin_node: int,
    dest_node: int,
    weight_key: str = "length"
) -> dict | None:
    """
    Compute a single route and its metrics.

    Args:
        G: Graph to route on
        origin_node: Start node
        dest_node: End node
        weight_key: Edge attribute minimized by the path search

    Returns:
        Dict with "route" (node list), "coords", "distance_m", "time_min"
        and "metrics", or None when no route could be computed.
    """
    try:
        route = nx.shortest_path(G, origin_node, dest_node, weight=weight_key)

        distance = 0.0
        for u, v in zip(route[:-1], route[1:]):
            # Parallel edges need not have key 0. Measure the edge the search
            # would have used: the cheapest by weight_key (missing weight = 1).
            chosen = min(G[u][v].values(), key=lambda attrs: attrs.get(weight_key, 1))
            distance += chosen.get("length", 0)

        walk_time = distance / WALK_SPEED_M_PER_MIN
        route_metrics = compute_route_metrics(G, route)
        route_coords = get_route_coords(G, route)

        return {
            "route": route,
            "coords": route_coords,
            "distance_m": round(distance, 1),
            "time_min": round(walk_time, 1),
            "metrics": route_metrics
        }
    except (nx.NetworkXNoPath, nx.NodeNotFound):
        return None
    except Exception as e:
        # Best effort: one failing alternative must not abort the others,
        # but the cause should not vanish silently.
        print(f"Warning: Could not compute route: {e}")
        return None
|
|
|
|
def compute_alternative_routes(
    G: nx.MultiDiGraph,
    origin_lat: float,
    origin_lon: float,
    dest_lat: float,
    dest_lon: float
) -> list[dict]:
    """
    Compute several route alternatives, each optimized for a different goal.

    Returns list of route options, in this order and without duplicates
    (an option whose coordinates equal an earlier one is dropped):
        - shortest: Minimum distance
        - flattest: Minimum elevation gain (uphill penalty factor 5.0)
        - balanced: Compromise between distance and elevation (factor 2.0)
        - safest: Minimum climate risk, only if climate data is available
    """
    origin_node = get_nearest_node(G, origin_lat, origin_lon)
    dest_node = get_nearest_node(G, dest_lat, dest_lon)
    graph_has_climate = has_climate_data(G)

    # (name, label, color, graph factory, weight attribute). Factories are
    # called lazily so each weighted copy is built just before it is routed.
    variants = [
        ("shortest", "Shortest", "#3b82f6",
         lambda: G, "length"),
        ("flattest", "Flattest", "#22c55e",
         lambda: apply_elevation_weights(G, penalty_factor=5.0), "weighted_length"),
        ("balanced", "Balanced", "#f59e0b",
         lambda: apply_elevation_weights(G, penalty_factor=2.0), "weighted_length"),
    ]
    if graph_has_climate:
        variants.append(
            ("safest", "Safest (Climate)", "#10b981", lambda: G, "climate_weight")
        )

    routes = []
    for name, label, color, make_graph, weight_key in variants:
        option = compute_single_route(make_graph(), origin_node, dest_node, weight_key=weight_key)
        if not option:
            continue

        option["name"] = name
        option["label"] = label
        option["color"] = color
        if graph_has_climate:
            # Climate metrics always come from the original graph.
            option["climate_metrics"] = compute_climate_metrics(G, option["route"])

        already_listed = [kept["coords"] for kept in routes]
        if option["coords"] not in already_listed:
            routes.append(option)

    return routes
|
|
|
|
def calculate_route(
    G: nx.MultiDiGraph,
    origin_lat: float,
    origin_lon: float,
    dest_lat: float,
    dest_lon: float,
    dest_name: str = "Destination",
    prefer_flat: bool = True,
    prefer_safe: bool = True
) -> tuple[dict[str, Any], dict | None]:
    """
    Calculate multiple walking routes between two points with different criteria.

    Args:
        G: Walking network graph
        origin_lat, origin_lon: Origin coordinates
        dest_lat, dest_lon: Destination coordinates
        dest_name: Name of destination
        prefer_flat: Prefer routes with less elevation gain
        prefer_safe: If True and climate data available, recommend safest route

    Returns:
        (result, map_data). On failure result is {"error": ...} and map_data
        is None. Otherwise result summarizes the recommended route and lists
        every alternative, and map_data carries what is needed to draw them.
    """
    try:
        alternatives = compute_alternative_routes(G, origin_lat, origin_lon, dest_lat, dest_lon)

        if not alternatives:
            return {"error": "No path found between origin and destination"}, None

        graph_has_climate = has_climate_data(G)

        def _named(name: str) -> dict | None:
            return next((r for r in alternatives if r["name"] == name), None)

        recommended = None
        if prefer_safe and graph_has_climate:
            recommended = _named("safest")
            if recommended is None:
                # The safest path was dropped as a duplicate of an earlier
                # alternative, which is not necessarily the shortest one.
                # Recompute it to find the alternative it coincides with.
                safest = compute_single_route(
                    G,
                    get_nearest_node(G, origin_lat, origin_lon),
                    get_nearest_node(G, dest_lat, dest_lon),
                    weight_key="climate_weight"
                )
                if safest:
                    recommended = next(
                        (r for r in alternatives if r["coords"] == safest["coords"]),
                        None
                    )
        elif prefer_flat:
            # "flattest" is only ever dropped as a duplicate of "shortest",
            # so the first-alternative fallback below is the same route.
            recommended = _named("flattest")
        else:
            recommended = _named("shortest")

        if recommended is None:
            recommended = alternatives[0]

        map_data = {
            "routes": [
                {
                    "coords": r["coords"],
                    "color": r["color"],
                    "label": r["label"],
                    "name": r["name"]
                }
                for r in alternatives
            ],
            "origin": [origin_lat, origin_lon],
            "destination": [dest_lat, dest_lon],
            "dest_name": dest_name,
            "distance": recommended["distance_m"],
            # Single-route key kept alongside "routes".
            "route_coords": recommended["coords"]
        }

        alt_list = []
        for r in alternatives:
            alt_info = {
                "name": r["name"],
                "label": r["label"],
                "distance_meters": r["distance_m"],
                "walking_time_minutes": r["time_min"],
                "route_metrics": r["metrics"]
            }
            if graph_has_climate and "climate_metrics" in r:
                alt_info["climate_metrics"] = r["climate_metrics"]
            alt_list.append(alt_info)

        result = {
            "success": True,
            "recommended": recommended["name"],
            "alternatives": alt_list,
            "distance_meters": recommended["distance_m"],
            "walking_time_minutes": recommended["time_min"],
            "route_points": len(recommended["route"]),
            "origin": {"lat": origin_lat, "lon": origin_lon},
            "destination": {"lat": dest_lat, "lon": dest_lon, "name": dest_name},
            "route_metrics": recommended["metrics"]
        }

        if graph_has_climate:
            # NOTE(review): find_nearest() reports prefer_safe here, whereas
            # this is always True when climate data exists - confirm intent.
            result["climate_aware"] = True
            if "climate_metrics" in recommended:
                result["climate_metrics"] = recommended["climate_metrics"]

        return result, map_data

    except nx.NetworkXNoPath:
        return {"error": "No path found between origin and destination"}, None
    except Exception as e:
        return {"error": f"Route calculation failed: {e}"}, None
|
|
|
|
| def _safe_str(val, default: str = "") -> str: |
| """Safely convert a value to string, handling lists.""" |
| if val is None: |
| return default |
| if isinstance(val, list): |
| return str(val[0]) if val else default |
| return str(val) |
|
|
|
|
| def _safe_float(val, default: float) -> float: |
| """Safely convert a value to float.""" |
| if val is None: |
| return default |
| if isinstance(val, list): |
| val = val[0] if val else default |
| try: |
| return float(val) |
| except (ValueError, TypeError): |
| return default |
|
|
|
|
def _first_present(args: dict, *keys: str):
    """Return the first value among ``keys`` that is not None, "" or []; 0 counts as present."""
    for key in keys:
        value = args.get(key)
        if value is None:
            continue
        if isinstance(value, (str, list)) and not value:
            continue
        return value
    return None


def execute_tool(
    tool_name: str,
    args: dict,
    G: nx.MultiDiGraph,
    resources_df: pd.DataFrame
) -> tuple[dict[str, Any], dict | None]:
    """
    Execute a tool by name with given arguments.

    Supported tools are "list_resources", "find_nearest" and
    "calculate_route". Coordinates may be given under alternative key names
    ("lat"/"origin_lat", "start_lat"/"origin_lat", "end_lat"/"dest_lat", and
    likewise for longitude); missing or unparseable coordinates fall back to
    BROWNSVILLE_CENTER.

    Args:
        tool_name: Name of the tool to run
        args: Tool arguments; None is treated as no arguments
        G: Walking network graph
        resources_df: DataFrame of resources

    Returns:
        (result, map_data). map_data is None for "list_resources" and for
        an unknown tool, whose result is {"error": ...}.
    """
    args = args or {}

    if tool_name == "list_resources":
        result = list_resources(
            resources_df,
            category=_safe_str(args.get("category"), ""),
            resource_type=_safe_str(args.get("resource_type"), "")
        )
        return result, None

    elif tool_name == "find_nearest":
        # Keys are checked for presence, not truthiness, so a coordinate of
        # 0 is not mistaken for a missing value.
        lat = _first_present(args, "lat", "origin_lat")
        lon = _first_present(args, "lon", "origin_lon")
        return find_nearest(
            G,
            resources_df,
            resource_type=_safe_str(args.get("resource_type"), ""),
            origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]),
            origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"])
        )

    elif tool_name == "calculate_route":
        return calculate_route(
            G,
            origin_lat=_safe_float(_first_present(args, "start_lat", "origin_lat"), BROWNSVILLE_CENTER["lat"]),
            origin_lon=_safe_float(_first_present(args, "start_lon", "origin_lon"), BROWNSVILLE_CENTER["lon"]),
            dest_lat=_safe_float(_first_present(args, "end_lat", "dest_lat"), BROWNSVILLE_CENTER["lat"]),
            dest_lon=_safe_float(_first_present(args, "end_lon", "dest_lon"), BROWNSVILLE_CENTER["lon"]),
            dest_name=_safe_str(args.get("dest_name"), "Destination")
        )

    else:
        return {"error": f"Unknown tool: {tool_name}"}, None
|
|