Create_ONNX / app.py
eaglelandsonce's picture
Update app.py
6143392 verified
import os
import io
import math
import tempfile
from dataclasses import dataclass
from functools import lru_cache
from typing import List, Tuple
import gradio as gr
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
import lightning.pytorch as pl
from torch.utils.data import DataLoader, TensorDataset
import onnx
from onnx import external_data_helper
import onnxruntime as ort
# Shown at the top of the UI; rendered as Markdown by Gradio.
DISCLAIMER = """
**Disclaimer (Educational Use Only):**
This app produces *model signals* from historical price data for learning/demonstration.
It is **not** financial advice, not a recommendation, and not suitable for real trading decisions.
Markets are risky; consult a qualified professional for investment guidance.
"""
# -----------------------------
# Feature engineering
# -----------------------------
@dataclass
class FeatureSpec:
    """Windows and lookback used by the feature-engineering pipeline."""
    lookback_days: int = 730  # rows of history kept per ticker (plus warm-up slack)
    sma_fast: int = 10        # fast simple-moving-average window (trading days)
    sma_slow: int = 20        # slow simple-moving-average window (trading days)
    rsi_period: int = 14      # averaging period for RSI
    vol_window: int = 20      # rolling std window over daily returns (volatility)
def _rsi(close: pd.Series, period: int = 14) -> pd.Series:
delta = close.diff()
gain = (delta.where(delta > 0, 0.0)).rolling(period).mean()
loss = (-delta.where(delta < 0, 0.0)).rolling(period).mean()
rs = gain / (loss + 1e-9)
return 100 - (100 / (1 + rs))
def _normalize_stooq_ticker(ticker: str) -> str:
t = ticker.strip().lower()
if not t:
return t
if "." not in t:
t = f"{t}.us"
return t
@lru_cache(maxsize=128)
def fetch_prices_stooq(ticker: str) -> pd.DataFrame:
    """Download daily OHLC(V) history for *ticker* from Stooq's CSV endpoint.

    Returns a Date-indexed, ascending-sorted DataFrame with numeric price
    columns; rows without a Close are dropped.

    Raises:
        ValueError: if Stooq returns no rows or unexpected columns.
        requests.HTTPError: on a non-2xx HTTP response (via raise_for_status).

    NOTE(review): lru_cache hands the *same* DataFrame object to every
    caller; callers must .copy() before mutating (current callers do).
    """
    sym = _normalize_stooq_ticker(ticker)
    url = f"https://stooq.com/q/d/l/?s={sym}&i=d"
    r = requests.get(url, timeout=25)
    r.raise_for_status()
    df = pd.read_csv(io.StringIO(r.text))
    # Stooq returns an HTML/empty payload for unknown symbols; detect that here.
    if df.empty or "Date" not in df.columns:
        raise ValueError(f"No data returned for ticker '{ticker}' (stooq symbol '{sym}').")
    df["Date"] = pd.to_datetime(df["Date"])
    df = df.set_index("Date").sort_index()
    needed = {"Open", "High", "Low", "Close"}
    if not needed.issubset(set(df.columns)):
        raise ValueError(f"Unexpected Stooq columns for '{ticker}': {list(df.columns)}")
    # Coerce to numeric; malformed cells become NaN instead of raising.
    for c in ["Open", "High", "Low", "Close", "Volume"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=["Close"]).copy()
    return df
def build_features(prices: pd.DataFrame, spec: FeatureSpec) -> pd.DataFrame:
    """Derive model features and the next-day-up/down target from OHLC prices.

    Output columns: close, ret_1, ret_5, sma_ratio, rsi, vol, ret_next, target.
    dropna() removes rows with a NaN in *any* column of the working frame
    (warm-up rows for the rolling windows, and the final row whose ret_next
    is unknown).
    """
    feats = prices.copy()
    close = feats["Close"].astype(float)
    feats["close"] = close
    feats["ret_1"] = close.pct_change()
    feats["ret_5"] = close.pct_change(5)
    feats["sma_fast"] = close.rolling(spec.sma_fast).mean()
    feats["sma_slow"] = close.rolling(spec.sma_slow).mean()
    feats["sma_ratio"] = feats["sma_fast"] / (feats["sma_slow"] + 1e-9) - 1.0
    feats["rsi"] = _rsi(close, spec.rsi_period)
    feats["vol"] = feats["ret_1"].rolling(spec.vol_window).std()
    # Tomorrow's return becomes today's label via shift(-1).
    feats["ret_next"] = close.pct_change().shift(-1)
    feats["target"] = (feats["ret_next"] > 0).astype(int)
    feats = feats.dropna()
    keep = ["close", "ret_1", "ret_5", "sma_ratio", "rsi", "vol", "ret_next", "target"]
    return feats[keep].copy()
