Spaces:
Sleeping
Sleeping
File size: 2,836 Bytes
514c4c0 b0d8e39 514c4c0 457b70c 514c4c0 457b70c 514c4c0 09fbd2c 514c4c0 09fbd2c 514c4c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import os, json, pickle
from datetime import datetime, timedelta
import numpy as np
import yfinance as yf
import torch
from models import StockLSTM
# Blank out CUDA_VISIBLE_DEVICES for this process. Together with the CPU
# device selected below, this looks intended to keep inference CPU-only.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
# Root directory for per-symbol artifacts; _paths() joins the upper-cased
# symbol onto it.
ARTIFACTS_DIR = "artifacts"
# Device the model is moved to in _load_artifacts().
device = torch.device("cpu")
def _paths(symbol: str):
    """Return the artifact file paths for ``symbol``.

    Artifacts live in ``ARTIFACTS_DIR/<SYMBOL>/`` where the symbol is
    upper-cased. The result is a dict with the keys ``"model"``,
    ``"scaler"`` and ``"meta"``, each mapped to a path string.
    """
    symbol_dir = os.path.join(ARTIFACTS_DIR, symbol.upper())
    artifact_files = (
        ("model", "model.pt"),
        ("scaler", "scaler.pkl"),
        ("meta", "meta.json"),
    )
    return {
        kind: os.path.join(symbol_dir, filename)
        for kind, filename in artifact_files
    }
def _load_artifacts(symbol: str):
    """Load the trained model, fitted scaler and metadata for ``symbol``.

    Args:
        symbol: Ticker symbol; artifact paths are resolved via ``_paths``.

    Returns:
        A ``(model, scaler, meta)`` tuple: the ``StockLSTM`` with its saved
        weights loaded, moved to ``device`` and switched to eval mode; the
        unpickled scaler object; and the dict parsed from ``meta.json``.

    Raises:
        FileNotFoundError: If the model, scaler or metadata file is missing.
    """
    p = _paths(symbol)
    if not (os.path.exists(p["model"]) and os.path.exists(p["scaler"])):
        raise FileNotFoundError(f"Model/scaler not found for {symbol}. Train first.")
    # meta.json is read unconditionally below, so guard it as well instead of
    # letting open() fail with a bare path and no hint about what to do.
    if not os.path.exists(p["meta"]):
        raise FileNotFoundError(f"Metadata not found for {symbol}. Train first.")

    with open(p["meta"], "r", encoding="utf-8") as f:
        meta = json.load(f)

    # NOTE(security): pickle.load can execute arbitrary code embedded in the
    # file. Only load scaler artifacts produced by a trusted training run.
    with open(p["scaler"], "rb") as f:
        scaler = pickle.load(f)

    # NOTE(review): the architecture is hard-coded here; presumably it has to
    # match the values used when the checkpoint was trained -- confirm.
    model = StockLSTM(input_dim=1, hidden_dim=64, num_layers=2, dropout=0.2)
    model = model.to(device)
    model.load_state_dict(torch.load(p["model"], map_location=device))
    model.eval()
    return model, scaler, meta
def _last_close_series(symbol: str, days: int = 400):
    """Download recent close prices for ``symbol`` as an ``[N, 1]`` array.

    The requested window covers the ``days`` calendar days before today's
    UTC date. Rows whose close is NaN are dropped so that callers computing
    ratios or logs over consecutive prices do not propagate NaN.

    Args:
        symbol: Ticker symbol passed to ``yf.download``.
        days: Length of the look-back window in calendar days.

    Returns:
        Array of close prices with shape ``[N, 1]``.

    Raises:
        ValueError: If the download is empty or contains no usable closes.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in
    # UTC yields the same calendar date.
    end = datetime.now(timezone.utc).date()
    start = end - timedelta(days=days)
    df = yf.download(
        symbol,
        start=start.isoformat(),
        end=end.isoformat(),
        progress=False,
        auto_adjust=True,
    )
    if df.empty:
        raise ValueError(f"No data for {symbol}")

    closes = df["Close"].dropna()
    if closes.empty:
        raise ValueError(f"No data for {symbol}")
    return closes.values.reshape(-1, 1)
@torch.no_grad()
def predict_next(symbol: str, n_days: int = 1):
    """Forecast the next ``n_days`` close prices for ``symbol``.

    Recent closes are converted to log returns, scaled with the saved
    scaler, and the last ``seq_len`` scaled returns seed the model. Each
    prediction is appended to the window and the oldest step dropped, so
    later steps are conditioned on earlier predictions. Predicted returns
    are un-scaled and compounded onto the last known close.

    Args:
        symbol: Ticker symbol whose artifacts and price history are used.
        n_days: Number of future steps to predict; must be at least 1.

    Returns:
        Dict with keys ``"symbol"`` (upper-cased), ``"days"``,
        ``"predictions"`` (list of floats, one per step), ``"seq_len"``
        and ``"meta"`` (the loaded metadata dict).

    Raises:
        ValueError: If ``n_days`` is below 1 or the downloaded history has
            fewer than ``seq_len`` returns.
        FileNotFoundError: Propagated from ``_load_artifacts``.
    """
    # Fail fast: an empty forecast horizon would otherwise only surface as
    # an opaque error when the empty prediction array is un-scaled.
    if n_days < 1:
        raise ValueError(f"n_days must be >= 1, got {n_days}")

    model, scaler, meta = _load_artifacts(symbol)
    seq_len = meta["seq_len"]
    closes = _last_close_series(symbol, days=max(400, seq_len * 5 + 20))

    # closes is [N, 1]; compute log returns on the fly -> [N-1, 1]
    prices = closes.flatten()
    returns = np.log(prices[1:] / prices[:-1]).reshape(-1, 1)
    if len(returns) < seq_len:
        # Without this check the reshape below fails with a size-mismatch
        # message that says nothing about missing price history.
        raise ValueError(
            f"Not enough history for {symbol}: "
            f"need {seq_len} returns, got {len(returns)}"
        )
    scaled = scaler.transform(returns)

    # Seed window: the most recent seq_len scaled returns, [1, seq_len, 1].
    window = scaled[-seq_len:].reshape(1, seq_len, 1).astype(np.float32)
    preds_scaled = []
    for _ in range(n_days):
        yhat = model(torch.from_numpy(window)).numpy()  # [1,1] in scaled space
        preds_scaled.append(yhat[0, 0])
        # Roll: drop the oldest step, append the prediction as the newest.
        window = np.concatenate(
            [window[:, 1:, :], yhat.reshape(1, 1, 1)], axis=1
        ).astype(np.float32, copy=False)

    preds_scaled = np.array(preds_scaled, dtype=np.float32).reshape(-1, 1)
    preds_returns = scaler.inverse_transform(preds_scaled).flatten()

    # Log returns are additive, so the price after k steps is
    # last_close * exp(sum of the first k returns). Accumulate in float64 so
    # float32 model outputs do not limit the precision of the price path.
    cumulative = np.cumsum(np.asarray(preds_returns, dtype=np.float64))
    predictions = (float(prices[-1]) * np.exp(cumulative)).tolist()

    return {
        "symbol": symbol.upper(),
        "days": n_days,
        "predictions": predictions,
        "seq_len": seq_len,
        "meta": meta,
    }
|