| """Data loader for the encoder subproject. |
| |
| Reuses the parent's `FinetuneDataset` verbatim — the parent's tokenized arrays |
| at `data/synthetic/` are already shaped `(N, 64, 15)`, which is exactly what |
| the per-transaction encoder needs. This module is a thin orchestrator that |
| resolves data paths (via the `encoder/data/synthetic -> ../../data/synthetic` |
| symlink), builds train/val/test loaders, and exposes a fingerprint-verification |
| helper so accidental data regeneration fails fast. |
| |
| Why we don't define a new Dataset class: the encoder's input contract is |
| identical to the parent's (`(B, 64, 15) int64` plus fraud + amount_range |
| labels). The only thing that changes is what the model does with those |
| tokens. Keeping the Dataset shared guarantees apples-to-apples comparison. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
|
|
| import numpy as np |
| import torch |
| from torch.utils.data import DataLoader |
|
|
| from src.training.finetune import FinetuneDataset |
|
|
|
|
def load_data_arrays(
    data_dir: Path | str,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None, dict[str, np.ndarray]]:
    """Load raw token arrays + split indices from `data_dir`.

    Reads `token_ids.npy`, `sequence_labels.npy` and `split_indices.npz`
    (all required) plus `amount_range_labels.npy` when it is present.

    Args:
        data_dir: directory holding the tokenized arrays.

    Returns:
        token_ids: (N, 64, 15) int16
        sequence_labels: (N,) int8 (fraud)
        ar_targets: (N,) int8 last-transaction amount_range, or None if file absent
        splits: dict with keys 'train' / 'val' / 'test', each int64 indices

    Raises:
        FileNotFoundError: if one of the required files is missing
            (propagated from `np.load`).
    """
    data_dir = Path(data_dir)
    token_ids = np.load(data_dir / "token_ids.npy")
    sequence_labels = np.load(data_dir / "sequence_labels.npy")

    # np.load on an .npz returns a lazy NpzFile that keeps the archive open.
    # Materialize every member inside a `with` so the file handle is released
    # right away instead of lingering until garbage collection.
    with np.load(data_dir / "split_indices.npz") as archive:
        splits = {name: archive[name] for name in archive.files}

    ar_path = data_dir / "amount_range_labels.npy"
    ar_targets: np.ndarray | None = None
    if ar_path.exists():
        # Only the final column (the last transaction's amount_range) is the
        # prediction target; the rest of the array is not returned.
        ar_all = np.load(ar_path)
        ar_targets = ar_all[:, -1]

    return token_ids, sequence_labels, ar_targets, splits
|
|
|
|
def verify_fingerprint(data_dir: Path | str, expected: str) -> None:
    """Raise if data fingerprint differs from `expected`.

    Catches the silent failure where data has been regenerated under us —
    in which case head-to-head comparison numbers against the parent's
    already-published eval.md.json would not be apples-to-apples.

    Args:
        data_dir: directory expected to contain `fingerprint.txt`.
        expected: the pinned fingerprint. It is compared as-is against the
            file's contents with leading/trailing whitespace stripped from
            the file side only.

    Raises:
        FileNotFoundError: if `fingerprint.txt` does not exist in `data_dir`.
        ValueError: if the stored fingerprint is not equal to `expected`.
    """
    fp_path = Path(data_dir) / "fingerprint.txt"
    if not fp_path.exists():
        raise FileNotFoundError(
            f"No fingerprint.txt at {fp_path}. Encoder relies on the parent's "
            f"data/synthetic/ for head-to-head; regenerate via parent's "
            f"`python -m scripts.generate` if missing.",
        )

    # Pin the encoding so the comparison does not depend on the host locale's
    # default text encoding.
    actual = fp_path.read_text(encoding="utf-8").strip()
    if actual != expected:
        raise ValueError(
            f"Data fingerprint mismatch:\n"
            f" expected: {expected}\n"
            f" actual: {actual}\n"
            f"Data has been regenerated since this config was pinned. Head-to-head "
            f"comparison against the parent's eval.md.json would not be valid.",
        )
|
|
|
|
def build_loaders(
    data_dir: Path | str,
    batch_size: int = 32,
    label_fraction: float = 1.0,
    seed: int = 42,
    num_workers: int = 4,
) -> tuple[DataLoader, DataLoader, DataLoader]:
    """Build train/val/test DataLoaders.

    Args:
        data_dir: path to the tokenized synthetic arrays (symlink to parent OK).
        batch_size: applied to all three loaders.
        label_fraction: subsample fraction of `train` indices for the
            label-scarcity sweep (1.0 = full, 0.10 = 10%, 0.01 = 1%). Val and
            test are never subsampled. At least one training index is always
            kept.
        seed: RNG seed for the train-subset selection. Same seed as the parent's
            scarcity protocol so the head-to-head selects the same training
            subsets across both architectures.
        num_workers: DataLoader worker count for train. Val/test use half.

    Returns:
        (train_loader, val_loader, test_loader). The train loader shuffles and
        drops the trailing partial batch, unless the training set is smaller
        than one batch, in which case the single partial batch is kept so the
        loader is never empty. Val/test loaders keep order and every sample.
    """
    token_ids, sequence_labels, ar_targets, splits = load_data_arrays(data_dir)

    train_indices = splits["train"]
    if label_fraction < 1.0:
        # Legacy RandomState is kept on purpose: a different generator would
        # draw a different subset for the same seed, and the seed is meant to
        # reproduce the subsets used by the parent's scarcity protocol.
        rng = np.random.RandomState(seed)
        n_keep = max(1, int(len(train_indices) * label_fraction))
        train_indices = rng.choice(train_indices, n_keep, replace=False)

    train_ds = FinetuneDataset(token_ids, sequence_labels, train_indices, ar_targets)
    val_ds = FinetuneDataset(token_ids, sequence_labels, splits["val"], ar_targets)
    test_ds = FinetuneDataset(token_ids, sequence_labels, splits["test"], ar_targets)

    # Query CUDA availability once and reuse it for all three loaders.
    pin_memory = torch.cuda.is_available()

    train_loader = DataLoader(
        train_ds,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
        # Dropping the last batch when the whole (sub)set is smaller than one
        # batch would leave zero batches per epoch, which can happen in the
        # low-fraction scarcity runs. Only drop when a full batch exists.
        drop_last=len(train_ds) >= batch_size,
    )

    eval_workers = max(0, num_workers // 2)
    eval_kwargs = {
        "batch_size": batch_size,
        "shuffle": False,
        "num_workers": eval_workers,
        "pin_memory": pin_memory,
    }
    val_loader = DataLoader(val_ds, **eval_kwargs)
    test_loader = DataLoader(test_ds, **eval_kwargs)
    return train_loader, val_loader, test_loader
|
|