File size: 3,823 Bytes
fbdfc24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from datetime import datetime
from typing import Dict, List
import logging

class LegalRAGMonitor:
    """Monitoring and error tracking for the legal RAG system"""
    
    def __init__(self):
        self.error_log = []
        self.performance_metrics = {
            "query_times": [],
            "routing_accuracy": [],
            "retrieval_success_rate": 0
        }
        self.alerts = []

    def log_error(self, error_type: str, message: str, context: Dict = None):
        """Log errors for analysis"""
        error_entry = {
            "timestamp": datetime.now(),
            "type": error_type,
            "message": message,
            "context": context or {}
        }
        self.error_log.append(error_entry)
        logging.error(f"[{error_type}] {message}")
        
        # Check for alert conditions
        self._check_alerts(error_type, error_entry)

    def track_query_performance(self, query_time: float, success: bool):
        """Track query performance metrics"""
        self.performance_metrics["query_times"].append(query_time)
        
        # Update success rate
        current_rate = self.performance_metrics["retrieval_success_rate"]
        total_queries = len(self.performance_metrics["query_times"])
        
        if success:
            self.performance_metrics["retrieval_success_rate"] = (
                (current_rate * (total_queries - 1) + 1) / total_queries
            )

    def get_health_report(self) -> Dict:
        """Generate system health report"""
        query_times = self.performance_metrics["query_times"]
        
        return {
            "error_count": len(self.error_log),
            "recent_errors": self.error_log[-5:],
            "avg_query_time": sum(query_times) / len(query_times) if query_times else 0,
            "success_rate": self.performance_metrics["retrieval_success_rate"],
            "total_queries": len(query_times),
            "active_alerts": len(self.alerts)
        }

    def _check_alerts(self, error_type: str, error_entry: Dict):
        """Check if error should trigger an alert"""
        # Example alert conditions
        if error_type == "database_connection":
            self.alerts.append({
                "type": "critical",
                "message": "Database connection failure",
                "timestamp": datetime.now(),
                "error": error_entry
            })
        
        # Clean old alerts (keep only last 24 hours)
        cutoff_time = datetime.now().timestamp() - (24 * 3600)
        self.alerts = [
            alert for alert in self.alerts 
            if alert["timestamp"].timestamp() > cutoff_time
        ]

class AlertManager:
    """Manage system alerts and notifications"""
    
    def __init__(self):
        self.alerts = []
        self.subscribers = []

    def add_alert(self, alert_type: str, message: str, severity: str = "warning"):
        """Add a new alert"""
        alert = {
            "type": alert_type,
            "message": message,
            "severity": severity,
            "timestamp": datetime.now(),
            "acknowledged": False
        }
        self.alerts.append(alert)
        self._notify_subscribers(alert)

    def acknowledge_alert(self, alert_index: int):
        """Acknowledge an alert"""
        if 0 <= alert_index < len(self.alerts):
            self.alerts[alert_index]["acknowledged"] = True

    def subscribe(self, callback):
        """Subscribe to alert notifications"""
        self.subscribers.append(callback)

    def _notify_subscribers(self, alert):
        """Notify all subscribers of a new alert"""
        for subscriber in self.subscribers:
            try:
                subscriber(alert)
            except Exception as e:
                logging.error(f"Error notifying subscriber: {e}")