model-tester / src /features.py
Arkm20's picture
Create features.py
86b656d verified
"""
features.py — Sniper v7.1 feature engineering & label construction.
This is the single source of truth used by both the backtester and evaluator.
Ported directly from sniper_v7_1.py training code.
"""
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Feature engineering (mirrors sniper_v7_1.py build_features exactly)
# ---------------------------------------------------------------------------
def build_features(df: pd.DataFrame, vix_data=None, sp500_data=None) -> pd.DataFrame:
    """
    Build the technical feature matrix from OHLCV data.

    Parameters
    ----------
    df : DataFrame with columns Open, High, Low, Close, Volume.
    vix_data : optional series; reindexed onto ``df.index`` with forward-fill.
        Adds ``rv_iv_ratio`` plus five ``vix*`` columns.
    sp500_data : optional series; reindexed onto ``df.index`` with
        forward-fill. Adds four ``sp500_*`` columns.

    Returns
    -------
    DataFrame indexed like ``df`` with 81 columns (87 with VIX, 91 with VIX
    and S&P 500). Every column is shifted down one row at the end, so row t
    only uses data through row t-1 and the first row is entirely NaN.

    Column order and arithmetic are kept stable on purpose: the header of this
    module states that it must mirror the training code.
    """
    feat = pd.DataFrame(index=df.index)
    c = df["Close"]
    h = df["High"]
    low = df["Low"]
    o = df["Open"]
    v = df["Volume"]
    daily_ret = c.pct_change()

    # Align the VIX once; it is needed in both the volatility and the
    # market-context sections.
    vix_aligned = None
    if vix_data is not None:
        vix_aligned = vix_data.reindex(df.index, method="ffill")

    # --- Exhaustion / Mean-reversion signals ---
    # Run-length of consecutive down (up) closes: group on each change of the
    # flag and cumulate the flag inside the group.
    down = (c < c.shift(1)).astype(int)
    feat["consec_down_days"] = down.groupby((down != down.shift()).cumsum()).cumsum()
    up = (c > c.shift(1)).astype(int)
    feat["consec_up_days"] = up.groupby((up != up.shift()).cumsum()).cumsum()
    for n in [5, 10, 20, 50]:
        feat[f"dist_from_{n}d_low"] = (c - low.rolling(n).min()) / c
        feat[f"dist_from_{n}d_high"] = (h.rolling(n).max() - c) / c
    feat["vol_ratio_5d"] = v / v.rolling(5).mean()
    feat["vol_ratio_20d"] = v / v.rolling(20).mean()
    for n in [3, 5, 10]:
        feat[f"drawdown_{n}d"] = (c / c.rolling(n).max()) - 1
    feat["sell_climax_5d"] = daily_ret.rolling(5).min() * feat["vol_ratio_5d"]

    # --- Oscillators ---
    delta = c.diff()
    gains = delta.where(delta > 0, 0.0)
    losses = -delta.where(delta < 0, 0.0)
    for n in [14, 7]:
        # Simple-moving-average RSI; a zero average loss yields NaN, not 100.
        rs = gains.rolling(n).mean() / losses.rolling(n).mean().replace(0, np.nan)
        feat[f"rsi_{n}"] = 100 - (100 / (1 + rs))
    low14 = low.rolling(14).min()
    high14 = h.rolling(14).max()
    rng14 = (high14 - low14).replace(0, np.nan)
    feat["stoch_k"] = 100 * (c - low14) / rng14
    feat["stoch_d"] = feat["stoch_k"].rolling(3).mean()
    feat["williams_r"] = -100 * (high14 - c) / rng14
    tp = (h + low + c) / 3
    sma_tp = tp.rolling(20).mean()
    mad = tp.rolling(20).apply(lambda x: np.mean(np.abs(x - x.mean())), raw=True)
    feat["cci_20"] = (tp - sma_tp) / (0.015 * mad).replace(0, np.nan)
    ema12 = c.ewm(span=12).mean()
    ema26 = c.ewm(span=26).mean()
    macd_line = ema12 - ema26
    signal_line = macd_line.ewm(span=9).mean()
    feat["macd_hist"] = macd_line - signal_line
    feat["macd_hist_norm"] = feat["macd_hist"] / c
    mf = tp * v
    pos_mf = mf.where(tp > tp.shift(1), 0).rolling(14).sum()
    neg_mf = mf.where(tp <= tp.shift(1), 0).rolling(14).sum()
    feat["mfi_14"] = 100 - (100 / (1 + pos_mf / neg_mf.replace(0, np.nan)))
    feat["rsi_div_5d"] = (
        (feat["rsi_14"] - feat["rsi_14"].rolling(5).min())
        - (c - c.rolling(5).min()) / c * 100
    )

    # --- Volume / OBV ---
    obv = (np.sign(daily_ret) * v).cumsum()
    feat["obv_slope_10d"] = obv.pct_change(10)
    feat["obv_slope_20d"] = obv.pct_change(20)

    # --- Volatility ---
    prev_close = c.shift(1)
    tr = pd.concat(
        [h - low, (h - prev_close).abs(), (low - prev_close).abs()], axis=1
    ).max(axis=1)
    feat["atr_14"] = tr.rolling(14).mean()
    feat["atr_ratio"] = feat["atr_14"] / c
    for n in [5, 10, 20, 60]:
        feat[f"hvol_{n}d"] = daily_ret.rolling(n).std() * np.sqrt(252)
    feat["vol_contraction"] = feat["hvol_5d"] / feat["hvol_20d"].replace(0, np.nan)
    feat["vol_contraction_long"] = feat["hvol_10d"] / feat["hvol_60d"].replace(0, np.nan)
    sma20 = c.rolling(20).mean()
    std20 = c.rolling(20).std()
    bb_upper = sma20 + 2 * std20
    bb_lower = sma20 - 2 * std20
    feat["bb_width"] = (bb_upper - bb_lower) / sma20
    feat["bb_pctb"] = (c - bb_lower) / (bb_upper - bb_lower).replace(0, np.nan)
    kc_mid = c.ewm(span=20).mean()
    kc_upper = kc_mid + 1.5 * feat["atr_14"]
    kc_lower = kc_mid - 1.5 * feat["atr_14"]
    feat["keltner_pos"] = (c - kc_lower) / (kc_upper - kc_lower).replace(0, np.nan)
    # 1 when the Bollinger bands sit entirely inside the Keltner channel.
    feat["squeeze"] = ((bb_lower > kc_lower) & (bb_upper < kc_upper)).astype(int)
    for n in [5, 10, 20]:
        feat[f"range_pct_{n}d"] = (h.rolling(n).max() - low.rolling(n).min()) / c
    if vix_aligned is not None:
        feat["rv_iv_ratio"] = (feat["hvol_20d"] * 100) / vix_aligned.replace(0, np.nan)

    # --- Returns ---
    for n in [1, 2, 3, 5, 10, 20, 60]:
        feat[f"ret_{n}d"] = c.pct_change(n)

    # --- Trend / Price structure ---
    sma_by_window = {n: c.rolling(n).mean() for n in [5, 10, 20, 50, 200]}
    for n, sma in sma_by_window.items():
        feat[f"dist_sma_{n}"] = (c - sma) / sma
    for n in [8, 21, 55]:
        ema = c.ewm(span=n).mean()
        feat[f"dist_ema_{n}"] = (c - ema) / ema
    sma50 = sma_by_window[50]
    feat["sma50_slope"] = sma50.pct_change(5)
    feat["sma20_slope"] = sma20.pct_change(5)
    feat["above_sma200"] = (c > sma_by_window[200]).astype(int)
    feat["above_sma50"] = (c > sma50).astype(int)
    feat["gap"] = (o - prev_close) / prev_close
    body = (c - o).abs()
    total_range = (h - low).replace(0, np.nan)
    body_top = pd.concat([c, o], axis=1).max(axis=1)
    body_bottom = pd.concat([c, o], axis=1).min(axis=1)
    feat["body_ratio"] = body / total_range
    feat["upper_wick_ratio"] = (h - body_top) / total_range
    feat["lower_wick_ratio"] = (body_bottom - low) / total_range

    # --- Lagged signals ---
    # Each series is shifted by lag - 1 here; the final feat.shift(1) below
    # contributes the remaining one-row lag.
    for lag in [1, 2, 3, 5, 10, 21]:
        feat[f"ret_1d_lag{lag}"] = daily_ret.shift(max(0, lag - 1))
    for lag in [1, 5, 10]:
        feat[f"vol_ratio_lag{lag}"] = feat["vol_ratio_20d"].shift(max(0, lag - 1))
    for lag in [1, 3, 5]:
        feat[f"rsi_lag{lag}"] = feat["rsi_14"].shift(max(0, lag - 1))
    feat["mean_rev_5d"] = feat["ret_5d"] * (feat["rsi_14"] < 30).astype(float)
    feat["autocorr_5d"] = daily_ret.rolling(20).apply(
        lambda x: x.autocorr(lag=5) if len(x) > 5 else 0.0, raw=False
    )

    # --- External / Market context ---
    if vix_aligned is not None:
        feat["vix"] = vix_aligned
        feat["vix_ma10"] = vix_aligned.rolling(10).mean()
        feat["vix_pctile"] = vix_aligned.rolling(252).rank(pct=True)
        feat["vix_change_5d"] = vix_aligned.pct_change(5)
        feat["vix_term_structure"] = vix_aligned / vix_aligned.rolling(20).mean()
    if sp500_data is not None:
        sp_aligned = sp500_data.reindex(df.index, method="ffill")
        sp_ret = sp_aligned.pct_change()
        feat["sp500_ret_5d"] = sp_aligned.pct_change(5)
        feat["sp500_ret_20d"] = sp_aligned.pct_change(20)
        feat["sp500_above_sma200"] = (sp_aligned > sp_aligned.rolling(200).mean()).astype(int)
        feat["sp500_hvol_20d"] = sp_ret.rolling(20).std() * np.sqrt(252)
    # Without S&P 500 data the index leg falls back to 0, so the proxy
    # degenerates to -ret_5d.
    feat["market_breadth_proxy"] = (
        feat.get("sp500_ret_5d", pd.Series(0, index=df.index))
        - feat.get("ret_5d", pd.Series(0, index=df.index))
    )

    # Shift 1 to prevent lookahead leakage
    feat = feat.shift(1)
    return feat
