Spaces:
Sleeping
Sleeping
# Page configuration is issued before any other Streamlit command, which is
# why the streamlit import and set_page_config call sit above the other imports.
import streamlit as st
st.set_page_config(layout="wide")
# Debug marker written to the page right after the page config call; useful
# for checking that no other Streamlit command ran before this point.
st.write("Starting app with page config set as first command.")
| # Imports (after set_page_config) | |
| import networkx as nx | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from sklearn.datasets import make_blobs, make_circles, make_moons | |
| from sklearn.preprocessing import StandardScaler | |
| from mlxtend.plotting import plot_decision_regions | |
| import tensorflow as tf | |
| from keras.models import Sequential | |
| from keras.layers import Input, Dense | |
| from keras.optimizers import SGD | |
| from keras.losses import MeanSquaredError, BinaryCrossentropy | |
| from keras.regularizers import l2, l1 | |
| from keras.callbacks import Callback | |
# Show the TensorFlow and Keras versions in the page. The Keras version is
# looked up in two places on tf.keras; a placeholder text is shown when
# neither attribute exists.
try:
    tf_version = tf.__version__
    keras_version = None
    keras_api = tf.keras
    if hasattr(keras_api, "__version__"):
        keras_version = keras_api.__version__
    elif hasattr(keras_api, "version"):
        keras_version = keras_api.version.__version__
    else:
        keras_version = "Keras version not available (bundled with TensorFlow)"
    st.write(f"TensorFlow version: {tf_version}")
    st.write(f"Keras version: {keras_version}")
except AttributeError as e:
    # Any missing attribute during the lookup lands here instead of
    # aborting the app.
    st.error(f"Error checking versions: {e}")
    st.write("Falling back to default versions: TensorFlow ~2.15, Keras ~2.15")
# Inject the dark, TensorFlow-Playground-like theme as raw CSS.
# unsafe_allow_html=True is needed so the <style> tag is passed through as HTML.
# Rules cover: the app background, headings, buttons (plus hover state),
# select boxes / sliders and their labels, checkbox labels, and the custom
# "control-bar" and "panel" classes referenced by the div markup emitted
# elsewhere in this script.
st.markdown("""
<style>
.stApp {
background-color: #252830;
color: white;
font-family: Arial, sans-serif;
}
h1, h2, h3 {
color: white;
font-weight: bold;
margin: 0;
padding: 5px 0;
}
.stButton>button {
background-color: #555;
color: white;
border: 2px solid #777;
border-radius: 5px;
padding: 5px 10px;
font-size: 14px;
font-weight: bold;
}
.stButton>button:hover {
background-color: #777;
border-color: #999;
}
.stSelectbox, .stSlider {
background-color: #333;
color: white;
border: 2px solid #777;
border-radius: 5px;
padding: 5px;
}
.stCheckbox label {
color: white;
font-size: 14px;
font-weight: bold;
}
.control-bar {
background-color: #1e2126;
padding: 10px;
border: 2px solid #333;
border-radius: 5px;
margin-bottom: 10px;
}
.panel {
background-color: #2e3238;
padding: 10px;
border: 2px solid #777;
border-radius: 5px;
margin: 10px 0;
}
.stSelectbox label, .stSlider label {
color: white;
font-size: 12px;
font-weight: bold;
}
</style>
""", unsafe_allow_html=True)
# Session state initialization: each key is seeded only when it is missing,
# so values set during earlier reruns are preserved.
_session_defaults = {
    "training": False,
    "num_hidden_layers": 2,
    "hidden_layer_neurons": [4, 2],
    "prev_params": {},
}
for _state_key, _state_default in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
def reset_session():
    """Wipe the session state and restore the default two-layer network.

    Used as the on_click callback of the "Reset" button. Only the network
    shape is restored here; the remaining keys are re-seeded by the
    session-state initialization at the top of the script.
    """
    state = st.session_state
    state.clear()
    state["num_hidden_layers"] = 2
    state["hidden_layer_neurons"] = [4, 2]
