"""Feature engineering for TFT stock price prediction.""" import numpy as np import pandas as pd from typing import Optional, Tuple SEQUENCE_LEN = 60 # lookback window (TFT benefits from longer context) FORECAST_HORIZON = 30 # max multi-horizon target days FEATURE_COLS = [ "close_norm", "volume_norm", "rsi", "macd_norm", "bb_width", "day_sin", "day_cos", "atr_norm", "obv_norm", "month_sin", "month_cos", ] N_FEATURES = len(FEATURE_COLS) def _rsi(prices: np.ndarray, period: int = 14) -> np.ndarray: delta = np.diff(prices, prepend=prices[0]) gain = np.where(delta > 0, delta, 0.0) loss = np.where(delta < 0, -delta, 0.0) avg_gain = pd.Series(gain).ewm(alpha=1 / period, adjust=False).mean().values avg_loss = pd.Series(loss).ewm(alpha=1 / period, adjust=False).mean().values rs = np.where(avg_loss == 0, 100.0, avg_gain / (avg_loss + 1e-9)) return np.clip(rs / (1 + rs), 0, 1) # normalised 0-1 def _macd(prices: np.ndarray) -> np.ndarray: s = pd.Series(prices) macd_line = s.ewm(span=12, adjust=False).mean() - s.ewm(span=26, adjust=False).mean() signal = macd_line.ewm(span=9, adjust=False).mean() return (macd_line - signal).values def _bollinger_width(prices: np.ndarray, period: int = 20) -> np.ndarray: s = pd.Series(prices) sma = s.rolling(period, min_periods=1).mean() std = s.rolling(period, min_periods=1).std(ddof=0).fillna(0) mid = sma.where(sma != 0, 1) return (2 * std / mid).fillna(0).values def _atr(highs: np.ndarray, lows: np.ndarray, closes: np.ndarray, period: int = 14) -> np.ndarray: """Average True Range — when we only have closes, approximate with price range proxy.""" # If all arrays are the same (closes only), estimate via rolling std prev_close = np.roll(closes, 1) prev_close[0] = closes[0] tr = np.maximum( highs - lows, np.maximum(np.abs(highs - prev_close), np.abs(lows - prev_close)), ) return pd.Series(tr).ewm(alpha=1 / period, adjust=False).mean().values def _obv(closes: np.ndarray, volumes: np.ndarray) -> np.ndarray: """On-Balance Volume.""" direction = np.sign(np.diff(closes, prepend=closes[0])) return np.cumsum(direction * volumes) def build_features( closes: np.ndarray, volumes: np.ndarray, timestamps: np.ndarray, # unix seconds highs: Optional[np.ndarray] = None, lows: Optional[np.ndarray] = None, ) -> np.ndarray: """Return (T, N_FEATURES) feature matrix, normalised.""" # Default highs/lows to closes when not available if highs is None: highs = closes if lows is None: lows = closes # ── Price normalisation: rolling 30-day z-score ── s_close = pd.Series(closes) roll_mean = s_close.rolling(30, min_periods=1).mean().values roll_std = s_close.rolling(30, min_periods=1).std(ddof=0).fillna(1).values roll_std = np.where(roll_std == 0, 1, roll_std) close_norm = (closes - roll_mean) / roll_std # ── Volume normalisation ── s_vol = pd.Series(volumes.astype(float)) v_mean = s_vol.rolling(30, min_periods=1).mean().values v_std = s_vol.rolling(30, min_periods=1).std(ddof=0).fillna(1).values v_std = np.where(v_std == 0, 1, v_std) volume_norm = (volumes - v_mean) / v_std rsi = _rsi(closes) macd_raw = _macd(closes) macd_std = np.std(macd_raw) or 1 macd_norm = macd_raw / macd_std bb_width = _bollinger_width(closes) # ── Cyclical day-of-week encoding ── dt_index = pd.to_datetime(timestamps, unit="s") days = dt_index.dayofweek.values.astype(float) day_sin = np.sin(2 * np.pi * days / 5) day_cos = np.cos(2 * np.pi * days / 5) # ── ATR (normalised by price) ── atr_raw = _atr(highs, lows, closes) atr_norm = atr_raw / (closes + 1e-9) # ── OBV (normalised) ── obv_raw = _obv(closes, volumes) obv_std = np.std(obv_raw) or 1 obv_norm = (obv_raw - np.mean(obv_raw)) / obv_std # ── Cyclical month encoding ── months = dt_index.month.values.astype(float) month_sin = np.sin(2 * np.pi * months / 12) month_cos = np.cos(2 * np.pi * months / 12) features = np.stack( [close_norm, volume_norm, rsi, macd_norm, bb_width, day_sin, day_cos, atr_norm, obv_norm, month_sin, month_cos], axis=1, ) return features.astype(np.float32) def make_sequences( features: np.ndarray, targets: np.ndarray, seq_len: int = SEQUENCE_LEN, ) -> Tuple[np.ndarray, np.ndarray]: """ Slide windows over features to produce (X, y) training pairs. targets shape: (T, FORECAST_HORIZON) for multi-horizon training Returns X: (N, seq_len, N_FEATURES), y: (N, FORECAST_HORIZON) """ X, y = [], [] max_i = len(features) - seq_len - targets.shape[1] + 1 for i in range(max_i): X.append(features[i : i + seq_len]) y.append(targets[i + seq_len]) if not X: return np.empty((0, seq_len, features.shape[1]), dtype=np.float32), np.empty((0, targets.shape[1]), dtype=np.float32) return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)