"""Synthetic transaction data generator.

Design features:
  1. Merchant catalog: merchants (count taken from the schema's merchant_id
     vocabulary, ~10K by design) with fixed MCC assignments and Zipf
     popularity. Creates realistic merchant-MCC correlation the model can learn.
  2. MCC-conditional amounts: each merchant gets a characteristic amount
     distribution derived from its MCC's range, and transaction amounts are
     sampled from it. E.g. restaurants roughly $15-100, luxury/department
     stores roughly $80-2000 (see MCC_AMOUNT_MAP).
  3. Profile blending: each customer is a mixture of primary + secondary
     profile, not a pure archetype. Continuous behavioral diversity.
  4. MCC-conditional entry_mode: online merchants -> CNP, restaurants -> tap/chip.
  5. MCC-conditional hour: restaurants peak at lunch/dinner, transit at commute.
  6. MCC-conditional is_recurring: subscriptions/streaming are high,
     restaurants low.
  7. Customer_tenure-card_product correlation: new customers get basic cards.
  8. Country-amount correlation: international transactions tend higher value.
  9. Temporal autocorrelation: consecutive transactions sometimes repeat the
     previous merchant+MCC or MCC, with the amount kept close to the previous one.
 10. Fraud patterns are injected as contiguous labeled spans, then NULLs are
     injected per feature.

Output format (see DataGenerator.save_dataset): token_ids.npy (N,T,F),
sequence_labels.npy, transaction_labels.npy, amount_range_labels.npy,
split_indices.npz, tokenizer_state.json, fingerprint.txt.

Drives the training pipeline via schema.yaml and model.yaml.
"""

from __future__ import annotations

import hashlib
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import numpy as np
import yaml

from src.data.schema import SchemaConfig, load_schema, NULL_TOKEN, VALUES_START
from src.data.tokenizer import TransactionTokenizer

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Amount range bins: 16 coarse ranges over the 256 fine-grained amount buckets.
# Each range covers 16 consecutive buckets. Used as the prediction target for
# the amount_range fine-tuning head (replaces the 259-class amount head).
# Dollar labels are approximate (quantile buckets are non-linear).
# ---------------------------------------------------------------------------

NUM_AMOUNT_RANGES: int = 16
AMOUNT_RANGE_WIDTH: int = 16  # 256 buckets / 16 ranges
AMOUNT_RANGE_LABELS: dict[int, str] = {
    0: "$0-5",
    1: "$5-15",
    2: "$15-30",
    3: "$30-50",
    4: "$50-80",
    5: "$80-120",
    6: "$120-175",
    7: "$175-250",
    8: "$250-375",
    9: "$375-550",
    10: "$550-800",
    11: "$800-1.2K",
    12: "$1.2K-2K",
    13: "$2K-3.5K",
    14: "$3.5K-7.5K",
    15: "$7.5K+",
}


