Spaces:
Sleeping
Sleeping
| """Feature engineering: 50+ technical, price-derived, cross-asset, and sector features.""" | |
| import logging | |
| from typing import Optional | |
| import numpy as np | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
| # Try pandas-ta-classic, fall back to pandas_ta | |
| try: | |
| import pandas_ta_classic as ta | |
| except ImportError: | |
| try: | |
| import pandas_ta as ta | |
| except ImportError: | |
| logger.warning("pandas-ta-classic not installed, technical indicators unavailable") | |
| ta = None | |
class FeatureEngineer:
    """Build a per-ticker feature matrix from OHLCV data and optional side data.

    Each ``compute_*`` method returns a DataFrame indexed by the ticker's
    dates (or by the supplied target index); :meth:`compute_all` left-joins
    them into a single frame.
    """

    def __init__(self, lookback: int = 200):
        """Store ``lookback`` on the instance.

        NOTE(review): none of the methods in this class read ``self.lookback``;
        window lengths are hard-coded where they are used.
        """
        self.lookback = lookback

    def compute_all(
        self,
        ohlcv: pd.DataFrame,
        fred_data: Optional[pd.DataFrame] = None,
        sector_data: Optional[pd.DataFrame] = None,
        sentiment_data: Optional[pd.DataFrame] = None,
        stock_type: str = "large_cap",
    ) -> pd.DataFrame:
        """Compute every feature group and join them on ``ohlcv.index``.

        Args:
            ohlcv: Frame with ``Open``, ``High``, ``Low``, ``Close`` and
                ``Volume`` columns.
            fred_data: Optional macro series, one column per series; skipped
                when ``None`` or empty.
            sector_data: Optional frame of closing prices, one column per
                symbol; skipped when ``None`` or empty.
            sentiment_data: Optional pre-computed scores, reindexed to the
                feature index and joined as-is; skipped when ``None`` or empty.
            stock_type: Selects the extra features added by
                :meth:`compute_type_specific_features`.

        Returns:
            A DataFrame indexed like ``ohlcv`` with all feature columns.

        Raises:
            ValueError: Propagated from ``DataFrame.join`` if two feature
                groups produce a column with the same name.
        """
        features = pd.DataFrame(index=ohlcv.index)

        # Technical indicators, then price-derived features.
        features = features.join(self.compute_technical_indicators(ohlcv))
        features = features.join(self.compute_price_features(ohlcv))

        # Cross-asset features (FRED).
        if fred_data is not None and not fred_data.empty:
            features = features.join(
                self.compute_macro_features(fred_data, ohlcv.index)
            )

        # Sector rotation features.
        if sector_data is not None and not sector_data.empty:
            features = features.join(
                self.compute_sector_features(sector_data, ohlcv)
            )

        # Sentiment features (pre-computed scores).
        if sentiment_data is not None and not sentiment_data.empty:
            features = features.join(sentiment_data.reindex(features.index))

        # Asset-type-specific features.
        features = features.join(
            self.compute_type_specific_features(ohlcv, stock_type)
        )
        return features

    @staticmethod
    def _join_if_present(
        features: pd.DataFrame, extra: Optional[pd.DataFrame]
    ) -> pd.DataFrame:
        """Join ``extra`` onto ``features`` unless the indicator returned None."""
        return features if extra is None else features.join(extra)

    @staticmethod
    def _or_nan(result, index: pd.Index):
        """Return ``result``, or a float NaN series on ``index`` when it is None.

        Assigning ``None`` to a DataFrame column creates an object-dtype column
        of ``None`` values; a NaN series keeps the column numeric.
        """
        if result is None:
            return pd.Series(np.nan, index=index, dtype="float64")
        return result

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute trend, momentum, volatility, and volume indicators.

        Args:
            df: Frame with ``High``, ``Low``, ``Close`` and ``Volume`` columns.

        Returns:
            A DataFrame indexed like ``df``. It has no columns when the
            technical-analysis library (module-level ``ta``) is unavailable.
        """
        features = pd.DataFrame(index=df.index)
        close = df["Close"]
        high = df["High"]
        low = df["Low"]
        volume = df["Volume"]
        if ta is None:
            return features

        idx = df.index
        # NOTE(review): the multi-column indicators below were already guarded
        # against a None result; the single-series ones are now guarded the
        # same way via _or_nan. Presumably None means "not enough rows" --
        # confirm against the installed pandas-ta fork.

        # === Trend ===
        for period in (5, 10, 20, 50, 200):
            features[f"sma_{period}"] = self._or_nan(
                ta.sma(close, length=period), idx
            )
        features["ema_12"] = self._or_nan(ta.ema(close, length=12), idx)
        features["ema_26"] = self._or_nan(ta.ema(close, length=26), idx)
        features = self._join_if_present(features, ta.macd(close))
        features = self._join_if_present(features, ta.adx(high, low, close))
        features = self._join_if_present(features, ta.aroon(high, low))

        # === Momentum ===
        features["rsi_14"] = self._or_nan(ta.rsi(close, length=14), idx)
        features = self._join_if_present(features, ta.stoch(high, low, close))
        features["willr"] = self._or_nan(ta.willr(high, low, close), idx)
        features["roc_10"] = self._or_nan(ta.roc(close, length=10), idx)
        features["cci_20"] = self._or_nan(
            ta.cci(high, low, close, length=20), idx
        )
        ppo = ta.ppo(close)
        if ppo is not None:
            # The result may be a multi-column frame or a single series.
            if isinstance(ppo, pd.DataFrame):
                features = features.join(ppo)
            else:
                features["ppo"] = ppo

        # === Volatility ===
        features = self._join_if_present(features, ta.bbands(close))
        features["atr_14"] = self._or_nan(
            ta.atr(high, low, close, length=14), idx
        )
        features = self._join_if_present(features, ta.kc(high, low, close))
        # Rolling std of daily returns, scaled by sqrt(252).
        daily_ret = close.pct_change()
        scale = np.sqrt(252)
        features["hvol_20"] = daily_ret.rolling(20).std() * scale
        features["hvol_60"] = daily_ret.rolling(60).std() * scale

        # === Volume ===
        features["obv"] = self._or_nan(ta.obv(close, volume), idx)
        features["mfi_14"] = self._or_nan(
            ta.mfi(high, low, close, volume, length=14), idx
        )
        ad = ta.ad(high, low, close, volume)
        if ad is not None:
            features["ad"] = ad
        features["vol_sma_ratio"] = volume / volume.rolling(20).mean()
        return features

    @staticmethod
    def _streak_length(flag: pd.Series) -> pd.Series:
        """Return, per row, the length of the current run of 1s in ``flag``.

        Rows where ``flag`` is 0 get 0. A new run id starts whenever the flag
        value changes, and ``cumcount`` numbers the rows inside each run.
        """
        run_id = (flag != flag.shift()).cumsum()
        return flag * (flag.groupby(run_id).cumcount() + 1)

    def compute_price_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute price-derived features.

        Args:
            df: Frame with ``Open``, ``High``, ``Low`` and ``Close`` columns.

        Returns:
            A DataFrame indexed like ``df`` with log returns over several
            windows, range and gap ratios, distance from the 252-row rolling
            high/low, a 200-row rolling z-score, and up/down streak lengths.
        """
        features = pd.DataFrame(index=df.index)
        close = df["Close"]
        prev_close = close.shift(1)

        # Log returns at multiple windows.
        for window in (1, 5, 10, 20, 60):
            features[f"log_return_{window}d"] = np.log(
                close / close.shift(window)
            )

        # High-low range and open-vs-previous-close gap.
        features["hl_range"] = (df["High"] - df["Low"]) / close
        features["gap_pct"] = (df["Open"] - prev_close) / prev_close

        # Distance from the 252-row rolling extremes.
        year = close.rolling(252)
        features["dist_52w_high"] = close / year.max() - 1
        features["dist_52w_low"] = close / year.min() - 1

        # Rolling z-score (200 rows); a zero std becomes NaN to avoid inf.
        window_200 = close.rolling(200)
        spread = window_200.std().replace(0, np.nan)
        features["zscore_200"] = (close - window_200.mean()) / spread

        # Consecutive up/down days.
        daily_ret = close.pct_change()
        features["consec_up"] = self._streak_length(
            (daily_ret > 0).astype(int)
        )
        features["consec_down"] = self._streak_length(
            (daily_ret < 0).astype(int)
        )
        return features

    def compute_macro_features(
        self, fred_df: pd.DataFrame, target_index: pd.DatetimeIndex
    ) -> pd.DataFrame:
        """Align FRED macro data to the target index with forward-fill.

        Args:
            fred_df: Macro series, one column per series, indexed by date.
            target_index: Dates the result should be indexed by.

        Returns:
            A DataFrame on ``target_index`` with one ``fred_<col>`` level
            column per input column, plus ``dgs10_change_5d``,
            ``yield_curve_slope`` and ``vix_change_5d`` when the ``DGS10``,
            ``T10Y2Y`` and ``VIXCLS`` columns are present.
        """
        # reindex(method="ffill") only fills target dates that are missing
        # from the source index: NaNs already stored in fred_df are not
        # filled, and a non-monotonic source index is rejected. Sort and
        # forward-fill the source first so each target date carries the last
        # known value.
        filled = fred_df.sort_index().ffill()
        aligned = filled.reindex(target_index, method="ffill")

        features = pd.DataFrame(index=target_index)
        # Rate levels.
        for col in fred_df.columns:
            features[f"fred_{col}"] = aligned[col]
        # Rate changes (difference over 5 rows of the target index).
        if "DGS10" in aligned.columns:
            features["dgs10_change_5d"] = aligned["DGS10"].diff(5)
        if "T10Y2Y" in aligned.columns:
            features["yield_curve_slope"] = aligned["T10Y2Y"]
        if "VIXCLS" in aligned.columns:
            features["vix_change_5d"] = aligned["VIXCLS"].diff(5)
        return features

    def compute_sector_features(
        self, sector_close: pd.DataFrame, ticker_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Compute sector rotation features relative to the ``SPY`` column.

        Args:
            sector_close: Closing prices, one column per symbol. Must contain
                a ``SPY`` column for any feature to be produced.
            ticker_df: Frame whose index the result is aligned to.

        Returns:
            A DataFrame indexed like ``ticker_df``. For every non-SPY column
            and each of the 20- and 60-row windows it holds the rolling sum of
            that column's returns minus SPY's, plus ``sector_spread_20d`` (max
            minus min of the 20-row rolling return sums across the non-SPY
            columns). It has no columns when ``SPY`` is absent.
        """
        features = pd.DataFrame(index=ticker_df.index)
        returns = sector_close.pct_change()
        spy_ret = returns.get("SPY")
        if spy_ret is None:
            return features

        sector_cols = [c for c in returns.columns if c != "SPY"]

        # Relative strength: the benchmark sum is the same for every sector,
        # so compute it once per window.
        for period in (20, 60):
            spy_roll = spy_ret.rolling(period).sum()
            for col in sector_cols:
                sector_roll = returns[col].rolling(period).sum()
                features[f"rs_{col}_{period}d"] = (
                    sector_roll - spy_roll
                ).reindex(ticker_df.index)

        # Sector spread (dispersion).
        sector_20d = returns[sector_cols].rolling(20).sum()
        dispersion = sector_20d.max(axis=1) - sector_20d.min(axis=1)
        features["sector_spread_20d"] = dispersion.reindex(ticker_df.index)
        return features

    def compute_type_specific_features(
        self, df: pd.DataFrame, stock_type: str
    ) -> pd.DataFrame:
        """Compute features specific to a stock type.

        Args:
            df: Frame with ``High``, ``Low``, ``Close`` and ``Volume`` columns
                (only the ones the chosen type needs are read).
            stock_type: ``"penny"``, ``"reit"`` or ``"etf"`` add columns; any
                other value yields a frame with no columns.

        Returns:
            A DataFrame indexed like ``df``.
        """
        features = pd.DataFrame(index=df.index)
        if stock_type == "penny":
            volume = df["Volume"]
            features["volume_ratio_5d"] = volume / volume.rolling(5).mean()
            features["price_level"] = df["Close"]
            features["intraday_range_pct"] = (
                df["High"] - df["Low"]
            ) / df["Close"]
        elif stock_type == "reit":
            # REIT-specific: close relative to its 50-row rolling mean.
            close = df["Close"]
            features["price_to_sma50_ratio"] = close / close.rolling(50).mean()
        elif stock_type == "etf":
            # ETF mean-reversion signal: deviation from the 20-row mean.
            close = df["Close"]
            features["etf_deviation_20d"] = close / close.rolling(20).mean() - 1
        return features
def compute_targets(
    close: pd.Series, horizons: list[int]
) -> pd.DataFrame:
    """Build forward-looking prediction targets for each horizon.

    For every horizon ``h`` three columns are added, in this order:

    * ``magnitude_{h}d`` -- simple return from row ``t`` to row ``t + h``.
    * ``direction_{h}d`` -- sign of that return (-1, 0 or 1) as nullable
      ``Int64``; missing where the future price is not available.
    * ``volatility_{h}d`` -- standard deviation of the ``h`` daily returns
      following row ``t``, scaled by sqrt(252). A sample standard deviation
      needs at least two observations, so this column is all-NaN for
      ``h == 1``.

    Args:
        close: Closing prices.
        horizons: Look-ahead distances, in rows.

    Returns:
        A DataFrame indexed like ``close``.
    """
    out = pd.DataFrame(index=close.index)
    # Daily returns do not depend on the horizon, so compute them once.
    one_day_ret = close.pct_change()
    annualiser = np.sqrt(252)

    for horizon in horizons:
        fwd_ret = close.shift(-horizon) / close - 1
        # Rolling std ending at row t+h, pulled back to row t.
        fwd_vol = one_day_ret.rolling(horizon).std().shift(-horizon)

        out[f"magnitude_{horizon}d"] = fwd_ret
        out[f"direction_{horizon}d"] = np.sign(fwd_ret).astype("Int64")
        out[f"volatility_{horizon}d"] = fwd_vol * annualiser
    return out