""" test_script.py — Run Monte Carlo Dropout inference on trained LSTM models (CPU-only). Supports: • Per-ticker models (default) • Unified model (--unified flag) Usage: python test_script.py python test_script.py --unified """ import os import argparse import joblib import numpy as np import pandas as pd import torch import json from retrain import LSTMModel # ---- CONFIG ---- TICKERS = { "TSLA": "TSLA", # "NVDA": "NVDA", # "SPY": "SPY", } HORIZON_CONFIGS = { "1d": {"days": 1}, "1w": {"days": 5}, "4w": {"days": 28}, "6m": {"days": 180}, "1y": {"days": 365}, } BASE_DIR = os.path.dirname(os.path.dirname(__file__)) DATA_PATH = os.path.join(BASE_DIR, "data") MODELS_DIR = os.path.join(BASE_DIR, "models") DEVICE = "cpu" # CPU only # --------------------------------------------------------------------- # MC DROPOUT # --------------------------------------------------------------------- def enable_mc_dropout(model): """Force all dropout layers to remain active during inference.""" for m in model.modules(): if isinstance(m, torch.nn.Dropout): m.train() return model def mc_dropout_predict(model, X_tensor, y_scaler, n_samples=200): """Perform Monte Carlo Dropout forward passes.""" model = enable_mc_dropout(model) preds = [] with torch.no_grad(): for _ in range(n_samples): preds.append(model(X_tensor).item()) preds = np.array(preds) mean_pred = preds.mean() std_pred = preds.std() # Convert back to original scale mean_pred_orig = y_scaler.inverse_transform([[mean_pred]])[0, 0] std_pred_orig = ( y_scaler.inverse_transform([[mean_pred + std_pred]])[0, 0] - mean_pred_orig ) plus_minus_percent = ( (std_pred_orig / mean_pred_orig) * 100 if mean_pred_orig != 0 else 0 ) lower_bound = mean_pred_orig - std_pred_orig upper_bound = mean_pred_orig + std_pred_orig return { "predicted_price": mean_pred_orig, "plus_minus_percent": plus_minus_percent, "confidence_percent": 95.0, "lower_bound": lower_bound, "upper_bound": upper_bound, } # --------------------------------------------------------------------- # DATA PREPARATION # --------------------------------------------------------------------- def prepare_input_sequence(df, x_scaler, seq_len=90): features = df[["Open", "High", "Low", "Close", "Volume"]].values X_scaled = x_scaler.transform(features) X_seq = X_scaled[-seq_len:] X_tensor = torch.tensor(X_seq, dtype=torch.float32).unsqueeze(0).to(DEVICE) return X_tensor def prepare_unified_input_sequence(df, x_scaler, seq_len, ticker_idx, num_tickers): features = df[["Open", "High", "Low", "Close", "Volume"]].values X_scaled = x_scaler.transform(features) onehot = np.eye(num_tickers)[ticker_idx] onehot_seq = np.repeat(onehot.reshape(1, -1), seq_len, axis=0) X_full = np.hstack([X_scaled[-seq_len:], onehot_seq]) X_tensor = torch.tensor(X_full, dtype=torch.float32).unsqueeze(0).to(DEVICE) return X_tensor # --------------------------------------------------------------------- # LOAD MODELS # --------------------------------------------------------------------- def load_model_and_scalers(ticker, horizon_name): out_dir = os.path.join(MODELS_DIR, ticker) model_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_model.pth") x_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_scaler.pkl") y_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_y_scaler.pkl") config_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_config.json") if not all(os.path.exists(p) for p in [model_path, x_scaler_path, y_scaler_path]): raise FileNotFoundError(f"āŒ Missing model/scalers for {ticker} ({horizon_name})") x_scaler = joblib.load(x_scaler_path) y_scaler = joblib.load(y_scaler_path) # šŸ”¹ Load model hyperparameters if available if os.path.exists(config_path): with open(config_path, "r") as f: cfg = json.load(f) input_size = cfg.get("input_size", len(x_scaler.mean_)) hidden_size = cfg.get("hidden_size", 128) num_layers = cfg.get("num_layers", 2) dropout = cfg.get("dropout", 0.2) seq_len = cfg.get("seq_len", 90) else: print(f"āš ļø Missing configuration file for {ticker} ({horizon_name}): {config_path}") print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n") input_size = len(x_scaler.mean_) hidden_size, num_layers, dropout, seq_len = 128, 2, 0.2, 90 model = LSTMModel( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout, ) try: model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True)) except Exception as e: print(f"āš ļø Skipping {ticker} ({horizon_name}) — model incompatible with config: {e}\n") return None model.to(DEVICE) return model, x_scaler, y_scaler, seq_len def load_unified_model_and_scalers(horizon_name): udir = os.path.join(MODELS_DIR, "unified") model_path = os.path.join(udir, f"unified_{horizon_name}_model.pth") x_scaler_path = os.path.join(udir, f"unified_{horizon_name}_scaler.pkl") y_scaler_path = os.path.join(udir, f"unified_{horizon_name}_y_scaler.pkl") map_path = os.path.join(udir, "unified_tickers.pkl") config_path = os.path.join(udir, f"unified_{horizon_name}_config.json") if not all(os.path.exists(p) for p in [model_path, x_scaler_path, y_scaler_path, map_path]): raise FileNotFoundError(f"āŒ Missing unified model/scalers for {horizon_name}") x_scaler = joblib.load(x_scaler_path) y_scaler = joblib.load(y_scaler_path) ticker_map = joblib.load(map_path) # šŸ”¹ Load hyperparameters if available if os.path.exists(config_path): with open(config_path, "r") as f: cfg = json.load(f) input_size = cfg.get("input_size", len(x_scaler.mean_) + len(ticker_map)) hidden_size = cfg.get("hidden_size", 128) num_layers = cfg.get("num_layers", 2) dropout = cfg.get("dropout", 0.2) seq_len = cfg.get("seq_len", 90) else: print(f"āš ļø Missing configuration file for unified model ({horizon_name}): {config_path}") print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n") input_size = len(x_scaler.mean_) + len(ticker_map) hidden_size, num_layers, dropout, seq_len = 128, 2, 0.2, 90 model = LSTMModel( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout, ) model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True)) model.to(DEVICE) return model, x_scaler, y_scaler, ticker_map, seq_len # --------------------------------------------------------------------- # RUN MODES # --------------------------------------------------------------------- def run_per_ticker_mode(): print("\nšŸ“ˆ Running MC Dropout predictions (per-ticker mode)...\n") for ticker, symbol in TICKERS.items(): print(f"=== {symbol} ({ticker}) ===") csv_path = os.path.join(DATA_PATH, f"{ticker}.csv") if not os.path.exists(csv_path): print(f"āš ļø Missing data file: {csv_path}") continue df = pd.read_csv(csv_path) for horizon_name in HORIZON_CONFIGS.keys(): try: model, x_scaler, y_scaler, seq_len = load_model_and_scalers(ticker, horizon_name) X = prepare_input_sequence(df, x_scaler, seq_len=seq_len) result = mc_dropout_predict(model, X, y_scaler) print( f" [{horizon_name}] Predicted Close: ${result['predicted_price']:.2f} " f"plus or minus {result['plus_minus_percent']:.2f}% " f"({result['confidence_percent']:.1f}% confidence, " f"range: ${result['lower_bound']:.2f} - ${result['upper_bound']:.2f})" ) except Exception as e: print(f" āš ļø Error for {ticker} ({horizon_name}): {e}") print("") print("āœ… Done!\n") def run_unified_mode(): print("\nšŸ¤ Running MC Dropout predictions (unified model)...\n") for horizon_name in HORIZON_CONFIGS.keys(): try: model, x_scaler, y_scaler, ticker_map, seq_len = load_unified_model_and_scalers(horizon_name) except Exception as e: print(e) continue num_tickers = len(ticker_map) for ticker, symbol in TICKERS.items(): if ticker not in ticker_map: print(f"āš ļø {ticker} not found in unified mapping; skipping.") continue csv_path = os.path.join(DATA_PATH, f"{ticker}.csv") if not os.path.exists(csv_path): print(f"āš ļø Missing data for {ticker}.") continue df = pd.read_csv(csv_path) X = prepare_unified_input_sequence( df, x_scaler, seq_len=seq_len, ticker_idx=ticker_map[ticker], num_tickers=num_tickers ) result = mc_dropout_predict(model, X, y_scaler) print( f" [{ticker} - {horizon_name}] Predicted Close: ${result['predicted_price']:.2f} " f"plus or minus {result['plus_minus_percent']:.2f}% " f"({result['confidence_percent']:.1f}% confidence, " f"range: ${result['lower_bound']:.2f} - ${result['upper_bound']:.2f})" ) print("") print("āœ… Done!\n") # --------------------------------------------------------------------- # MAIN # --------------------------------------------------------------------- if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run MC Dropout inference on trained LSTM models.") parser.add_argument("--unified", action="store_true", help="Use unified model instead of per-ticker models") args = parser.parse_args() if args.unified: run_unified_mode() else: run_per_ticker_mode()