File size: 6,737 Bytes
669d6a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from typing import Union

import numpy as np
import pandas as pd
from numba import njit, prange


def get_period_returns(close: pd.Series, **time_delta_kwargs) -> pd.Series:
    """
    Compute returns over a fixed time span, tolerant of gaps in the index.

    For every timestamp ``t`` the reference price is located with
    ``searchsorted`` instead of a fixed row offset, so missing periods
    (weekends, holidays, halted sessions) do not break the calculation.
    The reference observation is the last one positioned strictly before
    ``t - delta`` in the index.

    :param close: (pd.Series) closing prices, indexed by datetime
        (NOTE(review): the alignment below relies on the index being sorted
        in ascending order - confirm against callers)
    :param time_delta_kwargs: keyword arguments forwarded to ``pd.Timedelta``:
    - **days**: (int) Number of days
    - **hours**: (int) Number of hours
    - **minutes**: (int) Number of minutes
    - **seconds**: (int) Number of seconds
    :return: (pd.Series) simple returns ``close[t] / reference_close - 1``,
        indexed by the timestamps for which a reference observation exists
    """
    stamps = close.index
    span = pd.Timedelta(**time_delta_kwargs)

    # With the default side="left", searchsorted yields the position of the
    # first timestamp that is >= (t - span); the entry just before that
    # position is therefore the reference observation.
    positions = stamps.searchsorted(stamps - span)

    # Position 0 means nothing precedes the target time, so no reference exists.
    positions = positions[positions > 0]

    # The surviving positions belong to the trailing timestamps of the series.
    n_valid = positions.shape[0]
    current = close.loc[stamps[close.shape[0] - n_valid :]]
    reference = close.iloc[positions - 1].values

    return current / reference - 1


@njit(parallel=True, cache=True)
def rolling_autocorr_numba(data: np.ndarray, lookback: int) -> np.ndarray:
    """
    Rolling lag-1 autocorrelation of a 1D array, compiled with Numba.

    For each position the trailing `lookback` observations form a window, and
    the Pearson correlation between that window without its last element and
    the same window without its first element is stored. Windows are handled
    independently inside a `prange` loop, so the work is spread across
    threads by Numba's parallel backend.

    Args:
        data: A 1D NumPy array of numerical data (e.g., returns).
        lookback: Number of observations in each rolling window; every window
            contributes `lookback - 1` lagged pairs to the correlation.

    Returns:
        An array of the same length as `data`. The first `lookback - 1`
        entries are NaN because no complete window is available there.
    """
    n_obs = len(data)
    out = np.full(n_obs, np.nan)
    # `stop` is the exclusive end of the window; its result lands at stop - 1.
    for stop in prange(lookback, n_obs + 1):
        segment = data[stop - lookback : stop]
        leading = segment[:-1]
        trailing = segment[1:]
        # Off-diagonal entry of the 2x2 matrix is the cross-correlation.
        out[stop - 1] = np.corrcoef(leading, trailing)[0, 1]
    return out


def get_period_autocorr(
    close: pd.Series, lookback: int = 100, **time_delta_kwargs
) -> pd.Series:
    """
    Estimate the rolling autocorrelation of periodic returns.

    Periodic returns are obtained from `get_period_returns`; their rolling
    lag-1 autocorrelation is then computed by `rolling_autocorr_numba`.

    :param close: (pd.Series) closing prices, indexed by datetime
    :param lookback: (int) number of return observations in each rolling
                window passed to `rolling_autocorr_numba` (default is 100)
    :param time_delta_kwargs: Time components for calculating period returns:
    - **days**: (int) Number of days
    - **hours**: (int) Number of hours
    - **minutes**: (int) Number of minutes
    - **seconds**: (int) Number of seconds
    :return: (pd.Series) rolling autocorrelation values, indexed like the
        periodic-return series (not like the full `close` series)
    """
    period_ret = get_period_returns(close, **time_delta_kwargs)
    return pd.Series(
        rolling_autocorr_numba(period_ret.to_numpy(), lookback),
        index=period_ret.index,
    )


