| """Programmatic synthesizer for the fraud-pattern surface. |
| |
| Mirrors the dispute / collections synthesizers but emits TWO categorical |
| labels per example: |
| |
| pattern_stage (5-class): pre_attack / probing / monetization / |
| exfiltration / dormant |
| pattern_type (4-class): victim_fraud / account_takeover / |
| scam_redirected / declined_legit |
| |
| Inputs the model conditions on: |
| |
| feature_ids (B, 64, 15) customer transaction history |
| flagged_idx (B,) position the upstream detector flagged |
| context_text free-form analyst note + upstream fraud score |
| |
| The cross-position properties the labels depend on: |
| |
| probe_cluster_density count of small-amount CNP in window before flagged |
| post_attack_high_density count of large-amount unfamiliar tx around flagged |
| novel_device_score 1.0 if flagged device_hash appears only at flagged |
| sig_clean_score 1.0 if flagged tx matches customer's mode pattern |
| (home country, CVV match, AVS match, familiar merchant) |
| |
| These mirror the dispute markers (subscription, exotic_country) and the |
| collections markers (velocity, sub_burden, etc.) — local biases at the |
| flagged position that make the label readable to a 350M backbone. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import random |
| from dataclasses import asdict, dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
|
|
|
|
| |
| FEATURE_ENTRY_MODE = 7 |
| FEATURE_AMOUNT = 8 |
| FEATURE_COUNTRY = 10 |
| FEATURE_AVS = 11 |
| FEATURE_CVV = 12 |
| FEATURE_DEVICE_HASH = 13 |
| FEATURE_MERCHANT_ID = 5 |
| FEATURE_CUSTOMER_MERCHANT_COUNT = 6 |
|
|
| RESERVED_OFFSET = 3 |
|
|
| |
| ENTRY_CNP = RESERVED_OFFSET + 1 |
|
|
| |
| CVV_MATCH = RESERVED_OFFSET + 0 |
| CVV_NO_MATCH = RESERVED_OFFSET + 1 |
| AVS_FULL_MATCH = RESERVED_OFFSET + 0 |
| AVS_NO_MATCH = RESERVED_OFFSET + 3 |
|
|
| |
| AMOUNT_SMALL_THRESH = RESERVED_OFFSET + 8 |
| AMOUNT_LARGE_THRESH = RESERVED_OFFSET + 150 |
|
|
| |
| CMC_FAMILIAR = RESERVED_OFFSET + 5 |
| CMC_UNFAMILIAR = RESERVED_OFFSET + 0 |
|
|
| |
| PROBE_WINDOW = 6 |
| POST_ATTACK_WINDOW = 6 |
| RECENT_AUTHORIZE_WINDOW = 16 |
|
|
| |
| STAGE_PRE_ATTACK = 0 |
| STAGE_PROBING = 1 |
| STAGE_MONETIZATION = 2 |
| STAGE_EXFILTRATION = 3 |
| STAGE_DORMANT = 4 |
| NUM_STAGES = 5 |
| STAGE_NAMES = ["pre_attack", "probing", "monetization", "exfiltration", "dormant"] |
|
|
| TYPE_VICTIM_FRAUD = 0 |
| TYPE_ACCOUNT_TAKEOVER = 1 |
| TYPE_SCAM_REDIRECTED = 2 |
| TYPE_DECLINED_LEGIT = 3 |
| NUM_TYPES = 4 |
| TYPE_NAMES = ["victim_fraud", "account_takeover", "scam_redirected", "declined_legit"] |
|
|
|
|
| |
|
|
|
|
| def signal_probe_density(history: np.ndarray, flagged_idx: int) -> int: |
| """Count of small-amount CNP transactions in the PROBE_WINDOW before flagged.""" |
| start = max(0, flagged_idx - PROBE_WINDOW) |
| window = history[start:flagged_idx] |
| cnp = window[:, FEATURE_ENTRY_MODE] == ENTRY_CNP |
| small = window[:, FEATURE_AMOUNT] <= AMOUNT_SMALL_THRESH |
| return int(np.sum(cnp & small)) |
|
|
|
|
| def signal_post_attack_density(history: np.ndarray, flagged_idx: int) -> int: |
| """Count of large-amount unfamiliar-merchant transactions in the |
| POST_ATTACK_WINDOW around flagged (inclusive of flagged + after).""" |
| end = min(64, flagged_idx + POST_ATTACK_WINDOW) |
| window = history[flagged_idx:end] |
| large = window[:, FEATURE_AMOUNT] >= AMOUNT_LARGE_THRESH |
| unfamiliar = window[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR + 1 |
| return int(np.sum(large & unfamiliar)) |
|
|
|
|
| def signal_novel_device(history: np.ndarray, flagged_idx: int) -> bool: |
| """True iff the flagged transaction's device_hash appears nowhere else.""" |
| flagged_device = history[flagged_idx, FEATURE_DEVICE_HASH] |
| matches = (history[:, FEATURE_DEVICE_HASH] == flagged_device).sum() |
| return matches <= 1 |
|
|
|
|
| def signal_signature_clean(history: np.ndarray, flagged_idx: int) -> bool: |
| """True iff the flagged transaction looks normal for this customer: |
| home country, CVV match, AVS match, familiar merchant.""" |
| countries = history[:, FEATURE_COUNTRY] |
| mode_country = int(np.bincount(countries).argmax()) |
| return bool( |
| history[flagged_idx, FEATURE_COUNTRY] == mode_country |
| and history[flagged_idx, FEATURE_CVV] == CVV_MATCH |
| and history[flagged_idx, FEATURE_AVS] == AVS_FULL_MATCH |
| and history[flagged_idx, FEATURE_CUSTOMER_MERCHANT_COUNT] >= CMC_FAMILIAR |
| ) |
|
|
|
|
| def signal_recent_authorize_density(history: np.ndarray, flagged_idx: int) -> int: |
| """Count of CNP transactions at unfamiliar merchants in the recent window. |
| |
| A SCAM_REDIRECTED customer authorizes many one-time payments to |
| unfamiliar merchants (often the scammer's payment processors). This |
| differs from PROBING because the amounts are typical (not micro-CNP) |
| and the device is the customer's own. |
| """ |
| start = max(0, flagged_idx - RECENT_AUTHORIZE_WINDOW) |
| window = history[start:flagged_idx + 1] |
| cnp = window[:, FEATURE_ENTRY_MODE] == ENTRY_CNP |
| unfamiliar = window[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR + 1 |
| return int(np.sum(cnp & unfamiliar)) |
|
|
|
|
| |
|
|
|
|
| def classify_stage(history: np.ndarray, flagged_idx: int) -> int: |
| """Programmatic 5-class pattern stage from cross-position signals.""" |
| probe_count = signal_probe_density(history, flagged_idx) |
| post_count = signal_post_attack_density(history, flagged_idx) |
| flagged_amount = int(history[flagged_idx, FEATURE_AMOUNT]) |
| flagged_cmc = int(history[flagged_idx, FEATURE_CUSTOMER_MERCHANT_COUNT]) |
| sig_clean = signal_signature_clean(history, flagged_idx) |
|
|
| |
| |
| if sig_clean and probe_count == 0 and post_count == 0: |
| return STAGE_DORMANT |
|
|
| |
| |
| if probe_count >= 3 and post_count >= 2: |
| return STAGE_EXFILTRATION |
|
|
| |
| |
| if probe_count >= 3 and ( |
| flagged_amount >= AMOUNT_LARGE_THRESH |
| and flagged_cmc <= CMC_UNFAMILIAR + 1 |
| ): |
| return STAGE_MONETIZATION |
|
|
| |
| if probe_count >= 3: |
| return STAGE_PROBING |
|
|
| |
| |
| if post_count >= 2: |
| return STAGE_EXFILTRATION |
|
|
| |
| return STAGE_PRE_ATTACK |
|
|
|
|
| |
|
|
|
|
| def classify_type(history: np.ndarray, flagged_idx: int) -> int: |
| """Programmatic 4-class pattern type from cross-position signals. |
| |
| Decision order: |
| 1. ACCOUNT_TAKEOVER if device is novel |
| 2. DECLINED_LEGIT if signature is clean |
| 3. SCAM_REDIRECTED if customer authorized many recent CNP-to-unfamiliar |
| 4. VICTIM_FRAUD otherwise (default) |
| """ |
| if signal_novel_device(history, flagged_idx): |
| return TYPE_ACCOUNT_TAKEOVER |
| if signal_signature_clean(history, flagged_idx): |
| return TYPE_DECLINED_LEGIT |
| if signal_recent_authorize_density(history, flagged_idx) >= 5: |
| return TYPE_SCAM_REDIRECTED |
| return TYPE_VICTIM_FRAUD |
|
|
|
|
| def classify_pattern(history: np.ndarray, flagged_idx: int) -> tuple[int, int]: |
| """(stage, type) per the rules above.""" |
| return classify_stage(history, flagged_idx), classify_type(history, flagged_idx) |
|
|
|
|
| |
|
|
|
|
| def attribution_for_pattern( |
| history: np.ndarray, |
| flagged_idx: int, |
| stage: int, |
| ) -> np.ndarray: |
| """Per-position contribution labels keyed off the stage. |
| |
| The attribution glow highlights the positions an investigator should |
| look at first. For probing/monetization: the small-CNP cluster |
| preceding. For exfiltration: the large-charge cluster around flagged. |
| For dormant: just the flagged position (no other signal to attend to). |
| For pre_attack: the flagged position + any visible anomaly. |
| """ |
| attr = np.zeros(64, dtype=np.float32) |
| attr[flagged_idx] = 1.0 |
|
|
| if stage in (STAGE_PROBING, STAGE_MONETIZATION, STAGE_EXFILTRATION): |
| start = max(0, flagged_idx - PROBE_WINDOW) |
| window = history[start:flagged_idx] |
| cnp = window[:, FEATURE_ENTRY_MODE] == ENTRY_CNP |
| small = window[:, FEATURE_AMOUNT] <= AMOUNT_SMALL_THRESH |
| for i, hit in enumerate(cnp & small): |
| if hit: |
| attr[start + i] = 1.0 |
|
|
| if stage in (STAGE_EXFILTRATION, STAGE_MONETIZATION): |
| end = min(64, flagged_idx + POST_ATTACK_WINDOW) |
| window = history[flagged_idx:end] |
| large = window[:, FEATURE_AMOUNT] >= AMOUNT_LARGE_THRESH |
| unfamiliar = window[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR + 1 |
| for i, hit in enumerate(large & unfamiliar): |
| if hit: |
| attr[flagged_idx + i] = 1.0 |
|
|
| if stage == STAGE_PRE_ATTACK: |
| |
| window = history[max(0, flagged_idx - 8):flagged_idx] |
| cvv_bad = window[:, FEATURE_CVV] == CVV_NO_MATCH |
| avs_bad = window[:, FEATURE_AVS] == AVS_NO_MATCH |
| start = max(0, flagged_idx - 8) |
| for i, hit in enumerate(cvv_bad | avs_bad): |
| if hit: |
| attr[start + i] = 1.0 |
|
|
| return attr |
|
|
|
|
| |
|
|
| CONTEXT_TEMPLATES: dict[str, list[str]] = { |
| "formal": [ |
| "Upstream fraud detector flagged transaction #{flagged_idx} at score {upstream:.2f}. Please assess pattern stage and type.", |
| "Transaction {flagged_idx} flagged for review (detector score {upstream:.2f}). Recommend pattern classification.", |
| "Investigation requested for transaction {flagged_idx}. Upstream model score {upstream:.2f}. Stage + type?", |
| ], |
| "casual": [ |
| "Hey, tx {flagged_idx} pinged the fraud detector at {upstream:.2f}. What's going on?", |
| "Got a flag on transaction {flagged_idx}, detector says {upstream:.2f}. Pattern call?", |
| "Fraud queue: tx {flagged_idx} at {upstream:.2f}. Take a look?", |
| ], |
| "terse": [ |
| "tx{flagged_idx} flagged @ {upstream:.2f}. Classify.", |
| "Flag tx {flagged_idx}, score {upstream:.2f}.", |
| "Fraud tx{flagged_idx} {upstream:.2f}.", |
| ], |
| "detailed": [ |
| "The upstream fraud detector escalated transaction {flagged_idx} with a score of {upstream:.2f}. The customer's recent history shows mixed signals — request model classification on pattern stage and underlying type.", |
| "Investigator review on tx {flagged_idx} (detector {upstream:.2f}). Need stage (probing/monetization/etc.) plus underlying type (victim/takeover/scam/false-positive).", |
| ], |
| "urgent": [ |
| "URGENT: tx {flagged_idx} flagged at {upstream:.2f}. Pre-decline window closing — classify now.", |
| "Time-sensitive fraud queue: tx {flagged_idx}, score {upstream:.2f}. Need stage + type asap.", |
| ], |
| } |
|
|
|
|
| def _build_context_text( |
| flagged_idx: int, |
| rng: random.Random, |
| ) -> tuple[str, str, dict[str, Any]]: |
| """Render an analyst-facing flag context.""" |
| upstream = round(rng.uniform(0.55, 0.95), 2) |
| tone = rng.choice(list(CONTEXT_TEMPLATES.keys())) |
| template = rng.choice(CONTEXT_TEMPLATES[tone]) |
| text = template.format(flagged_idx=flagged_idx, upstream=upstream) |
| return text, tone, {"flagged_idx": flagged_idx, "upstream_score": upstream} |
|
|
|
|
| |
|
|
|
|
| def build_reasoning_text( |
| history: np.ndarray, |
| flagged_idx: int, |
| stage: int, |
| pattern_type: int, |
| ) -> str: |
| """Templated reasoning grounded in cross-position signals.""" |
| probe_count = signal_probe_density(history, flagged_idx) |
| post_count = signal_post_attack_density(history, flagged_idx) |
| novel_device = signal_novel_device(history, flagged_idx) |
| sig_clean = signal_signature_clean(history, flagged_idx) |
| recent_auth = signal_recent_authorize_density(history, flagged_idx) |
|
|
| stage_name = STAGE_NAMES[stage] |
| type_name = TYPE_NAMES[pattern_type] |
|
|
| parts: list[str] = [] |
| parts.append( |
| f"Pattern verdict: stage={stage_name}, type={type_name}." |
| ) |
| parts.append( |
| f"Cross-position signals — probe-cluster density: {probe_count} small-CNP " |
| f"in the {PROBE_WINDOW}-tx window before tx{flagged_idx}, " |
| f"post-attack density: {post_count} large unfamiliar charges around " |
| f"the flag, novel-device: {novel_device}, signature-clean: {sig_clean}, " |
| f"recent-authorize density: {recent_auth} CNP-to-unfamiliar in last 16." |
| ) |
|
|
| if stage == STAGE_PROBING: |
| parts.append( |
| "Probing pattern is consistent with card-testing: small " |
| "card-not-present charges preceding the flag, attacker confirming " |
| "the card works before escalation. Recommend containment + step-up auth." |
| ) |
| elif stage == STAGE_MONETIZATION: |
| parts.append( |
| "Monetization pattern: the probe phase succeeded and the attacker " |
| "is converting access into value. The flagged transaction is the " |
| "first big charge after probes. Recommend immediate block + customer outreach." |
| ) |
| elif stage == STAGE_EXFILTRATION: |
| parts.append( |
| "Exfiltration pattern: multiple large unfamiliar charges around " |
| "the flag indicate the attack is mature. Recommend full card " |
| "freeze + customer notification within the hour." |
| ) |
| elif stage == STAGE_DORMANT: |
| parts.append( |
| "Dormant pattern: the flagged transaction sits inside the customer's " |
| "normal signature. Upstream detector is likely a false-positive. " |
| "Recommend release with low priority follow-up." |
| ) |
| else: |
| parts.append( |
| "Pre-attack stage: a single anomalous transaction with no chain " |
| "evidence yet. Recommend step-up auth and watch the next 24 hours." |
| ) |
|
|
| if pattern_type == TYPE_ACCOUNT_TAKEOVER: |
| parts.append( |
| "Underlying type is account_takeover — the device fingerprint at " |
| "the flag is one the customer has not used elsewhere in this history. " |
| "Treat as credential / device compromise." |
| ) |
| elif pattern_type == TYPE_VICTIM_FRAUD: |
| parts.append( |
| "Underlying type is victim_fraud — device matches the customer's " |
| "own, but the transaction pattern is anomalous. Likely the customer " |
| "was tricked into authorizing or shared credentials." |
| ) |
| elif pattern_type == TYPE_SCAM_REDIRECTED: |
| parts.append( |
| "Underlying type is scam_redirected — recent history shows the " |
| "customer authorizing many one-time payments to unfamiliar merchants. " |
| "Pattern is consistent with romance / impostor scam." |
| ) |
| else: |
| parts.append( |
| "Underlying type is declined_legit — the flagged tx looks like a " |
| "normal customer purchase. The upstream score is plausibly a " |
| "false-positive of the rules-based detector." |
| ) |
|
|
| return " ".join(parts) |
|
|
|
|
| |
|
|
|
|
| @dataclass |
| class FraudPatternExample: |
| customer_idx: int |
| flagged_idx: int |
| context_text: str |
| stage_label: int |
| type_label: int |
| attribution_labels: list[float] |
| reasoning_text: str |
| tone: str |
| is_adversarial: bool |
| context_vars: dict[str, Any] |
|
|
| def to_dict(self) -> dict[str, Any]: |
| return asdict(self) |
|
|
|
|
| def _apply_adversarial_perturbation(text: str, rng: random.Random) -> str: |
| chars = list(text) |
| for i in range(len(chars)): |
| if chars[i].isalpha() and rng.random() < 0.10: |
| chars[i] = chars[i].swapcase() |
| if rng.random() < 0.3 and " " in text: |
| idx = text.index(" ") |
| chars.insert(idx, " ") |
| return "".join(chars) |
|
|
|
|
| def synthesize_one( |
| history: np.ndarray, |
| customer_idx: int, |
| flagged_idx: int, |
| rng: random.Random, |
| adversarial: bool = False, |
| ) -> FraudPatternExample: |
| stage = classify_stage(history, flagged_idx) |
| ptype = classify_type(history, flagged_idx) |
| attribution = attribution_for_pattern(history, flagged_idx, stage) |
| context_text, tone, context_vars = _build_context_text(flagged_idx, rng) |
| if adversarial: |
| context_text = _apply_adversarial_perturbation(context_text, rng) |
| reasoning = build_reasoning_text(history, flagged_idx, stage, ptype) |
| return FraudPatternExample( |
| customer_idx=customer_idx, |
| flagged_idx=flagged_idx, |
| context_text=context_text, |
| stage_label=stage, |
| type_label=ptype, |
| attribution_labels=attribution.tolist(), |
| reasoning_text=reasoning, |
| tone=tone, |
| is_adversarial=adversarial, |
| context_vars=context_vars, |
| ) |
|
|
|
|
| def generate_corpus( |
| histories: np.ndarray, |
| train_indices: np.ndarray, |
| target_size: int = 4000, |
| seed: int = 42, |
| adversarial_fraction: float = 0.10, |
| ) -> list[FraudPatternExample]: |
| """Broad corpus: random customer + random flagged_idx, accept whatever |
| label falls out. Targeted slices add rare-class density separately. |
| """ |
| rng = random.Random(seed) |
| np_rng = np.random.RandomState(seed) |
| examples: list[FraudPatternExample] = [] |
| n_adv_target = int(target_size * adversarial_fraction) |
| n_adv = 0 |
|
|
| attempts = 0 |
| max_attempts = target_size * 4 |
| while len(examples) < target_size and attempts < max_attempts: |
| attempts += 1 |
| customer_idx = int(np_rng.choice(train_indices)) |
| history = histories[customer_idx] |
| |
| |
| flagged_idx = int(np_rng.randint(8, 64)) |
| adversarial = (n_adv < n_adv_target) and (rng.random() < 0.15) |
| example = synthesize_one( |
| history, customer_idx, flagged_idx, rng, adversarial=adversarial, |
| ) |
| examples.append(example) |
| if adversarial: |
| n_adv += 1 |
|
|
| rng.shuffle(examples) |
| return examples |
|
|
|
|
| def write_jsonl( |
| examples: list[FraudPatternExample], |
| output_path: Path | str, |
| ) -> None: |
| output_path = Path(output_path) |
| with output_path.open("w") as f: |
| for example in examples: |
| f.write(json.dumps(example.to_dict()) + "\n") |
|
|