File size: 9,034 Bytes
198ccb0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""Model performance monitoring utilities."""

import logging
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from collections import defaultdict
import json
from pathlib import Path

logger = logging.getLogger(__name__)


class PerformanceMonitor:
    """
    Monitor model performance over time.
    
    Tracks metrics, detects performance degradation, and generates alerts.
    """

    def __init__(
        self,
        metrics_file: str = "monitoring/performance_metrics.json",
        window_size: int = 100,
        threshold_drop: float = 0.05,
    ):
        """
        Initialize performance monitor.
        
        Args:
            metrics_file: Path to store metrics
            window_size: Number of predictions to track
            threshold_drop: Threshold for performance drop alert (5%)
        """
        self.metrics_file = Path(metrics_file)
        self.metrics_file.parent.mkdir(parents=True, exist_ok=True)
        self.window_size = window_size
        self.threshold_drop = threshold_drop
        
        # Load existing metrics
        self.metrics_history = self._load_metrics()
        
        logger.info(f"PerformanceMonitor initialized: window_size={window_size}")

    def _load_metrics(self) -> List[Dict]:
        """Load metrics from file."""
        if self.metrics_file.exists():
            try:
                with open(self.metrics_file) as f:
                    return json.load(f)
            except Exception as e:
                logger.warning(f"Failed to load metrics: {e}")
                return []
        return []

    def _save_metrics(self) -> None:
        """Save metrics to file."""
        try:
            with open(self.metrics_file, 'w') as f:
                json.dump(self.metrics_history, f, indent=2, default=str)
        except Exception as e:
            logger.error(f"Failed to save metrics: {e}")

    def record_prediction(
        self,
        prediction: Dict,
        ground_truth: Optional[Dict] = None,
        metadata: Optional[Dict] = None,
    ) -> None:
        """
        Record a prediction for monitoring.
        
        Args:
            prediction: Prediction results (tags, scores)
            ground_truth: Optional ground truth labels
            metadata: Optional metadata (timestamp, model_version, etc.)
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "prediction": prediction,
            "ground_truth": ground_truth,
            "metadata": metadata or {},
        }
        
        # Calculate metrics if ground truth available
        if ground_truth:
            metrics = self._calculate_metrics(prediction, ground_truth)
            record["metrics"] = metrics
        
        self.metrics_history.append(record)
        
        # Keep only recent records
        if len(self.metrics_history) > self.window_size * 2:
            self.metrics_history = self.metrics_history[-self.window_size:]
        
        self._save_metrics()

    def _calculate_metrics(
        self,
        prediction: Dict,
        ground_truth: Dict,
    ) -> Dict[str, float]:
        """
        Calculate performance metrics.
        
        Args:
            prediction: Prediction results
            ground_truth: Ground truth labels
            
        Returns:
            Dictionary of metrics
        """
        pred_tags = set(prediction.get("tags", []))
        true_tags = set(ground_truth.get("tags", []))
        
        if not true_tags:
            return {}
        
        # Calculate precision, recall, F1
        tp = len(pred_tags & true_tags)
        fp = len(pred_tags - true_tags)
        fn = len(true_tags - pred_tags)
        
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
        
        # Exact match
        exact_match = 1.0 if pred_tags == true_tags else 0.0
        
        return {
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "exact_match": exact_match,
            "tp": tp,
            "fp": fp,
            "fn": fn,
        }

    def get_recent_metrics(
        self,
        window: Optional[int] = None,
    ) -> Dict[str, float]:
        """
        Get recent performance metrics.
        
        Args:
            window: Number of recent predictions to analyze
            
        Returns:
            Dictionary of average metrics
        """
        window = window or self.window_size
        recent = self.metrics_history[-window:]
        
        if not recent:
            return {}
        
        # Filter records with metrics
        records_with_metrics = [r for r in recent if "metrics" in r]
        
        if not records_with_metrics:
            return {}
        
        # Calculate averages
        metrics_list = [r["metrics"] for r in records_with_metrics]
        
        return {
            "precision": np.mean([m["precision"] for m in metrics_list]),
            "recall": np.mean([m["recall"] for m in metrics_list]),
            "f1": np.mean([m["f1"] for m in metrics_list]),
            "exact_match": np.mean([m["exact_match"] for m in metrics_list]),
            "count": len(records_with_metrics),
        }

    def check_performance_degradation(
        self,
        baseline_metrics: Optional[Dict[str, float]] = None,
    ) -> Tuple[bool, Dict]:
        """
        Check for performance degradation.
        
        Args:
            baseline_metrics: Baseline metrics to compare against
            
        Returns:
            Tuple of (is_degraded, degradation_info)
        """
        recent_metrics = self.get_recent_metrics()
        
        if not recent_metrics or not baseline_metrics:
            return False, {}
        
        degradation_info = {}
        is_degraded = False
        
        for metric in ["precision", "recall", "f1"]:
            if metric in baseline_metrics and metric in recent_metrics:
                drop = baseline_metrics[metric] - recent_metrics[metric]
                drop_pct = drop / baseline_metrics[metric] if baseline_metrics[metric] > 0 else 0
                
                degradation_info[metric] = {
                    "baseline": baseline_metrics[metric],
                    "current": recent_metrics[metric],
                    "drop": drop,
                    "drop_pct": drop_pct,
                }
                
                if drop_pct > self.threshold_drop:
                    is_degraded = True
                    logger.warning(
                        f"Performance degradation detected: {metric} dropped by "
                        f"{drop_pct:.2%} ({baseline_metrics[metric]:.3f} -> {recent_metrics[metric]:.3f})"
                    )
        
        return is_degraded, degradation_info

    def get_performance_trends(
        self,
        metric: str = "f1",
        days: int = 7,
    ) -> pd.DataFrame:
        """
        Get performance trends over time.
        
        Args:
            metric: Metric to track
            days: Number of days to analyze
            
        Returns:
            DataFrame with trends
        """
        cutoff_date = datetime.now() - timedelta(days=days)
        
        filtered = [
            r for r in self.metrics_history
            if datetime.fromisoformat(r["timestamp"]) >= cutoff_date
            and "metrics" in r
            and metric in r["metrics"]
        ]
        
        if not filtered:
            return pd.DataFrame()
        
        data = {
            "timestamp": [r["timestamp"] for r in filtered],
            metric: [r["metrics"][metric] for r in filtered],
        }
        
        df = pd.DataFrame(data)
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        
        return df

    def generate_report(self) -> Dict:
        """
        Generate performance monitoring report.
        
        Returns:
            Dictionary with report data
        """
        recent_metrics = self.get_recent_metrics()
        trends = self.get_performance_trends()
        
        report = {
            "timestamp": datetime.now().isoformat(),
            "recent_metrics": recent_metrics,
            "total_predictions": len(self.metrics_history),
            "predictions_with_metrics": len([r for r in self.metrics_history if "metrics" in r]),
        }
        
        if not trends.empty:
            report["trends"] = {
                "f1_mean": trends["f1"].mean() if "f1" in trends.columns else None,
                "f1_std": trends["f1"].std() if "f1" in trends.columns else None,
                "f1_min": trends["f1"].min() if "f1" in trends.columns else None,
                "f1_max": trends["f1"].max() if "f1" in trends.columns else None,
            }
        
        return report