Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import folium | |
| import numpy as np | |
| from typing import List, Tuple, Dict, Optional | |
| import time | |
| from collections import deque | |
| import requests | |
| import polyline | |
| from geopy.geocoders import Nominatim | |
| # Utility Functions | |
def create_numbered_marker(number: int) -> folium.features.DivIcon:
    """Build the circular, numbered icon used for a stop on the map.

    Args:
        number: The label rendered inside the circle.

    Returns:
        A ``DivIcon`` whose HTML is a white, blue-bordered circle with
        ``number`` centred in bold.
    """
    css_declarations = (
        "background-color: white",
        "border-radius: 50%",
        "width: 25px",
        "height: 25px",
        "display: flex",
        "align-items: center",
        "justify-content: center",
        "border: 2px solid blue",
        "font-weight: bold",
    )
    inline_style = "; ".join(css_declarations) + ";"
    markup = f'<div style="{inline_style}">{number}</div>'
    return folium.features.DivIcon(html=markup)
def get_place_coordinates(place_name: str) -> Optional[Tuple[float, float]]:
    """Geocode a place name with Nominatim.

    Args:
        place_name: Free-text name of the place to look up.

    Returns:
        ``(latitude, longitude)`` when the geocoder finds a match, otherwise
        ``None``. Any exception raised during the lookup is reported through
        ``st.error`` and also yields ``None``.
    """
    try:
        match = Nominatim(user_agent="travel_optimizer").geocode(place_name)
        return (match.latitude, match.longitude) if match else None
    except Exception as exc:
        st.error(f"Error geocoding {place_name}: {exc!s}")
        return None
def _describe_maneuver(step: Dict) -> str:
    """Return the instruction text for one route step, or '' if none can be made.

    An explicit ``maneuver['instruction']`` is used verbatim when present.
    OSRM's route service describes a maneuver with ``type`` / ``modifier``
    fields and does not supply an ``instruction`` string, so in that case a
    short sentence is assembled from those fields and the step's road ``name``.
    """
    maneuver = step.get('maneuver') or {}
    explicit = maneuver.get('instruction', '')
    if explicit:
        return explicit

    kind = str(maneuver.get('type') or '').strip()
    if not kind:
        return ''
    modifier = str(maneuver.get('modifier') or '').strip()
    road = str(step.get('name') or '').strip()

    if kind == 'arrive':
        return 'Arrive at destination'
    if kind == 'depart':
        return f'Depart on {road}' if road else 'Depart'

    text = kind.capitalize()
    if modifier:
        text += f' {modifier}'
    if road:
        text += f' onto {road}'
    return text


def format_instructions(steps: List[Dict]) -> List[str]:
    """Format OSRM route steps into readable instructions.

    Args:
        steps: Step dictionaries as found in an OSRM route leg. Steps without
            a ``maneuver`` key are skipped.

    Returns:
        One string per step that yields an instruction. When the step has a
        ``distance`` (metres) it is appended as ``" (X.X km)"`` for one
        kilometre or more, otherwise as ``" (N m)"``.
    """
    instructions = []
    for step in steps:
        if 'maneuver' not in step:
            continue
        instruction = _describe_maneuver(step)
        if not instruction:
            continue
        if 'distance' in step:
            distance_km = step['distance'] / 1000
            if distance_km >= 1:
                instruction += f" ({distance_km:.1f} km)"
            else:
                instruction += f" ({step['distance']:.0f} m)"
        instructions.append(instruction)
    return instructions
