m
Initial deployment: ensemble stock predictor with trained models
bcceb77
"""Feature engineering: 50+ technical, price-derived, cross-asset, and sector features."""
import logging
from typing import Optional
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
# Try pandas-ta-classic, fall back to pandas_ta
try:
import pandas_ta_classic as ta
except ImportError:
try:
import pandas_ta as ta
except ImportError:
logger.warning("pandas-ta-classic not installed, technical indicators unavailable")
ta = None
class FeatureEngineer:
    """Computes all features for a given ticker's OHLCV data.

    Each ``compute_*`` method returns a DataFrame indexed like the ticker
    data; ``compute_all`` joins them column-wise into one feature frame.
    """

    def __init__(self, lookback: int = 200):
        # Stored on the instance; no method in this class reads it.
        self.lookback = lookback

    def compute_all(
        self,
        ohlcv: pd.DataFrame,
        fred_data: Optional[pd.DataFrame] = None,
        sector_data: Optional[pd.DataFrame] = None,
        sentiment_data: Optional[pd.DataFrame] = None,
        stock_type: str = "large_cap",
    ) -> pd.DataFrame:
        """Compute all features and return a single DataFrame.

        Args:
            ohlcv: Price frame with ``Open``, ``High``, ``Low``, ``Close`` and
                ``Volume`` columns; its index becomes the feature index.
            fred_data: Optional macro frame; skipped when ``None`` or empty.
            sector_data: Optional frame of sector closes (one column per
                symbol, including ``SPY``); skipped when ``None`` or empty.
            sentiment_data: Optional pre-computed sentiment scores, joined
                as-is after reindexing to the feature index.
            stock_type: Selects the extra features added by
                ``compute_type_specific_features``.

        Returns:
            One DataFrame indexed like ``ohlcv`` with every feature group
            joined in the order: technical, price, macro, sector, sentiment,
            type-specific.
        """
        features = pd.DataFrame(index=ohlcv.index)

        # Technical indicators
        features = features.join(self.compute_technical_indicators(ohlcv))

        # Price-derived features
        features = features.join(self.compute_price_features(ohlcv))

        # Cross-asset features (FRED)
        if fred_data is not None and not fred_data.empty:
            features = features.join(
                self.compute_macro_features(fred_data, ohlcv.index)
            )

        # Sector rotation features
        if sector_data is not None and not sector_data.empty:
            features = features.join(
                self.compute_sector_features(sector_data, ohlcv)
            )

        # Sentiment features (pre-computed scores)
        if sentiment_data is not None and not sentiment_data.empty:
            features = features.join(sentiment_data.reindex(features.index))

        # Asset-type-specific features
        features = features.join(
            self.compute_type_specific_features(ohlcv, stock_type)
        )
        return features

    @staticmethod
    def _indicator_column(result, index: pd.Index):
        """Return ``result``, or an all-NaN float Series if it is ``None``.

        Assigning ``None`` to a DataFrame column creates an object-dtype
        column of ``None``; a float NaN column keeps the frame numeric.
        """
        if result is None:
            return pd.Series(np.nan, index=index, dtype="float64")
        return result

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute trend, momentum, volatility, and volume indicators.

        Args:
            df: Frame with ``Close``, ``High``, ``Low`` and ``Volume`` columns.

        Returns:
            DataFrame indexed like ``df``. It has no columns when the
            technical-analysis library could not be imported (``ta is None``).
            Multi-column indicators that come back as ``None`` are omitted;
            single-series indicators that come back as ``None`` become NaN
            columns so the column is still present and numeric.
        """
        features = pd.DataFrame(index=df.index)
        close = df["Close"]
        high = df["High"]
        low = df["Low"]
        volume = df["Volume"]

        if ta is None:
            return features

        col = self._indicator_column
        idx = df.index

        # === Trend ===
        for period in [5, 10, 20, 50, 200]:
            features[f"sma_{period}"] = col(ta.sma(close, length=period), idx)
        features["ema_12"] = col(ta.ema(close, length=12), idx)
        features["ema_26"] = col(ta.ema(close, length=26), idx)

        macd = ta.macd(close)
        if macd is not None:
            features = features.join(macd)

        adx = ta.adx(high, low, close)
        if adx is not None:
            features = features.join(adx)

        aroon = ta.aroon(high, low)
        if aroon is not None:
            features = features.join(aroon)

        # === Momentum ===
        features["rsi_14"] = col(ta.rsi(close, length=14), idx)

        stoch = ta.stoch(high, low, close)
        if stoch is not None:
            features = features.join(stoch)

        features["willr"] = col(ta.willr(high, low, close), idx)
        features["roc_10"] = col(ta.roc(close, length=10), idx)
        features["cci_20"] = col(ta.cci(high, low, close, length=20), idx)

        ppo = ta.ppo(close)
        if ppo is not None:
            # The result may be a frame of several PPO columns or one Series.
            if isinstance(ppo, pd.DataFrame):
                features = features.join(ppo)
            else:
                features["ppo"] = ppo

        # === Volatility ===
        bbands = ta.bbands(close)
        if bbands is not None:
            features = features.join(bbands)

        features["atr_14"] = col(ta.atr(high, low, close, length=14), idx)

        kc = ta.kc(high, low, close)
        if kc is not None:
            features = features.join(kc)

        # Annualised historical volatility of daily returns (252 trading days).
        daily_ret = close.pct_change()
        annualize = np.sqrt(252)
        features["hvol_20"] = daily_ret.rolling(20).std() * annualize
        features["hvol_60"] = daily_ret.rolling(60).std() * annualize

        # === Volume ===
        features["obv"] = col(ta.obv(close, volume), idx)
        features["mfi_14"] = col(
            ta.mfi(high, low, close, volume, length=14), idx
        )

        ad = ta.ad(high, low, close, volume)
        if ad is not None:
            features["ad"] = ad

        features["vol_sma_ratio"] = volume / volume.rolling(20).mean()
        return features

    def compute_price_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute price-derived features.

        Args:
            df: Frame with ``Open``, ``High``, ``Low`` and ``Close`` columns.

        Returns:
            DataFrame indexed like ``df`` with log returns over several
            windows, high-low range, opening gap, distance from the rolling
            252-row close extremes, a 200-row z-score of the close, and
            counts of consecutive up / down closes.
        """
        features = pd.DataFrame(index=df.index)
        close = df["Close"]
        prev_close = close.shift(1)

        # Log returns at multiple windows
        for window in [1, 5, 10, 20, 60]:
            features[f"log_return_{window}d"] = np.log(
                close / close.shift(window)
            )

        # High-low range and overnight gap, both relative to a close
        features["hl_range"] = (df["High"] - df["Low"]) / close
        features["gap_pct"] = (df["Open"] - prev_close) / prev_close

        # Distance from 52-week extremes (252-row rolling window of closes)
        features["dist_52w_high"] = close / close.rolling(252).max() - 1
        features["dist_52w_low"] = close / close.rolling(252).min() - 1

        # Rolling Z-score (200-day); a zero std becomes NaN to avoid inf
        roll_mean = close.rolling(200).mean()
        roll_std = close.rolling(200).std()
        features["zscore_200"] = (close - roll_mean) / roll_std.replace(
            0, np.nan
        )

        # Consecutive up/down days
        daily_ret = close.pct_change()
        up = (daily_ret > 0).astype(int)
        down = (daily_ret < 0).astype(int)

        # A new run id starts whenever the flag changes; cumcount numbers the
        # rows inside each run, and multiplying by the flag zeroes the runs
        # where the flag is off.
        up_runs = (up != up.shift()).cumsum()
        down_runs = (down != down.shift()).cumsum()
        features["consec_up"] = up * (up.groupby(up_runs).cumcount() + 1)
        features["consec_down"] = down * (
            down.groupby(down_runs).cumcount() + 1
        )
        return features

    def compute_macro_features(
        self, fred_df: pd.DataFrame, target_index: pd.DatetimeIndex
    ) -> pd.DataFrame:
        """Align FRED macro data to the target index.

        Args:
            fred_df: Macro series, one column per series id, indexed by date.
            target_index: Dates the features must be aligned to.

        Returns:
            DataFrame indexed by ``target_index`` with one ``fred_<col>``
            level column per input column, plus 5-row changes for ``DGS10``
            and ``VIXCLS`` and a ``yield_curve_slope`` copy of ``T10Y2Y``
            when those columns are present.
        """
        # ``reindex(method="ffill")`` only fills labels that are missing from
        # the source index and needs a monotonic index. Sort first, then
        # forward-fill the values themselves so NaNs stored on existing dates
        # also carry the last known observation forward.
        filled = fred_df.sort_index().ffill()
        aligned = filled.reindex(target_index, method="ffill")

        features = pd.DataFrame(index=target_index)

        # Rate levels
        for col in fred_df.columns:
            features[f"fred_{col}"] = aligned[col]

        # Rate changes (differences over 5 rows of the target index)
        if "DGS10" in aligned.columns:
            features["dgs10_change_5d"] = aligned["DGS10"].diff(5)
        if "T10Y2Y" in aligned.columns:
            features["yield_curve_slope"] = aligned["T10Y2Y"]
        if "VIXCLS" in aligned.columns:
            features["vix_change_5d"] = aligned["VIXCLS"].diff(5)
        return features

    def compute_sector_features(
        self, sector_close: pd.DataFrame, ticker_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Compute sector rotation features.

        Args:
            sector_close: Closing prices, one column per symbol. A ``SPY``
                column is used as the benchmark.
            ticker_df: Ticker frame whose index the features are aligned to.

        Returns:
            DataFrame indexed like ``ticker_df``. It has no columns when
            ``sector_close`` lacks ``SPY``. Otherwise it holds
            ``rs_<col>_<period>d`` (rolling summed return minus SPY's) for
            periods 20 and 60, and ``sector_spread_20d`` (max minus min of
            the 20-row summed returns across non-SPY columns).
        """
        features = pd.DataFrame(index=ticker_df.index)
        returns = sector_close.pct_change()

        spy_ret = returns.get("SPY")
        if spy_ret is None:
            return features

        sector_cols = [c for c in returns.columns if c != "SPY"]

        # Relative strength versus SPY over rolling windows
        for period in [20, 60]:
            # The benchmark sum depends only on the period, so compute it
            # once per period rather than once per sector column.
            spy_roll = spy_ret.rolling(period).sum()
            for col in sector_cols:
                col_ret = returns[col].rolling(period).sum()
                features[f"rs_{col}_{period}d"] = (col_ret - spy_roll).reindex(
                    ticker_df.index
                )

        # Sector spread (dispersion)
        sector_20d = returns[sector_cols].rolling(20).sum()
        spread = sector_20d.max(axis=1) - sector_20d.min(axis=1)
        features["sector_spread_20d"] = spread.reindex(ticker_df.index)
        return features

    def compute_type_specific_features(
        self, df: pd.DataFrame, stock_type: str
    ) -> pd.DataFrame:
        """Compute features specific to a stock type.

        Args:
            df: Frame with ``Close`` (and, for ``"penny"``, ``High``, ``Low``
                and ``Volume``) columns.
            stock_type: ``"penny"``, ``"reit"`` or ``"etf"`` add features;
                any other value yields a frame with no columns.

        Returns:
            DataFrame indexed like ``df`` with the type's extra columns.
        """
        features = pd.DataFrame(index=df.index)

        if stock_type == "penny":
            features["volume_ratio_5d"] = (
                df["Volume"] / df["Volume"].rolling(5).mean()
            )
            features["price_level"] = df["Close"]
            features["intraday_range_pct"] = (
                df["High"] - df["Low"]
            ) / df["Close"]
        elif stock_type == "reit":
            # REIT-specific: close relative to its 50-row moving average
            features["price_to_sma50_ratio"] = (
                df["Close"] / df["Close"].rolling(50).mean()
            )
        elif stock_type == "etf":
            # ETF mean-reversion signal: deviation from 20-row moving average
            features["etf_deviation_20d"] = (
                df["Close"] / df["Close"].rolling(20).mean() - 1
            )
        return features
def compute_targets(
    close: pd.Series, horizons: list[int]
) -> pd.DataFrame:
    """Compute prediction targets for given horizons.

    For every horizon ``h`` three forward-looking columns are produced:

    * ``magnitude_{h}d``: simple return from row ``t`` to row ``t + h``.
    * ``direction_{h}d``: sign of that return (-1, 0, 1) as nullable
      ``Int64``; ``<NA>`` where the return is undefined.
    * ``volatility_{h}d``: standard deviation of the ``h`` daily returns
      ending at row ``t + h``, annualised with ``sqrt(252)``.

    The last ``h`` rows of each column are missing because they would need
    data beyond the end of ``close``. With ``h == 1`` the volatility column
    is entirely NaN: the rolling standard deviation uses the sample
    estimator, which is undefined for a single observation.

    Args:
        close: Closing prices.
        horizons: Look-ahead distances, in rows.

    Returns:
        DataFrame indexed like ``close`` with three columns per horizon, in
        the order the horizons were given.
    """
    targets = pd.DataFrame(index=close.index)

    # Neither value depends on the horizon, so compute them once.
    daily_ret = close.pct_change()
    annualize = np.sqrt(252)

    for h in horizons:
        future_return = close.shift(-h) / close - 1
        targets[f"magnitude_{h}d"] = future_return
        targets[f"direction_{h}d"] = np.sign(future_return).astype("Int64")
        # shift(-h) moves the window ending at t + h back onto row t.
        targets[f"volatility_{h}d"] = (
            daily_ret.rolling(h).std().shift(-h) * annualize
        )
    return targets