# PyTorchTest — app.py
# Hugging Face Space by eaglelandsonce (commit c042a55, verified)
# app.py
import os
import tempfile
import uuid
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
def _pick_device(device_choice: str) -> torch.device:
if device_choice == "cuda":
return torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device_choice == "cpu":
return torch.device("cpu")
# auto
return torch.device("cuda" if torch.cuda.is_available() else "cpu")
def make_synthetic_regression(n_samples: int, noise_std: float, seed: int):
    """Generate a 10-feature linear-regression dataset with Gaussian noise.

    y = X @ w_true + b_true + noise, followed by a shuffled 80/20
    train/validation split.

    Returns:
        (X_train, y_train, X_val, y_val, w_true, b_true, df, df_train_preview)
        where ``df`` is the full dataset (columns x0..x9, y, split) and
        ``df_train_preview`` holds the first 20 training rows.
    """
    n_features = 10
    rng = torch.Generator().manual_seed(int(seed))

    # Draw features, ground-truth parameters, and noise in a fixed order so
    # a given seed always reproduces the same dataset.
    X = torch.randn(n_samples, n_features, generator=rng)
    w_true = torch.randn(n_features, 1, generator=rng)
    b_true = torch.randn(1, generator=rng)
    noise = noise_std * torch.randn(n_samples, 1, generator=rng)
    y = X @ w_true + b_true + noise

    # Shuffled 80/20 split.
    perm = torch.randperm(n_samples, generator=rng)
    n_train = int(round(0.8 * n_samples))
    train_idx, val_idx = perm[:n_train], perm[n_train:]

    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]

    # Assemble the full labeled dataset for CSV export.
    feature_cols = [f"x{i}" for i in range(n_features)]
    df = pd.DataFrame(X.numpy(), columns=feature_cols)
    df["y"] = y.numpy().reshape(-1)
    split_labels = np.array(["val"] * n_samples, dtype=object)
    split_labels[train_idx.numpy()] = "train"
    df["split"] = split_labels

    # First 20 TRAIN rows for the UI preview tab.
    df_train_preview = df[df["split"] == "train"].head(20).reset_index(drop=True)

    return (X_train, y_train, X_val, y_val, w_true, b_true, df, df_train_preview)
def train_raw_pytorch_loop(
    X_train: torch.Tensor,
    y_train: torch.Tensor,
    X_val: torch.Tensor,
    y_val: torch.Tensor,
    lr: float,
    batch_size: int,
    epochs: int,
    seed: int,
    device: torch.device,
):
    """Train ``nn.Linear(10, 1)`` with a manual (non-framework) PyTorch loop.

    Each batch performs the classic five steps:
    zero_grad -> forward -> loss -> backward -> step.

    Returns:
        (model, train_losses, val_losses) where each loss list holds one
        sample-weighted mean MSE value per epoch.
    """
    # Offset the seed so model init does not reuse the data-generation stream.
    torch.manual_seed(int(seed) + 12345)

    model = nn.Linear(10, 1).to(device)
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    train_loader = DataLoader(
        TensorDataset(X_train, y_train),
        batch_size=batch_size,
        shuffle=True,
        drop_last=False,
    )
    val_loader = DataLoader(
        TensorDataset(X_val, y_val),
        batch_size=batch_size,
        shuffle=False,
        drop_last=False,
    )

    train_losses = []
    val_losses = []

    for _ in range(epochs):
        # ---- training pass ----
        model.train()
        total, count = 0.0, 0
        for features, targets in train_loader:
            features = features.to(device)
            targets = targets.to(device)
            optimizer.zero_grad()          # 1) zero_grad
            preds = model(features)        # 2) forward
            batch_loss = loss_fn(preds, targets)  # 3) loss
            batch_loss.backward()          # 4) backward
            optimizer.step()               # 5) step
            n = features.shape[0]
            total += batch_loss.item() * n
            count += n
        train_losses.append(total / max(1, count))

        # ---- validation pass (no gradients) ----
        model.eval()
        total, count = 0.0, 0
        with torch.no_grad():
            for features, targets in val_loader:
                features = features.to(device)
                targets = targets.to(device)
                n = features.shape[0]
                total += loss_fn(model(features), targets).item() * n
                count += n
        val_losses.append(total / max(1, count))

    return model, train_losses, val_losses