def get_route_from_osrm(coord1: Tuple[float, float], coord2: Tuple[float, float]) -> Optional[Dict]:
    """Get driving-route information between two points from the OSRM demo server.

    Args:
        coord1: Origin as ``(latitude, longitude)``.
        coord2: Destination as ``(latitude, longitude)``.

    Returns:
        A dict with ``distance`` (km), ``duration`` (minutes), ``geometry``
        (encoded polyline) and ``steps`` (steps of the first leg), or ``None``
        when the request fails, times out, or OSRM reports no route. Failures
        raised as exceptions are surfaced through ``st.warning``.
    """
    # OSRM takes "longitude,latitude" pairs, hence the swapped indices.
    url = f"http://router.project-osrm.org/route/v1/driving/{coord1[1]},{coord1[0]};{coord2[1]},{coord2[0]}"
    params = {
        "overview": "full",
        "geometries": "polyline",
        "steps": "true",
        "annotations": "true"
    }
    # Without a timeout a stalled connection would block the app forever.
    request_timeout_s = 15
    # Add rate limiting
    time.sleep(1)  # Respect OSRM usage guidelines
    try:
        response = requests.get(url, params=params, timeout=request_timeout_s)
        if response.status_code == 200:
            data = response.json()
            if data["code"] == "Ok" and len(data["routes"]) > 0:
                route = data["routes"][0]
                return {
                    'distance': route['distance'] / 1000,  # Convert to km
                    'duration': route['duration'] / 60,  # Convert to minutes
                    'geometry': route['geometry'],
                    'steps': route['legs'][0]['steps']
                }
    except Exception as e:
        # UI boundary: report the problem and let the caller carry on without this leg.
        st.warning(f"Error getting route: {str(e)}")
    return None
def calculate_distance_matrix(places: List[Tuple[str, Tuple[float, float]]]) -> Tuple[np.ndarray, Dict]:
    """Query OSRM for every ordered pair of places and collect the results.

    Args:
        places: ``(name, coordinates)`` entries; only the coordinates are used.

    Returns:
        A tuple ``(distances, routing_info)``. ``distances[i, j]`` is the route
        length in km from place ``i`` to place ``j`` and stays ``0`` on the
        diagonal and for pairs where no route was obtained. ``routing_info``
        maps ``(i, j)`` to the formatted distance and duration, the route
        geometry and its steps, for successful pairs only.
    """
    place_count = len(places)
    matrix = np.zeros((place_count, place_count))
    segments = {}
    bar = st.progress(0)
    pair_total = place_count * (place_count - 1)
    pairs_done = 0

    for src, src_place in enumerate(places):
        for dst, dst_place in enumerate(places):
            if src == dst:
                continue
            route = get_route_from_osrm(src_place[1], dst_place[1])
            if route:
                matrix[src, dst] = route['distance']
                segments[(src, dst)] = {
                    'distance_km': f"{route['distance']:.1f} km",
                    'duration_mins': f"{route['duration']:.0f} mins",
                    'geometry': route['geometry'],
                    'steps': route['steps'],
                }
            # Progress advances for every attempted pair, successful or not.
            pairs_done += 1
            bar.progress(pairs_done / pair_total)

    return matrix, segments
def dijkstra(distances: np.ndarray, start_idx: int) -> Tuple[List[int], List[float]]:
    """Run Dijkstra's algorithm over a distance matrix.

    Nodes are settled in order of increasing distance using a priority queue,
    so every reported distance is a true shortest distance. (Processing nodes
    in FIFO order instead can settle a node before its cheapest path is known.)

    Args:
        distances: Square matrix where ``distances[i, j] > 0`` is the cost of
            the edge ``i -> j``; non-positive entries mean "no edge".
        start_idx: Index of the source node.

    Returns:
        A tuple ``(optimal_order, distances_to)``:

        * ``optimal_order`` is the shortest path from ``start_idx`` to the
          last node (index ``n - 1``) as a list of node indices. It contains
          only the nodes on that path, not necessarily every node. If the last
          node is the start or is unreachable, the list is ``[start_idx]``.
        * ``distances_to`` is a list with the shortest distance from
          ``start_idx`` to each node (``inf`` when unreachable).
    """
    import heapq  # local import keeps this block self-contained

    n = len(distances)
    visited = [False] * n
    distances_to = [float('inf')] * n
    prev_node = [-1] * n
    distances_to[start_idx] = 0

    heap = [(0, start_idx)]
    while heap:
        dist_so_far, current_node = heapq.heappop(heap)
        if visited[current_node]:
            # Stale entry: the node was already settled via a shorter path.
            continue
        visited[current_node] = True
        for neighbor in range(n):
            weight = distances[current_node, neighbor]
            if visited[neighbor] or not weight > 0:
                continue
            new_distance = float(dist_so_far + weight)
            if new_distance < distances_to[neighbor]:
                distances_to[neighbor] = new_distance
                prev_node[neighbor] = current_node
                heapq.heappush(heap, (new_distance, neighbor))

    # Walk the predecessor chain back from the last node, then reverse it.
    optimal_order = []
    node = n - 1
    while prev_node[node] != -1:
        optimal_order.append(node)
        node = prev_node[node]
    optimal_order.append(start_idx)
    optimal_order.reverse()
    return optimal_order, distances_to
