Spaces:
Sleeping
Sleeping
| """Feature engineering for TFT stock price prediction.""" | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Optional, Tuple | |
# Lookback window: number of past time steps per sample
# (TFT benefits from longer context).
SEQUENCE_LEN = 60

# Maximum number of days covered by the multi-horizon target.
FORECAST_HORIZON = 30

# Column order of the feature matrix; it mirrors the order in which
# build_features stacks its columns.
FEATURE_COLS = [
    # price / volume
    "close_norm",
    "volume_norm",
    # momentum and volatility indicators
    "rsi",
    "macd_norm",
    "bb_width",
    # cyclical day-of-week encoding
    "day_sin",
    "day_cos",
    # range and volume-flow indicators
    "atr_norm",
    "obv_norm",
    # cyclical month encoding
    "month_sin",
    "month_cos",
]

# Width of the feature matrix.
N_FEATURES = len(FEATURE_COLS)
| def _rsi(prices: np.ndarray, period: int = 14) -> np.ndarray: | |
| delta = np.diff(prices, prepend=prices[0]) | |
| gain = np.where(delta > 0, delta, 0.0) | |
| loss = np.where(delta < 0, -delta, 0.0) | |
| avg_gain = pd.Series(gain).ewm(alpha=1 / period, adjust=False).mean().values | |
| avg_loss = pd.Series(loss).ewm(alpha=1 / period, adjust=False).mean().values | |
| rs = np.where(avg_loss == 0, 100.0, avg_gain / (avg_loss + 1e-9)) | |
| return np.clip(rs / (1 + rs), 0, 1) # normalised 0-1 | |
| def _macd(prices: np.ndarray) -> np.ndarray: | |
| s = pd.Series(prices) | |
| macd_line = s.ewm(span=12, adjust=False).mean() - s.ewm(span=26, adjust=False).mean() | |
| signal = macd_line.ewm(span=9, adjust=False).mean() | |
| return (macd_line - signal).values | |
| def _bollinger_width(prices: np.ndarray, period: int = 20) -> np.ndarray: | |
| s = pd.Series(prices) | |
| sma = s.rolling(period, min_periods=1).mean() | |
| std = s.rolling(period, min_periods=1).std(ddof=0).fillna(0) | |
| mid = sma.where(sma != 0, 1) | |
| return (2 * std / mid).fillna(0).values | |
| def _atr(highs: np.ndarray, lows: np.ndarray, closes: np.ndarray, period: int = 14) -> np.ndarray: | |
| """Average True Range β when we only have closes, approximate with price range proxy.""" | |
| # If all arrays are the same (closes only), estimate via rolling std | |
| prev_close = np.roll(closes, 1) | |
| prev_close[0] = closes[0] | |
| tr = np.maximum( | |
| highs - lows, | |
| np.maximum(np.abs(highs - prev_close), np.abs(lows - prev_close)), | |
| ) | |
| return pd.Series(tr).ewm(alpha=1 / period, adjust=False).mean().values | |
| def _obv(closes: np.ndarray, volumes: np.ndarray) -> np.ndarray: | |
| """On-Balance Volume.""" | |
| direction = np.sign(np.diff(closes, prepend=closes[0])) | |
| return np.cumsum(direction * volumes) | |
def _rolling_zscore(values: np.ndarray, window: int = 30) -> np.ndarray:
    """Return each value's z-score against its trailing ``window`` samples."""
    series = pd.Series(values)
    mean = series.rolling(window, min_periods=1).mean().values
    std = series.rolling(window, min_periods=1).std(ddof=0).fillna(1).values
    # A window with zero spread would divide by zero; fall back to 1.
    std = np.where(std == 0, 1, std)
    return (values - mean) / std