def make_dataset_for_tickers(tickers: List[str], spec: FeatureSpec) -> Tuple[pd.DataFrame, List[str]]:
    """Fetch and feature-engineer every ticker; return (combined frame, failed tickers).

    Failures for individual tickers are swallowed (best-effort) and reported
    in the second return value; only a total failure raises.
    """
    good_frames: List[pd.DataFrame] = []
    bad_tickers: List[str] = []
    for ticker in tickers:
        try:
            prices = fetch_prices_stooq(ticker)
            # Keep a bit more than the lookback so rolling windows can warm up.
            recent = prices.iloc[-(spec.lookback_days + 120):].copy()
            feats = build_features(recent, spec).reset_index().rename(columns={"Date": "date"})
            feats["ticker"] = ticker.upper()
            good_frames.append(feats)
        except Exception:
            bad_tickers.append(ticker.upper())
    if not good_frames:
        raise ValueError("No tickers returned usable data. Try different tickers (e.g., AAPL, MSFT).")
    combined = pd.concat(good_frames, ignore_index=True)
    combined["date"] = pd.to_datetime(combined["date"])
    combined = combined.sort_values(["ticker", "date"]).reset_index(drop=True)
    return combined, bad_tickers
def split_train_val_per_ticker(df: pd.DataFrame, train_frac: float = 0.8) -> pd.DataFrame:
    """Chronological split per ticker: first train_frac of rows -> 'train', rest -> 'val'.

    At least one row per ticker is always labeled 'train'.
    """
    labeled = []
    for _, group in df.groupby("ticker", sort=False):
        group = group.sort_values("date").reset_index(drop=True)
        size = len(group)
        boundary = max(int(size * train_frac), 1)
        n_train = min(boundary, size)
        group["split"] = ["train"] * n_train + ["val"] * (size - n_train)
        labeled.append(group)
    return pd.concat(labeled, ignore_index=True)
def fig_to_image(fig) -> np.ndarray:
    """Rasterize a matplotlib figure to an image array and close the figure.

    The figure is always closed to avoid accumulating open figures across runs.
    """
    png_buffer = io.BytesIO()
    fig.savefig(png_buffer, format="png", bbox_inches="tight", dpi=160)
    plt.close(fig)
    png_buffer.seek(0)
    image = plt.imread(png_buffer)
    return image
def save_df_to_temp_csv(df: pd.DataFrame, prefix: str) -> str:
    """Write *df* to a uniquely named temp CSV file and return its path.

    Fix: the original used NamedTemporaryFile(delete=False) and never closed
    the handle, leaking a file descriptor and re-opening a path that was
    still held open (which fails on Windows). mkstemp + close avoids both.
    """
    fd, path = tempfile.mkstemp(suffix=".csv", prefix=prefix)
    os.close(fd)  # release the descriptor; pandas reopens the path itself
    df.to_csv(path, index=False)
    return path
def save_bytes_to_temp_file(b: bytes, suffix: str, prefix: str) -> str:
    """Write raw bytes to a uniquely named temp file and return its path.

    Fix: the original opened the file twice (NamedTemporaryFile plus a second
    open() on the same path) and never closed the first handle, leaking a
    descriptor. Writing through the mkstemp descriptor does one open/close.
    """
    fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
    with os.fdopen(fd, "wb") as f:
        f.write(b)
    return path
