| | """ |
| | walk_forward.py — Strict time-series walk-forward cross-validation. |
| | |
| | Architecture: |
| | ┌──────────────────────────────────────────────────┐ |
| | │ FOLD 1: [=TRAIN=======|=VAL=|----TEST----]       │ |
| | │ FOLD 2: [=TRAIN============|=VAL=|--TEST--]      │ |
| | │ FOLD 3: [=TRAIN==================|=VAL=|TEST]    │ |
| | └──────────────────────────────────────────────────┘ |
| | |
| | Key anti-lookahead rules enforced here: |
| | 1. Train/val/test boundaries are strictly chronological |
| | 2. No future data ever seen during training or threshold search |
| | 3. Labels computed BEFORE fold construction (in labeler.py) |
| | 4. Threshold optimized on VAL set; reported metric on TEST set only |
| | 5. Model fitted fresh for each fold (no weight leakage) |
| | """ |
| |
|
| | import json |
| | import logging |
| | from dataclasses import dataclass, field |
| | from typing import List, Tuple, Optional |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | from ml_config import ( |
| | WF_N_SPLITS, |
| | WF_TRAIN_FRAC, |
| | WF_MIN_TRAIN_OBS, |
| | LGBM_PARAMS, |
| | THRESHOLD_MIN, |
| | THRESHOLD_MAX, |
| | THRESHOLD_STEPS, |
| | THRESHOLD_OBJECTIVE, |
| | ROUND_TRIP_COST, |
| | TARGET_RR, |
| | FEATURE_COLUMNS, |
| | ) |
| | from model_backend import ModelBackend |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
@dataclass
class FoldResult:
    """Metrics recorded for one walk-forward fold.

    Attributes:
        fold: 1-based index of the fold among the folds that were evaluated.
        n_train, n_val, n_test: Row counts of the three splits.
        train_win_rate, val_win_rate, test_win_rate: Mean of the binary
            labels in each split.
        best_threshold: Probability threshold selected on the validation split.
        val_objective: Objective value reached at ``best_threshold`` on the
            validation split.
        test_sharpe, test_expectancy: Metrics of the test rows whose predicted
            probability is >= ``best_threshold``; ``-999.0`` is used as a
            "not computable" sentinel.
        test_precision: Mean label of the selected test rows (0.0 when no row
            is selected).
        test_n_trades: Number of selected test rows.
        feature_importances: Importance vector reported by the fitted model.
    """

    # Fold identity and split sizes.
    fold: int
    n_train: int
    n_val: int
    n_test: int

    # Base rates of the positive class per split.
    train_win_rate: float
    val_win_rate: float
    test_win_rate: float

    # Threshold search outcome (validation split only).
    best_threshold: float
    val_objective: float

    # Out-of-sample metrics at the selected threshold.
    test_sharpe: float
    test_expectancy: float
    test_precision: float
    test_n_trades: int

    # Excluded from repr (noisy) and from ==: the generated __eq__ compares
    # fields as tuples, and truth-testing an element-wise ndarray comparison
    # raises ValueError for arrays with more than one element.
    feature_importances: np.ndarray = field(repr=False, compare=False)
| |
|
| |
|
def _compute_expectancy(y_true: np.ndarray, rr: float = TARGET_RR, cost: float = ROUND_TRIP_COST) -> float:
    """
    Per-trade mathematical expectancy, expressed in R units.

        E = P(win) * rr - P(loss) * 1 - cost

    where P(win) is the mean of ``y_true`` and P(loss) is its complement.

    Args:
        y_true: Outcome labels of the trades taken (mean is used as win rate).
        rr: Reward in R units credited for a winning trade.
        cost: Cost in R units charged on every trade.

    Returns:
        The expectancy, or the sentinel ``-999.0`` when ``y_true`` is empty.
    """
    n_trades = len(y_true)
    if n_trades == 0:
        # No trades: expectancy is undefined, so report the sentinel.
        return -999.0

    p_win = float(y_true.mean())
    p_loss = 1.0 - p_win
    gross = p_win * rr - p_loss * 1.0
    return gross - cost
