#!/usr/bin/env python3
"""
ML Model Performance Analyzer

Comprehensive machine learning model performance monitoring and analysis:
per-model metric tracking with threshold-based status grading, simple
drift detection (performance / data / concept), feature-importance
analysis, and an aggregate health report.
"""

from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import numpy as np


@dataclass
class ModelPerformanceMetric:
    """A single ML model performance measurement.

    Attributes:
        model_name: Name of the model the metric belongs to.
        metric_type: accuracy, precision, recall, f1, auc, rmse, mae, r2
            (or "error" for a failed analysis).
        value: Measured metric value.
        threshold: The "acceptable" threshold used for grading, if any.
        status: excellent, good, acceptable, poor, critical, or unknown.
        timestamp: When the metric was computed.
        dataset_type: training, validation, test, or production.
        details: Free-form context (confusion matrix, ranges, errors, ...).
    """

    model_name: str
    metric_type: str
    value: float
    threshold: Optional[float]
    status: str
    timestamp: datetime
    dataset_type: str
    details: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization (timestamp -> ISO)."""
        result = asdict(self)
        result["timestamp"] = self.timestamp.isoformat()
        return result


@dataclass
class ModelDriftAnalysis:
    """Model drift analysis results.

    Attributes:
        model_name: Name of the analyzed model.
        drift_detected: Whether any drift signal crossed its threshold.
        drift_type: concept_drift, data_drift, performance_drift, no_drift,
            no_baseline, or error.
        drift_score: Max of the individual drift scores (0..1).
        baseline_performance: Mean baseline accuracy/f1/r2.
        current_performance: Mean current accuracy/f1/r2.
        performance_drop: Relative drop vs. baseline (0 if no baseline).
        timestamp: When the analysis ran.
        recommendations: Suggested follow-up actions.
    """

    model_name: str
    drift_detected: bool
    drift_type: str
    drift_score: float
    baseline_performance: float
    current_performance: float
    performance_drop: float
    timestamp: datetime
    recommendations: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization (timestamp -> ISO)."""
        result = asdict(self)
        result["timestamp"] = self.timestamp.isoformat()
        return result


