# Uploaded by jtassos2025 — "Initial Commit" (41615f4)
"""
test_script.py — Run Monte Carlo Dropout inference on trained LSTM models (CPU-only).

Supports:
    • Per-ticker models (default)
    • Unified model (--unified flag)

Usage:
    python test_script.py
    python test_script.py --unified
"""
import os
import argparse
import joblib
import numpy as np
import pandas as pd
import torch
import json
from retrain import LSTMModel
# ---- CONFIG ----
# Tickers to run inference for. The key is used to build the CSV file name and
# the per-ticker model directory; the value is only used as a display label.
TICKERS = {
    "TSLA": "TSLA",
    # Disabled entries — uncomment to include them in a run:
    # "NVDA": "NVDA",
    # "SPY": "SPY",
}

# Forecast horizons, keyed by the label that appears in model/scaler file names.
# NOTE(review): "1w" is 5 days while "4w" is 28 — this looks like a mix of
# trading-day and calendar-day counts; confirm against the training script.
# Only the labels are read in this file; the "days" values are not used here.
HORIZON_CONFIGS = {
    label: {"days": n_days}
    for label, n_days in (
        ("1d", 1),
        ("1w", 5),
        ("4w", 28),
        ("6m", 180),
        ("1y", 365),
    )
}

# Project root is the parent of the directory that contains this script.
_SCRIPT_DIR = os.path.dirname(__file__)
BASE_DIR = os.path.dirname(_SCRIPT_DIR)
DATA_PATH = os.path.join(BASE_DIR, "data")
MODELS_DIR = os.path.join(BASE_DIR, "models")

# Inference is pinned to the CPU; tensors and weights are mapped to this device.
DEVICE = "cpu"
# ---------------------------------------------------------------------
# MC DROPOUT
# ---------------------------------------------------------------------
def enable_mc_dropout(model):
    """Put *model* in inference mode while keeping its dropout stochastic.

    The whole model is first switched to ``eval()`` so that layers whose
    behaviour differs between training and inference use their inference
    behaviour. Dropout is then re-enabled selectively on:

    * every standalone dropout layer (``Dropout``, ``Dropout1d/2d/3d``,
      ``AlphaDropout``, ``FeatureAlphaDropout``), and
    * every recurrent layer (``nn.LSTM`` / ``nn.GRU`` / ``nn.RNN``) that was
      constructed with ``dropout > 0``. Their inter-layer dropout is applied
      inside the recurrent module and is only active while that module is in
      training mode, so an ``isinstance(m, nn.Dropout)`` check never sees it.

    NOTE(review): dropout applied functionally in a custom ``forward`` (e.g.
    ``F.dropout(x, training=self.training)``) is not covered by this function.

    Args:
        model: The ``torch.nn.Module`` to prepare. It is modified in place.

    Returns:
        The same ``model`` instance, so the call can be chained.
    """
    dropout_layer_types = (
        torch.nn.Dropout,
        torch.nn.Dropout1d,
        torch.nn.Dropout2d,
        torch.nn.Dropout3d,
        torch.nn.AlphaDropout,
        torch.nn.FeatureAlphaDropout,
    )

    # Start from a clean inference state; only dropout is switched back on.
    model.eval()
    for m in model.modules():
        has_builtin_dropout = isinstance(m, torch.nn.RNNBase) and m.dropout > 0
        if isinstance(m, dropout_layer_types) or has_builtin_dropout:
            m.train()
    return model