# ---------------------------------------------------------------------------
# Label construction (mirrors sniper_v7_1.py construct_labels exactly)
# ---------------------------------------------------------------------------
def construct_labels(
    df: pd.DataFrame,
    pt_multiplier: float = 3.0,
    sl_multiplier: float = 0.5,
    atr_period: int = 20,
    horizon: int = 15,
    use_time_weight: bool = True,
    time_weight_decay: float = 0.80,
) -> tuple:
    """
    Dual-barrier label construction.

    For each row i the entry price is Close[i]; the profit target sits
    ``pt_multiplier * ATR[i]`` above it and the stop-loss
    ``sl_multiplier * ATR[i]`` below it. The next ``horizon`` rows are scanned
    in order, and on each row the stop-loss (Low) is tested before the profit
    target (High), so a bar touching both counts as a loss.

    Parameters
    ----------
    df : DataFrame with columns Close, High, Low.
    pt_multiplier, sl_multiplier : barrier distances in ATR units.
    atr_period : window of the simple rolling mean of true range.
    horizon : number of forward rows to scan; must be >= 0.
    use_time_weight : if True, a winning row gets weight
        ``time_weight_decay ** (j - 1)`` where j is the row offset of the hit.
    time_weight_decay : base of that decay.

    Returns
    -------
    (labels, time_weights), both Series indexed like ``df``.
    label = 1 if PT hit before SL within ``horizon`` rows, else 0.
    Rows whose ATR is NaN or zero keep label 0 and weight 1.
    The last ``horizon`` rows are masked as -1 (none when ``horizon`` is 0).

    Raises
    ------
    ValueError if ``horizon`` is negative.
    """
    if horizon < 0:
        raise ValueError("horizon must be non-negative")

    close = df["Close"].values
    high = df["High"].values
    low = df["Low"].values
    n = len(close)

    # True range needs the previous close, so it is undefined on the first
    # row; pad with NaN to keep the array aligned with df.
    prev_close = close[:-1]
    tr = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - prev_close), np.abs(low[1:] - prev_close)),
    )
    atr = pd.Series(np.concatenate([[np.nan], tr])).rolling(atr_period).mean().values

    labels = np.zeros(n, dtype=int)
    time_weights = np.ones(n, dtype=float)

    # i < n - horizon guarantees i + j <= n - 1 for every j in 1..horizon,
    # so no bounds check is needed inside the scan.
    for i in range(n - horizon):
        if np.isnan(atr[i]) or atr[i] == 0:
            continue
        entry_price = close[i]
        upper_barrier = entry_price + pt_multiplier * atr[i]
        lower_barrier = entry_price - sl_multiplier * atr[i]
        for j in range(1, horizon + 1):
            if low[i + j] <= lower_barrier:
                break
            if high[i + j] >= upper_barrier:
                labels[i] = 1
                if use_time_weight:
                    time_weights[i] = time_weight_decay ** (j - 1)
                break

    # labels[-0:] would select the whole array, so skip masking when the
    # horizon is zero.
    if horizon > 0:
        labels[-horizon:] = -1
    return pd.Series(labels, index=df.index), pd.Series(time_weights, index=df.index)