class MLModelPerformanceAnalyzer:
    """Comprehensive ML model performance monitoring and analysis."""

    # Metric types where a LOWER value is better (error metrics).
    # These must be graded with <= comparisons, not >=.
    _LOWER_IS_BETTER = frozenset({"rmse", "mae", "mse"})

    def __init__(self):
        self.start_time = datetime.now()
        self.performance_metrics: List[ModelPerformanceMetric] = []
        self.drift_analyses: List[ModelDriftAnalysis] = []
        self.model_baselines: Dict[str, Any] = {}

        # Performance thresholds for different model types.
        # For higher-is-better metrics a value >= threshold earns the grade;
        # for rmse/mae (lower-is-better) a value <= threshold earns it.
        self.thresholds = {
            "classification": {
                "accuracy": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "precision": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "recall": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "f1": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "auc": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
            },
            "regression": {
                "rmse": {
                    "excellent": 0.05,
                    "good": 0.10,
                    "acceptable": 0.20,
                    "poor": 0.30,
                },
                "mae": {
                    "excellent": 0.05,
                    "good": 0.10,
                    "acceptable": 0.20,
                    "poor": 0.30,
                },
                "r2": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
            },
            "anomaly_detection": {
                "precision": {
                    "excellent": 0.90,
                    "good": 0.80,
                    "acceptable": 0.70,
                    "poor": 0.60,
                },
                "recall": {
                    "excellent": 0.90,
                    "good": 0.80,
                    "acceptable": 0.70,
                    "poor": 0.60,
                },
                "f1": {
                    "excellent": 0.90,
                    "good": 0.80,
                    "acceptable": 0.70,
                    "poor": 0.60,
                },
            },
        }

        # Drift detection thresholds
        self.drift_thresholds = {
            "performance_drop": 0.10,  # 10% drop in performance
            "data_drift": 0.15,  # 15% data distribution change
            "concept_drift": 0.20,  # 20% concept change
        }

    async def analyze_model_performance(
        self,
        model_name: str,
        model_type: str,
        predictions: List[Any],
        actuals: List[Any],
        dataset_type: str = "production",
    ) -> List[ModelPerformanceMetric]:
        """Analyze model performance metrics.

        Computes the metric set appropriate to *model_type* (classification,
        regression, or anomaly_detection), grades each against the configured
        thresholds, stores the results, and returns them. On any failure a
        single "error"/"critical" metric is returned instead of raising.

        Note: sklearn is imported lazily so the module loads without it.
        """
        metrics: List[ModelPerformanceMetric] = []
        current_time = datetime.now()
        # Shared per-metric details; stays empty for an unknown model_type
        # (previously this could be referenced before assignment).
        details: Dict[str, Any] = {}

        try:
            # Convert to numpy arrays for calculations
            y_pred = np.array(predictions)
            y_true = np.array(actuals)

            if model_type == "classification":
                from sklearn.metrics import (
                    accuracy_score,
                    confusion_matrix,
                    f1_score,
                    precision_score,
                    recall_score,
                    roc_auc_score,
                )

                # Basic metrics (weighted averages tolerate multi-class input)
                accuracy = accuracy_score(y_true, y_pred)
                precision = precision_score(
                    y_true, y_pred, average="weighted", zero_division=0
                )
                recall = recall_score(
                    y_true, y_pred, average="weighted", zero_division=0
                )
                f1 = f1_score(y_true, y_pred, average="weighted", zero_division=0)

                metrics.extend(
                    [
                        self._create_metric(
                            model_name,
                            "accuracy",
                            accuracy,
                            model_type,
                            dataset_type,
                            current_time,
                        ),
                        self._create_metric(
                            model_name,
                            "precision",
                            precision,
                            model_type,
                            dataset_type,
                            current_time,
                        ),
                        self._create_metric(
                            model_name,
                            "recall",
                            recall,
                            model_type,
                            dataset_type,
                            current_time,
                        ),
                        self._create_metric(
                            model_name, "f1", f1, model_type, dataset_type, current_time
                        ),
                    ]
                )

                # AUC only for binary classification; best-effort because
                # roc_auc_score raises for degenerate label sets.
                if len(np.unique(y_true)) == 2:
                    try:
                        auc = roc_auc_score(y_true, y_pred)
                        metrics.append(
                            self._create_metric(
                                model_name,
                                "auc",
                                auc,
                                model_type,
                                dataset_type,
                                current_time,
                            )
                        )
                    except Exception:
                        pass

                # Confusion matrix details (TP/FP/TN/FN only meaningful
                # for the binary 2x2 case).
                cm = confusion_matrix(y_true, y_pred)
                details = {
                    "confusion_matrix": cm.tolist(),
                    "true_positives": int(cm[1, 1]) if cm.shape[0] > 1 else 0,
                    "false_positives": int(cm[0, 1]) if cm.shape[0] > 1 else 0,
                    "true_negatives": int(cm[0, 0]) if cm.shape[0] > 1 else 0,
                    "false_negatives": int(cm[1, 0]) if cm.shape[0] > 1 else 0,
                    "total_predictions": len(y_pred),
                    "unique_classes": len(np.unique(y_true)),
                }

            elif model_type == "regression":
                from sklearn.metrics import (
                    mean_absolute_error,
                    mean_squared_error,
                    r2_score,
                )

                mse = mean_squared_error(y_true, y_pred)
                rmse = np.sqrt(mse)
                mae = mean_absolute_error(y_true, y_pred)
                r2 = r2_score(y_true, y_pred)

                metrics.extend(
                    [
                        self._create_metric(
                            model_name,
                            "rmse",
                            rmse,
                            model_type,
                            dataset_type,
                            current_time,
                        ),
                        self._create_metric(
                            model_name,
                            "mae",
                            mae,
                            model_type,
                            dataset_type,
                            current_time,
                        ),
                        self._create_metric(
                            model_name, "r2", r2, model_type, dataset_type, current_time
                        ),
                    ]
                )

                details = {
                    "mse": mse,
                    "total_predictions": len(y_pred),
                    "target_range": [float(np.min(y_true)), float(np.max(y_true))],
                    "prediction_range": [float(np.min(y_pred)), float(np.max(y_pred))],
                }

            elif model_type == "anomaly_detection":
                from sklearn.metrics import f1_score, precision_score, recall_score

                # Binary anomaly labels assumed: 1 = anomaly, 0 = normal.
                precision = precision_score(y_true, y_pred, zero_division=0)
                recall = recall_score(y_true, y_pred, zero_division=0)
                f1 = f1_score(y_true, y_pred, zero_division=0)

                metrics.extend(
                    [
                        self._create_metric(
                            model_name,
                            "precision",
                            precision,
                            model_type,
                            dataset_type,
                            current_time,
                        ),
                        self._create_metric(
                            model_name,
                            "recall",
                            recall,
                            model_type,
                            dataset_type,
                            current_time,
                        ),
                        self._create_metric(
                            model_name, "f1", f1, model_type, dataset_type, current_time
                        ),
                    ]
                )

                details = {
                    "total_predictions": len(y_pred),
                    "anomalies_detected": int(np.sum(y_pred)),
                    "actual_anomalies": int(np.sum(y_true)),
                    "true_positives": int(np.sum((y_pred == 1) & (y_true == 1))),
                    "false_positives": int(np.sum((y_pred == 1) & (y_true == 0))),
                }

            # Attach shared details to every metric and store them.
            for metric in metrics:
                metric.details.update(details)

            self.performance_metrics.extend(metrics)

        except Exception as e:
            # Surface the failure as a critical "error" metric rather than
            # raising, so batch analysis of many models can continue.
            metrics.append(
                ModelPerformanceMetric(
                    model_name=model_name,
                    metric_type="error",
                    value=0.0,
                    threshold=None,
                    status="critical",
                    timestamp=current_time,
                    dataset_type=dataset_type,
                    details={"error": str(e)},
                )
            )

        return metrics

    def _create_metric(
        self,
        model_name: str,
        metric_type: str,
        value: float,
        model_type: str,
        dataset_type: str,
        timestamp: datetime,
    ) -> ModelPerformanceMetric:
        """Create a performance metric with status determination.

        BUG FIX: error metrics (rmse/mae) are lower-is-better, so they are
        graded with <= comparisons. The previous >= logic rated a large
        (bad) RMSE as "excellent" and a tiny (good) RMSE as "critical".
        """
        thresholds = self.thresholds.get(model_type, {}).get(metric_type, {})

        if thresholds:
            if metric_type in self._LOWER_IS_BETTER:
                # Lower is better: smaller value earns a better grade.
                if value <= thresholds.get("excellent", 0.0):
                    status = "excellent"
                elif value <= thresholds.get("good", 0.1):
                    status = "good"
                elif value <= thresholds.get("acceptable", 0.2):
                    status = "acceptable"
                elif value <= thresholds.get("poor", 0.3):
                    status = "poor"
                else:
                    status = "critical"
            else:
                # Higher is better: larger value earns a better grade.
                if value >= thresholds.get("excellent", 1.0):
                    status = "excellent"
                elif value >= thresholds.get("good", 0.8):
                    status = "good"
                elif value >= thresholds.get("acceptable", 0.6):
                    status = "acceptable"
                elif value >= thresholds.get("poor", 0.4):
                    status = "poor"
                else:
                    status = "critical"
            threshold = thresholds.get("acceptable", 0.5)
        else:
            status = "unknown"
            threshold = None

        return ModelPerformanceMetric(
            model_name=model_name,
            metric_type=metric_type,
            value=value,
            threshold=threshold,
            status=status,
            timestamp=timestamp,
            dataset_type=dataset_type,
            details={},
        )

    async def detect_model_drift(
        self,
        model_name: str,
        current_predictions: List[Any],
        current_actuals: List[Any],
        baseline_predictions: Optional[List[Any]] = None,
        baseline_actuals: Optional[List[Any]] = None,
    ) -> ModelDriftAnalysis:
        """Detect model drift using various techniques.

        Compares current (production) performance against a baseline —
        either the supplied baseline data or previously stored training
        metrics — and checks performance, data, and concept drift against
        the configured thresholds. Never raises; failures come back as a
        drift_type="error" result.
        """
        current_time = datetime.now()

        try:
            # Current performance (classification metrics assumed here —
            # TODO confirm this analyzer is only used for classifiers).
            current_metrics = await self.analyze_model_performance(
                model_name,
                "classification",
                current_predictions,
                current_actuals,
                "production",
            )

            # Baseline performance: supplied data, or stored training metrics.
            if baseline_predictions and baseline_actuals:
                baseline_metrics = await self.analyze_model_performance(
                    model_name,
                    "classification",
                    baseline_predictions,
                    baseline_actuals,
                    "training",
                )
            else:
                baseline_metrics = [
                    m
                    for m in self.performance_metrics
                    if m.model_name == model_name and m.dataset_type == "training"
                ]

                if not baseline_metrics:
                    return ModelDriftAnalysis(
                        model_name=model_name,
                        drift_detected=False,
                        drift_type="no_baseline",
                        drift_score=0.0,
                        baseline_performance=0.0,
                        current_performance=0.0,
                        performance_drop=0.0,
                        timestamp=current_time,
                        recommendations=["Establish baseline performance metrics"],
                    )

            # Mean of the "headline" metrics; guard against empty lists so
            # np.mean never produces nan (previously possible).
            baseline_values = [
                m.value
                for m in baseline_metrics
                if m.metric_type in ["accuracy", "f1", "r2"]
            ]
            current_values = [
                m.value
                for m in current_metrics
                if m.metric_type in ["accuracy", "f1", "r2"]
            ]
            baseline_perf = float(np.mean(baseline_values)) if baseline_values else 0.0
            current_perf = float(np.mean(current_values)) if current_values else 0.0

            performance_drop = (
                (baseline_perf - current_perf) / baseline_perf
                if baseline_perf > 0
                else 0
            )

            # Individual drift signals
            performance_drift = (
                performance_drop > self.drift_thresholds["performance_drop"]
            )

            # Data drift (simplified — a real implementation would use
            # statistical tests such as KS or PSI).
            data_drift_score = self._calculate_data_drift(
                current_predictions, baseline_predictions
            )
            data_drift = data_drift_score > self.drift_thresholds["data_drift"]

            concept_drift_score = self._calculate_concept_drift(
                current_predictions,
                current_actuals,
                baseline_predictions,
                baseline_actuals,
            )
            concept_drift = concept_drift_score > self.drift_thresholds["concept_drift"]

            drift_detected = performance_drift or data_drift or concept_drift

            # Most severe drift type wins the label.
            if concept_drift:
                drift_type = "concept_drift"
            elif data_drift:
                drift_type = "data_drift"
            elif performance_drift:
                drift_type = "performance_drift"
            else:
                drift_type = "no_drift"

            drift_score = max(performance_drop, data_drift_score, concept_drift_score)

            recommendations = []
            if drift_detected:
                if performance_drift:
                    recommendations.append("Retrain model with recent data")
                if data_drift:
                    recommendations.append("Update data preprocessing pipeline")
                if concept_drift:
                    recommendations.append(
                        "Review model features and target definition"
                    )
                recommendations.append("Monitor model performance closely")
                recommendations.append("Consider ensemble methods for robustness")

            return ModelDriftAnalysis(
                model_name=model_name,
                drift_detected=drift_detected,
                drift_type=drift_type,
                drift_score=drift_score,
                baseline_performance=baseline_perf,
                current_performance=current_perf,
                performance_drop=performance_drop,
                timestamp=current_time,
                recommendations=recommendations,
            )

        except Exception as e:
            return ModelDriftAnalysis(
                model_name=model_name,
                drift_detected=False,
                drift_type="error",
                drift_score=0.0,
                baseline_performance=0.0,
                current_performance=0.0,
                performance_drop=0.0,
                timestamp=current_time,
                recommendations=[f"Error in drift detection: {str(e)}"],
            )

    def _calculate_data_drift(
        self, current_data: List[Any], baseline_data: Optional[List[Any]]
    ) -> float:
        """Calculate data drift score (simplified).

        Compares mean and standard deviation of the two samples; returns a
        score in [0, 1]. Returns 0.0 when there is no baseline or on error.
        """
        try:
            if not baseline_data:
                return 0.0

            current_arr = np.array(current_data)
            baseline_arr = np.array(baseline_data)

            current_mean = np.mean(current_arr)
            baseline_mean = np.mean(baseline_arr)
            current_std = np.std(current_arr)
            baseline_std = np.std(baseline_arr)

            # Relative differences; +1e-8 avoids division by zero.
            mean_diff = abs(current_mean - baseline_mean) / (abs(baseline_mean) + 1e-8)
            std_diff = abs(current_std - baseline_std) / (abs(baseline_std) + 1e-8)

            drift_score = (mean_diff + std_diff) / 2
            return min(drift_score, 1.0)

        except Exception:
            return 0.0

    def _calculate_concept_drift(
        self,
        current_pred: List[Any],
        current_true: List[Any],
        baseline_pred: Optional[List[Any]],
        baseline_true: Optional[List[Any]],
    ) -> float:
        """Calculate concept drift score (simplified).

        Concept drift is proxied by the absolute change in misclassification
        rate between baseline and current samples, clamped to [0, 1].
        Returns 0.0 when there is no baseline or on error.
        """
        try:
            if not baseline_pred or not baseline_true:
                return 0.0

            current_error = np.mean(np.array(current_pred) != np.array(current_true))
            baseline_error = np.mean(np.array(baseline_pred) != np.array(baseline_true))

            concept_drift = abs(current_error - baseline_error)
            return min(concept_drift, 1.0)

        except Exception:
            return 0.0

    async def analyze_feature_importance(
        self, model_name: str, feature_names: List[str], feature_importance: List[float]
    ) -> Dict[str, Any]:
        """Analyze feature importance and stability.

        Returns summary statistics, an importance-band distribution, and
        recommendations (feature selection, regularization, dimensionality
        reduction). On failure returns a dict with an "error" key.
        """
        try:
            # Sort features by importance (descending)
            feature_data = list(zip(feature_names, feature_importance))
            feature_data.sort(key=lambda x: x[1], reverse=True)

            importance_values = [imp for _, imp in feature_data]

            analysis = {
                "model_name": model_name,
                "total_features": len(feature_names),
                "top_features": feature_data[:10],
                "feature_importance_stats": {
                    "mean": np.mean(importance_values),
                    "std": np.std(importance_values),
                    "min": np.min(importance_values),
                    "max": np.max(importance_values),
                    "median": np.median(importance_values),
                },
                "importance_distribution": {
                    "high_importance": len(
                        [imp for imp in importance_values if imp > 0.1]
                    ),
                    "medium_importance": len(
                        [imp for imp in importance_values if 0.05 <= imp <= 0.1]
                    ),
                    "low_importance": len(
                        [imp for imp in importance_values if imp < 0.05]
                    ),
                },
                "recommendations": [],
            }

            # Heuristic recommendations
            if (
                analysis["importance_distribution"]["low_importance"]
                > len(feature_names) * 0.5
            ):
                analysis["recommendations"].append(
                    "Consider feature selection to reduce dimensionality"
                )

            if analysis["feature_importance_stats"]["std"] > 0.3:
                analysis["recommendations"].append(
                    "Feature importance is highly skewed - consider regularization"
                )

            if len(feature_names) > 100:
                analysis["recommendations"].append(
                    "High-dimensional data - consider dimensionality reduction"
                )

            return analysis

        except Exception as e:
            return {
                "model_name": model_name,
                "error": str(e),
                "recommendations": ["Fix feature importance analysis"],
            }

    async def generate_model_performance_report(self) -> Dict[str, Any]:
        """Generate comprehensive ML model performance report.

        Aggregates all stored metrics per model and dataset type, scores
        them, summarizes drift analyses, and emits recommendations plus an
        overall ML-system health number (0-100).
        """
        current_time = datetime.now()

        # Group metrics by model
        models: Dict[str, List[ModelPerformanceMetric]] = {}
        for metric in self.performance_metrics:
            if metric.model_name not in models:
                models[metric.model_name] = []
            models[metric.model_name].append(metric)

        # Analyze each model
        model_analyses = {}
        for model_name, model_metrics in models.items():
            training_metrics = [
                m for m in model_metrics if m.dataset_type == "training"
            ]
            validation_metrics = [
                m for m in model_metrics if m.dataset_type == "validation"
            ]
            test_metrics = [m for m in model_metrics if m.dataset_type == "test"]
            production_metrics = [
                m for m in model_metrics if m.dataset_type == "production"
            ]

            # Map grade -> numeric score and average; 0.0 for an empty list.
            def calculate_score(metrics_list):
                if not metrics_list:
                    return 0.0
                score_map = {
                    "excellent": 100,
                    "good": 80,
                    "acceptable": 60,
                    "poor": 40,
                    "critical": 20,
                    "unknown": 50,
                }
                scores = [score_map.get(m.status, 50) for m in metrics_list]
                return np.mean(scores)

            model_analyses[model_name] = {
                "overall_score": calculate_score(model_metrics),
                "training_score": calculate_score(training_metrics),
                "validation_score": calculate_score(validation_metrics),
                "test_score": calculate_score(test_metrics),
                "production_score": calculate_score(production_metrics),
                "metrics_count": {
                    "total": len(model_metrics),
                    "training": len(training_metrics),
                    "validation": len(validation_metrics),
                    "test": len(test_metrics),
                    "production": len(production_metrics),
                },
                "latest_metrics": {
                    metric.metric_type: metric.to_dict()
                    for metric in sorted(
                        model_metrics, key=lambda x: x.timestamp, reverse=True
                    )[:10]
                },
                "performance_trend": self._analyze_performance_trend(model_metrics),
                "critical_issues": [
                    m.to_dict() for m in model_metrics if m.status == "critical"
                ],
            }

        # Summarize drift across all models
        drift_summary = {
            "total_models_analyzed": len(self.drift_analyses),
            "models_with_drift": len(
                [d for d in self.drift_analyses if d.drift_detected]
            ),
            "drift_types": {
                drift_type: len(
                    [d for d in self.drift_analyses if d.drift_type == drift_type]
                )
                for drift_type in set(d.drift_type for d in self.drift_analyses)
            },
            "recent_drift": [
                d.to_dict()
                for d in sorted(
                    self.drift_analyses, key=lambda x: x.timestamp, reverse=True
                )[:5]
            ],
        }

        # Recommendations
        recommendations = []

        for model_name, analysis in model_analyses.items():
            if analysis["production_score"] < 60:
                recommendations.append(
                    f"CRITICAL: {model_name} performance is poor in production - immediate action required"
                )
            elif analysis["production_score"] < 80:
                recommendations.append(
                    f"WARNING: {model_name} performance degradation detected - consider retraining"
                )

            if analysis["critical_issues"]:
                recommendations.append(
                    f"Address {len(analysis['critical_issues'])} critical issues in {model_name}"
                )

        if drift_summary["models_with_drift"] > 0:
            recommendations.append("Implement automated model retraining pipeline")

        if len(model_analyses) > 5:
            recommendations.append("Consider model ensemble for improved robustness")

        # Overall ML system health (mean of per-model overall scores)
        overall_scores = [
            analysis["overall_score"] for analysis in model_analyses.values()
        ]
        ml_system_health = np.mean(overall_scores) if overall_scores else 0.0

        report = {
            "ml_system_health": ml_system_health,
            "analysis_timestamp": current_time.isoformat(),
            "summary": {
                "total_models": len(model_analyses),
                "models_with_critical_issues": len(
                    [m for m in model_analyses.values() if m["critical_issues"]]
                ),
                "average_production_score": (
                    np.mean([m["production_score"] for m in model_analyses.values()])
                    if model_analyses
                    else 0.0
                ),
                "drift_detection_active": len(self.drift_analyses) > 0,
            },
            "model_analyses": model_analyses,
            "drift_analysis": drift_summary,
            "recommendations": recommendations,
            "next_steps": [
                "1. Address critical model performance issues immediately",
                "2. Implement automated model monitoring and drift detection",
                "3. Schedule regular model retraining based on drift detection",
                "4. Consider A/B testing for model improvements",
                "5. Establish model governance and versioning procedures",
            ],
        }

        return report

    def _analyze_performance_trend(
        self, metrics: List[ModelPerformanceMetric]
    ) -> Dict[str, Any]:
        """Analyze performance trend over time.

        Fits a simple least-squares slope per metric type (chronological
        order) and labels it stable (|slope| < 0.01), improving, or
        degrading. Needs at least two data points per metric type.
        """
        if len(metrics) < 2:
            return {"trend": "insufficient_data", "direction": "unknown"}

        sorted_metrics = sorted(metrics, key=lambda x: x.timestamp)

        trends = {}
        for metric_type in set(m.metric_type for m in metrics):
            type_metrics = [m for m in sorted_metrics if m.metric_type == metric_type]

            if len(type_metrics) >= 2:
                # Simple linear regression against the sample index
                values = [m.value for m in type_metrics]
                n = len(values)
                x = list(range(n))

                sum_x = sum(x)
                sum_y = sum(values)
                sum_xy = sum(x[i] * values[i] for i in range(n))
                sum_x2 = sum(x[i] * x[i] for i in range(n))

                # Skip degenerate fits (zero denominator)
                if n * sum_x2 - sum_x * sum_x != 0:
                    slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)

                    if abs(slope) < 0.01:
                        trend = "stable"
                    elif slope > 0:
                        trend = "improving"
                    else:
                        trend = "degrading"

                    trends[metric_type] = {
                        "trend": trend,
                        "slope": slope,
                        "recent_value": values[-1],
                        "change_percent": (
                            ((values[-1] - values[0]) / values[0] * 100)
                            if values[0] != 0
                            else 0
                        ),
                    }

        return trends


# Global ML performance analyzer instance
ml_performance_analyzer = MLModelPerformanceAnalyzer()