# argus-mlops/src/api/routes/monitor.py
# hodfa840's picture
# Fix scroll reset for HF Spaces double-iframe context
# 1aa566a
"""
Monitoring & drift endpoints.
GET /monitor/metrics → rolling performance metrics
GET /monitor/drift → run drift check on recent live data vs reference
POST /monitor/retrain → manually trigger retraining evaluation
GET /monitor/history → drift + performance history
"""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any
import pandas as pd
from fastapi import APIRouter, HTTPException, Request
from src.api.schemas import (
DriftCheckResponse,
PerformanceMetricsResponse,
RetrainingResponse,
)
from src.utils.config import settings, resolve
from src.utils.logging_config import get_logger
# Router collecting every endpoint in this module under the "/monitor" prefix,
# tagged "Monitoring".
router = APIRouter(prefix="/monitor", tags=["Monitoring"])
# Module-level logger obtained from the project's logging helper.
log = get_logger(__name__)
@router.get("/metrics", response_model=PerformanceMetricsResponse)
async def get_metrics(request: Request) -> PerformanceMetricsResponse:
    """Report the rolling performance figures held by the app's monitor.

    When the monitor has no computed metrics, the error figures are returned
    as ``None`` together with the matched/pending counters and a freshly
    generated UTC timestamp.  Otherwise the values come straight from the
    metrics mapping, including its own timestamp.
    """
    tracker = request.app.state.monitor
    computed = tracker.compute_metrics()
    reference_rmse = tracker.get_baseline_rmse()

    if computed is None:
        # Nothing to report yet: expose only the counters and the baseline.
        payload = {
            "rmse": None,
            "mae": None,
            "r2": None,
            "n_samples": tracker.matched_count(),
            "n_pending": tracker.pending_count(),
            "baseline_rmse": reference_rmse,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
    else:
        payload = {
            "rmse": computed["rmse"],
            "mae": computed["mae"],
            "r2": computed["r2"],
            "n_samples": computed["n_samples"],
            "n_pending": tracker.pending_count(),
            "baseline_rmse": reference_rmse,
            "timestamp": computed["timestamp"],
        }

    return PerformanceMetricsResponse(**payload)
# Strong references to in-flight background retraining tasks.  The event loop
# only keeps weak references to tasks, so a task whose handle is discarded can
# be garbage-collected before it finishes.  Each task removes itself from this
# set through a done-callback.
_BACKGROUND_TASKS: set = set()


@router.get("/drift", response_model=DriftCheckResponse)
async def check_drift(request: Request) -> DriftCheckResponse:
    """
    Run drift detection on the monitor's matched live data.

    Combines feature drift (PSI/KS) + performance drift into one report,
    runs root-cause analysis if feature drift is detected, and schedules a
    background retraining task when the retraining trigger asks for one.

    The ``action`` field of the response is one of:
      * ``insufficient_data``         - fewer matched rows than the configured minimum
      * ``retraining_triggered``      - a background retraining task was scheduled
      * ``drift_detected_monitoring`` - feature drift found, no retraining scheduled
      * ``no_action``                 - none of the above

    Raises:
        HTTPException: 503 when the drift detector has no reference dataset.
    """
    app_state = request.app.state
    drift_detector = app_state.drift_detector
    rca = app_state.rca
    monitor = app_state.monitor

    if not drift_detector.has_reference():
        raise HTTPException(status_code=503, detail="Reference dataset not loaded yet.")

    # Get live feature data from matched predictions
    live_df = monitor.get_matched_dataframe()
    min_samples = settings.monitoring.drift.min_samples_for_drift_test
    if len(live_df) < min_samples:
        return DriftCheckResponse(
            drift_detected=False,
            root_cause=[],
            performance_drop=None,
            action="insufficient_data",
            feature_results={},
            drifted_features=[],
            rca_details=None,
            timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        )

    # Feature drift: only test configured features that are present in the frame.
    feature_cols = [c for c in settings.data.features if c in live_df.columns]
    feat_report = drift_detector.detect_feature_drift(live_df, features=feature_cols)

    # Performance drift: needs both current metrics and a baseline to compare.
    metrics = monitor.compute_metrics()
    baseline = monitor.get_baseline_rmse()
    perf_report: dict = {"drift_detected": False}
    performance_drop = None
    if metrics and baseline:
        perf_report = drift_detector.detect_performance_drift(metrics["rmse"], baseline)
        if perf_report["drift_detected"]:
            pct = perf_report["pct_change"]
            performance_drop = f"{pct:.1f}%"

    # RCA
    rca_result: dict = {}
    if feat_report["drift_detected"]:
        rca_result = rca.analyze(feat_report)

    # Retraining decision
    trigger = app_state.trigger
    decision = trigger.should_retrain(
        feature_drift_report=feat_report,
        performance_report=perf_report,
        samples_since_last_retrain=app_state.samples_since_last_retrain,
    )

    action = "no_action"
    if decision["should_retrain"]:
        action = "retraining_triggered"
        # Fire async retraining (background).  Keep the task handle alive until
        # the task completes; discarding it risks the task being collected
        # mid-execution.
        import asyncio

        task = asyncio.create_task(_run_retraining(app_state, rca_result))
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)
    elif feat_report["drift_detected"]:
        action = "drift_detected_monitoring"

    drifted = feat_report.get("drifted_features", [])
    return DriftCheckResponse(
        drift_detected=feat_report["drift_detected"] or perf_report.get("drift_detected", False),
        root_cause=drifted,
        performance_drop=performance_drop,
        action=action,
        feature_results=feat_report.get("feature_results", {}),
        drifted_features=drifted,
        rca_details=rca_result.get("root_causes"),
        timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    )