def get_lagged_returns(
    prices: Union[pd.Series, pd.DataFrame],
    lags: list,
    nperiods: int = 3,
    clip_quantile: float = 0.0001,
) -> pd.DataFrame:
    """
    Build a DataFrame of winsorized multi-period returns and shifted copies of them.

    For every price column and every lag the function:

    1. computes the `lag`-period percentage change,
    2. clips it to its own `[clip_quantile, 1 - clip_quantile]` quantile range,
    3. converts it to a per-period geometric mean return,
       `(1 + r) ** (1 / lag) - 1`, stored as `returns_{lag}`.

    It then adds `nperiods` shifted copies of each return column, named
    `returns_{lag}_lag_{t}`, where copy `t` is shifted by `t * lag` rows.

    Args:
        prices: A pandas Series or DataFrame of close prices. A Series is
                treated as a single instrument; for a DataFrame each column
                is processed separately. When there is more than one column,
                every output column is prefixed with `"{column}_"`.
        lags: A list of integers, where each integer is a lag period
              for which returns should be calculated (e.g., `[1, 5, 20]`).
        nperiods: The number of shifted versions to create for each
                 return series. For example, if `nperiods=3` and `lags=[1]`,
                 it will create `returns_1_lag_1`, `returns_1_lag_2`,
                 `returns_1_lag_3`. Defaults to 3.
        clip_quantile: Tail quantile used to winsorize each return series
                 before it is converted to a per-period return. Must lie in
                 `[0, 0.5]`. Defaults to 0.0001.

    Returns:
        A pandas DataFrame containing the return columns followed by their
        shifted versions. A column named exactly `returns_1` (single
        instrument, lag 1) is renamed to `returns`; its shifted copies keep
        the `returns_1_lag_{t}` names.

    Raises:
        ValueError: If `clip_quantile` is outside `[0, 0.5]`.
    """
    if not 0 <= clip_quantile <= 0.5:
        raise ValueError(
            f"clip_quantile must be within [0, 0.5], got {clip_quantile!r}"
        )

    df = pd.DataFrame()

    if isinstance(prices, pd.Series):
        price_columns = [(prices.name or "price", prices)]
    else:
        price_columns = [(col, prices[col]) for col in prices.columns]

    # Column names are only prefixed when several instruments are present.
    single_instrument = len(price_columns) == 1
    prefixes = [
        "" if single_instrument else f"{col_name}_" for col_name, _ in price_columns
    ]

    for prefix, (_, price_series) in zip(prefixes, price_columns):
        for lag in lags:
            # Winsorize the raw lag-period returns, then express them as a
            # per-period geometric mean return.
            returns = price_series.pct_change(lag)
            returns = returns.clip(
                lower=returns.quantile(clip_quantile),
                upper=returns.quantile(1 - clip_quantile),
            )
            df[f"{prefix}returns_{lag}"] = returns.add(1).pow(1 / lag).sub(1)

    # Shifted copies are appended after all base return columns; shifting by
    # t * lag rows steps back whole, non-overlapping lag windows.
    for t in range(1, nperiods + 1):
        for prefix in prefixes:
            for lag in lags:
                base = f"{prefix}returns_{lag}"
                df[f"{base}_lag_{t}"] = df[base].shift(t * lag)

    return df.rename(columns={"returns_1": "returns"})


def get_return_dist_features(close, window=10):
    """
    Rolling distribution features of log returns.

    Log returns are the first difference of ``log(close)``. Over a rolling
    window of ``window`` observations (at least 3 required) the function
    produces:

    - ``returns_norm``: the log return standardized by the rolling mean and
      rolling standard deviation,
    - ``returns_skew``: the rolling skewness,
    - ``returns_kurt``: the rolling kurtosis.

    The result is a DataFrame indexed like ``close``.
    """
    log_ret = np.log(close).diff()
    roller = log_ret.rolling(window, min_periods=3)
    zscore = (log_ret - roller.mean()) / roller.std()
    return pd.DataFrame(index=close.index).assign(
        returns_norm=zscore,
        returns_skew=roller.skew(),
        returns_kurt=roller.kurt(),
    )