| import gradio as gr |
| import numpy as np |
| import matplotlib.pyplot as plt |
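# Swarm Neural Network demo: each layer is a small ensemble ("swarm") of agents
# whose outputs and gradients are averaged. Training uses an Adam-style
# optimizer modulated by a Mandelbrot-escape heuristic, and fractal
# visualizations are derived from the hidden-layer activations.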
|
|
| def relu(x): |
| return np.maximum(0, x) |
|
|
| def relu_derivative(x): |
| return (x > 0).astype(float) |
|
|
| def tanh(x): |
| return np.tanh(x) |
|
|
| def tanh_derivative(x): |
| return 1 - np.tanh(x)**2 |
|
|
| class EveOptimizer: |
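    """Adam-style optimizer whose bias-corrected update is additionally scaled
    by an EMA of a Mandelbrot-escape factor in (0, 1]."""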
| def __init__(self, params, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8): |
| self.params = params |
| self.lr = learning_rate |
| self.beta1 = beta1 |
| self.beta2 = beta2 |
| self.epsilon = epsilon |
| self.t = 0 |
| self.m = [np.zeros_like(p) for p in params] |
| self.v = [np.zeros_like(p) for p in params] |
| self.fractal_memory = [np.zeros_like(p) for p in params] |
|
|
    def step(self, grads):
        self.t += 1
        for i, (param, grad) in enumerate(zip(self.params, grads)):
            # Standard Adam moment estimates with bias correction.
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grad ** 2)
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Smooth the fractal factor with an EMA. It starts at zero, so
            # effective step sizes ramp up over the first updates.
            fractal_factor = self.fractal_adjustment(param, grad)
            self.fractal_memory[i] = 0.9 * self.fractal_memory[i] + 0.1 * fractal_factor

            # Update in place so the optimizer stays aliased to the live arrays.
            param -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon) * self.fractal_memory[i]
|
|
| def fractal_adjustment(self, param, grad): |
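        # Short Mandelbrot-style escape iteration seeded from gradient and
        # parameter statistics; a faster escape (larger final |z|) shrinks the
        # returned factor, while bounded orbits keep it larger.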
| c = np.mean(grad) + 1j * np.std(param) |
| z = 0 |
| for _ in range(10): |
| z = z**2 + c |
| if abs(z) > 2: |
| break |
| return 1 / (1 + abs(z)) |
|
|
| class BatchNormalization: |
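    """Minimal batch normalization with running statistics for inference."""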
| def __init__(self, input_shape): |
| self.gamma = np.ones(input_shape) |
| self.beta = np.zeros(input_shape) |
| self.epsilon = 1e-5 |
| self.moving_mean = np.zeros(input_shape) |
| self.moving_var = np.ones(input_shape) |
|
|
    def forward(self, x, training=True):
        if training:
            mean = np.mean(x, axis=0)
            var = np.var(x, axis=0)
            # Track running statistics for use at inference time.
            self.moving_mean = 0.99 * self.moving_mean + 0.01 * mean
            self.moving_var = 0.99 * self.moving_var + 0.01 * var
        else:
            mean = self.moving_mean
            var = self.moving_var

        x_norm = (x - mean) / np.sqrt(var + self.epsilon)
        out = self.gamma * x_norm + self.beta
        if training:
            # backward() reads this cache, so it must follow a training-mode forward.
            self.cache = (x, x_norm, mean, var)
        return out
|
|
    def backward(self, dout):
        # Standard batch-norm backward pass through the normalization.
        x, x_norm, mean, var = self.cache
        m = x.shape[0]

        dx_norm = dout * self.gamma
        dvar = np.sum(dx_norm * (x - mean) * -0.5 * (var + self.epsilon)**(-1.5), axis=0)
        dmean = np.sum(dx_norm * -1 / np.sqrt(var + self.epsilon), axis=0) + dvar * np.mean(-2 * (x - mean), axis=0)

        dx = dx_norm / np.sqrt(var + self.epsilon) + dvar * 2 * (x - mean) / m + dmean / m
        dgamma = np.sum(dout * x_norm, axis=0)
        dbeta = np.sum(dout, axis=0)

        return dx, dgamma, dbeta
|
|
| class Reward: |
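    """Tracks the lowest average and maximum batch losses seen so far and
    snapshots the network weights whenever either improves."""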
| def __init__(self): |
| self.lowest_avg_batch_loss = float('inf') |
| self.lowest_max_batch_loss = float('inf') |
| self.best_weights = None |
|
|
| def update(self, avg_batch_loss, max_batch_loss, network): |
| improved = False |
| if avg_batch_loss < self.lowest_avg_batch_loss: |
| self.lowest_avg_batch_loss = avg_batch_loss |
| improved = True |
| if max_batch_loss < self.lowest_max_batch_loss: |
| self.lowest_max_batch_loss = max_batch_loss |
| improved = True |
| if improved: |
| self.best_weights = self.get_network_weights(network) |
|
|
    def get_network_weights(self, network):
        weights = []
        # Snapshot every swarm, including the output layer, so restores are complete.
        for layer in network.layers + [network.output_layer]:
            layer_weights = []
            for agent in layer.agents:
                agent_weights = {
                    'weights': agent.weights.copy(),
                    'bias': agent.bias.copy(),
                    'bn_gamma': agent.bn.gamma.copy(),
                    'bn_beta': agent.bn.beta.copy()
                }
                layer_weights.append(agent_weights)
            weights.append(layer_weights)
        return weights
|
|
    def apply_best_weights(self, network):
        if self.best_weights is not None:
            for layer, layer_weights in zip(network.layers + [network.output_layer], self.best_weights):
                for agent, agent_weights in zip(layer.agents, layer_weights):
                    # Copy in place: each agent's EveOptimizer holds references to
                    # these arrays, so rebinding (agent.weights = ...) would detach
                    # the optimizer from the parameters it updates.
                    np.copyto(agent.weights, agent_weights['weights'])
                    np.copyto(agent.bias, agent_weights['bias'])
                    np.copyto(agent.bn.gamma, agent_weights['bn_gamma'])
                    np.copyto(agent.bn.beta, agent_weights['bn_beta'])
|
|
| class Agent: |
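    """A swarm member: linear layer + batch norm + ReLU, with He-initialized
    weights and its own EveOptimizer instance."""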
| def __init__(self, id, input_size, output_size, fractal_method): |
| self.id = id |
| self.weights = np.random.randn(input_size, output_size) * np.sqrt(2. / input_size) |
| self.bias = np.zeros((1, output_size)) |
| self.fractal_method = fractal_method |
| self.bn = BatchNormalization((output_size,)) |
|
|
| self.optimizer = EveOptimizer([self.weights, self.bias, self.bn.gamma, self.bn.beta]) |
|
|
| def forward(self, x, training=True): |
| self.last_input = x |
| z = np.dot(x, self.weights) + self.bias |
| z_bn = self.bn.forward(z, training) |
| self.last_output = relu(z_bn) |
| return self.last_output |
|
|
    def backward(self, error, l2_lambda=1e-5):
        # For ReLU, (output > 0) equals (pre-activation > 0), so the cached
        # activation gives the correct derivative mask.
        delta = error * relu_derivative(self.last_output)
        delta, dgamma, dbeta = self.bn.backward(delta)

        # L2 regularization enters through the weight gradient only.
        dw = np.dot(self.last_input.T, delta) + l2_lambda * self.weights
        db = np.sum(delta, axis=0, keepdims=True)

        self.optimizer.step([dw, db, dgamma, dbeta])

        return np.dot(delta, self.weights.T)
|
|
| def apply_fractal(self, x): |
| return self.fractal_method(x) |
|
|
| class Swarm: |
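    """An ensemble of agents; forward outputs and backpropagated errors are
    averaged across members."""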
| def __init__(self, num_agents, input_size, output_size, fractal_method): |
| self.agents = [Agent(i, input_size, output_size, fractal_method) for i in range(num_agents)] |
|
|
| def forward(self, x, training=True): |
| results = [agent.forward(x, training) for agent in self.agents] |
| return np.mean(results, axis=0) |
|
|
| def backward(self, error, l2_lambda): |
| errors = [agent.backward(error, l2_lambda) for agent in self.agents] |
| return np.mean(errors, axis=0) |
|
|
| def apply_fractal(self, x): |
| results = [agent.apply_fractal(x) for agent in self.agents] |
| return np.mean(results, axis=0) |
|
|
| class SwarmNeuralNetwork: |
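    """Stacked hidden swarms plus a single-agent output swarm with a tanh
    head; a Reward tracker snapshots the best weights seen during training."""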
| def __init__(self, layer_sizes, fractal_methods): |
| self.layers = [] |
| for i in range(len(layer_sizes) - 2): |
| self.layers.append(Swarm(num_agents=3, |
| input_size=layer_sizes[i], |
| output_size=layer_sizes[i+1], |
| fractal_method=fractal_methods[i])) |
| self.output_layer = Swarm(num_agents=1, |
| input_size=layer_sizes[-2], |
| output_size=layer_sizes[-1], |
| fractal_method=fractal_methods[-1]) |
| self.reward = Reward() |
|
|
| def forward(self, x, training=True): |
| self.layer_outputs = [x] |
| for layer in self.layers: |
| x = layer.forward(x, training) |
| self.layer_outputs.append(x) |
| self.final_output = tanh(self.output_layer.forward(x, training)) |
| return self.final_output |
|
|
| def backward(self, error, l2_lambda=1e-5): |
| error = error * tanh_derivative(self.final_output) |
| error = self.output_layer.backward(error, l2_lambda) |
| for i in reversed(range(len(self.layers))): |
| error = self.layers[i].backward(error, l2_lambda) |
|
|
| def train(self, X, y, epochs, batch_size=32, l2_lambda=1e-5, patience=50): |
| best_mse = float('inf') |
| patience_counter = 0 |
|
|
| for epoch in range(epochs): |
| indices = np.arange(len(X)) |
| np.random.shuffle(indices) |
|
|
            # Restart the epoch from the best weight snapshot recorded so far.
            self.reward.apply_best_weights(self)
|
|
            epoch_losses = []
            # Iterate over full batches only; a trailing partial batch is dropped.
            for start_idx in range(0, len(X) - batch_size + 1, batch_size):
                batch_indices = indices[start_idx:start_idx + batch_size]
                X_batch = X[batch_indices]
                y_batch = y[batch_indices]

                output = self.forward(X_batch)
                error = y_batch - output
                epoch_losses.append(np.mean(np.square(error)))

                # Clip the error signal (a crude gradient clip) after recording
                # the true batch loss, then backpropagate.
                error = np.clip(error, -1, 1)
                self.backward(error, l2_lambda)
|
|
| avg_batch_loss = np.mean(epoch_losses) |
| max_batch_loss = np.max(epoch_losses) |
| self.reward.update(avg_batch_loss, max_batch_loss, self) |
|
|
| mse = np.mean(np.square(y - self.forward(X, training=False))) |
|
|
| if epoch % 100 == 0: |
| print(f"Epoch {epoch}, MSE: {mse:.6f}, Avg Batch Loss: {avg_batch_loss:.6f}, Min Batch Loss: {np.min(epoch_losses):.6f}, Max Batch Loss: {max_batch_loss:.6f}") |
|
|
| if mse < best_mse: |
| best_mse = mse |
| patience_counter = 0 |
| else: |
| patience_counter += 1 |
|
|
| if patience_counter >= patience: |
| print(f"Early stopping at epoch {epoch}") |
| break |
|
|
| return best_mse |
|
|
    def apply_fractals(self, x):
        # Uses the activations cached by the most recent forward() call;
        # the x argument only seeds layer_outputs[0] there.
        fractal_outputs = []
        for i, layer in enumerate(self.layers):
            fractal_output = layer.apply_fractal(self.layer_outputs[i + 1])
            fractal_outputs.append(fractal_output)
        return fractal_outputs
|
|
| def sierpinski_fractal(input_data): |
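    # Note: this traces a circle whose radius is the mean activation; it is a
    # lightweight visual stand-in rather than a true Sierpinski construction.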
| t = np.linspace(0, 2 * np.pi, input_data.shape[0]) |
| x = np.mean(input_data) * np.cos(t) |
| y = np.mean(input_data) * np.sin(t) |
| return x, y |
|
|
| def mandelbrot_fractal(input_data, max_iter=10): |
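    # Escape-time count per sample: the first feature seeds the real part of c,
    # the batch spread contributes a small imaginary part.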
| output = np.zeros(input_data.shape[0]) |
| for i in range(input_data.shape[0]): |
| c = input_data[i, 0] + 0.1j * np.std(input_data) |
| z = 0 |
| for n in range(max_iter): |
| if abs(z) > 2: |
| output[i] = n |
| break |
| z = z*z + c |
| else: |
| output[i] = max_iter |
| return output |
|
|
| def julia_fractal(input_data, max_iter=10): |
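    # Same escape-time scheme with a fixed Julia constant c = -0.8 + 0.156i;
    # here the activation seeds z rather than c.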
| output = np.zeros(input_data.shape[0]) |
| c = -0.8 + 0.156j |
| for i in range(input_data.shape[0]): |
| z = input_data[i, 0] + 0.1j * np.std(input_data) |
| for n in range(max_iter): |
| if abs(z) > 2: |
| output[i] = n |
| break |
| z = z*z + c |
| else: |
| output[i] = max_iter |
| return output |
|
|
def run_snn(epochs, batch_size, l2_lambda, patience):
    # Gradio sliders deliver floats; the training loop needs integer counts.
    epochs, batch_size, patience = int(epochs), int(batch_size), int(patience)
    np.random.seed(42)

    # Toy regression target: a sine wave over [0, 10].
    X = np.linspace(0, 10, 1000).reshape(-1, 1)
    y = np.sin(X).reshape(-1, 1)

    # Min-max normalize inputs and targets to [0, 1].
    X = (X - X.min()) / (X.max() - X.min())
    y = (y - y.min()) / (y.max() - y.min())
|
|
| snn = SwarmNeuralNetwork(layer_sizes=[1, 32, 16, 8, 1], |
| fractal_methods=[sierpinski_fractal, mandelbrot_fractal, julia_fractal, julia_fractal]) |
|
|
| snn.train(X, y, epochs=epochs, batch_size=batch_size, l2_lambda=l2_lambda, patience=patience) |
|
|
| y_pred = snn.forward(X, training=False) |
| fractal_outputs = snn.apply_fractals(X) |
|
|
| fig, axs = plt.subplots(2, 2, figsize=(15, 10)) |
| |
| axs[0, 0].plot(X, y, label='True') |
| axs[0, 0].plot(X, y_pred, label='Predicted') |
| axs[0, 0].legend() |
| axs[0, 0].set_title('True vs Predicted') |
| |
    fx, fy = fractal_outputs[0]
    axs[0, 1].plot(fx, fy)
    axs[0, 1].set_title('Sierpinski Fractal Output')
| |
| axs[1, 0].plot(X, fractal_outputs[1]) |
| axs[1, 0].set_title('Mandelbrot Fractal Output') |
| |
| axs[1, 1].plot(X, fractal_outputs[2]) |
| axs[1, 1].set_title('Julia Fractal Output') |
|
|
| plt.tight_layout() |
| return fig |
|
|
| with gr.Blocks() as demo: |
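    # Hyperparameter sliders feed run_snn; the resulting figure renders below.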
| epochs = gr.Slider(1, 10000, value=5000, label="Epochs") |
| batch_size = gr.Slider(1, 100, value=32, label="Batch Size") |
| l2_lambda = gr.Slider(0.0001, 0.1, value=0.00001, label="L2 Lambda") |
| patience = gr.Slider(1, 1000, value=50, label="Patience") |
|
|
| plot = gr.Plot() |
|
|
| def update_plot(epochs, batch_size, l2_lambda, patience): |
| return run_snn(epochs, batch_size, l2_lambda, patience) |
|
|
| btn = gr.Button("Run SNN") |
| btn.click(update_plot, inputs=[epochs, batch_size, l2_lambda, patience], outputs=plot) |
|
|
if __name__ == "__main__":
    demo.launch()
|
|