Spaces:
Runtime error
Runtime error
| import os | |
| import joblib | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
| from datasets import load_dataset | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.svm import SVC | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.feature_selection import SelectKBest, chi2 | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import ( | |
| accuracy_score, precision_score, recall_score, | |
| f1_score, roc_auc_score, confusion_matrix, | |
| classification_report | |
| ) | |
# Silence library warnings so the progress output below stays readable.
warnings.filterwarnings("ignore")

# NOTE(review): the "β" in the banner looks like a mis-encoded dash -- confirm
# against the original file before touching this runtime string.
print("=" * 60)
print(" IDS MODEL TRAINING β 3 Models")
print("=" * 60)

# -- Load dataset ----------------------------------------------
# Load the "train" split via `datasets.load_dataset` and continue with a
# reproducible (seeded) random subsample of at most 20000 rows.
print("\n[1/6] Loading NSL-KDD dataset...")
raw = load_dataset("Mireu-Lab/NSL-KDD")
df = raw["train"].to_pandas()
# Cap the sample size at the number of available rows: DataFrame.sample
# raises ValueError when n exceeds the population and replace is False.
df = df.sample(n=min(20000, len(df)), random_state=42).reset_index(drop=True)
# -- Detect and binarise target --------------------------------
# Prefer a well-known target column name; otherwise fall back to the first
# text-typed column that contains the value "normal" (case-insensitive).
possible_targets = ["label", "class", "attack", "target", "Label", "Class"]
target_col = next((col for col in possible_targets if col in df.columns), None)

if target_col is None:
    text_dtypes = ["object", "string", "string[pyarrow]"]
    for col in df.columns:
        if str(df[col].dtype) not in text_dtypes:
            continue
        if "normal" in df[col].astype(str).str.lower().unique():
            target_col = col
            break

if target_col is None:
    # Fail with an actionable message instead of the opaque `KeyError: None`
    # that indexing the frame with None would raise below.
    raise ValueError(
        f"Could not find a target column: none of {possible_targets} is "
        "present and no text column contains the value 'normal'. "
        f"Available columns: {list(df.columns)}"
    )

print(f" Target column: {target_col}")

# Binary target: 0 when the value is "normal" (after trimming and
# lower-casing), 1 for anything else.
df["label"] = df[target_col].apply(
    lambda x: 0 if str(x).strip().lower() == "normal" else 1
)
if target_col != "label":
    df = df.drop(columns=[target_col])

# Remove these auxiliary columns when present so they are not used as
# features (presumably per-record difficulty scores -- TODO confirm).
aux_cols = [col for col in ("difficulty", "Difficulty", "level") if col in df.columns]
if aux_cols:
    df = df.drop(columns=aux_cols)
# -- EDA / Cleaning --------------------------------------------
print("\n[2/6] Cleaning and preprocessing...")
df = df.drop_duplicates().reset_index(drop=True)

# Normalise text columns (lower-cased, trimmed) while keeping missing values
# missing. Stringifying the whole column would turn NaN into the literal
# text "nan", and the mode imputation below would then never apply to it.
categorical_cols = df.select_dtypes(
    include=["object", "string", "category"]).columns.tolist()
for col in categorical_cols:
    normalised = df[col].astype(str).str.lower().str.strip()
    df[col] = normalised.where(df[col].notna())

# Impute every feature column: median for numeric columns, most frequent
# value for the rest ("unknown" when a column has no observed value at all).
for col in df.columns:
    if col == "label":
        continue
    if pd.api.types.is_numeric_dtype(df[col]):
        df[col] = df[col].fillna(df[col].median())
    else:
        mode_val = df[col].mode()
        df[col] = df[col].fillna(mode_val.iloc[0] if not mode_val.empty else "unknown")

# Separate features from the target, then one-hot encode the text columns
# (dropping the first level of each) into an all-float matrix.
X_raw = df.drop(columns="label")
y = df["label"]
categorical_cols = X_raw.select_dtypes(
    include=["object", "string", "category"]).columns.tolist()