@router.post("/retrain", response_model=RetrainingResponse)
async def trigger_retrain(request: Request) -> RetrainingResponse:
    """Run the retraining pipeline on demand, without consulting the drift trigger.

    Requires at least 50 matched (labeled) rows; with fewer, nothing is run
    and the response carries ``action="insufficient_labeled_data"``.  When
    the pipeline reports a promotion, the champion is reloaded from the
    registry into the app state and the monitor's baseline RMSE is replaced
    by the challenger's RMSE (if one was reported).
    """
    state = request.app.state
    tracker = state.monitor
    retrainer = state.retrain_pipeline

    labeled = tracker.get_matched_dataframe()
    available = len(labeled)
    if available < 50:
        return RetrainingResponse(
            triggered=False,
            promoted=None,
            improvement_pct=None,
            root_causes=[],
            action="insufficient_labeled_data",
            message=f"Only {available} labeled samples available. Need at least 50.",
            timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        )

    # The evaluation frame is a fixed-seed 20% sample of the same frame that is
    # passed as training data.
    # NOTE(review): evaluation rows therefore also appear in training_df unless
    # pipeline.run separates them itself - confirm.
    holdout = labeled.sample(frac=0.2, random_state=42)
    outcome = retrainer.run(
        training_df=labeled,
        eval_df=holdout,
        tags={"trigger": "manual"},
    )

    was_promoted = outcome["promoted"]
    if was_promoted:
        champion = state.registry.load_champion()
        if champion:
            state.model = champion
            state.model_version = f"v{int(time.time())}"
        new_rmse = outcome["challenger_metrics"].get("rmse")
        if new_rmse:
            tracker.set_baseline_rmse(new_rmse)

    if was_promoted:
        verdict = "champion_promoted"
    else:
        verdict = "challenger_not_promoted"

    return RetrainingResponse(
        triggered=True,
        promoted=was_promoted,
        improvement_pct=outcome.get("improvement_pct"),
        root_causes=[cause["feature"] for cause in outcome.get("root_causes", [])],
        action=verdict,
        message="Manual retraining completed.",
        timestamp=outcome["timestamp"],
    )
@router.get("/history")
async def get_history(
    limit: int = 100,
    log_type: str = "drift",
) -> list[dict]:
    """
    Return recent log entries.

    Args:
        limit: Maximum number of trailing log lines to consider.  Values
            less than or equal to zero yield an empty list.
        log_type: 'drift' | 'performance' | 'retrain' | 'feedback'

    Returns:
        The JSON-decoded entries found in the last ``limit`` lines of the
        selected log, oldest first.  Blank or undecodable lines are skipped,
        so fewer than ``limit`` entries may be returned.  An empty list is
        returned when the log file does not exist.

    Raises:
        HTTPException: 400 when ``log_type`` is not one of the known logs.
    """
    log_paths = {
        "drift": resolve(settings.monitoring.drift_report_path),
        "performance": resolve(settings.monitoring.performance_log_path),
        "retrain": resolve(settings.retraining.retrain_log_path),
        "feedback": resolve(settings.delayed_feedback.feedback_log_path),
    }
    path = log_paths.get(log_type)
    if path is None:
        raise HTTPException(status_code=400, detail=f"Unknown log_type '{log_type}'")

    # Guard the tail slice below: ``lines[-0:]`` is the whole list and a
    # negative limit would turn ``lines[-limit:]`` into a head-dropping slice.
    if limit <= 0:
        return []
    if not path.exists():
        return []

    lines = path.read_text(encoding="utf-8").splitlines()
    entries: list[dict] = []
    malformed = 0
    for line in lines[-limit:]:
        if not line.strip():
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            # Best effort: a corrupt line must not hide the valid ones.
            malformed += 1
    if malformed:
        log.warning(
            "Skipped %d malformed line(s) while reading '%s' history from %s.",
            malformed,
            log_type,
            path,
        )
    return entries
# ------------------------------------------------------------------
# Background task helper
# ------------------------------------------------------------------
async def _run_retraining(app_state: Any, rca_result: dict) -> None:
    """Retrain on the monitor's matched data and swap in the champion if promoted.

    Intended to be scheduled as a background task.  Any exception is logged
    with its traceback and not re-raised.  Nothing is run when fewer than 50
    matched rows are available.

    NOTE(review): ``pipeline.run`` is called synchronously inside this
    coroutine - confirm whether it is expected to block the event loop.
    """
    try:
        tracker = app_state.monitor
        retrainer = app_state.retrain_pipeline

        labeled = tracker.get_matched_dataframe()
        if len(labeled) < 50:
            log.warning("Not enough labeled samples for retraining (%d).", len(labeled))
            return

        # Fixed-seed 20% sample of the training frame used for evaluation.
        holdout = labeled.sample(frac=0.2, random_state=42)
        outcome = retrainer.run(
            training_df=labeled,
            eval_df=holdout,
            rca_report=rca_result,
            tags={"trigger": "auto_drift"},
        )

        if not outcome["promoted"]:
            return

        champion = app_state.registry.load_champion()
        if champion:
            app_state.model = champion
            app_state.model_version = f"v{int(time.time())}"

        new_rmse = outcome["challenger_metrics"].get("rmse")
        if new_rmse:
            tracker.set_baseline_rmse(new_rmse)

        # Tell the trigger a retrain finished and restart the sample counter.
        app_state.trigger.record_retrain_completed()
        app_state.samples_since_last_retrain = 0
        log.info("Auto-retraining complete — new champion promoted.")
    except Exception as exc:
        log.error("Background retraining failed: %s", exc, exc_info=True)