our-era / core /tools.py
msradam's picture
Add full deployment package for HF Spaces
1a5769f
"""
Routing tools for the Emergency Routing Assistant.
Uses dream-meridian pattern:
1. Geocode place names BEFORE sending to LLM
2. LLM outputs simple JSON tool call
3. Execute tool and return result
"""
import os
import re
import networkx as nx
import pandas as pd
import geopandas as gpd
import osmnx as ox
import requests
from typing import Any
# Walking speed: ~4.5 km/h = 75 m/min
# Route lengths in metres are divided by this value to report walking minutes.
WALK_SPEED_M_PER_MIN = 75
# Elevation penalty factor for routing (higher = more avoidance of elevation gain)
# Default for apply_elevation_weights(): each metre climbed along an edge adds
# this many metres to that edge's routing cost.
ELEVATION_PENALTY_FACTOR = 3.0
# Brownsville center and bounds
# The center is the default coordinate used by execute_tool() when a tool call
# omits a latitude/longitude.
BROWNSVILLE_CENTER = {"lat": 40.6594, "lon": -73.9126}
# The bounds feed the Nominatim viewbox and the "is this result local?" check
# in geocode_nominatim().
BROWNSVILLE_BOUNDS = {
"min_lat": 40.64,
"max_lat": 40.68,
"min_lon": -73.93,
"max_lon": -73.89
}
# Known places - loaded from data/brownsville/places.csv
# These are matched BEFORE sending query to LLM
# Maps a lowercase place name to {"lat": ..., "lon": ..., "name": ...};
# populated in place by load_known_places().
KNOWN_PLACES = {}
def load_known_places():
    """Populate ``KNOWN_PLACES`` from ``data/brownsville/places.csv``.

    The CSV is looked up one directory above this module. Each row is stored
    under its ``name_lower`` value as a dict with ``lat``, ``lon`` and
    ``name``. Problems are reported with a printed warning; nothing is raised
    and nothing is returned.
    """
    global KNOWN_PLACES

    # The data directory lives in the project root, one level up from core/.
    module_dir = os.path.dirname(__file__)
    places_path = os.path.join(module_dir, "..", "data", "brownsville", "places.csv")

    if not os.path.exists(places_path):
        print(f"Warning: places.csv not found at {places_path}")
        return

    try:
        table = pd.read_csv(places_path)
        for _, record in table.iterrows():
            key = record['name_lower']
            KNOWN_PLACES[key] = {
                "lat": record['lat'],
                "lon": record['lon'],
                "name": record['name'],
            }
        print(f"Loaded {len(KNOWN_PLACES)} places for geocoding")
    except Exception as exc:
        # Best effort: a broken CSV must not prevent the module from importing.
        print(f"Warning: Could not load places: {exc}")
# Load places on module import
# (import-time side effect: reads places.csv and prints a status line).
load_known_places()
# Intersection patterns to match
# Captures two street names (each one or two words) joined by "and", "&" or
# "at", case-insensitively, e.g. "Pitkin Ave and Rockaway Ave".
# NOTE(review): not referenced anywhere in the visible part of this file.
INTERSECTION_PATTERN = re.compile(
r"(\w+(?:\s+\w+)?)\s+(?:and|&|at)\s+(\w+(?:\s+\w+)?)",
re.IGNORECASE
)
# ============================================================================
# Geocoding (runs BEFORE LLM)
# ============================================================================
def find_place_in_query(query: str) -> list[tuple[str, dict]]:
    """
    Find known place names in a query.

    Names from ``KNOWN_PLACES`` are tried longest first, so a long name wins
    over any shorter name it contains; spans already claimed by an earlier
    match are never reused.

    Args:
        query: Free-text user query.

    Returns:
        List of ``(matched_text, place_info)`` tuples, in longest-name-first
        order (not query order). ``matched_text`` is the exact slice of
        ``query`` that matched, with its original casing.
    """
    if not query:
        return []

    # Ignore keys that cannot be matched (e.g. NaN from an empty CSV cell or
    # a blank string), which would otherwise break sorting or match everywhere.
    usable = [
        (name, info)
        for name, info in KNOWN_PLACES.items()
        if isinstance(name, str) and name.strip()
    ]
    # Sort by name length (longest first) for greedy matching
    usable.sort(key=lambda item: -len(item[0]))

    matches = []
    used_spans = []
    for name_lower, info in usable:
        # Lookarounds instead of \b: they behave like word boundaries for
        # names that start/end with a letter or digit, and still work for
        # names that start or end with punctuation (e.g. "p.s.").
        pattern = re.compile(
            r"(?<!\w)" + re.escape(name_lower) + r"(?!\w)",
            re.IGNORECASE,
        )
        # Match against the original text (case-insensitively) so the spans
        # index into ``query`` itself; str.lower() can change string length.
        for match in pattern.finditer(query):
            start, end = match.span()
            overlaps = any(
                start < used_end and end > used_start
                for used_start, used_end in used_spans
            )
            if overlaps:
                continue
            matches.append((query[start:end], info))
            used_spans.append((start, end))
    return matches
