Premchan369 committed on
Commit
c5a733a
·
verified ·
1 Parent(s): 172c3ca

Add walk-forward validation engine with purged CV and combinatorial CPCV

Browse files
Files changed (1) hide show
  1. walk_forward_validation.py +432 -0
walk_forward_validation.py ADDED
@@ -0,0 +1,432 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Walk-Forward Validation Engine - The #1 Most Critical Missing Piece
2
+
3
+ Walk-forward validation is the ONLY correct way to test time series strategies.
4
+ Random train/test split = GUARANTEED data leakage and false results.
5
+
6
+ Based on: Ong & Herremans 2023 (MTL-TSMOM), Lopez de Prado 2018
7
+ """
8
+ import numpy as np
9
+ import pandas as pd
10
+ from typing import Dict, List, Tuple, Optional, Callable, Iterator
11
+ from dataclasses import dataclass
12
+ import warnings
13
+ warnings.filterwarnings('ignore')
14
+
15
+
16
@dataclass
class WalkForwardConfig:
    """Configuration for walk-forward validation.

    All sizes are counts of observations (rows). The default values
    correspond to daily data: roughly one year of training, a three-month
    test window and a one-month step.

    Raises:
        ValueError: If ``min_train_size``, ``test_size`` or ``step_size`` is
            smaller than 1, if ``embargo_gap`` or ``purge_k`` is negative, or
            if ``n_splits`` is given but smaller than 1.
    """
    min_train_size: int = 252  # Minimum training days (1 year)
    test_size: int = 63  # Test window (3 months)
    step_size: int = 21  # Step forward (1 month)
    embargo_gap: int = 5  # Days between train and test (prevents leakage)
    purge_k: int = 0  # Purge k overlapping observations
    n_splits: Optional[int] = None  # Number of splits (auto-calculated if None)

    def __post_init__(self) -> None:
        # Fail fast: a zero step makes window arithmetic divide by zero or
        # never advance, and a negative gap lets train and test overlap.
        for name in ("min_train_size", "test_size", "step_size"):
            value = getattr(self, name)
            if value < 1:
                raise ValueError(f"{name} must be >= 1, got {value}")
        for name in ("embargo_gap", "purge_k"):
            value = getattr(self, name)
            if value < 0:
                raise ValueError(f"{name} must be >= 0, got {value}")
        if self.n_splits is not None and self.n_splits < 1:
            raise ValueError(f"n_splits must be >= 1 or None, got {self.n_splits}")
25
+
26
+
27
class PurgedKFoldCV:
    """
    Forward-only purged K-fold splitter for time series.

    Based on Marcos Lopez de Prado (2018) "Advances in Financial Machine Learning".

    The samples are cut into `n_splits` contiguous folds. Each fold serves once
    as the test set; its training set is every observation that comes BEFORE
    the fold, minus the last `purge_k` observations next to the boundary.
    Observations near the boundary are dropped because autocorrelated data
    close to the test window can still leak information into training.

    Consequences of training only on the past:
    - The first fold has no history, so it is never emitted as a test set.
    - Any fold whose purged training set would be empty is skipped as well.
    - At most `n_splits - 1` (train, test) pairs are produced.

    The last fold absorbs the `n_samples % n_splits` leftover observations so
    the most recent data is always tested.
    """

    def __init__(self, n_splits: int = 5, purge_k: int = 10):
        """
        Args:
            n_splits: Number of contiguous folds to cut the data into.
            purge_k: Number of observations removed from the end of the
                training set, directly before the test fold.

        Raises:
            ValueError: If `n_splits` < 1 or `purge_k` < 0.
        """
        if n_splits < 1:
            raise ValueError(f"n_splits must be >= 1, got {n_splits}")
        if purge_k < 0:
            raise ValueError(f"purge_k must be >= 0, got {purge_k}")
        self.n_splits = n_splits
        self.purge_k = purge_k

    def split(self, X: np.ndarray, y: Optional[np.ndarray] = None,
              groups: Optional[np.ndarray] = None) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """Generate (train_indices, test_indices) pairs with purging.

        Only `len(X)` is used; `y` and `groups` are accepted for
        scikit-learn-style call compatibility and ignored.
        """
        n_samples = len(X)
        fold_size = n_samples // self.n_splits
        last_fold = self.n_splits - 1

        for i in range(self.n_splits):
            test_start = i * fold_size
            # The last fold runs to the end of the data so the remainder of
            # the integer division is not silently left untested.
            test_end = n_samples if i == last_fold else test_start + fold_size

            # Training data stops `purge_k` observations before the test fold.
            train_end = max(0, test_start - self.purge_k)

            # A fold without training history or without test rows cannot be
            # fitted/evaluated, so it is skipped instead of yielded empty.
            if train_end == 0 or test_end <= test_start:
                continue

            yield np.arange(train_end), np.arange(test_start, test_end)

    def get_n_splits(self, X: np.ndarray, y=None, groups=None) -> int:
        """Return the number of (train, test) pairs `split(X)` yields."""
        return sum(1 for _ in self.split(X))
67
+
68
+
69
class ExpandingWindowWalkForward:
    """
    Expanding window walk-forward with embargo gap.

    With T the last training index of a fold (0-based, inclusive):
    - Train on [0, T]
    - Embargo gap: [T+1, T+gap] (used by neither side, prevents leakage)
    - Test on [T+gap+1, T+gap+test_size]
    - Next fold: Train on [0, T+step], test on [T+step+gap+1, T+step+gap+test_size]

    The training set GROWS over time (expanding window), simulating how
    you would actually trade: you start with less data, gain more over time.

    Only full-length test windows are produced. Folds are generated until the
    test window would run past the end of the data, or until `config.n_splits`
    folds have been produced when that limit is set.
    """

    # Test windows shorter than this many observations are never emitted.
    MIN_TEST_SAMPLES = 10

    def __init__(self, config: Optional["WalkForwardConfig"] = None):
        """
        Args:
            config: Window sizes to use; a default `WalkForwardConfig` is
                created when omitted.
        """
        self.config = config or WalkForwardConfig()

    def split(self, X: np.ndarray, y: Optional[np.ndarray] = None,
              groups: Optional[np.ndarray] = None) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """Generate expanding window (train_indices, test_indices) pairs.

        Only `len(X)` is used; `y` and `groups` are accepted for
        scikit-learn-style call compatibility and ignored.

        Raises:
            ValueError: If `config.step_size` is smaller than 1.
        """
        n_samples = len(X)
        cfg = self.config

        if cfg.step_size < 1:
            raise ValueError(f"step_size must be >= 1, got {cfg.step_size}")

        # Largest training length that still leaves room for the embargo and a
        # complete test window. The bound is inclusive: when train_end equals
        # it, the test window ends exactly on the last sample.
        last_train_end = n_samples - cfg.embargo_gap - cfg.test_size
        train_ends = range(cfg.min_train_size, last_train_end + 1, cfg.step_size)

        # An explicit n_splits only caps the number of folds; it never
        # produces folds that do not fit in the data.
        if cfg.n_splits is not None:
            train_ends = train_ends[:max(0, cfg.n_splits)]

        for train_end in train_ends:
            test_start = train_end + cfg.embargo_gap
            test_indices = np.arange(test_start, test_start + cfg.test_size)

            if len(test_indices) < self.MIN_TEST_SAMPLES:
                break

            yield np.arange(train_end), test_indices

    def get_n_splits(self, X: np.ndarray, y=None, groups=None) -> int:
        """Return the number of folds `split(X)` yields."""
        return sum(1 for _ in self.split(X))
125
+
126
+
127
class SlidingWindowWalkForward:
    """
    Sliding window walk-forward (fixed-size training window).

    Unlike expanding window, the training set size stays constant.
    Old data drops off as new data comes in.

    Each fold trains on `train_size` consecutive observations, skips
    `embargo_gap` observations, then tests on the next `test_size`
    observations. The whole arrangement moves forward by `step_size` per fold
    and stops once a full test window no longer fits in the data.

    Better for: Regime-changing markets where old data becomes irrelevant.
    Worse for: Early periods with limited data.
    """

    def __init__(self, train_size: int = 504, test_size: int = 63,
                 step_size: int = 21, embargo_gap: int = 5):
        """
        Args:
            train_size: Number of observations in every training window.
            test_size: Number of observations in every test window.
            step_size: How far the windows advance between folds.
            embargo_gap: Observations skipped between train and test.

        Raises:
            ValueError: If `train_size`, `test_size` or `step_size` is smaller
                than 1, or `embargo_gap` is negative.
        """
        # A step below 1 would never advance the window; a negative gap would
        # let the test window overlap the training window.
        for name, value in (("train_size", train_size), ("test_size", test_size),
                            ("step_size", step_size)):
            if value < 1:
                raise ValueError(f"{name} must be >= 1, got {value}")
        if embargo_gap < 0:
            raise ValueError(f"embargo_gap must be >= 0, got {embargo_gap}")

        self.train_size = train_size
        self.test_size = test_size
        self.step_size = step_size
        self.embargo_gap = embargo_gap

    def _train_ends(self, n_samples: int) -> range:
        """Exclusive training end index of every fold that fits in `n_samples`."""
        last_train_end = n_samples - self.embargo_gap - self.test_size
        return range(self.train_size, last_train_end + 1, self.step_size)

    def split(self, X: np.ndarray, y: Optional[np.ndarray] = None,
              groups: Optional[np.ndarray] = None) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """Generate sliding window (train_indices, test_indices) pairs.

        Only `len(X)` is used; `y` and `groups` are accepted for
        scikit-learn-style call compatibility and ignored.
        """
        for train_end in self._train_ends(len(X)):
            test_start = train_end + self.embargo_gap
            yield (np.arange(train_end - self.train_size, train_end),
                   np.arange(test_start, test_start + self.test_size))

    def get_n_splits(self, X: np.ndarray, y=None, groups=None) -> int:
        """Return the number of folds `split(X)` yields."""
        return len(self._train_ends(len(X)))
162
+
163
+
164
class CombinatorialPurgedCV:
    """
    Combinatorial Purged Cross-Validation (CPCV).

    Following Lopez de Prado's CPCV scheme: instead of sequential splits, the
    data is cut into `n_splits` contiguous groups and every combination of
    `n_test_splits` groups is used once as the test set. That gives
    "n_splits choose n_test_splits" train/test pairs.

    Why this matters:
    - Standard walk-forward: You test on 5 periods. Maybe 4 are bull, 1 is bear.
      Your model looks great but fails in bear markets.
    - CPCV: You test on ALL combinations. Some train sets include bear, some don't.
      You get a distribution of performance, not a single number.

    Implementation notes:
    - Training data is every group not selected for testing. A training group
      that directly neighbours a test group loses `embargo_size` observations
      on the side touching it (on both sides if it sits between two test
      groups).
    - Unlike the walk-forward splitters, training indices may lie AFTER test
      indices.
    - The last group absorbs the `n_samples % n_splits` leftover observations.
    """

    def __init__(self, n_splits: int = 6, n_test_splits: int = 2,
                 embargo_pct: float = 0.04):
        """
        Args:
            n_splits: Total number of groups to divide data into
            n_test_splits: How many groups form the test set
            embargo_pct: Fraction of ONE group's length
                (`n_samples // n_splits`) that is removed from a training
                group at each edge adjacent to a test group

        Raises:
            ValueError: If `n_test_splits` is not in [1, n_splits - 1] or
                `embargo_pct` is negative.
        """
        # Imported here so the class does not depend on module-level imports.
        from itertools import combinations

        if not 1 <= n_test_splits < n_splits:
            raise ValueError(
                f"n_test_splits must satisfy 1 <= n_test_splits < n_splits, "
                f"got n_test_splits={n_test_splits}, n_splits={n_splits}"
            )
        # A negative embargo would extend training groups INTO the test groups.
        if embargo_pct < 0:
            raise ValueError(f"embargo_pct must be >= 0, got {embargo_pct}")

        self.n_splits = n_splits
        self.n_test_splits = n_test_splits
        self.embargo_pct = embargo_pct
        self.test_combinations = list(combinations(range(n_splits), n_test_splits))

    def split(self, X: np.ndarray, y: Optional[np.ndarray] = None,
              groups: Optional[np.ndarray] = None) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """Generate combinatorial purged (train_indices, test_indices) pairs.

        Only `len(X)` is used; `y` and `groups` are accepted for
        scikit-learn-style call compatibility and ignored. Combinations whose
        training or test set would be empty are skipped.
        """
        n_samples = len(X)
        fold_size = n_samples // self.n_splits
        embargo_size = int(fold_size * self.embargo_pct)

        # bounds[g]:bounds[g + 1] is group g; the final bound is n_samples so
        # the last group also covers the remainder of the integer division.
        bounds = [g * fold_size for g in range(self.n_splits)] + [n_samples]

        for test_groups in self.test_combinations:
            # Test indices: union of selected test groups
            test_indices = np.concatenate(
                [np.arange(bounds[g], bounds[g + 1]) for g in test_groups]
            )

            # Train indices: everything NOT in test, with embargo gaps
            train_parts = []
            for g in range(self.n_splits):
                if g in test_groups:
                    continue

                start, end = bounds[g], bounds[g + 1]
                if g - 1 in test_groups:
                    # Test group directly before: trim the front of this group.
                    start = min(start + embargo_size, end)
                if g + 1 in test_groups:
                    # Test group directly after: trim the back of this group.
                    end = max(start, end - embargo_size)

                if start < end:
                    train_parts.append(np.arange(start, end))

            if train_parts and len(test_indices) > 0:
                yield np.concatenate(train_parts), test_indices

    def get_n_splits(self, X=None, y=None, groups=None) -> int:
        """Return the number of test-group combinations.

        `split` may yield fewer pairs when the data is too short for some
        combinations to have non-empty train and test sets.
        """
        return len(self.test_combinations)
242
+
243
+
244
class WalkForwardBacktest:
    """
    Walk-forward backtest driver.

    For every (train, test) split produced by the selected splitter, this
    builds a fresh model, fits it on the training rows, predicts the test
    rows and scores the predictions with a user-supplied metric function.
    Per-fold metrics and summary statistics across folds are returned.

    Usage:
        backtest = WalkForwardBacktest(config=WalkForwardConfig(min_train_size=504))
        results = backtest.run(X, y, model_factory=make_model, eval_fn=compute_metrics)

    `cv_type` selects the splitter:
        'expanding'     -> ExpandingWindowWalkForward(config)
        'sliding'       -> SlidingWindowWalkForward (sizes taken from config)
        'purged'        -> PurgedKFoldCV(n_splits=5, purge_k=config.embargo_gap)
        'combinatorial' -> CombinatorialPurgedCV(n_splits=6, n_test_splits=2)
    """

    # Keys added by `run` itself; they describe the fold, not model quality,
    # so they are left out of the aggregate statistics.
    _BOOKKEEPING_KEYS = frozenset({
        'fold', 'train_start', 'train_end', 'test_start', 'test_end',
        'train_size', 'test_size',
    })

    def __init__(self, config: Optional["WalkForwardConfig"] = None,
                 cv_type: str = 'expanding'):
        """
        Args:
            config: Window sizes; a default `WalkForwardConfig` is used when
                omitted.
            cv_type: One of 'expanding', 'sliding', 'purged', 'combinatorial'.

        Raises:
            ValueError: If `cv_type` is not one of the supported names.
        """
        self.config = config or WalkForwardConfig()
        self.cv_type = cv_type

        # Always read from the resolved config: the `config` argument itself
        # may be None.
        cfg = self.config
        if cv_type == 'expanding':
            self.cv = ExpandingWindowWalkForward(cfg)
        elif cv_type == 'sliding':
            self.cv = SlidingWindowWalkForward(
                cfg.min_train_size, cfg.test_size,
                cfg.step_size, cfg.embargo_gap
            )
        elif cv_type == 'purged':
            self.cv = PurgedKFoldCV(n_splits=5, purge_k=cfg.embargo_gap)
        elif cv_type == 'combinatorial':
            self.cv = CombinatorialPurgedCV(n_splits=6, n_test_splits=2)
        else:
            raise ValueError(f"Unknown cv_type: {cv_type}")

    def run(self, X: np.ndarray, y: np.ndarray,
            model_factory: Callable,
            eval_fn: Callable[[np.ndarray, np.ndarray], Dict]) -> Dict:
        """
        Run walk-forward validation.

        Args:
            X: Features array
            y: Target array
            model_factory: Callable that returns a NEW model instance exposing
                `fit(X, y)` and `predict(X)`
            eval_fn: Callable(pred, actual) -> dict of metrics

        Returns:
            Dict with keys 'fold_results' (list of per-fold metric dicts, each
            extended with fold index, sizes and first/last train/test index),
            'aggregate' (statistics per numeric metric), 'cv_type' and
            'config'.
        """
        fold_results = []

        print(f"Running {self.cv_type} walk-forward validation...")
        print(f"Config: train_min={self.config.min_train_size}, "
              f"test={self.config.test_size}, step={self.config.step_size}, "
              f"embargo={self.config.embargo_gap}")

        # Count the folds once, up front, by consuming the splitter itself.
        # This works for any splitter (not all of them offer get_n_splits) and
        # avoids re-counting on every iteration, without keeping every index
        # array in memory.
        n_folds = sum(1 for _ in self.cv.split(X, y))

        for fold, (train_idx, test_idx) in enumerate(self.cv.split(X, y)):
            print(f"\nFold {fold + 1}/{n_folds}")

            # A fold with no training or test rows cannot be fitted or scored.
            if len(train_idx) == 0 or len(test_idx) == 0:
                print(" Skipped: empty train or test set")
                continue

            print(f" Train: {len(train_idx)} samples ({train_idx[0]} to {train_idx[-1]})")
            print(f" Test: {len(test_idx)} samples ({test_idx[0]} to {test_idx[-1]})")

            # Split data
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            # Train fresh model (NO LOOKAHEAD!)
            model = model_factory()
            model.fit(X_train, y_train)

            # Predict
            y_pred = model.predict(X_test)

            # Evaluate. Copy the result so the bookkeeping keys added below
            # never modify a dict that eval_fn may reuse between calls.
            metrics = dict(eval_fn(y_pred, y_test))
            metrics['fold'] = fold
            metrics['train_size'] = len(train_idx)
            metrics['test_size'] = len(test_idx)
            # For non-contiguous splits these are simply the first and last
            # index of each array, not a gap-free range.
            metrics['train_start'] = int(train_idx[0])
            metrics['train_end'] = int(train_idx[-1])
            metrics['test_start'] = int(test_idx[0])
            metrics['test_end'] = int(test_idx[-1])

            print(f" Metrics: {metrics}")
            fold_results.append(metrics)

        # Aggregate statistics
        aggregate = self._aggregate_results(fold_results)

        return {
            'fold_results': fold_results,
            'aggregate': aggregate,
            'cv_type': self.cv_type,
            'config': self.config
        }

    def _aggregate_results(self, fold_results: List[Dict]) -> Dict:
        """Compute aggregate statistics across folds.

        A metric is aggregated when its value in the FIRST fold is an int,
        float or NumPy number and it is not one of the bookkeeping keys.
        `None` values in later folds are ignored.

        Returns:
            Mapping metric name -> dict with 'mean', 'std' (population, as
            computed by `np.std`), 'min', 'max', 'median', 'pct_5th' and
            'pct_95th'. Empty when `fold_results` is empty.
        """
        if not fold_results:
            return {}

        # Collect numeric metrics
        numeric_keys = [
            key for key, value in fold_results[0].items()
            if key not in self._BOOKKEEPING_KEYS
            and isinstance(value, (int, float, np.number))
        ]

        aggregate = {}
        for key in numeric_keys:
            values = [r[key] for r in fold_results if r.get(key) is not None]
            if not values:
                continue

            values = np.array(values)
            aggregate[key] = {
                'mean': float(np.mean(values)),
                'std': float(np.std(values)),
                'min': float(np.min(values)),
                'max': float(np.max(values)),
                'median': float(np.median(values)),
                'pct_5th': float(np.percentile(values, 5)),
                'pct_95th': float(np.percentile(values, 95))
            }

        return aggregate
374
+
375
+
376
def honest_backtest_example():
    """Print a side-by-side comparison of a shuffled split and a walk-forward split.

    A synthetic AR(1) series is generated, five lagged copies of it are used
    as features, and a Ridge regression is scored with the Spearman rank
    correlation ("IC") between prediction and target under two schemes:
    a random 70/30 split and an expanding walk-forward split. Requires
    scikit-learn and SciPy; results are printed, nothing is returned.
    """
    from sklearn.linear_model import Ridge
    from scipy.stats import spearmanr
    from sklearn.model_selection import train_test_split

    # Synthetic autocorrelated series: y[t] = 0.7 * y[t-1] + 0.5 * noise
    np.random.seed(42)
    n_obs = 2000
    series = np.zeros(n_obs)
    series[0] = np.random.randn()
    for t in range(1, n_obs):
        series[t] = 0.7 * series[t - 1] + np.random.randn() * 0.5

    # Column c holds the series shifted back by c + 1 steps; rows without
    # enough history keep their initial zeros.
    n_lags = 5
    features = np.zeros((n_obs, n_lags))
    for col in range(n_lags):
        shift = col + 1
        features[shift:, col] = series[:-shift]

    # Drop the warm-up rows whose lag columns are not fully populated.
    X_used = features[n_lags:]
    y_used = series[n_lags:]

    # Scheme 1: shuffled split, ignoring time order
    X_fit, X_hold, y_fit, y_hold = train_test_split(
        X_used, y_used, test_size=0.3, random_state=42
    )
    shuffled_model = Ridge().fit(X_fit, y_fit)
    ic_bad, _ = spearmanr(shuffled_model.predict(X_hold), y_hold)

    # Scheme 2: expanding walk-forward split, training only on earlier rows
    splitter = ExpandingWindowWalkForward(
        WalkForwardConfig(min_train_size=500, test_size=200, step_size=200, embargo_gap=10)
    )
    ics_wf = []
    for train_idx, test_idx in splitter.split(X_used, y_used):
        fold_model = Ridge().fit(X_used[train_idx], y_used[train_idx])
        fold_ic, _ = spearmanr(fold_model.predict(X_used[test_idx]), y_used[test_idx])
        ics_wf.append(fold_ic)

    rule = "=" * 60
    print(rule)
    print("THE WALK-FORWARD TRUTH BOMB")
    print(rule)
    print(f"Random split IC: {ic_bad:.4f} ← This is a LIE")
    print(f"Walk-forward IC: {np.mean(ics_wf):.4f} ± {np.std(ics_wf):.4f}")
    print(f"Walk-forward range: [{np.min(ics_wf):.4f}, {np.max(ics_wf):.4f}]")
    print()
    print("Random split looks great because future data leaked into training!")
    print("Walk-forward is honest because it only trains on PAST data.")
    print(f"Difference: {abs(ic_bad - np.mean(ics_wf)):.4f} — this is your FALSE HOPE.")


if __name__ == '__main__':
    honest_backtest_example()