# Two-row top control bar. Every widget is created through its column object,
# in the same order and with the same labels/arguments as before.
dataset_options = {"Classification": ["Blobs", "Circles", "Spirals", "XOR"], "Regression": ["Sine Wave"]}
with st.container():
    st.markdown('<div class="control-bar">', unsafe_allow_html=True)

    # Row 1: problem, dataset and optimizer settings
    col1, col2, col3, col4, col5 = st.columns(5)
    problem_type = col1.selectbox("Problem Type", ["Classification", "Regression"])
    # The dataset choices depend on the problem type picked just above.
    dataset_type = col2.selectbox("Dataset", dataset_options[problem_type])
    learning_rate = col3.selectbox("Learning Rate", [0.0001, 0.001, 0.03, 0.1, 0.3, 1], index=2)
    activation = col4.selectbox("Activation", ["ReLU", "Sigmoid", "Tanh"], index=2)
    batch_size = col5.slider("Batch Size", 1, 10, 5)  # Reduced max batch size for Spaces

    # Row 2: noise, regularization, split and reset
    col6, col7, col8, col9, col10 = st.columns(5)
    noise_level = col6.slider("Noise", 0, 50, 0, step=5)
    reg_type = col7.selectbox("Regularization", ["None", "L1", "L2"], index=0)
    reg_rate = col8.selectbox("Reg Rate", [0.0, 0.001, 0.01, 0.1, 1], index=0)
    # The slider works in percent; downstream code expects a fraction.
    train_ratio = col9.slider("Train %", 10, 90, 50, 10) / 100
    col10.button("Reset", key="reset_global", on_click=reset_session)

    st.markdown('</div>', unsafe_allow_html=True)
# Dataset generation (reduced sample size for performance)
def generate_xor(n_samples=400):  # Reduced from 800 for performance
    """Sample an XOR-style two-class dataset on the square [-1, 1) x [-1, 1).

    Args:
        n_samples: Number of points to draw.

    Returns:
        (points, labels): points has shape (n_samples, 2); labels is an int
        array that is 1 where exactly one coordinate is positive, else 0.
    """
    points = np.random.rand(n_samples, 2)
    # Map the unit square onto [-1, 1) in place.
    points *= 2
    points -= 1
    right_half = points[:, 0] > 0
    upper_half = points[:, 1] > 0
    labels = (right_half ^ upper_half).astype(int)
    return points, labels
def generate_sine_wave(noise, n_samples=400):  # Reordered: non-default before default
    """Build a noisy sine regression dataset over the interval [-3, 3].

    Args:
        noise: Noise level; the Gaussian standard deviation is noise / 100.
        n_samples: Number of evenly spaced x values.

    Returns:
        (inputs, targets): inputs has shape (n_samples, 2) with columns
        x and x**2; targets is the flat array sin(x) + noise.
    """
    grid = np.linspace(-3, 3, n_samples)[:, np.newaxis]
    jitter = np.random.normal(0, noise / 100, grid.shape)
    targets = np.sin(grid) + jitter
    inputs = np.hstack([grid, grid**2])
    return inputs, targets.ravel()
# Build the raw feature matrix `fv` and target vector `cv` for the dataset
# chosen in the control bar.
#
# Streamlit re-executes this script on every widget interaction. The scikit-learn
# generators are therefore all given the same fixed seed (previously only
# "Blobs" was), so the points on screen do not change when, for example, a
# neuron is added.
# NOTE: generate_xor and generate_sine_wave draw from NumPy's global RNG and
# are still re-sampled on each rerun.
N_SAMPLES = 400
DATA_SEED = 42

