# rl-trading-agent / preprocess.py
# Uploaded by nirmanpatel ("Upload 49 files", commit 05b2d96, verified; 10.5 kB).
import os
from collections.abc import Sequence

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
# ── Configuration ──────────────────────────────────────────────
# Symbols to preprocess; each one is read from <RAW_DIR>/<TICKER>.csv.
TICKERS = "AAPL MSFT GOOGL AMZN NVDA".split()

# Input / output folders are resolved relative to this script's location.
_SCRIPT_DIR = os.path.dirname(__file__)
RAW_DIR = os.path.join(_SCRIPT_DIR, "raw")
PROCESSED_DIR = os.path.join(_SCRIPT_DIR, "processed")

# Chronological split fractions. The test split is not configured
# separately: it receives every row left after train + val (here 15%).
TRAIN_RATIO = 0.70
VAL_RATIO = 0.15

# Create the output folder at import time so later writes never fail on it.
os.makedirs(PROCESSED_DIR, exist_ok=True)
def load_and_validate(ticker: str) -> pd.DataFrame:
    """Read ``<RAW_DIR>/<ticker>.csv``, sort it by date, check it and print a summary.

    The CSV must have a ``date`` column, which becomes the (parsed) index.
    Besides printing shape, date range and per-column NaN counts, the frame
    is checked for the columns the feature builders in this module read
    (``high``, ``low``, ``close``, ``volume``).

    Args:
        ticker: Symbol whose raw CSV is loaded.

    Returns:
        The loaded frame in chronological order. NaNs are reported but not
        filled here.

    Raises:
        ValueError: If the CSV has no data rows or lacks a required column.
    """
    path = os.path.join(RAW_DIR, f"{ticker}.csv")
    df = pd.read_csv(path, index_col="date", parse_dates=True)
    df.sort_index(inplace=True)  # ensure chronological order

    # Fail here with a clear message rather than an IndexError on the
    # date-range print below or a KeyError inside a feature builder later.
    if df.empty:
        raise ValueError(f"{path} contains no data rows")
    required = {"high", "low", "close", "volume"}
    missing_cols = sorted(required.difference(df.columns))
    if missing_cols:
        raise ValueError(f"{path} is missing required column(s): {missing_cols}")

    print(f"\n── {ticker} ──────────────────────────────")
    print(f" Shape : {df.shape}")
    print(f" Date range : {df.index[0].date()} → {df.index[-1].date()}")
    print(f" Missing vals:\n{df.isnull().sum().to_string()}")
    return df
