| """ |
| features.py |
| =========== |
| Production-grade feature engineering for financial time-series models. |
| |
| Implements the three feature families from: |
| "Deep Learning for Financial Time Series" (VLSTM / PsLSTM benchmark paper). |
| |
| Original Features (close-only) |
| ------------------------------- |
| 1. EWMA Volatility Ο_t β Eqs. 16β17 |
| 2. Multi-Horizon Normalized Returns r_norm(t,h) β Eq. 18 |
| 3. Multi-Scale MACD Momentum Signals β Eqs. 19β21 |
| 4. Volatility Scaling Factor 1/Ο_t β Eq. 22 |
| 5. Normalized Return Target (training only) β Eq. 23 |
| |
| Added Features (OHLC-based, all NO_SCALE bucket) |
| ------------------------------------------------- |
| 6. Kaufman Efficiency Ratio feat_efficiency [-1, +1] |
| 7. Internal Close Position feat_icp [-1, +1] |
| 8. RSI (centered) feat_momentum_rsi [-1, +1] |
| 9. Directional Vol Asymmetry feat_vol_asymmetry [-1, +1] |
| 10. Local Structure Position feat_local_structure [-1, +1] |
| 11. Session Time Sin feat_session_sin [-1, +1] |
| 12. Session Time Cos feat_session_cos [-1, +1] |
| 13. Vol Squeeze (ATR ratio) feat_vol_squeeze [>0, ROBUST] |
| |
| Column count: 13 close-only + 8 OHLC-based = 21 total model inputs |
| (vs_factor is ROBUST; feat_vol_squeeze is ROBUST; all others NO_SCALE) |
| """ |
|
|
| from __future__ import annotations |
| import logging |
| import math |
| import os |
| from dataclasses import dataclass, field |
| from typing import Optional, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| logger = logging.getLogger(__name__) |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", |
| ) |
|
|
| _EPS = 1e-10 |
|
|
|
|
| |
| |
| |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class FeatureConfig: |
| """ |
| Central configuration for all feature engineering hyperparameters. |
| |
| Original paper fields |
| --------------------- |
| ewma_span : int |
| Span for the EWMA volatility estimator (Ο_t). |
| Ξ± = 2 / (span + 1). Larger span β slower regime adaptation. |
| |
| return_horizons : list[int] |
| Lookback windows (bars) for multi-horizon normalised return features. |
| |
| macd_pairs : list[tuple[int, int]] |
| (short_span, long_span) pairs for multi-scale MACD signals. |
| |
| macd_price_std_window : int |
| Rolling window for Step-2 price-scale normalisation. |
| |
| macd_signal_std_window : int |
| Rolling window for Step-3 regime normalisation. |
| |
| target_clip : float |
| Symmetric clip bound for the normalised return target (Β±20). |
| |
| New OHLC feature fields |
| ----------------------- |
| momentum_period : int |
| Lookback for KER (efficiency ratio) and RSI. Default 14. |
| |
| rsi_period : int |
| Separate RSI period if you want it decoupled from momentum_period. |
| If None, falls back to momentum_period. |
| |
| vol_asym_window : int |
| Rolling window for directional volatility asymmetry. Default 20. |
| |
| icp_period : int |
| Smoothing window for Internal Close Position. Default 14. |
| |
| local_structure_bars : int |
| Rolling window for local high/low range. Default 65 bars |
| (β 5 trading days on 30-min NIFTY data). |
| |
| vol_squeeze_fast : int |
| Fast ATR window for squeeze ratio. Default 5. |
| |
| vol_squeeze_slow : int |
| Slow ATR window for squeeze ratio. Default 20. |
| |
| atr_period : int |
| ATR period used internally. Must match config.ATR_PERIOD. Default 14. |
| |
| session_open : str |
| Session open time "HH:MM" for cyclic time-of-day encoding. |
| |
| session_close : str |
| Session close time "HH:MM". |
| |
| session_tz : str |
| Timezone string (e.g. "Asia/Kolkata"). |
| |
| add_session_features : bool |
| Whether to compute feat_session_sin / feat_session_cos. |
| Requires a DatetimeIndex. Default True. |
| """ |
|
|
| |
| ewma_span: int = 260 |
|
|
| return_horizons: list[int] = field( |
| default_factory=lambda: [1, 3, 6, 13, 26, 65, 130, 260] |
| ) |
|
|
| macd_pairs: list[tuple[int, int]] = field( |
| default_factory=lambda: [(8, 24), (26, 78), (52, 156)] |
| ) |
|
|
| macd_price_std_window: int = 260 |
| macd_signal_std_window: int = 3276 |
| target_clip: float = 20.0 |
|
|
| |
| momentum_period: int = 26 |
| rsi_period: Optional[int] = 14 |
| vol_asym_window: int = 65 |
| icp_period: int = 13 |
| local_structure_bars: int = 65 |
| vol_squeeze_fast: int = 5 |
| vol_squeeze_slow: int = 26 |
| atr_period: int = 14 |
|
|
| session_open: str = "09:15" |
| session_close: str = "15:30" |
| session_tz: str = "Asia/Kolkata" |
| add_session_features: bool = True |
|
|
| use_talib: bool = False |
|
|
| def __post_init__(self) -> None: |
| if self.ewma_span < 1: |
| raise ValueError(f"ewma_span must be >= 1, got {self.ewma_span}") |
| if not self.return_horizons: |
| raise ValueError("return_horizons must not be empty.") |
| for s, l in self.macd_pairs: |
| if s >= l: |
| raise ValueError( |
| f"MACD short_span ({s}) must be < long_span ({l})." |
| ) |
| if self.target_clip <= 0: |
| raise ValueError("target_clip must be positive.") |
| if self.momentum_period < 2: |
| raise ValueError("momentum_period must be >= 2.") |
| if self.vol_asym_window < 2: |
| raise ValueError("vol_asym_window must be >= 2.") |
| if self.icp_period < 1: |
| raise ValueError("icp_period must be >= 1.") |
| if self.local_structure_bars < 2: |
| raise ValueError("local_structure_bars must be >= 2.") |
| if self.vol_squeeze_fast < 1 or self.vol_squeeze_slow < 2: |
| raise ValueError("vol_squeeze_fast >= 1 and vol_squeeze_slow >= 2.") |
| if self.vol_squeeze_fast >= self.vol_squeeze_slow: |
| raise ValueError("vol_squeeze_fast must be < vol_squeeze_slow.") |
|
|
| if self.atr_period < 1: |
| raise ValueError( |
| f"atr_period must be >= 1, got {self.atr_period}." |
| ) |
| if self.macd_price_std_window < 2: |
| raise ValueError( |
| f"macd_price_std_window must be >= 2 (std of 1 value is NaN), " |
| f"got {self.macd_price_std_window}." |
| ) |
| if self.macd_signal_std_window < 2: |
| raise ValueError( |
| f"macd_signal_std_window must be >= 2 (std of 1 value is NaN), " |
| f"got {self.macd_signal_std_window}." |
| ) |
|
|
| @property |
| def effective_rsi_period(self) -> int: |
| return self.rsi_period if self.rsi_period is not None else self.momentum_period |
|
|
| @property |
| def macd_col_names(self) -> list[str]: |
| """Return the expected column names for the multi-scale MACD features.""" |
| return [f"macd_{s}_{l}" for s, l in self.macd_pairs] |
|
|
|
|
| |
| |
| |
|
|
| def _validate_prices(prices: pd.Series) -> None: |
| if not isinstance(prices, pd.Series): |
| raise TypeError(f"Expected pd.Series, got {type(prices).__name__}.") |
| if prices.empty: |
| raise ValueError("Price series is empty.") |
| non_null = prices.dropna() |
| if non_null.empty: |
| raise ValueError("Price series contains only NaN values.") |
| if (non_null <= 0).any(): |
| raise ValueError( |
| "All prices must be strictly positive; " |
| f"found {(non_null <= 0).sum()} non-positive value(s)." |
| ) |
| n_nan = prices.isna().sum() |
| if n_nan > 0: |
| logger.warning( |
| "Price series '%s' has %d NaN value(s). " |
| "Features will propagate NaN at those positions.", |
| prices.name, |
| n_nan, |
| ) |
|
|
|
|
| def _validate_ohlc(ohlc: pd.DataFrame) -> None: |
| """Validate an OHLC DataFrame with lowercase column names.""" |
| required = {"open", "high", "low", "close"} |
| missing = required - set(ohlc.columns) |
| if missing: |
| raise ValueError(f"OHLC DataFrame missing columns: {sorted(missing)}") |
| if ohlc.empty: |
| raise ValueError("OHLC DataFrame is empty.") |
|
|
| |
|
|
| |
| price_cols = ohlc[["open", "high", "low", "close"]] |
| non_null_prices = price_cols.dropna() |
| if not (non_null_prices > 0).all(axis=None): |
| bad_counts = (non_null_prices <= 0).sum() |
| raise ValueError( |
| f"OHLC contains non-positive prices: {bad_counts[bad_counts > 0].to_dict()}" |
| ) |
|
|
| |
| inverted = ohlc["high"] < ohlc["low"] |
| n_inverted = inverted.sum() |
| if n_inverted > 0: |
| first_bad = ohlc.index[inverted][0] |
| raise ValueError( |
| f"OHLC has {n_inverted} bar(s) where high < low " |
| f"(first occurrence: {first_bad}). " |
| "Check for data corruption or bad corporate-action adjustment." |
| ) |
|
|
| |
| |
| close_above_high = (ohlc["close"] > ohlc["high"]).sum() |
| close_below_low = (ohlc["close"] < ohlc["low"]).sum() |
| if close_above_high > 0: |
| logger.warning( |
| "_validate_ohlc: %d bar(s) have close > high. " |
| "Possible bad price adjustment. feat_icp will be clipped.", |
| close_above_high, |
| ) |
| if close_below_low > 0: |
| logger.warning( |
| "_validate_ohlc: %d bar(s) have close < low. " |
| "Possible bad price adjustment. feat_icp will be clipped.", |
| close_below_low, |
| ) |
|
|
| |
| n_nan = ohlc[["open", "high", "low", "close"]].isna().sum() |
| total_nan = n_nan.sum() |
| if total_nan > 0: |
| logger.warning( |
| "_validate_ohlc: NaN values found β %s. " |
| "OHLC features will propagate NaN at those positions.", |
| n_nan[n_nan > 0].to_dict(), |
| ) |
| |
|
|
|
|
| |
| |
| |
|
|
| def log_returns(prices: pd.Series) -> pd.Series: |
| """ |
| Compute log returns: r_t = log(P_t / P_{t-1}). |
| |
| Time-additive and approximately symmetric β preferred over simple returns |
| for financial time-series modelling. |
| """ |
| |
| original_index = prices.index |
| arr = np.array(prices.values, dtype=np.float64) |
| |
| log_ret = np.empty_like(arr) |
| log_ret[0] = np.nan |
| |
| with np.errstate(divide='ignore', invalid='ignore'): |
| log_ret[1:] = np.log(arr[1:] / arr[:-1]) |
| |
| return pd.Series(log_ret, index=original_index, name=prices.name) |
|
|
|
|
| def _cumulative_log_return(prices: pd.Series, h: int) -> pd.Series: |
| """h-bar cumulative log return via rolling sum (handles NaN gaps).""" |
| r = log_returns(prices) |
| return r.rolling(window=h, min_periods=h).sum() |
|
|
|
|
| |
| |
| |
|
|
| def _numpy_ewm_mean(arr: np.ndarray, span: int) -> np.ndarray: |
| """Pure numpy EWMA mean β cudf-proof implementation.""" |
| alpha = 2.0 / (span + 1.0) |
| out = np.empty(len(arr), dtype=np.float64) |
| out[0] = arr[0] |
| for i in range(1, len(arr)): |
| if np.isnan(arr[i]): |
| out[i] = np.nan |
| else: |
| out[i] = alpha * arr[i] + (1.0 - alpha) * out[i - 1] |
| return out |
|
|
|
|
| def _ewm_wilder_seeded(arr: np.ndarray, alpha: float, seed_period: int) -> np.ndarray: |
| """ |
| EWM with correct warm-up: seed = simple mean of first `seed_period` valid bars. |
| Carry-forward across NaN gaps. Restart with fresh seed_period mean post-gap. |
| |
| This helper provides the standard 'Wilder' initialization (SMA seed) while |
| preserving the 'skip-NaN' carry-forward logic required for robust |
| financial feature engineering. |
| """ |
| n = len(arr) |
| out = np.full(n, np.nan) |
| valid = np.where(~np.isnan(arr))[0] |
|
|
| if len(valid) < seed_period: |
| return out |
|
|
| i = 0 |
| while i < len(valid): |
| |
| seg_end = i + seed_period |
| if seg_end > len(valid): |
| break |
| seed_idx = valid[i:seg_end] |
| seed_val = np.mean(arr[seed_idx]) |
| out[seed_idx[-1]] = seed_val |
|
|
| |
| prev = seed_idx[-1] |
| for j in range(seed_idx[-1] + 1, n): |
| if np.isnan(arr[j]): |
| out[j] = out[prev] |
| else: |
| out[j] = alpha * arr[j] + (1.0 - alpha) * out[prev] |
| prev = j |
| break |
|
|
| return out |
|
|
|
|
| def ewma_volatility(prices: pd.Series, span: int = 63) -> pd.Series: |
| """ |
| Conditional daily volatility Ο_t via EWMA (paper Eqs. 16β17). |
| |
| Ξ± = 2/(span+1). |
| ΟΒ²_t = Ξ±Β·(r_t β ΞΌ_t)Β² + (1βΞ±)Β·ΟΒ²_{tβ1} |
| |
| NaN gaps (circuit breakers, corporate actions) are handled by |
| carrying forward the last valid EWMA state β never injecting |
| phantom zero-returns into the variance estimator. |
| """ |
| _validate_prices(prices) |
|
|
| |
| try: |
| r_vals = prices.values.get() |
| except AttributeError: |
| r_vals = np.array(prices.values, dtype=np.float64) |
|
|
| |
| with np.errstate(divide='ignore', invalid='ignore'): |
| log_r = np.log(r_vals[1:] / r_vals[:-1]) |
| log_r = np.concatenate([[np.nan], log_r]) |
|
|
| original_index = prices.index |
|
|
| |
| alpha = 2.0 / (span + 1.0) |
| seed_period = min(span, 30) |
| ewma_mean = _ewm_wilder_seeded(log_r, alpha, seed_period) |
| |
| demeaned_sq = np.where(np.isnan(log_r), np.nan, (log_r - ewma_mean) ** 2) |
| ewma_var = _ewm_wilder_seeded(demeaned_sq, alpha, seed_period) |
| |
|
|
| sigma_vals = np.sqrt(ewma_var) |
| sigma_vals[0] = np.nan |
|
|
| |
| sigma = pd.Series( |
| sigma_vals.tolist(), |
| index=original_index, |
| name=f"ewma_vol_span{span}", |
| dtype="float64", |
| ) |
| return sigma |
|
|
|
|
| |
| |
| |
|
|
| def normalized_returns( |
| prices: pd.Series, |
| horizons: Optional[list[int]] = None, |
| span: int = 63, |
| ) -> pd.DataFrame: |
| """ |
| Vol-normalised multi-horizon returns: r_norm(t,h) = r(t,h) / (Ο_t Β· βh). |
| |
| Dimensionless, horizon-invariant, concentrated in [-2, 2]. |
| """ |
| if horizons is None: |
| horizons = FeatureConfig().return_horizons |
| _validate_prices(prices) |
| sigma = ewma_volatility(prices, span=span) |
| out: dict[str, pd.Series] = {} |
| for h in horizons: |
| cum_ret = _cumulative_log_return(prices, h) |
| denom = sigma * np.sqrt(h) |
| with np.errstate(invalid="ignore", divide="ignore"): |
| norm_ret = cum_ret / denom |
| norm_ret = norm_ret.where(denom > _EPS, other=np.nan) |
| out[f"ret_norm_{h}d"] = norm_ret |
| return pd.DataFrame(out, index=prices.index) |
|
|
|
|
| |
| |
| |
|
|
| def macd_signal( |
| prices: pd.Series, |
| short_span: int = 8, |
| long_span: int = 24, |
| price_std_window: int = 63, |
| signal_std_window: int = 252, |
| ) -> pd.Series: |
| """ |
| Three-step normalised MACD: Raw β price-scale β regime normalisation. |
| |
| Result is ~unit-variance, concentrated in [-4, 4]. |
| """ |
| _validate_prices(prices) |
|
|
| |
| try: |
| p_vals = prices.values.get() |
| except AttributeError: |
| p_vals = np.array(prices.values, dtype=np.float64) |
|
|
| alpha_s = 2.0 / (short_span + 1.0) |
| alpha_l = 2.0 / (long_span + 1.0) |
| ewma_s_arr = _ewm_wilder_seeded(p_vals, alpha_s, short_span) |
| ewma_l_arr = _ewm_wilder_seeded(p_vals, alpha_l, short_span) |
|
|
| |
| ewma_s = pd.Series(ewma_s_arr.tolist(), index=prices.index, dtype="float64") |
| ewma_l = pd.Series(ewma_l_arr.tolist(), index=prices.index, dtype="float64") |
|
|
| macd_raw = ewma_s - ewma_l |
|
|
| |
| mp = price_std_window |
| ms = signal_std_window |
|
|
| price_std = prices.rolling(window=price_std_window, min_periods=mp).std() |
| with np.errstate(invalid="ignore", divide="ignore"): |
| q = macd_raw / price_std |
| q = q.where(price_std > _EPS, other=np.nan) |
|
|
| q_std = q.rolling(window=signal_std_window, min_periods=ms).std() |
| with np.errstate(invalid="ignore", divide="ignore"): |
| signal = q / q_std |
| signal = signal.where(q_std > _EPS, other=np.nan) |
| signal.name = f"macd_{short_span}_{long_span}" |
| return signal |
|
|
|
|
| def macd_signals_multi( |
| prices: pd.Series, |
| pairs: Optional[list[tuple[int, int]]] = None, |
| price_std_window: int = 63, |
| signal_std_window: int = 252, |
| ) -> pd.DataFrame: |
| """Compute normalised MACD for multiple (short, long) span pairs.""" |
| if pairs is None: |
| pairs = FeatureConfig().macd_pairs |
| signals: dict[str, pd.Series] = {} |
| for short_span, long_span in pairs: |
| sig = macd_signal( |
| prices, |
| short_span=short_span, |
| long_span=long_span, |
| price_std_window=price_std_window, |
| signal_std_window=signal_std_window, |
| ) |
| signals[sig.name] = sig |
| return pd.DataFrame(signals, index=prices.index) |
|
|
|
|
| |
| |
| |
|
|
| def volatility_scaling_factor(prices: pd.Series, span: int = 63) -> pd.Series: |
| """ |
| 1/Ο_t β used for volatility-targeted position sizing (Eq. 22). |
| |
| Heavily right-skewed β ROBUST scaling bucket in data_loader.py. |
| """ |
| sigma = ewma_volatility(prices, span=span) |
| vs = 1.0 / sigma.where(sigma > _EPS, other=np.nan) |
| vs.name = f"vs_factor_span{span}" |
| return vs |
|
|
|
|
| |
| |
| |
|
|
| def normalized_return_target( |
| prices: pd.Series, |
| span: int = 63, |
| clip_value: float = 20.0, |
| inference_mode: bool = False, |
| _allow_in_inference: bool = False, |
| ) -> pd.Series: |
| """ |
| Clipped vol-normalised next-bar return target (Eq. 23). |
| |
| target_t = clip( r_{t+1} / Ο_t, Β±clip_value ) |
| |
| β οΈ NEVER include during live inference β r_{t+1} is not available. |
| Use set_inference_mode(True) in your inference script to enforce this |
| as a hard RuntimeError rather than a docstring convention. |
| """ |
| |
| if inference_mode and not _allow_in_inference: |
| raise RuntimeError( |
| "normalized_return_target() was called while inference_mode=True. " |
| "This function uses r_{t+1} (future data) and must never run during " |
| "live inference. Pass inference_mode=False to the FeatureEngineer " |
| "constructor for training builds." |
| ) |
| |
|
|
| _validate_prices(prices) |
| sigma = ewma_volatility(prices, span=span) |
| r_next = log_returns(prices).shift(-1) |
| with np.errstate(invalid="ignore", divide="ignore"): |
| norm_target = r_next / sigma |
| norm_target = norm_target.where(sigma > _EPS, other=np.nan) |
| norm_target = norm_target.clip(lower=-clip_value, upper=clip_value) |
| norm_target.name = "target_norm_ret" |
| return norm_target |
|
|
|
|
| |
| |
| |
|
|
| def kaufman_efficiency_ratio( |
| close: pd.Series, |
| period: int = 14, |
| ) -> pd.Series: |
| """ |
| Kaufman Efficiency Ratio (KER) β directional efficiency of price movement. |
| |
| Formula |
| ------- |
| net_move = |close_t β close_{tβperiod}| (directional displacement) |
| path_len = Ξ£|close_i β close_{iβ1}| over period bars (total path) |
| ker_raw = net_move / path_len |
| ker = ker_raw Β· sign(close_t β close_{tβperiod}) |
| |
| Interpretation |
| -------------- |
| Β±1 β perfectly trending (every bar moves in the same direction) |
| 0 β perfectly choppy (price returns to start after a random walk) |
| |
| Output: [-1, +1] β NO_SCALE bucket. |
| Warm-up: period bars. |
| """ |
| net_move = close.diff(period) |
| path_len = close.diff().abs().rolling(period, min_periods=period).sum() |
| ker_unsigned = net_move.abs() / (path_len + _EPS) |
| ker = (ker_unsigned * np.sign(net_move)).clip(-1.0, 1.0) |
| ker.name = "feat_efficiency" |
| return ker |
|
|
|
|
| |
| |
| |
|
|
| def internal_close_position( |
| high: pd.Series, |
| low: pd.Series, |
| close: pd.Series, |
| period: int = 14, |
| ) -> pd.Series: |
| """ |
| Smoothed Internal Close Position (ICP) β where did close land in the bar range? |
| |
| Formula |
| ------- |
| raw_icp = (close β low) / (high β low + Ξ΅) β [0, 1] |
| scaled = raw_icp * 2 β 1 β [β1, +1] |
| icp = rolling_mean(scaled, period) |
| |
| Interpretation |
| -------------- |
| +1 β close at top of every bar over the window (bullish pressure) |
| β1 β close at bottom of every bar (bearish pressure) |
| 0 β neutral / indecisive |
| |
| Output: [-1, +1] β NO_SCALE bucket. |
| Warm-up: period bars. |
| """ |
| |
| _DOJI_THRESH = 1e-6 |
| |
| |
| |
|
|
| hl_range = high - low |
|
|
| |
| raw = (close - low) / (hl_range + _EPS) |
| raw = raw.where(hl_range >= _DOJI_THRESH, 0.5) |
| |
|
|
| |
| |
| raw = raw.clip(0.0, 1.0) |
|
|
| scaled = (raw * 2.0) - 1.0 |
| icp = scaled.rolling(period, min_periods=period).mean().clip(-1.0, 1.0) |
| icp.name = "feat_icp" |
| return icp |
|
|
|
|
| |
| |
| |
|
|
| def centered_rsi(close: pd.Series, period: int = 14) -> pd.Series: |
| """ |
| Wilder RSI re-centered to [-1, +1]. |
| |
| Formula |
| ------- |
| delta = close_t β close_{tβ1} |
| avg_up = EWMA(max(delta, 0), Ξ±=1/period) |
| avg_down = EWMA(max(βdelta, 0), Ξ±=1/period) |
| RS = avg_up / avg_down |
| RSI = 100 β 100/(1+RS) |
| output = (RSI β 50) / 50 β [β1, +1] |
| |
| Interpretation |
| -------------- |
| +1 β RSI=100 (overbought extreme β mean-reversion candidate) |
| β1 β RSI=0 (oversold extreme) |
| 0 β RSI=50 (neutral momentum) |
| |
| Output: [-1, +1] β NO_SCALE bucket. |
| Warm-up: approx 2*period bars. |
| """ |
| delta = close.diff() |
| up = delta.clip(lower=0.0) |
| dn = (-delta).clip(lower=0.0) |
|
|
| |
| try: |
| up_vals = up.values.get() |
| dn_vals = dn.values.get() |
| except AttributeError: |
| up_vals = np.array(up.values, dtype=np.float64) |
| dn_vals = np.array(dn.values, dtype=np.float64) |
|
|
| alpha = 1.0 / float(period) |
| avg_up_arr = _ewm_wilder_seeded(up_vals, alpha, period) |
| avg_dn_arr = _ewm_wilder_seeded(dn_vals, alpha, period) |
|
|
| avg_up = pd.Series(avg_up_arr.tolist(), index=close.index, dtype="float64") |
| avg_dn = pd.Series(avg_dn_arr.tolist(), index=close.index, dtype="float64") |
| |
|
|
| rs = avg_up / (avg_dn + _EPS) |
| rsi = 100.0 - (100.0 / (1.0 + rs)) |
| rsi_centered = ((rsi - 50.0) / 50.0).clip(-1.0, 1.0) |
| rsi_centered.name = "feat_momentum_rsi" |
| return rsi_centered |
|
|
|
|
| |
| |
| |
|
|
| def directional_vol_asymmetry( |
| close: pd.Series, |
| window: int = 20, |
| ) -> pd.Series: |
| """ |
| Asymmetry between upside and downside volatility over a rolling window. |
| |
| Formula |
| ------- |
| up_vol = std(max(r_t, 0), window) |
| down_vol = std(max(βr_t, 0), window) |
| output = (up_vol β down_vol) / (up_vol + down_vol + Ξ΅) |
| |
| Interpretation |
| -------------- |
| +1 β upside vol >> downside vol (unusual; often pre-breakout) |
| β1 β downside vol >> upside vol (crash regime / leverage effect) |
| 0 β symmetric volatility |
| |
| Markets typically show negative asymmetry (leverage effect). |
| Output: [-1, +1] β NO_SCALE bucket. |
| Warm-up: window bars. |
| """ |
| r = log_returns(close) |
| |
| _min_p = max(window // 3, 10) |
|
|
| |
| up_vol = r.where(r > 0, np.nan).rolling(window, min_periods=_min_p).std() |
| dn_vol = r.where(r < 0, np.nan).rolling(window, min_periods=_min_p).std() |
|
|
| asym = (up_vol - dn_vol) / (up_vol + dn_vol + _EPS) |
|
|
| |
| |
| n_up = r.where(r > 0, np.nan).rolling(window, min_periods=1).count() |
| n_dn = r.where(r < 0, np.nan).rolling(window, min_periods=1).count() |
| asym = asym.where((n_up >= _min_p) & (n_dn >= _min_p), np.nan) |
|
|
| asym = asym.clip(-1.0, 1.0) |
| asym.name = "feat_vol_asymmetry" |
| return asym |
|
|
|
|
| |
| |
| |
|
|
| def local_structure_position( |
| high: pd.Series, |
| low: pd.Series, |
| close: pd.Series, |
| window: int = 65, |
| ) -> pd.Series: |
| """ |
| Normalised price position within its recent high-low range (Donchian position). |
| |
| Formula |
| ------- |
| roll_high = rolling_max(high, window) |
| roll_low = rolling_min(low, window) |
| output = ((close β roll_low) / (roll_high β roll_low + Ξ΅)) * 2 β 1 |
| |
| Interpretation |
| -------------- |
| +1 β close at top of the N-bar range (strong resistance zone) |
| β1 β close at bottom (strong support zone) |
| 0 β mid-range |
| |
| Output: [-1, +1] β NO_SCALE bucket. |
| Warm-up: window bars. |
| """ |
| roll_high = high.rolling(window, min_periods=window).max() |
| roll_low = low.rolling(window, min_periods=window).min() |
|
|
| _DOJI_THRESH = 1e-6 |
| hl_range = roll_high - roll_low |
| pos_raw = (close - roll_low) / (hl_range + _EPS) |
| pos_raw = pos_raw.where(hl_range >= _DOJI_THRESH, 0.5) |
| pos = (pos_raw * 2.0 - 1.0).clip(-1.0, 1.0) |
| pos.name = "feat_local_structure" |
| return pos |
|
|
|
|
| |
| |
| |
|
|
| def _parse_hhmm(hhmm: str) -> Tuple[int, int]: |
| parts = hhmm.split(":") |
| if len(parts) != 2: |
| raise ValueError(f"Invalid time string: {hhmm!r} β expected 'HH:MM'.") |
| h, m = int(parts[0]), int(parts[1]) |
| if not (0 <= h <= 23) or not (0 <= m <= 59): |
| raise ValueError(f"Invalid time: {hhmm!r}") |
| return h, m |
|
|
|
|
| def session_cyclic_features( |
| index: pd.DatetimeIndex, |
| session_open: str = "09:15", |
| session_close: str = "15:30", |
| tz: str = "Asia/Kolkata", |
| ) -> Tuple[pd.Series, pd.Series]: |
| """ |
| Cyclic sine/cosine encoding of intraday time position. |
| |
| Maps each bar's timestamp to its fractional position within the trading |
| session, then encodes that fraction as (sin, cos) to avoid discontinuities |
| at session boundaries. |
| |
| Formula |
| ------- |
| pos = (minutes_since_open) / session_length_minutes β [0, 1] |
| angle = 2Ο Β· pos |
| sin_t = sin(angle), cos_t = cos(angle) |
| |
| Why sin/cos instead of raw position |
| ------------------------------------ |
| A raw linear position creates a discontinuity (1β0 jump) the model has to |
| learn to ignore. The sin/cos pair encodes it continuously β temporal |
| distance between any two points matches their angular distance. |
| |
| Interpretation |
| -------------- |
| (sin=0, cos=1) β session open (9:15) |
| (sin=1, cos=0) β mid-session (~12:22) |
| (sin=0, cos=β1) β session close (15:30) |
| |
| Output: both in [-1, +1] β NO_SCALE bucket. |
| Zero warm-up β pure timestamp function. |
| """ |
| if not isinstance(index, pd.DatetimeIndex): |
| raise TypeError("session_cyclic_features requires a DatetimeIndex.") |
|
|
| |
| if index.tz is not None: |
| idx = index.tz_convert(tz) |
| else: |
| logger.warning( |
| "session_cyclic_features: DatetimeIndex is tz-naive. " |
| "Assuming timestamps are already in %s. " |
| "If your data is UTC or another timezone, localize before calling: " |
| "index = index.tz_localize('UTC').tz_convert('%s')", |
| tz, tz, |
| ) |
| |
| |
| |
| idx = index.tz_localize(tz, ambiguous="infer", nonexistent="shift_forward") |
| |
|
|
| oh, om = _parse_hhmm(session_open) |
| ch, cm = _parse_hhmm(session_close) |
| open_min = oh * 60 + om |
| close_min = ch * 60 + cm |
| session_len = close_min - open_min |
| if session_len <= 0: |
| raise ValueError("session_close must be after session_open.") |
|
|
| minutes = pd.Series( |
| idx.hour.astype(np.int32) * 60 + idx.minute.astype(np.int32), |
| index=index, |
| ).clip(lower=open_min, upper=close_min) |
|
|
| pos = (minutes - open_min) / float(session_len) |
| angle = math.pi * pos |
|
|
| |
| s_sin = pd.Series(np.sin(angle), index=index, name="feat_session_sin", dtype="float64") |
| s_cos = pd.Series(np.cos(angle), index=index, name="feat_session_cos", dtype="float64") |
| |
| return s_sin, s_cos |
|
|
|
|
| |
| |
| |
|
|
| def vol_squeeze_ratio( |
| high: pd.Series, |
| low: pd.Series, |
| close: pd.Series, |
| fast_window: int = 5, |
| slow_window: int = 20, |
| ) -> pd.Series: |
| """ |
| Ratio of short-term ATR to medium-term ATR β detects volatility contractions. |
| |
| Formula |
| ------- |
| TR_t = max(HβL, |HβC_{t-1}|, |LβC_{t-1}|) |
| ATR_fast = rolling_mean(TR, fast_window) |
| ATR_slow = rolling_mean(TR, slow_window) |
| output = ATR_fast / (ATR_slow + Ξ΅) |
| |
| Interpretation |
| -------------- |
| < 1.0 β current vol below medium-term average (compression / squeeze) |
| = 1.0 β neutral |
| > 1.0 β expanding volatility (breakout mode) |
| |
| Distribution: right-skewed and unbounded above β ROBUST scaling bucket. |
| Warm-up: slow_window bars. |
| """ |
| prev_close = close.shift(1) |
| tr = pd.concat([ |
| high - low, |
| (high - prev_close).abs(), |
| (low - prev_close).abs(), |
| ], axis=1).max(axis=1) |
|
|
| atr_fast = tr.rolling(fast_window, min_periods=fast_window).mean() |
| atr_slow = tr.rolling(slow_window, min_periods=slow_window).mean() |
| squeeze = atr_fast / (atr_slow + _EPS) |
| squeeze.name = "feat_vol_squeeze" |
| return squeeze |
|
|
|
|
| |
| |
| |
|
|
| class FeatureEngineer: |
| """ |
| End-to-end feature engineering for financial time-series models. |
| |
| Orchestrates all 13 feature families into a single aligned DataFrame. |
| |
| Usage |
| ----- |
| Close-only (original paper features): |
| fe = FeatureEngineer() |
| feats = fe.build(close_series, include_target=False, dropna=False) |
| |
| Full OHLC features (all 13 families): |
| fe = FeatureEngineer() |
| feats = fe.build(close_series, ohlc=ohlc_df, include_target=False) |
| |
| Notes |
| ----- |
| - ohlc must have lowercase columns: open, high, low, close. |
| - ohlc.index must align with prices.index. |
| - If ohlc is None, the 8 OHLC-based features are silently skipped |
| (fully backward-compatible with existing train.py). |
| - feat_session_sin / feat_session_cos require a DatetimeIndex. If the |
| index is not a DatetimeIndex, session features are skipped with a warning. |
| """ |
|
|
| def __init__( |
| self, |
| config: Optional[FeatureConfig] = None, |
| inference_mode: bool = False, |
| ) -> None: |
| self.config = config or FeatureConfig() |
| self.inference_mode = inference_mode |
|
|
| |
| |
| |
|
|
| def build( |
| self, |
| prices: pd.Series, |
| ohlc: Optional[pd.DataFrame] = None, |
| include_target: bool = False, |
| dropna: bool = False, |
| ) -> pd.DataFrame: |
| """ |
| Compute the full feature matrix for one asset. |
| |
| Parameters |
| ---------- |
| prices : pd.Series |
| Close prices. Index should be a DatetimeIndex for session features. |
| ohlc : pd.DataFrame, optional |
| DataFrame with columns open/high/low/close (same index as prices). |
| If provided, computes the 8 OHLC-based features (6β13). |
| If None, only the 4 close-only paper features are computed. |
| include_target : bool |
| Append the normalised return target column (training only). |
| dropna : bool |
| Drop NaN warm-up rows. Use True for training, False for inference. |
| |
| Returns |
| ------- |
| pd.DataFrame |
| Column layout (when ohlc provided): |
| Close-only (NO_SCALE): |
| ewma_vol_span{N} |
| ret_norm_{h}d for h in cfg.return_horizons |
| default: ret_norm_1d, ret_norm_3d, ret_norm_6d, ret_norm_13d, |
| ret_norm_26d, ret_norm_65d, ret_norm_130d, ret_norm_260d |
| count: len(cfg.return_horizons) β 8 with default config |
| macd_8_24, macd_26_78, macd_52_156 (len(cfg.macd_pairs) β 3 with default) |
| Close-only (ROBUST): |
| vs_factor_span{N} |
| OHLC-based (NO_SCALE): |
| feat_efficiency |
| feat_icp |
| feat_momentum_rsi |
| feat_vol_asymmetry |
| feat_local_structure |
| feat_session_sin |
| feat_session_cos |
| OHLC-based (ROBUST): |
| feat_vol_squeeze |
| Training only: |
| target_norm_ret |
| """ |
| cfg = self.config |
| logger.info( |
| "Building features for '%s' | %d rows | ohlc=%s.", |
| prices.name or "unnamed", |
| len(prices), |
| ohlc is not None, |
| ) |
|
|
| _validate_prices(prices) |
| prices = prices.sort_index() |
|
|
| if ohlc is not None: |
| |
| |
| ohlc = ohlc.sort_index().reindex(prices.index) |
|
|
| |
| overlap_frac = ohlc.notna().any(axis=1).mean() |
| if overlap_frac == 0.0: |
| raise ValueError( |
| "OHLC DataFrame has zero overlap with prices.index after reindex. " |
| "This is almost certainly a timezone or frequency mismatch. " |
| f"OHLC index sample: {ohlc.index[:3].tolist()}, " |
| f"Prices index sample: {prices.index[:3].tolist()}. " |
| "Fix: align timezones before calling build() β e.g., " |
| "ohlc.index = ohlc.index.tz_convert('Asia/Kolkata')." |
| ) |
| elif overlap_frac < 0.5: |
| logger.warning( |
| "build(): OHLC overlap with prices.index is only %.1f%% after reindex. " |
| "%.1f%% of OHLC bars are NaN β likely a timezone or frequency mismatch. " |
| "OHLC index sample: %s | Prices index sample: %s", |
| overlap_frac * 100, |
| (1 - overlap_frac) * 100, |
| ohlc.index[:3].tolist(), |
| prices.index[:3].tolist(), |
| ) |
| elif overlap_frac < 0.95: |
| logger.warning( |
| "build(): OHLC overlap with prices.index is %.1f%% β " |
| "%d NaN bars will propagate into OHLC features.", |
| overlap_frac * 100, |
| int((1 - overlap_frac) * len(ohlc)), |
| ) |
| |
|
|
| _validate_ohlc(ohlc) |
|
|
| parts: list[pd.DataFrame | pd.Series] = [] |
|
|
| |
| vol = ewma_volatility(prices, span=cfg.ewma_span) |
| parts.append(vol) |
|
|
| |
| ret_feats = normalized_returns( |
| prices, |
| horizons=cfg.return_horizons, |
| span=cfg.ewma_span, |
| ) |
| parts.append(ret_feats) |
|
|
| |
| macd_feats = macd_signals_multi( |
| prices, |
| pairs=cfg.macd_pairs, |
| price_std_window=cfg.macd_price_std_window, |
| signal_std_window=cfg.macd_signal_std_window, |
| ) |
| parts.append(macd_feats) |
|
|
| |
|
|
| |
| if ohlc is not None: |
| h = ohlc["high"] |
| l = ohlc["low"] |
| c = ohlc["close"] |
|
|
| |
| parts.append(kaufman_efficiency_ratio(c, period=cfg.momentum_period)) |
|
|
| |
| parts.append(internal_close_position(h, l, c, period=cfg.icp_period)) |
|
|
| |
| parts.append(centered_rsi(c, period=cfg.effective_rsi_period)) |
|
|
| |
| parts.append(directional_vol_asymmetry(c, window=cfg.vol_asym_window)) |
|
|
| |
| parts.append(local_structure_position(h, l, c, window=cfg.local_structure_bars)) |
|
|
| |
| if cfg.add_session_features: |
| if isinstance(prices.index, pd.DatetimeIndex): |
| s_sin, s_cos = session_cyclic_features( |
| index=prices.index, |
| session_open=cfg.session_open, |
| session_close=cfg.session_close, |
| tz=cfg.session_tz, |
| ) |
| parts.append(s_sin) |
| parts.append(s_cos) |
| else: |
| logger.warning( |
| "Session features skipped β index is not a DatetimeIndex " |
| "(got %s).", type(prices.index).__name__ |
| ) |
|
|
| |
| parts.append(vol_squeeze_ratio( |
| h, l, c, |
| fast_window=cfg.vol_squeeze_fast, |
| slow_window=cfg.vol_squeeze_slow, |
| )) |
|
|
| |
| if cfg.use_talib: |
| try: |
| from talib_features import build_talib_features |
| talib_df = build_talib_features(ohlc if ohlc is not None else pd.DataFrame(prices.rename("close"))) |
| if not talib_df.empty: |
| |
| talib_df = talib_df.reindex(prices.index) |
| parts.append(talib_df) |
| else: |
| logger.warning("TA-Lib feature build returned empty DataFrame.") |
| except ImportError: |
| logger.error("talib_features.py not found. TA-Lib integration failed.") |
| except Exception as e: |
| logger.error("Error building TA-Lib features: %s", e) |
|
|
| |
| if include_target: |
| |
| if self.inference_mode: |
| raise RuntimeError( |
| "FeatureEngineer.build() called with include_target=True while " |
| "inference_mode=True. The target column uses r_{t+1} (look-ahead). " |
| "Pass inference_mode=False to the FeatureEngineer constructor " |
| "for training builds." |
| ) |
| |
| target = normalized_return_target( |
| prices, |
| span=cfg.ewma_span, |
| clip_value=cfg.target_clip, |
| inference_mode=self.inference_mode, |
| ) |
| parts.append(target) |
|
|
| result = pd.concat(parts, axis=1) |
|
|
| |
| if not (result.dtypes == "float64").all(): |
| raise ValueError( |
| f"Mixed dtypes after concat: " |
| f"{result.dtypes[result.dtypes != 'float64'].to_dict()}" |
| ) |
|
|
| if dropna: |
| n_before = len(result) |
| result = result.dropna() |
| n_dropped = n_before - len(result) |
| logger.info( |
| "Warm-up rows dropped: %d / %d (%.1f%%)", |
| n_dropped, |
| n_before, |
| 100 * n_dropped / n_before, |
| ) |
|
|
| logger.info( |
| "Feature matrix built: shape=%s | NaN count=%d", |
| result.shape, |
| result.isna().sum().sum(), |
| ) |
| return result |
|
|
| |
| |
| |
|
|
| def build_multi_asset( |
| self, |
| price_df: pd.DataFrame, |
| ohlc_dict: Optional[dict[str, pd.DataFrame]] = None, |
| include_target: bool = False, |
| dropna: bool = False, |
| ) -> dict[str, pd.DataFrame]: |
| """ |
| Build features for every asset (column) in a price panel. |
| |
| Parameters |
| ---------- |
| price_df : pd.DataFrame |
| Columns = ticker symbols, values = close prices. |
| ohlc_dict : dict[str, pd.DataFrame], optional |
| {ticker: ohlc_df} for OHLC-based features per asset. |
| """ |
| result: dict[str, pd.DataFrame] = {} |
| for ticker in price_df.columns: |
| series = price_df[ticker].dropna() |
| series.name = ticker |
| ohlc = ohlc_dict.get(ticker) if ohlc_dict else None |
| try: |
| result[ticker] = self.build( |
| series, |
| ohlc=ohlc, |
| include_target=include_target, |
| dropna=dropna, |
| ) |
| except Exception as exc: |
| logger.warning("Feature build FAILED for '%s': %s", ticker, exc) |
| return result |
|
|
| def stack_for_model( |
| self, |
| feature_dict: dict[str, pd.DataFrame], |
| lookback: int = 63, |
| ) -> tuple[np.ndarray, np.ndarray, list[str], list[str]]: |
| """ |
| Stack per-asset feature DataFrames into aligned 3-D tensors. |
| |
| Returns |
| ------- |
| X : np.ndarray shape (T, K, d) |
| y : np.ndarray shape (T, K) or empty if no target column |
| dates : list[str] |
| tickers : list[str] |
| """ |
| if not feature_dict: |
| raise ValueError( |
| "stack_for_model: feature_dict is empty β all ticker builds failed. " |
| "Check the warnings logged by build_multi_asset() for per-ticker errors." |
| ) |
|
|
| tickers = list(feature_dict.keys()) |
| has_target = "target_norm_ret" in next(iter(feature_dict.values())).columns |
|
|
| |
| |
| min_len_ticker = min(feature_dict, key=lambda t: len(feature_dict[t])) |
| min_len = len(feature_dict[min_len_ticker]) |
| if min_len <= lookback: |
| logger.warning( |
| "stack_for_model(): ticker '%s' has only %d bars β shorter than " |
| "lookback=%d. This will likely produce an empty common_idx after slicing.", |
| min_len_ticker, |
| min_len, |
| lookback, |
| ) |
|
|
| common_idx = feature_dict[tickers[0]].index |
| for df in feature_dict.values(): |
| common_idx = common_idx.intersection(df.index) |
|
|
| n_common = len(common_idx) |
| common_idx = common_idx[lookback:] |
|
|
| |
| if len(common_idx) == 0: |
| raise ValueError( |
| f"stack_for_model(): no dates remain after applying lookback={lookback}. " |
| f"The common index across {len(tickers)} ticker(s) contained only " |
| f"{n_common} bar(s) β which is <= lookback ({lookback}). " |
| "Fix: reduce lookback, supply more history, or check that all tickers " |
| "cover the same date range (a single short ticker shrinks the intersection)." |
| ) |
| logger.info( |
| "stack_for_model(): common_idx after lookback trim: %d bars " |
| "(%d dropped as warm-up, spanning %s β %s).", |
| len(common_idx), |
| lookback, |
| common_idx[0] if len(common_idx) > 0 else "None", |
| common_idx[-1] if len(common_idx) > 0 else "None", |
| ) |
| |
|
|
| |
| |
| |
|
|
| all_col_sets = [set(df.columns) for df in feature_dict.values()] |
| common_col_set = set.intersection(*all_col_sets) |
|
|
| |
| for ticker, df in feature_dict.items(): |
| dropped = set(df.columns) - common_col_set |
| if dropped: |
| logger.warning( |
| "stack_for_model: ticker '%s' has extra columns not present " |
| "in all tickers β dropping from tensor: %s", |
| ticker, |
| sorted(dropped), |
| ) |
|
|
| |
| feature_cols = sorted( |
| col for col in common_col_set |
| if col != "target_norm_ret" |
| ) |
|
|
| if not feature_cols: |
| raise ValueError( |
| "stack_for_model: no feature columns common to all tickers. " |
| "Check that all assets were built with the same FeatureConfig." |
| ) |
|
|
| logger.info( |
| "stack_for_model: %d common feature columns across %d tickers.", |
| len(feature_cols), len(tickers), |
| ) |
| |
|
|
| X_list, y_list = [], [] |
| for ticker in tickers: |
| df = feature_dict[ticker].loc[common_idx] |
| X_list.append(df[feature_cols].values) |
| if has_target: |
| y_list.append(df["target_norm_ret"].values) |
|
|
| X = np.stack(X_list, axis=1).astype(np.float32) |
| y = np.stack(y_list, axis=1).astype(np.float32) if has_target else np.array([]) |
| return X, y, [str(d) for d in common_idx], tickers |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
| np.random.seed(42) |
| N = 10000 |
|
|
| def _gbm(n: int, mu: float = 0.0002, sigma: float = 0.015, s0: float = 100.0) -> pd.DataFrame: |
| start = pd.Timestamp("2020-01-02 09:15:00", tz="Asia/Kolkata") |
| idx = pd.date_range(start, periods=n * 5, freq="30min", tz="Asia/Kolkata") |
| idx = idx[(idx.hour * 60 + idx.minute >= 9 * 60 + 15) & |
| (idx.hour * 60 + idx.minute <= 15 * 60 + 30)][:n] |
| log_r = np.random.normal(mu, sigma, len(idx)) |
| closes = s0 * np.exp(np.cumsum(log_r)) |
| highs = closes * (1 + np.abs(np.random.normal(0, 0.003, len(idx)))) |
| lows = closes * (1 - np.abs(np.random.normal(0, 0.003, len(idx)))) |
| opens = np.roll(closes, 1); opens[0] = s0 |
| return pd.DataFrame( |
| {"open": opens, "high": highs, "low": lows, "close": closes}, |
| index=idx, |
| ) |
|
|
| raw = _gbm(N) |
| close = raw["close"].rename("NIFTY") |
|
|
| cfg = FeatureConfig() |
|
|
| fe = FeatureEngineer(config=cfg) |
| feats = fe.build(close, ohlc=raw, include_target=True, dropna=True) |
|
|
| print("\n" + "=" * 65) |
| print(" FEATURE MATRIX (close-only + OHLC, dropna=True)") |
| print("=" * 65) |
| print(f" Shape : {feats.shape}") |
| print(f" Columns ({len(feats.columns)}):") |
| for col in feats.columns: |
| s = feats[col] |
| print(f" {col:<30} mean={s.mean():+.4f} std={s.std():.4f} " |
| f"min={s.min():+.4f} max={s.max():+.4f}") |
| print("\n NaN remaining:", feats.isna().sum().sum()) |