Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import math | |
| import tempfile | |
| from dataclasses import dataclass | |
| from functools import lru_cache | |
| from typing import List, Tuple | |
| import gradio as gr | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import lightning.pytorch as pl | |
| from torch.utils.data import DataLoader, TensorDataset | |
| import onnx | |
| from onnx import external_data_helper | |
| import onnxruntime as ort | |
# Markdown-formatted legal/educational notice rendered at the top of the UI
# (and prepended to the page title Markdown below).
DISCLAIMER = """
**Disclaimer (Educational Use Only):**
This app produces *model signals* from historical price data for learning/demonstration.
It is **not** financial advice, not a recommendation, and not suitable for real trading decisions.
Markets are risky; consult a qualified professional for investment guidance.
"""
# -----------------------------
# Feature engineering
# -----------------------------
@dataclass
class FeatureSpec:
    """Hyperparameters for the rolling-window feature engineering.

    Fix: the original class declared annotated defaults but lacked the
    @dataclass decorator, so keyword construction such as
    FeatureSpec(lookback_days=730) raised TypeError.
    """

    # Days of price history to keep before building features.
    lookback_days: int = 730
    # Fast / slow simple-moving-average windows (in trading days).
    sma_fast: int = 10
    sma_slow: int = 20
    # RSI lookback period.
    rsi_period: int = 14
    # Rolling window for daily-return volatility.
    vol_window: int = 20
| def _rsi(close: pd.Series, period: int = 14) -> pd.Series: | |
| delta = close.diff() | |
| gain = (delta.where(delta > 0, 0.0)).rolling(period).mean() | |
| loss = (-delta.where(delta < 0, 0.0)).rolling(period).mean() | |
| rs = gain / (loss + 1e-9) | |
| return 100 - (100 / (1 + rs)) | |
| def _normalize_stooq_ticker(ticker: str) -> str: | |
| t = ticker.strip().lower() | |
| if not t: | |
| return t | |
| if "." not in t: | |
| t = f"{t}.us" | |
| return t | |
def fetch_prices_stooq(ticker: str) -> pd.DataFrame:
    """Download daily OHLC(+Volume) history for `ticker` from Stooq.

    Returns a DataFrame indexed by ascending Date with numeric price
    columns; rows missing a Close are dropped.

    Raises:
        ValueError: when Stooq returns no rows or unexpected columns.
        requests.HTTPError: on a non-2xx HTTP response.
    """
    symbol = _normalize_stooq_ticker(ticker)
    response = requests.get(f"https://stooq.com/q/d/l/?s={symbol}&i=d", timeout=25)
    response.raise_for_status()
    frame = pd.read_csv(io.StringIO(response.text))
    if frame.empty or "Date" not in frame.columns:
        raise ValueError(f"No data returned for ticker '{ticker}' (stooq symbol '{symbol}').")
    frame["Date"] = pd.to_datetime(frame["Date"])
    frame = frame.set_index("Date").sort_index()
    if not {"Open", "High", "Low", "Close"}.issubset(frame.columns):
        raise ValueError(f"Unexpected Stooq columns for '{ticker}': {list(frame.columns)}")
    for column in ("Open", "High", "Low", "Close", "Volume"):
        if column in frame.columns:
            frame[column] = pd.to_numeric(frame[column], errors="coerce")
    return frame.dropna(subset=["Close"]).copy()
def build_features(prices: pd.DataFrame, spec: FeatureSpec) -> pd.DataFrame:
    """Derive model features and the next-day up/down target.

    Expects a frame with a 'Close' column (e.g. from fetch_prices_stooq).
    Output columns: close, ret_1, ret_5, sma_ratio, rsi, vol, ret_next,
    target — with every row containing any NaN removed (rolling-window
    warm-up rows and the final row, whose ret_next is unknown).
    """
    feats = prices.copy()
    close = feats["Close"].astype(float)
    feats["close"] = close
    feats["ret_1"] = close.pct_change()
    feats["ret_5"] = close.pct_change(5)
    fast = close.rolling(spec.sma_fast).mean()
    slow = close.rolling(spec.sma_slow).mean()
    feats["sma_fast"] = fast
    feats["sma_slow"] = slow
    feats["sma_ratio"] = fast / (slow + 1e-9) - 1.0
    feats["rsi"] = _rsi(close, spec.rsi_period)
    feats["vol"] = feats["ret_1"].rolling(spec.vol_window).std()
    # Label: is tomorrow's close-to-close return positive?
    feats["ret_next"] = close.pct_change().shift(-1)
    feats["target"] = (feats["ret_next"] > 0).astype(int)
    feats = feats.dropna()
    keep = ["close", "ret_1", "ret_5", "sma_ratio", "rsi", "vol", "ret_next", "target"]
    return feats[keep].copy()
