Spaces:
Sleeping
Sleeping
File size: 10,374 Bytes
41615f4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
"""
test_script.py — Run Monte Carlo Dropout inference on trained LSTM models (CPU-only).
Supports:
• Per-ticker models (default)
• Unified model (--unified flag)
Usage:
python test_script.py
python test_script.py --unified
"""
import os
import argparse
import joblib
import numpy as np
import pandas as pd
import torch
import json
from retrain import LSTMModel
# ---- CONFIG ----
TICKERS = {
"TSLA": "TSLA",
# "NVDA": "NVDA",
# "SPY": "SPY",
}
HORIZON_CONFIGS = {
"1d": {"days": 1},
"1w": {"days": 5},
"4w": {"days": 28},
"6m": {"days": 180},
"1y": {"days": 365},
}
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
DATA_PATH = os.path.join(BASE_DIR, "data")
MODELS_DIR = os.path.join(BASE_DIR, "models")
DEVICE = "cpu" # CPU only
# ---------------------------------------------------------------------
# MC DROPOUT
# ---------------------------------------------------------------------
def enable_mc_dropout(model):
"""Force all dropout layers to remain active during inference."""
for m in model.modules():
if isinstance(m, torch.nn.Dropout):
m.train()
return model
def mc_dropout_predict(model, X_tensor, y_scaler, n_samples=200):
"""Perform Monte Carlo Dropout forward passes."""
model = enable_mc_dropout(model)
preds = []
with torch.no_grad():
for _ in range(n_samples):
preds.append(model(X_tensor).item())
preds = np.array(preds)
mean_pred = preds.mean()
std_pred = preds.std()
# Convert back to original scale
mean_pred_orig = y_scaler.inverse_transform([[mean_pred]])[0, 0]
std_pred_orig = (
y_scaler.inverse_transform([[mean_pred + std_pred]])[0, 0] - mean_pred_orig
)
plus_minus_percent = (
(std_pred_orig / mean_pred_orig) * 100 if mean_pred_orig != 0 else 0
)
lower_bound = mean_pred_orig - std_pred_orig
upper_bound = mean_pred_orig + std_pred_orig
return {
"predicted_price": mean_pred_orig,
"plus_minus_percent": plus_minus_percent,
"confidence_percent": 95.0,
"lower_bound": lower_bound,
"upper_bound": upper_bound,
}
# ---------------------------------------------------------------------
# DATA PREPARATION
# ---------------------------------------------------------------------
def prepare_input_sequence(df, x_scaler, seq_len=90):
features = df[["Open", "High", "Low", "Close", "Volume"]].values
X_scaled = x_scaler.transform(features)
X_seq = X_scaled[-seq_len:]
X_tensor = torch.tensor(X_seq, dtype=torch.float32).unsqueeze(0).to(DEVICE)
return X_tensor
def prepare_unified_input_sequence(df, x_scaler, seq_len, ticker_idx, num_tickers):
features = df[["Open", "High", "Low", "Close", "Volume"]].values
X_scaled = x_scaler.transform(features)
onehot = np.eye(num_tickers)[ticker_idx]
onehot_seq = np.repeat(onehot.reshape(1, -1), seq_len, axis=0)
X_full = np.hstack([X_scaled[-seq_len:], onehot_seq])
X_tensor = torch.tensor(X_full, dtype=torch.float32).unsqueeze(0).to(DEVICE)
return X_tensor
# ---------------------------------------------------------------------
# LOAD MODELS
# ---------------------------------------------------------------------
def load_model_and_scalers(ticker, horizon_name):
out_dir = os.path.join(MODELS_DIR, ticker)
model_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_model.pth")
x_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_scaler.pkl")
y_scaler_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_y_scaler.pkl")
config_path = os.path.join(out_dir, f"{ticker}_{horizon_name}_config.json")
if not all(os.path.exists(p) for p in [model_path, x_scaler_path, y_scaler_path]):
raise FileNotFoundError(f"❌ Missing model/scalers for {ticker} ({horizon_name})")
x_scaler = joblib.load(x_scaler_path)
y_scaler = joblib.load(y_scaler_path)
# 🔹 Load model hyperparameters if available
if os.path.exists(config_path):
with open(config_path, "r") as f:
cfg = json.load(f)
input_size = cfg.get("input_size", len(x_scaler.mean_))
hidden_size = cfg.get("hidden_size", 128)
num_layers = cfg.get("num_layers", 2)
dropout = cfg.get("dropout", 0.2)
seq_len = cfg.get("seq_len", 90)
else:
print(f"⚠️ Missing configuration file for {ticker} ({horizon_name}): {config_path}")
print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n")
input_size = len(x_scaler.mean_)
hidden_size, num_layers, dropout, seq_len = 128, 2, 0.2, 90
model = LSTMModel(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
dropout=dropout,
)
try:
model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))
except Exception as e:
print(f"⚠️ Skipping {ticker} ({horizon_name}) — model incompatible with config: {e}\n")
return None
model.to(DEVICE)
return model, x_scaler, y_scaler, seq_len
def load_unified_model_and_scalers(horizon_name):
udir = os.path.join(MODELS_DIR, "unified")
model_path = os.path.join(udir, f"unified_{horizon_name}_model.pth")
x_scaler_path = os.path.join(udir, f"unified_{horizon_name}_scaler.pkl")
y_scaler_path = os.path.join(udir, f"unified_{horizon_name}_y_scaler.pkl")
map_path = os.path.join(udir, "unified_tickers.pkl")
config_path = os.path.join(udir, f"unified_{horizon_name}_config.json")
if not all(os.path.exists(p) for p in [model_path, x_scaler_path, y_scaler_path, map_path]):
raise FileNotFoundError(f"❌ Missing unified model/scalers for {horizon_name}")
x_scaler = joblib.load(x_scaler_path)
y_scaler = joblib.load(y_scaler_path)
ticker_map = joblib.load(map_path)
# 🔹 Load hyperparameters if available
if os.path.exists(config_path):
with open(config_path, "r") as f:
cfg = json.load(f)
input_size = cfg.get("input_size", len(x_scaler.mean_) + len(ticker_map))
hidden_size = cfg.get("hidden_size", 128)
num_layers = cfg.get("num_layers", 2)
dropout = cfg.get("dropout", 0.2)
seq_len = cfg.get("seq_len", 90)
else:
print(f"⚠️ Missing configuration file for unified model ({horizon_name}): {config_path}")
print(" Using fallback defaults: input_size=?, hidden_size=128, num_layers=2, dropout=0.2, seq_len=90\n")
input_size = len(x_scaler.mean_) + len(ticker_map)
hidden_size, num_layers, dropout, seq_len = 128, 2, 0.2, 90
model = LSTMModel(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
dropout=dropout,
)
model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))
model.to(DEVICE)
return model, x_scaler, y_scaler, ticker_map, seq_len
# ---------------------------------------------------------------------
# RUN MODES
# ---------------------------------------------------------------------
def run_per_ticker_mode():
print("\n📈 Running MC Dropout predictions (per-ticker mode)...\n")
for ticker, symbol in TICKERS.items():
print(f"=== {symbol} ({ticker}) ===")
csv_path = os.path.join(DATA_PATH, f"{ticker}.csv")
if not os.path.exists(csv_path):
print(f"⚠️ Missing data file: {csv_path}")
continue
df = pd.read_csv(csv_path)
for horizon_name in HORIZON_CONFIGS.keys():
try:
model, x_scaler, y_scaler, seq_len = load_model_and_scalers(ticker, horizon_name)
X = prepare_input_sequence(df, x_scaler, seq_len=seq_len)
result = mc_dropout_predict(model, X, y_scaler)
print(
f" [{horizon_name}] Predicted Close: ${result['predicted_price']:.2f} "
f"plus or minus {result['plus_minus_percent']:.2f}% "
f"({result['confidence_percent']:.1f}% confidence, "
f"range: ${result['lower_bound']:.2f} - ${result['upper_bound']:.2f})"
)
except Exception as e:
print(f" ⚠️ Error for {ticker} ({horizon_name}): {e}")
print("")
print("✅ Done!\n")
def run_unified_mode():
print("\n🤝 Running MC Dropout predictions (unified model)...\n")
for horizon_name in HORIZON_CONFIGS.keys():
try:
model, x_scaler, y_scaler, ticker_map, seq_len = load_unified_model_and_scalers(horizon_name)
except Exception as e:
print(e)
continue
num_tickers = len(ticker_map)
for ticker, symbol in TICKERS.items():
if ticker not in ticker_map:
print(f"⚠️ {ticker} not found in unified mapping; skipping.")
continue
csv_path = os.path.join(DATA_PATH, f"{ticker}.csv")
if not os.path.exists(csv_path):
print(f"⚠️ Missing data for {ticker}.")
continue
df = pd.read_csv(csv_path)
X = prepare_unified_input_sequence(
df, x_scaler, seq_len=seq_len, ticker_idx=ticker_map[ticker], num_tickers=num_tickers
)
result = mc_dropout_predict(model, X, y_scaler)
print(
f" [{ticker} - {horizon_name}] Predicted Close: ${result['predicted_price']:.2f} "
f"plus or minus {result['plus_minus_percent']:.2f}% "
f"({result['confidence_percent']:.1f}% confidence, "
f"range: ${result['lower_bound']:.2f} - ${result['upper_bound']:.2f})"
)
print("")
print("✅ Done!\n")
# ---------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run MC Dropout inference on trained LSTM models.")
parser.add_argument("--unified", action="store_true", help="Use unified model instead of per-ticker models")
args = parser.parse_args()
if args.unified:
run_unified_mode()
else:
run_per_ticker_mode()
|