Spaces:
Sleeping
Sleeping
| """ | |
| features.py — Sniper v7.1 feature engineering & label construction. | |
| This is the single source of truth used by both the backtester and evaluator. | |
| Ported directly from sniper_v7_1.py training code. | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| # --------------------------------------------------------------------------- | |
| # Feature engineering (mirrors sniper_v7_1.py build_features exactly) | |
| # --------------------------------------------------------------------------- | |
def build_features(df: pd.DataFrame, vix_data=None, sp500_data=None) -> pd.DataFrame:
    """
    Derive the technical feature matrix from OHLCV bars.

    Args:
        df: Price history with columns Open, High, Low, Close, Volume.
        vix_data: Optional VIX values (presumably a date-indexed Series --
            confirm with caller). Forward-filled onto ``df.index``; when
            given, adds ``rv_iv_ratio`` and the ``vix*`` columns.
        sp500_data: Optional S&P 500 values (presumably a date-indexed price
            Series -- confirm with caller). Forward-filled onto ``df.index``;
            when given, adds the ``sp500_*`` columns.

    Returns:
        DataFrame indexed like ``df``. Columns appear in the order they are
        inserted below, and the whole frame is shifted down one row so the
        values on a row are computed only from earlier rows.
    """
    close, high, low = df["Close"], df["High"], df["Low"]
    open_, volume = df["Open"], df["Volume"]
    prev_close = close.shift(1)
    ret_1d = close.pct_change()
    # The aligned VIX series feeds two separate sections; align it once.
    vix = vix_data.reindex(df.index, method="ffill") if vix_data is not None else None

    def _streak(flag):
        """Running count of consecutive 1s; resets whenever the flag flips."""
        run_id = (flag != flag.shift()).cumsum()
        return flag.groupby(run_id).cumsum()

    def _rsi(window):
        """RSI built from simple rolling means of gains and losses."""
        change = close.diff()
        avg_gain = change.where(change > 0, 0.0).rolling(window).mean()
        avg_loss = (-change.where(change < 0, 0.0)).rolling(window).mean()
        strength = avg_gain / avg_loss.replace(0, np.nan)
        return 100 - (100 / (1 + strength))

    def _lag5_autocorr(window):
        """Lag-5 autocorrelation of one rolling window of returns."""
        return window.autocorr(lag=5) if len(window) > 5 else 0.0

    out = pd.DataFrame(index=df.index)

    # --- Exhaustion / Mean-reversion signals ---
    out["consec_down_days"] = _streak((close < prev_close).astype(int))
    out["consec_up_days"] = _streak((close > prev_close).astype(int))
    for n in (5, 10, 20, 50):
        out[f"dist_from_{n}d_low"] = (close - low.rolling(n).min()) / close
        out[f"dist_from_{n}d_high"] = (high.rolling(n).max() - close) / close
    out["vol_ratio_5d"] = volume / volume.rolling(5).mean()
    out["vol_ratio_20d"] = volume / volume.rolling(20).mean()
    for n in (3, 5, 10):
        out[f"drawdown_{n}d"] = (close / close.rolling(n).max()) - 1
    out["sell_climax_5d"] = ret_1d.rolling(5).min() * out["vol_ratio_5d"]

    # --- Oscillators ---
    out["rsi_14"] = _rsi(14)
    out["rsi_7"] = _rsi(7)

    floor14 = low.rolling(14).min()
    ceil14 = high.rolling(14).max()
    # A flat 14-bar range would divide by zero; turn it into NaN instead.
    span14 = (ceil14 - floor14).replace(0, np.nan)
    out["stoch_k"] = 100 * (close - floor14) / span14
    out["stoch_d"] = out["stoch_k"].rolling(3).mean()
    out["williams_r"] = -100 * (ceil14 - close) / span14

    typical = (high + low + close) / 3
    typical_sma = typical.rolling(20).mean()
    mean_abs_dev = typical.rolling(20).apply(
        lambda w: np.mean(np.abs(w - w.mean())), raw=True
    )
    out["cci_20"] = (typical - typical_sma) / (0.015 * mean_abs_dev).replace(0, np.nan)

    macd = close.ewm(span=12).mean() - close.ewm(span=26).mean()
    out["macd_hist"] = macd - macd.ewm(span=9).mean()
    out["macd_hist_norm"] = out["macd_hist"] / close

    money_flow = typical * volume
    prev_typical = typical.shift(1)
    # Bars with an unchanged typical price are counted as negative flow.
    inflow = money_flow.where(typical > prev_typical, 0).rolling(14).sum()
    outflow = money_flow.where(typical <= prev_typical, 0).rolling(14).sum()
    out["mfi_14"] = 100 - (100 / (1 + inflow / outflow.replace(0, np.nan)))

    rsi14 = out["rsi_14"]
    rsi_rebound = rsi14 - rsi14.rolling(5).min()
    price_rebound = (close - close.rolling(5).min()) / close * 100
    out["rsi_div_5d"] = rsi_rebound - price_rebound

    # --- Volume / OBV ---
    on_balance = (np.sign(ret_1d) * volume).cumsum()
    out["obv_slope_10d"] = on_balance.pct_change(10)
    out["obv_slope_20d"] = on_balance.pct_change(20)

    # --- Volatility ---
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1
    ).max(axis=1)
    atr14 = true_range.rolling(14).mean()
    out["atr_14"] = atr14
    out["atr_ratio"] = atr14 / close
    for n in (5, 10, 20, 60):
        out[f"hvol_{n}d"] = ret_1d.rolling(n).std() * np.sqrt(252)
    out["vol_contraction"] = out["hvol_5d"] / out["hvol_20d"].replace(0, np.nan)
    out["vol_contraction_long"] = out["hvol_10d"] / out["hvol_60d"].replace(0, np.nan)

    sma20 = close.rolling(20).mean()
    std20 = close.rolling(20).std()
    bb_hi = sma20 + 2 * std20
    bb_lo = sma20 - 2 * std20
    bb_span = bb_hi - bb_lo
    out["bb_width"] = bb_span / sma20
    out["bb_pctb"] = (close - bb_lo) / bb_span.replace(0, np.nan)

    kc_mid = close.ewm(span=20).mean()
    kc_hi = kc_mid + 1.5 * atr14
    kc_lo = kc_mid - 1.5 * atr14
    out["keltner_pos"] = (close - kc_lo) / (kc_hi - kc_lo).replace(0, np.nan)
    # 1 when the Bollinger band sits entirely inside the Keltner channel.
    out["squeeze"] = ((bb_lo > kc_lo) & (bb_hi < kc_hi)).astype(int)

    for n in (5, 10, 20):
        out[f"range_pct_{n}d"] = (high.rolling(n).max() - low.rolling(n).min()) / close

    if vix is not None:
        out["rv_iv_ratio"] = (out["hvol_20d"] * 100) / vix.replace(0, np.nan)

    # --- Returns ---
    for n in (1, 2, 3, 5, 10, 20, 60):
        out[f"ret_{n}d"] = close.pct_change(n)

    # --- Trend / Price structure ---
    sma50 = close.rolling(50).mean()
    for n in (5, 10, 20, 50, 200):
        moving_avg = close.rolling(n).mean()
        out[f"dist_sma_{n}"] = (close - moving_avg) / moving_avg
    for n in (8, 21, 55):
        exp_avg = close.ewm(span=n).mean()
        out[f"dist_ema_{n}"] = (close - exp_avg) / exp_avg
    out["sma50_slope"] = sma50.pct_change(5)
    out["sma20_slope"] = sma20.pct_change(5)
    out["above_sma200"] = (close > close.rolling(200).mean()).astype(int)
    out["above_sma50"] = (close > sma50).astype(int)
    out["gap"] = (open_ - prev_close) / prev_close

    bar_range = (high - low).replace(0, np.nan)
    candle_top = pd.concat([close, open_], axis=1).max(axis=1)
    candle_bottom = pd.concat([close, open_], axis=1).min(axis=1)
    out["body_ratio"] = (close - open_).abs() / bar_range
    out["upper_wick_ratio"] = (high - candle_top) / bar_range
    out["lower_wick_ratio"] = (candle_bottom - low) / bar_range

    # --- Lagged signals ---
    # Each series is shifted by lag - 1 here; the final one-row shift below
    # supplies the remaining step of the named lag.
    for lag in (1, 2, 3, 5, 10, 21):
        out[f"ret_1d_lag{lag}"] = ret_1d.shift(max(0, lag - 1))
    for lag in (1, 5, 10):
        out[f"vol_ratio_lag{lag}"] = out["vol_ratio_20d"].shift(max(0, lag - 1))
    for lag in (1, 3, 5):
        out[f"rsi_lag{lag}"] = out["rsi_14"].shift(max(0, lag - 1))
    out["mean_rev_5d"] = out["ret_5d"] * (out["rsi_14"] < 30).astype(float)
    out["autocorr_5d"] = ret_1d.rolling(20).apply(_lag5_autocorr, raw=False)

    # --- External / Market context ---
    if vix is not None:
        out["vix"] = vix
        out["vix_ma10"] = vix.rolling(10).mean()
        out["vix_pctile"] = vix.rolling(252).rank(pct=True)
        out["vix_change_5d"] = vix.pct_change(5)
        out["vix_term_structure"] = vix / vix.rolling(20).mean()

    if sp500_data is not None:
        index_px = sp500_data.reindex(df.index, method="ffill")
        index_ret = index_px.pct_change()
        out["sp500_ret_5d"] = index_px.pct_change(5)
        out["sp500_ret_20d"] = index_px.pct_change(20)
        out["sp500_above_sma200"] = (index_px > index_px.rolling(200).mean()).astype(int)
        out["sp500_hvol_20d"] = index_ret.rolling(20).std() * np.sqrt(252)

    # Computed unconditionally; a missing input column is replaced by zeros.
    out["market_breadth_proxy"] = (
        out.get("sp500_ret_5d", pd.Series(0, index=df.index))
        - out.get("ret_5d", pd.Series(0, index=df.index))
    )

    # Shift 1 to prevent lookahead leakage
    return out.shift(1)
| # --------------------------------------------------------------------------- | |
| # Label construction (mirrors sniper_v7_1.py construct_labels exactly) | |
| # --------------------------------------------------------------------------- | |
def construct_labels(
    df: pd.DataFrame,
    pt_multiplier: float = 3.0,
    sl_multiplier: float = 0.5,
    atr_period: int = 20,
    horizon: int = 15,
    use_time_weight: bool = True,
    time_weight_decay: float = 0.80,
) -> tuple:
    """
    Dual-barrier label construction.

    For each row i the entry price is Close[i]; the profit target sits
    ``pt_multiplier * ATR[i]`` above it and the stop-loss
    ``sl_multiplier * ATR[i]`` below it. The next ``horizon`` bars are
    scanned in order and the first barrier touched decides the label.

    Args:
        df: Price history with columns Close, High, Low.
        pt_multiplier: Profit-target distance in ATR multiples.
        sl_multiplier: Stop-loss distance in ATR multiples.
        atr_period: Rolling window (rows) for the ATR used to size barriers.
        horizon: Number of forward bars scanned; must be non-negative.
        use_time_weight: When True, winners are weighted by how fast they hit.
        time_weight_decay: Weight is ``time_weight_decay ** (bars_to_hit - 1)``.

    Returns:
        (labels Series, time_weights Series), both indexed like ``df``.
        label = 1 if PT is hit before SL within ``horizon`` bars, else 0.
        A bar that touches both barriers counts as a stop-out (SL is checked
        first). Rows whose ATR is NaN or zero keep label 0 and weight 1.0.
        The last ``horizon`` rows are masked as -1.

    Raises:
        ValueError: If ``horizon`` is negative.
    """
    if horizon < 0:
        raise ValueError("horizon must be non-negative")

    close = df["Close"].values
    high = df["High"].values
    low = df["Low"].values
    n = len(close)

    # True range needs the previous close, so it starts at row 1; pad row 0
    # with NaN to keep the ATR array aligned with the price arrays.
    true_range = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - close[:-1]), np.abs(low[1:] - close[:-1])),
    )
    atr = (
        pd.Series(np.concatenate([[np.nan], true_range]))
        .rolling(atr_period)
        .mean()
        .values
    )

    labels = np.zeros(n, dtype=int)
    time_weights = np.ones(n, dtype=float)

    # Only rows with a full forward window are labelled, so i + j < n always.
    for i in range(n - horizon):
        if np.isnan(atr[i]) or atr[i] == 0:
            continue
        upper_barrier = close[i] + pt_multiplier * atr[i]
        lower_barrier = close[i] - sl_multiplier * atr[i]
        for j in range(1, horizon + 1):
            # Stop-loss is tested first: a bar spanning both barriers loses.
            if low[i + j] <= lower_barrier:
                break
            if high[i + j] >= upper_barrier:
                labels[i] = 1
                if use_time_weight:
                    time_weights[i] = time_weight_decay ** (j - 1)
                break

    # Mask rows without a full forward window. An explicit start index is
    # used because labels[-0:] would select (and wipe) the entire array.
    labels[max(n - horizon, 0):] = -1
    return pd.Series(labels, index=df.index), pd.Series(time_weights, index=df.index)
| # --------------------------------------------------------------------------- | |
| # ATR helper (for live stop/target calculation in the backtester) | |
| # --------------------------------------------------------------------------- | |
def compute_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
    """
    Average true range as a simple rolling mean.

    Args:
        df: Price history with columns Close, High, Low.
        period: Rolling window length in rows.

    Returns:
        Series indexed like ``df``; NaN until ``period`` rows are available.
    """
    prev_close = df["Close"].shift(1)
    high, low = df["High"], df["Low"]
    candidates = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    )
    # Row-wise max skips NaN, so the first row falls back to High - Low.
    true_range = candidates.max(axis=1)
    return true_range.rolling(period).mean()
| # --------------------------------------------------------------------------- | |
| # Confluence scoring (bonus filter, same as trainer) | |
| # --------------------------------------------------------------------------- | |
def compute_confluence(X: pd.DataFrame) -> pd.Series:
    """
    Count how many of the ten confluence conditions hold on each row.

    Args:
        X: Feature frame. Any feature column that is missing is replaced by a
            constant fallback that does not satisfy its condition.

    Returns:
        Float Series indexed like ``X`` with values from 0 to 10. NaN feature
        values compare as False and so add nothing to the score.
    """
    # (label, feature column, fallback when the column is absent, condition)
    rules = (
        ("RSI oversold", "rsi_14", 50, lambda s: s < 35),
        ("Stoch oversold", "stoch_k", 50, lambda s: s < 25),
        ("MFI oversold", "mfi_14", 50, lambda s: s < 30),
        ("Below BB lower", "bb_pctb", 0.5, lambda s: s < 0.1),
        ("Near SMA support", "dist_sma_20", 0, lambda s: s < -0.03),
        ("Volume spike", "vol_ratio_20d", 1, lambda s: s > 1.5),
        ("VIX elevated", "vix_pctile", 0.5, lambda s: s > 0.7),
        ("Consec down", "consec_down_days", 0, lambda s: s >= 3),
        ("Recent drawdown", "drawdown_5d", 0, lambda s: s < -0.05),
        ("Trend intact", "sma50_slope", 0, lambda s: s > 0),
    )

    total = pd.Series(np.zeros(len(X)), index=X.index)
    for _label, column, fallback, condition in rules:
        if column in X.columns:
            values = X[column]
        else:
            values = pd.Series(fallback, index=X.index)
        total = total + condition(values).astype(float).fillna(0)
    return total