import os
from collections.abc import Sequence

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# ── Configuration ──────────────────────────────────────────────
TICKERS = ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"]
RAW_DIR = os.path.join(os.path.dirname(__file__), "raw")
PROCESSED_DIR = os.path.join(os.path.dirname(__file__), "processed")

TRAIN_RATIO = 0.70
VAL_RATIO = 0.15
# TEST_RATIO = 0.15 (implicit: whatever remains)

os.makedirs(PROCESSED_DIR, exist_ok=True)


def load_and_validate(ticker: str) -> pd.DataFrame:
    """Load ``RAW_DIR/<ticker>.csv`` indexed by its ``date`` column and print a summary.

    Rows are sorted chronologically before being returned.

    Raises:
        ValueError: if the CSV contains no data rows (the summary below would
            otherwise fail with an opaque IndexError on ``df.index[0]``).
    """
    path = os.path.join(RAW_DIR, f"{ticker}.csv")
    df = pd.read_csv(path, index_col="date", parse_dates=True)
    if df.empty:
        raise ValueError(f"{path} contains no rows")
    df.sort_index(inplace=True)  # ensure chronological order

    print(f"\n── {ticker} ──────────────────────────────")
    print(f" Shape : {df.shape}")
    print(f" Date range : {df.index[0].date()} → {df.index[-1].date()}")
    print(f" Missing vals:\n{df.isnull().sum().to_string()}")
    return df


