import numpy as np


def _adam_direction(grad, m, v, step, beta1, beta2, eps):
    """Advance the Adam moment estimates and return the update direction.

    Args:
        grad: Gradient for the current step.
        m: Running first-moment estimate (same shape as ``grad``).
        v: Running second-moment estimate (same shape as ``grad``).
        step: 1-based step counter used for bias correction.
        beta1: Decay rate of the first moment.
        beta2: Decay rate of the second moment.
        eps: Small constant added to the denominator.

    Returns:
        Tuple ``(direction, m, v)`` where ``direction`` is the bias-corrected
        ``m_hat / (sqrt(v_hat) + eps)`` (not yet scaled by a learning rate)
        and ``m``, ``v`` are the updated moment estimates.
    """
    m = beta1 * m + (1 - beta1) * grad
    v = beta2 * v + (1 - beta2) * (grad ** 2)
    m_hat = m / (1 - beta1 ** step)
    v_hat = v / (1 - beta2 ** step)
    return m_hat / (np.sqrt(v_hat) + eps), m, v


class NNSVM:
    """Two-layer neural network whose output features feed a binary linear SVM.

    Arrays follow a column-vector layout: weights are ``(out, in)``, biases
    are ``(out, 1)`` and a batch of inputs is expected as
    ``(batch, input_dim, 1)`` so that ``np.matmul`` broadcasts over the batch.

    Supported activations (``AF``): ``"tanh"``, ``"sigmoid"``, ``"GRBF"``
    (``exp(-z^2)``) and ``"xGRBF"`` (``z * exp(-z^2)``).
    """

    def __init__(self, input_dim, n, d, AF="tanh", seed=0):
        """Initialise the weights.

        Args:
            input_dim: Number of input features.
            n: Width of the hidden layer.
            d: Number of features produced by the network for the SVM.
            AF: Name of the activation function used in both layers.
            seed: Seed passed to ``np.random.seed`` before drawing weights.
        """
        self.AF = AF
        self.input_dim = input_dim
        self.n = n
        np.random.seed(seed)

        # Uniform Xavier/Glorot-style limits: sqrt(6 / (fan_in + fan_out)).
        limit = np.sqrt(6 / (input_dim + n))
        self.W0 = np.random.uniform(-limit, limit, size=(n, input_dim))
        self.b0 = np.zeros((n, 1))

        limit = np.sqrt(6 / (n + d))
        self.W1 = np.random.uniform(-limit, limit, size=(d, n))
        self.b1 = np.zeros((d, 1))

        # Linear SVM head: weight vector and scalar bias.
        self.theta = np.zeros((d, 1))
        self.theta_0 = 0.0

    def activation(self, z):
        """Apply the configured activation element-wise.

        Raises:
            ValueError: If ``self.AF`` is not a supported activation name.
        """
        if self.AF == "tanh":
            return np.tanh(z)
        if self.AF == "sigmoid":
            return 1 / (1 + np.exp(-z))
        if self.AF == "GRBF":
            return np.exp(-(z ** 2))
        if self.AF == "xGRBF":
            return z * np.exp(-(z ** 2))
        raise ValueError(f"Unsupported activation function: {self.AF!r}")

    def _activation_grad(self, out, pre):
        """Return the activation derivative given its output and its input.

        Args:
            out: Activation output ``f(pre)``.
            pre: Pre-activation values.

        Raises:
            ValueError: If ``self.AF`` is not a supported activation name.
        """
        if self.AF == "tanh":
            return 1 - out ** 2
        if self.AF == "sigmoid":
            return out * (1 - out)
        if self.AF == "GRBF":
            return -2 * pre * out
        if self.AF == "xGRBF":
            # d/dz [z * exp(-z^2)] = exp(-z^2) * (1 - 2 z^2). Written without
            # dividing by z so the derivative stays finite at z == 0.
            return np.exp(-(pre ** 2)) * (1 - 2 * pre ** 2)
        raise ValueError(f"Unsupported activation function: {self.AF!r}")

    def predict(self, X):
        """Classify a batch without touching the cached forward state.

        Args:
            X: Inputs shaped ``(batch, input_dim, 1)``.

        Returns:
            Tuple ``(y_hat, scores)`` of flattened arrays: the sign of the
            SVM decision value and the decision value itself.
        """
        a0 = self.activation(np.matmul(self.W0, X) + self.b0)
        phi = self.activation(np.matmul(self.W1, a0) + self.b1)
        z_prime = np.matmul(np.transpose(phi, axes=(0, 2, 1)), self.theta) + self.theta_0
        return np.sign(z_prime).flatten(), z_prime.flatten()

    def forward(self, x):
        """Run a forward pass and cache the intermediates used by ``fit``.

        Stores ``z0``, ``a0``, ``z``, ``phi`` and ``z_prime`` on the instance.

        Args:
            x: Inputs shaped ``(batch, input_dim, 1)``.

        Returns:
            One SVM decision value per sample, shape ``(batch,)``.
        """
        self.z0 = np.matmul(self.W0, x) + self.b0
        self.a0 = self.activation(self.z0)
        self.z = np.matmul(self.W1, self.a0) + self.b1
        self.phi = self.activation(self.z)
        scores = np.matmul(np.transpose(self.phi, axes=(0, 2, 1)), self.theta) + self.theta_0
        self.z_prime = scores[:, 0, 0]
        return self.z_prime

    def fit(self, X, y, X_test, y_test, seed=0, epochs=100, lr=1e-3, Lambda=1e-3,
            beta1=0.9, beta2=0.999, eps=1e-8, alpha=1e-6, batch_size=128):
        """Train with mini-batch hinge loss.

        The SVM head (``theta``, ``theta_0``) is updated with plain gradient
        descent, the second layer with Adam at a fixed learning rate, and the
        first layer with Adam using per-parameter learning rates
        (``alpha_b0``, ``alpha_W0``) that are adapted from the product of
        consecutive gradients.

        Args:
            X: Training inputs shaped ``(n_samples, input_dim, 1)``.
            y: Labels, one per sample; the hinge gate assumes values in
                {-1, +1}. Any shape that flattens to ``(n_samples,)`` works.
            X_test: Unused; kept for interface compatibility.
            y_test: Unused; kept for interface compatibility.
            seed: Seed passed to ``np.random.seed`` before shuffling.
            epochs: Number of passes over the data.
            lr: Learning rate (also the initial first-layer learning rate).
            Lambda: L2 regularisation strength applied to ``theta``.
            beta1: Adam first-moment decay.
            beta2: Adam second-moment decay.
            eps: Adam denominator constant.
            alpha: Step size of the first-layer learning-rate adaptation.
            batch_size: Mini-batch size; the last batch may be smaller.
        """
        np.random.seed(seed)
        y = np.asarray(y).reshape(-1)
        n_samples = X.shape[0]

        # Adam moment estimates.
        m_b0, v_b0 = np.zeros_like(self.b0), np.zeros_like(self.b0)
        m_W0, v_W0 = np.zeros_like(self.W0), np.zeros_like(self.W0)
        m_b1, v_b1 = np.zeros_like(self.b1), np.zeros_like(self.b1)
        m_W1, v_W1 = np.zeros_like(self.W1), np.zeros_like(self.W1)

        # Previous first-layer gradients and per-parameter learning rates.
        g_b0 = np.zeros_like(self.b0)
        g_W0 = np.zeros_like(self.W0)
        self.alpha_b0 = np.ones_like(self.b0) * lr
        self.alpha_W0 = np.ones_like(self.W0) * lr

        k = 0  # optimiser step counter
        print("Modelo listo para entrenamiento con", epochs, "épocas")
        for epoch in range(epochs):
            print("Epoch: ", epoch + 1)
            index = np.random.permutation(n_samples)
            # Stepping by batch_size yields a shorter final batch only when
            # samples remain, so an empty batch (and a 1/0) can never occur.
            for start in range(0, n_samples, batch_size):
                idx = index[start:start + batch_size]
                x_i = X[idx]
                y_i = y[idx]
                n = x_i.shape[0]
                y_col = y_i[:, None, None]

                self.forward(x_i)

                # Hinge gate: samples with margin y * z' > 1 contribute no
                # gradient to any parameter.
                gate = np.where(self.z_prime * y_i > 1, 0.0, 1.0)[:, None, None]

                # SVM head gradients, averaged over the batch.
                dtheta = (gate * (-self.phi * y_col)).sum(axis=0) / n
                dtheta_0 = (gate * (-y_col)).sum() / n

                # Back-propagate the gated loss gradient through both layers
                # using the parameters from before this step's updates.
                dphi = gate * (-self.theta * y_col)
                dz = self._activation_grad(self.phi, self.z) * dphi
                dW1 = np.matmul(dz, np.transpose(self.a0, axes=(0, 2, 1)))
                da0 = np.matmul(self.W1.T, dz)
                dz0 = self._activation_grad(self.a0, self.z0) * da0
                dW0 = np.matmul(dz0, np.transpose(x_i, axes=(0, 2, 1)))

                k += 1

                # SVM head: gradient descent with L2 penalty on theta.
                self.theta = self.theta - lr * (dtheta + Lambda * self.theta)
                self.theta_0 = self.theta_0 - lr * dtheta_0

                # Second layer: Adam on batch-averaged gradients.
                step_b1, m_b1, v_b1 = _adam_direction(
                    dz.sum(axis=0) / n, m_b1, v_b1, k, beta1, beta2, eps)
                self.b1 = self.b1 - lr * step_b1
                step_W1, m_W1, v_W1 = _adam_direction(
                    dW1.sum(axis=0) / n, m_W1, v_W1, k, beta1, beta2, eps)
                self.W1 = self.W1 - lr * step_W1

                # First layer: gradients are summed over the batch (as in the
                # original implementation) and drive both the learning-rate
                # adaptation and the Adam step.
                g_new_b0 = dz0.sum(axis=0)
                g_new_W0 = dW0.sum(axis=0)
                self.alpha_b0 = self.alpha_b0 + alpha * (g_b0 * g_new_b0)
                self.alpha_W0 = self.alpha_W0 + alpha * (g_W0 * g_new_W0)
                g_b0 = g_new_b0
                g_W0 = g_new_W0

                step_b0, m_b0, v_b0 = _adam_direction(
                    g_new_b0, m_b0, v_b0, k, beta1, beta2, eps)
                self.b0 = self.b0 - self.alpha_b0 * step_b0
                step_W0, m_W0, v_W0 = _adam_direction(
                    g_new_W0, m_W0, v_W0, k, beta1, beta2, eps)
                self.W0 = self.W0 - self.alpha_W0 * step_W0