# -----------------------------
# Lightning model
# -----------------------------
class LitClassifier(pl.LightningModule):
    """Tiny MLP binary classifier emitting logits; trained with BCE-with-logits.

    Improvement: training_step and validation_step duplicated the exact same
    loss/log code; it is factored into a shared _step helper so the two hooks
    cannot drift apart.
    """

    def __init__(self, n_features: int, lr: float = 1e-2):
        super().__init__()
        self.save_hyperparameters()
        self.net = nn.Sequential(
            nn.Linear(n_features, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
        )
        self.lr = lr

    def forward(self, x):
        # Squeeze the trailing dim so logits have shape (batch,) like targets.
        return self.net(x).squeeze(-1)

    def _step(self, batch, stage: str):
        # Shared train/val logic: BCE-with-logits loss, logged per epoch.
        x, y = batch
        loss = F.binary_cross_entropy_with_logits(self(x), y)
        self.log(f"{stage}_loss", loss, on_step=False, on_epoch=True)
        return loss

    def training_step(self, batch, _):
        return self._step(batch, "train")

    def validation_step(self, batch, _):
        return self._step(batch, "val")

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)
def signal_from_prob(p_up: float, buy_th: float, sell_th: float) -> str:
    """Translate an up-probability into a discrete label.

    BUY when p_up >= buy_th, SELL when p_up <= sell_th, otherwise HOLD
    (thresholds are inclusive, and BUY is checked first).
    """
    if p_up >= buy_th:
        label = "BUY"
    elif p_up <= sell_th:
        label = "SELL"
    else:
        label = "HOLD"
    return f"{label} (signal)"
# -----------------------------
# ONNX wrapper: includes preprocessing + sigmoid
# -----------------------------
class OnnxWrapper(nn.Module):
    """Inference-time wrapper that gets exported to ONNX.

    Accepts RAW features ordered as [ret_1, ret_5, sma_ratio, rsi, vol],
    standardizes them with the stored mu/sd, runs the trained net, and
    applies sigmoid so the exported graph outputs p_up directly.
    """

    def __init__(self, net: nn.Module, mu: np.ndarray, sd: np.ndarray):
        super().__init__()
        self.net = net
        # Buffers (not trainable parameters): baked into the exported graph.
        self.register_buffer("mu", torch.tensor(mu, dtype=torch.float32))
        self.register_buffer("sd", torch.tensor(sd, dtype=torch.float32))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        standardized = (x - self.mu) / self.sd
        raw_logits = self.net(standardized).squeeze(-1)
        return torch.sigmoid(raw_logits)
def export_onnx_model(trained_model, mu: np.ndarray, sd: np.ndarray, n_features: int) -> str:
    """Export the trained net (with preprocessing + sigmoid) to a SINGLE-FILE ONNX.

    If PyTorch writes weights to an external sidecar (onnx_path + '.data'),
    they are folded back into the .onnx so inference needs only one file.

    Fix: the original used NamedTemporaryFile(delete=False) without closing
    the handle, leaking a descriptor and keeping the path open while
    torch.onnx.export re-opened it (a failure mode on Windows).
    """
    wrapper = OnnxWrapper(trained_model.net.cpu().eval(), mu=mu, sd=sd).eval()
    dummy = torch.zeros(1, n_features, dtype=torch.float32)
    fd, onnx_path = tempfile.mkstemp(suffix=".onnx", prefix="signals_model_")
    os.close(fd)  # release the descriptor; export reopens the path itself
    torch.onnx.export(
        wrapper,
        dummy,
        onnx_path,
        input_names=["features"],
        output_names=["p_up"],
        dynamic_axes={"features": {0: "batch"}, "p_up": {0: "batch"}},
        opset_version=17,
        do_constant_folding=True,
    )
    # Merge external data into the ONNX (if created)
    data_path = onnx_path + ".data"
    if os.path.exists(data_path):
        m = onnx.load_model(onnx_path, load_external_data=True)
        external_data_helper.convert_model_from_external_data(m)
        onnx.save_model(m, onnx_path)
        try:
            os.remove(data_path)
        except OSError:
            pass  # best-effort cleanup; a stale sidecar is harmless
    return onnx_path
