# IDS_PROJECT / train_models.py
# Syeda-RGB's picture
# Upload 3 files
# 0a09fe4 verified
import os
import joblib
import warnings
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix,
classification_report
)
# Suppress all Python warnings for the remainder of the run.
warnings.filterwarnings("ignore")

print("=" * 60)
# The separator was a mis-encoded em dash ("β€”"); emit the intended character.
print(" IDS MODEL TRAINING — 3 Models")
print("=" * 60)

# ── Load dataset ──────────────────────────────────────────────
print("\n[1/6] Loading NSL-KDD dataset...")
raw = load_dataset("Mireu-Lab/NSL-KDD")
df = raw["train"].to_pandas()

# Work on a reproducible random subset (presumably to bound training time).
# The request is capped at the number of available rows because
# DataFrame.sample raises ValueError when n exceeds the population size
# and sampling is done without replacement.
sample_size = min(20000, len(df))
df = df.sample(n=sample_size, random_state=42).reset_index(drop=True)
# ── Detect and binarise target ────────────────────────────────
# Prefer a column with a conventional target name; the first match in this
# list wins.
possible_targets = ["label", "class", "attack", "target", "Label", "Class"]
target_col = next((col for col in possible_targets if col in df.columns), None)

# Fallback: the first text-typed column that contains the value "normal"
# (compared case-insensitively).
if target_col is None:
    text_dtypes = ["object", "string", "string[pyarrow]"]
    for col in df.columns:
        if str(df[col].dtype) not in text_dtypes:
            continue
        if "normal" in df[col].astype(str).str.lower().unique():
            target_col = col
            break

# Fail with an explicit message instead of the opaque ``KeyError: None`` that
# indexing ``df[None]`` below would otherwise produce.
if target_col is None:
    raise ValueError(
        "Could not identify the target column; "
        f"available columns: {list(df.columns)}"
    )

print(f" Target column: {target_col}")

# Binary target: 0 when the value is "normal" (ignoring case and surrounding
# whitespace), 1 for every other value.
df["label"] = df[target_col].apply(
    lambda x: 0 if str(x).strip().lower() == "normal" else 1
)

# A single-class target (e.g. a numeric label column where nothing equals
# "normal") cannot be used for training or ROC-AUC; stop before any model is
# fitted rather than failing late during evaluation.
if df["label"].nunique() < 2:
    raise ValueError(
        f"Target column {target_col!r} produced a single class after "
        "binarisation; expected both 'normal' and non-'normal' values."
    )

if target_col != "label":
    df = df.drop(columns=[target_col])

# Remove difficulty/level columns when present so they are not used as features.
extra_cols = [col for col in ["difficulty", "Difficulty", "level"] if col in df.columns]
if extra_cols:
    df = df.drop(columns=extra_cols)
# ── EDA / Cleaning ────────────────────────────────────────────
print("\n[2/6] Cleaning and preprocessing...")
df = df.drop_duplicates().reset_index(drop=True)

text_dtypes = ["object", "string", "category"]

# Normalise text columns (lower-case, trimmed) while keeping missing values
# missing. A bare ``astype(str)`` converts NaN/NA into literal strings such as
# "nan", after which the mode imputation below could never fill them.
categorical_cols = df.select_dtypes(include=text_dtypes).columns.tolist()
for col in categorical_cols:
    missing = df[col].isna()
    normalised = df[col].astype(str).str.lower().str.strip()
    df[col] = normalised.mask(missing)

# Impute every feature column: median for numeric columns, most frequent
# value for the rest ("unknown" when a column has no non-missing value).
for col in df.columns:
    if col == "label":
        continue
    if pd.api.types.is_numeric_dtype(df[col]):
        df[col] = df[col].fillna(df[col].median())
    else:
        mode_val = df[col].mode()
        fill_value = mode_val.iloc[0] if len(mode_val) else "unknown"
        df[col] = df[col].fillna(fill_value)

X_raw = df.drop("label", axis=1)
y = df["label"]