def add_markers_and_route(m: folium.Map, places: List[Tuple[str, Tuple[float, float]]], optimal_order: List[int], routing_info: Dict):
    """Place a numbered marker per stop on the map and draw the legs between them.

    Args:
        m: Map that receives the markers and polylines.
        places: ``(name, (lat, lon))`` entries indexed by ``optimal_order``.
        optimal_order: Indices into ``places`` in visiting order.
        routing_info: Per-leg data keyed by ``(from_idx, to_idx)``.

    Returns:
        ``(total_distance, total_duration)`` summed over the legs between
        consecutive stops, parsed from the leading number of each leg's
        ``distance_km`` and ``duration_mins`` strings.
    """
    stop_count = len(optimal_order)
    distance_sum = 0
    duration_sum = 0

    for position, place_idx in enumerate(optimal_order):
        following_idx = optimal_order[(position + 1) % stop_count]
        place_name, (lat, lon) = places[place_idx]
        leg = routing_info.get((place_idx, following_idx), {})
        has_next_stop = position < stop_count - 1

        # Marker with a popup describing the stop and, if any, the leg ahead.
        popup_content = f"""
        <b>Stop {position+1}: {place_name}</b><br>
        """
        if has_next_stop:
            popup_content += f"""
            To next stop:<br>
            Distance: {leg.get('distance_km', 'N/A')}<br>
            Duration: {leg.get('duration_mins', 'N/A')}
            """
        marker = folium.Marker(
            [lat, lon],
            popup=folium.Popup(popup_content, max_width=300),
            icon=create_numbered_marker(position+1)
        )
        marker.add_to(m)

        # The final stop has no outgoing leg to draw or count.
        if not has_next_stop:
            continue

        if 'geometry' in leg:
            try:
                decoded_path = polyline.decode(leg['geometry'])
                folium.PolyLine(
                    decoded_path,
                    weight=2,
                    color='blue',
                    opacity=0.8
                ).add_to(m)
            except Exception as exc:
                st.warning(f"Error drawing route: {exc!s}")
                # Fall back to a straight red line between the two stops.
                following_coords = places[following_idx][1]
                folium.PolyLine(
                    [[lat, lon], [following_coords[0], following_coords[1]]],
                    weight=2,
                    color='red',
                    opacity=0.8
                ).add_to(m)

        if 'distance_km' in leg:
            distance_sum += float(leg['distance_km'].split()[0])
        if 'duration_mins' in leg:
            duration_sum += float(leg['duration_mins'].split()[0])

    return distance_sum, duration_sum