# ---------------------------------------------------------------------------
# ATR helper (for live stop/target calculation in the backtester)
# ---------------------------------------------------------------------------
def compute_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
    """
    Average true range as a simple rolling mean of true range.

    True range per row is the largest of High - Low, |High - previous Close|
    and |Low - previous Close|. On the first row the previous close is NaN;
    the row-wise max skips NaN, so that row's true range is High - Low.

    Parameters
    ----------
    df : DataFrame with columns Close, High, Low.
    period : rolling window length.

    Returns
    -------
    Series indexed like ``df``; the first ``period - 1`` rows are NaN.
    """
    close = df["Close"]
    high = df["High"]
    low = df["Low"]
    prev_close = close.shift(1)
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1
    ).max(axis=1)
    return true_range.rolling(period).mean()
# ---------------------------------------------------------------------------
# Confluence scoring (bonus filter, same as trainer)
# ---------------------------------------------------------------------------
def compute_confluence(X: pd.DataFrame) -> pd.Series:
    """
    Count how many of ten threshold checks each feature row satisfies.

    Each check compares one feature column against a fixed threshold. A
    column missing from ``X`` is replaced by a constant default that fails
    its check, so it contributes 0. NaN feature values also compare False.

    Parameters
    ----------
    X : feature DataFrame; any subset of the checked columns may be present.

    Returns
    -------
    Float Series indexed like ``X`` with values from 0 to 10.
    """

    def _get(col, default):
        # Fall back to a constant series so the comparison below still works.
        return X[col] if col in X.columns else pd.Series(default, index=X.index)

    checks = {
        "RSI oversold": _get("rsi_14", 50) < 35,
        "Stoch oversold": _get("stoch_k", 50) < 25,
        "MFI oversold": _get("mfi_14", 50) < 30,
        "Below BB lower": _get("bb_pctb", 0.5) < 0.1,
        "Near SMA support": _get("dist_sma_20", 0) < -0.03,
        "Volume spike": _get("vol_ratio_20d", 1) > 1.5,
        "VIX elevated": _get("vix_pctile", 0.5) > 0.7,
        "Consec down": _get("consec_down_days", 0) >= 3,
        "Recent drawdown": _get("drawdown_5d", 0) < -0.05,
        "Trend intact": _get("sma50_slope", 0) > 0,
    }

    score = pd.Series(np.zeros(len(X)), index=X.index)
    # The dict keys only label the checks; the score needs the masks alone.
    for cond in checks.values():
        score += cond.astype(float).fillna(0)
    return score