stockpro-ml / app /services /feature_engineer.py
will702's picture
fix: move Optional import to top of feature_engineer.py
65a23c3
"""Feature engineering for TFT stock price prediction."""
import numpy as np
import pandas as pd
from typing import Optional, Tuple
# Length of the lookback window per sample (TFT benefits from longer context).
SEQUENCE_LEN = 60
# Largest number of days covered by a multi-horizon target.
FORECAST_HORIZON = 30

# Feature names, in the column order of the matrix built by build_features().
FEATURE_COLS = [
    # price and volume
    "close_norm",
    "volume_norm",
    # momentum / volatility indicators
    "rsi",
    "macd_norm",
    "bb_width",
    # cyclical day-of-week encoding
    "day_sin",
    "day_cos",
    # range and volume-flow indicators
    "atr_norm",
    "obv_norm",
    # cyclical month encoding
    "month_sin",
    "month_cos",
]
N_FEATURES = len(FEATURE_COLS)
def _rsi(prices: np.ndarray, period: int = 14) -> np.ndarray:
    """Return the Relative Strength Index of ``prices`` scaled to [0, 1].

    Gains and losses are smoothed with an exponential moving average using
    ``alpha = 1 / period`` and ``adjust=False``.

    Args:
        prices: 1-D sequence of prices.
        period: Smoothing period; must be non-zero.

    Returns:
        Array with one value per input price. Bars where both the smoothed
        gain and the smoothed loss are zero (no price movement so far, which
        always includes the first bar) report the neutral value 0.5. An
        empty input yields an empty array.
    """
    prices = np.asarray(prices, dtype=float)
    if prices.size == 0:
        return np.empty(0, dtype=float)

    # prepend=prices[0] makes the first delta 0 so the output keeps the
    # same length as the input.
    delta = np.diff(prices, prepend=prices[0])
    gain = np.where(delta > 0, delta, 0.0)
    loss = np.where(delta < 0, -delta, 0.0)

    alpha = 1 / period
    avg_gain = pd.Series(gain).ewm(alpha=alpha, adjust=False).mean().values
    avg_loss = pd.Series(loss).ewm(alpha=alpha, adjust=False).mean().values

    # With no losses at all the ratio is undefined; cap it at 100.
    rs = np.where(avg_loss == 0, 100.0, avg_gain / (avg_loss + 1e-9))
    rsi = np.clip(rs / (1 + rs), 0, 1)  # normalised 0-1

    # No gains and no losses means no information: report neutral instead
    # of letting the cap above turn a flat series into "overbought".
    no_movement = (avg_gain == 0) & (avg_loss == 0)
    return np.where(no_movement, 0.5, rsi)
def _macd(prices: np.ndarray) -> np.ndarray:
    """Return the MACD histogram (MACD line minus its signal line).

    The MACD line is the 12-span EMA minus the 26-span EMA of ``prices``;
    the signal line is the 9-span EMA of the MACD line. All EMAs use
    ``adjust=False``. The result has one value per input price.
    """

    def _ema(series: pd.Series, span: int) -> pd.Series:
        return series.ewm(span=span, adjust=False).mean()

    close = pd.Series(prices)
    fast_minus_slow = _ema(close, 12) - _ema(close, 26)
    histogram = fast_minus_slow - _ema(fast_minus_slow, 9)
    return histogram.values
def _bollinger_width(prices: np.ndarray, period: int = 20) -> np.ndarray:
    """Return the relative Bollinger band width, ``2 * std / mean``.

    Mean and population standard deviation (``ddof=0``) are taken over a
    rolling window of ``period`` values; shorter windows are used at the
    start of the series (``min_periods=1``). A rolling mean of exactly 0 is
    replaced by 1 to avoid division by zero, and any NaN in the result
    becomes 0.
    """
    window = pd.Series(prices).rolling(period, min_periods=1)
    centre = window.mean()
    spread = window.std(ddof=0).fillna(0)
    # Keep the denominator non-zero.
    denominator = centre.where(centre != 0, 1)
    width = 2 * spread / denominator
    return width.fillna(0).values
def _atr(highs: np.ndarray, lows: np.ndarray, closes: np.ndarray, period: int = 14) -> np.ndarray:
    """Return the Average True Range of a price series.

    The true range of a bar is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``. It is then
    smoothed with an exponential moving average using ``alpha = 1 / period``
    and ``adjust=False``.

    When ``highs`` and ``lows`` are both the close series (no intraday range
    available), the true range reduces to the absolute close-to-close change.

    Args:
        highs: 1-D sequence of bar highs.
        lows: 1-D sequence of bar lows.
        closes: 1-D sequence of bar closes.
        period: Smoothing period; must be non-zero.

    Returns:
        Array with one value per bar; empty when the inputs are empty.
    """
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)

    # np.roll returns a new array, so the caller's closes are not modified.
    prev_close = np.roll(closes, 1)
    # The first bar has no predecessor: use its own close. Slice assignment
    # (rather than index 0) keeps this a no-op for empty input.
    prev_close[:1] = closes[:1]

    tr = np.maximum(
        highs - lows,
        np.maximum(np.abs(highs - prev_close), np.abs(lows - prev_close)),
    )
    return pd.Series(tr).ewm(alpha=1 / period, adjust=False).mean().values
