""" Feature Engine Module ===================== Computes OHLCV features, technical indicators, volatility metrics, market regime detection, and sentiment features. Inspired by: - Kronos (2508.02739): OHLCVA K-line tokenization - PatchTST (2211.14730): Patch-based time series representation - FinMultiTime (2506.05019): Multi-modal financial features """ import numpy as np import pandas as pd from typing import Dict, List, Optional, Tuple import ta class FeatureEngine: """Comprehensive feature engineering for financial time series.""" def __init__(self, lookback_window: int = 60, prediction_horizons: List[int] = [1, 5, 20]): """ Args: lookback_window: Number of periods for feature computation prediction_horizons: Short (1), mid (5), long (20) term horizons """ self.lookback_window = lookback_window self.prediction_horizons = prediction_horizons self.feature_names = [] def compute_all_features(self, df: pd.DataFrame) -> pd.DataFrame: """ Compute all features from OHLCV data. Args: df: DataFrame with columns [open, high, low, close, volume] Returns: DataFrame with all computed features """ features = df.copy() # 1. Price-based features features = self._compute_price_features(features) # 2. Technical indicators (RSI, MACD, ATR, EMA, Bollinger) features = self._compute_technical_indicators(features) # 3. Volatility metrics features = self._compute_volatility_features(features) # 4. Volume features features = self._compute_volume_features(features) # 5. Market regime features features = self._compute_regime_features(features) # 6. Return targets for multi-horizon prediction features = self._compute_targets(features) # Drop NaN rows from indicator computation features = features.dropna().reset_index(drop=True) self.feature_names = [c for c in features.columns if c not in ['open', 'high', 'low', 'close', 'volume', 'timestamp', 'date', 'symbol'] and 'target' not in c and 'direction' not in c] return features def _compute_price_features(self, df: pd.DataFrame) -> pd.DataFrame: """Compute raw price-derived features.""" df = df.copy() # Log returns df['log_return'] = np.log(df['close'] / df['close'].shift(1)) # Price ratios df['high_low_ratio'] = df['high'] / df['low'] df['close_open_ratio'] = df['close'] / df['open'] # Candlestick body and shadows (Kronos-inspired OHLCVA encoding) df['body'] = df['close'] - df['open'] df['upper_shadow'] = df['high'] - df[['close', 'open']].max(axis=1) df['lower_shadow'] = df[['close', 'open']].min(axis=1) - df['low'] df['body_ratio'] = df['body'] / (df['high'] - df['low'] + 1e-8) # Price momentum for period in [5, 10, 20]: df[f'momentum_{period}'] = df['close'] / df['close'].shift(period) - 1 df[f'sma_{period}'] = df['close'].rolling(period).mean() df[f'price_to_sma_{period}'] = df['close'] / df[f'sma_{period}'] # Gap analysis df['gap'] = df['open'] / df['close'].shift(1) - 1 return df def _compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """Compute standard technical analysis indicators using ta library.""" df = df.copy() # RSI (multiple periods) df['rsi_14'] = ta.momentum.RSIIndicator(close=df['close'], window=14).rsi() df['rsi_7'] = ta.momentum.RSIIndicator(close=df['close'], window=7).rsi() # MACD macd = ta.trend.MACD(close=df['close']) df['macd'] = macd.macd() df['macd_signal'] = macd.macd_signal() df['macd_histogram'] = macd.macd_diff() # ATR (Average True Range) df['atr_14'] = ta.volatility.AverageTrueRange( high=df['high'], low=df['low'], close=df['close'], window=14 ).average_true_range() df['atr_ratio'] = df['atr_14'] / df['close'] # EMAs for period in [9, 21, 50]: df[f'ema_{period}'] = ta.trend.EMAIndicator(close=df['close'], window=period).ema_indicator() df[f'price_to_ema_{period}'] = df['close'] / df[f'ema_{period}'] # Bollinger Bands bb = ta.volatility.BollingerBands(close=df['close'], window=20, window_dev=2) df['bb_upper'] = bb.bollinger_hband() df['bb_lower'] = bb.bollinger_lband() df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['close'] df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-8) # Stochastic Oscillator stoch = ta.momentum.StochasticOscillator( high=df['high'], low=df['low'], close=df['close'] ) df['stoch_k'] = stoch.stoch() df['stoch_d'] = stoch.stoch_signal() # ADX (Average Directional Index) adx = ta.trend.ADXIndicator(high=df['high'], low=df['low'], close=df['close']) df['adx'] = adx.adx() df['di_plus'] = adx.adx_pos() df['di_minus'] = adx.adx_neg() # Williams %R df['williams_r'] = ta.momentum.WilliamsRIndicator( high=df['high'], low=df['low'], close=df['close'] ).williams_r() # CCI (Commodity Channel Index) df['cci'] = ta.trend.CCIIndicator( high=df['high'], low=df['low'], close=df['close'] ).cci() return df def _compute_volatility_features(self, df: pd.DataFrame) -> pd.DataFrame: """Compute volatility metrics for risk modeling.""" df = df.copy() # Realized volatility (multiple windows) for window in [5, 10, 20]: df[f'realized_vol_{window}'] = df['log_return'].rolling(window).std() * np.sqrt(252) # Garman-Klass volatility estimator df['gk_vol'] = np.sqrt( 0.5 * np.log(df['high'] / df['low'])**2 - (2 * np.log(2) - 1) * np.log(df['close'] / df['open'])**2 ) df['gk_vol_20'] = df['gk_vol'].rolling(20).mean() # Parkinson volatility df['parkinson_vol'] = np.sqrt( 1 / (4 * np.log(2)) * np.log(df['high'] / df['low'])**2 ) df['parkinson_vol_20'] = df['parkinson_vol'].rolling(20).mean() # Volatility ratio (short-term vs long-term) df['vol_ratio'] = df['realized_vol_5'] / (df['realized_vol_20'] + 1e-8) # Volatility of volatility (vol-of-vol) df['vol_of_vol'] = df['realized_vol_5'].rolling(10).std() return df def _compute_volume_features(self, df: pd.DataFrame) -> pd.DataFrame: """Compute volume-based features.""" df = df.copy() # Volume moving averages for period in [5, 10, 20]: df[f'vol_sma_{period}'] = df['volume'].rolling(period).mean() df[f'vol_ratio_{period}'] = df['volume'] / (df[f'vol_sma_{period}'] + 1e-8) # On-Balance Volume (OBV) df['obv'] = ta.volume.OnBalanceVolumeIndicator( close=df['close'], volume=df['volume'] ).on_balance_volume() df['obv_sma'] = df['obv'].rolling(20).mean() df['obv_ratio'] = df['obv'] / (df['obv_sma'] + 1e-8) # Volume-Price Trend df['vpt'] = ta.volume.VolumePriceTrendIndicator( close=df['close'], volume=df['volume'] ).volume_price_trend() # VWAP approximation df['vwap'] = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['price_to_vwap'] = df['close'] / (df['vwap'] + 1e-8) # Money Flow Index df['mfi'] = ta.volume.MFIIndicator( high=df['high'], low=df['low'], close=df['close'], volume=df['volume'] ).money_flow_index() return df def _compute_regime_features(self, df: pd.DataFrame) -> pd.DataFrame: """ Market regime detection features. Regimes: Trending (bullish/bearish), Mean-reverting, High-volatility Based on ADX, volatility clustering, and trend strength. """ df = df.copy() # Trend strength (based on ADX and EMAs) if 'adx' in df.columns: df['is_trending'] = (df['adx'] > 25).astype(float) df['trend_direction'] = np.where( df['ema_9'] > df['ema_21'], 1.0, -1.0 ) df['trend_strength'] = df['is_trending'] * df['trend_direction'] # Regime: volatility regime vol_median = df['realized_vol_20'].rolling(60).median() df['high_vol_regime'] = (df['realized_vol_20'] > vol_median).astype(float) # Regime: mean reversion tendency # Hurst exponent approximation (simple R/S analysis) window = 20 returns = df['log_return'] cumdev = (returns - returns.rolling(window).mean()).rolling(window).sum() r_range = cumdev.rolling(window).max() - cumdev.rolling(window).min() s = returns.rolling(window).std() df['hurst_approx'] = np.log(r_range / (s + 1e-8) + 1e-8) / np.log(window) # Regime classification: 0=mean-reverting, 1=random walk, 2=trending df['regime_class'] = np.where( df['hurst_approx'] < 0.4, 0, np.where(df['hurst_approx'] > 0.6, 2, 1) ) # EMA crossover signals df['ema_cross_9_21'] = np.where( (df['ema_9'] > df['ema_21']) & (df['ema_9'].shift(1) <= df['ema_21'].shift(1)), 1, np.where( (df['ema_9'] < df['ema_21']) & (df['ema_9'].shift(1) >= df['ema_21'].shift(1)), -1, 0 ) ).astype(float) return df def _compute_targets(self, df: pd.DataFrame) -> pd.DataFrame: """Compute multi-horizon prediction targets.""" df = df.copy() for h in self.prediction_horizons: # Return target (continuous) df[f'target_return_{h}'] = df['close'].shift(-h) / df['close'] - 1 # Direction target (binary: 1=up, 0=down) df[f'target_direction_{h}'] = (df[f'target_return_{h}'] > 0).astype(float) # Magnitude target (absolute return) df[f'target_magnitude_{h}'] = df[f'target_return_{h}'].abs() return df def normalize_features(self, df: pd.DataFrame, method: str = 'zscore') -> Tuple[pd.DataFrame, Dict]: """ Normalize features using z-score or min-max. Returns: Normalized DataFrame and normalization parameters """ feature_cols = self.feature_names norm_params = {} df_norm = df.copy() for col in feature_cols: if col in df_norm.columns: if method == 'zscore': mean = df_norm[col].mean() std = df_norm[col].std() + 1e-8 df_norm[col] = (df_norm[col] - mean) / std norm_params[col] = {'mean': mean, 'std': std} elif method == 'minmax': min_val = df_norm[col].min() max_val = df_norm[col].max() df_norm[col] = (df_norm[col] - min_val) / (max_val - min_val + 1e-8) norm_params[col] = {'min': min_val, 'max': max_val} return df_norm, norm_params def create_sequences(self, df: pd.DataFrame, feature_cols: List[str] = None, target_cols: List[str] = None) -> Tuple[np.ndarray, np.ndarray]: """ Create windowed sequences for model input. PatchTST-style: (batch, channels, sequence_length) Args: df: Feature DataFrame feature_cols: Columns to use as input features target_cols: Columns to use as targets Returns: X: (N, num_features, lookback_window) y: (N, num_targets) """ if feature_cols is None: feature_cols = self.feature_names if target_cols is None: target_cols = [c for c in df.columns if 'target' in c] # Filter to existing columns feature_cols = [c for c in feature_cols if c in df.columns] target_cols = [c for c in target_cols if c in df.columns] X_data = df[feature_cols].values y_data = df[target_cols].values X_sequences = [] y_sequences = [] for i in range(self.lookback_window, len(df)): X_sequences.append(X_data[i - self.lookback_window:i].T) # (features, lookback) y_sequences.append(y_data[i]) return np.array(X_sequences, dtype=np.float32), np.array(y_sequences, dtype=np.float32) class SentimentFeatureEngine: """ Process sentiment from financial news/tweets. Inspired by FinMultiTime (2506.05019) multi-modal approach. Supports pre-computed sentiment scores. """ def __init__(self): self.sentiment_vocab = { 'bullish': 1.0, 'bearish': -1.0, 'upgrade': 0.8, 'downgrade': -0.8, 'beat': 0.6, 'miss': -0.6, 'growth': 0.5, 'decline': -0.5, 'profit': 0.4, 'loss': -0.4, 'buy': 0.7, 'sell': -0.7, 'outperform': 0.8, 'underperform': -0.8, 'raise': 0.5, 'cut': -0.5, 'positive': 0.6, 'negative': -0.6, 'strong': 0.4, 'weak': -0.4, 'rally': 0.7, 'crash': -0.9, 'surge': 0.8, 'plunge': -0.8, 'breakout': 0.6, 'breakdown': -0.6, 'recovery': 0.5, 'recession': -0.7, } def compute_rule_based_sentiment(self, text: str) -> float: """Simple rule-based sentiment scorer using financial lexicon.""" text_lower = text.lower() score = 0.0 count = 0 for word, value in self.sentiment_vocab.items(): if word in text_lower: score += value count += 1 return score / max(count, 1) def aggregate_daily_sentiment(self, sentiments: pd.DataFrame, date_col: str = 'date', score_col: str = 'sentiment') -> pd.DataFrame: """ Aggregate sentiment scores to daily level. Returns: DataFrame with daily sentiment features: - mean sentiment - sentiment std (disagreement) - sentiment count (attention) - positive ratio """ daily = sentiments.groupby(date_col).agg( sentiment_mean=(score_col, 'mean'), sentiment_std=(score_col, 'std'), sentiment_count=(score_col, 'count'), sentiment_positive_ratio=(score_col, lambda x: (x > 0).mean()), ).reset_index() daily['sentiment_std'] = daily['sentiment_std'].fillna(0) # Momentum of sentiment daily['sentiment_momentum_3'] = daily['sentiment_mean'].rolling(3).mean() daily['sentiment_momentum_7'] = daily['sentiment_mean'].rolling(7).mean() # Sentiment reversal signal daily['sentiment_reversal'] = daily['sentiment_mean'] - daily['sentiment_momentum_7'] return daily