"""Rolling performance monitor with delayed-feedback support.
Predictions are logged immediately. Ground truth is matched by request_id
and may arrive hours after the prediction was made.
"""
from __future__ import annotations

import json
import time
from collections import deque
from typing import Optional, Deque

import numpy as np

from src.utils.config import settings, resolve
from src.utils.logging_config import get_logger

log = get_logger(__name__)


class _PredictionRecord:
    __slots__ = ("request_id", "prediction", "ground_truth", "timestamp", "features")

    def __init__(
        self,
        request_id: str,
        prediction: float,
        timestamp: float,
        features: dict,
    ) -> None:
        self.request_id = request_id
        self.prediction = prediction
        self.ground_truth: Optional[float] = None
        self.timestamp = timestamp
        self.features = features


class PerformanceMonitor:
    """Rolling window performance tracker with delayed-feedback support."""

    def __init__(self) -> None:
        window_size = settings.monitoring.window_size
        self._pending: dict[str, _PredictionRecord] = {}
        self._matched: Deque[_PredictionRecord] = deque(maxlen=window_size)
        self._baseline_rmse: Optional[float] = None
        self._perf_log_path = resolve(settings.monitoring.performance_log_path)
        self._last_log_time: float = 0.0
        self._last_logged_rmse: Optional[float] = None

    def log_prediction(self, request_id: str, prediction: float, features: dict) -> None:
        record = _PredictionRecord(
            request_id=request_id,
            prediction=prediction,
            timestamp=time.time(),
            features=features,
        )
        self._pending[request_id] = record

    def log_ground_truth(self, request_id: str, actual: float) -> bool:
        """Match ground truth to a pending prediction. Returns True if matched."""
        record = self._pending.pop(request_id, None)
        if record is None:
            log.warning("Ground truth for unknown request_id=%s ignored.", request_id)
            return False
        record.ground_truth = actual
        self._matched.append(record)
        self._append_feedback_log(record)
        log.debug(
            "Ground truth matched: request_id=%s pred=%.2f actual=%.2f delay=%.1fs",
            request_id, record.prediction, actual, time.time() - record.timestamp,
        )
        return True

    def compute_metrics(self) -> Optional[dict]:
        """Compute rolling metrics over matched predictions. Returns None if there is insufficient data."""
        min_samples = settings.monitoring.min_samples_for_evaluation
        matched = list(self._matched)
        if len(matched) < min_samples:
            log.debug("Only %d matched samples, need %d.", len(matched), min_samples)
            return None
        preds = np.array([r.prediction for r in matched])
        actuals = np.array([r.ground_truth for r in matched])
        rmse = float(np.sqrt(np.mean((preds - actuals) ** 2)))
        mae = float(np.mean(np.abs(preds - actuals)))
        ss_res = np.sum((actuals - preds) ** 2)
        ss_tot = np.sum((actuals - actuals.mean()) ** 2)
        # Guard the denominator: ss_tot is zero when every actual is identical.
        r2 = float(1 - ss_res / max(ss_tot, 1e-9))
        metrics = {
            "rmse": round(rmse, 4),
            "mae": round(mae, 4),
            "r2": round(r2, 4),
            "n_samples": len(matched),
            "n_pending": len(self._pending),
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        self._append_performance_log(metrics)
        return metrics

    def set_baseline_rmse(self, rmse: float) -> None:
        self._baseline_rmse = rmse
        log.info("Baseline RMSE set to %.4f", rmse)

    def get_baseline_rmse(self) -> Optional[float]:
        return self._baseline_rmse

    def matched_count(self) -> int:
        return len(self._matched)

    def pending_count(self) -> int:
        return len(self._pending)

    def get_matched_dataframe(self):
        # pandas is imported lazily so the monitor itself does not require it.
        import pandas as pd

        records = list(self._matched)
        if not records:
            return pd.DataFrame()
        rows = []
        for r in records:
            # One row per matched record: the original features plus outcome columns.
            row = dict(r.features)
            row["prediction"] = r.prediction
            row["ground_truth"] = r.ground_truth
            row["timestamp"] = r.timestamp
            rows.append(row)
        return pd.DataFrame(rows)

    def _append_performance_log(self, metrics: dict) -> None:
        """Write metrics to the log, throttled to avoid flooding it with duplicate values."""
        now = time.time()
        current_rmse = metrics.get("rmse", 0.0)
        if self._last_logged_rmse is not None:
            rmse_change = abs(current_rmse - self._last_logged_rmse) / max(self._last_logged_rmse, 0.01)
            within_throttle_window = (now - self._last_log_time) < 30
            # Skip the write only when the RMSE moved by less than 1% and the
            # previous entry is under 30 seconds old; large jumps always log.
            if rmse_change < 0.01 and within_throttle_window:
                return
        self._last_log_time = now
        self._last_logged_rmse = current_rmse
        with open(self._perf_log_path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(metrics) + "\n")

    def _append_feedback_log(self, record: _PredictionRecord) -> None:
        feedback_path = resolve(settings.delayed_feedback.feedback_log_path)
        # Take a single timestamp so delay_seconds equals
        # feedback_timestamp - prediction_timestamp exactly.
        now = time.time()
        entry = {
            "request_id": record.request_id,
            "prediction": record.prediction,
            "ground_truth": record.ground_truth,
            "prediction_timestamp": record.timestamp,
            "feedback_timestamp": now,
            "delay_seconds": now - record.timestamp,
        }
        with open(feedback_path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(entry) + "\n")
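

if __name__ == "__main__":
    # Minimal usage sketch, not part of the monitor. It assumes the project's
    # `settings` object provides monitoring.window_size,
    # monitoring.min_samples_for_evaluation, monitoring.performance_log_path,
    # and delayed_feedback.feedback_log_path, and that both log paths are
    # writable; the request IDs and values here are hypothetical.
    monitor = PerformanceMonitor()
    for i in range(20):
        rid = f"req-{i}"
        monitor.log_prediction(rid, prediction=float(i), features={"x": float(i)})
        # Simulate delayed feedback arriving with some noise.
        monitor.log_ground_truth(rid, actual=float(i) + 0.1)
    print("matched:", monitor.matched_count(), "pending:", monitor.pending_count())
    print(monitor.compute_metrics())  # None until min_samples_for_evaluation is met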