Spaces:
Build error
Build error
| # %% | |
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from sklearn.model_selection import train_test_split | |
| import joblib | |
# Step 1: load the dataset from the "Data select" folder.
# Each sub-folder name is used as the class label ("mood") of the workbooks
# it contains.
data_root = "Data select"
# os.listdir returns entries in arbitrary order; sorting keeps the row order
# (and therefore the seeded train/val/test split) reproducible. Non-directory
# entries are skipped so a stray file cannot crash the inner listing.
moods = sorted(
    entry for entry in os.listdir(data_root)
    if os.path.isdir(os.path.join(data_root, entry))
)
list_of_dfs = []
for mood in moods:
    fns = sorted(os.listdir(os.path.join(data_root, mood)))
    for fn in fns:
        df = pd.read_excel(os.path.join(data_root, mood, fn),
                           sheet_name="Complete Data Set")
        # When the first column header does not start with "Time", the first
        # data row is promoted to header and removed from the data.
        # str() guards against non-string headers.
        if not str(df.columns[0]).startswith("Time"):
            df.columns = df.iloc[0]
            df = df.drop(index=0)
        # Keep only the sensor columns used downstream. The explicit copy
        # makes the column assignments below write to an independent frame
        # instead of a slice of the sheet.
        df = df[['FWC', 'FWE', 'VC', 'VE', 'TCA', 'TSI', 'TO_sump', 'TO_feed',
                 'PO_net', 'PO_feed', 'TRC_sub', 'TEI', 'TEO', 'TCO', 'TCI']].copy()
        df['Label'] = mood
        # Derived features: differences between the TEI/TEO and TCO/TCI columns.
        df['TEI-TEO'] = df['TEI'].astype(float) - df['TEO'].astype(float)
        df['TCO-TCI'] = df['TCO'].astype(float) - df['TCI'].astype(float)
        list_of_dfs.append(df)
# Step 2: concatenate every workbook into a single frame and forward-fill
# missing values (DataFrame.ffill replaces the deprecated fillna(method=...)).
df = pd.concat(list_of_dfs, ignore_index=True)
df.ffill(inplace=True)
# Step 3: build the input matrix and the target vector.
input_columns = [
    'FWC', 'FWE', 'VC', 'VE', 'TCA', 'TSI', 'TO_sump',
    'TO_feed', 'PO_net', 'PO_feed', 'TRC_sub', 'TEI-TEO', 'TCO-TCI',
]
X = df[input_columns].to_numpy().astype(float)
y = df["Label"].to_numpy()
# Step 4: split the data into train / validation / test.
# 20% is held out for test, then 25% of the remaining 80% for validation,
# which gives a 60/20/20 split overall.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0
)
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval, y_trainval, test_size=0.25, random_state=0
)
# Step 5: manual standardisation using statistics of the training split only,
# so validation and test data do not leak into the scaling parameters.
mu = X_train.mean(axis=0)
std = X_train.std(axis=0)
# A feature that is constant on the training split has std == 0 and would
# turn every scaled value into NaN/inf. Use 1.0 for those columns so they
# are only centred.
std = np.where(std == 0, 1.0, std)
# Step 6: append a trailing axis so each sample is a column vector:
# (n_samples, n_features) -> (n_samples, n_features, 1).
X_train_p = ((X_train - mu) / std)[:, :, np.newaxis]
X_val_p = ((X_val - mu) / std)[:, :, np.newaxis]
X_test_p = ((X_test - mu) / std)[:, :, np.newaxis]
# Step 7: binary label encoding (Normal = 1, fault = -1).
def _encode_binary_labels(labels):
    """Return an int array with 1 where *labels* equals 'Normal' and -1 elsewhere."""
    return np.where(labels == 'Normal', 1, -1).astype(int)