X_encoded = pd.get_dummies(X_raw, columns=categorical_cols, drop_first=True)
# Infinite values are treated as missing and replaced with 0.
X_encoded = X_encoded.replace([np.inf, -np.inf], np.nan).fillna(0).astype(float)
print(f" Encoded shape: {X_encoded.shape}")
# -- Feature selection -----------------------------------------
# Two-stage filter: keep up to 25 features with the highest absolute
# correlation with the label, then keep up to 12 of those with the best
# chi-square score.
# NOTE(review): both stages are fitted on the full data set, before the
# train/test split further down, so held-out rows influence which features
# are chosen -- confirm this is acceptable for the reported metrics.
print("\n[3/6] Selecting features (correlation + chi-square)...")
corr = (
    pd.concat([X_encoded, y], axis=1)
    .corr()["label"]
    .abs()
    .sort_values(ascending=False)
)
# Remove the label by name instead of assuming it sorts first (a feature with
# |corr| == 1 can tie with it), and skip columns whose correlation is NaN
# (e.g. constant columns).
top_corr = corr.drop("label").dropna().index[:25].tolist()
if not top_corr:
    raise ValueError("No feature has a defined correlation with the label.")
X_corr = X_encoded[top_corr].copy()

# chi2 expects non-negative feature values, so shift any column that contains
# negatives up so its minimum is 0. X_final is taken from X_corr below, so
# the shifted values are also what the models are trained on.
for col in X_corr.columns:
    col_min = X_corr[col].min()
    if col_min < 0:
        X_corr[col] -= col_min

selector = SelectKBest(score_func=chi2, k=min(12, X_corr.shape[1]))
selector.fit(X_corr, y)
selected_features = X_corr.columns[selector.get_support()].tolist()
print(f" Selected {len(selected_features)} features:")
for rank, feature in enumerate(selected_features, 1):
    print(f" {rank:2}. {feature}")
X_final = X_corr[selected_features]
# -- Train/test split ------------------------------------------
# Hold out 20% of the rows, stratified on the label, with a fixed seed.
split_parts = train_test_split(
    X_final, y, test_size=0.2, random_state=42, stratify=y
)
X_train, X_test, y_train, y_test = split_parts
print(f"\n Train: {X_train.shape[0]} rows | Test: {X_test.shape[0]} rows")

# -- Scale (for SVM and LR) ------------------------------------
# The scaler is fitted on the training rows only; the same fitted scaler is
# then applied to both the training and the test rows.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
# -- Helper ----------------------------------------------------
def evaluate(name, model, X_tr, X_te, scaled=False):
    """Fit ``model`` on the training data and print its test-set metrics.

    Args:
        name: Heading printed above the metrics.
        model: Estimator exposing ``fit`` and ``predict`` plus either
            ``predict_proba`` or ``decision_function``.
        X_tr: Training features, used when ``scaled`` is false.
        X_te: Test features, used when ``scaled`` is false.
        scaled: When true, ``X_tr``/``X_te`` are ignored and the module-level
            ``X_train_scaled``/``X_test_scaled`` are used instead.

    The targets are always the module-level ``y_train`` and ``y_test``.
    ``model`` is fitted in place and nothing is returned.
    """
    if scaled:
        train_features, test_features = X_train_scaled, X_test_scaled
    else:
        train_features, test_features = X_tr, X_te

    model.fit(train_features, y_train)
    predicted = model.predict(test_features)

    # Score used for ROC-AUC: second column of predict_proba when the model
    # offers it, otherwise the raw decision-function value.
    if hasattr(model, "predict_proba"):
        positive_score = model.predict_proba(test_features)[:, 1]
    else:
        positive_score = model.decision_function(test_features)

    print(f"\n{'β'*50}")
    print(f" {name}")
    print(f"{'β'*50}")
    print(f" Accuracy : {accuracy_score(y_test, predicted):.4f}")
    print(f" Precision : {precision_score(y_test, predicted, zero_division=0):.4f}")
    print(f" Recall : {recall_score(y_test, predicted, zero_division=0):.4f}")
    print(f" F1 Score : {f1_score(y_test, predicted, zero_division=0):.4f}")
    print(f" ROC-AUC : {roc_auc_score(y_test, positive_score):.4f}")
    print(f"\n Confusion Matrix:\n{confusion_matrix(y_test, predicted)}")
    print(f"\n Classification Report:\n{classification_report(y_test, predicted, zero_division=0)}")
