# zenith-backend/core/diagnostics/ml_performance_analyzer.py
# (Hugging Face upload-page residue converted to comments: uploaded by
#  "teoat" via huggingface_hub, commit 4ae946d verified.)
#!/usr/bin/env python3
"""
ML Model Performance Analyzer
Comprehensive machine learning model performance monitoring and analysis
"""
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
import numpy as np
@dataclass
class ModelPerformanceMetric:
    """ML model performance metric data structure.

    One metric observation for one model on one dataset split, plus a
    qualitative status derived from configured thresholds.
    """

    model_name: str
    metric_type: str  # accuracy, precision, recall, f1, auc, rmse, mae, r2
    value: float
    threshold: Optional[float]  # reference cutoff used for grading; None if unknown
    status: str  # excellent, good, acceptable, poor, critical
    timestamp: datetime
    dataset_type: str  # training, validation, test, production
    details: Dict[str, Any]  # metric-specific extras (e.g. confusion matrix)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization"""
        # datetime is not JSON-serializable, so replace it with its
        # ISO-8601 string form after asdict() flattens the fields.
        result = asdict(self)
        result["timestamp"] = self.timestamp.isoformat()
        return result
@dataclass
class ModelDriftAnalysis:
    """Model drift analysis results"""

    model_name: str
    drift_detected: bool
    drift_type: str  # concept_drift, data_drift, covariate_shift
    drift_score: float  # max of the individual drift scores, in [0, 1]
    baseline_performance: float
    current_performance: float
    performance_drop: float  # relative drop vs. baseline (fraction)
    timestamp: datetime
    recommendations: List[str]  # suggested remediation steps

    def to_dict(self) -> Dict[str, Any]:
        # Same serialization convention as ModelPerformanceMetric:
        # timestamps become ISO-8601 strings.
        result = asdict(self)
        result["timestamp"] = self.timestamp.isoformat()
        return result
class MLModelPerformanceAnalyzer:
    """Comprehensive ML model performance monitoring and analysis.

    Collects per-model performance metrics, detects drift against
    baselines, and produces aggregate reports. All state is held in
    memory on the instance.
    """

    def __init__(self):
        # When this analyzer instance was created.
        self.start_time = datetime.now()
        # Accumulated ModelPerformanceMetric records across all analyses.
        self.performance_metrics = []
        # Accumulated ModelDriftAnalysis results.
        self.drift_analyses = []
        # Reserved for per-model baseline snapshots — not populated in this file.
        self.model_baselines = {}
        # Performance thresholds for different model types.
        # NOTE: rmse/mae are error metrics — LOWER values are better there.
        self.thresholds = {
            "classification": {
                "accuracy": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "precision": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "recall": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "f1": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
                "auc": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
            },
            "regression": {
                "rmse": {
                    "excellent": 0.05,
                    "good": 0.10,
                    "acceptable": 0.20,
                    "poor": 0.30,
                },
                "mae": {
                    "excellent": 0.05,
                    "good": 0.10,
                    "acceptable": 0.20,
                    "poor": 0.30,
                },
                "r2": {
                    "excellent": 0.95,
                    "good": 0.90,
                    "acceptable": 0.80,
                    "poor": 0.70,
                },
            },
            "anomaly_detection": {
                "precision": {
                    "excellent": 0.90,
                    "good": 0.80,
                    "acceptable": 0.70,
                    "poor": 0.60,
                },
                "recall": {
                    "excellent": 0.90,
                    "good": 0.80,
                    "acceptable": 0.70,
                    "poor": 0.60,
                },
                "f1": {
                    "excellent": 0.90,
                    "good": 0.80,
                    "acceptable": 0.70,
                    "poor": 0.60,
                },
            },
        }
        # Drift detection thresholds (fractions, compared against drift scores).
        self.drift_thresholds = {
            "performance_drop": 0.10,  # 10% drop in performance
            "data_drift": 0.15,  # 15% data distribution change
            "concept_drift": 0.20,  # 20% concept change
        }