if problem_type == "Classification":
    if dataset_type == "Blobs":
        fv, cv = make_blobs(
            n_samples=N_SAMPLES, centers=2, n_features=2,
            cluster_std=1.5 + noise_level / 50, random_state=DATA_SEED,
        )
    elif dataset_type == "Circles":
        fv, cv = make_circles(
            n_samples=N_SAMPLES, noise=noise_level / 250, factor=0.2,
            random_state=DATA_SEED,
        )
    elif dataset_type == "Spirals":
        # The "Spirals" option is backed by the two-moons generator.
        fv, cv = make_moons(
            n_samples=N_SAMPLES, noise=noise_level / 250, random_state=DATA_SEED,
        )
    elif dataset_type == "XOR":
        fv, cv = generate_xor(N_SAMPLES)
else:
    fv, cv = generate_sine_wave(noise_level, N_SAMPLES)
# Feature preprocessing: standardize the two raw columns, derive the
# candidate input features from them, and assemble the matrix of the
# features currently ticked in the UI.
std = StandardScaler()
X = std.fit_transform(fv)
x1, x2 = X[:, 0], X[:, 1]
features = {
    "X1": x1, "X2": x2, "X1*X2": x1 * x2, "X1^2": x1**2, "X2^2": x2**2,
    "cos(X1)": np.cos(x1), "sin(X1)": np.sin(x1), "cos(X2)": np.cos(x2), "sin(X2)": np.sin(x2)
}
# Features that count as selected before their checkbox has a stored state.
DEFAULT_FEATURES = ["X1", "X2"]
# Each checkbox stores its state in session_state under the feature name.
selected_features = [
    name for name in features
    if st.session_state.get(name, name in DEFAULT_FEATURES)
]
if not selected_features:
    # np.column_stack raises on an empty list. Fall back to the defaults
    # rather than stopping the script: the feature checkboxes are rendered
    # further down, so stopping here would leave no way to re-select one.
    st.warning("No features selected; using X1 and X2.")
    selected_features = list(DEFAULT_FEATURES)
selected_data = np.column_stack([features[name] for name in selected_features])
if problem_type == "Classification":
    cv = cv.astype(int)
# Main layout: narrow data panel, wide network panel, narrow output panel.
col_left, col_center, col_right = st.columns([1, 2, 1])

# Left panel: scatter plot of the raw dataset plus the feature checkboxes.
with col_left:
    st.markdown('<div class="panel">', unsafe_allow_html=True)
    st.subheader("Data")
    fig, ax = plt.subplots(figsize=(3, 3))  # Fixed size for consistency
    if problem_type == "Classification":
        sns.scatterplot(x=fv[:, 0], y=fv[:, 1], hue=cv, palette="coolwarm", edgecolor="k", alpha=0.7, ax=ax, legend=False)
    else:
        # Regression: plot the target against the first raw column.
        sns.scatterplot(x=fv[:, 0], y=cv, color="blue", edgecolor="k", alpha=0.7, ax=ax)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_facecolor("#333")
    st.pyplot(fig)
    # pyplot keeps every figure alive until it is closed; this script runs
    # again on each interaction, so close the figure once it is rendered.
    plt.close(fig)
    st.subheader("Features")
    for feature in features:
        # The checkbox key is the feature name; the preprocessing step reads
        # the same key from session_state to decide which features are used.
        st.checkbox(feature, value=feature in ["X1", "X2"], key=feature)
    st.markdown('</div>', unsafe_allow_html=True)
# Center panel: network diagram plus the controls that edit its shape.

# Upper bounds enforced by the layer / neuron buttons.
MAX_HIDDEN_LAYERS = 6
MAX_NEURONS_PER_LAYER = 8


