"""Synthetic transaction data generator.

Design features:
1. Merchant catalog: 10K merchants with fixed MCC assignments and Zipf
   popularity. Creates realistic merchant-MCC correlation the model can learn.
2. MCC-conditional amounts: transaction amounts are sampled from per-merchant
   distributions whose means derive from the merchant's MCC. Restaurants
   produce $15-80, electronics $200-2000, etc.
3. Profile blending: each customer is a mixture of a primary and a secondary
   profile, not a pure archetype, giving continuous behavioral diversity.
4. MCC-conditional entry_mode: online merchants -> CNP, restaurants -> tap/chip.
5. MCC-conditional hour: restaurants peak at lunch/dinner, transit at commute.
6. MCC-conditional is_recurring: subscriptions/streaming are high, restaurants low.
7. customer_tenure-card_product correlation: new customers get basic cards.
8. Country-amount correlation: international transactions tend to be higher value.
9. Temporal autocorrelation: a transaction sometimes repeats the previous
   merchant or MCC, with an amount close to the previous one.

Output format: token_ids.npy (N, T, F), sequence_labels.npy,
transaction_labels.npy, amount_range_labels.npy, split_indices.npz,
tokenizer_state.json and fingerprint.txt. These outputs drive the training
pipeline via schema.yaml and model.yaml.
"""
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import logging |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import yaml |
|
|
| from src.data.schema import SchemaConfig, load_schema, NULL_TOKEN, VALUES_START |
| from src.data.tokenizer import TransactionTokenizer |
|
|
logger = logging.getLogger(__name__)


# The 256 fine-grained amount buckets (0-255) are grouped into equal-width
# coarse ranges; the coarse range index is emitted as amount_range_labels.
NUM_AMOUNT_RANGES: int = 16
AMOUNT_RANGE_WIDTH: int = 16

# Human-readable label for each coarse range, keyed by range index 0..15.
AMOUNT_RANGE_LABELS: dict[int, str] = dict(
    enumerate(
        [
            "$0-5",
            "$5-15",
            "$15-30",
            "$30-50",
            "$50-80",
            "$80-120",
            "$120-175",
            "$175-250",
            "$250-375",
            "$375-550",
            "$550-800",
            "$800-1.2K",
            "$1.2K-2K",
            "$2K-3.5K",
            "$3.5K-7.5K",
            "$7.5K+",
        ]
    )
)
|
|
|
|
def amount_bucket_to_range(bucket: np.ndarray) -> np.ndarray:
    """Collapse fine amount buckets (0-255) into coarse range indices (0-15).

    Each coarse range spans ``AMOUNT_RANGE_WIDTH`` consecutive buckets.
    Results are clipped into ``[0, NUM_AMOUNT_RANGES - 1]``, so buckets
    outside the expected domain land in the first or last range.
    """
    coarse = np.floor_divide(bucket, AMOUNT_RANGE_WIDTH)
    return np.clip(coarse, 0, NUM_AMOUNT_RANGES - 1)
|
|
|
|
| |
| |
| |
| |
| |
|
|
# MCC-conditional amount parameters: inclusive (lo, hi) MCC range ->
# (mean, std). Units are amount buckets, not dollars: MerchantCatalog.build
# jitters the mean per merchant and clips it into [1, 254].
# The ranges tile MCC 0-99 without gaps.
MCC_AMOUNT_MAP: dict[tuple[int, int], tuple[float, float]] = {
    (0, 3):   (25.0, 12.0),
    (4, 9):   (45.0, 20.0),
    (10, 14): (160.0, 55.0),
    (15, 19): (50.0, 30.0),
    (20, 24): (80.0, 45.0),
    (25, 29): (55.0, 30.0),
    (30, 34): (55.0, 25.0),
    (35, 39): (40.0, 20.0),
    (40, 44): (185.0, 55.0),
    (45, 49): (70.0, 35.0),
    (50, 54): (110.0, 50.0),
    (55, 59): (90.0, 50.0),
    (60, 64): (50.0, 22.0),
    (65, 69): (55.0, 30.0),
    (70, 74): (22.0, 12.0),
    (75, 79): (35.0, 20.0),
    (80, 84): (30.0, 22.0),
    (85, 89): (100.0, 55.0),
    (90, 94): (95.0, 55.0),
    (95, 99): (75.0, 50.0),
}
|
|
|
|
def _mcc_to_amount_params(mcc_value: int) -> tuple[float, float]:
    """Return the (mean, std) amount-bucket parameters for ``mcc_value``.

    The first inclusive range in MCC_AMOUNT_MAP that contains the value
    wins; values outside every range get the generic default (60.0, 40.0).
    """
    return next(
        (
            params
            for (lo, hi), params in MCC_AMOUNT_MAP.items()
            if lo <= mcc_value <= hi
        ),
        (60.0, 40.0),
    )
|
|
|
|
| |
| |
| |
| |
|
|
# MCC-conditional entry_mode weights: inclusive (lo, hi) MCC range ->
# relative (unnormalized) weights over entry_mode values 0..4.
# MCC 4-9 and 15-19 are not covered: _mcc_to_entry_mode_weights falls back
# to a generic default for them, and the MCC-conditional override in
# DataGenerator leaves their profile-sampled entry_mode untouched.
MCC_ENTRY_MODE: dict[tuple[int, int], list[float]] = {
    (0, 3):   [2, 1, 5, 2, 0],
    (10, 14): [1, 6, 1, 1, 0],
    (20, 29): [0, 8, 0, 0, 1],
    (30, 39): [3, 0, 4, 3, 0],
    (40, 49): [3, 2, 3, 2, 0],
    (50, 59): [2, 4, 1, 1, 1],
    (60, 69): [3, 1, 4, 3, 0],
    (70, 74): [2, 1, 5, 2, 0],
    (75, 84): [0, 9, 0, 0, 0],
    (85, 99): [3, 2, 2, 3, 0],
}
|
|
|
|
def _mcc_to_entry_mode_weights(mcc_value: int) -> list[float]:
    """Return entry_mode sampling weights (unnormalized) for ``mcc_value``.

    The first inclusive MCC_ENTRY_MODE range containing the value supplies
    the weights; otherwise a generic default is used. A fresh list is
    returned on every call, so callers may normalize or edit the result in
    place without corrupting the shared module-level table.
    """
    for (lo, hi), weights in MCC_ENTRY_MODE.items():
        if lo <= mcc_value <= hi:
            # Copy: returning the table's own list would alias shared state.
            return [float(w) for w in weights]
    return [2.0, 3.0, 2.0, 2.0, 1.0]
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
# MCC-conditional hour-of-day peaks: inclusive (lo, hi) MCC range ->
# (peak hours, peak weight). Listed hours get ``peak weight`` and all other
# hours weight 1.0 before normalization. Uncovered MCCs keep their
# profile-sampled hour.
MCC_HOUR_PEAKS: dict[tuple[int, int], tuple[list[int], float]] = {
    (0, 3):   ([7, 8, 9, 17, 18], 5.0),
    (10, 14): ([9, 10, 14, 15, 20], 2.5),
    (20, 29): ([10, 11, 20, 21, 22], 3.0),
    (30, 31): ([12, 13, 18, 19, 20], 6.0),
    (32, 34): ([19, 20, 21, 22, 23], 5.0),
    (35, 39): ([11, 12, 13, 17, 18], 4.0),
    (60, 64): ([10, 11, 16, 17, 18], 4.0),
    (70, 71): ([12, 13, 18, 19, 21], 4.5),
    (75, 84): ([0, 1, 6, 7], 2.0),
}

