"""Data loader for the encoder subproject.

Reuses the parent's `FinetuneDataset` verbatim — the parent's tokenized arrays
at `data/synthetic/` are already shaped `(N, 64, 15)`, which is exactly what
the per-transaction encoder needs. This module is a thin orchestrator that
resolves data paths (via the `encoder/data/synthetic -> ../../data/synthetic`
symlink), builds train/val/test loaders, and exposes a fingerprint-verification
helper so accidental data regeneration breaks fast.

Why we don't define a new Dataset class: the encoder's input contract is
identical to the parent's (`(B, 64, 15) int64` plus fraud + amount_range
labels). The only thing that changes is what the model does with those tokens.
Keeping the Dataset shared guarantees apples-to-apples comparison.
"""

from __future__ import annotations

from pathlib import Path

import numpy as np
import torch
from torch.utils.data import DataLoader

from src.training.finetune import FinetuneDataset


def load_data_arrays(
    data_dir: Path | str,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None, dict[str, np.ndarray]]:
    """Load raw token arrays + split indices from `data_dir`.

    Args:
        data_dir: directory holding `token_ids.npy`, `sequence_labels.npy`,
            `split_indices.npz` and, optionally, `amount_range_labels.npy`.

    Returns:
        token_ids:        (N, 64, 15) int16
        sequence_labels:  (N,) int8 (fraud)
        ar_targets:       (N,) int8 last-transaction amount_range, or None if
                          file absent
        splits:           dict with keys 'train' / 'val' / 'test', each int64
                          indices

    Raises:
        FileNotFoundError: if any of the three required files is missing.
    """
    data_dir = Path(data_dir)
    token_ids = np.load(data_dir / "token_ids.npy")
    sequence_labels = np.load(data_dir / "sequence_labels.npy")

    # np.load on an .npz returns a lazy NpzFile that keeps the archive open.
    # Materialize every member inside a `with` so the file handle is released
    # instead of lingering until garbage collection.
    with np.load(data_dir / "split_indices.npz") as archive:
        splits: dict[str, np.ndarray] = {
            name: archive[name] for name in archive.files
        }

    ar_path = data_dir / "amount_range_labels.npy"
    ar_targets: np.ndarray | None = None
    if ar_path.exists():
        # Parent stores per-transaction amount_range as (N, 64). The head
        # targets the LAST transaction's amount bucket, so we slice [:, -1].
        ar_all = np.load(ar_path)
        ar_targets = ar_all[:, -1]

    return token_ids, sequence_labels, ar_targets, splits


def verify_fingerprint(data_dir: Path | str, expected: str) -> None:
    """Raise if data fingerprint differs from `expected`.

    Catches the silent failure where data has been regenerated under us — in
    which case head-to-head comparison numbers against the parent's
    already-published eval.md.json would not be apples-to-apples.

    Args:
        data_dir: directory expected to contain `fingerprint.txt`.
        expected: fingerprint string pinned by the caller; compared against
            the file's contents after stripping surrounding whitespace.

    Raises:
        FileNotFoundError: if `fingerprint.txt` does not exist in `data_dir`.
        ValueError: if the stored fingerprint differs from `expected`.
    """
    fp_path = Path(data_dir) / "fingerprint.txt"
    if not fp_path.exists():
        raise FileNotFoundError(
            f"No fingerprint.txt at {fp_path}. Encoder relies on the parent's "
            f"data/synthetic/ for head-to-head; regenerate via parent's "
            f"`python -m scripts.generate` if missing.",
        )

    # Explicit encoding keeps the comparison independent of the platform's
    # default locale.
    actual = fp_path.read_text(encoding="utf-8").strip()
    if actual != expected:
        raise ValueError(
            f"Data fingerprint mismatch:\n"
            f" expected: {expected}\n"
            f" actual: {actual}\n"
            f"Data has been regenerated since this config was pinned. Head-to-head "
            f"comparison against the parent's eval.md.json would not be valid.",
        )


def build_loaders(
    data_dir: Path | str,
    batch_size: int = 32,
    label_fraction: float = 1.0,
    seed: int = 42,
    num_workers: int = 4,
) -> tuple[DataLoader, DataLoader, DataLoader]:
    """Build train/val/test DataLoaders.

    Args:
        data_dir: path to the tokenized synthetic arrays (symlink to parent OK).
        batch_size: applied to all three loaders.
        label_fraction: subsample fraction of `train` indices for the
            label-scarcity sweep (1.0 = full, 0.10 = 10%, 0.01 = 1%). Val and
            test are never subsampled. At least one training index is always
            kept.
        seed: RNG seed for the train-subset selection. Same seed as the
            parent's scarcity protocol so the head-to-head selects the same
            training subsets across both architectures.
        num_workers: DataLoader worker count for train. Val/test use half.

    Returns:
        (train_loader, val_loader, test_loader)
    """
    token_ids, sequence_labels, ar_targets, splits = load_data_arrays(data_dir)

    train_indices = splits["train"]
    if label_fraction < 1.0:
        # np.random.RandomState (not Generator) to match the parent's
        # subsampling RNG exactly. Same seed -> identical train subset.
        rng = np.random.RandomState(seed)
        n_keep = max(1, int(len(train_indices) * label_fraction))
        train_indices = rng.choice(train_indices, n_keep, replace=False)

    train_ds = FinetuneDataset(token_ids, sequence_labels, train_indices, ar_targets)
    val_ds = FinetuneDataset(token_ids, sequence_labels, splits["val"], ar_targets)
    test_ds = FinetuneDataset(token_ids, sequence_labels, splits["test"], ar_targets)

    # Pinned host memory only pays off when batches are copied to a GPU.
    pin_memory = torch.cuda.is_available()

    # NOTE(review): with drop_last=True a train subset smaller than
    # `batch_size` yields zero batches per epoch — confirm the smallest
    # label_fraction in the sweep never gets that small.
    train_loader = DataLoader(
        train_ds,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=True,
    )

    # Evaluation loaders share one configuration: deterministic order, every
    # sample kept, and half the worker count of training.
    eval_workers = max(0, num_workers // 2)
    eval_kwargs = {
        "batch_size": batch_size,
        "shuffle": False,
        "num_workers": eval_workers,
        "pin_memory": pin_memory,
    }
    val_loader = DataLoader(val_ds, **eval_kwargs)
    test_loader = DataLoader(test_ds, **eval_kwargs)

    return train_loader, val_loader, test_loader