# One-hot encode the text columns (first level dropped), then coerce the
# whole matrix to finite floats: +/-inf and any remaining NaN become 0.
categorical_cols = X_raw.select_dtypes(include=text_dtypes).columns.tolist()
X_encoded = pd.get_dummies(X_raw, columns=categorical_cols, drop_first=True)
X_encoded = X_encoded.replace([np.inf, -np.inf], np.nan).fillna(0).astype(float)
print(f" Encoded shape: {X_encoded.shape}")
# ── Feature selection ─────────────────────────────────────────
print("\n[3/6] Selecting features (correlation + chi-square)...")

# ── Train/test split ──────────────────────────────────────────
# Split BEFORE selecting features so the held-out rows never influence which
# features are chosen (selecting on the full data leaks test information into
# the reported metrics). The stratified row partition is determined by the
# row count, ``y`` and ``random_state``, not by which columns are present.
X_train_all, X_test_all, y_train, y_test = train_test_split(
    X_encoded, y, test_size=0.2, random_state=42, stratify=y
)

# Stage 1: keep up to 25 features with the largest absolute correlation with
# the label, measured on the training rows only. ``corrwith`` computes just
# the feature-vs-label correlations, so the label itself can never appear in
# the ranking; constant columns yield NaN and are discarded.
corr = X_train_all.corrwith(y_train).abs().dropna().sort_values(ascending=False)
top_corr = corr.index[:25].tolist()
X_corr = X_train_all[top_corr]

# Stage 2: chi-square ranking of the stage-1 survivors. chi2 requires
# non-negative input, so columns with a negative minimum are shifted to start
# at 0 -- but only in this scoring copy. The data handed to the models stays
# unshifted, so the saved scaler and models operate on raw feature values.
chi_input = X_corr - X_corr.min().clip(upper=0)
selector = SelectKBest(score_func=chi2, k=min(12, X_corr.shape[1]))
selector.fit(chi_input, y_train)
selected_features = X_corr.columns[selector.get_support()].tolist()

print(f" Selected {len(selected_features)} features:")
for i, feature_name in enumerate(selected_features, 1):
    print(f" {i:2}. {feature_name}")

# Restrict the full matrix and both splits to the selected columns.
X_final = X_encoded[selected_features]
X_train = X_train_all[selected_features]
X_test = X_test_all[selected_features]
print(f"\n Train: {X_train.shape[0]} rows | Test: {X_test.shape[0]} rows")
# ── Scale (for SVM and LR) ────────────────────────────────────
# Learn the standardisation parameters from the training split only, then
# apply that one fitted transform to both splits.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
# ── Helper ────────────────────────────────────────────────────
def evaluate(name, model, X_tr, X_te, scaled=False):
    """Fit ``model`` on the training split and print its test-set metrics.

    Args:
        name: Heading printed above the metrics.
        model: Estimator providing ``fit`` and ``predict`` plus either
            ``predict_proba`` or ``decision_function``. It is fitted in place.
        X_tr: Training features; used only when ``scaled`` is false.
        X_te: Test features; used only when ``scaled`` is false.
        scaled: When true, the module-level ``X_train_scaled`` and
            ``X_test_scaled`` are used instead of ``X_tr`` and ``X_te``.

    The targets are always the module-level ``y_train`` and ``y_test``.
    Nothing is returned; all results are printed.
    """
    if scaled:
        train_features, test_features = X_train_scaled, X_test_scaled
    else:
        train_features, test_features = X_tr, X_te

    model.fit(train_features, y_train)
    predictions = model.predict(test_features)

    # Scores for ROC-AUC: positive-class probability when the model offers
    # one, otherwise the raw decision-function value.
    if hasattr(model, "predict_proba"):
        scores = model.predict_proba(test_features)[:, 1]
    else:
        scores = model.decision_function(test_features)

    rule = "─" * 50
    print("\n" + rule)
    print(f" {name}")
    print(rule)

    # Each metric is computed right before its line is printed, in this order.
    metric_rows = (
        (" Accuracy : ", lambda: accuracy_score(y_test, predictions)),
        (" Precision : ", lambda: precision_score(y_test, predictions, zero_division=0)),
        (" Recall : ", lambda: recall_score(y_test, predictions, zero_division=0)),
        (" F1 Score : ", lambda: f1_score(y_test, predictions, zero_division=0)),
        (" ROC-AUC : ", lambda: roc_auc_score(y_test, scores)),
    )
    for row_label, compute in metric_rows:
        print(f"{row_label}{compute():.4f}")

    matrix = confusion_matrix(y_test, predictions)
    print(f"\n Confusion Matrix:\n{matrix}")
    report = classification_report(y_test, predictions, zero_division=0)
    print(f"\n Classification Report:\n{report}")
