Spaces:
No application file
No application file
| """Combinatorial Purged Cross-Validation and CPCV-based performance analysis.""" | |
| import itertools | |
| import math | |
| import numbers | |
| from collections.abc import Iterator | |
| from math import comb | |
| from itertools import combinations | |
| from typing import List, Optional, Tuple | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| from sklearn.base import clone | |
| from sklearn.model_selection import BaseCrossValidator | |
| import sklearn.utils as sku | |
| from joblib import Parallel, delayed | |
| from numba import njit | |
| from scipy.stats import norm, skew as scipy_skew, kurtosis as scipy_kurt | |
| from .cross_validation import ml_get_train_times | |
# Upper bound on C(n_folds, n_test_folds). CombinatorialPurgedCV.__init__
# raises ValueError for configurations that would generate more splits.
_MAX_COMBINATIONS = 100_000
| # --------------------------------------------------------------------------- | |
| # Numba Optimized Utilities | |
| # --------------------------------------------------------------------------- | |
def fill_sides_numba(num_close, t0_idx, t1_idx, side):
    """
    Accumulate per-event signal values onto a bar-indexed timeline.

    Every event ``k`` whose entry and exit positions are both valid adds
    ``side[k]`` to each bar in the inclusive range ``[t0_idx[k], t1_idx[k]]``,
    so overlapping events sum. Events with an entry or exit position of ``-1``
    (the "label not found" marker produced by ``pd.Index.get_indexer``) are
    skipped.

    NOTE(review): despite the name and the module-level ``njit`` import, this
    function carries no ``@njit`` decorator here, so it runs as plain
    Python/NumPy.

    Parameters
    ----------
    num_close : int
        Length of the output timeline (number of bars).
    t0_idx : np.ndarray of int
        Bar position where each event starts.
    t1_idx : np.ndarray of int
        Bar position where each event ends (inclusive).
    side : np.ndarray of float
        Value added over each event's bar range.

    Returns
    -------
    np.ndarray of float64, shape (num_close,)
        Summed signal per bar; bars covered by no event are 0.
    """
    timeline = np.zeros(num_close, dtype=np.float64)
    for k, begin in enumerate(t0_idx):
        finish = t1_idx[k]
        if begin == -1 or finish == -1:
            # Unlocatable event: contributes nothing.
            continue
        timeline[begin : finish + 1] += side[k]
    return timeline
def fill_average_active_sides(num_close, t0_idx, t1_idx, side):
    """
    Spread per-event signals over a bar timeline, averaging concurrent events.

    Follows the averaging idea of AFML Snippet 10.3. Each valid event ``k``
    contributes ``side[k]`` to every bar in the inclusive range
    ``[t0_idx[k], t1_idx[k]]``. Each bar's output is the mean of the
    contributions it received, or 0 if no event covers it. Events whose entry
    or exit position is ``-1`` are ignored.

    Parameters
    ----------
    num_close : int
        Length of the output timeline (number of bars).
    t0_idx : np.ndarray of int
        Bar position where each event starts.
    t1_idx : np.ndarray of int
        Bar position where each event ends (inclusive).
    side : np.ndarray of float
        Signal value of each event.

    Returns
    -------
    np.ndarray of float64, shape (num_close,)
        Average active signal per bar.
    """
    totals = np.zeros(num_close, dtype=np.float64)
    counts = np.zeros(num_close, dtype=np.int32)
    for k, begin in enumerate(t0_idx):
        finish = t1_idx[k]
        if begin == -1 or finish == -1:
            continue
        window = slice(begin, finish + 1)
        totals[window] += side[k]
        counts[window] += 1

    averaged = np.zeros(num_close, dtype=np.float64)
    # Only divide where at least one event is active; other bars stay 0.
    active = counts > 0
    averaged[active] = totals[active] / counts[active]
    return averaged