def build_weight_comparison(w_true: torch.Tensor, b_true: torch.Tensor, model: nn.Linear):
    """Tabulate true vs. learned parameters (10 weights + bias) with abs errors.

    Returns a DataFrame with columns: param, true, learned, abs_error.
    """
    learned_w = model.weight.detach().cpu().numpy().reshape(-1)
    learned_b = float(model.bias.detach().cpu().numpy().reshape(-1)[0])
    true_w = w_true.detach().cpu().numpy().reshape(-1)
    true_b = float(b_true.detach().cpu().numpy().reshape(-1)[0])

    # One row per weight, labeled by the feature it multiplies.
    records = [
        {
            "param": f"w[{i}] (x{i})",
            "true": float(true_w[i]),
            "learned": float(learned_w[i]),
            "abs_error": float(abs(true_w[i] - learned_w[i])),
        }
        for i in range(10)
    ]
    # Bias row last, matching the original table layout.
    records.append(
        {
            "param": "bias (b)",
            "true": true_b,
            "learned": learned_b,
            "abs_error": float(abs(true_b - learned_b)),
        }
    )
    return pd.DataFrame(records)
def make_loss_plot(train_losses, val_losses):
    """Return a matplotlib Figure plotting train and val MSE per epoch."""
    fig, ax = plt.subplots()
    # Epochs are 1-indexed on the x axis.
    epoch_axis = np.arange(1, len(train_losses) + 1)
    for series, name in ((train_losses, "train"), (val_losses, "val")):
        ax.plot(epoch_axis, series, label=name)
    ax.set_title("Raw PyTorch Training Loop (Linear Regression)")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("MSE Loss")
    ax.legend()
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    return fig
def run_experiment(n_samples, noise_std, lr, batch_size, epochs, seed, device_choice):
    """End-to-end pipeline behind the "Run training" button.

    Generates a dataset, trains the model, and returns
    (loss figure, weight-comparison table, text summary,
     train-preview dataframe, path to the full-dataset CSV).
    """
    # Gradio may hand back floats/strings; coerce to the expected types.
    n_samples = int(n_samples)
    batch_size = int(batch_size)
    epochs = int(epochs)
    seed = int(seed)
    noise_std = float(noise_std)
    lr = float(lr)

    device = _pick_device(device_choice)

    (X_train, y_train, X_val, y_val,
     w_true, b_true, df_full, df_train_preview) = make_synthetic_regression(
        n_samples=n_samples,
        noise_std=noise_std,
        seed=seed,
    )

    model, train_losses, val_losses = train_raw_pytorch_loop(
        X_train=X_train,
        y_train=y_train,
        X_val=X_val,
        y_val=y_val,
        lr=lr,
        batch_size=batch_size,
        epochs=epochs,
        seed=seed,
        device=device,
    )

    fig = make_loss_plot(train_losses, val_losses)
    w_table = build_weight_comparison(w_true, b_true, model)

    # Write the dataset to a uniquely named temp file for the download widget.
    out_path = os.path.join(
        tempfile.gettempdir(),
        f"synthetic_regression_{uuid.uuid4().hex}.csv",
    )
    df_full.to_csv(out_path, index=False)

    n_train = int(round(0.8 * n_samples))
    summary = (
        "Raw PyTorch loop steps used each batch:\n"
        " optimizer.zero_grad() -> model(x) -> loss_fn(...) -> loss.backward() -> optimizer.step()\n\n"
        f"Device used: {device.type}\n"
        f"Samples: {n_samples} (train={n_train}, val={n_samples - n_train})\n"
        f"Noise std: {noise_std}\n"
        f"LR: {lr}, Batch size: {batch_size}, Epochs: {epochs}, Seed: {seed}\n\n"
        f"Final train loss: {train_losses[-1]:.6f}\n"
        f"Final val loss: {val_losses[-1]:.6f}\n"
    )

    return fig, w_table, summary, df_train_preview, out_path