def handle_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Fill gaps in *df* and guarantee that no NaN survives.

    Each gap is first filled with the most recent earlier value (forward
    fill, e.g. market holidays). Gaps at the very start of a column have no
    earlier value, so a backward fill from the first later value covers them.

    Returns:
        A new, fully populated frame.

    Raises:
        ValueError: If NaNs remain after both passes, which only happens for
            a column with no values at all.
    """
    filled = df.ffill().bfill()

    nan_per_column = filled.isnull().sum()
    if nan_per_column.sum() > 0:
        raise ValueError(f"Unfillable NaNs remain: {nan_per_column}")
    return filled
# ── Trend indicators ───────────────────────────────────────────
def add_sma(df: pd.DataFrame, windows: Sequence[int] = (20, 50)) -> pd.DataFrame:
    """Add simple moving averages of ``close`` as ``sma_<w>`` columns.

    Each column is the mean of the last ``w`` closes. The first ``w - 1``
    rows are NaN because the window is not yet full.

    Args:
        df: Frame with a ``close`` column; modified in place.
        windows: Window lengths, one output column per entry. The default is
            an immutable tuple so no mutable object is shared between calls.
            Lists are still accepted.

    Returns:
        The same ``df`` object, for chaining.
    """
    for w in windows:
        df[f"sma_{w}"] = df["close"].rolling(window=w).mean()
    return df
def add_ema(df: pd.DataFrame, spans: Sequence[int] = (12, 26)) -> pd.DataFrame:
    """Add exponential moving averages of ``close`` as ``ema_<s>`` columns.

    Uses the recursive form (``adjust=False``) with smoothing factor
    ``alpha = 2 / (s + 1)``. The first row equals the first close.

    Args:
        df: Frame with a ``close`` column; modified in place.
        spans: EMA spans, one output column per entry. The default is an
            immutable tuple so no mutable object is shared between calls.
            Lists are still accepted.

    Returns:
        The same ``df`` object, for chaining.
    """
    for s in spans:
        df[f"ema_{s}"] = df["close"].ewm(span=s, adjust=False).mean()
    return df
# ── Momentum indicator ─────────────────────────────────────────
def add_rsi(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """Add the Relative Strength Index of ``close`` as column ``rsi_14``.

    Upward and downward bar-to-bar moves are averaged separately with an
    exponentially weighted mean (``com = period - 1``, i.e. alpha = 1/period,
    and ``min_periods = period``). They are then combined as
    ``RSI = 100 - 100 / (1 + avg_up / avg_down)``, so a stretch with no down
    moves yields 100.

    The column is always named ``rsi_14``, even when *period* differs.
    *df* is modified in place and returned.
    """
    moves = df["close"].diff()
    smoothing = {"com": period - 1, "min_periods": period}

    avg_up = moves.clip(lower=0).ewm(**smoothing).mean()
    # Negate the clipped moves so losses become non-negative magnitudes.
    avg_down = (-moves.clip(upper=0)).ewm(**smoothing).mean()

    df["rsi_14"] = 100 - 100 / (1 + avg_up / avg_down)
    return df
# ── Trend + momentum hybrid ────────────────────────────────────
def add_macd(
    df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9
) -> pd.DataFrame:
    """Add MACD line, signal line and histogram columns.

    ``macd_line`` is the fast EMA of ``close`` minus the slow EMA,
    ``macd_signal`` is an EMA of that line, and ``macd_hist`` is their
    difference. All EMAs use ``adjust=False``. *df* is modified in place
    and returned.
    """
    def _ema(series: pd.Series, span: int) -> pd.Series:
        return series.ewm(span=span, adjust=False).mean()

    close = df["close"]
    line = _ema(close, fast) - _ema(close, slow)
    trigger = _ema(line, signal)

    df["macd_line"] = line
    df["macd_signal"] = trigger
    df["macd_hist"] = line - trigger
    return df
# ── Volatility indicator ───────────────────────────────────────
def add_bollinger_bands(
    df: pd.DataFrame, window: int = 20, num_std: float = 2.0
) -> pd.DataFrame:
    """Add Bollinger Band columns around a rolling mean of ``close``.

    ``bb_upper`` / ``bb_lower`` are the rolling mean plus/minus ``num_std``
    rolling sample standard deviations. ``bb_width`` is the band width
    divided by the rolling mean, which makes it comparable across price
    levels. *df* is modified in place and returned.
    """
    rolling_close = df["close"].rolling(window=window)
    middle = rolling_close.mean()
    offset = num_std * rolling_close.std()

    df["bb_upper"] = middle + offset
    df["bb_lower"] = middle - offset
    df["bb_width"] = (df["bb_upper"] - df["bb_lower"]) / middle  # normalized width
    return df
# ── Volume indicator ───────────────────────────────────────────
def add_obv(df: pd.DataFrame) -> pd.DataFrame:
    """Add On-Balance Volume (``obv``): the running total of signed volume.

    A bar's volume is added when its close rises versus the previous bar,
    subtracted when it falls, and ignored when unchanged. The first bar has
    no predecessor and counts as unchanged. *df* is modified in place and
    returned.
    """
    price_moves = df["close"].diff()
    signed_volume = np.sign(price_moves).fillna(0) * df["volume"]
    df["obv"] = signed_volume.cumsum()
    return df
# ── Return features ────────────────────────────────────────────
def add_returns(df: pd.DataFrame) -> pd.DataFrame:
    """Add one-bar simple (``daily_return``) and log (``log_return``) returns.

    Both are NaN on the first row. *df* is modified in place and returned.
    """
    close = df["close"]
    previous_close = close.shift(1)
    df["daily_return"] = close.pct_change()
    df["log_return"] = np.log(close / previous_close)
    return df
# ── Trend strength indicator ───────────────────────────────────
def add_adx(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """Average Directional Index: measures trend strength (0–1 after normalisation).
    ADX > 0.25 (raw > 25) indicates a strong trend regardless of direction.

    Reads the ``high``, ``low`` and ``close`` columns and adds one column,
    ``adx_14`` (the name is fixed even when *period* differs). It holds
    ADX / 100 clipped to [0, 1]. The frame is modified in place and returned.
    Results are written positionally (via ``.values``), so they follow the
    frame's current row order.

    NOTE(review): every smoothing step uses ``ewm(span=period, adjust=False)``
    (alpha = 2 / (period + 1)). +DI / -DI are also formed by smoothing
    DM / ATR, rather than by dividing smoothed DM by ATR. Textbook (Wilder) ADX
    is usually described with alpha = 1 / period and smoothed-DM / ATR, so
    values may differ from other libraries. Confirm this variant is
    intended before changing it: the scaler in this module is fitted on
    this column.
    NOTE(review): needs at least one row. For an empty frame the length-1
    ``adx`` series below does not match ``df`` and the assignment fails.
    """
    # Plain numpy arrays. Every derived array below has length n - 1, and
    # entry i compares bar i + 1 with bar i.
    high = df["high"].values
    low = df["low"].values
    close = df["close"].values
    # True Range: max(high - low, |high - prev close|, |low - prev close|).
    tr = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - close[:-1]), np.abs(low[1:] - close[:-1])),
    )
    # +DM: the up-move, kept only when it exceeds the down-move (and is > 0).
    dmp = np.where(
        high[1:] - high[:-1] > low[:-1] - low[1:],
        np.maximum(high[1:] - high[:-1], 0),
        0,
    )
    # -DM: the down-move, kept only when it exceeds the up-move (and is > 0).
    dmm = np.where(
        low[:-1] - low[1:] > high[1:] - high[:-1], np.maximum(low[:-1] - low[1:], 0), 0
    )
    # Exponentially smoothed True Range (ATR-like).
    atr = pd.Series(tr).ewm(span=period, adjust=False).mean().values
    # Directional indicators as fractions (no x100 factor). A non-positive ATR
    # is replaced by 1e-8 to avoid dividing by zero.
    pdmi = (
        pd.Series(dmp / np.where(atr > 0, atr, 1e-8))
        .ewm(span=period, adjust=False)
        .mean()
        .values
    )
    mdmi = (
        pd.Series(dmm / np.where(atr > 0, atr, 1e-8))
        .ewm(span=period, adjust=False)
        .mean()
        .values
    )
    # DX = 100 * |+DI - -DI| / (+DI + -DI). The missing x100 on DI cancels in
    # this ratio; the 1e-8 floor guards against a zero denominator.
    denom = np.where(pdmi + mdmi > 0, pdmi + mdmi, 1e-8)
    dx = 100 * np.abs(pdmi - mdmi) / denom
    # Prepend NaN so the series has length n again (bar 0 has no
    # predecessor), then smooth DX into ADX.
    adx = (
        pd.Series(np.concatenate([[np.nan], dx])).ewm(span=period, adjust=False).mean()
    )
    df["adx_14"] = (adx.values / 100).clip(0.0, 1.0) # normalise to [0, 1]
    return df
# ── Long-term momentum ─────────────────────────────────────────
def add_long_momentum(df: pd.DataFrame) -> pd.DataFrame:
    """Add ``sma_ratio``: the 100-bar SMA of ``close`` over its 200-bar SMA.

    A ratio above 1 means the medium-term average sits above the long-term
    one (bullish). The ratio is clipped to [0.8, 1.2] and mapped linearly
    onto [0, 1], so the scaler always sees a bounded feature. The first 199
    rows are NaN. *df* is modified in place and returned.
    """
    closes = df["close"]
    medium = closes.rolling(100).mean()
    long_term = closes.rolling(200).mean()

    bounded = (medium / long_term).clip(0.8, 1.2)
    # Map [0.8, 1.2] → [0, 1].
    df["sma_ratio"] = (bounded - 0.8) / 0.4
    return df
# ── Rate of Change ─────────────────────────────────────────────
def add_roc(df: pd.DataFrame, period: int = 20) -> pd.DataFrame:
    """Add ``roc_20``: the fractional change in ``close`` over *period* bars.

    Row t holds ``close[t] / close[t - period] - 1``, and the first *period*
    rows are NaN. The column keeps the name ``roc_20`` for any *period*.
    *df* is modified in place and returned.
    """
    closes = df["close"]
    rate_of_change = closes.pct_change(periods=period)
    df["roc_20"] = rate_of_change
    return df
# ── Realised volatility ────────────────────────────────────────
def add_realised_vol(df: pd.DataFrame, window: int = 20) -> pd.DataFrame:
    """Add ``realised_vol_20``: annualised rolling volatility of log returns.

    The rolling sample standard deviation of one-bar log returns is
    annualised with sqrt(252), divided by 0.80 (an assumed ceiling of 80%
    annual volatility) and clipped to [0, 1]. The MinMaxScaler therefore
    sees a bounded, stable range. Storing the column here keeps the
    environment's regime observation consistent with what the scaler was
    fitted on. *df* is modified in place and returned.
    """
    close = df["close"]
    log_returns = np.log(close / close.shift(1))
    annualised = log_returns.rolling(window).std() * np.sqrt(252)
    df["realised_vol_20"] = (annualised / 0.80).clip(0.0, 1.0)
    return df
def drop_warmup_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Remove, in place, every row containing a NaN and report how many went.

    Rolling indicators leave NaNs at the start of the series, and these rows
    go. Note that any other row with a NaN in any column is dropped as well.
    The same (mutated) frame is returned.
    """
    rows_before = df.shape[0]
    df.dropna(inplace=True)
    rows_dropped = rows_before - df.shape[0]
    print(f" Dropped {rows_dropped} warmup rows (largest window = SMA-200)")
    return df
