"""Train intrusion-detection classifiers on a sample of the NSL-KDD dataset.

The script loads the data, binarises the target (0 = "normal", 1 = anything
else), cleans and one-hot encodes the features, selects a small feature
subset, and then fits and evaluates the models. Everything runs at module
level; later sections of the file rely on the names defined here
(``X_train``, ``X_test``, ``X_test_scaled``, ``y_test``, ``scaler``,
``selected_features``, ``dt_model``, ``lr_model`` and ``evaluate``).
"""

import os
import joblib
import warnings
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    confusion_matrix,
    classification_report,
)

warnings.filterwarnings("ignore")

print("=" * 60)
print(" IDS MODEL TRAINING — 3 Models")
print("=" * 60)

# ── Load dataset ──────────────────────────────────────────────
print("\n[1/6] Loading NSL-KDD dataset...")
raw = load_dataset("Mireu-Lab/NSL-KDD")
df = raw["train"].to_pandas()
# DataFrame.sample raises when n exceeds the row count (without replacement),
# so cap the sample size at what is actually available.
df = df.sample(n=min(20000, len(df)), random_state=42).reset_index(drop=True)

# ── Detect and binarise target ────────────────────────────────
possible_targets = ["label", "class", "attack", "target", "Label", "Class"]
target_col = next((col for col in possible_targets if col in df.columns), None)

if target_col is None:
    # Fallback: the first text column that contains the value "normal".
    for col in df.columns:
        if str(df[col].dtype) in ["object", "string", "string[pyarrow]"]:
            values = df[col].astype(str).str.strip().str.lower()
            if (values == "normal").any():
                target_col = col
                break

if target_col is None:
    # Fail with an actionable message instead of a KeyError on df[None].
    raise ValueError(
        "Could not find a target column: none of "
        f"{possible_targets} is present and no text column contains 'normal'. "
        f"Available columns: {list(df.columns)}"
    )

print(f" Target column: {target_col}")

df["label"] = df[target_col].apply(
    lambda x: 0 if str(x).strip().lower() == "normal" else 1
)
if target_col != "label":
    df = df.drop(columns=[target_col])

# Drop difficulty-style columns so they are not used as features.
df = df.drop(
    columns=[c for c in ("difficulty", "Difficulty", "level") if c in df.columns]
)

# ── EDA / Cleaning ────────────────────────────────────────────
print("\n[2/6] Cleaning and preprocessing...")
df = df.drop_duplicates().reset_index(drop=True)

categorical_cols = df.select_dtypes(
    include=["object", "string", "category"]
).columns.tolist()
for col in categorical_cols:
    # Normalise text but keep missing values missing: a plain astype(str)
    # would turn NaN into the literal "nan" and defeat the mode imputation
    # below.
    normalised = df[col].astype(str).str.lower().str.strip()
    df[col] = normalised.where(df[col].notna())

for col in df.columns:
    if col == "label":
        continue
    if pd.api.types.is_numeric_dtype(df[col]):
        df[col] = df[col].fillna(df[col].median())
    else:
        mode_val = df[col].mode()
        df[col] = df[col].fillna("unknown" if mode_val.empty else mode_val.iloc[0])

X_raw = df.drop("label", axis=1)
y = df["label"]

categorical_cols = X_raw.select_dtypes(
    include=["object", "string", "category"]
).columns.tolist()
X_encoded = pd.get_dummies(X_raw, columns=categorical_cols, drop_first=True)
X_encoded = X_encoded.replace([np.inf, -np.inf], np.nan).fillna(0).astype(float)
print(f" Encoded shape: {X_encoded.shape}")

# ── Feature selection ─────────────────────────────────────────
print("\n[3/6] Selecting features (correlation + chi-square)...")

# Split BEFORE ranking features so the held-out rows never influence which
# features are chosen. The row partition depends only on y, random_state and
# stratify, so it is the same partition a later split would have produced.
X_train_all, X_test_all, y_train, y_test = train_test_split(
    X_encoded, y, test_size=0.2, random_state=42, stratify=y
)

# Absolute correlation of every feature with the label, on training rows.
# The label is removed by name (not by position), and NaN correlations
# (constant columns) are discarded.
corr = (
    pd.concat([X_train_all, y_train], axis=1)
    .corr()["label"]
    .drop("label")
    .abs()
    .dropna()
    .sort_values(ascending=False)
)
top_corr = corr.index[:25].tolist()
X_corr = X_train_all[top_corr]

# chi2 requires non-negative inputs. Shift a scoring-only copy so the values
# the models are trained on stay in their original scale.
chi2_input = X_corr.copy()
for col in chi2_input.columns:
    col_min = chi2_input[col].min()
    if col_min < 0:
        chi2_input[col] -= col_min

selector = SelectKBest(score_func=chi2, k=min(12, X_corr.shape[1]))
selector.fit(chi2_input, y_train)
selected_features = X_corr.columns[selector.get_support()].tolist()

print(f" Selected {len(selected_features)} features:")
for i, f in enumerate(selected_features, 1):
    print(f" {i:2}. {f}")

X_final = X_encoded[selected_features]

# ── Train/test split ──────────────────────────────────────────
X_train = X_train_all[selected_features]
X_test = X_test_all[selected_features]
print(f"\n Train: {X_train.shape[0]} rows | Test: {X_test.shape[0]} rows")

