"""Data preprocessing: normalization, walk-forward splitting, target computation.""" import logging from dataclasses import dataclass from typing import Iterator import numpy as np import pandas as pd logger = logging.getLogger(__name__) @dataclass class WalkForwardSplit: """A single train/test split from walk-forward validation.""" train_X: pd.DataFrame train_y: pd.DataFrame test_X: pd.DataFrame test_y: pd.DataFrame train_end: pd.Timestamp test_start: pd.Timestamp test_end: pd.Timestamp class Preprocessor: """Handles normalization and walk-forward data splitting.""" def __init__( self, purge_days: int = 5, test_window_days: int = 20, min_train_days: int = 504, # ~2 years of trading days ): self.purge_days = purge_days self.test_window_days = test_window_days self.min_train_days = min_train_days def normalize_rolling_zscore( self, df: pd.DataFrame, window: int = 200 ) -> pd.DataFrame: """Normalize features using rolling Z-score (no look-ahead bias).""" roll_mean = df.rolling(window, min_periods=window // 2).mean() roll_std = df.rolling(window, min_periods=window // 2).std() normalized = (df - roll_mean) / roll_std.replace(0, np.nan) return normalized def walk_forward_splits( self, X: pd.DataFrame, y: pd.DataFrame, expanding: bool = True, ) -> Iterator[WalkForwardSplit]: """Generate walk-forward train/test splits with purge gap. |--- Train (expanding) ---|-- Purge --|-- Test --| """ n = len(X) if n < self.min_train_days + self.purge_days + self.test_window_days: logger.warning("Not enough data for walk-forward split") return test_start_idx = self.min_train_days + self.purge_days while test_start_idx + self.test_window_days <= n: train_end_idx = test_start_idx - self.purge_days test_end_idx = min(test_start_idx + self.test_window_days, n) if expanding: train_start_idx = 0 else: train_start_idx = max(0, train_end_idx - self.min_train_days) split = WalkForwardSplit( train_X=X.iloc[train_start_idx:train_end_idx], train_y=y.iloc[train_start_idx:train_end_idx], test_X=X.iloc[test_start_idx:test_end_idx], test_y=y.iloc[test_start_idx:test_end_idx], train_end=X.index[train_end_idx - 1], test_start=X.index[test_start_idx], test_end=X.index[test_end_idx - 1], ) yield split test_start_idx += self.test_window_days def prepare_dataset( self, features: pd.DataFrame, targets: pd.DataFrame, normalize: bool = True, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Clean, align, and optionally normalize features and targets.""" # Align indices common_idx = features.index.intersection(targets.index) X = features.loc[common_idx].copy() y = targets.loc[common_idx].copy() # Drop rows where all features are NaN (warmup period) valid_mask = X.notna().any(axis=1) & y.notna().any(axis=1) X = X[valid_mask] y = y[valid_mask] if normalize: X = self.normalize_rolling_zscore(X) # Drop rows where >50% of features are NaN (warmup period) # Then fill remaining NaN with 0 (features that have longer warmup) nan_ratio = X.isna().mean(axis=1) valid_mask = nan_ratio < 0.5 X = X[valid_mask] y = y.loc[X.index] # Fill remaining NaN AFTER normalization to avoid leaking synthetic zeros X = X.fillna(0) return X, y