def mc_dropout_predict(model, X_tensor, y_scaler, n_samples=200, confidence_percent=95.0):
    """Estimate a prediction and its uncertainty with Monte Carlo Dropout.

    Runs ``n_samples`` stochastic forward passes of ``model`` on ``X_tensor``
    (dropout kept active via ``enable_mc_dropout``), then summarises the
    samples as a mean and a symmetric interval in the original target scale.

    The interval half-width is ``z * std`` where ``z`` is the standard-normal
    quantile for ``confidence_percent`` (about 1.96 for 95%). This treats the
    Monte Carlo samples as approximately normal and assumes ``y_scaler`` is an
    affine transform, so that a distance in scaled space maps to a distance in
    the original space.

    Args:
        model: Network whose forward pass on ``X_tensor`` yields a single
            scalar (``.item()`` is called on the output).
        X_tensor: Input tensor passed to ``model`` unchanged.
        y_scaler: Fitted target scaler exposing ``inverse_transform``.
        n_samples: Number of stochastic forward passes; must be at least 1.
        confidence_percent: Nominal two-sided confidence level of the
            reported interval, strictly between 0 and 100.

    Returns:
        dict with keys ``predicted_price`` (mean in original scale),
        ``plus_minus_percent`` (interval half-width as a percentage of the
        absolute predicted value, 0 when the prediction is exactly 0),
        ``confidence_percent``, ``lower_bound`` and ``upper_bound``.

    Raises:
        ValueError: If ``n_samples`` < 1 or ``confidence_percent`` is not in
            the open interval (0, 100).
    """
    # Local import so this function does not depend on the file's import block.
    from statistics import NormalDist

    if n_samples < 1:
        raise ValueError(f"n_samples must be at least 1, got {n_samples}")
    if not 0.0 < confidence_percent < 100.0:
        raise ValueError(
            f"confidence_percent must be between 0 and 100 (exclusive), got {confidence_percent}"
        )

    model = enable_mc_dropout(model)
    with torch.no_grad():
        preds = np.array([model(X_tensor).item() for _ in range(n_samples)])
    mean_pred = preds.mean()
    std_pred = preds.std()

    # A ±1 std band only covers ~68% of a normal distribution; scale by the
    # normal quantile so the band matches the confidence level that is reported.
    z_score = NormalDist().inv_cdf(0.5 + confidence_percent / 200.0)

    # Convert back to original scale. The half-width is measured as a difference
    # of two inverse-transformed points so any scaler offset cancels out.
    mean_pred_orig = float(y_scaler.inverse_transform([[mean_pred]])[0, 0])
    shifted_orig = float(y_scaler.inverse_transform([[mean_pred + z_score * std_pred]])[0, 0])
    half_width = abs(shifted_orig - mean_pred_orig)

    # Relative to the magnitude of the prediction, so the percentage is never negative.
    plus_minus_percent = (
        (half_width / abs(mean_pred_orig)) * 100 if mean_pred_orig != 0 else 0
    )

    return {
        "predicted_price": mean_pred_orig,
        "plus_minus_percent": plus_minus_percent,
        "confidence_percent": float(confidence_percent),
        "lower_bound": mean_pred_orig - half_width,
        "upper_bound": mean_pred_orig + half_width,
    }
# ---------------------------------------------------------------------
# DATA PREPARATION
# ---------------------------------------------------------------------
def prepare_input_sequence(df, x_scaler, seq_len=90):
    """Build the model input from the last ``seq_len`` rows of price history.

    Selects the ``Open``, ``High``, ``Low``, ``Close`` and ``Volume`` columns,
    keeps the trailing ``seq_len`` rows, scales them with ``x_scaler`` and adds
    a leading batch dimension of size 1.

    NOTE(review): "last rows" is only the most recent window if ``df`` is in
    chronological order — confirm against how the CSV files are written.

    Args:
        df: DataFrame containing at least the five OHLCV columns.
        x_scaler: Fitted feature scaler exposing ``transform``.
        seq_len: Number of trailing rows to feed to the model; must be >= 1.

    Returns:
        A ``float32`` tensor on ``DEVICE`` with a leading batch dimension of 1.

    Raises:
        ValueError: If ``seq_len`` < 1 or ``df`` has fewer than ``seq_len`` rows.
        KeyError: If any of the OHLCV columns is missing (raised by pandas).
    """
    # Without this guard, seq_len == 0 would make the [-seq_len:] slice below
    # select the whole history instead of nothing.
    if seq_len < 1:
        raise ValueError(f"seq_len must be a positive integer, got {seq_len}")
    # Fail loudly instead of silently feeding the model a shorter sequence.
    if len(df) < seq_len:
        raise ValueError(
            f"Not enough history: need at least {seq_len} rows, got {len(df)}"
        )

    features = df[["Open", "High", "Low", "Close", "Volume"]].values
    # Scale only the rows that are used; scaling is applied row by row, so the
    # result matches scaling the full history and slicing afterwards.
    X_seq = x_scaler.transform(features[-seq_len:])
    X_tensor = torch.tensor(X_seq, dtype=torch.float32).unsqueeze(0).to(DEVICE)
    return X_tensor