def onnx_predict_probs(onnx_path: str, X: np.ndarray) -> np.ndarray:
    """Run the exported model on raw feature rows; return a flat array of p_up.

    A fresh CPU session is created per call (simple, stateless; fine for the
    small batch sizes this app uses).
    """
    session = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
    feed_name = session.get_inputs()[0].name
    outputs = session.run(None, {feed_name: X.astype(np.float32)})
    return outputs[0].reshape(-1)
# -----------------------------
# Main Gradio function
# -----------------------------
def run_app(
    tickers_text: str,
    lookback_days: int,
    lr: float,
    batch_size: int,
    epochs: int,
    seed: int,
    buy_threshold: float,
    sell_threshold: float,
    device_choice: str,
):
    """End-to-end pipeline behind the Run button: fetch data, train, export, infer.

    Returns the 7-tuple wired to the Gradio outputs:
    (signals_df, backtest_img, preview_df, csv_path, onnx_path, snippet_path, summary).

    Raises gr.Error when no ticker is supplied; ValueError propagates from
    make_dataset_for_tickers when no ticker yields usable data.
    """
    pl.seed_everything(int(seed), workers=True)
    # Parse the comma-separated input and cap at 10 (the UI promise).
    tickers = [t.strip().upper() for t in tickers_text.split(",") if t.strip()]
    tickers = tickers[:10]
    if not tickers:
        raise gr.Error("Enter at least 1 ticker, e.g. AAPL, MSFT, NVDA")
    spec = FeatureSpec(lookback_days=int(lookback_days))
    df_raw, failed = make_dataset_for_tickers(tickers, spec)
    df = split_train_val_per_ticker(df_raw, train_frac=0.8)
    feature_cols = ["ret_1", "ret_5", "sma_ratio", "rsi", "vol"]
    n_features = len(feature_cols)
    # Standardize using TRAIN split stats (no look-ahead into the val rows).
    train_df = df[df["split"] == "train"].copy()
    mu = train_df[feature_cols].mean().values.astype(np.float32)
    # Zero std would divide by zero; replace with 1.0 before scaling.
    sd = train_df[feature_cols].std().replace(0, 1.0).values.astype(np.float32)
    df_std = df.copy()
    df_std[feature_cols] = (df_std[feature_cols] - mu) / sd
    df_std = df_std.replace([np.inf, -np.inf], np.nan).dropna().copy()
    # Torch tensors
    X_train = torch.tensor(df_std[df_std["split"] == "train"][feature_cols].values, dtype=torch.float32)
    y_train = torch.tensor(df_std[df_std["split"] == "train"]["target"].values, dtype=torch.float32)
    X_val = torch.tensor(df_std[df_std["split"] == "val"][feature_cols].values, dtype=torch.float32)
    y_val = torch.tensor(df_std[df_std["split"] == "val"]["target"].values, dtype=torch.float32)
    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=int(batch_size), shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=int(batch_size), shuffle=False)
    # Lightning Trainer device selection: silently fall back to CPU if CUDA
    # was requested but is unavailable (reported in the summary below).
    want_cuda = (device_choice == "cuda")
    has_cuda = torch.cuda.is_available()
    using_cuda = want_cuda and has_cuda
    accelerator = "gpu" if using_cuda else "cpu"
    model = LitClassifier(n_features=n_features, lr=float(lr))
    trainer = pl.Trainer(
        max_epochs=int(epochs),
        accelerator=accelerator,
        devices=1,
        logger=False,
        enable_checkpointing=False,
        enable_progress_bar=False,
        enable_model_summary=False,
        deterministic=True,
    )
    trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)
    # ---- Export ONNX (single-file; includes preprocessing + sigmoid)
    onnx_path = export_onnx_model(model, mu=mu, sd=sd, n_features=n_features)
    # ---- Inference: latest row per ticker (compare Torch vs ONNX)
    model.eval()
    out_rows = []
    torch_probs_for_onnx_compare = []
    onnx_inputs = []
    for t in tickers:
        dft_raw = df[df["ticker"] == t].sort_values("date")
        if dft_raw.empty:
            continue
        last_raw = dft_raw.iloc[-1]
        x_raw = last_raw[feature_cols].values.astype(np.float32)  # raw features (ONNX expects raw)
        onnx_inputs.append(x_raw)
        # The Torch path standardizes by hand; the ONNX graph does it internally.
        x_std = (x_raw - mu) / sd
        x_t = torch.tensor(x_std, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            logit = model(x_t).item()
        p_up_torch = 1 / (1 + math.exp(-logit))
        torch_probs_for_onnx_compare.append(p_up_torch)
    onnx_probs = np.array([])
    if len(onnx_inputs) > 0:
        X_onnx = np.stack(onnx_inputs, axis=0)
        onnx_probs = onnx_predict_probs(onnx_path, X_onnx)
    # Second pass mirrors the first loop's skip logic so idx stays aligned
    # with torch_probs_for_onnx_compare / onnx_probs.
    idx = 0
    for t in tickers:
        dft_raw = df[df["ticker"] == t].sort_values("date")
        if dft_raw.empty:
            continue
        last_raw = dft_raw.iloc[-1]
        p_torch = float(torch_probs_for_onnx_compare[idx])
        p_onnx = float(onnx_probs[idx]) if len(onnx_probs) else float("nan")
        # Signal from the ONNX probability when available, else the Torch one.
        sig = signal_from_prob(
            p_onnx if not math.isnan(p_onnx) else p_torch,
            float(buy_threshold),
            float(sell_threshold),
        )
        out_rows.append(
            {
                "ticker": t,
                "date": last_raw["date"].date().isoformat(),
                "last_close": round(float(last_raw["close"]), 4),
                "p_up_torch": round(p_torch, 4),
                "p_up_onnx": round(p_onnx, 4) if not math.isnan(p_onnx) else np.nan,
                "abs_diff": round(abs(p_torch - p_onnx), 6) if not math.isnan(p_onnx) else np.nan,
                "signal (from ONNX if available)": sig,
            }
        )
        idx += 1
    signals_df = pd.DataFrame(out_rows)
    if not signals_df.empty:
        signals_df = signals_df.sort_values("p_up_onnx", ascending=False, na_position="last").reset_index(drop=True)
    # Toy backtest for first ticker (val split only); skipped when the val
    # slice is too short to be meaningful (< 30 rows).
    backtest_img = None
    t0 = tickers[0]
    d0_raw = df[(df["ticker"] == t0) & (df["split"] == "val")].sort_values("date").copy()
    if len(d0_raw) >= 30:
        X0_raw = d0_raw[feature_cols].values.astype(np.float32)
        p = onnx_predict_probs(onnx_path, X0_raw)
        # Long above BUY threshold, short below SELL threshold, flat otherwise.
        pos = np.zeros_like(p, dtype=float)
        pos[p >= float(buy_threshold)] = 1.0
        pos[p <= float(sell_threshold)] = -1.0
        strat = pos * d0_raw["ret_next"].values
        equity = (1 + strat).cumprod()
        fig = plt.figure()
        plt.plot(equity)
        plt.title(f"Toy Backtest (VAL only) — {t0} | ONNX probs")
        plt.xlabel("Val days")
        plt.ylabel("Equity (start=1.0)")
        plt.grid(True, alpha=0.3)
        backtest_img = fig_to_image(fig)
    # Data preview + download
    export_df = df.copy()
    export_df["date"] = export_df["date"].dt.date.astype(str)
    export_df = export_df[
        ["date", "ticker", "split", "close", "ret_1", "ret_5", "sma_ratio", "rsi", "vol", "ret_next", "target"]
    ]
    preview_df = export_df.head(25).round(6)
    csv_path = save_df_to_temp_csv(export_df.round(8), prefix="signals_dataset_")
    # Stand-alone example script offered as a download next to the .onnx file.
    inference_snippet = """import numpy as np
import onnxruntime as ort
onnx_path = "model.onnx"
sess = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
inp = sess.get_inputs()[0].name
# One row of RAW features in order:
# [ret_1, ret_5, sma_ratio, rsi, vol]
x = np.array([[0.001, 0.01, 0.02, 55.0, 0.012]], dtype=np.float32)
p_up = sess.run(None, {inp: x})[0]
print("p_up:", float(np.array(p_up).reshape(-1)[0]))
"""
    snippet_path = save_bytes_to_temp_file(inference_snippet.encode("utf-8"), suffix=".py", prefix="onnx_inference_example_")
    summary_lines = [
        f"Using device for training: {'cuda' if using_cuda else 'cpu'}",
        f"Tickers requested (max 10): {', '.join(tickers)}",
        f"Rows: {len(export_df)} | train={int((export_df['split']=='train').sum())} | val={int((export_df['split']=='val').sum())}",
        f"BUY if p_up >= {buy_threshold:.2f} | SELL if p_up <= {sell_threshold:.2f}",
        "ONNX export: wrapper includes preprocessing + sigmoid; exported ONNX is SINGLE-FILE (no .onnx.data).",
    ]
    if failed:
        summary_lines.append(f"Tickers with no data / error: {', '.join(failed)}")
    summary = "\n".join(summary_lines)
    return signals_df, backtest_img, preview_df, csv_path, onnx_path, snippet_path, summary
# -----------------------------
# Gradio UI
# -----------------------------
# Builds the Gradio UI at import time; `demo` is the app object Spaces serves.
with gr.Blocks(title="Educational Stock Signals (Lightning + ONNX)") as demo:
    gr.Markdown("# Educational Stock Signals (Lightning + ONNX)\n" + DISCLAIMER)
    # --- Inputs ---
    tickers_text = gr.Textbox(
        value="AAPL, MSFT, NVDA, AMZN, GOOGL, META, TSLA, JPM, V, XOM",
        label="Tickers (comma-separated, up to 10)",
        info="Stooq default: AAPL -> aapl.us. If needed, include suffix (e.g., 7203.jp).",
    )
    with gr.Row():
        lookback_days = gr.Slider(200, 2000, value=730, step=10, label="Lookback days (history window)")
        lr = gr.Slider(1e-4, 5e-2, value=1e-2, step=1e-4, label="Learning rate (Adam)")
    with gr.Row():
        batch_size = gr.Dropdown([16, 32, 64, 128, 256], value=64, label="Batch size")
        epochs = gr.Slider(1, 30, value=8, step=1, label="Epochs")
        seed = gr.Number(value=42, precision=0, label="Seed")
    with gr.Row():
        buy_threshold = gr.Slider(0.50, 0.80, value=0.55, step=0.01, label="BUY threshold (p_up)")
        sell_threshold = gr.Slider(0.20, 0.50, value=0.45, step=0.01, label="SELL threshold (p_up)")
        device_choice = gr.Radio(["cpu", "cuda"], value="cpu", label="Device (cuda only if available)")
    run_btn = gr.Button("Train + Export ONNX + Infer", variant="primary")
    # --- Outputs (one tab per artifact group) ---
    with gr.Tab("Signals (Torch vs ONNX)"):
        signals_out = gr.Dataframe(label="Signals + Torch/ONNX comparison", wrap=True)
    with gr.Tab("Backtest (toy)"):
        backtest_out = gr.Image(label="Toy equity curve (val only; first ticker) using ONNX probs", type="numpy")
    with gr.Tab("Data"):
        preview_out = gr.Dataframe(label="Feature dataset preview", wrap=True)
        download_out = gr.File(label="Download dataset CSV (features + target + split)")
        summary_out = gr.Textbox(label="Run summary", lines=10)
    with gr.Tab("ONNX Export"):
        onnx_file = gr.File(label="Download ONNX model (.onnx) — single-file")
        onnx_example = gr.File(label="Download ONNX inference example (.py)")
    # Input/output order here must match run_app's signature and return tuple.
    run_btn.click(
        fn=run_app,
        inputs=[
            tickers_text, lookback_days, lr, batch_size, epochs, seed,
            buy_threshold, sell_threshold, device_choice
        ],
        outputs=[signals_out, backtest_out, preview_out, download_out, onnx_file, onnx_example, summary_out],
    )
if __name__ == "__main__":
    demo.launch()