def geocode_nominatim(place_name: str) -> dict | None:
"""Fallback: geocode using Nominatim API."""
try:
search_query = f"{place_name}, Brownsville, Brooklyn, NY"
url = "https://nominatim.openstreetmap.org/search"
params = {
"q": search_query,
"format": "json",
"limit": 3,
"viewbox": f"{BROWNSVILLE_BOUNDS['min_lon']},{BROWNSVILLE_BOUNDS['max_lat']},{BROWNSVILLE_BOUNDS['max_lon']},{BROWNSVILLE_BOUNDS['min_lat']}",
"bounded": 0
}
headers = {"User-Agent": "BrownsvilleEmergencyApp/1.0"}
response = requests.get(url, params=params, headers=headers, timeout=5)
results = response.json()
if results:
# Find result nearest to Brownsville
for r in results:
lat, lon = float(r["lat"]), float(r["lon"])
if (BROWNSVILLE_BOUNDS["min_lat"] - 0.02 <= lat <= BROWNSVILLE_BOUNDS["max_lat"] + 0.02 and
BROWNSVILLE_BOUNDS["min_lon"] - 0.02 <= lon <= BROWNSVILLE_BOUNDS["max_lon"] + 0.02):
return {"name": place_name, "lat": lat, "lon": lon}
# Fallback to first result
return {"name": place_name, "lat": float(results[0]["lat"]), "lon": float(results[0]["lon"])}
except Exception:
pass
return None
def geocode_query(query: str, resources_df: pd.DataFrame | None = None) -> tuple[str, dict]:
    """
    Process query to resolve place names to coordinates BEFORE sending to LLM.

    Known places (see ``find_place_in_query``) are substituted first. Only if
    none is found, the first "near X" / "to X" / "at X" phrase that Nominatim
    can resolve is substituted instead.

    Args:
        query: Free-text user query.
        resources_df: Unused; kept for interface compatibility.

    Returns:
        tuple: (modified_query, geocode_info)
        - modified_query: Query with place names replaced by coordinates
        - geocode_info: Dict of resolved places keyed by place name
    """
    geocode_info = {}
    modified = query

    # First try known places
    matches = find_place_in_query(query)
    for original_text, info in matches:
        geocode_info[info["name"]] = {
            "lat": info["lat"],
            "lon": info["lon"],
            "name": info["name"]
        }
        coords_text = f"(lat {info['lat']:.6f}, lon {info['lon']:.6f})"
        escaped = re.escape(original_text)
        # Prefer a whole-word occurrence so that e.g. "Park" is not replaced
        # inside "Parkway"; the callable keeps the replacement literal.
        bounded = re.compile(r"(?<!\w)" + escaped + r"(?!\w)", re.IGNORECASE)
        modified, replaced = bounded.subn(
            lambda _m, text=coords_text: text, modified, count=1
        )
        if not replaced:
            # The match was not a whole word under this definition; fall back
            # to a plain first-occurrence substitution.
            modified = re.sub(
                escaped,
                lambda _m, text=coords_text: text,
                modified,
                count=1,
                flags=re.IGNORECASE,
            )

    if matches:
        return modified, geocode_info

    # If no matches, try to find location phrases and geocode them.
    # Resource types are not places, so phrases like "to pharmacy" are skipped.
    resource_words = {
        "pharmacy", "clinic", "hospital", "school", "library",
        "fire station", "police",
    }
    for keyword in ("near", "to", "at"):
        # \b stops the keyword from matching inside another word
        # ("at" in "what", "to" in "photo").
        pattern = (
            r"\b" + keyword
            + r"\s+([A-Za-z][A-Za-z\s]+?)(?:\s+(?:and|&)\s+|\s*$)"
        )
        match = re.search(pattern, query, re.IGNORECASE)
        if not match:
            continue
        place_name = match.group(1).strip()
        if place_name.lower() in resource_words:
            continue
        result = geocode_nominatim(place_name)
        if result:
            geocode_info[result["name"]] = result
            # Replace exactly the matched span (not every textual occurrence).
            start, end = match.span()
            modified = (
                query[:start]
                + f"near (lat {result['lat']:.6f}, lon {result['lon']:.6f}) "
                + query[end:]
            )
            break
    return modified, geocode_info