# ── Scale (for SVM and LR) ────────────────────────────────────
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)


# ── Helper ────────────────────────────────────────────────────
def evaluate(name, model, X_tr, X_te, scaled=False):
    """Fit *model*, print its test-set metrics and return them.

    Args:
        name: Heading printed above the metrics.
        model: Estimator with ``fit`` and ``predict``, plus either
            ``predict_proba`` or ``decision_function`` for the ROC-AUC score.
        X_tr: Training features, used when ``scaled`` is false.
        X_te: Test features, used when ``scaled`` is false.
        scaled: When true, ``X_tr``/``X_te`` are ignored and the module-level
            ``X_train_scaled``/``X_test_scaled`` arrays are used instead.

    Returns:
        A dict with keys ``accuracy``, ``precision``, ``recall``, ``f1`` and
        ``roc_auc``.

    Labels always come from the module-level ``y_train`` and ``y_test``.
    """
    fit_features = X_train_scaled if scaled else X_tr
    eval_features = X_test_scaled if scaled else X_te

    model.fit(fit_features, y_train)
    predicted = model.predict(eval_features)
    if hasattr(model, "predict_proba"):
        scores = model.predict_proba(eval_features)[:, 1]
    else:
        scores = model.decision_function(eval_features)

    metrics = {
        "accuracy": accuracy_score(y_test, predicted),
        "precision": precision_score(y_test, predicted, zero_division=0),
        "recall": recall_score(y_test, predicted, zero_division=0),
        "f1": f1_score(y_test, predicted, zero_division=0),
        "roc_auc": roc_auc_score(y_test, scores),
    }

    rule = "─" * 50
    print(f"\n{rule}")
    print(f" {name}")
    print(f"{rule}")
    print(f" Accuracy : {metrics['accuracy']:.4f}")
    print(f" Precision : {metrics['precision']:.4f}")
    print(f" Recall : {metrics['recall']:.4f}")
    print(f" F1 Score : {metrics['f1']:.4f}")
    print(f" ROC-AUC : {metrics['roc_auc']:.4f}")
    print(f"\n Confusion Matrix:\n{confusion_matrix(y_test, predicted)}")
    print(
        "\n Classification Report:\n"
        f"{classification_report(y_test, predicted, zero_division=0)}"
    )
    return metrics


# ── [4/6] Decision Tree ───────────────────────────────────────
print("\n[4/6] Training Decision Tree...")
dt_model = DecisionTreeClassifier(
    max_depth=10,
    min_samples_split=20,
    class_weight="balanced",
    random_state=42,
)
evaluate("Decision Tree", dt_model, X_train, X_test, scaled=False)

# ── [5/6] Logistic Regression ─────────────────────────────────
print("\n[5/6] Training Logistic Regression...")
lr_model = LogisticRegression(
    max_iter=1000,
    class_weight="balanced",
    random_state=42,
    solver="lbfgs",
)
evaluate("Logistic Regression", lr_model, X_train, X_test, scaled=True)

# ── [6/6] SVM ─────────────────────────────────────────────────
print("\n[6/6] Training SVM (RBF kernel, probability=True)...")
svm_model = SVC(
    kernel="rbf",
    C=1.0,
    gamma="scale",
    class_weight="balanced",
    probability=True,
    random_state=42,
)
evaluate("SVM (RBF)", svm_model, X_train, X_test, scaled=True)

# ── Save artifacts ────────────────────────────────────────────
import json

os.makedirs("models", exist_ok=True)

# (object, destination) pairs, written in this order.
_artifacts = [
    (dt_model, "models/decision_tree_model.pkl"),
    (lr_model, "models/logistic_regression_model.pkl"),
    (svm_model, "models/svm_model.pkl"),
    (scaler, "models/scaler.pkl"),
    (selected_features, "models/features.pkl"),
]
for _obj, _path in _artifacts:
    joblib.dump(_obj, _path)


def _summarise(fitted_model, features):
    """Return the rounded test-set metrics of an already fitted model."""
    predicted = fitted_model.predict(features)
    positive_proba = fitted_model.predict_proba(features)[:, 1]
    return {
        "accuracy": round(accuracy_score(y_test, predicted), 4),
        "precision": round(precision_score(y_test, predicted, zero_division=0), 4),
        "recall": round(recall_score(y_test, predicted, zero_division=0), 4),
        "f1": round(f1_score(y_test, predicted, zero_division=0), 4),
        "roc_auc": round(roc_auc_score(y_test, positive_proba), 4),
    }


# Save metrics summary for the app dashboard.
# The tree is scored on the unscaled test features, the other two on the
# scaled ones.
metrics_summary = {
    "Decision Tree": _summarise(dt_model, X_test),
    "Logistic Regression": _summarise(lr_model, X_test_scaled),
    "SVM": _summarise(svm_model, X_test_scaled),
}

_metrics_path = "models/metrics_summary.json"
with open(_metrics_path, "w") as _fh:
    json.dump(metrics_summary, _fh, indent=4)

_banner = "=" * 60
print("\n" + _banner)
print(" ALL MODELS SAVED SUCCESSFULLY")
print(_banner)
for _saved in [_path for _, _path in _artifacts] + [_metrics_path]:
    print(f" {_saved}")
print(_banner)