|
|
| |
|
|
| from sklearn.model_selection import train_test_split |
| from sklearn.preprocessing import StandardScaler |
| from torch.utils.data import TensorDataset, DataLoader |
| import torch.nn as nn |
| import torch.optim as optim |
| import torch |
| |
| import matplotlib.pyplot as plt |
| import plotly.graph_objs as go |
| import IPython |
| import numpy as np |
| from graphviz import Digraph |
| import copy |
| import plotly.graph_objs as go |
| import torch |
| import numpy as np |
| import colorsys |
| from functools import partial |
| import gradio as gr |
| import os |
| import uuid |
| from contextlib import contextmanager |
# 'h' makes visualize_network_with_weights lay the graph out left-to-right
# (Graphviz rankdir='LR'); any other value keeps Graphviz's default direction.
NETWORK_ORIENTAION = 'h'

# Directory that receives the PNG files written by save_plot_as_image and
# generate_images.
TEMP_DIR = "/content/temp"

# exist_ok=True replaces the exists()-then-makedirs() pair, which could fail
# if the directory appeared between the check and the creation.
os.makedirs(TEMP_DIR, exist_ok=True)
|
|
| """## functions""" |
|
|
| |
|
|
def simulate_clusters(noise=0.3,data_points=1000):
    """Generate a standardized four-cluster, two-class data set as tensors.

    Four Gaussian clusters are sampled around (-1, -1), (-1, 1), (1, -1) and
    (1, 1), each with covariance ``noise * I``.  Clusters 0 and 3 get label 0
    and clusters 1 and 2 get label 1.  The points are split 80/20 into train
    and test parts (``random_state=42``) and standardized with a scaler that
    is fitted on the training part only.

    Args:
        noise: Scalar multiplied with the 2x2 identity to form each cluster's
            covariance matrix.
        data_points: Total number of points; must be divisible by 4 so every
            cluster receives the same number of samples.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` where the ``X`` tensors
        are ``torch.float32`` and the ``y`` tensors are ``torch.long``.

    Raises:
        ValueError: If ``data_points`` is not divisible by 4.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if data_points % 4 != 0:
        raise ValueError('Data points should be divisible by 4')
    per_cluster = data_points // 4

    # Fixed seed so repeated calls with the same arguments give the same data.
    # NOTE: this reseeds NumPy's global generator, exactly as before.
    np.random.seed(0)

    means = [(-1, -1), (-1, 1), (1, -1), (1, 1)]
    cov = np.eye(2) * noise

    # Clusters are sampled in the order of ``means`` and stacked row-wise.
    X = np.vstack([
        np.random.multivariate_normal(mean, cov, per_cluster)
        for mean in means
    ])

    # Row i belongs to cluster i // per_cluster; clusters 0 and 3 form class 0.
    cluster_ids = np.repeat(np.arange(len(means)), per_cluster)
    y_adjusted = np.array([0 if cluster in (0, 3) else 1 for cluster in cluster_ids])

    X_train_adj, X_test_adj, y_train_adj, y_test_adj = train_test_split(
        X, y_adjusted, test_size=0.2, random_state=42
    )

    # Fit the scaler on the training data only, then reuse it for the test data.
    scaler_adj = StandardScaler()
    X_train_scaled_adj = scaler_adj.fit_transform(X_train_adj)
    X_test_scaled_adj = scaler_adj.transform(X_test_adj)

    X_train_tensor_adj = torch.tensor(X_train_scaled_adj, dtype=torch.float32)
    y_train_tensor_adj = torch.tensor(y_train_adj, dtype=torch.long)
    X_test_tensor_adj = torch.tensor(X_test_scaled_adj, dtype=torch.float32)
    y_test_tensor_adj = torch.tensor(y_test_adj, dtype=torch.long)

    return X_train_tensor_adj,y_train_tensor_adj,X_test_tensor_adj,y_test_tensor_adj
|
|
| |
def get_color(activation, base_color=False):
    """Translate an activation into a hex colour string.

    The activation is clamped to [0, 1] first.  Values outside that range
    (for example ReLU outputs above 1) previously produced negative or
    greater-than-255 channel values and therefore malformed colour strings.

    Args:
        activation: Number (or anything ``float()`` accepts, such as a
            zero-dimensional tensor) giving the colour intensity.
        base_color: ``'#RRGGBB'`` string to blend with white, or a falsy value
            to use the blue-with-alpha scheme instead.

    Returns:
        With ``base_color``: ``'#rrggbb'`` where 1 gives the base colour and 0
        gives white.  Without it: ``'#0000FFAA'`` (blue with alpha scaled by
        the activation) for a positive activation, else ``'#E0E0E0'``.
    """
    level = min(max(float(activation), 0.0), 1.0)

    if base_color:
        # Parse the three two-digit hex channels of '#RRGGBB'.
        channels = [int(base_color[start:start + 2], 16) for start in (1, 3, 5)]
        # Linear blend towards white as the activation drops to 0.
        blended = [int(channel + (255 - channel) * (1 - level)) for channel in channels]
        return '#' + ''.join(f'{value:02x}' for value in blended)

    if level > 0:
        return f"#0000FF{int(level * 255):02X}"
    return "#E0E0E0"
|
|
|
|
def rd(activation):
    """Format an activation as a label suffix.

    Args:
        activation: Zero-dimensional tensor, or the placeholder value ``1``
            that visualize_network_with_weights uses when no activations are
            supplied.

    Returns:
        ``''`` when the activation equals 1, otherwise a newline followed by
        the value rounded to two decimals (e.g. ``'\\n0.50'``).
    """
    if activation != 1:
        return "\n" + "{:.2f}".format(torch.round(activation, decimals=2).item())
    return ''


def softmax(x):
    """Softmax over dimension 0 of a 1-D tensor, with a placeholder bypass.

    If any element equals 1 the input is returned unchanged; this keeps the
    all-ones placeholder activations from being normalised.  Otherwise the
    maximum is subtracted before exponentiating, so large logits no longer
    overflow to ``inf`` and yield NaN.

    Args:
        x: 1-D tensor of logits.

    Returns:
        ``x`` itself when it is empty or contains a 1, else a tensor of the
        same shape holding the softmax probabilities.
    """
    if x.numel() == 0 or not all(x != 1):
        return x
    # Shifting by the maximum does not change the result mathematically.
    shifted = x - x.amax(dim=0, keepdim=True)
    exps = torch.exp(shifted)
    return exps / exps.sum(dim=0)
def visualize_network_with_weights(model, activations=False, norm='net', decision_boundary_images=None, width=1, height=1):
    """Build a Graphviz graph of the model's linear layers, weights and activations.

    Every ``torch.nn.Linear`` child of ``model`` becomes one layer of nodes.
    The last linear layer is drawn as two output nodes ("Label 0" and
    "Label 1"); all earlier ones are drawn as hidden layers.  Edges are blue
    for weights >= 0 and orange for negative weights, with a pen width
    proportional to the normalised absolute weight.

    Args:
        model: Module whose ``named_children()`` contain the linear layers.
        activations: Mapping from layer name to that layer's activations, or a
            falsy value to draw every node with the placeholder activation 1.
        norm: ``'layer'`` normalises each layer's weights by that layer's own
            largest absolute weight; any other value normalises by the largest
            absolute weight of the whole network.
        decision_boundary_images: Optional mapping from layer name to a list
            of image paths, one per neuron, drawn inside the hidden nodes.
        width: Node width used for nodes that show an image.
        height: Node height used for nodes that show an image.

    Returns:
        The populated ``Digraph``.

    Raises:
        ValueError: If the model has no ``torch.nn.Linear`` children.
    """
    dot = Digraph()
    if NETWORK_ORIENTAION == 'h':
        dot.attr(rankdir='LR')
    pos_color = "blue"
    neg_color = "orange"
    input_color, hidden_color, output_color1, output_color2 = '#90EE90', '#D3D3D3', '#FFB6C1', '#ADD8E6'

    # Collect the weight matrix of every linear layer, in definition order.
    layers_weights = {
        name: layer.weight.cpu().data.numpy()
        for name, layer in model.named_children()
        if isinstance(layer, torch.nn.Linear)
    }
    if not layers_weights:
        raise ValueError("model has no torch.nn.Linear children to visualize")

    # Layer names and count are derived from the model instead of assuming
    # exactly 'fc1', 'fc2', 'fc3'.
    layer_names = list(layers_weights)
    output_layer_name = layer_names[-1]
    number_of_layer = len(layer_names)
    max_weight = max(np.abs(weight).max() for weight in layers_weights.values())

    if not activations:
        activations = {layer: [1] * weight.shape[0] for layer, weight in layers_weights.items()}

    def _norm_scale(weight):
        # An all-zero layer/network would otherwise divide by zero and give NaN widths.
        scale = np.abs(weight).max() if norm == 'layer' else max_weight
        return scale if scale > 0 else 1.0

    layers_weights_norm = {layer: weight / _norm_scale(weight)
                           for layer, weight in layers_weights.items()}

    def add_node_with_border(node_id, label, base_color, activation, image_path=None, shape='circle', border_color='black', border_width=1):
        fill_color = get_color(activation, base_color)
        if image_path:
            dot.node(node_id, label, shape='box', style='filled', fillcolor=fill_color, color=border_color, penwidth=str(border_width),imagescale='both', width=str(width), height=str(height), image=image_path, fixedsize='true')
        else:
            dot.node(node_id, label, shape=shape, style='filled', fillcolor=fill_color, color=border_color, penwidth=str(border_width))

    # Input nodes: the first two get an axis name, any further input only its index.
    axis_names = ['X','Y']
    input_size = layers_weights[layer_names[0]].shape[1]
    for i in range(input_size):
        label = f'X{i} - {axis_names[i]} Axis' if i < len(axis_names) else f'X{i}'
        add_node_with_border(f'h0_{i}', label, input_color, 1.0)

    # Hidden nodes: every linear layer except the last one.
    for layer_i, layer_name in enumerate(layer_names[:-1], start=1):
        images = (decision_boundary_images or {}).get(layer_name, [])
        for i, activation in enumerate(activations[layer_name]):
            image_path = images[i] if i < len(images) else None
            add_node_with_border(f'h{layer_i}_{i}', f'H{layer_i}_{i}{rd(activation)}', hidden_color, activation, image_path=image_path)

    # Output nodes: the first two outputs of the last layer, passed through softmax.
    norm_output_activations = softmax(torch.tensor([activations[output_layer_name][0],activations[output_layer_name][1]]))
    activation_label1,activation_label2 = norm_output_activations
    add_node_with_border(f'h{number_of_layer}_0', f"Y0 - Label 0{rd(activation_label1)}", output_color1, activation_label1,shape='doublecircle')
    add_node_with_border(f'h{number_of_layer}_1', f"Y1 - Label 1{rd(activation_label2)}", output_color2, activation_label2,shape='doublecircle')

    # Edges: weight_matrix[j, i] connects node i of the previous layer to node j.
    prev_layer_size = input_size
    prev_layer_name = 'h0'
    for layer_idx, (layer_name, weight_matrix) in enumerate(layers_weights.items(), start=1):
        current_layer_size = weight_matrix.shape[0]
        for i in range(prev_layer_size):
            for j in range(current_layer_size):
                color = pos_color if weight_matrix[j, i] >= 0 else neg_color
                dot.edge(f'{prev_layer_name}_{i}', f'h{layer_idx}_{j}', penwidth=str(abs(layers_weights_norm[layer_name][j, i]) * 5), color=color)
        prev_layer_size = current_layer_size
        prev_layer_name = f'h{layer_idx}'

    return dot
|
|
| |
def plot_decision_boundary(model, X_train, y_train, X_test, y_test, show=True, epoch='', step=0.01):
    """Plot the model's predicted class regions together with the data points.

    The model is put into eval mode and evaluated on a regular grid that
    covers both data sets plus a margin of 1 on every side.  The predicted
    class (argmax over dimension 1 of the model output) is drawn as a contour
    plot, with train and test points of each label drawn on top.

    Args:
        model: Callable module exposing a ``device`` attribute.
        X_train: 2-column feature array/tensor of the training points.
        y_train: Labels (0 or 1) of the training points.
        X_test: 2-column feature array/tensor of the test points.
        y_test: Labels (0 or 1) of the test points.
        show: If True, call ``fig.show()`` before returning.
        epoch: String appended to the plot title.
        step: Grid spacing; the default matches the previously fixed value.

    Returns:
        The Plotly ``Figure``.
    """
    model.eval()

    # Plain floats so the grid construction does not depend on the input type.
    x_min = float(min(X_train[:, 0].min(), X_test[:, 0].min())) - 1
    x_max = float(max(X_train[:, 0].max(), X_test[:, 0].max())) + 1
    y_min = float(min(X_train[:, 1].min(), X_test[:, 1].min())) - 1
    y_max = float(max(X_train[:, 1].max(), X_test[:, 1].max())) + 1

    # Axis vectors are computed once and shared by the mesh and the contour.
    xs = np.arange(x_min, x_max, step)
    ys = np.arange(y_min, y_max, step)
    xx, yy = np.meshgrid(xs, ys)

    grid_tensor = torch.tensor(np.c_[xx.ravel(), yy.ravel()], dtype=torch.float32)
    with torch.no_grad():
        predictions = model(grid_tensor.to(model.device)).argmax(1).to('cpu')
    Z = predictions.numpy().reshape(xx.shape)

    contour = go.Contour(
        x=xs,
        y=ys,
        z=Z,
        colorscale='RdYlBu',
        showscale=False
    )

    def _scatter(points, labels, label, name, marker):
        # One marker trace for the points carrying the given label.
        selected = points[labels == label]
        return go.Scatter(x=selected[:, 0], y=selected[:, 1], mode='markers',
                          marker=marker, name=name)

    border = dict(color='black', width=1)
    scatters = [
        _scatter(X_train, y_train, 0, 'Train - Label 0',
                 dict(color='red', line=border)),
        _scatter(X_train, y_train, 1, 'Train - Label 1',
                 dict(color='green', line=border)),
        _scatter(X_test, y_test, 0, 'Test - Label 0',
                 dict(color='rgba(255, 200, 200, 1)', symbol='circle-open', line=border)),
        _scatter(X_test, y_test, 1, 'Test - Label 1',
                 dict(color='rgba(200, 255, 200, 1)', symbol='circle-open', line=border)),
    ]

    layout = go.Layout(
        title='Decision Boundary ' + epoch,
        xaxis=dict(title='Feature 1'),
        yaxis=dict(title='Feature 2'),
        showlegend=True
    )

    fig = go.Figure(data=[contour] + scatters, layout=layout)

    if show:
        fig.show()
    return fig
|
|
|
|
def generate_learning_curve(loss_hist, loss_val_hist, hidden_units, noise, epochs, lr,metric):
    """Build a Plotly line chart of a training and a validation metric per epoch.

    Args:
        loss_hist: Per-epoch training values (loss or accuracy, see ``metric``).
        loss_val_hist: Per-epoch validation values of the same metric.
        hidden_units: Shown in the plot title.
        noise: Shown in the plot title.
        epochs: Number of epochs; the x axis runs from 1 to ``epochs``.
        lr: Learning rate, shown in the plot title.
        metric: ``'loss'`` in any letter case labels the curves "Loss"; every
            other value labels them "Accuracy".

    Returns:
        The Plotly ``Figure`` with a "Training" and a "Validation" trace.
    """
    # No tensor operations happen here, so the former torch.no_grad() wrapper
    # had no effect and was dropped.
    metric = 'Loss' if metric.lower()=='loss' else "Accuracy"
    epoch_axis = list(range(1, epochs + 1))

    data = [
        go.Scatter(x=epoch_axis, y=history, mode='lines', name=f'{split} {metric}')
        for split, history in (('Training', loss_hist), ('Validation', loss_val_hist))
    ]

    layout = go.Layout(
        title=f'Learning Curve - Hidden Units: {hidden_units}, Noise: {noise}, Learning Rate: {lr}',
        xaxis=dict(title='Epochs'),
        yaxis=dict(title=metric),
    )

    return go.Figure(data=data, layout=layout)
|
|
def save_plot_as_image(fig, remove_axes=True, remove_title=True, remove_colorbar=True, transparent_background=True):
    """
    Saves a Matplotlib figure as a PNG in TEMP_DIR and returns the file path.

    The figure is modified in place according to the flags before it is saved.

    Args:
        fig (matplotlib.figure.Figure): The Matplotlib figure to save.
        remove_axes (bool): If True, hides both axes and the frame of every axes.
        remove_title (bool): If True, clears the figure suptitle and hides every axes title.
        remove_colorbar (bool): If True, removes colorbars attached to the images
            and collections of every axes.
        transparent_background (bool): If True, saves the image with a transparent background.

    Returns:
        str: Path to the saved image file.

    Raises:
        ValueError: If ``fig`` is not a Matplotlib figure.
    """
    if not isinstance(fig, plt.Figure):
        raise ValueError("The provided object is not a Matplotlib figure.")

    if remove_axes:
        for ax in fig.axes:
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
            ax.set_frame_on(False)

    if remove_title:
        fig.suptitle("")
        for ax in fig.axes:
            ax.title.set_visible(False)

    if remove_colorbar:
        for ax in fig.axes:
            # Previously only images were inspected, and only on axes that
            # also had collections, so most colorbars were never removed.
            for mappable in list(ax.get_images()) + list(ax.collections):
                colorbar = getattr(mappable, 'colorbar', None)
                if colorbar is not None:
                    colorbar.remove()

    if transparent_background:
        fig.patch.set_alpha(0)
        for ax in fig.axes:
            ax.patch.set_alpha(0)

    # The directory may have been removed since import time.
    os.makedirs(TEMP_DIR, exist_ok=True)
    # A random UUID keeps concurrent saves from overwriting each other.
    filename = f"plot_{uuid.uuid4()}.png"
    file_path = os.path.join(TEMP_DIR, filename)

    fig.savefig(file_path, bbox_inches='tight', pad_inches=0, transparent=transparent_background)

    return file_path
|
|
def plot_neuron_decision_boundaries(model, X, step=0.01):
    """Draw one filled-contour figure per neuron of each linear layer.

    A grid covering the range of ``X`` plus a margin of 0.5 is pushed through
    the model's children one after another, in ``named_children()`` order.
    After every ``nn.Linear`` child (and after the last child, whatever its
    type) each output column is reshaped to the grid and drawn with
    ``contourf``, split at 0 when the output changes sign.

    Args:
        model: Module exposing a ``device`` attribute whose children can be
            applied sequentially.
        X: 2-column array or tensor that defines the plotted range.
        step: Grid spacing.

    Returns:
        Dict mapping child name to the list of figures of its neurons.  The
        figures are already closed in pyplot but can still be saved.
    """
    if isinstance(X, torch.Tensor):
        X = X.cpu().numpy()
    mesh_border_expansion = 0.5

    x_min, x_max = X[:, 0].min() - mesh_border_expansion, X[:, 0].max() + mesh_border_expansion
    y_min, y_max = X[:, 1].min() - mesh_border_expansion, X[:, 1].max() + mesh_border_expansion
    xx, yy = np.meshgrid(np.arange(x_min, x_max, step), np.arange(y_min, y_max, step))
    mesh_inputs = torch.Tensor(np.c_[xx.ravel(), yy.ravel()])

    # Resolve the children once instead of rebuilding the list per iteration.
    children = list(model.named_children())
    last_name = children[-1][0] if children else None

    model.eval()
    figures_dict = {}
    layer_outputs = mesh_inputs
    with torch.no_grad():
        for name, layer in children:
            layer_outputs = layer(layer_outputs.to(model.device))
            if not (isinstance(layer, nn.Linear) or name == last_name):
                continue

            outputs_np = layer_outputs.cpu().numpy()
            for neuron_idx in range(outputs_np.shape[1]):
                Z = outputs_np[:, neuron_idx].reshape(xx.shape)

                z_min, z_max = float(Z.min()), float(Z.max())
                if z_min == z_max:
                    # A constant output would give two equal levels, which
                    # contourf rejects; widen the band around the value.
                    levels = [z_min - 0.5, z_max + 0.5]
                elif z_min < 0 < z_max:
                    levels = [z_min, 0, z_max]
                else:
                    levels = [z_min, z_max]

                fig, ax = plt.subplots()
                ax.contourf(xx, yy, Z, levels=levels, cmap=plt.cm.RdBu, alpha=0.8)

                plt.show()
                # Close the figure in pyplot so open figures do not pile up.
                plt.close(fig)
                figures_dict.setdefault(name, []).append(fig)

    return figures_dict
|
|
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
# Shared module state.  init_net_and_train fills these in; generate_images and
# get_network_with_inputs read them.  A ``global`` statement at module scope
# has no effect, so plain assignments are used.
fc_model_hist = None  # list with one model snapshot per trained epoch
X_train = None
y_train = None
X_test = None
y_test = None
|
|
class FCNet(nn.Module):
    """Fully connected network: 2 inputs -> two ReLU hidden layers -> 2 outputs.

    The ``device`` attribute records the device last passed to the constructor
    or to ``to``; other code in this file reads it to place input tensors.
    """

    def __init__(self,hidden_units,device):
        """Create the layers.

        Args:
            hidden_units: Width of both hidden layers.
            device: Stored in ``self.device``; the module is not moved here.
        """
        super().__init__()
        self.fc1 = nn.Linear(2, hidden_units)
        self.act_func1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.act_func2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_units, 2)
        self.device = device

    def forward(self, x):
        """Return the raw outputs of ``fc3`` (no softmax applied)."""
        x = self.act_func1(self.fc1(x))
        x = self.act_func2(self.fc2(x))
        x = self.fc3(x)
        return x

    def forward_with_activation(self, x):
        """Run a forward pass and also return every intermediate activation.

        Returns:
            Tuple ``(output, activations)``.  ``output`` is the ``fc3`` result
            and ``activations`` maps ``'inputs'`` to the input, ``'fc1'`` and
            ``'fc2'`` to the post-ReLU hidden activations and ``'fc3'`` to the
            output.
        """
        inputs = x
        x1 = self.act_func1(self.fc1(x))
        x2 = self.act_func2(self.fc2(x1))
        x3 = self.fc3(x2)
        # Bug fix: the first element used to be the untouched input ``x``.
        return x3,{'inputs':inputs,'fc1':x1,'fc2':x2,'fc3':x3}

    def to(self, device):
        """Move the module to ``device``, remember it, and return ``self``."""
        super().to(device)
        self.device = device
        return self
|
|
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when an optimizer is given, else evaluate.

    Returns:
        Tuple ``(mean loss per sample, accuracy)`` over the whole pass.
    """
    training = optimizer is not None
    model.train(training)
    total_loss, correct, seen = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            # The criterion returns a batch mean; weight it by the batch size
            # so the epoch average stays exact when the last batch is smaller.
            total_loss += loss.item() * inputs.size(0)
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            seen += labels.size(0)
    return total_loss / seen, correct / seen


