DataSprint / realtime_fleet_optimizer.py
sujana05's picture
Upload folder using huggingface_hub
b4ce589 verified
"""
Real-time Fleet Resource Optimization with AI Agents
Enhanced version with live API integration for traffic, weather, and AI decision making
"""
import pandas as pd
import numpy as np
import requests
import json
import time
from datetime import datetime, timedelta
import plotly.graph_objs as go
import plotly.express as px
from plotly.subplots import make_subplots
import gradio as gr
from dataclasses import dataclass, asdict
from typing import List, Dict, Tuple, Optional
import threading
import queue
import random
import logging
from concurrent.futures import ThreadPoolExecutor
import asyncio
# Import our real-time API client and location manager
from realtime_api_client import RealTimeAPIClient, TrafficData, WeatherData, RouteData
from location_config import LocationManager, initialize_location_manager
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
class FleetConfig:
def __init__(self):
self.num_vehicles = 50
self.vehicle_capacity = 4
self.max_distance = 100 # km
self.base_cost_per_km = 0.5
self.weather_impact = {
'clear': 1.0,
'clouds': 1.1,
'rain': 1.3,
'snow': 1.6,
'storm': 2.0,
'mist': 1.2,
'fog': 1.4
}
self.traffic_impact = {
'low': 1.0,
'medium': 1.3,
'high': 1.8,
'severe': 2.5
}
self.ai_optimization_enabled = True
self.real_time_data_enabled = True
self.update_interval = 30 # seconds
@dataclass
class Vehicle:
id: int
location: Tuple[float, float] # lat, lng
status: str # 'available', 'busy', 'maintenance'
capacity: int
current_load: int
total_distance: float
earnings: float
last_update: datetime
current_trip: Optional[int] = None # demand ID if on trip
battery_level: float = 100.0 # for electric vehicles
maintenance_due: bool = False
@dataclass
class Demand:
id: int
pickup_location: Tuple[float, float]
dropoff_location: Tuple[float, float]
passengers: int
priority: int # 1-5, 5 being highest
timestamp: datetime
status: str # 'pending', 'assigned', 'completed', 'cancelled'
estimated_wait_time: Optional[int] = None # minutes
assigned_vehicle: Optional[int] = None
ai_suggestion: Optional[str] = None
@dataclass
class OptimizationMetrics:
total_earnings: float
total_distance: float
vehicle_utilization: float
average_response_time: float
demand_satisfaction_rate: float
cost_efficiency: float
ai_optimization_impact: float
timestamp: datetime
class RealTimeFleetOptimizer:
def __init__(self, location_identifier: str = 'new_york'):
self.config = FleetConfig()
self.vehicles = []
self.demands = []
self.weather_data = {}
self.traffic_data = {}
self.route_data = {}
self.simulation_running = False
self.data_queue = queue.Queue()
self.metrics_history = []
# Initialize location manager
self.location_manager = initialize_location_manager("AIzaSyBTA3eACtpCPR9DDi8EhOt1cI7Cy08Mkfg")
self.set_location(location_identifier)
# Initialize API client with location manager
self.api_client = RealTimeAPIClient(self.location_manager)
# Initialize vehicles
self._initialize_vehicles()
# Simulation parameters
self.simulation_time = datetime.now()
self.time_step = 60 # seconds
self.last_api_update = datetime.now()
# Performance tracking
self.performance_stats = {
'total_api_calls': 0,
'successful_assignments': 0,
'failed_assignments': 0,
'ai_suggestions_generated': 0,
'average_optimization_time': 0.0
}
def set_location(self, location_identifier: str):
"""Set the current location for the fleet"""
if self.location_manager.set_location(location_identifier):
logger.info(f"Location set to: {self.location_manager.current_location.name}")
# Reinitialize vehicles for new location
self.vehicles = []
self._initialize_vehicles()
else:
logger.error(f"Failed to set location: {location_identifier}")
def create_custom_location(self, name: str, center_lat: float, center_lng: float,
bounds: Dict[str, float], hotspots: List[Dict] = None):
"""Create and set a custom location"""
location = self.location_manager.create_custom_location(name, center_lat, center_lng, bounds, hotspots)
if location:
# Reinitialize vehicles for new location
self.vehicles = []
self._initialize_vehicles()
logger.info(f"Custom location created and set: {name}")
return location
def _initialize_vehicles(self):
"""Initialize fleet vehicles with random locations in the current area"""
if not self.location_manager or not self.location_manager.current_location:
logger.error("No location set. Cannot initialize vehicles.")
return
location = self.location_manager.current_location
hotspots = location.hotspots
for i in range(self.config.num_vehicles):
# Distribute vehicles around current location hotspots
base_location = random.choice(hotspots)
location_coords = (
base_location['lat'] + random.uniform(-0.01, 0.01),
base_location['lng'] + random.uniform(-0.01, 0.01)
)
vehicle = Vehicle(
id=i,
location=location_coords,
status='available',
capacity=self.config.vehicle_capacity,
current_load=0,
total_distance=0.0,
earnings=0.0,
last_update=datetime.now(),
battery_level=random.uniform(80, 100)
)
self.vehicles.append(vehicle)
logger.info(f"Initialized {len(self.vehicles)} vehicles in {location.name}")
def generate_realistic_demand(self):
"""Generate realistic demand patterns based on time and location"""
if not self.location_manager or not self.location_manager.current_location:
logger.error("No location set. Cannot generate demand.")
return
location = self.location_manager.current_location
hotspots = location.hotspots
hour = self.simulation_time.hour
num_demands = 0
# Generate demands for each hotspot
for hotspot in hotspots:
base_rate = hotspot['base_rate']
# Adjust rate based on peak hours
if hour in hotspot['peak_hours']:
base_rate *= 1.5
# Weekend vs weekday
if self.simulation_time.weekday() >= 5: # Weekend
base_rate *= 0.7
# Generate demand
if random.random() < base_rate:
# Create realistic dropoff location
dropoff = (
hotspot['lat'] + random.uniform(-0.02, 0.02),
hotspot['lng'] + random.uniform(-0.02, 0.02)
)
# Determine priority based on time and location
priority = 3 # default
if hour in [7, 8, 17, 18]: # Rush hour
priority = random.choices([4, 5], weights=[0.7, 0.3])[0]
elif hour in [22, 23, 0, 1]: # Late night
priority = random.choices([1, 2, 3], weights=[0.3, 0.4, 0.3])[0]
demand = Demand(
id=len(self.demands),
pickup_location=(hotspot['lat'], hotspot['lng']),
dropoff_location=dropoff,
passengers=random.choices([1, 2, 3, 4], weights=[0.4, 0.3, 0.2, 0.1])[0],
priority=priority,
timestamp=self.simulation_time,
status='pending'
)
self.demands.append(demand)
num_demands += 1
logger.info(f"Generated {num_demands} new demands at {hour}:00 in {location.name}")
def update_real_time_data(self):
"""Update real-time traffic, weather, and route data"""
if not self.config.real_time_data_enabled:
return
current_time = datetime.now()
if (current_time - self.last_api_update).seconds < self.config.update_interval:
return
logger.info("Updating real-time data...")
start_time = time.time()
# Get unique locations for vehicles and demands
vehicle_locations = [v.location for v in self.vehicles]
demand_locations = [d.pickup_location for d in self.demands if d.status == 'pending']
all_locations = list(set(vehicle_locations + demand_locations))
# Update traffic data
self.traffic_data = self.api_client.get_batch_traffic_data(all_locations)
self.performance_stats['total_api_calls'] += len(all_locations)
# Update weather data
self.weather_data = self.api_client.get_batch_weather_data(all_locations)
self.performance_stats['total_api_calls'] += len(all_locations)
# Update route data for pending demands
for demand in [d for d in self.demands if d.status == 'pending']:
for vehicle in [v for v in self.vehicles if v.status == 'available']:
route_key = (vehicle.location, demand.pickup_location)
if route_key not in self.route_data:
route = self.api_client.get_route_data(vehicle.location, demand.pickup_location)
if route:
self.route_data[route_key] = route
self.performance_stats['total_api_calls'] += 1
self.last_api_update = current_time
update_time = time.time() - start_time
logger.info(f"Real-time data updated in {update_time:.2f}s")
def calculate_enhanced_cost(self, vehicle_id: int, pickup_loc: Tuple[float, float], dropoff_loc: Tuple[float, float]) -> Tuple[float, float, Dict]:
"""Calculate enhanced cost considering real-time data"""
# Get route data
route_key = (self.vehicles[vehicle_id].location, pickup_loc)
route = self.route_data.get(route_key)
if route:
distance = route.distance / 1000 # Convert to km
base_duration = route.duration / 60 # Convert to minutes
else:
# Fallback to simple distance calculation
distance = np.sqrt((pickup_loc[0] - self.vehicles[vehicle_id].location[0])**2 +
(pickup_loc[1] - self.vehicles[vehicle_id].location[1])**2) * 111
base_duration = distance * 2 # Rough estimate
# Get real-time factors
weather = self.weather_data.get(pickup_loc)
traffic = self.traffic_data.get(pickup_loc)
weather_multiplier = 1.0
traffic_multiplier = 1.0
if weather:
weather_multiplier = self.config.weather_impact.get(weather.condition, 1.0)
if traffic:
traffic_multiplier = self.config.traffic_impact.get(traffic.congestion_level, 1.0)
# Calculate total cost
base_cost = distance * self.config.base_cost_per_km
total_cost = base_cost * weather_multiplier * traffic_multiplier
# Add time-based costs
time_cost = base_duration * 0.1 # $0.10 per minute
total_cost += time_cost
cost_breakdown = {
'base_cost': base_cost,
'weather_multiplier': weather_multiplier,
'traffic_multiplier': traffic_multiplier,
'time_cost': time_cost,
'total_cost': total_cost,
'distance': distance,
'duration': base_duration
}
return total_cost, distance, cost_breakdown
def get_ai_optimization_suggestion(self, pending_demands: List[Demand], available_vehicles: List[Vehicle]) -> str:
"""Get AI-powered optimization suggestions"""
if not self.config.ai_optimization_enabled:
return "AI optimization disabled"
try:
# Prepare context for AI
context = f"""
Fleet Optimization Scenario - {datetime.now().strftime('%H:%M:%S')}:
Current Fleet Status:
- Total Vehicles: {len(self.vehicles)}
- Available: {len(available_vehicles)}
- Busy: {len([v for v in self.vehicles if v.status == 'busy'])}
- Maintenance: {len([v for v in self.vehicles if v.status == 'maintenance'])}
Current Demand:
- Pending Demands: {len(pending_demands)}
- High Priority (4-5): {len([d for d in pending_demands if d.priority >= 4])}
- Medium Priority (2-3): {len([d for d in pending_demands if 2 <= d.priority <= 3])}
- Low Priority (1): {len([d for d in pending_demands if d.priority == 1])}
Current Conditions:
- Time: {self.simulation_time.strftime('%H:%M')}
- Weather: {list(self.weather_data.values())[0].condition if self.weather_data else 'Unknown'}
- Traffic: {list(self.traffic_data.values())[0].congestion_level if self.traffic_data else 'Unknown'}
Performance Metrics:
- Total Earnings: ${sum(v.earnings for v in self.vehicles):.2f}
- Vehicle Utilization: {len([v for v in self.vehicles if v.status == 'busy']) / len(self.vehicles) * 100:.1f}%
- Average Response Time: {self.performance_stats.get('average_response_time', 0):.1f} minutes
Provide specific optimization recommendations for:
1. Vehicle allocation strategy
2. Priority handling
3. Route optimization
4. Performance improvements
"""
suggestion = self.api_client.get_ai_optimization_suggestion(
self.vehicles, self.demands, self.traffic_data, self.weather_data
)
self.performance_stats['ai_suggestions_generated'] += 1
return suggestion
except Exception as e:
logger.error(f"Error generating AI suggestion: {e}")
return f"AI optimization error: {str(e)}"
def optimize_vehicle_allocation_ai(self):
"""AI-enhanced vehicle allocation optimization"""
pending_demands = [d for d in self.demands if d.status == 'pending']
available_vehicles = [v for v in self.vehicles if v.status == 'available']
if not pending_demands or not available_vehicles:
return
# Get AI suggestion
ai_suggestion = self.get_ai_optimization_suggestion(pending_demands, available_vehicles)
# Create enhanced cost matrix
cost_matrix = []
assignment_details = []
for vehicle in available_vehicles:
vehicle_costs = []
vehicle_details = []
for demand in pending_demands:
cost, distance, breakdown = self.calculate_enhanced_cost(
vehicle.id, vehicle.location, demand.pickup_location
)
# Add priority penalty
priority_penalty = (6 - demand.priority) * 5
# Add capacity check
if vehicle.current_load + demand.passengers > vehicle.capacity:
total_cost = float('inf')
else:
total_cost = cost + priority_penalty
vehicle_costs.append(total_cost)
vehicle_details.append({
'cost': cost,
'distance': distance,
'priority_penalty': priority_penalty,
'breakdown': breakdown
})
cost_matrix.append(vehicle_costs)
assignment_details.append(vehicle_details)
# Enhanced assignment algorithm
assignments = []
used_vehicles = set()
used_demands = set()
# Sort demands by priority and timestamp
sorted_demands = sorted(pending_demands, key=lambda x: (x.priority, -x.timestamp.timestamp()), reverse=True)
for demand in sorted_demands:
best_vehicle = None
best_cost = float('inf')
best_details = None
for i, vehicle in enumerate(available_vehicles):
if i in used_vehicles:
continue
if vehicle.current_load + demand.passengers <= vehicle.capacity:
cost = cost_matrix[i][pending_demands.index(demand)]
if cost < best_cost:
best_cost = cost
best_vehicle = i
best_details = assignment_details[i][pending_demands.index(demand)]
if best_vehicle is not None and best_cost != float('inf'):
assignments.append((available_vehicles[best_vehicle], demand, best_details))
used_vehicles.add(best_vehicle)
used_demands.add(demand.id)
# Execute assignments with AI insights
max_assignments = min(len(assignments), 8) # Increased from 5
for vehicle, demand, details in assignments[:max_assignments]:
self._assign_vehicle_to_demand_enhanced(vehicle, demand, details, ai_suggestion)
logger.info(f"AI-optimized {len(assignments[:max_assignments])} assignments")
def _assign_vehicle_to_demand_enhanced(self, vehicle: Vehicle, demand: Demand, details: Dict, ai_suggestion: str):
"""Enhanced vehicle assignment with detailed tracking"""
vehicle.status = 'busy'
vehicle.current_load = demand.passengers
vehicle.current_trip = demand.id
demand.status = 'assigned'
demand.assigned_vehicle = vehicle.id
demand.ai_suggestion = ai_suggestion
# Calculate trip details
pickup_distance = details['distance']
trip_distance = np.sqrt((demand.pickup_location[0] - demand.dropoff_location[0])**2 +
(demand.pickup_location[1] - demand.dropoff_location[1])**2) * 111
# Update vehicle metrics
vehicle.total_distance += pickup_distance + trip_distance
vehicle.earnings += details['cost']
vehicle.location = demand.dropoff_location
vehicle.last_update = self.simulation_time
# Calculate estimated completion time
total_duration = details['breakdown']['duration'] + (trip_distance * 2) # minutes
completion_time = self.simulation_time + timedelta(minutes=total_duration)
# Queue trip completion
self.data_queue.put(('complete_trip', vehicle.id, completion_time, demand.id))
# Update performance stats
self.performance_stats['successful_assignments'] += 1
logger.info(f"Assigned Vehicle {vehicle.id} to Demand {demand.id} (Priority {demand.priority})")
def complete_trips(self):
"""Complete trips that have finished"""
current_time = self.simulation_time
while not self.data_queue.empty():
try:
action, vehicle_id, completion_time, demand_id = self.data_queue.get_nowait()
if action == 'complete_trip' and completion_time <= current_time:
vehicle = next(v for v in self.vehicles if v.id == vehicle_id)
demand = next(d for d in self.demands if d.id == demand_id)
vehicle.status = 'available'
vehicle.current_load = 0
vehicle.current_trip = None
demand.status = 'completed'
# Random maintenance check
if random.random() < 0.05: # 5% chance
vehicle.status = 'maintenance'
vehicle.maintenance_due = True
# Schedule maintenance completion
maintenance_time = current_time + timedelta(minutes=random.randint(30, 120))
self.data_queue.put(('complete_maintenance', vehicle_id, maintenance_time))
logger.info(f"Completed trip: Vehicle {vehicle_id} -> Demand {demand_id}")
except queue.Empty:
break
except Exception as e:
logger.error(f"Error completing trip: {e}")
def complete_maintenance(self):
"""Complete maintenance tasks"""
current_time = self.simulation_time
while not self.data_queue.empty():
try:
action, vehicle_id, completion_time = self.data_queue.get_nowait()
if action == 'complete_maintenance' and completion_time <= current_time:
vehicle = next(v for v in self.vehicles if v.id == vehicle_id)
vehicle.status = 'available'
vehicle.maintenance_due = False
vehicle.battery_level = 100.0 # Full charge after maintenance
logger.info(f"Completed maintenance: Vehicle {vehicle_id}")
except queue.Empty:
break
except Exception as e:
logger.error(f"Error completing maintenance: {e}")
def run_simulation_step(self):
"""Run one enhanced simulation step"""
if not self.simulation_running:
return
start_time = time.time()
# Update simulation time
self.simulation_time = self.simulation_time + timedelta(hours=1)
# Generate new demand
self.generate_realistic_demand()
# Update real-time data
self.update_real_time_data()
# Complete finished trips and maintenance
self.complete_trips()
self.complete_maintenance()
# AI-optimized vehicle allocation
self.optimize_vehicle_allocation_ai()
# Update performance metrics
step_time = time.time() - start_time
self.performance_stats['average_optimization_time'] = (
self.performance_stats['average_optimization_time'] + step_time
) / 2
def start_simulation(self):
"""Start the enhanced simulation"""
self.simulation_running = True
logger.info("๐Ÿš— Real-time fleet optimization simulation started!")
# Test API connections
from realtime_api_client import test_api_connections
test_api_connections()
while self.simulation_running:
self.run_simulation_step()
time.sleep(1) # Real-time simulation
def stop_simulation(self):
"""Stop the simulation"""
self.simulation_running = False
logger.info("๐Ÿ›‘ Simulation stopped")
def get_enhanced_simulation_stats(self) -> Dict:
"""Get comprehensive simulation statistics"""
total_earnings = sum(v.earnings for v in self.vehicles)
total_distance = sum(v.total_distance for v in self.vehicles)
available_vehicles = len([v for v in self.vehicles if v.status == 'available'])
busy_vehicles = len([v for v in self.vehicles if v.status == 'busy'])
maintenance_vehicles = len([v for v in self.vehicles if v.status == 'maintenance'])
pending_demands = len([d for d in self.demands if d.status == 'pending'])
completed_demands = len([d for d in self.demands if d.status == 'completed'])
# Calculate metrics
vehicle_utilization = (busy_vehicles / len(self.vehicles)) * 100
demand_satisfaction = (completed_demands / max(len(self.demands), 1)) * 100
avg_earnings_per_vehicle = total_earnings / len(self.vehicles)
# Real-time data status
weather_status = "Active" if self.weather_data else "Inactive"
traffic_status = "Active" if self.traffic_data else "Inactive"
route_status = "Active" if self.route_data else "Inactive"
return {
'total_earnings': total_earnings,
'total_distance': total_distance,
'available_vehicles': available_vehicles,
'busy_vehicles': busy_vehicles,
'maintenance_vehicles': maintenance_vehicles,
'pending_demands': pending_demands,
'completed_demands': completed_demands,
'simulation_time': self.simulation_time.strftime('%H:%M:%S'),
'total_demands': len(self.demands),
'vehicle_utilization': vehicle_utilization,
'demand_satisfaction': demand_satisfaction,
'avg_earnings_per_vehicle': avg_earnings_per_vehicle,
'weather_data_status': weather_status,
'traffic_data_status': traffic_status,
'route_data_status': route_status,
'ai_optimization_enabled': self.config.ai_optimization_enabled,
'real_time_data_enabled': self.config.real_time_data_enabled,
'performance_stats': self.performance_stats
}
def create_enhanced_dashboard(self):
"""Create enhanced interactive dashboard with real-time data"""
# Vehicle locations with enhanced data
vehicle_locations = pd.DataFrame([
{
'id': v.id,
'lat': v.location[0],
'lng': v.location[1],
'status': v.status,
'earnings': v.earnings,
'distance': v.total_distance,
'battery': v.battery_level,
'load': v.current_load,
'capacity': v.capacity
}
for v in self.vehicles
])
# Demand locations with priority and AI suggestions
demand_locations = pd.DataFrame([
{
'id': d.id,
'lat': d.pickup_location[0],
'lng': d.pickup_location[1],
'status': d.status,
'priority': d.priority,
'passengers': d.passengers,
'wait_time': d.estimated_wait_time,
'assigned_vehicle': d.assigned_vehicle
}
for d in self.demands if d.status in ['pending', 'assigned']
])
# Create enhanced map
fig = go.Figure()
# Add vehicle markers with enhanced styling
status_colors = {
'available': 'green',
'busy': 'red',
'maintenance': 'orange'
}
for status in ['available', 'busy', 'maintenance']:
vehicles = vehicle_locations[vehicle_locations['status'] == status]
if not vehicles.empty:
fig.add_trace(go.Scattermapbox(
lat=vehicles['lat'],
lon=vehicles['lng'],
mode='markers',
marker=go.scattermapbox.Marker(
size=12,
color=status_colors[status],
opacity=0.8
),
name=f'Vehicles ({status.title()})',
text=vehicles['id'],
hovertemplate=(
f'Vehicle %{{text}} ({status.title()})<br>'
f'Earnings: $%{{customdata[0]:.2f}}<br>'
f'Distance: %{{customdata[1]:.1f}}km<br>'
f'Battery: %{{customdata[2]:.0f}}%<br>'
f'Load: %{{customdata[3]}}/%{{customdata[4]}}<extra></extra>'
),
customdata=vehicles[['earnings', 'distance', 'battery', 'load', 'capacity']].values
))
# Add demand markers with priority-based styling
if not demand_locations.empty:
# Color by priority
priority_colors = {1: 'lightblue', 2: 'blue', 3: 'purple', 4: 'orange', 5: 'red'}
demand_locations['color'] = demand_locations['priority'].map(priority_colors)
fig.add_trace(go.Scattermapbox(
lat=demand_locations['lat'],
lon=demand_locations['lng'],
mode='markers',
marker=go.scattermapbox.Marker(
size=10,
color=demand_locations['color'],
symbol='diamond',
opacity=0.8
),
name='Demands',
text=demand_locations['id'],
hovertemplate=(
'Demand %{text}<br>'
'Priority: %{customdata[0]}<br>'
'Passengers: %{customdata[1]}<br>'
'Status: %{customdata[2]}<br>'
'Wait Time: %{customdata[3]}min<extra></extra>'
),
customdata=demand_locations[['priority', 'passengers', 'status', 'wait_time']].values
))
# Add weather and traffic indicators
if self.weather_data:
weather_locations = list(self.weather_data.keys())
weather_df = pd.DataFrame([
{
'lat': loc[0],
'lng': loc[1],
'condition': data.condition
}
for loc, data in self.weather_data.items()
])
fig.add_trace(go.Scattermapbox(
lat=weather_df['lat'],
lon=weather_df['lng'],
mode='markers',
marker=go.scattermapbox.Marker(
size=8,
color='lightblue',
symbol='circle',
opacity=0.5
),
name='Weather Stations',
text=weather_df['condition'],
hovertemplate='Weather: %{text}<extra></extra>'
))
# Get current location for map center
if self.location_manager and self.location_manager.current_location:
center_lat = self.location_manager.current_location.center_lat
center_lng = self.location_manager.current_location.center_lng
location_name = self.location_manager.current_location.name
else:
center_lat, center_lng = 40.7589, -73.9851 # Default to NYC
location_name = "Unknown Location"
fig.update_layout(
mapbox=dict(
style='open-street-map',
center=dict(lat=center_lat, lon=center_lng),
zoom=12
),
title=f'Real-time Fleet Optimization Dashboard - {location_name}',
height=700,
showlegend=True
)
return fig
# Global optimizer instance
realtime_optimizer = RealTimeFleetOptimizer()
def start_realtime_simulation():
"""Start the real-time fleet optimization simulation"""
if not realtime_optimizer.simulation_running:
thread = threading.Thread(target=realtime_optimizer.start_simulation, daemon=True)
thread.start()
return "๐Ÿš— Real-time fleet optimization simulation started! Live data integration active."
return "Simulation is already running!"
def stop_realtime_simulation():
"""Stop the real-time fleet optimization simulation"""
realtime_optimizer.stop_simulation()
return "๐Ÿ›‘ Real-time simulation stopped"
def get_realtime_stats():
"""Get comprehensive real-time fleet statistics"""
stats = realtime_optimizer.get_enhanced_simulation_stats()
return json.dumps(stats, indent=2, default=str)
def update_realtime_dashboard():
"""Update the real-time fleet dashboard"""
return realtime_optimizer.create_enhanced_dashboard()
def toggle_ai_optimization():
"""Toggle AI optimization on/off"""
realtime_optimizer.config.ai_optimization_enabled = not realtime_optimizer.config.ai_optimization_enabled
status = "enabled" if realtime_optimizer.config.ai_optimization_enabled else "disabled"
return f"AI optimization {status}"
def toggle_realtime_data():
"""Toggle real-time data integration on/off"""
realtime_optimizer.config.real_time_data_enabled = not realtime_optimizer.config.real_time_data_enabled
status = "enabled" if realtime_optimizer.config.real_time_data_enabled else "disabled"
return f"Real-time data integration {status}"
def set_fleet_location(location_identifier: str):
"""Set the fleet location"""
realtime_optimizer.set_location(location_identifier)
if realtime_optimizer.location_manager.current_location:
return f"Location set to: {realtime_optimizer.location_manager.current_location.name}"
else:
return f"Failed to set location: {location_identifier}"
def get_available_locations():
"""Get list of available locations"""
if realtime_optimizer.location_manager:
locations = realtime_optimizer.location_manager.get_available_locations()
return "\n".join([f"- {loc}" for loc in locations])
return "No location manager available"
def get_current_location_info():
"""Get current location information"""
if realtime_optimizer.location_manager and realtime_optimizer.location_manager.current_location:
info = realtime_optimizer.location_manager.get_location_info()
return f"""
Current Location: {info['name']}
City: {info['city']}, {info['country']}
Center: {info['center'][0]:.4f}, {info['center'][1]:.4f}
Hotspots: {len(info['hotspots'])}
Timezone: {info['timezone']}
"""
return "No location set"
def get_gemini_ai_recommendations():
"""Get current Gemini AI recommendations"""
if not realtime_optimizer.config.ai_optimization_enabled:
return "๐Ÿค– AI optimization is disabled. Enable it to get Gemini AI recommendations."
try:
# Get AI recommendations
ai_suggestion = realtime_optimizer.api_client.get_ai_optimization_suggestion(
realtime_optimizer.vehicles,
realtime_optimizer.demands,
realtime_optimizer.traffic_data,
realtime_optimizer.weather_data
)
return ai_suggestion
except Exception as e:
return f"โŒ Error getting Gemini AI recommendations: {str(e)}"
def create_custom_location(name: str, center_lat: float, center_lng: float):
"""Create a custom location"""
bounds = realtime_optimizer.location_manager.get_bounds_from_center(center_lat, center_lng, 10)
location = realtime_optimizer.create_custom_location(name, center_lat, center_lng, bounds)
if location:
return f"Custom location created: {name} at {center_lat:.4f}, {center_lng:.4f}"
else:
return f"Failed to create custom location: {name}"
# Enhanced Gradio interface
def create_realtime_fleet_interface():
with gr.Blocks(title="Real-time Fleet Resource Optimization", theme=gr.themes.Soft()) as demo:
gr.Markdown("# ๐Ÿš— Real-time Fleet Resource Optimization with AI Agents")
gr.Markdown("### Dynamic vehicle allocation with live traffic, weather, and AI decision making")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### ๐ŸŽฎ Simulation Controls")
start_btn = gr.Button("๐Ÿš€ Start Real-time Simulation", variant="primary")
stop_btn = gr.Button("๐Ÿ›‘ Stop Simulation", variant="secondary")
gr.Markdown("### ๐ŸŒ Location Controls")
location_dropdown = gr.Dropdown(
choices=["new_york", "london", "tokyo", "singapore"],
value="new_york",
label="Select Location"
)
set_location_btn = gr.Button("๐Ÿ“ Set Location")
location_info = gr.Textbox(label="Current Location", lines=4, interactive=False)
gr.Markdown("### ๐Ÿ—๏ธ Custom Location")
custom_name = gr.Textbox(label="Location Name", placeholder="e.g., San Francisco")
custom_lat = gr.Number(label="Latitude", value=37.7749)
custom_lng = gr.Number(label="Longitude", value=-122.4194)
create_custom_btn = gr.Button("๐Ÿ—๏ธ Create Custom Location")
gr.Markdown("### ๐Ÿค– AI & Data Controls")
ai_toggle_btn = gr.Button("๐Ÿง  Toggle AI Optimization")
data_toggle_btn = gr.Button("๐Ÿ“ก Toggle Real-time Data")
ai_status = gr.Textbox(label="AI Status", value="Enabled", interactive=False)
data_status = gr.Textbox(label="Data Status", value="Enabled", interactive=False)
gr.Markdown("### ๐Ÿค– Gemini AI Recommendations")
ai_recommendations_btn = gr.Button("๐Ÿง  Get AI Recommendations")
ai_recommendations_output = gr.Textbox(label="Gemini AI Optimization Suggestions", lines=10, interactive=False)
gr.Markdown("### ๐Ÿ“Š Real-time Statistics")
stats_btn = gr.Button("๐Ÿ“ˆ Update Stats")
stats_output = gr.Textbox(label="Enhanced Fleet Statistics", lines=15, interactive=False)
gr.Markdown("### โš™๏ธ Configuration")
gr.Markdown(f"""
- **Total Vehicles**: {realtime_optimizer.config.num_vehicles}
- **Vehicle Capacity**: {realtime_optimizer.config.vehicle_capacity} passengers
- **Max Distance**: {realtime_optimizer.config.max_distance} km
- **Base Cost**: ${realtime_optimizer.config.base_cost_per_km}/km
- **Update Interval**: {realtime_optimizer.config.update_interval}s
- **AI Optimization**: {realtime_optimizer.config.ai_optimization_enabled}
- **Real-time Data**: {realtime_optimizer.config.real_time_data_enabled}
""")
with gr.Column(scale=2):
gr.Markdown("### ๐Ÿ—บ๏ธ Live Fleet Dashboard")
dashboard_output = gr.Plot(label="Real-time Vehicle Locations & Demand")
# Event handlers
start_btn.click(
fn=start_realtime_simulation,
outputs=gr.Textbox(label="Status", lines=2)
)
stop_btn.click(
fn=stop_realtime_simulation,
outputs=gr.Textbox(label="Status", lines=2)
)
set_location_btn.click(
fn=set_fleet_location,
inputs=location_dropdown,
outputs=location_info
)
create_custom_btn.click(
fn=create_custom_location,
inputs=[custom_name, custom_lat, custom_lng],
outputs=location_info
)
ai_toggle_btn.click(
fn=toggle_ai_optimization,
outputs=ai_status
)
data_toggle_btn.click(
fn=toggle_realtime_data,
outputs=data_status
)
ai_recommendations_btn.click(
fn=get_gemini_ai_recommendations,
outputs=ai_recommendations_output
)
stats_btn.click(
fn=get_realtime_stats,
outputs=stats_output
)
# Auto-refresh dashboard and location info
demo.load(
fn=update_realtime_dashboard,
outputs=dashboard_output
)
demo.load(
fn=get_current_location_info,
outputs=location_info
)
# Periodic updates
demo.load(
fn=lambda: None,
every=10 # Update every 10 seconds for real-time feel
)
return demo
if __name__ == "__main__":
demo = create_realtime_fleet_interface()
demo.launch(share=True, server_name="0.0.0.0", server_port=7860)