Spaces:
Sleeping
Sleeping
"""Real-time feed simulator.

Streams held-out accounts (and synthetic govt cyber-fraud tickets) into the
scoring engine, mimicking a live cross-channel / regulatory feed. Scored alerts
are appended to a JSONL store that the analyst dashboard tails. By default the
store is reset at start-up; pass ``--no-reset`` to append to it instead.

Usage:
    .venv/bin/python -m src.simulator.feed --once       # score all, populate store
    .venv/bin/python -m src.simulator.feed --rate 1.5   # stream at ~1.5 acct/sec
    MULEGUARD_API=http://127.0.0.1:8000 ... --rate 2    # score via the API instead

NOTE(review): nothing in this module reads ``MULEGUARD_API``; scoring always
runs in-process through ``src.models.scoring``, so the last usage line is not
currently honoured here. Confirm whether the API path is implemented elsewhere.
"""
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import random | |
| import time | |
| from datetime import datetime, timezone | |
| import pandas as pd | |
| from src import config | |
| from src.models.scoring import explain_row, narrative, score_frame | |
# Synthetic government cyber-fraud ticket sources (I4C / NCRP style). One of
# these is attached to every alert as its "feed_source".
TICKET_SOURCES = [
    "NCRP-Helpline-1930",
    "I4C-CyberAlert",
    "Bank-FMS",
    "TMS-RuleEngine",
]

# JSONL file that scored alerts are appended to (tailed by the dashboard).
ALERTS_STORE = config.ROOT / "src" / "simulator" / "alerts_store.jsonl"
| def _now() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def _alert_from_row(account_id: str, raw_row: pd.DataFrame, actual: int | None) -> dict: | |
| scored = score_frame(raw_row).iloc[0] | |
| reasons = explain_row(raw_row) | |
| rnd = random.Random(hash(account_id) & 0xFFFF) | |
| return { | |
| "timestamp": _now(), | |
| "account_id": account_id, | |
| "channel": rnd.choice(["IMPS", "NEFT", "UPI", "RTGS", "Card"]), | |
| "feed_source": rnd.choice(TICKET_SOURCES), | |
| "risk_score": float(scored["risk_score"]), | |
| "risk_tier": str(scored["risk_tier"]), | |
| "decision": str(scored["decision"]), | |
| "probability": float(scored["probability"]), | |
| "narrative": narrative(float(scored["risk_score"]), reasons), | |
| "reason_codes": reasons, | |
| "actual_label": None if actual is None else int(actual), | |
| } | |
def _load_stream() -> pd.DataFrame:
    """Read the configured stream parquet and return its rows in shuffled order.

    The shuffle is seeded with ``config.SEED``, so the simulated order of
    arrival is the same on every run.
    """
    source_path = config.stream_source()
    frame = pd.read_parquet(source_path)
    return frame.sample(frac=1.0, random_state=config.SEED)
def run(rate: float | None, once: bool, reset: bool) -> None:
    """Score every account in the stream and append one JSON alert per line.

    Args:
        rate: Target accounts per second when streaming. Ignored when ``once``
            is true. ``None``, zero, or a negative value means no delay.
        once: Score everything back-to-back with no delay.
        reset: Delete the existing alerts store before writing.
    """
    if reset:
        # missing_ok avoids the exists()/unlink() check-then-act race.
        ALERTS_STORE.unlink(missing_ok=True)
    ALERTS_STORE.parent.mkdir(parents=True, exist_ok=True)

    df = _load_stream()
    # Labels are optional: an unlabeled stream produces actual_label=None.
    labels = df[config.TARGET].to_numpy() if config.TARGET in df.columns else None
    raw = df.drop(columns=[config.TARGET], errors="ignore")
    total = len(raw)
    # Only a positive rate yields a delay; guards time.sleep against negatives.
    delay = 1.0 / rate if (not once and rate is not None and rate > 0) else None

    n_flagged = 0
    with ALERTS_STORE.open("a", encoding="utf-8") as f:
        for i, idx in enumerate(raw.index):
            acc = f"ACC-{idx:06d}"
            actual = None if labels is None or pd.isna(labels[i]) else int(labels[i])
            # Positional iloc always yields exactly one row, even when the
            # (shuffled, non-reset) index contains duplicate labels.
            alert = _alert_from_row(acc, raw.iloc[[i]], actual)
            f.write(json.dumps(alert) + "\n")
            f.flush()  # make each alert visible to a tailing reader immediately
            if alert["decision"] == "FLAG_MULE":
                n_flagged += 1
                print(f"[{i+1}/{total}] 🚨 {acc} risk={alert['risk_score']:.0f} "
                      f"{alert['risk_tier']} via {alert['feed_source']} — {alert['narrative']}")
            if delay is not None:
                time.sleep(delay)
    print(f"\nDone. {total} accounts scored, {n_flagged} flagged. Store: {ALERTS_STORE}")
def main(argv: list[str] | None = None) -> None:
    """Parse command-line arguments and run the feed simulator.

    Args:
        argv: Arguments to parse. ``None`` (the default) reads ``sys.argv``,
            so existing ``main()`` callers are unaffected.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--rate", type=float, default=None, help="accounts per second (streaming)")
    ap.add_argument("--once", action="store_true", help="score all immediately, no delay")
    ap.add_argument("--no-reset", action="store_true", help="append instead of resetting store")
    args = ap.parse_args(argv)
    # Reject negative or NaN rates up front ("not >= 0" also catches NaN).
    # Otherwise time.sleep() would fail only after alerts were already written.
    if args.rate is not None and not args.rate >= 0:
        ap.error("--rate must be a non-negative number of accounts per second")
    # Without a rate there is nothing to pace, so fall back to one-shot scoring.
    run(rate=args.rate, once=args.once or args.rate is None, reset=not args.no_reset)


if __name__ == "__main__":
    main()