def init_net_and_train(hidden_units = 4,noise = 0.2,epochs = 30,data_points = 1000,lr=0.01,device='cpu',metric='acc'):
    """Simulate data, train a fresh FCNet, and return its learning curve.

    Side effects: replaces the module-level ``X_train``, ``y_train``,
    ``X_test`` and ``y_test`` with the new data and ``fc_model_hist`` with a
    list holding one CPU copy of the model per epoch, taken after that epoch's
    training pass.

    Args:
        hidden_units: Width of both hidden layers.
        noise: Cluster covariance scale passed to simulate_clusters.
        epochs: Number of training epochs.
        data_points: Total number of simulated points.
        lr: Adam learning rate.
        device: Device the model and the batches are moved to.
        metric: ``'loss'`` in any letter case plots the loss curves; every
            other value plots accuracy.  This is the same rule
            generate_learning_curve uses for its labels, so the plotted data
            and the axis label always agree.

    Returns:
        The Plotly figure produced by generate_learning_curve.
    """
    global fc_model_hist, X_train, y_train, X_test, y_test

    X_train,y_train,X_test,y_test = simulate_clusters(noise,data_points)

    train_loader_adj = DataLoader(TensorDataset(X_train, y_train), batch_size=64, shuffle=True)
    # Evaluation does not depend on sample order, so no shuffling is needed.
    test_loader_adj = DataLoader(TensorDataset(X_test, y_test), batch_size=64, shuffle=False)

    fc_model = FCNet(hidden_units,device=device)
    fc_model.to(device)

    fc_criterion = nn.CrossEntropyLoss()
    fc_optimizer = optim.Adam(fc_model.parameters(), lr=lr)

    fc_model_hist = []
    loss_hist = []
    loss_val_hist = []
    acc_hist = []
    acc_val_hist = []

    device = fc_model.device

    for epoch in range(epochs):
        train_loss, train_accuracy = _run_epoch(
            fc_model, train_loader_adj, fc_criterion, device, optimizer=fc_optimizer
        )
        loss_hist.append(train_loss)
        acc_hist.append(train_accuracy)

        # Snapshot in eval mode so the stored copies are ready for inference.
        fc_model.eval()
        fc_model_hist.append(copy.deepcopy(fc_model).to('cpu'))

        test_loss, test_accuracy = _run_epoch(
            fc_model, test_loader_adj, fc_criterion, device
        )
        loss_val_hist.append(test_loss)
        acc_val_hist.append(test_accuracy)

    if metric.lower() == 'loss':
        reported_metric_train,reported_metric_val = loss_hist,loss_val_hist
    else:
        reported_metric_train,reported_metric_val = acc_hist,acc_val_hist
    return generate_learning_curve(reported_metric_train,reported_metric_val,hidden_units,noise,epochs,lr,metric)
