cdotsanghvi's picture
add multi-head demo as 4th-6th tabs; restore Why Liquid + Integration
083b138
Raw
History Blame Contribute Delete
42.4 kB
"""Synthetic transaction data generator.
Design features:
1. Merchant catalog: 10K merchants with fixed MCC assignments and Zipf
popularity. Creates realistic merchant-MCC correlation the model can learn.
2. MCC-conditional amounts: transaction amounts are sampled from MCC-specific
distributions. Restaurants produce $15-80, electronics $200-2000, etc.
3. Profile blending: each customer is a mixture of primary + secondary
profile, not a pure archetype. Continuous behavioral diversity.
4. MCC-conditional entry_mode: online merchants -> CNP, restaurants -> tap/chip.
5. MCC-conditional hour: restaurants peak at lunch/dinner, transit at commute.
6. MCC-conditional is_recurring: subscriptions/streaming are high, restaurants low.
7. Customer_tenure-card_product correlation: new customers get basic cards.
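8. Country-amount correlation: international transactions tend higher value.
9. Temporal autocorrelation: consecutive transactions may repeat the previous
merchant/MCC, with an amount close to the previous one.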
Output files: token_ids.npy (N,T,F), sequence_labels.npy (N,),
transaction_labels.npy (N,T), amount_range_labels.npy (N,T), split_indices.npz,
tokenizer_state.json and fingerprint.txt. These outputs feed the training
pipeline, which is configured via schema.yaml and model.yaml.
"""
from __future__ import annotations
import hashlib
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import numpy as np
import yaml
from src.data.schema import SchemaConfig, load_schema, NULL_TOKEN, VALUES_START
from src.data.tokenizer import TransactionTokenizer
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Amount range bins: the 256 fine-grained amount buckets are grouped into 16
# coarse ranges of 16 consecutive buckets each. The coarse range is the target
# of the amount_range fine-tuning head (which replaced the 259-class amount
# head). Dollar labels are approximate because the fine buckets are quantile
# based and therefore non-linear in dollars.
# ---------------------------------------------------------------------------
NUM_AMOUNT_RANGES: int = 16
AMOUNT_RANGE_WIDTH: int = 256 // NUM_AMOUNT_RANGES  # 256 buckets / 16 ranges = 16
AMOUNT_RANGE_LABELS: dict[int, str] = dict(
    enumerate(
        [
            "$0-5",
            "$5-15",
            "$15-30",
            "$30-50",
            "$50-80",
            "$80-120",
            "$120-175",
            "$175-250",
            "$250-375",
            "$375-550",
            "$550-800",
            "$800-1.2K",
            "$1.2K-2K",
            "$2K-3.5K",
            "$3.5K-7.5K",
            "$7.5K+",
        ]
    )
)


def amount_bucket_to_range(bucket: np.ndarray) -> np.ndarray:
    """Collapse fine amount buckets (0-255) into coarse range ids (0-15).

    Each range spans ``AMOUNT_RANGE_WIDTH`` consecutive buckets. Inputs outside
    the bucket domain are saturated into the first/last range rather than
    rejected.
    """
    coarse = bucket // AMOUNT_RANGE_WIDTH
    return np.minimum(np.maximum(coarse, 0), NUM_AMOUNT_RANGES - 1)
# ---------------------------------------------------------------------------
# MCC-conditional amount distributions (mean_bucket, std_bucket).
# 256 amount buckets spanning $0.01-$25K in quantile space. These bucket-index
# ranges are calibrated for behavioral plausibility across MCC categories.
# ---------------------------------------------------------------------------
MCC_AMOUNT_MAP: dict[tuple[int, int], tuple[float, float]] = {
(0, 3): (25.0, 12.0), # transit, coffee, gas: $3-20
(4, 9): (45.0, 20.0), # convenience, misc low: $8-50
(10, 14): (160.0, 55.0), # airlines, hotels, car rental: $80-800
(15, 19): (50.0, 30.0), # other transport: $10-100
(20, 24): (80.0, 45.0), # online retail general: $15-200
(25, 29): (55.0, 30.0), # online retail niche: $10-120
(30, 34): (55.0, 25.0), # restaurants, bars: $15-100
(35, 39): (40.0, 20.0), # other food service: $8-60
(40, 44): (185.0, 55.0), # luxury, dept stores, jewelry: $80-2000
(45, 49): (70.0, 35.0), # fashion retail: $20-200
(50, 54): (110.0, 50.0), # office supplies, wholesale: $30-500
(55, 59): (90.0, 50.0), # professional services: $30-400
(60, 64): (50.0, 22.0), # grocery, pharmacy: $12-120
(65, 69): (55.0, 30.0), # healthcare, fitness: $15-150
(70, 74): (22.0, 12.0), # fast food, entertainment: $4-25
(75, 79): (35.0, 20.0), # streaming, digital goods: $5-50
(80, 84): (30.0, 22.0), # subscriptions, SaaS: $5-80
(85, 89): (100.0, 55.0), # automotive: $25-500
(90, 94): (95.0, 55.0), # home improvement: $20-600
(95, 99): (75.0, 50.0), # education, government: $15-500
}
def _mcc_to_amount_params(mcc_value: int) -> tuple[float, float]:
"""Look up amount distribution params for an MCC value."""
for (lo, hi), params in MCC_AMOUNT_MAP.items():
if lo <= mcc_value <= hi:
return params
return (60.0, 40.0)
# ---------------------------------------------------------------------------
# MCC-conditional entry_mode weights.
# Indices: 0=card_present, 1=card_not_present, 2=contactless, 3=chip, 4=manual
# ---------------------------------------------------------------------------
MCC_ENTRY_MODE: dict[tuple[int, int], list[float]] = {
(0, 3): [2, 1, 5, 2, 0], # transit/coffee: contactless heavy
(10, 14): [1, 6, 1, 1, 0], # travel: mostly online bookings
(20, 29): [0, 8, 0, 0, 1], # online retail: card-not-present
(30, 39): [3, 0, 4, 3, 0], # restaurants: contactless/chip
(40, 49): [3, 2, 3, 2, 0], # fashion/luxury: mixed
(50, 59): [2, 4, 1, 1, 1], # business: mix of online + in-person
(60, 69): [3, 1, 4, 3, 0], # grocery/pharmacy: in-person
(70, 74): [2, 1, 5, 2, 0], # fast food: contactless
(75, 84): [0, 9, 0, 0, 0], # digital/subscriptions: all online
(85, 99): [3, 2, 2, 3, 0], # auto/home/edu: mixed
}
def _mcc_to_entry_mode_weights(mcc_value: int) -> list[float]:
"""Look up entry_mode distribution for an MCC value."""
for (lo, hi), weights in MCC_ENTRY_MODE.items():
if lo <= mcc_value <= hi:
return weights
return [2, 3, 2, 2, 1]
# ---------------------------------------------------------------------------
# MCC-conditional hour-of-day distributions.
# Format: inclusive (lo, hi) MCC range -> (peak_hours, peak_weight). Every
# hour of a 24-slot day starts with weight 1; hours in peak_hours get
# peak_weight instead. DataGenerator re-draws the hour from this
# distribution for about half of the matching transactions, so the
# profile-sampled hour (and per-customer variation) survives for the rest.
# MCCs not covered by any range keep their profile-sampled hour.
# ---------------------------------------------------------------------------
MCC_HOUR_PEAKS: dict[tuple[int, int], tuple[list[int], float]] = {
    (0, 3): ([7, 8, 9, 17, 18], 5.0),  # transit/coffee: commute hours
    (10, 14): ([9, 10, 14, 15, 20], 2.5),  # travel: business hours + evening
    (20, 29): ([10, 11, 20, 21, 22], 3.0),  # online: late morning + evening
    (30, 31): ([12, 13, 18, 19, 20], 6.0),  # casual dining: lunch + dinner
    (32, 34): ([19, 20, 21, 22, 23], 5.0),  # fine dining/bars: evening + late
    (35, 39): ([11, 12, 13, 17, 18], 4.0),  # other food: lunch + early dinner
    (60, 64): ([10, 11, 16, 17, 18], 4.0),  # grocery/pharmacy: mid-morning + after work
    (70, 71): ([12, 13, 18, 19, 21], 4.5),  # fast food: lunch + dinner + late
    (75, 84): ([0, 1, 6, 7], 2.0),  # digital/subs: mild peaks around midnight and early morning (auto-renewals)
}
# ---------------------------------------------------------------------------
# MCC-conditional is_recurring probability.
# Format: inclusive (lo, hi) MCC range -> P(is_recurring == 1). DataGenerator
# applies this to about 70% of matching transactions; the rest keep the
# profile-sampled value.
# ---------------------------------------------------------------------------
MCC_RECURRING_RATE: dict[tuple[int, int], float] = {
    (0, 3): 0.20,  # transit passes
    (10, 14): 0.05,  # travel: rarely recurring
    (20, 29): 0.12,  # online retail: occasional subscriptions
    (30, 39): 0.03,  # restaurants: very rarely recurring
    (40, 49): 0.02,  # luxury/fashion: almost never
    (50, 54): 0.15,  # office supplies: recurring orders
    (60, 64): 0.08,  # grocery: delivery subscriptions
    (65, 69): 0.25,  # fitness/health: gym memberships
    (70, 74): 0.05,  # fast food: rarely
    (75, 79): 0.60,  # streaming: almost always recurring
    (80, 84): 0.70,  # SaaS/subscriptions: by definition
    (85, 94): 0.05,  # auto/home: rarely
    (95, 99): 0.10,  # education/gov: tuition installments
}
# ---------------------------------------------------------------------------
# Customer tenure -> card_product correlation.
# New customers (low tenure bucket) skew toward basic cards.
# Established customers (high tenure bucket) skew toward premium/rewards.
# Format: tenure_bucket_threshold -> card_product weights (10 products).
# Entries must be in ascending threshold order: each band is inclusive and
# starts one above the previous threshold (the first band covers everything
# <= its threshold). Weights are unnormalised and are truncated to the
# schema's card_product cardinality at use; DataGenerator applies this to
# about 60% of transactions.
# ---------------------------------------------------------------------------
TENURE_CARD_PRODUCT: list[tuple[int, list[float]]] = [
    (2, [6, 3, 1, 0, 0, 0, 0, 2, 1, 0]),  # 0-2: new -> basic/prepaid/virtual
    (5, [3, 4, 3, 1, 0, 1, 1, 1, 1, 0]),  # 3-5: moderate -> basic/rewards
    (7, [2, 3, 4, 2, 1, 1, 2, 0, 1, 1]),  # 6-7: established -> rewards/business
    (99, [1, 2, 3, 3, 2, 1, 2, 0, 0, 1]),  # 8-99: long-tenure -> premium/platinum
]
# ---------------------------------------------------------------------------
# Merchant catalog
# ---------------------------------------------------------------------------
@dataclass
class MerchantCatalog:
    """Fixed merchant universe: MCC, popularity and amount profile per merchant.

    Every per-merchant array is indexed by merchant id.
    """

    merchant_ids: np.ndarray  # (num_merchants,) int32; built as arange(num_merchants)
    mcc_assignments: np.ndarray  # (num_merchants,) int32; the single MCC of each merchant
    popularity: np.ndarray  # (num_merchants,) float64, sums to 1
    amount_mean: np.ndarray  # (num_merchants,) float64, amount bucket mean (built in [1, 254])
    amount_std: np.ndarray  # (num_merchants,) float64, amount bucket std (built in [3, 7))
    mcc_to_merchants: dict[int, np.ndarray]  # MCC -> int32 array of merchant indices

    @classmethod
    def build(
        cls,
        num_merchants: int,
        num_mccs: int,
        rng: np.random.Generator,
        zipf_exponent: float = 1.2,
    ) -> MerchantCatalog:
        """Build a catalog with MCC assignments, Zipf popularity and amounts.

        Each merchant gets a characteristic amount (mean, std) derived from
        its MCC's range with per-merchant variation. The mean is the MCC mean
        plus N(0, 0.3 * MCC std), clipped to [1, 254]. The std is drawn
        uniformly from [3, 7) buckets, kept tight so the model can learn
        merchant -> amount.

        Args:
            num_merchants: Catalog size.
            num_mccs: Number of MCC values; MCC ids are ``0..num_mccs-1``.
            rng: Generator consumed for every random draw.
            zipf_exponent: Exponent of the rank -> popularity power law.

        Returns:
            A fully populated catalog.
        """
        merchant_ids = np.arange(num_merchants, dtype=np.int32)

        # Lay merchants out in contiguous per-MCC blocks sized by category, then
        # shuffle so merchant ids carry no information about their MCC.
        mcc_merchant_counts = _sample_mcc_merchant_counts(
            num_merchants, num_mccs, rng,
        )
        mcc_assignments = np.zeros(num_merchants, dtype=np.int32)
        offset = 0
        for mcc, count in enumerate(mcc_merchant_counts):
            mcc_assignments[offset : offset + count] = mcc
            offset += count
        rng.shuffle(mcc_assignments)

        # Zipf weights over ranks, shuffled so popularity is independent of id/MCC.
        raw_pop = 1.0 / np.arange(1, num_merchants + 1, dtype=np.float64) ** zipf_exponent
        rng.shuffle(raw_pop)
        popularity = raw_pop / raw_pop.sum()

        # Draws are interleaved per merchant (normal, then uniform); keep this
        # order so a given seed keeps producing the same catalog.
        amount_mean = np.zeros(num_merchants, dtype=np.float64)
        amount_std = np.zeros(num_merchants, dtype=np.float64)
        for i in range(num_merchants):
            mcc_mean, mcc_std = _mcc_to_amount_params(int(mcc_assignments[i]))
            amount_mean[i] = np.clip(
                mcc_mean + rng.normal(0, mcc_std * 0.3), 1.0, 254.0,
            )
            amount_std[i] = rng.uniform(3.0, 7.0)

        mcc_to_merchants: dict[int, np.ndarray] = {
            mcc: np.flatnonzero(mcc_assignments == mcc).astype(np.int32)
            for mcc in range(num_mccs)
        }
        return cls(
            merchant_ids=merchant_ids,
            mcc_assignments=mcc_assignments,
            popularity=popularity,
            amount_mean=amount_mean,
            amount_std=amount_std,
            mcc_to_merchants=mcc_to_merchants,
        )

    def sample_merchants_for_mccs(
        self,
        mcc_values: np.ndarray,
        rng: np.random.Generator,
    ) -> np.ndarray:
        """Draw one merchant per MCC value, popularity-weighted within the MCC.

        An MCC that is unknown to the catalog or has no merchants (shouldn't
        happen with a properly built catalog) falls back to popularity-weighted
        sampling over the whole catalog.

        Args:
            mcc_values: Integer MCC values of any shape.
            rng: Generator used for the draws.

        Returns:
            int32 merchant ids with the same shape as ``mcc_values``.
        """
        flat_mccs = mcc_values.ravel()
        result = np.zeros(len(flat_mccs), dtype=np.int32)
        for mcc in np.unique(flat_mccs):
            positions = np.flatnonzero(flat_mccs == mcc)
            merchants = self.mcc_to_merchants.get(int(mcc))
            if merchants is None or len(merchants) == 0:
                # Global fallback honours the catalog popularity, as documented.
                global_p = self.popularity / self.popularity.sum()
                result[positions] = rng.choice(
                    self.merchant_ids, size=len(positions), p=global_p,
                )
                continue
            pop = self.popularity[merchants]
            result[positions] = rng.choice(
                merchants, size=len(positions), p=pop / pop.sum(),
            )
        return result.reshape(mcc_values.shape)
def _sample_mcc_merchant_counts(
num_merchants: int, num_mccs: int, rng: np.random.Generator,
) -> np.ndarray:
"""Distribute merchants across MCCs with realistic category sizes.
Restaurants/retail get more merchants (fragmented industries).
Airlines/utilities get fewer (concentrated industries).
"""
base_weights = np.ones(num_mccs, dtype=np.float64)
category_weights = {
(0, 3): 0.6, # transit/coffee: moderate
(4, 9): 0.8,
(10, 14): 0.3, # airlines/hotels: concentrated
(15, 19): 0.5,
(20, 24): 1.5, # online retail: many merchants
(25, 29): 1.0,
(30, 34): 2.0, # restaurants: very fragmented
(35, 39): 1.2,
(40, 44): 0.5, # luxury/dept: fewer
(45, 49): 1.3, # fashion: many
(50, 54): 0.8,
(55, 59): 0.7,
(60, 64): 1.0, # grocery: moderate
(65, 69): 0.8,
(70, 74): 1.5, # fast food: many
(75, 79): 0.6,
(80, 84): 0.5, # subscriptions: concentrated
(85, 89): 0.7,
(90, 94): 0.9,
(95, 99): 0.4, # education/gov: concentrated
}
for (lo, hi), w in category_weights.items():
lo_clamped = min(lo, num_mccs - 1)
hi_clamped = min(hi, num_mccs - 1)
base_weights[lo_clamped : hi_clamped + 1] = w
noise = rng.uniform(0.8, 1.2, size=num_mccs)
weights = base_weights * noise
weights /= weights.sum()
counts = (weights * num_merchants).astype(np.int32)
remainder = num_merchants - counts.sum()
if remainder > 0:
top_idx = np.argsort(weights)[-remainder:]
counts[top_idx] += 1
elif remainder < 0:
top_idx = np.argsort(counts)[remainder:]
counts[top_idx] -= 1
assert counts.sum() == num_merchants
return counts
# ---------------------------------------------------------------------------
# Generated dataset
# ---------------------------------------------------------------------------
@dataclass
class GeneratedDataset:
    """Output of the generation pipeline, as produced by ``DataGenerator.generate``.

    N = number of sequences, T = transactions per sequence, F = features.
    """
    token_ids: np.ndarray  # (N, T, F) int16 token ids (raw value + VALUES_START, or NULL_TOKEN)
    sequence_labels: np.ndarray  # (N,) int8, 1 if any transaction in the sequence is fraud
    transaction_labels: np.ndarray  # (N, T) int8 per-transaction fraud flags
    amount_range_labels: np.ndarray  # (N, T) int8 coarse amount range 0-15, taken before NULL injection
    split_indices: dict[str, np.ndarray]  # "train" / "val" / "test" -> sorted sequence indices
    fingerprint: str  # SHA-256 hex digest of configs, tokenizer state, splits and seed
# ---------------------------------------------------------------------------
# Generator
# ---------------------------------------------------------------------------
class DataGenerator:
    """Enhanced data generator with merchant catalog and correlated features.

    Loads the schema plus the profile, fraud-pattern and null-rate YAML
    configs, builds a :class:`MerchantCatalog`, and exposes :meth:`generate`
    (build a dataset in memory) and :meth:`save_dataset` (persist it). Every
    random draw goes through the single ``self.rng`` seeded at construction.
    """

    def __init__(
        self,
        schema_path: str | Path = "data/schema.yaml",
        profiles_path: str | Path = "data/profiles.yaml",
        fraud_path: str | Path = "data/fraud_patterns.yaml",
        null_rates_path: str | Path = "data/null_rates.yaml",
        seed: int = 42,
        blend_fraction: float = 0.25,
    ) -> None:
        """Load the configs and build the merchant catalog.

        Args:
            schema_path: Feature schema, loaded with ``load_schema``.
            profiles_path: YAML whose ``profiles`` list drives base sampling.
            fraud_path: YAML with ``overall_fraud_rate`` and ``patterns``.
            null_rates_path: YAML with a ``null_rates`` mapping.
            seed: Seed for ``self.rng``; also hashed into the fingerprint.
            blend_fraction: Per-transaction probability that feature values
                come from the customer's secondary profile.
        """
        self.schema = load_schema(schema_path)
        self._profiles_raw = self._load_yaml(profiles_path)
        self._fraud_raw = self._load_yaml(fraud_path)
        self._null_raw = self._load_yaml(null_rates_path)
        # Every config file is hashed into the dataset fingerprint.
        self._config_paths = [
            Path(p) for p in [schema_path, profiles_path, fraud_path, null_rates_path]
        ]
        self.rng = np.random.default_rng(seed)
        self._seed = seed
        self._blend_fraction = blend_fraction
        # em.values appears to map value -> name; NULL rules are written with
        # entry_mode names, so keep the inverse lookup.
        em = self.schema.get_feature("entry_mode")
        self._entry_mode_name_to_val: dict[str, int] = {}
        if em.values:
            self._entry_mode_name_to_val = {v: k for k, v in em.values.items()}
        n_merchants = self.schema.get_feature("merchant_id").num_values
        n_mccs = self.schema.get_feature("mcc").num_values
        # Built first: it consumes RNG draws before any feature sampling.
        self.catalog = MerchantCatalog.build(n_merchants, n_mccs, self.rng)
        logger.info(
            "Built merchant catalog: %d merchants across %d MCCs",
            n_merchants, n_mccs,
        )

    @staticmethod
    def _load_yaml(path: str | Path) -> dict[str, Any]:
        """Parse a YAML config with ``yaml.safe_load`` (UTF-8, no object construction)."""
        with open(path, encoding="utf-8") as fh:
            return yaml.safe_load(fh)

    # --- Distribution sampling ---

    def _sample_feature(
        self, config: dict[str, Any], num_values: int, shape: tuple[int, int],
    ) -> np.ndarray:
        """Draw int32 values of ``shape`` from a profile distribution config.

        ``config["distribution"]`` selects the sampler:

        - ``uniform``: integers in ``[0, num_values)``.
        - ``weights``: categorical over ``range(len(config["weights"]))``,
          normalised (not clipped to ``num_values``).
        - ``peaks``: weight 1 everywhere, ``config["peak_weight"]`` at each
          in-range index listed in ``config["peaks"]``.
        - ``normal``: rounded N(mean_bucket, std_bucket) clipped to
          ``[0, num_values - 1]``; std is floored at 0.1.
        - ``bernoulli``: 1 with probability ``config["p_true"]``, else 0.
        - ``concentrated``: per-sequence preferred values, see
          :meth:`_sample_concentrated`.

        Raises:
            ValueError: for an unknown distribution name.
        """
        dist = config["distribution"]
        if dist == "uniform":
            return self.rng.integers(0, num_values, size=shape, dtype=np.int32)
        if dist == "weights":
            w = np.array(config["weights"], dtype=np.float64)
            w /= w.sum()
            return self.rng.choice(len(w), size=shape, p=w).astype(np.int32)
        if dist == "peaks":
            w = np.ones(num_values, dtype=np.float64)
            for p in config["peaks"]:
                if 0 <= p < num_values:
                    w[p] = config["peak_weight"]
            w /= w.sum()
            return self.rng.choice(num_values, size=shape, p=w).astype(np.int32)
        if dist == "normal":
            mean = float(config["mean_bucket"])
            std = max(float(config["std_bucket"]), 0.1)
            vals = self.rng.normal(mean, std, size=shape)
            return np.clip(np.round(vals), 0, num_values - 1).astype(np.int32)
        if dist == "bernoulli":
            return (self.rng.random(size=shape) < config["p_true"]).astype(np.int32)
        if dist == "concentrated":
            return self._sample_concentrated(config, num_values, shape)
        raise ValueError(f"Unknown distribution type: {dist}")

    def _sample_concentrated(
        self, config: dict[str, Any], num_values: int, shape: tuple[int, int],
    ) -> np.ndarray:
        """Sample from a per-sequence concentrated distribution with Zipf skew.

        Each sequence (row of ``shape``) gets ``top_n`` distinct preferred
        values. With probability ``concentration`` a transaction picks from
        that set, where rank r has weight ``1 / r**zipf_exponent`` (default
        exponent 1.0); otherwise it draws uniformly over ``num_values``. This
        gives a learnable frequency ranking within each sequence history.
        """
        top_n = min(config["top_n"], num_values)
        concentration = config["concentration"]
        zipf_exp = config.get("zipf_exponent", 1.0)
        n_seq, n_tx = shape
        preferred = np.stack([
            self.rng.choice(num_values, size=top_n, replace=False)
            for _ in range(n_seq)
        ])
        zipf_weights = 1.0 / np.arange(1, top_n + 1, dtype=np.float64) ** zipf_exp
        zipf_weights /= zipf_weights.sum()
        use_preferred = self.rng.random(shape) < concentration
        pref_idx = self.rng.choice(top_n, size=shape, p=zipf_weights).astype(np.int32)
        pref_values = preferred[np.arange(n_seq)[:, None], pref_idx]
        unif_values = self.rng.integers(0, num_values, size=shape, dtype=np.int32)
        return np.where(use_preferred, pref_values, unif_values).astype(np.int32)

    # --- Core generation ---

    def generate(self, num_sequences: int = 100_000) -> GeneratedDataset:
        """Generate the full dataset with correlated features.

        Pipeline:

        1. Sample every feature from each customer's primary profile.
        2. Overwrite ``blend_fraction`` of transactions with draws from the
           secondary profile.
        3. Apply the cross-feature correlations in a fixed order (merchants,
           amounts, entry_mode, hour, is_recurring, card_product, the
           international amount boost, and temporal repeats).
        4. Tokenize, inject fraud, and derive amount-range labels.
        5. Inject NULLs, then compute splits and the fingerprint.

        Args:
            num_sequences: Number of customer sequences N.

        Returns:
            The generated dataset.
        """
        N = num_sequences
        T = self.schema.num_transactions
        F = self.schema.num_features
        raw_values = np.zeros((N, T, F), dtype=np.int32)

        profiles = self._profiles_raw["profiles"]
        # float64 so the in-place normalisation also works for integer YAML weights.
        profile_weights = np.array([p["weight"] for p in profiles], dtype=np.float64)
        profile_weights /= profile_weights.sum()
        primary_indices = self.rng.choice(len(profiles), size=N, p=profile_weights)
        secondary_indices = self.rng.choice(len(profiles), size=N, p=profile_weights)
        # True where a transaction takes its values from the secondary profile.
        blend_mask = self.rng.random((N, T)) < self._blend_fraction
        mcc_idx = self.schema.feature_index("mcc")
        merchant_idx = self.schema.feature_index("merchant_id")
        amount_idx = self.schema.feature_index("amount")
        entry_mode_idx = self.schema.feature_index("entry_mode")

        # Base pass: all features of all transactions from the primary profile.
        for prof_idx, profile in enumerate(profiles):
            primary_mask = primary_indices == prof_idx
            n_primary = int(primary_mask.sum())
            if n_primary == 0:
                continue
            logger.debug(
                "Profile '%s': %d primary sequences", profile["name"], n_primary,
            )
            for feat_idx, feature in enumerate(self.schema.features):
                feat_dist = profile["features"][feature.name]
                values = self._sample_feature(feat_dist, feature.num_values, (n_primary, T))
                raw_values[primary_mask, :, feat_idx] = values

        # Blend pass: replace blend-masked transactions with secondary draws.
        for prof_idx, profile in enumerate(profiles):
            secondary_mask = secondary_indices == prof_idx
            n_secondary = int(secondary_mask.sum())
            if n_secondary == 0:
                continue
            seq_indices = np.flatnonzero(secondary_mask)
            seq_blend = blend_mask[seq_indices]  # (n_secondary, T)
            for feat_idx, feature in enumerate(self.schema.features):
                feat_dist = profile["features"][feature.name]
                alt_values = self._sample_feature(
                    feat_dist, feature.num_values, (n_secondary, T),
                )
                current = raw_values[seq_indices, :, feat_idx]
                raw_values[seq_indices, :, feat_idx] = np.where(
                    seq_blend, alt_values, current,
                )
        logger.info("Base features sampled with profile blending (%.0f%%)", self._blend_fraction * 100)

        # --- Per-customer preferred merchant set + MCC-aligned assignment ---
        # mcc_values is a view: MCC edits made on raw_values show through it,
        # so the MCC-conditional passes below see the final merchant MCCs.
        mcc_values = raw_values[:, :, mcc_idx]
        merch_params = np.zeros((N, 3), dtype=np.float64)
        for i in range(N):
            prof = profiles[primary_indices[i]]
            md = prof["features"]["merchant_id"]
            merch_params[i, 0] = md.get("top_n", 8)
            merch_params[i, 1] = md.get("concentration", 0.88)
            merch_params[i, 2] = md.get("zipf_exponent", 1.0)
        self._assign_customer_merchants(
            raw_values, mcc_values, merchant_idx, merch_params,
        )

        # --- Per-merchant amount distributions (catalog std is 3-7 buckets) ---
        self._apply_merchant_amounts(raw_values, merchant_idx, amount_idx)
        # --- MCC-conditional entry_mode (50% blend with profile) ---
        self._apply_mcc_conditional_entry_mode(raw_values, mcc_values, entry_mode_idx)
        # --- MCC-conditional hour-of-day ---
        hour_idx = self.schema.feature_index("hour")
        self._apply_mcc_conditional_hour(raw_values, mcc_values, hour_idx)
        # --- MCC-conditional is_recurring ---
        recurring_idx = self.schema.feature_index("is_recurring")
        self._apply_mcc_conditional_recurring(raw_values, mcc_values, recurring_idx)
        # --- Customer_tenure -> card_product correlation ---
        tenure_idx = self.schema.feature_index("customer_tenure")
        card_idx = self.schema.feature_index("card_product")
        self._apply_tenure_card_correlation(raw_values, tenure_idx, card_idx)
        # --- Country -> amount boost for international ---
        country_idx = self.schema.feature_index("country")
        self._apply_international_amount_boost(raw_values, country_idx, amount_idx)
        # --- Temporal autocorrelation (MCC and merchant repeat) ---
        self._apply_temporal_autocorrelation(
            raw_values, mcc_idx, merchant_idx, amount_idx,
        )
        logger.info(
            "Applied feature correlations: merchant-amount, MCC-entry_mode, "
            "MCC-hour, MCC-recurring, tenure-card, country-amount, temporal-repeat",
        )

        # --- Tokenize ---
        token_ids = (raw_values + VALUES_START).astype(np.int16)
        # --- Fraud injection ---
        tx_labels = np.zeros((N, T), dtype=np.int8)
        self._inject_fraud(token_ids, tx_labels, primary_indices)
        # --- Amount range labels (derived after fraud, before NULL) ---
        amount_buckets = token_ids[:, :, amount_idx].astype(np.int32) - VALUES_START
        amount_range_labels = amount_bucket_to_range(amount_buckets).astype(np.int8)
        # --- NULL injection ---
        self._apply_nulls(token_ids)

        seq_labels = tx_labels.any(axis=1).astype(np.int8)
        fraud_rate = seq_labels.mean()
        logger.info("Generated %d sequences, fraud rate: %.3f", N, fraud_rate)
        splits = self._compute_splits(N)
        tokenizer = self._build_tokenizer()
        fingerprint = self._compute_fingerprint(tokenizer, splits)
        return GeneratedDataset(
            token_ids=token_ids,
            sequence_labels=seq_labels,
            transaction_labels=tx_labels,
            amount_range_labels=amount_range_labels,
            split_indices=splits,
            fingerprint=fingerprint,
        )

    def _assign_customer_merchants(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        merchant_idx: int,
        per_customer_params: np.ndarray,
    ) -> None:
        """Assign per-customer preferred merchant sets aligned with their MCCs.

        For each customer, the most frequent MCCs (up to ``preferred_count``)
        each contribute ``max(1, preferred_count // n_mccs)`` merchants. These
        are drawn popularity-weighted without replacement, and the list is
        truncated to ``preferred_count``. Each transaction then:

        - with probability ``concentration``, uses a preferred merchant. It
          picks one sharing the transaction's MCC when available; otherwise it
          picks any preferred merchant and rewrites the transaction's MCC to
          that merchant's. Either way the pick is Zipf-weighted by preference
          rank.
        - otherwise, draws a popularity-weighted catalog merchant of its MCC
          (uniform over the catalog if that MCC has no merchants).

        Customers with no preferred merchants fall back to
        ``MerchantCatalog.sample_merchants_for_mccs`` for the whole sequence.

        Args:
            raw_values: (N, T, F) raw feature values, modified in place.
            mcc_values: (N, T) MCC values (a view of ``raw_values`` when called
                from :meth:`generate`).
            merchant_idx: Feature column of merchant_id.
            per_customer_params: (N, 3) array of
                [preferred_count, concentration, zipf_exp].
        """
        N, T, _ = raw_values.shape
        mcc_idx_col = self.schema.feature_index("mcc")
        for i in range(N):
            pref_count = int(per_customer_params[i, 0])
            concentration = float(per_customer_params[i, 1])
            zipf_exp = float(per_customer_params[i, 2])
            zipf_w = 1.0 / np.arange(1, pref_count + 1, dtype=np.float64) ** zipf_exp
            zipf_w /= zipf_w.sum()
            customer_mccs = mcc_values[i]
            # NOTE(review): minlength=100 assumes <= 100 MCCs; bincount grows
            # past it anyway, so this only pads.
            mcc_counts = np.bincount(customer_mccs[customer_mccs >= 0], minlength=100)
            top_mccs = np.argsort(mcc_counts)[-pref_count:][::-1]
            top_mccs = top_mccs[mcc_counts[top_mccs] > 0]
            preferred_merchants: list[int] = []
            for mcc_val in top_mccs:
                candidates = self.catalog.mcc_to_merchants.get(int(mcc_val), np.array([]))
                if len(candidates) == 0:
                    continue
                pop = self.catalog.popularity[candidates]
                pop = pop / pop.sum()
                n_from_mcc = max(1, pref_count // len(top_mccs))
                n_from_mcc = min(n_from_mcc, len(candidates))
                chosen = self.rng.choice(candidates, size=n_from_mcc, replace=False, p=pop)
                preferred_merchants.extend(chosen.tolist())
            if len(preferred_merchants) == 0:
                raw_values[i, :, merchant_idx] = self.catalog.sample_merchants_for_mccs(
                    customer_mccs, self.rng,
                )
                continue
            pref_arr = np.array(preferred_merchants[:pref_count], dtype=np.int32)
            n_pref = len(pref_arr)
            pref_mccs = self.catalog.mcc_assignments[pref_arr]
            pref_zipf = zipf_w[:n_pref].copy()
            pref_zipf /= pref_zipf.sum()
            for t in range(T):
                tx_mcc = int(customer_mccs[t])
                if self.rng.random() < concentration:
                    same_mcc_mask = pref_mccs == tx_mcc
                    if same_mcc_mask.any():
                        candidates_idx = np.where(same_mcc_mask)[0]
                        w = pref_zipf[candidates_idx]
                        w /= w.sum()
                        chosen = self.rng.choice(candidates_idx, p=w)
                        raw_values[i, t, merchant_idx] = pref_arr[chosen]
                    else:
                        chosen = self.rng.choice(n_pref, p=pref_zipf)
                        raw_values[i, t, merchant_idx] = pref_arr[chosen]
                        # Keep the transaction's MCC consistent with the merchant.
                        raw_values[i, t, mcc_idx_col] = int(pref_mccs[chosen])
                else:
                    candidates = self.catalog.mcc_to_merchants.get(tx_mcc, np.array([]))
                    if len(candidates) > 0:
                        pop = self.catalog.popularity[candidates]
                        pop = pop / pop.sum()
                        raw_values[i, t, merchant_idx] = self.rng.choice(candidates, p=pop)
                    else:
                        raw_values[i, t, merchant_idx] = self.rng.integers(0, len(self.catalog.merchant_ids))

    def _apply_merchant_amounts(
        self,
        raw_values: np.ndarray,
        merchant_idx: int,
        amount_idx: int,
    ) -> None:
        """Resample every amount from its merchant's (mean, std) distribution.

        Draws N(amount_mean, amount_std) from the catalog entry of each
        transaction's merchant, rounds it, and clips it into the amount
        feature's bucket range. This creates a strong merchant -> amount signal.
        """
        N, T, _ = raw_values.shape
        num_amount = self.schema.features[amount_idx].num_values
        flat_merchants = raw_values[:, :, merchant_idx].ravel()
        means = self.catalog.amount_mean[flat_merchants]
        stds = self.catalog.amount_std[flat_merchants]
        vals = self.rng.normal(means, stds)
        vals = np.clip(np.round(vals), 0, num_amount - 1).astype(np.int32)
        raw_values[:, :, amount_idx] = vals.reshape(N, T)

    def _apply_mcc_conditional_entry_mode(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        entry_mode_idx: int,
    ) -> None:
        """Blend MCC-conditional entry_mode with profile-sampled values (50/50).

        About half of the transactions whose MCC falls in an ``MCC_ENTRY_MODE``
        range get entry_mode redrawn from that range's weights; the others
        (and unmapped MCCs) keep the profile value.
        """
        N, T, _ = raw_values.shape
        use_mcc = self.rng.random((N, T)) < 0.5
        for (lo, hi), weights in MCC_ENTRY_MODE.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.array(weights, dtype=np.float64)
            w /= w.sum()
            vals = self.rng.choice(len(w), size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, entry_mode_idx][mask] = vals

    def _apply_mcc_conditional_hour(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        hour_idx: int,
    ) -> None:
        """Blend MCC-conditional hour peaks with profile-sampled hours (50/50).

        Matching transactions are redrawn over 24 hours, with weight 1 plus
        ``peak_weight`` at the range's peak hours. Assumes the hour feature
        has 24 values.
        """
        N, T, _ = raw_values.shape
        use_mcc = self.rng.random((N, T)) < 0.5
        for (lo, hi), (peak_hours, peak_weight) in MCC_HOUR_PEAKS.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.ones(24, dtype=np.float64)
            for h in peak_hours:
                w[h] = peak_weight
            w /= w.sum()
            vals = self.rng.choice(24, size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, hour_idx][mask] = vals

    def _apply_mcc_conditional_recurring(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        recurring_idx: int,
    ) -> None:
        """Override is_recurring based on MCC category (70% weight).

        About 70% of transactions in an ``MCC_RECURRING_RATE`` range get a
        fresh Bernoulli(rate) draw; the rest keep the profile value.
        """
        N, T, _ = raw_values.shape
        use_mcc = self.rng.random((N, T)) < 0.7
        for (lo, hi), rate in MCC_RECURRING_RATE.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            vals = (self.rng.random(n_matching) < rate).astype(np.int32)
            raw_values[:, :, recurring_idx][mask] = vals

    @staticmethod
    def _tenure_bands(
        table: list[tuple[int, list[float]]],
    ) -> list[tuple[int | None, int, list[float]]]:
        """Expand ascending (threshold, weights) rows into inclusive bands.

        Returns ``(lower, upper, weights)`` triples. The first band has no
        lower bound (``None``), and every later band starts one above the
        previous threshold.
        """
        bands: list[tuple[int | None, int, list[float]]] = []
        lower: int | None = None
        for upper, weights in table:
            bands.append((lower, upper, weights))
            lower = upper + 1
        return bands

    def _apply_tenure_card_correlation(
        self,
        raw_values: np.ndarray,
        tenure_idx: int,
        card_idx: int,
    ) -> None:
        """Correlate card_product with customer_tenure (60% blend).

        For about 60% of transactions, card_product is redrawn from the
        ``TENURE_CARD_PRODUCT`` weights of the transaction's tenure band.
        Weights are truncated to the card_product cardinality, so only card
        ids covered by the table can be drawn.
        """
        N, T, _ = raw_values.shape
        tenure_values = raw_values[:, :, tenure_idx]
        use_corr = self.rng.random((N, T)) < 0.6
        num_cards = self.schema.features[card_idx].num_values
        for lower, upper, weights in self._tenure_bands(TENURE_CARD_PRODUCT):
            in_band = tenure_values <= upper
            if lower is not None:
                in_band &= tenure_values >= lower
            mask = in_band & use_corr
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.array(weights[:num_cards], dtype=np.float64)
            w /= w.sum()
            # len(w), not num_cards: the table may list fewer products than the schema.
            vals = self.rng.choice(len(w), size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, card_idx][mask] = vals

    def _apply_international_amount_boost(
        self,
        raw_values: np.ndarray,
        country_idx: int,
        amount_idx: int,
    ) -> None:
        """International transactions (country > 0) get a 30% amount boost.

        The boost is applied in bucket space (bucket * 1.3 + N(0, 5)), then
        rounded and clipped into the amount range.
        """
        country_values = raw_values[:, :, country_idx]
        num_amount = self.schema.features[amount_idx].num_values
        international = country_values > 0
        n_intl = int(international.sum())
        if n_intl == 0:
            return
        current = raw_values[:, :, amount_idx][international].astype(np.float64)
        boosted = current * 1.3 + self.rng.normal(0, 5, size=n_intl)
        boosted = np.clip(np.round(boosted), 0, num_amount - 1).astype(np.int32)
        raw_values[:, :, amount_idx][international] = boosted

    def _apply_temporal_autocorrelation(
        self,
        raw_values: np.ndarray,
        mcc_idx: int,
        merchant_idx: int,
        amount_idx: int,
    ) -> None:
        """Apply Markov-style temporal dependencies within each sequence.

        For each transaction t > 0:

        - P(repeat merchant+MCC from t-1) = 0.20 (same store again)
        - P(repeat MCC only from t-1) = 0.10 of the remaining (same category)
        - If repeating, amount stays close to previous (N(0, 3) bucket jitter)

        NOTE(review): the MCC-only repeat keeps transaction t's merchant. That
        merchant's catalog MCC may then differ from the copied MCC. Also, this
        pass runs after entry_mode/hour/is_recurring were conditioned on the
        old MCC. Confirm whether both effects are intended.
        """
        N, T, _ = raw_values.shape
        num_amount = self.schema.features[amount_idx].num_values
        for t in range(1, T):
            merchant_repeat = self.rng.random(N) < 0.20
            raw_values[merchant_repeat, t, merchant_idx] = raw_values[merchant_repeat, t - 1, merchant_idx]
            raw_values[merchant_repeat, t, mcc_idx] = raw_values[merchant_repeat, t - 1, mcc_idx]
            mcc_only = (self.rng.random(N) < 0.10) & ~merchant_repeat
            raw_values[mcc_only, t, mcc_idx] = raw_values[mcc_only, t - 1, mcc_idx]
            repeat_mask = merchant_repeat | mcc_only
            n_repeating = int(repeat_mask.sum())
            if n_repeating > 0:
                prev_amt = raw_values[repeat_mask, t - 1, amount_idx].astype(np.float64)
                new_amt = prev_amt + self.rng.normal(0, 3.0, size=n_repeating)
                new_amt = np.clip(np.round(new_amt), 0, num_amount - 1).astype(np.int32)
                raw_values[repeat_mask, t, amount_idx] = new_amt

    # --- Fraud injection ---

    def _inject_fraud(
        self,
        token_ids: np.ndarray,
        tx_labels: np.ndarray,
        profile_indices: np.ndarray,
    ) -> None:
        """Mark a random subset of sequences as fraud and inject a pattern.

        Each sequence is fraudulent with probability ``overall_fraud_rate``
        and gets one pattern, chosen by the normalised pattern weights.

        Args:
            token_ids: (N, T, F) token ids, modified in place.
            tx_labels: (N, T) fraud labels, modified in place.
            profile_indices: Per-sequence primary profile index; currently
                unused (kept for interface compatibility).
        """
        N = token_ids.shape[0]
        fraud_rate = self._fraud_raw["overall_fraud_rate"]
        patterns = self._fraud_raw["patterns"]
        # float64 so the in-place normalisation also works for integer YAML weights.
        pattern_weights = np.array([p["weight"] for p in patterns], dtype=np.float64)
        pattern_weights /= pattern_weights.sum()
        fraud_seq_mask = self.rng.random(N) < fraud_rate
        fraud_indices = np.where(fraud_seq_mask)[0]
        if len(fraud_indices) == 0:
            return
        pattern_assignments = self.rng.choice(
            len(patterns), size=len(fraud_indices), p=pattern_weights,
        )
        for i, seq_idx in enumerate(fraud_indices):
            pattern = patterns[pattern_assignments[i]]
            self._inject_single_fraud(
                token_ids[seq_idx], tx_labels[seq_idx], pattern,
            )

    @staticmethod
    def _split_fraud_overrides(
        overrides: dict[str, Any] | None,
    ) -> tuple[bool, dict[str, Any]]:
        """Separate the ``blend_with_profile`` flag from per-feature overrides.

        Returns ``(blend, feature_overrides)`` where ``feature_overrides`` is a
        new dict. The input mapping belongs to the shared fraud config and is
        reused for every sequence, so it must not be mutated. ``None`` (an
        empty YAML mapping) is treated as no overrides.
        """
        feature_overrides = dict(overrides or {})
        blend = bool(feature_overrides.pop("blend_with_profile", False))
        return blend, feature_overrides

    def _inject_single_fraud(
        self,
        seq_tokens: np.ndarray,
        seq_tx_labels: np.ndarray,
        pattern: dict[str, Any],
    ) -> None:
        """Inject one fraud pattern as a contiguous span of one sequence.

        The span length is drawn from ``injection.num_transactions`` (inclusive
        range, capped at T). A ``late`` span starts in the second half of the
        sequence and is truncated at T if it does not fit; any other position
        starts anywhere the span fits. Each feature in ``feature_overrides`` is
        resampled over the span. With ``blend_with_profile``, each transaction
        keeps its original token with probability 0.5.

        Args:
            seq_tokens: (T, F) tokens of one sequence, modified in place.
            seq_tx_labels: (T,) labels of one sequence, modified in place.
            pattern: One entry of the fraud config's ``patterns`` list.
        """
        T = seq_tokens.shape[0]
        inj = pattern["injection"]
        lo, hi = inj["num_transactions"]
        num_fraud_tx = self.rng.integers(lo, hi + 1)
        num_fraud_tx = min(num_fraud_tx, T)
        if inj["position"] == "late":
            max_start = max(T // 2, T - num_fraud_tx)
            start = self.rng.integers(T // 2, max_start + 1)
        else:
            start = self.rng.integers(0, max(T - num_fraud_tx + 1, 1))
        end = min(start + num_fraud_tx, T)
        seq_tx_labels[start:end] = 1
        blend, overrides = self._split_fraud_overrides(pattern.get("feature_overrides"))
        span_len = end - start
        for feat_name, feat_dist in overrides.items():
            feat_idx = self.schema.feature_index(feat_name)
            feat = self.schema.features[feat_idx]
            fraud_values = self._sample_feature(feat_dist, feat.num_values, (1, span_len))
            fraud_tokens = (fraud_values[0] + VALUES_START).astype(np.int16)
            if blend:
                keep_mask = self.rng.random(span_len) < 0.5
                fraud_tokens[keep_mask] = seq_tokens[start:end, feat_idx][keep_mask]
            seq_tokens[start:end, feat_idx] = fraud_tokens

    # --- NULL injection ---

    def _apply_nulls(self, token_ids: np.ndarray) -> None:
        """Replace tokens with ``NULL_TOKEN`` according to ``null_rates``.

        A numeric rate nulls each token of that feature independently. The
        dict form uses its ``default`` rate, then each ``conditional`` entry
        overrides the rate at positions whose entry_mode matches
        ``when.entry_mode`` (a name or list of names). Conditions use the
        entry_mode values as they were before any NULLs were applied.
        """
        N, T, _ = token_ids.shape
        null_rates = self._null_raw["null_rates"]
        entry_mode_idx = self.schema.feature_index("entry_mode")
        entry_mode_vals = token_ids[:, :, entry_mode_idx].astype(np.int32) - VALUES_START
        for feat_idx, feature in enumerate(self.schema.features):
            rate_config = null_rates.get(feature.name, 0.0)
            if isinstance(rate_config, (int, float)):
                if rate_config > 0:
                    null_mask = self.rng.random((N, T)) < rate_config
                    token_ids[:, :, feat_idx][null_mask] = NULL_TOKEN
            elif isinstance(rate_config, dict):
                null_mask = self.rng.random((N, T)) < rate_config.get("default", 0.0)
                for cond in rate_config.get("conditional", []):
                    when_modes = cond["when"]["entry_mode"]
                    if isinstance(when_modes, str):
                        when_modes = [when_modes]
                    mode_vals = [self._entry_mode_name_to_val[m] for m in when_modes]
                    cond_positions = np.isin(entry_mode_vals, mode_vals)
                    cond_rand = self.rng.random((N, T))
                    null_mask[cond_positions] = cond_rand[cond_positions] < cond["rate"]
                token_ids[:, :, feat_idx][null_mask] = NULL_TOKEN

    # --- Splits and fingerprint ---

    def _compute_splits(self, num_sequences: int) -> dict[str, np.ndarray]:
        """Randomly split sequence indices into 10% test, 5% val, rest train (each sorted)."""
        perm = self.rng.permutation(num_sequences)
        n_test = int(num_sequences * 0.10)
        n_val = int(num_sequences * 0.05)
        test_idx = np.sort(perm[:n_test])
        val_idx = np.sort(perm[n_test : n_test + n_val])
        train_idx = np.sort(perm[n_test + n_val :])
        assert len(test_idx) + len(val_idx) + len(train_idx) == num_sequences
        return {"train": train_idx, "val": val_idx, "test": test_idx}

    def _build_tokenizer(self) -> TransactionTokenizer:
        """Create the schema tokenizer, fitting bucketed features with ``fit_uniform_from_range``."""
        tokenizer = TransactionTokenizer(self.schema)
        for feature in self.schema.features:
            if feature.type == "bucketed":
                tokenizer.get_feature_tokenizer(feature.name).fit_uniform_from_range()
        return tokenizer

    def _compute_fingerprint(
        self, tokenizer: TransactionTokenizer, splits: dict[str, np.ndarray],
    ) -> str:
        """SHA-256 over config bytes, tokenizer state, split indices and seed.

        Config files are hashed in sorted-path order and splits in sorted-key
        order, so the digest does not depend on argument ordering.
        """
        hasher = hashlib.sha256()
        for path in sorted(self._config_paths, key=str):
            with open(path, "rb") as fh:
                hasher.update(fh.read())
        state_bytes = json.dumps(tokenizer.get_state(), sort_keys=True).encode("utf-8")
        hasher.update(state_bytes)
        for key in sorted(splits.keys()):
            hasher.update(splits[key].tobytes())
        hasher.update(str(self._seed).encode("utf-8"))
        return hasher.hexdigest()

    # --- Persistence ---

    def save_dataset(
        self, dataset: GeneratedDataset, output_dir: str | Path = "data/synthetic",
    ) -> None:
        """Write the dataset to ``output_dir`` (created if missing).

        Files: token_ids.npy, sequence_labels.npy, transaction_labels.npy,
        amount_range_labels.npy, split_indices.npz (train/val/test),
        tokenizer_state.json (freshly built tokenizer) and fingerprint.txt.
        """
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)
        np.save(out / "token_ids.npy", dataset.token_ids)
        np.save(out / "sequence_labels.npy", dataset.sequence_labels)
        np.save(out / "transaction_labels.npy", dataset.transaction_labels)
        np.save(out / "amount_range_labels.npy", dataset.amount_range_labels)
        np.savez(
            out / "split_indices.npz",
            train=dataset.split_indices["train"],
            val=dataset.split_indices["val"],
            test=dataset.split_indices["test"],
        )
        tokenizer = self._build_tokenizer()
        tokenizer.save_state(out / "tokenizer_state.json")
        with open(out / "fingerprint.txt", "w", encoding="utf-8") as fh:
            fh.write(dataset.fingerprint + "\n")
        token_mb = dataset.token_ids.nbytes / 1024 / 1024
        logger.info(
            "Saved dataset to %s: token_ids=%.1f MB, fingerprint=%s",
            out, token_mb, dataset.fingerprint[:16],
        )