|
|
|
|
|
import streamlit as st |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.optim as optim |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
from sklearn.datasets import make_moons, make_circles, make_classification |
|
|
from sklearn.model_selection import train_test_split |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
from sklearn.metrics import accuracy_score |
|
|
import io |
|
|
|
|
|
|
|
|
# Page-level setup: wide layout plus a light gradient background that is
# injected as raw CSS (hence unsafe_allow_html below).
_PAGE_BACKGROUND_CSS = "<style>.main { background: linear-gradient(to right, #ece9e6, #ffffff); }</style>"

st.set_page_config(layout="wide", page_title="🧠 ANN Visualizer")
st.markdown(_PAGE_BACKGROUND_CSS, unsafe_allow_html=True)
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Sidebar: every user-tunable setting is collected here into module-level
# names (dataset, n_samples, noise, layer_config, learning_rate, epochs,
# early_stop, patience, min_delta) that the training section reads later.
# ---------------------------------------------------------------------------
st.sidebar.title("⚙️ Model Configuration")

# Dataset selection and size/noise controls.
dataset = st.sidebar.selectbox("Select Dataset", ["moons", "circles", "classification"])
n_samples = st.sidebar.slider("Number of Samples", 100, 5000, 500, step=100)
noise = st.sidebar.slider("Noise Level", 0.0, 1.0, 0.2, step=0.05)

st.sidebar.subheader("Network Architecture")
n_layers = st.sidebar.number_input("Number of Hidden Layers", 1, 10, 2)

# One (units, activation, dropout, reg, reg_strength) tuple per hidden layer.
layer_config = []
activation_options = ["ReLU", "Tanh", "Sigmoid"]
reg_options = ["None", "L1", "L2", "L1_L2"]

for i in range(n_layers):
    with st.sidebar.expander(f"Layer {i+1}"):
        # Widget keys are suffixed with the layer index so that each layer
        # gets its own widget state.
        units = st.number_input("Units", 1, 512, 8, key=f"units_{i}")
        activation = st.selectbox("Activation", activation_options, key=f"act_{i}")
        dropout = st.slider("Dropout", 0.0, 0.9, 0.0, step=0.05, key=f"drop_{i}")
        reg = st.selectbox("Regularization", reg_options, key=f"reg_{i}")
        # The strength input is only shown when a penalty is selected.
        # An explicit 4-decimal format is required: the default float format
        # shows two decimals, which would render 0.001 as "0.00".
        reg_strength = (
            st.number_input(
                "Reg Strength", 0.0001, 1.0, 0.001,
                format="%.4f", key=f"reg_strength_{i}",
            )
            if reg != "None"
            else 0.0
        )
        layer_config.append((units, activation, dropout, reg, reg_strength))

st.sidebar.subheader("Training Settings")
# 4-decimal format so values down to the 0.0001 minimum are displayed faithfully.
learning_rate = st.sidebar.number_input("Learning Rate", 0.0001, 1.0, 0.01, format="%.4f")
epochs = st.sidebar.slider("Epochs", 100, 5000, 500)
early_stop = st.sidebar.checkbox("Early Stopping", True)
# patience / min_delta are None when early stopping is disabled.
patience = st.sidebar.slider("Patience", 1, 20, 5) if early_stop else None
min_delta = (
    st.sidebar.number_input("Min Improvement (Delta)", 0.0001, 0.1, 0.001, format="%.4f")
    if early_stop
    else None
)
|
|
|
|
|
|
|
|
def load_data(name, n_samples, noise):
    """Generate a synthetic two-feature binary classification dataset.

    Args:
        name: "moons" or "circles" select the matching scikit-learn
            generator; any other value falls back to make_classification.
        n_samples: Number of points to generate.
        noise: Noise level forwarded to make_moons / make_circles. It is
            not used by the make_classification fallback.

    Returns:
        The (X, y) pair produced by the selected generator.
    """
    seed = 42  # fixed seed: the same settings always give the same data

    if name == "moons":
        return make_moons(n_samples=n_samples, noise=noise, random_state=seed)

    if name == "circles":
        return make_circles(n_samples=n_samples, noise=noise, factor=0.5, random_state=seed)

    # Fallback: a single-cluster-per-class problem with two informative features.
    return make_classification(
        n_samples=n_samples,
        n_features=2,
        n_informative=2,
        n_redundant=0,
        n_clusters_per_class=1,
        random_state=seed,
    )