def draw_nn(features, hidden_neurons):
    """Draw the fully connected network as a layered graph.

    Args:
        features: Names of the input features (one input node each).
        hidden_neurons: Neuron count of each hidden layer, in order.

    Returns:
        The Matplotlib figure containing the diagram. The caller is
        responsible for rendering and closing it.
    """
    G = nx.DiGraph()
    hidden_layers = [
        [f"H{i+1}_{j+1}" for j in range(n)]
        for i, n in enumerate(hidden_neurons)
    ]
    all_layers = [features] + hidden_layers + [["Output"]]
    last_idx = len(all_layers) - 1

    node_colors = {}
    for layer_idx, layer in enumerate(all_layers):
        if layer_idx == 0:
            color = "#90EE90"  # Green for input
        elif layer_idx == last_idx:
            color = "#FFA07A"  # Orange for output
        else:
            color = "#87CEFA"  # Blue for hidden
        for node in layer:
            G.add_node(node, layer=layer_idx)
            node_colors[node] = color

    # Connect every node to every node of the following layer.
    for src_layer, dst_layer in zip(all_layers, all_layers[1:]):
        for src in src_layer:
            for dst in dst_layer:
                G.add_edge(src, dst)

    pos = nx.multipartite_layout(G, subset_key="layer", align="vertical")
    # Rotate the layout by 90 degrees and stretch it by a factor of two.
    pos_rotated = {node: (-y * 2, x * 2) for node, (x, y) in pos.items()}

    fig, ax = plt.subplots(figsize=(8, 4))
    ax.set_facecolor("#252830")
    nx.draw(
        G, pos_rotated,
        with_labels=True,
        node_color=[node_colors[node] for node in G.nodes()],
        edge_color="white",
        node_size=600,
        font_size=8,
        font_color="black",
        font_weight="bold",
        edgecolors="black",
        width=1.0,
        arrows=True,
        ax=ax
    )
    # nx.draw resets the figure background to white and switches the axes
    # off, which hides the axes background set above. Re-apply the dark
    # colour on the figure itself so the white edges and title stay visible.
    fig.set_facecolor("#252830")
    ax.set_title("Neural Network Structure", color="white", fontsize=12, pad=10)
    return fig


def add_layer():
    """Append a one-neuron hidden layer, up to MAX_HIDDEN_LAYERS."""
    if st.session_state.num_hidden_layers < MAX_HIDDEN_LAYERS:
        st.session_state.num_hidden_layers += 1
        st.session_state.hidden_layer_neurons.append(1)


def remove_layer():
    """Drop the last hidden layer, if there is one."""
    if st.session_state.num_hidden_layers > 0:
        st.session_state.num_hidden_layers -= 1
        st.session_state.hidden_layer_neurons.pop()


def increase_neurons(i):
    """Add one neuron to hidden layer *i*, up to MAX_NEURONS_PER_LAYER."""
    if st.session_state.hidden_layer_neurons[i] < MAX_NEURONS_PER_LAYER:
        st.session_state.hidden_layer_neurons[i] += 1


def decrease_neurons(i):
    """Remove one neuron from hidden layer *i*, keeping at least one."""
    if st.session_state.hidden_layer_neurons[i] > 1:
        st.session_state.hidden_layer_neurons[i] -= 1


with col_center:
    st.markdown('<div class="panel">', unsafe_allow_html=True)
    st.subheader("Network")

    network_fig = draw_nn(selected_features, st.session_state.hidden_layer_neurons)
    st.pyplot(network_fig)
    # Close the figure once rendered; the script reruns on every interaction
    # and pyplot would otherwise keep each figure alive.
    plt.close(network_fig)

    # One "- / label / +" row per hidden layer.
    for i in range(st.session_state.num_hidden_layers):
        col1, col2, col3 = st.columns([1, 2, 1])
        with col1:
            st.button("−", key=f"dec_{i}", on_click=decrease_neurons, args=(i,))
        with col2:
            st.write(f"Layer {i+1}: {st.session_state.hidden_layer_neurons[i]} neurons")
        with col3:
            st.button("+", key=f"inc_{i}", on_click=increase_neurons, args=(i,))

    col_btn1, col_btn2 = st.columns(2)
    with col_btn1:
        st.button("Add Layer", on_click=add_layer)
    with col_btn2:
        st.button("Remove Layer", on_click=remove_layer)
    st.markdown('</div>', unsafe_allow_html=True)