def _date_span(frame: pd.DataFrame) -> str:
    """Format the first and last index dates of *frame*, or a placeholder if empty."""
    if frame.empty:
        return "(empty)"
    return f"{frame.index[0].date()} → {frame.index[-1].date()}"


def split_and_save(df: pd.DataFrame, ticker: str) -> dict:
    """Split *df* chronologically into train/val/test and write each to CSV.

    Boundaries are ``int(n * TRAIN_RATIO)`` and
    ``int(n * (TRAIN_RATIO + VAL_RATIO))``. The test split receives whatever
    remains. Each split's size and date range is printed; an empty split is
    reported instead of raising IndexError. Files go to
    ``PROCESSED_DIR/<ticker>_<split>.csv``.

    NOTE(review): in the visible code ``process_ticker`` does not call this
    function. It writes the same file names, but with *unscaled* values, so
    calling both would overwrite one set of outputs with the other.

    Args:
        df: Time-indexed frame, assumed already in chronological order.
        ticker: Symbol used to build the output file names.

    Returns:
        ``{"train": ..., "val": ..., "test": ...}``, each an independent copy.
    """
    n = len(df)
    t_end = int(n * TRAIN_RATIO)
    v_end = int(n * (TRAIN_RATIO + VAL_RATIO))
    splits = {
        "train": df.iloc[:t_end].copy(),
        "val": df.iloc[t_end:v_end].copy(),
        "test": df.iloc[v_end:].copy(),
    }

    labels = {"train": "Train", "val": "Val", "test": "Test"}
    for split_name, split_df in splits.items():
        print(f" {labels[split_name]} : {len(split_df)} rows | {_date_span(split_df)}")

    for split_name, split_df in splits.items():
        path = os.path.join(PROCESSED_DIR, f"{ticker}_{split_name}.csv")
        split_df.to_csv(path)
    return splits