def _obv(closes: np.ndarray, volumes: np.ndarray) -> np.ndarray:
    """Return On-Balance Volume: the running sum of signed volume.

    Each bar's volume is added when the close rose against the previous bar,
    subtracted when it fell, and ignored when it was unchanged. The first
    bar is compared with itself and therefore contributes 0.
    """
    close_change = np.diff(closes, prepend=closes[0])
    signed_volume = np.sign(close_change) * volumes
    return np.cumsum(signed_volume)
def build_features(
    closes: np.ndarray,
    volumes: np.ndarray,
    timestamps: np.ndarray,  # unix seconds
    highs: Optional[np.ndarray] = None,
    lows: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Build the normalised feature matrix for a single price series.

    Args:
        closes: Closing prices, one per bar.
        volumes: Traded volumes, one per bar.
        timestamps: Bar timestamps as seconds since the Unix epoch.
        highs: Bar highs; the closes are used when omitted.
        lows: Bar lows; the closes are used when omitted.

    Returns:
        ``float32`` array of shape ``(T, N_FEATURES)`` whose columns follow
        the order of ``FEATURE_COLS``.
    """
    # Without intraday extremes, fall back to the close for both.
    highs = closes if highs is None else highs
    lows = closes if lows is None else lows

    def _zscore_30(raw, series):
        """Z-score ``raw`` against the trailing 30-bar mean/std of ``series``."""
        window = series.rolling(30, min_periods=1)
        centre = window.mean().values
        scale = window.std(ddof=0).fillna(1).values
        # A constant window has zero spread; divide by 1 instead.
        scale = np.where(scale == 0, 1, scale)
        return (raw - centre) / scale

    # -- Price and volume: rolling 30-bar z-scores --
    close_norm = _zscore_30(closes, pd.Series(closes))
    volume_norm = _zscore_30(volumes, pd.Series(volumes.astype(float)))

    # -- Momentum / volatility indicators --
    rsi = _rsi(closes)

    # NOTE(review): MACD here (and OBV below) is scaled with statistics of
    # the whole series, so early rows depend on later data -- confirm this
    # is intended for training.
    macd_hist = _macd(closes)
    macd_scale = np.std(macd_hist) or 1  # guard against a zero std
    macd_norm = macd_hist / macd_scale

    bb_width = _bollinger_width(closes)

    # -- Calendar features, encoded on the unit circle --
    when = pd.to_datetime(timestamps, unit="s")

    # dayofweek runs 0 (Monday) .. 6 (Sunday) but the period is 5, so
    # Monday-Friday are evenly spaced and weekend stamps would alias onto
    # Monday/Tuesday -- presumably only weekday bars are expected.
    weekday_angle = 2 * np.pi * when.dayofweek.values.astype(float) / 5
    day_sin = np.sin(weekday_angle)
    day_cos = np.cos(weekday_angle)

    month_angle = 2 * np.pi * when.month.values.astype(float) / 12
    month_sin = np.sin(month_angle)
    month_cos = np.cos(month_angle)

    # -- ATR, expressed relative to the price level --
    atr_norm = _atr(highs, lows, closes) / (closes + 1e-9)

    # -- OBV, standardised over the whole series --
    obv_series = _obv(closes, volumes)
    obv_scale = np.std(obv_series) or 1  # guard against a zero std
    obv_norm = (obv_series - np.mean(obv_series)) / obv_scale

    columns = [
        close_norm,
        volume_norm,
        rsi,
        macd_norm,
        bb_width,
        day_sin,
        day_cos,
        atr_norm,
        obv_norm,
        month_sin,
        month_cos,
    ]
    return np.stack(columns, axis=1).astype(np.float32)
def make_sequences(
    features: np.ndarray,
    targets: np.ndarray,
    seq_len: int = SEQUENCE_LEN,
) -> Tuple[np.ndarray, np.ndarray]:
    """Cut a feature matrix into overlapping training windows.

    Window ``i`` covers ``features[i : i + seq_len]`` and is paired with the
    target row ``targets[i + seq_len]``. The number of windows is
    ``len(features) - seq_len - targets.shape[1] + 1``, so rows near the end
    of the series are left out.

    Args:
        features: Feature matrix of shape ``(T, N_FEATURES)``.
        targets: Multi-horizon targets of shape ``(T, FORECAST_HORIZON)``.
        seq_len: Number of rows per input window.

    Returns:
        ``(X, y)`` as ``float32`` arrays with ``X`` of shape
        ``(N, seq_len, N_FEATURES)`` and ``y`` of shape
        ``(N, FORECAST_HORIZON)``. Both have ``N == 0`` when the series is
        too short to yield a window.
    """
    horizon = targets.shape[1]
    n_windows = len(features) - seq_len - horizon + 1

    # Too little data: return correctly shaped empty arrays.
    if n_windows <= 0:
        empty_x = np.empty((0, seq_len, features.shape[1]), dtype=np.float32)
        empty_y = np.empty((0, horizon), dtype=np.float32)
        return empty_x, empty_y

    starts = range(n_windows)
    windows = [features[start : start + seq_len] for start in starts]
    labels = [targets[start + seq_len] for start in starts]
    return np.array(windows, dtype=np.float32), np.array(labels, dtype=np.float32)