def _convert_climate_attrs_to_float(G: nx.MultiDiGraph) -> None:
    """Turn string-valued climate edge attributes into floats, in place.

    GraphML stores every attribute as a string. Values that cannot be parsed
    become ``0.0``; attributes that are absent or already non-string are left
    untouched.
    """
    numeric_keys = ('flood_risk', 'heat_risk', 'climate_risk', 'climate_weight')
    for _, _, attrs in G.edges(data=True):
        for key in numeric_keys:
            raw = attrs.get(key)
            if not isinstance(raw, str):
                continue
            try:
                attrs[key] = float(raw)
            except (ValueError, TypeError):
                attrs[key] = 0.0
def load_network_and_resources() -> tuple[nx.MultiDiGraph | None, pd.DataFrame | None, tuple[float, float]]:
    """Load the walking network and resources from saved files.

    Data files are searched in ``<module dir>/data/brownsville`` first and
    then in ``<module dir>/../data/brownsville`` (the location
    ``load_known_places`` reads ``places.csv`` from). If no saved GraphML
    file is found, the walking network is downloaded from OSM for the
    Brownsville bounding box.

    Returns:
        ``(graph, resources_df, center)`` where ``graph`` is the projected
        walking network, ``resources_df`` is the resource table (``None`` if
        neither the CSV nor the GeoJSON exists) and ``center`` is the
        ``(lat, lon)`` of Brownsville. On any failure a message is printed
        and ``(None, None, center)`` is returned.
    """
    module_dir = os.path.dirname(__file__)
    search_dirs = [
        os.path.join(module_dir, "data", "brownsville"),
        os.path.join(module_dir, "..", "data", "brownsville"),
    ]

    def _locate(filename):
        """Return the first existing path for ``filename``, else ``None``."""
        for directory in search_dirs:
            candidate = os.path.join(directory, filename)
            if os.path.exists(candidate):
                return candidate
        return None

    # Default center
    center = (BROWNSVILLE_CENTER["lat"], BROWNSVILLE_CENTER["lon"])
    try:
        # Load walking network
        graphml_path = _locate("walking_network_final.graphml")
        if graphml_path is not None:
            G_walk = ox.load_graphml(graphml_path)
        else:
            # Fallback: load from OSM directly
            from shapely.geometry import box
            brownsville_bbox = box(
                BROWNSVILLE_BOUNDS["min_lon"],
                BROWNSVILLE_BOUNDS["min_lat"],
                BROWNSVILLE_BOUNDS["max_lon"],
                BROWNSVILLE_BOUNDS["max_lat"],
            )
            G_walk = ox.graph_from_polygon(brownsville_bbox, network_type='walk', simplify=True)

        # Convert climate attributes from strings to floats
        _convert_climate_attrs_to_float(G_walk)
        # Project the graph to enable fast nearest_nodes lookup without scikit-learn
        G_walk = ox.project_graph(G_walk)

        # Load resources: CSV preferred, GeoJSON as a fallback
        resources_df = None
        resources_path = _locate("all_resources.csv")
        if resources_path is not None:
            resources_df = pd.read_csv(resources_path)
        else:
            geojson_path = _locate("all_resources.geojson")
            if geojson_path is not None:
                gdf = gpd.read_file(geojson_path)
                resources_df = pd.DataFrame({
                    "name": gdf["name"],
                    "type": gdf["type"],
                    "category": gdf["category"],
                    "lat": gdf.geometry.y,
                    "lon": gdf.geometry.x
                })
        return G_walk, resources_df, center
    except Exception as e:
        # Top-level loader boundary: report and let the caller handle None.
        print(f"Error loading data: {e}")
        return None, None, center
def get_nearest_node(G: nx.MultiDiGraph, lat: float, lon: float) -> int:
    """Return the network node closest to a WGS84 point.

    When the graph declares a CRS other than "EPSG:4326", the point is first
    transformed into that CRS so it is comparable with the node coordinates.
    """
    is_projected = "crs" in G.graph and G.graph["crs"] != "EPSG:4326"
    if not is_projected:
        return ox.nearest_nodes(G, lon, lat)

    import pyproj
    # always_xy=True fixes the axis order to (lon, lat) / (x, y).
    to_graph_crs = pyproj.Transformer.from_crs("EPSG:4326", G.graph["crs"], always_xy=True)
    easting, northing = to_graph_crs.transform(lon, lat)
    return ox.nearest_nodes(G, easting, northing)
