Spaces:
Sleeping
Sleeping
| """ | |
| test_script.py — Run Monte Carlo Dropout inference on trained LSTM models (CPU-only). | |
| Supports: | |
| • Per-ticker models (default) | |
| • Unified model (--unified flag) | |
| Usage: | |
| python test_script.py | |
| python test_script.py --unified | |
| """ | |
| import os | |
| import argparse | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import json | |
| from retrain import LSTMModel | |
# ---- CONFIG ----
# Tickers to run inference for, mapped to their display symbol.
# Commented-out entries are skipped.
TICKERS = {
    "TSLA": "TSLA",
    # "NVDA": "NVDA",
    # "SPY": "SPY",
}
# Forecast horizons keyed by the label used in model filenames.
# "days" is presumably the forward offset the model was trained to
# predict — TODO confirm against retrain.py.
HORIZON_CONFIGS = {
    "1d": {"days": 1},
    "1w": {"days": 5},
    "4w": {"days": 28},
    "6m": {"days": 180},
    "1y": {"days": 365},
}
# Project root = one directory above the directory containing this script.
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
DATA_PATH = os.path.join(BASE_DIR, "data")      # per-ticker CSV files
MODELS_DIR = os.path.join(BASE_DIR, "models")   # saved models/scalers/configs
DEVICE = "cpu"  # CPU only
| # --------------------------------------------------------------------- | |
| # MC DROPOUT | |
| # --------------------------------------------------------------------- | |
def enable_mc_dropout(model):
    """Put every ``torch.nn.Dropout`` submodule into training mode.

    Keeping dropout active at inference time is what makes Monte Carlo
    Dropout sampling possible.  Returns the same model for chaining.
    """
    dropout_layers = (m for m in model.modules() if isinstance(m, torch.nn.Dropout))
    for layer in dropout_layers:
        layer.train()
    return model
def mc_dropout_predict(model, X_tensor, y_scaler, n_samples=200, z=1.96):
    """Estimate a point prediction and uncertainty band via MC Dropout.

    Runs ``n_samples`` stochastic forward passes with dropout active, then
    maps the sample mean/std back to the original price scale.

    Args:
        model: trained model producing a single scaled value per input.
        X_tensor: input batch of shape (1, seq_len, n_features).
        y_scaler: fitted target scaler exposing ``inverse_transform``.
            Assumed linear (e.g. StandardScaler/MinMaxScaler) so a std
            maps through a constant scale factor.
        n_samples: number of stochastic forward passes.
        z: z-score used to widen the band (1.96 ≈ 95% coverage under a
            normal approximation of the predictive distribution).

    Returns:
        dict with ``predicted_price``, ``plus_minus_percent``,
        ``confidence_percent``, ``lower_bound``, ``upper_bound`` in
        original price units.
    """
    # BUG FIX: the model was never switched to eval mode, so *every* layer
    # ran with training-time behavior.  Proper MC Dropout: freeze the model
    # (eval) and then re-enable stochasticity only where intended.  LSTM
    # modules are included so their built-in inter-layer dropout stays
    # active, matching the original fully-train-mode sampling behavior.
    model.eval()
    for m in model.modules():
        if isinstance(m, (torch.nn.Dropout, torch.nn.LSTM)):
            m.train()

    with torch.no_grad():
        preds = np.array([model(X_tensor).item() for _ in range(n_samples)])

    mean_pred = preds.mean()
    std_pred = preds.std()

    # Map back to the original scale.  For a linear scaler,
    # inverse(mean + std) - inverse(mean) == scale * std.
    mean_pred_orig = y_scaler.inverse_transform([[mean_pred]])[0, 0]
    std_pred_orig = (
        y_scaler.inverse_transform([[mean_pred + std_pred]])[0, 0] - mean_pred_orig
    )

    # BUG FIX: the band was previously ±1 std (~68% coverage) while being
    # reported as 95% confidence.  Widen by z to make the label honest.
    half_width = z * std_pred_orig
    plus_minus_percent = (
        (half_width / mean_pred_orig) * 100 if mean_pred_orig != 0 else 0
    )
    return {
        "predicted_price": mean_pred_orig,
        "plus_minus_percent": plus_minus_percent,
        "confidence_percent": 95.0,
        "lower_bound": mean_pred_orig - half_width,
        "upper_bound": mean_pred_orig + half_width,
    }