def display_itinerary_summary(places: List[Tuple[str, Tuple[float, float]]], optimal_order: List[int], routing_info: Dict):
    """Render the trip totals followed by a stop-by-stop breakdown.

    Args:
        places: ``(name, (lat, lon))`` entries indexed by ``optimal_order``.
        optimal_order: Indices into ``places`` in visiting order.
        routing_info: Per-leg data keyed by ``(from_idx, to_idx)``.
    """
    # NOTE(review): the emoji in the labels below look mis-encoded (mojibake);
    # confirm the file's encoding and restore the intended characters.
    st.markdown("### π Summary")
    # The totals come from add_markers_and_route, run here against a scratch map.
    scratch_map = folium.Map()
    total_distance, total_duration = add_markers_and_route(scratch_map, places, optimal_order, routing_info)
    st.info(f"""
    π£οΈ Total distance: **{total_distance:.1f} km**
    β±οΈ Total duration: **{total_duration:.0f} mins** ({total_duration/60:.1f} hours)
    """)

    st.markdown("### πΊοΈ Route Details")
    stop_count = len(optimal_order)
    for position, place_idx in enumerate(optimal_order):
        following_idx = optimal_order[(position + 1) % stop_count]
        place_name = places[place_idx][0]
        leg = routing_info.get((place_idx, following_idx), {})

        st.markdown(f"**Stop {position+1}: {place_name}**")
        # Only stops with a following stop have leg details to show.
        if position >= stop_count - 1:
            continue

        st.markdown(f"β *{leg.get('distance_km', 'N/A')}, "
                    f"Duration: {leg.get('duration_mins', 'N/A')}*")
        if 'steps' in leg:
            with st.expander("π Turn-by-turn directions"):
                for idx, instruction in enumerate(format_instructions(leg['steps']), 1):
                    st.write(f"{idx}. {instruction}")
def _nearest_neighbor_order(distances: np.ndarray, start_idx: int = 0) -> List[int]:
    """Return a visiting order that covers every place exactly once.

    Starting at ``start_idx``, repeatedly move to the closest place not yet
    visited. Non-positive matrix entries (pairs for which no distance was
    recorded) are treated as infinitely far, so such places are still visited,
    just last.
    """
    remaining = [idx for idx in range(len(distances)) if idx != start_idx]
    order = [start_idx]
    while remaining:
        current = order[-1]
        costs = {
            idx: (distances[current, idx] if distances[current, idx] > 0 else float('inf'))
            for idx in remaining
        }
        nearest = min(remaining, key=costs.__getitem__)
        order.append(nearest)
        remaining.remove(nearest)
    return order


def main():
    """Streamlit entry point: collect destinations, compute a route, show map and itinerary."""
    # NOTE(review): several emoji in the UI strings below look mis-encoded
    # (mojibake); confirm the file's encoding and restore the intended characters.
    st.set_page_config(page_title="AI Travel Route Optimizer", layout="wide")
    st.title("π AI Travel Route Optimizer")
    st.markdown("""
    This app helps you find the optimal route between multiple destinations using real driving distances.
    Enter your destinations in the sidebar and get a customized route with turn-by-turn directions.
    """)

    # Sidebar for inputs
    with st.sidebar:
        st.header("π Destinations")
        num_places = st.number_input("Number of destinations", min_value=2, max_value=10, value=3)
        places = []
        for i in range(num_places):
            place = st.text_input(f"Destination {i+1}")
            if place:
                with st.spinner(f"Finding coordinates for {place}..."):
                    coordinates = get_place_coordinates(place)
                    if coordinates:
                        places.append((place, coordinates))
                        st.success(f"β Found coordinates for {place}")
                    else:
                        st.error(f"β Couldn't find coordinates for {place}")

    # Main content area
    if len(places) >= 2:
        col1, col2 = st.columns([2, 1])
        with col1:
            with st.spinner("π Calculating optimal route..."):
                # Calculate distance matrix
                distances, routing_info = calculate_distance_matrix(places)
                # A shortest path from the first to the last place would skip
                # the destinations in between, so build an order that visits
                # every place instead.
                optimal_order = _nearest_neighbor_order(distances, 0)

            # Clicking the button reruns the script, which already recomputes
            # the route above; recomputing again here would double the
            # rate-limited OSRM requests.
            st.button("Recalculate Route")

            # Create map and display
            m = folium.Map()
            total_distance, total_duration = add_markers_and_route(m, places, optimal_order, routing_info)
            st.components.v1.html(m._repr_html_(), height=600)
        with col2:
            display_itinerary_summary(places, optimal_order, routing_info)
    else:
        st.info("π Please enter at least 2 destinations in the sidebar to get started")


if __name__ == "__main__":
    main()