| |
|
| |
|
def _compute_sharpe(
    y_true: np.ndarray,
    rr: float = TARGET_RR,
    cost: float = ROUND_TRIP_COST,
    periods_per_year: float = 252,
) -> float:
    """
    Scaled per-trade Sharpe ratio: mean(PnL) / std(PnL) * sqrt(periods_per_year).

    Trade PnL in R units is ``+rr`` where the label equals 1 and ``-1``
    otherwise, with ``cost`` subtracted from every trade. The standard
    deviation is numpy's default (population, ``ddof=0``).

    Args:
        y_true: Outcome labels of the trades taken.
        rr: Reward in R units credited for a winning trade.
        cost: Cost in R units charged on every trade.
        periods_per_year: Scaling factor placed under the square root. The
            default of 252 reproduces the previously hard-coded value.
            NOTE(review): 252 presumably means "trading days per year", i.e.
            each trade is treated as one daily observation — confirm this
            matches the actual trade frequency. Pass 1 for the raw ratio.

    Returns:
        The scaled ratio; ``-999.0`` (sentinel) when fewer than 5 trades are
        supplied; ``0.0`` when the PnL has (near-)zero dispersion.
    """
    if len(y_true) < 5:
        # Too few trades for a meaningful dispersion estimate.
        return -999.0

    pnl = np.where(y_true == 1, rr, -1.0) - cost
    std = pnl.std()
    if std < 1e-9:
        # All trades had the same outcome; avoid dividing by ~0.
        return 0.0
    return float(pnl.mean() / std * np.sqrt(periods_per_year))
| |
|
| |
|
def _optimize_threshold(
    probs: np.ndarray,
    y_true: np.ndarray,
    objective: str = THRESHOLD_OBJECTIVE,
    min_trades: int = 10,
) -> Tuple[float, float]:
    """
    Grid-search the decision threshold on the validation set.

    Candidate thresholds are ``THRESHOLD_STEPS`` evenly spaced values from
    ``THRESHOLD_MIN`` to ``THRESHOLD_MAX``. For each candidate, the rows with
    ``probs >= threshold`` are scored; candidates selecting fewer than
    ``min_trades`` rows are ignored. On ties the lowest threshold wins
    (strict ``>`` comparison).

    Args:
        probs: Predicted win probabilities for the validation rows.
        y_true: Binary labels aligned with ``probs``.
        objective: ``"expectancy"``, ``"sharpe"`` or ``"precision_recall"``
            (an F1-style harmonic mean of precision and recall). Any other
            value scores by the mean label of the selected rows.
        min_trades: Minimum number of selected rows for a candidate to be
            considered (default 10, the previously hard-coded value; values
            below 1 are treated as 1).

    Returns:
        ``(best_threshold, best_objective_value)``. When no candidate selects
        enough rows, the result is ``(THRESHOLD_MIN, -inf)`` and a warning is
        logged.
    """
    probs = np.asarray(probs)
    y_true = np.asarray(y_true)
    required = max(1, int(min_trades))
    # Loop-invariant recall denominator (only used by "precision_recall").
    total_wins = y_true.sum()

    thresholds = np.linspace(THRESHOLD_MIN, THRESHOLD_MAX, THRESHOLD_STEPS)
    best_thresh = THRESHOLD_MIN
    best_val = -np.inf

    for t in thresholds:
        mask = probs >= t
        if mask.sum() < required:
            continue
        y_filtered = y_true[mask]
        if objective == "expectancy":
            val = _compute_expectancy(y_filtered)
        elif objective == "sharpe":
            val = _compute_sharpe(y_filtered)
        elif objective == "precision_recall":
            prec = y_filtered.mean()
            recall = y_filtered.sum() / (total_wins + 1e-9)
            val = 2 * prec * recall / (prec + recall + 1e-9)
        else:
            val = y_filtered.mean()

        if val > best_val:
            best_val = val
            best_thresh = t

    if best_val == -np.inf:
        # Make the fallback visible instead of silently returning the most
        # permissive threshold with a -inf objective.
        logger.warning(
            "No threshold in [%s, %s] selected at least %d validation rows; "
            "falling back to THRESHOLD_MIN",
            THRESHOLD_MIN, THRESHOLD_MAX, required,
        )

    return float(best_thresh), float(best_val)
