""" features.py — Sniper v7.1 feature engineering & label construction. This is the single source of truth used by both the backtester and evaluator. Ported directly from sniper_v7_1.py training code. """ import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Feature engineering (mirrors sniper_v7_1.py build_features exactly) # --------------------------------------------------------------------------- def build_features(df: pd.DataFrame, vix_data=None, sp500_data=None) -> pd.DataFrame: """ Build all 100+ technical features from OHLCV data. Inputs must have columns: Open, High, Low, Close, Volume. Returns a DataFrame of features (shifted 1 day to prevent lookahead). """ feat = pd.DataFrame(index=df.index) c = df["Close"] h = df["High"] l = df["Low"] o = df["Open"] v = df["Volume"] daily_ret = c.pct_change() # --- Exhaustion / Mean-reversion signals --- down = (c < c.shift(1)).astype(int) feat["consec_down_days"] = down.groupby((down != down.shift()).cumsum()).cumsum() up = (c > c.shift(1)).astype(int) feat["consec_up_days"] = up.groupby((up != up.shift()).cumsum()).cumsum() for n in [5, 10, 20, 50]: feat[f"dist_from_{n}d_low"] = (c - l.rolling(n).min()) / c feat[f"dist_from_{n}d_high"] = (h.rolling(n).max() - c) / c feat["vol_ratio_5d"] = v / v.rolling(5).mean() feat["vol_ratio_20d"] = v / v.rolling(20).mean() for n in [3, 5, 10]: feat[f"drawdown_{n}d"] = (c / c.rolling(n).max()) - 1 feat["sell_climax_5d"] = daily_ret.rolling(5).min() * feat["vol_ratio_5d"] # --- Oscillators --- delta = c.diff() gain14 = delta.where(delta > 0, 0.0).rolling(14).mean() loss14 = (-delta.where(delta < 0, 0.0)).rolling(14).mean() rs14 = gain14 / loss14.replace(0, np.nan) feat["rsi_14"] = 100 - (100 / (1 + rs14)) gain7 = delta.where(delta > 0, 0.0).rolling(7).mean() loss7 = (-delta.where(delta < 0, 0.0)).rolling(7).mean() rs7 = gain7 / loss7.replace(0, np.nan) feat["rsi_7"] = 100 - (100 / (1 + rs7)) low14 = l.rolling(14).min() high14 = h.rolling(14).max() rng14 = (high14 - low14).replace(0, np.nan) feat["stoch_k"] = 100 * (c - low14) / rng14 feat["stoch_d"] = feat["stoch_k"].rolling(3).mean() feat["williams_r"] = -100 * (high14 - c) / rng14 tp = (h + l + c) / 3 sma_tp = tp.rolling(20).mean() mad = tp.rolling(20).apply(lambda x: np.mean(np.abs(x - x.mean())), raw=True) feat["cci_20"] = (tp - sma_tp) / (0.015 * mad).replace(0, np.nan) ema12 = c.ewm(span=12).mean() ema26 = c.ewm(span=26).mean() macd_line = ema12 - ema26 signal_line = macd_line.ewm(span=9).mean() feat["macd_hist"] = macd_line - signal_line feat["macd_hist_norm"] = feat["macd_hist"] / c mf = tp * v pos_mf = mf.where(tp > tp.shift(1), 0).rolling(14).sum() neg_mf = mf.where(tp <= tp.shift(1), 0).rolling(14).sum() feat["mfi_14"] = 100 - (100 / (1 + pos_mf / neg_mf.replace(0, np.nan))) feat["rsi_div_5d"] = ( (feat["rsi_14"] - feat["rsi_14"].rolling(5).min()) - (c - c.rolling(5).min()) / c * 100 ) # --- Volume / OBV --- obv = (np.sign(daily_ret) * v).cumsum() feat["obv_slope_10d"] = obv.pct_change(10) feat["obv_slope_20d"] = obv.pct_change(20) # --- Volatility --- tr = pd.concat( [h - l, (h - c.shift(1)).abs(), (l - c.shift(1)).abs()], axis=1 ).max(axis=1) feat["atr_14"] = tr.rolling(14).mean() feat["atr_ratio"] = feat["atr_14"] / c for n in [5, 10, 20, 60]: feat[f"hvol_{n}d"] = daily_ret.rolling(n).std() * np.sqrt(252) feat["vol_contraction"] = feat["hvol_5d"] / feat["hvol_20d"].replace(0, np.nan) feat["vol_contraction_long"] = feat["hvol_10d"] / feat["hvol_60d"].replace(0, np.nan) sma20 = c.rolling(20).mean() std20 = c.rolling(20).std() bb_upper = sma20 + 2 * std20 bb_lower = sma20 - 2 * std20 feat["bb_width"] = (bb_upper - bb_lower) / sma20 feat["bb_pctb"] = (c - bb_lower) / (bb_upper - bb_lower).replace(0, np.nan) kc_mid = c.ewm(span=20).mean() kc_upper = kc_mid + 1.5 * feat["atr_14"] kc_lower = kc_mid - 1.5 * feat["atr_14"] feat["keltner_pos"] = (c - kc_lower) / (kc_upper - kc_lower).replace(0, np.nan) feat["squeeze"] = ((bb_lower > kc_lower) & (bb_upper < kc_upper)).astype(int) for n in [5, 10, 20]: feat[f"range_pct_{n}d"] = (h.rolling(n).max() - l.rolling(n).min()) / c if vix_data is not None: vix_aligned = vix_data.reindex(df.index, method="ffill") feat["rv_iv_ratio"] = (feat["hvol_20d"] * 100) / vix_aligned.replace(0, np.nan) # --- Returns --- for n in [1, 2, 3, 5, 10, 20, 60]: feat[f"ret_{n}d"] = c.pct_change(n) # --- Trend / Price structure --- sma50 = c.rolling(50).mean() for n in [5, 10, 20, 50, 200]: sma = c.rolling(n).mean() feat[f"dist_sma_{n}"] = (c - sma) / sma for n in [8, 21, 55]: ema = c.ewm(span=n).mean() feat[f"dist_ema_{n}"] = (c - ema) / ema feat["sma50_slope"] = sma50.pct_change(5) feat["sma20_slope"] = sma20.pct_change(5) feat["above_sma200"] = (c > c.rolling(200).mean()).astype(int) feat["above_sma50"] = (c > sma50).astype(int) feat["gap"] = (o - c.shift(1)) / c.shift(1) body = (c - o).abs() total_range = (h - l).replace(0, np.nan) feat["body_ratio"] = body / total_range feat["upper_wick_ratio"] = (h - pd.concat([c, o], axis=1).max(axis=1)) / total_range feat["lower_wick_ratio"] = (pd.concat([c, o], axis=1).min(axis=1) - l) / total_range # --- Lagged signals --- for lag in [1, 2, 3, 5, 10, 21]: feat[f"ret_1d_lag{lag}"] = daily_ret.shift(max(0, lag - 1)) for lag in [1, 5, 10]: feat[f"vol_ratio_lag{lag}"] = feat["vol_ratio_20d"].shift(max(0, lag - 1)) for lag in [1, 3, 5]: feat[f"rsi_lag{lag}"] = feat["rsi_14"].shift(max(0, lag - 1)) feat["mean_rev_5d"] = feat["ret_5d"] * (feat["rsi_14"] < 30).astype(float) feat["autocorr_5d"] = daily_ret.rolling(20).apply( lambda x: x.autocorr(lag=5) if len(x) > 5 else 0.0, raw=False ) # --- External / Market context --- if vix_data is not None: vix_aligned = vix_data.reindex(df.index, method="ffill") feat["vix"] = vix_aligned feat["vix_ma10"] = vix_aligned.rolling(10).mean() feat["vix_pctile"] = vix_aligned.rolling(252).rank(pct=True) feat["vix_change_5d"] = vix_aligned.pct_change(5) feat["vix_term_structure"] = vix_aligned / vix_aligned.rolling(20).mean() if sp500_data is not None: sp_aligned = sp500_data.reindex(df.index, method="ffill") sp_ret = sp_aligned.pct_change() feat["sp500_ret_5d"] = sp_aligned.pct_change(5) feat["sp500_ret_20d"] = sp_aligned.pct_change(20) feat["sp500_above_sma200"] = (sp_aligned > sp_aligned.rolling(200).mean()).astype(int) feat["sp500_hvol_20d"] = sp_ret.rolling(20).std() * np.sqrt(252) feat["market_breadth_proxy"] = ( feat.get("sp500_ret_5d", pd.Series(0, index=df.index)) - feat.get("ret_5d", pd.Series(0, index=df.index)) ) # Shift 1 to prevent lookahead leakage feat = feat.shift(1) return feat # --------------------------------------------------------------------------- # Label construction (mirrors sniper_v7_1.py construct_labels exactly) # --------------------------------------------------------------------------- def construct_labels( df: pd.DataFrame, pt_multiplier: float = 3.0, sl_multiplier: float = 0.5, atr_period: int = 20, horizon: int = 15, use_time_weight: bool = True, time_weight_decay: float = 0.80, ) -> tuple: """ Dual-barrier label construction. Returns (labels Series, time_weights Series). label = 1 if PT hit before SL within horizon days, else 0. Last `horizon` rows are masked as -1. """ c = df["Close"].values h = df["High"].values l = df["Low"].values tr = np.maximum( h[1:] - l[1:], np.maximum(np.abs(h[1:] - c[:-1]), np.abs(l[1:] - c[:-1])), ) atr = pd.Series(np.concatenate([[np.nan], tr])).rolling(atr_period).mean().values n = len(c) labels = np.zeros(n, dtype=int) time_weights = np.ones(n, dtype=float) for i in range(n - horizon): if np.isnan(atr[i]) or atr[i] == 0: continue entry_price = c[i] upper_barrier = entry_price + pt_multiplier * atr[i] lower_barrier = entry_price - sl_multiplier * atr[i] for j in range(1, horizon + 1): if i + j >= n: break if l[i + j] <= lower_barrier: break if h[i + j] >= upper_barrier: labels[i] = 1 if use_time_weight: time_weights[i] = time_weight_decay ** (j - 1) break labels[-horizon:] = -1 return pd.Series(labels, index=df.index), pd.Series(time_weights, index=df.index) # --------------------------------------------------------------------------- # ATR helper (for live stop/target calculation in the backtester) # --------------------------------------------------------------------------- def compute_atr(df: pd.DataFrame, period: int = 14) -> pd.Series: c = df["Close"] h = df["High"] l = df["Low"] tr = pd.concat( [h - l, (h - c.shift(1)).abs(), (l - c.shift(1)).abs()], axis=1 ).max(axis=1) return tr.rolling(period).mean() # --------------------------------------------------------------------------- # Confluence scoring (bonus filter, same as trainer) # --------------------------------------------------------------------------- def compute_confluence(X: pd.DataFrame) -> pd.Series: score = pd.Series(np.zeros(len(X)), index=X.index) def _get(col, default): return X[col] if col in X.columns else pd.Series(default, index=X.index) checks = { "RSI oversold": _get("rsi_14", 50) < 35, "Stoch oversold": _get("stoch_k", 50) < 25, "MFI oversold": _get("mfi_14", 50) < 30, "Below BB lower": _get("bb_pctb", 0.5) < 0.1, "Near SMA support": _get("dist_sma_20", 0) < -0.03, "Volume spike": _get("vol_ratio_20d", 1) > 1.5, "VIX elevated": _get("vix_pctile", 0.5) > 0.7, "Consec down": _get("consec_down_days", 0) >= 3, "Recent drawdown": _get("drawdown_5d", 0) < -0.05, "Trend intact": _get("sma50_slope", 0) > 0, } for _, cond in checks.items(): score += cond.astype(float).fillna(0) return score