def make_dataset_for_tickers(tickers: List[str], spec: FeatureSpec) -> Tuple[pd.DataFrame, List[str]]:
    """Fetch + featurize each ticker; return (combined frame, failed list).

    Any ticker that errors out (network, parsing, empty data) is recorded
    in `failed` instead of aborting the run.

    Raises:
        ValueError: when no ticker produced usable data.
    """
    usable: List[pd.DataFrame] = []
    failed: List[str] = []
    for tick in tickers:
        try:
            prices = fetch_prices_stooq(tick)
            # Keep a margin past the lookback so rolling windows can warm up.
            prices = prices.iloc[-(spec.lookback_days + 120):].copy()
            feats = build_features(prices, spec).reset_index().rename(columns={"Date": "date"})
            feats["ticker"] = tick.upper()
            usable.append(feats)
        except Exception:
            failed.append(tick.upper())
    if not usable:
        raise ValueError("No tickers returned usable data. Try different tickers (e.g., AAPL, MSFT).")
    combined = pd.concat(usable, ignore_index=True)
    combined["date"] = pd.to_datetime(combined["date"])
    combined = combined.sort_values(["ticker", "date"]).reset_index(drop=True)
    return combined, failed
def split_train_val_per_ticker(df: pd.DataFrame, train_frac: float = 0.8) -> pd.DataFrame:
    """Chronologically tag each ticker's rows with a 'split' column.

    The first `train_frac` share of each ticker's date-sorted rows (at
    least one row) is labeled 'train'; the remainder 'val'.
    """
    labeled = []
    for _, group in df.groupby("ticker", sort=False):
        group = group.sort_values("date").reset_index(drop=True)
        boundary = max(int(len(group) * train_frac), 1)
        group["split"] = "train"
        if boundary < len(group):
            # .loc slicing is label-inclusive; reset_index made labels positional.
            group.loc[boundary:, "split"] = "val"
        labeled.append(group)
    return pd.concat(labeled, ignore_index=True)
def fig_to_image(fig) -> np.ndarray:
    """Render a matplotlib figure to a numpy image array and close it.

    The figure is rasterized to an in-memory PNG at 160 dpi and read
    back with plt.imread, so the caller receives a plain array suitable
    for gr.Image(type="numpy").
    """
    png_buffer = io.BytesIO()
    fig.savefig(png_buffer, format="png", bbox_inches="tight", dpi=160)
    # Close eagerly so repeated runs don't accumulate open figures.
    plt.close(fig)
    png_buffer.seek(0)
    return plt.imread(png_buffer)
def save_df_to_temp_csv(df: pd.DataFrame, prefix: str) -> str:
    """Write `df` (without its index) to a fresh temp CSV; return the path.

    The file is created with delete=False, so the caller owns cleanup.

    Fix: the original never closed the NamedTemporaryFile handle, leaking
    a file descriptor per call and, on Windows, holding a lock that makes
    pandas' write to the same path fail.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv", prefix=prefix)
    tmp.close()  # release our handle before pandas re-opens the path
    df.to_csv(tmp.name, index=False)
    return tmp.name
def save_bytes_to_temp_file(b: bytes, suffix: str, prefix: str) -> str:
    """Persist raw bytes to a fresh temp file; return its path.

    The file is created with delete=False, so the caller owns cleanup.

    Fix: the original left the NamedTemporaryFile handle open and
    re-opened the same path by name (fd leak; fails on Windows where the
    open handle locks the file). Writing through the handle inside a
    context manager does both jobs in one step.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, prefix=prefix) as tmp:
        tmp.write(b)
        return tmp.name