async def analyze_model_performance(
self,
model_name: str,
model_type: str,
predictions: List[Any],
actuals: List[Any],
dataset_type: str = "production",
) -> List[ModelPerformanceMetric]:
"""Analyze model performance metrics"""
metrics = []
current_time = datetime.now()
try:
# Convert to numpy arrays for calculations
y_pred = np.array(predictions)
y_true = np.array(actuals)
if model_type == "classification":
# Classification metrics
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
f1_score,
precision_score,
recall_score,
roc_auc_score,
)
# Basic metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(
y_true, y_pred, average="weighted", zero_division=0
)
recall = recall_score(
y_true, y_pred, average="weighted", zero_division=0
)
f1 = f1_score(y_true, y_pred, average="weighted", zero_division=0)
# Add metrics
metrics.extend(
[
self._create_metric(
model_name,
"accuracy",
accuracy,
model_type,
dataset_type,
current_time,
),
self._create_metric(
model_name,
"precision",
precision,
model_type,
dataset_type,
current_time,
),
self._create_metric(
model_name,
"recall",
recall,
model_type,
dataset_type,
current_time,
),
self._create_metric(
model_name, "f1", f1, model_type, dataset_type, current_time
),
]
)
# AUC if binary classification
if len(np.unique(y_true)) == 2:
try:
auc = roc_auc_score(y_true, y_pred)
metrics.append(
self._create_metric(
model_name,
"auc",
auc,
model_type,
dataset_type,
current_time,
)
)
except Exception:
pass
# Confusion matrix details
cm = confusion_matrix(y_true, y_pred)
details = {
"confusion_matrix": cm.tolist(),
"true_positives": int(cm[1, 1]) if cm.shape[0] > 1 else 0,
"false_positives": int(cm[0, 1]) if cm.shape[0] > 1 else 0,
"true_negatives": int(cm[0, 0]) if cm.shape[0] > 1 else 0,
"false_negatives": int(cm[1, 0]) if cm.shape[0] > 1 else 0,
"total_predictions": len(y_pred),
"unique_classes": len(np.unique(y_true)),
}
elif model_type == "regression":
# Regression metrics
from sklearn.metrics import (
mean_absolute_error,
mean_squared_error,
r2_score,
)
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
metrics.extend(
[
self._create_metric(
model_name,
"rmse",
rmse,
model_type,
dataset_type,
current_time,
),
self._create_metric(
model_name,
"mae",
mae,
model_type,
dataset_type,
current_time,
),
self._create_metric(
model_name, "r2", r2, model_type, dataset_type, current_time
),
]
)
details = {
"mse": mse,
"total_predictions": len(y_pred),
"target_range": [float(np.min(y_true)), float(np.max(y_true))],
"prediction_range": [float(np.min(y_pred)), float(np.max(y_pred))],
}
elif model_type == "anomaly_detection":
# Anomaly detection metrics
from sklearn.metrics import f1_score, precision_score, recall_score
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
metrics.extend(
[
self._create_metric(
model_name,
"precision",
precision,
model_type,
dataset_type,
current_time,
),
self._create_metric(
model_name,
"recall",
recall,
model_type,
dataset_type,
current_time,
),
self._create_metric(
model_name, "f1", f1, model_type, dataset_type, current_time
),
]
)
details = {
"total_predictions": len(y_pred),
"anomalies_detected": int(np.sum(y_pred)),
"actual_anomalies": int(np.sum(y_true)),
"true_positives": int(np.sum((y_pred == 1) & (y_true == 1))),
"false_positives": int(np.sum((y_pred == 1) & (y_true == 0))),
}
# Add details to all metrics
for metric in metrics:
metric.details.update(details)
# Store metrics
self.performance_metrics.extend(metrics)
except Exception as e:
# Add error metric
metrics.append(
ModelPerformanceMetric(
model_name=model_name,
metric_type="error",
value=0.0,
threshold=None,
status="critical",
timestamp=current_time,
dataset_type=dataset_type,
details={"error": str(e)},
)
)
return metrics
def _create_metric(
self,
model_name: str,
metric_type: str,
value: float,
model_type: str,
dataset_type: str,
timestamp: datetime,
) -> ModelPerformanceMetric:
"""Create a performance metric with status determination"""
# Get threshold for this metric type
thresholds = self.thresholds.get(model_type, {}).get(metric_type, {})
if thresholds:
if value >= thresholds.get("excellent", 1.0):
status = "excellent"
elif value >= thresholds.get("good", 0.8):
status = "good"
elif value >= thresholds.get("acceptable", 0.6):
status = "acceptable"
elif value >= thresholds.get("poor", 0.4):
status = "poor"
else:
status = "critical"
threshold = thresholds.get("acceptable", 0.5)
else:
status = "unknown"
threshold = None
return ModelPerformanceMetric(
model_name=model_name,
metric_type=metric_type,
value=value,
threshold=threshold,
status=status,
timestamp=timestamp,
dataset_type=dataset_type,
details={},
)
async def detect_model_drift(
self,
model_name: str,
current_predictions: List[Any],
current_actuals: List[Any],
baseline_predictions: List[Any] = None,
baseline_actuals: List[Any] = None,
) -> ModelDriftAnalysis:
"""Detect model drift using various techniques"""
current_time = datetime.now()
try:
# Get current performance
current_metrics = await self.analyze_model_performance(
model_name,
"classification",
current_predictions,
current_actuals,
"production",
)
# Get baseline performance
if baseline_predictions and baseline_actuals:
baseline_metrics = await self.analyze_model_performance(
model_name,
"classification",
baseline_predictions,
baseline_actuals,
"training",
)
else:
# Use stored baseline if available
baseline_metrics = [
m
for m in self.performance_metrics
if m.model_name == model_name and m.dataset_type == "training"
]
if not baseline_metrics:
return ModelDriftAnalysis(
model_name=model_name,
drift_detected=False,
drift_type="no_baseline",
drift_score=0.0,
baseline_performance=0.0,
current_performance=0.0,
performance_drop=0.0,
timestamp=current_time,
recommendations=["Establish baseline performance metrics"],
)
# Calculate performance drop
baseline_perf = np.mean(
[
m.value
for m in baseline_metrics
if m.metric_type in ["accuracy", "f1", "r2"]
]
)
current_perf = np.mean(
[
m.value
for m in current_metrics
if m.metric_type in ["accuracy", "f1", "r2"]
]
)
performance_drop = (
(baseline_perf - current_perf) / baseline_perf
if baseline_perf > 0
else 0
)
# Detect performance drift
performance_drift = (
performance_drop > self.drift_thresholds["performance_drop"]
)
# Detect data drift (simplified - in real implementation would use statistical tests)
data_drift_score = self._calculate_data_drift(
current_predictions, baseline_predictions
)
data_drift = data_drift_score > self.drift_thresholds["data_drift"]
# Detect concept drift
concept_drift_score = self._calculate_concept_drift(
current_predictions,
current_actuals,
baseline_predictions,
baseline_actuals,
)
concept_drift = concept_drift_score > self.drift_thresholds["concept_drift"]
# Determine overall drift
drift_detected = performance_drift or data_drift or concept_drift
# Determine drift type
if concept_drift:
drift_type = "concept_drift"
elif data_drift:
drift_type = "data_drift"
elif performance_drift:
drift_type = "performance_drift"
else:
drift_type = "no_drift"
# Calculate overall drift score
drift_score = max(performance_drop, data_drift_score, concept_drift_score)
# Generate recommendations
recommendations = []
if drift_detected:
if performance_drift:
recommendations.append("Retrain model with recent data")
if data_drift:
recommendations.append("Update data preprocessing pipeline")
if concept_drift:
recommendations.append(
"Review model features and target definition"
)
recommendations.append("Monitor model performance closely")
recommendations.append("Consider ensemble methods for robustness")
return ModelDriftAnalysis(
model_name=model_name,
drift_detected=drift_detected,
drift_type=drift_type,
drift_score=drift_score,
baseline_performance=baseline_perf,
current_performance=current_perf,
performance_drop=performance_drop,
timestamp=current_time,
recommendations=recommendations,
)
except Exception as e:
return ModelDriftAnalysis(
model_name=model_name,
drift_detected=False,
drift_type="error",
drift_score=0.0,
baseline_performance=0.0,
current_performance=0.0,
performance_drop=0.0,
timestamp=current_time,
recommendations=[f"Error in drift detection: {str(e)}"],
)
def _calculate_data_drift(
self, current_data: List[Any], baseline_data: List[Any]
) -> float:
"""Calculate data drift score (simplified)"""
try:
if not baseline_data:
return 0.0
# Convert to numpy arrays
current_arr = np.array(current_data)
baseline_arr = np.array(baseline_data)
# Simple statistical comparison
current_mean = np.mean(current_arr)
baseline_mean = np.mean(baseline_arr)
current_std = np.std(current_arr)
baseline_std = np.std(baseline_arr)
# Calculate drift score based on mean and std differences
mean_diff = abs(current_mean - baseline_mean) / (abs(baseline_mean) + 1e-8)
std_diff = abs(current_std - baseline_std) / (abs(baseline_std) + 1e-8)
drift_score = (mean_diff + std_diff) / 2
return min(drift_score, 1.0)
except Exception:
return 0.0
def _calculate_concept_drift(
self,
current_pred: List[Any],
current_true: List[Any],
baseline_pred: List[Any],
baseline_true: List[Any],
) -> float:
"""Calculate concept drift score (simplified)"""
try:
if not baseline_pred or not baseline_true:
return 0.0
# Calculate error rates
current_error = np.mean(np.array(current_pred) != np.array(current_true))
baseline_error = np.mean(np.array(baseline_pred) != np.array(baseline_true))
# Concept drift is change in error rate
concept_drift = abs(current_error - baseline_error)
return min(concept_drift, 1.0)
except Exception:
return 0.0
async def analyze_feature_importance(
self, model_name: str, feature_names: List[str], feature_importance: List[float]
) -> Dict[str, Any]:
"""Analyze feature importance and stability"""
try:
# Sort features by importance
feature_data = list(zip(feature_names, feature_importance))
feature_data.sort(key=lambda x: x[1], reverse=True)
# Calculate statistics
importance_values = [imp for _, imp in feature_data]
analysis = {
"model_name": model_name,
"total_features": len(feature_names),
"top_features": feature_data[:10],
"feature_importance_stats": {
"mean": np.mean(importance_values),
"std": np.std(importance_values),
"min": np.min(importance_values),
"max": np.max(importance_values),
"median": np.median(importance_values),
},
"importance_distribution": {
"high_importance": len(
[imp for imp in importance_values if imp > 0.1]
),
"medium_importance": len(
[imp for imp in importance_values if 0.05 <= imp <= 0.1]
),
"low_importance": len(
[imp for imp in importance_values if imp < 0.05]
),
},
"recommendations": [],
}
# Generate recommendations
if (
analysis["importance_distribution"]["low_importance"]
> len(feature_names) * 0.5
):
analysis["recommendations"].append(
"Consider feature selection to reduce dimensionality"
)
if analysis["feature_importance_stats"]["std"] > 0.3:
analysis["recommendations"].append(
"Feature importance is highly skewed - consider regularization"
)
if len(feature_names) > 100:
analysis["recommendations"].append(
"High-dimensional data - consider dimensionality reduction"
)
return analysis
except Exception as e:
return {
"model_name": model_name,
"error": str(e),
"recommendations": ["Fix feature importance analysis"],
}
async def generate_model_performance_report(self) -> Dict[str, Any]:
"""Generate comprehensive ML model performance report"""
current_time = datetime.now()
# Group metrics by model
models = {}
for metric in self.performance_metrics:
if metric.model_name not in models:
models[metric.model_name] = []
models[metric.model_name].append(metric)
# Analyze each model
model_analyses = {}
for model_name, model_metrics in models.items():
# Group by dataset type
training_metrics = [
m for m in model_metrics if m.dataset_type == "training"
]
validation_metrics = [
m for m in model_metrics if m.dataset_type == "validation"
]
test_metrics = [m for m in model_metrics if m.dataset_type == "test"]
production_metrics = [
m for m in model_metrics if m.dataset_type == "production"
]
# Calculate overall scores
def calculate_score(metrics_list):
if not metrics_list:
return 0.0
score_map = {
"excellent": 100,
"good": 80,
"acceptable": 60,
"poor": 40,
"critical": 20,
"unknown": 50,
}
scores = [score_map.get(m.status, 50) for m in metrics_list]
return np.mean(scores)
model_analyses[model_name] = {
"overall_score": calculate_score(model_metrics),
"training_score": calculate_score(training_metrics),
"validation_score": calculate_score(validation_metrics),
"test_score": calculate_score(test_metrics),
"production_score": calculate_score(production_metrics),
"metrics_count": {
"total": len(model_metrics),
"training": len(training_metrics),
"validation": len(validation_metrics),
"test": len(test_metrics),
"production": len(production_metrics),
},
"latest_metrics": {
metric.metric_type: metric.to_dict()
for metric in sorted(
model_metrics, key=lambda x: x.timestamp, reverse=True
)[:10]
},
"performance_trend": self._analyze_performance_trend(model_metrics),
"critical_issues": [
m.to_dict() for m in model_metrics if m.status == "critical"
],
}
# Analyze drift across all models
drift_summary = {
"total_models_analyzed": len(self.drift_analyses),
"models_with_drift": len(
[d for d in self.drift_analyses if d.drift_detected]
),
"drift_types": {
drift_type: len(
[d for d in self.drift_analyses if d.drift_type == drift_type]
)
for drift_type in set(d.drift_type for d in self.drift_analyses)
},
"recent_drift": [
d.to_dict()
for d in sorted(
self.drift_analyses, key=lambda x: x.timestamp, reverse=True
)[:5]
],
}
# Generate recommendations
recommendations = []
# Model-specific recommendations
for model_name, analysis in model_analyses.items():
if analysis["production_score"] < 60:
recommendations.append(
f"CRITICAL: {model_name} performance is poor in production - immediate action required"
)
elif analysis["production_score"] < 80:
recommendations.append(
f"WARNING: {model_name} performance degradation detected - consider retraining"
)
if analysis["critical_issues"]:
recommendations.append(
f"Address {len(analysis['critical_issues'])} critical issues in {model_name}"
)
# General recommendations
if drift_summary["models_with_drift"] > 0:
recommendations.append("Implement automated model retraining pipeline")
if len(model_analyses) > 5:
recommendations.append("Consider model ensemble for improved robustness")
# Calculate overall ML system health
overall_scores = [
analysis["overall_score"] for analysis in model_analyses.values()
]
ml_system_health = np.mean(overall_scores) if overall_scores else 0.0
report = {
"ml_system_health": ml_system_health,
"analysis_timestamp": current_time.isoformat(),
"summary": {
"total_models": len(model_analyses),
"models_with_critical_issues": len(
[m for m in model_analyses.values() if m["critical_issues"]]
),
"average_production_score": (
np.mean([m["production_score"] for m in model_analyses.values()])
if model_analyses
else 0.0
),
"drift_detection_active": len(self.drift_analyses) > 0,
},
"model_analyses": model_analyses,
"drift_analysis": drift_summary,
"recommendations": recommendations,
"next_steps": [
"1. Address critical model performance issues immediately",
"2. Implement automated model monitoring and drift detection",
"3. Schedule regular model retraining based on drift detection",
"4. Consider A/B testing for model improvements",
"5. Establish model governance and versioning procedures",
],
}
return report
def _analyze_performance_trend(
self, metrics: List[ModelPerformanceMetric]
) -> Dict[str, Any]:
"""Analyze performance trend over time"""
if len(metrics) < 2:
return {"trend": "insufficient_data", "direction": "unknown"}
# Sort by timestamp
sorted_metrics = sorted(metrics, key=lambda x: x.timestamp)
# Calculate trend for each metric type
trends = {}
for metric_type in set(m.metric_type for m in metrics):
type_metrics = [m for m in sorted_metrics if m.metric_type == metric_type]
if len(type_metrics) >= 2:
# Simple linear regression
values = [m.value for m in type_metrics]
n = len(values)
x = list(range(n))
# Calculate slope
sum_x = sum(x)
sum_y = sum(values)
sum_xy = sum(x[i] * values[i] for i in range(n))
sum_x2 = sum(x[i] * x[i] for i in range(n))
if n * sum_x2 - sum_x * sum_x != 0:
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)
if abs(slope) < 0.01:
trend = "stable"
elif slope > 0:
trend = "improving"
else:
trend = "degrading"
trends[metric_type] = {
"trend": trend,
"slope": slope,
"recent_value": values[-1],
"change_percent": (
((values[-1] - values[0]) / values[0] * 100)
if values[0] != 0
else 0
),
}
return trends
# Global ML performance analyzer instance
# Module-level singleton created at import time; importers share one
# in-memory metric/drift history through this object.
ml_performance_analyzer = MLModelPerformanceAnalyzer()