def prepare_unified_input_sequence(df, x_scaler, seq_len, ticker_idx, num_tickers):
    """Build the unified-model input: scaled OHLCV window plus ticker one-hot.

    Takes the trailing ``seq_len`` rows of the ``Open``, ``High``, ``Low``,
    ``Close`` and ``Volume`` columns, scales them with ``x_scaler``, appends
    the same one-hot ticker vector of length ``num_tickers`` to every row, and
    adds a leading batch dimension of size 1.

    NOTE(review): "last rows" is only the most recent window if ``df`` is in
    chronological order — confirm against how the CSV files are written.

    Args:
        df: DataFrame containing at least the five OHLCV columns.
        x_scaler: Fitted feature scaler exposing ``transform``.
        seq_len: Number of trailing rows to feed to the model; must be >= 1.
        ticker_idx: Position of this ticker in the one-hot vector.
        num_tickers: Length of the one-hot vector.

    Returns:
        A ``float32`` tensor on ``DEVICE`` with a leading batch dimension of 1
        and ``num_tickers`` extra columns after the scaled features.

    Raises:
        ValueError: If ``seq_len`` < 1 or ``df`` has fewer than ``seq_len`` rows.
        IndexError: If ``ticker_idx`` is outside ``[0, num_tickers)``.
        KeyError: If any of the OHLCV columns is missing (raised by pandas).
    """
    # Without this guard, seq_len == 0 would make the [-seq_len:] slice below
    # select the whole history instead of nothing.
    if seq_len < 1:
        raise ValueError(f"seq_len must be a positive integer, got {seq_len}")
    # Report short history explicitly rather than as an hstack shape mismatch.
    if len(df) < seq_len:
        raise ValueError(
            f"Not enough history: need at least {seq_len} rows, got {len(df)}"
        )
    # A negative index would silently wrap around and select the wrong ticker.
    if not 0 <= ticker_idx < num_tickers:
        raise IndexError(
            f"ticker_idx {ticker_idx} is out of range for {num_tickers} tickers"
        )

    features = df[["Open", "High", "Low", "Close", "Volume"]].values
    # Scale only the rows that are used; scaling is applied row by row, so the
    # result matches scaling the full history and slicing afterwards.
    X_scaled = x_scaler.transform(features[-seq_len:])

    onehot = np.eye(num_tickers)[ticker_idx]
    onehot_seq = np.tile(onehot, (seq_len, 1))
    X_full = np.hstack([X_scaled, onehot_seq])
    X_tensor = torch.tensor(X_full, dtype=torch.float32).unsqueeze(0).to(DEVICE)
    return X_tensor
# ---------------------------------------------------------------------
# LOAD MODELS
# ---------------------------------------------------------------------
def load_model_and_scalers(ticker, horizon_name):
    """Load the per-ticker LSTM model, its scalers and its sequence length.

    Files are looked up under ``MODELS_DIR/<ticker>/`` using the prefix
    ``<ticker>_<horizon_name>``. Hyperparameters come from the optional
    ``*_config.json``; any missing key (or a missing file) falls back to
    ``hidden_size=128``, ``num_layers=2``, ``dropout=0.2``, ``seq_len=90`` and
    an ``input_size`` equal to the feature count the x-scaler was fitted on.

    Args:
        ticker: Ticker name used for the directory and file prefix.
        horizon_name: Horizon label used in the file names (e.g. ``"1d"``).

    Returns:
        ``(model, x_scaler, y_scaler, seq_len)`` on success, or ``None`` when
        the saved weights cannot be loaded into a model built from the
        resolved hyperparameters (a message is printed in that case).

    Raises:
        FileNotFoundError: If the model or either scaler file is missing.
    """
    out_dir = os.path.join(MODELS_DIR, ticker)
    model_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_model.pth")
    x_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_scaler.pkl")
    y_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_y_scaler.pkl")
    config_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_config.json")

    if not all(os.path.exists(p) for p in (model_path, x_scaler_path, y_scaler_path)):
        raise FileNotFoundError(f"❌ Missing model/scalers for {ticker} ({horizon_name})")

    # NOTE: joblib.load unpickles arbitrary objects — only point MODELS_DIR at
    # files you trust.
    x_scaler = joblib.load(x_scaler_path)
    y_scaler = joblib.load(y_scaler_path)

    # 🔹 Load model hyperparameters if available
    cfg = {}
    if os.path.exists(config_path):
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    else:
        print(f"⚠️ Missing configuration file for {ticker} ({horizon_name}): {config_path}")
        print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n")

    if "input_size" in cfg:
        input_size = cfg["input_size"]
    else:
        # Inspect the scaler only when the config does not give the value, so a
        # scaler without ``mean_`` does not break a fully specified config.
        n_features = getattr(x_scaler, "n_features_in_", None)
        if n_features is None:
            n_features = len(x_scaler.mean_)
        input_size = int(n_features)
    hidden_size = cfg.get("hidden_size", 128)
    num_layers = cfg.get("num_layers", 2)
    dropout = cfg.get("dropout", 0.2)
    seq_len = cfg.get("seq_len", 90)

    model = LSTMModel(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )
    try:
        model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))
    except Exception as e:
        # Best effort: report the mismatch and let the caller skip this horizon.
        print(f"⚠️ Skipping {ticker} ({horizon_name}) — model incompatible with config: {e}\n")
        return None
    model.to(DEVICE)
    return model, x_scaler, y_scaler, seq_len
