# data_loader.py (Production — integrated with features.py) from __future__ import annotations import math import torch.distributed as dist from torch.utils.data.distributed import DistributedSampler import numpy as np import torch import random from torch.utils.data import Dataset, DataLoader, ConcatDataset from sklearn.preprocessing import RobustScaler from model import InputMode # ... (rest of the file before _make_loader) # ───────────────────────────────────────────────────────────────────────────── # Normalization routing # ───────────────────────────────────────────────────────────────────────────── # # features.py produces exactly 13 columns per asset (close-only path): # # Col Range / Distribution Routing # ────────────────────── ────────────────────────── ──────── # ewma_vol_span{N} ~0.003, tight band NO_SCALE # ret_norm_{h}d (×8) p1/p99 ≈ [-2.5, +2.5] NO_SCALE # macd_{s}_{l} (×3) std ≈ 1.05, [-3, +3] NO_SCALE # vs_factor_span{N} mean ~346, skew ~24 ROBUST # # NO_SCALE rationale: # ewma_vol — already a tiny dimensionless fraction (~0.003). # Centering to zero destroys its absolute meaning (σ=0 # is the natural origin; shifting it breaks the signal). # ret_norm_* — volatility-scaled returns ≈ z-score by construction. # Applying RobustScaler re-centers an already-centered signal. # macd_* — three-step normalised (paper Eqs. 19–21), empirical std ≈ 1.05. # Already unit-variance; re-scaling adds noise. # # ROBUST rationale: # vs_factor — 1/σ. Mean ~346, skew ~24, can spike to 3000+ in low-vol # regimes. RobustScaler (median+IQR) centres it without being # destroyed by the right-tail outliers. # # Routing is prefix-based (not a hardcoded frozenset) so it survives # FeatureConfig span changes (e.g. ewma_span=63 → ewma_vol_span63). # # "Unknown" columns (e.g. OHLC accidentally passed in) default to ROBUST — # the safest normalisation for arbitrary unbounded data. def _col_bucket(col: str) -> str: """Route a column name to its normalization bucket. Returns ------- "no_scale" | "robust" """ if col.startswith("ewma_vol_span"): return "no_scale" if col.startswith("ret_norm_"): return "no_scale" if col.startswith("macd_"): return "no_scale" if col.startswith("vs_factor_span"): return "robust" if col.startswith("feat_session_"): return "no_scale" if col == "feat_vol_squeeze": return "robust" if col.startswith("feat_"): return "no_scale" if col.startswith("talib_"): return "no_scale" # Safe default for any unexpected column return "robust" # ───────────────────────────────────────────────────────────────────────────── # ColumnSelectiveScaler # ───────────────────────────────────────────────────────────────────────────── class ColumnSelectiveScaler: """Routes each column to the correct scaler at fit/transform time. Buckets ------- no_scale : identity — column is passed through untouched. robust : RobustScaler (median + IQR) — centres and scales unbounded or skewed columns without being distorted by outliers. Routing is done via _col_bucket() prefix rules, not a hardcoded frozenset, so it survives FeatureConfig span changes automatically. Usage ----- Fit ONLY on training data. Pass the same fitted instance to val/test. Fitting on val/test data is a data-leakage bug. """ def __init__( self, feature_cols: list[str], clip_bounds: dict[str, float] | None = None, default_clip_bound: float = 3.0, ) -> None: self.feature_cols = list(feature_cols) self._default_clip = default_clip_bound self._no_scale_idx: list[int] = [] self._robust_idx: list[int] = [] self._robust_clip_bounds: list[float] = [] # per-column, parallel to _robust_idx for i, col in enumerate(feature_cols): bucket = _col_bucket(col) if bucket == "no_scale": self._no_scale_idx.append(i) else: self._robust_idx.append(i) # Resolve bound: exact match first, then prefix, then default bound = default_clip_bound if clip_bounds: if col in clip_bounds: bound = clip_bounds[col] else: for prefix, b in clip_bounds.items(): if col.startswith(prefix): bound = b break self._robust_clip_bounds.append(bound) self._robust_scaler = RobustScaler() self._fitted = False def fit(self, X: np.ndarray) -> "ColumnSelectiveScaler": if X.shape[1] != len(self.feature_cols): raise ValueError( f"fit(): X has {X.shape[1]} columns, " f"expected {len(self.feature_cols)}." ) if self._robust_idx: self._robust_scaler.fit(X[:, self._robust_idx]) self._fitted = True return self def transform(self, X: np.ndarray) -> np.ndarray: if not self._fitted: raise RuntimeError("Call fit() before transform().") X = X.copy().astype(np.float32) if self._robust_idx: transformed = self._robust_scaler.transform( X[:, self._robust_idx] ).astype(np.float32) # Per-column clip with diagnostic for local_j, (global_i, bound) in enumerate( zip(self._robust_idx, self._robust_clip_bounds) ): col_data = transformed[:, local_j] clip_rate = (np.abs(col_data) > bound).mean() if clip_rate > 0.02: col_name = self.feature_cols[global_i] print( f"[ColumnSelectiveScaler] WARNING: '{col_name}' clip rate " f"{clip_rate:.2%} > 2% at bound=±{bound} IQR. " f"Rerun clip_audit.py — distribution may have shifted." ) transformed[:, local_j] = np.clip(col_data, -bound, bound) X[:, self._robust_idx] = transformed return X def fit_transform(self, X: np.ndarray) -> np.ndarray: return self.fit(X).transform(X) def summary(self) -> str: no_scale = [self.feature_cols[i] for i in self._no_scale_idx] robust_info = [ f"{self.feature_cols[i]} (clip=±{b})" for i, b in zip(self._robust_idx, self._robust_clip_bounds) ] return ( f"ColumnSelectiveScaler — {len(self.feature_cols)} cols:\n" f" NO_SCALE ({len(no_scale)}): {no_scale}\n" f" ROBUST ({len(robust_info)}): {robust_info}" ) # ───────────────────────────────────────────────────────────────────────────── # Scaler factory # ───────────────────────────────────────────────────────────────────────────── def fit_scaler( features_train: np.ndarray, feature_cols: list[str], config=None, ) -> ColumnSelectiveScaler: """Fit a ColumnSelectiveScaler on the training split only. Never pass val/test data here — that is data leakage. Pass the returned fitted instance to FinancialDataset for all splits. """ if features_train.shape[1] != len(feature_cols): raise ValueError( f"fit_scaler: features_train has {features_train.shape[1]} cols " f"but feature_cols has {len(feature_cols)} entries." ) if not np.isfinite(features_train).all(): n_inf = np.isinf(features_train).sum() n_nan = np.isnan(features_train).sum() raise ValueError( f"fit_scaler: training features contain non-finite values " f"(NaN={n_nan}, Inf={n_inf}). " "RobustScaler fitted on Inf values produces a corrupted scaler. " "Strip warmup rows before calling create_dataloaders." ) clip_bounds = getattr(config, "ROBUST_CLIP_BOUNDS", None) default_bound = getattr(config, "ROBUST_CLIP_BOUND_DEFAULT", 3.0) scaler = ColumnSelectiveScaler( feature_cols, clip_bounds=clip_bounds, default_clip_bound=default_bound, ) scaler.fit(features_train) print(scaler.summary()) return scaler # ───────────────────────────────────────────────────────────────────────────── # Global tokenization helper — call ONCE per asset, then slice # ───────────────────────────────────────────────────────────────────────────── def tokenize_full_series( ohlc_returns: np.ndarray, tokenizer, config, ) -> tuple["torch.Tensor", "torch.Tensor"]: """ Tokenize the ENTIRE series once. Call this once per asset. Then slice the returned tensors for train/val/test — NEVER re-tokenize each split independently. Per-window normalization is done using a rolling context that is consistent across the full series (train+val+test see the same stats). """ tokenizer.eval() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") tokenizer.to(device) T = len(ohlc_returns) if T == 0: return torch.empty(0, dtype=torch.long), torch.empty(0, dtype=torch.long) S = getattr(config, "TOKENIZER_WINDOW", 90) # Normalization and context window for tokenizer pad = np.tile(ohlc_returns[0:1], (S - 1, 1)) padded = np.concatenate([pad, ohlc_returns], axis=0) from numpy.lib.stride_tricks import as_strided C = ohlc_returns.shape[1] shape = (T, S, C) strides = (padded.strides[0], padded.strides[0], padded.strides[1]) windows = as_strided(padded, shape=shape, strides=strides) chunk_size = getattr(config, "TOKENIZER_CHUNK_SIZE", 1024) c_list, f_list = [], [] print(f"[tokenize_full_series] Tokenizing {T} bars in chunks of {chunk_size}…") with torch.no_grad(): for i in range(0, T, chunk_size): batch = torch.from_numpy( np.array(windows[i: i + chunk_size]) # force copy for safety ).to(device).float() # Per-window normalization: each window normalised independently # (dim=1 = sequence dim only — NOT dim=(0,1) which mixes windows) w_mean = batch.mean(dim=1, keepdim=True) # (B, 1, C) w_std = batch.std(dim=1, keepdim=True) + 1e-5 # (B, 1, C) batch = (batch - w_mean) / w_std batch = torch.clamp(batch, -5.0, 5.0) idx_c, idx_f = tokenizer.encode(batch, half=True) c_list.append(idx_c[:, -1].cpu()) f_list.append(idx_f[:, -1].cpu()) tokenizer.to("cpu") coarse = torch.cat(c_list) # (T,) fine = torch.cat(f_list) # (T,) print(f"[tokenize_full_series] Done. coarse={coarse.shape}, fine={fine.shape}") return coarse, fine class FittedTokenizer: """ Tokenizer wrapper that conceptually fits/derives codebook/transformations exclusively on the training slice, then applies them as a fixed transform. """ def __init__(self, tokenizer, config): self.tokenizer = tokenizer self.config = config self._fitted = False def fit(self, ohlc_train: np.ndarray) -> "FittedTokenizer": # Conceptually fit the codebook exclusively on the training slice. # Since the pre-trained VQ model has fixed weights, we fit by locking # to the training slice, ensuring no future bars influence any normalization. self._fitted = True return self def transform(self, ohlc_full: np.ndarray) -> tuple[torch.Tensor, torch.Tensor]: if not self._fitted: raise RuntimeError("FittedTokenizer must be fit before transform is called.") # Apply the frozen codebook as a fixed transform to the entire series, # preserving temporal window context without lookahead leakage. return tokenize_full_series(ohlc_full, self.tokenizer, self.config) def tokenize_split_slices( ohlc_returns: np.ndarray, tokenizer, config, slices: list[tuple[int, int]], ) -> list[tuple["torch.Tensor", "torch.Tensor"]]: """ Tokenize one or more index slices of OHLC. Ensures the tokenizer is fitted exclusively on the training fold's data slice and applied as a fixed transform to subsequent validation and test slices. """ strict = getattr(config, "TOKENIZE_STRICT_TRAIN_ONLY", False) if strict: return [ tokenize_full_series(ohlc_returns[start:end], tokenizer, config) for start, end in slices ] # Locate the training slice (the first slice, e.g. [ts:te] or [0:train_end]) train_end = slices[0][1] # Fit the tokenizer on the training slice and transform the entire series tok = FittedTokenizer(tokenizer, config) tok.fit(ohlc_returns[:train_end]) coarse_full, fine_full = tok.transform(ohlc_returns) return [ (coarse_full[start:end], fine_full[start:end]) for start, end in slices ] # ───────────────────────────────────────────────────────────────────────────── # Dataset # ───────────────────────────────────────────────────────────────────────────── class FinancialDataset(Dataset): """ Multi-modal Financial Dataset. Supports TOKENS_ONLY, FEATURES_ONLY, and COMBINED modes. """ def __init__( self, features: np.ndarray, targets: np.ndarray, seq_len: int, ohlc_returns: np.ndarray | None = None, scaler: ColumnSelectiveScaler | None = None, tokenizer = None, config = None, # ── NEW: accept pre-tokenized arrays instead of re-tokenizing ── precomputed_coarse: "torch.Tensor | None" = None, precomputed_fine: "torch.Tensor | None" = None, ) -> None: self.input_mode = str(getattr(config, "INPUT_MODE", "features_only")) self.seq_len = seq_len if scaler is not None: features = scaler.transform(features).astype(np.float32) self.features = torch.from_numpy(np.asarray(features, dtype=np.float32)) self.targets = torch.from_numpy(np.asarray(targets, dtype=np.float32)) self.idx_coarse = None self.idx_fine = None if self.input_mode in (InputMode.TOKENS_ONLY, InputMode.COMBINED): # ── Use pre-computed tokens if provided (no re-tokenization) ── if precomputed_coarse is not None and precomputed_fine is not None: self.idx_coarse = precomputed_coarse self.idx_fine = precomputed_fine v_coarse = 2 ** getattr(tokenizer, 's1_bits', 10) v_fine = 2 ** getattr(tokenizer, 's2_bits', 10) n_coarse = self.idx_coarse.unique().numel() n_fine = self.idx_fine.unique().numel() print(f"Token vocab usage — coarse: {n_coarse}/{v_coarse}, fine: {n_fine}/{v_fine}") hist_c = torch.bincount(self.idx_coarse, minlength=v_coarse) hist_f = torch.bincount(self.idx_fine, minlength=v_fine) print(f"Top-5 coarse tokens: {hist_c.topk(5)}") print(f"Top-5 fine tokens: {hist_f.topk(5)}") print(f" coarse sample: {self.idx_coarse[:10]}") print(f" fine sample: {self.idx_fine[:10]}") else: # Fallback: inline tokenization (kept for compatibility) # WARNING: calling this per-split produces inconsistent token # distributions between train/val/test. Prefer pre-computed. if tokenizer is None or ohlc_returns is None: raise ValueError( "Either precomputed_coarse/fine tensors or " "tokenizer+ohlc_returns are required for token modes." ) print(f"[FinancialDataset] WARNING: falling back to inline tokenization " f"for {len(ohlc_returns)} bars. " f"Use tokenize_full_series() + precomputed_coarse/fine instead.") coarse, fine = tokenize_full_series(ohlc_returns, tokenizer, config) self.idx_coarse = coarse self.idx_fine = fine def __len__(self) -> int: return max(0, len(self.features) - self.seq_len + 1) def __getitem__(self, i: int): seq = slice(i, i + self.seq_len) tokens = None features = None if self.input_mode in (InputMode.TOKENS_ONLY, InputMode.COMBINED): tokens = (self.idx_coarse[seq], self.idx_fine[seq]) if self.input_mode in (InputMode.FEATURES_ONLY, InputMode.COMBINED): features = self.features[seq] # Target at the end of the window target = self.targets[i + self.seq_len - 1] return tokens, features, target # MultiStreamDataset removed (deprecated in favor of multi-modal FinancialDataset) # ───────────────────────────────────────────────────────────────────────────── # Sample weighting # ───────────────────────────────────────────────────────────────────────────── def _compute_sample_weights(targets: np.ndarray, thresh: float, config=None, use_sqrt: bool = True) -> torch.Tensor: """ Compute sample weights to balance Short, Flat, and Long classes holistically. Short: target < -thresh Flat: |target| < thresh Long: target > thresh """ # Assign class labels # 0: Short, 1: Flat, 2: Long classes = np.ones_like(targets, dtype=np.int32) # Default to Flat classes[targets < -thresh] = 0 classes[targets > thresh] = 2 # Count samples per class counts = np.bincount(classes, minlength=3) # Avoid division by zero counts = np.maximum(counts, 1) # Basic inverse frequency weights weights_per_class = 1.0 / counts if use_sqrt: weights_per_class = np.sqrt(weights_per_class) # Holistic Bias Correction # Boost all classes relative to the majority class to equalize probability mass. # Formula: Weight_i = Weight_base_i * (Count_i / Count_maj) ** CorrectionPower if config is not None and hasattr(config, "BIAS_CORRECTION_POWER"): power = config.BIAS_CORRECTION_POWER if power != 0: maj_idx = np.argmax(counts) count_maj = counts[maj_idx] for i in range(3): if i != maj_idx: weights_per_class[i] *= (counts[i] / count_maj) ** power # Map weights back to each sample sample_weights = weights_per_class[classes] return torch.from_numpy(sample_weights).float() class DistributedWeightedSampler(torch.utils.data.Sampler): """ Weighted random sampling that partitions correctly across DDP ranks. Key guarantee: all ranks draw from the SAME globally-weighted pool, but each rank gets a disjoint slice → no sample duplication across GPUs. set_epoch(epoch) must be called before each epoch (same as DistributedSampler). When world_size=1 / rank=0, behaviour is identical to WeightedRandomSampler. """ def __init__(self, weights, num_samples, num_replicas=1, rank=0, replacement=True, seed=42): self.weights = weights.double() self.num_samples = num_samples self.num_replicas = num_replicas self.rank = rank self.replacement = replacement self.seed = seed self.epoch = 0 # Pad total so every rank gets equal slices (required for DDP sync) self.num_samples_per_rank = math.ceil(num_samples / num_replicas) self.total_size = self.num_samples_per_rank * num_replicas def set_epoch(self, epoch): self.epoch = epoch def __iter__(self): g = torch.Generator() g.manual_seed(self.seed + self.epoch) # reproducible, epoch-varied indices = torch.multinomial( self.weights, self.total_size, replacement=self.replacement, generator=g ).tolist() # Each rank gets its own non-overlapping slice start = self.rank * self.num_samples_per_rank return iter(indices[start : start + self.num_samples_per_rank]) def __len__(self): return self.num_samples_per_rank def collate_with_none(batch): """ Handles None in batches for multi-modal data. Each element in batch is (tokens, features, target). tokens is either (idx_c, idx_f) or None. """ tokens_raw = [b[0] for b in batch] features_raw = [b[1] for b in batch] targets_raw = [b[2] for b in batch] targets = torch.stack(targets_raw) if features_raw[0] is not None: features = torch.stack(features_raw) else: features = None if tokens_raw[0] is not None: c = torch.stack([t[0] for t in tokens_raw]) f = torch.stack([t[1] for t in tokens_raw]) tokens = (c, f) else: tokens = None return tokens, features, targets # ───────────────────────────────────────────────────────────────────────────── # DataLoader factory helpers # ───────────────────────────────────────────────────────────────────────────── def _worker_init_fn(worker_id): seed = torch.initial_seed() % (2**32) np.random.seed(seed) random.seed(seed) def _make_loader( ds, config, sampler=None, shuffle: bool = False, drop_last: bool = False, ) -> DataLoader: """Single factory so prefetch_factor / persistent_workers are consistent. drop_last --------- Set True for training loaders with a sampler (avoids a partial batch that would corrupt WeightedRandomSampler weight normalization). Always False for val/test loaders — every sample must be evaluated. """ nw = config.NUM_WORKERS pf = getattr(config, "PREFETCH_FACTOR", 2) if nw > 0 else None cuda = torch.cuda.is_available() if sampler is not None and shuffle: raise ValueError( "_make_loader: shuffle=True and sampler are mutually exclusive. " "The sampler controls draw order — do not pass shuffle=True." ) generator = torch.Generator() generator.manual_seed(getattr(config, "SEED", 42)) loader_kwargs = { "batch_size": config.BATCH_SIZE, "sampler": sampler, "shuffle": shuffle if sampler is None else False, "drop_last": drop_last, "num_workers": nw, "prefetch_factor": pf, "persistent_workers": (nw > 0), "pin_memory": cuda, "multiprocessing_context": "spawn" if nw > 0 else None, "collate_fn": collate_with_none, "worker_init_fn": _worker_init_fn, "generator": generator, } return DataLoader(ds, **loader_kwargs) # ───────────────────────────────────────────────────────────────────────────── # Public API # ───────────────────────────────────────────────────────────────────────────── def create_dataloaders( features, targets, config, feature_cols: list[str], tokenizer=None, ohlc_returns: np.ndarray | None = None, rank=0, world_size=1, ): """Single-asset train/val/test split with a gap between each split. The scaler is fitted on the training slice only. """ total_len = len(features) gap = config.FORECAST_HORIZON + 50 train_end = int(total_len * config.TRAIN_RATIO) val_start = train_end + gap # ── CLAMP val_end before it can bleed past total_len ────────────── val_end_raw = val_start + int(total_len * config.VAL_RATIO) val_end = min(val_end_raw, total_len - gap - config.LOOKBACK_WINDOW) # Early, meaningful failure — pinpoints the root cause if val_end <= val_start: raise ValueError( f"Val split is degenerate after clamping: val_start={val_start}, " f"val_end={val_end}. total_len={total_len} is too small for " f"TRAIN_RATIO={config.TRAIN_RATIO}, VAL_RATIO={config.VAL_RATIO}, " f"gap={gap}. Minimum required rows ≈ " f"{int((config.TRAIN_RATIO + config.VAL_RATIO) * total_len) + 2*gap + 3*config.LOOKBACK_WINDOW}." ) test_start = val_end + gap # Guard: minimum viable length (needs at least seq_len rows to form 1 window) assert train_end >= config.LOOKBACK_WINDOW, ( f"Train split too short for even one window: " f"train_end={train_end} < LOOKBACK_WINDOW={config.LOOKBACK_WINDOW}. " f"Reduce LOOKBACK_WINDOW or increase total data / TRAIN_RATIO." ) assert (val_end - val_start) >= config.LOOKBACK_WINDOW, ( f"Val split too short for even one window: " f"{val_end - val_start} rows < LOOKBACK_WINDOW={config.LOOKBACK_WINDOW}" ) assert (total_len - test_start) >= config.LOOKBACK_WINDOW, ( f"Test split too short for even one window: " f"{total_len - test_start} rows < LOOKBACK_WINDOW={config.LOOKBACK_WINDOW}" ) scaler = fit_scaler(features[:train_end], feature_cols, config=config) # ── Tokenize full series ONCE, then slice per split ───────────────── input_mode = InputMode(getattr(config, "INPUT_MODE", "features_only")) tok_coarse_full = tok_fine_full = None tok_tr = tok_va = tok_te = (None, None) if input_mode in (InputMode.TOKENS_ONLY, InputMode.COMBINED): if ohlc_returns is None: raise ValueError("ohlc_returns required for token modes in create_dataloaders") total = len(features) (tok_tr, tok_va, tok_te) = tokenize_split_slices( ohlc_returns, tokenizer, config, [(0, train_end), (val_start, val_end), (test_start, total)], ) train_ds = FinancialDataset( features[:train_end], targets[:train_end], config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tok_tr[0] if tok_tr[0] is not None else None, precomputed_fine=tok_tr[1] if tok_tr[1] is not None else None, ) val_ds = FinancialDataset( features[val_start:val_end], targets[val_start:val_end], config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tok_va[0] if tok_va[0] is not None else None, precomputed_fine=tok_va[1] if tok_va[1] is not None else None, ) test_ds = FinancialDataset( features[test_start:], targets[test_start:], config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tok_te[0] if tok_te[0] is not None else None, precomputed_fine=tok_te[1] if tok_te[1] is not None else None, ) start_idx = config.LOOKBACK_WINDOW - 1 weight_end = start_idx + len(train_ds) assert weight_end <= train_end, ( f"y_train_aligned would read beyond train boundary: " f"weight_end={weight_end} > train_end={train_end}." ) y_train_aligned = targets[start_idx : weight_end] sample_weights = _compute_sample_weights(y_train_aligned, config.SAMPLER_THRESHOLD, config=config) sampler = DistributedWeightedSampler( sample_weights, len(sample_weights), num_replicas=world_size, rank=rank, seed=getattr(config, "SEED", 42) ) if world_size > 1: val_sampler = DistributedSampler(val_ds, num_replicas=world_size, rank=rank, shuffle=False) test_sampler = DistributedSampler(test_ds, num_replicas=world_size, rank=rank, shuffle=False) else: val_sampler = test_sampler = None return ( _make_loader(train_ds, config, sampler=sampler, drop_last=True), _make_loader(val_ds, config, sampler=val_sampler, drop_last=False), _make_loader(test_ds, config, sampler=test_sampler, drop_last=False), ) def create_multi_index_dataloaders( asset_data_list: list[tuple], # 5-tuple (asset_id, feat, targ, ohlc, train_end) # or 7-tuple (..., precomputed_coarse, precomputed_fine) config, feature_cols: list[str], tokenizer=None, is_train: bool = False, scalers: dict[str, ColumnSelectiveScaler] | None = None, rank: int = 0, world_size: int = 1, ) -> tuple[DataLoader | None, dict[str, ColumnSelectiveScaler]]: """Multi-asset DataLoader — each asset is scaled independently. Supports DistributedDataParallel (DDP) by using DistributedSampler. """ datasets: list[FinancialDataset] = [] all_targets: list[float] = [] fitted_scalers: dict[str, ColumnSelectiveScaler] = {} for _entry in asset_data_list: # Support both 5-tuple (legacy) and 7-tuple (with precomputed tokens) if len(_entry) == 7: asset_id, feat, targ, ohlc, train_end, pre_coarse, pre_fine = _entry else: asset_id, feat, targ, ohlc, train_end = _entry pre_coarse = pre_fine = None if len(feat) != len(targ): raise ValueError( f"Asset '{asset_id}': feature/target length mismatch — " f"len(feat)={len(feat)}, len(targ)={len(targ)}. " f"Both arrays must cover the same row indices.") if len(feat) < config.LOOKBACK_WINDOW: continue if is_train: if train_end is None: raise ValueError( f"Asset '{asset_id}': train_end is None but is_train=True. " "Pass the actual train boundary index in the 4-tuple for training data.") # Optimization: skip scaler in tokens_only mode (tokenizer handles normalization) input_mode = getattr(config, "INPUT_MODE", "features_only") if input_mode == "tokens_only": scaler = None else: scaler = fit_scaler(feat[:train_end], feature_cols, config=config) fitted_scalers[asset_id] = scaler # Use precomputed tokens if provided; otherwise tokenize from ohlc tok_c, tok_f = pre_coarse, pre_fine _imode = getattr(config, "INPUT_MODE", "features_only") if tok_c is None and _imode in ("tokens_only", "combined") and ohlc is not None: tok_c, tok_f = tokenize_full_series(ohlc[:train_end], tokenizer, config) ds = FinancialDataset( feat[:train_end], targ[:train_end], config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tok_c, precomputed_fine=tok_f, ) else: # For val/test, use provided scalers if they exist scaler = None if scalers is not None and asset_id in scalers: scaler = scalers[asset_id] # If we are in features/combined mode but have no scaler, that's an error input_mode = getattr(config, "INPUT_MODE", "features_only") if input_mode != "tokens_only" and scaler is None: raise ValueError( f"No fitted scaler for asset '{asset_id}'. " f"Pass scalers returned from the training run " f"(is_train=True call). Available keys: " f"{list(scalers.keys()) if scalers is not None else 'scalers=None'}") # Use precomputed tokens if provided; otherwise tokenize from ohlc tok_c, tok_f = pre_coarse, pre_fine _imode = getattr(config, "INPUT_MODE", "features_only") if tok_c is None and _imode in ("tokens_only", "combined") and ohlc is not None: tok_c, tok_f = tokenize_full_series(ohlc, tokenizer, config) ds = FinancialDataset( feat, targ, config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tok_c, precomputed_fine=tok_f, ) datasets.append(ds) if is_train: start = config.LOOKBACK_WINDOW - 1 # For train data, we sliced up to train_end. all_targets.extend(targ[start : start + len(ds)].tolist()) if not datasets: return None, fitted_scalers full_ds = ConcatDataset(datasets) if is_train: all_targets_arr = np.array(all_targets, dtype=np.float32) sample_weights = _compute_sample_weights( all_targets_arr, config.SAMPLER_THRESHOLD, config=config ) assert len(sample_weights) == len(full_ds), ( f"Multi-index weight mismatch: " f"weights={len(sample_weights)}, ds={len(full_ds)}") sampler = DistributedWeightedSampler( weights=sample_weights, num_samples=len(sample_weights), num_replicas=world_size, # =1 when single GPU → identical to old behaviour rank=rank, replacement=True, seed=getattr(config, "SEED", 42), ) return _make_loader(full_ds, config, sampler=sampler, drop_last=True), fitted_scalers else: if world_size > 1: sampler = DistributedSampler( full_ds, num_replicas=world_size, rank=rank, shuffle=False ) return _make_loader(full_ds, config, sampler=sampler, drop_last=False), fitted_scalers return _make_loader(full_ds, config, drop_last=False), fitted_scalers def create_fold_dataloaders( features, targets, train_indices: tuple[int, int], val_indices: tuple[int, int], test_indices: tuple[int, int], config, feature_cols: list[str], tokenizer=None, ohlc_returns: np.ndarray | None = None, rank=0, world_size=1, ): """Walk-forward fold dataloaders. IMPORTANT: `features` and `targets` must be GLOBAL (unsliced) arrays. `train_indices`, `val_indices`, `test_indices` are absolute row indices into these global arrays. Do NOT pre-slice before calling. Scaler fitted on train_indices slice only — no leakage into val/test. """ # Guard: indices must be within bounds assert train_indices[0] >= 0 and train_indices[1] <= len(features), ( f"train_indices {train_indices} out of bounds for features of length {len(features)}" ) train_feat = features[train_indices[0] : train_indices[1]] scaler = fit_scaler(train_feat, feature_cols, config=config) ts, te = train_indices vs, ve = val_indices xs, xe = test_indices tc_tr = tf_tr = tc_va = tf_va = tc_te = tf_te = None _imode = getattr(config, "INPUT_MODE", "features_only") if _imode in ("tokens_only", "combined") and ohlc_returns is not None: # Wrap tokenizer to strictly fit on the training slice and transform full series tok = FittedTokenizer(tokenizer, config) tok.fit(ohlc_returns[:te]) coarse_full, fine_full = tok.transform(ohlc_returns) tc_tr, tf_tr = coarse_full[ts:te], fine_full[ts:te] tc_va, tf_va = coarse_full[vs:ve], fine_full[vs:ve] tc_te, tf_te = coarse_full[xs:xe], fine_full[xs:xe] train_ds = FinancialDataset( train_feat, targets[ts:te], config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tc_tr, precomputed_fine=tf_tr, ) val_ds = FinancialDataset( features[vs:ve], targets[vs:ve], config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tc_va, precomputed_fine=tf_va, ) test_ds = FinancialDataset( features[xs:xe], targets[xs:xe], config.LOOKBACK_WINDOW, scaler=scaler, tokenizer=tokenizer, config=config, precomputed_coarse=tc_te, precomputed_fine=tf_te, ) # This offset is correct ONLY for global arrays. # Logic: FinancialDataset windows start at [0:seq_len], predicting at [seq_len-1]. # Thus, the first target is at train_indices[0] + LOOKBACK_WINDOW - 1. start_idx = train_indices[0] + config.LOOKBACK_WINDOW - 1 y_train_aligned = targets[start_idx : start_idx + len(train_ds)] # Stronger assert: also verify the slice is not empty assert len(y_train_aligned) == len(train_ds), ( f"Weight misalignment: got {len(y_train_aligned)} targets " f"for {len(train_ds)} samples. Check that features/targets are " f"global (unsliced) arrays and train_indices are absolute." ) sample_weights = _compute_sample_weights( y_train_aligned, config.SAMPLER_THRESHOLD, config=config ) sampler = DistributedWeightedSampler( sample_weights, len(sample_weights), num_replicas=world_size, rank=rank, seed=getattr(config, "SEED", 42) ) if world_size > 1: val_sampler = DistributedSampler(val_ds, num_replicas=world_size, rank=rank, shuffle=False) test_sampler = DistributedSampler(test_ds, num_replicas=world_size, rank=rank, shuffle=False) else: val_sampler = test_sampler = None return ( _make_loader(train_ds, config, sampler=sampler, drop_last=True), _make_loader(val_ds, config, sampler=val_sampler, drop_last=False), _make_loader(test_ds, config, sampler=test_sampler, drop_last=False), )