lfm2-transaction-encoder / encoder /src /data /synthetic_fraud_pattern.py
cdotsanghvi's picture
initial transaction co-pilot deployment
b3112c7
Raw
History Blame Contribute Delete
19.2 kB
"""Programmatic synthesizer for the fraud-pattern surface.
Mirrors the dispute / collections synthesizers but emits TWO categorical
labels per example:
pattern_stage (5-class): pre_attack / probing / monetization /
exfiltration / dormant
pattern_type (4-class): victim_fraud / account_takeover /
scam_redirected / declined_legit
Inputs the model conditions on:
feature_ids (B, 64, 15) customer transaction history
flagged_idx (B,) position the upstream detector flagged
context_text free-form analyst note + upstream fraud score
The cross-position properties the labels depend on:
probe_cluster_density count of small-amount CNP tx in the window before flagged
post_attack_high_density count of large-amount unfamiliar-merchant tx at and after flagged
novel_device_score 1.0 if flagged device_hash appears only at flagged
sig_clean_score 1.0 if flagged tx matches customer's mode pattern
(home country, CVV match, AVS match, familiar merchant)
recent_authorize_density count of CNP tx at unfamiliar merchants in the recent window ending at flagged
These mirror the dispute markers (subscription, exotic_country) and the
collections markers (velocity, sub_burden, etc.) — local biases at the
flagged position that make the label readable to a 350M backbone.
"""
from __future__ import annotations
import json
import random
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
import numpy as np
# --- Feature column indices (must match data/schema.yaml) ---
# Each value is a column position along the last axis of a per-customer
# history array (the module docstring describes feature_ids as (B, 64, 15)).
FEATURE_ENTRY_MODE = 7
FEATURE_AMOUNT = 8
FEATURE_COUNTRY = 10
FEATURE_AVS = 11
FEATURE_CVV = 12
FEATURE_DEVICE_HASH = 13
# NOTE(review): FEATURE_MERCHANT_ID is not referenced by the code visible in
# this file; it may be used by importers of this module.
FEATURE_MERCHANT_ID = 5
FEATURE_CUSTOMER_MERCHANT_COUNT = 6
# Raw categorical values are shifted by this offset to form token ids
# (raw value 0 -> token 3). Presumably ids 0-2 are reserved special tokens
# — TODO confirm against data/schema.yaml.
RESERVED_OFFSET = 3
# Entry mode values: 0=card_present, 1=card_not_present → tokens 3, 4
ENTRY_CNP = RESERVED_OFFSET + 1 # = 4
# Verification tokens
CVV_MATCH = RESERVED_OFFSET + 0 # 3
CVV_NO_MATCH = RESERVED_OFFSET + 1 # 4
AVS_FULL_MATCH = RESERVED_OFFSET + 0 # 3
AVS_NO_MATCH = RESERVED_OFFSET + 3 # 6
# Amount thresholds
# Both thresholds are compared inclusively by the signal functions
# (<= for small, >= for large).
AMOUNT_SMALL_THRESH = RESERVED_OFFSET + 8 # token <= 11 = ~$5 ≈ probe-size
AMOUNT_LARGE_THRESH = RESERVED_OFFSET + 150 # token >= 153 = ~$150+ = monetization-size
# Customer-merchant count thresholds (token = bucket + 3)
# The "unfamiliar" tests in this file use `<= CMC_UNFAMILIAR + 1`, i.e. they
# accept buckets 0 and 1, not only bucket 0.
CMC_FAMILIAR = RESERVED_OFFSET + 5 # bucket >= 5 = "knows merchant"
CMC_UNFAMILIAR = RESERVED_OFFSET + 0 # bucket 0 = "never seen"
# Window sizes
# All window sizes are counted in transactions (rows of the history).
PROBE_WINDOW = 6 # check 6 tx before flagged for probing cluster
POST_ATTACK_WINDOW = 6 # check flagged + 5 after for exfiltration density
RECENT_AUTHORIZE_WINDOW = 16 # for SCAM_REDIRECTED detection
# Class labels.
# Stage ids; each id is the index of its name in STAGE_NAMES.
STAGE_PRE_ATTACK = 0
STAGE_PROBING = 1
STAGE_MONETIZATION = 2
STAGE_EXFILTRATION = 3
STAGE_DORMANT = 4
NUM_STAGES = 5
STAGE_NAMES = ["pre_attack", "probing", "monetization", "exfiltration", "dormant"]
# Type ids; each id is the index of its name in TYPE_NAMES.
TYPE_VICTIM_FRAUD = 0
TYPE_ACCOUNT_TAKEOVER = 1
TYPE_SCAM_REDIRECTED = 2
TYPE_DECLINED_LEGIT = 3
NUM_TYPES = 4
TYPE_NAMES = ["victim_fraud", "account_takeover", "scam_redirected", "declined_legit"]
# --- Cross-position signals (mirror in encoder forward) ---
def signal_probe_density(history: np.ndarray, flagged_idx: int) -> int:
    """Return how many probe-like transactions directly precede the flag.

    A row is probe-like when its entry-mode token equals ENTRY_CNP and its
    amount token is at or below AMOUNT_SMALL_THRESH. Only the PROBE_WINDOW
    rows immediately before ``flagged_idx`` are inspected (fewer near the
    start of the history); the flagged row itself is excluded.
    """
    first = max(0, flagged_idx - PROBE_WINDOW)
    preceding = history[first:flagged_idx]
    entry_modes = preceding[:, FEATURE_ENTRY_MODE]
    amounts = preceding[:, FEATURE_AMOUNT]
    is_probe = (entry_modes == ENTRY_CNP) & (amounts <= AMOUNT_SMALL_THRESH)
    return int(is_probe.sum())
