import streamlit as st
import networkx as nx
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
from sklearn.datasets import make_blobs, make_circles
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from mlxtend.plotting import plot_decision_regions
import keras
from keras.optimizers import SGD
from keras.models import Sequential
from keras.layers import Input, Dense
from keras.losses import BinaryCrossentropy
from keras.regularizers import l2, l1
st.set_page_config(layout='wide')

# Seed the per-session defaults. Entries that already exist in the session
# state (i.e. on every rerun after the first) are left untouched.
session_defaults = {
    "training": False,
    "num_hidden_layers": 0,
    "hidden_layer_neurons": [],
    "prev_params": {},
}
for key, value in session_defaults.items():
    if key in st.session_state:
        continue
    st.session_state[key] = value
def reset_session() -> None:
    """Remove every entry from Streamlit's session state."""
    st.session_state.clear()
st.title("Neural Network Playground")

# Sidebar for parameters
st.sidebar.title("Configure & Train Model")
# Only classification is offered for now; regression is not enabled.
problem_type = st.sidebar.selectbox("Problem Type", ["Classification",])
dataset_type = None
if problem_type == "Classification":
    dataset_type = st.sidebar.selectbox("Select Dataset Type", ["Circle", "Gaussian", "Exclusive OR"])

col1, col2 = st.sidebar.columns(2)
with col1:
    learning_rate = st.selectbox("Learning Rate", [0.00001, 0.0001, 0.001, 0.01, 0.03, 0.1, 0.3, 1, 3, 10])
with col2:
    activation_function = st.selectbox("Activation Function", ["ReLU", "Sigmoid", "Tanh"])

col1, col2 = st.sidebar.columns(2)
with col1:
    regularization_type = st.selectbox("Regularization", ["None", "L1", "L2"])
with col2:
    # The rate is meaningless without a regulariser, so the widget is greyed out.
    regularization_rate = st.selectbox(
        "Regularization Rate",
        [0.0, 0.001, 0.003, 0.01, 0.03, 0.1, 0.3, 1, 3, 10],
        disabled=(regularization_type == "None"),
    )

# The slider is expressed in percent; downstream code works with a fraction.
train_to_test_ratio = st.sidebar.slider("Train-to-Test Ratio (%)", 10, 90, 20, 10) / 100
noise_level_slider = st.sidebar.slider("Noise Level", 0, 50, step=5)
batch_size = st.sidebar.slider("Batch Size", 1, 30, 10)

# The label previously contained a mis-decoded (mojibake) emoji.
if st.sidebar.button("🔄 Reset Session"):
    reset_session()
    st.rerun()

# Lowest noise value used, reached when the slider is at 0.
min_noise = 0.09
# Map the slider (0..50) linearly onto [min_noise, 0.2], i.e. [0.09, 0.2].
noise_level = min_noise + (noise_level_slider / 50) * (0.2 - min_noise)

# Snapshot of the current settings; compared against the previous run's
# snapshot to detect that a parameter changed.
current_params = {
    "dataset_type": dataset_type,
    "learning_rate": learning_rate,
    "regularization_type": regularization_type,
    "regularization_rate": regularization_rate,
    "activation_function": activation_function,
    "train_to_test_ratio": train_to_test_ratio,
    "batch_size": batch_size,
    "noise_level": noise_level
}

# Cluster spread for the Gaussian dataset: grows quadratically with the slider,
# from about 2.0 (slider 0) to about 11.6 (slider 50).
gaussian_noise = 2.0 + ((noise_level_slider - 1) / 50) ** 2 * (10)
def make_xor(n_samples=250, noise=0):
    """Generate a two-class XOR-style point cloud.

    ``n_samples`` points are drawn uniformly in each of the four quadrants, so
    the result holds ``4 * n_samples`` rows. Coordinates start out with a
    magnitude between 0.1 and 2.0, which keeps a gap around both axes.

    Args:
        n_samples: Number of points drawn per quadrant.
        noise: When greater than 0, Gaussian jitter with standard deviation
            ``0.05 + noise / 100`` is added to every coordinate.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(4 * n_samples, 2)`` and ``y`` is an
        integer array that is 1 where the two coordinates have different signs
        and 0 where they share a sign. Labels are computed after the jitter
        is applied.
    """
    outer, inner = 2.0, 0.1
    negative_side = (-outer, -inner)
    positive_side = (inner, outer)
    single_column = (n_samples, 1)

    # The draws below are made in a fixed order so that results under a given
    # NumPy seed stay the same.
    bottom_left = np.random.uniform(*negative_side, (n_samples, 2))
    top_right = np.random.uniform(*positive_side, (n_samples, 2))
    top_left = np.column_stack([
        np.random.uniform(*negative_side, single_column),  # x < 0
        np.random.uniform(*positive_side, single_column),  # y > 0
    ])
    bottom_right = np.column_stack([
        np.random.uniform(*positive_side, single_column),  # x > 0
        np.random.uniform(*negative_side, single_column),  # y < 0
    ])

    points = np.concatenate([bottom_left, top_right, top_left, bottom_right], axis=0)

    if noise > 0:
        jitter_std = 0.05 + (noise / 100)
        points = points + np.random.randn(*points.shape) * jitter_std

    # XOR of the sign bits: 1 exactly when the signs of x and y differ.
    right_half = points[:, 0] > 0
    upper_half = points[:, 1] > 0
    labels = (right_half != upper_half).astype(int)
    return points, labels