def get_route_coords(G: nx.MultiDiGraph, route: list) -> list[tuple[float, float]]:
    """Return ``(lat, lon)`` pairs for the nodes of a route.

    Node ``x``/``y`` values are transformed back to WGS84 when the graph
    declares a CRS other than "EPSG:4326"; otherwise they are used directly.
    """
    is_projected = "crs" in G.graph and G.graph["crs"] != "EPSG:4326"
    if not is_projected:
        return [(G.nodes[n]["y"], G.nodes[n]["x"]) for n in route]

    import pyproj
    to_wgs84 = pyproj.Transformer.from_crs(G.graph["crs"], "EPSG:4326", always_xy=True)
    latlon = []
    for n in route:
        attrs = G.nodes[n]
        # always_xy=True means the transformer yields (lon, lat).
        lon, lat = to_wgs84.transform(attrs["x"], attrs["y"])
        latlon.append((lat, lon))
    return latlon
def compute_route_metrics(G: nx.MultiDiGraph, route: list) -> dict:
    """
    Summarise the elevation profile of a route.

    Node ``elevation`` (missing or ``None`` counts as 0) drives gain, loss,
    min and max; the ``grade`` of the first edge between consecutive nodes
    drives the grade statistics.

    Returns dict with:
    - elevation_gain_m: Total meters climbed
    - elevation_loss_m: Total meters descended
    - max_elevation_m: Highest point on route
    - min_elevation_m: Lowest point on route
    - avg_grade_pct: Average slope percentage
    - max_grade_pct: Steepest segment
    - difficulty: "flat", "moderate", or "hilly"
    """
    if not route or len(route) < 2:
        # Nothing to walk: report an all-zero, flat profile.
        empty = {
            key: 0
            for key in (
                "elevation_gain_m", "elevation_loss_m",
                "max_elevation_m", "min_elevation_m",
                "avg_grade_pct", "max_grade_pct",
            )
        }
        empty["difficulty"] = "flat"
        return empty

    raw_heights = [G.nodes[n].get("elevation", 0) for n in route]
    heights = [0 if h is None else h for h in raw_heights]

    climbed = 0
    descended = 0
    slopes = []
    for tail, head, h_from, h_to in zip(route, route[1:], heights, heights[1:]):
        delta = h_to - h_from
        if delta > 0:
            climbed += delta
        else:
            descended += abs(delta)

        link = G.get_edge_data(tail, head)
        if link:
            # A MultiDiGraph returns {key: attrs}; use the first parallel edge.
            attrs = next(iter(link.values())) if isinstance(link, dict) else link
            slopes.append(abs(attrs.get("grade", 0)) * 100)  # fraction -> percent

    highest = max(heights)
    lowest = min(heights)
    mean_slope = sum(slopes) / len(slopes) if slopes else 0
    steepest = max(slopes) if slopes else 0

    if climbed < 5 and steepest < 3:
        rating = "flat"
    elif climbed < 15 or steepest < 8:
        rating = "moderate"
    else:
        rating = "hilly"

    return {
        "elevation_gain_m": round(climbed, 1),
        "elevation_loss_m": round(descended, 1),
        "max_elevation_m": round(highest, 1),
        "min_elevation_m": round(lowest, 1),
        "avg_grade_pct": round(mean_slope, 1),
        "max_grade_pct": round(steepest, 1),
        "difficulty": rating
    }
def has_climate_data(G: nx.MultiDiGraph) -> bool:
    """Report whether the graph carries climate data.

    Only the first edge is inspected: the result is whether that edge has a
    ``climate_weight`` attribute. A graph without edges yields ``False``.
    """
    for _, _, attrs in G.edges(data=True):
        return "climate_weight" in attrs
    return False
def compute_climate_metrics(G: nx.MultiDiGraph, route: list) -> dict:
    """
    Summarise climate risk along a route.

    For each consecutive node pair the edge with key 0 is read (or the first
    parallel edge when key 0 is absent); missing or falsy values count as 0.

    Returns dict with:
    - avg_flood_risk: Average flood risk along route (0-1)
    - max_flood_risk: Maximum flood risk encountered
    - avg_heat_risk: Average heat risk along route (0-1)
    - max_heat_risk: Maximum heat risk encountered
    - avg_climate_risk: Combined climate risk (0-1)
    - flood_exposure_m: Meters of route in high flood risk areas (>0.3)
    """
    def _no_data():
        return {
            "avg_flood_risk": 0, "max_flood_risk": 0,
            "avg_heat_risk": 0, "max_heat_risk": 0,
            "avg_climate_risk": 0, "flood_exposure_m": 0
        }

    def _mean(values):
        return round(sum(values) / len(values), 3)

    if not route or len(route) < 2:
        return _no_data()

    floods, heats, combined = [], [], []
    exposed_m = 0.0
    for tail, head in zip(route, route[1:]):
        bundle = G.get_edge_data(tail, head)
        if not bundle:
            continue

        if isinstance(bundle, dict):
            attrs = bundle[0] if 0 in bundle else next(iter(bundle.values()))
        else:
            attrs = bundle

        def _number(key):
            return float(attrs.get(key, 0) or 0)

        flood = _number("flood_risk")
        heat = _number("heat_risk")
        climate = _number("climate_risk")
        length = _number("length")

        floods.append(flood)
        heats.append(heat)
        combined.append(climate)
        if flood > 0.3:
            exposed_m += length

    if not combined:
        # None of the hops had edge data.
        return _no_data()

    return {
        "avg_flood_risk": _mean(floods),
        "max_flood_risk": round(max(floods), 3),
        "avg_heat_risk": _mean(heats),
        "max_heat_risk": round(max(heats), 3),
        "avg_climate_risk": _mean(combined),
        "flood_exposure_m": round(exposed_m, 1)
    }
