# Snapshot of commit bcceb77 (file size: 3,884 bytes), copied from a Spaces file view.
"""Data preprocessing: normalization, walk-forward splitting, target computation."""
import logging
from dataclasses import dataclass
from typing import Iterator
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
@dataclass
class WalkForwardSplit:
    """One train/test partition produced by walk-forward validation.

    The four frames hold the sliced features and targets; the three
    timestamps record the index labels that bound those slices.
    """

    # Training slice: features first, then the matching targets.
    train_X: pd.DataFrame
    train_y: pd.DataFrame

    # Test slice: features first, then the matching targets.
    test_X: pd.DataFrame
    test_y: pd.DataFrame

    # Index label of the last training row.
    train_end: pd.Timestamp
    # Index labels of the first and last test rows.
    test_start: pd.Timestamp
    test_end: pd.Timestamp
class Preprocessor:
    """Handles normalization and walk-forward data splitting.

    Attributes:
        purge_days: Number of rows skipped between the end of a training
            slice and the start of its test slice.
        test_window_days: Number of rows in every test slice; also the step
            by which consecutive splits advance.
        min_train_days: Number of rows in the first training slice, and the
            fixed training length when ``expanding=False``.
    """

    def __init__(
        self,
        purge_days: int = 5,
        test_window_days: int = 20,
        min_train_days: int = 504,  # ~2 years of trading days
    ):
        """Store the split configuration (validated when splits are generated)."""
        self.purge_days = purge_days
        self.test_window_days = test_window_days
        self.min_train_days = min_train_days

    def normalize_rolling_zscore(
        self, df: pd.DataFrame, window: int = 200
    ) -> pd.DataFrame:
        """Normalize features using a rolling Z-score (no look-ahead bias).

        Each value is standardized with the mean and standard deviation of
        the trailing ``window`` rows ending at that row, so no later row
        contributes to its statistics.

        Args:
            df: Feature frame to normalize.
            window: Length of the trailing window, in rows.

        Returns:
            A frame shaped like ``df``. Entries are NaN where fewer than
            ``window // 2`` observations are available or where the rolling
            standard deviation is zero.
        """
        rolling = df.rolling(window, min_periods=window // 2)
        # A zero standard deviation would produce +/-inf; map it to NaN.
        safe_std = rolling.std().replace(0, np.nan)
        return (df - rolling.mean()) / safe_std

    def walk_forward_splits(
        self,
        X: pd.DataFrame,
        y: pd.DataFrame,
        expanding: bool = True,
    ) -> Iterator["WalkForwardSplit"]:
        """Generate walk-forward train/test splits with a purge gap.

        |--- Train (expanding) ---|-- Purge --|-- Test --|

        Slicing is positional, so ``X`` and ``y`` must already be row-aligned.
        Only full test windows are produced: trailing rows that cannot fill a
        complete window of ``test_window_days`` rows are never yielded.

        Args:
            X: Feature frame.
            y: Target frame with the same number of rows as ``X``.
            expanding: If True, every training slice starts at row 0.
                Otherwise it covers the last ``min_train_days`` rows before
                the purge gap.

        Yields:
            One ``WalkForwardSplit`` per test window, in positional order.

        Raises:
            ValueError: If ``test_window_days < 1``, ``purge_days < 0``,
                ``min_train_days < 1``, or ``X`` and ``y`` differ in length.
                Because this is a generator, the error surfaces on the first
                iteration.
        """
        # A non-positive step would never advance and would loop forever.
        if self.test_window_days < 1:
            raise ValueError(
                f"test_window_days must be >= 1, got {self.test_window_days}"
            )
        # A negative gap would let the training slice overlap the test slice.
        if self.purge_days < 0:
            raise ValueError(f"purge_days must be >= 0, got {self.purge_days}")
        # An empty training slice has no last row to report as train_end.
        if self.min_train_days < 1:
            raise ValueError(
                f"min_train_days must be >= 1, got {self.min_train_days}"
            )

        n = len(X)
        if len(y) != n:
            raise ValueError(
                f"X and y must have the same number of rows, got {n} and {len(y)}"
            )

        if n < self.min_train_days + self.purge_days + self.test_window_days:
            logger.warning("Not enough data for walk-forward split")
            return

        first_test_start = self.min_train_days + self.purge_days
        # The stop bound admits exactly the starts whose full window fits.
        last_test_start = n - self.test_window_days
        for test_start_idx in range(
            first_test_start, last_test_start + 1, self.test_window_days
        ):
            train_end_idx = test_start_idx - self.purge_days
            test_end_idx = test_start_idx + self.test_window_days

            if expanding:
                train_start_idx = 0
            else:
                train_start_idx = max(0, train_end_idx - self.min_train_days)

            yield WalkForwardSplit(
                train_X=X.iloc[train_start_idx:train_end_idx],
                train_y=y.iloc[train_start_idx:train_end_idx],
                test_X=X.iloc[test_start_idx:test_end_idx],
                test_y=y.iloc[test_start_idx:test_end_idx],
                train_end=X.index[train_end_idx - 1],
                test_start=X.index[test_start_idx],
                test_end=X.index[test_end_idx - 1],
            )

    def prepare_dataset(
        self,
        features: pd.DataFrame,
        targets: pd.DataFrame,
        normalize: bool = True,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Clean, align, and optionally normalize features and targets.

        Args:
            features: Feature frame.
            targets: Target frame.
            normalize: If True, apply ``normalize_rolling_zscore`` (with its
                default window) to the features after the first row filter.

        Returns:
            ``(X, y)`` sharing the same rows. ``X`` contains no NaN; ``y`` is
            only filtered, never filled.
        """
        # Keep only the index labels present in both inputs.
        common_idx = features.index.intersection(targets.index)
        X = features.loc[common_idx].copy()
        y = targets.loc[common_idx].copy()

        # Drop rows where every feature or every target is NaN.
        has_data = X.notna().any(axis=1) & y.notna().any(axis=1)
        X = X[has_data]
        y = y[has_data]

        if normalize:
            X = self.normalize_rolling_zscore(X)

        # Drop rows where at least half of the features are NaN (warmup).
        # The mask is applied positionally to both frames so they stay
        # row-aligned even if index labels repeat.
        keep_rows = (X.isna().mean(axis=1) < 0.5).to_numpy()
        X = X.loc[keep_rows]
        y = y.loc[keep_rows]

        # Fill remaining NaN only after normalization, so the synthetic zeros
        # never enter the rolling statistics.
        X = X.fillna(0)
        return X, y
|