Spaces:
Running
Running
import os
from collections.abc import Sequence

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
# ── Configuration ──────────────────────────────────────────────
_HERE = os.path.dirname(__file__)  # directory containing this script

TICKERS = ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"]

RAW_DIR = os.path.join(_HERE, "raw")  # input: one <TICKER>.csv per ticker
PROCESSED_DIR = os.path.join(_HERE, "processed")  # output: splits + scalers

# Chronological split fractions. The test share is implicit:
# 1 - TRAIN_RATIO - VAL_RATIO (0.15 with the values below).
TRAIN_RATIO = 0.70
VAL_RATIO = 0.15

# Import-time side effect: make sure the output directory exists.
os.makedirs(PROCESSED_DIR, exist_ok=True)
def load_and_validate(ticker: str) -> pd.DataFrame:
    """Load ``<RAW_DIR>/<ticker>.csv`` indexed by its parsed ``date`` column.

    The frame is sorted chronologically, and a short report (shape, first and
    last date, per-column missing-value counts) is printed. Despite the name,
    this function only reports; it does not reject or repair anything.

    Returns:
        The date-sorted DataFrame.
    """
    csv_path = os.path.join(RAW_DIR, f"{ticker}.csv")
    frame = pd.read_csv(csv_path, index_col="date", parse_dates=True).sort_index()

    print(f"\nββ {ticker} ββββββββββββββββββββββββββββββ")
    print(f" Shape : {frame.shape}")
    first_day, last_day = frame.index[0].date(), frame.index[-1].date()
    print(f" Date range : {first_day} β {last_day}")
    missing_report = frame.isna().sum().to_string()
    print(f" Missing vals:\n{missing_report}")
    return frame