# -- [4/6] Decision Tree / [5/6] Logistic Regression / [6/6] SVM ----
# Build the three (still unfitted) estimators first; each one is fitted and
# reported by evaluate() in the loop below, in the same order as before.
dt_model = DecisionTreeClassifier(
    random_state=42,
    class_weight="balanced",
    min_samples_split=20,
    max_depth=10,
)
lr_model = LogisticRegression(
    solver="lbfgs",
    random_state=42,
    class_weight="balanced",
    max_iter=1000,
)
svm_model = SVC(
    random_state=42,
    probability=True,
    class_weight="balanced",
    gamma="scale",
    C=1.0,
    kernel="rbf",
)

# (progress line, report heading, estimator, use scaled features?)
training_plan = (
    ("\n[4/6] Training Decision Tree...", "Decision Tree", dt_model, False),
    ("\n[5/6] Training Logistic Regression...", "Logistic Regression", lr_model, True),
    ("\n[6/6] Training SVM (RBF kernel, probability=True)...", "SVM (RBF)", svm_model, True),
)
for progress_line, heading, estimator, use_scaled in training_plan:
    print(progress_line)
    evaluate(heading, estimator, X_train, X_test, scaled=use_scaled)
# -- Save artifacts --------------------------------------------
import json

os.makedirs("models", exist_ok=True)

# Target path -> object to persist with joblib (written in this order).
artifacts = {
    "models/decision_tree_model.pkl": dt_model,
    "models/logistic_regression_model.pkl": lr_model,
    "models/svm_model.pkl": svm_model,
    "models/scaler.pkl": scaler,
    "models/features.pkl": selected_features,
}
for artifact_path, artifact in artifacts.items():
    joblib.dump(artifact, artifact_path)

# Save metrics summary for the app dashboard
# Each fitted model is scored on the test features it was trained against
# (raw for the tree, scaled for the other two); values are rounded to 4 places.
scoring_plan = (
    ("Decision Tree", dt_model, X_test),
    ("Logistic Regression", lr_model, X_test_scaled),
    ("SVM", svm_model, X_test_scaled),
)
metrics_summary = {}
for model_name, fitted_model, test_features in scoring_plan:
    predicted = fitted_model.predict(test_features)
    positive_proba = fitted_model.predict_proba(test_features)[:, 1]
    metrics_summary[model_name] = {
        "accuracy": round(accuracy_score(y_test, predicted), 4),
        "precision": round(precision_score(y_test, predicted, zero_division=0), 4),
        "recall": round(recall_score(y_test, predicted, zero_division=0), 4),
        "f1": round(f1_score(y_test, predicted, zero_division=0), 4),
        "roc_auc": round(roc_auc_score(y_test, positive_proba), 4),
    }

with open("models/metrics_summary.json", "w") as summary_file:
    json.dump(metrics_summary, summary_file, indent=4)

# Final report listing every file written above.
print("\n" + "=" * 60)
print(" ALL MODELS SAVED SUCCESSFULLY")
print("=" * 60)
for saved_line in (
    " models/decision_tree_model.pkl",
    " models/logistic_regression_model.pkl",
    " models/svm_model.pkl",
    " models/scaler.pkl",
    " models/features.pkl",
    " models/metrics_summary.json",
):
    print(saved_line)
print("=" * 60)