def apply_elevation_weights(G: nx.MultiDiGraph, penalty_factor: float = ELEVATION_PENALTY_FACTOR) -> nx.MultiDiGraph:
    """
    Return a copy of the graph whose edges carry a ``weighted_length``.

    Uphill edges cost their ``length`` plus ``penalty_factor`` metres for
    every metre climbed; flat and downhill edges cost just their ``length``.
    A missing ``length`` counts as 1 and a missing or ``None`` node
    ``elevation`` as 0. The input graph is not modified.
    """
    weighted = G.copy()
    node_attrs = weighted.nodes
    for tail, head, _key, attrs in weighted.edges(keys=True, data=True):
        length = attrs.get("length", 1)
        start_height = node_attrs[tail].get("elevation", 0) or 0
        end_height = node_attrs[head].get("elevation", 0) or 0
        climb = end_height - start_height
        # Only a positive elevation change is penalised.
        attrs["weighted_length"] = (
            length + climb * penalty_factor if climb > 0 else length
        )
    return weighted
def list_resources(
    resources_df: pd.DataFrame,
    category: str = "",
    resource_type: str = ""
) -> dict[str, Any]:
    """Summarise the available resources, optionally filtered.

    ``category`` and ``resource_type`` may arrive as lists (the LLM sometimes
    sends them that way); the first element is used. Empty or falsy filters
    are ignored.

    Returns ``{"error": ...}`` when no table is loaded, otherwise a dict with
    ``total_count``, a per-type ``by_type`` summary and up to 20 ``resources``
    records.
    """
    if resources_df is None:
        return {"error": "Resources not loaded"}

    def _as_text(value):
        # Unwrap a list to its first element, then stringify truthy values.
        if isinstance(value, list):
            value = value[0] if value else ""
        return str(value) if value else ""

    selected = resources_df.copy()
    wanted_category = _as_text(category)
    wanted_type = _as_text(resource_type)

    if wanted_category:
        selected = selected[selected["category"] == wanted_category]
    if wanted_type:
        selected = selected[selected["type"] == wanted_type]

    per_type = selected.groupby("type").agg({
        "name": "count",
        "category": "first"
    })
    per_type = per_type.rename(columns={"name": "count"})

    records = selected[["name", "type", "category", "lat", "lon"]].to_dict("records")

    return {
        "total_count": len(selected),
        "by_type": per_type.to_dict("index"),
        "resources": records[:20]  # Limit to 20 for display
    }
def _route_length_m(G: nx.MultiDiGraph, route: list, weight_key: str) -> float:
    """Return the physical length in metres of ``route``.

    Between each consecutive node pair the parallel edge with the smallest
    ``weight_key`` value is used (the one a weighted shortest-path search
    picks), and its ``length`` is added; a missing length counts as 0.
    """
    total = 0.0
    for u, v in zip(route, route[1:]):
        chosen = min(G[u][v].values(), key=lambda attrs: attrs.get(weight_key, 1))
        total += float(chosen.get("length", 0) or 0)
    return total