def load_unified_model_and_scalers(horizon_name):
    """Load the unified LSTM model, its scalers, ticker mapping and sequence length.

    Files are looked up under ``MODELS_DIR/unified/`` using the prefix
    ``unified_<horizon_name>``, plus the shared ``unified_tickers.pkl``
    mapping. Hyperparameters come from the optional ``*_config.json``; any
    missing key (or a missing file) falls back to ``hidden_size=128``,
    ``num_layers=2``, ``dropout=0.2``, ``seq_len=90`` and an ``input_size``
    equal to the x-scaler's feature count plus ``len(ticker_map)``.

    Args:
        horizon_name: Horizon label used in the file names (e.g. ``"1d"``).

    Returns:
        ``(model, x_scaler, y_scaler, ticker_map, seq_len)``.

    Raises:
        FileNotFoundError: If the model, a scaler or the ticker mapping is missing.
        Exception: Whatever ``torch.load`` / ``load_state_dict`` raise when the
            saved weights do not match the resolved hyperparameters.
    """
    udir = os.path.join(MODELS_DIR, "unified")
    model_path = os.path.join(udir, f"unified_{horizon_name}_model.pth")
    x_scaler_path = os.path.join(udir, f"unified_{horizon_name}_scaler.pkl")
    y_scaler_path = os.path.join(udir, f"unified_{horizon_name}_y_scaler.pkl")
    map_path = os.path.join(udir, "unified_tickers.pkl")
    config_path = os.path.join(udir, f"unified_{horizon_name}_config.json")

    required_paths = (model_path, x_scaler_path, y_scaler_path, map_path)
    if not all(os.path.exists(p) for p in required_paths):
        raise FileNotFoundError(f"❌ Missing unified model/scalers for {horizon_name}")

    # NOTE: joblib.load unpickles arbitrary objects — only point MODELS_DIR at
    # files you trust.
    x_scaler = joblib.load(x_scaler_path)
    y_scaler = joblib.load(y_scaler_path)
    ticker_map = joblib.load(map_path)

    # 🔹 Load hyperparameters if available
    cfg = {}
    if os.path.exists(config_path):
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    else:
        print(f"⚠️ Missing configuration file for unified model ({horizon_name}): {config_path}")
        print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n")

    if "input_size" in cfg:
        input_size = cfg["input_size"]
    else:
        # Inspect the scaler only when the config does not give the value, so a
        # scaler without ``mean_`` does not break a fully specified config.
        n_features = getattr(x_scaler, "n_features_in_", None)
        if n_features is None:
            n_features = len(x_scaler.mean_)
        # Scaled features plus one one-hot column per known ticker.
        input_size = int(n_features) + len(ticker_map)
    hidden_size = cfg.get("hidden_size", 128)
    num_layers = cfg.get("num_layers", 2)
    dropout = cfg.get("dropout", 0.2)
    seq_len = cfg.get("seq_len", 90)

    model = LSTMModel(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )
    model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))
    model.to(DEVICE)
    return model, x_scaler, y_scaler, ticker_map, seq_len