# ── [4/6]-[6/6] Decision Tree, Logistic Regression, SVM ───────
# Build the three estimators up front; all use balanced class weights and a
# fixed random seed.
dt_model = DecisionTreeClassifier(
    class_weight="balanced",
    max_depth=10,
    min_samples_split=20,
    random_state=42,
)
lr_model = LogisticRegression(
    class_weight="balanced",
    max_iter=1000,
    solver="lbfgs",
    random_state=42,
)
svm_model = SVC(
    C=1.0,
    kernel="rbf",
    gamma="scale",
    class_weight="balanced",
    probability=True,
    random_state=42,
)

# (progress message, report heading, estimator, train on scaled features?)
# The tree is trained on the unscaled features; LR and SVM on the scaled ones.
training_plan = (
    ("\n[4/6] Training Decision Tree...", "Decision Tree", dt_model, False),
    ("\n[5/6] Training Logistic Regression...", "Logistic Regression", lr_model, True),
    ("\n[6/6] Training SVM (RBF kernel, probability=True)...", "SVM (RBF)", svm_model, True),
)
for progress_message, heading, estimator, use_scaled in training_plan:
    print(progress_message)
    evaluate(heading, estimator, X_train, X_test, scaled=use_scaled)
# ── Save artifacts ────────────────────────────────────────────
# Persist the three fitted estimators, the fitted scaler and the list of
# selected feature names with joblib, in this order.
os.makedirs("models", exist_ok=True)
artifacts = (
    (dt_model, "models/decision_tree_model.pkl"),
    (lr_model, "models/logistic_regression_model.pkl"),
    (svm_model, "models/svm_model.pkl"),
    (scaler, "models/scaler.pkl"),
    (selected_features, "models/features.pkl"),
)
for artifact, destination in artifacts:
    joblib.dump(artifact, destination)
# Save metrics summary for the app dashboard
import json


def _test_set_metrics(estimator, features):
    """Return the rounded test-set metrics of a fitted ``estimator``.

    ``features`` must be the test matrix in the representation the estimator
    was trained on; labels come from the module-level ``y_test``.
    """
    predicted = estimator.predict(features)
    positive_proba = estimator.predict_proba(features)[:, 1]
    return {
        "accuracy": round(accuracy_score(y_test, predicted), 4),
        "precision": round(precision_score(y_test, predicted, zero_division=0), 4),
        "recall": round(recall_score(y_test, predicted, zero_division=0), 4),
        "f1": round(f1_score(y_test, predicted, zero_division=0), 4),
        "roc_auc": round(roc_auc_score(y_test, positive_proba), 4),
    }


# The tree is scored on unscaled test features; LR and SVM on the scaled ones.
metrics_summary = {
    "Decision Tree": _test_set_metrics(dt_model, X_test),
    "Logistic Regression": _test_set_metrics(lr_model, X_test_scaled),
    "SVM": _test_set_metrics(svm_model, X_test_scaled),
}

with open("models/metrics_summary.json", "w") as summary_file:
    json.dump(metrics_summary, summary_file, indent=4)
# Closing banner listing every file written under models/.
banner_rule = "=" * 60
closing_lines = (
    "\n" + banner_rule,
    " ALL MODELS SAVED SUCCESSFULLY",
    banner_rule,
    " models/decision_tree_model.pkl",
    " models/logistic_regression_model.pkl",
    " models/svm_model.pkl",
    " models/scaler.pkl",
    " models/features.pkl",
    " models/metrics_summary.json",
    banner_rule,
)
for closing_line in closing_lines:
    print(closing_line)