"""Feature engineering: 50+ technical, price-derived, cross-asset, and sector features."""

import logging
from typing import Optional

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

# Prefer pandas-ta-classic and fall back to pandas_ta. When neither can be
# imported, ``ta`` is None and compute_technical_indicators returns an empty
# frame instead of failing.
try:
    import pandas_ta_classic as ta
except ImportError:
    try:
        import pandas_ta as ta
    except ImportError:
        logger.warning("pandas-ta-classic not installed, technical indicators unavailable")
        ta = None


class FeatureEngineer:
    """Computes all features for a given ticker's OHLCV data."""

    def __init__(self, lookback: int = 200):
        """Store the lookback setting.

        Args:
            lookback: Kept on the instance as ``self.lookback``. None of the
                methods in this class read it; window lengths below are
                literal constants.
        """
        self.lookback = lookback

    @staticmethod
    def _nan_if_none(result, index: pd.Index):
        """Return ``result``, or an all-NaN float Series on ``index`` if it is None.

        The indicator calls below are already guarded with ``is not None`` for
        multi-column results; this applies the same guard to single-column
        results so a missing indicator yields a numeric NaN column rather than
        an object column filled with ``None``.
        """
        if result is None:
            return pd.Series(np.nan, index=index, dtype="float64")
        return result

    def compute_all(
        self,
        ohlcv: pd.DataFrame,
        fred_data: Optional[pd.DataFrame] = None,
        sector_data: Optional[pd.DataFrame] = None,
        sentiment_data: Optional[pd.DataFrame] = None,
        stock_type: str = "large_cap",
    ) -> pd.DataFrame:
        """Compute all features and return a single DataFrame.

        Args:
            ohlcv: Frame with ``Open``, ``High``, ``Low``, ``Close`` and
                ``Volume`` columns; its index is the index of the result.
            fred_data: Optional macro frame, passed to compute_macro_features.
            sector_data: Optional sector close prices, passed to
                compute_sector_features.
            sentiment_data: Optional pre-computed scores; reindexed onto the
                feature index and joined as-is.
            stock_type: Selects the branch in compute_type_specific_features.

        Returns:
            One frame indexed like ``ohlcv`` with every feature group joined
            column-wise. Optional groups are skipped when their input is None
            or empty.
        """
        features = pd.DataFrame(index=ohlcv.index)

        # Technical indicators
        features = features.join(self.compute_technical_indicators(ohlcv))

        # Price-derived features
        features = features.join(self.compute_price_features(ohlcv))

        # Cross-asset features (FRED)
        if fred_data is not None and not fred_data.empty:
            features = features.join(self.compute_macro_features(fred_data, ohlcv.index))

        # Sector rotation features
        if sector_data is not None and not sector_data.empty:
            features = features.join(self.compute_sector_features(sector_data, ohlcv))

        # Sentiment features (pre-computed scores)
        if sentiment_data is not None and not sentiment_data.empty:
            features = features.join(sentiment_data.reindex(features.index))

        # Asset-type-specific features
        features = features.join(self.compute_type_specific_features(ohlcv, stock_type))

        return features

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute trend, momentum, volatility, and volume indicators.

        Args:
            df: Frame with ``Close``, ``High``, ``Low`` and ``Volume`` columns.

        Returns:
            Frame indexed like ``df``. It has no columns when no pandas-ta
            package could be imported (``ta`` is None).
        """
        features = pd.DataFrame(index=df.index)
        close = df["Close"]
        high = df["High"]
        low = df["Low"]
        volume = df["Volume"]

        if ta is None:
            return features

        index = df.index
        single = self._nan_if_none

        # === Trend ===
        for period in [5, 10, 20, 50, 200]:
            features[f"sma_{period}"] = single(ta.sma(close, length=period), index)
        features["ema_12"] = single(ta.ema(close, length=12), index)
        features["ema_26"] = single(ta.ema(close, length=26), index)

        # Multi-column indicators keep the column names chosen by the library.
        for multi in (ta.macd(close), ta.adx(high, low, close), ta.aroon(high, low)):
            if multi is not None:
                features = features.join(multi)

        # === Momentum ===
        features["rsi_14"] = single(ta.rsi(close, length=14), index)

        stoch = ta.stoch(high, low, close)
        if stoch is not None:
            features = features.join(stoch)

        features["willr"] = single(ta.willr(high, low, close), index)
        features["roc_10"] = single(ta.roc(close, length=10), index)
        features["cci_20"] = single(ta.cci(high, low, close, length=20), index)

        ppo = ta.ppo(close)
        if ppo is not None:
            # Accept either a frame (joined as-is) or a single series.
            if isinstance(ppo, pd.DataFrame):
                features = features.join(ppo)
            else:
                features["ppo"] = ppo

        # === Volatility ===
        bbands = ta.bbands(close)
        if bbands is not None:
            features = features.join(bbands)

        features["atr_14"] = single(ta.atr(high, low, close, length=14), index)

        kc = ta.kc(high, low, close)
        if kc is not None:
            features = features.join(kc)

        # Rolling std of daily percentage changes, scaled by sqrt(252).
        daily_ret = close.pct_change()
        features["hvol_20"] = daily_ret.rolling(20).std() * np.sqrt(252)
        features["hvol_60"] = daily_ret.rolling(60).std() * np.sqrt(252)

        # === Volume ===
        features["obv"] = single(ta.obv(close, volume), index)
        features["mfi_14"] = single(ta.mfi(high, low, close, volume, length=14), index)

        ad = ta.ad(high, low, close, volume)
        if ad is not None:
            features["ad"] = ad

        features["vol_sma_ratio"] = volume / volume.rolling(20).mean()

        return features

    def compute_price_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute price-derived features.

        Args:
            df: Frame with ``Open``, ``High``, ``Low`` and ``Close`` columns.

        Returns:
            Frame indexed like ``df`` with log returns, range/gap ratios,
            distance from the rolling 252-row close extremes, a 200-row
            z-score and consecutive up/down day counters.
        """
        features = pd.DataFrame(index=df.index)
        close = df["Close"]

        # Log returns at multiple windows
        for window in [1, 5, 10, 20, 60]:
            features[f"log_return_{window}d"] = np.log(close / close.shift(window))

        # High-low range and opening gap, both relative to a close price
        prev_close = close.shift(1)
        features["hl_range"] = (df["High"] - df["Low"]) / close
        features["gap_pct"] = (df["Open"] - prev_close) / prev_close

        # Distance from 52-week extremes (252-row rolling max/min of Close)
        features["dist_52w_high"] = close / close.rolling(252).max() - 1
        features["dist_52w_low"] = close / close.rolling(252).min() - 1

        # Rolling Z-score (200-day); a zero std becomes NaN to avoid dividing by 0
        window_200 = close.rolling(200)
        features["zscore_200"] = (close - window_200.mean()) / window_200.std().replace(0, np.nan)

        # Consecutive up/down days
        daily_ret = close.pct_change()
        up = (daily_ret > 0).astype(int)
        down = (daily_ret < 0).astype(int)

        # Each change of the 0/1 flag starts a new run id (cumsum of changes);
        # cumcount + 1 is the position inside the run, and multiplying by the
        # flag zeroes the runs where the flag is 0.
        up_run = (up != up.shift()).cumsum()
        down_run = (down != down.shift()).cumsum()
        features["consec_up"] = up * (up.groupby(up_run).cumcount() + 1)
        features["consec_down"] = down * (down.groupby(down_run).cumcount() + 1)

        return features

    def compute_macro_features(
        self, fred_df: pd.DataFrame, target_index: pd.DatetimeIndex
    ) -> pd.DataFrame:
        """Align FRED macro data to the target index.

        Args:
            fred_df: Macro series, one column per series.
            target_index: Index the result is aligned to.

        Returns:
            Frame on ``target_index`` with one ``fred_<col>`` level column per
            input column, plus ``dgs10_change_5d``, ``yield_curve_slope`` and
            ``vix_change_5d`` when the ``DGS10``, ``T10Y2Y`` and ``VIXCLS``
            columns are present.
        """
        # reindex(method="ffill") only fills labels that are missing from the
        # source index; NaNs already stored in the source are left untouched,
        # and the method requires a monotonic index. Sort and forward-fill the
        # source first so every target date carries the last known value.
        source = fred_df.sort_index().ffill()
        aligned = source.reindex(target_index, method="ffill")

        features = pd.DataFrame(index=target_index)

        # Rate levels
        for col in fred_df.columns:
            features[f"fred_{col}"] = aligned[col]

        # Rate changes (difference over 5 rows of the aligned index)
        if "DGS10" in aligned.columns:
            features["dgs10_change_5d"] = aligned["DGS10"].diff(5)
        if "T10Y2Y" in aligned.columns:
            features["yield_curve_slope"] = aligned["T10Y2Y"]
        if "VIXCLS" in aligned.columns:
            features["vix_change_5d"] = aligned["VIXCLS"].diff(5)

        return features

    def compute_sector_features(
        self, sector_close: pd.DataFrame, ticker_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Compute sector rotation features.

        Args:
            sector_close: Close prices, one column per symbol. A ``SPY``
                column is used as the benchmark.
            ticker_df: Frame whose index the result is aligned to.

        Returns:
            Frame on ``ticker_df.index`` with ``rs_<col>_<period>d`` relative
            strength columns and ``sector_spread_20d``. It has no columns when
            ``sector_close`` has no ``SPY`` column.
        """
        features = pd.DataFrame(index=ticker_df.index)

        returns = sector_close.pct_change()
        spy_ret = returns.get("SPY")
        if spy_ret is None:
            return features

        sector_cols = [c for c in returns.columns if c != "SPY"]

        # Relative strength: rolling return sum of each sector minus SPY's,
        # over 20 and 60 rows. The benchmark sum depends only on the period,
        # so it is computed once per period rather than once per sector.
        for period in [20, 60]:
            spy_roll = spy_ret.rolling(period).sum()
            for col in sector_cols:
                col_roll = returns[col].rolling(period).sum()
                features[f"rs_{col}_{period}d"] = (col_roll - spy_roll).reindex(ticker_df.index)

        # Sector spread (dispersion): best minus worst 20-row return sum
        sector_20d = returns[sector_cols].rolling(20).sum()
        spread = sector_20d.max(axis=1) - sector_20d.min(axis=1)
        features["sector_spread_20d"] = spread.reindex(ticker_df.index)

        return features

    def compute_type_specific_features(
        self, df: pd.DataFrame, stock_type: str
    ) -> pd.DataFrame:
        """Compute features specific to a stock type.

        Args:
            df: Frame with ``Close`` (and, for ``"penny"``, ``High``, ``Low``
                and ``Volume``) columns.
            stock_type: ``"penny"``, ``"reit"`` or ``"etf"`` add columns; any
                other value yields a frame with no columns.

        Returns:
            Frame indexed like ``df``.
        """
        features = pd.DataFrame(index=df.index)

        if stock_type == "penny":
            features["volume_ratio_5d"] = df["Volume"] / df["Volume"].rolling(5).mean()
            features["price_level"] = df["Close"]
            features["intraday_range_pct"] = (df["High"] - df["Low"]) / df["Close"]
        elif stock_type == "reit":
            # REIT-specific: close relative to its 50-row moving average
            features["price_to_sma50_ratio"] = df["Close"] / df["Close"].rolling(50).mean()
        elif stock_type == "etf":
            # ETF mean-reversion signal: deviation from the 20-row moving average
            features["etf_deviation_20d"] = df["Close"] / df["Close"].rolling(20).mean() - 1

        return features


def compute_targets(
    close: pd.Series, horizons: list[int]
) -> pd.DataFrame:
    """Compute prediction targets for given horizons.

    For each horizon ``h`` three columns are produced:

    * ``magnitude_{h}d``: ``close[t + h] / close[t] - 1``.
    * ``direction_{h}d``: sign of that return as nullable ``Int64``
      (-1, 0 or 1; missing where the return is NaN).
    * ``volatility_{h}d``: std of daily percentage changes over an ``h``-row
      window, shifted back by ``h`` rows and scaled by sqrt(252).

    Args:
        close: Close price series.
        horizons: Forward horizons, in rows.

    Returns:
        Frame indexed like ``close``; the last ``h`` rows of each column are
        missing because they look past the end of the series.
    """
    targets = pd.DataFrame(index=close.index)
    daily_ret = close.pct_change()

    for h in horizons:
        future_return = close.shift(-h) / close - 1

        targets[f"magnitude_{h}d"] = future_return
        targets[f"direction_{h}d"] = np.sign(future_return).astype("Int64")
        # NOTE(review): with h == 1 a one-row window has no sample std, so
        # this column is entirely NaN; confirm whether h == 1 is ever used.
        targets[f"volatility_{h}d"] = daily_ret.rolling(h).std().shift(-h) * np.sqrt(252)

    return targets