"""Programmatic synthesizer for the fraud-pattern surface.

Mirrors the dispute / collections synthesizers but emits TWO categorical
labels per example:

    pattern_stage (5-class): pre_attack / probing / monetization /
                             exfiltration / dormant
    pattern_type  (4-class): victim_fraud / account_takeover /
                             scam_redirected / declined_legit

Inputs the model conditions on:

    feature_ids   (B, 64, 15)  customer transaction history
    flagged_idx   (B,)         position the upstream detector flagged
    context_text               free-form analyst note + upstream fraud score

The cross-position properties the labels depend on:

    probe_cluster_density     count of small-amount CNP in window before flagged
    post_attack_high_density  count of large-amount unfamiliar tx around flagged
    novel_device_score        1.0 if flagged device_hash appears only at flagged
    sig_clean_score           1.0 if flagged tx matches customer's mode pattern
                              (home country, CVV match, AVS match, familiar
                              merchant)

These mirror the dispute markers (subscription, exotic_country) and the
collections markers (velocity, sub_burden, etc.) — local biases at the
flagged position that make the label readable to a 350M backbone.
"""

from __future__ import annotations

import json
import random
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any

import numpy as np

# --- Feature column indices (must match data/schema.yaml) ---
FEATURE_ENTRY_MODE = 7
FEATURE_AMOUNT = 8
FEATURE_COUNTRY = 10
FEATURE_AVS = 11
FEATURE_CVV = 12
FEATURE_DEVICE_HASH = 13
FEATURE_MERCHANT_ID = 5
FEATURE_CUSTOMER_MERCHANT_COUNT = 6

RESERVED_OFFSET = 3

# Entry mode values: 0=card_present, 1=card_not_present → tokens 3, 4
ENTRY_CNP = RESERVED_OFFSET + 1  # = 4

# Verification tokens
CVV_MATCH = RESERVED_OFFSET + 0  # 3
CVV_NO_MATCH = RESERVED_OFFSET + 1  # 4
AVS_FULL_MATCH = RESERVED_OFFSET + 0  # 3
AVS_NO_MATCH = RESERVED_OFFSET + 3  # 6

# Amount thresholds
AMOUNT_SMALL_THRESH = RESERVED_OFFSET + 8  # token <= 11 = ~$5 ≈ probe-size
AMOUNT_LARGE_THRESH = RESERVED_OFFSET + 150  # token >= 153 = ~$150+ = monetization-size

# Customer-merchant count thresholds (token = bucket + 3)
CMC_FAMILIAR = RESERVED_OFFSET + 5  # bucket >= 5 = "knows merchant"
CMC_UNFAMILIAR = RESERVED_OFFSET + 0  # bucket 0 = "never seen"
# Every "unfamiliar merchant" test in this module accepts buckets 0 and 1.
CMC_UNFAMILIAR_MAX = CMC_UNFAMILIAR + 1

# Window sizes
PROBE_WINDOW = 6  # check 6 tx before flagged for probing cluster
POST_ATTACK_WINDOW = 6  # check flagged + 5 after for exfiltration density
RECENT_AUTHORIZE_WINDOW = 16  # for SCAM_REDIRECTED detection
PRE_ATTACK_ANOMALY_WINDOW = 8  # tx before flagged scanned for CVV/AVS anomalies

# Decision thresholds used by the stage / type rules.
PROBE_CLUSTER_MIN = 3  # probes needed to call a probing cluster
POST_ATTACK_CLUSTER_MIN = 2  # large unfamiliar charges needed for exfiltration
SCAM_AUTHORIZE_MIN = 5  # CNP-to-unfamiliar count needed for scam_redirected

# Corpus sampling knobs.
MIN_FLAGGED_IDX = 8  # lowest position generate_corpus will flag
ADVERSARIAL_SAMPLE_RATE = 0.15  # per-example chance of perturbing, until the cap

# Class labels.
STAGE_PRE_ATTACK = 0
STAGE_PROBING = 1
STAGE_MONETIZATION = 2
STAGE_EXFILTRATION = 3
STAGE_DORMANT = 4
NUM_STAGES = 5
STAGE_NAMES = ["pre_attack", "probing", "monetization", "exfiltration", "dormant"]