def find_nearest(
    G: nx.MultiDiGraph,
    resources_df: pd.DataFrame,
    resource_type: str,
    origin_lat: float = 40.6594,
    origin_lon: float = -73.9126,
    prefer_flat: bool = True,
    prefer_safe: bool = True
) -> tuple[dict[str, Any], dict | None]:
    """
    Find the nearest resource of a given type with climate and elevation-aware routing.

    "Nearest" is judged by routing cost: ``climate_weight`` when
    ``prefer_safe`` is set and the graph has climate data, otherwise the
    elevation-penalised length when ``prefer_flat`` is set, otherwise plain
    length. The reported distance and walking time are measured along the
    route that is actually returned.

    Args:
        G: Walking network graph
        resources_df: DataFrame of resources
        resource_type: Type of resource to find
        origin_lat: Origin latitude
        origin_lon: Origin longitude
        prefer_flat: If True, prefer routes with less elevation gain
        prefer_safe: If True and climate data available, prefer climate-safe routes

    Returns:
        ``(result, map_data)``. On failure ``result`` is ``{"error": ...}``
        and ``map_data`` is ``None``.
    """
    if resources_df is None:
        return {"error": "Resources not loaded"}, None

    # Ensure resource_type is a string (the LLM may pass a list)
    if isinstance(resource_type, list):
        resource_type = resource_type[0] if resource_type else ""
    resource_type = str(resource_type) if resource_type else ""
    if not resource_type:
        return {"error": "resource_type is required"}, None

    # Filter by type
    candidates = resources_df[resources_df["type"] == resource_type]
    if len(candidates) == 0:
        return {"error": f"No resources of type '{resource_type}' found"}, None

    # Get origin node
    try:
        origin_node = get_nearest_node(G, origin_lat, origin_lon)
    except Exception as e:
        return {"error": f"Could not find origin on network: {e}"}, None

    # Choose routing strategy based on climate data availability
    graph_has_climate = has_climate_data(G)
    if prefer_safe and graph_has_climate:
        # Use climate-aware routing (uses pre-computed climate_weight)
        G_routing = G
        weight_key = "climate_weight"
    elif prefer_flat:
        # Fall back to elevation-aware routing
        G_routing = apply_elevation_weights(G)
        weight_key = "weighted_length"
    else:
        G_routing = G
        weight_key = "length"

    no_route = {"error": f"Could not find a route to any {resource_type}"}

    # One search from the origin yields the cost and path to every reachable
    # node, instead of a separate search per candidate.
    try:
        costs, paths = nx.single_source_dijkstra(G_routing, origin_node, weight=weight_key)
    except (nx.NetworkXException, TypeError, ValueError):
        return no_route, None

    best_resource = None
    best_cost = float("inf")
    best_node = None
    for _, row in candidates.iterrows():
        try:
            dest_node = get_nearest_node(G, row["lat"], row["lon"])
        except Exception:
            # Best effort: skip rows whose coordinates cannot be snapped.
            continue
        cost = costs.get(dest_node)
        if cost is None or cost >= best_cost:
            # Unreachable, or not better than the current best.
            continue
        best_cost = cost
        best_resource = row.to_dict()
        best_node = dest_node

    if best_resource is None:
        return no_route, None

    best_route = paths[best_node]
    # Actual (unweighted) distance of the chosen route itself.
    best_actual_distance = _route_length_m(G_routing, best_route, weight_key)
    walk_time = best_actual_distance / WALK_SPEED_M_PER_MIN

    # Compute route metrics
    route_metrics = compute_route_metrics(G, best_route)

    # Build map data (convert projected coords to lat/lon)
    route_coords = get_route_coords(G, best_route)
    map_data = {
        "route_coords": route_coords,
        "origin": [origin_lat, origin_lon],
        "destination": [best_resource["lat"], best_resource["lon"]],
        "dest_name": best_resource["name"],
        "distance": best_actual_distance
    }
    result = {
        "found": True,
        "name": best_resource["name"],
        "type": best_resource["type"],
        "category": best_resource["category"],
        "lat": best_resource["lat"],
        "lon": best_resource["lon"],
        "distance_meters": round(best_actual_distance, 1),
        "walking_time_minutes": round(walk_time, 1),
        "origin": {"lat": origin_lat, "lon": origin_lon},
        "route_metrics": route_metrics
    }
    # Add climate metrics if available
    if graph_has_climate:
        result["climate_metrics"] = compute_climate_metrics(G, best_route)
        result["climate_aware"] = prefer_safe
    return result, map_data
def compute_single_route(
    G: nx.MultiDiGraph,
    origin_node: int,
    dest_node: int,
    weight_key: str = "length"
) -> dict | None:
    """Compute a single route and its metrics.

    Args:
        G: Graph to route on.
        origin_node: Start node id.
        dest_node: End node id.
        weight_key: Edge attribute minimised by the shortest-path search.

    Returns:
        Dict with ``route`` (node ids), ``coords`` (lat/lon pairs),
        ``distance_m``, ``time_min`` and ``metrics``, or ``None`` when no
        route can be computed.
    """
    try:
        route = nx.shortest_path(G, origin_node, dest_node, weight=weight_key)

        # Always report the actual distance using "length". Between two nodes
        # there may be parallel edges, and key 0 is not guaranteed to exist or
        # to be the one the search used, so pick the edge with the smallest
        # weight_key value.
        distance = 0.0
        for u, v in zip(route[:-1], route[1:]):
            used_edge = min(
                G[u][v].values(),
                key=lambda attrs: attrs.get(weight_key, 1),
            )
            distance += float(used_edge.get("length", 0) or 0)

        walk_time = distance / WALK_SPEED_M_PER_MIN
        route_metrics = compute_route_metrics(G, route)
        route_coords = get_route_coords(G, route)
        return {
            "route": route,
            "coords": route_coords,
            "distance_m": round(distance, 1),
            "time_min": round(walk_time, 1),
            "metrics": route_metrics
        }
    except (nx.NetworkXNoPath, nx.NodeNotFound):
        return None
    except Exception as exc:
        # Callers treat None as "no such alternative"; keep that contract but
        # do not hide unexpected failures.
        print(f"Warning: route computation failed: {exc}")
        return None
