# multilabel-news-classifier/monitoring/performance_monitor.py
# Solareva Taisia
# chore(release): initial public snapshot
# 198ccb0
"""Model performance monitoring utilities."""
import logging
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from collections import defaultdict
import json
from pathlib import Path
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class PerformanceMonitor:
    """
    Monitor model performance over time.

    Records predictions (optionally with ground truth), computes per-record
    tag-set metrics, averages them over a recent window, compares them with
    baseline metrics, and logs a warning when a metric drops by more than
    ``threshold_drop`` (relative). The history is persisted as JSON in
    ``metrics_file`` after every recorded prediction.
    """

    def __init__(
        self,
        metrics_file: str = "monitoring/performance_metrics.json",
        window_size: int = 100,
        threshold_drop: float = 0.05,
    ):
        """
        Initialize performance monitor.

        Creates the parent directory of ``metrics_file`` if needed and loads
        any previously saved history from it.

        Args:
            metrics_file: Path to store metrics
            window_size: Number of predictions to track
            threshold_drop: Threshold for performance drop alert (5%)
        """
        self.metrics_file = Path(metrics_file)
        self.metrics_file.parent.mkdir(parents=True, exist_ok=True)
        self.window_size = window_size
        self.threshold_drop = threshold_drop

        # Load existing metrics
        self.metrics_history = self._load_metrics()

        logger.info(f"PerformanceMonitor initialized: window_size={window_size}")

    def _load_metrics(self) -> List[Dict]:
        """
        Load metrics from file.

        Returns:
            The stored list of records, or an empty list when the file is
            missing, unreadable, or does not contain a JSON list.
        """
        if not self.metrics_file.exists():
            return []

        try:
            with open(self.metrics_file, encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception as e:
            # Deliberate best-effort: a corrupt file must not prevent startup.
            logger.warning(f"Failed to load metrics: {e}")
            return []

        # The history is appended to and sliced, so anything other than a
        # list would fail later in record_prediction().
        if not isinstance(loaded, list):
            logger.warning(
                "Failed to load metrics: expected a JSON list, "
                f"got {type(loaded).__name__}"
            )
            return []
        return loaded

    def _save_metrics(self) -> None:
        """
        Save metrics to file.

        Failures are logged and swallowed so that monitoring never breaks the
        caller. Values that are not JSON-serializable are written via ``str``.
        """
        try:
            with open(self.metrics_file, "w", encoding="utf-8") as f:
                json.dump(self.metrics_history, f, indent=2, default=str)
        except Exception as e:
            logger.error(f"Failed to save metrics: {e}")

    def record_prediction(
        self,
        prediction: Dict,
        ground_truth: Optional[Dict] = None,
        metadata: Optional[Dict] = None,
    ) -> None:
        """
        Record a prediction for monitoring.

        The record is timestamped with the current local time, appended to
        the history, and the history is written to disk.

        Args:
            prediction: Prediction results (tags, scores)
            ground_truth: Optional ground truth labels
            metadata: Optional metadata (timestamp, model_version, etc.)
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "prediction": prediction,
            "ground_truth": ground_truth,
            "metadata": metadata or {},
        }

        # Calculate metrics if ground truth available
        if ground_truth:
            metrics = self._calculate_metrics(prediction, ground_truth)
            # _calculate_metrics returns {} when the ground truth has no
            # tags; storing that empty dict would make the averaging in
            # get_recent_metrics() fail with KeyError, so skip it.
            if metrics:
                record["metrics"] = metrics

        self.metrics_history.append(record)

        # Keep only recent records: once the history exceeds twice the
        # window, cut it back to the most recent ``window_size`` entries.
        if len(self.metrics_history) > self.window_size * 2:
            self.metrics_history = self.metrics_history[-self.window_size:]

        self._save_metrics()

    def _calculate_metrics(
        self,
        prediction: Dict,
        ground_truth: Dict,
    ) -> Dict[str, float]:
        """
        Calculate performance metrics.

        Compares the ``"tags"`` entries of both dicts as sets.

        Args:
            prediction: Prediction results
            ground_truth: Ground truth labels

        Returns:
            Dictionary of metrics (precision, recall, f1, exact_match and the
            tp/fp/fn counts), or an empty dict when the ground truth has no
            tags.
        """
        pred_tags = set(prediction.get("tags", []))
        true_tags = set(ground_truth.get("tags", []))

        if not true_tags:
            return {}

        # Calculate precision, recall, F1
        tp = len(pred_tags & true_tags)
        fp = len(pred_tags - true_tags)
        fn = len(true_tags - pred_tags)

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        if (precision + recall) > 0:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0.0

        # Exact match
        exact_match = 1.0 if pred_tags == true_tags else 0.0

        return {
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "exact_match": exact_match,
            "tp": tp,
            "fp": fp,
            "fn": fn,
        }

    def get_recent_metrics(
        self,
        window: Optional[int] = None,
    ) -> Dict[str, float]:
        """
        Get recent performance metrics.

        Args:
            window: Number of recent predictions to analyze; falls back to
                ``window_size`` when omitted (or 0).

        Returns:
            Dictionary of average metrics plus ``count`` (number of records
            that contributed), or an empty dict when no record in the window
            has metrics.
        """
        window = window or self.window_size
        recent = self.metrics_history[-window:]

        # Only records with non-empty metrics contribute. The truthiness
        # check also skips empty ``{}`` metrics that may already be present
        # in a previously saved history file.
        metrics_list = [r["metrics"] for r in recent if r.get("metrics")]
        if not metrics_list:
            return {}

        # Calculate averages (as plain floats so the result is JSON-friendly)
        return {
            "precision": float(np.mean([m["precision"] for m in metrics_list])),
            "recall": float(np.mean([m["recall"] for m in metrics_list])),
            "f1": float(np.mean([m["f1"] for m in metrics_list])),
            "exact_match": float(np.mean([m["exact_match"] for m in metrics_list])),
            "count": len(metrics_list),
        }

    def check_performance_degradation(
        self,
        baseline_metrics: Optional[Dict[str, float]] = None,
    ) -> Tuple[bool, Dict]:
        """
        Check for performance degradation.

        For precision, recall and f1, computes the relative drop of the
        recent average against the baseline and logs a warning for every
        metric whose drop exceeds ``threshold_drop``.

        Args:
            baseline_metrics: Baseline metrics to compare against

        Returns:
            Tuple of (is_degraded, degradation_info). Returns ``(False, {})``
            when there are no recent metrics or no baseline.
        """
        recent_metrics = self.get_recent_metrics()

        if not recent_metrics or not baseline_metrics:
            return False, {}

        degradation_info = {}
        is_degraded = False

        for metric in ["precision", "recall", "f1"]:
            if metric not in baseline_metrics or metric not in recent_metrics:
                continue

            baseline = baseline_metrics[metric]
            current = recent_metrics[metric]
            drop = baseline - current
            # A non-positive baseline cannot yield a meaningful relative drop.
            drop_pct = drop / baseline if baseline > 0 else 0

            degradation_info[metric] = {
                "baseline": baseline,
                "current": current,
                "drop": drop,
                "drop_pct": drop_pct,
            }

            if drop_pct > self.threshold_drop:
                is_degraded = True
                logger.warning(
                    f"Performance degradation detected: {metric} dropped by "
                    f"{drop_pct:.2%} ({baseline:.3f} -> {current:.3f})"
                )

        return is_degraded, degradation_info

    def get_performance_trends(
        self,
        metric: str = "f1",
        days: int = 7,
    ) -> pd.DataFrame:
        """
        Get performance trends over time.

        Args:
            metric: Metric to track
            days: Number of days to analyze

        Returns:
            DataFrame with a ``timestamp`` column (datetime) and one column
            named after ``metric``; empty DataFrame when nothing matches.
        """
        # Record timestamps are written with datetime.now(), so the cutoff is
        # computed the same way to keep the comparison consistent.
        cutoff_date = datetime.now() - timedelta(days=days)

        filtered = [
            r for r in self.metrics_history
            if metric in (r.get("metrics") or {})
            and datetime.fromisoformat(r["timestamp"]) >= cutoff_date
        ]

        if not filtered:
            return pd.DataFrame()

        df = pd.DataFrame(
            {
                "timestamp": [r["timestamp"] for r in filtered],
                metric: [r["metrics"][metric] for r in filtered],
            }
        )
        df["timestamp"] = pd.to_datetime(df["timestamp"])

        return df

    def generate_report(self) -> Dict:
        """
        Generate performance monitoring report.

        Returns:
            Dictionary with report data: generation timestamp, recent average
            metrics, record counts and, when f1 values exist for the default
            trend period, f1 mean/std/min/max under ``"trends"``.
        """
        recent_metrics = self.get_recent_metrics()
        trends = self.get_performance_trends()

        report = {
            "timestamp": datetime.now().isoformat(),
            "recent_metrics": recent_metrics,
            "total_predictions": len(self.metrics_history),
            # Count only records whose metrics are non-empty, matching what
            # get_recent_metrics() actually averages.
            "predictions_with_metrics": sum(
                1 for r in self.metrics_history if r.get("metrics")
            ),
        }

        if not trends.empty:
            # A non-empty trends frame always has the "f1" column because
            # get_performance_trends() defaults to metric="f1".
            f1_values = trends["f1"]
            report["trends"] = {
                "f1_mean": f1_values.mean(),
                "f1_std": f1_values.std(),
                "f1_min": f1_values.min(),
                "f1_max": f1_values.max(),
            }

        return report