class NNSVM_Multiclass:
    """Neural network followed by a multiclass linear SVM.

    This version uses 2-D matrices shaped ``(batch_size, features)``, which
    keeps the code simple and follows the usual conventions. Only the
    ``"tanh"`` activation is implemented.
    """

    def __init__(self, input_dim, n, d, n_classes, AF="tanh", seed=0):
        """Initialise the weights.

        Args:
            input_dim: Number of input features.
            n: Width of the hidden layer.
            d: Number of features produced by the network for the SVM.
            n_classes: Number of output classes.
            AF: Name of the activation function.
            seed: Seed passed to ``np.random.seed`` before drawing weights.
        """
        self.AF = AF
        np.random.seed(seed)

        # Xavier/Glorot uniform initialisation.
        # Layer 1 (input -> hidden); biases are (1, n) for broadcasting.
        limit_W0 = np.sqrt(6 / (input_dim + n))
        self.W0 = np.random.uniform(-limit_W0, limit_W0, size=(input_dim, n))
        self.b0 = np.zeros((1, n))

        # Layer 2 (hidden -> network output).
        limit_W1 = np.sqrt(6 / (n + d))
        self.W1 = np.random.uniform(-limit_W1, limit_W1, size=(n, d))
        self.b1 = np.zeros((1, d))

        # Layer 3 (SVM parameters), one column per class.
        self.theta = np.zeros((d, n_classes))
        self.theta_0 = np.zeros((1, n_classes))

    def activation(self, z):
        """Apply the configured activation element-wise.

        Raises:
            ValueError: If ``self.AF`` is not ``"tanh"``.
        """
        if self.AF == "tanh":
            return np.tanh(z)
        raise ValueError(f"Unsupported activation function: {self.AF!r}")

    def activation_derivative(self, a):
        """Return the activation derivative expressed via its output ``a``.

        Raises:
            ValueError: If ``self.AF`` is not ``"tanh"``.
        """
        if self.AF == "tanh":
            return 1 - a ** 2
        raise ValueError(f"Unsupported activation function: {self.AF!r}")

    def forward(self, x):
        """Run a forward pass and cache the intermediates used by ``fit``.

        Stores ``x``, ``z0``, ``a0``, ``z1``, ``phi`` and ``z_prime`` on the
        instance.

        Args:
            x: Inputs shaped ``(batch, input_dim)``.

        Returns:
            SVM scores shaped ``(batch, n_classes)``.
        """
        self.x = x
        self.z0 = np.matmul(self.x, self.W0) + self.b0
        self.a0 = self.activation(self.z0)
        self.z1 = np.matmul(self.a0, self.W1) + self.b1
        # Network output: the feature vector consumed by the SVM.
        self.phi = self.activation(self.z1)
        self.z_prime = np.matmul(self.phi, self.theta) + self.theta_0
        return self.z_prime

    def predict(self, x):
        """Return the index of the highest-scoring class for each sample."""
        scores = self.forward(x)
        return np.argmax(scores, axis=1)

    def fit(self, X, y_ohe, X_val, y_val_ohe, epochs=100, lr=1e-3, Lambda=1e-3,
            beta1=0.9, beta2=0.999, eps=1e-8, batch_size=128):
        """Train with mini-batch multiclass hinge loss.

        The SVM layer is updated with plain gradient descent (L2 penalty on
        ``theta``); both network layers are updated with Adam. Shuffling uses
        the global NumPy random state.

        Args:
            X: Training inputs shaped ``(n_samples, input_dim)``.
            y_ohe: One-hot labels shaped ``(n_samples, n_classes)``.
            X_val: Unused; kept for interface compatibility.
            y_val_ohe: Unused; kept for interface compatibility.
            epochs: Number of passes over the data.
            lr: Learning rate.
            Lambda: L2 regularisation strength applied to ``theta``.
            beta1: Adam first-moment decay.
            beta2: Adam second-moment decay.
            eps: Adam denominator constant.
            batch_size: Mini-batch size; the last batch may be smaller.
        """
        n_samples = X.shape[0]

        # Adam moment estimates for every network parameter.
        m_W0, v_W0 = np.zeros_like(self.W0), np.zeros_like(self.W0)
        m_b0, v_b0 = np.zeros_like(self.b0), np.zeros_like(self.b0)
        m_W1, v_W1 = np.zeros_like(self.W1), np.zeros_like(self.W1)
        m_b1, v_b1 = np.zeros_like(self.b1), np.zeros_like(self.b1)

        k = 0  # Adam step counter

        print(f"Iniciando entrenamiento por {epochs} épocas...")
        for epoch in range(epochs):
            # Reshuffle the data every epoch.
            permutation = np.random.permutation(n_samples)
            X_shuffled = X[permutation]
            y_shuffled = y_ohe[permutation]

            for i in range(0, n_samples, batch_size):
                x_batch = X_shuffled[i:i + batch_size]
                y_batch = y_shuffled[i:i + batch_size]
                batch_n = x_batch.shape[0]

                # 1. Forward pass.
                z_prime = self.forward(x_batch)

                # 2. Multiclass hinge loss: every wrong class whose score is
                #    within a margin of 1 of the true-class score is active.
                true_class_scores = np.sum(z_prime * y_batch, axis=1, keepdims=True)
                margins = np.maximum(0, 1 + z_prime - true_class_scores)
                margins[y_batch == 1] = 0

                # Gradient w.r.t. the scores: +1 for each active wrong class,
                # minus the number of active classes for the true class.
                loss_grad = (margins > 0).astype(float)
                correct_class_indices = np.argmax(y_batch, axis=1)
                loss_grad[np.arange(batch_n), correct_class_indices] = -np.sum(loss_grad, axis=1)

                # 3. Backward pass (all gradients use pre-update parameters).
                dtheta = np.matmul(self.phi.T, loss_grad)
                dtheta_0 = np.sum(loss_grad, axis=0, keepdims=True)

                dphi = np.matmul(loss_grad, self.theta.T)
                dz1 = dphi * self.activation_derivative(self.phi)
                dW1 = np.matmul(self.a0.T, dz1)
                db1 = np.sum(dz1, axis=0, keepdims=True)

                da0 = np.matmul(dz1, self.W1.T)
                dz0 = da0 * self.activation_derivative(self.a0)
                dW0 = np.matmul(self.x.T, dz0)
                db0 = np.sum(dz0, axis=0, keepdims=True)

                # 4. Parameter updates. Gradients are divided by the batch
                #    size so each step uses the batch mean, not the sum.
                self.theta -= lr * (dtheta / batch_n + Lambda * self.theta)
                self.theta_0 -= lr * (dtheta_0 / batch_n)

                k += 1

                step, m_W1, v_W1 = _adam_direction(
                    dW1 / batch_n, m_W1, v_W1, k, beta1, beta2, eps)
                self.W1 -= lr * step
                step, m_b1, v_b1 = _adam_direction(
                    db1 / batch_n, m_b1, v_b1, k, beta1, beta2, eps)
                self.b1 -= lr * step

                step, m_W0, v_W0 = _adam_direction(
                    dW0 / batch_n, m_W0, v_W0, k, beta1, beta2, eps)
                self.W0 -= lr * step
                step, m_b0, v_b0 = _adam_direction(
                    db0 / batch_n, m_b0, v_b0, k, beta1, beta2, eps)
                self.b0 -= lr * step