# Right panel: Output and Training (decision region and loss plots stacked
# vertically, same size as the dataset scatterplot).


def create_model(input_dim, neurons):
    """Build and compile a dense network from the control-bar settings.

    Reads the module-level widget values reg_type, reg_rate, activation,
    problem_type and learning_rate.

    Args:
        input_dim: Number of input features.
        neurons: Unit count of each hidden layer, in order.

    Returns:
        A compiled Sequential model with a single output unit (sigmoid with
        binary cross-entropy for classification, linear with MSE otherwise).
    """
    if reg_type == "L1":
        reg = l1(reg_rate)
    elif reg_type == "L2":
        reg = l2(reg_rate)
    else:
        reg = None
    is_classification = problem_type == "Classification"

    model = Sequential()
    model.add(Input(shape=(input_dim,)))
    for n in neurons:
        model.add(Dense(n, activation=activation.lower(), kernel_regularizer=reg))
    model.add(Dense(1, activation="sigmoid" if is_classification else "linear"))
    model.compile(
        optimizer=SGD(learning_rate=learning_rate),
        loss=BinaryCrossentropy() if is_classification else MeanSquaredError(),
        metrics=["accuracy" if is_classification else "mae"],
    )
    return model


def _expand_features(x1, x2, names):
    """Compute the feature columns listed in *names* from raw x1/x2 arrays.

    The formulas must stay in sync with the feature table built in the
    feature-preprocessing step of this script.
    """
    table = {
        "X1": x1, "X2": x2, "X1*X2": x1 * x2, "X1^2": x1**2, "X2^2": x2**2,
        "cos(X1)": np.cos(x1), "sin(X1)": np.sin(x1),
        "cos(X2)": np.cos(x2), "sin(X2)": np.sin(x2),
    }
    return np.column_stack([table[name] for name in names])


