import os
import io
import math
import tempfile
from dataclasses import dataclass
from functools import lru_cache
from typing import List, Tuple

import gradio as gr
import matplotlib

matplotlib.use("Agg")  # headless backend; must be selected before importing pyplot
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
import lightning.pytorch as pl
from torch.utils.data import DataLoader, TensorDataset
import onnx
from onnx import external_data_helper
import onnxruntime as ort

DISCLAIMER = """
**Disclaimer (Educational Use Only):** This app produces *model signals* from
historical price data for learning/demonstration. It is **not** financial advice,
not a recommendation, and not suitable for real trading decisions. Markets are
risky; consult a qualified professional for investment guidance.
"""


# -----------------------------
# Feature engineering
# -----------------------------
@dataclass
class FeatureSpec:
    """Hyper-parameters for the technical-indicator feature pipeline."""

    lookback_days: int = 730  # amount of daily history fed into training
    sma_fast: int = 10        # fast simple-moving-average window
    sma_slow: int = 20        # slow simple-moving-average window
    rsi_period: int = 14      # RSI lookback period
    vol_window: int = 20      # rolling-std window for daily returns


def _rsi(close: pd.Series, period: int = 14) -> pd.Series:
    """Compute RSI from a close-price series.

    Uses plain rolling means of gains/losses (not Wilder's smoothing); the
    small epsilon guards against division by zero on all-loss windows.
    """
    delta = close.diff()
    gain = (delta.where(delta > 0, 0.0)).rolling(period).mean()
    loss = (-delta.where(delta < 0, 0.0)).rolling(period).mean()
    rs = gain / (loss + 1e-9)
    return 100 - (100 / (1 + rs))


def _normalize_stooq_ticker(ticker: str) -> str:
    """Map a user-entered ticker to Stooq's symbol format.

    Bare US tickers get a ".us" suffix (AAPL -> aapl.us); symbols that already
    contain a dot (e.g. 7203.jp) are passed through unchanged.
    """
    t = ticker.strip().lower()
    if not t:
        return t
    if "." not in t:
        t = f"{t}.us"
    return t


@lru_cache(maxsize=128)
def _fetch_prices_stooq_cached(ticker: str) -> pd.DataFrame:
    """Download and parse daily OHLC data from Stooq (cached per process).

    Raises:
        ValueError: if Stooq returns no rows or an unexpected column layout.
        requests.HTTPError: on a non-2xx HTTP response.
    """
    sym = _normalize_stooq_ticker(ticker)
    url = f"https://stooq.com/q/d/l/?s={sym}&i=d"
    r = requests.get(url, timeout=25)
    r.raise_for_status()
    df = pd.read_csv(io.StringIO(r.text))
    if df.empty or "Date" not in df.columns:
        raise ValueError(f"No data returned for ticker '{ticker}' (stooq symbol '{sym}').")
    df["Date"] = pd.to_datetime(df["Date"])
    df = df.set_index("Date").sort_index()
    needed = {"Open", "High", "Low", "Close"}
    if not needed.issubset(set(df.columns)):
        raise ValueError(f"Unexpected Stooq columns for '{ticker}': {list(df.columns)}")
    for c in ["Open", "High", "Low", "Close", "Volume"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=["Close"]).copy()
    return df


def fetch_prices_stooq(ticker: str) -> pd.DataFrame:
    """Return daily OHLC data for `ticker`.

    FIX: the original decorated this function with @lru_cache directly, which
    handed every caller the SAME mutable DataFrame — any in-place mutation
    would silently corrupt the cache. We now cache a private fetcher and
    return a defensive copy here.
    """
    return _fetch_prices_stooq_cached(ticker).copy()


def build_features(prices: pd.DataFrame, spec: FeatureSpec) -> pd.DataFrame:
    """Derive model features + next-day target from raw OHLC prices.

    Columns produced: close, ret_1, ret_5, sma_ratio, rsi, vol, ret_next,
    target (1 if next-day return is positive). Rows with any NaN (from
    rolling windows / the shifted target) are dropped.
    """
    df = prices.copy()
    df["close"] = df["Close"].astype(float)
    df["ret_1"] = df["close"].pct_change()
    df["ret_5"] = df["close"].pct_change(5)
    df["sma_fast"] = df["close"].rolling(spec.sma_fast).mean()
    df["sma_slow"] = df["close"].rolling(spec.sma_slow).mean()
    df["sma_ratio"] = df["sma_fast"] / (df["sma_slow"] + 1e-9) - 1.0
    df["rsi"] = _rsi(df["close"], spec.rsi_period)
    df["vol"] = df["ret_1"].rolling(spec.vol_window).std()
    # Next-day return is the prediction target; shifting -1 aligns tomorrow's
    # return with today's features.
    df["ret_next"] = df["close"].pct_change().shift(-1)
    df["target"] = (df["ret_next"] > 0).astype(int)
    df = df.dropna().copy()
    out = df[["close", "ret_1", "ret_5", "sma_ratio", "rsi", "vol", "ret_next", "target"]].copy()
    return out


