Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from sklearn.datasets import make_moons, make_circles, make_classification | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import accuracy_score | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| import time | |
| import io | |
# Custom CSS injected into the page: gradient background for the main area,
# gradient-styled buttons and sliders, and rounded translucent containers.
_PAGE_STYLE = """
<style>
.main { background-image: linear-gradient(to right, #f8f9fa, #e9ecef); padding: 20px; }
.stButton>button { color: white; background: linear-gradient(90deg, #1f77b4, #ff7f0e); border: none; border-radius: 12px; padding: 10px 20px; font-weight: bold; }
.stSlider>div>div>div { background: linear-gradient(to right, #4e54c8, #8f94fb); border-radius: 12px; }
.css-1v0mbdj, .css-1dp5vir { border-radius: 10px; padding: 10px; background-color: #ffffffcc; }
</style>
"""

# Page configuration is issued before any other Streamlit call in the script.
st.set_page_config(page_title="ANN Visualizer", layout="wide")
st.markdown(_PAGE_STYLE, unsafe_allow_html=True)
st.title("\U0001F9E0 Interactive ANN Visualizer")
def sidebar_configuration():
    """Render the sidebar controls and collect the selected hyper-parameters.

    Returns:
        A 12-tuple ``(dataset, n_samples, noise, hidden_layers, activations,
        dropout_rates, regularizations, lr, epochs, early_stopping, patience,
        min_delta)``.

        ``hidden_layers``, ``activations``, ``dropout_rates`` and
        ``regularizations`` are parallel lists with one entry per hidden
        layer; each ``regularizations`` entry is a ``(reg_type, reg_strength)``
        pair, with strength ``0.0`` when the type is ``"None"``.
        ``patience`` and ``min_delta`` are ``None`` when early stopping is
        disabled.
    """
    sidebar = st.sidebar
    sidebar.header("\U0001F527 Configure Your Model")

    # Dataset controls.
    dataset = sidebar.selectbox("Dataset", ["moons", "circles", "classification"])
    n_samples = sidebar.slider("Data Points", min_value=100, max_value=20000, value=500, step=100)
    noise = sidebar.slider("Noise Level", min_value=0.0, max_value=1.0, value=0.2, step=0.05)
    n_hidden = sidebar.number_input("Number of Hidden Layers", min_value=1, max_value=10, value=2)

    activation_options = ["ReLU", "Tanh", "Sigmoid"]
    reg_options = ["None", "L1", "L2", "L1_L2"]

    # One group of widgets per hidden layer; keys keep widgets of different
    # layers distinct from each other.
    layer_specs = []
    for idx in range(n_hidden):
        layer_no = idx + 1
        sidebar.markdown(f"### Hidden Layer {layer_no}")
        units = sidebar.number_input(
            f"Units (Layer {layer_no})",
            min_value=1, max_value=512, value=8,
            key=f"units_{idx}",
        )
        activation = sidebar.selectbox(
            f"Activation (Layer {layer_no})", activation_options, key=f"act_{idx}"
        )
        dropout = sidebar.slider(
            f"Dropout Rate (Layer {layer_no})",
            min_value=0.0, max_value=0.9, value=0.0, step=0.05,
            key=f"drop_{idx}",
        )
        reg_type = sidebar.selectbox(
            f"Regularization (Layer {layer_no})", reg_options, key=f"reg_{idx}"
        )
        # The strength input is only shown when a regularizer is selected.
        if reg_type == "None":
            reg_strength = 0.0
        else:
            reg_strength = sidebar.number_input(
                f"Reg Strength (Layer {layer_no})",
                min_value=0.0001, max_value=1.0, value=0.001,
                format="%f",
                key=f"reg_strength_{idx}",
            )
        layer_specs.append((units, activation, dropout, (reg_type, reg_strength)))

    hidden_layers = [spec[0] for spec in layer_specs]
    activations = [spec[1] for spec in layer_specs]
    dropout_rates = [spec[2] for spec in layer_specs]
    regularizations = [spec[3] for spec in layer_specs]

    # Optimisation controls.
    lr = sidebar.number_input("Learning Rate", min_value=0.0001, max_value=1.0, value=0.01, format="%f")
    epochs = sidebar.slider("Epochs", min_value=100, max_value=5000, value=500, step=100)

    early_stopping = sidebar.checkbox("Enable Early Stopping", value=True)
    if early_stopping:
        patience = sidebar.slider("Early Stopping Patience", min_value=1, max_value=20, value=5)
        min_delta = sidebar.number_input(
            "Min Delta for Improvement",
            min_value=0.0001, max_value=0.1, value=0.001,
            format="%f",
        )
    else:
        patience = None
        min_delta = None

    return (
        dataset, n_samples, noise,
        hidden_layers, activations, dropout_rates, regularizations,
        lr, epochs, early_stopping, patience, min_delta,
    )