| # --------------------------------------------------------------------------- | |
| # Helper statistics functions | |
| # --------------------------------------------------------------------------- | |
| def _n_splits(n_folds: int, n_test_folds: int) -> int: | |
| """Number of splits = C(n_folds, n_test_folds).""" | |
| return math.comb(n_folds, n_test_folds) | |
| def _n_test_paths(n_folds: int, n_test_folds: int) -> int: | |
| """Number of distinct backtest paths that can be reconstructed.""" | |
| return _n_splits(n_folds=n_folds, n_test_folds=n_test_folds) * n_test_folds // n_folds | |
| def _avg_train_size(n_observations: int, n_folds: int, n_test_folds: int) -> float: | |
| """Average number of observations in each training set.""" | |
| return n_observations / n_folds * (n_folds - n_test_folds) | |
| # --------------------------------------------------------------------------- | |
| # Main class | |
| # --------------------------------------------------------------------------- | |
class CombinatorialPurgedCV(BaseCrossValidator):
    """
    Combinatorial Purged Cross-Validation for financial time series.

    Provides train/test indices to split time series data samples based on
    Combinatorial Purged Cross-Validation [1]_.

    Compared to ``KFold``, which uses a single test fold, this class uses
    ``n_test_folds > 1`` test folds per split so that multiple backtest paths
    can be recombined from the train/test combinations.

    Observations are assigned to ``n_folds`` contiguous folds of
    ``n_samples // n_folds`` observations each. The last fold absorbs the
    remainder.

    To prevent data leakage, **purging** and **embargoing** are applied:

    * **Purging** (event-based, via ``t1``) – training observations whose
      label window ``[t1.index[i], t1.iloc[i]]`` overlaps a test-set event
      window are excluded. The overlap test is delegated to
      :func:`ml_get_train_times` (López de Prado, 2018, Ch. 7).
    * **Embargoing** (positional, via ``pct_embargo``) – the ``embargo_size``
      positions immediately after the last observation of each contiguous
      test block are excluded from training. Positions already purged or in
      the test set are left unchanged. ``embargo_size`` is
      ``int(n_samples * pct_embargo)``, computed at ``split`` time.

    Parameters
    ----------
    n_folds : int, default=10
        Total number of folds. Must be at least 3.
    n_test_folds : int, default=2
        Number of test folds per split. Must be at least 2 and strictly less
        than ``n_folds``. For one test fold use ``sklearn.model_selection.KFold``.
    t1 : pd.Series
        The information range for each observation.

        * ``t1.index`` – timestamp when information extraction *started*.
        * ``t1.values`` – timestamp when the label was *finalised*.

        Must share its index with ``X``. Used for event-based purging via
        :func:`ml_get_train_times`.
    pct_embargo : float, default=0.01
        Fraction of the total number of observations, in ``[0, 1)``, embargoed
        after each test block. Set to ``0.0`` to disable.

    Attributes
    ----------
    index_train_test_ : ndarray of shape (n_observations, n_splits), dtype int8
        Set once the generator returned by ``split()`` is fully exhausted.
        Encodes each observation's role per split:

        * ``0`` – training
        * ``1`` – test
        * ``-1`` – excluded (purged or embargoed)

    Raises
    ------
    TypeError
        If ``t1`` is not a ``pd.Series``.
    ValueError
        If ``n_folds`` / ``n_test_folds`` are not integers or are out of
        range, if ``pct_embargo`` is outside ``[0, 1)``, or if the number of
        splits exceeds ``_MAX_COMBINATIONS``.

    Examples
    --------
    >>> import numpy as np, pandas as pd
    >>> from combinatorial import CombinatorialPurgedCV
    >>> dates = pd.date_range("2020-01-01", periods=12, freq="D")
    >>> X = pd.DataFrame(np.random.randn(12, 2), index=dates)
    >>> t1 = pd.Series(dates + pd.Timedelta(days=2), index=dates)
    >>> cv = CombinatorialPurgedCV(n_folds=3, n_test_folds=2, t1=t1, pct_embargo=0.0)
    >>> splits = list(cv.split(X))
    >>> len(splits) == cv.n_splits
    True
    >>> [t.tolist() for t in splits[0][1]]
    [[0, 1, 2, 3], [4, 5, 6, 7]]

    References
    ----------
    .. [1] "Advances in Financial Machine Learning",
       Marcos López de Prado (2018).
    """

    def __init__(
        self,
        n_folds: int = 10,
        n_test_folds: int = 2,
        t1: Optional[pd.Series] = None,
        pct_embargo: float = 0.01,
    ) -> None:
        # ---- validate t1 ----
        if not isinstance(t1, pd.Series):
            raise TypeError(
                "`t1` must be a pd.Series whose index holds event-start times "
                "and whose values hold event-end times."
            )
        # ---- validate n_folds ----
        if not isinstance(n_folds, numbers.Integral):
            raise ValueError(
                "The number of folds must be of Integral type. "
                f"{n_folds} of type {type(n_folds)} was passed."
            )
        n_folds = int(n_folds)
        if n_folds <= 2:
            raise ValueError(f"`n_folds` must be at least 3, got `n_folds={n_folds}`.")
        # ---- validate n_test_folds ----
        # Same Integral check as n_folds; otherwise a float would only surface
        # later as an unrelated TypeError from math.comb.
        if not isinstance(n_test_folds, numbers.Integral):
            raise ValueError(
                "The number of test folds must be of Integral type. "
                f"{n_test_folds} of type {type(n_test_folds)} was passed."
            )
        n_test_folds = int(n_test_folds)
        if n_test_folds <= 1:
            raise ValueError(
                f"`n_test_folds` must be at least 2, got `n_test_folds={n_test_folds}`."
            )
        if n_test_folds >= n_folds:
            raise ValueError(
                "`n_folds` must be strictly greater than `n_test_folds`, "
                f"got n_folds={n_folds}, n_test_folds={n_test_folds}."
            )
        # ---- validate pct_embargo ----
        if not (0.0 <= pct_embargo < 1.0):
            raise ValueError("`pct_embargo` must be in [0, 1).")
        # ---- guard against combinatorial explosion ----
        n_combinations = math.comb(n_folds, n_test_folds)
        if n_combinations > _MAX_COMBINATIONS:
            raise ValueError(
                f"n_folds={n_folds} and n_test_folds={n_test_folds} produce "
                f"{n_combinations:,} splits, exceeding the maximum of "
                f"{_MAX_COMBINATIONS:,}. Reduce `n_folds` or move `n_test_folds` "
                "further from n_folds / 2."
            )
        self.n_folds = n_folds
        self.n_test_folds = n_test_folds
        self.t1 = t1
        self.pct_embargo = pct_embargo

    # ------------------------------------------------------------------
    # sklearn BaseCrossValidator interface
    # ------------------------------------------------------------------
    def get_n_splits(self, X=None, y=None, groups=None) -> int:
        """Return the total number of splits = C(n_folds, n_test_folds)."""
        return self.n_splits

    def _iter_test_indices(self, X=None, y=None, groups=None):
        """Required by BaseCrossValidator but unsupported; use ``split()``."""
        raise NotImplementedError(
            "Use `split()` directly – it yields (train_index, [test_0, test_1, ...])."
        )

    # --------------------------------------------------------------------------
    # Read-only combinatorial properties (depend only on n_folds / n_test_folds)
    # --------------------------------------------------------------------------
    # These must be properties: every caller (get_n_splits, split, the plots,
    # CPCVAnalyzer) reads them as attributes, e.g. ``np.arange(self.n_splits)``.
    @property
    def n_splits(self) -> int:
        """Total number of train/test splits."""
        return _n_splits(n_folds=self.n_folds, n_test_folds=self.n_test_folds)

    @property
    def n_test_paths(self) -> int:
        """Number of distinct backtest paths that can be reconstructed."""
        return _n_test_paths(n_folds=self.n_folds, n_test_folds=self.n_test_folds)

    @property
    def test_set_index(self) -> np.ndarray:
        """Shape (n_splits, n_test_folds) – fold numbers used as test per split.

        Rows follow ``itertools.combinations`` order, so fold numbers within a
        row are ascending.
        """
        return np.array(
            list(itertools.combinations(np.arange(self.n_folds), self.n_test_folds))
        ).reshape(-1, self.n_test_folds)

    @property
    def binary_train_test_sets(self) -> np.ndarray:
        """Shape (n_folds, n_splits) – 0 = train fold, 1 = test fold."""
        mat = np.zeros((self.n_folds, self.n_splits))
        mat[self.test_set_index, np.arange(self.n_splits)[:, np.newaxis]] = 1
        return mat

    @property
    def recombined_paths(self) -> np.ndarray:
        """Shape (n_folds, n_test_paths) – split providing each fold to each path.

        ``recombined_paths[f, p]`` is the index of the split whose test
        predictions for fold ``f`` are used on path ``p``. Row ``f`` lists, in
        ascending order, every split in which fold ``f`` is a test fold.
        """
        return np.argwhere(self.binary_train_test_sets == 1)[:, 1].reshape(
            self.n_folds, -1
        )

    # ------------------------------------------------------------------
    # Path IDs
    # ------------------------------------------------------------------
    def get_path_ids(self) -> np.ndarray:
        """Return the path id of each test set in each split.

        Returns
        -------
        path_ids : ndarray of shape (n_splits, n_test_folds)
            ``path_ids[i, j]`` is the path index that the j-th test fold (in
            ascending fold order) of split *i* contributes to.
        """
        recombined = self.recombined_paths
        path_ids = np.zeros((self.n_splits, self.n_test_folds), dtype=int)
        for i in range(self.n_splits):
            # argwhere yields (fold, path) rows sorted by fold – exactly one row
            # per test fold of split i, in the same order as test_set_index[i].
            path_ids[i] = np.argwhere(recombined == i)[:, 1]
        return path_ids

    # ------------------------------------------------------------------
    # Core split
    # ------------------------------------------------------------------
    def split(
        self,
        X,
        y=None,
        groups=None,
    ) -> Iterator[Tuple[np.ndarray, List[np.ndarray]]]:
        """
        Generate train/test index arrays for each combinatorial split.

        Each yielded training set has already been:

        1. **Purged** (event-based) – training observations whose ``t1``
           window overlaps any test-set event window are removed via
           :func:`ml_get_train_times`.
        2. **Embargoed** (positional) – the ``embargo_size`` positions
           immediately following the end of each contiguous test block are
           removed from training.

        ``index_train_test_`` is stored only after the generator is exhausted.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Must share its index with ``self.t1``.
        y : array-like of shape (n_samples,), optional
        groups : ignored

        Yields
        ------
        train_index : ndarray
            Row positions of the purged + embargoed training set.
        test_index_list : list of ndarray
            One array of row positions per test fold, in ascending fold order.

        Raises
        ------
        ValueError
            If ``X`` and ``t1`` do not share the same index, or if there are
            fewer observations than folds.
        """
        X, y = sku.indexable(X, y)
        n_samples = X.shape[0]
        # ---- validate X / t1 alignment ----
        x_index = X.index if hasattr(X, "index") else pd.RangeIndex(n_samples)
        if len(x_index) != len(self.t1) or not x_index.equals(self.t1.index):
            raise ValueError(
                "X and t1 must share the same index "
                "(same length and identical index labels)."
            )
        # Otherwise n_samples // n_folds == 0 and the fold mapping divides by 0.
        if n_samples < self.n_folds:
            raise ValueError(
                f"Cannot split {n_samples} observations into n_folds={self.n_folds} "
                "folds; at least one observation per fold is required."
            )
        embargo_size = int(n_samples * self.pct_embargo)

        # Map every observation to a fold id (0 … n_folds-1). Clip rather than
        # remapping only the value n_folds: when the remainder exceeds the fold
        # size (e.g. 23 obs / 6 folds) ids above n_folds would otherwise appear
        # and those observations would never be tested.
        fold_size = n_samples // self.n_folds
        fold_index_num = np.minimum(np.arange(n_samples) // fold_size, self.n_folds - 1)

        # Pre-compute positional arrays for each fold.
        fold_pos: dict = {
            fid: np.flatnonzero(fold_index_num == fid) for fid in range(self.n_folds)
        }

        # Observation-role matrix: 0=train, 1=test, -1=excluded.
        index_train_test = np.zeros((n_samples, self.n_splits), dtype=np.int8)
        test_set_index = self.test_set_index  # (n_splits, n_test_folds)
        recombined = self.recombined_paths  # (n_folds, n_test_paths)
        t1_index = self.t1.index

        for i in range(self.n_splits):
            # Column view: writes below go straight into index_train_test.
            role = index_train_test[:, i]

            # 1. Mark test observations.
            for fid in test_set_index[i]:
                role[fold_pos[fid]] = 1

            # 2. Event-based purging via t1.
            test_pos = np.flatnonzero(role == 1)
            train_pos = np.flatnonzero(role == 0)
            t1_test = pd.Series(
                self.t1.iloc[test_pos].values,
                index=t1_index[test_pos],
            )
            t1_train_candidate = pd.Series(
                self.t1.iloc[train_pos].values,
                index=t1_index[train_pos],
            )
            t1_purged = ml_get_train_times(t1_train_candidate, t1_test)
            # Vectorised label membership: candidates missing from the purged
            # result are excluded.
            survives = t1_index[train_pos].isin(t1_purged.index)
            role[train_pos[~survives]] = -1

            # 3. Positional embargo after each contiguous test block.
            if embargo_size > 0:
                test_mask = role == 1
                # Block end: a test obs followed by a non-test obs (or series end).
                block_ends = np.flatnonzero(test_mask & ~np.append(test_mask[1:], False))
                for end_pos in block_ends:
                    stop = min(end_pos + 1 + embargo_size, n_samples)
                    window = role[end_pos + 1 : stop]  # view into role
                    window[window == 0] = -1

            # 4. Yield.
            train_index = np.flatnonzero(role == 0)
            test_index_list = [
                fold_pos[fid] for fid, _ in np.argwhere(recombined == i)
            ]
            yield train_index, test_index_list

        # Store after all splits are generated.
        self.index_train_test_ = index_train_test
        self._fold_index_num = fold_index_num

    # ------------------------------------------------------------------
    # Prediction recombination
    # ------------------------------------------------------------------
    def recombine_test_predictions(
        self,
        all_predictions: List[np.ndarray],
    ) -> List[np.ndarray]:
        """
        Recombine per-split predictions into complete backtest paths.

        Each test fold of each split is assigned to one path via
        :meth:`get_path_ids`. A path therefore holds exactly one prediction
        per observation. If a position were ever written more than once, its
        values are averaged.

        Parameters
        ----------
        all_predictions : list of ndarray, length ``n_splits``
            ``all_predictions[i]`` contains model predictions for split *i*,
            ordered to match ``np.concatenate(test_index_list)`` from
            ``split()``.

        Returns
        -------
        paths : list of ndarray, length ``n_test_paths``
            Each array holds predictions for all observations on that path,
            ordered by their original row position in ``X``.

        Raises
        ------
        RuntimeError
            If ``split()`` has not been fully exhausted yet.
        ValueError
            If the number of prediction arrays, or the length of any of them,
            does not match the splits produced by ``split()``.
        """
        if not hasattr(self, "index_train_test_") or not hasattr(self, "_fold_index_num"):
            raise RuntimeError(
                "Exhaust the `split(X)` generator fully before calling "
                "`recombine_test_predictions`."
            )
        # zip() below would silently truncate and leave paths incomplete.
        if len(all_predictions) != self.n_splits:
            raise ValueError(
                f"Expected {self.n_splits} prediction arrays (one per split), "
                f"got {len(all_predictions)}."
            )
        path_ids = self.get_path_ids()  # (n_splits, n_test_folds)
        test_set_index = self.test_set_index  # (n_splits, n_test_folds)
        fold_sizes = np.bincount(self._fold_index_num, minlength=self.n_folds)

        # path_store[p] : {row_position -> [prediction, ...]}
        path_store: List[dict] = [{} for _ in range(self.n_test_paths)]
        for i, (preds, fold_ids, pid_row) in enumerate(
            zip(all_predictions, test_set_index, path_ids)
        ):
            split_preds = np.asarray(preds)
            # Test observations for this split, in positional (= fold) order.
            test_pos_all = np.flatnonzero(self.index_train_test_[:, i] == 1)
            if len(split_preds) != test_pos_all.size:
                raise ValueError(
                    f"Split {i}: expected {test_pos_all.size} predictions, "
                    f"got {len(split_preds)}."
                )
            offset = 0
            for fold_id, path_id in zip(fold_ids, pid_row):
                size = int(fold_sizes[fold_id])
                chunk_pos = test_pos_all[offset : offset + size]
                chunk_preds = split_preds[offset : offset + size]
                store = path_store[int(path_id)]
                for pos, pred_val in zip(chunk_pos, chunk_preds):
                    store.setdefault(int(pos), []).append(float(pred_val))
                offset += size

        return [
            np.array([np.mean(store[k]) for k in sorted(store)])
            for store in path_store
        ]

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    def summary(self, X) -> pd.Series:
        """
        Return a human-readable summary of the CV configuration.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Only its number of rows is used.

        Returns
        -------
        pd.Series
            Observation count, fold counts, embargo size, nominal average
            training size, number of test paths and number of splits.
        """
        n_obs = np.asarray(X).shape[0]
        return pd.Series(
            {
                "Number of Observations": n_obs,
                "Total Number of Folds": self.n_folds,
                "Number of Test Folds": self.n_test_folds,
                "Embargo Size (obs)": int(n_obs * self.pct_embargo),
                "Average Training Size": int(
                    _avg_train_size(n_obs, self.n_folds, self.n_test_folds)
                ),
                "Number of Test Paths": self.n_test_paths,
                "Number of Training Combinations": self.n_splits,
            }
        )

    # ------------------------------------------------------------------
    # Visualization
    # ------------------------------------------------------------------
    def plot_train_test_folds(self) -> go.Figure:
        """Plot, per split, which folds are train (blue, 0) and test (red, 1)."""
        values = self.binary_train_test_sets
        fill_color = np.where(values == 0, "blue", "red")
        fill_color = fill_color.astype(object)
        fill_color = np.insert(
            fill_color, 0, np.array(["darkblue" for _ in range(self.n_splits)]), axis=0
        )
        values = np.insert(values, 0, np.arange(self.n_splits), axis=0)
        fig = go.Figure(
            data=[
                go.Table(
                    header=dict(
                        values=["Train Combinations"]
                        + [f"Fold {i}" for i in range(self.n_folds)],
                        fill_color="darkblue",
                        font=dict(color="white"),
                        align="left",
                    ),
                    cells=dict(
                        values=values,
                        font=dict(color="white"),
                        fill_color=fill_color,
                        line_color="grey",
                        align="left",
                    ),
                )
            ]
        )
        fig.update_layout(title="Split Train (0) /Test (1) Folds per Combination")
        return fig

    def plot_train_test_index(self, X) -> go.Figure:
        """
        Plot each observation's role (train / test / excluded) in every split.

        Each column of the table is one of the ``n_splits`` splits; each row
        is one observation in ``X``. Colour encoding:

        * **Blue** (0) – training observation.
        * **Red** (1) – test observation.
        * **Green** (−1) – excluded from training by purging or embargoing.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Must share its index with ``self.t1``. Only its shape (and, when
            ``split`` has to run, its index) is used.

        Returns
        -------
        go.Figure
            An interactive Plotly table figure.

        Notes
        -----
        ``index_train_test_`` is only complete once ``split()`` has been fully
        exhausted. If it is missing, or was built for a different number of
        observations than ``X`` has, the full generator is exhausted here
        first.
        """
        n_samples = X.shape[0]
        if (
            not hasattr(self, "index_train_test_")
            or self.index_train_test_.shape[0] != n_samples
        ):
            for _ in self.split(X):
                pass
        cond = [
            self.index_train_test_ == -1,
            self.index_train_test_ == 0,
            self.index_train_test_ == 1,
        ]
        values = self.index_train_test_.T
        values = np.insert(values, 0, np.arange(n_samples), axis=0)
        fill_color = np.select(cond, ["green", "blue", "red"], default="green").T
        fill_color = fill_color.astype(object)
        fill_color = np.insert(
            fill_color, 0, np.array(["darkblue" for _ in range(n_samples)]), axis=0
        )
        fig = go.Figure(
            data=[
                go.Table(
                    header=dict(
                        values=["observations"]
                        + [f"Split {i}" for i in range(self.n_splits)],
                        fill_color="darkblue",
                        font=dict(color="white"),
                        align="left",
                    ),
                    cells=dict(
                        values=values,
                        font=dict(color="white"),
                        fill_color=fill_color,
                        line_color="grey",
                        align="left",
                    ),
                )
            ]
        )
        fig.update_layout(
            title="Train (0), Test (1) and Purge/Embargo (-1) observations per splits"
        )
        return fig
| # --------------------------------------------------------------------------- | |
| # Optimal-folds search | |
| # --------------------------------------------------------------------------- | |
def optimal_folds_number(
    n_observations: int,
    target_train_size: int,
    target_n_test_paths: int,
    weight_train_size: float = 1.0,
    weight_n_test_paths: float = 1.0,
) -> Tuple[int, int]:
    r"""Find the ``(n_folds, n_test_folds)`` pair that best matches the targets.

    Minimises the weighted relative distance:

    .. math::
        \text{cost}(x,y) =
        w_f \left|\frac{f(x,y) - f_{\text{target}}}{f_{\text{target}}}\right|
        + w_g \left|\frac{g(x,y) - g_{\text{target}}}{g_{\text{target}}}\right|

    where :math:`f` is the average training size and :math:`g` is the number
    of test paths.

    Search space: ``3 <= n_folds <= n_observations`` and
    ``2 <= n_test_folds < n_folds``. For each ``n_folds``, ``n_test_folds`` is
    scanned upward until the cost first exceeds ``1e5`` at some value ``c``.
    After that, only ``n_test_folds >= n_folds - c`` is evaluated. Path counts
    are symmetric in ``n_test_folds`` and explode in the middle band, so that
    band is skipped. Ties resolve to the first pair evaluated.

    Parameters
    ----------
    n_observations : int
        Number of observations; must be at least 3.
    target_train_size : int
        Desired average training size; must be > 0.
    target_n_test_paths : int
        Desired number of test paths; must be > 0.
    weight_train_size : float, default=1.0
    weight_n_test_paths : float, default=1.0

    Returns
    -------
    n_folds : int
    n_test_folds : int

    Raises
    ------
    ValueError
        If ``n_observations < 3`` (no valid pair exists) or a target is not
        strictly positive (the relative cost would divide by it).
    """
    if n_observations < 3:
        raise ValueError(
            f"`n_observations` must be at least 3, got {n_observations}."
        )
    if target_train_size <= 0 or target_n_test_paths <= 0:
        raise ValueError(
            "`target_train_size` and `target_n_test_paths` must be strictly "
            f"positive, got {target_train_size} and {target_n_test_paths}."
        )

    def _cost(x: int, y: int) -> float:
        n_paths = _n_test_paths(n_folds=x, n_test_folds=y)
        avg_tr = _avg_train_size(n_observations, x, y)
        return (
            weight_n_test_paths * abs(n_paths - target_n_test_paths) / target_n_test_paths
            + weight_train_size * abs(avg_tr - target_train_size) / target_train_size
        )

    costs: List[float] = []
    res: List[Tuple[int, int]] = []

    def _evaluate(n_folds: int, n_test_folds: int) -> float:
        """Record the cost of one candidate pair and return it."""
        c = _cost(n_folds, n_test_folds)
        costs.append(c)
        res.append((n_folds, n_test_folds))
        return c

    for n_folds in range(3, n_observations + 1):
        cutoff = None
        # Ascending scan until the cost first blows up.
        for n_test_folds in range(2, n_folds):
            if _evaluate(n_folds, n_test_folds) > 1e5:
                cutoff = n_test_folds
                break
        if cutoff is None:
            continue
        # Jump straight to the upper tail (n_folds - n_test_folds <= cutoff)
        # instead of stepping through the skipped middle band one by one.
        for n_test_folds in range(max(cutoff + 1, n_folds - cutoff), n_folds):
            _evaluate(n_folds, n_test_folds)

    return res[int(np.argmin(costs))]
| # --------------------------------------------------------------------------- | |
| # CPCVAnalyzer — module-level worker functions | |
| # (must be at module scope so joblib can pickle them across processes) | |
| # --------------------------------------------------------------------------- | |
def _fit_predict_fold(
    estimator,
    X: pd.DataFrame,
    y: pd.Series,
    train_idx: np.ndarray,
    test_idx: np.ndarray,  # flat array — caller concatenates test_index_list
    fold_idx: int,
    sample_weight: Optional[pd.Series] = None,
) -> Tuple[int, np.ndarray, np.ndarray]:
    """
    Fit a fresh clone of ``estimator`` on one split and predict its test rows.

    Parameters
    ----------
    estimator : sklearn-compatible estimator
        Cloned, so the caller's instance is never fitted.
    X, y : pd.DataFrame, pd.Series
        Full feature matrix / labels; rows are selected positionally.
    train_idx, test_idx : np.ndarray
        Positional row indices for training and prediction.
    fold_idx : int
        Split number, passed back so results can be re-ordered after
        parallel execution.
    sample_weight : pd.Series, optional
        Forwarded to ``fit`` (train rows only) when provided.

    Returns
    -------
    (fold_idx, test_idx, predictions)
        ``predictions`` is column 1 of ``predict_proba`` when the fitted model
        exposes that method (presumably the positive-class probability in a
        binary / meta-labelling setup — confirm for multiclass estimators),
        otherwise the output of ``predict``.
    """
    model = clone(estimator)
    if sample_weight is None:
        fit_kwargs = {}
    else:
        fit_kwargs = {"sample_weight": sample_weight.iloc[train_idx]}
    model.fit(X.iloc[train_idx], y.iloc[train_idx], **fit_kwargs)

    X_test = X.iloc[test_idx]
    preds = (
        model.predict_proba(X_test)[:, 1]
        if hasattr(model, "predict_proba")
        else model.predict(X_test)
    )
    return fold_idx, test_idx, preds
| def _apply_bet_method_static( | |
| method: str, | |
| probs: pd.Series, | |
| sides: pd.Series, | |
| num_classes: int, | |
| ): | |
| """ | |
| Apply a bet-sizing method and return a signal Series. | |
| Extracted to module scope so it is picklable by joblib workers. | |
| """ | |
| from ..bet_sizing.ch10_snippets import get_signal # project-local import | |
| if method == "sigmoid": | |
| return get_signal(probs, num_classes, pred=sides) | |
| elif method == "power": | |
| conviction = get_signal(probs, num_classes) | |
| return sides * (conviction ** 2) | |
| elif method == "binary": | |
| return sides * (probs > 0.5).astype(float) | |
| raise ValueError(f"Unknown bet method: {method!r}") | |
def _compute_path_metrics(
    path_id: int,
    path_probs: pd.Series,
    method: str,
    primary_sides: pd.Series,
    close_index: pd.Index,
    t1: pd.Series,
    log_returns_values: np.ndarray,
    num_classes: int,
    step_size: float,
) -> Optional[dict]:
    """
    Compute mark-to-market performance metrics for one (path, method) pair.

    Pipeline:

    1. Drop NaN probabilities.
    2. Convert the remaining probabilities to bet sizes with
       ``_apply_bet_method_static``, optionally discretised with
       ``discrete_signal``.
    3. Spread each bet over its ``[event start, t1]`` bar range, averaging
       concurrent bets.
    4. Evaluate the position-weighted log returns on bars where the average
       position is non-zero.

    Module-level so joblib can serialise it.

    Parameters
    ----------
    path_id : int
        Identifier copied into the result.
    path_probs : pd.Series
        Model probabilities for this path, indexed by event start time.
    method : str
        Bet-sizing method name for ``_apply_bet_method_static``.
    primary_sides : pd.Series
        Side per event; looked up with the non-NaN index of ``path_probs``.
    close_index : pd.Index
        Bar timeline. Event start/end times are located on it with
        ``get_indexer``; events not found are ignored.
    t1 : pd.Series
        Event end time per event start time.
    log_returns_values : np.ndarray
        Per-bar log returns, positionally aligned with ``close_index``.
    num_classes : int
        Forwarded to the bet-sizing function.
    step_size : float
        If positive, signals are discretised to this step.

    Returns
    -------
    dict or None
        Keys ``method``, ``path_id``, ``mtm_profit_factor``, ``mtm_sharpe``
        (per-bar, not annualised), ``psr`` (benchmark Sharpe 0),
        ``max_drawdown`` and ``turnover``.

        Returns ``None`` in any of these cases:

        * no probabilities remain after dropping NaNs;
        * fewer than 5 bars carry a non-zero position;
        * the active-bar returns have zero or non-finite dispersion, so the
          Sharpe ratio is undefined.
    """
    clean_probs = path_probs.dropna()
    if clean_probs.empty:
        return None
    clean_sides = primary_sides.loc[clean_probs.index]
    signals = _apply_bet_method_static(method, clean_probs, clean_sides, num_classes)
    if step_size > 0:
        from ..bet_sizing.ch10_snippets import discrete_signal
        signals = discrete_signal(signals, step_size)

    # Positional index arrays for the MtM averaging kernel (-1 = not found).
    t0_idx = close_index.get_indexer(signals.index)
    t1_idx = close_index.get_indexer(t1.loc[signals.index])
    avg_pos = fill_average_active_sides(
        len(close_index), t0_idx, t1_idx, signals.values
    )
    full_log_rets = avg_pos * log_returns_values
    oos_log_rets = full_log_rets[avg_pos != 0]
    n = len(oos_log_rets)
    if n < 5:
        return None

    # ---- summary statistics ----
    std = oos_log_rets.std(ddof=1)
    if not np.isfinite(std) or std == 0:
        # Sharpe (and hence PSR) is undefined; would otherwise be inf/nan.
        return None
    sr_raw = oos_log_rets.mean() / std
    skewness = float(scipy_skew(oos_log_rets))
    # PSR uses the *Pearson* (non-excess) kurtosis γ4, which is 3 for a normal
    # distribution and gives the classic (1 + SR²/2) variance term.
    kurt = float(scipy_kurt(oos_log_rets, fisher=False))
    arith_rets = np.expm1(oos_log_rets)

    # ---- profit factor ----
    # NOTE(review): a path with no losing bars gets 0.0 here, not +inf.
    # Downstream aggregation may rely on a finite value, so confirm before
    # changing this.
    neg_sum = arith_rets[arith_rets < 0].sum()
    pf = (arith_rets[arith_rets > 0].sum() / abs(neg_sum)) if neg_sum != 0 else 0.0

    # ---- PSR (Bailey & López de Prado), benchmark SR* = 0 ----
    var_sr = (1 - skewness * sr_raw + ((kurt - 1) / 4) * sr_raw ** 2) / (n - 1)
    psr = float(norm.cdf(sr_raw / np.sqrt(var_sr))) if var_sr > 0 else 0.0

    # ---- max drawdown of the compounded equity curve over all bars ----
    cum = np.exp(np.cumsum(full_log_rets))
    peak = np.maximum.accumulate(cum)
    max_dd = float(np.min(cum / peak - 1))

    # ---- turnover: total absolute change of the average position ----
    turnover = float(np.abs(np.diff(avg_pos)).sum())

    return {
        "method": method,
        "path_id": path_id,
        "mtm_profit_factor": pf,
        "mtm_sharpe": sr_raw,
        "psr": psr,
        "max_drawdown": max_dd,
        "turnover": turnover,
    }
| # --------------------------------------------------------------------------- | |
| # CPCVAnalyzer | |
| # --------------------------------------------------------------------------- | |
class CPCVAnalyzer:
    """
    Parallel CPCV execution and MtM performance metric distribution.

    Handles the full pipeline from raw model predictions to a distribution of
    time-weighted portfolio performance across all combinatorial backtest paths.

    Parameters
    ----------
    estimator : sklearn-compatible estimator
        Passed unchanged to ``_fit_predict_fold`` for every CPCV split.
    cv_gen : CombinatorialPurgedCV
        The configured CPCV generator. Must have ``t1`` set.
    close_prices : pd.Series
        Continuous price series for Mark-to-Market calculation.
    n_jobs : int, default=-1
        Joblib parallelism for both ``fit_predict`` and
        ``get_distribution_metrics``. ``-1`` is replaced by the number of
        physical CPU cores (``joblib.cpu_count(only_physical_cores=True)``);
        any other value is forwarded to ``joblib.Parallel`` unchanged.

    Attributes
    ----------
    log_returns : pd.Series
        One-step-ahead log returns aligned to ``close_prices.index``: the value
        at ``t`` is ``log(close[t+1]) - log(close[t])``. NaNs (including the
        final bar, which has no next price) are filled with 0.
    """

    _BET_METHODS = ("sigmoid", "power", "binary")
    # Column layout of the metric frame returned by get_distribution_metrics;
    # used to build a well-formed empty frame when no path yields metrics.
    _METRIC_COLUMNS = (
        "mtm_profit_factor",
        "mtm_sharpe",
        "psr",
        "max_drawdown",
        "turnover",
    )

    def __init__(
        self,
        estimator,
        cv_gen: "CombinatorialPurgedCV",
        close_prices: pd.Series,
        n_jobs: int = -1,
    ) -> None:
        self.estimator = estimator
        self.cv_gen = cv_gen
        self.close = close_prices
        self.n_jobs = (
            n_jobs if n_jobs != -1 else joblib.cpu_count(only_physical_cores=True)
        )
        # Pre-compute forward log returns once; aligned to close_prices index.
        self.log_returns: pd.Series = np.log(close_prices).diff().shift(-1).fillna(0)
        # State populated by fit_predict.
        self._X: Optional[pd.DataFrame] = None
        self._split_predictions: Optional[List[np.ndarray]] = None
        self._backtest_paths_cache: Optional[List[pd.Series]] = None

    # ------------------------------------------------------------------
    # Training
    # ------------------------------------------------------------------

    def fit_predict(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        sample_weight: Optional[pd.Series] = None,
    ) -> pd.Series:
        """Train the estimator across all CPCV splits in parallel.

        The split generator is exhausted eagerly so that ``cv_gen`` stores
        ``index_train_test_`` and ``_fold_index_num`` before any downstream
        method reads them.

        Parameters
        ----------
        X : pd.DataFrame
            Feature matrix; its index labels every backtest path.
        y : pd.Series
            Labels passed to each fold fit.
        sample_weight : pd.Series, optional
            Forwarded to ``_fit_predict_fold`` for every split.

        Returns
        -------
        pd.Series
            Recombined (bagged) predictions — mean across all paths per timestamp.
        """
        self._X = X
        # Invalidate everything derived from a previous fit up front, so a
        # failure part-way through this call can never pair the new X index
        # with stale split predictions.
        self._split_predictions = None
        self._backtest_paths_cache = None

        # Exhaust the generator eagerly: this populates cv_gen.index_train_test_
        # and cv_gen._fold_index_num, which recombine_test_predictions requires.
        splits = [
            (train, np.concatenate(test_list))
            for train, test_list in self.cv_gen.split(X, y)
        ]  # length == cv_gen.n_splits

        results: List[Tuple[int, np.ndarray, np.ndarray]] = Parallel(n_jobs=self.n_jobs)(
            delayed(_fit_predict_fold)(
                self.estimator, X, y, train, test, i, sample_weight
            )
            for i, (train, test) in enumerate(splits)
        )
        # joblib returns results in submission order by default; sorting on the
        # fold index each worker reports is a cheap defensive check on that.
        results = sorted(results, key=lambda r: r[0])
        self._split_predictions = [preds for _, _, preds in results]
        return self.recombined_predictions

    # ------------------------------------------------------------------
    # Derived views (cached)
    # ------------------------------------------------------------------

    @property
    def backtest_paths(self) -> List[pd.Series]:
        """Assemble the ``n_test_paths`` backtest paths from split predictions.

        Delegates entirely to ``cv_gen.recombine_test_predictions`` — no
        manual fold-index arithmetic here. The result is cached after the
        first access and invalidated by ``fit_predict``.

        Returns
        -------
        list of pd.Series, length ``cv_gen.n_test_paths``
            Each Series is named ``path_{j}`` and indexed by ``X.index`` from
            the last ``fit_predict`` call.

        Raises
        ------
        RuntimeError
            If ``fit_predict`` has not completed successfully yet.
        """
        if self._split_predictions is None:
            raise RuntimeError("Call fit_predict before accessing backtest_paths.")
        if self._backtest_paths_cache is None:
            raw: List[np.ndarray] = self.cv_gen.recombine_test_predictions(
                self._split_predictions
            )
            self._backtest_paths_cache = [
                pd.Series(path, index=self._X.index, name=f"path_{j}")
                for j, path in enumerate(raw)
            ]
        return self._backtest_paths_cache

    @property
    def recombined_predictions(self) -> pd.Series:
        """Mean prediction across all backtest paths at each timestamp.

        Assuming each observation appears in every path exactly once (the CPCV
        path construction), this equals averaging the out-of-sample
        predictions the observation received across splits.

        Raises
        ------
        RuntimeError
            Propagated from ``backtest_paths`` if ``fit_predict`` has not run.
        """
        return (
            pd.concat(self.backtest_paths, axis=1)
            .mean(axis=1)
            .rename("recombined_prediction")
        )

    # ------------------------------------------------------------------
    # Performance distribution
    # ------------------------------------------------------------------

    def get_distribution_metrics(
        self,
        primary_sides: pd.Series,
        num_classes: int = 2,
        step_size: float = 0.0,
    ) -> pd.DataFrame:
        """Calculate MtM performance metrics for all paths and bet-sizing methods.

        All ``n_test_paths × len(_BET_METHODS)`` (path, method) pairs are
        submitted as independent joblib tasks, so with enough workers the wall
        time approaches that of the slowest single combination.

        Parameters
        ----------
        primary_sides : pd.Series
            Direction (+1 / -1) from the primary model for meta-labeling.
        num_classes : int, default=2
            Number of classes for confidence z-score in ``get_signal``.
        step_size : float, default=0.0
            Discretisation step for bet sizes; 0.0 = continuous.

        Returns
        -------
        pd.DataFrame
            MultiIndex ``[method, path_id]`` with columns:
            ``mtm_profit_factor``, ``mtm_sharpe``, ``psr``,
            ``max_drawdown``, ``turnover``. Pairs for which
            ``_compute_path_metrics`` returns ``None`` are dropped; if every
            pair is dropped the frame is empty but keeps the same index names
            and columns.

        Raises
        ------
        RuntimeError
            Propagated from ``backtest_paths`` if ``fit_predict`` has not run.
        """
        # Snapshot heavy attributes once — avoids repeated self-lookups inside workers.
        paths = self.backtest_paths
        close_index = self.close.index
        t1 = self.cv_gen.t1
        log_ret_vals = self.log_returns.values

        raw_results: List[Optional[dict]] = Parallel(n_jobs=self.n_jobs)(
            delayed(_compute_path_metrics)(
                path_id=i,
                path_probs=path,
                method=method,
                primary_sides=primary_sides,
                close_index=close_index,
                t1=t1,
                log_returns_values=log_ret_vals,
                num_classes=num_classes,
                step_size=step_size,
            )
            for i, path in enumerate(paths)
            for method in self._BET_METHODS
        )

        records = [r for r in raw_results if r is not None]
        if not records:
            # Keep the documented [method, path_id] index even when empty so
            # callers can rely on .xs / .groupby(level=...) uniformly.
            empty_index = pd.MultiIndex.from_arrays(
                [[], []], names=["method", "path_id"]
            )
            return pd.DataFrame(columns=list(self._METRIC_COLUMNS), index=empty_index)

        return (
            pd.DataFrame(records)
            .set_index(["method", "path_id"])
            .sort_index()
        )