# faultdetectionchillers / nnsvm_model.py
# DevNumb's picture
# Upload 4 files
# d3cb5c7 verified
import numpy as np
class NNSVM:
    """Two dense layers followed by a linear soft-margin SVM (binary).

    Every sample is treated as a column vector, so a batch is a 3-D array of
    shape ``(batch, input_dim, 1)``.  Both layers share one activation
    function; the second layer's output ``phi`` (``d`` features) is scored by
    the SVM as ``z' = phi^T theta + theta_0``.  Labels are used as hinge-loss
    targets and are expected to follow the ``+1`` / ``-1`` convention.

    Attributes:
        AF: name of the activation function.
        W0, b0: first-layer weights ``(n, input_dim)`` and bias ``(n, 1)``.
        W1, b1: second-layer weights ``(d, n)`` and bias ``(d, 1)``.
        theta, theta_0: SVM weights ``(d, 1)`` and scalar bias.
    """

    # Activation names understood by ``activation``.
    _ACTIVATIONS = ("tanh", "sigmoid", "GRBF", "xGRBF")

    def __init__(self, input_dim, n, d, AF="tanh", seed=0):
        """Initialise the weights with Glorot-uniform limits and zero biases.

        Args:
            input_dim: number of input features.
            n: width of the first (hidden) layer.
            d: width of the second layer, i.e. the SVM feature dimension.
            AF: one of ``"tanh"``, ``"sigmoid"``, ``"GRBF"``, ``"xGRBF"``.
            seed: value passed to ``np.random.seed`` before drawing weights.

        Raises:
            ValueError: if ``AF`` is not a supported activation name.
        """
        if AF not in self._ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation {AF!r}; expected one of {self._ACTIVATIONS}"
            )
        self.AF = AF
        self.input_dim = input_dim
        self.n = n
        np.random.seed(seed)
        limit = np.sqrt(6 / (input_dim + n))
        self.W0 = np.random.uniform(-limit, limit, size=(n, input_dim))
        self.b0 = np.zeros((n, 1))
        limit = np.sqrt(6 / (n + d))
        self.W1 = np.random.uniform(-limit, limit, size=(d, n))
        self.b1 = np.zeros((d, 1))
        self.theta = np.zeros((d, 1))
        self.theta_0 = 0.0

    def activation(self, z):
        """Apply the configured activation function element-wise to ``z``.

        Raises:
            ValueError: if ``self.AF`` is not a supported activation name.
        """
        if self.AF == "tanh":
            return np.tanh(z)
        if self.AF == "sigmoid":
            return 1 / (1 + np.exp(-z))
        if self.AF == "GRBF":
            return np.exp(-(z**2))
        if self.AF == "xGRBF":
            return z * np.exp(-(z**2))
        raise ValueError(f"Unsupported activation {self.AF!r}")

    def _activation_grad(self, z, a):
        """Return d activation / dz given pre-activation ``z`` and output ``a``."""
        if self.AF == "tanh":
            return 1 - (a ** 2)
        if self.AF == "sigmoid":
            return a * (1 - a)
        if self.AF == "GRBF":
            return a * (-2 * z)
        if self.AF == "xGRBF":
            # Closed form of d/dz [z * exp(-z^2)].  Writing it as a / z + ...
            # would evaluate 0 / 0 (NaN) whenever z is exactly zero.
            return (1 - 2 * (z ** 2)) * np.exp(-(z ** 2))
        raise ValueError(f"Unsupported activation {self.AF!r}")

    def _propagate(self, X):
        """Run both layers and the SVM; return ``(z0, a0, z, phi, scores)``.

        ``scores`` has shape ``(batch, 1, 1)`` for column-vector samples.
        """
        z0 = np.matmul(self.W0, X) + self.b0
        a0 = self.activation(z0)
        z = np.matmul(self.W1, a0) + self.b1
        phi = self.activation(z)
        scores = np.matmul(np.transpose(phi, axes=(0, 2, 1)), self.theta) + self.theta_0
        return z0, a0, z, phi, scores

    def predict(self, X):
        """Classify a batch without touching the cached training state.

        Args:
            X: array of shape ``(batch, input_dim, 1)``.

        Returns:
            Tuple ``(y_hat, scores)`` of flat arrays: the sign of each SVM
            score (``0`` when the score is exactly zero) and the raw scores.
        """
        z_prime = self._propagate(X)[-1]
        y_hat = np.sign(z_prime).flatten()
        return y_hat, z_prime.flatten()

    def forward(self, x):
        """Forward pass that caches every intermediate for back-propagation.

        Stores ``z0``, ``a0``, ``z``, ``phi`` and ``z_prime`` on the instance.

        Args:
            x: array of shape ``(batch, input_dim, 1)``.

        Returns:
            SVM scores as a 1-D array of length ``batch``.
        """
        self.z0, self.a0, self.z, self.phi, scores = self._propagate(x)
        self.z_prime = scores[:, 0, 0]
        return self.z_prime

    @staticmethod
    def _adam_direction(m, v, grad, beta1, beta2, eps, t):
        """Advance Adam moments by one step.

        Returns the updated ``(m, v)`` and the bias-corrected update direction
        ``m_hat / (eps + sqrt(v_hat))`` for step number ``t`` (1-based).
        """
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * (grad ** 2)
        m_hat = m / (1 - beta1 ** t)
        v_hat = v / (1 - beta2 ** t)
        return m, v, m_hat / (eps + np.sqrt(v_hat))

    def fit(self, X, y, X_test, y_test, seed=0, epochs=100, lr=1e-3, Lambda=1e-3,
            beta1=0.9, beta2=0.999, eps=1e-8, alpha=1e-6, batch_size=128):
        """Train the network and the SVM with mini-batch hinge-loss descent.

        The SVM parameters use plain gradient descent (with L2 penalty
        ``Lambda`` on ``theta``); the second layer uses Adam with step ``lr``;
        the first layer uses Adam with a per-parameter step size
        (``alpha_b0`` / ``alpha_W0``) that is itself adapted by
        ``alpha * g_previous * g_current``.

        Args:
            X: training inputs, shape ``(n_samples, input_dim, 1)``.
            y: training labels (``+1`` / ``-1``); flattened to 1-D internally.
            X_test, y_test: accepted for interface compatibility; not used.
            seed: value passed to ``np.random.seed`` before shuffling.
            epochs: number of passes over the data.
            lr: learning rate for the SVM and the second layer, and the
                initial per-parameter rate of the first layer.
            Lambda: L2 regularisation strength on ``theta``.
            beta1, beta2, eps: Adam hyper-parameters.
            alpha: step size of the first-layer learning-rate adaptation.
            batch_size: number of samples per mini-batch (the final batch of
                an epoch may be smaller).

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        np.random.seed(seed)
        # Labels are indexed per sample and broadcast as (batch, 1, 1), which
        # only works from a flat vector.
        y = np.asarray(y).reshape(-1)
        n_samples = X.shape[0]

        # Adam moments for both layers.
        m_b0, v_b0 = np.zeros_like(self.b0), np.zeros_like(self.b0)
        m_W0, v_W0 = np.zeros_like(self.W0), np.zeros_like(self.W0)
        m_b1, v_b1 = np.zeros_like(self.b1), np.zeros_like(self.b1)
        m_W1, v_W1 = np.zeros_like(self.W1), np.zeros_like(self.W1)
        # Previous first-layer gradients, used to adapt the learning rates.
        g_b0 = np.zeros_like(self.b0)
        g_W0 = np.zeros_like(self.W0)
        self.alpha_b0 = np.ones_like(self.b0) * lr
        self.alpha_W0 = np.ones_like(self.W0) * lr

        k = 0  # number of optimiser steps taken so far
        print("Modelo listo para entrenamiento con", epochs, "épocas")
        for epoch in range(epochs):
            print("Epoch: ", epoch + 1)
            index = np.random.permutation(n_samples)
            # Stepping by batch_size yields the full batches plus one shorter
            # remainder batch, and never an empty batch.
            for start in range(0, n_samples, batch_size):
                idx = index[start:start + batch_size]
                x_i = X[idx]
                y_i = y[idx]
                n = x_i.shape[0]
                y_col = y_i[:, None, None]

                self.forward(x_i)

                # Hinge gate: 0 for samples already beyond the margin
                # (y * z' > 1), 1 otherwise.
                gate_matrix = np.where(self.z_prime * y_i > 1, 0.0, 1.0)[:, None, None]

                # Per-sample gradients of -y * z' (the active hinge branch).
                self.dtheta = -self.phi * y_col
                self.dtheta_0 = -y_i
                self.dphi = -self.theta * y_col
                self.dz = self._activation_grad(self.z, self.phi) * self.dphi
                self.db1 = self.dz
                self.dW1 = np.matmul(self.dz, np.transpose(self.a0, axes=(0, 2, 1)))
                self.da0 = np.matmul(self.W1.T, self.dz)
                self.dz0 = self._activation_grad(self.z0, self.a0) * self.da0
                self.db0 = self.dz0
                self.dW0 = np.matmul(self.dz0, np.transpose(x_i, axes=(0, 2, 1)))

                # SVM parameters: plain gradient descent on the batch mean.
                self.theta = self.theta - lr * (
                    (1 / n * gate_matrix * self.dtheta).sum(axis=0) + Lambda * self.theta
                )
                self.theta_0 = self.theta_0 - lr * (
                    1 / n * gate_matrix * self.dtheta_0[:, None, None]
                ).sum()

                # Second layer: Adam on the gated batch-mean gradient.
                self.db1 = (1 / n * gate_matrix * self.db1).sum(axis=0)
                self.dW1 = (1 / n * gate_matrix * self.dW1).sum(axis=0)
                m_b1, v_b1, step_b1 = self._adam_direction(
                    m_b1, v_b1, self.db1, beta1, beta2, eps, k + 1)
                self.b1 = self.b1 - lr * step_b1
                m_W1, v_W1, step_W1 = self._adam_direction(
                    m_W1, v_W1, self.dW1, beta1, beta2, eps, k + 1)
                self.W1 = self.W1 - lr * step_W1

                # First layer: gated gradient summed over the batch.  The gate
                # is required because samples outside the margin contribute
                # zero hinge loss; the sum (not the mean) is kept so the
                # scale seen by ``alpha`` is unchanged.
                g_new_b0 = (gate_matrix * self.db0).sum(axis=0)
                g_new_W0 = (gate_matrix * self.dW0).sum(axis=0)
                self.alpha_b0 = self.alpha_b0 + alpha * (g_b0 * g_new_b0)
                self.alpha_W0 = self.alpha_W0 + alpha * (g_W0 * g_new_W0)
                g_b0 = g_new_b0
                g_W0 = g_new_W0
                m_b0, v_b0, step_b0 = self._adam_direction(
                    m_b0, v_b0, g_new_b0, beta1, beta2, eps, k + 1)
                self.b0 = self.b0 - self.alpha_b0 * step_b0
                m_W0, v_W0, step_W0 = self._adam_direction(
                    m_W0, v_W0, g_new_W0, beta1, beta2, eps, k + 1)
                self.W0 = self.W0 - self.alpha_W0 * step_W0
                k += 1
class NNSVM_Multiclass:
    """Neural network followed by a multiclass linear SVM.

    This version works on 2-D arrays of shape ``(batch_size, features)``.
    Two dense layers sharing one activation function produce the feature
    matrix ``phi``; the SVM scores are ``phi @ theta + theta_0`` and the
    predicted class is the one with the highest score.

    Attributes:
        AF: name of the activation function.
        W0, b0: first-layer weights ``(input_dim, n)`` and bias ``(1, n)``.
        W1, b1: second-layer weights ``(n, d)`` and bias ``(1, d)``.
        theta, theta_0: SVM weights ``(d, n_classes)`` and bias
            ``(1, n_classes)``.
    """

    # Activation names understood by ``activation``.
    _ACTIVATIONS = ("tanh", "sigmoid", "GRBF", "xGRBF")

    def __init__(self, input_dim, n, d, n_classes, AF="tanh", seed=0):
        """Initialise weights (Xavier/Glorot uniform) and zero biases.

        Args:
            input_dim: number of input features.
            n: width of the hidden layer.
            d: width of the network output, i.e. the SVM feature dimension.
            n_classes: number of classes scored by the SVM.
            AF: one of ``"tanh"``, ``"sigmoid"``, ``"GRBF"``, ``"xGRBF"``.
            seed: value passed to ``np.random.seed`` before drawing weights.

        Raises:
            ValueError: if ``AF`` is not a supported activation name.
        """
        if AF not in self._ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation {AF!r}; expected one of {self._ACTIVATIONS}"
            )
        self.AF = AF
        np.random.seed(seed)

        # Layer 1 (input -> hidden)
        limit_W0 = np.sqrt(6 / (input_dim + n))
        self.W0 = np.random.uniform(-limit_W0, limit_W0, size=(input_dim, n))
        self.b0 = np.zeros((1, n))  # (1, n) so it broadcasts over the batch

        # Layer 2 (hidden -> network output)
        limit_W1 = np.sqrt(6 / (n + d))
        self.W1 = np.random.uniform(-limit_W1, limit_W1, size=(n, d))
        self.b1 = np.zeros((1, d))  # (1, d) so it broadcasts over the batch

        # Layer 3 (SVM parameters)
        self.theta = np.zeros((d, n_classes))
        self.theta_0 = np.zeros((1, n_classes))  # broadcasts over the batch

    def activation(self, z):
        """Apply the configured activation function element-wise to ``z``.

        Raises:
            ValueError: if ``self.AF`` is not a supported activation name.
        """
        if self.AF == "tanh":
            return np.tanh(z)
        if self.AF == "sigmoid":
            return 1 / (1 + np.exp(-z))
        if self.AF == "GRBF":
            return np.exp(-(z**2))
        if self.AF == "xGRBF":
            return z * np.exp(-(z**2))
        raise ValueError(f"Unsupported activation {self.AF!r}")

    def activation_derivative(self, a, z=None):
        """Return the activation derivative.

        Args:
            a: activation output.
            z: matching pre-activation.  Optional for ``"tanh"`` and
                ``"sigmoid"`` (their derivative depends on ``a`` only) and
                required for ``"GRBF"`` and ``"xGRBF"``.

        Raises:
            ValueError: if ``z`` is needed but missing, or ``self.AF`` is not
                a supported activation name.
        """
        if self.AF == "tanh":
            return 1 - a**2
        if self.AF == "sigmoid":
            return a * (1 - a)
        if self.AF in ("GRBF", "xGRBF"):
            if z is None:
                raise ValueError(
                    f"activation_derivative needs the pre-activation z for {self.AF!r}"
                )
            if self.AF == "GRBF":
                return -2 * z * a
            # d/dz [z * exp(-z^2)]; written without a division so it stays
            # finite at z == 0.
            return (1 - 2 * z**2) * np.exp(-(z**2))
        raise ValueError(f"Unsupported activation {self.AF!r}")

    def forward(self, x):
        """Forward pass; caches ``x``, ``z0``, ``a0``, ``z1``, ``phi``, ``z_prime``.

        Args:
            x: array of shape ``(batch, input_dim)``.

        Returns:
            SVM scores of shape ``(batch, n_classes)``.
        """
        self.x = x
        self.z0 = np.matmul(self.x, self.W0) + self.b0
        self.a0 = self.activation(self.z0)
        self.z1 = np.matmul(self.a0, self.W1) + self.b1
        # Network output: the features consumed by the SVM.
        self.phi = self.activation(self.z1)
        # Final model output: the SVM scores.
        self.z_prime = np.matmul(self.phi, self.theta) + self.theta_0
        return self.z_prime

    def predict(self, x):
        """Return, per row of ``x``, the index of the highest-scoring class.

        Calls ``forward``, so the cached intermediates are overwritten.
        """
        scores = self.forward(x)
        return np.argmax(scores, axis=1)

    @staticmethod
    def _adam_step(param, grad, m, v, lr, beta1, beta2, eps, t):
        """Apply one Adam update to ``param`` in place; return new ``(m, v)``.

        ``t`` is the 1-based step number used for bias correction.
        """
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * (grad**2)
        m_hat = m / (1 - beta1**t)
        v_hat = v / (1 - beta2**t)
        param -= lr * m_hat / (np.sqrt(v_hat) + eps)
        return m, v

    def fit(self, X, y_ohe, X_val, y_val_ohe, epochs=100, lr=1e-3, Lambda=1e-3,
            beta1=0.9, beta2=0.999, eps=1e-8, batch_size=128):
        """Train with mini-batches on the multiclass hinge loss.

        The SVM layer is updated by plain gradient descent (L2 penalty
        ``Lambda`` on ``theta``); both network layers are updated with Adam.
        All gradients are batch means.

        Args:
            X: training inputs, shape ``(n_samples, input_dim)``.
            y_ohe: one-hot labels, shape ``(n_samples, n_classes)``.
            X_val, y_val_ohe: accepted for interface compatibility; not used.
            epochs: number of passes over the data.
            lr: learning rate for every parameter.
            Lambda: L2 regularisation strength on ``theta``.
            beta1, beta2, eps: Adam hyper-parameters.
            batch_size: number of samples per mini-batch (the final batch of
                an epoch may be smaller).

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        n_samples = X.shape[0]

        # Adam moment estimates for every network parameter.
        m_W0, v_W0 = np.zeros_like(self.W0), np.zeros_like(self.W0)
        m_b0, v_b0 = np.zeros_like(self.b0), np.zeros_like(self.b0)
        m_W1, v_W1 = np.zeros_like(self.W1), np.zeros_like(self.W1)
        m_b1, v_b1 = np.zeros_like(self.b1), np.zeros_like(self.b1)
        k = 0  # Adam step counter

        print(f"Iniciando entrenamiento por {epochs} épocas...")
        for epoch in range(epochs):
            # Reshuffle the data every epoch.
            permutation = np.random.permutation(n_samples)
            X_shuffled = X[permutation]
            y_shuffled = y_ohe[permutation]

            for i in range(0, n_samples, batch_size):
                x_batch = X_shuffled[i:i + batch_size]
                y_batch = y_shuffled[i:i + batch_size]
                batch_n = x_batch.shape[0]

                # 1. Forward pass.
                z_prime = self.forward(x_batch)

                # 2. Multiclass hinge loss: margin of every wrong class
                #    against the true class score.
                true_class_scores = np.sum(z_prime * y_batch, axis=1, keepdims=True)
                margins = np.maximum(0, 1 + z_prime - true_class_scores)
                margins[y_batch == 1] = 0

                # Gradient w.r.t. the scores: +1 for each violating class and
                # minus the number of violations for the true class.
                loss_grad = (margins > 0).astype(float)
                correct_class_indices = np.argmax(y_batch, axis=1)
                loss_grad[np.arange(batch_n), correct_class_indices] = -np.sum(loss_grad, axis=1)

                # 3. Backward pass (all gradients use the pre-update weights).
                dtheta = np.matmul(self.phi.T, loss_grad)
                dtheta_0 = np.sum(loss_grad, axis=0, keepdims=True)

                dphi = np.matmul(loss_grad, self.theta.T)
                dz1 = dphi * self.activation_derivative(self.phi, self.z1)
                dW1 = np.matmul(self.a0.T, dz1)
                db1 = np.sum(dz1, axis=0, keepdims=True)

                da0 = np.matmul(dz1, self.W1.T)
                dz0 = da0 * self.activation_derivative(self.a0, self.z0)
                dW0 = np.matmul(self.x.T, dz0)
                db0 = np.sum(dz0, axis=0, keepdims=True)

                # 4. Parameter updates.  Summed gradients are divided by the
                #    batch size so every update uses the batch mean.
                self.theta -= lr * (dtheta / batch_n + Lambda * self.theta)
                self.theta_0 -= lr * (dtheta_0 / batch_n)

                k += 1
                m_W1, v_W1 = self._adam_step(
                    self.W1, dW1 / batch_n, m_W1, v_W1, lr, beta1, beta2, eps, k)
                m_b1, v_b1 = self._adam_step(
                    self.b1, db1 / batch_n, m_b1, v_b1, lr, beta1, beta2, eps, k)
                m_W0, v_W0 = self._adam_step(
                    self.W0, dW0 / batch_n, m_W0, v_W0, lr, beta1, beta2, eps, k)
                m_b0, v_b0 = self._adam_step(
                    self.b0, db0 / batch_n, m_b0, v_b0, lr, beta1, beta2, eps, k)