Spaces:
Running
Running
| """ | |
| Concept-drift monitoring via the Population Stability Index (PSI). | |
| Fraud is adversarial: attack patterns evolve, so a model trained on last | |
| quarter's data silently decays. PSI is the industry-standard, **label-free** | |
| early-warning signal — it compares the distribution of a feature (or the model | |
| score) between a reference window and a recent window, with no need to wait for | |
| fraud labels to arrive. | |
| PSI < 0.10 → stable | |
| 0.10–0.25 → moderate shift, investigate | |
| PSI > 0.25 → significant shift, retrain | |
| Computing PSI on the model's *output score* is the single most useful monitor: | |
| it catches both data drift and concept drift in one number. | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| from src import config | |
def psi(reference, current, bins: int = config.PSI_BINS) -> float:
    """
    Population Stability Index between two 1-D samples.

    Bin edges are quantiles of the reference distribution; the outermost
    edges are widened to -inf/+inf so that current values outside the
    reference range still land in a bin.

    NaN values are dropped from both samples before binning, so the result
    compares the distributions of the observed values only. A change in the
    *rate* of missing values is therefore not reflected in the score.

    Args:
        reference: Baseline sample (any array-like convertible to float).
        current: Recent sample to compare against the baseline.
        bins: Number of quantile bins requested; duplicate quantiles are
            merged, so fewer bins may actually be used.

    Returns:
        ``sum((cur - ref) * ln(cur / ref))`` over the bins, where ``ref`` and
        ``cur`` are the per-bin proportions floored at 1e-6.
        ``0.0`` when the reference quantiles yield fewer than three distinct
        edges (near-constant or very low-cardinality feature) -- in that case
        no drift can be measured by this function, whatever ``current`` holds.
        ``nan`` when either sample has no non-NaN values (PSI is undefined).
    """
    # ravel() so that proportions are computed over every element even if a
    # caller passes a column vector or other multi-dimensional input.
    reference = np.asarray(reference, dtype=float).ravel()
    current = np.asarray(current, dtype=float).ravel()

    # np.quantile propagates NaN: a single missing value in the reference
    # would turn every bin edge into NaN and make the score meaningless
    # (typically a silent 0.0 via the near-constant guard below). NaNs in
    # the current sample fall in no bin yet would still inflate the
    # denominator. Remove them up front.
    reference = reference[~np.isnan(reference)]
    current = current[~np.isnan(current)]
    if reference.size == 0 or current.size == 0:
        # Nothing to compare; report "undefined" instead of dividing by zero.
        return float("nan")

    # Quantile bin edges from reference; duplicates collapse for discrete or
    # heavily tied features.
    edges = np.unique(np.quantile(reference, np.linspace(0, 1, bins + 1)))
    if len(edges) < 3:  # near-constant feature: only a single bin would remain
        return 0.0
    # Widen the outer edges to catch tails beyond the reference range.
    edges[0], edges[-1] = -np.inf, np.inf

    ref_pct = np.histogram(reference, bins=edges)[0] / reference.size
    cur_pct = np.histogram(current, bins=edges)[0] / current.size

    # Floor empty bins at a small epsilon to avoid log(0) and division by zero.
    eps = 1e-6
    ref_pct = np.clip(ref_pct, eps, None)
    cur_pct = np.clip(cur_pct, eps, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
def classify_psi(value: float) -> str:
    """
    Map a PSI score to a severity label.

    Scores below ``config.PSI_THRESHOLD_WARN`` are "stable", scores below
    ``config.PSI_THRESHOLD_ALERT`` are "moderate", and everything else is
    "significant". A NaN score fails both comparisons and is therefore
    labelled "significant".
    """
    label = "significant"
    if value < config.PSI_THRESHOLD_WARN:
        label = "stable"
    elif value < config.PSI_THRESHOLD_ALERT:
        label = "moderate"
    return label
def feature_drift_report(ref_df, cur_df, features) -> list[dict]:
    """PSI per feature between two periods, sorted by severity.

    Args:
        ref_df: Reference-period table. Must support ``.columns`` and
            ``[name].values`` (presumably a pandas DataFrame -- confirm
            against callers).
        cur_df: Recent-period table with the same access pattern.
        features: Iterable of column names to evaluate. Names missing from
            either table are skipped silently.

    Returns:
        One dict per evaluated feature with keys ``"feature"``, ``"psi"``
        (rounded to 4 decimals) and ``"status"`` (from ``classify_psi`` on the
        unrounded score). Rows whose PSI is NaN (score could not be computed)
        come first, followed by the remaining rows in descending PSI order;
        ties keep the order given in ``features``.
    """
    rows = []
    for f in features:
        # Only features present in both periods can be compared.
        if f not in ref_df.columns or f not in cur_df.columns:
            continue
        val = psi(ref_df[f].values, cur_df[f].values)
        rows.append({"feature": f, "psi": round(val, 4), "status": classify_psi(val)})

    def _severity(row):
        # NaN compares False against every number, so using it directly as a
        # sort key would leave the whole report in an unpredictable order.
        # Undefined scores are surfaced first: classify_psi labels them
        # "significant", so they need attention just like the worst drifts.
        score = row["psi"]
        if np.isnan(score):
            return (0, 0.0)
        return (1, -score)

    rows.sort(key=_severity)
    return rows