# MCC-conditional recurring rate: inclusive (lo, hi) MCC range ->
# probability that is_recurring is 1 when the MCC override applies.
MCC_RECURRING_RATE: dict[tuple[int, int], float] = {
    (0, 3):   0.20,
    (10, 14): 0.05,
    (20, 29): 0.12,
    (30, 39): 0.03,
    (40, 49): 0.02,
    (50, 54): 0.15,
    (60, 64): 0.08,
    (65, 69): 0.25,
    (70, 74): 0.05,
    (75, 79): 0.60,
    (80, 84): 0.70,
    (85, 94): 0.05,
    (95, 99): 0.10,
}

# Tenure -> card_product weights as (inclusive upper tenure bound, weights).
# Bands are consecutive in raw tenure values: 0-2, 3-5, 6-7 and 8-99. Each
# weight list has one entry per card_product value (unnormalized).
TENURE_CARD_PRODUCT: list[tuple[int, list[float]]] = [
    (2,  [6, 3, 1, 0, 0, 0, 0, 2, 1, 0]),
    (5,  [3, 4, 3, 1, 0, 1, 1, 1, 1, 0]),
    (7,  [2, 3, 4, 2, 1, 1, 2, 0, 1, 1]),
    (99, [1, 2, 3, 3, 2, 1, 2, 0, 0, 1]),
]
|
|
|
|
| |
| |
| |
|
|
@dataclass
class MerchantCatalog:
    """Fixed merchant universe: one MCC per merchant, Zipf popularity, and a
    characteristic amount-bucket distribution per merchant.

    All per-merchant arrays are indexed by merchant id (ids are
    ``0..num_merchants - 1``).
    """

    merchant_ids: np.ndarray  # int32 ids 0..num_merchants-1
    mcc_assignments: np.ndarray  # int32 MCC of each merchant
    popularity: np.ndarray  # float64 sampling probabilities, summing to 1
    amount_mean: np.ndarray  # float64 mean amount bucket per merchant
    amount_std: np.ndarray  # float64 amount-bucket std per merchant
    mcc_to_merchants: dict[int, np.ndarray]  # MCC -> int32 ids of its merchants

    @staticmethod
    def build(
        num_merchants: int,
        num_mccs: int,
        rng: np.random.Generator,
        zipf_exponent: float = 1.2,
    ) -> MerchantCatalog:
        """Build a merchant catalog with MCC assignments, Zipf popularity,
        and per-merchant amount distributions.

        Each merchant's amount mean is its MCC's mean (from
        ``_mcc_to_amount_params``) plus N(0, 0.3 * MCC std) jitter, clipped
        to [1, 254] buckets. Its std is drawn uniformly from [3, 7) buckets
        and kept tight so the model can learn merchant -> amount.

        Args:
            num_merchants: Number of merchants (ids 0..num_merchants-1).
            num_mccs: Size of the MCC vocabulary.
            rng: Random generator, consumed in a fixed order.
            zipf_exponent: Exponent ``s`` of the popularity law ``1 / rank**s``.

        Returns:
            The populated catalog.
        """
        merchant_ids = np.arange(num_merchants, dtype=np.int32)

        # Contiguous runs of merchants per MCC (counts sum to num_merchants),
        # shuffled so the merchant id carries no information about its MCC.
        mcc_merchant_counts = _sample_mcc_merchant_counts(
            num_merchants, num_mccs, rng,
        )
        mcc_assignments = np.repeat(
            np.arange(num_mccs, dtype=np.int32), mcc_merchant_counts,
        )
        rng.shuffle(mcc_assignments)

        # Zipf popularity by rank, shuffled independently of the MCC shuffle.
        raw_pop = 1.0 / np.arange(1, num_merchants + 1, dtype=np.float64) ** zipf_exponent
        rng.shuffle(raw_pop)
        popularity = raw_pop / raw_pop.sum()

        # A per-merchant loop keeps the interleaved normal/uniform draw order
        # (and therefore seeded outputs) stable.
        amount_mean = np.zeros(num_merchants, dtype=np.float64)
        amount_std = np.zeros(num_merchants, dtype=np.float64)
        for i in range(num_merchants):
            mcc_mean, mcc_std = _mcc_to_amount_params(int(mcc_assignments[i]))
            amount_mean[i] = np.clip(
                mcc_mean + rng.normal(0, mcc_std * 0.3), 1.0, 254.0,
            )
            amount_std[i] = rng.uniform(3.0, 7.0)

        mcc_to_merchants: dict[int, np.ndarray] = {
            mcc: np.flatnonzero(mcc_assignments == mcc).astype(np.int32)
            for mcc in range(num_mccs)
        }

        return MerchantCatalog(
            merchant_ids=merchant_ids,
            mcc_assignments=mcc_assignments,
            popularity=popularity,
            amount_mean=amount_mean,
            amount_std=amount_std,
            mcc_to_merchants=mcc_to_merchants,
        )

    def sample_merchants_for_mccs(
        self,
        mcc_values: np.ndarray,
        rng: np.random.Generator,
    ) -> np.ndarray:
        """Given MCC values (any shape), return merchant IDs from that MCC.

        Within an MCC, merchants are drawn with probability proportional to
        their popularity. When an MCC has no assigned merchants (which
        shouldn't happen with a proper catalog), positions fall back to
        *uniform* sampling over all merchant ids.

        Returns:
            int32 merchant ids with the same shape as ``mcc_values``.
        """
        flat_mccs = mcc_values.ravel()
        result = np.zeros(len(flat_mccs), dtype=np.int32)

        for mcc in np.unique(flat_mccs):
            positions = np.where(flat_mccs == mcc)[0]
            merchants = self.mcc_to_merchants.get(int(mcc))
            if merchants is None or len(merchants) == 0:
                result[positions] = rng.integers(
                    0, len(self.merchant_ids), size=len(positions),
                )
                continue
            pop = self.popularity[merchants]
            pop = pop / pop.sum()
            result[positions] = rng.choice(merchants, size=len(positions), p=pop)

        return result.reshape(mcc_values.shape)