| # --------------------------------------------------------------------- | |
| # DATA PREPARATION | |
| # --------------------------------------------------------------------- | |
def prepare_input_sequence(df, x_scaler, seq_len=90):
    """Build a (1, seq_len, 5) float32 input tensor from the last rows of df.

    Args:
        df: DataFrame with Open/High/Low/Close/Volume columns.
        x_scaler: fitted feature scaler exposing ``transform``.
        seq_len: number of trailing rows the model expects.

    Returns:
        torch.Tensor of shape (1, seq_len, 5) on DEVICE.

    Raises:
        ValueError: if ``df`` has fewer than ``seq_len`` rows.  Previously a
            short frame was silently truncated, feeding the model a sequence
            length it was never trained on.
    """
    if len(df) < seq_len:
        raise ValueError(
            f"Need at least {seq_len} rows to build the input sequence, got {len(df)}"
        )
    features = df[["Open", "High", "Low", "Close", "Volume"]].values
    X_scaled = x_scaler.transform(features)
    X_seq = X_scaled[-seq_len:]
    return torch.tensor(X_seq, dtype=torch.float32).unsqueeze(0).to(DEVICE)
def prepare_unified_input_sequence(df, x_scaler, seq_len, ticker_idx, num_tickers):
    """Build unified-model input: scaled OHLCV plus a tiled ticker one-hot.

    Args:
        df: DataFrame with Open/High/Low/Close/Volume columns.
        x_scaler: fitted feature scaler exposing ``transform``.
        seq_len: number of trailing rows the model expects.
        ticker_idx: index of this ticker in the unified one-hot encoding.
        num_tickers: total number of tickers the unified model knows.

    Returns:
        torch.Tensor of shape (1, seq_len, 5 + num_tickers) on DEVICE.

    Raises:
        ValueError: if ``df`` has fewer than ``seq_len`` rows.  Previously a
            short frame made ``np.hstack`` fail with an opaque shape-mismatch
            error (features truncated, one-hot block not).
    """
    if len(df) < seq_len:
        raise ValueError(
            f"Need at least {seq_len} rows to build the input sequence, got {len(df)}"
        )
    features = df[["Open", "High", "Low", "Close", "Volume"]].values
    X_scaled = x_scaler.transform(features)
    # One-hot vector identifying the ticker, repeated for every timestep.
    onehot = np.eye(num_tickers)[ticker_idx]
    onehot_seq = np.tile(onehot, (seq_len, 1))
    X_full = np.hstack([X_scaled[-seq_len:], onehot_seq])
    return torch.tensor(X_full, dtype=torch.float32).unsqueeze(0).to(DEVICE)