def make_dataset_for_tickers(tickers: List[str], spec: FeatureSpec) -> Tuple[pd.DataFrame, List[str]]:
    """Fetch + featurize every ticker; collect tickers that failed.

    Returns:
        (combined long-format DataFrame sorted by ticker/date,
         list of upper-cased tickers that raised during fetch/featurize).

    Raises:
        ValueError: when no ticker produced usable data.
    """
    frames = []
    failed = []
    for t in tickers:
        try:
            prices = fetch_prices_stooq(t)
            # Extra 120 rows of slack so rolling windows have warm-up data.
            prices = prices.iloc[-(spec.lookback_days + 120):].copy()
            feats = build_features(prices, spec)
            feats = feats.reset_index().rename(columns={"Date": "date"})
            feats["ticker"] = t.upper()
            frames.append(feats)
        except Exception:
            # Best-effort: a bad symbol must not take down the whole run.
            failed.append(t.upper())
    if not frames:
        raise ValueError("No tickers returned usable data. Try different tickers (e.g., AAPL, MSFT).")
    df = pd.concat(frames, ignore_index=True)
    df["date"] = pd.to_datetime(df["date"])
    df = df.sort_values(["ticker", "date"]).reset_index(drop=True)
    return df, failed


def split_train_val_per_ticker(df: pd.DataFrame, train_frac: float = 0.8) -> pd.DataFrame:
    """Add a 'split' column with a chronological train/val split per ticker."""
    parts = []
    for t, dft in df.groupby("ticker", sort=False):
        dft = dft.sort_values("date").reset_index(drop=True)
        n = len(dft)
        cut = max(int(n * train_frac), 1)  # at least one training row
        dft["split"] = "train"
        if cut < n:
            dft.loc[cut:, "split"] = "val"
        parts.append(dft)
    return pd.concat(parts, ignore_index=True)


def fig_to_image(fig) -> np.ndarray:
    """Render a matplotlib figure to an RGBA numpy array and close it."""
    buf = io.BytesIO()
    fig.savefig(buf, format="png", bbox_inches="tight", dpi=160)
    plt.close(fig)
    buf.seek(0)
    return plt.imread(buf)


def save_df_to_temp_csv(df: pd.DataFrame, prefix: str) -> str:
    """Write `df` to a named temp CSV and return its path (caller downloads it)."""
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv", prefix=prefix)
    df.to_csv(tmp.name, index=False)
    return tmp.name


def save_bytes_to_temp_file(b: bytes, suffix: str, prefix: str) -> str:
    """Write raw bytes to a named temp file and return its path."""
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix, prefix=prefix)
    with open(tmp.name, "wb") as f:
        f.write(b)
    return tmp.name