|
|
|
|
|
|
|
|
# Maps the activation names offered in the UI to the torch.nn module class of
# the same name. Values are classes, so callers instantiate them on lookup.
activation_map = {
    label: getattr(nn, label) for label in ("ReLU", "Tanh", "Sigmoid")
}
|
|
|
|
|
class CustomLayer(nn.Module):
    """Hidden layer: Linear -> activation -> Dropout, with an optional weight penalty.

    Args:
        in_dim: Number of input features of the linear map.
        out_dim: Number of output units of the linear map.
        activation: Key into the module-level ``activation_map``.
        dropout: Probability passed to ``nn.Dropout``.
        reg: Penalty mode, one of "None", "L1", "L2" or "L1_L2".
        reg_strength: Multiplier applied to the penalty term.

    Raises:
        ValueError: If ``reg`` is not one of the supported penalty modes.
        KeyError: If ``activation`` is not a key of ``activation_map``.
    """

    # Penalty modes understood by reg_loss().
    _REG_MODES = ("None", "L1", "L2", "L1_L2")

    def __init__(self, in_dim, out_dim, activation, dropout, reg, reg_strength):
        super().__init__()
        # Fail at construction time instead of letting an unknown mode show
        # up later as a confusing error in the middle of the training loop.
        if reg not in self._REG_MODES:
            raise ValueError(
                f"Unknown regularization {reg!r}; expected one of {self._REG_MODES}"
            )
        self.linear = nn.Linear(in_dim, out_dim)
        self.activation = activation_map[activation]()
        self.dropout = nn.Dropout(dropout)
        self.reg = reg
        self.reg_strength = reg_strength

    def forward(self, x):
        """Apply the linear map, then the activation, then dropout."""
        x = self.linear(x)
        x = self.activation(x)
        x = self.dropout(x)
        return x

    def reg_loss(self):
        """Return this layer's weight penalty.

        Only ``linear.weight`` is penalised; the bias is not included.

        Returns:
            The int ``0`` when ``reg`` is "None", otherwise a scalar tensor:
            ``reg_strength * sum(|w|)`` for "L1", ``reg_strength * sum(w**2)``
            for "L2", and ``reg_strength * (sum(|w|) + sum(w**2))`` for "L1_L2".

        Raises:
            ValueError: If ``reg`` holds an unsupported mode (for example
                because the attribute was reassigned after construction).
        """
        if self.reg == "None":
            return 0

        weight = self.linear.weight
        if self.reg == "L1":
            return self.reg_strength * torch.sum(torch.abs(weight))
        if self.reg == "L2":
            return self.reg_strength * torch.sum(weight.pow(2))
        if self.reg == "L1_L2":
            return self.reg_strength * (
                torch.sum(torch.abs(weight)) + torch.sum(weight.pow(2))
            )

        # Previously this path fell through and returned None, which only
        # failed later when the penalties of all layers were summed.
        raise ValueError(
            f"Unknown regularization {self.reg!r}; expected one of {self._REG_MODES}"
        )
|
|
|
|
|
class ANN(nn.Module):
    """Feed-forward classifier: a stack of CustomLayer blocks plus a linear head.

    Args:
        input_dim: Number of input features.
        config: Iterable of ``(units, activation, dropout, reg, reg_strength)``
            tuples, one per hidden layer, applied in order.
        output_dim: Size of the final linear layer's output (default 2).
    """

    def __init__(self, input_dim, config, output_dim=2):
        super().__init__()
        self.layers = nn.ModuleList()
        # Plain-list view of the same layer objects, used for the penalty sum.
        self.reg_layers = []

        width = input_dim
        for spec in config:
            units, act, drop, reg, reg_strength = spec
            block = CustomLayer(width, units, act, drop, reg, reg_strength)
            self.layers.append(block)
            self.reg_layers.append(block)
            # The next layer consumes what this one produces.
            width = units

        self.output = nn.Linear(width, output_dim)

    def forward(self, x):
        """Run ``x`` through every hidden layer and return the head's raw output."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        logits = self.output(hidden)
        return logits

    def total_reg_loss(self):
        """Return the sum of ``reg_loss()`` over all hidden layers (0 if there are none)."""
        penalty = 0
        for block in self.reg_layers:
            penalty = penalty + block.reg_loss()
        return penalty
|
|
|
|
|
|
|
|
def plot_boundary(model, X, y, title):
    """Draw the model's class-1 probability surface with the data on top.

    The model is evaluated on a 400x400 grid spanning the range of the first
    two columns of ``X`` padded by 0.5 on every side. Note that the model is
    switched to eval mode and is left in that mode on return.

    Args:
        model: Callable torch module; its output is soft-maxed over dim 1.
        X: Array of points; columns 0 and 1 are used as plot coordinates.
        y: Per-point values used to colour the scatter markers.
        title: Title of the axes.

    Returns:
        The newly created matplotlib figure.
    """
    pad = 0.5
    resolution = 400

    first, second = X[:, 0], X[:, 1]
    x_axis = np.linspace(first.min() - pad, first.max() + pad, resolution)
    y_axis = np.linspace(second.min() - pad, second.max() + pad, resolution)
    xx, yy = np.meshgrid(x_axis, y_axis)
    grid_points = np.column_stack((xx.ravel(), yy.ravel()))

    model.eval()
    with torch.no_grad():
        logits = model(torch.tensor(grid_points, dtype=torch.float32))
        class_one = torch.softmax(logits, dim=1)[:, 1]
        Z = class_one.reshape(xx.shape).cpu().numpy()

    fig, ax = plt.subplots()
    ax.contourf(xx, yy, Z, levels=50, cmap="Spectral", alpha=0.8)
    ax.scatter(first, second, c=y, edgecolors='k', cmap="Spectral", s=20)
    ax.set_title(title)
    # Tick marks are removed from both axes.
    for clear_ticks in (ax.set_xticks, ax.set_yticks):
        clear_ticks([])
    return fig
|
|
|
|
|
|
|
|
def train(model, X_train, y_train, X_test, y_test, epochs, lr, early_stop, patience, min_delta):
    """Full-batch training loop, written as a generator.

    Each epoch performs one Adam step on the whole training set using
    cross-entropy plus ``model.total_reg_loss()``, then evaluates the same
    objective on the test set without gradients.

    Args:
        model: Module exposing ``parameters()`` and ``total_reg_loss()``.
        X_train, y_train: Training inputs and integer class targets.
        X_test, y_test: Held-out inputs and targets, used for the monitored loss.
        epochs: Maximum number of epochs.
        lr: Learning rate for Adam.
        early_stop: Whether to stop when the test loss stops improving.
        patience: Consecutive non-improving epochs tolerated before stopping.
        min_delta: Minimum decrease of the test loss that counts as improvement.

    Yields:
        ``(epoch_number, train_losses, test_losses)`` after every completed
        epoch, with ``epoch_number`` starting at 1. The two lists are the same
        objects on every yield and keep growing. The epoch on which early
        stopping fires is recorded in the lists but is not yielded.
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=lr)

    train_losses = []
    test_losses = []
    best_loss = np.inf
    stale_epochs = 0

    for epoch_idx in range(epochs):
        epoch_no = epoch_idx + 1

        # --- optimisation step on the full training set ---
        model.train()
        adam.zero_grad()
        objective = loss_fn(model(X_train), y_train) + model.total_reg_loss()
        objective.backward()
        adam.step()
        train_losses.append(objective.item())

        # --- evaluation on the held-out set ---
        model.eval()
        with torch.no_grad():
            held_out = loss_fn(model(X_test), y_test) + model.total_reg_loss()
            test_losses.append(held_out.item())

        if early_stop:
            latest = test_losses[-1]
            if latest < best_loss - min_delta:
                # Improved by more than min_delta: reset the patience counter.
                best_loss, stale_epochs = latest, 0
            else:
                stale_epochs += 1
                if stale_epochs >= patience:
                    st.warning(f"⏹️ Early Stopping at Epoch {epoch_no}")
                    break

        yield epoch_no, train_losses, test_losses
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Main page: build the dataset and model from the sidebar settings, train,
# and report decision surfaces, loss curves and final accuracy in three tabs.
# ---------------------------------------------------------------------------
st.title("🧠 Interactive ANN Visualizer")