|
|
|
|
| def _sample_mcc_merchant_counts( |
| num_merchants: int, num_mccs: int, rng: np.random.Generator, |
| ) -> np.ndarray: |
| """Distribute merchants across MCCs with realistic category sizes. |
| |
| Restaurants/retail get more merchants (fragmented industries). |
| Airlines/utilities get fewer (concentrated industries). |
| """ |
| base_weights = np.ones(num_mccs, dtype=np.float64) |
|
|
| category_weights = { |
| (0, 3): 0.6, |
| (4, 9): 0.8, |
| (10, 14): 0.3, |
| (15, 19): 0.5, |
| (20, 24): 1.5, |
| (25, 29): 1.0, |
| (30, 34): 2.0, |
| (35, 39): 1.2, |
| (40, 44): 0.5, |
| (45, 49): 1.3, |
| (50, 54): 0.8, |
| (55, 59): 0.7, |
| (60, 64): 1.0, |
| (65, 69): 0.8, |
| (70, 74): 1.5, |
| (75, 79): 0.6, |
| (80, 84): 0.5, |
| (85, 89): 0.7, |
| (90, 94): 0.9, |
| (95, 99): 0.4, |
| } |
|
|
| for (lo, hi), w in category_weights.items(): |
| lo_clamped = min(lo, num_mccs - 1) |
| hi_clamped = min(hi, num_mccs - 1) |
| base_weights[lo_clamped : hi_clamped + 1] = w |
|
|
| noise = rng.uniform(0.8, 1.2, size=num_mccs) |
| weights = base_weights * noise |
| weights /= weights.sum() |
|
|
| counts = (weights * num_merchants).astype(np.int32) |
| remainder = num_merchants - counts.sum() |
| if remainder > 0: |
| top_idx = np.argsort(weights)[-remainder:] |
| counts[top_idx] += 1 |
| elif remainder < 0: |
| top_idx = np.argsort(counts)[remainder:] |
| counts[top_idx] -= 1 |
|
|
| assert counts.sum() == num_merchants |
| return counts |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class GeneratedDataset:
    """Arrays and metadata produced by ``DataGenerator.generate``.

    N is the number of sequences, T the transactions per sequence and F the
    number of features.
    """

    # (N, T, F) int16 token ids; null tokens are already applied.
    token_ids: np.ndarray
    # (N,) int8: 1 if any transaction in the sequence is labelled fraud.
    sequence_labels: np.ndarray
    # (N, T) int8 per-transaction fraud labels.
    transaction_labels: np.ndarray
    # (N, T) int8 coarse amount range (0-15), computed before nulls.
    amount_range_labels: np.ndarray
    # "train" / "val" / "test" -> sorted sequence indices.
    split_indices: dict[str, np.ndarray]
    # SHA-256 hex digest identifying configs, tokenizer state, splits and seed.
    fingerprint: str
