File size: 3,884 Bytes
bcceb77
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Data preprocessing: normalization, walk-forward splitting, target computation."""

import logging
from dataclasses import dataclass
from typing import Iterator

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


@dataclass
class WalkForwardSplit:
    """A single train/test split from walk-forward validation.

    Instances are yielded by ``Preprocessor.walk_forward_splits``, which fills
    the four frames with positional slices of its ``X``/``y`` arguments and
    the three boundary fields with labels taken from ``X.index``.
    """
    # Feature rows of the training window.
    train_X: pd.DataFrame
    # Target rows of the training window (same row positions as train_X).
    train_y: pd.DataFrame
    # Feature rows of the test window.
    test_X: pd.DataFrame
    # Target rows of the test window (same row positions as test_X).
    test_y: pd.DataFrame
    # Index label of the last training row.
    train_end: pd.Timestamp
    # Index label of the first test row.
    test_start: pd.Timestamp
    # Index label of the last test row.
    test_end: pd.Timestamp


class Preprocessor:
    """Handles normalization and walk-forward data splitting.

    All window sizes are counted in rows and every operation is positional,
    so inputs are assumed to be in chronological order.
    """

    def __init__(
        self,
        purge_days: int = 5,
        test_window_days: int = 20,
        min_train_days: int = 504,  # ~2 years of trading days
    ):
        """Store and validate the walk-forward geometry.

        Args:
            purge_days: Rows skipped between the end of the training window
                and the start of the test window.
            test_window_days: Rows in each test window; also the step by
                which successive splits advance.
            min_train_days: Rows in the first training window (and in every
                training window when splitting with ``expanding=False``).

        Raises:
            ValueError: If ``purge_days`` is negative, or if
                ``test_window_days`` or ``min_train_days`` is below 1.
        """
        # A negative gap would place the end of training after the start of
        # the test window, i.e. the two would overlap.
        if purge_days < 0:
            raise ValueError(f"purge_days must be >= 0, got {purge_days}")
        # A non-positive step would never advance the split generator.
        if test_window_days < 1:
            raise ValueError(
                f"test_window_days must be >= 1, got {test_window_days}"
            )
        # Without at least one training row there is no last training label.
        if min_train_days < 1:
            raise ValueError(
                f"min_train_days must be >= 1, got {min_train_days}"
            )

        self.purge_days = purge_days
        self.test_window_days = test_window_days
        self.min_train_days = min_train_days

    def normalize_rolling_zscore(
        self, df: pd.DataFrame, window: int = 200
    ) -> pd.DataFrame:
        """Normalize features using rolling Z-score (no look-ahead bias).

        Each value is standardized with the mean and standard deviation of
        the trailing ``window`` rows ending at (and including) that row.

        Args:
            df: Feature frame, one column per feature.
            window: Number of trailing rows in the rolling window. Statistics
                are produced once ``window // 2`` observations are available.

        Returns:
            A frame shaped like ``df``. Entries are NaN where too few
            observations exist or where the rolling standard deviation is 0.
        """
        rolling = df.rolling(window, min_periods=window // 2)
        roll_mean = rolling.mean()
        # A zero spread would divide by zero; turn it into NaN instead.
        roll_std = rolling.std().replace(0, np.nan)
        return (df - roll_mean) / roll_std

    def walk_forward_splits(
        self,
        X: pd.DataFrame,
        y: pd.DataFrame,
        expanding: bool = True,
    ) -> Iterator["WalkForwardSplit"]:
        """Generate walk-forward train/test splits with purge gap.

        |--- Train (expanding) ---|-- Purge --|-- Test --|

        Successive test windows are adjacent and never overlap. A trailing
        remainder shorter than ``test_window_days`` is not emitted.

        Args:
            X: Feature frame in chronological order.
            y: Target frame with the same number of rows as ``X``; rows are
                paired with ``X`` by position.
            expanding: If True every training window starts at row 0;
                otherwise it is limited to the last ``min_train_days`` rows
                before the purge gap.

        Yields:
            One ``WalkForwardSplit`` per test window. Nothing is yielded (and
            a warning is logged) when there are too few rows for one split.

        Raises:
            ValueError: If ``X`` and ``y`` have different lengths.
        """
        n = len(X)
        if len(y) != n:
            # Slices are positional, so unequal lengths would silently pair
            # features with the wrong targets.
            raise ValueError(
                f"X and y must have the same number of rows, got {n} and {len(y)}"
            )
        if n < self.min_train_days + self.purge_days + self.test_window_days:
            logger.warning("Not enough data for walk-forward split")
            return

        step = self.test_window_days
        first_test_idx = self.min_train_days + self.purge_days
        # The range stops at the last start position whose full test window
        # still fits inside the data.
        for test_start_idx in range(first_test_idx, n - step + 1, step):
            train_end_idx = test_start_idx - self.purge_days
            test_end_idx = test_start_idx + step

            if expanding:
                train_start_idx = 0
            else:
                train_start_idx = max(0, train_end_idx - self.min_train_days)

            yield WalkForwardSplit(
                train_X=X.iloc[train_start_idx:train_end_idx],
                train_y=y.iloc[train_start_idx:train_end_idx],
                test_X=X.iloc[test_start_idx:test_end_idx],
                test_y=y.iloc[test_start_idx:test_end_idx],
                train_end=X.index[train_end_idx - 1],
                test_start=X.index[test_start_idx],
                test_end=X.index[test_end_idx - 1],
            )

    def prepare_dataset(
        self,
        features: pd.DataFrame,
        targets: pd.DataFrame,
        normalize: bool = True,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Clean, align, and optionally normalize features and targets.

        Args:
            features: Feature frame.
            targets: Target frame; only labels present in both indexes are
                kept.
            normalize: If True, apply ``normalize_rolling_zscore`` and drop
                rows where at least half of the normalized features are NaN.

        Returns:
            ``(X, y)`` with identical indexes. ``X`` contains no NaN; ``y`` is
            returned with whatever NaN its kept rows contain.
        """
        # Align indices
        common_idx = features.index.intersection(targets.index)
        X = features.loc[common_idx].copy()
        y = targets.loc[common_idx].copy()

        # Drop rows where every feature or every target is NaN (warmup
        # period). The mask is applied by position to both frames.
        has_data = (X.notna().any(axis=1) & y.notna().any(axis=1)).to_numpy()
        X = X.loc[has_data]
        y = y.loc[has_data]

        if normalize:
            X = self.normalize_rolling_zscore(X)
            # Drop rows where >=50% of features are NaN (warmup period).
            # Filtering y with the same positional mask, rather than looking
            # its rows up by label, keeps the frames row-for-row aligned even
            # when the index contains repeated labels.
            mostly_valid = (X.isna().mean(axis=1) < 0.5).to_numpy()
            X = X.loc[mostly_valid]
            y = y.loc[mostly_valid]

        # Fill remaining NaN AFTER normalization so the synthetic zeros do
        # not enter the rolling statistics (features with longer warmup).
        X = X.fillna(0)

        return X, y