""" Routing tools for the Emergency Routing Assistant. Uses dream-meridian pattern: 1. Geocode place names BEFORE sending to LLM 2. LLM outputs simple JSON tool call 3. Execute tool and return result """ import os import re import networkx as nx import pandas as pd import geopandas as gpd import osmnx as ox import requests from typing import Any # Walking speed: ~4.5 km/h = 75 m/min WALK_SPEED_M_PER_MIN = 75 # Elevation penalty factor for routing (higher = more avoidance of elevation gain) ELEVATION_PENALTY_FACTOR = 3.0 # Brownsville center and bounds BROWNSVILLE_CENTER = {"lat": 40.6594, "lon": -73.9126} BROWNSVILLE_BOUNDS = { "min_lat": 40.64, "max_lat": 40.68, "min_lon": -73.93, "max_lon": -73.89 } # Known places - loaded from data/brownsville/places.csv # These are matched BEFORE sending query to LLM KNOWN_PLACES = {} def load_known_places(): """Load known places from the places.csv file.""" global KNOWN_PLACES # Look in project root's data directory (one level up from core/) places_path = os.path.join(os.path.dirname(__file__), "..", "data", "brownsville", "places.csv") if os.path.exists(places_path): try: df = pd.read_csv(places_path) for _, row in df.iterrows(): name_lower = row['name_lower'] KNOWN_PLACES[name_lower] = { "lat": row['lat'], "lon": row['lon'], "name": row['name'] } print(f"Loaded {len(KNOWN_PLACES)} places for geocoding") except Exception as e: print(f"Warning: Could not load places: {e}") else: print(f"Warning: places.csv not found at {places_path}") # Load places on module import load_known_places() # Intersection patterns to match INTERSECTION_PATTERN = re.compile( r"(\w+(?:\s+\w+)?)\s+(?:and|&|at)\s+(\w+(?:\s+\w+)?)", re.IGNORECASE ) # ============================================================================ # Geocoding (runs BEFORE LLM) # ============================================================================ def find_place_in_query(query: str) -> list[tuple[str, dict]]: """ Find known place names in a query. Returns list of (matched_text, place_info) tuples. Matches longest places first. """ query_lower = query.lower() matches = [] used_spans = [] # Sort by name length (longest first) for greedy matching sorted_places = sorted(KNOWN_PLACES.items(), key=lambda x: -len(x[0])) for name_lower, info in sorted_places: # Use word boundaries pattern = r"\b" + re.escape(name_lower) + r"\b" for match in re.finditer(pattern, query_lower): start, end = match.span() # Check overlap with existing matches overlaps = any( not (end <= us or start >= ue) for us, ue in used_spans ) if not overlaps: original_text = query[start:end] matches.append((original_text, info)) used_spans.append((start, end)) return matches def geocode_nominatim(place_name: str) -> dict | None: """Fallback: geocode using Nominatim API.""" try: search_query = f"{place_name}, Brownsville, Brooklyn, NY" url = "https://nominatim.openstreetmap.org/search" params = { "q": search_query, "format": "json", "limit": 3, "viewbox": f"{BROWNSVILLE_BOUNDS['min_lon']},{BROWNSVILLE_BOUNDS['max_lat']},{BROWNSVILLE_BOUNDS['max_lon']},{BROWNSVILLE_BOUNDS['min_lat']}", "bounded": 0 } headers = {"User-Agent": "BrownsvilleEmergencyApp/1.0"} response = requests.get(url, params=params, headers=headers, timeout=5) results = response.json() if results: # Find result nearest to Brownsville for r in results: lat, lon = float(r["lat"]), float(r["lon"]) if (BROWNSVILLE_BOUNDS["min_lat"] - 0.02 <= lat <= BROWNSVILLE_BOUNDS["max_lat"] + 0.02 and BROWNSVILLE_BOUNDS["min_lon"] - 0.02 <= lon <= BROWNSVILLE_BOUNDS["max_lon"] + 0.02): return {"name": place_name, "lat": lat, "lon": lon} # Fallback to first result return {"name": place_name, "lat": float(results[0]["lat"]), "lon": float(results[0]["lon"])} except Exception: pass return None def geocode_query(query: str, resources_df: pd.DataFrame = None) -> tuple[str, dict]: """ Process query to resolve place names to coordinates BEFORE sending to LLM. Returns: tuple: (modified_query, geocode_info) - modified_query: Query with place names replaced by coordinates - geocode_info: Dict of resolved places """ geocode_info = {} modified = query # First try known places matches = find_place_in_query(query) for original_text, info in matches: geocode_info[info["name"]] = { "lat": info["lat"], "lon": info["lon"], "name": info["name"] } # Replace in query with coordinates pattern = re.compile(re.escape(original_text), re.IGNORECASE) modified = pattern.sub( f"(lat {info['lat']:.6f}, lon {info['lon']:.6f})", modified, count=1 ) # If no matches, try to find location phrases and geocode them if not matches: # Look for "near X", "at X", "to X" patterns location_patterns = [ r"near\s+([A-Za-z][A-Za-z\s]+?)(?:\s+(?:and|&)\s+|\s*$)", r"to\s+([A-Za-z][A-Za-z\s]+?)(?:\s+(?:and|&)\s+|\s*$)", r"at\s+([A-Za-z][A-Za-z\s]+?)(?:\s+(?:and|&)\s+|\s*$)", ] for pattern in location_patterns: match = re.search(pattern, query, re.IGNORECASE) if match: place_name = match.group(1).strip() # Skip if it's a resource type if place_name.lower() not in ["pharmacy", "clinic", "hospital", "school", "library", "fire station", "police"]: result = geocode_nominatim(place_name) if result: geocode_info[result["name"]] = result modified = query.replace( match.group(0), f"near (lat {result['lat']:.6f}, lon {result['lon']:.6f}) " ) break return modified, geocode_info def _convert_climate_attrs_to_float(G: nx.MultiDiGraph) -> None: """Convert climate attributes from strings to floats (GraphML stores all as strings).""" climate_attrs = ['flood_risk', 'heat_risk', 'climate_risk', 'climate_weight'] for u, v, data in G.edges(data=True): for attr in climate_attrs: if attr in data and isinstance(data[attr], str): try: data[attr] = float(data[attr]) except (ValueError, TypeError): data[attr] = 0.0 def load_network_and_resources() -> tuple[nx.MultiDiGraph | None, pd.DataFrame | None, tuple[float, float]]: """Load the walking network and resources from saved files.""" data_dir = os.path.join(os.path.dirname(__file__), "data", "brownsville") # Default center center = (40.6594, -73.9126) try: # Load walking network graphml_path = os.path.join(data_dir, "walking_network_final.graphml") if os.path.exists(graphml_path): G_walk = ox.load_graphml(graphml_path) else: # Fallback: load from OSM directly from shapely.geometry import box brownsville_bbox = box(-73.93, 40.64, -73.89, 40.68) G_walk = ox.graph_from_polygon(brownsville_bbox, network_type='walk', simplify=True) # Convert climate attributes from strings to floats _convert_climate_attrs_to_float(G_walk) # Project the graph to enable fast nearest_nodes lookup without scikit-learn G_walk = ox.project_graph(G_walk) # Load resources resources_path = os.path.join(data_dir, "all_resources.csv") if os.path.exists(resources_path): resources_df = pd.read_csv(resources_path) else: # Try GeoJSON geojson_path = os.path.join(data_dir, "all_resources.geojson") if os.path.exists(geojson_path): gdf = gpd.read_file(geojson_path) resources_df = pd.DataFrame({ "name": gdf["name"], "type": gdf["type"], "category": gdf["category"], "lat": gdf.geometry.y, "lon": gdf.geometry.x }) else: resources_df = None return G_walk, resources_df, center except Exception as e: print(f"Error loading data: {e}") return None, None, center def get_nearest_node(G: nx.MultiDiGraph, lat: float, lon: float) -> int: """Find the nearest network node to a point (handles projected graphs).""" # For projected graphs, convert lat/lon to projected coordinates if "crs" in G.graph and G.graph["crs"] != "EPSG:4326": import pyproj transformer = pyproj.Transformer.from_crs("EPSG:4326", G.graph["crs"], always_xy=True) x, y = transformer.transform(lon, lat) return ox.nearest_nodes(G, x, y) return ox.nearest_nodes(G, lon, lat) def get_route_coords(G: nx.MultiDiGraph, route: list) -> list[tuple[float, float]]: """Extract lat/lon coordinates from route nodes (handles projected graphs).""" if "crs" in G.graph and G.graph["crs"] != "EPSG:4326": import pyproj transformer = pyproj.Transformer.from_crs(G.graph["crs"], "EPSG:4326", always_xy=True) coords = [] for node in route: x, y = G.nodes[node]["x"], G.nodes[node]["y"] lon, lat = transformer.transform(x, y) coords.append((lat, lon)) return coords return [(G.nodes[node]["y"], G.nodes[node]["x"]) for node in route] def compute_route_metrics(G: nx.MultiDiGraph, route: list) -> dict: """ Compute detailed metrics for a route including elevation profile. Returns dict with: - elevation_gain_m: Total meters climbed - elevation_loss_m: Total meters descended - max_elevation_m: Highest point on route - min_elevation_m: Lowest point on route - avg_grade_pct: Average slope percentage - max_grade_pct: Steepest segment - difficulty: "flat", "moderate", or "hilly" """ if not route or len(route) < 2: return { "elevation_gain_m": 0, "elevation_loss_m": 0, "max_elevation_m": 0, "min_elevation_m": 0, "avg_grade_pct": 0, "max_grade_pct": 0, "difficulty": "flat" } elevations = [] grades = [] elevation_gain = 0 elevation_loss = 0 for i, node in enumerate(route): elev = G.nodes[node].get("elevation", 0) if elev is None: elev = 0 elevations.append(elev) if i > 0: prev_elev = elevations[i-1] diff = elev - prev_elev if diff > 0: elevation_gain += diff else: elevation_loss += abs(diff) # Get grade from edge if available prev_node = route[i-1] edge_data = G.get_edge_data(prev_node, node) if edge_data: # MultiDiGraph returns dict of edges first_edge = list(edge_data.values())[0] if isinstance(edge_data, dict) else edge_data grade = abs(first_edge.get("grade", 0)) * 100 # Convert to percentage grades.append(grade) max_elev = max(elevations) if elevations else 0 min_elev = min(elevations) if elevations else 0 avg_grade = sum(grades) / len(grades) if grades else 0 max_grade = max(grades) if grades else 0 # Classify difficulty if elevation_gain < 5 and max_grade < 3: difficulty = "flat" elif elevation_gain < 15 or max_grade < 8: difficulty = "moderate" else: difficulty = "hilly" return { "elevation_gain_m": round(elevation_gain, 1), "elevation_loss_m": round(elevation_loss, 1), "max_elevation_m": round(max_elev, 1), "min_elevation_m": round(min_elev, 1), "avg_grade_pct": round(avg_grade, 1), "max_grade_pct": round(max_grade, 1), "difficulty": difficulty } def has_climate_data(G: nx.MultiDiGraph) -> bool: """Check if the graph has climate data on edges.""" sample_edge = next(iter(G.edges(data=True)), None) if sample_edge and "climate_weight" in sample_edge[2]: return True return False def compute_climate_metrics(G: nx.MultiDiGraph, route: list) -> dict: """ Compute climate risk metrics for a route. Returns dict with: - avg_flood_risk: Average flood risk along route (0-1) - max_flood_risk: Maximum flood risk encountered - avg_heat_risk: Average heat risk along route (0-1) - max_heat_risk: Maximum heat risk encountered - avg_climate_risk: Combined climate risk (0-1) - flood_exposure_m: Meters of route in high flood risk areas (>0.3) """ if not route or len(route) < 2: return { "avg_flood_risk": 0, "max_flood_risk": 0, "avg_heat_risk": 0, "max_heat_risk": 0, "avg_climate_risk": 0, "flood_exposure_m": 0 } flood_risks = [] heat_risks = [] climate_risks = [] flood_exposure = 0.0 for i in range(len(route) - 1): u, v = route[i], route[i + 1] edge_data = G.get_edge_data(u, v) if not edge_data: continue # Get first edge (MultiDiGraph may have multiple) if isinstance(edge_data, dict) and 0 in edge_data: data = edge_data[0] else: data = next(iter(edge_data.values())) if isinstance(edge_data, dict) else edge_data flood = float(data.get("flood_risk", 0) or 0) heat = float(data.get("heat_risk", 0) or 0) climate = float(data.get("climate_risk", 0) or 0) length = float(data.get("length", 0) or 0) flood_risks.append(flood) heat_risks.append(heat) climate_risks.append(climate) if flood > 0.3: flood_exposure += length if not climate_risks: return { "avg_flood_risk": 0, "max_flood_risk": 0, "avg_heat_risk": 0, "max_heat_risk": 0, "avg_climate_risk": 0, "flood_exposure_m": 0 } return { "avg_flood_risk": round(sum(flood_risks) / len(flood_risks), 3), "max_flood_risk": round(max(flood_risks), 3), "avg_heat_risk": round(sum(heat_risks) / len(heat_risks), 3), "max_heat_risk": round(max(heat_risks), 3), "avg_climate_risk": round(sum(climate_risks) / len(climate_risks), 3), "flood_exposure_m": round(flood_exposure, 1) } def apply_elevation_weights(G: nx.MultiDiGraph, penalty_factor: float = ELEVATION_PENALTY_FACTOR) -> nx.MultiDiGraph: """ Create a copy of the graph with edge weights adjusted for elevation. Penalizes uphill segments to find routes that minimize climbing. """ G_weighted = G.copy() for u, v, key, data in G_weighted.edges(keys=True, data=True): base_length = data.get("length", 1) # Get elevation change elev_u = G_weighted.nodes[u].get("elevation", 0) or 0 elev_v = G_weighted.nodes[v].get("elevation", 0) or 0 elev_diff = elev_v - elev_u # Only penalize uphill (positive elevation change) if elev_diff > 0: # Penalty proportional to climb: each meter of climb adds penalty_factor meters equivalent penalty = elev_diff * penalty_factor data["weighted_length"] = base_length + penalty else: data["weighted_length"] = base_length return G_weighted def list_resources( resources_df: pd.DataFrame, category: str = "", resource_type: str = "" ) -> dict[str, Any]: """List available resources with optional filtering.""" if resources_df is None: return {"error": "Resources not loaded"} df = resources_df.copy() # Ensure category and resource_type are strings (LLM might pass lists or other types) if isinstance(category, list): category = category[0] if category else "" if isinstance(resource_type, list): resource_type = resource_type[0] if resource_type else "" category = str(category) if category else "" resource_type = str(resource_type) if resource_type else "" if category: df = df[df["category"] == category] if resource_type: df = df[df["type"] == resource_type] # Group by type summary = df.groupby("type").agg({ "name": "count", "category": "first" }).rename(columns={"name": "count"}).to_dict("index") # List of resources resources = df[["name", "type", "category", "lat", "lon"]].to_dict("records") return { "total_count": len(df), "by_type": summary, "resources": resources[:20] # Limit to 20 for display } def find_nearest( G: nx.MultiDiGraph, resources_df: pd.DataFrame, resource_type: str, origin_lat: float = 40.6594, origin_lon: float = -73.9126, prefer_flat: bool = True, prefer_safe: bool = True ) -> tuple[dict[str, Any], dict | None]: """ Find the nearest resource of a given type with climate and elevation-aware routing. Args: G: Walking network graph resources_df: DataFrame of resources resource_type: Type of resource to find origin_lat: Origin latitude origin_lon: Origin longitude prefer_flat: If True, prefer routes with less elevation gain prefer_safe: If True and climate data available, prefer climate-safe routes """ if resources_df is None: return {"error": "Resources not loaded"}, None # Ensure resource_type is a string if isinstance(resource_type, list): resource_type = resource_type[0] if resource_type else "" resource_type = str(resource_type) if resource_type else "" if not resource_type: return {"error": "resource_type is required"}, None # Filter by type candidates = resources_df[resources_df["type"] == resource_type] if len(candidates) == 0: return {"error": f"No resources of type '{resource_type}' found"}, None # Get origin node try: origin_node = get_nearest_node(G, origin_lat, origin_lon) except Exception as e: return {"error": f"Could not find origin on network: {e}"}, None # Choose routing strategy based on climate data availability graph_has_climate = has_climate_data(G) if prefer_safe and graph_has_climate: # Use climate-aware routing (uses pre-computed climate_weight) G_routing = G weight_key = "climate_weight" elif prefer_flat: # Fall back to elevation-aware routing G_routing = apply_elevation_weights(G) weight_key = "weighted_length" else: G_routing = G weight_key = "length" # Find nearest best_resource = None best_distance = float("inf") best_route = None best_actual_distance = 0 for _, row in candidates.iterrows(): try: dest_node = get_nearest_node(G, row["lat"], row["lon"]) # Use weighted distance for comparison weighted_distance = nx.shortest_path_length(G_routing, origin_node, dest_node, weight=weight_key) if weighted_distance < best_distance: best_distance = weighted_distance best_resource = row.to_dict() best_route = nx.shortest_path(G_routing, origin_node, dest_node, weight=weight_key) # Calculate actual distance (unweighted) best_actual_distance = nx.shortest_path_length(G, origin_node, dest_node, weight="length") except nx.NetworkXNoPath: continue except Exception: continue if best_resource is None: return {"error": f"Could not find a route to any {resource_type}"}, None walk_time = best_actual_distance / WALK_SPEED_M_PER_MIN # Compute route metrics route_metrics = compute_route_metrics(G, best_route) # Build map data (convert projected coords to lat/lon) route_coords = get_route_coords(G, best_route) map_data = { "route_coords": route_coords, "origin": [origin_lat, origin_lon], "destination": [best_resource["lat"], best_resource["lon"]], "dest_name": best_resource["name"], "distance": best_actual_distance } result = { "found": True, "name": best_resource["name"], "type": best_resource["type"], "category": best_resource["category"], "lat": best_resource["lat"], "lon": best_resource["lon"], "distance_meters": round(best_actual_distance, 1), "walking_time_minutes": round(walk_time, 1), "origin": {"lat": origin_lat, "lon": origin_lon}, "route_metrics": route_metrics } # Add climate metrics if available if graph_has_climate: result["climate_metrics"] = compute_climate_metrics(G, best_route) result["climate_aware"] = prefer_safe return result, map_data def compute_single_route( G: nx.MultiDiGraph, origin_node: int, dest_node: int, weight_key: str = "length" ) -> dict | None: """Compute a single route and its metrics.""" try: route = nx.shortest_path(G, origin_node, dest_node, weight=weight_key) # Always compute actual distance using length distance = sum( G[u][v][0].get("length", 0) for u, v in zip(route[:-1], route[1:]) ) walk_time = distance / WALK_SPEED_M_PER_MIN route_metrics = compute_route_metrics(G, route) route_coords = get_route_coords(G, route) return { "route": route, "coords": route_coords, "distance_m": round(distance, 1), "time_min": round(walk_time, 1), "metrics": route_metrics } except nx.NetworkXNoPath: return None except Exception: return None def compute_alternative_routes( G: nx.MultiDiGraph, origin_lat: float, origin_lon: float, dest_lat: float, dest_lon: float ) -> list[dict]: """ Compute multiple route alternatives with different optimization criteria. Returns list of route options: - shortest: Minimum distance - flattest: Minimum elevation gain (penalizes uphill) - balanced: Compromise between distance and elevation - safest: Minimum climate risk (flood + heat) if climate data available """ origin_node = get_nearest_node(G, origin_lat, origin_lon) dest_node = get_nearest_node(G, dest_lat, dest_lon) routes = [] graph_has_climate = has_climate_data(G) # 1. Shortest route (distance only) shortest = compute_single_route(G, origin_node, dest_node, weight_key="length") if shortest: shortest["name"] = "shortest" shortest["label"] = "Shortest" shortest["color"] = "#3b82f6" # Blue if graph_has_climate: shortest["climate_metrics"] = compute_climate_metrics(G, shortest["route"]) routes.append(shortest) # 2. Flattest route (heavy elevation penalty) G_flat = apply_elevation_weights(G, penalty_factor=5.0) flattest = compute_single_route(G_flat, origin_node, dest_node, weight_key="weighted_length") if flattest: flattest["name"] = "flattest" flattest["label"] = "Flattest" flattest["color"] = "#22c55e" # Green if graph_has_climate: flattest["climate_metrics"] = compute_climate_metrics(G, flattest["route"]) # Check if it's actually different from shortest if not shortest or flattest["coords"] != shortest["coords"]: routes.append(flattest) # 3. Balanced route (moderate elevation penalty) G_balanced = apply_elevation_weights(G, penalty_factor=2.0) balanced = compute_single_route(G_balanced, origin_node, dest_node, weight_key="weighted_length") if balanced: balanced["name"] = "balanced" balanced["label"] = "Balanced" balanced["color"] = "#f59e0b" # Amber if graph_has_climate: balanced["climate_metrics"] = compute_climate_metrics(G, balanced["route"]) # Check if it's different from both shortest and flattest existing_coords = [r["coords"] for r in routes] if balanced["coords"] not in existing_coords: routes.append(balanced) # 4. Safest route (climate-aware) - only if climate data available if graph_has_climate: safest = compute_single_route(G, origin_node, dest_node, weight_key="climate_weight") if safest: safest["name"] = "safest" safest["label"] = "Safest (Climate)" safest["color"] = "#10b981" # Emerald safest["climate_metrics"] = compute_climate_metrics(G, safest["route"]) # Check if it's different from existing routes existing_coords = [r["coords"] for r in routes] if safest["coords"] not in existing_coords: routes.append(safest) return routes def calculate_route( G: nx.MultiDiGraph, origin_lat: float, origin_lon: float, dest_lat: float, dest_lon: float, dest_name: str = "Destination", prefer_flat: bool = True, prefer_safe: bool = True ) -> tuple[dict[str, Any], dict | None]: """ Calculate multiple walking routes between two points with different criteria. Args: G: Walking network graph origin_lat, origin_lon: Origin coordinates dest_lat, dest_lon: Destination coordinates dest_name: Name of destination prefer_flat: Prefer routes with less elevation gain prefer_safe: If True and climate data available, recommend safest route """ try: # Compute all route alternatives alternatives = compute_alternative_routes(G, origin_lat, origin_lon, dest_lat, dest_lon) if not alternatives: return {"error": "No path found between origin and destination"}, None # Check if climate data is available graph_has_climate = has_climate_data(G) # Pick recommended route based on preferences if prefer_safe and graph_has_climate: recommended = next((r for r in alternatives if r["name"] == "safest"), alternatives[0]) elif prefer_flat: recommended = next((r for r in alternatives if r["name"] == "flattest"), alternatives[0]) else: recommended = next((r for r in alternatives if r["name"] == "shortest"), alternatives[0]) # Build map data with all routes map_data = { "routes": [ { "coords": r["coords"], "color": r["color"], "label": r["label"], "name": r["name"] } for r in alternatives ], "origin": [origin_lat, origin_lon], "destination": [dest_lat, dest_lon], "dest_name": dest_name, "distance": recommended["distance_m"], # Keep route_coords for backwards compatibility "route_coords": recommended["coords"] } # Build alternatives with climate metrics if available alt_list = [] for r in alternatives: alt_info = { "name": r["name"], "label": r["label"], "distance_meters": r["distance_m"], "walking_time_minutes": r["time_min"], "route_metrics": r["metrics"] } if graph_has_climate and "climate_metrics" in r: alt_info["climate_metrics"] = r["climate_metrics"] alt_list.append(alt_info) # Build result result = { "success": True, "recommended": recommended["name"], "alternatives": alt_list, "distance_meters": recommended["distance_m"], "walking_time_minutes": recommended["time_min"], "route_points": len(recommended["route"]), "origin": {"lat": origin_lat, "lon": origin_lon}, "destination": {"lat": dest_lat, "lon": dest_lon, "name": dest_name}, "route_metrics": recommended["metrics"] } # Add climate metrics to result if available if graph_has_climate: result["climate_aware"] = True if "climate_metrics" in recommended: result["climate_metrics"] = recommended["climate_metrics"] return result, map_data except nx.NetworkXNoPath: return {"error": "No path found between origin and destination"}, None except Exception as e: return {"error": f"Route calculation failed: {e}"}, None def _safe_str(val, default: str = "") -> str: """Safely convert a value to string, handling lists.""" if val is None: return default if isinstance(val, list): return str(val[0]) if val else default return str(val) def _safe_float(val, default: float) -> float: """Safely convert a value to float.""" if val is None: return default if isinstance(val, list): val = val[0] if val else default try: return float(val) except (ValueError, TypeError): return default def execute_tool( tool_name: str, args: dict, G: nx.MultiDiGraph, resources_df: pd.DataFrame ) -> tuple[dict[str, Any], dict | None]: """Execute a tool by name with given arguments.""" if tool_name == "list_resources": result = list_resources( resources_df, category=_safe_str(args.get("category"), ""), resource_type=_safe_str(args.get("resource_type"), "") ) return result, None elif tool_name == "find_nearest": # Accept both "lat"/"lon" (from system prompt) and "origin_lat"/"origin_lon" lat = args.get("lat") or args.get("origin_lat") lon = args.get("lon") or args.get("origin_lon") return find_nearest( G, resources_df, resource_type=_safe_str(args.get("resource_type"), ""), origin_lat=_safe_float(lat, BROWNSVILLE_CENTER["lat"]), origin_lon=_safe_float(lon, BROWNSVILLE_CENTER["lon"]) ) elif tool_name == "calculate_route": return calculate_route( G, origin_lat=_safe_float(args.get("start_lat") or args.get("origin_lat"), BROWNSVILLE_CENTER["lat"]), origin_lon=_safe_float(args.get("start_lon") or args.get("origin_lon"), BROWNSVILLE_CENTER["lon"]), dest_lat=_safe_float(args.get("end_lat") or args.get("dest_lat"), BROWNSVILLE_CENTER["lat"]), dest_lon=_safe_float(args.get("end_lon") or args.get("dest_lon"), BROWNSVILLE_CENTER["lon"]), dest_name=_safe_str(args.get("dest_name"), "Destination") ) else: return {"error": f"Unknown tool: {tool_name}"}, None