|
|
|
|
| |
| |
| |
|
|
class DataGenerator:
    """Synthetic transaction generator with a merchant catalog and correlated features.

    Configuration is read from four YAML files (feature schema, customer
    profiles, fraud patterns, null rates). All randomness is drawn from a
    single ``numpy.random.Generator`` seeded with ``seed``, so the same seed
    and configuration replay the same random stream.
    """

    def __init__(
        self,
        schema_path: str | Path = "data/schema.yaml",
        profiles_path: str | Path = "data/profiles.yaml",
        fraud_path: str | Path = "data/fraud_patterns.yaml",
        null_rates_path: str | Path = "data/null_rates.yaml",
        seed: int = 42,
        blend_fraction: float = 0.25,
    ) -> None:
        """Load all configuration files and build the merchant catalog.

        Args:
            schema_path: Feature schema YAML, parsed with ``load_schema``.
            profiles_path: Customer-profile YAML (``profiles`` list).
            fraud_path: Fraud YAML (``overall_fraud_rate`` and ``patterns``).
            null_rates_path: Null-rate YAML (``null_rates`` mapping).
            seed: Seed of the generator's single random stream.
            blend_fraction: Per-transaction probability that a transaction's
                features are resampled from the sequence's secondary profile.
        """
        self.schema = load_schema(schema_path)
        self._profiles_raw = self._load_yaml(profiles_path)
        self._fraud_raw = self._load_yaml(fraud_path)
        self._null_raw = self._load_yaml(null_rates_path)
        # Hashed into the dataset fingerprint.
        self._config_paths = [
            Path(p) for p in [schema_path, profiles_path, fraud_path, null_rates_path]
        ]
        self.rng = np.random.default_rng(seed)
        self._seed = seed
        self._blend_fraction = blend_fraction

        # Invert em.values so null-rate conditions, which name entry modes,
        # can be matched against raw entry_mode values (see _apply_nulls).
        em = self.schema.get_feature("entry_mode")
        self._entry_mode_name_to_val: dict[str, int] = {}
        if em.values:
            self._entry_mode_name_to_val = {v: k for k, v in em.values.items()}

        n_merchants = self.schema.get_feature("merchant_id").num_values
        n_mccs = self.schema.get_feature("mcc").num_values
        self.catalog = MerchantCatalog.build(n_merchants, n_mccs, self.rng)
        logger.info(
            "Built merchant catalog: %d merchants across %d MCCs",
            n_merchants, n_mccs,
        )

    @staticmethod
    def _load_yaml(path: str | Path) -> dict[str, Any]:
        """Parse a UTF-8 YAML file with ``yaml.safe_load``."""
        with open(path, encoding="utf-8") as fh:
            return yaml.safe_load(fh)

    # ------------------------------------------------------------------
    # Feature samplers
    # ------------------------------------------------------------------

    def _sample_feature(
        self, config: dict[str, Any], num_values: int, shape: tuple[int, int],
    ) -> np.ndarray:
        """Sample int32 raw feature values of ``shape`` from one distribution config.

        Supported ``config["distribution"]`` types: ``uniform``, ``weights``,
        ``peaks``, ``normal`` (rounded and clipped to the value range),
        ``bernoulli`` and ``concentrated``.

        Raises:
            ValueError: If the distribution type is unknown.
        """
        dist = config["distribution"]

        if dist == "uniform":
            return self.rng.integers(0, num_values, size=shape, dtype=np.int32)

        if dist == "weights":
            w = np.array(config["weights"], dtype=np.float64)
            w /= w.sum()
            return self.rng.choice(len(w), size=shape, p=w).astype(np.int32)

        if dist == "peaks":
            # Every value has weight 1 except the listed peaks.
            w = np.ones(num_values, dtype=np.float64)
            for p in config["peaks"]:
                if 0 <= p < num_values:
                    w[p] = config["peak_weight"]
            w /= w.sum()
            return self.rng.choice(num_values, size=shape, p=w).astype(np.int32)

        if dist == "normal":
            mean = float(config["mean_bucket"])
            std = max(float(config["std_bucket"]), 0.1)
            vals = self.rng.normal(mean, std, size=shape)
            return np.clip(np.round(vals), 0, num_values - 1).astype(np.int32)

        if dist == "bernoulli":
            return (self.rng.random(size=shape) < config["p_true"]).astype(np.int32)

        if dist == "concentrated":
            return self._sample_concentrated(config, num_values, shape)

        raise ValueError(f"Unknown distribution type: {dist}")

    def _sample_concentrated(
        self, config: dict[str, Any], num_values: int, shape: tuple[int, int],
    ) -> np.ndarray:
        """Sample from a per-sequence concentrated distribution with Zipf skew.

        Each sequence (row) draws a fixed set of ``top_n`` preferred values.
        With probability ``concentration``, a transaction takes a preferred
        value. The rank-k preferred value has weight proportional to
        ``1 / k**zipf_exponent``. Otherwise the transaction takes a uniform
        value. This creates a learnable frequency ranking the model can
        exploit from sequence history.
        """
        top_n = min(config["top_n"], num_values)
        concentration = config["concentration"]
        zipf_exp = config.get("zipf_exponent", 1.0)
        n_seq, n_tx = shape

        preferred = np.stack([
            self.rng.choice(num_values, size=top_n, replace=False)
            for _ in range(n_seq)
        ])

        zipf_weights = 1.0 / np.arange(1, top_n + 1, dtype=np.float64) ** zipf_exp
        zipf_weights /= zipf_weights.sum()

        use_preferred = self.rng.random(shape) < concentration
        pref_idx = self.rng.choice(top_n, size=shape, p=zipf_weights).astype(np.int32)
        pref_values = preferred[np.arange(n_seq)[:, None], pref_idx]
        unif_values = self.rng.integers(0, num_values, size=shape, dtype=np.int32)

        return np.where(use_preferred, pref_values, unif_values).astype(np.int32)

    # ------------------------------------------------------------------
    # Generation pipeline
    # ------------------------------------------------------------------

    def generate(self, num_sequences: int = 100_000) -> GeneratedDataset:
        """Generate the full dataset with correlated features.

        The steps run in this order:
        1. Profile sampling, with blending from a secondary profile.
        2. Merchant assignment.
        3. Feature correlations.
        4. Conversion to token ids.
        5. Fraud injection.
        6. Amount-range labels.
        7. Nulls.
        8. Splits and fingerprint.

        Args:
            num_sequences: Number of customer sequences (N) to generate.

        Returns:
            A GeneratedDataset with tokens, labels, splits and fingerprint.
        """
        N = num_sequences
        T = self.schema.num_transactions
        F = self.schema.num_features

        raw_values = np.zeros((N, T, F), dtype=np.int32)
        profiles = self._profiles_raw["profiles"]
        # Explicit float dtype: integer YAML weights would otherwise make the
        # in-place division raise a numpy casting error.
        profile_weights = np.array([p["weight"] for p in profiles], dtype=np.float64)
        profile_weights /= profile_weights.sum()

        primary_indices = self.rng.choice(len(profiles), size=N, p=profile_weights)
        secondary_indices = self.rng.choice(len(profiles), size=N, p=profile_weights)
        blend_mask = self.rng.random((N, T)) < self._blend_fraction

        mcc_idx = self.schema.feature_index("mcc")
        merchant_idx = self.schema.feature_index("merchant_id")
        amount_idx = self.schema.feature_index("amount")
        entry_mode_idx = self.schema.feature_index("entry_mode")

        # Primary profile: sample every feature for the sequences it owns.
        for prof_idx, profile in enumerate(profiles):
            primary_mask = primary_indices == prof_idx
            n_primary = int(primary_mask.sum())
            if n_primary == 0:
                continue

            logger.debug(
                "Profile '%s': %d primary sequences", profile["name"], n_primary,
            )

            for feat_idx, feature in enumerate(self.schema.features):
                feat_dist = profile["features"][feature.name]
                values = self._sample_feature(feat_dist, feature.num_values, (n_primary, T))
                raw_values[primary_mask, :, feat_idx] = values

        # Secondary profile: overwrite only the blended transactions.
        for prof_idx, profile in enumerate(profiles):
            secondary_mask = secondary_indices == prof_idx
            n_secondary = int(secondary_mask.sum())
            if n_secondary == 0:
                continue

            seq_indices = np.where(secondary_mask)[0]
            seq_blend = blend_mask[seq_indices]
            for feat_idx, feature in enumerate(self.schema.features):
                feat_dist = profile["features"][feature.name]
                alt_values = self._sample_feature(
                    feat_dist, feature.num_values, (n_secondary, T),
                )
                current = raw_values[seq_indices, :, feat_idx]
                raw_values[seq_indices, :, feat_idx] = np.where(
                    seq_blend, alt_values, current,
                )

        logger.info("Base features sampled with profile blending (%.0f%%)", self._blend_fraction * 100)

        # mcc_values is a view: MCC rewrites made during merchant assignment
        # are visible to every later correlation step.
        mcc_values = raw_values[:, :, mcc_idx]
        merch_params = np.zeros((N, 3), dtype=np.float64)
        for i in range(N):
            prof = profiles[primary_indices[i]]
            md = prof["features"]["merchant_id"]
            merch_params[i, 0] = md.get("top_n", 8)
            merch_params[i, 1] = md.get("concentration", 0.88)
            merch_params[i, 2] = md.get("zipf_exponent", 1.0)
        self._assign_customer_merchants(
            raw_values, mcc_values, merchant_idx, merch_params,
        )

        self._apply_merchant_amounts(raw_values, merchant_idx, amount_idx)
        self._apply_mcc_conditional_entry_mode(raw_values, mcc_values, entry_mode_idx)

        hour_idx = self.schema.feature_index("hour")
        self._apply_mcc_conditional_hour(raw_values, mcc_values, hour_idx)

        recurring_idx = self.schema.feature_index("is_recurring")
        self._apply_mcc_conditional_recurring(raw_values, mcc_values, recurring_idx)

        tenure_idx = self.schema.feature_index("customer_tenure")
        card_idx = self.schema.feature_index("card_product")
        self._apply_tenure_card_correlation(raw_values, tenure_idx, card_idx)

        country_idx = self.schema.feature_index("country")
        self._apply_international_amount_boost(raw_values, country_idx, amount_idx)

        self._apply_temporal_autocorrelation(
            raw_values, mcc_idx, merchant_idx, amount_idx,
        )

        logger.info(
            "Applied feature correlations: merchant-amount, MCC-entry_mode, "
            "MCC-hour, MCC-recurring, tenure-card, country-amount, temporal-repeat",
        )

        token_ids = (raw_values + VALUES_START).astype(np.int16)

        tx_labels = np.zeros((N, T), dtype=np.int8)
        self._inject_fraud(token_ids, tx_labels, primary_indices)

        # Amount ranges are taken after fraud injection but before nulls, so
        # every transaction has a defined label.
        amount_buckets = token_ids[:, :, amount_idx].astype(np.int32) - VALUES_START
        amount_range_labels = amount_bucket_to_range(amount_buckets).astype(np.int8)

        self._apply_nulls(token_ids)

        seq_labels = tx_labels.any(axis=1).astype(np.int8)
        fraud_rate = seq_labels.mean()
        logger.info("Generated %d sequences, fraud rate: %.3f", N, fraud_rate)

        splits = self._compute_splits(N)
        tokenizer = self._build_tokenizer()
        fingerprint = self._compute_fingerprint(tokenizer, splits)

        return GeneratedDataset(
            token_ids=token_ids,
            sequence_labels=seq_labels,
            transaction_labels=tx_labels,
            amount_range_labels=amount_range_labels,
            split_indices=splits,
            fingerprint=fingerprint,
        )

    def _assign_customer_merchants(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        merchant_idx: int,
        per_customer_params: np.ndarray,
    ) -> None:
        """Assign per-customer preferred merchant sets aligned with their MCCs.

        Each customer gets a profile-specific number of preferred merchants,
        drawn from their most-visited MCCs. Concentration and Zipf exponent
        also come from the customer's profile, so a retiree (8 merchants,
        0.92 concentration) behaves differently from a digital nomad
        (18 merchants, 0.72 concentration).

        Args:
            raw_values: (N, T, F) raw feature values, modified in place. The
                merchant column is always written. The MCC column is also
                written when a preferred merchant from another MCC is chosen.
            mcc_values: (N, T) view of the MCC column of ``raw_values``.
            merchant_idx: Feature index of ``merchant_id``.
            per_customer_params: (N, 3) array of [preferred_count, concentration, zipf_exp].
        """
        N, T, _ = raw_values.shape
        mcc_idx_col = self.schema.feature_index("mcc")

        for i in range(N):
            pref_count = int(per_customer_params[i, 0])
            concentration = float(per_customer_params[i, 1])
            zipf_exp = float(per_customer_params[i, 2])
            customer_mccs = mcc_values[i]

            if pref_count <= 0:
                # No preferred set: an empty Zipf table would make
                # rng.choice fail, so sample straight from the catalog.
                raw_values[i, :, merchant_idx] = self.catalog.sample_merchants_for_mccs(
                    customer_mccs, self.rng,
                )
                continue

            zipf_w = 1.0 / np.arange(1, pref_count + 1, dtype=np.float64) ** zipf_exp
            zipf_w /= zipf_w.sum()

            # Most-visited MCCs first; MCCs the customer never used are dropped.
            mcc_counts = np.bincount(customer_mccs[customer_mccs >= 0], minlength=100)
            top_mccs = np.argsort(mcc_counts)[-pref_count:][::-1]
            top_mccs = top_mccs[mcc_counts[top_mccs] > 0]

            preferred_merchants: list[int] = []
            for mcc_val in top_mccs:
                candidates = self.catalog.mcc_to_merchants.get(int(mcc_val), np.array([]))
                if len(candidates) == 0:
                    continue
                pop = self.catalog.popularity[candidates]
                pop = pop / pop.sum()
                n_from_mcc = max(1, pref_count // len(top_mccs))
                n_from_mcc = min(n_from_mcc, len(candidates))
                chosen = self.rng.choice(candidates, size=n_from_mcc, replace=False, p=pop)
                preferred_merchants.extend(chosen.tolist())

            if not preferred_merchants:
                raw_values[i, :, merchant_idx] = self.catalog.sample_merchants_for_mccs(
                    customer_mccs, self.rng,
                )
                continue

            pref_arr = np.array(preferred_merchants[:pref_count], dtype=np.int32)
            n_pref = len(pref_arr)
            pref_mccs = self.catalog.mcc_assignments[pref_arr]

            pref_zipf = zipf_w[:n_pref].copy()
            pref_zipf /= pref_zipf.sum()

            for t in range(T):
                tx_mcc = int(customer_mccs[t])
                if self.rng.random() < concentration:
                    same_mcc_mask = pref_mccs == tx_mcc
                    if same_mcc_mask.any():
                        candidates_idx = np.where(same_mcc_mask)[0]
                        w = pref_zipf[candidates_idx]
                        w /= w.sum()
                        chosen = self.rng.choice(candidates_idx, p=w)
                        raw_values[i, t, merchant_idx] = pref_arr[chosen]
                    else:
                        # No preferred merchant in this MCC: use a preferred
                        # merchant anyway and rewrite the MCC to match it.
                        chosen = self.rng.choice(n_pref, p=pref_zipf)
                        raw_values[i, t, merchant_idx] = pref_arr[chosen]
                        raw_values[i, t, mcc_idx_col] = int(pref_mccs[chosen])
                else:
                    candidates = self.catalog.mcc_to_merchants.get(tx_mcc, np.array([]))
                    if len(candidates) > 0:
                        pop = self.catalog.popularity[candidates]
                        pop = pop / pop.sum()
                        raw_values[i, t, merchant_idx] = self.rng.choice(candidates, p=pop)
                    else:
                        raw_values[i, t, merchant_idx] = self.rng.integers(0, len(self.catalog.merchant_ids))

    def _apply_merchant_amounts(
        self,
        raw_values: np.ndarray,
        merchant_idx: int,
        amount_idx: int,
    ) -> None:
        """Sample amounts from per-merchant distributions (tight std of 3-7 buckets).

        Each merchant has a characteristic (mean, std) derived from its MCC
        with per-merchant variation (see MerchantCatalog.build). This creates
        a strong merchant -> amount signal the model can learn. Amounts are
        rounded and clipped to the amount feature's value range.
        """
        N, T, _ = raw_values.shape
        num_amount = self.schema.features[amount_idx].num_values
        flat_merchants = raw_values[:, :, merchant_idx].ravel()

        means = self.catalog.amount_mean[flat_merchants]
        stds = self.catalog.amount_std[flat_merchants]
        vals = self.rng.normal(means, stds)
        vals = np.clip(np.round(vals), 0, num_amount - 1).astype(np.int32)
        raw_values[:, :, amount_idx] = vals.reshape(N, T)

    def _apply_mcc_conditional_entry_mode(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        entry_mode_idx: int,
    ) -> None:
        """Blend MCC-conditional entry_mode with profile-sampled values (50/50).

        Transactions whose MCC is covered by MCC_ENTRY_MODE are resampled
        from that table with probability 0.5. Table weights beyond the
        schema's entry_mode vocabulary are ignored, so sampled values always
        stay in range.
        """
        N, T, _ = raw_values.shape
        num_em = self.schema.features[entry_mode_idx].num_values
        use_mcc = self.rng.random((N, T)) < 0.5

        for (lo, hi), weights in MCC_ENTRY_MODE.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.array(weights[:num_em], dtype=np.float64)
            total = w.sum()
            if total <= 0:
                continue
            w /= total
            vals = self.rng.choice(len(w), size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, entry_mode_idx][mask] = vals

    def _apply_mcc_conditional_hour(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        hour_idx: int,
    ) -> None:
        """Blend MCC-conditional hour peaks with profile-sampled hours (50/50).

        The number of hours comes from the schema, and peak hours outside
        that range are ignored. This matches the ``peaks`` sampler in
        _sample_feature.
        """
        N, T, _ = raw_values.shape
        num_hours = self.schema.features[hour_idx].num_values
        use_mcc = self.rng.random((N, T)) < 0.5

        for (lo, hi), (peak_hours, peak_weight) in MCC_HOUR_PEAKS.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            w = np.ones(num_hours, dtype=np.float64)
            for h in peak_hours:
                if 0 <= h < num_hours:
                    w[h] = peak_weight
            w /= w.sum()
            vals = self.rng.choice(num_hours, size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, hour_idx][mask] = vals

    def _apply_mcc_conditional_recurring(
        self,
        raw_values: np.ndarray,
        mcc_values: np.ndarray,
        recurring_idx: int,
    ) -> None:
        """Override is_recurring based on MCC category (70% of transactions).

        Each transaction is selected with probability 0.7. A selected
        transaction whose MCC is covered by MCC_RECURRING_RATE is redrawn as
        Bernoulli(rate).
        """
        N, T, _ = raw_values.shape
        use_mcc = self.rng.random((N, T)) < 0.7

        for (lo, hi), rate in MCC_RECURRING_RATE.items():
            mask = (mcc_values >= lo) & (mcc_values <= hi) & use_mcc
            n_matching = int(mask.sum())
            if n_matching == 0:
                continue
            vals = (self.rng.random(n_matching) < rate).astype(np.int32)
            raw_values[:, :, recurring_idx][mask] = vals

    def _apply_tenure_card_correlation(
        self,
        raw_values: np.ndarray,
        tenure_idx: int,
        card_idx: int,
    ) -> None:
        """Correlate card_product with customer_tenure (60% of transactions).

        TENURE_CARD_PRODUCT defines consecutive inclusive tenure bands, each
        running from the previous upper bound + 1 to its own upper bound.
        Selected transactions in a band draw card_product from that band's
        weights. Weights beyond the card_product vocabulary are ignored.
        """
        N, T, _ = raw_values.shape
        tenure_values = raw_values[:, :, tenure_idx]
        use_corr = self.rng.random((N, T)) < 0.6
        num_cards = self.schema.features[card_idx].num_values

        lower = 0  # raw feature values are never negative
        for upper, weights in TENURE_CARD_PRODUCT:
            band = (tenure_values >= lower) & (tenure_values <= upper) & use_corr
            lower = upper + 1
            n_matching = int(band.sum())
            if n_matching == 0:
                continue
            w = np.array(weights[:num_cards], dtype=np.float64)
            w /= w.sum()
            # len(w) rather than num_cards: the table may list fewer weights.
            vals = self.rng.choice(len(w), size=n_matching, p=w).astype(np.int32)
            raw_values[:, :, card_idx][band] = vals

    def _apply_international_amount_boost(
        self,
        raw_values: np.ndarray,
        country_idx: int,
        amount_idx: int,
    ) -> None:
        """Boost amounts of international transactions (country value > 0).

        Scales the amount *bucket index* by 1.3, adds N(0, 5) bucket noise,
        then rounds and clips to the amount range.
        """
        country_values = raw_values[:, :, country_idx]
        num_amount = self.schema.features[amount_idx].num_values
        international = country_values > 0
        n_intl = int(international.sum())
        if n_intl == 0:
            return
        current = raw_values[:, :, amount_idx][international].astype(np.float64)
        boosted = current * 1.3 + self.rng.normal(0, 5, size=n_intl)
        boosted = np.clip(np.round(boosted), 0, num_amount - 1).astype(np.int32)
        raw_values[:, :, amount_idx][international] = boosted

    def _apply_temporal_autocorrelation(
        self,
        raw_values: np.ndarray,
        mcc_idx: int,
        merchant_idx: int,
        amount_idx: int,
    ) -> None:
        """Apply Markov-style temporal dependencies within each sequence.

        For each transaction t > 0:
        - P(repeat merchant+MCC from t-1) = 0.20 (same store again).
        - P(repeat MCC only from t-1) = 0.10 (same category). If the current
          merchant belongs to a different MCC, it is redrawn from the
          repeated MCC so the merchant -> MCC mapping stays consistent.
        - If repeating, the amount stays close to the previous one (std of
          3 buckets).
        """
        N, T, _ = raw_values.shape
        num_amount = self.schema.features[amount_idx].num_values

        for t in range(1, T):
            merchant_repeat = self.rng.random(N) < 0.20
            raw_values[merchant_repeat, t, merchant_idx] = raw_values[merchant_repeat, t - 1, merchant_idx]
            raw_values[merchant_repeat, t, mcc_idx] = raw_values[merchant_repeat, t - 1, mcc_idx]

            mcc_only = (self.rng.random(N) < 0.10) & ~merchant_repeat
            raw_values[mcc_only, t, mcc_idx] = raw_values[mcc_only, t - 1, mcc_idx]

            rows = np.flatnonzero(mcc_only)
            if rows.size > 0:
                row_mccs = raw_values[rows, t, mcc_idx]
                row_merchants = raw_values[rows, t, merchant_idx]
                stale = self.catalog.mcc_assignments[row_merchants] != row_mccs
                if stale.any():
                    raw_values[rows[stale], t, merchant_idx] = self.catalog.sample_merchants_for_mccs(
                        row_mccs[stale], self.rng,
                    )

            repeat_mask = merchant_repeat | mcc_only
            n_repeating = int(repeat_mask.sum())
            if n_repeating > 0:
                prev_amt = raw_values[repeat_mask, t - 1, amount_idx].astype(np.float64)
                new_amt = prev_amt + self.rng.normal(0, 3.0, size=n_repeating)
                new_amt = np.clip(np.round(new_amt), 0, num_amount - 1).astype(np.int32)
                raw_values[repeat_mask, t, amount_idx] = new_amt

    # ------------------------------------------------------------------
    # Fraud injection
    # ------------------------------------------------------------------

    def _inject_fraud(
        self,
        token_ids: np.ndarray,
        tx_labels: np.ndarray,
        profile_indices: np.ndarray,
    ) -> None:
        """Select fraudulent sequences and inject one weighted fraud pattern into each.

        Each sequence is independently fraudulent with probability
        ``overall_fraud_rate``.

        Args:
            token_ids: (N, T, F) token ids, modified in place.
            tx_labels: (N, T) transaction labels; set to 1 on injected spans.
            profile_indices: Primary profile index per sequence. Currently
                unused; kept for interface compatibility.
        """
        N = token_ids.shape[0]
        fraud_rate = self._fraud_raw["overall_fraud_rate"]
        patterns = self._fraud_raw["patterns"]
        # Float dtype so integer YAML weights can be normalized in place.
        pattern_weights = np.array([p["weight"] for p in patterns], dtype=np.float64)
        pattern_weights /= pattern_weights.sum()

        fraud_seq_mask = self.rng.random(N) < fraud_rate
        fraud_indices = np.where(fraud_seq_mask)[0]

        if len(fraud_indices) == 0:
            return

        pattern_assignments = self.rng.choice(
            len(patterns), size=len(fraud_indices), p=pattern_weights,
        )

        for i, seq_idx in enumerate(fraud_indices):
            pattern = patterns[pattern_assignments[i]]
            self._inject_single_fraud(
                token_ids[seq_idx], tx_labels[seq_idx], pattern,
            )

    def _inject_single_fraud(
        self,
        seq_tokens: np.ndarray,
        seq_tx_labels: np.ndarray,
        pattern: dict[str, Any],
    ) -> None:
        """Inject one fraud pattern into a single sequence, in place.

        A contiguous span is labelled fraudulent. Its length is drawn from
        ``injection.num_transactions``, an inclusive [lo, hi] range. A
        "late" span starts at or after T // 2; any other position starts
        anywhere. Each feature in ``feature_overrides`` is resampled over the
        span. If ``blend_with_profile`` is true, each overridden token keeps
        its original value with probability 0.5.

        Args:
            seq_tokens: (T, F) tokens of one sequence, modified in place.
            seq_tx_labels: (T,) labels of the sequence, modified in place.
            pattern: One entry of the fraud config's ``patterns`` list. It is
                only read, never modified.
        """
        T = seq_tokens.shape[0]
        inj = pattern["injection"]
        lo, hi = inj["num_transactions"]
        num_fraud_tx = min(self.rng.integers(lo, hi + 1), T)

        if inj["position"] == "late":
            max_start = max(T // 2, T - num_fraud_tx)
            start = self.rng.integers(T // 2, max_start + 1)
        else:
            start = self.rng.integers(0, max(T - num_fraud_tx + 1, 1))

        end = min(start + num_fraud_tx, T)
        seq_tx_labels[start:end] = 1

        # Copy before popping: the pattern dict is shared by every sequence
        # that draws this pattern. Popping from it would disable blending for
        # all later sequences.
        overrides = dict(pattern.get("feature_overrides") or {})
        blend = bool(overrides.pop("blend_with_profile", False))
        span_len = end - start

        for feat_name, feat_dist in overrides.items():
            feat_idx = self.schema.feature_index(feat_name)
            feat = self.schema.features[feat_idx]
            fraud_values = self._sample_feature(feat_dist, feat.num_values, (1, span_len))
            fraud_tokens = (fraud_values[0] + VALUES_START).astype(np.int16)

            if blend:
                keep_mask = self.rng.random(span_len) < 0.5
                fraud_tokens[keep_mask] = seq_tokens[start:end, feat_idx][keep_mask]

            seq_tokens[start:end, feat_idx] = fraud_tokens

    # ------------------------------------------------------------------
    # Nulls
    # ------------------------------------------------------------------

    def _apply_nulls(self, token_ids: np.ndarray) -> None:
        """Replace tokens with NULL_TOKEN according to the null-rate config, in place.

        A numeric rate nulls each token of that feature independently. A dict
        config starts from its ``default`` rate. Each ``conditional`` entry
        then redraws the null decision at positions whose entry_mode matches
        ``when.entry_mode``, and later entries override earlier ones.
        entry_mode values are read once, before any feature is nulled.
        """
        N, T, _ = token_ids.shape
        null_rates = self._null_raw["null_rates"]
        entry_mode_idx = self.schema.feature_index("entry_mode")
        entry_mode_vals = token_ids[:, :, entry_mode_idx].astype(np.int32) - VALUES_START

        for feat_idx, feature in enumerate(self.schema.features):
            rate_config = null_rates.get(feature.name, 0.0)

            if isinstance(rate_config, (int, float)):
                if rate_config > 0:
                    null_mask = self.rng.random((N, T)) < rate_config
                    token_ids[:, :, feat_idx][null_mask] = NULL_TOKEN
            elif isinstance(rate_config, dict):
                null_mask = self.rng.random((N, T)) < rate_config.get("default", 0.0)

                for cond in rate_config.get("conditional", []):
                    when_modes = cond["when"]["entry_mode"]
                    if isinstance(when_modes, str):
                        when_modes = [when_modes]
                    mode_vals = [self._entry_mode_name_to_val[m] for m in when_modes]
                    cond_positions = np.isin(entry_mode_vals, mode_vals)
                    cond_rand = self.rng.random((N, T))
                    null_mask[cond_positions] = cond_rand[cond_positions] < cond["rate"]

                token_ids[:, :, feat_idx][null_mask] = NULL_TOKEN

    # ------------------------------------------------------------------
    # Splits, tokenizer, fingerprint
    # ------------------------------------------------------------------

    def _compute_splits(self, num_sequences: int) -> dict[str, np.ndarray]:
        """Randomly split sequence indices into 10% test, 5% val, rest train.

        Split sizes are floored, and each index array is sorted.
        """
        perm = self.rng.permutation(num_sequences)
        n_test = int(num_sequences * 0.10)
        n_val = int(num_sequences * 0.05)

        test_idx = np.sort(perm[:n_test])
        val_idx = np.sort(perm[n_test : n_test + n_val])
        train_idx = np.sort(perm[n_test + n_val :])

        assert len(test_idx) + len(val_idx) + len(train_idx) == num_sequences
        return {"train": train_idx, "val": val_idx, "test": test_idx}

    def _build_tokenizer(self) -> TransactionTokenizer:
        """Build a tokenizer for the schema.

        ``fit_uniform_from_range()`` is called on every bucketed feature's
        tokenizer.
        """
        tokenizer = TransactionTokenizer(self.schema)
        for feature in self.schema.features:
            if feature.type == "bucketed":
                tokenizer.get_feature_tokenizer(feature.name).fit_uniform_from_range()
        return tokenizer

    def _compute_fingerprint(
        self, tokenizer: TransactionTokenizer, splits: dict[str, np.ndarray],
    ) -> str:
        """Return a SHA-256 hex digest over the inputs that define the dataset.

        The digest covers the config files (in sorted path order), the
        tokenizer state, the split indices and the seed.

        NOTE(review): blend_fraction is not hashed. Datasets that differ only
        in blend_fraction therefore share a fingerprint.
        """
        hasher = hashlib.sha256()
        for path in sorted(self._config_paths, key=str):
            with open(path, "rb") as fh:
                hasher.update(fh.read())
        state_bytes = json.dumps(tokenizer.get_state(), sort_keys=True).encode("utf-8")
        hasher.update(state_bytes)
        for key in sorted(splits.keys()):
            hasher.update(splits[key].tobytes())
        hasher.update(str(self._seed).encode("utf-8"))
        return hasher.hexdigest()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save_dataset(
        self, dataset: GeneratedDataset, output_dir: str | Path = "data/synthetic",
    ) -> None:
        """Write the dataset to ``output_dir``, creating the directory if needed.

        Files written: token_ids.npy, sequence_labels.npy,
        transaction_labels.npy, amount_range_labels.npy, split_indices.npz,
        tokenizer_state.json and fingerprint.txt.
        """
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)

        np.save(out / "token_ids.npy", dataset.token_ids)
        np.save(out / "sequence_labels.npy", dataset.sequence_labels)
        np.save(out / "transaction_labels.npy", dataset.transaction_labels)
        np.save(out / "amount_range_labels.npy", dataset.amount_range_labels)

        np.savez(
            out / "split_indices.npz",
            train=dataset.split_indices["train"],
            val=dataset.split_indices["val"],
            test=dataset.split_indices["test"],
        )

        tokenizer = self._build_tokenizer()
        tokenizer.save_state(out / "tokenizer_state.json")

        with open(out / "fingerprint.txt", "w", encoding="utf-8") as fh:
            fh.write(dataset.fingerprint + "\n")

        token_mb = dataset.token_ids.nbytes / 1024 / 1024
        logger.info(
            "Saved dataset to %s: token_ids=%.1f MB, fingerprint=%s",
            out, token_mb, dataset.fingerprint[:16],
        )
|
|