# -----------------------------
# Lightning model
# -----------------------------
class LitClassifier(pl.LightningModule):
    """Tiny MLP binary classifier; forward() returns raw logits of shape (batch,)."""

    def __init__(self, n_features: int, lr: float = 1e-2):
        super().__init__()
        self.save_hyperparameters()
        hidden = 16
        self.net = nn.Sequential(
            nn.Linear(n_features, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
        )
        self.lr = lr

    def forward(self, x):
        # Drop the trailing singleton dim so logits match the (batch,) labels.
        return self.net(x).squeeze(-1)

    def _shared_step(self, batch, stage: str):
        # Common BCE-with-logits loss used by both train and validation.
        inputs, labels = batch
        loss = F.binary_cross_entropy_with_logits(self(inputs), labels)
        self.log(f"{stage}_loss", loss, on_step=False, on_epoch=True)
        return loss

    def training_step(self, batch, _):
        return self._shared_step(batch, "train")

    def validation_step(self, batch, _):
        return self._shared_step(batch, "val")

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)
def signal_from_prob(p_up: float, buy_th: float, sell_th: float) -> str:
    """Map an up-probability to a BUY/SELL/HOLD label.

    BUY is checked first, so if the thresholds ever overlap, BUY wins
    for probabilities in the overlap — matching the original precedence.
    """
    if p_up >= buy_th:
        label = "BUY (signal)"
    elif p_up <= sell_th:
        label = "SELL (signal)"
    else:
        label = "HOLD (signal)"
    return label
# -----------------------------
# ONNX wrapper: includes preprocessing + sigmoid
# -----------------------------
class OnnxWrapper(nn.Module):
    """Inference-time wrapper that is exported to ONNX.

    Accepts RAW features ordered as [ret_1, ret_5, sma_ratio, rsi, vol],
    standardizes them with the stored mu/sd, runs the classifier net,
    and returns sigmoid probabilities (p_up) of shape (batch,).
    """

    def __init__(self, net: nn.Module, mu: np.ndarray, sd: np.ndarray):
        super().__init__()
        self.net = net
        # Buffers (not parameters): constant at inference, baked into the export.
        self.register_buffer("mu", torch.tensor(mu, dtype=torch.float32))
        self.register_buffer("sd", torch.tensor(sd, dtype=torch.float32))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        standardized = (x - self.mu) / self.sd
        return torch.sigmoid(self.net(standardized).squeeze(-1))
def export_onnx_model(trained_model, mu: np.ndarray, sd: np.ndarray, n_features: int) -> str:
    """
    Exports a SINGLE-FILE ONNX.
    If PyTorch writes external data (onnx_path + '.data'), we merge it into the .onnx
    so you do NOT need a separate weights file for inference.
    """
    # Wrap the trained net so standardization + sigmoid travel inside the graph;
    # export happens on CPU in eval mode. `trained_model` is expected to expose
    # its classifier as `.net` (as LitClassifier does).
    wrapper = OnnxWrapper(trained_model.net.cpu().eval(), mu=mu, sd=sd).eval()
    dummy = torch.zeros(1, n_features, dtype=torch.float32)
    # delete=False: the path outlives this function; the UI serves it for download.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".onnx", prefix="signals_model_")
    onnx_path = tmp.name
    torch.onnx.export(
        wrapper,
        dummy,
        onnx_path,
        input_names=["features"],
        output_names=["p_up"],
        # Dynamic batch dimension so any number of rows can be scored at once.
        dynamic_axes={"features": {0: "batch"}, "p_up": {0: "batch"}},
        opset_version=17,
        do_constant_folding=True,
    )
    # Merge external data into the ONNX (if created)
    data_path = onnx_path + ".data"
    if os.path.exists(data_path):
        m = onnx.load_model(onnx_path, load_external_data=True)
        # Pull tensors from the side-car .data file back into the model proto.
        external_data_helper.convert_model_from_external_data(m)
        onnx.save_model(m, onnx_path)
        try:
            os.remove(data_path)
        except OSError:
            # Best-effort cleanup; a leftover .data file is harmless.
            pass
    return onnx_path