def handle_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Fill NaNs forward, then backward; raise if any NaN survives both passes.

    Raises:
        ValueError: if a column is still NaN after both passes (i.e. it was
            entirely empty).
    """
    # Forward-fill: propagate last known value forward (e.g. market holidays)
    df = df.ffill()
    # Back-fill: handle any NaNs at the very start of the series. Only leading
    # NaNs can remain after ffill, so this borrows at most the first valid value.
    df = df.bfill()
    # After both passes, if NaNs still exist, the column is structurally broken
    remaining = df.isnull().sum().sum()
    if remaining > 0:
        raise ValueError(f"Unfillable NaNs remain: {df.isnull().sum()}")
    return df


# ── Trend indicators ───────────────────────────────────────────
def add_sma(df: pd.DataFrame, windows: Sequence[int] = (20, 50)) -> pd.DataFrame:
    """Add simple moving averages of ``close`` as ``sma_<w>`` columns (in place)."""
    # Tuple default: a list default would be one object shared across all calls.
    for w in windows:
        df[f"sma_{w}"] = df["close"].rolling(window=w).mean()
    return df


def add_ema(df: pd.DataFrame, spans: Sequence[int] = (12, 26)) -> pd.DataFrame:
    """Add exponential moving averages of ``close`` as ``ema_<s>`` columns (in place)."""
    for s in spans:
        df[f"ema_{s}"] = df["close"].ewm(span=s, adjust=False).mean()
    return df


# ── Momentum indicator ─────────────────────────────────────────
def add_rsi(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """Add the Relative Strength Index of ``close`` as ``rsi_<period>`` (in place).

    Gains and losses are smoothed with ``com=period - 1``, i.e. alpha = 1/period.
    """
    delta = df["close"].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.ewm(com=period - 1, min_periods=period).mean()
    avg_loss = loss.ewm(com=period - 1, min_periods=period).mean()
    # avg_loss == 0 gives rs = inf -> RSI 100. If avg_gain is also 0 (a perfectly
    # flat window) rs is NaN, and drop_warmup_rows will later remove that row.
    rs = avg_gain / avg_loss
    # Column name follows the parameter (default stays "rsi_14").
    df[f"rsi_{period}"] = 100 - (100 / (1 + rs))
    return df


# ── Trend + momentum hybrid ────────────────────────────────────
def add_macd(
    df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9
) -> pd.DataFrame:
    """Add MACD line, signal line and histogram columns (in place)."""
    ema_fast = df["close"].ewm(span=fast, adjust=False).mean()
    ema_slow = df["close"].ewm(span=slow, adjust=False).mean()
    df["macd_line"] = ema_fast - ema_slow
    df["macd_signal"] = df["macd_line"].ewm(span=signal, adjust=False).mean()
    df["macd_hist"] = df["macd_line"] - df["macd_signal"]
    return df


# ── Volatility indicator ───────────────────────────────────────
def add_bollinger_bands(
    df: pd.DataFrame, window: int = 20, num_std: float = 2.0
) -> pd.DataFrame:
    """Add Bollinger upper/lower bands and SMA-normalised band width (in place)."""
    sma = df["close"].rolling(window=window).mean()
    std = df["close"].rolling(window=window).std()
    df["bb_upper"] = sma + (num_std * std)
    df["bb_lower"] = sma - (num_std * std)
    df["bb_width"] = (df["bb_upper"] - df["bb_lower"]) / sma  # normalized width
    return df


# ── Volume indicator ───────────────────────────────────────────
def add_obv(df: pd.DataFrame) -> pd.DataFrame:
    """On-Balance Volume: cumulative volume flow driven by price direction."""
    direction = np.sign(df["close"].diff()).fillna(0)
    df["obv"] = (direction * df["volume"]).cumsum()
    return df


# ── Return features ────────────────────────────────────────────
def add_returns(df: pd.DataFrame) -> pd.DataFrame:
    """Add simple (``daily_return``) and logarithmic (``log_return``) 1-step returns."""
    df["daily_return"] = df["close"].pct_change()
    df["log_return"] = np.log(df["close"] / df["close"].shift(1))
    return df


# ── Trend strength indicator ───────────────────────────────────
def add_adx(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """Average Directional Index: measures trend strength (0–1 after normalisation).

    ADX > 0.25 (raw > 25) indicates a strong trend regardless of direction.
    Written as ``adx_<period>`` (default "adx_14").

    NOTE(review): smoothing uses ``ewm(span=period)`` (alpha = 2/(period+1)), and
    the DI lines smooth DM/ATR rather than dividing smoothed DM by ATR. Both
    differ from Wilder's textbook ADX. They are kept as-is so feature values stay
    stable for any already-fitted scalers/models.
    """
    high = df["high"].values
    low = df["low"].values
    close = df["close"].values

    # All arrays below have length n-1 and align with rows 1..n-1 of df.
    tr = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - close[:-1]), np.abs(low[1:] - close[:-1])),
    )
    dmp = np.where(
        high[1:] - high[:-1] > low[:-1] - low[1:],
        np.maximum(high[1:] - high[:-1], 0),
        0,
    )
    dmm = np.where(
        low[:-1] - low[1:] > high[1:] - high[:-1],
        np.maximum(low[:-1] - low[1:], 0),
        0,
    )

    atr = pd.Series(tr).ewm(span=period, adjust=False).mean().values
    safe_atr = np.where(atr > 0, atr, 1e-8)  # avoid division by zero
    pdmi = pd.Series(dmp / safe_atr).ewm(span=period, adjust=False).mean().values
    mdmi = pd.Series(dmm / safe_atr).ewm(span=period, adjust=False).mean().values

    denom = np.where(pdmi + mdmi > 0, pdmi + mdmi, 1e-8)
    dx = 100 * np.abs(pdmi - mdmi) / denom
    # Prepend NaN for row 0 (no previous bar) so the result aligns with df.
    adx = pd.Series(np.concatenate([[np.nan], dx])).ewm(span=period, adjust=False).mean()
    df[f"adx_{period}"] = (adx.values / 100).clip(0.0, 1.0)  # normalise to [0, 1]
    return df


# ── Long-term momentum ─────────────────────────────────────────
def add_long_momentum(df: pd.DataFrame) -> pd.DataFrame:
    """SMA-100 / SMA-200 ratio: > 1 means medium-term trend is above long-term (bullish).

    Clipped to [0.8, 1.2] and then rescaled to [0, 1] so the scaler sees a bounded feature.
    """
    sma100 = df["close"].rolling(100).mean()
    sma200 = df["close"].rolling(200).mean()
    ratio = (sma100 / sma200).clip(0.8, 1.2)
    # Rescale [0.8, 1.2] → [0, 1]
    df["sma_ratio"] = (ratio - 0.8) / 0.4
    return df


# ── Rate of Change ─────────────────────────────────────────────
def add_roc(df: pd.DataFrame, period: int = 20) -> pd.DataFrame:
    """Rate of Change over ``period`` rows, written as ``roc_<period>`` (default "roc_20")."""
    df[f"roc_{period}"] = df["close"].pct_change(periods=period)
    return df


# ── Realised volatility ────────────────────────────────────────
def add_realised_vol(df: pd.DataFrame, window: int = 20) -> pd.DataFrame:
    """
    Annualised realised volatility from log-returns over ``window`` rows.

    Written as ``realised_vol_<window>`` (default "realised_vol_20").
    Normalised to [0, 1] assuming a max equity annualised vol of 80% so the
    MinMaxScaler sees a bounded, stable range. This pre-computed column keeps
    the env's regime observation consistent with what the scaler fitted
    during training.
    """
    log_ret = np.log(df["close"] / df["close"].shift(1))
    rv_daily = log_ret.rolling(window).std()
    rv_annual = rv_daily * np.sqrt(252)  # 252 trading days per year
    df[f"realised_vol_{window}"] = (rv_annual / 0.80).clip(0.0, 1.0)
    return df


def drop_warmup_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop (in place) every row containing a NaN and report how many were removed.

    Leading incomplete rows are the indicator warm-up. Any incomplete row after
    the first complete one is a mid-series gap; those are still dropped (as
    before), but a warning is printed so they are not mistaken for warm-up.
    """
    before = len(df)
    complete = df.notna().all(axis=1).to_numpy()
    leading = int(np.argmax(complete)) if complete.any() else before
    interior = int((~complete).sum()) - leading

    df.dropna(inplace=True)
    after = len(df)
    print(f" Dropped {before - after} warmup rows (largest window = SMA-200)")
    if interior > 0:
        print(
            f" WARNING: {interior} of those rows were mid-series NaNs, "
            "not warmup; the date index now has gaps"
        )
    return df


