Spaces:
Sleeping
Sleeping
| """Data preprocessing: normalization, walk-forward splitting, target computation.""" | |
| import logging | |
| from dataclasses import dataclass | |
| from typing import Iterator | |
| import numpy as np | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
@dataclass
class WalkForwardSplit:
    """A single train/test split from walk-forward validation.

    Instances are built with keyword arguments by
    ``Preprocessor.walk_forward_splits``; the ``@dataclass`` decorator
    supplies the ``__init__`` (and ``__repr__``) that this requires.

    NOTE: the generated ``__eq__`` compares fields with ``==``; comparing two
    splits that hold DataFrames will therefore raise on the ambiguous truth
    value. Compare individual fields instead.
    """

    # Feature rows and target rows of the training window.
    train_X: pd.DataFrame
    train_y: pd.DataFrame
    # Feature rows and target rows of the test window.
    test_X: pd.DataFrame
    test_y: pd.DataFrame
    # Index label of the last training row.
    train_end: pd.Timestamp
    # Index labels of the first and last test rows (both inclusive).
    test_start: pd.Timestamp
    test_end: pd.Timestamp
| class Preprocessor: | |
| """Handles normalization and walk-forward data splitting.""" | |
| def __init__( | |
| self, | |
| purge_days: int = 5, | |
| test_window_days: int = 20, | |
| min_train_days: int = 504, # ~2 years of trading days | |
| ): | |
| self.purge_days = purge_days | |
| self.test_window_days = test_window_days | |
| self.min_train_days = min_train_days | |
| def normalize_rolling_zscore( | |
| self, df: pd.DataFrame, window: int = 200 | |
| ) -> pd.DataFrame: | |
| """Normalize features using rolling Z-score (no look-ahead bias).""" | |
| roll_mean = df.rolling(window, min_periods=window // 2).mean() | |
| roll_std = df.rolling(window, min_periods=window // 2).std() | |
| normalized = (df - roll_mean) / roll_std.replace(0, np.nan) | |
| return normalized | |
| def walk_forward_splits( | |
| self, | |
| X: pd.DataFrame, | |
| y: pd.DataFrame, | |
| expanding: bool = True, | |
| ) -> Iterator[WalkForwardSplit]: | |
| """Generate walk-forward train/test splits with purge gap. | |
| |--- Train (expanding) ---|-- Purge --|-- Test --| | |
| """ | |
| n = len(X) | |
| if n < self.min_train_days + self.purge_days + self.test_window_days: | |
| logger.warning("Not enough data for walk-forward split") | |
| return | |
| test_start_idx = self.min_train_days + self.purge_days | |
| while test_start_idx + self.test_window_days <= n: | |
| train_end_idx = test_start_idx - self.purge_days | |
| test_end_idx = min(test_start_idx + self.test_window_days, n) | |
| if expanding: | |
| train_start_idx = 0 | |
| else: | |
| train_start_idx = max(0, train_end_idx - self.min_train_days) | |
| split = WalkForwardSplit( | |
| train_X=X.iloc[train_start_idx:train_end_idx], | |
| train_y=y.iloc[train_start_idx:train_end_idx], | |
| test_X=X.iloc[test_start_idx:test_end_idx], | |
| test_y=y.iloc[test_start_idx:test_end_idx], | |
| train_end=X.index[train_end_idx - 1], | |
| test_start=X.index[test_start_idx], | |
| test_end=X.index[test_end_idx - 1], | |
| ) | |
| yield split | |
| test_start_idx += self.test_window_days | |
| def prepare_dataset( | |
| self, | |
| features: pd.DataFrame, | |
| targets: pd.DataFrame, | |
| normalize: bool = True, | |
| ) -> tuple[pd.DataFrame, pd.DataFrame]: | |
| """Clean, align, and optionally normalize features and targets.""" | |
| # Align indices | |
| common_idx = features.index.intersection(targets.index) | |
| X = features.loc[common_idx].copy() | |
| y = targets.loc[common_idx].copy() | |
| # Drop rows where all features are NaN (warmup period) | |
| valid_mask = X.notna().any(axis=1) & y.notna().any(axis=1) | |
| X = X[valid_mask] | |
| y = y[valid_mask] | |
| if normalize: | |
| X = self.normalize_rolling_zscore(X) | |
| # Drop rows where >50% of features are NaN (warmup period) | |
| # Then fill remaining NaN with 0 (features that have longer warmup) | |
| nan_ratio = X.isna().mean(axis=1) | |
| valid_mask = nan_ratio < 0.5 | |
| X = X[valid_mask] | |
| y = y.loc[X.index] | |
| # Fill remaining NaN AFTER normalization to avoid leaking synthetic zeros | |
| X = X.fillna(0) | |
| return X, y | |