def compute_alternative_routes(
    G: nx.MultiDiGraph,
    origin_lat: float,
    origin_lon: float,
    dest_lat: float,
    dest_lon: float
) -> list[dict]:
    """
    Build the list of route alternatives between two points.

    Candidates are tried in this order, and one is kept only if its
    coordinates differ from every alternative already kept:
    - shortest: Minimum distance
    - flattest: Minimum elevation gain (uphill penalty factor 5.0)
    - balanced: Compromise between distance and elevation (factor 2.0)
    - safest: Minimum climate risk; only when the graph has climate data

    Each kept route carries ``name``, ``label`` and ``color``, plus
    ``climate_metrics`` when climate data is available.
    """
    start = get_nearest_node(G, origin_lat, origin_lon)
    goal = get_nearest_node(G, dest_lat, dest_lon)
    kept = []
    climate_ready = has_climate_data(G)

    def _offer(candidate, name, label, color):
        """Tag a computed route and keep it unless it duplicates a kept one."""
        if not candidate:
            return
        candidate["name"] = name
        candidate["label"] = label
        candidate["color"] = color
        if climate_ready:
            candidate["climate_metrics"] = compute_climate_metrics(G, candidate["route"])
        if candidate["coords"] not in [earlier["coords"] for earlier in kept]:
            kept.append(candidate)

    # 1. Distance only (blue)
    _offer(
        compute_single_route(G, start, goal, weight_key="length"),
        "shortest", "Shortest", "#3b82f6",
    )

    # 2. Heavy elevation penalty (green)
    steep_averse = apply_elevation_weights(G, penalty_factor=5.0)
    _offer(
        compute_single_route(steep_averse, start, goal, weight_key="weighted_length"),
        "flattest", "Flattest", "#22c55e",
    )

    # 3. Moderate elevation penalty (amber)
    mildly_averse = apply_elevation_weights(G, penalty_factor=2.0)
    _offer(
        compute_single_route(mildly_averse, start, goal, weight_key="weighted_length"),
        "balanced", "Balanced", "#f59e0b",
    )

    # 4. Climate-aware (emerald), only when the graph has climate data
    if climate_ready:
        _offer(
            compute_single_route(G, start, goal, weight_key="climate_weight"),
            "safest", "Safest (Climate)", "#10b981",
        )

    return kept
def _route_weight_total(G: nx.MultiDiGraph, route: list, weight_key: str) -> float:
    """Sum ``weight_key`` along ``route``, using the cheapest parallel edge per hop."""
    return sum(
        min(attrs.get(weight_key, 1) for attrs in G[u][v].values())
        for u, v in zip(route, route[1:])
    )