|
|
| |
def get_network_with_inputs(epoch, input_x, input_y,output_type = "HTML"):
    """Render the network of a given epoch with activations for one input point.

    Args:
        epoch: 1-based epoch number; clamped to the range of stored snapshots.
        input_x: First input feature.
        input_y: Second input feature.
        output_type: ``'PNG'`` renders a PNG file; any other value returns HTML.

    Returns:
        For ``'PNG'`` the path ``network_with_weights_activation_<epoch>.png``
        (relative to the working directory), otherwise an HTML string that
        wraps the SVG rendering in a ``<div>``.

    Raises:
        RuntimeError: If no model snapshots exist yet.
    """
    if not fc_model_hist:
        raise RuntimeError("No trained models available; run init_net_and_train first.")
    # Clamp on both sides: epoch 0 or a negative value used to wrap around to
    # snapshots counted from the end of the list.
    epoch = max(1, min(int(epoch), len(fc_model_hist)))

    cur_model = fc_model_hist[epoch - 1]
    with torch.no_grad():
        point = torch.tensor([input_x, input_y], dtype=torch.float32, device=cur_model.device)
        out, activations = cur_model.forward_with_activation(point)
    network_dot = visualize_network_with_weights(cur_model, activations=activations)

    if output_type=='PNG':
        cur_path = f'network_with_weights_activation_{epoch}'
        network_dot.render(cur_path, format='png', cleanup=True)
        return cur_path + ".png"

    svg_content = network_dot.pipe(format='svg').decode('utf-8')
    html_content = f'<div style="width:100%; height:100%;">{svg_content}</div>'
    return html_content
