Spaces:
Sleeping
Sleeping
| """ | |
| Real-time Fleet Resource Optimization with AI Agents | |
| Enhanced version with live API integration for traffic, weather, and AI decision making | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import requests | |
| import json | |
| import time | |
| from datetime import datetime, timedelta | |
| import plotly.graph_objs as go | |
| import plotly.express as px | |
| from plotly.subplots import make_subplots | |
| import gradio as gr | |
| from dataclasses import dataclass, asdict | |
| from typing import List, Dict, Tuple, Optional | |
| import threading | |
| import queue | |
| import random | |
| import logging | |
| from concurrent.futures import ThreadPoolExecutor | |
| import asyncio | |
| # Import our real-time API client and location manager | |
| from realtime_api_client import RealTimeAPIClient, TrafficData, WeatherData, RouteData | |
| from location_config import LocationManager, initialize_location_manager | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Configuration | |
| class FleetConfig: | |
| def __init__(self): | |
| self.num_vehicles = 50 | |
| self.vehicle_capacity = 4 | |
| self.max_distance = 100 # km | |
| self.base_cost_per_km = 0.5 | |
| self.weather_impact = { | |
| 'clear': 1.0, | |
| 'clouds': 1.1, | |
| 'rain': 1.3, | |
| 'snow': 1.6, | |
| 'storm': 2.0, | |
| 'mist': 1.2, | |
| 'fog': 1.4 | |
| } | |
| self.traffic_impact = { | |
| 'low': 1.0, | |
| 'medium': 1.3, | |
| 'high': 1.8, | |
| 'severe': 2.5 | |
| } | |
| self.ai_optimization_enabled = True | |
| self.real_time_data_enabled = True | |
| self.update_interval = 30 # seconds | |
| class Vehicle: | |
| id: int | |
| location: Tuple[float, float] # lat, lng | |
| status: str # 'available', 'busy', 'maintenance' | |
| capacity: int | |
| current_load: int | |
| total_distance: float | |
| earnings: float | |
| last_update: datetime | |
| current_trip: Optional[int] = None # demand ID if on trip | |
| battery_level: float = 100.0 # for electric vehicles | |
| maintenance_due: bool = False | |
| class Demand: | |
| id: int | |
| pickup_location: Tuple[float, float] | |
| dropoff_location: Tuple[float, float] | |
| passengers: int | |
| priority: int # 1-5, 5 being highest | |
| timestamp: datetime | |
| status: str # 'pending', 'assigned', 'completed', 'cancelled' | |
| estimated_wait_time: Optional[int] = None # minutes | |
| assigned_vehicle: Optional[int] = None | |
| ai_suggestion: Optional[str] = None | |
| class OptimizationMetrics: | |
| total_earnings: float | |
| total_distance: float | |
| vehicle_utilization: float | |
| average_response_time: float | |
| demand_satisfaction_rate: float | |
| cost_efficiency: float | |
| ai_optimization_impact: float | |
| timestamp: datetime | |
| class RealTimeFleetOptimizer: | |
| def __init__(self, location_identifier: str = 'new_york'): | |
| self.config = FleetConfig() | |
| self.vehicles = [] | |
| self.demands = [] | |
| self.weather_data = {} | |
| self.traffic_data = {} | |
| self.route_data = {} | |
| self.simulation_running = False | |
| self.data_queue = queue.Queue() | |
| self.metrics_history = [] | |
| # Initialize location manager | |
| self.location_manager = initialize_location_manager("AIzaSyBTA3eACtpCPR9DDi8EhOt1cI7Cy08Mkfg") | |
| self.set_location(location_identifier) | |
| # Initialize API client with location manager | |
| self.api_client = RealTimeAPIClient(self.location_manager) | |
| # Initialize vehicles | |
| self._initialize_vehicles() | |
| # Simulation parameters | |
| self.simulation_time = datetime.now() | |
| self.time_step = 60 # seconds | |
| self.last_api_update = datetime.now() | |
| # Performance tracking | |
| self.performance_stats = { | |
| 'total_api_calls': 0, | |
| 'successful_assignments': 0, | |
| 'failed_assignments': 0, | |
| 'ai_suggestions_generated': 0, | |
| 'average_optimization_time': 0.0 | |
| } | |
| def set_location(self, location_identifier: str): | |
| """Set the current location for the fleet""" | |
| if self.location_manager.set_location(location_identifier): | |
| logger.info(f"Location set to: {self.location_manager.current_location.name}") | |
| # Reinitialize vehicles for new location | |
| self.vehicles = [] | |
| self._initialize_vehicles() | |
| else: | |
| logger.error(f"Failed to set location: {location_identifier}") | |
| def create_custom_location(self, name: str, center_lat: float, center_lng: float, | |
| bounds: Dict[str, float], hotspots: List[Dict] = None): | |
| """Create and set a custom location""" | |
| location = self.location_manager.create_custom_location(name, center_lat, center_lng, bounds, hotspots) | |
| if location: | |
| # Reinitialize vehicles for new location | |
| self.vehicles = [] | |
| self._initialize_vehicles() | |
| logger.info(f"Custom location created and set: {name}") | |
| return location | |
| def _initialize_vehicles(self): | |
| """Initialize fleet vehicles with random locations in the current area""" | |
| if not self.location_manager or not self.location_manager.current_location: | |
| logger.error("No location set. Cannot initialize vehicles.") | |
| return | |
| location = self.location_manager.current_location | |
| hotspots = location.hotspots | |
| for i in range(self.config.num_vehicles): | |
| # Distribute vehicles around current location hotspots | |
| base_location = random.choice(hotspots) | |
| location_coords = ( | |
| base_location['lat'] + random.uniform(-0.01, 0.01), | |
| base_location['lng'] + random.uniform(-0.01, 0.01) | |
| ) | |
| vehicle = Vehicle( | |
| id=i, | |
| location=location_coords, | |
| status='available', | |
| capacity=self.config.vehicle_capacity, | |
| current_load=0, | |
| total_distance=0.0, | |
| earnings=0.0, | |
| last_update=datetime.now(), | |
| battery_level=random.uniform(80, 100) | |
| ) | |
| self.vehicles.append(vehicle) | |
| logger.info(f"Initialized {len(self.vehicles)} vehicles in {location.name}") | |
| def generate_realistic_demand(self): | |
| """Generate realistic demand patterns based on time and location""" | |
| if not self.location_manager or not self.location_manager.current_location: | |
| logger.error("No location set. Cannot generate demand.") | |
| return | |
| location = self.location_manager.current_location | |
| hotspots = location.hotspots | |
| hour = self.simulation_time.hour | |
| num_demands = 0 | |
| # Generate demands for each hotspot | |
| for hotspot in hotspots: | |
| base_rate = hotspot['base_rate'] | |
| # Adjust rate based on peak hours | |
| if hour in hotspot['peak_hours']: | |
| base_rate *= 1.5 | |
| # Weekend vs weekday | |
| if self.simulation_time.weekday() >= 5: # Weekend | |
| base_rate *= 0.7 | |
| # Generate demand | |
| if random.random() < base_rate: | |
| # Create realistic dropoff location | |
| dropoff = ( | |
| hotspot['lat'] + random.uniform(-0.02, 0.02), | |
| hotspot['lng'] + random.uniform(-0.02, 0.02) | |
| ) | |
| # Determine priority based on time and location | |
| priority = 3 # default | |
| if hour in [7, 8, 17, 18]: # Rush hour | |
| priority = random.choices([4, 5], weights=[0.7, 0.3])[0] | |
| elif hour in [22, 23, 0, 1]: # Late night | |
| priority = random.choices([1, 2, 3], weights=[0.3, 0.4, 0.3])[0] | |
| demand = Demand( | |
| id=len(self.demands), | |
| pickup_location=(hotspot['lat'], hotspot['lng']), | |
| dropoff_location=dropoff, | |
| passengers=random.choices([1, 2, 3, 4], weights=[0.4, 0.3, 0.2, 0.1])[0], | |
| priority=priority, | |
| timestamp=self.simulation_time, | |
| status='pending' | |
| ) | |
| self.demands.append(demand) | |
| num_demands += 1 | |
| logger.info(f"Generated {num_demands} new demands at {hour}:00 in {location.name}") | |
| def update_real_time_data(self): | |
| """Update real-time traffic, weather, and route data""" | |
| if not self.config.real_time_data_enabled: | |
| return | |
| current_time = datetime.now() | |
| if (current_time - self.last_api_update).seconds < self.config.update_interval: | |
| return | |
| logger.info("Updating real-time data...") | |
| start_time = time.time() | |
| # Get unique locations for vehicles and demands | |
| vehicle_locations = [v.location for v in self.vehicles] | |
| demand_locations = [d.pickup_location for d in self.demands if d.status == 'pending'] | |
| all_locations = list(set(vehicle_locations + demand_locations)) | |
| # Update traffic data | |
| self.traffic_data = self.api_client.get_batch_traffic_data(all_locations) | |
| self.performance_stats['total_api_calls'] += len(all_locations) | |
| # Update weather data | |
| self.weather_data = self.api_client.get_batch_weather_data(all_locations) | |
| self.performance_stats['total_api_calls'] += len(all_locations) | |
| # Update route data for pending demands | |
| for demand in [d for d in self.demands if d.status == 'pending']: | |
| for vehicle in [v for v in self.vehicles if v.status == 'available']: | |
| route_key = (vehicle.location, demand.pickup_location) | |
| if route_key not in self.route_data: | |
| route = self.api_client.get_route_data(vehicle.location, demand.pickup_location) | |
| if route: | |
| self.route_data[route_key] = route | |
| self.performance_stats['total_api_calls'] += 1 | |
| self.last_api_update = current_time | |
| update_time = time.time() - start_time | |
| logger.info(f"Real-time data updated in {update_time:.2f}s") | |
| def calculate_enhanced_cost(self, vehicle_id: int, pickup_loc: Tuple[float, float], dropoff_loc: Tuple[float, float]) -> Tuple[float, float, Dict]: | |
| """Calculate enhanced cost considering real-time data""" | |
| # Get route data | |
| route_key = (self.vehicles[vehicle_id].location, pickup_loc) | |
| route = self.route_data.get(route_key) | |
| if route: | |
| distance = route.distance / 1000 # Convert to km | |
| base_duration = route.duration / 60 # Convert to minutes | |
| else: | |
| # Fallback to simple distance calculation | |
| distance = np.sqrt((pickup_loc[0] - self.vehicles[vehicle_id].location[0])**2 + | |
| (pickup_loc[1] - self.vehicles[vehicle_id].location[1])**2) * 111 | |
| base_duration = distance * 2 # Rough estimate | |
| # Get real-time factors | |
| weather = self.weather_data.get(pickup_loc) | |
| traffic = self.traffic_data.get(pickup_loc) | |
| weather_multiplier = 1.0 | |
| traffic_multiplier = 1.0 | |
| if weather: | |
| weather_multiplier = self.config.weather_impact.get(weather.condition, 1.0) | |
| if traffic: | |
| traffic_multiplier = self.config.traffic_impact.get(traffic.congestion_level, 1.0) | |
| # Calculate total cost | |
| base_cost = distance * self.config.base_cost_per_km | |
| total_cost = base_cost * weather_multiplier * traffic_multiplier | |
| # Add time-based costs | |
| time_cost = base_duration * 0.1 # $0.10 per minute | |
| total_cost += time_cost | |
| cost_breakdown = { | |
| 'base_cost': base_cost, | |
| 'weather_multiplier': weather_multiplier, | |
| 'traffic_multiplier': traffic_multiplier, | |
| 'time_cost': time_cost, | |
| 'total_cost': total_cost, | |
| 'distance': distance, | |
| 'duration': base_duration | |
| } | |
| return total_cost, distance, cost_breakdown | |
| def get_ai_optimization_suggestion(self, pending_demands: List[Demand], available_vehicles: List[Vehicle]) -> str: | |
| """Get AI-powered optimization suggestions""" | |
| if not self.config.ai_optimization_enabled: | |
| return "AI optimization disabled" | |
| try: | |
| # Prepare context for AI | |
| context = f""" | |
| Fleet Optimization Scenario - {datetime.now().strftime('%H:%M:%S')}: | |
| Current Fleet Status: | |
| - Total Vehicles: {len(self.vehicles)} | |
| - Available: {len(available_vehicles)} | |
| - Busy: {len([v for v in self.vehicles if v.status == 'busy'])} | |
| - Maintenance: {len([v for v in self.vehicles if v.status == 'maintenance'])} | |
| Current Demand: | |
| - Pending Demands: {len(pending_demands)} | |
| - High Priority (4-5): {len([d for d in pending_demands if d.priority >= 4])} | |
| - Medium Priority (2-3): {len([d for d in pending_demands if 2 <= d.priority <= 3])} | |
| - Low Priority (1): {len([d for d in pending_demands if d.priority == 1])} | |
| Current Conditions: | |
| - Time: {self.simulation_time.strftime('%H:%M')} | |
| - Weather: {list(self.weather_data.values())[0].condition if self.weather_data else 'Unknown'} | |
| - Traffic: {list(self.traffic_data.values())[0].congestion_level if self.traffic_data else 'Unknown'} | |
| Performance Metrics: | |
| - Total Earnings: ${sum(v.earnings for v in self.vehicles):.2f} | |
| - Vehicle Utilization: {len([v for v in self.vehicles if v.status == 'busy']) / len(self.vehicles) * 100:.1f}% | |
| - Average Response Time: {self.performance_stats.get('average_response_time', 0):.1f} minutes | |
| Provide specific optimization recommendations for: | |
| 1. Vehicle allocation strategy | |
| 2. Priority handling | |
| 3. Route optimization | |
| 4. Performance improvements | |
| """ | |
| suggestion = self.api_client.get_ai_optimization_suggestion( | |
| self.vehicles, self.demands, self.traffic_data, self.weather_data | |
| ) | |
| self.performance_stats['ai_suggestions_generated'] += 1 | |
| return suggestion | |
| except Exception as e: | |
| logger.error(f"Error generating AI suggestion: {e}") | |
| return f"AI optimization error: {str(e)}" | |
| def optimize_vehicle_allocation_ai(self): | |
| """AI-enhanced vehicle allocation optimization""" | |
| pending_demands = [d for d in self.demands if d.status == 'pending'] | |
| available_vehicles = [v for v in self.vehicles if v.status == 'available'] | |
| if not pending_demands or not available_vehicles: | |
| return | |
| # Get AI suggestion | |
| ai_suggestion = self.get_ai_optimization_suggestion(pending_demands, available_vehicles) | |
| # Create enhanced cost matrix | |
| cost_matrix = [] | |
| assignment_details = [] | |
| for vehicle in available_vehicles: | |
| vehicle_costs = [] | |
| vehicle_details = [] | |
| for demand in pending_demands: | |
| cost, distance, breakdown = self.calculate_enhanced_cost( | |
| vehicle.id, vehicle.location, demand.pickup_location | |
| ) | |
| # Add priority penalty | |
| priority_penalty = (6 - demand.priority) * 5 | |
| # Add capacity check | |
| if vehicle.current_load + demand.passengers > vehicle.capacity: | |
| total_cost = float('inf') | |
| else: | |
| total_cost = cost + priority_penalty | |
| vehicle_costs.append(total_cost) | |
| vehicle_details.append({ | |
| 'cost': cost, | |
| 'distance': distance, | |
| 'priority_penalty': priority_penalty, | |
| 'breakdown': breakdown | |
| }) | |
| cost_matrix.append(vehicle_costs) | |
| assignment_details.append(vehicle_details) | |
| # Enhanced assignment algorithm | |
| assignments = [] | |
| used_vehicles = set() | |
| used_demands = set() | |
| # Sort demands by priority and timestamp | |
| sorted_demands = sorted(pending_demands, key=lambda x: (x.priority, -x.timestamp.timestamp()), reverse=True) | |
| for demand in sorted_demands: | |
| best_vehicle = None | |
| best_cost = float('inf') | |
| best_details = None | |
| for i, vehicle in enumerate(available_vehicles): | |
| if i in used_vehicles: | |
| continue | |
| if vehicle.current_load + demand.passengers <= vehicle.capacity: | |
| cost = cost_matrix[i][pending_demands.index(demand)] | |
| if cost < best_cost: | |
| best_cost = cost | |
| best_vehicle = i | |
| best_details = assignment_details[i][pending_demands.index(demand)] | |
| if best_vehicle is not None and best_cost != float('inf'): | |
| assignments.append((available_vehicles[best_vehicle], demand, best_details)) | |
| used_vehicles.add(best_vehicle) | |
| used_demands.add(demand.id) | |
| # Execute assignments with AI insights | |
| max_assignments = min(len(assignments), 8) # Increased from 5 | |
| for vehicle, demand, details in assignments[:max_assignments]: | |
| self._assign_vehicle_to_demand_enhanced(vehicle, demand, details, ai_suggestion) | |
| logger.info(f"AI-optimized {len(assignments[:max_assignments])} assignments") | |
| def _assign_vehicle_to_demand_enhanced(self, vehicle: Vehicle, demand: Demand, details: Dict, ai_suggestion: str): | |
| """Enhanced vehicle assignment with detailed tracking""" | |
| vehicle.status = 'busy' | |
| vehicle.current_load = demand.passengers | |
| vehicle.current_trip = demand.id | |
| demand.status = 'assigned' | |
| demand.assigned_vehicle = vehicle.id | |
| demand.ai_suggestion = ai_suggestion | |
| # Calculate trip details | |
| pickup_distance = details['distance'] | |
| trip_distance = np.sqrt((demand.pickup_location[0] - demand.dropoff_location[0])**2 + | |
| (demand.pickup_location[1] - demand.dropoff_location[1])**2) * 111 | |
| # Update vehicle metrics | |
| vehicle.total_distance += pickup_distance + trip_distance | |
| vehicle.earnings += details['cost'] | |
| vehicle.location = demand.dropoff_location | |
| vehicle.last_update = self.simulation_time | |
| # Calculate estimated completion time | |
| total_duration = details['breakdown']['duration'] + (trip_distance * 2) # minutes | |
| completion_time = self.simulation_time + timedelta(minutes=total_duration) | |
| # Queue trip completion | |
| self.data_queue.put(('complete_trip', vehicle.id, completion_time, demand.id)) | |
| # Update performance stats | |
| self.performance_stats['successful_assignments'] += 1 | |
| logger.info(f"Assigned Vehicle {vehicle.id} to Demand {demand.id} (Priority {demand.priority})") | |
| def complete_trips(self): | |
| """Complete trips that have finished""" | |
| current_time = self.simulation_time | |
| while not self.data_queue.empty(): | |
| try: | |
| action, vehicle_id, completion_time, demand_id = self.data_queue.get_nowait() | |
| if action == 'complete_trip' and completion_time <= current_time: | |
| vehicle = next(v for v in self.vehicles if v.id == vehicle_id) | |
| demand = next(d for d in self.demands if d.id == demand_id) | |
| vehicle.status = 'available' | |
| vehicle.current_load = 0 | |
| vehicle.current_trip = None | |
| demand.status = 'completed' | |
| # Random maintenance check | |
| if random.random() < 0.05: # 5% chance | |
| vehicle.status = 'maintenance' | |
| vehicle.maintenance_due = True | |
| # Schedule maintenance completion | |
| maintenance_time = current_time + timedelta(minutes=random.randint(30, 120)) | |
| self.data_queue.put(('complete_maintenance', vehicle_id, maintenance_time)) | |
| logger.info(f"Completed trip: Vehicle {vehicle_id} -> Demand {demand_id}") | |
| except queue.Empty: | |
| break | |
| except Exception as e: | |
| logger.error(f"Error completing trip: {e}") | |
| def complete_maintenance(self): | |
| """Complete maintenance tasks""" | |
| current_time = self.simulation_time | |
| while not self.data_queue.empty(): | |
| try: | |
| action, vehicle_id, completion_time = self.data_queue.get_nowait() | |
| if action == 'complete_maintenance' and completion_time <= current_time: | |
| vehicle = next(v for v in self.vehicles if v.id == vehicle_id) | |
| vehicle.status = 'available' | |
| vehicle.maintenance_due = False | |
| vehicle.battery_level = 100.0 # Full charge after maintenance | |
| logger.info(f"Completed maintenance: Vehicle {vehicle_id}") | |
| except queue.Empty: | |
| break | |
| except Exception as e: | |
| logger.error(f"Error completing maintenance: {e}") | |
| def run_simulation_step(self): | |
| """Run one enhanced simulation step""" | |
| if not self.simulation_running: | |
| return | |
| start_time = time.time() | |
| # Update simulation time | |
| self.simulation_time = self.simulation_time + timedelta(hours=1) | |
| # Generate new demand | |
| self.generate_realistic_demand() | |
| # Update real-time data | |
| self.update_real_time_data() | |
| # Complete finished trips and maintenance | |
| self.complete_trips() | |
| self.complete_maintenance() | |
| # AI-optimized vehicle allocation | |
| self.optimize_vehicle_allocation_ai() | |
| # Update performance metrics | |
| step_time = time.time() - start_time | |
| self.performance_stats['average_optimization_time'] = ( | |
| self.performance_stats['average_optimization_time'] + step_time | |
| ) / 2 | |
| def start_simulation(self): | |
| """Start the enhanced simulation""" | |
| self.simulation_running = True | |
| logger.info("๐ Real-time fleet optimization simulation started!") | |
| # Test API connections | |
| from realtime_api_client import test_api_connections | |
| test_api_connections() | |
| while self.simulation_running: | |
| self.run_simulation_step() | |
| time.sleep(1) # Real-time simulation | |
| def stop_simulation(self): | |
| """Stop the simulation""" | |
| self.simulation_running = False | |
| logger.info("๐ Simulation stopped") | |
| def get_enhanced_simulation_stats(self) -> Dict: | |
| """Get comprehensive simulation statistics""" | |
| total_earnings = sum(v.earnings for v in self.vehicles) | |
| total_distance = sum(v.total_distance for v in self.vehicles) | |
| available_vehicles = len([v for v in self.vehicles if v.status == 'available']) | |
| busy_vehicles = len([v for v in self.vehicles if v.status == 'busy']) | |
| maintenance_vehicles = len([v for v in self.vehicles if v.status == 'maintenance']) | |
| pending_demands = len([d for d in self.demands if d.status == 'pending']) | |
| completed_demands = len([d for d in self.demands if d.status == 'completed']) | |
| # Calculate metrics | |
| vehicle_utilization = (busy_vehicles / len(self.vehicles)) * 100 | |
| demand_satisfaction = (completed_demands / max(len(self.demands), 1)) * 100 | |
| avg_earnings_per_vehicle = total_earnings / len(self.vehicles) | |
| # Real-time data status | |
| weather_status = "Active" if self.weather_data else "Inactive" | |
| traffic_status = "Active" if self.traffic_data else "Inactive" | |
| route_status = "Active" if self.route_data else "Inactive" | |
| return { | |
| 'total_earnings': total_earnings, | |
| 'total_distance': total_distance, | |
| 'available_vehicles': available_vehicles, | |
| 'busy_vehicles': busy_vehicles, | |
| 'maintenance_vehicles': maintenance_vehicles, | |
| 'pending_demands': pending_demands, | |
| 'completed_demands': completed_demands, | |
| 'simulation_time': self.simulation_time.strftime('%H:%M:%S'), | |
| 'total_demands': len(self.demands), | |
| 'vehicle_utilization': vehicle_utilization, | |
| 'demand_satisfaction': demand_satisfaction, | |
| 'avg_earnings_per_vehicle': avg_earnings_per_vehicle, | |
| 'weather_data_status': weather_status, | |
| 'traffic_data_status': traffic_status, | |
| 'route_data_status': route_status, | |
| 'ai_optimization_enabled': self.config.ai_optimization_enabled, | |
| 'real_time_data_enabled': self.config.real_time_data_enabled, | |
| 'performance_stats': self.performance_stats | |
| } | |
| def create_enhanced_dashboard(self): | |
| """Create enhanced interactive dashboard with real-time data""" | |
| # Vehicle locations with enhanced data | |
| vehicle_locations = pd.DataFrame([ | |
| { | |
| 'id': v.id, | |
| 'lat': v.location[0], | |
| 'lng': v.location[1], | |
| 'status': v.status, | |
| 'earnings': v.earnings, | |
| 'distance': v.total_distance, | |
| 'battery': v.battery_level, | |
| 'load': v.current_load, | |
| 'capacity': v.capacity | |
| } | |
| for v in self.vehicles | |
| ]) | |
| # Demand locations with priority and AI suggestions | |
| demand_locations = pd.DataFrame([ | |
| { | |
| 'id': d.id, | |
| 'lat': d.pickup_location[0], | |
| 'lng': d.pickup_location[1], | |
| 'status': d.status, | |
| 'priority': d.priority, | |
| 'passengers': d.passengers, | |
| 'wait_time': d.estimated_wait_time, | |
| 'assigned_vehicle': d.assigned_vehicle | |
| } | |
| for d in self.demands if d.status in ['pending', 'assigned'] | |
| ]) | |
| # Create enhanced map | |
| fig = go.Figure() | |
| # Add vehicle markers with enhanced styling | |
| status_colors = { | |
| 'available': 'green', | |
| 'busy': 'red', | |
| 'maintenance': 'orange' | |
| } | |
| for status in ['available', 'busy', 'maintenance']: | |
| vehicles = vehicle_locations[vehicle_locations['status'] == status] | |
| if not vehicles.empty: | |
| fig.add_trace(go.Scattermapbox( | |
| lat=vehicles['lat'], | |
| lon=vehicles['lng'], | |
| mode='markers', | |
| marker=go.scattermapbox.Marker( | |
| size=12, | |
| color=status_colors[status], | |
| opacity=0.8 | |
| ), | |
| name=f'Vehicles ({status.title()})', | |
| text=vehicles['id'], | |
| hovertemplate=( | |
| f'Vehicle %{{text}} ({status.title()})<br>' | |
| f'Earnings: $%{{customdata[0]:.2f}}<br>' | |
| f'Distance: %{{customdata[1]:.1f}}km<br>' | |
| f'Battery: %{{customdata[2]:.0f}}%<br>' | |
| f'Load: %{{customdata[3]}}/%{{customdata[4]}}<extra></extra>' | |
| ), | |
| customdata=vehicles[['earnings', 'distance', 'battery', 'load', 'capacity']].values | |
| )) | |
| # Add demand markers with priority-based styling | |
| if not demand_locations.empty: | |
| # Color by priority | |
| priority_colors = {1: 'lightblue', 2: 'blue', 3: 'purple', 4: 'orange', 5: 'red'} | |
| demand_locations['color'] = demand_locations['priority'].map(priority_colors) | |
| fig.add_trace(go.Scattermapbox( | |
| lat=demand_locations['lat'], | |
| lon=demand_locations['lng'], | |
| mode='markers', | |
| marker=go.scattermapbox.Marker( | |
| size=10, | |
| color=demand_locations['color'], | |
| symbol='diamond', | |
| opacity=0.8 | |
| ), | |
| name='Demands', | |
| text=demand_locations['id'], | |
| hovertemplate=( | |
| 'Demand %{text}<br>' | |
| 'Priority: %{customdata[0]}<br>' | |
| 'Passengers: %{customdata[1]}<br>' | |
| 'Status: %{customdata[2]}<br>' | |
| 'Wait Time: %{customdata[3]}min<extra></extra>' | |
| ), | |
| customdata=demand_locations[['priority', 'passengers', 'status', 'wait_time']].values | |
| )) | |
| # Add weather and traffic indicators | |
| if self.weather_data: | |
| weather_locations = list(self.weather_data.keys()) | |
| weather_df = pd.DataFrame([ | |
| { | |
| 'lat': loc[0], | |
| 'lng': loc[1], | |
| 'condition': data.condition | |
| } | |
| for loc, data in self.weather_data.items() | |
| ]) | |
| fig.add_trace(go.Scattermapbox( | |
| lat=weather_df['lat'], | |
| lon=weather_df['lng'], | |
| mode='markers', | |
| marker=go.scattermapbox.Marker( | |
| size=8, | |
| color='lightblue', | |
| symbol='circle', | |
| opacity=0.5 | |
| ), | |
| name='Weather Stations', | |
| text=weather_df['condition'], | |
| hovertemplate='Weather: %{text}<extra></extra>' | |
| )) | |
| # Get current location for map center | |
| if self.location_manager and self.location_manager.current_location: | |
| center_lat = self.location_manager.current_location.center_lat | |
| center_lng = self.location_manager.current_location.center_lng | |
| location_name = self.location_manager.current_location.name | |
| else: | |
| center_lat, center_lng = 40.7589, -73.9851 # Default to NYC | |
| location_name = "Unknown Location" | |
| fig.update_layout( | |
| mapbox=dict( | |
| style='open-street-map', | |
| center=dict(lat=center_lat, lon=center_lng), | |
| zoom=12 | |
| ), | |
| title=f'Real-time Fleet Optimization Dashboard - {location_name}', | |
| height=700, | |
| showlegend=True | |
| ) | |
| return fig | |
| # Global optimizer instance | |
| realtime_optimizer = RealTimeFleetOptimizer() | |
| def start_realtime_simulation(): | |
| """Start the real-time fleet optimization simulation""" | |
| if not realtime_optimizer.simulation_running: | |
| thread = threading.Thread(target=realtime_optimizer.start_simulation, daemon=True) | |
| thread.start() | |
| return "๐ Real-time fleet optimization simulation started! Live data integration active." | |
| return "Simulation is already running!" | |
| def stop_realtime_simulation(): | |
| """Stop the real-time fleet optimization simulation""" | |
| realtime_optimizer.stop_simulation() | |
| return "๐ Real-time simulation stopped" | |
| def get_realtime_stats(): | |
| """Get comprehensive real-time fleet statistics""" | |
| stats = realtime_optimizer.get_enhanced_simulation_stats() | |
| return json.dumps(stats, indent=2, default=str) | |
| def update_realtime_dashboard(): | |
| """Update the real-time fleet dashboard""" | |
| return realtime_optimizer.create_enhanced_dashboard() | |
| def toggle_ai_optimization(): | |
| """Toggle AI optimization on/off""" | |
| realtime_optimizer.config.ai_optimization_enabled = not realtime_optimizer.config.ai_optimization_enabled | |
| status = "enabled" if realtime_optimizer.config.ai_optimization_enabled else "disabled" | |
| return f"AI optimization {status}" | |
| def toggle_realtime_data(): | |
| """Toggle real-time data integration on/off""" | |
| realtime_optimizer.config.real_time_data_enabled = not realtime_optimizer.config.real_time_data_enabled | |
| status = "enabled" if realtime_optimizer.config.real_time_data_enabled else "disabled" | |
| return f"Real-time data integration {status}" | |
| def set_fleet_location(location_identifier: str): | |
| """Set the fleet location""" | |
| realtime_optimizer.set_location(location_identifier) | |
| if realtime_optimizer.location_manager.current_location: | |
| return f"Location set to: {realtime_optimizer.location_manager.current_location.name}" | |
| else: | |
| return f"Failed to set location: {location_identifier}" | |
| def get_available_locations(): | |
| """Get list of available locations""" | |
| if realtime_optimizer.location_manager: | |
| locations = realtime_optimizer.location_manager.get_available_locations() | |
| return "\n".join([f"- {loc}" for loc in locations]) | |
| return "No location manager available" | |
| def get_current_location_info(): | |
| """Get current location information""" | |
| if realtime_optimizer.location_manager and realtime_optimizer.location_manager.current_location: | |
| info = realtime_optimizer.location_manager.get_location_info() | |
| return f""" | |
| Current Location: {info['name']} | |
| City: {info['city']}, {info['country']} | |
| Center: {info['center'][0]:.4f}, {info['center'][1]:.4f} | |
| Hotspots: {len(info['hotspots'])} | |
| Timezone: {info['timezone']} | |
| """ | |
| return "No location set" | |
| def get_gemini_ai_recommendations(): | |
| """Get current Gemini AI recommendations""" | |
| if not realtime_optimizer.config.ai_optimization_enabled: | |
| return "๐ค AI optimization is disabled. Enable it to get Gemini AI recommendations." | |
| try: | |
| # Get AI recommendations | |
| ai_suggestion = realtime_optimizer.api_client.get_ai_optimization_suggestion( | |
| realtime_optimizer.vehicles, | |
| realtime_optimizer.demands, | |
| realtime_optimizer.traffic_data, | |
| realtime_optimizer.weather_data | |
| ) | |
| return ai_suggestion | |
| except Exception as e: | |
| return f"โ Error getting Gemini AI recommendations: {str(e)}" | |
| def create_custom_location(name: str, center_lat: float, center_lng: float): | |
| """Create a custom location""" | |
| bounds = realtime_optimizer.location_manager.get_bounds_from_center(center_lat, center_lng, 10) | |
| location = realtime_optimizer.create_custom_location(name, center_lat, center_lng, bounds) | |
| if location: | |
| return f"Custom location created: {name} at {center_lat:.4f}, {center_lng:.4f}" | |
| else: | |
| return f"Failed to create custom location: {name}" | |
| # Enhanced Gradio interface | |
| def create_realtime_fleet_interface(): | |
| with gr.Blocks(title="Real-time Fleet Resource Optimization", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# ๐ Real-time Fleet Resource Optimization with AI Agents") | |
| gr.Markdown("### Dynamic vehicle allocation with live traffic, weather, and AI decision making") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### ๐ฎ Simulation Controls") | |
| start_btn = gr.Button("๐ Start Real-time Simulation", variant="primary") | |
| stop_btn = gr.Button("๐ Stop Simulation", variant="secondary") | |
| gr.Markdown("### ๐ Location Controls") | |
| location_dropdown = gr.Dropdown( | |
| choices=["new_york", "london", "tokyo", "singapore"], | |
| value="new_york", | |
| label="Select Location" | |
| ) | |
| set_location_btn = gr.Button("๐ Set Location") | |
| location_info = gr.Textbox(label="Current Location", lines=4, interactive=False) | |
| gr.Markdown("### ๐๏ธ Custom Location") | |
| custom_name = gr.Textbox(label="Location Name", placeholder="e.g., San Francisco") | |
| custom_lat = gr.Number(label="Latitude", value=37.7749) | |
| custom_lng = gr.Number(label="Longitude", value=-122.4194) | |
| create_custom_btn = gr.Button("๐๏ธ Create Custom Location") | |
| gr.Markdown("### ๐ค AI & Data Controls") | |
| ai_toggle_btn = gr.Button("๐ง Toggle AI Optimization") | |
| data_toggle_btn = gr.Button("๐ก Toggle Real-time Data") | |
| ai_status = gr.Textbox(label="AI Status", value="Enabled", interactive=False) | |
| data_status = gr.Textbox(label="Data Status", value="Enabled", interactive=False) | |
| gr.Markdown("### ๐ค Gemini AI Recommendations") | |
| ai_recommendations_btn = gr.Button("๐ง Get AI Recommendations") | |
| ai_recommendations_output = gr.Textbox(label="Gemini AI Optimization Suggestions", lines=10, interactive=False) | |
| gr.Markdown("### ๐ Real-time Statistics") | |
| stats_btn = gr.Button("๐ Update Stats") | |
| stats_output = gr.Textbox(label="Enhanced Fleet Statistics", lines=15, interactive=False) | |
| gr.Markdown("### โ๏ธ Configuration") | |
| gr.Markdown(f""" | |
| - **Total Vehicles**: {realtime_optimizer.config.num_vehicles} | |
| - **Vehicle Capacity**: {realtime_optimizer.config.vehicle_capacity} passengers | |
| - **Max Distance**: {realtime_optimizer.config.max_distance} km | |
| - **Base Cost**: ${realtime_optimizer.config.base_cost_per_km}/km | |
| - **Update Interval**: {realtime_optimizer.config.update_interval}s | |
| - **AI Optimization**: {realtime_optimizer.config.ai_optimization_enabled} | |
| - **Real-time Data**: {realtime_optimizer.config.real_time_data_enabled} | |
| """) | |
| with gr.Column(scale=2): | |
| gr.Markdown("### ๐บ๏ธ Live Fleet Dashboard") | |
| dashboard_output = gr.Plot(label="Real-time Vehicle Locations & Demand") | |
| # Event handlers | |
| start_btn.click( | |
| fn=start_realtime_simulation, | |
| outputs=gr.Textbox(label="Status", lines=2) | |
| ) | |
| stop_btn.click( | |
| fn=stop_realtime_simulation, | |
| outputs=gr.Textbox(label="Status", lines=2) | |
| ) | |
| set_location_btn.click( | |
| fn=set_fleet_location, | |
| inputs=location_dropdown, | |
| outputs=location_info | |
| ) | |
| create_custom_btn.click( | |
| fn=create_custom_location, | |
| inputs=[custom_name, custom_lat, custom_lng], | |
| outputs=location_info | |
| ) | |
| ai_toggle_btn.click( | |
| fn=toggle_ai_optimization, | |
| outputs=ai_status | |
| ) | |
| data_toggle_btn.click( | |
| fn=toggle_realtime_data, | |
| outputs=data_status | |
| ) | |
| ai_recommendations_btn.click( | |
| fn=get_gemini_ai_recommendations, | |
| outputs=ai_recommendations_output | |
| ) | |
| stats_btn.click( | |
| fn=get_realtime_stats, | |
| outputs=stats_output | |
| ) | |
| # Auto-refresh dashboard and location info | |
| demo.load( | |
| fn=update_realtime_dashboard, | |
| outputs=dashboard_output | |
| ) | |
| demo.load( | |
| fn=get_current_location_info, | |
| outputs=location_info | |
| ) | |
| # Periodic updates | |
| demo.load( | |
| fn=lambda: None, | |
| every=10 # Update every 10 seconds for real-time feel | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_realtime_fleet_interface() | |
| demo.launch(share=True, server_name="0.0.0.0", server_port=7860) | |