def signal_post_attack_density(history: np.ndarray, flagged_idx: int) -> int:
    """Count large-amount, unfamiliar-merchant transactions at and after the flag.

    The window starts at ``flagged_idx`` (inclusive) and spans
    POST_ATTACK_WINDOW rows, truncated at the end of the history. A row
    counts when its amount token is >= AMOUNT_LARGE_THRESH and its
    customer-merchant-count token is <= CMC_UNFAMILIAR + 1.

    Args:
        history: 2-D array of transaction feature tokens, one row per tx.
        flagged_idx: Row index of the flagged transaction.

    Returns:
        Number of qualifying rows in the window, as a Python int.
    """
    # Clamp to the real history length rather than a hard-coded 64 so that
    # histories longer than 64 rows are not silently reported as 0.
    end = min(history.shape[0], flagged_idx + POST_ATTACK_WINDOW)
    window = history[flagged_idx:end]
    large = window[:, FEATURE_AMOUNT] >= AMOUNT_LARGE_THRESH
    unfamiliar = window[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR + 1
    return int(np.sum(large & unfamiliar))
def signal_novel_device(history: np.ndarray, flagged_idx: int) -> bool:
    """Return True iff the flagged row's device hash occurs nowhere else.

    The flagged row always matches itself, so "novel" means the device-hash
    column contains that value at most once across the whole history.

    Args:
        history: 2-D array of transaction feature tokens, one row per tx.
        flagged_idx: Row index of the flagged transaction.

    Returns:
        A plain Python ``bool`` (not ``numpy.bool_``), matching the declared
        return type and the behaviour of ``signal_signature_clean``.
    """
    flagged_device = history[flagged_idx, FEATURE_DEVICE_HASH]
    matches = (history[:, FEATURE_DEVICE_HASH] == flagged_device).sum()
    # The comparison yields numpy.bool_, which is not a bool subclass and is
    # rejected by json.dumps; convert explicitly.
    return bool(matches <= 1)
def signal_signature_clean(history: np.ndarray, flagged_idx: int) -> bool:
    """Tell whether the flagged row looks like routine activity.

    All four checks must pass: the row's country equals the most frequent
    country token in the whole history, the CVV token is CVV_MATCH, the AVS
    token is AVS_FULL_MATCH, and the customer-merchant-count token is at
    least CMC_FAMILIAR.
    """
    # np.bincount(...).argmax() picks the most frequent token (lowest token
    # wins ties).
    home_country = int(np.bincount(history[:, FEATURE_COUNTRY]).argmax())
    flagged_row = history[flagged_idx]
    if flagged_row[FEATURE_COUNTRY] != home_country:
        return False
    if flagged_row[FEATURE_CVV] != CVV_MATCH:
        return False
    if flagged_row[FEATURE_AVS] != AVS_FULL_MATCH:
        return False
    return bool(flagged_row[FEATURE_CUSTOMER_MERCHANT_COUNT] >= CMC_FAMILIAR)
def signal_recent_authorize_density(history: np.ndarray, flagged_idx: int) -> int:
    """Count recent card-not-present transactions at unfamiliar merchants.

    The window covers the RECENT_AUTHORIZE_WINDOW rows before the flag plus
    the flagged row itself. A row counts when its entry-mode token equals
    ENTRY_CNP and its customer-merchant-count token is <= CMC_UNFAMILIAR + 1.
    Unlike the probe signal, amounts are not examined here, and neither is
    the device hash.

    A high count is what the type rule reads as SCAM_REDIRECTED: a customer
    authorizing many one-time payments to merchants they have not used.
    """
    first = max(0, flagged_idx - RECENT_AUTHORIZE_WINDOW)
    recent = history[first:flagged_idx + 1]
    is_cnp = recent[:, FEATURE_ENTRY_MODE] == ENTRY_CNP
    is_unfamiliar = recent[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR + 1
    return int((is_cnp & is_unfamiliar).sum())
# --- Stage rule ---
def classify_stage(history: np.ndarray, flagged_idx: int) -> int:
    """Derive the 5-class pattern stage for the flagged transaction.

    Rules, in priority order:
      * DORMANT      - signature is clean and neither a probe cluster nor
                       any large unfamiliar charge is present.
      * EXFILTRATION - two or more large unfamiliar charges at/after the
                       flag, with or without preceding probes.
      * MONETIZATION - three or more probes and the flagged tx itself is a
                       large charge at an unfamiliar merchant.
      * PROBING      - three or more probes but no big charge yet.
      * PRE_ATTACK   - everything else (isolated anomaly, no chain).
    """
    n_probes = signal_probe_density(history, flagged_idx)
    n_post = signal_post_attack_density(history, flagged_idx)
    amount_token = int(history[flagged_idx, FEATURE_AMOUNT])
    cmc_token = int(history[flagged_idx, FEATURE_CUSTOMER_MERCHANT_COUNT])
    looks_normal = signal_signature_clean(history, flagged_idx)

    # Nothing unusual anywhere: the upstream flag is likely a false positive.
    if looks_normal and n_probes == 0 and n_post == 0:
        return STAGE_DORMANT

    # Multiple big unfamiliar charges mean harvesting, probes or not.
    if n_post >= 2:
        return STAGE_EXFILTRATION

    # Without a probe cluster there is no attack chain to speak of.
    if n_probes < 3:
        return STAGE_PRE_ATTACK

    # Probe cluster present: has the attacker cashed in at the flag yet?
    flagged_is_big_unfamiliar = (
        amount_token >= AMOUNT_LARGE_THRESH
        and cmc_token <= CMC_UNFAMILIAR + 1
    )
    return STAGE_MONETIZATION if flagged_is_big_unfamiliar else STAGE_PROBING
# --- Type rule ---
def classify_type(history: np.ndarray, flagged_idx: int) -> int:
    """Derive the 4-class pattern type for the flagged transaction.

    The first rule that fires wins; later signals are not evaluated:
      1. novel device at the flag             -> ACCOUNT_TAKEOVER
      2. clean signature at the flag          -> DECLINED_LEGIT
      3. >= 5 recent CNP-to-unfamiliar tx     -> SCAM_REDIRECTED
      4. none of the above                    -> VICTIM_FRAUD
    """
    # Predicates are wrapped in lambdas so each signal is computed only if
    # every earlier rule declined.
    rules = (
        (lambda: signal_novel_device(history, flagged_idx),
         TYPE_ACCOUNT_TAKEOVER),
        (lambda: signal_signature_clean(history, flagged_idx),
         TYPE_DECLINED_LEGIT),
        (lambda: signal_recent_authorize_density(history, flagged_idx) >= 5,
         TYPE_SCAM_REDIRECTED),
    )
    for fires, label in rules:
        if fires():
            return label
    return TYPE_VICTIM_FRAUD
def classify_pattern(history: np.ndarray, flagged_idx: int) -> tuple[int, int]:
    """Return the ``(stage, type)`` label pair for the flagged transaction."""
    stage = classify_stage(history, flagged_idx)
    pattern_type = classify_type(history, flagged_idx)
    return stage, pattern_type
# --- Attribution ---
def attribution_for_pattern(
    history: np.ndarray,
    flagged_idx: int,
    stage: int,
) -> np.ndarray:
    """Build per-position attribution labels for a given stage.

    The result marks (with 1.0) the rows an investigator should look at
    first; every other row is 0.0. The flagged row is always marked.

    Additional marks by stage:
      * PROBING / MONETIZATION / EXFILTRATION: small-amount CNP rows in the
        PROBE_WINDOW rows before the flag.
      * MONETIZATION / EXFILTRATION: large-amount unfamiliar-merchant rows
        in the POST_ATTACK_WINDOW rows starting at the flag.
      * PRE_ATTACK: rows among the 8 before the flag whose CVV token is
        CVV_NO_MATCH or whose AVS token is AVS_NO_MATCH.
      * DORMANT: nothing beyond the flagged row.

    Args:
        history: 2-D array of transaction feature tokens, one row per tx.
        flagged_idx: Row index of the flagged transaction.
        stage: One of the STAGE_* ids.

    Returns:
        float32 array with one entry per history row (64 for the standard
        64-row history).
    """
    # Size the output from the history itself; a fixed 64 made any flagged
    # position >= 64 raise IndexError.
    seq_len = history.shape[0]
    attr = np.zeros(seq_len, dtype=np.float32)
    attr[flagged_idx] = 1.0

    if stage in (STAGE_PROBING, STAGE_MONETIZATION, STAGE_EXFILTRATION):
        start = max(0, flagged_idx - PROBE_WINDOW)
        window = history[start:flagged_idx]
        probes = (
            (window[:, FEATURE_ENTRY_MODE] == ENTRY_CNP)
            & (window[:, FEATURE_AMOUNT] <= AMOUNT_SMALL_THRESH)
        )
        # attr[start:flagged_idx] is a view, so masked assignment writes
        # straight into attr.
        attr[start:flagged_idx][probes] = 1.0

    if stage in (STAGE_EXFILTRATION, STAGE_MONETIZATION):
        end = min(seq_len, flagged_idx + POST_ATTACK_WINDOW)
        window = history[flagged_idx:end]
        big_unfamiliar = (
            (window[:, FEATURE_AMOUNT] >= AMOUNT_LARGE_THRESH)
            & (window[:, FEATURE_CUSTOMER_MERCHANT_COUNT] <= CMC_UNFAMILIAR + 1)
        )
        attr[flagged_idx:end][big_unfamiliar] = 1.0

    if stage == STAGE_PRE_ATTACK:
        # Mark any preceding tx with a verification anomaly.
        start = max(0, flagged_idx - 8)
        window = history[start:flagged_idx]
        verification_failed = (
            (window[:, FEATURE_CVV] == CVV_NO_MATCH)
            | (window[:, FEATURE_AVS] == AVS_NO_MATCH)
        )
        attr[start:flagged_idx][verification_failed] = 1.0

    return attr
# --- Context text bank ---
# Maps a tone name to the str.format templates available for that tone.
# Every template uses exactly two placeholders, filled by
# _build_context_text: {flagged_idx} (the flagged row index) and
# {upstream} (the upstream detector score, rendered with :.2f).
CONTEXT_TEMPLATES: dict[str, list[str]] = {
    "formal": [
        "Upstream fraud detector flagged transaction #{flagged_idx} at score {upstream:.2f}. Please assess pattern stage and type.",
        "Transaction {flagged_idx} flagged for review (detector score {upstream:.2f}). Recommend pattern classification.",
        "Investigation requested for transaction {flagged_idx}. Upstream model score {upstream:.2f}. Stage + type?",
    ],
    "casual": [
        "Hey, tx {flagged_idx} pinged the fraud detector at {upstream:.2f}. What's going on?",
        "Got a flag on transaction {flagged_idx}, detector says {upstream:.2f}. Pattern call?",
        "Fraud queue: tx {flagged_idx} at {upstream:.2f}. Take a look?",
    ],
    "terse": [
        "tx{flagged_idx} flagged @ {upstream:.2f}. Classify.",
        "Flag tx {flagged_idx}, score {upstream:.2f}.",
        "Fraud tx{flagged_idx} {upstream:.2f}.",
    ],
    "detailed": [
        "The upstream fraud detector escalated transaction {flagged_idx} with a score of {upstream:.2f}. The customer's recent history shows mixed signals — request model classification on pattern stage and underlying type.",
        "Investigator review on tx {flagged_idx} (detector {upstream:.2f}). Need stage (probing/monetization/etc.) plus underlying type (victim/takeover/scam/false-positive).",
    ],
    "urgent": [
        "URGENT: tx {flagged_idx} flagged at {upstream:.2f}. Pre-decline window closing — classify now.",
        "Time-sensitive fraud queue: tx {flagged_idx}, score {upstream:.2f}. Need stage + type asap.",
    ],
}
def _build_context_text(
    flagged_idx: int,
    rng: random.Random,
) -> tuple[str, str, dict[str, Any]]:
    """Produce a randomized analyst note for a flagged transaction.

    Draws, in this order, an upstream score uniformly from [0.55, 0.95]
    (rounded to two decimals), a tone from CONTEXT_TEMPLATES, and one
    template of that tone, then fills the template in.

    Returns:
        ``(text, tone, context_vars)`` where ``context_vars`` records the
        flagged index and the sampled upstream score.
    """
    upstream_score = round(rng.uniform(0.55, 0.95), 2)
    tone = rng.choice(list(CONTEXT_TEMPLATES))
    chosen_template = rng.choice(CONTEXT_TEMPLATES[tone])
    context_vars = {"flagged_idx": flagged_idx, "upstream_score": upstream_score}
    rendered = chosen_template.format(
        flagged_idx=flagged_idx,
        upstream=upstream_score,
    )
    return rendered, tone, context_vars
# --- Reasoning ---
def build_reasoning_text(
    history: np.ndarray,
    flagged_idx: int,
    stage: int,
    pattern_type: int,
) -> str:
    """Render templated reasoning grounded in the cross-position signals.

    The text has four sentences/paragraphs joined by single spaces: the
    verdict, a summary of all five signal values, a stage-specific
    explanation with a recommendation, and a type-specific explanation.

    Args:
        history: 2-D array of transaction feature tokens, one row per tx.
        flagged_idx: Row index of the flagged transaction.
        stage: STAGE_* id; any value without a dedicated paragraph falls
            back to the pre-attack wording.
        pattern_type: TYPE_* id; any value without a dedicated paragraph
            falls back to the declined_legit wording.

    Returns:
        The reasoning string.
    """
    probe_count = signal_probe_density(history, flagged_idx)
    post_count = signal_post_attack_density(history, flagged_idx)
    novel_device = signal_novel_device(history, flagged_idx)
    sig_clean = signal_signature_clean(history, flagged_idx)
    recent_auth = signal_recent_authorize_density(history, flagged_idx)
    stage_name = STAGE_NAMES[stage]
    type_name = TYPE_NAMES[pattern_type]
    parts: list[str] = []
    parts.append(
        f"Pattern verdict: stage={stage_name}, type={type_name}."
    )
    # Window sizes are interpolated from the constants so the text cannot
    # drift from the windows the signals actually use.
    parts.append(
        f"Cross-position signals — probe-cluster density: {probe_count} small-CNP "
        f"in the {PROBE_WINDOW}-tx window before tx{flagged_idx}, "
        f"post-attack density: {post_count} large unfamiliar charges around "
        f"the flag, novel-device: {novel_device}, signature-clean: {sig_clean}, "
        f"recent-authorize density: {recent_auth} CNP-to-unfamiliar in last "
        f"{RECENT_AUTHORIZE_WINDOW}."
    )
    if stage == STAGE_PROBING:
        parts.append(
            "Probing pattern is consistent with card-testing: small "
            "card-not-present charges preceding the flag, attacker confirming "
            "the card works before escalation. Recommend containment + step-up auth."
        )
    elif stage == STAGE_MONETIZATION:
        parts.append(
            "Monetization pattern: the probe phase succeeded and the attacker "
            "is converting access into value. The flagged transaction is the "
            "first big charge after probes. Recommend immediate block + customer outreach."
        )
    elif stage == STAGE_EXFILTRATION:
        parts.append(
            "Exfiltration pattern: multiple large unfamiliar charges around "
            "the flag indicate the attack is mature. Recommend full card "
            "freeze + customer notification within the hour."
        )
    elif stage == STAGE_DORMANT:
        parts.append(
            "Dormant pattern: the flagged transaction sits inside the customer's "
            "normal signature. Upstream detector is likely a false-positive. "
            "Recommend release with low priority follow-up."
        )
    else:
        parts.append(
            "Pre-attack stage: a single anomalous transaction with no chain "
            "evidence yet. Recommend step-up auth and watch the next 24 hours."
        )
    if pattern_type == TYPE_ACCOUNT_TAKEOVER:
        parts.append(
            "Underlying type is account_takeover — the device fingerprint at "
            "the flag is one the customer has not used elsewhere in this history. "
            "Treat as credential / device compromise."
        )
    elif pattern_type == TYPE_VICTIM_FRAUD:
        parts.append(
            "Underlying type is victim_fraud — device matches the customer's "
            "own, but the transaction pattern is anomalous. Likely the customer "
            "was tricked into authorizing or shared credentials."
        )
    elif pattern_type == TYPE_SCAM_REDIRECTED:
        parts.append(
            "Underlying type is scam_redirected — recent history shows the "
            "customer authorizing many one-time payments to unfamiliar merchants. "
            "Pattern is consistent with romance / impostor scam."
        )
    else:
        parts.append(
            "Underlying type is declined_legit — the flagged tx looks like a "
            "normal customer purchase. The upstream score is plausibly a "
            "false-positive of the rules-based detector."
        )
    return " ".join(parts)
# --- Example dataclass + corpus generation ---
@dataclass
class FraudPatternExample:
    """One synthesized fraud-pattern example, as built by synthesize_one."""

    # Index of the customer's history within the histories array.
    customer_idx: int
    # Row index of the flagged transaction inside that history.
    flagged_idx: int
    # Rendered analyst note (case/space-perturbed when is_adversarial).
    context_text: str
    # STAGE_* id produced by classify_stage.
    stage_label: int
    # TYPE_* id produced by classify_type.
    type_label: int
    # Per-position 0.0/1.0 labels from attribution_for_pattern, as a list.
    attribution_labels: list[float]
    # Templated explanation from build_reasoning_text.
    reasoning_text: str
    # Key of CONTEXT_TEMPLATES the note was drawn from.
    tone: str
    # True when the context text went through the adversarial perturbation.
    is_adversarial: bool
    # Values substituted into the template (flagged_idx, upstream_score).
    context_vars: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        """Return the example as a plain dict via dataclasses.asdict."""
        return asdict(self)
def _apply_adversarial_perturbation(text: str, rng: random.Random) -> str:
    """Return ``text`` with light, meaning-preserving surface noise.

    Each alphabetic character has its case flipped with probability 0.10
    (one RNG draw per alphabetic character, none for other characters).
    Afterwards, with probability 0.3, the first space in the original text
    is doubled.
    """
    perturbed: list[str] = []
    for ch in text:
        if ch.isalpha() and rng.random() < 0.10:
            ch = ch.swapcase()
        perturbed.append(ch)
    if rng.random() < 0.3 and " " in text:
        perturbed.insert(text.index(" "), " ")
    return "".join(perturbed)
def synthesize_one(
    history: np.ndarray,
    customer_idx: int,
    flagged_idx: int,
    rng: random.Random,
    adversarial: bool = False,
) -> FraudPatternExample:
    """Assemble one labelled example for a (customer, flagged position) pair.

    Labels, attribution and reasoning are computed deterministically from
    ``history``; only the context text consumes ``rng``. When
    ``adversarial`` is set, the context text is additionally perturbed.
    """
    stage_label = classify_stage(history, flagged_idx)
    type_label = classify_type(history, flagged_idx)
    attribution = attribution_for_pattern(history, flagged_idx, stage_label)

    note, tone, context_vars = _build_context_text(flagged_idx, rng)
    note = _apply_adversarial_perturbation(note, rng) if adversarial else note

    fields: dict[str, Any] = {
        "customer_idx": customer_idx,
        "flagged_idx": flagged_idx,
        "context_text": note,
        "stage_label": stage_label,
        "type_label": type_label,
        "attribution_labels": attribution.tolist(),
        "reasoning_text": build_reasoning_text(
            history, flagged_idx, stage_label, type_label,
        ),
        "tone": tone,
        "is_adversarial": adversarial,
        "context_vars": context_vars,
    }
    return FraudPatternExample(**fields)
def generate_corpus(
    histories: np.ndarray,
    train_indices: np.ndarray,
    target_size: int = 4000,
    seed: int = 42,
    adversarial_fraction: float = 0.10,
) -> list[FraudPatternExample]:
    """Broad corpus: random customer + random flagged_idx, accept whatever
    label falls out. Targeted slices add rare-class density separately.

    Args:
        histories: Array indexed by customer, each entry a 2-D history.
        train_indices: Customer indices eligible for sampling.
        target_size: Number of examples to generate.
        seed: Seed for both the stdlib and NumPy generators.
        adversarial_fraction: Upper bound, as a fraction of ``target_size``,
            on how many examples get a perturbed context text.

    Returns:
        ``target_size`` examples in shuffled order.
    """
    rng = random.Random(seed)
    np_rng = np.random.RandomState(seed)
    examples: list[FraudPatternExample] = []
    n_adv_target = int(target_size * adversarial_fraction)
    n_adv = 0

    # Adversarial candidates are drawn at a per-example probability and
    # capped at n_adv_target. The historical fixed rate of 0.15 is kept for
    # fractions up to the default so seeded output is unchanged; larger
    # fractions scale the rate so the cap can actually be reached.
    if adversarial_fraction <= 0.10:
        adv_draw_prob = 0.15
    else:
        adv_draw_prob = min(1.0, 1.5 * adversarial_fraction)

    # Every iteration yields exactly one example, so no retry budget is
    # needed.
    while len(examples) < target_size:
        customer_idx = int(np_rng.choice(train_indices))
        history = histories[customer_idx]
        # Uniform over positions 8 .. len-1 (for the standard 64-row history
        # this is randint(8, 64)). The lower bound leaves room for the
        # look-back windows; it is relaxed for very short histories.
        seq_len = history.shape[0]
        flagged_idx = int(np_rng.randint(min(8, seq_len - 1), seq_len))
        # Short-circuit keeps rng untouched once the cap is reached.
        adversarial = (n_adv < n_adv_target) and (rng.random() < adv_draw_prob)
        example = synthesize_one(
            history, customer_idx, flagged_idx, rng, adversarial=adversarial,
        )
        examples.append(example)
        if adversarial:
            n_adv += 1
    # Adversarial examples are front-loaded by the cap; shuffle to spread
    # them through the corpus.
    rng.shuffle(examples)
    return examples
def write_jsonl(
    examples: list[FraudPatternExample],
    output_path: Path | str,
) -> None:
    """Write ``examples`` to ``output_path`` as JSON Lines.

    Each example is serialized from its ``to_dict()`` result onto its own
    line. An existing file at ``output_path`` is overwritten.

    Args:
        examples: Examples to serialize, in output order.
        output_path: Destination file path.
    """
    output_path = Path(output_path)
    # Explicit encoding: the platform default is locale-dependent.
    with output_path.open("w", encoding="utf-8") as f:
        for example in examples:
            f.write(json.dumps(example.to_dict()) + "\n")