def handle_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Fill gaps by forward-fill, then back-fill; fail if any NaN survives.

    Forward-fill carries the last observed value across gaps (e.g. market
    holidays). Back-fill then covers leading NaNs that have no earlier value.
    A NaN that survives both passes can only come from a column that is
    entirely empty.

    Raises:
        ValueError: if any NaN remains after both passes.
    """
    filled = df.ffill().bfill()
    if filled.isna().to_numpy().any():
        raise ValueError(f"Unfillable NaNs remain: {filled.isnull().sum()}")
    return filled
# ── Trend indicators ───────────────────────────────────────────
def add_sma(df: pd.DataFrame, windows: Sequence[int] = (20, 50)) -> pd.DataFrame:
    """Add simple moving averages of ``close`` as ``sma_<w>`` columns.

    Args:
        df: Frame with a ``close`` column; new columns are added in place.
        windows: Rolling window lengths in rows. The default is a tuple, so no
            mutable default object is shared between calls.

    Returns:
        The same ``df`` object. The first ``w - 1`` rows of each ``sma_<w>``
        column are NaN because the window is not yet full.
    """
    close = df["close"]
    for w in windows:
        df[f"sma_{w}"] = close.rolling(window=w).mean()
    return df
def add_ema(df: pd.DataFrame, spans: Sequence[int] = (12, 26)) -> pd.DataFrame:
    """Add exponential moving averages of ``close`` as ``ema_<s>`` columns.

    Args:
        df: Frame with a ``close`` column; new columns are added in place.
        spans: EMA spans. The smoothing factor is ``2 / (s + 1)``. The default
            is a tuple, so no mutable default object is shared between calls.

    Returns:
        The same ``df`` object with one new column per span.
    """
    close = df["close"]
    for s in spans:
        # adjust=False selects the recursive form ema_t = a*x_t + (1-a)*ema_{t-1}.
        df[f"ema_{s}"] = close.ewm(span=s, adjust=False).mean()
    return df
# ── Momentum indicator ─────────────────────────────────────────
def add_rsi(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """Add the Relative Strength Index of ``close`` as ``rsi_<period>``.

    The column name follows ``period`` (``rsi_14`` with the default), matching
    the ``sma_<w>`` / ``ema_<s>`` convention instead of always claiming 14.

    Average gain and loss are smoothed with ``ewm(com=period - 1)``, i.e.
    alpha = 1/period (Wilder's smoothing factor). ``adjust`` is left at the
    pandas default. The first ``period`` rows are NaN (``min_periods``).
    Values lie in [0, 100]. When the average loss is 0 and the average gain
    is positive, RSI is 100. When both are 0, the result is NaN.

    Returns:
        The same ``df`` object with the new column added in place.
    """
    delta = df["close"].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.ewm(com=period - 1, min_periods=period).mean()
    avg_loss = loss.ewm(com=period - 1, min_periods=period).mean()
    rs = avg_gain / avg_loss
    df[f"rsi_{period}"] = 100 - (100 / (1 + rs))
    return df
# ── Trend + momentum hybrid ────────────────────────────────────
def add_macd(
    df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9
) -> pd.DataFrame:
    """Add MACD columns derived from ``close``.

    ``macd_line`` is EMA(fast) - EMA(slow), ``macd_signal`` is the EMA of the
    line over ``signal`` periods, and ``macd_hist`` is line minus signal.
    All EMAs use ``adjust=False``. Columns are added to ``df`` in place and
    ``df`` is returned.
    """

    def _ema(series: pd.Series, span: int) -> pd.Series:
        # Recursive (adjust=False) exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    close = df["close"]
    line = _ema(close, fast) - _ema(close, slow)
    trigger = _ema(line, signal)

    df["macd_line"] = line
    df["macd_signal"] = trigger
    df["macd_hist"] = line - trigger
    return df
# ── Volatility indicator ───────────────────────────────────────
def add_bollinger_bands(
    df: pd.DataFrame, window: int = 20, num_std: float = 2.0
) -> pd.DataFrame:
    """Add Bollinger Bands around a rolling mean of ``close``.

    ``bb_upper`` / ``bb_lower`` are the rolling mean plus / minus
    ``num_std`` rolling standard deviations over ``window`` rows.
    ``bb_width`` is the band spread divided by the rolling mean. Columns are
    added to ``df`` in place and ``df`` is returned.
    """
    roller = df["close"].rolling(window=window)
    middle = roller.mean()
    offset = num_std * roller.std()

    df["bb_upper"] = middle + offset
    df["bb_lower"] = middle - offset
    # Normalised width: spread relative to the middle band.
    df["bb_width"] = (df["bb_upper"] - df["bb_lower"]) / middle
    return df
# ── Volume indicator ───────────────────────────────────────────
def add_obv(df: pd.DataFrame) -> pd.DataFrame:
    """Add ``obv`` (On-Balance Volume): a running sum of signed volume.

    Each bar contributes +volume if ``close`` rose versus the previous bar,
    -volume if it fell, and 0 if unchanged. The first bar has no previous
    bar and also contributes 0.
    """
    price_step = np.sign(df["close"].diff()).fillna(0)
    df["obv"] = price_step.mul(df["volume"]).cumsum()
    return df
# ── Return features ────────────────────────────────────────────
def add_returns(df: pd.DataFrame) -> pd.DataFrame:
    """Add one-bar simple and log returns of ``close``.

    ``daily_return`` is ``close_t / close_{t-1} - 1`` (via ``pct_change``) and
    ``log_return`` is ``log(close_t / close_{t-1})``. Both are NaN on the first
    row. Columns are added to ``df`` in place and ``df`` is returned.
    """
    price = df["close"]
    previous = price.shift(1)
    df["daily_return"] = price.pct_change()
    df["log_return"] = np.log(price / previous)
    return df
# ── Trend strength indicator ───────────────────────────────────
def add_adx(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """Average Directional Index: measures trend strength (0–1 after normalisation).

    ADX > 0.25 (raw > 25) indicates a strong trend regardless of direction.

    Reads ``high``, ``low`` and ``close`` and writes ``adx_<period>``
    (``adx_14`` with the default). The name now follows ``period`` rather than
    always claiming 14. The raw 0-100 value is divided by 100 and clipped to
    [0, 1]. Row 0 is NaN because true range and directional movement need a
    previous bar.

    NOTE(review): this is not Wilder's textbook ADX.
      * Every smoothing step uses ``ewm(span=period)``
        (alpha = 2/(period+1)) rather than alpha = 1/period.
      * +DM/-DM are divided by ATR bar by bar *before* smoothing, instead of
        smoothing DM first.
    Values therefore differ from charting-package ADX. The arithmetic is kept
    unchanged so existing features stay stable.

    Returns:
        The same ``df`` object with the new column added in place.
    """
    high = df["high"].values
    low = df["low"].values
    close = df["close"].values

    # True range for bars 1..n-1 (each needs the previous close).
    tr = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - close[:-1]), np.abs(low[1:] - close[:-1])),
    )
    # Directional movement: only the larger of up-move / down-move counts,
    # and only when it is positive.
    dmp = np.where(
        high[1:] - high[:-1] > low[:-1] - low[1:],
        np.maximum(high[1:] - high[:-1], 0),
        0,
    )
    dmm = np.where(
        low[:-1] - low[1:] > high[1:] - high[:-1], np.maximum(low[:-1] - low[1:], 0), 0
    )

    atr = pd.Series(tr).ewm(span=period, adjust=False).mean().values
    # Avoid division by zero on bars with zero smoothed range.
    safe_atr = np.where(atr > 0, atr, 1e-8)
    pdmi = pd.Series(dmp / safe_atr).ewm(span=period, adjust=False).mean().values
    mdmi = pd.Series(dmm / safe_atr).ewm(span=period, adjust=False).mean().values

    denom = np.where(pdmi + mdmi > 0, pdmi + mdmi, 1e-8)
    dx = 100 * np.abs(pdmi - mdmi) / denom
    # Prepend NaN so the series realigns with df's n rows (bar 0 has no DX).
    adx = (
        pd.Series(np.concatenate([[np.nan], dx])).ewm(span=period, adjust=False).mean()
    )
    df[f"adx_{period}"] = (adx.values / 100).clip(0.0, 1.0)  # normalise to [0, 1]
    return df
# ── Long-term momentum ─────────────────────────────────────────
def add_long_momentum(df: pd.DataFrame) -> pd.DataFrame:
    """Add ``sma_ratio``: SMA-100 / SMA-200 of ``close``, bounded to [0, 1].

    A ratio above 1 means the medium-term average sits above the long-term
    one (bullish). The ratio is clipped to [0.8, 1.2] and mapped linearly
    onto [0, 1] (0.8 -> 0, 1.2 -> 1), so the scaler sees a bounded feature.
    The first 199 rows are NaN while the 200-row window fills.
    """
    price = df["close"]
    floor, ceiling = 0.8, 1.2
    bounded = (price.rolling(100).mean() / price.rolling(200).mean()).clip(floor, ceiling)
    df["sma_ratio"] = (bounded - floor) / 0.4
    return df
# ── Rate of Change ─────────────────────────────────────────────
def add_roc(df: pd.DataFrame, period: int = 20) -> pd.DataFrame:
    """Add the ``period``-bar Rate of Change of ``close`` as ``roc_<period>``.

    ROC is ``close_t / close_{t-period} - 1``: signed momentum over the chosen
    horizon. With the default period of 20, the column is ``roc_20`` as before.
    The name now follows ``period`` instead of always claiming 20. The first
    ``period`` rows are NaN.

    Returns:
        The same ``df`` object with the new column added in place.
    """
    df[f"roc_{period}"] = df["close"].pct_change(periods=period)
    return df
# ── Realised volatility ────────────────────────────────────────
def add_realised_vol(
    df: pd.DataFrame,
    window: int = 20,
    max_annual_vol: float = 0.80,
    periods_per_year: int = 252,
) -> pd.DataFrame:
    """Add annualised realised volatility of ``close`` as ``realised_vol_<window>``.

    The calculation runs in three steps:
      1. Take the rolling sample standard deviation of one-bar log-returns
         over ``window`` rows.
      2. Annualise it by ``sqrt(periods_per_year)``.
      3. Divide by ``max_annual_vol`` (80% by default) and clip to [0, 1], so
         the MinMaxScaler sees a bounded, stable range.

    With the defaults the column is ``realised_vol_20``. This pre-computed
    column keeps the env's regime observation consistent with what the
    scaler fitted during training.

    Raises:
        ValueError: if ``max_annual_vol`` is not positive.

    Returns:
        The same ``df`` object with the new column added in place.
    """
    if max_annual_vol <= 0:
        raise ValueError(f"max_annual_vol must be positive, got {max_annual_vol}")
    log_ret = np.log(df["close"] / df["close"].shift(1))
    rv_annual = log_ret.rolling(window).std() * np.sqrt(periods_per_year)
    df[f"realised_vol_{window}"] = (rv_annual / max_annual_vol).clip(0.0, 1.0)
    return df
def drop_warmup_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop, in place, every row containing any NaN and report the count.

    In this pipeline the NaNs come from indicator warm-up periods, the longest
    being the 200-row SMA used by ``add_long_momentum``. ``dropna`` removes a
    NaN row wherever it is, not only at the start of the series.

    Returns:
        The same ``df`` object after the in-place drop.
    """
    rows_before = df.shape[0]
    df.dropna(inplace=True)
    removed = rows_before - df.shape[0]
    print(f" Dropped {removed} warmup rows (largest window = SMA-200)")
    return df
def split_and_save(df: pd.DataFrame, ticker: str) -> dict:
    """Split ``df`` chronologically into train/val/test and write each to CSV.

    The split points are ``int(n * TRAIN_RATIO)`` and
    ``int(n * (TRAIN_RATIO + VAL_RATIO))``, and the rest goes to test. Each
    split is printed with its row count and date span, then saved as
    ``<PROCESSED_DIR>/<ticker>_<split>.csv``. Data is written unscaled.

    Small inputs can produce an empty split. Its span is printed as
    "(empty)" instead of raising IndexError on ``index[0]``.

    NOTE(review): ``process_ticker`` writes *scaled* splits to the same
    ``<ticker>_<split>.csv`` paths. Calling this afterwards would overwrite
    them with unscaled data.

    Returns:
        dict with keys "train", "val", "test" mapping to copies of the slices.
    """
    n = len(df)
    t_end = int(n * TRAIN_RATIO)
    v_end = int(n * (TRAIN_RATIO + VAL_RATIO))
    splits = {
        "train": df.iloc[:t_end].copy(),
        "val": df.iloc[t_end:v_end].copy(),
        "test": df.iloc[v_end:].copy(),
    }

    def _date_span(part: pd.DataFrame) -> str:
        # An empty slice has no index[0]; report it instead of crashing.
        if part.empty:
            return "(empty)"
        return f"{part.index[0].date()} β {part.index[-1].date()}"

    for label, key in ((" Train", "train"), (" Val", "val"), (" Test", "test")):
        part = splits[key]
        print(f"{label} : {len(part)} rows | {_date_span(part)}")

    for split_name, split_df in splits.items():
        path = os.path.join(PROCESSED_DIR, f"{ticker}_{split_name}.csv")
        split_df.to_csv(path)
    return splits
def process_ticker(ticker: str) -> dict:
    """Run the full preprocessing pipeline for one ticker.

    The pipeline runs these steps in order:
      1. Load the raw CSV and fill gaps.
      2. Engineer every indicator column and drop warm-up rows.
      3. Split chronologically into train/val/test.
      4. Fit a MinMaxScaler on the train split only, to avoid leakage, and
         apply it to all three splits.
      5. Save the scaler (``<ticker>_scaler.pkl``) and the scaled splits
         (``<ticker>_<split>.csv``) under ``PROCESSED_DIR``.

    Returns:
        dict mapping "train" / "val" / "test" to the scaled DataFrames.
    """
    frame = handle_missing(load_and_validate(ticker))

    # Each step only reads the raw price/volume columns, so this order
    # determines the column order of the output.
    feature_steps = (
        add_sma,
        add_ema,
        add_rsi,
        add_macd,
        add_bollinger_bands,
        add_obv,
        add_returns,
        add_adx,
        add_long_momentum,
        add_roc,
        add_realised_vol,
    )
    for step in feature_steps:
        frame = step(frame)
    frame = drop_warmup_rows(frame)

    # Chronological split first; the scaler then sees only the train slice.
    total = len(frame)
    train_cut = int(total * TRAIN_RATIO)
    val_cut = int(total * (TRAIN_RATIO + VAL_RATIO))
    splits = {
        "train": frame.iloc[:train_cut].copy(),
        "val": frame.iloc[train_cut:val_cut].copy(),
        "test": frame.iloc[val_cut:].copy(),
    }

    columns = list(frame.columns)
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(splits["train"][columns])
    for part in splits.values():
        part[columns] = scaler.transform(part[columns])

    joblib.dump(scaler, os.path.join(PROCESSED_DIR, f"{ticker}_scaler.pkl"))
    for name, part in splits.items():
        part.to_csv(os.path.join(PROCESSED_DIR, f"{ticker}_{name}.csv"))

    print(f" Features : {list(frame.columns)}")
    print(f" Train rows : {len(splits['train'])}")
    print(f" Val rows : {len(splits['val'])}")
    print(f" Test rows : {len(splits['test'])}")
    return splits
if __name__ == "__main__":
    # Process every configured ticker in order, keeping results keyed by ticker.
    all_splits = {ticker: process_ticker(ticker) for ticker in TICKERS}
    print("\nAll tickers processed.")