"""Real-time feed simulator. Streams held-out accounts (and synthetic govt cyber-fraud tickets) into the scoring engine, mimicking a live cross-channel / regulatory feed. Scored alerts are appended to a JSONL store that the analyst dashboard tails. Usage: .venv/bin/python -m src.simulator.feed --once # score all, populate store .venv/bin/python -m src.simulator.feed --rate 1.5 # stream at ~1.5 acct/sec MULEGUARD_API=http://127.0.0.1:8000 ... --rate 2 # score via the API instead """ from __future__ import annotations import argparse import json import os import random import time from datetime import datetime, timezone import pandas as pd from src import config from src.models.scoring import explain_row, narrative, score_frame ALERTS_STORE = config.ROOT / "src" / "simulator" / "alerts_store.jsonl" # Synthetic government cyber-fraud ticket sources (I4C / NCRP style). TICKET_SOURCES = ["NCRP-Helpline-1930", "I4C-CyberAlert", "Bank-FMS", "TMS-RuleEngine"] def _now() -> str: return datetime.now(timezone.utc).isoformat() def _alert_from_row(account_id: str, raw_row: pd.DataFrame, actual: int | None) -> dict: scored = score_frame(raw_row).iloc[0] reasons = explain_row(raw_row) rnd = random.Random(hash(account_id) & 0xFFFF) return { "timestamp": _now(), "account_id": account_id, "channel": rnd.choice(["IMPS", "NEFT", "UPI", "RTGS", "Card"]), "feed_source": rnd.choice(TICKET_SOURCES), "risk_score": float(scored["risk_score"]), "risk_tier": str(scored["risk_tier"]), "decision": str(scored["decision"]), "probability": float(scored["probability"]), "narrative": narrative(float(scored["risk_score"]), reasons), "reason_codes": reasons, "actual_label": None if actual is None else int(actual), } def _load_stream() -> pd.DataFrame: df = pd.read_parquet(config.stream_source()) return df.sample(frac=1.0, random_state=config.SEED) # shuffle order of arrival def run(rate: float | None, once: bool, reset: bool) -> None: if reset and ALERTS_STORE.exists(): ALERTS_STORE.unlink() ALERTS_STORE.parent.mkdir(parents=True, exist_ok=True) df = _load_stream() y = df[config.TARGET].values raw = df.drop(columns=[config.TARGET]) n_flagged = 0 with ALERTS_STORE.open("a") as f: for i, (idx, _) in enumerate(raw.iterrows()): acc = f"ACC-{idx:06d}" alert = _alert_from_row(acc, raw.loc[[idx]], int(y[i])) f.write(json.dumps(alert) + "\n") f.flush() if alert["decision"] == "FLAG_MULE": n_flagged += 1 print(f"[{i+1}/{len(raw)}] 🚨 {acc} risk={alert['risk_score']:.0f} " f"{alert['risk_tier']} via {alert['feed_source']} — {alert['narrative']}") if not once and rate: time.sleep(1.0 / rate) print(f"\nDone. {len(raw)} accounts scored, {n_flagged} flagged. Store: {ALERTS_STORE}") def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--rate", type=float, default=None, help="accounts per second (streaming)") ap.add_argument("--once", action="store_true", help="score all immediately, no delay") ap.add_argument("--no-reset", action="store_true", help="append instead of resetting store") args = ap.parse_args() run(rate=args.rate, once=args.once or args.rate is None, reset=not args.no_reset) if __name__ == "__main__": main()