|
|
|
|
def get_plots_as_png(des_list):
    """Save every figure in ``des_list`` with save_plot_as_image and return the paths."""
    return [save_plot_as_image(plot) for plot in des_list]


# When True, generate_images returns the network as an HTML/SVG string instead
# of the path of a rendered PNG file.
as_HTML = False
|
|
def generate_images(epoch,net_with_unit_decisions=True):
    """Create the decision-boundary figure and the network picture for an epoch.

    Args:
        epoch: 1-based epoch number; clamped to the range of stored snapshots.
        net_with_unit_decisions: If True, each hidden node of the network
            picture shows that neuron's own decision-boundary image.

    Returns:
        Tuple ``(fig, network)`` where ``fig`` is the Plotly decision-boundary
        figure and ``network`` is an HTML string when ``as_HTML`` is set,
        otherwise the path of a PNG rendered into TEMP_DIR.

    Raises:
        RuntimeError: If no model snapshots exist yet.
    """
    if not fc_model_hist:
        raise RuntimeError("No trained models available; run init_net_and_train first.")
    # Clamp on both sides: epoch 0 or a negative value used to wrap around to
    # snapshots counted from the end of the list.
    epoch = max(1, min(int(epoch), len(fc_model_hist)))
    cur_model = fc_model_hist[epoch - 1]

    fig = plot_decision_boundary(cur_model, X_train, y_train, X_test, y_test, show=False,epoch=f'Epoch:{epoch}')

    if net_with_unit_decisions:
        decision_plots = plot_neuron_decision_boundaries(cur_model, X_train)
        decision_boundary_images = {k:get_plots_as_png(decision_plots[k]) for k in decision_plots}
        network_dot = visualize_network_with_weights(cur_model, activations=False, decision_boundary_images=decision_boundary_images)
    else:
        network_dot = visualize_network_with_weights(cur_model)

    if as_HTML:
        svg_content = network_dot.pipe(format='svg').decode('utf-8')
        network_proccessed = f'<div style="width:100%; height:100%;">{svg_content}</div>'
    else:
        cur_path = f'{TEMP_DIR}/network_with_weights_activation_{epoch}'
        network_dot.render(cur_path, format='png', cleanup=True)
        network_proccessed = cur_path+".png"

    return fig, network_proccessed
|
|
@contextmanager
def dummy_context():
    """Context manager that does nothing on entry or exit and binds ``None``."""
    yield None
|
|
|
|