| # --------------------------------------------------------------------- | |
| # LOAD MODELS | |
| # --------------------------------------------------------------------- | |
def load_model_and_scalers(ticker, horizon_name):
    """Load a per-ticker model plus its scalers and sequence length.

    Returns:
        (model, x_scaler, y_scaler, seq_len)

    Raises:
        FileNotFoundError: if any required artifact is missing.
        RuntimeError: if the checkpoint is incompatible with the configured
            architecture.  Previously this path printed a warning and
            returned ``None``, which surfaced at the caller's tuple-unpack
            as a confusing TypeError.
    """
    out_dir = os.path.join(MODELS_DIR, ticker)
    model_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_model.pth")
    x_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_scaler.pkl")
    y_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_y_scaler.pkl")
    config_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_config.json")

    if not all(os.path.exists(p) for p in [model_path, x_scaler_path, y_scaler_path]):
        raise FileNotFoundError(f"❌ Missing model/scalers for {ticker} ({horizon_name})")

    x_scaler = joblib.load(x_scaler_path)
    y_scaler = joblib.load(y_scaler_path)

    # Load model hyperparameters when the config file exists; otherwise fall
    # back to training defaults, inferring input size from the fitted scaler.
    if os.path.exists(config_path):
        with open(config_path, "r") as f:
            cfg = json.load(f)
        input_size = cfg.get("input_size", len(x_scaler.mean_))
        hidden_size = cfg.get("hidden_size", 128)
        num_layers = cfg.get("num_layers", 2)
        dropout = cfg.get("dropout", 0.2)
        seq_len = cfg.get("seq_len", 90)
    else:
        print(f"⚠️ Missing configuration file for {ticker} ({horizon_name}): {config_path}")
        print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n")
        input_size = len(x_scaler.mean_)
        hidden_size, num_layers, dropout, seq_len = 128, 2, 0.2, 90

    model = LSTMModel(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )
    try:
        model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))
    except Exception as e:
        # BUG FIX: fail loudly instead of returning None — callers unpack the
        # result as a 4-tuple, so None used to blow up with an opaque
        # "cannot unpack non-iterable NoneType" error.
        raise RuntimeError(
            f"⚠️ Skipping {ticker} ({horizon_name}) — model incompatible with config: {e}"
        ) from e
    model.to(DEVICE)
    return model, x_scaler, y_scaler, seq_len
def load_unified_model_and_scalers(horizon_name):
    """Load the unified multi-ticker model plus scalers and ticker mapping.

    Returns:
        (model, x_scaler, y_scaler, ticker_map, seq_len) where ``ticker_map``
        maps ticker symbol -> one-hot index.

    Raises:
        FileNotFoundError: if any required artifact is missing.
        RuntimeError: if the checkpoint is incompatible with the configured
            architecture (mirrors the per-ticker loader's error handling —
            previously the raw ``load_state_dict`` error escaped unguarded).
    """
    udir = os.path.join(MODELS_DIR, "unified")
    model_path = os.path.join(udir, f"unified_{horizon_name}_model.pth")
    x_scaler_path = os.path.join(udir, f"unified_{horizon_name}_scaler.pkl")
    y_scaler_path = os.path.join(udir, f"unified_{horizon_name}_y_scaler.pkl")
    map_path = os.path.join(udir, "unified_tickers.pkl")
    config_path = os.path.join(udir, f"unified_{horizon_name}_config.json")

    if not all(os.path.exists(p) for p in [model_path, x_scaler_path, y_scaler_path, map_path]):
        raise FileNotFoundError(f"❌ Missing unified model/scalers for {horizon_name}")

    x_scaler = joblib.load(x_scaler_path)
    y_scaler = joblib.load(y_scaler_path)
    ticker_map = joblib.load(map_path)

    # Load hyperparameters when the config file exists; otherwise fall back
    # to defaults.  Unified input = OHLCV features + one-hot ticker columns.
    if os.path.exists(config_path):
        with open(config_path, "r") as f:
            cfg = json.load(f)
        input_size = cfg.get("input_size", len(x_scaler.mean_) + len(ticker_map))
        hidden_size = cfg.get("hidden_size", 128)
        num_layers = cfg.get("num_layers", 2)
        dropout = cfg.get("dropout", 0.2)
        seq_len = cfg.get("seq_len", 90)
    else:
        print(f"⚠️ Missing configuration file for unified model ({horizon_name}): {config_path}")
        print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n")
        input_size = len(x_scaler.mean_) + len(ticker_map)
        hidden_size, num_layers, dropout, seq_len = 128, 2, 0.2, 90

    model = LSTMModel(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )
    try:
        model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))
    except Exception as e:
        raise RuntimeError(
            f"⚠️ Unified model ({horizon_name}) incompatible with config: {e}"
        ) from e
    model.to(DEVICE)
    return model, x_scaler, y_scaler, ticker_map, seq_len
