# Source: faultdetectionchillers / binary_classification.py
# Uploaded by DevNumb ("Upload 4 files"), commit d3cb5c7 (verified)
# %%
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import joblib
# Step 1: load the database from the "Data select" folder.
# Each sub-folder name is used as the class label of the files it contains.
# Entries are sorted because os.listdir() returns them in arbitrary order,
# which would make the fixed-seed train/val/test split differ between machines.
moods = sorted(
    entry for entry in os.listdir("Data select")
    if os.path.isdir(os.path.join("Data select", entry))
)
list_of_dfs = []
for mood in moods:
    fns = sorted(os.listdir(f"Data select/{mood}"))
    for fn in fns:
        # Skip hidden files and Excel lock files ("~$name.xlsx"); they are
        # not workbooks and would make read_excel fail.
        if fn.startswith((".", "~$")):
            continue
        df = pd.read_excel(f"Data select/{mood}/{fn}", sheet_name="Complete Data Set")
        # When the parsed header does not start with "Time", the first data
        # row is promoted to header and removed from the data so the names
        # are not duplicated as a sample. str() guards non-string headers.
        if not str(df.columns[0]).startswith("Time"):
            df.columns = df.iloc[0]
            df = df.drop(index=0)
        # Select the columns of interest; copy() so the derived columns below
        # are written to an independent frame rather than a slice.
        df = df[['FWC', 'FWE', 'VC', 'VE', 'TCA', 'TSI', 'TO_sump', 'TO_feed',
                 'PO_net', 'PO_feed', 'TRC_sub', 'TEI', 'TEO', 'TCO', 'TCI']].copy()
        df['Label'] = mood
        # Derived features: differences between the TEI/TEO and TCO/TCI columns.
        df['TEI-TEO'] = df['TEI'].astype(float) - df['TEO'].astype(float)
        df['TCO-TCI'] = df['TCO'].astype(float) - df['TCI'].astype(float)
        list_of_dfs.append(df)
# Step 2: concatenate every loaded file into a single table.
if not list_of_dfs:
    # pd.concat would raise a ValueError as well; this message names the cause.
    raise ValueError('No data files were loaded from "Data select"')
df = pd.concat(list_of_dfs, ignore_index=True)
# Forward-fill missing values. DataFrame.fillna(method='ffill') is deprecated
# in recent pandas; DataFrame.ffill() is the equivalent call.
# NOTE(review): the fill runs on the concatenated table, so a gap at the start
# of one file is filled with the last value of the previous file - confirm
# this is intended.
df = df.ffill()
# Step 3: input matrix (13 features) and label vector.
feature_columns = ['FWC', 'FWE', 'VC', 'VE', 'TCA', 'TSI', 'TO_sump',
                   'TO_feed', 'PO_net', 'PO_feed', 'TRC_sub', 'TEI-TEO', 'TCO-TCI']