# Number of points requested from the dataset generator.
total_samples = 800
# Number of points that the selected train fraction corresponds to.
train_size = int(train_to_test_ratio * total_samples)
def get_dataset(dataset_type, total_samples, noise_level, gaussian_noise, noise_level_slider):
    """Build the feature matrix and labels for the chosen dataset.

    Args:
        dataset_type: One of "Gaussian", "Circle" or "Exclusive OR".
        total_samples: Total number of points the dataset should contain.
        noise_level: Noise passed to ``make_circles``.
        gaussian_noise: Cluster standard deviation passed to ``make_blobs``.
        noise_level_slider: Raw slider value passed to ``make_xor`` as noise.

    Returns:
        ``(features, labels)`` from the matching generator, or ``(None, None)``
        when ``dataset_type`` is not recognised.
    """
    # Generators are wrapped in lambdas so only the selected one is evaluated.
    dataset_generators = {
        "Gaussian": lambda: make_blobs(n_samples=total_samples, centers=2, n_features=2, cluster_std=gaussian_noise, random_state=45),
        "Circle": lambda: make_circles(n_samples=total_samples, shuffle=True, noise=noise_level, factor=0.2),
        # make_xor draws n_samples points in EACH of the four quadrants, so a
        # quarter of the total is requested per quadrant. Passing total_samples
        # directly produced four times as many points as the other datasets.
        # If total_samples is not a multiple of 4 the result is rounded down.
        "Exclusive OR": lambda: make_xor(n_samples=total_samples // 4, noise=noise_level_slider),
    }
    generator = dataset_generators.get(dataset_type)
    if generator is None:
        return None, None
    return generator()
# Generate the feature matrix (fv) and class labels (cv) for the chosen dataset.
if problem_type == "Classification":
    fv, cv = get_dataset(
        dataset_type=dataset_type,
        total_samples=total_samples,
        noise_level=noise_level,
        gaussian_noise=gaussian_noise,
        noise_level_slider=noise_level_slider,
    )
# Functions for modifying hidden layers
def add_layer():
    """Append a hidden layer with one neuron, up to a maximum of six layers."""
    state = st.session_state
    if state.num_hidden_layers >= 6:
        return
    state.num_hidden_layers += 1
    state.hidden_layer_neurons.append(1)
def remove_layer():
    """Drop the last hidden layer, if there is one."""
    state = st.session_state
    # Nothing to remove when the counter is zero or the neuron list is empty.
    if state.num_hidden_layers <= 0 or not state.hidden_layer_neurons:
        return
    state.num_hidden_layers -= 1
    state.hidden_layer_neurons.pop()
# Functions for modifying neurons in each layer
def increase_neurons(layer_idx):
    """Add one neuron to hidden layer ``layer_idx``, capped at eight."""
    neuron_counts = st.session_state.hidden_layer_neurons
    if neuron_counts[layer_idx] < 8:
        neuron_counts[layer_idx] += 1
def decrease_neurons(layer_idx):
    """Remove one neuron from hidden layer ``layer_idx``, keeping at least one."""
    neuron_counts = st.session_state.hidden_layer_neurons
    if neuron_counts[layer_idx] > 1:
        neuron_counts[layer_idx] -= 1
col1, col2, col3 = st.columns([2, 2, 2])

with col1:
    st.subheader("Select Input Features")

    # Standardise both raw features; the selectable inputs are its columns.
    std = StandardScaler()
    X = std.fit_transform(fv)
    x1, x2 = X[:, 0], X[:, 1]

    available_features = ["X1", "X2"]
    st.markdown("\n", unsafe_allow_html=True)
    # One checkbox per feature, ticked by default; the checkbox key doubles as
    # the session-state entry that remembers the choice across reruns.
    selected_features = [
        feature
        for feature in available_features
        if st.checkbox(feature, value=st.session_state.get(feature, feature in ["X1", "X2"]), key=feature)
    ]
    st.session_state.selected_features = selected_features
    num_inputs = len(selected_features)

    # Map feature names to actual values
    feature_mapping = {
        "X1": x1,
        "X2": x2,
    }

    # Without any input feature there is nothing to stack, split or train on
    # (np.column_stack raises on an empty list), so halt this run with a hint.
    if not selected_features:
        st.warning("Select at least one input feature to continue.")
        st.stop()

# Stack selected features for training
selected_data = np.column_stack([feature_mapping[feature] for feature in selected_features])

# Split the raw points (for the preview), the selected standardised features
# (for training) and the labels with ONE shuffle so the rows stay aligned.
# The model is built with len(selected_features) inputs and its decision
# boundary is drawn over selected_data, so x_train / x_test must come from
# selected_data as well. Splitting the raw two-column fv here made the fit fail
# whenever a feature was deselected and put the plotted boundary on a different
# scale from the data the model was trained on.
stratify_labels = cv if problem_type == 'Classification' else None  # balanced classes
raw_train, raw_test, x_train, x_test, y_train, y_test = train_test_split(
    fv, selected_data, cv,
    test_size=1 - train_to_test_ratio,
    stratify=stratify_labels,
)

with col2:
    # Visualize dataset
    st.subheader("Dataset Preview")
    fig, ax = plt.subplots(figsize=(3, 3))
    scatter = ax.scatter(raw_train[:, 0], raw_train[:, 1], c=y_train, cmap="coolwarm", edgecolors="k", alpha=0.7)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_facecolor("#f0f0f0")
    st.pyplot(fig)
    # pyplot keeps every figure alive until it is closed; the script reruns on
    # each interaction, so close the figure once it has been rendered.
    plt.close(fig)

num_outputs = 1

with col3:
    st.subheader("Hidden Layers")
    # Distinct names for the nested columns avoid shadowing the outer ones.
    # The button labels previously contained mis-decoded (mojibake) emoji.
    add_col, remove_col = st.columns([1, 1])
    with add_col:
        st.button("➕ Add Layer", on_click=add_layer)
    with remove_col:
        st.button("➖ Remove Layer", on_click=remove_layer)

    st.write("**Adjust Neurons in Each Layer:**")
    for i in range(st.session_state.num_hidden_layers):
        dec_col, label_col, inc_col = st.columns([1, 2, 1])
        with dec_col:
            st.button("➖", key=f"dec_neuron_{i}", on_click=decrease_neurons, args=(i,))
        with label_col:
            st.markdown(f"**Layer {i+1}: {st.session_state.hidden_layer_neurons[i]} neurons**")
        with inc_col:
            st.button("➕", key=f"inc_neuron_{i}", on_click=increase_neurons, args=(i,))
# Function to draw the neural network visually
def draw_nn(selected_features, hidden_layer_neurons, num_outputs):
    """Draw the network as a layered, fully connected graph.

    Args:
        selected_features: Names of the input features; each becomes an input
            node labelled with that name.
        hidden_layer_neurons: Neuron count per hidden layer, in order. The
            number of hidden layers drawn is the length of this sequence.
        num_outputs: Number of output nodes, labelled ``y1``, ``y2``, ...

    Returns:
        The matplotlib Figure containing the drawing.
    """
    G = nx.DiGraph()

    # Build the layers from the arguments only, so the drawing always matches
    # what the caller passed in rather than whatever is in the session state.
    input_layer = list(selected_features)
    hidden_layers = [
        [f"hl{i+1}_{j+1}" for j in range(neuron_count)]
        for i, neuron_count in enumerate(hidden_layer_neurons)
    ]
    output_layer = [f"y{k+1}" for k in range(num_outputs)]
    layers = [input_layer] + hidden_layers + [output_layer]

    input_color = "lightgreen"
    hidden_color = "lightblue"
    output_color = "salmon"

    # Add nodes, tagging each with its layer index (used by the layout) and
    # remembering a colour per node according to the kind of layer it is in.
    node_colors = {}
    last_layer_idx = len(layers) - 1
    for layer_idx, layer in enumerate(layers):
        if layer_idx == 0:
            layer_color = input_color
        elif layer_idx == last_layer_idx:
            layer_color = output_color
        else:
            layer_color = hidden_color
        for node in layer:
            G.add_node(node, layer=layer_idx)
            node_colors[node] = layer_color

    # Add edges (fully connected between consecutive layers)
    for upstream, downstream in zip(layers, layers[1:]):
        G.add_edges_from((src, dst) for src in upstream for dst in downstream)

    # Place each layer in its own column.
    pos = nx.multipartite_layout(G, subset_key="layer")
    fig, ax = plt.subplots(figsize=(12, 4))

    # Transparent figure with a dark axes background.
    fig.patch.set_alpha(0)
    ax.set_facecolor("#252830")
    ax.patch.set_alpha(1)

    # Colours must follow the graph's own node order.
    color_list = [node_colors[node] for node in G.nodes]
    nx.draw(G, pos, with_labels=True, node_color=color_list, edge_color="white", edgecolors="black",
            node_size=800, font_size=7.5, ax=ax, width=0.4, font_color="black", font_weight="bold")
    return fig
def create_ann_model(input_dim, hidden_layers, neurons_per_layer):
    """Build and compile a binary classifier from the current sidebar settings.

    The regulariser, activation and learning rate are read from the
    module-level ``regularization_type``, ``regularization_rate``,
    ``activation_function`` and ``learning_rate``.

    Args:
        input_dim: Number of input features.
        hidden_layers: Number of hidden layers; not used here, the layers are
            taken from ``neurons_per_layer``.
        neurons_per_layer: Neuron count for each hidden layer, in order.

    Returns:
        A compiled Keras ``Sequential`` model with a single sigmoid output,
        SGD optimizer, binary cross-entropy loss and accuracy metric.
    """
    # Pick the regulariser factory by name; "None" (or anything else) maps to
    # no regularisation.
    regularizer_factories = {"L1": l1, "L2": l2}
    make_regularizer = regularizer_factories.get(regularization_type)
    weight_penalty = make_regularizer(regularization_rate) if make_regularizer else None

    model = Sequential()
    model.add(Input(shape=(input_dim,)))

    # Hidden layers share the same activation and regulariser.
    for units in neurons_per_layer:
        hidden = Dense(
            units,
            activation=activation_function.lower(),
            kernel_regularizer=weight_penalty,
        )
        model.add(hidden)

    # Single sigmoid unit for the binary output.
    model.add(Dense(1, activation='sigmoid'))

    model.compile(
        optimizer=SGD(learning_rate=learning_rate),
        loss=BinaryCrossentropy(),
        metrics=['accuracy'],
    )
    return model
def plot_decision_boundary(model, x_train, y_train):
    """Plot the model's decision regions over the given points.

    Args:
        model: Classifier passed to ``plot_decision_regions`` as ``clf``.
        x_train: Feature matrix of the points to draw.
        y_train: Class labels of the points.

    Returns:
        The matplotlib Figure holding the plot (suitable for ``st.pyplot``).
    """
    # Reuse a single named figure, cleared on every call. Opening a brand-new
    # figure each time kept every old one alive in pyplot, which adds up
    # quickly when this is called repeatedly.
    fig = plt.figure(num="decision_boundary", figsize=(6, 4), clear=True)
    # plot_decision_regions draws on pyplot's current axes, which belong to the
    # figure that was just made current above.
    plot_decision_regions(x_train, y_train, clf=model, legend=2)
    return fig
class LossPlotCallback(keras.callbacks.Callback):
    """Keras callback that redraws the decision boundary and loss curves each epoch.

    Both plots are rendered side by side into a single Streamlit placeholder,
    so each epoch replaces the previous output instead of appending to it.
    """

    def __init__(self, X, y, display_epochs=10):
        """Store the plotting data and reserve the Streamlit placeholder.

        Args:
            X: Feature matrix drawn on the decision-boundary plot.
            y: Class labels for ``X``.
            display_epochs: Accepted for compatibility; stored but not used.
        """
        super().__init__()
        self.loss_df = pd.DataFrame(columns=["Epoch", "Train Loss", "Val Loss"])
        self.display_epochs = display_epochs
        self.X = X
        self.y = y
        # One record per finished epoch; loss_df is rebuilt from this list.
        self._records = []
        self.plot_placeholder = st.empty()  # SINGLE container to update dynamically

    def on_epoch_end(self, epoch, logs=None):
        """Record this epoch's losses and refresh both plots."""
        # Keras may pass no logs, and val_loss is absent without validation data.
        logs = logs or {}
        self._records.append({
            "Epoch": epoch + 1,
            "Train Loss": logs.get("loss"),
            "Val Loss": logs.get("val_loss"),
        })
        self.loss_df = pd.DataFrame(self._records, columns=["Epoch", "Train Loss", "Val Loss"])

        with self.plot_placeholder.container():
            col1, col2 = st.columns([1, 1])

            # Left Column: Decision Surface
            with col1:
                st.write("### Decision Boundary")
                # Use the model Keras attached to this callback and the data it
                # was constructed with, rather than module-level globals.
                fig1 = plot_decision_boundary(self.model, self.X, self.y)
                st.pyplot(fig1, clear_figure=True)
                # plot_decision_boundary leaves its figure as pyplot's current
                # figure; close it so one figure per epoch does not pile up.
                plt.close()

            # Right Column: Loss Plot
            with col2:
                st.write("### Training vs Validation Loss")
                fig2, ax = plt.subplots(figsize=(6, 4), dpi=100)
                epochs = self.loss_df["Epoch"]
                has_train = self.loss_df["Train Loss"].notna().any()
                has_val = self.loss_df["Val Loss"].notna().any()
                if has_train:
                    ax.plot(epochs, self.loss_df["Train Loss"], marker='o', markersize=1, linestyle='-', color='b', label="Train Loss")
                if has_val:
                    ax.plot(epochs, self.loss_df["Val Loss"], marker='s', markersize=1, linestyle='--', color='r', label="Val Loss")
                ax.set_xlabel("Epochs", fontsize=12, fontweight='bold')
                ax.set_ylabel("Loss", fontsize=12, fontweight='bold')
                if has_train or has_val:
                    ax.legend(fontsize=10)
                ax.grid(True, linestyle='--', alpha=0.6)
                ax.spines['top'].set_visible(False)
                ax.spines['right'].set_visible(False)
                # One tick per epoch while that stays readable (up to 20);
                # beyond that, thin the ticks so their number stays bounded.
                n_epochs = len(self.loss_df)
                tick_step = max(1, -(-n_epochs // 20))
                ax.set_xticks(range(1, n_epochs + 1, tick_step))
                ax.tick_params(axis="x", labelrotation=45)
                st.pyplot(fig2, clear_figure=True)
                plt.close(fig2)
# Any change to the sidebar parameters invalidates a running training session.
if current_params != st.session_state.prev_params:
    st.session_state.training = False  # Stop training when a parameter changes
    st.session_state.prev_params = current_params

# Start/Stop buttons (the labels previously contained mis-decoded emoji).
col1, col2 = st.columns([1, 1])
with col1:
    if st.button("▶️ Start Training"):
        st.session_state.training = True
        st.session_state.model_trained = False
with col2:
    if st.button("⏹️ Stop Training"):
        st.session_state.training = False

# Render the neural network visualization
st.write("### Logical Structure of the Neural Network")
nn_fig = draw_nn(selected_features, st.session_state.hidden_layer_neurons, num_outputs)
st.pyplot(nn_fig)
# The script reruns on every interaction; close the figure after rendering so
# pyplot does not accumulate one open figure per rerun.
plt.close(nn_fig)

# Train Model if Start is clicked
if st.session_state.training:
    # ann_model is kept as a module-level name because other code in this file
    # refers to it by that name.
    ann_model = create_ann_model(
        len(selected_features),
        st.session_state.num_hidden_layers,
        st.session_state.hidden_layer_neurons
    )
    st.session_state.model_trained = True
    loss_plot_callback = LossPlotCallback(X=selected_data, y=cv)

    # Capture the textual model summary instead of letting it go to stdout.
    model_summary = io.StringIO()
    ann_model.summary(print_fn=lambda x: model_summary.write(x + "\n"))

    # Display ANN model summary in Streamlit
    st.subheader("Artificial Neural Network Model Summary")
    st.code(model_summary.getvalue(), language="plaintext")

    # The epoch count is effectively unbounded: training runs until the script
    # is rerun (Stop button or any parameter change).
    history = ann_model.fit(
        x_train, y_train,
        epochs=999999,
        validation_data=(x_test, y_test),
        batch_size=batch_size,
        callbacks=[loss_plot_callback],
    )

st.markdown("\n", unsafe_allow_html=True)