""" Fleet Analytics and Performance Tracking Module Comprehensive logging, metrics collection, and performance analysis """ import json import logging import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple import plotly.graph_objs as go from plotly.subplots import make_subplots import plotly.express as px from dataclasses import dataclass, asdict import sqlite3 import os from pathlib import Path # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('fleet_analytics.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) @dataclass class PerformanceMetrics: """Performance metrics for fleet optimization""" timestamp: datetime total_earnings: float total_distance: float vehicle_utilization: float demand_satisfaction_rate: float average_response_time: float cost_efficiency: float ai_optimization_impact: float weather_impact: float traffic_impact: float api_call_success_rate: float total_api_calls: int successful_assignments: int failed_assignments: int @dataclass class VehicleAnalytics: """Individual vehicle performance analytics""" vehicle_id: int total_earnings: float total_distance: float total_trips: int average_trip_duration: float utilization_rate: float maintenance_count: int battery_efficiency: float last_updated: datetime @dataclass class DemandAnalytics: """Demand pattern analytics""" timestamp: datetime total_demands: int pending_demands: int completed_demands: int cancelled_demands: int average_wait_time: float priority_distribution: Dict[int, int] location_hotspots: Dict[str, int] class FleetAnalytics: """Comprehensive fleet analytics and performance tracking""" def __init__(self, db_path: str = "fleet_analytics.db"): self.db_path = db_path self.metrics_history = [] self.vehicle_analytics = {} self.demand_analytics = [] # Initialize database self._init_database() # Performance tracking self.start_time = datetime.now() self.session_metrics = { 'total_simulation_time': 0, 'total_optimization_cycles': 0, 'peak_vehicle_utilization': 0, 'best_cost_efficiency': float('inf'), 'total_revenue': 0, 'total_distance': 0 } def _init_database(self): """Initialize SQLite database for analytics storage""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create tables cursor.execute(''' CREATE TABLE IF NOT EXISTS performance_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, total_earnings REAL, total_distance REAL, vehicle_utilization REAL, demand_satisfaction_rate REAL, average_response_time REAL, cost_efficiency REAL, ai_optimization_impact REAL, weather_impact REAL, traffic_impact REAL, api_call_success_rate REAL, total_api_calls INTEGER, successful_assignments INTEGER, failed_assignments INTEGER ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS vehicle_analytics ( id INTEGER PRIMARY KEY AUTOINCREMENT, vehicle_id INTEGER, timestamp TEXT, total_earnings REAL, total_distance REAL, total_trips INTEGER, average_trip_duration REAL, utilization_rate REAL, maintenance_count INTEGER, battery_efficiency REAL ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS demand_analytics ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, total_demands INTEGER, pending_demands INTEGER, completed_demands INTEGER, cancelled_demands INTEGER, average_wait_time REAL, priority_distribution TEXT, location_hotspots TEXT ) ''') conn.commit() conn.close() logger.info("Analytics database initialized") def log_performance_metrics(self, metrics: PerformanceMetrics): """Log performance metrics to database and memory""" self.metrics_history.append(metrics) # Store in database conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO performance_metrics (timestamp, total_earnings, total_distance, vehicle_utilization, demand_satisfaction_rate, average_response_time, cost_efficiency, ai_optimization_impact, weather_impact, traffic_impact, api_call_success_rate, total_api_calls, successful_assignments, failed_assignments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( metrics.timestamp.isoformat(), metrics.total_earnings, metrics.total_distance, metrics.vehicle_utilization, metrics.demand_satisfaction_rate, metrics.average_response_time, metrics.cost_efficiency, metrics.ai_optimization_impact, metrics.weather_impact, metrics.traffic_impact, metrics.api_call_success_rate, metrics.total_api_calls, metrics.successful_assignments, metrics.failed_assignments )) conn.commit() conn.close() # Update session metrics self.session_metrics['total_revenue'] = max(self.session_metrics['total_revenue'], metrics.total_earnings) self.session_metrics['total_distance'] = max(self.session_metrics['total_distance'], metrics.total_distance) self.session_metrics['peak_vehicle_utilization'] = max( self.session_metrics['peak_vehicle_utilization'], metrics.vehicle_utilization ) if metrics.cost_efficiency < self.session_metrics['best_cost_efficiency']: self.session_metrics['best_cost_efficiency'] = metrics.cost_efficiency logger.info(f"Performance metrics logged: Utilization {metrics.vehicle_utilization:.1f}%, Revenue ${metrics.total_earnings:.2f}") def log_vehicle_analytics(self, vehicle_id: int, analytics: VehicleAnalytics): """Log individual vehicle analytics""" self.vehicle_analytics[vehicle_id] = analytics # Store in database conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO vehicle_analytics (vehicle_id, timestamp, total_earnings, total_distance, total_trips, average_trip_duration, utilization_rate, maintenance_count, battery_efficiency) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( vehicle_id, analytics.last_updated.isoformat(), analytics.total_earnings, analytics.total_distance, analytics.total_trips, analytics.average_trip_duration, analytics.utilization_rate, analytics.maintenance_count, analytics.battery_efficiency )) conn.commit() conn.close() def log_demand_analytics(self, analytics: DemandAnalytics): """Log demand pattern analytics""" self.demand_analytics.append(analytics) # Store in database conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO demand_analytics (timestamp, total_demands, pending_demands, completed_demands, cancelled_demands, average_wait_time, priority_distribution, location_hotspots) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', ( analytics.timestamp.isoformat(), analytics.total_demands, analytics.pending_demands, analytics.completed_demands, analytics.cancelled_demands, analytics.average_wait_time, json.dumps(analytics.priority_distribution), json.dumps(analytics.location_hotspots) )) conn.commit() conn.close() def calculate_performance_metrics(self, vehicles: List, demands: List, performance_stats: Dict, weather_data: Dict, traffic_data: Dict) -> PerformanceMetrics: """Calculate comprehensive performance metrics""" current_time = datetime.now() # Basic metrics total_earnings = sum(v.earnings for v in vehicles) total_distance = sum(v.total_distance for v in vehicles) busy_vehicles = len([v for v in vehicles if v.status == 'busy']) vehicle_utilization = (busy_vehicles / len(vehicles)) * 100 if vehicles else 0 # Demand metrics completed_demands = len([d for d in demands if d.status == 'completed']) total_demands = len(demands) demand_satisfaction_rate = (completed_demands / total_demands) * 100 if total_demands > 0 else 0 # Response time (simplified calculation) pending_demands = [d for d in demands if d.status == 'pending'] if pending_demands: current_time = datetime.now() wait_times = [(current_time - d.timestamp).total_seconds() / 60 for d in pending_demands] average_response_time = np.mean(wait_times) if wait_times else 0 else: average_response_time = 0 # Cost efficiency cost_efficiency = total_earnings / total_distance if total_distance > 0 else 0 # AI optimization impact (simplified) ai_optimization_impact = performance_stats.get('ai_suggestions_generated', 0) * 0.1 # Weather and traffic impact weather_impact = len(weather_data) * 0.05 # Simplified impact calculation traffic_impact = len(traffic_data) * 0.03 # API success rate total_api_calls = performance_stats.get('total_api_calls', 0) successful_assignments = performance_stats.get('successful_assignments', 0) failed_assignments = performance_stats.get('failed_assignments', 0) total_assignments = successful_assignments + failed_assignments api_call_success_rate = (successful_assignments / total_assignments) * 100 if total_assignments > 0 else 100 return PerformanceMetrics( timestamp=current_time, total_earnings=total_earnings, total_distance=total_distance, vehicle_utilization=vehicle_utilization, demand_satisfaction_rate=demand_satisfaction_rate, average_response_time=average_response_time, cost_efficiency=cost_efficiency, ai_optimization_impact=ai_optimization_impact, weather_impact=weather_impact, traffic_impact=traffic_impact, api_call_success_rate=api_call_success_rate, total_api_calls=total_api_calls, successful_assignments=successful_assignments, failed_assignments=failed_assignments ) def calculate_vehicle_analytics(self, vehicle) -> VehicleAnalytics: """Calculate analytics for individual vehicle""" # Calculate utilization rate (simplified) simulation_duration = (datetime.now() - self.start_time).total_seconds() / 3600 # hours busy_time = vehicle.total_distance / 30 # Assume 30 km/h average speed utilization_rate = (busy_time / simulation_duration) * 100 if simulation_duration > 0 else 0 # Estimate trip count based on earnings avg_trip_earnings = 15 # Estimated average trip earnings total_trips = int(vehicle.earnings / avg_trip_earnings) if avg_trip_earnings > 0 else 0 # Calculate average trip duration average_trip_duration = vehicle.total_distance / max(total_trips, 1) / 30 * 60 # minutes # Battery efficiency (simplified) battery_efficiency = vehicle.battery_level if hasattr(vehicle, 'battery_level') else 100 return VehicleAnalytics( vehicle_id=vehicle.id, total_earnings=vehicle.earnings, total_distance=vehicle.total_distance, total_trips=total_trips, average_trip_duration=average_trip_duration, utilization_rate=utilization_rate, maintenance_count=1 if hasattr(vehicle, 'maintenance_due') and vehicle.maintenance_due else 0, battery_efficiency=battery_efficiency, last_updated=datetime.now() ) def calculate_demand_analytics(self, demands: List) -> DemandAnalytics: """Calculate demand pattern analytics""" current_time = datetime.now() # Basic demand counts total_demands = len(demands) pending_demands = len([d for d in demands if d.status == 'pending']) completed_demands = len([d for d in demands if d.status == 'completed']) cancelled_demands = len([d for d in demands if d.status == 'cancelled']) # Average wait time pending_demands_list = [d for d in demands if d.status == 'pending'] if pending_demands_list: wait_times = [(current_time - d.timestamp).total_seconds() / 60 for d in pending_demands_list] average_wait_time = np.mean(wait_times) else: average_wait_time = 0 # Priority distribution priority_distribution = {} for demand in demands: priority = demand.priority priority_distribution[priority] = priority_distribution.get(priority, 0) + 1 # Location hotspots (simplified) location_hotspots = {} for demand in demands: location_key = f"{demand.pickup_location[0]:.3f},{demand.pickup_location[1]:.3f}" location_hotspots[location_key] = location_hotspots.get(location_key, 0) + 1 return DemandAnalytics( timestamp=current_time, total_demands=total_demands, pending_demands=pending_demands, completed_demands=completed_demands, cancelled_demands=cancelled_demands, average_wait_time=average_wait_time, priority_distribution=priority_distribution, location_hotspots=location_hotspots ) def create_performance_dashboard(self) -> go.Figure: """Create comprehensive performance dashboard""" if not self.metrics_history: # Create empty dashboard fig = go.Figure() fig.add_annotation( text="No performance data available yet", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=20) ) return fig # Convert metrics to DataFrame df = pd.DataFrame([asdict(metric) for metric in self.metrics_history]) df['timestamp'] = pd.to_datetime(df['timestamp']) # Create subplots fig = make_subplots( rows=3, cols=2, subplot_titles=( 'Vehicle Utilization Over Time', 'Revenue vs Distance', 'Demand Satisfaction Rate', 'Cost Efficiency', 'API Performance', 'AI Optimization Impact' ), specs=[[{"secondary_y": False}, {"secondary_y": False}], [{"secondary_y": False}, {"secondary_y": False}], [{"secondary_y": False}, {"secondary_y": False}]] ) # Vehicle utilization fig.add_trace( go.Scatter(x=df['timestamp'], y=df['vehicle_utilization'], mode='lines+markers', name='Utilization %'), row=1, col=1 ) # Revenue vs Distance fig.add_trace( go.Scatter(x=df['total_distance'], y=df['total_earnings'], mode='markers', name='Revenue vs Distance'), row=1, col=2 ) # Demand satisfaction fig.add_trace( go.Scatter(x=df['timestamp'], y=df['demand_satisfaction_rate'], mode='lines+markers', name='Satisfaction %'), row=2, col=1 ) # Cost efficiency fig.add_trace( go.Scatter(x=df['timestamp'], y=df['cost_efficiency'], mode='lines+markers', name='Cost Efficiency'), row=2, col=2 ) # API performance fig.add_trace( go.Scatter(x=df['timestamp'], y=df['api_call_success_rate'], mode='lines+markers', name='API Success %'), row=3, col=1 ) # AI optimization impact fig.add_trace( go.Scatter(x=df['timestamp'], y=df['ai_optimization_impact'], mode='lines+markers', name='AI Impact'), row=3, col=2 ) fig.update_layout( title='Fleet Performance Analytics Dashboard', height=800, showlegend=False ) return fig def create_vehicle_analytics_dashboard(self) -> go.Figure: """Create vehicle-specific analytics dashboard""" if not self.vehicle_analytics: fig = go.Figure() fig.add_annotation( text="No vehicle analytics data available yet", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=20) ) return fig # Convert to DataFrame df = pd.DataFrame([asdict(analytics) for analytics in self.vehicle_analytics.values()]) # Create subplots fig = make_subplots( rows=2, cols=2, subplot_titles=( 'Vehicle Earnings Distribution', 'Vehicle Distance vs Trips', 'Vehicle Utilization Rates', 'Battery Efficiency' ) ) # Earnings distribution fig.add_trace( go.Bar(x=df['vehicle_id'], y=df['total_earnings'], name='Earnings'), row=1, col=1 ) # Distance vs Trips fig.add_trace( go.Scatter(x=df['total_trips'], y=df['total_distance'], mode='markers', name='Distance vs Trips'), row=1, col=2 ) # Utilization rates fig.add_trace( go.Bar(x=df['vehicle_id'], y=df['utilization_rate'], name='Utilization %'), row=2, col=1 ) # Battery efficiency fig.add_trace( go.Bar(x=df['vehicle_id'], y=df['battery_efficiency'], name='Battery %'), row=2, col=2 ) fig.update_layout( title='Vehicle Analytics Dashboard', height=600, showlegend=False ) return fig def get_session_summary(self) -> Dict: """Get comprehensive session summary""" current_time = datetime.now() session_duration = (current_time - self.start_time).total_seconds() / 3600 # hours if not self.metrics_history: return { 'session_duration_hours': session_duration, 'status': 'No data collected yet' } latest_metrics = self.metrics_history[-1] return { 'session_duration_hours': session_duration, 'total_optimization_cycles': len(self.metrics_history), 'current_vehicle_utilization': latest_metrics.vehicle_utilization, 'current_demand_satisfaction': latest_metrics.demand_satisfaction_rate, 'total_revenue': latest_metrics.total_earnings, 'total_distance': latest_metrics.total_distance, 'average_cost_efficiency': latest_metrics.cost_efficiency, 'peak_vehicle_utilization': self.session_metrics['peak_vehicle_utilization'], 'best_cost_efficiency': self.session_metrics['best_cost_efficiency'], 'total_api_calls': latest_metrics.total_api_calls, 'successful_assignments': latest_metrics.successful_assignments, 'ai_suggestions_generated': latest_metrics.ai_optimization_impact * 10, # Convert back 'performance_trend': 'improving' if len(self.metrics_history) > 1 and self.metrics_history[-1].vehicle_utilization > self.metrics_history[-2].vehicle_utilization else 'stable' } def export_analytics_data(self, filepath: str = None): """Export all analytics data to JSON file""" if filepath is None: filepath = f"fleet_analytics_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" export_data = { 'session_summary': self.get_session_summary(), 'performance_metrics': [asdict(metric) for metric in self.metrics_history], 'vehicle_analytics': {str(k): asdict(v) for k, v in self.vehicle_analytics.items()}, 'demand_analytics': [asdict(analytics) for analytics in self.demand_analytics], 'export_timestamp': datetime.now().isoformat() } with open(filepath, 'w') as f: json.dump(export_data, f, indent=2, default=str) logger.info(f"Analytics data exported to {filepath}") return filepath # Global analytics instance fleet_analytics = FleetAnalytics() def get_analytics_summary(): """Get analytics summary for dashboard""" return json.dumps(fleet_analytics.get_session_summary(), indent=2, default=str) def create_analytics_dashboard(): """Create analytics dashboard""" return fleet_analytics.create_performance_dashboard() def create_vehicle_dashboard(): """Create vehicle analytics dashboard""" return fleet_analytics.create_vehicle_analytics_dashboard() def export_analytics(): """Export analytics data""" filepath = fleet_analytics.export_analytics_data() return f"Analytics data exported to {filepath}" if __name__ == "__main__": # Test analytics test_metrics = PerformanceMetrics( timestamp=datetime.now(), total_earnings=1000.0, total_distance=500.0, vehicle_utilization=75.0, demand_satisfaction_rate=90.0, average_response_time=5.0, cost_efficiency=2.0, ai_optimization_impact=0.5, weather_impact=0.1, traffic_impact=0.2, api_call_success_rate=95.0, total_api_calls=100, successful_assignments=95, failed_assignments=5 ) fleet_analytics.log_performance_metrics(test_metrics) print("Analytics test completed successfully")