def generate_dataset(dataset, n_samples, noise):
    """Create a two-feature toy classification dataset.

    Args:
        dataset: ``"moons"`` or ``"circles"``; any other value selects the
            ``make_classification`` generator.
        n_samples: Number of points to generate.
        noise: Noise level passed to ``make_moons`` / ``make_circles``. It is
            not used by the ``make_classification`` branch.

    Returns:
        The ``(X, y)`` pair produced by the selected scikit-learn generator.
        Every generator is seeded with ``random_state=42``.
    """
    if dataset == "moons":
        features, labels = make_moons(n_samples=n_samples, noise=noise, random_state=42)
        return features, labels

    if dataset == "circles":
        features, labels = make_circles(
            n_samples=n_samples, noise=noise, factor=0.5, random_state=42
        )
        return features, labels

    features, labels = make_classification(
        n_samples=n_samples,
        n_features=2,
        n_informative=2,
        n_redundant=0,
        n_clusters_per_class=1,
        random_state=42,
    )
    return features, labels
# Maps the activation names offered in the UI to the torch module classes
# that implement them.
activation_map = {
    "ReLU": nn.ReLU,
    "Tanh": nn.Tanh,
    "Sigmoid": nn.Sigmoid,
}


class CustomLayer(nn.Module):
    """Linear -> activation -> dropout block with an optional weight penalty.

    Args:
        in_dim: Number of input features of the linear layer.
        out_dim: Number of output features of the linear layer.
        activation: Key of ``activation_map`` (``"ReLU"``, ``"Tanh"`` or
            ``"Sigmoid"``); an unknown key raises ``KeyError``.
        dropout: Probability passed to ``nn.Dropout``.
        reg_type: One of ``"None"``, ``"L1"``, ``"L2"`` or ``"L1_L2"``.
        reg_strength: Multiplier applied to the weight penalty.

    Raises:
        ValueError: If ``reg_type`` is not one of the supported names.
    """

    # Regularization names understood by reg_loss().
    _REG_TYPES = ("None", "L1", "L2", "L1_L2")

    def __init__(self, in_dim, out_dim, activation, dropout, reg_type, reg_strength):
        super().__init__()
        # Fail fast: an unknown name previously made reg_loss() return None,
        # which only surfaced later as a TypeError when the losses were summed.
        if reg_type not in self._REG_TYPES:
            raise ValueError(
                f"Unknown regularization type {reg_type!r}; "
                f"expected one of {self._REG_TYPES}"
            )
        self.linear = nn.Linear(in_dim, out_dim)
        self.activation = activation_map[activation]()
        self.dropout = nn.Dropout(dropout)
        self.reg_type = reg_type
        self.reg_strength = reg_strength

    def forward(self, x):
        """Apply the linear map, then the activation, then dropout."""
        x = self.linear(x)
        x = self.activation(x)
        x = self.dropout(x)
        return x

    def reg_loss(self):
        """Return the regularization penalty on the linear layer's weights.

        Only ``self.linear.weight`` is penalised; the bias is not included.

        Returns:
            ``0`` when ``reg_type`` is ``"None"``; otherwise a scalar tensor
            equal to ``reg_strength`` times the sum of absolute weights
            (``"L1"``), the sum of squared weights (``"L2"``), or both added
            together (``"L1_L2"``).

        Raises:
            ValueError: If ``reg_type`` was changed to an unsupported name
                after construction.
        """
        if self.reg_type == "None":
            return 0

        weight = self.linear.weight
        if self.reg_type == "L1":
            penalty = torch.sum(torch.abs(weight))
        elif self.reg_type == "L2":
            penalty = torch.sum(weight ** 2)
        elif self.reg_type == "L1_L2":
            penalty = torch.sum(torch.abs(weight)) + torch.sum(weight ** 2)
        else:
            raise ValueError(
                f"Unknown regularization type {self.reg_type!r}; "
                f"expected one of {self._REG_TYPES}"
            )
        return self.reg_strength * penalty