| |
|
| |
|
def _make_folds(
    n: int,
    n_splits: int = WF_N_SPLITS,
    train_frac: float = WF_TRAIN_FRAC,
    val_frac: float = 0.15,
) -> List[Tuple[range, range, range]]:
    """
    Generate (train, val, test) index ranges for walk-forward CV.

    The data is cut into ``n_splits + 1`` chunks of ``n // (n_splits + 1)``
    rows. Test windows are consecutive, non-overlapping chunks, the last one
    ending at ``n``. For each fold, everything before the test window is
    split chronologically: the final ``val_frac`` share becomes the
    validation range and the rows before it (starting at index 0) become the
    expanding training range.

    Args:
        n: Total number of chronologically ordered rows.
        n_splits: Number of candidate folds.
        train_frac: Not used by the current split scheme; kept so existing
            callers that pass it keep working.
        val_frac: Share of the pre-test rows reserved for validation
            (default 0.15, the previously hard-coded value).

    Returns:
        A list of ``(train, val, test)`` ranges. Folds whose training range
        has fewer than ``WF_MIN_TRAIN_OBS`` rows, or whose train/val range
        would be empty, are dropped, so the list may be shorter than
        ``n_splits`` or empty.

    Raises:
        ValueError: If ``val_frac`` is not strictly between 0 and 1.
    """
    if not 0.0 < val_frac < 1.0:
        raise ValueError(f"val_frac must be in (0, 1), got {val_frac!r}")

    folds: List[Tuple[range, range, range]] = []
    if n_splits <= 0:
        # Also avoids a ZeroDivisionError for n_splits == -1.
        return folds

    fold_size = n // (n_splits + 1)
    if fold_size <= 0:
        # Fewer rows than chunks: every test window would be empty.
        return folds

    for i in range(n_splits):
        test_end = n - (n_splits - 1 - i) * fold_size
        test_start = test_end - fold_size
        val_end = test_start
        val_start = int(val_end * (1 - val_frac))
        train_end = val_start
        train_start = 0

        if train_end - train_start < WF_MIN_TRAIN_OBS:
            continue
        if train_end <= train_start or val_end <= val_start:
            # Degenerate split (possible when WF_MIN_TRAIN_OBS <= 0).
            continue

        folds.append((
            range(train_start, train_end),
            range(val_start, val_end),
            range(test_start, test_end),
        ))
    return folds
| |
|
| |
|
def run_walk_forward(
    X: np.ndarray,
    y: np.ndarray,
    timestamps: Optional[np.ndarray] = None,
    params: Optional[dict] = None,
) -> List[FoldResult]:
    """
    Execute full walk-forward validation.

    For every fold produced by ``_make_folds`` a fresh ``ModelBackend`` is
    fitted on the training rows (the validation rows are also handed to
    ``fit``), the decision threshold is chosen on the validation rows, and
    the metrics are computed on the test rows that clear that threshold.

    Args:
        X: Feature matrix (N, n_features) — rows in chronological order.
        y: Label array (N,) — 0/1 binary.
        timestamps: Optional array of timestamps. Accepted for API
            compatibility; this function does not currently read it.
        params: Model hyperparameters (defaults to ml_config.LGBM_PARAMS).

    Returns:
        List of FoldResult, one per evaluated fold. Folds whose training
        labels contain a single class are skipped.

    Raises:
        ValueError: If ``X`` and ``y`` differ in length, or if no fold can be
            built from the available rows.
    """
    if params is None:
        params = LGBM_PARAMS

    # A longer y would otherwise be indexed without error and silently
    # misalign labels and features.
    if len(X) != len(y):
        raise ValueError(
            f"X and y must have the same number of rows (got {len(X)} and {len(y)})."
        )

    results: List[FoldResult] = []
    folds = _make_folds(len(X), WF_N_SPLITS, WF_TRAIN_FRAC)

    if not folds:
        raise ValueError(f"Insufficient data for walk-forward CV. Need >= {WF_MIN_TRAIN_OBS * (WF_N_SPLITS + 1)} rows.")

    for fold_idx, (tr, va, te) in enumerate(folds, 1):
        X_tr, y_tr = X[tr], y[tr]
        X_va, y_va = X[va], y[va]
        X_te, y_te = X[te], y[te]

        if len(np.unique(y_tr)) < 2:
            logger.warning(f"Fold {fold_idx}: only one class in training set — skipping")
            continue

        logger.info(
            f"Fold {fold_idx}/{len(folds)}: "
            f"train={len(X_tr)} val={len(X_va)} test={len(X_te)} "
            f"(wr_tr={y_tr.mean():.3f} wr_va={y_va.mean():.3f} wr_te={y_te.mean():.3f})"
        )

        # Inverse-class-frequency weights; skipped when the positive share is
        # extreme so that no single class gets an outsized weight.
        pos_frac = y_tr.mean()
        if 0.05 < pos_frac < 0.95:
            sample_weight = np.where(y_tr == 1, 1.0 / pos_frac, 1.0 / (1 - pos_frac))
        else:
            sample_weight = None

        # A new backend per fold: nothing fitted carries over between folds.
        backend = ModelBackend(params=params, calibrate=True)
        backend.fit(X_tr, y_tr, X_va, y_va, sample_weight=sample_weight)

        val_probs = backend.predict_win_prob(X_va)
        test_probs = backend.predict_win_prob(X_te)

        # Threshold is tuned on VAL only; TEST labels are not consulted.
        best_thresh, best_val_obj = _optimize_threshold(val_probs, y_va)

        # Apply the frozen threshold to the test window.
        test_mask = test_probs >= best_thresh
        y_te_filtered = y_te[test_mask]
        n_test_trades = int(test_mask.sum())

        test_expectancy = _compute_expectancy(y_te_filtered) if n_test_trades > 0 else -999.0
        test_sharpe = _compute_sharpe(y_te_filtered) if n_test_trades > 0 else -999.0
        test_precision = float(y_te_filtered.mean()) if n_test_trades > 0 else 0.0

        result = FoldResult(
            fold=fold_idx,
            n_train=len(X_tr),
            n_val=len(X_va),
            n_test=len(X_te),
            train_win_rate=float(y_tr.mean()),
            val_win_rate=float(y_va.mean()),
            test_win_rate=float(y_te.mean()),
            best_threshold=best_thresh,
            val_objective=best_val_obj,
            test_sharpe=test_sharpe,
            test_expectancy=test_expectancy,
            test_precision=test_precision,
            test_n_trades=n_test_trades,
            feature_importances=backend.feature_importances_,
        )
        results.append(result)

        logger.info(
            f"Fold {fold_idx}: thresh={best_thresh:.3f} "
            f"test_expectancy={test_expectancy:.4f} "
            f"test_sharpe={test_sharpe:.3f} "
            f"test_prec={test_precision:.3f} "
            f"n_trades={n_test_trades}"
        )

    return results
