""" Real-time monitoring and analytics dashboard """ from fastapi import APIRouter, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse import asyncio import json import time from typing import List, Dict, Any import logging from datetime import datetime, timedelta import psutil import torch logger = logging.getLogger(__name__) router = APIRouter() class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] self.stats = { 'total_predictions': 0, 'hallucinations_detected': 0, 'average_confidence': 0.0, 'average_response_time': 0.0, 'method_usage': {}, 'hourly_stats': [], 'error_count': 0 } self.recent_predictions = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) # Send current stats immediately await self.send_stats_update() def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def send_personal_message(self, message: str, websocket: WebSocket): await websocket.send_text(message) async def broadcast(self, message: dict): for connection in self.active_connections: try: await connection.send_text(json.dumps(message)) except Exception as e: logger.error(f"Failed to send message to websocket: {e}") def update_stats(self, prediction_result: Dict[str, Any], response_time: float): """Update statistics with new prediction result""" try: self.stats['total_predictions'] += 1 if prediction_result.get('is_hallucination', False): self.stats['hallucinations_detected'] += 1 # Update running averages confidence = prediction_result.get('confidence', 0.5) total = self.stats['total_predictions'] self.stats['average_confidence'] = ( (self.stats['average_confidence'] * (total - 1) + confidence) / total ) self.stats['average_response_time'] = ( (self.stats['average_response_time'] * (total - 1) + response_time) / total ) # Track method usage method = prediction_result.get('method', 'unknown') self.stats['method_usage'][method] = self.stats['method_usage'].get(method, 0) + 1 # Store recent prediction (keep last 100) self.recent_predictions.append({ 'timestamp': datetime.now().isoformat(), 'is_hallucination': prediction_result.get('is_hallucination', False), 'confidence': confidence, 'method': method, 'response_time': response_time }) if len(self.recent_predictions) > 100: self.recent_predictions.pop(0) # Update hourly stats self._update_hourly_stats() except Exception as e: logger.error(f"Failed to update stats: {e}") self.stats['error_count'] += 1 def _update_hourly_stats(self): """Update hourly statistics""" current_hour = datetime.now().replace(minute=0, second=0, microsecond=0) # Find or create current hour entry current_hour_entry = None for entry in self.stats['hourly_stats']: if entry['hour'] == current_hour.isoformat(): current_hour_entry = entry break if not current_hour_entry: current_hour_entry = { 'hour': current_hour.isoformat(), 'predictions': 0, 'hallucinations': 0, 'avg_confidence': 0.0, 'avg_response_time': 0.0 } self.stats['hourly_stats'].append(current_hour_entry) # Update current hour current_hour_entry['predictions'] += 1 # Keep only last 24 hours cutoff_time = current_hour - timedelta(hours=24) self.stats['hourly_stats'] = [ entry for entry in self.stats['hourly_stats'] if datetime.fromisoformat(entry['hour']) >= cutoff_time ] async def send_stats_update(self): """Send current statistics to all connected clients""" try: # Get system stats system_stats = self._get_system_stats() message = { 'type': 'stats_update', 'data': { **self.stats, 'system': system_stats, 'recent_predictions': self.recent_predictions[-10:], # Last 10 'timestamp': datetime.now().isoformat() } } await self.broadcast(message) except Exception as e: logger.error(f"Failed to send stats update: {e}") def _get_system_stats(self) -> Dict[str, Any]: """Get system performance statistics""" try: return { 'cpu_percent': psutil.cpu_percent(), 'memory_percent': psutil.virtual_memory().percent, 'gpu_memory_used': torch.cuda.memory_allocated() / 1024**3 if torch.cuda.is_available() else 0, 'gpu_memory_total': torch.cuda.get_device_properties(0).total_memory / 1024**3 if torch.cuda.is_available() else 0 } except Exception as e: logger.error(f"Failed to get system stats: {e}") return {} # Global connection manager manager = ConnectionManager() @router.websocket("/ws/monitor") async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: while True: # Send periodic updates every 5 seconds await asyncio.sleep(5) await manager.send_stats_update() except WebSocketDisconnect: manager.disconnect(websocket) @router.get("/monitor", response_class=HTMLResponse) async def get_monitor_dashboard(): """Serve the monitoring dashboard""" html_content = """