def build_ui():
    """Construct the Gradio Blocks UI for the raw-PyTorch-loop demo.

    Bug fix vs. the original wiring: there were two competing ``run_btn.click``
    handlers, and both routed the preview dataframe through freshly constructed
    throwaway ``gr.State()`` instances (the second handler even listed output
    components such as ``gr.Plot`` as inputs). As a result ``train_preview_state``
    was never populated and the Data Preview tab could never show data. Now a
    single click handler writes the preview into one shared state, then a
    chained ``.then()`` pushes it to the preview table.

    Returns:
        The ``gr.Blocks`` demo, ready to ``queue()`` and ``launch()``.
    """
    available_devices = ["auto", "cpu"]
    if torch.cuda.is_available():
        available_devices.append("cuda")

    with gr.Blocks(title="Raw PyTorch Training Loop (Gradio)") as demo:
        gr.Markdown(
            """
# Raw PyTorch Training Loop (Linear Regression)

This Space generates a fresh synthetic regression dataset each run and trains a `nn.Linear(10, 1)` model using a **manual** PyTorch training loop.
"""
        )

        # Shared holder for the most recent training-split preview so the
        # Data Preview tab (and its refresh button) can read it.
        train_preview_state = gr.State()

        def _show_preview(df):
            # Before the first run the state holds None; show an empty table
            # with the expected columns instead.
            if df is None:
                return pd.DataFrame(columns=[f"x{i}" for i in range(10)] + ["y", "split"])
            return df

        with gr.Tabs():
            with gr.Tab("Train & Results"):
                with gr.Row():
                    with gr.Column(scale=1):
                        n_samples = gr.Slider(
                            minimum=200,
                            maximum=20000,
                            value=2000,
                            step=100,
                            label="n_samples",
                        )
                        noise_std = gr.Slider(
                            minimum=0.0,
                            maximum=5.0,
                            value=1.0,
                            step=0.05,
                            label="noise_std",
                        )
                        lr = gr.Number(value=0.01, label="lr (SGD learning rate)", precision=6)
                        batch_size = gr.Slider(
                            minimum=8,
                            maximum=1024,
                            value=64,
                            step=8,
                            label="batch_size",
                        )
                        epochs = gr.Slider(
                            minimum=1,
                            maximum=200,
                            value=20,
                            step=1,
                            label="epochs",
                        )
                        seed = gr.Number(value=42, label="seed", precision=0)
                        device_choice = gr.Dropdown(
                            choices=available_devices,
                            value="auto",
                            label="device (cpu/cuda if available)",
                        )
                        run_btn = gr.Button("Run training")
                    with gr.Column(scale=2):
                        loss_plot = gr.Plot(label="Loss curve (train vs val)")
                        w_compare = gr.Dataframe(
                            label="w_true vs w_learned (and bias)",
                            interactive=False,
                            wrap=True,
                        )
                        summary = gr.Textbox(
                            label="Summary",
                            lines=10,
                            interactive=False,
                        )
                        dataset_file = gr.File(
                            label="Download full dataset CSV (train+val): columns x0..x9, y, split",
                            interactive=False,
                        )

            with gr.Tab("Data Preview"):
                gr.Markdown("### First 20 rows from the **training split**")
                preview_df = gr.Dataframe(
                    label="Training rows (first 20)",
                    interactive=False,
                    wrap=True,
                )
                # Show the (initially empty) preview when the page loads.
                demo.load(fn=_show_preview, inputs=[train_preview_state], outputs=[preview_df])
                # Also allow a manual refresh button (handy on Spaces)
                refresh = gr.Button("Refresh preview")
                refresh.click(fn=_show_preview, inputs=[train_preview_state], outputs=[preview_df])

        # Wire the run AFTER both tabs exist so outputs can target either tab.
        # run_experiment returns (fig, table, summary, preview_df, csv_path);
        # the preview goes into the shared state, then .then() renders it.
        run_btn.click(
            fn=run_experiment,
            inputs=[n_samples, noise_std, lr, batch_size, epochs, seed, device_choice],
            outputs=[loss_plot, w_compare, summary, train_preview_state, dataset_file],
        ).then(
            fn=_show_preview,
            inputs=[train_preview_state],
            outputs=[preview_df],
        )

        gr.Markdown(
            """
**Notes**
- Dataset is regenerated each run (based on `seed`).
- Train/val split is 80/20 and uses `DataLoader`.
- Model: `nn.Linear(10,1)`, Loss: `nn.MSELoss()`, Optimizer: `torch.optim.SGD(lr=...)`.
"""
        )
    return demo
# Script entry point: build the UI, enable request queuing (required for
# well-behaved concurrent runs on Hugging Face Spaces), and start the server.
if __name__ == "__main__":
    demo = build_ui()
    demo.queue()
    demo.launch()