fraud-detection / src /drift.py
fikri0o0's picture
2026-06-05: deploy fraud detection dashboard (LightGBM + GNN + autoencoder, SHAP, drift, live scoring)
99bc19c verified
"""
Concept-drift monitoring via the Population Stability Index (PSI).
Fraud is adversarial: attack patterns evolve, so a model trained on last
quarter's data silently decays. PSI is the industry-standard, **label-free**
early-warning signal — it compares the distribution of a feature (or the model
score) between a reference window and a recent window, with no need to wait for
fraud labels to arrive.
PSI < 0.10 → stable
0.10–0.25 → moderate shift, investigate
PSI > 0.25 → significant shift, retrain
Computing PSI on the model's *output score* is the single most useful monitor:
it catches both data drift and concept drift in one number.
"""
from __future__ import annotations
import numpy as np
from src import config
def psi(reference, current, bins: int = config.PSI_BINS) -> float:
    """
    Population Stability Index between two 1-D samples.

    Bin edges are quantiles of the non-missing reference values, with the
    outermost edges widened to +/-inf so that tail values in ``current`` are
    still counted. Missing values (NaN) get a dedicated extra bin, so a change
    in the missing rate shows up as drift instead of silently disappearing.

    Args:
        reference: Baseline sample (array-like, convertible to float).
        current: Recent sample to compare against the baseline.
        bins: Number of quantile bins requested; duplicate quantiles are
            merged, so fewer bins may actually be used.

    Returns:
        The PSI as a float (0.0 means identical binned distributions).
        ``nan`` is returned when either sample is empty, because the index
        is undefined without data.
    """
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    if reference.size == 0 or current.size == 0:
        return float("nan")

    ref_missing = np.isnan(reference)
    cur_missing = np.isnan(current)
    ref_observed = reference[~ref_missing]
    cur_observed = current[~cur_missing]

    # Quantile edges come from observed reference values only: a single NaN
    # passed to np.quantile would turn every edge into NaN.
    if ref_observed.size:
        edges = np.unique(np.quantile(ref_observed, np.linspace(0, 1, bins + 1)))
        # Interpolating between infinite values can still yield NaN edges.
        edges = edges[~np.isnan(edges)]
    else:
        edges = np.empty(0)

    if len(edges) < 3:
        # Near-constant (or entirely missing) feature: one catch-all value
        # bin, so only the missing-rate bin can contribute to the index.
        edges = np.array([-np.inf, np.inf])
    else:
        # Widen the outer edges to catch tails beyond the reference range.
        edges[0], edges[-1] = -np.inf, np.inf

    # Value bins followed by the missing-value bin; dividing by the full
    # sample size keeps each set of proportions summing to 1.
    ref_counts = np.append(np.histogram(ref_observed, bins=edges)[0], ref_missing.sum())
    cur_counts = np.append(np.histogram(cur_observed, bins=edges)[0], cur_missing.sum())
    ref_pct = ref_counts / reference.size
    cur_pct = cur_counts / current.size

    # Floor empty bins at a tiny proportion to avoid log(0) and division by 0.
    eps = 1e-6
    ref_pct = np.clip(ref_pct, eps, None)
    cur_pct = np.clip(cur_pct, eps, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
def classify_psi(value: float) -> str:
    """
    Map a PSI value onto a drift severity label.

    Returns "stable" below ``config.PSI_THRESHOLD_WARN``, "moderate" below
    ``config.PSI_THRESHOLD_ALERT``, and "significant" otherwise.
    """
    return (
        "stable"
        if value < config.PSI_THRESHOLD_WARN
        else "moderate"
        if value < config.PSI_THRESHOLD_ALERT
        else "significant"
    )
def feature_drift_report(ref_df, cur_df, features) -> list[dict]:
    """
    Build a per-feature PSI report comparing two periods.

    Features absent from either frame's columns are skipped. Each entry holds
    the feature name, its PSI rounded to 4 decimals, and the severity label
    from ``classify_psi``; entries are ordered from highest to lowest PSI.
    """

    def _entry(name):
        # Classify on the unrounded score; rounding is for display only.
        score = psi(ref_df[name].values, cur_df[name].values)
        return {"feature": name, "psi": round(score, 4), "status": classify_psi(score)}

    report = [
        _entry(name)
        for name in features
        if name in ref_df.columns and name in cur_df.columns
    ]
    return sorted(report, key=lambda entry: entry["psi"], reverse=True)