# argus-mlops/src/monitoring/performance_monitor.py
# Last change by hodfa840: "Fix flat performance graph on HF Spaces" (commit 50145b8)
"""Rolling performance monitor with delayed-feedback support.
Predictions are logged immediately. Ground truth is matched by request_id
and may arrive hours after the prediction was made.
"""
from __future__ import annotations
import json
import time
from collections import deque
from pathlib import Path
from typing import Optional, Deque
import numpy as np
from src.utils.config import settings, resolve
from src.utils.logging_config import get_logger
# Module-level logger, named after this module; used by PerformanceMonitor's methods.
log = get_logger(__name__)
class _PredictionRecord:
    """One logged prediction, optionally paired with its later ground truth.

    ``__slots__`` keeps per-instance memory small and forbids ad-hoc
    attributes.
    """

    __slots__ = ("request_id", "prediction", "ground_truth", "timestamp", "features")

    def __init__(
        self,
        request_id: str,
        prediction: float,
        timestamp: float,
        features: dict,
    ) -> None:
        # Values captured at the moment the prediction is logged.
        self.request_id, self.prediction = request_id, prediction
        self.timestamp, self.features = timestamp, features
        # Unknown until the delayed feedback arrives.
        self.ground_truth: Optional[float] = None
class PerformanceMonitor:
    """Rolling window performance tracker with delayed-feedback support.

    Predictions registered via ``log_prediction`` wait in a pending map keyed
    by ``request_id``. When ``log_ground_truth`` later supplies the actual
    value, the record moves into a rolling window capped at
    ``settings.monitoring.window_size`` entries, over which
    ``compute_metrics`` reports RMSE, MAE and R².
    """

    # Throttle for the performance log: a new entry is skipped only when RMSE
    # moved by less than this relative amount AND fewer than this many seconds
    # have passed since the last written entry.
    _LOG_MIN_REL_CHANGE: float = 0.01
    _LOG_MIN_INTERVAL_S: float = 30.0

    def __init__(self, max_pending: Optional[int] = None) -> None:
        """Create an empty monitor.

        Args:
            max_pending: Optional cap on the number of predictions still
                awaiting ground truth. When set, the oldest pending
                predictions are dropped once the cap is exceeded, so labels
                that never arrive cannot grow memory without bound. ``None``
                (the default) keeps the pending map unbounded.

        Raises:
            ValueError: if ``max_pending`` is given but is not positive.
        """
        if max_pending is not None and max_pending < 1:
            raise ValueError("max_pending must be a positive integer or None")
        window_size = settings.monitoring.window_size
        self._pending: dict[str, _PredictionRecord] = {}
        self._matched: Deque[_PredictionRecord] = deque(maxlen=window_size)
        self._max_pending: Optional[int] = max_pending
        self._baseline_rmse: Optional[float] = None
        self._perf_log_path = resolve(settings.monitoring.performance_log_path)
        self._last_log_time: float = 0.0
        self._last_logged_rmse: Optional[float] = None

    def log_prediction(self, request_id: str, prediction: float, features: dict) -> None:
        """Register a prediction so its ground truth can be matched later.

        Logging the same ``request_id`` again replaces the earlier record.
        """
        record = _PredictionRecord(
            request_id=request_id,
            prediction=prediction,
            timestamp=time.time(),
            features=features,
        )
        # Pop before inserting so a re-logged id moves to the newest position;
        # this keeps dict insertion order equal to age order for eviction.
        self._pending.pop(request_id, None)
        self._pending[request_id] = record
        evicted = self._enforce_pending_limit()
        if evicted:
            log.debug(
                "Evicted %d stale pending prediction(s); max_pending=%d.",
                evicted, self._max_pending,
            )

    def log_ground_truth(self, request_id: str, actual: float) -> bool:
        """Match ground truth to a pending prediction. Returns True if matched."""
        record = self._pending.pop(request_id, None)
        if record is None:
            log.warning("Ground truth for unknown request_id=%s ignored.", request_id)
            return False
        record.ground_truth = actual
        self._matched.append(record)
        # One clock reading so the feedback log and debug output agree.
        feedback_time = time.time()
        self._append_feedback_log(record, feedback_time)
        log.debug(
            "Ground truth matched: request_id=%s pred=%.2f actual=%.2f delay=%.1fs",
            request_id, record.prediction, actual, feedback_time - record.timestamp,
        )
        return True

    def compute_metrics(self) -> Optional[dict]:
        """Compute rolling metrics over matched predictions.

        Returns:
            A dict with ``rmse``, ``mae``, ``r2`` (rounded to 4 places),
            ``n_samples``, ``n_pending`` and a UTC ``timestamp``, or ``None``
            when fewer than ``settings.monitoring.min_samples_for_evaluation``
            matched samples exist (or the window is empty).
        """
        min_samples = settings.monitoring.min_samples_for_evaluation
        matched = list(self._matched)
        # Also bail on an empty window: with min_samples <= 0 the reductions
        # below would produce NaN, which is not valid JSON for the log.
        if not matched or len(matched) < min_samples:
            log.debug("Only %d matched samples, need %d.", len(matched), max(min_samples, 1))
            return None
        rmse, mae, r2 = self._regression_metrics(
            [r.prediction for r in matched],
            [r.ground_truth for r in matched],
        )
        metrics = {
            "rmse": round(rmse, 4),
            "mae": round(mae, 4),
            "r2": round(r2, 4),
            "n_samples": len(matched),
            "n_pending": len(self._pending),
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        self._append_performance_log(metrics)
        return metrics

    def set_baseline_rmse(self, rmse: float) -> None:
        """Record the reference RMSE (e.g. from offline evaluation)."""
        self._baseline_rmse = rmse
        log.info("Baseline RMSE set to %.4f", rmse)

    def get_baseline_rmse(self) -> Optional[float]:
        """Return the baseline RMSE, or ``None`` if it was never set."""
        return self._baseline_rmse

    def matched_count(self) -> int:
        """Number of predictions currently in the matched rolling window."""
        return len(self._matched)

    def pending_count(self) -> int:
        """Number of predictions still awaiting ground truth."""
        return len(self._pending)

    def get_matched_dataframe(self):
        """Return the matched window as a pandas DataFrame.

        Each row holds the record's features plus ``prediction``,
        ``ground_truth`` and ``timestamp`` columns. An empty DataFrame is
        returned when nothing has been matched yet.
        """
        import pandas as pd

        records = list(self._matched)
        if not records:
            return pd.DataFrame()
        rows = [
            {
                **r.features,
                "prediction": r.prediction,
                "ground_truth": r.ground_truth,
                "timestamp": r.timestamp,
            }
            for r in records
        ]
        return pd.DataFrame(rows)

    @staticmethod
    def _regression_metrics(preds, actuals) -> tuple[float, float, float]:
        """Return ``(rmse, mae, r2)`` for paired, non-empty sequences."""
        p = np.asarray(preds, dtype=float)
        a = np.asarray(actuals, dtype=float)
        residuals = a - p
        rmse = float(np.sqrt(np.mean(residuals ** 2)))
        mae = float(np.mean(np.abs(residuals)))
        ss_res = float(np.sum(residuals ** 2))
        ss_tot = float(np.sum((a - a.mean()) ** 2))
        # Floor the denominator so a constant target does not divide by zero.
        r2 = 1.0 - ss_res / max(ss_tot, 1e-9)
        return rmse, mae, r2

    def _enforce_pending_limit(self) -> int:
        """Drop the oldest pending predictions beyond ``max_pending``; return how many."""
        limit = self._max_pending
        if limit is None:
            return 0
        excess = max(len(self._pending) - limit, 0)
        for _ in range(excess):
            # dicts iterate in insertion order, so the first key is the oldest.
            del self._pending[next(iter(self._pending))]
        return excess

    def _append_performance_log(self, metrics: dict) -> None:
        """Write metrics to log, throttled to avoid flooding with duplicate values.

        The entry is skipped only when RMSE changed by less than
        ``_LOG_MIN_REL_CHANGE`` (relative) since the last written entry AND
        fewer than ``_LOG_MIN_INTERVAL_S`` seconds have elapsed.
        """
        now = time.time()
        current_rmse = metrics.get("rmse", 0.0)
        if self._last_logged_rmse is not None:
            rel_change = abs(current_rmse - self._last_logged_rmse) / max(self._last_logged_rmse, 0.01)
            too_soon = (now - self._last_log_time) < self._LOG_MIN_INTERVAL_S
            if rel_change < self._LOG_MIN_REL_CHANGE and too_soon:
                return
        # Write first: if it raises, throttle state is untouched and the next
        # call retries instead of being suppressed as a "duplicate".
        self._append_jsonl(self._perf_log_path, metrics)
        self._last_log_time = now
        self._last_logged_rmse = current_rmse

    def _append_feedback_log(
        self, record: _PredictionRecord, feedback_time: Optional[float] = None
    ) -> None:
        """Append one matched prediction/ground-truth pair to the feedback log."""
        if feedback_time is None:
            feedback_time = time.time()
        feedback_path = resolve(settings.delayed_feedback.feedback_log_path)
        entry = {
            "request_id": record.request_id,
            "prediction": record.prediction,
            "ground_truth": record.ground_truth,
            "prediction_timestamp": record.timestamp,
            "feedback_timestamp": feedback_time,
            # Same clock reading as feedback_timestamp, so the fields agree.
            "delay_seconds": feedback_time - record.timestamp,
        }
        self._append_jsonl(feedback_path, entry)

    @staticmethod
    def _append_jsonl(path, obj: dict) -> None:
        """Append *obj* as one JSON line to *path*, creating parent directories."""
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(obj) + "\n")