File size: 2,918 Bytes
79b961c 61ee9c4 79b961c 61ee9c4 79b961c 31460c4 79b961c 61ee9c4 79b961c 61ee9c4 b1725f1 31460c4 61ee9c4 31460c4 61ee9c4 79b961c 31460c4 79b961c b1725f1 79b961c 31460c4 ce96dae 79b961c 61ee9c4 31460c4 61ee9c4 79b961c 31460c4 79b961c 31460c4 79b961c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# app/api/background_drift.py
import asyncio
import json
import math
import os

import pandas as pd

from app.monitoring.drift import run_drift_check
from app.inference.predictor import Predictor
# Module-level predictor instance; the drift loop reads its `features`
# attribute to decide which columns are required and compared.
predictor = Predictor()
# CSV with the reference data that production rows are compared against.
REFERENCE_PATH = "models/v1/reference_data.csv"
# CSV log of production predictions; the drift loop reads it and trims it in place.
PROD_LOG_PATH = "data/production/predictions_log.csv"
# JSON file the drift loop writes with the latest row count, predictions and drift scores.
DASHBOARD_JSON = "reports/evidently/drift_report.json"
MAX_ROWS = 5000 # rolling window: only the newest MAX_ROWS log rows are kept
# Runs at import time so the report directory exists before the first report is written.
os.makedirs(os.path.dirname(DASHBOARD_JSON), exist_ok=True)
def _finite_or_none(value, ndigits=None):
    """Convert ``value`` to a JSON-safe float.

    ``json.dump`` serialises NaN and +/-inf as ``NaN`` / ``Infinity``, which
    are not valid JSON, so non-finite numbers are mapped to ``None`` (``null``).

    Args:
        value: Anything accepted by ``float()``.
        ndigits: When given, the finite result is rounded to this many digits.

    Returns:
        The (optionally rounded) float, or ``None`` if it is NaN or infinite.

    Raises:
        TypeError, ValueError: If ``value`` cannot be converted by ``float()``.
    """
    number = float(value)
    if not math.isfinite(number):
        return None
    return number if ndigits is None else round(number, ndigits)


def _load_production_window():
    """Read the production log, apply the retention window, and filter rows.

    This stage is synchronous and contains no ``await``, so when it is called
    from the coroutine no other coroutine on the same event loop can run
    between reading the log and rewriting the trimmed copy.
    NOTE(review): writers in other threads or processes can still append
    between the read and the rewrite; those rows would be lost - confirm how
    the log is written.

    Returns:
        A DataFrame of rows with every ``predictor.features`` column present
        and non-null, or ``None`` when the log file does not exist, a required
        feature column is missing, or no complete rows remain.
    """
    if not os.path.exists(PROD_LOG_PATH):
        return None

    prod_df = pd.read_csv(PROD_LOG_PATH)

    # Retention window: keep only the newest MAX_ROWS rows on disk.
    if len(prod_df) > MAX_ROWS:
        prod_df = prod_df.tail(MAX_ROWS)
        prod_df.to_csv(PROD_LOG_PATH, index=False)

    missing_features = set(predictor.features) - set(prod_df.columns)
    if missing_features:
        print(f"Skipping drift check, missing features: {missing_features}")
        return None

    # Keep only rows with all required features.
    prod_df = prod_df.dropna(subset=predictor.features)
    return None if prod_df.empty else prod_df


def _recent_predictions(prod_df, limit=50):
    """Build dashboard entries for the last ``limit`` rows of ``prod_df``.

    Returns an empty list unless both the ``model_prediction`` and
    ``model_probability`` columns are present. ``row`` is the DataFrame index
    label of the entry. A missing or NaN ``model_risk_level`` is reported as
    ``"Unknown"`` and a non-finite probability as ``None``, so the payload
    stays valid JSON.
    """
    if not {"model_prediction", "model_probability"} <= set(prod_df.columns):
        return []

    results = []
    for i, row in prod_df.tail(limit).iterrows():
        risk_level = row.get("model_risk_level", "Unknown")
        if pd.isna(risk_level):
            # An empty CSV cell is read back as NaN, which json would emit as `NaN`.
            risk_level = "Unknown"
        results.append({
            "row": i,
            "prediction": "Default" if row["model_prediction"] == 1 else "No Default",
            "probability": _finite_or_none(row["model_probability"], 4),
            "risk_level": risk_level,
        })
    return results


def _write_drift_report(prod_df):
    """Run the drift check for ``prod_df`` and publish the dashboard JSON.

    Reads the reference CSV, calls ``run_drift_check`` on the
    ``predictor.features`` columns of both frames, and writes the payload to
    ``DASHBOARD_JSON``. The file is written to a ``.tmp`` sibling first and
    then moved into place with ``os.replace`` so readers never see a
    partially written report.
    """
    reference_df = pd.read_csv(REFERENCE_PATH)

    # ---- Run drift on features only ----
    _, drift_dict = run_drift_check(
        prod_df[predictor.features],
        reference_df[predictor.features],
        model_version="v1",
    )

    dashboard_payload = {
        "n_rows": len(prod_df),
        "results": _recent_predictions(prod_df),
        "drift": [
            {"column": col, "score": _finite_or_none(score)}
            for col, score in drift_dict.items()
        ],
    }

    tmp_path = DASHBOARD_JSON + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(dashboard_payload, f, indent=2)
    os.replace(tmp_path, DASHBOARD_JSON)


async def drift_loop(interval_seconds: int = 10) -> None:
    """Periodically recompute drift and refresh the dashboard JSON, forever.

    Each iteration loads and trims the production log, then - if there are
    usable rows - runs the drift check and writes the report. Any exception
    raised during an iteration is printed and the loop carries on. Every
    iteration, whether it produced a report, was skipped, or failed, is
    followed by exactly one sleep of ``interval_seconds``.

    Args:
        interval_seconds: Seconds to wait between iterations.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            prod_df = _load_production_window()
            if prod_df is not None:
                # The drift computation and report write are blocking; run
                # them in the default executor so the event loop stays
                # responsive while they execute.
                await loop.run_in_executor(None, _write_drift_report, prod_df)
        except Exception as e:
            print("Drift loop error:", e)
        await asyncio.sleep(interval_seconds)
|