TYPE_VICTIM_FRAUD = 0
TYPE_ACCOUNT_TAKEOVER = 1
TYPE_SCAM_REDIRECTED = 2
TYPE_DECLINED_LEGIT = 3
NUM_TYPES = 4
TYPE_NAMES = ["victim_fraud", "account_takeover", "scam_redirected", "declined_legit"]


# --- Cross-position signals (mirror in encoder forward) ---


def _probe_hits(history: np.ndarray, flagged_idx: int) -> tuple[int, np.ndarray]:
    """Return (window start, mask) of small-amount CNP rows before flagged."""
    start = max(0, flagged_idx - PROBE_WINDOW)
    window = history[start:flagged_idx]
    cnp = window[:, FEATURE_ENTRY_MODE] == ENTRY_CNP
    small = window[:, FEATURE_AMOUNT] <= AMOUNT_SMALL_THRESH
    return start, cnp & small


def _post_attack_hits(history: np.ndarray, flagged_idx: int) -> np.ndarray:
    """Return the mask of large unfamiliar-merchant rows from flagged onward."""
    # Slicing clips at the end of the history, so no explicit bound is needed.
    window = history[flagged_idx:flagged_idx + POST_ATTACK_WINDOW]
    large = window[:, FEATURE_AMOUNT] >= AMOUNT_LARGE_THRESH
    unfamiliar = window[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR_MAX
    return large & unfamiliar


def signal_probe_density(history: np.ndarray, flagged_idx: int) -> int:
    """Count of small-amount CNP transactions in the PROBE_WINDOW before flagged.

    The flagged row itself is excluded; the window is shorter when fewer than
    PROBE_WINDOW rows precede it.
    """
    _, hits = _probe_hits(history, flagged_idx)
    return int(np.sum(hits))


def signal_post_attack_density(history: np.ndarray, flagged_idx: int) -> int:
    """Count of large-amount unfamiliar-merchant transactions in the
    POST_ATTACK_WINDOW around flagged (inclusive of flagged + after)."""
    return int(np.sum(_post_attack_hits(history, flagged_idx)))


def signal_novel_device(history: np.ndarray, flagged_idx: int) -> bool:
    """True iff the flagged transaction's device_hash appears nowhere else."""
    flagged_device = history[flagged_idx, FEATURE_DEVICE_HASH]
    matches = int(np.sum(history[:, FEATURE_DEVICE_HASH] == flagged_device))
    # The flagged row always matches itself, hence "<= 1". Convert to a plain
    # bool so callers get the annotated type rather than numpy.bool_.
    return bool(matches <= 1)


def signal_signature_clean(history: np.ndarray, flagged_idx: int) -> bool:
    """True iff the flagged transaction looks normal for this customer:
    home country, CVV match, AVS match, familiar merchant.

    "Home country" is the most frequent country token over the whole history
    (flagged row included); ties resolve to the smallest token.
    """
    # np.bincount needs non-negative integer tokens.
    # NOTE(review): assumes history has an integer dtype — confirm upstream.
    mode_country = int(np.bincount(history[:, FEATURE_COUNTRY]).argmax())
    flagged = history[flagged_idx]
    return bool(
        flagged[FEATURE_COUNTRY] == mode_country
        and flagged[FEATURE_CVV] == CVV_MATCH
        and flagged[FEATURE_AVS] == AVS_FULL_MATCH
        and flagged[FEATURE_CUSTOMER_MERCHANT_COUNT] >= CMC_FAMILIAR
    )


def signal_recent_authorize_density(history: np.ndarray, flagged_idx: int) -> int:
    """Count of CNP transactions at unfamiliar merchants in the recent window.

    A SCAM_REDIRECTED customer authorizes many one-time payments to unfamiliar
    merchants (often the scammer's payment processors). This differs from
    PROBING because the amounts are typical (not micro-CNP) and the device is
    the customer's own.

    The window covers up to RECENT_AUTHORIZE_WINDOW rows before the flagged
    row plus the flagged row itself.
    """
    start = max(0, flagged_idx - RECENT_AUTHORIZE_WINDOW)
    window = history[start:flagged_idx + 1]
    cnp = window[:, FEATURE_ENTRY_MODE] == ENTRY_CNP
    unfamiliar = window[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR_MAX
    return int(np.sum(cnp & unfamiliar))


# --- Stage rule ---


def classify_stage(history: np.ndarray, flagged_idx: int) -> int:
    """Programmatic 5-class pattern stage from cross-position signals.

    Returns one of the STAGE_* constants.
    """
    probe_count = signal_probe_density(history, flagged_idx)
    post_count = signal_post_attack_density(history, flagged_idx)
    sig_clean = signal_signature_clean(history, flagged_idx)

    # DORMANT: customer's pattern is unchanged at the flagged tx —
    # upstream detector likely false-positive.
    if sig_clean and probe_count == 0 and post_count == 0:
        return STAGE_DORMANT

    # EXFILTRATION: multiple large unfamiliar charges in the window. This
    # holds whether or not a probe cluster preceded them (the attacker may
    # have skipped probing or done it elsewhere), so one check covers both
    # the probed and the no-probe variant.
    if post_count >= POST_ATTACK_CLUSTER_MIN:
        return STAGE_EXFILTRATION

    if probe_count >= PROBE_CLUSTER_MIN:
        flagged_amount = int(history[flagged_idx, FEATURE_AMOUNT])
        flagged_cmc = int(history[flagged_idx, FEATURE_CUSTOMER_MERCHANT_COUNT])
        # MONETIZATION: probing followed by a single large unfamiliar charge
        # (probe-into-big-purchase, the most common attack shape).
        if flagged_amount >= AMOUNT_LARGE_THRESH and flagged_cmc <= CMC_UNFAMILIAR_MAX:
            return STAGE_MONETIZATION
        # PROBING: 3+ probes preceding but no big charge yet.
        return STAGE_PROBING

    # PRE_ATTACK: weak signal — single anomalous transaction, no chain.
    return STAGE_PRE_ATTACK


# --- Type rule ---


def classify_type(history: np.ndarray, flagged_idx: int) -> int:
    """Programmatic 4-class pattern type from cross-position signals.

    Decision order:
      1. ACCOUNT_TAKEOVER if device is novel
      2. DECLINED_LEGIT if signature is clean
      3. SCAM_REDIRECTED if customer authorized many recent CNP-to-unfamiliar
      4. VICTIM_FRAUD otherwise (default)
    """
    if signal_novel_device(history, flagged_idx):
        return TYPE_ACCOUNT_TAKEOVER
    if signal_signature_clean(history, flagged_idx):
        return TYPE_DECLINED_LEGIT
    if signal_recent_authorize_density(history, flagged_idx) >= SCAM_AUTHORIZE_MIN:
        return TYPE_SCAM_REDIRECTED
    return TYPE_VICTIM_FRAUD


def classify_pattern(history: np.ndarray, flagged_idx: int) -> tuple[int, int]:
    """(stage, type) per the rules above."""
    return classify_stage(history, flagged_idx), classify_type(history, flagged_idx)


# --- Attribution ---


def attribution_for_pattern(
    history: np.ndarray,
    flagged_idx: int,
    stage: int,
) -> np.ndarray:
    """Per-position contribution labels keyed off the stage.

    The attribution glow highlights the positions an investigator should
    look at first. The flagged position is always marked. In addition:

      * probing / monetization / exfiltration: the small-CNP cluster in the
        PROBE_WINDOW preceding the flag.
      * monetization / exfiltration: the large unfamiliar charges in the
        POST_ATTACK_WINDOW starting at the flag.
      * pre_attack: any of the PRE_ATTACK_ANOMALY_WINDOW preceding rows with
        a CVV or AVS mismatch.
      * dormant: nothing else (no other signal to attend to).

    Returns a float32 vector with one entry per history row (1.0 = marked).
    """
    attr = np.zeros(history.shape[0], dtype=np.float32)
    attr[flagged_idx] = 1.0

    if stage in (STAGE_PROBING, STAGE_MONETIZATION, STAGE_EXFILTRATION):
        start, hits = _probe_hits(history, flagged_idx)
        # The slice is a view, so the masked assignment writes into attr.
        attr[start:flagged_idx][hits] = 1.0

    if stage in (STAGE_EXFILTRATION, STAGE_MONETIZATION):
        hits = _post_attack_hits(history, flagged_idx)
        attr[flagged_idx:flagged_idx + hits.size][hits] = 1.0

    if stage == STAGE_PRE_ATTACK:
        # Mark any preceding tx with a verification anomaly.
        start = max(0, flagged_idx - PRE_ATTACK_ANOMALY_WINDOW)
        window = history[start:flagged_idx]
        cvv_bad = window[:, FEATURE_CVV] == CVV_NO_MATCH
        avs_bad = window[:, FEATURE_AVS] == AVS_NO_MATCH
        attr[start:flagged_idx][cvv_bad | avs_bad] = 1.0

    return attr


# --- Context text bank ---

CONTEXT_TEMPLATES: dict[str, list[str]] = {
    "formal": [
        (
            "Upstream fraud detector flagged transaction #{flagged_idx} at score "
            "{upstream:.2f}. Please assess pattern stage and type."
        ),
        (
            "Transaction {flagged_idx} flagged for review (detector score "
            "{upstream:.2f}). Recommend pattern classification."
        ),
        (
            "Investigation requested for transaction {flagged_idx}. Upstream model "
            "score {upstream:.2f}. Stage + type?"
        ),
    ],
    "casual": [
        "Hey, tx {flagged_idx} pinged the fraud detector at {upstream:.2f}. What's going on?",
        "Got a flag on transaction {flagged_idx}, detector says {upstream:.2f}. Pattern call?",
        "Fraud queue: tx {flagged_idx} at {upstream:.2f}. Take a look?",
    ],
    "terse": [
        "tx{flagged_idx} flagged @ {upstream:.2f}. Classify.",
        "Flag tx {flagged_idx}, score {upstream:.2f}.",
        "Fraud tx{flagged_idx} {upstream:.2f}.",
    ],
    "detailed": [
        (
            "The upstream fraud detector escalated transaction {flagged_idx} with a "
            "score of {upstream:.2f}. The customer's recent history shows mixed "
            "signals — request model classification on pattern stage and "
            "underlying type."
        ),
        (
            "Investigator review on tx {flagged_idx} (detector {upstream:.2f}). "
            "Need stage (probing/monetization/etc.) plus underlying type "
            "(victim/takeover/scam/false-positive)."
        ),
    ],
    "urgent": [
        (
            "URGENT: tx {flagged_idx} flagged at {upstream:.2f}. Pre-decline window "
            "closing — classify now."
        ),
        (
            "Time-sensitive fraud queue: tx {flagged_idx}, score {upstream:.2f}. "
            "Need stage + type asap."
        ),
    ],
}


def _build_context_text(
    flagged_idx: int,
    rng: random.Random,
) -> tuple[str, str, dict[str, Any]]:
    """Render an analyst-facing flag context.

    Draws, in this order, an upstream score in [0.55, 0.95] (rounded to two
    decimals), a tone, and a template of that tone. Returns
    (text, tone, context_vars).
    """
    upstream = round(rng.uniform(0.55, 0.95), 2)
    tone = rng.choice(list(CONTEXT_TEMPLATES))
    template = rng.choice(CONTEXT_TEMPLATES[tone])
    text = template.format(flagged_idx=flagged_idx, upstream=upstream)
    return text, tone, {"flagged_idx": flagged_idx, "upstream_score": upstream}


# --- Reasoning ---

# Stage-specific paragraph; any stage not listed falls back to pre_attack.
_STAGE_RATIONALE: dict[int, str] = {
    STAGE_PROBING: (
        "Probing pattern is consistent with card-testing: small "
        "card-not-present charges preceding the flag, attacker confirming "
        "the card works before escalation. Recommend containment + step-up auth."
    ),
    STAGE_MONETIZATION: (
        "Monetization pattern: the probe phase succeeded and the attacker "
        "is converting access into value. The flagged transaction is the "
        "first big charge after probes. Recommend immediate block + customer outreach."
    ),
    STAGE_EXFILTRATION: (
        "Exfiltration pattern: multiple large unfamiliar charges around "
        "the flag indicate the attack is mature. Recommend full card "
        "freeze + customer notification within the hour."
    ),
    STAGE_DORMANT: (
        "Dormant pattern: the flagged transaction sits inside the customer's "
        "normal signature. Upstream detector is likely a false-positive. "
        "Recommend release with low priority follow-up."
    ),
    STAGE_PRE_ATTACK: (
        "Pre-attack stage: a single anomalous transaction with no chain "
        "evidence yet. Recommend step-up auth and watch the next 24 hours."
    ),
}

# Type-specific paragraph; any type not listed falls back to declined_legit.
_TYPE_RATIONALE: dict[int, str] = {
    TYPE_ACCOUNT_TAKEOVER: (
        "Underlying type is account_takeover — the device fingerprint at "
        "the flag is one the customer has not used elsewhere in this history. "
        "Treat as credential / device compromise."
    ),
    TYPE_VICTIM_FRAUD: (
        "Underlying type is victim_fraud — device matches the customer's "
        "own, but the transaction pattern is anomalous. Likely the customer "
        "was tricked into authorizing or shared credentials."
    ),
    TYPE_SCAM_REDIRECTED: (
        "Underlying type is scam_redirected — recent history shows the "
        "customer authorizing many one-time payments to unfamiliar merchants. "
        "Pattern is consistent with romance / impostor scam."
    ),
    TYPE_DECLINED_LEGIT: (
        "Underlying type is declined_legit — the flagged tx looks like a "
        "normal customer purchase. The upstream score is plausibly a "
        "false-positive of the rules-based detector."
    ),
}


def build_reasoning_text(
    history: np.ndarray,
    flagged_idx: int,
    stage: int,
    pattern_type: int,
) -> str:
    """Templated reasoning grounded in cross-position signals.

    Produces four space-joined parts: the verdict line, the recomputed
    signal values, a stage paragraph and a type paragraph.
    """
    probe_count = signal_probe_density(history, flagged_idx)
    post_count = signal_post_attack_density(history, flagged_idx)
    novel_device = signal_novel_device(history, flagged_idx)
    sig_clean = signal_signature_clean(history, flagged_idx)
    recent_auth = signal_recent_authorize_density(history, flagged_idx)

    verdict = (
        f"Pattern verdict: stage={STAGE_NAMES[stage]}, type={TYPE_NAMES[pattern_type]}."
    )
    # Window sizes are interpolated from the constants so the text cannot
    # drift from the windows the signals actually use.
    signals = (
        f"Cross-position signals — probe-cluster density: {probe_count} small-CNP "
        f"in the {PROBE_WINDOW}-tx window before tx{flagged_idx}, "
        f"post-attack density: {post_count} large unfamiliar charges around "
        f"the flag, novel-device: {novel_device}, signature-clean: {sig_clean}, "
        f"recent-authorize density: {recent_auth} CNP-to-unfamiliar "
        f"in last {RECENT_AUTHORIZE_WINDOW}."
    )
    stage_part = _STAGE_RATIONALE.get(stage, _STAGE_RATIONALE[STAGE_PRE_ATTACK])
    type_part = _TYPE_RATIONALE.get(pattern_type, _TYPE_RATIONALE[TYPE_DECLINED_LEGIT])
    return " ".join([verdict, signals, stage_part, type_part])


# --- Example dataclass + corpus generation ---


@dataclass
class FraudPatternExample:
    """One synthesized training example for the fraud-pattern surface."""

    customer_idx: int  # row of `histories` the example was built from
    flagged_idx: int  # position in that history treated as flagged
    context_text: str  # rendered analyst note (perturbed when adversarial)
    stage_label: int  # one of the STAGE_* constants
    type_label: int  # one of the TYPE_* constants
    attribution_labels: list[float]  # per-position 0.0 / 1.0 marks
    reasoning_text: str  # templated explanation of the labels
    tone: str  # key of CONTEXT_TEMPLATES the note was drawn from
    is_adversarial: bool  # whether context_text was perturbed
    context_vars: dict[str, Any]  # values substituted into the template

    def to_dict(self) -> dict[str, Any]:
        """Return the example as a plain dict (via dataclasses.asdict)."""
        return asdict(self)


def _apply_adversarial_perturbation(text: str, rng: random.Random) -> str:
    """Return `text` with light surface noise.

    Each alphabetic character has its case swapped with probability 0.10;
    then, with probability 0.3 and only if the text contains a space, the
    first space is doubled.
    """
    # rng.random() is drawn only for alphabetic characters, left to right.
    chars = [
        ch.swapcase() if ch.isalpha() and rng.random() < 0.10 else ch
        for ch in text
    ]
    if rng.random() < 0.3 and " " in text:
        # chars holds one element per original character, so indices into
        # `text` are valid positions in `chars` as well.
        chars.insert(text.index(" "), " ")
    return "".join(chars)


def synthesize_one(
    history: np.ndarray,
    customer_idx: int,
    flagged_idx: int,
    rng: random.Random,
    adversarial: bool = False,
) -> FraudPatternExample:
    """Build one labelled example for `history` flagged at `flagged_idx`.

    Labels, attribution and reasoning are deterministic functions of the
    history; only the context text (and its optional perturbation) consumes
    randomness from `rng`.
    """
    stage, ptype = classify_pattern(history, flagged_idx)
    attribution = attribution_for_pattern(history, flagged_idx, stage)
    context_text, tone, context_vars = _build_context_text(flagged_idx, rng)
    if adversarial:
        context_text = _apply_adversarial_perturbation(context_text, rng)
    reasoning = build_reasoning_text(history, flagged_idx, stage, ptype)
    return FraudPatternExample(
        customer_idx=customer_idx,
        flagged_idx=flagged_idx,
        context_text=context_text,
        stage_label=stage,
        type_label=ptype,
        attribution_labels=attribution.tolist(),
        reasoning_text=reasoning,
        tone=tone,
        is_adversarial=adversarial,
        context_vars=context_vars,
    )


def generate_corpus(
    histories: np.ndarray,
    train_indices: np.ndarray,
    target_size: int = 4000,
    seed: int = 42,
    adversarial_fraction: float = 0.10,
) -> list[FraudPatternExample]:
    """Broad corpus: random customer + random flagged_idx, accept whatever
    label falls out. Targeted slices add rare-class density separately.

    Args:
        histories: customer histories indexed as histories[customer_idx];
            the second axis is the sequence position.
        train_indices: customer indices to sample from (uniformly, with
            replacement).
        target_size: number of examples to produce.
        seed: seeds both the stdlib and the numpy generator.
        adversarial_fraction: upper bound, as a fraction of target_size, on
            how many examples get a perturbed context text.

    Returns:
        A shuffled list of `target_size` examples (empty when
        target_size <= 0).

    Raises:
        ValueError: if examples are requested but `train_indices` is empty
            or the histories are too short to flag any position.
    """
    rng = random.Random(seed)
    np_rng = np.random.RandomState(seed)
    examples: list[FraudPatternExample] = []
    if target_size <= 0:
        return examples
    if np.size(train_indices) == 0:
        raise ValueError("train_indices must be non-empty")
    seq_len = histories.shape[1]
    if seq_len <= MIN_FLAGGED_IDX:
        raise ValueError(
            f"histories must have more than {MIN_FLAGGED_IDX} positions, got {seq_len}"
        )

    n_adv_target = int(target_size * adversarial_fraction)
    n_adv = 0
    for _ in range(target_size):
        customer_idx = int(np_rng.choice(train_indices))
        # Uniform over [MIN_FLAGGED_IDX, seq_len): the early positions are
        # skipped so every flag has some preceding history to inspect.
        flagged_idx = int(np_rng.randint(MIN_FLAGGED_IDX, seq_len))
        # Once the cap is reached no further random draw is made, which
        # keeps the rng stream (and therefore the corpus) reproducible.
        adversarial = n_adv < n_adv_target and rng.random() < ADVERSARIAL_SAMPLE_RATE
        examples.append(
            synthesize_one(
                histories[customer_idx],
                customer_idx,
                flagged_idx,
                rng,
                adversarial=adversarial,
            )
        )
        if adversarial:
            n_adv += 1
    rng.shuffle(examples)
    return examples


def write_jsonl(
    examples: list[FraudPatternExample],
    output_path: Path | str,
) -> None:
    """Write one JSON object per example, one per line, to `output_path`.

    An existing file is overwritten. The encoding is pinned to UTF-8 so the
    result does not depend on the platform's default locale.
    """
    output_path = Path(output_path)
    with output_path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(example.to_dict()) + "\n" for example in examples)