def build_features(
    closes: np.ndarray,
    volumes: np.ndarray,
    timestamps: np.ndarray,  # unix seconds
    highs: Optional[np.ndarray] = None,
    lows: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Return the normalised ``(T, N_FEATURES)`` feature matrix.

    Columns follow ``FEATURE_COLS``: rolling z-scores of close and volume, RSI,
    scaled MACD histogram, Bollinger width, cyclical day-of-week encoding,
    price-relative ATR, standardised OBV and cyclical month encoding.

    Args:
        closes: 1-D closing prices, length ``T``.
        volumes: 1-D traded volumes, length ``T``.
        timestamps: 1-D unix timestamps in seconds, length ``T``.
        highs: Optional 1-D highs; defaults to ``closes`` when omitted.
        lows: Optional 1-D lows; defaults to ``closes`` when omitted.

    Returns:
        ``float32`` array of shape ``(T, N_FEATURES)``; ``(0, N_FEATURES)`` for
        empty input.

    Raises:
        ValueError: If any input's shape differs from that of ``closes``.
    """
    closes = np.asarray(closes, dtype=float)
    volumes = np.asarray(volumes, dtype=float)
    timestamps = np.asarray(timestamps)
    # Default highs/lows to closes when not available.
    highs = closes if highs is None else np.asarray(highs, dtype=float)
    lows = closes if lows is None else np.asarray(lows, dtype=float)

    # Fail early with a clear message instead of a late broadcasting error
    # (or, worse, a silent broadcast of a length-1 input).
    for name, arr in (
        ("volumes", volumes),
        ("timestamps", timestamps),
        ("highs", highs),
        ("lows", lows),
    ):
        if arr.shape != closes.shape:
            raise ValueError(
                f"{name} has shape {arr.shape}, expected {closes.shape} to match closes"
            )

    if closes.size == 0:
        return np.empty((0, N_FEATURES), dtype=np.float32)

    # -- Price and volume normalisation: rolling 30-sample z-score --
    close_norm = _rolling_zscore(closes, 30)
    volume_norm = _rolling_zscore(volumes, 30)

    # -- Momentum / volatility indicators --
    rsi = _rsi(closes)
    macd_raw = _macd(closes)
    # Scaled by the standard deviation of the whole series (1 if that is 0),
    # so, unlike the rolling z-scores, every value depends on the full input.
    macd_std = np.std(macd_raw) or 1
    macd_norm = macd_raw / macd_std
    bb_width = _bollinger_width(closes)

    # -- Cyclical day-of-week encoding --
    # The period of 5 assumes Monday-Friday samples; a weekend timestamp
    # (dayofweek 5 or 6) would share its encoding with Monday or Tuesday.
    dt_index = pd.to_datetime(timestamps, unit="s")
    days = dt_index.dayofweek.values.astype(float)
    day_sin = np.sin(2 * np.pi * days / 5)
    day_cos = np.cos(2 * np.pi * days / 5)

    # -- ATR (normalised by price) --
    atr_raw = _atr(highs, lows, closes)
    atr_norm = atr_raw / (closes + 1e-9)

    # -- OBV (standardised over the whole series) --
    obv_raw = _obv(closes, volumes)
    obv_std = np.std(obv_raw) or 1
    obv_norm = (obv_raw - np.mean(obv_raw)) / obv_std

    # -- Cyclical month encoding --
    months = dt_index.month.values.astype(float)
    month_sin = np.sin(2 * np.pi * months / 12)
    month_cos = np.cos(2 * np.pi * months / 12)

    features = np.stack(
        [
            close_norm, volume_norm, rsi, macd_norm, bb_width,
            day_sin, day_cos, atr_norm, obv_norm, month_sin, month_cos,
        ],
        axis=1,
    )
    return features.astype(np.float32)
def make_sequences(
    features: np.ndarray,
    targets: np.ndarray,
    seq_len: int = SEQUENCE_LEN,
) -> Tuple[np.ndarray, np.ndarray]:
    """Cut ``features`` into sliding windows paired with multi-horizon targets.

    Window ``k`` is ``features[k : k + seq_len]`` and its label is the target
    row at index ``k + seq_len``, i.e. the row immediately after the window.
    The number of windows is
    ``len(features) - seq_len - targets.shape[1] + 1``.

    Args:
        features: Feature matrix of shape ``(T, N_FEATURES)``.
        targets: Target matrix of shape ``(T, FORECAST_HORIZON)``.
        seq_len: Number of time steps per window.

    Returns:
        ``(X, y)`` as ``float32`` arrays with shapes
        ``(N, seq_len, N_FEATURES)`` and ``(N, FORECAST_HORIZON)``.  When the
        input is too short for a single window, both arrays have ``N == 0``.
    """
    horizon = targets.shape[1]
    n_windows = len(features) - seq_len - horizon + 1

    # Too little data for even one window: hand back correctly shaped empties.
    if n_windows <= 0:
        empty_x = np.empty((0, seq_len, features.shape[1]), dtype=np.float32)
        empty_y = np.empty((0, horizon), dtype=np.float32)
        return empty_x, empty_y

    starts = range(n_windows)
    windows = [features[start : start + seq_len] for start in starts]
    labels = [targets[start + seq_len] for start in starts]
    return np.array(windows, dtype=np.float32), np.array(labels, dtype=np.float32)