"""Model performance monitoring utilities."""

# Standard library.
import json
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Third-party.
import numpy as np
import pandas as pd

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)


class PerformanceMonitor:
    """
    Monitor model performance over time.

    Records predictions (optionally with ground truth), computes per-record
    tag-set metrics, persists the history to a JSON file, and compares recent
    averages against a baseline to flag degradation.
    """

    # Metrics averaged by get_recent_metrics(). A record is only aggregated
    # when its "metrics" dict carries every one of these keys.
    _AVERAGED_METRICS = ("precision", "recall", "f1", "exact_match")

    # Metrics compared against the baseline by check_performance_degradation().
    _DEGRADATION_METRICS = ("precision", "recall", "f1")

    def __init__(
        self,
        metrics_file: str = "monitoring/performance_metrics.json",
        window_size: int = 100,
        threshold_drop: float = 0.05,
    ):
        """
        Initialize performance monitor.

        Creates the parent directory of ``metrics_file`` if needed and loads
        any history already stored there.

        Args:
            metrics_file: Path of the JSON file used to persist the history.
            window_size: Default number of most recent records averaged by
                get_recent_metrics(). The in-memory history is trimmed back
                to this many records once it exceeds twice this size.
            threshold_drop: Relative drop (fraction of the baseline value)
                above which a metric counts as degraded; 0.05 means 5%.
        """
        self.metrics_file = Path(metrics_file)
        self.metrics_file.parent.mkdir(parents=True, exist_ok=True)
        self.window_size = window_size
        self.threshold_drop = threshold_drop

        self.metrics_history = self._load_metrics()

        logger.info("PerformanceMonitor initialized: window_size=%s", window_size)

    @staticmethod
    def _has_scores(record) -> bool:
        """Return True when ``record`` carries a complete, averageable metrics dict."""
        if not isinstance(record, dict):
            return False
        metrics = record.get("metrics")
        return isinstance(metrics, dict) and all(
            key in metrics for key in PerformanceMonitor._AVERAGED_METRICS
        )

    def _load_metrics(self) -> List[Dict]:
        """
        Load the metrics history from ``self.metrics_file``.

        Best effort: a missing, unreadable, malformed, or non-list file
        yields an empty history (with a warning) instead of an exception.

        Returns:
            The stored list of records, or an empty list.
        """
        if not self.metrics_file.exists():
            return []

        try:
            with open(self.metrics_file, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("Failed to load metrics: %s", e)
            return []

        # The rest of the class appends to and slices the history, so
        # anything other than a list would fail later in confusing ways.
        if not isinstance(loaded, list):
            logger.warning(
                "Failed to load metrics: expected a JSON list, got %s",
                type(loaded).__name__,
            )
            return []

        return loaded

    def _save_metrics(self) -> None:
        """
        Write the full metrics history to ``self.metrics_file`` as JSON.

        Best effort: failures are logged, never raised. Values that JSON
        cannot encode natively are stringified via ``default=str``.
        """
        try:
            with open(self.metrics_file, "w", encoding="utf-8") as f:
                json.dump(self.metrics_history, f, indent=2, default=str)
        except (OSError, TypeError, ValueError) as e:
            logger.error("Failed to save metrics: %s", e)

    def record_prediction(
        self,
        prediction: Dict,
        ground_truth: Optional[Dict] = None,
        metadata: Optional[Dict] = None,
    ) -> None:
        """
        Record a prediction for monitoring.

        Appends a timestamped record to the history, attaches metrics when
        usable ground truth is supplied, trims the history, and persists it.

        Args:
            prediction: Prediction results; the "tags" entry is used for metrics.
            ground_truth: Optional ground truth labels; the "tags" entry is
                compared against the predicted tags.
            metadata: Optional metadata stored alongside the record.
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "prediction": prediction,
            "ground_truth": ground_truth,
            "metadata": metadata or {},
        }

        if ground_truth:
            metrics = self._calculate_metrics(prediction, ground_truth)
            # _calculate_metrics returns {} when the ground truth has no tags.
            # Storing that empty dict would mark the record as scored and
            # break every aggregation that indexes into it.
            if metrics:
                record["metrics"] = metrics

        self.metrics_history.append(record)

        # Let the history grow to twice the window before cutting it back,
        # so the trim does not run on every call.
        if len(self.metrics_history) > self.window_size * 2:
            self.metrics_history = self.metrics_history[-self.window_size:]

        self._save_metrics()

    def _calculate_metrics(
        self,
        prediction: Dict,
        ground_truth: Dict,
    ) -> Dict[str, float]:
        """
        Calculate tag-set metrics for one prediction.

        Args:
            prediction: Prediction results; "tags" is read as a collection.
            ground_truth: Ground truth labels; "tags" is read as a collection.

        Returns:
            Dictionary with precision, recall, f1, exact_match, tp, fp and fn,
            or an empty dict when the ground truth has no tags.
        """
        # "or []" also tolerates an explicit "tags": None entry.
        pred_tags = set(prediction.get("tags") or [])
        true_tags = set(ground_truth.get("tags") or [])

        if not true_tags:
            return {}

        tp = len(pred_tags & true_tags)
        fp = len(pred_tags - true_tags)
        fn = len(true_tags - pred_tags)

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        if (precision + recall) > 0:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0.0

        exact_match = 1.0 if pred_tags == true_tags else 0.0

        return {
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "exact_match": exact_match,
            "tp": tp,
            "fp": fp,
            "fn": fn,
        }

    def get_recent_metrics(
        self,
        window: Optional[int] = None,
    ) -> Dict[str, float]:
        """
        Get average metrics over the most recent records.

        Args:
            window: Number of most recent records to inspect. ``None`` or 0
                falls back to ``self.window_size``.

        Returns:
            Dictionary with the mean precision, recall, f1 and exact_match
            plus "count" (number of scored records averaged), or an empty
            dict when no scored record falls inside the window.
        """
        window = window or self.window_size
        recent = self.metrics_history[-window:]

        # Skip unscored records and records whose metrics dict is empty or
        # incomplete (older files may contain such entries).
        scored = [r["metrics"] for r in recent if self._has_scores(r)]
        if not scored:
            return {}

        averages = {
            name: float(np.mean([m[name] for m in scored]))
            for name in self._AVERAGED_METRICS
        }
        averages["count"] = len(scored)
        return averages

    def check_performance_degradation(
        self,
        baseline_metrics: Optional[Dict[str, float]] = None,
    ) -> Tuple[bool, Dict]:
        """
        Check for performance degradation against a baseline.

        Args:
            baseline_metrics: Baseline values keyed by metric name; only
                precision, recall and f1 are compared.

        Returns:
            Tuple of (is_degraded, degradation_info). ``degradation_info``
            maps each compared metric to its baseline, current value,
            absolute drop and relative drop. Returns ``(False, {})`` when
            there is no baseline or no recent metrics.
        """
        recent_metrics = self.get_recent_metrics()

        if not recent_metrics or not baseline_metrics:
            return False, {}

        degradation_info = {}
        is_degraded = False

        for metric in self._DEGRADATION_METRICS:
            if metric not in baseline_metrics or metric not in recent_metrics:
                continue

            baseline = baseline_metrics[metric]
            current = recent_metrics[metric]
            drop = baseline - current
            # A non-positive baseline has no meaningful relative drop.
            drop_pct = drop / baseline if baseline > 0 else 0

            degradation_info[metric] = {
                "baseline": baseline,
                "current": current,
                "drop": drop,
                "drop_pct": drop_pct,
            }

            if drop_pct > self.threshold_drop:
                is_degraded = True
                logger.warning(
                    "Performance degradation detected: %s dropped by %s (%.3f -> %.3f)",
                    metric,
                    format(drop_pct, ".2%"),
                    baseline,
                    current,
                )

        return is_degraded, degradation_info

    def get_performance_trends(
        self,
        metric: str = "f1",
        days: int = 7,
    ) -> pd.DataFrame:
        """
        Get the per-record values of one metric over recent days.

        Args:
            metric: Name of the metric to extract from each record.
            days: How many days back from now to include.

        Returns:
            DataFrame with a "timestamp" column and one column named after
            ``metric``; an empty DataFrame when no record qualifies.
        """
        cutoff_date = datetime.now() - timedelta(days=days)

        timestamps = []
        values = []
        for record in self.metrics_history:
            metrics = record.get("metrics")
            if not metrics or metric not in metrics:
                continue
            recorded_at = datetime.fromisoformat(record["timestamp"])
            if recorded_at >= cutoff_date:
                timestamps.append(recorded_at)
                values.append(metrics[metric])

        if not timestamps:
            return pd.DataFrame()

        # Reuse the already-parsed datetimes instead of asking pandas to
        # parse the strings again: isoformat() omits the fractional part
        # when microseconds are 0, so the stored strings can mix formats.
        return pd.DataFrame(
            {
                "timestamp": pd.to_datetime(timestamps),
                metric: values,
            }
        )

    def generate_report(self) -> Dict:
        """
        Generate performance monitoring report.

        Returns:
            Dictionary with the report timestamp, recent average metrics,
            total record count, count of scored records and, when trend
            data exists, mean/std/min/max of f1 over the default trend
            period.
        """
        recent_metrics = self.get_recent_metrics()
        trends = self.get_performance_trends()

        report = {
            "timestamp": datetime.now().isoformat(),
            "recent_metrics": recent_metrics,
            "total_predictions": len(self.metrics_history),
            "predictions_with_metrics": sum(
                1 for r in self.metrics_history if self._has_scores(r)
            ),
        }

        if not trends.empty:
            f1 = trends["f1"] if "f1" in trends.columns else None
            report["trends"] = {
                "f1_mean": float(f1.mean()) if f1 is not None else None,
                # std() is NaN when only one sample is available.
                "f1_std": float(f1.std()) if f1 is not None else None,
                "f1_min": float(f1.min()) if f1 is not None else None,
                "f1_max": float(f1.max()) if f1 is not None else None,
            }

        return report