DataSprint / fleet_analytics.py
sujana05's picture
Upload folder using huggingface_hub
b4ce589 verified
"""
Fleet Analytics and Performance Tracking Module
Comprehensive logging, metrics collection, and performance analysis
"""
import json
import logging
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import plotly.express as px
from dataclasses import dataclass, asdict
import sqlite3
import os
from pathlib import Path
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('fleet_analytics.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class PerformanceMetrics:
"""Performance metrics for fleet optimization"""
timestamp: datetime
total_earnings: float
total_distance: float
vehicle_utilization: float
demand_satisfaction_rate: float
average_response_time: float
cost_efficiency: float
ai_optimization_impact: float
weather_impact: float
traffic_impact: float
api_call_success_rate: float
total_api_calls: int
successful_assignments: int
failed_assignments: int
@dataclass
class VehicleAnalytics:
"""Individual vehicle performance analytics"""
vehicle_id: int
total_earnings: float
total_distance: float
total_trips: int
average_trip_duration: float
utilization_rate: float
maintenance_count: int
battery_efficiency: float
last_updated: datetime
@dataclass
class DemandAnalytics:
"""Demand pattern analytics"""
timestamp: datetime
total_demands: int
pending_demands: int
completed_demands: int
cancelled_demands: int
average_wait_time: float
priority_distribution: Dict[int, int]
location_hotspots: Dict[str, int]
class FleetAnalytics:
"""Comprehensive fleet analytics and performance tracking"""
def __init__(self, db_path: str = "fleet_analytics.db"):
self.db_path = db_path
self.metrics_history = []
self.vehicle_analytics = {}
self.demand_analytics = []
# Initialize database
self._init_database()
# Performance tracking
self.start_time = datetime.now()
self.session_metrics = {
'total_simulation_time': 0,
'total_optimization_cycles': 0,
'peak_vehicle_utilization': 0,
'best_cost_efficiency': float('inf'),
'total_revenue': 0,
'total_distance': 0
}
def _init_database(self):
"""Initialize SQLite database for analytics storage"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create tables
cursor.execute('''
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
total_earnings REAL,
total_distance REAL,
vehicle_utilization REAL,
demand_satisfaction_rate REAL,
average_response_time REAL,
cost_efficiency REAL,
ai_optimization_impact REAL,
weather_impact REAL,
traffic_impact REAL,
api_call_success_rate REAL,
total_api_calls INTEGER,
successful_assignments INTEGER,
failed_assignments INTEGER
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS vehicle_analytics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
vehicle_id INTEGER,
timestamp TEXT,
total_earnings REAL,
total_distance REAL,
total_trips INTEGER,
average_trip_duration REAL,
utilization_rate REAL,
maintenance_count INTEGER,
battery_efficiency REAL
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS demand_analytics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
total_demands INTEGER,
pending_demands INTEGER,
completed_demands INTEGER,
cancelled_demands INTEGER,
average_wait_time REAL,
priority_distribution TEXT,
location_hotspots TEXT
)
''')
conn.commit()
conn.close()
logger.info("Analytics database initialized")
def log_performance_metrics(self, metrics: PerformanceMetrics):
"""Log performance metrics to database and memory"""
self.metrics_history.append(metrics)
# Store in database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO performance_metrics
(timestamp, total_earnings, total_distance, vehicle_utilization,
demand_satisfaction_rate, average_response_time, cost_efficiency,
ai_optimization_impact, weather_impact, traffic_impact,
api_call_success_rate, total_api_calls, successful_assignments, failed_assignments)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
metrics.timestamp.isoformat(),
metrics.total_earnings,
metrics.total_distance,
metrics.vehicle_utilization,
metrics.demand_satisfaction_rate,
metrics.average_response_time,
metrics.cost_efficiency,
metrics.ai_optimization_impact,
metrics.weather_impact,
metrics.traffic_impact,
metrics.api_call_success_rate,
metrics.total_api_calls,
metrics.successful_assignments,
metrics.failed_assignments
))
conn.commit()
conn.close()
# Update session metrics
self.session_metrics['total_revenue'] = max(self.session_metrics['total_revenue'], metrics.total_earnings)
self.session_metrics['total_distance'] = max(self.session_metrics['total_distance'], metrics.total_distance)
self.session_metrics['peak_vehicle_utilization'] = max(
self.session_metrics['peak_vehicle_utilization'], metrics.vehicle_utilization
)
if metrics.cost_efficiency < self.session_metrics['best_cost_efficiency']:
self.session_metrics['best_cost_efficiency'] = metrics.cost_efficiency
logger.info(f"Performance metrics logged: Utilization {metrics.vehicle_utilization:.1f}%, Revenue ${metrics.total_earnings:.2f}")
def log_vehicle_analytics(self, vehicle_id: int, analytics: VehicleAnalytics):
"""Log individual vehicle analytics"""
self.vehicle_analytics[vehicle_id] = analytics
# Store in database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO vehicle_analytics
(vehicle_id, timestamp, total_earnings, total_distance, total_trips,
average_trip_duration, utilization_rate, maintenance_count, battery_efficiency)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
vehicle_id,
analytics.last_updated.isoformat(),
analytics.total_earnings,
analytics.total_distance,
analytics.total_trips,
analytics.average_trip_duration,
analytics.utilization_rate,
analytics.maintenance_count,
analytics.battery_efficiency
))
conn.commit()
conn.close()
def log_demand_analytics(self, analytics: DemandAnalytics):
"""Log demand pattern analytics"""
self.demand_analytics.append(analytics)
# Store in database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO demand_analytics
(timestamp, total_demands, pending_demands, completed_demands, cancelled_demands,
average_wait_time, priority_distribution, location_hotspots)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
analytics.timestamp.isoformat(),
analytics.total_demands,
analytics.pending_demands,
analytics.completed_demands,
analytics.cancelled_demands,
analytics.average_wait_time,
json.dumps(analytics.priority_distribution),
json.dumps(analytics.location_hotspots)
))
conn.commit()
conn.close()
def calculate_performance_metrics(self, vehicles: List, demands: List,
performance_stats: Dict, weather_data: Dict,
traffic_data: Dict) -> PerformanceMetrics:
"""Calculate comprehensive performance metrics"""
current_time = datetime.now()
# Basic metrics
total_earnings = sum(v.earnings for v in vehicles)
total_distance = sum(v.total_distance for v in vehicles)
busy_vehicles = len([v for v in vehicles if v.status == 'busy'])
vehicle_utilization = (busy_vehicles / len(vehicles)) * 100 if vehicles else 0
# Demand metrics
completed_demands = len([d for d in demands if d.status == 'completed'])
total_demands = len(demands)
demand_satisfaction_rate = (completed_demands / total_demands) * 100 if total_demands > 0 else 0
# Response time (simplified calculation)
pending_demands = [d for d in demands if d.status == 'pending']
if pending_demands:
current_time = datetime.now()
wait_times = [(current_time - d.timestamp).total_seconds() / 60 for d in pending_demands]
average_response_time = np.mean(wait_times) if wait_times else 0
else:
average_response_time = 0
# Cost efficiency
cost_efficiency = total_earnings / total_distance if total_distance > 0 else 0
# AI optimization impact (simplified)
ai_optimization_impact = performance_stats.get('ai_suggestions_generated', 0) * 0.1
# Weather and traffic impact
weather_impact = len(weather_data) * 0.05 # Simplified impact calculation
traffic_impact = len(traffic_data) * 0.03
# API success rate
total_api_calls = performance_stats.get('total_api_calls', 0)
successful_assignments = performance_stats.get('successful_assignments', 0)
failed_assignments = performance_stats.get('failed_assignments', 0)
total_assignments = successful_assignments + failed_assignments
api_call_success_rate = (successful_assignments / total_assignments) * 100 if total_assignments > 0 else 100
return PerformanceMetrics(
timestamp=current_time,
total_earnings=total_earnings,
total_distance=total_distance,
vehicle_utilization=vehicle_utilization,
demand_satisfaction_rate=demand_satisfaction_rate,
average_response_time=average_response_time,
cost_efficiency=cost_efficiency,
ai_optimization_impact=ai_optimization_impact,
weather_impact=weather_impact,
traffic_impact=traffic_impact,
api_call_success_rate=api_call_success_rate,
total_api_calls=total_api_calls,
successful_assignments=successful_assignments,
failed_assignments=failed_assignments
)
def calculate_vehicle_analytics(self, vehicle) -> VehicleAnalytics:
"""Calculate analytics for individual vehicle"""
# Calculate utilization rate (simplified)
simulation_duration = (datetime.now() - self.start_time).total_seconds() / 3600 # hours
busy_time = vehicle.total_distance / 30 # Assume 30 km/h average speed
utilization_rate = (busy_time / simulation_duration) * 100 if simulation_duration > 0 else 0
# Estimate trip count based on earnings
avg_trip_earnings = 15 # Estimated average trip earnings
total_trips = int(vehicle.earnings / avg_trip_earnings) if avg_trip_earnings > 0 else 0
# Calculate average trip duration
average_trip_duration = vehicle.total_distance / max(total_trips, 1) / 30 * 60 # minutes
# Battery efficiency (simplified)
battery_efficiency = vehicle.battery_level if hasattr(vehicle, 'battery_level') else 100
return VehicleAnalytics(
vehicle_id=vehicle.id,
total_earnings=vehicle.earnings,
total_distance=vehicle.total_distance,
total_trips=total_trips,
average_trip_duration=average_trip_duration,
utilization_rate=utilization_rate,
maintenance_count=1 if hasattr(vehicle, 'maintenance_due') and vehicle.maintenance_due else 0,
battery_efficiency=battery_efficiency,
last_updated=datetime.now()
)
def calculate_demand_analytics(self, demands: List) -> DemandAnalytics:
"""Calculate demand pattern analytics"""
current_time = datetime.now()
# Basic demand counts
total_demands = len(demands)
pending_demands = len([d for d in demands if d.status == 'pending'])
completed_demands = len([d for d in demands if d.status == 'completed'])
cancelled_demands = len([d for d in demands if d.status == 'cancelled'])
# Average wait time
pending_demands_list = [d for d in demands if d.status == 'pending']
if pending_demands_list:
wait_times = [(current_time - d.timestamp).total_seconds() / 60 for d in pending_demands_list]
average_wait_time = np.mean(wait_times)
else:
average_wait_time = 0
# Priority distribution
priority_distribution = {}
for demand in demands:
priority = demand.priority
priority_distribution[priority] = priority_distribution.get(priority, 0) + 1
# Location hotspots (simplified)
location_hotspots = {}
for demand in demands:
location_key = f"{demand.pickup_location[0]:.3f},{demand.pickup_location[1]:.3f}"
location_hotspots[location_key] = location_hotspots.get(location_key, 0) + 1
return DemandAnalytics(
timestamp=current_time,
total_demands=total_demands,
pending_demands=pending_demands,
completed_demands=completed_demands,
cancelled_demands=cancelled_demands,
average_wait_time=average_wait_time,
priority_distribution=priority_distribution,
location_hotspots=location_hotspots
)
def create_performance_dashboard(self) -> go.Figure:
"""Create comprehensive performance dashboard"""
if not self.metrics_history:
# Create empty dashboard
fig = go.Figure()
fig.add_annotation(
text="No performance data available yet",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=20)
)
return fig
# Convert metrics to DataFrame
df = pd.DataFrame([asdict(metric) for metric in self.metrics_history])
df['timestamp'] = pd.to_datetime(df['timestamp'])
# Create subplots
fig = make_subplots(
rows=3, cols=2,
subplot_titles=(
'Vehicle Utilization Over Time',
'Revenue vs Distance',
'Demand Satisfaction Rate',
'Cost Efficiency',
'API Performance',
'AI Optimization Impact'
),
specs=[[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}]]
)
# Vehicle utilization
fig.add_trace(
go.Scatter(x=df['timestamp'], y=df['vehicle_utilization'],
mode='lines+markers', name='Utilization %'),
row=1, col=1
)
# Revenue vs Distance
fig.add_trace(
go.Scatter(x=df['total_distance'], y=df['total_earnings'],
mode='markers', name='Revenue vs Distance'),
row=1, col=2
)
# Demand satisfaction
fig.add_trace(
go.Scatter(x=df['timestamp'], y=df['demand_satisfaction_rate'],
mode='lines+markers', name='Satisfaction %'),
row=2, col=1
)
# Cost efficiency
fig.add_trace(
go.Scatter(x=df['timestamp'], y=df['cost_efficiency'],
mode='lines+markers', name='Cost Efficiency'),
row=2, col=2
)
# API performance
fig.add_trace(
go.Scatter(x=df['timestamp'], y=df['api_call_success_rate'],
mode='lines+markers', name='API Success %'),
row=3, col=1
)
# AI optimization impact
fig.add_trace(
go.Scatter(x=df['timestamp'], y=df['ai_optimization_impact'],
mode='lines+markers', name='AI Impact'),
row=3, col=2
)
fig.update_layout(
title='Fleet Performance Analytics Dashboard',
height=800,
showlegend=False
)
return fig
def create_vehicle_analytics_dashboard(self) -> go.Figure:
"""Create vehicle-specific analytics dashboard"""
if not self.vehicle_analytics:
fig = go.Figure()
fig.add_annotation(
text="No vehicle analytics data available yet",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=20)
)
return fig
# Convert to DataFrame
df = pd.DataFrame([asdict(analytics) for analytics in self.vehicle_analytics.values()])
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=(
'Vehicle Earnings Distribution',
'Vehicle Distance vs Trips',
'Vehicle Utilization Rates',
'Battery Efficiency'
)
)
# Earnings distribution
fig.add_trace(
go.Bar(x=df['vehicle_id'], y=df['total_earnings'], name='Earnings'),
row=1, col=1
)
# Distance vs Trips
fig.add_trace(
go.Scatter(x=df['total_trips'], y=df['total_distance'],
mode='markers', name='Distance vs Trips'),
row=1, col=2
)
# Utilization rates
fig.add_trace(
go.Bar(x=df['vehicle_id'], y=df['utilization_rate'], name='Utilization %'),
row=2, col=1
)
# Battery efficiency
fig.add_trace(
go.Bar(x=df['vehicle_id'], y=df['battery_efficiency'], name='Battery %'),
row=2, col=2
)
fig.update_layout(
title='Vehicle Analytics Dashboard',
height=600,
showlegend=False
)
return fig
def get_session_summary(self) -> Dict:
"""Get comprehensive session summary"""
current_time = datetime.now()
session_duration = (current_time - self.start_time).total_seconds() / 3600 # hours
if not self.metrics_history:
return {
'session_duration_hours': session_duration,
'status': 'No data collected yet'
}
latest_metrics = self.metrics_history[-1]
return {
'session_duration_hours': session_duration,
'total_optimization_cycles': len(self.metrics_history),
'current_vehicle_utilization': latest_metrics.vehicle_utilization,
'current_demand_satisfaction': latest_metrics.demand_satisfaction_rate,
'total_revenue': latest_metrics.total_earnings,
'total_distance': latest_metrics.total_distance,
'average_cost_efficiency': latest_metrics.cost_efficiency,
'peak_vehicle_utilization': self.session_metrics['peak_vehicle_utilization'],
'best_cost_efficiency': self.session_metrics['best_cost_efficiency'],
'total_api_calls': latest_metrics.total_api_calls,
'successful_assignments': latest_metrics.successful_assignments,
'ai_suggestions_generated': latest_metrics.ai_optimization_impact * 10, # Convert back
'performance_trend': 'improving' if len(self.metrics_history) > 1 and
self.metrics_history[-1].vehicle_utilization > self.metrics_history[-2].vehicle_utilization
else 'stable'
}
def export_analytics_data(self, filepath: str = None):
"""Export all analytics data to JSON file"""
if filepath is None:
filepath = f"fleet_analytics_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
export_data = {
'session_summary': self.get_session_summary(),
'performance_metrics': [asdict(metric) for metric in self.metrics_history],
'vehicle_analytics': {str(k): asdict(v) for k, v in self.vehicle_analytics.items()},
'demand_analytics': [asdict(analytics) for analytics in self.demand_analytics],
'export_timestamp': datetime.now().isoformat()
}
with open(filepath, 'w') as f:
json.dump(export_data, f, indent=2, default=str)
logger.info(f"Analytics data exported to {filepath}")
return filepath
# Global analytics instance
fleet_analytics = FleetAnalytics()
def get_analytics_summary():
"""Get analytics summary for dashboard"""
return json.dumps(fleet_analytics.get_session_summary(), indent=2, default=str)
def create_analytics_dashboard():
"""Create analytics dashboard"""
return fleet_analytics.create_performance_dashboard()
def create_vehicle_dashboard():
"""Create vehicle analytics dashboard"""
return fleet_analytics.create_vehicle_analytics_dashboard()
def export_analytics():
"""Export analytics data"""
filepath = fleet_analytics.export_analytics_data()
return f"Analytics data exported to {filepath}"
if __name__ == "__main__":
# Test analytics
test_metrics = PerformanceMetrics(
timestamp=datetime.now(),
total_earnings=1000.0,
total_distance=500.0,
vehicle_utilization=75.0,
demand_satisfaction_rate=90.0,
average_response_time=5.0,
cost_efficiency=2.0,
ai_optimization_impact=0.5,
weather_impact=0.1,
traffic_impact=0.2,
api_call_success_rate=95.0,
total_api_calls=100,
successful_assignments=95,
failed_assignments=5
)
fleet_analytics.log_performance_metrics(test_metrics)
print("Analytics test completed successfully")