File size: 4,647 Bytes
8ba081b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""Fractional differentiation — AFML Ch.5 §5.4 (BonusPDF p.46).

Why this module exists
----------------------
Log-returns achieve stationarity but destroy memory: the binomial weights
``(1-B)^d`` collapse to ``[1, -1, 0, 0, ...]`` at ``d=1``. For ``d ∈ (0, 1)``
the weights decay as a long power-law tail, so the series stays stationary
while retaining a long memory of past prices (Table 5.1 in AFML shows most
liquid futures reach ADF stationarity at ``d < 0.6``, and the majority at
``d < 0.3``).

This is a port of AFML Snippets 5.1, 5.3, 5.4 (BonusPDF pp.48, 51, 53).
"""

from __future__ import annotations

import numpy as np
import pandas as pd
from scipy.special import gamma


def get_ffd_weights(d: float, thres: float = 1e-5, max_size: int = 1024) -> np.ndarray:
    """Binomial-series weights for the fractional-differencing operator ``(1-B)^d``.

    The weights follow the recurrence of AFML Snippet 5.1,
    ``w_0 = 1`` and ``w_k = -w_{k-1} * (d - k + 1) / k``, evaluated here as a
    vectorized cumulative product. The series is cut off at the first ``k``
    with ``|w_k| < thres``, or after ``max_size`` terms when no weight drops
    below the threshold.

    A closed form built on ``gamma(k + 1)`` is deliberately avoided: the
    factorial overflows float64 once ``k > 170``, which zeroes every later
    weight and truncates the window long before ``thres`` is reached for
    small ``d``.

    Parameters
    ----------
    d : float
        Differencing order.
    thres : float
        Magnitude below which weights are discarded.
    max_size : int
        Upper bound on the number of weights returned.

    Returns
    -------
    np.ndarray of shape ``(n,)`` ordered from oldest to newest:
        ``[w_{n-1}, w_{n-2}, ..., w_1, w_0]`` so the dot product with
        ``series[t-n+1 : t+1]`` is the differenced value at ``t``.
        Empty when ``max_size <= 0``.
    """
    if max_size <= 0:
        return np.empty(0, dtype=float)
    k = np.arange(1, max_size, dtype=float)
    # -(d - k + 1) / k, the factor taking w_{k-1} to w_k.
    ratios = (k - 1.0 - d) / k
    w = np.concatenate(([1.0], np.cumprod(ratios)))
    below = np.abs(w) < thres
    cutoff = int(np.argmax(below)) if below.any() else max_size
    # Only reachable when thres > |w_0| = 1; fall back to the full window
    # instead of returning no weights at all.
    if cutoff == 0:
        cutoff = max_size
    return w[:cutoff][::-1]


def frac_diff_ffd(series: pd.Series | pd.DataFrame, d: float, thres: float = 1e-5) -> pd.DataFrame:
    """Fixed-width fractional differencing — AFML Snippet 5.3 (BonusPDF p.51).

    Every column is differenced with the same weight vector, so the filter is
    identical at every timestamp (the expanding-window variant of Snippet 5.2
    instead changes its effective weights as history grows).

    Parameters
    ----------
    series : pd.Series or pd.DataFrame
        Input data; a Series is promoted to a one-column frame.
    d : float
        Differencing order passed to ``get_ffd_weights``.
    thres : float
        Weight cut-off passed to ``get_ffd_weights``.

    Returns
    -------
    pd.DataFrame
        One column per input column. Each column is forward-filled and has its
        remaining NaNs dropped before differencing; its output starts at the
        first row for which a full window of history exists. A column with
        too few rows for a single window yields an empty column.
    """
    from numpy.lib.stride_tricks import sliding_window_view

    frame = series.to_frame() if isinstance(series, pd.Series) else series
    weights = get_ffd_weights(d, thres=thres)  # oldest -> newest
    n_taps = len(weights)
    lag = n_taps - 1

    columns = {}
    for name in frame.columns:
        clean = frame[[name]].ffill().dropna()
        stamps = clean.index[lag:]
        if len(clean) > lag:
            # Row i of the view is clean[i : i + n_taps]; its dot product with
            # the weights is the differenced value at stamps[i].
            history = sliding_window_view(clean[name].to_numpy(), n_taps)
            columns[name] = pd.Series(history @ weights, index=stamps)
        else:
            columns[name] = pd.Series(index=stamps, dtype=float)
    return pd.concat(columns, axis=1)


def find_min_d(series: pd.Series, d_range=(0.0, 1.0), n_steps: int = 11, thres: float = 1e-5) -> pd.DataFrame:
    """Sweep ``d`` and return ADF stat + correlation — AFML Snippet 5.4 (BonusPDF p.53).

    Use to pick the smallest ``d`` for which the FFD-differenced log-price passes
    the ADF stationarity test at 95% (statistic < critical value ≈ -2.86).

    Parameters
    ----------
    series : pd.Series
        Price series; NaNs are dropped and the natural log is taken before
        differencing.
    d_range : tuple
        ``(low, high)`` bounds of the sweep, both inclusive.
    n_steps : int
        Number of evenly spaced ``d`` values between the bounds.
    thres : float
        Weight cut-off forwarded to ``frac_diff_ffd``.

    Returns
    -------
    pd.DataFrame
        Indexed by ``d`` (rounded to 3 decimals) with columns ``adf_stat,
        p_value, n_obs, crit_95, corr_with_original``. A ``d`` whose
        differenced series has fewer than 50 observations is skipped; if every
        ``d`` is skipped the frame is empty but still carries these columns.
    """
    from statsmodels.tsa.stattools import adfuller

    columns = ["adf_stat", "p_value", "n_obs", "crit_95", "corr_with_original"]
    log_series = np.log(series.dropna()).to_frame(name=series.name or "value")
    results = {}
    for d in np.linspace(d_range[0], d_range[1], n_steps):
        diffed = frac_diff_ffd(log_series, d, thres=thres).dropna()
        if len(diffed) < 50:
            continue
        col = diffed.columns[0]
        # With autolag=None the result tuple is
        # (statistic, p-value, used lag, n_obs, critical values).
        adf = adfuller(diffed[col], maxlag=1, regression="c", autolag=None)
        # Correlate against the log series on the surviving timestamps only.
        aligned = log_series.loc[diffed.index, col]
        corr = float(aligned.corr(diffed[col]))
        results[round(d, 3)] = {
            "adf_stat": adf[0],
            "p_value": adf[1],
            "n_obs": adf[3],
            "crit_95": adf[4]["5%"],
            "corr_with_original": corr,
        }
    # Passing the column names as the pre-transpose index keeps the documented
    # schema even when ``results`` is empty.
    return pd.DataFrame(results, index=columns).T.rename_axis("d")


def rolling_zscore(series: pd.Series, window: int = 252, min_periods: int | None = None) -> pd.Series:
    """Rolling z-score with leak-free statistics (uses only the trailing window).

    Stronger than a single fit-on-train ``StandardScaler`` because regime shifts
    don't carry stale means forward into the test set.
    """
    min_periods = min_periods or max(window // 4, 20)
    mu = series.rolling(window=window, min_periods=min_periods).mean()
    sd = series.rolling(window=window, min_periods=min_periods).std()
    return (series - mu) / sd.replace(0, np.nan)