def calculate_route(
    G: nx.MultiDiGraph,
    origin_lat: float,
    origin_lon: float,
    dest_lat: float,
    dest_lon: float,
    dest_name: str = "Destination",
    prefer_flat: bool = True,
    prefer_safe: bool = True
) -> tuple[dict[str, Any], dict | None]:
    """
    Calculate multiple walking routes between two points with different criteria.

    Args:
        G: Walking network graph
        origin_lat, origin_lon: Origin coordinates
        dest_lat, dest_lon: Destination coordinates
        dest_name: Name of destination
        prefer_flat: Prefer routes with less elevation gain
        prefer_safe: If True and climate data available, recommend safest route

    Returns:
        ``(result, map_data)``. ``result`` describes the recommended route
        and lists all alternatives; ``map_data`` holds the drawable geometry.
        On failure ``result`` is ``{"error": ...}`` and ``map_data`` is
        ``None``.
    """
    try:
        # Compute all route alternatives
        alternatives = compute_alternative_routes(G, origin_lat, origin_lon, dest_lat, dest_lon)
        if not alternatives:
            return {"error": "No path found between origin and destination"}, None

        # Check if climate data is available
        graph_has_climate = has_climate_data(G)
        by_name = {r["name"]: r for r in alternatives}

        # Pick recommended route based on preferences
        if prefer_safe and graph_has_climate:
            recommended = by_name.get("safest")
            if recommended is None:
                # "safest" is dropped from the alternatives when its geometry
                # duplicates an earlier one, which need not be the shortest.
                # Recommend the listed alternative with the lowest climate cost.
                try:
                    recommended = min(
                        alternatives,
                        key=lambda r: _route_weight_total(G, r["route"], "climate_weight"),
                    )
                except (KeyError, TypeError, ValueError):
                    recommended = alternatives[0]
        elif prefer_flat:
            recommended = by_name.get("flattest", alternatives[0])
        else:
            recommended = by_name.get("shortest", alternatives[0])

        # Build map data with all routes
        map_data = {
            "routes": [
                {
                    "coords": r["coords"],
                    "color": r["color"],
                    "label": r["label"],
                    "name": r["name"]
                }
                for r in alternatives
            ],
            "origin": [origin_lat, origin_lon],
            "destination": [dest_lat, dest_lon],
            "dest_name": dest_name,
            "distance": recommended["distance_m"],
            # Keep route_coords for backwards compatibility
            "route_coords": recommended["coords"]
        }

        # Build alternatives with climate metrics if available
        alt_list = []
        for r in alternatives:
            alt_info = {
                "name": r["name"],
                "label": r["label"],
                "distance_meters": r["distance_m"],
                "walking_time_minutes": r["time_min"],
                "route_metrics": r["metrics"]
            }
            if graph_has_climate and "climate_metrics" in r:
                alt_info["climate_metrics"] = r["climate_metrics"]
            alt_list.append(alt_info)

        # Build result
        result = {
            "success": True,
            "recommended": recommended["name"],
            "alternatives": alt_list,
            "distance_meters": recommended["distance_m"],
            "walking_time_minutes": recommended["time_min"],
            "route_points": len(recommended["route"]),
            "origin": {"lat": origin_lat, "lon": origin_lon},
            "destination": {"lat": dest_lat, "lon": dest_lon, "name": dest_name},
            "route_metrics": recommended["metrics"]
        }

        # Add climate metrics to result if available
        if graph_has_climate:
            result["climate_aware"] = True
            if "climate_metrics" in recommended:
                result["climate_metrics"] = recommended["climate_metrics"]
        return result, map_data
    except nx.NetworkXNoPath:
        return {"error": "No path found between origin and destination"}, None
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {"error": f"Route calculation failed: {e}"}, None
def _safe_str(val, default: str = "") -> str:
"""Safely convert a value to string, handling lists."""
if val is None:
return default
if isinstance(val, list):
return str(val[0]) if val else default
return str(val)
def _safe_float(val, default: float) -> float:
"""Safely convert a value to float."""
if val is None:
return default
if isinstance(val, list):
val = val[0] if val else default
try:
return float(val)
except (ValueError, TypeError):
return default
def execute_tool(
    tool_name: str,
    args: dict,
    G: nx.MultiDiGraph,
    resources_df: pd.DataFrame
) -> tuple[dict[str, Any], dict | None]:
    """Run the named tool with LLM-supplied arguments.

    Supported tools are ``list_resources``, ``find_nearest`` and
    ``calculate_route``. Arguments are coerced with ``_safe_str`` /
    ``_safe_float``; a missing or falsy coordinate falls back to the
    Brownsville center. Returns ``(result, map_data)``; ``map_data`` is
    ``None`` for ``list_resources`` and for unknown tools.
    """
    def _text(key, fallback):
        return _safe_str(args.get(key), fallback)

    def _coord(primary, alias, axis):
        # Either spelling of the key is accepted; the first truthy one wins.
        return _safe_float(args.get(primary) or args.get(alias), BROWNSVILLE_CENTER[axis])

    if tool_name == "list_resources":
        listing = list_resources(
            resources_df,
            category=_text("category", ""),
            resource_type=_text("resource_type", ""),
        )
        return listing, None

    if tool_name == "find_nearest":
        # Accept both "lat"/"lon" (from system prompt) and "origin_lat"/"origin_lon"
        start_lat = _coord("lat", "origin_lat", "lat")
        start_lon = _coord("lon", "origin_lon", "lon")
        return find_nearest(
            G,
            resources_df,
            resource_type=_text("resource_type", ""),
            origin_lat=start_lat,
            origin_lon=start_lon,
        )

    if tool_name == "calculate_route":
        return calculate_route(
            G,
            origin_lat=_coord("start_lat", "origin_lat", "lat"),
            origin_lon=_coord("start_lon", "origin_lon", "lon"),
            dest_lat=_coord("end_lat", "dest_lat", "lat"),
            dest_lon=_coord("end_lon", "dest_lon", "lon"),
            dest_name=_text("dest_name", "Destination"),
        )

    return {"error": f"Unknown tool: {tool_name}"}, None