# -----------------------------
# Lightning model
# -----------------------------
class LitClassifier(pl.LightningModule):
    """Tiny MLP binary classifier (logits out); BCE-with-logits loss."""

    def __init__(self, n_features: int, lr: float = 1e-2):
        super().__init__()
        self.save_hyperparameters()
        self.net = nn.Sequential(
            nn.Linear(n_features, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
        )
        self.lr = lr

    def forward(self, x):
        # squeeze(-1): (batch, 1) -> (batch,) so it matches the float targets
        return self.net(x).squeeze(-1)

    def training_step(self, batch, _):
        x, y = batch
        logits = self(x)
        loss = F.binary_cross_entropy_with_logits(logits, y)
        self.log("train_loss", loss, on_step=False, on_epoch=True)
        return loss

    def validation_step(self, batch, _):
        x, y = batch
        logits = self(x)
        loss = F.binary_cross_entropy_with_logits(logits, y)
        self.log("val_loss", loss, on_step=False, on_epoch=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)


def signal_from_prob(p_up: float, buy_th: float, sell_th: float) -> str:
    """Map a probability-of-up to a BUY/SELL/HOLD label (BUY checked first)."""
    if p_up >= buy_th:
        return "BUY (signal)"
    if p_up <= sell_th:
        return "SELL (signal)"
    return "HOLD (signal)"


# -----------------------------
# ONNX wrapper: includes preprocessing + sigmoid
# -----------------------------
class OnnxWrapper(nn.Module):
    """
    Takes RAW features in this order: [ret_1, ret_5, sma_ratio, rsi, vol]
    Applies standardization using stored mu/sd, then runs net, then
    sigmoid -> p_up.
    """

    def __init__(self, net: nn.Module, mu: np.ndarray, sd: np.ndarray):
        super().__init__()
        self.net = net
        # Buffers (not parameters): baked into the exported ONNX graph.
        self.register_buffer("mu", torch.tensor(mu, dtype=torch.float32))
        self.register_buffer("sd", torch.tensor(sd, dtype=torch.float32))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = (x - self.mu) / self.sd
        logits = self.net(x).squeeze(-1)
        return torch.sigmoid(logits)


def export_onnx_model(trained_model, mu: np.ndarray, sd: np.ndarray, n_features: int) -> str:
    """
    Exports a SINGLE-FILE ONNX. If PyTorch writes external data
    (onnx_path + '.data'), we merge it into the .onnx so you do NOT need a
    separate weights file for inference.
    """
    wrapper = OnnxWrapper(trained_model.net.cpu().eval(), mu=mu, sd=sd).eval()
    dummy = torch.zeros(1, n_features, dtype=torch.float32)
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".onnx", prefix="signals_model_")
    onnx_path = tmp.name
    torch.onnx.export(
        wrapper,
        dummy,
        onnx_path,
        input_names=["features"],
        output_names=["p_up"],
        dynamic_axes={"features": {0: "batch"}, "p_up": {0: "batch"}},
        opset_version=17,
        do_constant_folding=True,
    )
    # Merge external data into the ONNX (if created)
    data_path = onnx_path + ".data"
    if os.path.exists(data_path):
        m = onnx.load_model(onnx_path, load_external_data=True)
        external_data_helper.convert_model_from_external_data(m)
        onnx.save_model(m, onnx_path)
        try:
            os.remove(data_path)
        except OSError:
            pass  # best-effort cleanup; the merged .onnx is already valid
    return onnx_path


def onnx_predict_probs(onnx_path: str, X: np.ndarray) -> np.ndarray:
    """Run the exported ONNX model on raw features; return p_up as 1-D array."""
    sess = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
    input_name = sess.get_inputs()[0].name
    out = sess.run(None, {input_name: X.astype(np.float32)})
    probs = out[0]
    return probs.reshape(-1)


