MuleGuard / src /simulator /feed.py
MuleGuard
MuleGuard: end-to-end mule-account detection + HF Space deploy
af879c2
Raw
History Blame Contribute Delete
3.53 kB
"""Real-time feed simulator.
Streams held-out accounts into the scoring engine, tagging each alert with a
synthetic payment channel and a government cyber-fraud ticket source, to mimic
a live cross-channel / regulatory feed. Scored alerts are appended to a JSONL
store that the analyst dashboard tails.
Usage:
.venv/bin/python -m src.simulator.feed --once # score all, populate store
.venv/bin/python -m src.simulator.feed --rate 1.5 # stream at ~1.5 acct/sec
MULEGUARD_API=http://127.0.0.1:8000 ... --rate 2 # score via the API instead
"""
from __future__ import annotations

import argparse
import json
import os
import random
import time
import zlib
from datetime import datetime, timezone

import pandas as pd

from src import config
from src.models.scoring import explain_row, narrative, score_frame
# Append-only JSONL file of scored alerts; the analyst dashboard tails it.
ALERTS_STORE = config.ROOT.joinpath("src", "simulator", "alerts_store.jsonl")

# Synthetic government cyber-fraud ticket sources (I4C / NCRP style).
# One is picked per account to tag each alert with an originating feed.
TICKET_SOURCES = [
    "NCRP-Helpline-1930",
    "I4C-CyberAlert",
    "Bank-FMS",
    "TMS-RuleEngine",
]
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _alert_from_row(account_id: str, raw_row: pd.DataFrame, actual: int | None) -> dict:
    """Score a single account and package the result as an alert record.

    Args:
        account_id: Display identifier for the account (e.g. ``"ACC-000042"``).
        raw_row: One-row frame of raw features, passed unchanged to
            ``score_frame`` and ``explain_row``.
        actual: Ground-truth label when known, otherwise ``None``.

    Returns:
        A dict holding a UTC timestamp, the account id, synthetic
        ``channel`` / ``feed_source`` tags, the scoring outputs
        (``risk_score``, ``risk_tier``, ``decision``, ``probability``),
        a narrative, the reason codes and the optional actual label.
    """
    scored = score_frame(raw_row).iloc[0]
    reasons = explain_row(raw_row)
    risk_score = float(scored["risk_score"])

    # Seed from a stable checksum of the id. The built-in hash() of a str is
    # salted per process (PYTHONHASHSEED), which would give the same account a
    # different channel / feed source on every run.
    rnd = random.Random(zlib.crc32(account_id.encode("utf-8")) & 0xFFFF)

    return {
        "timestamp": _now(),
        "account_id": account_id,
        "channel": rnd.choice(["IMPS", "NEFT", "UPI", "RTGS", "Card"]),
        "feed_source": rnd.choice(TICKET_SOURCES),
        "risk_score": risk_score,
        "risk_tier": str(scored["risk_tier"]),
        "decision": str(scored["decision"]),
        "probability": float(scored["probability"]),
        "narrative": narrative(risk_score, reasons),
        "reason_codes": reasons,
        "actual_label": None if actual is None else int(actual),
    }
def _load_stream() -> pd.DataFrame:
    """Load the held-out stream and return its rows in shuffled arrival order.

    The shuffle is seeded with ``config.SEED``, so repeated runs replay the
    same arrival sequence.
    """
    frame = pd.read_parquet(config.stream_source())
    shuffled = frame.sample(frac=1.0, random_state=config.SEED)
    return shuffled
def run(rate: float | None, once: bool, reset: bool) -> None:
    """Score every account in the stream and append the alerts to ALERTS_STORE.

    Args:
        rate: Target throughput in accounts per second. It is ignored when
            ``once`` is true, and ``None`` or ``0`` means no delay.
        once: Score everything back-to-back without sleeping.
        reset: Delete the existing alert store before writing.

    Raises:
        ValueError: If a negative ``rate`` is given while streaming. This is
            checked before the store is touched.

    Each alert is flushed right after it is written, so a reader tailing the
    file sees it immediately. Flagged accounts are echoed to stdout.
    """
    if not once and rate is not None and rate < 0:
        # time.sleep() rejects negative delays; fail before deleting the store.
        raise ValueError(f"rate must be non-negative, got {rate}")

    if reset and ALERTS_STORE.exists():
        ALERTS_STORE.unlink()
    ALERTS_STORE.parent.mkdir(parents=True, exist_ok=True)

    df = _load_stream()
    # The stream may be unlabeled; only report ground truth when it exists.
    labels = df[config.TARGET].to_numpy() if config.TARGET in df.columns else None
    raw = df.drop(columns=[config.TARGET], errors="ignore")
    total = len(raw)
    delay = 1.0 / rate if (not once and rate) else None

    n_flagged = 0
    with ALERTS_STORE.open("a", encoding="utf-8") as f:
        for i, idx in enumerate(raw.index):
            acc = f"ACC-{idx:06d}"
            actual = None if labels is None else int(labels[i])
            # Slice by position: .loc[[idx]] would return several rows if the
            # index held duplicate labels.
            alert = _alert_from_row(acc, raw.iloc[[i]], actual)
            f.write(json.dumps(alert) + "\n")
            f.flush()  # keep the store tail-able in real time
            if alert["decision"] == "FLAG_MULE":
                n_flagged += 1
                print(
                    f"[{i + 1}/{total}] 🚨 {acc} risk={alert['risk_score']:.0f} "
                    f"{alert['risk_tier']} via {alert['feed_source']} — {alert['narrative']}"
                )
            if delay is not None:
                time.sleep(delay)

    print(f"\nDone. {total} accounts scored, {n_flagged} flagged. Store: {ALERTS_STORE}")
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--rate", type=float, default=None, help="accounts per second (streaming)")
ap.add_argument("--once", action="store_true", help="score all immediately, no delay")
ap.add_argument("--no-reset", action="store_true", help="append instead of resetting store")
args = ap.parse_args()
run(rate=args.rate, once=args.once or args.rate is None, reset=not args.no_reset)
# Script entry point: run the simulator only when executed directly
# (e.g. ``python -m src.simulator.feed``), not when imported.
if __name__ == "__main__":
    main()