def _engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Apply every feature builder in a fixed order, then drop NaN warm-up rows."""
    builders = (
        add_sma,
        add_ema,
        add_rsi,
        add_macd,
        add_bollinger_bands,
        add_obv,
        add_returns,
        add_adx,
        add_long_momentum,
        add_roc,
        add_realised_vol,
    )
    for build in builders:
        df = build(df)
    return drop_warmup_rows(df)


def _chronological_split(df: pd.DataFrame) -> dict:
    """Cut *df* into contiguous train/val/test copies using the module ratios."""
    n = len(df)
    t_end = int(n * TRAIN_RATIO)
    v_end = int(n * (TRAIN_RATIO + VAL_RATIO))
    return {
        "train": df.iloc[:t_end].copy(),
        "val": df.iloc[t_end:v_end].copy(),
        "test": df.iloc[v_end:].copy(),
    }


def process_ticker(ticker: str) -> dict:
    """Run the full preprocessing pipeline for one ticker and save the results.

    Steps: load the raw CSV, fill missing values, engineer features, split
    chronologically, fit a MinMaxScaler on the train split only (so no
    information leaks from val/test), and scale all three splits with it. It
    then writes ``<ticker>_scaler.pkl`` and ``<ticker>_<split>.csv`` into
    ``PROCESSED_DIR``.

    Args:
        ticker: Symbol to process.

    Returns:
        ``{"train": ..., "val": ..., "test": ...}`` holding the scaled frames.

    Raises:
        ValueError: If too few rows survive feature warm-up to populate every
            split.
    """
    df = load_and_validate(ticker)
    df = handle_missing(df)
    df = _engineer_features(df)

    # Split first, then fit the scaler ONLY on train to prevent leakage.
    splits = _chronological_split(df)
    empty = [name for name, part in splits.items() if part.empty]
    if empty:
        # Without this check sklearn fails inside fit/transform with an
        # opaque "0 sample(s)" error.
        raise ValueError(
            f"{ticker}: only {len(df)} rows remain after feature warm-up; "
            f"not enough to populate the {', '.join(empty)} split(s)"
        )

    feature_cols = list(df.columns)
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(splits["train"][feature_cols])
    for part in splits.values():
        part[feature_cols] = scaler.transform(part[feature_cols])

    joblib.dump(scaler, os.path.join(PROCESSED_DIR, f"{ticker}_scaler.pkl"))
    for name, part in splits.items():
        part.to_csv(os.path.join(PROCESSED_DIR, f"{ticker}_{name}.csv"))

    print(f" Features : {feature_cols}")
    print(f" Train rows : {len(splits['train'])}")
    print(f" Val rows : {len(splits['val'])}")
    print(f" Test rows : {len(splits['test'])}")
    return splits
if __name__ == "__main__":
    # Preprocess each configured ticker in order, keeping its scaled splits
    # keyed by symbol.
    all_splits = {ticker: process_ticker(ticker) for ticker in TICKERS}
    print("\nAll tickers processed.")