| |
|
| |
|
def summarize_walk_forward(results: List[FoldResult]) -> dict:
    """Aggregate walk-forward results into a summary dict.

    Args:
        results: Per-fold results, e.g. the output of ``run_walk_forward``.

    Returns:
        ``{}`` when ``results`` is empty. Otherwise a dict with:

        * ``n_folds``, ``mean_threshold``, ``std_threshold``;
        * ``mean_expectancy``, ``std_expectancy``, ``mean_sharpe`` — computed
          over folds whose value is above the ``-999`` sentinel, ``None``
          when no fold qualifies;
        * ``mean_precision`` — averaged over folds with at least one test
          trade, so the placeholder precision of folds that took no trades
          does not dilute it; ``0.0`` when no fold traded;
        * ``mean_n_trades_per_fold``;
        * ``avg_feature_importance`` — element-wise mean across folds, as a
          list;
        * ``fold_details`` — one dict of raw per-fold test metrics per fold
          (sentinel values are passed through unchanged).
    """
    if not results:
        return {}

    thresholds = [r.best_threshold for r in results]
    # -999 marks "metric not computable" for a fold; keep it out of averages.
    expectancies = [r.test_expectancy for r in results if r.test_expectancy > -999]
    sharpes = [r.test_sharpe for r in results if r.test_sharpe > -999]
    # Precision is undefined for a fold with no trades; exclude those folds
    # the same way sentinel folds are excluded above.
    precisions = [r.test_precision for r in results if r.test_n_trades > 0]
    n_trades = [r.test_n_trades for r in results]

    avg_importance = np.mean([r.feature_importances for r in results], axis=0)

    return {
        "n_folds": len(results),
        "mean_threshold": round(float(np.mean(thresholds)), 4),
        "std_threshold": round(float(np.std(thresholds)), 4),
        "mean_expectancy": round(float(np.mean(expectancies)), 4) if expectancies else None,
        "std_expectancy": round(float(np.std(expectancies)), 4) if expectancies else None,
        "mean_sharpe": round(float(np.mean(sharpes)), 4) if sharpes else None,
        "mean_precision": round(float(np.mean(precisions)), 4) if precisions else 0.0,
        "mean_n_trades_per_fold": round(float(np.mean(n_trades)), 1),
        "avg_feature_importance": avg_importance.tolist(),
        "fold_details": [
            {
                "fold": r.fold,
                "threshold": r.best_threshold,
                "test_expectancy": r.test_expectancy,
                "test_sharpe": r.test_sharpe,
                "test_precision": r.test_precision,
                "test_n_trades": r.test_n_trades,
            }
            for r in results
        ],
    }
| |
|