Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| ML Model Performance Analyzer | |
| Comprehensive machine learning model performance monitoring and analysis | |
| """ | |
| from dataclasses import asdict, dataclass | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
@dataclass
class ModelPerformanceMetric:
    """ML model performance metric data structure.

    Fix: the ``@dataclass`` decorator was missing, so the annotated fields
    produced no ``__init__`` and ``asdict(self)`` raised ``TypeError``
    (``asdict`` only accepts dataclass instances).
    """

    model_name: str
    metric_type: str  # accuracy, precision, recall, f1, auc, rmse, mae, r2
    value: float
    threshold: Optional[float]
    status: str  # excellent, good, acceptable, poor, critical
    timestamp: datetime
    dataset_type: str  # training, validation, test, production
    details: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        result = asdict(self)
        # datetime is not JSON-serializable; emit an ISO-8601 string instead.
        result["timestamp"] = self.timestamp.isoformat()
        return result
@dataclass
class ModelDriftAnalysis:
    """Model drift analysis results.

    Fix: the ``@dataclass`` decorator was missing, so the annotated fields
    produced no ``__init__`` and ``asdict(self)`` raised ``TypeError``.
    """

    model_name: str
    drift_detected: bool
    drift_type: str  # concept_drift, data_drift, covariate_shift
    drift_score: float
    baseline_performance: float
    current_performance: float
    performance_drop: float
    timestamp: datetime
    recommendations: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        result = asdict(self)
        # datetime is not JSON-serializable; emit an ISO-8601 string instead.
        result["timestamp"] = self.timestamp.isoformat()
        return result
class MLModelPerformanceAnalyzer:
    """Comprehensive ML model performance monitoring and analysis."""

    def __init__(self):
        self.start_time = datetime.now()
        self.performance_metrics = []
        self.drift_analyses = []
        self.model_baselines = {}

        def bands(excellent, good, acceptable, poor):
            # One status-threshold band for a single metric type.
            return {
                "excellent": excellent,
                "good": good,
                "acceptable": acceptable,
                "poor": poor,
            }

        # Performance thresholds per model family and metric.
        self.thresholds = {
            "classification": {
                name: bands(0.95, 0.90, 0.80, 0.70)
                for name in ("accuracy", "precision", "recall", "f1", "auc")
            },
            "regression": {
                "rmse": bands(0.05, 0.10, 0.20, 0.30),
                "mae": bands(0.05, 0.10, 0.20, 0.30),
                "r2": bands(0.95, 0.90, 0.80, 0.70),
            },
            "anomaly_detection": {
                name: bands(0.90, 0.80, 0.70, 0.60)
                for name in ("precision", "recall", "f1")
            },
        }

        # Drift detection thresholds.
        self.drift_thresholds = {
            "performance_drop": 0.10,  # 10% drop in performance
            "data_drift": 0.15,  # 15% data distribution change
            "concept_drift": 0.20,  # 20% concept change
        }
| async def analyze_model_performance( | |
| self, | |
| model_name: str, | |
| model_type: str, | |
| predictions: List[Any], | |
| actuals: List[Any], | |
| dataset_type: str = "production", | |
| ) -> List[ModelPerformanceMetric]: | |
| """Analyze model performance metrics""" | |
| metrics = [] | |
| current_time = datetime.now() | |
| try: | |
| # Convert to numpy arrays for calculations | |
| y_pred = np.array(predictions) | |
| y_true = np.array(actuals) | |
| if model_type == "classification": | |
| # Classification metrics | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| confusion_matrix, | |
| f1_score, | |
| precision_score, | |
| recall_score, | |
| roc_auc_score, | |
| ) | |
| # Basic metrics | |
| accuracy = accuracy_score(y_true, y_pred) | |
| precision = precision_score( | |
| y_true, y_pred, average="weighted", zero_division=0 | |
| ) | |
| recall = recall_score( | |
| y_true, y_pred, average="weighted", zero_division=0 | |
| ) | |
| f1 = f1_score(y_true, y_pred, average="weighted", zero_division=0) | |
| # Add metrics | |
| metrics.extend( | |
| [ | |
| self._create_metric( | |
| model_name, | |
| "accuracy", | |
| accuracy, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ), | |
| self._create_metric( | |
| model_name, | |
| "precision", | |
| precision, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ), | |
| self._create_metric( | |
| model_name, | |
| "recall", | |
| recall, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ), | |
| self._create_metric( | |
| model_name, "f1", f1, model_type, dataset_type, current_time | |
| ), | |
| ] | |
| ) | |
| # AUC if binary classification | |
| if len(np.unique(y_true)) == 2: | |
| try: | |
| auc = roc_auc_score(y_true, y_pred) | |
| metrics.append( | |
| self._create_metric( | |
| model_name, | |
| "auc", | |
| auc, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ) | |
| ) | |
| except Exception: | |
| pass | |
| # Confusion matrix details | |
| cm = confusion_matrix(y_true, y_pred) | |
| details = { | |
| "confusion_matrix": cm.tolist(), | |
| "true_positives": int(cm[1, 1]) if cm.shape[0] > 1 else 0, | |
| "false_positives": int(cm[0, 1]) if cm.shape[0] > 1 else 0, | |
| "true_negatives": int(cm[0, 0]) if cm.shape[0] > 1 else 0, | |
| "false_negatives": int(cm[1, 0]) if cm.shape[0] > 1 else 0, | |
| "total_predictions": len(y_pred), | |
| "unique_classes": len(np.unique(y_true)), | |
| } | |
| elif model_type == "regression": | |
| # Regression metrics | |
| from sklearn.metrics import ( | |
| mean_absolute_error, | |
| mean_squared_error, | |
| r2_score, | |
| ) | |
| mse = mean_squared_error(y_true, y_pred) | |
| rmse = np.sqrt(mse) | |
| mae = mean_absolute_error(y_true, y_pred) | |
| r2 = r2_score(y_true, y_pred) | |
| metrics.extend( | |
| [ | |
| self._create_metric( | |
| model_name, | |
| "rmse", | |
| rmse, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ), | |
| self._create_metric( | |
| model_name, | |
| "mae", | |
| mae, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ), | |
| self._create_metric( | |
| model_name, "r2", r2, model_type, dataset_type, current_time | |
| ), | |
| ] | |
| ) | |
| details = { | |
| "mse": mse, | |
| "total_predictions": len(y_pred), | |
| "target_range": [float(np.min(y_true)), float(np.max(y_true))], | |
| "prediction_range": [float(np.min(y_pred)), float(np.max(y_pred))], | |
| } | |
| elif model_type == "anomaly_detection": | |
| # Anomaly detection metrics | |
| from sklearn.metrics import f1_score, precision_score, recall_score | |
| precision = precision_score(y_true, y_pred, zero_division=0) | |
| recall = recall_score(y_true, y_pred, zero_division=0) | |
| f1 = f1_score(y_true, y_pred, zero_division=0) | |
| metrics.extend( | |
| [ | |
| self._create_metric( | |
| model_name, | |
| "precision", | |
| precision, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ), | |
| self._create_metric( | |
| model_name, | |
| "recall", | |
| recall, | |
| model_type, | |
| dataset_type, | |
| current_time, | |
| ), | |
| self._create_metric( | |
| model_name, "f1", f1, model_type, dataset_type, current_time | |
| ), | |
| ] | |
| ) | |
| details = { | |
| "total_predictions": len(y_pred), | |
| "anomalies_detected": int(np.sum(y_pred)), | |
| "actual_anomalies": int(np.sum(y_true)), | |
| "true_positives": int(np.sum((y_pred == 1) & (y_true == 1))), | |
| "false_positives": int(np.sum((y_pred == 1) & (y_true == 0))), | |
| } | |
| # Add details to all metrics | |
| for metric in metrics: | |
| metric.details.update(details) | |
| # Store metrics | |
| self.performance_metrics.extend(metrics) | |
| except Exception as e: | |
| # Add error metric | |
| metrics.append( | |
| ModelPerformanceMetric( | |
| model_name=model_name, | |
| metric_type="error", | |
| value=0.0, | |
| threshold=None, | |
| status="critical", | |
| timestamp=current_time, | |
| dataset_type=dataset_type, | |
| details={"error": str(e)}, | |
| ) | |
| ) | |
| return metrics | |
| def _create_metric( | |
| self, | |
| model_name: str, | |
| metric_type: str, | |
| value: float, | |
| model_type: str, | |
| dataset_type: str, | |
| timestamp: datetime, | |
| ) -> ModelPerformanceMetric: | |
| """Create a performance metric with status determination""" | |
| # Get threshold for this metric type | |
| thresholds = self.thresholds.get(model_type, {}).get(metric_type, {}) | |
| if thresholds: | |
| if value >= thresholds.get("excellent", 1.0): | |
| status = "excellent" | |
| elif value >= thresholds.get("good", 0.8): | |
| status = "good" | |
| elif value >= thresholds.get("acceptable", 0.6): | |
| status = "acceptable" | |
| elif value >= thresholds.get("poor", 0.4): | |
| status = "poor" | |
| else: | |
| status = "critical" | |
| threshold = thresholds.get("acceptable", 0.5) | |
| else: | |
| status = "unknown" | |
| threshold = None | |
| return ModelPerformanceMetric( | |
| model_name=model_name, | |
| metric_type=metric_type, | |
| value=value, | |
| threshold=threshold, | |
| status=status, | |
| timestamp=timestamp, | |
| dataset_type=dataset_type, | |
| details={}, | |
| ) | |
| async def detect_model_drift( | |
| self, | |
| model_name: str, | |
| current_predictions: List[Any], | |
| current_actuals: List[Any], | |
| baseline_predictions: List[Any] = None, | |
| baseline_actuals: List[Any] = None, | |
| ) -> ModelDriftAnalysis: | |
| """Detect model drift using various techniques""" | |
| current_time = datetime.now() | |
| try: | |
| # Get current performance | |
| current_metrics = await self.analyze_model_performance( | |
| model_name, | |
| "classification", | |
| current_predictions, | |
| current_actuals, | |
| "production", | |
| ) | |
| # Get baseline performance | |
| if baseline_predictions and baseline_actuals: | |
| baseline_metrics = await self.analyze_model_performance( | |
| model_name, | |
| "classification", | |
| baseline_predictions, | |
| baseline_actuals, | |
| "training", | |
| ) | |
| else: | |
| # Use stored baseline if available | |
| baseline_metrics = [ | |
| m | |
| for m in self.performance_metrics | |
| if m.model_name == model_name and m.dataset_type == "training" | |
| ] | |
| if not baseline_metrics: | |
| return ModelDriftAnalysis( | |
| model_name=model_name, | |
| drift_detected=False, | |
| drift_type="no_baseline", | |
| drift_score=0.0, | |
| baseline_performance=0.0, | |
| current_performance=0.0, | |
| performance_drop=0.0, | |
| timestamp=current_time, | |
| recommendations=["Establish baseline performance metrics"], | |
| ) | |
| # Calculate performance drop | |
| baseline_perf = np.mean( | |
| [ | |
| m.value | |
| for m in baseline_metrics | |
| if m.metric_type in ["accuracy", "f1", "r2"] | |
| ] | |
| ) | |
| current_perf = np.mean( | |
| [ | |
| m.value | |
| for m in current_metrics | |
| if m.metric_type in ["accuracy", "f1", "r2"] | |
| ] | |
| ) | |
| performance_drop = ( | |
| (baseline_perf - current_perf) / baseline_perf | |
| if baseline_perf > 0 | |
| else 0 | |
| ) | |
| # Detect performance drift | |
| performance_drift = ( | |
| performance_drop > self.drift_thresholds["performance_drop"] | |
| ) | |
| # Detect data drift (simplified - in real implementation would use statistical tests) | |
| data_drift_score = self._calculate_data_drift( | |
| current_predictions, baseline_predictions | |
| ) | |
| data_drift = data_drift_score > self.drift_thresholds["data_drift"] | |
| # Detect concept drift | |
| concept_drift_score = self._calculate_concept_drift( | |
| current_predictions, | |
| current_actuals, | |
| baseline_predictions, | |
| baseline_actuals, | |
| ) | |
| concept_drift = concept_drift_score > self.drift_thresholds["concept_drift"] | |
| # Determine overall drift | |
| drift_detected = performance_drift or data_drift or concept_drift | |
| # Determine drift type | |
| if concept_drift: | |
| drift_type = "concept_drift" | |
| elif data_drift: | |
| drift_type = "data_drift" | |
| elif performance_drift: | |
| drift_type = "performance_drift" | |
| else: | |
| drift_type = "no_drift" | |
| # Calculate overall drift score | |
| drift_score = max(performance_drop, data_drift_score, concept_drift_score) | |
| # Generate recommendations | |
| recommendations = [] | |
| if drift_detected: | |
| if performance_drift: | |
| recommendations.append("Retrain model with recent data") | |
| if data_drift: | |
| recommendations.append("Update data preprocessing pipeline") | |
| if concept_drift: | |
| recommendations.append( | |
| "Review model features and target definition" | |
| ) | |
| recommendations.append("Monitor model performance closely") | |
| recommendations.append("Consider ensemble methods for robustness") | |
| return ModelDriftAnalysis( | |
| model_name=model_name, | |
| drift_detected=drift_detected, | |
| drift_type=drift_type, | |
| drift_score=drift_score, | |
| baseline_performance=baseline_perf, | |
| current_performance=current_perf, | |
| performance_drop=performance_drop, | |
| timestamp=current_time, | |
| recommendations=recommendations, | |
| ) | |
| except Exception as e: | |
| return ModelDriftAnalysis( | |
| model_name=model_name, | |
| drift_detected=False, | |
| drift_type="error", | |
| drift_score=0.0, | |
| baseline_performance=0.0, | |
| current_performance=0.0, | |
| performance_drop=0.0, | |
| timestamp=current_time, | |
| recommendations=[f"Error in drift detection: {str(e)}"], | |
| ) | |
| def _calculate_data_drift( | |
| self, current_data: List[Any], baseline_data: List[Any] | |
| ) -> float: | |
| """Calculate data drift score (simplified)""" | |
| try: | |
| if not baseline_data: | |
| return 0.0 | |
| # Convert to numpy arrays | |
| current_arr = np.array(current_data) | |
| baseline_arr = np.array(baseline_data) | |
| # Simple statistical comparison | |
| current_mean = np.mean(current_arr) | |
| baseline_mean = np.mean(baseline_arr) | |
| current_std = np.std(current_arr) | |
| baseline_std = np.std(baseline_arr) | |
| # Calculate drift score based on mean and std differences | |
| mean_diff = abs(current_mean - baseline_mean) / (abs(baseline_mean) + 1e-8) | |
| std_diff = abs(current_std - baseline_std) / (abs(baseline_std) + 1e-8) | |
| drift_score = (mean_diff + std_diff) / 2 | |
| return min(drift_score, 1.0) | |
| except Exception: | |
| return 0.0 | |
| def _calculate_concept_drift( | |
| self, | |
| current_pred: List[Any], | |
| current_true: List[Any], | |
| baseline_pred: List[Any], | |
| baseline_true: List[Any], | |
| ) -> float: | |
| """Calculate concept drift score (simplified)""" | |
| try: | |
| if not baseline_pred or not baseline_true: | |
| return 0.0 | |
| # Calculate error rates | |
| current_error = np.mean(np.array(current_pred) != np.array(current_true)) | |
| baseline_error = np.mean(np.array(baseline_pred) != np.array(baseline_true)) | |
| # Concept drift is change in error rate | |
| concept_drift = abs(current_error - baseline_error) | |
| return min(concept_drift, 1.0) | |
| except Exception: | |
| return 0.0 | |
| async def analyze_feature_importance( | |
| self, model_name: str, feature_names: List[str], feature_importance: List[float] | |
| ) -> Dict[str, Any]: | |
| """Analyze feature importance and stability""" | |
| try: | |
| # Sort features by importance | |
| feature_data = list(zip(feature_names, feature_importance)) | |
| feature_data.sort(key=lambda x: x[1], reverse=True) | |
| # Calculate statistics | |
| importance_values = [imp for _, imp in feature_data] | |
| analysis = { | |
| "model_name": model_name, | |
| "total_features": len(feature_names), | |
| "top_features": feature_data[:10], | |
| "feature_importance_stats": { | |
| "mean": np.mean(importance_values), | |
| "std": np.std(importance_values), | |
| "min": np.min(importance_values), | |
| "max": np.max(importance_values), | |
| "median": np.median(importance_values), | |
| }, | |
| "importance_distribution": { | |
| "high_importance": len( | |
| [imp for imp in importance_values if imp > 0.1] | |
| ), | |
| "medium_importance": len( | |
| [imp for imp in importance_values if 0.05 <= imp <= 0.1] | |
| ), | |
| "low_importance": len( | |
| [imp for imp in importance_values if imp < 0.05] | |
| ), | |
| }, | |
| "recommendations": [], | |
| } | |
| # Generate recommendations | |
| if ( | |
| analysis["importance_distribution"]["low_importance"] | |
| > len(feature_names) * 0.5 | |
| ): | |
| analysis["recommendations"].append( | |
| "Consider feature selection to reduce dimensionality" | |
| ) | |
| if analysis["feature_importance_stats"]["std"] > 0.3: | |
| analysis["recommendations"].append( | |
| "Feature importance is highly skewed - consider regularization" | |
| ) | |
| if len(feature_names) > 100: | |
| analysis["recommendations"].append( | |
| "High-dimensional data - consider dimensionality reduction" | |
| ) | |
| return analysis | |
| except Exception as e: | |
| return { | |
| "model_name": model_name, | |
| "error": str(e), | |
| "recommendations": ["Fix feature importance analysis"], | |
| } | |
| async def generate_model_performance_report(self) -> Dict[str, Any]: | |
| """Generate comprehensive ML model performance report""" | |
| current_time = datetime.now() | |
| # Group metrics by model | |
| models = {} | |
| for metric in self.performance_metrics: | |
| if metric.model_name not in models: | |
| models[metric.model_name] = [] | |
| models[metric.model_name].append(metric) | |
| # Analyze each model | |
| model_analyses = {} | |
| for model_name, model_metrics in models.items(): | |
| # Group by dataset type | |
| training_metrics = [ | |
| m for m in model_metrics if m.dataset_type == "training" | |
| ] | |
| validation_metrics = [ | |
| m for m in model_metrics if m.dataset_type == "validation" | |
| ] | |
| test_metrics = [m for m in model_metrics if m.dataset_type == "test"] | |
| production_metrics = [ | |
| m for m in model_metrics if m.dataset_type == "production" | |
| ] | |
| # Calculate overall scores | |
| def calculate_score(metrics_list): | |
| if not metrics_list: | |
| return 0.0 | |
| score_map = { | |
| "excellent": 100, | |
| "good": 80, | |
| "acceptable": 60, | |
| "poor": 40, | |
| "critical": 20, | |
| "unknown": 50, | |
| } | |
| scores = [score_map.get(m.status, 50) for m in metrics_list] | |
| return np.mean(scores) | |
| model_analyses[model_name] = { | |
| "overall_score": calculate_score(model_metrics), | |
| "training_score": calculate_score(training_metrics), | |
| "validation_score": calculate_score(validation_metrics), | |
| "test_score": calculate_score(test_metrics), | |
| "production_score": calculate_score(production_metrics), | |
| "metrics_count": { | |
| "total": len(model_metrics), | |
| "training": len(training_metrics), | |
| "validation": len(validation_metrics), | |
| "test": len(test_metrics), | |
| "production": len(production_metrics), | |
| }, | |
| "latest_metrics": { | |
| metric.metric_type: metric.to_dict() | |
| for metric in sorted( | |
| model_metrics, key=lambda x: x.timestamp, reverse=True | |
| )[:10] | |
| }, | |
| "performance_trend": self._analyze_performance_trend(model_metrics), | |
| "critical_issues": [ | |
| m.to_dict() for m in model_metrics if m.status == "critical" | |
| ], | |
| } | |
| # Analyze drift across all models | |
| drift_summary = { | |
| "total_models_analyzed": len(self.drift_analyses), | |
| "models_with_drift": len( | |
| [d for d in self.drift_analyses if d.drift_detected] | |
| ), | |
| "drift_types": { | |
| drift_type: len( | |
| [d for d in self.drift_analyses if d.drift_type == drift_type] | |
| ) | |
| for drift_type in set(d.drift_type for d in self.drift_analyses) | |
| }, | |
| "recent_drift": [ | |
| d.to_dict() | |
| for d in sorted( | |
| self.drift_analyses, key=lambda x: x.timestamp, reverse=True | |
| )[:5] | |
| ], | |
| } | |
| # Generate recommendations | |
| recommendations = [] | |
| # Model-specific recommendations | |
| for model_name, analysis in model_analyses.items(): | |
| if analysis["production_score"] < 60: | |
| recommendations.append( | |
| f"CRITICAL: {model_name} performance is poor in production - immediate action required" | |
| ) | |
| elif analysis["production_score"] < 80: | |
| recommendations.append( | |
| f"WARNING: {model_name} performance degradation detected - consider retraining" | |
| ) | |
| if analysis["critical_issues"]: | |
| recommendations.append( | |
| f"Address {len(analysis['critical_issues'])} critical issues in {model_name}" | |
| ) | |
| # General recommendations | |
| if drift_summary["models_with_drift"] > 0: | |
| recommendations.append("Implement automated model retraining pipeline") | |
| if len(model_analyses) > 5: | |
| recommendations.append("Consider model ensemble for improved robustness") | |
| # Calculate overall ML system health | |
| overall_scores = [ | |
| analysis["overall_score"] for analysis in model_analyses.values() | |
| ] | |
| ml_system_health = np.mean(overall_scores) if overall_scores else 0.0 | |
| report = { | |
| "ml_system_health": ml_system_health, | |
| "analysis_timestamp": current_time.isoformat(), | |
| "summary": { | |
| "total_models": len(model_analyses), | |
| "models_with_critical_issues": len( | |
| [m for m in model_analyses.values() if m["critical_issues"]] | |
| ), | |
| "average_production_score": ( | |
| np.mean([m["production_score"] for m in model_analyses.values()]) | |
| if model_analyses | |
| else 0.0 | |
| ), | |
| "drift_detection_active": len(self.drift_analyses) > 0, | |
| }, | |
| "model_analyses": model_analyses, | |
| "drift_analysis": drift_summary, | |
| "recommendations": recommendations, | |
| "next_steps": [ | |
| "1. Address critical model performance issues immediately", | |
| "2. Implement automated model monitoring and drift detection", | |
| "3. Schedule regular model retraining based on drift detection", | |
| "4. Consider A/B testing for model improvements", | |
| "5. Establish model governance and versioning procedures", | |
| ], | |
| } | |
| return report | |
| def _analyze_performance_trend( | |
| self, metrics: List[ModelPerformanceMetric] | |
| ) -> Dict[str, Any]: | |
| """Analyze performance trend over time""" | |
| if len(metrics) < 2: | |
| return {"trend": "insufficient_data", "direction": "unknown"} | |
| # Sort by timestamp | |
| sorted_metrics = sorted(metrics, key=lambda x: x.timestamp) | |
| # Calculate trend for each metric type | |
| trends = {} | |
| for metric_type in set(m.metric_type for m in metrics): | |
| type_metrics = [m for m in sorted_metrics if m.metric_type == metric_type] | |
| if len(type_metrics) >= 2: | |
| # Simple linear regression | |
| values = [m.value for m in type_metrics] | |
| n = len(values) | |
| x = list(range(n)) | |
| # Calculate slope | |
| sum_x = sum(x) | |
| sum_y = sum(values) | |
| sum_xy = sum(x[i] * values[i] for i in range(n)) | |
| sum_x2 = sum(x[i] * x[i] for i in range(n)) | |
| if n * sum_x2 - sum_x * sum_x != 0: | |
| slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x) | |
| if abs(slope) < 0.01: | |
| trend = "stable" | |
| elif slope > 0: | |
| trend = "improving" | |
| else: | |
| trend = "degrading" | |
| trends[metric_type] = { | |
| "trend": trend, | |
| "slope": slope, | |
| "recent_value": values[-1], | |
| "change_percent": ( | |
| ((values[-1] - values[0]) / values[0] * 100) | |
| if values[0] != 0 | |
| else 0 | |
| ), | |
| } | |
| return trends | |
# Global ML performance analyzer instance
# Module-level singleton: importers are expected to share this instance so
# that accumulated metrics and drift analyses live in one place.
ml_performance_analyzer = MLModelPerformanceAnalyzer()