# -----------------------------
# Main Gradio function
# -----------------------------
def run_app(
    tickers_text: str,
    lookback_days: int,
    lr: float,
    batch_size: int,
    epochs: int,
    seed: int,
    buy_threshold: float,
    sell_threshold: float,
    device_choice: str,
):
    """Full pipeline: fetch -> featurize -> train -> export ONNX -> infer.

    Returns the 7-tuple bound to the Gradio outputs:
    (signals_df, backtest_img, preview_df, csv_path, onnx_path,
     snippet_path, summary).
    """
    pl.seed_everything(int(seed), workers=True)
    tickers = [t.strip().upper() for t in tickers_text.split(",") if t.strip()]
    tickers = tickers[:10]  # hard cap to keep runs fast
    if not tickers:
        raise gr.Error("Enter at least 1 ticker, e.g. AAPL, MSFT, NVDA")

    spec = FeatureSpec(lookback_days=int(lookback_days))
    df_raw, failed = make_dataset_for_tickers(tickers, spec)
    df = split_train_val_per_ticker(df_raw, train_frac=0.8)

    feature_cols = ["ret_1", "ret_5", "sma_ratio", "rsi", "vol"]
    n_features = len(feature_cols)

    # Standardize using TRAIN split stats (no leakage from the val split).
    train_df = df[df["split"] == "train"].copy()
    mu = train_df[feature_cols].mean().values.astype(np.float32)
    sd = train_df[feature_cols].std().replace(0, 1.0).values.astype(np.float32)

    df_std = df.copy()
    df_std[feature_cols] = (df_std[feature_cols] - mu) / sd
    df_std = df_std.replace([np.inf, -np.inf], np.nan).dropna().copy()

    # Torch tensors
    X_train = torch.tensor(df_std[df_std["split"] == "train"][feature_cols].values, dtype=torch.float32)
    y_train = torch.tensor(df_std[df_std["split"] == "train"]["target"].values, dtype=torch.float32)
    X_val = torch.tensor(df_std[df_std["split"] == "val"][feature_cols].values, dtype=torch.float32)
    y_val = torch.tensor(df_std[df_std["split"] == "val"]["target"].values, dtype=torch.float32)

    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=int(batch_size), shuffle=True)
    # FIX: with very small datasets the val split can be empty after dropna();
    # Lightning cannot iterate a zero-length dataloader, so skip validation.
    val_loader = None
    if len(X_val) > 0:
        val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=int(batch_size), shuffle=False)

    # Lightning Trainer device selection (fall back to CPU when CUDA absent).
    want_cuda = (device_choice == "cuda")
    has_cuda = torch.cuda.is_available()
    using_cuda = want_cuda and has_cuda
    accelerator = "gpu" if using_cuda else "cpu"

    model = LitClassifier(n_features=n_features, lr=float(lr))
    trainer = pl.Trainer(
        max_epochs=int(epochs),
        accelerator=accelerator,
        devices=1,
        logger=False,
        enable_checkpointing=False,
        enable_progress_bar=False,
        enable_model_summary=False,
        deterministic=True,
    )
    trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)

    # ---- Export ONNX (single-file; includes preprocessing + sigmoid)
    onnx_path = export_onnx_model(model, mu=mu, sd=sd, n_features=n_features)

    # ---- Inference: latest row per ticker (compare Torch vs ONNX)
    model.eval()
    out_rows = []
    torch_probs_for_onnx_compare = []
    onnx_inputs = []
    for t in tickers:
        dft_raw = df[df["ticker"] == t].sort_values("date")
        if dft_raw.empty:
            continue
        last_raw = dft_raw.iloc[-1]
        x_raw = last_raw[feature_cols].values.astype(np.float32)  # raw features (ONNX expects raw)
        onnx_inputs.append(x_raw)
        # Torch path standardizes manually (the ONNX graph does it internally).
        x_std = (x_raw - mu) / sd
        x_t = torch.tensor(x_std, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            logit = model(x_t).item()
        p_up_torch = 1 / (1 + math.exp(-logit))
        torch_probs_for_onnx_compare.append(p_up_torch)

    onnx_probs = np.array([])
    if len(onnx_inputs) > 0:
        X_onnx = np.stack(onnx_inputs, axis=0)
        onnx_probs = onnx_predict_probs(onnx_path, X_onnx)

    # Second pass mirrors the first loop's skip logic, so `idx` stays aligned
    # with torch_probs_for_onnx_compare / onnx_probs.
    idx = 0
    for t in tickers:
        dft_raw = df[df["ticker"] == t].sort_values("date")
        if dft_raw.empty:
            continue
        last_raw = dft_raw.iloc[-1]
        p_torch = float(torch_probs_for_onnx_compare[idx])
        p_onnx = float(onnx_probs[idx]) if len(onnx_probs) else float("nan")
        sig = signal_from_prob(
            p_onnx if not math.isnan(p_onnx) else p_torch,
            float(buy_threshold),
            float(sell_threshold),
        )
        out_rows.append(
            {
                "ticker": t,
                "date": last_raw["date"].date().isoformat(),
                "last_close": round(float(last_raw["close"]), 4),
                "p_up_torch": round(p_torch, 4),
                "p_up_onnx": round(p_onnx, 4) if not math.isnan(p_onnx) else np.nan,
                "abs_diff": round(abs(p_torch - p_onnx), 6) if not math.isnan(p_onnx) else np.nan,
                "signal (from ONNX if available)": sig,
            }
        )
        idx += 1

    signals_df = pd.DataFrame(out_rows)
    if not signals_df.empty:
        signals_df = signals_df.sort_values("p_up_onnx", ascending=False, na_position="last").reset_index(drop=True)

    # Toy backtest for first ticker (val split only)
    backtest_img = None
    t0 = tickers[0]
    d0_raw = df[(df["ticker"] == t0) & (df["split"] == "val")].sort_values("date").copy()
    if len(d0_raw) >= 30:
        X0_raw = d0_raw[feature_cols].values.astype(np.float32)
        p = onnx_predict_probs(onnx_path, X0_raw)
        pos = np.zeros_like(p, dtype=float)  # +1 long, -1 short, 0 flat
        pos[p >= float(buy_threshold)] = 1.0
        pos[p <= float(sell_threshold)] = -1.0
        strat = pos * d0_raw["ret_next"].values
        equity = (1 + strat).cumprod()
        fig = plt.figure()
        plt.plot(equity)
        plt.title(f"Toy Backtest (VAL only) — {t0} | ONNX probs")
        plt.xlabel("Val days")
        plt.ylabel("Equity (start=1.0)")
        plt.grid(True, alpha=0.3)
        backtest_img = fig_to_image(fig)

    # Data preview + download
    export_df = df.copy()
    export_df["date"] = export_df["date"].dt.date.astype(str)
    export_df = export_df[
        ["date", "ticker", "split", "close", "ret_1", "ret_5", "sma_ratio", "rsi", "vol", "ret_next", "target"]
    ]
    preview_df = export_df.head(25).round(6)
    csv_path = save_df_to_temp_csv(export_df.round(8), prefix="signals_dataset_")

    inference_snippet = """import numpy as np
import onnxruntime as ort

onnx_path = "model.onnx"
sess = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
inp = sess.get_inputs()[0].name

# One row of RAW features in order:
# [ret_1, ret_5, sma_ratio, rsi, vol]
x = np.array([[0.001, 0.01, 0.02, 55.0, 0.012]], dtype=np.float32)
p_up = sess.run(None, {inp: x})[0]
print("p_up:", float(np.array(p_up).reshape(-1)[0]))
"""
    snippet_path = save_bytes_to_temp_file(inference_snippet.encode("utf-8"), suffix=".py", prefix="onnx_inference_example_")

    summary_lines = [
        f"Using device for training: {'cuda' if using_cuda else 'cpu'}",
        f"Tickers requested (max 10): {', '.join(tickers)}",
        f"Rows: {len(export_df)} | train={int((export_df['split']=='train').sum())} | val={int((export_df['split']=='val').sum())}",
        f"BUY if p_up >= {buy_threshold:.2f} | SELL if p_up <= {sell_threshold:.2f}",
        "ONNX export: wrapper includes preprocessing + sigmoid; exported ONNX is SINGLE-FILE (no .onnx.data).",
    ]
    if failed:
        summary_lines.append(f"Tickers with no data / error: {', '.join(failed)}")
    summary = "\n".join(summary_lines)

    return signals_df, backtest_img, preview_df, csv_path, onnx_path, snippet_path, summary


# -----------------------------
# Gradio UI
# -----------------------------
with gr.Blocks(title="Educational Stock Signals (Lightning + ONNX)") as demo:
    gr.Markdown("# Educational Stock Signals (Lightning + ONNX)\n" + DISCLAIMER)
    tickers_text = gr.Textbox(
        value="AAPL, MSFT, NVDA, AMZN, GOOGL, META, TSLA, JPM, V, XOM",
        label="Tickers (comma-separated, up to 10)",
        info="Stooq default: AAPL -> aapl.us. If needed, include suffix (e.g., 7203.jp).",
    )
    with gr.Row():
        lookback_days = gr.Slider(200, 2000, value=730, step=10, label="Lookback days (history window)")
        lr = gr.Slider(1e-4, 5e-2, value=1e-2, step=1e-4, label="Learning rate (Adam)")
    with gr.Row():
        batch_size = gr.Dropdown([16, 32, 64, 128, 256], value=64, label="Batch size")
        epochs = gr.Slider(1, 30, value=8, step=1, label="Epochs")
        seed = gr.Number(value=42, precision=0, label="Seed")
    with gr.Row():
        buy_threshold = gr.Slider(0.50, 0.80, value=0.55, step=0.01, label="BUY threshold (p_up)")
        sell_threshold = gr.Slider(0.20, 0.50, value=0.45, step=0.01, label="SELL threshold (p_up)")
        device_choice = gr.Radio(["cpu", "cuda"], value="cpu", label="Device (cuda only if available)")

    run_btn = gr.Button("Train + Export ONNX + Infer", variant="primary")

    with gr.Tab("Signals (Torch vs ONNX)"):
        signals_out = gr.Dataframe(label="Signals + Torch/ONNX comparison", wrap=True)
    with gr.Tab("Backtest (toy)"):
        backtest_out = gr.Image(label="Toy equity curve (val only; first ticker) using ONNX probs", type="numpy")
    with gr.Tab("Data"):
        preview_out = gr.Dataframe(label="Feature dataset preview", wrap=True)
        download_out = gr.File(label="Download dataset CSV (features + target + split)")
        summary_out = gr.Textbox(label="Run summary", lines=10)
    with gr.Tab("ONNX Export"):
        onnx_file = gr.File(label="Download ONNX model (.onnx) — single-file")
        onnx_example = gr.File(label="Download ONNX inference example (.py)")

    run_btn.click(
        fn=run_app,
        inputs=[
            tickers_text, lookback_days, lr, batch_size, epochs, seed,
            buy_threshold, sell_threshold, device_choice
        ],
        outputs=[signals_out, backtest_out, preview_out, download_out, onnx_file, onnx_example, summary_out],
    )

if __name__ == "__main__":
    demo.launch()