y_train_p = _encode_binary_labels(y_train)
y_val_p = _encode_binary_labels(y_val)
y_test_p = _encode_binary_labels(y_test)
| # Paso 8: Definición de la red neuronal + SVM manual sin cupy | |
class NNSVM:
    """Two fully connected layers followed by a linear SVM-style output.

    Inputs are batches of column vectors with shape
    ``(n_samples, input_dim, 1)``. The first layer maps them to ``n`` units,
    the second to ``d`` units (the feature vector ``phi``), and the decision
    score is ``phi^T @ theta + theta_0``. Training labels must be +1 / -1.

    Supported activation names (``AF``): "tanh", "sigmoid", "GRBF"
    (``exp(-z^2)``) and "xGRBF" (``z * exp(-z^2)``).
    """

    _ACTIVATIONS = ("tanh", "sigmoid", "GRBF", "xGRBF")

    def __init__(self, input_dim, n, d, AF="tanh", seed=0):
        """Initialise the weights.

        Args:
            input_dim: Number of input features.
            n: Number of units in the first layer.
            d: Number of units in the second layer (length of ``theta``).
            AF: Activation name, one of ``_ACTIVATIONS``.
            seed: Seed passed to ``np.random.seed`` before drawing weights.

        Raises:
            ValueError: If ``AF`` is not a supported activation name.
        """
        if AF not in self._ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation function {AF!r}; "
                f"expected one of {self._ACTIVATIONS}"
            )
        self.AF = AF
        self.input_dim = input_dim
        self.n = n
        np.random.seed(seed)
        # Uniform initialisation in [-limit, limit], limit = sqrt(6 / (fan_in + fan_out)).
        limit = np.sqrt(6 / (input_dim + n))
        self.W0 = np.random.uniform(-limit, limit, size=(n, input_dim))
        self.b0 = np.zeros((n, 1))
        limit = np.sqrt(6 / (n + d))
        self.W1 = np.random.uniform(-limit, limit, size=(d, n))
        self.b1 = np.zeros((d, 1))
        # Parameters of the linear output: weight vector and bias.
        self.theta = np.zeros((d, 1))
        self.theta_0 = 0.0

    def activation(self, z):
        """Apply the configured activation element-wise to *z*.

        Raises:
            ValueError: If ``self.AF`` is not a supported activation name.
        """
        if self.AF == "tanh":
            return np.tanh(z)
        if self.AF == "sigmoid":
            return 1 / (1 + np.exp(-z))
        if self.AF == "GRBF":
            return np.exp(-(z ** 2))
        if self.AF == "xGRBF":
            return z * np.exp(-(z ** 2))
        raise ValueError(f"Unsupported activation function {self.AF!r}")

    def _activation_grad(self, z, a):
        """Return the activation derivative at pre-activation *z*, where ``a = activation(z)``."""
        if self.AF == "tanh":
            return 1 - (a ** 2)
        if self.AF == "sigmoid":
            return a * (1 - a)
        if self.AF == "GRBF":
            return a * (-2 * z)
        if self.AF == "xGRBF":
            # d/dz [z * exp(-z^2)] = exp(-z^2) * (1 - 2 z^2). The closed form
            # avoids the 0/0 that computing exp(-z^2) as a / z gives at z == 0.
            return np.exp(-(z ** 2)) * (1 - 2 * (z ** 2))
        raise ValueError(f"Unsupported activation function {self.AF!r}")

    def _hidden_layers(self, X):
        """Run both layers on *X* and return ``(z0, a0, z, phi)`` without storing state."""
        z0 = np.matmul(self.W0, X) + self.b0
        a0 = self.activation(z0)
        z = np.matmul(self.W1, a0) + self.b1
        phi = self.activation(z)
        return z0, a0, z, phi

    def predict(self, X):
        """Classify a batch of samples.

        Args:
            X: Array of shape ``(n_samples, input_dim, 1)``.

        Returns:
            Tuple ``(y_hat, scores)`` of flat arrays. ``scores`` holds the raw
            decision values and ``y_hat`` their sign (+1, -1, or 0 when the
            score is exactly zero).
        """
        _, _, _, phi = self._hidden_layers(X)
        z_prime = np.matmul(np.transpose(phi, axes=(0, 2, 1)), self.theta) + self.theta_0
        y_hat = np.sign(z_prime).flatten()
        return y_hat, z_prime.flatten()

    def forward(self, x):
        """Run the forward pass and cache the intermediates on the instance.

        Stores ``z0``, ``a0``, ``z``, ``phi`` and ``z_prime`` as attributes
        (they are read by ``fit``) and returns the decision scores with shape
        ``(n_samples,)``.
        """
        self.z0, self.a0, self.z, self.phi = self._hidden_layers(x)
        self.z_prime = np.matmul(np.transpose(self.phi, axes=(0, 2, 1)), self.theta) + self.theta_0
        self.z_prime = self.z_prime[:, 0, 0]
        return self.z_prime

    @staticmethod
    def _adam_update(param, m, v, grad, step_size, beta1, beta2, eps, t):
        """Apply one bias-corrected Adam step; return ``(new_param, new_m, new_v)``."""
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * (grad ** 2)
        m_hat = m / (1 - beta1 ** t)
        v_hat = v / (1 - beta2 ** t)
        return param - step_size * m_hat / (eps + np.sqrt(v_hat)), m, v

    def fit(self, X, y, X_test, y_test, seed=0, epochs=100, lr=1e-3, Lambda=1e-3,
            beta1=0.9, beta2=0.999, eps=1e-8, alpha=1e-6, batch_size=128):
        """Train the network and the SVM output with mini-batch updates.

        ``theta`` / ``theta_0`` are updated with plain gradient steps, the
        second layer with Adam at a fixed rate ``lr``, and the first layer
        with Adam using per-parameter step sizes (``alpha_b0`` / ``alpha_W0``)
        that are adjusted every batch by ``alpha`` times the product of the
        previous and current gradients.

        Args:
            X: Training inputs, shape ``(n_samples, input_dim, 1)``.
            y: Training labels, 1-D array of +1 / -1.
            X_test: Unused; kept for interface compatibility.
            y_test: Unused; kept for interface compatibility.
            seed: Seed passed to ``np.random.seed`` before shuffling.
            epochs: Number of passes over the training data.
            lr: Base learning rate.
            Lambda: L2 regularisation weight applied to ``theta``.
            beta1: Adam first-moment decay rate.
            beta2: Adam second-moment decay rate.
            eps: Adam denominator offset.
            alpha: Step used to adapt the first-layer learning rates.
            batch_size: Mini-batch size; the last batch of an epoch may be smaller.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        np.random.seed(seed)
        n_samples = X.shape[0]
        # Adam moment estimates for every trainable array.
        m_b0 = np.zeros_like(self.b0)
        v_b0 = np.zeros_like(self.b0)
        m_W0 = np.zeros_like(self.W0)
        v_W0 = np.zeros_like(self.W0)
        m_b1 = np.zeros_like(self.b1)
        v_b1 = np.zeros_like(self.b1)
        m_W1 = np.zeros_like(self.W1)
        v_W1 = np.zeros_like(self.W1)
        # Previous-batch gradients and per-parameter rates of the first layer.
        g_b0 = np.zeros_like(self.b0)
        g_W0 = np.zeros_like(self.W0)
        self.alpha_b0 = np.ones_like(self.b0) * lr
        self.alpha_W0 = np.ones_like(self.W0) * lr
        k = 0  # number of updates performed so far (Adam time step is k + 1)
        print("Modelo listo para entrenamiento con", epochs, "épocas")
        for epoch in range(epochs):
            print("Epoch: ", epoch + 1)
            index = np.random.permutation(n_samples)
            # Stepping by batch_size yields every full batch plus the
            # remainder, and never an empty batch (which previously caused a
            # division by zero when n_samples was a multiple of batch_size).
            for start in range(0, n_samples, batch_size):
                idx = index[start:start + batch_size]
                x_i = X[idx]
                y_i = y[idx]
                n = x_i.shape[0]
                # Forward pass (caches z0, a0, z, phi, z_prime on self).
                self.forward(x_i)
                # Gate: samples whose margin y * score exceeds 1 contribute
                # no loss gradient.
                gate_matrix = np.ones((n, 1, 1))
                gate_matrix[self.z_prime * (y_i.flatten()) > 1] = 0
                # Per-sample gradients; all are computed before any update.
                y_col = y_i[:, None, None]
                self.dtheta = -self.phi * y_col
                self.dtheta_0 = -y_i
                self.dphi = -self.theta * y_col
                self.dz = self._activation_grad(self.z, self.phi) * self.dphi
                self.db1 = self.dz
                self.dW1 = np.matmul(self.dz, np.transpose(self.a0, axes=(0, 2, 1)))
                self.da0 = np.matmul(self.W1.T, self.dz)
                self.dz0 = self._activation_grad(self.z0, self.a0) * self.da0
                self.db0 = self.dz0
                self.dW0 = np.matmul(self.dz0, np.transpose(x_i, axes=(0, 2, 1)))
                # SVM parameters: gated batch-mean gradient, L2 penalty on theta.
                self.theta = self.theta - lr * (
                    (1 / n * gate_matrix * self.dtheta).sum(axis=0) + Lambda * self.theta
                )
                self.theta_0 = self.theta_0 - lr * (
                    1 / n * gate_matrix * self.dtheta_0[:, None, None]
                ).sum()
                # Second layer: gated batch-mean gradients, Adam with rate lr.
                self.db1 = (1 / n * gate_matrix * self.db1).sum(axis=0)
                self.dW1 = (1 / n * gate_matrix * self.dW1).sum(axis=0)
                t = k + 1
                self.b1, m_b1, v_b1 = self._adam_update(
                    self.b1, m_b1, v_b1, self.db1, lr, beta1, beta2, eps, t)
                self.W1, m_W1, v_W1 = self._adam_update(
                    self.W1, m_W1, v_W1, self.dW1, lr, beta1, beta2, eps, t)
                # First layer: gradients summed over the batch.
                # NOTE(review): unlike the second layer, these sums are neither
                # gated by the margin nor divided by the batch size; kept as in
                # the original - confirm this is intended.
                g_new_b0 = self.db0.sum(axis=0)
                g_new_W0 = self.dW0.sum(axis=0)
                # Adapt the per-parameter rates before they are used below.
                self.alpha_b0 = self.alpha_b0 + alpha * (g_b0 * g_new_b0)
                self.alpha_W0 = self.alpha_W0 + alpha * (g_W0 * g_new_W0)
                g_b0 = g_new_b0
                g_W0 = g_new_W0
                self.b0, m_b0, v_b0 = self._adam_update(
                    self.b0, m_b0, v_b0, g_new_b0, self.alpha_b0, beta1, beta2, eps, t)
                self.W0, m_W0, v_W0 = self._adam_update(
                    self.W0, m_W0, v_W0, g_new_W0, self.alpha_W0, beta1, beta2, eps, t)
                k += 1
# Step 9: build the model.
input_dim = X_train_p.shape[1]  # number of input features (second axis of the inputs)
hidden_neurons = 500            # units in the first layer
output_dim = 500                # units in the second layer, i.e. length of the SVM weight vector
model = NNSVM(input_dim, hidden_neurons, output_dim, AF="tanh", seed=0)
# %%
# Step 10: train the model; the validation split is passed as the
# X_test / y_test arguments of fit.
fit_options = dict(
    epochs=10,       # NOTE(review): the original comment mentioned 300 epochs; 10 are run here
    lr=1e-3,         # base learning rate
    Lambda=1e-3,     # regularisation weight for the SVM
    beta1=0.9,
    beta2=0.999,
    eps=1e-8,
    alpha=1e-6,      # adjustment factor for the adaptive learning rate
    batch_size=128,
)
model.fit(X_train_p, y_train_p, X_val_p, y_val_p, **fit_options)
| # %% | |
| from sklearn.metrics import accuracy_score, confusion_matrix, classification_report | |
| import seaborn as sns | |
# Step 11: predictions on the test split.
y_pred, _ = model.predict(X_test_p)
y_pred = y_pred.astype(int)
# Pin the label order explicitly so the display names always line up with
# -1 / +1, and the report and matrix keep two classes even when one of them
# is absent from the test labels or the predictions.
class_labels = [-1, 1]
class_names = ["Falla", "Normal"]
# Step 12: metrics.
acc = accuracy_score(y_test_p, y_pred)
print(f"\n✅ Accuracy en test: {acc:.4f}")
print("\n📊 Reporte de clasificación:")
print(classification_report(y_test_p, y_pred, labels=class_labels, target_names=class_names))
# Step 13: confusion matrix heat map (rows: true class, columns: predicted class).
cm = confusion_matrix(y_test_p, y_pred, labels=class_labels)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.title("Matriz de Confusión")
plt.xlabel("Predicción")
plt.ylabel("Real")
plt.show()
| # %% | |
# Step 14 (optional): persist the trained model and the normalisation parameters.
print("Guardando el modelo y los parámetros de normalización...")
# Binary classification model.
model_path = 'modelo_clasificacion_binaria.pkl'
joblib.dump(model, model_path)
print("Modelo de clasificación binaria guardado como 'modelo_clasificacion_binaria.pkl'")
# Normalisation parameters: mu and std must already be in scope here; they
# are the training-split statistics computed in the standardisation step.
for stats, stats_path in (
    (mu, 'parametros_normalizacion_mu.pkl'),
    (std, 'parametros_normalizacion_std.pkl'),
):
    joblib.dump(stats, stats_path)
print("Parámetros de normalización (mu, std) guardados como 'parametros_normalizacion_mu.pkl' y 'parametros_normalizacion_std.pkl'")
| # %% |