"""Rolling performance monitor with delayed-feedback support.

Predictions are logged immediately. Ground truth is matched by request_id
and may arrive hours after the prediction was made.
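
Example (illustrative sketch; the request id and values are made up, and the
project ``settings`` are assumed to be configured):

    monitor = PerformanceMonitor()
    monitor.log_prediction("req-42", prediction=3.14, features={"x": 1.0})
    # ...later, once the label arrives...
    monitor.log_ground_truth("req-42", actual=2.71)
    metrics = monitor.compute_metrics()  # None until min_samples_for_evaluation pairs are matched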
"""
from __future__ import annotations

import json
import time
from collections import deque
from typing import Optional

import numpy as np

from src.utils.config import settings, resolve
from src.utils.logging_config import get_logger

log = get_logger(__name__)


class _PredictionRecord:
    __slots__ = ("request_id", "prediction", "ground_truth", "timestamp", "features")

    def __init__(
        self,
        request_id: str,
        prediction: float,
        timestamp: float,
        features: dict,
    ) -> None:
        self.request_id = request_id
        self.prediction = prediction
        self.ground_truth: Optional[float] = None
        self.timestamp = timestamp
        self.features = features


class PerformanceMonitor:
    """Rolling window performance tracker with delayed-feedback support."""

    def __init__(self) -> None:
        window_size = settings.monitoring.window_size
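        # Predictions awaiting ground truth, keyed by request_id; unbounded until feedback arrives.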
        self._pending: dict[str, _PredictionRecord] = {}
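        # Matched records form the rolling window; the deque evicts the oldest beyond window_size.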
        self._matched: deque[_PredictionRecord] = deque(maxlen=window_size)
        self._baseline_rmse: Optional[float] = None
        self._perf_log_path = resolve(settings.monitoring.performance_log_path)
        self._last_log_time: float = 0.0
        self._last_logged_rmse: Optional[float] = None

    def log_prediction(self, request_id: str, prediction: float, features: dict) -> None:
        record = _PredictionRecord(
            request_id=request_id,
            prediction=prediction,
            timestamp=time.time(),
            features=features,
        )
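        # Stays pending until log_ground_truth matches it; unmatched entries are never evicted.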
        self._pending[request_id] = record

    def log_ground_truth(self, request_id: str, actual: float) -> bool:
        """Match ground truth to a pending prediction. Returns True if matched."""
        record = self._pending.pop(request_id, None)
        if record is None:
            log.warning("Ground truth for unknown request_id=%s ignored.", request_id)
            return False

        record.ground_truth = actual
        self._matched.append(record)
        self._append_feedback_log(record)
        log.debug(
            "Ground truth matched: request_id=%s pred=%.2f actual=%.2f delay=%.1fs",
            request_id, record.prediction, actual, time.time() - record.timestamp,
        )
        return True

    def compute_metrics(self) -> Optional[dict]:
        """Compute rolling metrics over matched predictions. Returns None if insufficient data."""
        min_samples = settings.monitoring.min_samples_for_evaluation
        matched = list(self._matched)

        if len(matched) < min_samples:
            log.debug("Only %d matched samples, need %d.", len(matched), min_samples)
            return None

        preds = np.array([r.prediction for r in matched])
        actuals = np.array([r.ground_truth for r in matched])

        rmse = float(np.sqrt(np.mean((preds - actuals) ** 2)))
        mae = float(np.mean(np.abs(preds - actuals)))
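        # R^2 = 1 - SS_res / SS_tot; the 1e-9 floor guards against zero variance when all actuals are equal.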
        ss_res = np.sum((actuals - preds) ** 2)
        ss_tot = np.sum((actuals - actuals.mean()) ** 2)
        r2 = float(1 - ss_res / max(ss_tot, 1e-9))

        metrics = {
            "rmse": round(rmse, 4),
            "mae": round(mae, 4),
            "r2": round(r2, 4),
            "n_samples": len(matched),
            "n_pending": len(self._pending),
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }

        self._append_performance_log(metrics)
        return metrics

    def set_baseline_rmse(self, rmse: float) -> None:
        self._baseline_rmse = rmse
        log.info("Baseline RMSE set to %.4f", rmse)

    def get_baseline_rmse(self) -> Optional[float]:
        return self._baseline_rmse

    def matched_count(self) -> int:
        return len(self._matched)

    def pending_count(self) -> int:
        return len(self._pending)

    def get_matched_dataframe(self):
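        # Imported lazily so the module itself has no hard pandas dependency.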
        import pandas as pd
        records = list(self._matched)
        if not records:
            return pd.DataFrame()
        rows = []
        for r in records:
            row = dict(r.features)
            row["prediction"] = r.prediction
            row["ground_truth"] = r.ground_truth
            row["timestamp"] = r.timestamp
            rows.append(row)
        return pd.DataFrame(rows)

    def _append_performance_log(self, metrics: dict) -> None:
        """Write metrics to log, throttled to avoid flooding with duplicate values."""
        now = time.time()
        current_rmse = metrics.get("rmse", 0.0)

        if self._last_logged_rmse is not None:
            # Skip the write when RMSE moved by less than 1% and the last entry is under 30s old.
            rmse_change = abs(current_rmse - self._last_logged_rmse) / max(self._last_logged_rmse, 0.01)
            within_throttle_window = (now - self._last_log_time) < 30
            if rmse_change < 0.01 and within_throttle_window:
                return

        self._last_log_time = now
        self._last_logged_rmse = current_rmse
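        # One JSON object per line (JSON Lines), so the log can be tailed and parsed incrementally.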
        with open(self._perf_log_path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(metrics) + "\n")

    def _append_feedback_log(self, record: _PredictionRecord) -> None:
        feedback_path = resolve(settings.delayed_feedback.feedback_log_path)
        now = time.time()
        entry = {
            "request_id": record.request_id,
            "prediction": record.prediction,
            "ground_truth": record.ground_truth,
            "prediction_timestamp": record.timestamp,
            "feedback_timestamp": now,
            "delay_seconds": now - record.timestamp,
        }
        with open(feedback_path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(entry) + "\n")