X = df[feature_columns].to_numpy().astype(float)
y = df["Label"].to_numpy()
# Step 4: split the data. 20% is held out for test, then 25% of the remaining
# 80% for validation, i.e. 60/20/20 overall.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0)
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval, y_trainval, test_size=0.25, random_state=0)
# Step 5: manual standardization using statistics of the training split only.
mu = X_train.mean(axis=0)
std = X_train.std(axis=0)
# A feature that is constant in the training split has std == 0; dividing by
# it would produce NaN/inf, so such features are left unscaled instead.
std = np.where(std == 0, 1.0, std)
X_train_p = (X_train - mu) / std
X_val_p = (X_val - mu) / std
X_test_p = (X_test - mu) / std
# Step 6: add a trailing axis so each sample is a column vector of shape
# (n_features, 1); NNSVM multiplies its weight matrices with np.matmul over
# arrays of shape (n_samples, n_features, 1).
X_train_p = np.expand_dims(X_train_p, axis=2)
X_val_p = np.expand_dims(X_val_p, axis=2)
X_test_p = np.expand_dims(X_test_p, axis=2)
# Step 7: binary label encoding. 'Normal' -> 1, every other label (fault) -> -1.
# A single vectorized expression per split replaces the repeated
# copy / mask / assign / cast sequence.
y_train_p = np.where(y_train == 'Normal', 1, -1).astype(int)
y_val_p = np.where(y_val == 'Normal', 1, -1).astype(int)
y_test_p = np.where(y_test == 'Normal', 1, -1).astype(int)
# Paso 8: Definición de la red neuronal + SVM manual sin cupy
class NNSVM:
    """Two-layer neural feature map with a linear soft-margin SVM on top.

    A sample ``x`` (column vector of shape ``(input_dim, 1)``) is mapped to
    ``a0 = f(W0 x + b0)``, then ``phi = f(W1 a0 + b1)``, and finally scored as
    ``z' = phi^T theta + theta_0``. The predicted class is ``sign(z')``, so
    training targets must be encoded as ``+1`` / ``-1``.

    Attributes:
        AF: Name of the activation ``f``; one of ``"tanh"``, ``"sigmoid"``,
            ``"GRBF"`` (``exp(-z^2)``) or ``"xGRBF"`` (``z * exp(-z^2)``).
        W0, b0: First layer parameters, shapes ``(n, input_dim)`` and ``(n, 1)``.
        W1, b1: Second layer parameters, shapes ``(d, n)`` and ``(d, 1)``.
        theta, theta_0: SVM weight vector ``(d, 1)`` and scalar bias.
    """

    _ACTIVATIONS = ("tanh", "sigmoid", "GRBF", "xGRBF")

    def __init__(self, input_dim, n, d, AF="tanh", seed=0):
        """Initialise the parameters.

        Args:
            input_dim: Number of input features.
            n: Number of units in the first layer.
            d: Number of units in the second layer (size of ``phi``).
            AF: Activation name, see the class docstring.
            seed: Seed passed to ``np.random.seed`` before drawing weights.

        Raises:
            ValueError: If ``AF`` is not a supported activation name.
        """
        if AF not in self._ACTIVATIONS:
            raise ValueError(
                f"Unknown activation {AF!r}; expected one of {self._ACTIVATIONS}")
        self.AF = AF
        self.input_dim = input_dim
        self.n = n
        np.random.seed(seed)
        # Glorot/Xavier-style uniform initialisation: U(-limit, limit) with
        # limit = sqrt(6 / (fan_in + fan_out)). Biases start at zero.
        limit = np.sqrt(6 / (input_dim + n))
        self.W0 = np.random.uniform(-limit, limit, size=(n, input_dim))
        self.b0 = np.zeros((n, 1))
        limit = np.sqrt(6 / (n + d))
        self.W1 = np.random.uniform(-limit, limit, size=(d, n))
        self.b1 = np.zeros((d, 1))
        self.theta = np.zeros((d, 1))
        self.theta_0 = 0.0

    def activation(self, z):
        """Apply the configured activation element-wise to ``z``.

        Raises:
            ValueError: If ``self.AF`` is not a supported activation name.
        """
        if self.AF == "tanh":
            return np.tanh(z)
        if self.AF == "sigmoid":
            return 1 / (1 + np.exp(-z))
        if self.AF == "GRBF":
            return np.exp(-(z ** 2))
        if self.AF == "xGRBF":
            return z * np.exp(-(z ** 2))
        raise ValueError(f"Unknown activation {self.AF!r}")

    def _activation_grad(self, z, a):
        """Return the element-wise derivative of the activation.

        Args:
            z: Pre-activation values.
            a: ``self.activation(z)``, reused where the derivative can be
                expressed through the output.
        """
        if self.AF == "tanh":
            return 1 - (a ** 2)
        if self.AF == "sigmoid":
            return a * (1 - a)
        if self.AF == "GRBF":
            return a * (-2 * z)
        if self.AF == "xGRBF":
            # d/dz [z exp(-z^2)] = (1 - 2 z^2) exp(-z^2). Written without the
            # a / z quotient so the value at z == 0 is 1 instead of NaN.
            return (1 - 2 * (z ** 2)) * np.exp(-(z ** 2))
        raise ValueError(f"Unknown activation {self.AF!r}")

    @staticmethod
    def _adam_update(param, grad, m, v, step_size, beta1, beta2, eps, t):
        """Apply one bias-corrected Adam step.

        Args:
            param: Current parameter array.
            grad: Gradient for this step.
            m, v: Running first and second moment estimates.
            step_size: Scalar or per-element learning rate.
            beta1, beta2, eps: Adam hyper-parameters.
            t: 1-based step counter used for bias correction.

        Returns:
            Tuple ``(new_param, new_m, new_v)``.
        """
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * (grad ** 2)
        m_hat = m / (1 - beta1 ** t)
        v_hat = v / (1 - beta2 ** t)
        return param - step_size * m_hat / (eps + np.sqrt(v_hat)), m, v

    def predict(self, X):
        """Classify a batch without touching the cached training state.

        Args:
            X: Array of shape ``(n_samples, input_dim, 1)``.

        Returns:
            Tuple ``(y_hat, z_prime)`` of flat arrays: ``y_hat`` is
            ``np.sign(z_prime)`` (so it is 0 where the score is exactly 0) and
            ``z_prime`` is the raw SVM score.
        """
        z0 = np.matmul(self.W0, X) + self.b0
        a0 = self.activation(z0)
        z = np.matmul(self.W1, a0) + self.b1
        phi = self.activation(z)
        z_prime = np.matmul(np.transpose(phi, axes=(0, 2, 1)), self.theta) + self.theta_0
        y_hat = np.sign(z_prime).flatten()
        return y_hat, z_prime.flatten()

    def forward(self, x):
        """Run a forward pass and cache the intermediates for backpropagation.

        Stores ``z0``, ``a0``, ``z``, ``phi`` and ``z_prime`` on the instance.

        Args:
            x: Array of shape ``(batch, input_dim, 1)``.

        Returns:
            SVM scores as a 1-D array of length ``batch``.
        """
        self.z0 = np.matmul(self.W0, x) + self.b0
        self.a0 = self.activation(self.z0)
        self.z = np.matmul(self.W1, self.a0) + self.b1
        self.phi = self.activation(self.z)
        scores = np.matmul(np.transpose(self.phi, axes=(0, 2, 1)), self.theta) + self.theta_0
        self.z_prime = scores[:, 0, 0]
        return self.z_prime

    def fit(self, X, y, X_test, y_test, seed=0, epochs=100, lr=1e-3, Lambda=1e-3,
            beta1=0.9, beta2=0.999, eps=1e-8, alpha=1e-6, batch_size=128):
        """Train with mini-batch hinge-loss subgradients.

        ``theta`` / ``theta_0`` use plain gradient descent (with an L2 penalty
        ``Lambda`` on ``theta``), layer 1 uses Adam with a fixed step ``lr``
        and layer 0 uses Adam with a per-parameter step that is adapted from
        the product of consecutive gradients.

        Args:
            X: Training inputs, shape ``(n_samples, input_dim, 1)``.
            y: Training targets in ``{+1, -1}``, shape ``(n_samples,)``.
            X_test: Not read by this method; kept for interface compatibility.
            y_test: Not read by this method; kept for interface compatibility.
            seed: Seed passed to ``np.random.seed`` for batch shuffling.
            epochs: Number of passes over the data.
            lr: Base learning rate.
            Lambda: L2 regularisation strength on ``theta``.
            beta1: Adam first-moment decay.
            beta2: Adam second-moment decay.
            eps: Adam denominator offset.
            alpha: Rate at which the layer-0 step sizes are adapted.
            batch_size: Samples per mini-batch; the last batch may be smaller.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        np.random.seed(seed)
        n_samples = X.shape[0]

        # Adam moment estimates for every layer parameter.
        m_b0 = np.zeros_like(self.b0)
        v_b0 = np.zeros_like(self.b0)
        m_W0 = np.zeros_like(self.W0)
        v_W0 = np.zeros_like(self.W0)
        m_b1 = np.zeros_like(self.b1)
        v_b1 = np.zeros_like(self.b1)
        m_W1 = np.zeros_like(self.W1)
        v_W1 = np.zeros_like(self.W1)
        # Previous-step gradients and per-parameter step sizes of layer 0.
        g_b0 = np.zeros_like(self.b0)
        g_W0 = np.zeros_like(self.W0)
        self.alpha_b0 = np.ones_like(self.b0) * lr
        self.alpha_W0 = np.ones_like(self.W0) * lr

        k = 0  # number of optimisation steps taken so far
        print("Modelo listo para entrenamiento con", epochs, "épocas")
        for epoch in range(epochs):
            print("Epoch: ", epoch + 1)
            index = np.random.permutation(n_samples)
            # Stepping through the permutation yields full batches plus one
            # shorter trailing batch, and never an empty one (an empty batch
            # would divide by zero when averaging the gradients).
            for start in range(0, n_samples, batch_size):
                idx = index[start:start + batch_size]
                x_i = X[idx]
                y_i = y[idx]
                n = x_i.shape[0]

                self.forward(x_i)

                # Hinge loss: only samples with y * z' <= 1 contribute.
                gate_matrix = np.ones((n, 1, 1))
                gate_matrix[self.z_prime * (y_i.flatten()) > 1] = 0
                y_col = y_i[:, None, None]

                # Per-sample gradients, computed with the pre-update parameters.
                self.dtheta = -self.phi * y_col
                self.dtheta_0 = -y_i
                self.dphi = -self.theta * y_col
                self.dz = self._activation_grad(self.z, self.phi) * self.dphi
                self.db1 = self.dz
                self.dW1 = np.matmul(self.dz, np.transpose(self.a0, axes=(0, 2, 1)))
                self.da0 = np.matmul(self.W1.T, self.dz)
                self.dz0 = self._activation_grad(self.z0, self.a0) * self.da0
                self.db0 = self.dz0
                self.dW0 = np.matmul(self.dz0, np.transpose(x_i, axes=(0, 2, 1)))

                # SVM parameters: plain gradient step on the gated batch mean.
                self.theta = self.theta - lr * (
                    (1 / n * gate_matrix * self.dtheta).sum(axis=0) + Lambda * self.theta)
                self.theta_0 = self.theta_0 - lr * (
                    1 / n * gate_matrix * self.dtheta_0[:, None, None]).sum()

                # Layer 1: Adam on the gated batch-mean gradients.
                self.db1 = (1 / n * gate_matrix * self.db1).sum(axis=0)
                self.dW1 = (1 / n * gate_matrix * self.dW1).sum(axis=0)
                self.b1, m_b1, v_b1 = self._adam_update(
                    self.b1, self.db1, m_b1, v_b1, lr, beta1, beta2, eps, k + 1)
                self.W1, m_W1, v_W1 = self._adam_update(
                    self.W1, self.dW1, m_W1, v_W1, lr, beta1, beta2, eps, k + 1)

                # Layer 0: Adam with adaptive per-parameter step sizes.
                # NOTE(review): unlike layer 1, these gradients are summed over
                # the batch without gate_matrix and without the 1 / n factor -
                # confirm whether that is intended.
                g_new_b0 = self.db0.sum(axis=0)
                g_new_W0 = self.dW0.sum(axis=0)
                # Step sizes grow when consecutive gradients agree in sign and
                # shrink when they disagree.
                self.alpha_b0 = self.alpha_b0 + alpha * (g_b0 * g_new_b0)
                self.alpha_W0 = self.alpha_W0 + alpha * (g_W0 * g_new_W0)
                g_b0 = g_new_b0
                g_W0 = g_new_W0
                self.b0, m_b0, v_b0 = self._adam_update(
                    self.b0, g_new_b0, m_b0, v_b0, self.alpha_b0, beta1, beta2, eps, k + 1)
                self.W0, m_W0, v_W0 = self._adam_update(
                    self.W0, g_new_W0, m_W0, v_W0, self.alpha_W0, beta1, beta2, eps, k + 1)
                k += 1
# Step 9: build the model (described in the original comments as the thesis
# architecture).
input_dim = X_train_p.shape[1]  # number of input features (13 columns in X)
hidden_neurons = 500  # units in the first layer
output_dim = 500  # units in the second layer, i.e. size of the vector fed to the SVM
model = NNSVM(input_dim=input_dim, n=hidden_neurons, d=output_dim, AF="tanh", seed=0)
# %%
# Step 10: train the model.
# NOTE(review): the original comment on `epochs` said 300 epochs, but the
# value actually passed is 10 - confirm which one is intended.
fit_settings = {
    "epochs": 10,
    "lr": 1e-3,  # base learning rate
    "Lambda": 1e-3,  # regularization for the SVM
    "beta1": 0.9,
    "beta2": 0.999,
    "eps": 1e-8,
    "alpha": 1e-6,  # adjustment factor of the adaptive learning rate
    "batch_size": 128,
}
# The validation split is passed in the X_test / y_test positions of fit().
model.fit(X_train_p, y_train_p, X_val_p, y_val_p, **fit_settings)
# %%
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import seaborn as sns

# Step 11: predictions on the test split.
y_pred, _ = model.predict(X_test_p)
y_pred = y_pred.astype(int)

# Fix the label order explicitly. Without it scikit-learn infers the labels
# from the data, so a missing class (or a 0 produced by np.sign for a score of
# exactly 0) would no longer line up with the two display names below.
class_labels = [-1, 1]
class_names = ["Falla", "Normal"]

# Step 12: metrics.
acc = accuracy_score(y_test_p, y_pred)
print(f"\n✅ Accuracy en test: {acc:.4f}")
print("\n📊 Reporte de clasificación:")
print(classification_report(y_test_p, y_pred, labels=class_labels,
                            target_names=class_names))

# Step 13: confusion matrix plot (rows: true class, columns: predicted class).
cm = confusion_matrix(y_test_p, y_pred, labels=class_labels)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)
plt.title("Matriz de Confusión")
plt.xlabel("Predicción")
plt.ylabel("Real")
plt.show()
# %%
# Step 14 (optional): persist the trained model and the normalization
# parameters.
print("Guardando el modelo y los parámetros de normalización...")

# Binary classification model.
joblib.dump(model, 'modelo_clasificacion_binaria.pkl')
print("Modelo de clasificación binaria guardado como 'modelo_clasificacion_binaria.pkl'")

# Normalization parameters: mu and std are the training-split statistics
# computed in the standardization step (step 5) earlier in this script.
for stats, target in ((mu, 'parametros_normalizacion_mu.pkl'),
                      (std, 'parametros_normalizacion_std.pkl')):
    joblib.dump(stats, target)
print("Parámetros de normalización (mu, std) guardados como 'parametros_normalizacion_mu.pkl' y 'parametros_normalizacion_std.pkl'")
# %%