class OutputCallback(Callback):
    """Keras callback that redraws the output plots after every epoch.

    Args:
        X: Model inputs used for training (one column per selected feature).
        y: Targets matching X.
        plot_xy: Optional two-column array of the standardized X1/X2 values,
            used as plot coordinates. Defaults to the first two columns of X.
        feature_names: Optional names of the columns of X. When given, the
            decision-region grid is built in X1/X2 space and expanded to
            these features, so the plot works for any feature selection.
            When omitted, grid points are fed to the model as-is, which only
            works for a model with exactly two inputs.
    """

    def __init__(self, X, y, plot_xy=None, feature_names=None):
        super().__init__()
        self.X, self.y = X, y
        self.plot_xy = X[:, :2] if plot_xy is None else plot_xy
        self.feature_names = feature_names
        self.losses = {"Epoch": [], "Train Loss": [], "Val Loss": []}
        # Placeholder that is overwritten each epoch. It is created in
        # whatever Streamlit container is active at construction time.
        self.placeholder = st.empty()
        self.current_epoch = 0  # Track current epoch

    def on_train_begin(self, logs=None):
        # fit() attaches the model to the callback itself, so nothing is
        # assigned to self.model here. NOTE(review): newer Keras releases
        # appear to expose `model` as a read-only property, in which case an
        # assignment would raise - confirm against the installed version.
        self.current_epoch = 0

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        # Deliberately broad: a plotting problem is reported in the UI but
        # must not abort the training run.
        try:
            self.current_epoch = epoch + 1
            self.losses["Epoch"].append(self.current_epoch)
            self.losses["Train Loss"].append(logs["loss"])
            self.losses["Val Loss"].append(logs.get("val_loss", logs["loss"]))
            with self.placeholder.container():
                st.subheader("Decision Region & Loss")
                st.write(f"Epoch: {self.current_epoch}")
                self._render_fit_plot()
                self._render_loss_plot()
        except Exception as e:
            st.error(f"Error in epoch end: {e}")

    def _grid_inputs(self, gx, gy):
        """Map flat X1/X2 grid coordinates to the model's input columns."""
        if self.feature_names is None:
            return np.column_stack([gx, gy])
        return _expand_features(gx, gy, self.feature_names)

    def _render_fit_plot(self):
        """Render the decision region (classification) or fitted curve."""
        fig, ax = plt.subplots(figsize=(3, 3))  # Match dataset scatterplot size
        try:
            if problem_type == "Classification":
                self._draw_decision_region(ax)
            else:
                self._draw_regression_fit(ax)
            ax.set_facecolor("#333")
            ax.set_xticks([])
            ax.set_yticks([])
            st.pyplot(fig)
        finally:
            # Two figures are created per epoch; close them so they do not
            # accumulate over the run.
            plt.close(fig)

    def _draw_decision_region(self, ax):
        """Shade the predicted class over a 100x100 grid and overlay the data."""
        pts = self.plot_xy
        xx, yy = np.meshgrid(
            np.linspace(pts[:, 0].min(), pts[:, 0].max(), 100),
            np.linspace(pts[:, 1].min(), pts[:, 1].max(), 100),
        )
        proba = self.model.predict(self._grid_inputs(xx.ravel(), yy.ravel()), verbose=0)
        # The sigmoid output is a probability; threshold it into class 0/1.
        Z = (proba > 0.5).astype(int).reshape(xx.shape)
        # Explicit levels keep the shading defined even when the whole grid
        # is predicted as a single class.
        ax.contourf(xx, yy, Z, levels=[-0.5, 0.5, 1.5], alpha=0.3, cmap="coolwarm")
        ax.contour(xx, yy, Z, levels=[0.5], colors="black", linewidths=2)
        ax.scatter(pts[:, 0], pts[:, 1], c=self.y, cmap="coolwarm", edgecolors="k", alpha=0.7)

    def _draw_regression_fit(self, ax):
        """Plot the targets and the model's predictions against X1."""
        xs = self.plot_xy[:, 0]
        y_pred = self.model.predict(self.X, verbose=0).ravel()
        ax.scatter(xs, self.y, c="blue", alpha=0.5)
        # Line plots take `linewidth`; `linewidths` is rejected by plot().
        ax.plot(xs, y_pred, "r-", linewidth=2)

    def _render_loss_plot(self):
        """Render the train / validation loss curves collected so far."""
        fig, ax = plt.subplots(figsize=(3, 3))  # Match dataset scatterplot size
        try:
            ax.plot(self.losses["Epoch"], self.losses["Train Loss"], "b-", label="Train")
            ax.plot(self.losses["Epoch"], self.losses["Val Loss"], "r--", label="Val")
            ax.legend()
            ax.set_facecolor("#333")
            st.pyplot(fig)
        finally:
            plt.close(fig)


with col_right:
    st.markdown('<div class="panel">', unsafe_allow_html=True)
    st.subheader("Output")
    col_start, col_stop = st.columns(2)
    with col_start:
        if st.button("▶️ Play"):
            st.session_state.training = True
    with col_stop:
        if st.button("⏹️ Stop"):
            st.session_state.training = False

    if st.session_state.training:
        try:
            model = create_model(len(selected_features), st.session_state.hidden_layer_neurons)
            # X holds the standardized X1/X2 columns; it supplies the plot
            # coordinates while selected_data is what the model trains on.
            callback = OutputCallback(
                selected_data, cv, plot_xy=X, feature_names=selected_features,
            )
            model.fit(
                selected_data, cv,
                epochs=50,  # Further reduced for Spaces
                batch_size=batch_size,
                validation_split=1 - train_ratio,
                callbacks=[callback],
                verbose=0,
            )
        except Exception as e:
            st.error(f"Training failed: {e}")
    st.markdown('</div>', unsafe_allow_html=True)