if st.button("🚀 Start Training"):
    # Data: generate, standardise, then split 70/30 with a fixed seed.
    X, y = load_data(dataset, n_samples, noise)
    X = StandardScaler().fit_transform(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)

    model = ANN(X.shape[1], layer_config)

    tabs = st.tabs(["🌐 Decision Boundary", "📈 Training Curve", "📊 Results"])

    with tabs[0]:
        st.subheader("Initial Random Surface")
        fig_init = plot_boundary(model, X, y, "Initial Decision Surface")
        st.pyplot(fig_init)
        # pyplot keeps every figure alive until it is closed explicitly;
        # without this, figures pile up across Streamlit reruns.
        plt.close(fig_init)

    train_losses, test_losses = [], []
    progress = st.progress(0)

    for epoch, train_losses, test_losses in train(model, X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor, epochs, learning_rate, early_stop, patience, min_delta):
        # Snapshot the decision surface five times over the epoch budget.
        if epoch % (epochs//5) == 0:
            with tabs[0]:
                st.subheader(f"Epoch {epoch} Decision Surface")
                fig = plot_boundary(model, X, y, f"Decision Surface at Epoch {epoch}")
                st.pyplot(fig)
                plt.close(fig)
        progress.progress(epoch/epochs)

    with tabs[1]:
        st.subheader("Loss Curves")
        fig_loss, ax = plt.subplots()
        ax.plot(train_losses, label="Train Loss")
        ax.plot(test_losses, label="Test Loss")
        ax.legend()
        st.pyplot(fig_loss)
        plt.close(fig_loss)

    with tabs[2]:
        st.subheader("Final Metrics")
        model.eval()
        # Inference only: skip autograd bookkeeping and hand plain NumPy
        # arrays to scikit-learn instead of torch tensors.
        with torch.no_grad():
            train_preds = model(X_train_tensor).argmax(dim=1).numpy()
            test_preds = model(X_test_tensor).argmax(dim=1).numpy()

        st.metric("Train Accuracy", f"{accuracy_score(y_train, train_preds):.2%}")
        st.metric("Test Accuracy", f"{accuracy_score(y_test, test_preds):.2%}")

        fig_final = plot_boundary(model, X, y, "Final Decision Surface")
        st.pyplot(fig_final)

        # Serialise the figure to PNG bytes before closing it, so the
        # download button can serve the image.
        buf = io.BytesIO()
        fig_final.savefig(buf, format="png")
        plt.close(fig_final)
        st.download_button("⬇️ Download Final Boundary", buf.getvalue(), "final_boundary.png", "image/png")