def _split_frame(
    df: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split *df* chronologically into independent train/val/test copies.

    Boundaries use TRAIN_RATIO and VAL_RATIO; the test split gets the remainder.

    Raises:
        ValueError: if any split would be empty (too few rows).
    """
    n = len(df)
    t_end = int(n * TRAIN_RATIO)
    v_end = int(n * (TRAIN_RATIO + VAL_RATIO))
    train = df.iloc[:t_end].copy()
    val = df.iloc[t_end:v_end].copy()
    test = df.iloc[v_end:].copy()

    empty = [
        name
        for name, part in (("train", train), ("val", val), ("test", test))
        if part.empty
    ]
    if empty:
        raise ValueError(
            f"Too few rows ({n}) to produce non-empty split(s): {', '.join(empty)}"
        )
    return train, val, test


def split_and_save(df: pd.DataFrame, ticker: str) -> dict:
    """Split *df* chronologically, print each split's range, and save as CSV.

    Saves the frames exactly as given (no scaling) to
    ``PROCESSED_DIR/<ticker>_<split>.csv``.

    Returns:
        ``{"train": ..., "val": ..., "test": ...}``.

    Raises:
        ValueError: if any split would be empty.
    """
    train, val, test = _split_frame(df)

    print(
        f" Train : {len(train)} rows | {train.index[0].date()} → {train.index[-1].date()}"
    )
    print(
        f" Val : {len(val)} rows | {val.index[0].date()} → {val.index[-1].date()}"
    )
    print(
        f" Test : {len(test)} rows | {test.index[0].date()} → {test.index[-1].date()}"
    )

    splits = {"train": train, "val": val, "test": test}
    for split_name, split_df in splits.items():
        path = os.path.join(PROCESSED_DIR, f"{ticker}_{split_name}.csv")
        split_df.to_csv(path)
    return splits


def _engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Apply every indicator, in the pipeline's fixed order, to *df*."""
    df = add_sma(df)
    df = add_ema(df)
    df = add_rsi(df)
    df = add_macd(df)
    df = add_bollinger_bands(df)
    df = add_obv(df)
    df = add_returns(df)
    df = add_adx(df)
    df = add_long_momentum(df)
    df = add_roc(df)
    df = add_realised_vol(df)
    return df


def _fit_and_apply_scaler(
    train: pd.DataFrame, val: pd.DataFrame, test: pd.DataFrame
) -> MinMaxScaler:
    """Fit a [0, 1] MinMaxScaler on *train* only and transform all three frames in place.

    Fitting on train alone prevents val/test statistics from leaking into
    training. Val/test values may therefore fall outside [0, 1].
    """
    feature_cols = list(train.columns)
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(train[feature_cols])
    for part in (train, val, test):
        part[feature_cols] = scaler.transform(part[feature_cols])
    return scaler


def process_ticker(ticker: str) -> dict:
    """Run the full pipeline for one ticker and write its outputs to PROCESSED_DIR.

    Steps: load → fill NaNs → engineer features → drop warm-up rows →
    chronological split → fit scaler on train only → scale all splits → save
    ``<ticker>_scaler.pkl`` and ``<ticker>_{train,val,test}.csv``.

    Returns:
        ``{"train": ..., "val": ..., "test": ...}`` of scaled DataFrames.
    """
    df = load_and_validate(ticker)
    df = handle_missing(df)
    df = _engineer_features(df)
    df = drop_warmup_rows(df)

    # Split first, then fit scaler ONLY on train to prevent leakage
    train_raw, val_raw, test_raw = _split_frame(df)
    scaler = _fit_and_apply_scaler(train_raw, val_raw, test_raw)

    scaler_path = os.path.join(PROCESSED_DIR, f"{ticker}_scaler.pkl")
    joblib.dump(scaler, scaler_path)

    splits = {"train": train_raw, "val": val_raw, "test": test_raw}
    for name, sdf in splits.items():
        sdf.to_csv(os.path.join(PROCESSED_DIR, f"{ticker}_{name}.csv"))

    print(f" Features : {list(df.columns)}")
    print(f" Train rows : {len(train_raw)}")
    print(f" Val rows : {len(val_raw)}")
    print(f" Test rows : {len(test_raw)}")
    return splits


if __name__ == "__main__":
    all_splits = {}
    for ticker in TICKERS:
        all_splits[ticker] = process_ticker(ticker)
    print("\nAll tickers processed.")