Spaces:
Sleeping
Sleeping
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| import tensorflow as tf | |
| import joblib | |
| # 1. CARGA DE MODELOS Y ARTEFACTOS | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| MODELS_DIR = os.path.join(BASE_DIR, "models") | |
| def load_artifacts(): | |
| try: | |
| model_bin = tf.keras.models.load_model(os.path.join(MODELS_DIR, "modelo_base.keras")) | |
| model_multi = tf.keras.models.load_model(os.path.join(MODELS_DIR, "modelo_transfer.keras")) | |
| scaler = joblib.load(os.path.join(MODELS_DIR, "scaler.joblib")) | |
| encoders = joblib.load(os.path.join(MODELS_DIR, "encoders.joblib")) | |
| return model_bin, model_multi, scaler, encoders | |
| except Exception as e: | |
| print(f"Error cargando modelos: {e}") | |
| return None, None, None, None | |
| model_bin, model_multi, scaler, encoders = load_artifacts() | |
| # Mapeo de categorías multiclase | |
| MULTI_CLASSES = {0: "Normal", 1: "DoS", 2: "Probe", 3: "R2L", 4: "U2R"} | |
| def predict_batch(file): | |
| if file is None: return None, "Cargue un archivo CSV." | |
| try: | |
| # Leer archivo (primeras 41 columnas) | |
| df_raw = pd.read_csv(file.name, header=None).iloc[:, :41] | |
| df_proc = df_raw.copy() | |
| # Preprocesamiento Batch | |
| if encoders: | |
| for col, pos in [('protocol_type', 1), ('service', 2), ('flag', 3)]: | |
| df_proc[pos] = encoders[col].transform(df_proc[pos].astype(str)) | |
| X = df_proc.values.astype(float) | |
| if scaler: | |
| X = scaler.transform(X) | |
| # Predicciones | |
| preds_bin = model_bin.predict(X, verbose=0) | |
| preds_multi = model_multi.predict(X, verbose=0) | |
| results = [] | |
| ataques_count = 0 | |
| for i in range(len(X)): | |
| # Lógica Binaria (Umbral 0.5) | |
| # Nota: Si la probabilidad es > 0.5 es Ataque | |
| prob_bin = float(preds_bin[i][0]) | |
| is_attack = prob_bin > 0.5 | |
| label_bin = "🛑 Ataque" if is_attack else "✅ Normal" | |
| conf_bin = f"{prob_bin:.2%}" if is_attack else f"{(1-prob_bin):.2%}" | |
| if is_attack: | |
| ataques_count += 1 | |
| # Lógica Multiclase | |
| idx_multi = np.argmax(preds_multi[i]) | |
| label_multi = MULTI_CLASSES.get(idx_multi, "Otros") | |
| conf_multi = f"{np.max(preds_multi[i]):.2%}" | |
| results.append([i+1, label_bin, conf_bin, label_multi, conf_multi]) | |
| # Resumen unificado | |
| total = len(results) | |
| normal_count = total - ataques_count | |
| summary = (f"📊 Análisis Completado\n" | |
| f"-------------------\n" | |
| f"Total registros: {total}\n" | |
| f"✅ Tráfico Normal: {normal_count}\n" | |
| f"🛑 Ataques: {ataques_count}") | |
| return results, summary | |
| except Exception as e: | |
| return None, f"Error en el proceso: {str(e)}" | |
| # 3. INTERFAZ | |
| with gr.Blocks(theme=gr.themes.Soft(), title="IDS Demo") as demo: | |
| gr.Markdown("# 🛡️ IDS con Redes Neuronales (NSL-KDD)") | |
| with gr.Row(): | |
| file_input = gr.File(label="Subir test_heterogeneo.csv", file_types=[".csv"]) | |
| run_btn = gr.Button("🚀 Analizar en Batch", variant="primary") | |
| with gr.Row(): | |
| summary_out = gr.Textbox(label="Resumen del Análisis", lines=6) | |
| table_out = gr.DataFrame( | |
| headers=["#", "Clasif. Binaria", "Confianza", "Categoría", "Confianza"], | |
| label="Resultados Detallados" | |
| ) | |
| run_btn.click(fn=predict_batch, inputs=file_input, outputs=[table_out, summary_out]) | |
| if __name__ == "__main__": | |
| demo.launch() |