| # --------------------------------------------------------------------- | |
| # RUN MODES | |
| # --------------------------------------------------------------------- | |
def run_per_ticker_mode():
    """Run MC Dropout inference using one model per ticker and horizon.

    For each configured ticker, loads the ticker's CSV once, then loops over
    every horizon, loading that horizon's model/scalers and printing the
    prediction with its uncertainty band.  Per-horizon failures are caught
    and reported so one bad artifact does not abort the whole run.
    """
    print("\n📈 Running MC Dropout predictions (per-ticker mode)...\n")
    for ticker, symbol in TICKERS.items():
        print(f"=== {symbol} ({ticker}) ===")
        csv_path = os.path.join(DATA_PATH, f"{ticker}.csv")
        if not os.path.exists(csv_path):
            print(f"⚠️ Missing data file: {csv_path}")
            continue
        df = pd.read_csv(csv_path)
        for horizon_name in HORIZON_CONFIGS.keys():
            try:
                loaded = load_model_and_scalers(ticker, horizon_name)
                # BUG FIX: the loader may signal an incompatible checkpoint by
                # returning None (after printing the reason); unpacking that
                # directly used to raise a confusing TypeError.
                if loaded is None:
                    continue
                model, x_scaler, y_scaler, seq_len = loaded
                X = prepare_input_sequence(df, x_scaler, seq_len=seq_len)
                result = mc_dropout_predict(model, X, y_scaler)
                print(
                    f" [{horizon_name}] Predicted Close: ${result['predicted_price']:.2f} "
                    f"plus or minus {result['plus_minus_percent']:.2f}% "
                    f"({result['confidence_percent']:.1f}% confidence, "
                    f"range: ${result['lower_bound']:.2f} - ${result['upper_bound']:.2f})"
                )
            except Exception as e:
                print(f" ⚠️ Error for {ticker} ({horizon_name}): {e}")
        print("")
    print("✅ Done!\n")
def run_unified_mode():
    """Run MC Dropout inference for every horizon using the single unified model.

    For each horizon the unified model is loaded once and then applied to
    every configured ticker that appears in the unified ticker mapping.
    Load failures are printed and that horizon is skipped.
    """
    print("\n🤝 Running MC Dropout predictions (unified model)...\n")
    for horizon_name in HORIZON_CONFIGS.keys():
        try:
            loaded = load_unified_model_and_scalers(horizon_name)
        except Exception as exc:
            print(exc)
            continue
        model, x_scaler, y_scaler, ticker_map, seq_len = loaded
        total_tickers = len(ticker_map)

        for ticker, symbol in TICKERS.items():
            # Skip tickers the unified model was never trained on.
            if ticker not in ticker_map:
                print(f"⚠️ {ticker} not found in unified mapping; skipping.")
                continue
            csv_path = os.path.join(DATA_PATH, f"{ticker}.csv")
            if not os.path.exists(csv_path):
                print(f"⚠️ Missing data for {ticker}.")
                continue

            frame = pd.read_csv(csv_path)
            inputs = prepare_unified_input_sequence(
                frame, x_scaler, seq_len=seq_len, ticker_idx=ticker_map[ticker], num_tickers=total_tickers
            )
            stats = mc_dropout_predict(model, inputs, y_scaler)
            print(
                f" [{ticker} - {horizon_name}] Predicted Close: ${stats['predicted_price']:.2f} "
                f"plus or minus {stats['plus_minus_percent']:.2f}% "
                f"({stats['confidence_percent']:.1f}% confidence, "
                f"range: ${stats['lower_bound']:.2f} - ${stats['upper_bound']:.2f})"
            )
        print("")
    print("✅ Done!\n")
| # --------------------------------------------------------------------- | |
| # MAIN | |
| # --------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description="Run MC Dropout inference on trained LSTM models.") | |
| parser.add_argument("--unified", action="store_true", help="Use unified model instead of per-ticker models") | |
| args = parser.parse_args() | |
| if args.unified: | |
| run_unified_mode() | |
| else: | |
| run_per_ticker_mode() | |