from datetime import datetime from typing import Dict, List import logging class LegalRAGMonitor: """Monitoring and error tracking for the legal RAG system""" def __init__(self): self.error_log = [] self.performance_metrics = { "query_times": [], "routing_accuracy": [], "retrieval_success_rate": 0 } self.alerts = [] def log_error(self, error_type: str, message: str, context: Dict = None): """Log errors for analysis""" error_entry = { "timestamp": datetime.now(), "type": error_type, "message": message, "context": context or {} } self.error_log.append(error_entry) logging.error(f"[{error_type}] {message}") # Check for alert conditions self._check_alerts(error_type, error_entry) def track_query_performance(self, query_time: float, success: bool): """Track query performance metrics""" self.performance_metrics["query_times"].append(query_time) # Update success rate current_rate = self.performance_metrics["retrieval_success_rate"] total_queries = len(self.performance_metrics["query_times"]) if success: self.performance_metrics["retrieval_success_rate"] = ( (current_rate * (total_queries - 1) + 1) / total_queries ) def get_health_report(self) -> Dict: """Generate system health report""" query_times = self.performance_metrics["query_times"] return { "error_count": len(self.error_log), "recent_errors": self.error_log[-5:], "avg_query_time": sum(query_times) / len(query_times) if query_times else 0, "success_rate": self.performance_metrics["retrieval_success_rate"], "total_queries": len(query_times), "active_alerts": len(self.alerts) } def _check_alerts(self, error_type: str, error_entry: Dict): """Check if error should trigger an alert""" # Example alert conditions if error_type == "database_connection": self.alerts.append({ "type": "critical", "message": "Database connection failure", "timestamp": datetime.now(), "error": error_entry }) # Clean old alerts (keep only last 24 hours) cutoff_time = datetime.now().timestamp() - (24 * 3600) self.alerts = [ alert for alert in self.alerts if alert["timestamp"].timestamp() > cutoff_time ] class AlertManager: """Manage system alerts and notifications""" def __init__(self): self.alerts = [] self.subscribers = [] def add_alert(self, alert_type: str, message: str, severity: str = "warning"): """Add a new alert""" alert = { "type": alert_type, "message": message, "severity": severity, "timestamp": datetime.now(), "acknowledged": False } self.alerts.append(alert) self._notify_subscribers(alert) def acknowledge_alert(self, alert_index: int): """Acknowledge an alert""" if 0 <= alert_index < len(self.alerts): self.alerts[alert_index]["acknowledged"] = True def subscribe(self, callback): """Subscribe to alert notifications""" self.subscribers.append(callback) def _notify_subscribers(self, alert): """Notify all subscribers of a new alert""" for subscriber in self.subscribers: try: subscriber(alert) except Exception as e: logging.error(f"Error notifying subscriber: {e}")