GoshawkVortexAI committed on
Commit
96d72eb
·
verified ·
1 Parent(s): e365f22

Create walk_forward.py

Browse files
Files changed (1) hide show
  1. walk_forward.py +291 -0
walk_forward.py ADDED
@@ -0,0 +1,291 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ walk_forward.py — Strict time-series walk-forward cross-validation.
3
+
4
+ Architecture:
5
+ ┌─────────────────────────────────────────────────────────┐
6
+ │ FOLD 1: [=TRAIN=======|=VAL=|----TEST----] │
7
+ │ FOLD 2: [=TRAIN============|=VAL=|--TEST--] │
8
+ │ FOLD 3: [=TRAIN==================|=VAL=|TEST] │
9
+ └─────────────────────────────────────────────────────────┘
10
+
11
+ Key anti-lookahead rules enforced here:
12
+ 1. Train/val/test boundaries are strictly chronological
13
+ 2. No future data ever seen during training or threshold search
14
+ 3. Labels computed BEFORE fold construction (in labeler.py)
15
+ 4. Threshold optimized on VAL set; reported metric on TEST set only
16
+ 5. Model fitted fresh for each fold (no weight leakage)
17
+ """
18
+
19
+ import json
20
+ import logging
21
+ from dataclasses import dataclass, field
22
+ from typing import List, Tuple, Optional
23
+
24
+ import numpy as np
25
+ import pandas as pd
26
+
27
+ from ml_config import (
28
+ WF_N_SPLITS,
29
+ WF_TRAIN_FRAC,
30
+ WF_MIN_TRAIN_OBS,
31
+ LGBM_PARAMS,
32
+ THRESHOLD_MIN,
33
+ THRESHOLD_MAX,
34
+ THRESHOLD_STEPS,
35
+ THRESHOLD_OBJECTIVE,
36
+ ROUND_TRIP_COST,
37
+ TARGET_RR,
38
+ FEATURE_COLUMNS,
39
+ )
40
+ from model_backend import ModelBackend
41
+
42
+ logger = logging.getLogger(__name__)
43
+
44
+
45
@dataclass
class FoldResult:
    """Metrics for a single walk-forward fold.

    The train/val/test fields describe the raw chronological split; the
    ``test_*`` fields are computed only on the subset of test rows whose
    predicted win probability passed the threshold selected on the
    validation set (see run_walk_forward).
    """
    fold: int                 # 1-based fold index
    n_train: int              # rows in the training window
    n_val: int                # rows in the validation window
    n_test: int               # rows in the held-out test window
    train_win_rate: float     # base rate of y==1 on train
    val_win_rate: float       # base rate of y==1 on val
    test_win_rate: float      # base rate of y==1 on test (unfiltered)
    best_threshold: float     # probability cutoff chosen on VAL
    val_objective: float  # objective on val (used to pick threshold)
    test_sharpe: float  # out-of-sample Sharpe after thresholding
    test_expectancy: float  # out-of-sample expectancy per trade
    test_precision: float  # win rate of filtered trades on test
    test_n_trades: int  # number of trades passing filter on test
    # Excluded from repr: typically a long per-feature importance vector.
    feature_importances: np.ndarray = field(repr=False)
61
+
62
+
63
def _compute_expectancy(y_true: np.ndarray, rr: float = TARGET_RR, cost: float = ROUND_TRIP_COST) -> float:
    """
    Mathematical expectancy per trade (in R units):
        E = win_rate * RR - loss_rate * 1 - cost

    Returns the sentinel -999.0 when there are no trades to score.
    """
    if not len(y_true):
        return -999.0
    p_win = float(np.mean(y_true))
    # Each win pays +rr R, each loss costs -1 R; cost is a flat per-trade drag.
    return p_win * rr - (1.0 - p_win) - cost
73
+
74
+
75
def _compute_sharpe(y_true: np.ndarray, rr: float = TARGET_RR, cost: float = ROUND_TRIP_COST) -> float:
    """
    Approximate trade Sharpe: mean(trade PnL) / std(trade PnL), loosely
    annualized by sqrt(252).  Per-trade PnL in R units: +rr on a win,
    -1 on a loss, minus the flat round-trip cost.

    Returns -999.0 when fewer than 5 trades are available, and 0.0 when
    the PnL series is (numerically) constant.
    """
    if len(y_true) < 5:
        return -999.0
    returns = np.where(y_true == 1, rr, -1.0) - cost
    dispersion = returns.std()
    # Guard against a zero/near-zero denominator (all-identical outcomes).
    if dispersion < 1e-9:
        return 0.0
    return float(returns.mean() / dispersion * np.sqrt(252))  # annualized loosely
87
+
88
+
89
def _optimize_threshold(
    probs: np.ndarray,
    y_true: np.ndarray,
    objective: str = THRESHOLD_OBJECTIVE,
) -> Tuple[float, float]:
    """
    Grid-search the probability cutoff on the VALIDATION set only.

    Every candidate threshold keeps trades with probs >= t and scores the
    surviving labels under the configured objective; candidates yielding
    fewer than 10 trades are skipped as statistically meaningless.

    Returns:
        (best_threshold, best_objective_value)
    """
    best_thresh = THRESHOLD_MIN
    best_val = -np.inf

    for t in np.linspace(THRESHOLD_MIN, THRESHOLD_MAX, THRESHOLD_STEPS):
        keep = probs >= t
        if keep.sum() < 10:  # too few trades to be meaningful
            continue
        kept = y_true[keep]

        if objective == "expectancy":
            score = _compute_expectancy(kept)
        elif objective == "sharpe":
            score = _compute_sharpe(kept)
        elif objective == "precision_recall":
            # F1 over the filtered trades; epsilons avoid division by zero.
            prec = kept.mean()
            recall = kept.sum() / (y_true.sum() + 1e-9)
            score = 2 * prec * recall / (prec + recall + 1e-9)
        else:
            score = kept.mean()  # default: raw win rate

        if score > best_val:
            best_val = score
            best_thresh = t

    return float(best_thresh), float(best_val)
123
+
124
+
125
def _make_folds(
    n: int,
    n_splits: int = WF_N_SPLITS,
    train_frac: float = WF_TRAIN_FRAC,
) -> List[Tuple[range, range, range]]:
    """
    Build (train, val, test) index ranges for expanding-window walk-forward CV.

    The test window always advances strictly forward; training always starts
    at index 0 (expanding window).  Validation is the trailing 15% of the
    pre-test region.  Folds whose training window would be shorter than
    WF_MIN_TRAIN_OBS are dropped.

    NOTE(review): `train_frac` is currently not used by the fold geometry;
    it is kept in the signature for caller compatibility — confirm intent.
    """
    val_frac = 0.15
    fold_size = n // (n_splits + 1)
    folds: List[Tuple[range, range, range]] = []

    for i in range(n_splits):
        test_end = n - (n_splits - 1 - i) * fold_size
        test_start = test_end - fold_size
        val_end = test_start
        val_start = int(val_end * (1 - val_frac))

        # Expanding window: train spans [0, val_start). Skip folds that are
        # too short to train on.
        if val_start < WF_MIN_TRAIN_OBS:
            continue

        folds.append(
            (
                range(0, val_start),
                range(val_start, val_end),
                range(test_start, test_end),
            )
        )
    return folds
156
+
157
+
158
def run_walk_forward(
    X: np.ndarray,
    y: np.ndarray,
    timestamps: Optional[np.ndarray] = None,
    params: Optional[dict] = None,
) -> List[FoldResult]:
    """
    Execute full walk-forward validation.

    Per fold: fit a fresh model on TRAIN (validated on VAL inside the
    backend), select the probability threshold on VAL only, then report
    thresholded metrics on TEST — the test window is never used for
    fitting or threshold selection.

    Args:
        X: Feature matrix (N, n_features) — rows in chronological order
        y: Label array (N,) — 0/1 binary
        timestamps: Optional array of timestamps (currently unused;
            reserved for logging)
        params: Model hyperparameters (defaults to ml_config.LGBM_PARAMS)

    Returns:
        List of FoldResult, one per valid fold.

    Raises:
        ValueError: if there is not enough data to build a single fold.
    """
    if params is None:
        params = LGBM_PARAMS

    results: List[FoldResult] = []
    folds = _make_folds(len(X), WF_N_SPLITS, WF_TRAIN_FRAC)

    if not folds:
        raise ValueError(f"Insufficient data for walk-forward CV. Need >= {WF_MIN_TRAIN_OBS * (WF_N_SPLITS + 1)} rows.")

    for fold_idx, (tr, va, te) in enumerate(folds, 1):
        X_tr, y_tr = X[tr], y[tr]
        X_va, y_va = X[va], y[va]
        X_te, y_te = X[te], y[te]

        # A single-class training fold cannot be fitted as a binary problem.
        if len(np.unique(y_tr)) < 2:
            logger.warning(f"Fold {fold_idx}: only one class in training set — skipping")
            continue

        logger.info(
            f"Fold {fold_idx}/{len(folds)}: "
            f"train={len(X_tr)} val={len(X_va)} test={len(X_te)} "
            f"(wr_tr={y_tr.mean():.3f} wr_va={y_va.mean():.3f} wr_te={y_te.mean():.3f})"
        )

        # Compute class weight to handle imbalance (crypto: ~35-45% win rate)
        pos_frac = y_tr.mean()
        if 0.05 < pos_frac < 0.95:
            sample_weight = np.where(y_tr == 1, 1.0 / pos_frac, 1.0 / (1 - pos_frac))
        else:
            # Degenerate imbalance: inverse-frequency weights would explode.
            sample_weight = None

        # Fresh model per fold — no weight leakage between folds.
        backend = ModelBackend(params=params, calibrate=True)
        backend.fit(X_tr, y_tr, X_va, y_va, sample_weight=sample_weight)

        val_probs = backend.predict_win_prob(X_va)
        test_probs = backend.predict_win_prob(X_te)

        # Threshold is optimized on VAL only.
        best_thresh, best_val_obj = _optimize_threshold(val_probs, y_va)

        # Evaluate on TEST set using threshold from VAL
        test_mask = test_probs >= best_thresh
        y_te_filtered = y_te[test_mask]
        n_test_trades = int(test_mask.sum())

        # Sentinel values when no trades pass the filter on test.
        test_expectancy = _compute_expectancy(y_te_filtered) if n_test_trades > 0 else -999.0
        test_sharpe = _compute_sharpe(y_te_filtered) if n_test_trades > 0 else -999.0
        test_precision = float(y_te_filtered.mean()) if n_test_trades > 0 else 0.0

        result = FoldResult(
            fold=fold_idx,
            n_train=len(X_tr),
            n_val=len(X_va),
            n_test=len(X_te),
            train_win_rate=float(y_tr.mean()),
            val_win_rate=float(y_va.mean()),
            test_win_rate=float(y_te.mean()),
            best_threshold=best_thresh,
            val_objective=best_val_obj,
            test_sharpe=test_sharpe,
            test_expectancy=test_expectancy,
            test_precision=test_precision,
            test_n_trades=n_test_trades,
            feature_importances=backend.feature_importances_,
        )
        results.append(result)

        logger.info(
            f"Fold {fold_idx}: thresh={best_thresh:.3f} "
            f"test_expectancy={test_expectancy:.4f} "
            f"test_sharpe={test_sharpe:.3f} "
            f"test_prec={test_precision:.3f} "
            f"n_trades={n_test_trades}"
        )

    return results
255
+
256
+
257
def summarize_walk_forward(results: List[FoldResult]) -> dict:
    """Aggregate per-fold walk-forward results into one summary dict.

    Sentinel values (-999, emitted when a fold had no qualifying trades)
    are excluded from the expectancy/Sharpe averages; the corresponding
    summary entries are None when no fold produced a usable value.
    Returns an empty dict for an empty result list.
    """
    if not results:
        return {}

    thresholds = [r.best_threshold for r in results]
    expectancies = [r.test_expectancy for r in results if r.test_expectancy > -999]
    sharpes = [r.test_sharpe for r in results if r.test_sharpe > -999]

    summary = {
        "n_folds": len(results),
        "mean_threshold": round(float(np.mean(thresholds)), 4),
        "std_threshold": round(float(np.std(thresholds)), 4),
        "mean_expectancy": None,
        "std_expectancy": None,
        "mean_sharpe": None,
        "mean_precision": round(float(np.mean([r.test_precision for r in results])), 4),
        "mean_n_trades_per_fold": round(float(np.mean([r.test_n_trades for r in results])), 1),
        # Element-wise average importance across folds.
        "avg_feature_importance": np.mean(
            [r.feature_importances for r in results], axis=0
        ).tolist(),
        "fold_details": [
            {
                "fold": r.fold,
                "threshold": r.best_threshold,
                "test_expectancy": r.test_expectancy,
                "test_sharpe": r.test_sharpe,
                "test_precision": r.test_precision,
                "test_n_trades": r.test_n_trades,
            }
            for r in results
        ],
    }

    if expectancies:
        summary["mean_expectancy"] = round(float(np.mean(expectancies)), 4)
        summary["std_expectancy"] = round(float(np.std(expectancies)), 4)
    if sharpes:
        summary["mean_sharpe"] = round(float(np.mean(sharpes)), 4)

    return summary