def amount_bucket_to_range(bucket: np.ndarray) -> np.ndarray:
    """Map 256 fine-grained amount buckets (0-255) to 16 coarse ranges (0-15).

    Args:
        bucket: Integer amount bucket index/indices. Any array-like (ndarray,
            list, scalar) is accepted and converted with ``np.asarray``.

    Returns:
        Coarse range indices with the same shape as ``bucket``. Values outside
        0-255 are clipped into the first/last range.
    """
    buckets = np.asarray(bucket)
    return np.clip(buckets // AMOUNT_RANGE_WIDTH, 0, NUM_AMOUNT_RANGES - 1)


# ---------------------------------------------------------------------------
# MCC-conditional amount distributions (mean_bucket, std_bucket).
# 256 amount buckets spanning $0.01-$25K in quantile space. These bucket-index
# ranges are calibrated for behavioral plausibility across MCC categories.
# Keys are inclusive (lo, hi) MCC value ranges.
# ---------------------------------------------------------------------------

MCC_AMOUNT_MAP: dict[tuple[int, int], tuple[float, float]] = {
    (0, 3): (25.0, 12.0),  # transit, coffee, gas: $3-20
    (4, 9): (45.0, 20.0),  # convenience, misc low: $8-50
    (10, 14): (160.0, 55.0),  # airlines, hotels, car rental: $80-800
    (15, 19): (50.0, 30.0),  # other transport: $10-100
    (20, 24): (80.0, 45.0),  # online retail general: $15-200
    (25, 29): (55.0, 30.0),  # online retail niche: $10-120
    (30, 34): (55.0, 25.0),  # restaurants, bars: $15-100
    (35, 39): (40.0, 20.0),  # other food service: $8-60
    (40, 44): (185.0, 55.0),  # luxury, dept stores, jewelry: $80-2000
    (45, 49): (70.0, 35.0),  # fashion retail: $20-200
    (50, 54): (110.0, 50.0),  # office supplies, wholesale: $30-500
    (55, 59): (90.0, 50.0),  # professional services: $30-400
    (60, 64): (50.0, 22.0),  # grocery, pharmacy: $12-120
    (65, 69): (55.0, 30.0),  # healthcare, fitness: $15-150
    (70, 74): (22.0, 12.0),  # fast food, entertainment: $4-25
    (75, 79): (35.0, 20.0),  # streaming, digital goods: $5-50
    (80, 84): (30.0, 22.0),  # subscriptions, SaaS: $5-80
    (85, 89): (100.0, 55.0),  # automotive: $25-500
    (90, 94): (95.0, 55.0),  # home improvement: $20-600
    (95, 99): (75.0, 50.0),  # education, government: $15-500
}


def _mcc_to_amount_params(mcc_value: int) -> tuple[float, float]:
    """Return the (mean_bucket, std_bucket) amount params for an MCC value.

    MCC values not covered by any range in MCC_AMOUNT_MAP get the generic
    default (60.0, 40.0).
    """
    for (lo, hi), params in MCC_AMOUNT_MAP.items():
        if lo <= mcc_value <= hi:
            return params
    return (60.0, 40.0)


# ---------------------------------------------------------------------------
# MCC-conditional entry_mode weights.
# Indices: 0=card_present, 1=card_not_present, 2=contactless, 3=chip, 4=manual
# Keys are inclusive (lo, hi) MCC value ranges; MCCs not listed use a default.
# ---------------------------------------------------------------------------

MCC_ENTRY_MODE: dict[tuple[int, int], list[float]] = {
    (0, 3): [2, 1, 5, 2, 0],  # transit/coffee: contactless heavy
    (10, 14): [1, 6, 1, 1, 0],  # travel: mostly online bookings
    (20, 29): [0, 8, 0, 0, 1],  # online retail: card-not-present
    (30, 39): [3, 0, 4, 3, 0],  # restaurants: contactless/chip
    (40, 49): [3, 2, 3, 2, 0],  # fashion/luxury: mixed
    (50, 59): [2, 4, 1, 1, 1],  # business: mix of online + in-person
    (60, 69): [3, 1, 4, 3, 0],  # grocery/pharmacy: in-person
    (70, 74): [2, 1, 5, 2, 0],  # fast food: contactless
    (75, 84): [0, 9, 0, 0, 0],  # digital/subscriptions: all online
    (85, 99): [3, 2, 2, 3, 0],  # auto/home/edu: mixed
}


def _mcc_to_entry_mode_weights(mcc_value: int) -> list[float]:
    """Return (unnormalized) entry_mode weights for an MCC value.

    A fresh list is returned on every call so callers may normalize or mutate
    it without corrupting the shared MCC_ENTRY_MODE table.
    """
    for (lo, hi), weights in MCC_ENTRY_MODE.items():
        if lo <= mcc_value <= hi:
            return list(weights)
    return [2, 3, 2, 2, 1]


# ---------------------------------------------------------------------------
# MCC-conditional hour-of-day distributions.
# Peak hours where a category has higher probability. Applied as a 50% blend
# with the profile-sampled hour to keep per-customer variation.
# Format: (peak_hours, peak_weight) -- applied on a uniform-24 base.
# ---------------------------------------------------------------------------
# Keys are inclusive (lo, hi) MCC ranges; values are (peak hours, weight of each
# peak hour relative to an off-peak weight of 1.0). Row order is preserved
# because consumers iterate this table in insertion order.

MCC_HOUR_PEAKS: dict[tuple[int, int], tuple[list[int], float]] = {
    (lo, hi): (peak_hours, peak_weight)
    for lo, hi, peak_hours, peak_weight in (
        (0, 3, [7, 8, 9, 17, 18], 5.0),  # transit/coffee: commute hours
        (10, 14, [9, 10, 14, 15, 20], 2.5),  # travel: business hours + evening
        (20, 29, [10, 11, 20, 21, 22], 3.0),  # online: late morning + evening
        (30, 31, [12, 13, 18, 19, 20], 6.0),  # casual dining: lunch + dinner
        (32, 34, [19, 20, 21, 22, 23], 5.0),  # fine dining/bars: evening + late
        (35, 39, [11, 12, 13, 17, 18], 4.0),  # other food: lunch + early dinner
        (60, 64, [10, 11, 16, 17, 18], 4.0),  # grocery/pharmacy: mid-morning + after work
        (70, 71, [12, 13, 18, 19, 21], 4.5),  # fast food: lunch + dinner + late
        (75, 84, [0, 1, 6, 7], 2.0),  # digital/subs: auto-renew any hour
    )
}

# ---------------------------------------------------------------------------
# MCC-conditional is_recurring probability.
# Keys are inclusive (lo, hi) MCC ranges; values are P(is_recurring = 1).
# ---------------------------------------------------------------------------

MCC_RECURRING_RATE: dict[tuple[int, int], float] = {
    (lo, hi): rate
    for lo, hi, rate in (
        (0, 3, 0.20),  # transit passes
        (10, 14, 0.05),  # travel: rarely recurring
        (20, 29, 0.12),  # online retail: occasional subscriptions
        (30, 39, 0.03),  # restaurants: very rarely recurring
        (40, 49, 0.02),  # luxury/fashion: almost never
        (50, 54, 0.15),  # office supplies: recurring orders
        (60, 64, 0.08),  # grocery: delivery subscriptions
        (65, 69, 0.25),  # fitness/health: gym memberships
        (70, 74, 0.05),  # fast food: rarely
        (75, 79, 0.60),  # streaming: almost always recurring
        (80, 84, 0.70),  # SaaS/subscriptions: by definition
        (85, 94, 0.05),  # auto/home: rarely
        (95, 99, 0.10),  # education/gov: tuition installments
    )
}

# ---------------------------------------------------------------------------
# Customer tenure -> card_product correlation.
# New customers (low tenure bucket) skew toward basic cards.
# Established customers (high tenure bucket) skew toward premium/rewards.
# Format: (tenure_bucket_upper_bound, card_product weights for 10 products).
# The first entry covers tenure buckets <= its bound; each later entry covers
# (previous bound + 1) .. its own bound, inclusive.
# ---------------------------------------------------------------------------

TENURE_CARD_PRODUCT: list[tuple[int, list[float]]] = [
    (2, [6, 3, 1, 0, 0, 0, 0, 2, 1, 0]),  # 0-2: new -> basic/prepaid/virtual
    (5, [3, 4, 3, 1, 0, 1, 1, 1, 1, 0]),  # 3-5: moderate -> basic/rewards
    (7, [2, 3, 4, 2, 1, 1, 2, 0, 1, 1]),  # 6-7: established -> rewards/business
    (99, [1, 2, 3, 3, 2, 1, 2, 0, 0, 1]),  # 8+: long-tenure -> premium/platinum
]


# ---------------------------------------------------------------------------
# Merchant catalog
# ---------------------------------------------------------------------------


@dataclass
class MerchantCatalog:
    """Merchants with fixed MCC, Zipf popularity, and characteristic amount range.

    The merchant count comes from the caller (DataGenerator uses the schema's
    merchant_id vocabulary size; ~10K by design).
    """

    merchant_ids: np.ndarray  # (num_merchants,) int32
    mcc_assignments: np.ndarray  # (num_merchants,) int32
    popularity: np.ndarray  # (num_merchants,) float64, sums to 1
    amount_mean: np.ndarray  # (num_merchants,) float64, amount bucket mean
    amount_std: np.ndarray  # (num_merchants,) float64, amount bucket std
    mcc_to_merchants: dict[int, np.ndarray]  # MCC -> array of merchant indices

    @staticmethod
    def build(
        num_merchants: int,
        num_mccs: int,
        rng: np.random.Generator,
        zipf_exponent: float = 1.2,
    ) -> MerchantCatalog:
        """Build a merchant catalog with MCC assignments, Zipf popularity,
        and per-merchant amount distributions.

        Each merchant gets a characteristic amount (mean, std). The mean is its
        MCC's mean bucket plus Gaussian jitter (std = 30% of the MCC std),
        clipped to [1, 254]. The std is drawn uniformly from 3-7 buckets so it
        stays tight and the model can learn merchant -> amount.
        """
        merchant_ids = np.arange(num_merchants, dtype=np.int32)

        # Lay out contiguous per-MCC runs, then shuffle so the MCC of a
        # merchant id is randomized.
        mcc_merchant_counts = _sample_mcc_merchant_counts(num_merchants, num_mccs, rng)
        mcc_assignments = np.zeros(num_merchants, dtype=np.int32)
        offset = 0
        for mcc, count in enumerate(mcc_merchant_counts):
            mcc_assignments[offset : offset + count] = mcc
            offset += count
        rng.shuffle(mcc_assignments)

        # Zipf weights by rank, shuffled so rank is independent of id/MCC.
        raw_pop = 1.0 / np.arange(1, num_merchants + 1, dtype=np.float64) ** zipf_exponent
        rng.shuffle(raw_pop)
        popularity = raw_pop / raw_pop.sum()

        amount_mean = np.zeros(num_merchants, dtype=np.float64)
        amount_std = np.zeros(num_merchants, dtype=np.float64)
        # Kept as a per-merchant loop: the interleaved normal/uniform draws
        # define the RNG stream, so vectorizing would change generated data.
        for i in range(num_merchants):
            mcc_mean, mcc_std = _mcc_to_amount_params(int(mcc_assignments[i]))
            amount_mean[i] = np.clip(
                mcc_mean + rng.normal(0, mcc_std * 0.3), 1.0, 254.0,
            )
            amount_std[i] = rng.uniform(3.0, 7.0)

        mcc_to_merchants: dict[int, np.ndarray] = {
            mcc: np.flatnonzero(mcc_assignments == mcc).astype(np.int32)
            for mcc in range(num_mccs)
        }

        return MerchantCatalog(
            merchant_ids=merchant_ids,
            mcc_assignments=mcc_assignments,
            popularity=popularity,
            amount_mean=amount_mean,
            amount_std=amount_std,
            mcc_to_merchants=mcc_to_merchants,
        )

    def sample_merchants_for_mccs(
        self,
        mcc_values: np.ndarray,
        rng: np.random.Generator,
    ) -> np.ndarray:
        """Given MCC values (any shape), return merchant IDs from that MCC.

        Within an MCC, merchants are drawn in proportion to their catalog
        popularity. Falls back to popularity-weighted global sampling when an
        MCC has no assigned merchants (shouldn't happen with proper catalog).

        Returns:
            int32 array with the same shape as ``mcc_values``.
        """
        mcc_arr = np.asarray(mcc_values)
        flat_mccs = mcc_arr.ravel()
        result = np.zeros(len(flat_mccs), dtype=np.int32)
        for mcc in np.unique(flat_mccs):
            positions = np.flatnonzero(flat_mccs == mcc)
            merchants = self.mcc_to_merchants.get(int(mcc))
            if merchants is None or len(merchants) == 0:
                result[positions] = rng.choice(
                    len(self.merchant_ids), size=len(positions), p=self.popularity,
                )
                continue
            pop = self.popularity[merchants]
            result[positions] = rng.choice(merchants, size=len(positions), p=pop / pop.sum())
        return result.reshape(mcc_arr.shape)


def _sample_mcc_merchant_counts(
    num_merchants: int,
    num_mccs: int,
    rng: np.random.Generator,
) -> np.ndarray:
    """Distribute merchants across MCCs with realistic category sizes.

    Restaurants/retail get more merchants (fragmented industries).
    Airlines/utilities get fewer (concentrated industries). Category weights
    are jittered by a uniform 0.8-1.2 factor per MCC. The returned counts
    always sum to exactly ``num_merchants``.
    """
    base_weights = np.ones(num_mccs, dtype=np.float64)
    category_weights = {
        (0, 3): 0.6,  # transit/coffee: moderate
        (4, 9): 0.8,
        (10, 14): 0.3,  # airlines/hotels: concentrated
        (15, 19): 0.5,
        (20, 24): 1.5,  # online retail: many merchants
        (25, 29): 1.0,
        (30, 34): 2.0,  # restaurants: very fragmented
        (35, 39): 1.2,
        (40, 44): 0.5,  # luxury/dept: fewer
        (45, 49): 1.3,  # fashion: many
        (50, 54): 0.8,
        (55, 59): 0.7,
        (60, 64): 1.0,  # grocery: moderate
        (65, 69): 0.8,
        (70, 74): 1.5,  # fast food: many
        (75, 79): 0.6,
        (80, 84): 0.5,  # subscriptions: concentrated
        (85, 89): 0.7,
        (90, 94): 0.9,
        (95, 99): 0.4,  # education/gov: concentrated
    }
    for (lo, hi), w in category_weights.items():
        # Ranges entirely beyond the MCC vocabulary are skipped; clamping them
        # would repeatedly overwrite the last MCC's own category weight.
        if lo >= num_mccs:
            continue
        base_weights[lo : min(hi, num_mccs - 1) + 1] = w

    noise = rng.uniform(0.8, 1.2, size=num_mccs)
    weights = base_weights * noise
    weights /= weights.sum()

    counts = (weights * num_merchants).astype(np.int32)
    # Flooring loses < 1 merchant per MCC; hand the remainder to the
    # highest-weight MCCs so the total is exact.
    remainder = num_merchants - counts.sum()
    if remainder > 0:
        top_idx = np.argsort(weights)[-remainder:]
        counts[top_idx] += 1
    elif remainder < 0:
        top_idx = np.argsort(counts)[remainder:]
        counts[top_idx] -= 1

    assert counts.sum() == num_merchants
    return counts


# ---------------------------------------------------------------------------
# Generated dataset
# ---------------------------------------------------------------------------


@dataclass
class GeneratedDataset:
    """Output of the generation pipeline."""

    token_ids: np.ndarray  # (N, T, F) int16: raw value + VALUES_START, or NULL_TOKEN
    sequence_labels: np.ndarray  # (N,) int8: 1 if any transaction is fraud
    transaction_labels: np.ndarray  # (N, T) int8 per-transaction fraud labels
    amount_range_labels: np.ndarray  # (N, T) int8 coarse amount range, computed before NULLs
    split_indices: dict[str, np.ndarray]  # "train"/"val"/"test" -> sorted sequence indices
    fingerprint: str  # sha256 hex digest of configs, tokenizer state, splits, seed


# ---------------------------------------------------------------------------
# Generator
# ---------------------------------------------------------------------------


class DataGenerator:
    """Enhanced data generator with merchant catalog and correlated features."""

    def __init__(
        self,
        schema_path: str | Path = "data/schema.yaml",
        profiles_path: str | Path = "data/profiles.yaml",
        fraud_path: str | Path = "data/fraud_patterns.yaml",
        null_rates_path: str | Path = "data/null_rates.yaml",
        seed: int = 42,
        blend_fraction: float = 0.25,
    ) -> None:
        """Load configs, seed the RNG, and build the merchant catalog.

        Args:
            schema_path: Feature schema, loaded with ``load_schema``.
            profiles_path: YAML with customer ``profiles``.
            fraud_path: YAML with ``overall_fraud_rate`` and fraud ``patterns``.
            null_rates_path: YAML with per-feature ``null_rates``.
            seed: Seed for the single ``np.random.Generator`` used everywhere.
            blend_fraction: Per-transaction probability of taking a feature
                value from the customer's secondary profile.
        """
        self.schema = load_schema(schema_path)
        self._profiles_raw = self._load_yaml(profiles_path)
        self._fraud_raw = self._load_yaml(fraud_path)
        self._null_raw = self._load_yaml(null_rates_path)
        self._config_paths = [
            Path(p) for p in [schema_path, profiles_path, fraud_path, null_rates_path]
        ]
        self.rng = np.random.default_rng(seed)
        self._seed = seed
        self._blend_fraction = blend_fraction

        # Inverse of the schema's entry_mode value map: name -> value index,
        # used by conditional NULL rates.
        em = self.schema.get_feature("entry_mode")
        self._entry_mode_name_to_val: dict[str, int] = {}
        if em.values:
            self._entry_mode_name_to_val = {v: k for k, v in em.values.items()}

        n_merchants = self.schema.get_feature("merchant_id").num_values
        n_mccs = self.schema.get_feature("mcc").num_values
        self.catalog = MerchantCatalog.build(n_merchants, n_mccs, self.rng)
        logger.info(
            "Built merchant catalog: %d merchants across %d MCCs",
            n_merchants, n_mccs,
        )

    @staticmethod
    def _load_yaml(path: str | Path) -> dict[str, Any]:
        """Parse a YAML config file with ``yaml.safe_load``."""
        with open(path, encoding="utf-8") as fh:
            return yaml.safe_load(fh)

    # --- Distribution sampling ---

    def _sample_feature(
        self,
        config: dict[str, Any],
        num_values: int,
        shape: tuple[int, int],
    ) -> np.ndarray:
        """Sample raw value indices of ``shape`` from a distribution config.

        Supported ``config["distribution"]`` types: uniform, weights, peaks,
        normal, bernoulli, concentrated.

        Raises:
            ValueError: For an unknown distribution type.
        """
        dist = config["distribution"]
        if dist == "uniform":
            return self.rng.integers(0, num_values, size=shape, dtype=np.int32)
        if dist == "weights":
            w = np.array(config["weights"], dtype=np.float64)
            w /= w.sum()
            return self.rng.choice(len(w), size=shape, p=w).astype(np.int32)
        if dist == "peaks":
            w = np.ones(num_values, dtype=np.float64)
            for p in config["peaks"]:
                if 0 <= p < num_values:
                    w[p] = config["peak_weight"]
            w /= w.sum()
            return self.rng.choice(num_values, size=shape, p=w).astype(np.int32)
        if dist == "normal":
            mean = float(config["mean_bucket"])
            std = max(float(config["std_bucket"]), 0.1)
            vals = self.rng.normal(mean, std, size=shape)
            return np.clip(np.round(vals), 0, num_values - 1).astype(np.int32)
        if dist == "bernoulli":
            return (self.rng.random(size=shape) < config["p_true"]).astype(np.int32)
        if dist == "concentrated":
            return self._sample_concentrated(config, num_values, shape)
        raise ValueError(f"Unknown distribution type: {dist}")

    def _sample_concentrated(
        self,
        config: dict[str, Any],
        num_values: int,
        shape: tuple[int, int],
    ) -> np.ndarray:
        """Sample from a per-customer concentrated distribution with Zipf skew.

        Each customer (row of ``shape``) has a fixed set of ``top_n`` preferred
        values. With probability ``concentration`` a transaction draws from that
        set with Zipf weights (exponent ``zipf_exponent``, default 1.0);
        otherwise it draws uniformly over all values. This creates a learnable
        frequency ranking the model can exploit from sequence history.
        """
        top_n = min(config["top_n"], num_values)
        concentration = config["concentration"]
        zipf_exp = config.get("zipf_exponent", 1.0)
        n_seq, n_tx = shape

        preferred = np.stack([
            self.rng.choice(num_values, size=top_n, replace=False)
            for _ in range(n_seq)
        ])

        zipf_weights = 1.0 / np.arange(1, top_n + 1, dtype=np.float64) ** zipf_exp
        zipf_weights /= zipf_weights.sum()

        use_preferred = self.rng.random(shape) < concentration
        pref_idx = self.rng.choice(top_n, size=shape, p=zipf_weights).astype(np.int32)
        pref_values = preferred[np.arange(n_seq)[:, None], pref_idx]
        unif_values = self.rng.integers(0, num_values, size=shape, dtype=np.int32)

        return np.where(use_preferred, pref_values, unif_values).astype(np.int32)

    # --- Core generation ---

    def generate(self, num_sequences: int = 100_000) -> GeneratedDataset:
        """Generate the full dataset with correlated features.

        Stages: per-profile base sampling -> secondary-profile blending ->
        merchant assignment -> correlation passes -> tokenization -> fraud
        injection -> amount-range labels -> NULL injection -> splits and
        fingerprint.
        """
        N = num_sequences
        T = self.schema.num_transactions
        F = self.schema.num_features

        raw_values = np.zeros((N, T, F), dtype=np.int32)

        profiles = self._profiles_raw["profiles"]
        # float64 so integer YAML weights can be normalized in place.
        profile_weights = np.array([p["weight"] for p in profiles], dtype=np.float64)
        profile_weights /= profile_weights.sum()

        primary_indices = self.rng.choice(len(profiles), size=N, p=profile_weights)
        secondary_indices = self.rng.choice(len(profiles), size=N, p=profile_weights)
        blend_mask = self.rng.random((N, T)) < self._blend_fraction

        mcc_idx = self.schema.feature_index("mcc")
        merchant_idx = self.schema.feature_index("merchant_id")
        amount_idx = self.schema.feature_index("amount")
        entry_mode_idx = self.schema.feature_index("entry_mode")

        for prof_idx, profile in enumerate(profiles):
            primary_mask = primary_indices == prof_idx
            n_primary = int(primary_mask.sum())
            if n_primary == 0:
                continue
            logger.debug(
                "Profile '%s': %d primary sequences", profile["name"], n_primary,
            )
            for feat_idx, feature in enumerate(self.schema.features):
                feat_dist = profile["features"][feature.name]
                values = self._sample_feature(feat_dist, feature.num_values, (n_primary, T))
                raw_values[primary_mask, :, feat_idx] = values

        for prof_idx, profile in enumerate(profiles):
            seq_indices = np.flatnonzero(secondary_indices == prof_idx)
            n_secondary = len(seq_indices)
            if n_secondary == 0:
                continue
            seq_blend = blend_mask[seq_indices]  # (n_secondary, T)
            for feat_idx, feature in enumerate(self.schema.features):
                feat_dist = profile["features"][feature.name]
                alt_values = self._sample_feature(
                    feat_dist, feature.num_values, (n_secondary, T),
                )
                # Replace only the blended transactions, for all sequences at once.
                current = raw_values[seq_indices, :, feat_idx]
                raw_values[seq_indices, :, feat_idx] = np.where(seq_blend, alt_values, current)

        logger.info("Base features sampled with profile blending (%.0f%%)",
                    self._blend_fraction * 100)

        # --- Per-customer preferred merchant set + MCC-aligned assignment ---
        # Basic slicing -> a view: MCC rewrites made on raw_values below are
        # visible through mcc_values to the later MCC-conditional passes.
        mcc_values = raw_values[:, :, mcc_idx]
        merch_params = np.zeros((N, 3), dtype=np.float64)
        for prof_idx, profile in enumerate(profiles):
            prof_mask = primary_indices == prof_idx
            if not prof_mask.any():
                continue
            md = profile["features"]["merchant_id"]
            merch_params[prof_mask] = (
                md.get("top_n", 8),
                md.get("concentration", 0.88),
                md.get("zipf_exponent", 1.0),
            )

        self._assign_customer_merchants(
            raw_values, mcc_values, merchant_idx, merch_params,
        )

        # --- Per-merchant amount distributions (tight std 3-7 buckets) ---
        self._apply_merchant_amounts(raw_values, merchant_idx, amount_idx)

        # --- MCC-conditional entry_mode (50% blend with profile) ---
        self._apply_mcc_conditional_entry_mode(raw_values, mcc_values, entry_mode_idx)

        # --- MCC-conditional hour-of-day ---
        hour_idx = self.schema.feature_index("hour")
        self._apply_mcc_conditional_hour(raw_values, mcc_values, hour_idx)

        # --- MCC-conditional is_recurring ---
        recurring_idx = self.schema.feature_index("is_recurring")
        self._apply_mcc_conditional_recurring(raw_values, mcc_values, recurring_idx)

        # --- Customer_tenure -> card_product correlation ---
        tenure_idx = self.schema.feature_index("customer_tenure")
        card_idx = self.schema.feature_index("card_product")
        self._apply_tenure_card_correlation(raw_values, tenure_idx, card_idx)

        # --- Country -> amount boost for international ---
        country_idx = self.schema.feature_index("country")
        self._apply_international_amount_boost(raw_values, country_idx, amount_idx)

        # --- Temporal autocorrelation (MCC and merchant repeat) ---
        self._apply_temporal_autocorrelation(
            raw_values, mcc_idx, merchant_idx, amount_idx,
        )

        logger.info(
            "Applied feature correlations: merchant-amount, MCC-entry_mode, "
            "MCC-hour, MCC-recurring, tenure-card, country-amount, temporal-repeat",
        )

        # --- Tokenize ---
        token_ids = (raw_values + VALUES_START).astype(np.int16)

        # --- Fraud injection ---
        tx_labels = np.zeros((N, T), dtype=np.int8)
        self._inject_fraud(token_ids, tx_labels, primary_indices)

        # --- Amount range labels (derived after fraud, before NULL) ---
        amount_buckets = token_ids[:, :, amount_idx].astype(np.int32) - VALUES_START
        amount_range_labels = amount_bucket_to_range(amount_buckets).astype(np.int8)

        # --- NULL injection ---
        self._apply_nulls(token_ids)

        seq_labels = tx_labels.any(axis=1).astype(np.int8)
        fraud_rate = seq_labels.mean()
        logger.info("Generated %d sequences, fraud rate: %.3f", N, fraud_rate)

        splits = self._compute_splits(N)
        tokenizer = self._build_tokenizer()
        fingerprint = self._compute_fingerprint(tokenizer, splits)

        return GeneratedDataset(
            token_ids=token_ids,
            sequence_labels=seq_labels,
            transaction_labels=tx_labels,
            amount_range_labels=amount_range_labels,
            split_indices=splits,
            fingerprint=fingerprint,
        )

    def _assign_customer_merchants(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        merchant_idx: int,
        per_customer_params: np.ndarray,
    ) -> None:
        """Assign per-customer preferred merchant sets aligned with their MCCs.

        Each customer gets a profile-specific number of preferred merchants
        drawn (popularity-weighted) from their most-visited MCCs. With
        probability ``concentration`` a transaction uses a preferred merchant
        (Zipf-weighted). If no preferred merchant shares the transaction's MCC,
        the transaction's MCC is rewritten to the chosen merchant's MCC.
        Otherwise the merchant is drawn from the catalog for the transaction's
        MCC. Concentration and Zipf exponent come from the customer's primary
        profile.

        Args:
            per_customer_params: (N, 3) array of
                [preferred_count, concentration, zipf_exp]
        """
        N, T, _ = raw_values.shape
        mcc_idx_col = self.schema.feature_index("mcc")

        for i in range(N):
            pref_count = int(per_customer_params[i, 0])
            concentration = float(per_customer_params[i, 1])
            zipf_exp = float(per_customer_params[i, 2])

            zipf_w = 1.0 / np.arange(1, pref_count + 1, dtype=np.float64) ** zipf_exp
            zipf_w /= zipf_w.sum()

            customer_mccs = mcc_values[i]
            mcc_counts = np.bincount(customer_mccs[customer_mccs >= 0], minlength=100)
            top_mccs = np.argsort(mcc_counts)[-pref_count:][::-1]
            top_mccs = top_mccs[mcc_counts[top_mccs] > 0]

            preferred_merchants: list[int] = []
            for mcc_val in top_mccs:
                candidates = self.catalog.mcc_to_merchants.get(int(mcc_val), np.array([]))
                if len(candidates) == 0:
                    continue
                pop = self.catalog.popularity[candidates]
                pop = pop / pop.sum()
                n_from_mcc = max(1, pref_count // len(top_mccs))
                n_from_mcc = min(n_from_mcc, len(candidates))
                chosen = self.rng.choice(candidates, size=n_from_mcc, replace=False, p=pop)
                preferred_merchants.extend(chosen.tolist())

            if len(preferred_merchants) == 0:
                raw_values[i, :, merchant_idx] = self.catalog.sample_merchants_for_mccs(
                    customer_mccs, self.rng,
                )
                continue

            pref_arr = np.array(preferred_merchants[:pref_count], dtype=np.int32)
            n_pref = len(pref_arr)
            pref_mccs = self.catalog.mcc_assignments[pref_arr]
            pref_zipf = zipf_w[:n_pref].copy()
            pref_zipf /= pref_zipf.sum()

            for t in range(T):
                tx_mcc = int(customer_mccs[t])
                if self.rng.random() < concentration:
                    same_mcc_mask = pref_mccs == tx_mcc
                    if same_mcc_mask.any():
                        candidates_idx = np.where(same_mcc_mask)[0]
                        w = pref_zipf[candidates_idx]  # fancy index -> copy
                        w /= w.sum()
                        chosen = self.rng.choice(candidates_idx, p=w)
                        raw_values[i, t, merchant_idx] = pref_arr[chosen]
                    else:
                        chosen = self.rng.choice(n_pref, p=pref_zipf)
                        raw_values[i, t, merchant_idx] = pref_arr[chosen]
                        # Keep merchant -> MCC consistent with the catalog.
                        raw_values[i, t, mcc_idx_col] = int(pref_mccs[chosen])
                else:
                    candidates = self.catalog.mcc_to_merchants.get(tx_mcc, np.array([]))
                    if len(candidates) > 0:
                        pop = self.catalog.popularity[candidates]
                        pop = pop / pop.sum()
                        raw_values[i, t, merchant_idx] = self.rng.choice(candidates, p=pop)
                    else:
                        # Same popularity-weighted global fallback as the catalog.
                        raw_values[i, t, merchant_idx] = self.rng.choice(
                            len(self.catalog.merchant_ids), p=self.catalog.popularity,
                        )

    def _apply_merchant_amounts(
        self,
        raw_values: np.ndarray,
        merchant_idx: int,
        amount_idx: int,
    ) -> None:
        """Sample amounts from per-merchant distributions (tight std 3-7 buckets).

        Each merchant has a characteristic (mean, std) derived from its MCC
        category with per-merchant variation. This creates a strong
        merchant->amount signal the model can learn. Results are rounded and
        clipped to the amount vocabulary.
        """
        N, T, _ = raw_values.shape
        num_amount = self.schema.features[amount_idx].num_values

        flat_merchants = raw_values[:, :, merchant_idx].ravel()
        means = self.catalog.amount_mean[flat_merchants]
        stds = self.catalog.amount_std[flat_merchants]

        vals = self.rng.normal(means, stds)
        vals = np.clip(np.round(vals), 0, num_amount - 1).astype(np.int32)
        raw_values[:, :, amount_idx] = vals.reshape(N, T)

    def _apply_mcc_conditional_entry_mode(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        entry_mode_idx: int,
    ) -> None:
        """Blend MCC-conditional entry_mode with profile-sampled values (50/50).

        Each transaction is independently selected with probability 0.5; selected
        transactions whose MCC falls in an MCC_ENTRY_MODE range are resampled
        from that range's weights.
        """
        N, T, _ = raw_values.shape
        use_mcc = self.rng.random((N, T)) < 0.5

        for (lo, hi), weights in MCC_ENTRY_MODE.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.array(weights, dtype=np.float64)
            w /= w.sum()
            vals = self.rng.choice(len(w), size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, entry_mode_idx][mask] = vals

    def _apply_mcc_conditional_hour(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        hour_idx: int,
    ) -> None:
        """Blend MCC-conditional hour peaks with profile-sampled hours (50/50).

        Selected transactions are resampled over 24 hours where each peak hour
        has weight ``peak_weight`` and every other hour weight 1.0.
        """
        N, T, _ = raw_values.shape
        use_mcc = self.rng.random((N, T)) < 0.5

        for (lo, hi), (peak_hours, peak_weight) in MCC_HOUR_PEAKS.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.ones(24, dtype=np.float64)
            for h in peak_hours:
                w[h] = peak_weight
            w /= w.sum()
            vals = self.rng.choice(24, size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, hour_idx][mask] = vals

    def _apply_mcc_conditional_recurring(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        recurring_idx: int,
    ) -> None:
        """Override is_recurring based on MCC category (70% weight).

        Selected transactions (probability 0.7) in a MCC_RECURRING_RATE range
        get is_recurring ~ Bernoulli(rate).
        """
        N, T, _ = raw_values.shape
        use_mcc = self.rng.random((N, T)) < 0.7

        for (lo, hi), rate in MCC_RECURRING_RATE.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            vals = (self.rng.random(n_matching) < rate).astype(np.int32)
            raw_values[:, :, recurring_idx][mask] = vals

    def _apply_tenure_card_correlation(
        self,
        raw_values: np.ndarray,
        tenure_idx: int,
        card_idx: int,
    ) -> None:
        """Correlate card_product with customer_tenure (60% blend).

        Selected transactions (probability 0.6) are resampled from the
        TENURE_CARD_PRODUCT weights of their tenure band. Weight rows are
        truncated to the schema's card_product vocabulary size.
        """
        N, T, _ = raw_values.shape
        tenure_values = raw_values[:, :, tenure_idx]
        use_corr = self.rng.random((N, T)) < 0.6
        num_cards = self.schema.features[card_idx].num_values

        prev_upper: int | None = None
        for upper, weights in TENURE_CARD_PRODUCT:
            if prev_upper is None:
                band = tenure_values <= upper
            else:
                band = (tenure_values > prev_upper) & (tenure_values <= upper)
            prev_upper = upper

            mask = band & use_corr
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.array(weights[:num_cards], dtype=np.float64)
            w /= w.sum()
            # Sample over len(w), not num_cards: the tables hold 10 weights, so a
            # larger card vocabulary would make `p` the wrong length.
            vals = self.rng.choice(len(w), size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, card_idx][mask] = vals

    def _apply_international_amount_boost(
        self,
        raw_values: np.ndarray,
        country_idx: int,
        amount_idx: int,
    ) -> None:
        """International transactions (country > 0) get a 30% amount boost.

        The boost is applied in bucket space (bucket * 1.3 + N(0, 5)), rounded
        and clipped to the amount vocabulary.
        """
        country_values = raw_values[:, :, country_idx]
        num_amount = self.schema.features[amount_idx].num_values

        international = country_values > 0
        n_intl = int(international.sum())
        if n_intl == 0:
            return

        current = raw_values[:, :, amount_idx][international].astype(np.float64)
        boosted = current * 1.3 + self.rng.normal(0, 5, size=n_intl)
        boosted = np.clip(np.round(boosted), 0, num_amount - 1).astype(np.int32)
        raw_values[:, :, amount_idx][international] = boosted

    def _apply_temporal_autocorrelation(
        self,
        raw_values: np.ndarray,
        mcc_idx: int,
        merchant_idx: int,
        amount_idx: int,
    ) -> None:
        """Apply Markov-style temporal dependencies within each sequence.

        For each transaction t > 0:
          - P(repeat merchant+MCC from t-1) = 0.20 (same store again)
          - P(repeat MCC only from t-1) = 0.10 (same category, maybe same
            store): the merchant is redrawn from that MCC via the catalog so
            merchant_id stays consistent with its fixed MCC
          - If repeating, amount stays close to previous (std 3 buckets)
        """
        N, T, _ = raw_values.shape
        num_amount = self.schema.features[amount_idx].num_values

        for t in range(1, T):
            merchant_repeat = self.rng.random(N) < 0.20
            raw_values[merchant_repeat, t, merchant_idx] = raw_values[merchant_repeat, t - 1, merchant_idx]
            raw_values[merchant_repeat, t, mcc_idx] = raw_values[merchant_repeat, t - 1, mcc_idx]

            mcc_only = (self.rng.random(N) < 0.10) & ~merchant_repeat
            if mcc_only.any():
                repeated_mccs = raw_values[mcc_only, t - 1, mcc_idx]
                raw_values[mcc_only, t, mcc_idx] = repeated_mccs
                raw_values[mcc_only, t, merchant_idx] = self.catalog.sample_merchants_for_mccs(
                    repeated_mccs, self.rng,
                )

            repeat_mask = merchant_repeat | mcc_only
            n_repeating = int(repeat_mask.sum())
            if n_repeating > 0:
                prev_amt = raw_values[repeat_mask, t - 1, amount_idx].astype(np.float64)
                new_amt = prev_amt + self.rng.normal(0, 3.0, size=n_repeating)
                new_amt = np.clip(np.round(new_amt), 0, num_amount - 1).astype(np.int32)
                raw_values[repeat_mask, t, amount_idx] = new_amt

    # --- Fraud injection ---

    def _inject_fraud(
        self,
        token_ids: np.ndarray,
        tx_labels: np.ndarray,
        profile_indices: np.ndarray,
    ) -> None:
        """Mark sequences as fraud at ``overall_fraud_rate`` and inject a pattern.

        Patterns are chosen by their normalized ``weight``. ``token_ids`` and
        ``tx_labels`` are modified in place. ``profile_indices`` is accepted
        for interface stability but not used here.
        """
        N = token_ids.shape[0]
        fraud_rate = self._fraud_raw["overall_fraud_rate"]
        patterns = self._fraud_raw["patterns"]
        # float64 so integer YAML weights can be normalized in place.
        pattern_weights = np.array([p["weight"] for p in patterns], dtype=np.float64)
        pattern_weights /= pattern_weights.sum()

        fraud_seq_mask = self.rng.random(N) < fraud_rate
        fraud_indices = np.where(fraud_seq_mask)[0]
        if len(fraud_indices) == 0:
            return

        pattern_assignments = self.rng.choice(
            len(patterns), size=len(fraud_indices), p=pattern_weights,
        )
        for i, seq_idx in enumerate(fraud_indices):
            pattern = patterns[pattern_assignments[i]]
            # Integer indexing yields views, so the helper writes in place.
            self._inject_single_fraud(
                token_ids[seq_idx], tx_labels[seq_idx], pattern,
            )

    def _inject_single_fraud(
        self,
        seq_tokens: np.ndarray,
        seq_tx_labels: np.ndarray,
        pattern: dict[str, Any],
    ) -> None:
        """Overwrite one contiguous span of a sequence with fraud-pattern features.

        ``pattern["injection"]`` gives ``num_transactions`` ([lo, hi],
        inclusive) and ``position`` ("late" starts the span in the second half
        of the sequence; anything else starts it anywhere that fits). Each
        entry of ``pattern["feature_overrides"]`` is a distribution config
        sampled over the span. If the overrides set ``blend_with_profile``, each
        span position keeps its original token with probability 0.5 per feature.
        The shared ``pattern`` dict is never modified.
        """
        T = seq_tokens.shape[0]
        inj = pattern["injection"]
        lo, hi = inj["num_transactions"]
        num_fraud_tx = min(self.rng.integers(lo, hi + 1), T)

        if inj["position"] == "late":
            max_start = max(T // 2, T - num_fraud_tx)
            start = self.rng.integers(T // 2, max_start + 1)
        else:
            start = self.rng.integers(0, max(T - num_fraud_tx + 1, 1))

        end = min(start + num_fraud_tx, T)
        seq_tx_labels[start:end] = 1

        # Copy before popping: mutating the shared pattern dict would drop the
        # blend flag for every later sequence that uses this pattern.
        overrides = dict(pattern.get("feature_overrides") or {})
        blend = overrides.pop("blend_with_profile", False)
        span_len = end - start

        for feat_name, feat_dist in overrides.items():
            feat_idx = self.schema.feature_index(feat_name)
            feat = self.schema.features[feat_idx]
            fraud_values = self._sample_feature(feat_dist, feat.num_values, (1, span_len))
            fraud_tokens = (fraud_values[0] + VALUES_START).astype(np.int16)
            if blend:
                keep_mask = self.rng.random(span_len) < 0.5
                fraud_tokens[keep_mask] = seq_tokens[start:end, feat_idx][keep_mask]
            seq_tokens[start:end, feat_idx] = fraud_tokens

    # --- NULL injection ---

    def _apply_nulls(self, token_ids: np.ndarray) -> None:
        """Replace tokens with NULL_TOKEN according to ``null_rates``.

        A numeric rate applies uniformly. A dict rate has a ``default`` plus
        ``conditional`` overrides keyed on entry_mode names. Conditions are
        evaluated against entry_mode values captured before any NULLs are
        written.
        """
        N, T, _ = token_ids.shape
        null_rates = self._null_raw["null_rates"]
        entry_mode_idx = self.schema.feature_index("entry_mode")
        entry_mode_vals = token_ids[:, :, entry_mode_idx].astype(np.int32) - VALUES_START

        for feat_idx, feature in enumerate(self.schema.features):
            rate_config = null_rates.get(feature.name, 0.0)
            if isinstance(rate_config, (int, float)):
                if rate_config > 0:
                    null_mask = self.rng.random((N, T)) < rate_config
                    token_ids[:, :, feat_idx][null_mask] = NULL_TOKEN
            elif isinstance(rate_config, dict):
                null_mask = self.rng.random((N, T)) < rate_config.get("default", 0.0)
                for cond in rate_config.get("conditional", []):
                    when_modes = cond["when"]["entry_mode"]
                    if isinstance(when_modes, str):
                        when_modes = [when_modes]
                    mode_vals = [self._entry_mode_name_to_val[m] for m in when_modes]
                    cond_positions = np.isin(entry_mode_vals, mode_vals)
                    cond_rand = self.rng.random((N, T))
                    null_mask[cond_positions] = cond_rand[cond_positions] < cond["rate"]
                token_ids[:, :, feat_idx][null_mask] = NULL_TOKEN

    # --- Splits and fingerprint ---

    def _compute_splits(self, num_sequences: int) -> dict[str, np.ndarray]:
        """Randomly split sequence indices 85/5/10 into train/val/test (sorted)."""
        perm = self.rng.permutation(num_sequences)
        n_test = int(num_sequences * 0.10)
        n_val = int(num_sequences * 0.05)
        test_idx = np.sort(perm[:n_test])
        val_idx = np.sort(perm[n_test : n_test + n_val])
        train_idx = np.sort(perm[n_test + n_val :])
        assert len(test_idx) + len(val_idx) + len(train_idx) == num_sequences
        return {"train": train_idx, "val": val_idx, "test": test_idx}

    def _build_tokenizer(self) -> TransactionTokenizer:
        """Build a tokenizer for the schema, fitting bucketed features uniformly."""
        tokenizer = TransactionTokenizer(self.schema)
        for feature in self.schema.features:
            if feature.type == "bucketed":
                tokenizer.get_feature_tokenizer(feature.name).fit_uniform_from_range()
        return tokenizer

    def _compute_fingerprint(
        self,
        tokenizer: TransactionTokenizer,
        splits: dict[str, np.ndarray],
    ) -> str:
        """SHA-256 over config file bytes, tokenizer state, split indices, and seed."""
        hasher = hashlib.sha256()
        for path in sorted(self._config_paths, key=str):
            with open(path, "rb") as fh:
                hasher.update(fh.read())
        state_bytes = json.dumps(tokenizer.get_state(), sort_keys=True).encode("utf-8")
        hasher.update(state_bytes)
        for key in sorted(splits.keys()):
            hasher.update(splits[key].tobytes())
        hasher.update(str(self._seed).encode("utf-8"))
        return hasher.hexdigest()

    # --- Persistence ---

    def save_dataset(
        self,
        dataset: GeneratedDataset,
        output_dir: str | Path = "data/synthetic",
    ) -> None:
        """Write arrays, splits, tokenizer state, and fingerprint to ``output_dir``."""
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)

        np.save(out / "token_ids.npy", dataset.token_ids)
        np.save(out / "sequence_labels.npy", dataset.sequence_labels)
        np.save(out / "transaction_labels.npy", dataset.transaction_labels)
        np.save(out / "amount_range_labels.npy", dataset.amount_range_labels)
        np.savez(
            out / "split_indices.npz",
            train=dataset.split_indices["train"],
            val=dataset.split_indices["val"],
            test=dataset.split_indices["test"],
        )

        tokenizer = self._build_tokenizer()
        tokenizer.save_state(out / "tokenizer_state.json")

        with open(out / "fingerprint.txt", "w") as fh:
            fh.write(dataset.fingerprint + "\n")

        token_mb = dataset.token_ids.nbytes / 1024 / 1024
        logger.info(
            "Saved dataset to %s: token_ids=%.1f MB, fingerprint=%s",
            out, token_mb, dataset.fingerprint[:16],
        )