from typing import Union import numpy as np import pandas as pd from numba import njit, prange def get_period_returns(close: pd.Series, **time_delta_kwargs) -> pd.Series: """ Compute periodic returns for a given time period, robust to non-consecutive trading days. This function calculates returns by finding the closing price from a specified time duration (days, hours, minutes) in the past. It handles cases where the prior period might not be a trading day by using `searchsorted` to find the nearest valid previous index. :param close: (pd.Series) closing prices, indexed by datetime :param time_delta_kwargs: Time components for calculating period returns: - **days**: (int) Number of days - **hours**: (int) Number of hours - **minutes**: (int) Number of minutes - **seconds**: (int) Number of seconds return: (pd.Series) Periodic returns (percentage changes), aligned to the prior valid trading period """ # Find previous valid trading day for each date prev_idx = close.index.searchsorted(close.index - pd.Timedelta(**time_delta_kwargs)) # Drop indices that are before the start of the 'close' Series prev_idx = prev_idx[prev_idx > 0] # Align current and previous closes curr_idx = close.index[close.shape[0] - prev_idx.shape[0] :] prev_close = close.iloc[prev_idx - 1].values ret = close.loc[curr_idx] / prev_close - 1 return ret @njit(parallel=True, cache=True) def rolling_autocorr_numba(data: np.ndarray, lookback: int) -> np.ndarray: """ Computes rolling autocorrelation for a 1D NumPy array using Numba for performance. This function calculates the autocorrelation between `data[t]` and `data[t-1]` within a rolling window of `lookback` size. It leverages Numba's `njit` and `prange` for parallel execution, making it efficient for large datasets. Args: data: A 1D NumPy array of numerical data (e.g., returns). lookback: The size of the rolling window for autocorrelation calculation. Returns: A NumPy array containing the rolling autocorrelation values. The initial `lookback - 1` values will be NaN as there isn't enough data. """ result = np.full(len(data), np.nan) for i in prange(lookback - 1, len(data)): window = data[i - lookback + 1 : i + 1] # [0, 1] extracts the correlation between the two series (not self-correlation) result[i] = np.corrcoef(window[:-1], window[1:])[0, 1] return result def get_period_autocorr( close: pd.Series, lookback: int = 100, **time_delta_kwargs ) -> pd.Series: """ Estimates rolling periodic autocorrelation of closing prices. This function first calculates the periodic returns using `get_period_returns` and then computes the rolling autocorrelation of these returns using the Numba-optimized `rolling_autocorr_numba` function. :param close: (pd.Series) closing prices, indexed by datetime :param lookback: (int) The window equivalent of the Simple Moving Average for the Exponentially Weighted Moving average calculation (default is 100) :param time_delta_kwargs: Time components for calculating period returns: - **days**: (int) Number of days - **hours**: (int) Number of hours - **minutes**: (int) Number of minutes - **seconds**: (int) Number of seconds return: (pd.Series) of rolling periodic autocorrelation values, indexed by the datetime index of the input `close` Series. """ ret = get_period_returns(close, **time_delta_kwargs) acorr = rolling_autocorr_numba(ret.to_numpy(), lookback) df0 = pd.Series(acorr, index=ret.index) return df0 def get_lagged_returns( prices: Union[pd.Series, pd.DataFrame], lags: list, nperiods: int = 3, ) -> pd.DataFrame: """ Generates a DataFrame of various lagged returns and optionally forward target returns. This function calculates returns for specified lag periods, clips extreme values based on quantiles, and then creates additional lagged features (e.g., `returns_X_lag_Y`). It can also generate forward returns as a target variable. Args: prices: A pandas Series or DataFrame of close prices. If a Series, it's treated as a single instrument. If a DataFrame, each column represents a different instrument or asset. The index should be datetime-based. lags: A list of integers, where each integer represents a lag period for which returns should be calculated (e.g., `[1, 5, 20]` for daily, weekly, and monthly returns). nperiods: The number of additional lagged versions to create for each return series. For example, if `nperiods=3` and `lags=[1]`, it will create `returns_1_lag_1`, `returns_1_lag_2`, `returns_1_lag_3`. Defaults to 3. Returns: A pandas DataFrame containing the calculated returns and their lagged versions. If `target` is True, it will also include forward target returns. """ q = 0.0001 # Quantile cut-off for winsorizing extreme prices df = pd.DataFrame() price_columns = ( [(prices.name or "price", prices)] if isinstance(prices, pd.Series) else [(col, prices[col]) for col in prices.columns] ) for col_name, price_series in price_columns: prefix = "" if len(price_columns) == 1 else f"{col_name}_" for lag in lags: # Calculate 1-period geometric mean return of the lag period and # winsorize extreme values by clipping. returns = price_series.pct_change(lag) returns = returns.clip(lower=returns.quantile(q), upper=returns.quantile(1 - q)) df[f"{prefix}returns_{lag}"] = returns.add(1).pow(1 / lag).sub(1) # Create additional lagged versions of the calculated returns for t in range(1, nperiods + 1): for col_name, _ in price_columns: prefix = "" if len(price_columns) == 1 else f"{col_name}_" for lag in lags: df[f"{prefix}returns_{lag}_lag_{t}"] = df[ f"{prefix}returns_{lag}" ].shift(t * lag) df.rename(columns={"returns_1": "returns"}, inplace=True) return df def get_return_dist_features(close, window=10): """Distribution of log-return features""" df = pd.DataFrame(index=close.index) ret = np.log(close).diff() sma_returns = ret.rolling(window, min_periods=3) df["returns_norm"] = (ret - sma_returns.mean()) / sma_returns.std() df[f"returns_skew"] = sma_returns.skew() df[f"returns_kurt"] = sma_returns.kurt() return df