def onnx_predict_probs(onnx_path: str, X: np.ndarray) -> np.ndarray:
    """Score raw feature rows with the exported model; return flat p_up array.

    `X` must be (batch, n_features) in the raw feature order the wrapper
    was exported with; standardization happens inside the ONNX graph.
    """
    session = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
    feed = {session.get_inputs()[0].name: X.astype(np.float32)}
    outputs = session.run(None, feed)
    return outputs[0].reshape(-1)
# -----------------------------
# Main Gradio function
# -----------------------------
def run_app(
    tickers_text: str,
    lookback_days: int,
    lr: float,
    batch_size: int,
    epochs: int,
    seed: int,
    buy_threshold: float,
    sell_threshold: float,
    device_choice: str,
):
    """End-to-end pipeline behind the Run button.

    Fetches data, trains the classifier, exports a single-file ONNX
    model, compares Torch vs ONNX probabilities on the latest row per
    ticker, draws a toy val-only backtest, and returns the seven UI
    outputs: (signals_df, backtest_img, preview_df, csv_path, onnx_path,
    snippet_path, summary).
    """
    # Seed numpy/torch/lightning for reproducible runs.
    pl.seed_everything(int(seed), workers=True)
    tickers = [t.strip().upper() for t in tickers_text.split(",") if t.strip()]
    # Hard cap so one click stays fast.
    tickers = tickers[:10]
    if not tickers:
        raise gr.Error("Enter at least 1 ticker, e.g. AAPL, MSFT, NVDA")
    spec = FeatureSpec(lookback_days=int(lookback_days))
    df_raw, failed = make_dataset_for_tickers(tickers, spec)
    df = split_train_val_per_ticker(df_raw, train_frac=0.8)
    feature_cols = ["ret_1", "ret_5", "sma_ratio", "rsi", "vol"]
    n_features = len(feature_cols)
    # Standardize using TRAIN split stats (avoids leaking val statistics).
    train_df = df[df["split"] == "train"].copy()
    mu = train_df[feature_cols].mean().values.astype(np.float32)
    # Guard against zero std (constant feature) before dividing.
    sd = train_df[feature_cols].std().replace(0, 1.0).values.astype(np.float32)
    df_std = df.copy()
    df_std[feature_cols] = (df_std[feature_cols] - mu) / sd
    df_std = df_std.replace([np.inf, -np.inf], np.nan).dropna().copy()
    # Torch tensors
    X_train = torch.tensor(df_std[df_std["split"] == "train"][feature_cols].values, dtype=torch.float32)
    y_train = torch.tensor(df_std[df_std["split"] == "train"]["target"].values, dtype=torch.float32)
    X_val = torch.tensor(df_std[df_std["split"] == "val"][feature_cols].values, dtype=torch.float32)
    y_val = torch.tensor(df_std[df_std["split"] == "val"]["target"].values, dtype=torch.float32)
    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=int(batch_size), shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=int(batch_size), shuffle=False)
    # Lightning Trainer device selection — silently falls back to CPU
    # when CUDA is requested but unavailable (reported in the summary).
    want_cuda = (device_choice == "cuda")
    has_cuda = torch.cuda.is_available()
    using_cuda = want_cuda and has_cuda
    accelerator = "gpu" if using_cuda else "cpu"
    model = LitClassifier(n_features=n_features, lr=float(lr))
    trainer = pl.Trainer(
        max_epochs=int(epochs),
        accelerator=accelerator,
        devices=1,
        logger=False,
        enable_checkpointing=False,
        enable_progress_bar=False,
        enable_model_summary=False,
        deterministic=True,
    )
    trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)
    # ---- Export ONNX (single-file; includes preprocessing + sigmoid)
    onnx_path = export_onnx_model(model, mu=mu, sd=sd, n_features=n_features)
    # ---- Inference: latest row per ticker (compare Torch vs ONNX)
    model.eval()
    out_rows = []
    torch_probs_for_onnx_compare = []
    onnx_inputs = []
    for t in tickers:
        dft_raw = df[df["ticker"] == t].sort_values("date")
        if dft_raw.empty:
            continue
        last_raw = dft_raw.iloc[-1]
        x_raw = last_raw[feature_cols].values.astype(np.float32)  # raw features (ONNX expects raw)
        onnx_inputs.append(x_raw)
        # Torch path standardizes manually; the ONNX graph does it internally.
        x_std = (x_raw - mu) / sd
        x_t = torch.tensor(x_std, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            logit = model(x_t).item()
        p_up_torch = 1 / (1 + math.exp(-logit))
        torch_probs_for_onnx_compare.append(p_up_torch)
    onnx_probs = np.array([])
    if len(onnx_inputs) > 0:
        X_onnx = np.stack(onnx_inputs, axis=0)
        onnx_probs = onnx_predict_probs(onnx_path, X_onnx)
    # NOTE: this second pass must iterate and skip tickers exactly like the
    # first pass so `idx` stays aligned with both probability lists.
    idx = 0
    for t in tickers:
        dft_raw = df[df["ticker"] == t].sort_values("date")
        if dft_raw.empty:
            continue
        last_raw = dft_raw.iloc[-1]
        p_torch = float(torch_probs_for_onnx_compare[idx])
        p_onnx = float(onnx_probs[idx]) if len(onnx_probs) else float("nan")
        # Signal prefers the ONNX probability when it is available.
        sig = signal_from_prob(
            p_onnx if not math.isnan(p_onnx) else p_torch,
            float(buy_threshold),
            float(sell_threshold),
        )
        out_rows.append(
            {
                "ticker": t,
                "date": last_raw["date"].date().isoformat(),
                "last_close": round(float(last_raw["close"]), 4),
                "p_up_torch": round(p_torch, 4),
                "p_up_onnx": round(p_onnx, 4) if not math.isnan(p_onnx) else np.nan,
                "abs_diff": round(abs(p_torch - p_onnx), 6) if not math.isnan(p_onnx) else np.nan,
                "signal (from ONNX if available)": sig,
            }
        )
        idx += 1
    signals_df = pd.DataFrame(out_rows)
    if not signals_df.empty:
        signals_df = signals_df.sort_values("p_up_onnx", ascending=False, na_position="last").reset_index(drop=True)
    # Toy backtest for first ticker (val split only)
    backtest_img = None
    t0 = tickers[0]
    d0_raw = df[(df["ticker"] == t0) & (df["split"] == "val")].sort_values("date").copy()
    if len(d0_raw) >= 30:
        X0_raw = d0_raw[feature_cols].values.astype(np.float32)
        p = onnx_predict_probs(onnx_path, X0_raw)
        # Long above the BUY threshold, short below SELL, flat otherwise.
        pos = np.zeros_like(p, dtype=float)
        pos[p >= float(buy_threshold)] = 1.0
        pos[p <= float(sell_threshold)] = -1.0
        strat = pos * d0_raw["ret_next"].values
        equity = (1 + strat).cumprod()
        fig = plt.figure()
        plt.plot(equity)
        plt.title(f"Toy Backtest (VAL only) — {t0} | ONNX probs")
        plt.xlabel("Val days")
        plt.ylabel("Equity (start=1.0)")
        plt.grid(True, alpha=0.3)
        backtest_img = fig_to_image(fig)
    # Data preview + download
    export_df = df.copy()
    export_df["date"] = export_df["date"].dt.date.astype(str)
    export_df = export_df[
        ["date", "ticker", "split", "close", "ret_1", "ret_5", "sma_ratio", "rsi", "vol", "ret_next", "target"]
    ]
    preview_df = export_df.head(25).round(6)
    csv_path = save_df_to_temp_csv(export_df.round(8), prefix="signals_dataset_")
    # Stand-alone example users can run after downloading the .onnx file.
    inference_snippet = """import numpy as np
import onnxruntime as ort
onnx_path = "model.onnx"
sess = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
inp = sess.get_inputs()[0].name
# One row of RAW features in order:
# [ret_1, ret_5, sma_ratio, rsi, vol]
x = np.array([[0.001, 0.01, 0.02, 55.0, 0.012]], dtype=np.float32)
p_up = sess.run(None, {inp: x})[0]
print("p_up:", float(np.array(p_up).reshape(-1)[0]))
"""
    snippet_path = save_bytes_to_temp_file(inference_snippet.encode("utf-8"), suffix=".py", prefix="onnx_inference_example_")
    summary_lines = [
        f"Using device for training: {'cuda' if using_cuda else 'cpu'}",
        f"Tickers requested (max 10): {', '.join(tickers)}",
        f"Rows: {len(export_df)} | train={int((export_df['split']=='train').sum())} | val={int((export_df['split']=='val').sum())}",
        f"BUY if p_up >= {buy_threshold:.2f} | SELL if p_up <= {sell_threshold:.2f}",
        "ONNX export: wrapper includes preprocessing + sigmoid; exported ONNX is SINGLE-FILE (no .onnx.data).",
    ]
    if failed:
        summary_lines.append(f"Tickers with no data / error: {', '.join(failed)}")
    summary = "\n".join(summary_lines)
    return signals_df, backtest_img, preview_df, csv_path, onnx_path, snippet_path, summary
# -----------------------------
# Gradio UI
# -----------------------------
# Wires the input widgets to run_app(); widget variables below are passed
# positionally in the same order as run_app's parameters.
with gr.Blocks(title="Educational Stock Signals (Lightning + ONNX)") as demo:
    gr.Markdown("# Educational Stock Signals (Lightning + ONNX)\n" + DISCLAIMER)
    tickers_text = gr.Textbox(
        value="AAPL, MSFT, NVDA, AMZN, GOOGL, META, TSLA, JPM, V, XOM",
        label="Tickers (comma-separated, up to 10)",
        info="Stooq default: AAPL -> aapl.us. If needed, include suffix (e.g., 7203.jp).",
    )
    # --- Training hyperparameters ---
    with gr.Row():
        lookback_days = gr.Slider(200, 2000, value=730, step=10, label="Lookback days (history window)")
        lr = gr.Slider(1e-4, 5e-2, value=1e-2, step=1e-4, label="Learning rate (Adam)")
    with gr.Row():
        batch_size = gr.Dropdown([16, 32, 64, 128, 256], value=64, label="Batch size")
        epochs = gr.Slider(1, 30, value=8, step=1, label="Epochs")
        seed = gr.Number(value=42, precision=0, label="Seed")
    # --- Signal thresholds and device ---
    with gr.Row():
        buy_threshold = gr.Slider(0.50, 0.80, value=0.55, step=0.01, label="BUY threshold (p_up)")
        sell_threshold = gr.Slider(0.20, 0.50, value=0.45, step=0.01, label="SELL threshold (p_up)")
        device_choice = gr.Radio(["cpu", "cuda"], value="cpu", label="Device (cuda only if available)")
    run_btn = gr.Button("Train + Export ONNX + Infer", variant="primary")
    # --- Outputs, grouped into tabs ---
    with gr.Tab("Signals (Torch vs ONNX)"):
        signals_out = gr.Dataframe(label="Signals + Torch/ONNX comparison", wrap=True)
    with gr.Tab("Backtest (toy)"):
        backtest_out = gr.Image(label="Toy equity curve (val only; first ticker) using ONNX probs", type="numpy")
    with gr.Tab("Data"):
        preview_out = gr.Dataframe(label="Feature dataset preview", wrap=True)
        download_out = gr.File(label="Download dataset CSV (features + target + split)")
        summary_out = gr.Textbox(label="Run summary", lines=10)
    with gr.Tab("ONNX Export"):
        onnx_file = gr.File(label="Download ONNX model (.onnx) — single-file")
        onnx_example = gr.File(label="Download ONNX inference example (.py)")
    # Outputs must match run_app's 7-tuple return order.
    run_btn.click(
        fn=run_app,
        inputs=[
            tickers_text, lookback_days, lr, batch_size, epochs, seed,
            buy_threshold, sell_threshold, device_choice
        ],
        outputs=[signals_out, backtest_out, preview_out, download_out, onnx_file, onnx_example, summary_out],
    )
if __name__ == "__main__":
    demo.launch()