# ---------------------------------------------------------------------
# RUN MODES
# ---------------------------------------------------------------------
def run_per_ticker_mode():
    """Run MC Dropout inference with one model per ticker and horizon.

    For every entry in ``TICKERS`` the ticker's CSV is read once from
    ``DATA_PATH``; then, for every horizon in ``HORIZON_CONFIGS``, the matching
    model and scalers are loaded, the input window is built and the prediction
    summary is printed. A ticker without a CSV is skipped. A failure for one
    horizon is printed and does not stop the remaining horizons or tickers.
    """
    print("\n📈 Running MC Dropout predictions (per-ticker mode)...\n")
    for ticker, symbol in TICKERS.items():
        print(f"=== {symbol} ({ticker}) ===")
        csv_path = os.path.join(DATA_PATH, f"{ticker}.csv")
        if not os.path.exists(csv_path):
            print(f"⚠️ Missing data file: {csv_path}")
            continue
        df = pd.read_csv(csv_path)
        for horizon_name in HORIZON_CONFIGS:
            # Broad catch is deliberate: this is the top-level, best-effort
            # loop and one bad horizon must not abort the run.
            try:
                loaded = load_model_and_scalers(ticker, horizon_name)
                if loaded is None:
                    # The loader returns None (after printing why) when the
                    # weights do not fit the config; unpacking it would only
                    # add a misleading "cannot unpack NoneType" error.
                    continue
                model, x_scaler, y_scaler, seq_len = loaded
                X = prepare_input_sequence(df, x_scaler, seq_len=seq_len)
                result = mc_dropout_predict(model, X, y_scaler)
                print(
                    f" [{horizon_name}] Predicted Close: ${result['predicted_price']:.2f} "
                    f"plus or minus {result['plus_minus_percent']:.2f}% "
                    f"({result['confidence_percent']:.1f}% confidence, "
                    f"range: ${result['lower_bound']:.2f} - ${result['upper_bound']:.2f})"
                )
            except Exception as e:
                print(f" ⚠️ Error for {ticker} ({horizon_name}): {e}")
        print("")
    print("✅ Done!\n")
def run_unified_mode():
    """Run MC Dropout inference with one shared model per horizon.

    For every horizon in ``HORIZON_CONFIGS`` the unified model, scalers and
    ticker mapping are loaded; then every ticker in ``TICKERS`` that appears in
    the mapping and has a CSV under ``DATA_PATH`` gets a prediction summary
    printed. A horizon whose model cannot be loaded is reported and skipped. A
    failure for one ticker is printed and does not stop the remaining tickers
    or horizons.
    """
    print("\n🤝 Running MC Dropout predictions (unified model)...\n")
    # Each ticker's CSV is the same for every horizon, so read it only once.
    frames = {}
    for horizon_name in HORIZON_CONFIGS:
        try:
            model, x_scaler, y_scaler, ticker_map, seq_len = load_unified_model_and_scalers(horizon_name)
        except Exception as e:
            print(e)
            continue
        num_tickers = len(ticker_map)
        for ticker in TICKERS:
            if ticker not in ticker_map:
                print(f"⚠️ {ticker} not found in unified mapping; skipping.")
                continue
            csv_path = os.path.join(DATA_PATH, f"{ticker}.csv")
            if not os.path.exists(csv_path):
                print(f"⚠️ Missing data for {ticker}.")
                continue
            # Broad catch is deliberate: this is the top-level, best-effort
            # loop and one bad ticker must not abort the run.
            try:
                if ticker not in frames:
                    frames[ticker] = pd.read_csv(csv_path)
                X = prepare_unified_input_sequence(
                    frames[ticker],
                    x_scaler,
                    seq_len=seq_len,
                    ticker_idx=ticker_map[ticker],
                    num_tickers=num_tickers,
                )
                result = mc_dropout_predict(model, X, y_scaler)
            except Exception as e:
                print(f" ⚠️ Error for {ticker} ({horizon_name}): {e}")
                continue
            print(
                f" [{ticker} - {horizon_name}] Predicted Close: ${result['predicted_price']:.2f} "
                f"plus or minus {result['plus_minus_percent']:.2f}% "
                f"({result['confidence_percent']:.1f}% confidence, "
                f"range: ${result['lower_bound']:.2f} - ${result['upper_bound']:.2f})"
            )
        print("")
    print("✅ Done!\n")
# ---------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: the --unified flag selects the inference mode.
    cli = argparse.ArgumentParser(description="Run MC Dropout inference on trained LSTM models.")
    cli.add_argument(
        "--unified",
        action="store_true",
        help="Use unified model instead of per-ticker models",
    )
    options = cli.parse_args()

    # Per-ticker mode is the default; unified mode runs only when requested.
    run_selected = run_unified_mode if options.unified else run_per_ticker_mode
    run_selected()