class DynamicANN(nn.Module):
    """Feed-forward network assembled from per-layer settings.

    The four per-layer sequences are consumed in lockstep with ``zip``, so
    the number of hidden layers equals the length of the shortest one. A
    final ``nn.Linear`` maps the last hidden width (or ``input_dim`` when
    there are no hidden layers) to ``output_dim``.

    Args:
        input_dim: Number of input features.
        hidden_layers: Unit count for each hidden layer.
        activations: Activation name for each hidden layer.
        dropout_rates: Dropout probability for each hidden layer.
        regularizations: ``(reg_type, reg_strength)`` pair for each hidden
            layer.
        output_dim: Number of outputs of the final linear layer.
    """

    def __init__(self, input_dim, hidden_layers, activations, dropout_rates, regularizations, output_dim):
        super().__init__()
        self.hidden_layers = nn.ModuleList()
        # Plain-list view of the same layer objects, used by total_reg_loss().
        self.reg_layers = []

        width = input_dim
        layer_settings = zip(hidden_layers, activations, dropout_rates, regularizations)
        for n_units, act_name, drop_p, reg_spec in layer_settings:
            reg_kind, reg_coeff = reg_spec
            block = CustomLayer(width, n_units, act_name, drop_p, reg_kind, reg_coeff)
            self.hidden_layers.append(block)
            self.reg_layers.append(block)
            width = n_units

        self.output_layer = nn.Linear(width, output_dim)

    def forward(self, x):
        """Pass ``x`` through every hidden layer, then the output layer."""
        hidden = x
        for block in self.hidden_layers:
            hidden = block(hidden)
        return self.output_layer(hidden)

    def total_reg_loss(self):
        """Return the sum of ``reg_loss()`` over all hidden layers (``0`` if none)."""
        total = 0
        for block in self.reg_layers:
            total = total + block.reg_loss()
        return total
def train_model(model, criterion, optimizer, X_train, y_train, X_test, y_test, epochs, early_stopping, patience, min_delta):
    """Train ``model`` one epoch at a time, yielding after every epoch.

    Each epoch performs a single optimizer step on the whole of ``X_train``
    and then evaluates on the whole of ``X_test``. Both recorded losses are
    ``criterion(...)`` plus ``model.total_reg_loss()``.

    Args:
        model: Module exposing ``total_reg_loss()`` in addition to the usual
            ``train()`` / ``eval()`` / call interface.
        criterion: Callable ``criterion(outputs, targets)`` returning a loss.
        optimizer: Optimizer over the model's parameters.
        X_train, y_train: Training inputs and targets.
        X_test, y_test: Evaluation inputs and targets.
        epochs: Maximum number of epochs.
        early_stopping: Whether to stop when the test loss stops improving.
        patience: Consecutive non-improving epochs tolerated before stopping
            (only read when ``early_stopping`` is true).
        min_delta: Minimum drop in test loss that counts as an improvement
            (only read when ``early_stopping`` is true).

    Yields:
        ``(epoch, train_losses, test_losses)`` after each completed epoch.
        The two lists are the same objects on every yield and keep growing.
        The epoch that triggers early stopping is recorded in the lists but
        is not yielded; a Streamlit warning is shown instead.
    """
    train_losses, test_losses = [], []
    lowest_test_loss = float('inf')
    stalled_epochs = 0

    for epoch in range(1, epochs + 1):
        # --- optimisation step -------------------------------------------
        model.train()
        optimizer.zero_grad()
        train_objective = criterion(model(X_train), y_train) + model.total_reg_loss()
        train_objective.backward()
        optimizer.step()
        train_losses.append(train_objective.item())

        # --- evaluation (no gradients tracked) ---------------------------
        model.eval()
        with torch.no_grad():
            test_objective = criterion(model(X_test), y_test) + model.total_reg_loss()
            current_test_loss = test_objective.item()
        test_losses.append(current_test_loss)

        # --- early stopping bookkeeping ----------------------------------
        if early_stopping:
            if current_test_loss < lowest_test_loss - min_delta:
                lowest_test_loss = current_test_loss
                stalled_epochs = 0
            else:
                stalled_epochs += 1
                if stalled_epochs >= patience:
                    st.warning(f"Early Stopping at epoch {epoch}")
                    break

        yield epoch, train_losses, test_losses
def plot_decision_boundary(model, X, y, grid_tensor, xx, yy, title):
    """Draw the model's class-1 probability surface with the data on top.

    Args:
        model: Network evaluated on ``grid_tensor``; it is switched to eval
            mode and left there.
        X: Data points; columns 0 and 1 are used as plot coordinates.
        y: Values used to colour the scatter points.
        grid_tensor: Model input covering the plotting grid; its number of
            rows must match the number of elements of ``xx``.
        xx, yy: Grid coordinate arrays passed to ``contourf``.
        title: Title placed above the axes.

    Returns:
        The Matplotlib figure, after it has been rendered with ``st.pyplot``.
        The figure is not closed here.
    """
    model.eval()
    with torch.no_grad():
        logits = model(grid_tensor)
        # Column 1 of the softmax output is the probability of class 1.
        class1_prob = torch.softmax(logits, dim=1)[:, 1]
        surface = class1_prob.cpu().numpy().reshape(xx.shape)

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.contourf(xx, yy, surface, levels=50, cmap='Spectral', alpha=0.8)
    ax.scatter(X[:, 0], X[:, 1], c=y, edgecolor='k', s=15, cmap='Spectral')
    ax.set_title(title, fontsize=12, fontweight='bold')
    # Hide tick marks on both axes.
    ax.set_xticks([])
    ax.set_yticks([])

    st.pyplot(fig)
    return fig
