| """ | |
| Evaluation Tracking and Monitoring System | |
| Provides continuous evaluation tracking, trend analysis, and performance monitoring | |
| for the RAG system with automated alerts and quality regression detection. | |
| """ | |
| import json | |
| import os | |
| import statistics | |
| import time | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
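
# Example usage (a sketch; the results path below is hypothetical and should
# point at an evaluation results JSON containing a top-level "summary" block):
#
#   tracker = EvaluationTracker("evaluation_tracking")
#   outcome = tracker.record_evaluation("evaluation/enhanced_results.json")
#   print(outcome["performance_score"], outcome["quality_grade"])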


class EvaluationTracker:
    """Track evaluation results over time and detect performance trends."""

    def __init__(self, tracking_dir: str = "evaluation_tracking"):
        """Initialize evaluation tracker."""
        self.tracking_dir = Path(tracking_dir)
        self.tracking_dir.mkdir(exist_ok=True)

        self.metrics_file = self.tracking_dir / "metrics_history.json"
        self.alerts_file = self.tracking_dir / "alerts.json"
        self.trends_file = self.tracking_dir / "trends.json"

        self._load_history()
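
    # On-disk layout under tracking_dir (all JSON, managed by this class):
    #   metrics_history.json - rolling window of the last 100 evaluation records
    #   alerts.json          - alerts retained for the last 30 days
    #   trends.json          - most recent per-metric trend analysis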

    def _load_history(self):
        """Load historical tracking data."""
        try:
            with open(self.metrics_file, "r") as f:
                self.metrics_history = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self.metrics_history = []

        try:
            with open(self.alerts_file, "r") as f:
                self.alerts = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self.alerts = []

    def record_evaluation(self, results_file: str) -> Dict[str, Any]:
        """Record a new evaluation run."""
        try:
            with open(results_file, "r") as f:
                results = json.load(f)
        except Exception as e:
            return {"error": f"Failed to load results: {e}"}

        # Extract key metrics
        summary = results.get("summary", {})
        timestamp = time.time()

        evaluation_record = {
            "timestamp": timestamp,
            "date": datetime.fromtimestamp(timestamp).isoformat(),
            "metrics": {
                "total_questions": summary.get("n_questions", 0),
                "success_rate": summary.get("success_rate", 0.0),
                "avg_latency_s": summary.get("avg_latency_s", 0.0),
                "avg_groundedness_score": summary.get("avg_groundedness_score", 0.0),
                "avg_citation_accuracy": summary.get("avg_citation_accuracy", 0.0),
                "perfect_citations": summary.get("perfect_citations", 0),
                "no_citations": summary.get("no_citations", 0),
            },
            "performance_score": self._calculate_performance_score(summary),
            "quality_grade": self._calculate_quality_grade(summary),
            "evaluation_file": results_file,
        }

        # Add to history, keeping only the last 100 evaluations
        self.metrics_history.append(evaluation_record)
        if len(self.metrics_history) > 100:
            self.metrics_history = self.metrics_history[-100:]

        # Save updated history
        self._save_history()

        # Check for alerts
        alerts = self._check_alerts(evaluation_record)

        # Update trends
        trends = self._update_trends()

        return {
            "recorded": True,
            "timestamp": timestamp,
            "performance_score": evaluation_record["performance_score"],
            "quality_grade": evaluation_record["quality_grade"],
            "alerts": alerts,
            "trends": trends,
        }

    def _calculate_performance_score(self, summary: Dict) -> float:
        """Calculate composite performance score."""
        success_rate = summary.get("success_rate", 0.0)
        latency = summary.get("avg_latency_s", 10.0)
        groundedness = summary.get("avg_groundedness_score", 0.0)
        citation = summary.get("avg_citation_accuracy", 0.0)

        # Normalize latency (assume 10s worst, 1s best)
        latency_score = max(0, min(1, (10 - latency) / 9))

        # Weighted composite score
        score = (
            success_rate * 0.25  # System reliability
            + latency_score * 0.25  # Response speed
            + groundedness * 0.30  # Content accuracy
            + citation * 0.20  # Source attribution
        )
        return round(score, 3)
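
    # Illustrative arithmetic for the composite score (not from a real run):
    # with success_rate=1.0, latency=2.0s, groundedness=0.9, citation=0.8:
    #   latency_score = (10 - 2) / 9 ≈ 0.889
    #   score = 1.0*0.25 + 0.889*0.25 + 0.9*0.30 + 0.8*0.20 ≈ 0.902 -> grade "A"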

    def _calculate_quality_grade(self, summary: Dict) -> str:
        """Calculate quality grade from metrics."""
        score = self._calculate_performance_score(summary)
        if score >= 0.95:
            return "A+"
        elif score >= 0.90:
            return "A"
        elif score >= 0.80:
            return "B+"
        elif score >= 0.70:
            return "B"
        elif score >= 0.60:
            return "C+"
        elif score >= 0.50:
            return "C"
        else:
            return "D"

    def _check_alerts(self, current_evaluation: Dict) -> List[Dict[str, Any]]:
        """Check for performance alerts and quality regressions."""
        alerts = []
        current_metrics = current_evaluation["metrics"]
        timestamp = current_evaluation["timestamp"]

        # Define alert thresholds
        thresholds = {
            "success_rate_critical": 0.90,
            "success_rate_warning": 0.95,
            "latency_critical": 10.0,
            "latency_warning": 6.0,
            "groundedness_critical": 0.80,
            "groundedness_warning": 0.90,
            "citation_critical": 0.20,
            "citation_warning": 0.50,
        }
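        # Success rate, groundedness, and citation accuracy thresholds are
        # floors (alert when the value falls below); latency thresholds are
        # ceilings (alert when it rises above).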

        # Check current values against thresholds
        success_rate = current_metrics["success_rate"]
        if success_rate < thresholds["success_rate_critical"]:
            alerts.append(
                {
                    "level": "critical",
                    "category": "reliability",
                    "title": "Critical System Reliability Issue",
                    "message": f"Success rate dropped to {success_rate*100:.1f}% "
                    f"(threshold: {thresholds['success_rate_critical']*100:.1f}%)",
                    "timestamp": timestamp,
                    "value": success_rate,
                }
            )
        elif success_rate < thresholds["success_rate_warning"]:
            alerts.append(
                {
                    "level": "warning",
                    "category": "reliability",
                    "title": "System Reliability Warning",
                    "message": f"Success rate at {success_rate*100:.1f}% "
                    f"(threshold: {thresholds['success_rate_warning']*100:.1f}%)",
                    "timestamp": timestamp,
                    "value": success_rate,
                }
            )

        # Check latency
        latency = current_metrics["avg_latency_s"]
        if latency > thresholds["latency_critical"]:
            alerts.append(
                {
                    "level": "critical",
                    "category": "performance",
                    "title": "Critical Performance Degradation",
                    "message": f"Average latency at {latency:.1f}s (threshold: {thresholds['latency_critical']:.1f}s)",
                    "timestamp": timestamp,
                    "value": latency,
                }
            )
        elif latency > thresholds["latency_warning"]:
            alerts.append(
                {
                    "level": "warning",
                    "category": "performance",
                    "title": "Performance Warning",
                    "message": f"Average latency at {latency:.1f}s (threshold: {thresholds['latency_warning']:.1f}s)",
                    "timestamp": timestamp,
                    "value": latency,
                }
            )

        # Check groundedness
        groundedness = current_metrics["avg_groundedness_score"]
        if groundedness < thresholds["groundedness_critical"]:
            alerts.append(
                {
                    "level": "critical",
                    "category": "quality",
                    "title": "Critical Content Quality Issue",
                    "message": f"Groundedness score at {groundedness*100:.1f}% "
                    f"(threshold: {thresholds['groundedness_critical']*100:.1f}%)",
                    "timestamp": timestamp,
                    "value": groundedness,
                }
            )
        elif groundedness < thresholds["groundedness_warning"]:
            alerts.append(
                {
                    "level": "warning",
                    "category": "quality",
                    "title": "Content Quality Warning",
                    "message": (
                        f"Groundedness score at {groundedness*100:.1f}% "
                        f"(threshold: {thresholds['groundedness_warning']*100:.1f}%)"
                    ),
                    "timestamp": timestamp,
                    "value": groundedness,
                }
            )

        # Check citation accuracy
        citation = current_metrics["avg_citation_accuracy"]
        if citation < thresholds["citation_critical"]:
            alerts.append(
                {
                    "level": "critical",
                    "category": "attribution",
                    "title": "Critical Citation Accuracy Issue",
                    "message": (
                        f"Citation accuracy at {citation*100:.1f}% "
                        f"(threshold: {thresholds['citation_critical']*100:.1f}%)"
                    ),
                    "timestamp": timestamp,
                    "value": citation,
                }
            )
        elif citation < thresholds["citation_warning"]:
            alerts.append(
                {
                    "level": "warning",
                    "category": "attribution",
                    "title": "Citation Accuracy Warning",
                    "message": (
                        f"Citation accuracy at {citation*100:.1f}% "
                        f"(threshold: {thresholds['citation_warning']*100:.1f}%)"
                    ),
                    "timestamp": timestamp,
                    "value": citation,
                }
            )

        # Check for trend-based alerts (regression detection)
        if len(self.metrics_history) >= 3:
            trend_alerts = self._check_trend_alerts(current_evaluation)
            alerts.extend(trend_alerts)

        # Save alerts, keeping only those from the last 30 days
        self.alerts.extend(alerts)
        cutoff_time = timestamp - (30 * 24 * 3600)
        self.alerts = [a for a in self.alerts if a["timestamp"] > cutoff_time]
        with open(self.alerts_file, "w") as f:
            json.dump(self.alerts, f, indent=2)

        return alerts

    def _check_trend_alerts(self, current_evaluation: Dict) -> List[Dict[str, Any]]:
        """Check for negative trends and regressions."""
        alerts = []
        if len(self.metrics_history) < 3:
            return alerts

        # Recent history for trend analysis. The current evaluation has already
        # been appended to metrics_history, so this window includes it as the
        # last entry.
        recent_history = self.metrics_history[-3:]
        current_metrics = current_evaluation["metrics"]

        # Check for a consistently declining performance score
        recent_scores = [eval_record["performance_score"] for eval_record in recent_history]
        current_score = current_evaluation["performance_score"]
        if len(recent_scores) >= 2:
            declining_trend = all(recent_scores[i] > recent_scores[i + 1] for i in range(len(recent_scores) - 1))
            score_drop = recent_scores[0] - current_score
            if declining_trend and score_drop > 0.1:
                alerts.append(
                    {
                        "level": "warning",
                        "category": "trend",
                        "title": "Performance Degradation Trend",
                        "message": (
                            f"Performance score declining over last {len(recent_scores)} "
                            f"evaluations (drop: {score_drop:.3f})"
                        ),
                        "timestamp": current_evaluation["timestamp"],
                        "value": current_score,
                    }
                )

        # Check specific metric trends
        metrics_to_check = [
            "avg_latency_s",
            "avg_groundedness_score",
            "avg_citation_accuracy",
        ]
        for metric in metrics_to_check:
            recent_values = [eval_record["metrics"][metric] for eval_record in recent_history]
            current_value = current_metrics[metric]

            if metric == "avg_latency_s":
                # For latency, a monotonic increase is bad
                if all(recent_values[i] < recent_values[i + 1] for i in range(len(recent_values) - 1)):
                    value_increase = current_value - recent_values[0]
                    if value_increase > 1.0:  # 1 second increase
                        alerts.append(
                            {
                                "level": "warning",
                                "category": "trend",
                                "title": "Latency Increase Trend",
                                "message": f"Response time increasing over recent evaluations (+{value_increase:.1f}s)",
                                "timestamp": current_evaluation["timestamp"],
                                "value": current_value,
                            }
                        )
            else:
                # For quality metrics, a monotonic decrease is bad
                if all(recent_values[i] > recent_values[i + 1] for i in range(len(recent_values) - 1)):
                    value_decrease = recent_values[0] - current_value
                    if value_decrease > 0.05:  # 5 percentage-point decrease
                        alerts.append(
                            {
                                "level": "warning",
                                "category": "trend",
                                "title": f"{metric.replace('_', ' ').title()} Decline Trend",
                                "message": f"{metric} declining over recent evaluations (-{value_decrease:.3f})",
                                "timestamp": current_evaluation["timestamp"],
                                "value": current_value,
                            }
                        )

        return alerts

    def _update_trends(self) -> Dict[str, Any]:
        """Update trend analysis."""
        if len(self.metrics_history) < 2:
            return {"error": "Insufficient data for trend analysis"}

        # Calculate per-metric trends over the recent evaluation window
        trends = {
            "overall_performance": self._calculate_metric_trend("performance_score"),
            "system_reliability": self._calculate_metric_trend("success_rate"),
            "response_time": self._calculate_metric_trend("avg_latency_s"),
            "content_quality": self._calculate_metric_trend("avg_groundedness_score"),
            "citation_accuracy": self._calculate_metric_trend("avg_citation_accuracy"),
            "last_updated": time.time(),
        }

        # Save trends
        with open(self.trends_file, "w") as f:
            json.dump(trends, f, indent=2)

        return trends

    def _calculate_metric_trend(self, metric_path: str) -> Dict[str, Any]:
        """Calculate trend for a specific metric."""
        if len(self.metrics_history) < 2:
            return {"trend": "insufficient_data"}

        # Extract values from the last 10 evaluations
        if metric_path in ["performance_score", "quality_grade"]:
            values = [record[metric_path] for record in self.metrics_history[-10:]]
        else:
            values = [record["metrics"][metric_path] for record in self.metrics_history[-10:]]

        if metric_path == "quality_grade":
            # Convert grades to numeric for trend analysis
            grade_values = {
                "A+": 4.0,
                "A": 3.7,
                "B+": 3.3,
                "B": 3.0,
                "C+": 2.7,
                "C": 2.3,
                "D": 2.0,
            }
            values = [grade_values.get(v, 2.0) for v in values]

        # Calculate trend
        if len(values) < 2:
            return {"trend": "insufficient_data"}
        # Simple linear trend calculation
        x = list(range(len(values)))
        mean_x = statistics.mean(x)
        mean_y = statistics.mean(values)

        numerator = sum((x[i] - mean_x) * (values[i] - mean_y) for i in range(len(values)))
        denominator = sum((x[i] - mean_x) ** 2 for i in range(len(values)))
        if denominator == 0:
            slope = 0
        else:
            slope = numerator / denominator

        # Determine trend direction (for latency, a rising value is degrading)
        if abs(slope) < 0.01:
            trend_direction = "stable"
        elif slope > 0:
            trend_direction = "improving" if metric_path != "avg_latency_s" else "degrading"
        else:
            trend_direction = "degrading" if metric_path != "avg_latency_s" else "improving"

        return {
            "trend": trend_direction,
            "slope": slope,
            "current_value": values[-1],
            "previous_value": values[-2] if len(values) >= 2 else values[-1],
            "change": values[-1] - (values[-2] if len(values) >= 2 else values[-1]),
            "data_points": len(values),
        }

    def _save_history(self):
        """Save metrics history to file."""
        with open(self.metrics_file, "w") as f:
            json.dump(self.metrics_history, f, indent=2)

    def get_current_status(self) -> Dict[str, Any]:
        """Get current system status and recent trends."""
        if not self.metrics_history:
            return {"error": "No evaluation history available"}

        latest_evaluation = self.metrics_history[-1]
        recent_alerts = [a for a in self.alerts if a["timestamp"] > time.time() - (24 * 3600)]  # Last 24h

        try:
            with open(self.trends_file, "r") as f:
                trends = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            trends = {}

        return {
            "current_performance": {
                "score": latest_evaluation["performance_score"],
                "grade": latest_evaluation["quality_grade"],
                "timestamp": latest_evaluation["timestamp"],
                "date": latest_evaluation["date"],
            },
            "current_metrics": latest_evaluation["metrics"],
            "recent_alerts": recent_alerts,
            "alert_summary": {
                "critical": len([a for a in recent_alerts if a["level"] == "critical"]),
                "warning": len([a for a in recent_alerts if a["level"] == "warning"]),
            },
            "trends": trends,
            "evaluation_count": len(self.metrics_history),
        }

    def generate_monitoring_report(self) -> Dict[str, Any]:
        """Generate comprehensive monitoring report."""
        if not self.metrics_history:
            return {"error": "No evaluation data available"}

        current_status = self.get_current_status()

        # Calculate statistics over different time periods
        last_7_days = [e for e in self.metrics_history if e["timestamp"] > time.time() - (7 * 24 * 3600)]
        last_30_days = [e for e in self.metrics_history if e["timestamp"] > time.time() - (30 * 24 * 3600)]

        report = {
            "report_timestamp": time.time(),
            "report_date": datetime.now().isoformat(),
            "current_status": current_status,
            "historical_analysis": {
                "total_evaluations": len(self.metrics_history),
                "evaluations_last_7_days": len(last_7_days),
                "evaluations_last_30_days": len(last_30_days),
                "average_performance_7d": (
                    statistics.mean([e["performance_score"] for e in last_7_days]) if last_7_days else None
                ),
                "average_performance_30d": (
                    statistics.mean([e["performance_score"] for e in last_30_days]) if last_30_days else None
                ),
            },
            "alert_analysis": {
                "total_alerts": len(self.alerts),
                "critical_alerts_30d": len(
                    [
                        a
                        for a in self.alerts
                        if a["level"] == "critical" and a["timestamp"] > time.time() - (30 * 24 * 3600)
                    ]
                ),
                "most_frequent_alert_category": self._get_most_frequent_alert_category(),
            },
            "recommendations": self._generate_monitoring_recommendations(current_status),
        }
        return report

    def _get_most_frequent_alert_category(self) -> Optional[str]:
        """Get the most frequent alert category."""
        if not self.alerts:
            return None

        categories = {}
        for alert in self.alerts:
            category = alert["category"]
            categories[category] = categories.get(category, 0) + 1

        return max(categories.items(), key=lambda x: x[1])[0] if categories else None

    def _generate_monitoring_recommendations(self, current_status: Dict) -> List[str]:
        """Generate monitoring-based recommendations."""
        recommendations = []

        alert_summary = current_status["alert_summary"]
        if alert_summary["critical"] > 0:
            recommendations.append(f"🔴 Address {alert_summary['critical']} critical alert(s) immediately")
        if alert_summary["warning"] > 2:
            recommendations.append(f"🟡 Investigate {alert_summary['warning']} warning alert(s) to prevent degradation")

        current_score = current_status["current_performance"]["score"]
        if current_score < 0.7:
            recommendations.append("📉 Performance score below acceptable threshold - implement improvement plan")

        evaluation_count = current_status["evaluation_count"]
        if evaluation_count < 5:
            recommendations.append("📊 Increase evaluation frequency for better trend analysis")

        return recommendations
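
# Minimal input schema assumed by record_evaluation (a sketch inferred from the
# summary keys read above; values are illustrative, and missing keys default
# to 0 / 0.0):
#
#   {
#       "summary": {
#           "n_questions": 25,
#           "success_rate": 0.96,
#           "avg_latency_s": 3.2,
#           "avg_groundedness_score": 0.91,
#           "avg_citation_accuracy": 0.74,
#           "perfect_citations": 12,
#           "no_citations": 2
#       }
#   }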


def main():
    """Demonstrate evaluation tracking system."""
    print("🚀 Initializing evaluation tracking system...")

    # Initialize tracker
    tracker = EvaluationTracker("evaluation_tracking")

    # Record latest evaluation
    results_file = "/Users/sethmcknight/Developer/msse-ai-engineering/evaluation/enhanced_results.json"
    if os.path.exists(results_file):
        print("📊 Recording latest evaluation...")
        record_result = tracker.record_evaluation(results_file)
        if "error" in record_result:
            print(f"❌ Error: {record_result['error']}")
            return

        print("✅ Evaluation recorded successfully")
        print(f"   Performance Score: {record_result['performance_score']}")
        print(f"   Quality Grade: {record_result['quality_grade']}")
        if record_result["alerts"]:
            print(f"   ⚠️ Generated {len(record_result['alerts'])} alert(s)")

    # Get current status
    print("\n📋 Current System Status:")
    status = tracker.get_current_status()
    if "error" in status:
        print(f"❌ Error: {status['error']}")
        return

    current_perf = status["current_performance"]
    print(f"   Grade: {current_perf['grade']}")
    print(f"   Score: {current_perf['score']}")
    print(f"   Last Evaluation: {current_perf['date'][:19]}")

    alert_summary = status["alert_summary"]
    print(f"   Recent Alerts: {alert_summary['critical']} critical, {alert_summary['warning']} warnings")

    # Generate monitoring report
    print("\n📄 Generating monitoring report...")
    report = tracker.generate_monitoring_report()

    # Save report
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_file = f"evaluation_tracking/monitoring_report_{timestamp}.json"
    with open(report_file, "w") as f:
        json.dump(report, f, indent=2)
    print(f"📄 Monitoring report saved: {report_file}")

    recommendations = report.get("recommendations", [])
    if recommendations:
        print("\n💡 RECOMMENDATIONS:")
        for rec in recommendations:
            print(f"   {rec}")

    print("\n✅ Evaluation tracking system ready!")


if __name__ == "__main__":
    main()