"""
model_utils.py — LSTM model utilities for inference with MC Dropout
Provides:
- load_model_and_scalers()
- prepare_input_sequence()
- predict_with_mc_dropout()
"""
import os
import numpy as np
import torch
import joblib
import pandas as pd
from retrain import LSTMModel, enable_mc_dropout, DEVICE
# ---------------------------------------------------------------------
# 📥 LOAD MODEL + SCALERS
# ---------------------------------------------------------------------
def load_model_and_scalers(ticker_name, horizon_name):
    """
    Load a trained LSTM model and its fitted input/output scalers.

    Args:
        ticker_name: ticker symbol used in the model directory and file names.
        horizon_name: forecast-horizon label used in the file names.

    Returns:
        Tuple of (model, x_scaler, y_scaler); the model is on DEVICE and
        switched to eval mode.

    Raises:
        FileNotFoundError: if any of the three artifact files is absent.
    """
    base_dir = os.path.dirname(os.path.dirname(__file__))
    model_dir = os.path.join(base_dir, "models", ticker_name)
    prefix = f"{ticker_name}_{horizon_name}"
    model_path = os.path.join(model_dir, f"{prefix}_model.pth")
    xscaler_path = os.path.join(model_dir, f"{prefix}_scaler.pkl")
    yscaler_path = os.path.join(model_dir, f"{prefix}_y_scaler.pkl")

    paths = (model_path, xscaler_path, yscaler_path)
    if any(not os.path.exists(p) for p in paths):
        raise FileNotFoundError(f"❌ Missing model/scaler files for {ticker_name} ({horizon_name})")

    # Load the scalers first: the model's input width is inferred from x_scaler.
    x_scaler = joblib.load(xscaler_path)
    y_scaler = joblib.load(yscaler_path)

    # x_scaler.mean_ holds one entry per feature column (StandardScaler-style),
    # which fixes the LSTM input size dynamically.
    n_features = len(x_scaler.mean_)
    model = LSTMModel(
        input_size=n_features,
        hidden_size=256,
        num_layers=3,
        dropout=0.2,
    ).to(DEVICE)
    # weights_only=True restricts torch.load to tensor data (safer unpickling).
    model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))
    model.eval()

    return model, x_scaler, y_scaler
# ---------------------------------------------------------------------
# 📊 PREPARE INPUT SEQUENCE
# ---------------------------------------------------------------------
def prepare_input_sequence(df, x_scaler, seq_len=90):
    """
    Build the most recent scaled input window for inference.

    Args:
        df: DataFrame containing at least the columns
            ["Open", "High", "Low", "Close", "Volume"].
        x_scaler: fitted scaler exposing .transform() (e.g. StandardScaler).
        seq_len: number of timesteps the model expects (default 90).

    Returns:
        torch.float32 tensor of shape [1, seq_len, 5] on DEVICE.

    Raises:
        ValueError: if fewer than seq_len complete rows remain after dropping
            NaNs. Previously this silently produced a shorter window, which a
            model trained on fixed-length sequences cannot consume correctly.
    """
    cols = ["Open", "High", "Low", "Close", "Volume"]
    df = df.dropna(subset=cols)
    if len(df) < seq_len:
        raise ValueError(
            f"prepare_input_sequence: need at least {seq_len} complete rows, got {len(df)}"
        )
    # Take the most recent seq_len rows and scale with the training-time scaler.
    feats = df[cols].values[-seq_len:]
    X_scaled = x_scaler.transform(feats)
    # unsqueeze(0) adds the batch dimension -> [1, seq_len, features].
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32).unsqueeze(0).to(DEVICE)
    return X_tensor
# ---------------------------------------------------------------------
# 🎲 MONTE CARLO DROPOUT INFERENCE
# ---------------------------------------------------------------------
def predict_with_mc_dropout(model, X_tensor, y_scaler, n_samples=100):
    """
    Estimate predictive uncertainty via Monte Carlo Dropout: dropout is kept
    active at inference time and the model is sampled n_samples times.

    Args:
        model: trained LSTM whose forward pass yields a single scalar.
        X_tensor: input of shape [1, seq_len, features] on DEVICE.
        y_scaler: fitted target scaler used to map outputs back to prices.
        n_samples: number of stochastic forward passes (default 100).

    Returns:
        dict with "predicted_price", "lower_bound", "upper_bound",
        "confidence_percent", and "all_preds" (raw scaled samples, ndarray).
    """
    model = enable_mc_dropout(model)

    # Sample the network repeatedly; dropout randomness makes each pass differ.
    with torch.no_grad():
        samples = np.array([model(X_tensor).item() for _ in range(n_samples)])

    mu = samples.mean()
    sigma = samples.std()

    # Map the mean back to price space. For the spread, inverse(sigma) -
    # inverse(0) cancels the scaler's offset and leaves sigma * scale,
    # i.e. the std expressed in price units (valid for affine scalers).
    mean_price = y_scaler.inverse_transform([[mu]])[0][0]
    zero_price = y_scaler.inverse_transform([[0]])[0][0]
    std_price = y_scaler.inverse_transform([[sigma]])[0][0] - zero_price

    # ~95% interval under a normal assumption: mean +/- 2 std.
    lower = mean_price - 2 * std_price
    upper = mean_price + 2 * std_price

    # Narrow interval relative to the price -> high confidence, floored at 0.
    relative_width = (upper - lower) / mean_price
    confidence = max(0.0, 100 * (1 - relative_width))

    return {
        "predicted_price": float(mean_price),
        "lower_bound": float(lower),
        "upper_bound": float(upper),
        "confidence_percent": round(confidence, 2),
        "all_preds": samples,
    }