mlStocks-pred / data_prep /features.py
AlgoX's picture
feature engneering file
69194ec
import pandas as pd
import numpy as np
def _check_price_cols(df):
required = ["Open", "High", "Low", "Close", "Volume"]
missing = [c for c in required if c not in df.columns]
if missing:
raise ValueError(f"Missing required columns: {missing}")
def _rma(series: pd.Series, n: int):
"""Wilder's moving average (RMA)."""
series = series.copy().astype(float)
out = pd.Series(np.nan, index=series.index)
if len(series) < n:
return out
out.iloc[n - 1] = series.iloc[:n].mean()
alpha = 1.0 / n
for i in range(n, len(series)):
out.iat[i] = out.iat[i - 1] * (1 - alpha) + series.iat[i] * alpha
return out
def _true_range(df):
tr1 = df["High"] - df["Low"]
tr2 = (df["High"] - df["Close"].shift(1)).abs()
tr3 = (df["Low"] - df["Close"].shift(1)).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
return tr
def add_daily_return(df: pd.DataFrame) -> pd.DataFrame:
"""
Add Daily_Return and Log_Return
"""
df = df.copy()
_check_price_cols(df)
cols = ["Open", "High", "Low", "Close", "Volume"]
df[cols] = df[cols].apply(pd.to_numeric, errors="coerce")
df["Daily_Return"] = df["Close"].pct_change()
df["Log_Return"] = np.log(df["Close"] / df["Close"].shift(1))
return df
def add_trend_indicators(
df: pd.DataFrame,
sma_periods=(5, 10, 20, 50, 100, 200),
ema_periods=(5, 10, 12, 20, 26, 50, 100, 200),
) -> pd.DataFrame:
"""
Add SMA, EMA, MACD (12/26) and MACD signal/hist
"""
df = df.copy()
_check_price_cols(df)
for p in sma_periods:
df[f"SMA_{p}"] = df["Close"].rolling(window=p, min_periods=1).mean()
for p in ema_periods:
df[f"EMA_{p}"] = df["Close"].ewm(span=p, adjust=False).mean()
ema12 = df["Close"].ewm(span=12, adjust=False).mean()
ema26 = df["Close"].ewm(span=26, adjust=False).mean()
df["MACD"] = ema12 - ema26
df["MACD_Signal"] = df["MACD"].ewm(span=9, adjust=False).mean()
df["MACD_Hist"] = df["MACD"] - df["MACD_Signal"]
return df
def add_momentum_indicators(
df: pd.DataFrame, rsi_n=14, sto_k=14, sto_d=3, cci_n=20, roc_n=12
) -> pd.DataFrame:
"""
add common momentum indicators:
-RSI
-Stochastic %K/%D
-Williams %R
-CCI
-ROC
-Momentum
-CMO
-Ultimate Oscillator.
"""
df = df.copy()
_check_price_cols(df)
close = df["Close"]
# RSI (Wilder smoothing)
delta = close.diff()
up = delta.clip(lower=0)
down = -delta.clip(upper=0)
df[f"RSI_{rsi_n}"] = 100 - (100 / (1 + (_rma(up, rsi_n) / _rma(down, rsi_n))))
# Stochastic %K and %D
low_min = df["Low"].rolling(window=sto_k).min()
high_max = df["High"].rolling(window=sto_k).max()
df["Sto_%K"] = 100 * (close - low_min) / (high_max - low_min)
df["Sto_%D"] = df["Sto_%K"].rolling(window=sto_d).mean()
# Williams %R
df[f"Williams_%R_{sto_k}"] = -100 * (high_max - close) / (high_max - low_min)
# CCI
tp = (df["High"] + df["Low"] + df["Close"]) / 3.0
sma_tp = tp.rolling(cci_n).mean()
mad = tp.rolling(cci_n).apply(lambda x: np.mean(np.abs(x - np.mean(x))), raw=True)
df[f"CCI_{cci_n}"] = (tp - sma_tp) / (0.015 * mad)
# ROC and Momentum
df[f"ROC_{roc_n}"] = close.pct_change(periods=roc_n)
df[f"Momentum_{roc_n}"] = close - close.shift(roc_n)
# CMO (Chande Momentum Oscillator)
up_sum = delta.clip(lower=0).rolling(rsi_n).sum()
down_sum = -delta.clip(upper=0).rolling(rsi_n).sum()
df[f"CMO_{rsi_n}"] = 100 * (up_sum - down_sum) / (up_sum + down_sum)
# Ultimate Oscillator (7,14,28 default)
def _ultimate_osc(df_, s1=7, s2=14, s3=28):
bp = df_["Close"] - df_[["Low", "Close"]].shift(1).min(axis=1)
tr = _true_range(df_)
avg1 = bp.rolling(s1).sum() / tr.rolling(s1).sum()
avg2 = bp.rolling(s2).sum() / tr.rolling(s2).sum()
avg3 = bp.rolling(s3).sum() / tr.rolling(s3).sum()
return 100 * ((4 * avg1) + (2 * avg2) + (1 * avg3)) / (4 + 2 + 1)
df["Ultimate_Osc"] = _ultimate_osc(df)
return df
def add_volume_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
Add volume-based indicators:
= OBV
- CMF
- ADL
- VPT
- Volume Oscillator
- MFI
- Force Index.
"""
df = df.copy()
_check_price_cols(df)
# OBV
df["OBV"] = ((np.sign(df["Close"].diff()) * df["Volume"]).fillna(0)).cumsum()
# Money Flow Multiplier and Chaikin Money Flow
mf_mult = ((df["Close"] - df["Low"]) - (df["High"] - df["Close"])) / (
df["High"] - df["Low"]
)
mf_mult = mf_mult.replace([np.inf, -np.inf], 0).fillna(0)
mf_volume = mf_mult * df["Volume"]
df["CMF_20"] = mf_volume.rolling(20).sum() / df["Volume"].rolling(20).sum()
# ADL
df["ADL"] = mf_volume.cumsum()
# VPT
df["VPT"] = (df["Volume"] * df["Close"].pct_change()).fillna(0).cumsum()
# Volume Oscillator (short=10, long=20)
df["VO_short_10"] = df["Volume"].rolling(10).mean()
df["VO_long_20"] = df["Volume"].rolling(20).mean()
df["Volume_Osc"] = (df["VO_short_10"] - df["VO_long_20"]) / df["VO_long_20"]
# MFI (Money Flow Index)
tp = (df["High"] + df["Low"] + df["Close"]) / 3.0
mf = tp * df["Volume"]
pos_mf = mf.where(tp > tp.shift(1), 0)
neg_mf = mf.where(tp < tp.shift(1), 0)
df["MFI_14"] = 100 - (
100 / (1 + (pos_mf.rolling(14).sum() / neg_mf.rolling(14).sum()))
)
# Force Index
df["ForceIndex_1"] = df["Close"].diff(1) * df["Volume"]
df["ForceIndex_EMA_13"] = df["ForceIndex_1"].ewm(span=13, adjust=False).mean()
return df
def add_volatility_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
Add
- ATR
- Bollinger Bands
- Donchian channels
"""
df = df.copy()
_check_price_cols(df)
tr = _true_range(df)
df["TR"] = tr
df["ATR_14"] = tr.rolling(14).mean()
# Bollinger Bands (20,2)
n = 20
df["BB_MID_20"] = df["Close"].rolling(n).mean()
df["BB_STD_20"] = df["Close"].rolling(n).std()
df["BB_UPPER_20"] = df["BB_MID_20"] + 2 * df["BB_STD_20"]
df["BB_LOWER_20"] = df["BB_MID_20"] - 2 * df["BB_STD_20"]
df["BB_Width"] = (df["BB_UPPER_20"] - df["BB_LOWER_20"]) / df["BB_MID_20"]
# Donchian (20)
dc = 20
df["Donchian_High_20"] = df["High"].rolling(dc).max()
df["Donchian_Low_20"] = df["Low"].rolling(dc).min()
df["Donchian_Mid_20"] = (df["Donchian_High_20"] + df["Donchian_Low_20"]) / 2.0
return df
# made by chatgpt because I have no idea how formula for these even works
def add_hybrid_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
Add VWAP (cumulative), Heikin-Ashi candles, ADX, Aroon, Vortex.
Each is implemented in a focused manner.
"""
df = df.copy()
_check_price_cols(df)
# VWAP cumulative
tp = (df["High"] + df["Low"] + df["Close"]) / 3.0
cum_vp = (tp * df["Volume"]).cumsum()
cum_vol = df["Volume"].cumsum().replace(0, np.nan)
df["VWAP_cum"] = cum_vp / cum_vol
# Heikin-Ashi
ha_close = (df["Open"] + df["High"] + df["Low"] + df["Close"]) / 4.0
ha_open = ha_close.copy()
if len(ha_open) > 0:
ha_open.iloc[0] = (df["Open"].iloc[0] + df["Close"].iloc[0]) / 2.0
for i in range(1, len(ha_open)):
ha_open.iat[i] = (ha_open.iat[i - 1] + ha_close.iat[i - 1]) / 2.0
df["HA_Open"] = ha_open
df["HA_Close"] = ha_close
df["HA_High"] = df[["High", "HA_Open", "HA_Close"]].max(axis=1)
df["HA_Low"] = df[["Low", "HA_Open", "HA_Close"]].min(axis=1)
# ADX (using DI sums approach)
def _adx(df_, n=14):
up_move = df_["High"].diff()
down_move = -df_["Low"].diff()
plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
tr = _true_range(df_)
atr = tr.rolling(n).mean()
plus_dm_sm = pd.Series(plus_dm, index=df_.index).rolling(window=n).sum()
minus_dm_sm = pd.Series(minus_dm, index=df_.index).rolling(window=n).sum()
plus_di = 100 * (plus_dm_sm / atr)
minus_di = 100 * (minus_dm_sm / atr)
dx = (abs(plus_di - minus_di) / (plus_di + minus_di)) * 100
adx = dx.rolling(n).mean()
return plus_di, minus_di, adx
df["+DI_14"], df["-DI_14"], df["ADX_14"] = _adx(df, 14)
# Aroon (n=25)
def _aroon(df_, n=25):
# Aroon up/down in percentage. This implementation uses rolling apply.
def single_aroon_up(arr):
# arr is an array of highs in the window
idx = np.argmax(arr)
periods_since_high = (len(arr) - 1) - idx
return ((n - periods_since_high) / n) * 100.0
def single_aroon_down(arr):
idx = np.argmin(arr)
periods_since_low = (len(arr) - 1) - idx
return ((n - periods_since_low) / n) * 100.0
aroon_up = df_["High"].rolling(window=n).apply(single_aroon_up, raw=True)
aroon_down = df_["Low"].rolling(window=n).apply(single_aroon_down, raw=True)
return aroon_up, aroon_down
try:
df["Aroon_Up_25"], df["Aroon_Down_25"] = _aroon(df, 25)
except Exception:
df["Aroon_Up_25"] = np.nan
df["Aroon_Down_25"] = np.nan
# Vortex Indicator
def _vortex(df_, n=14):
tr = _true_range(df_)
trn = tr.rolling(n).sum()
vmp = (df_["High"] - df_["Low"].shift(1)).abs().rolling(n).sum()
vmm = (df_["Low"] - df_["High"].shift(1)).abs().rolling(n).sum()
vip = vmp / trn
vim = vmm / trn
return vip, vim
df["Vortex_Pos_14"], df["Vortex_Neg_14"] = _vortex(df, 14)
return df
def add_all_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
Convenience wrapper that runs all modular functions in a safe order.
"""
df = df.copy()
df = add_daily_return(df)
df = add_trend_indicators(df)
df = add_momentum_indicators(df)
df = add_volume_indicators(df)
df = add_volatility_indicators(df)
df = add_hybrid_indicators(df)
# cleanup infinities
df.replace([np.inf, -np.inf], np.nan, inplace=True)
return df