# ---------------------------------------------------------------------------
# Main script: read the sidebar settings and, when the button is pressed,
# build the data and model, train with periodic decision-surface snapshots,
# then show the loss curve, the final surface and the accuracies.
# ---------------------------------------------------------------------------
(
    dataset, n_samples, noise,
    hidden_layers, activations, dropout_rates, regularizations,
    lr, epochs, early_stopping, patience, min_delta,
) = sidebar_configuration()

start = st.button("\U0001F680 Start Training")

if start:
    # --- Data: generate, standardise, split 70/30 -------------------------
    X, y = generate_dataset(dataset, n_samples, noise)
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)

    # --- Model, optimizer and loss (two output classes) -------------------
    model = DynamicANN(X.shape[1], hidden_layers, activations, dropout_rates, regularizations, 2)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # --- 400x400 evaluation grid with a 0.5 margin around the data --------
    x_min, x_max = X[:, 0].min() - 0.5, X[:, 0].max() + 0.5
    y_min, y_max = X[:, 1].min() - 0.5, X[:, 1].max() + 0.5
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 400), np.linspace(y_min, y_max, 400))
    grid_tensor = torch.tensor(np.c_[xx.ravel(), yy.ravel()], dtype=torch.float32)

    st.subheader("\U0001F300 Initial Decision Surface")
    fig_initial = plot_decision_boundary(model, X, y, grid_tensor, xx, yy, "Initial Random Decision Surface")
    # Figures made with plt.subplots stay registered in pyplot until closed;
    # release each one once Streamlit has rendered it.
    plt.close(fig_initial)

    # --- Training with roughly ten intermediate snapshots -----------------
    st.subheader("\U0001F3CB Training Progress")
    progress_bar = st.progress(0)
    # These stay empty if the generator stops before its first yield.
    train_losses = []
    test_losses = []
    # Guard against a zero modulus should `epochs` ever be below 10.
    snapshot_every = max(1, epochs // 10)
    for epoch, train_losses, test_losses in train_model(
        model, criterion, optimizer,
        X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor,
        epochs, early_stopping, patience, min_delta,
    ):
        if epoch % snapshot_every == 0 or epoch == epochs:
            st.markdown(f"### Epoch {epoch}")
            fig_snapshot = plot_decision_boundary(
                model, X, y, grid_tensor, xx, yy, f"Decision Surface at Epoch {epoch}"
            )
            plt.close(fig_snapshot)
        progress_bar.progress(epoch / epochs)
    # Early stopping ends the loop before the last epoch; show the bar as done.
    progress_bar.progress(1.0)
    st.success("Training Complete!")

    # --- Loss curve with PNG download -------------------------------------
    st.subheader("\U0001F4C9 Final Loss Curve")
    fig_loss, ax_loss = plt.subplots()
    ax_loss.plot(train_losses, label='Train Loss', color='blue')
    ax_loss.plot(test_losses, label='Test Loss', color='orange')
    ax_loss.legend()
    ax_loss.grid(True)
    st.pyplot(fig_loss)
    buf_loss = io.BytesIO()
    fig_loss.savefig(buf_loss, format="png")
    plt.close(fig_loss)
    st.download_button("Download Loss Curve", buf_loss.getvalue(), file_name="loss_curve.png", mime="image/png")

    # --- Final decision surface with PNG download -------------------------
    st.subheader("\U0001F5FA Final Decision Boundary")
    fig_final = plot_decision_boundary(model, X, y, grid_tensor, xx, yy, "Final Decision Surface")
    buf_final = io.BytesIO()
    fig_final.savefig(buf_final, format="png")
    plt.close(fig_final)
    st.download_button("Download Final Decision Surface", buf_final.getvalue(), file_name="decision_surface.png", mime="image/png")

    # --- Accuracy on both splits ------------------------------------------
    model.eval()
    with torch.no_grad():
        train_preds = model(X_train_tensor).argmax(dim=1).cpu().numpy()
        test_preds = model(X_test_tensor).argmax(dim=1).cpu().numpy()
    train_acc = accuracy_score(y_train, train_preds)
    test_acc = accuracy_score(y_test, test_preds)
    st.metric("Train Accuracy", f"{train_acc:.2%}")
    st.metric("Test Accuracy", f"{test_acc:.2%}")

    # NOTE(review): the source's indentation was lost; this hint is assumed to
    # belong to the post-training branch because it talks about retraining.
    st.info("\U0001F501 Adjust Sidebar settings to retrain!")