# Source: Hugging Face Space by Afeefa123 — "Update app.py" (commit aec9367, verified)
"""
Gradio app for NSL-KDD binary intrusion detection demo (MVP)
Expecting these files in the same repo/root of the Space:
- nsl_kdd_tf_model.h5 (optional; if present will be used)
- scaler.pkl (optional; sklearn StandardScaler, must match model training)
- columns.json (optional; list of feature column names used by the model)
If artifacts are missing, the app will instruct you how to add them and offers a quick fallback
where you can upload a CSV and the app will train a lightweight sklearn model for demo purposes.
"""
import io
import json
import os
import tempfile
import traceback
from typing import List, Tuple

import joblib
import numpy as np
import pandas as pd
import gradio as gr
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

# optional heavy import guarded
TF_AVAILABLE = True
try:
    import tensorflow as tf
except Exception:
    TF_AVAILABLE = False
# artifact filenames
MODEL_FILE = "nsl_kdd_tf_model.h5"
SCALER_FILE = "scaler.pkl"
COLUMNS_FILE = "columns.json"
# helper: load artifacts if exist
def load_artifacts():
    """Load pretrained artifacts from the Space root, if present.

    Each artifact is optional; loading is best-effort so a missing or
    corrupt file degrades to ``None`` instead of crashing the app.

    Returns:
        tuple: ``(model, scaler, columns, model_type)`` where any element
        is ``None`` when its file is missing or failed to load, and
        ``model_type`` is ``"tensorflow"`` only when the Keras model loaded.
    """
    model = None
    scaler = None
    columns = None
    model_type = None

    # Feature-column names (JSON array) used to align incoming data.
    if os.path.exists(COLUMNS_FILE):
        with open(COLUMNS_FILE, "r", encoding="utf-8") as f:
            columns = json.load(f)

    # Scaler load is best-effort: a broken pickle must not take the UI down.
    if os.path.exists(SCALER_FILE):
        try:
            scaler = joblib.load(SCALER_FILE)
        except Exception:
            try:
                # Retry via an explicit file handle; the original opened the
                # file without closing it (leaked handle) — use a context
                # manager so it is closed either way.
                with open(SCALER_FILE, "rb") as fh:
                    scaler = joblib.load(fh)
            except Exception:
                scaler = None

    # Keras model only when the file exists AND tensorflow imported.
    if os.path.exists(MODEL_FILE) and TF_AVAILABLE:
        try:
            model = tf.keras.models.load_model(MODEL_FILE)
            model_type = "tensorflow"
        except Exception:
            model = None

    return model, scaler, columns, model_type
# Load artifacts once at import time; these globals are read by every handler below.
MODEL, SCALER, COLUMNS, MODEL_TYPE = load_artifacts()
def model_available_message() -> str:
    """Build the status-box text describing which artifacts are loaded.

    Returns a ready message when model, scaler and column list are all
    present; otherwise lists the missing files plus setup instructions.
    """
    if all(obj is not None for obj in (MODEL, SCALER, COLUMNS)):
        return "✅ Pretrained TensorFlow model and artifacts loaded. Ready to predict."
    # Report each missing artifact by its expected filename, in load order.
    missing = [
        f"Missing `{fname}`"
        for obj, fname in (
            (MODEL, MODEL_FILE),
            (SCALER, SCALER_FILE),
            (COLUMNS, COLUMNS_FILE),
        )
        if obj is None
    ]
    return (
        "⚠️ Artifacts missing: " + ", ".join(missing) + ".\n\n"
        + "To run the TF model, add those files to the Space repository (same folder as app.py).\n"
        + "Alternatively, upload a CSV of NSL-KDD records (the app will train a quick sklearn model for demo).\n\n"
        + "columns.json should be a JSON array of feature names that match the model input (same as X_train.columns).\n"
    )
# utility: preprocess input dataframe into model-ready X using columns & scaler
def prepare_X_from_df(df: pd.DataFrame, expected_columns: List[str], scaler_obj) -> np.ndarray:
    """Align *df* to *expected_columns* and return a model-ready matrix.

    Columns absent from *df* are added and filled with 0; values that fail
    numeric coercion become 0.0. When *scaler_obj* is provided, its
    ``transform`` output is returned; otherwise the raw values are returned
    as float32.
    """
    # Reindex guarantees the model sees exactly the trained column order.
    aligned = df.reindex(columns=expected_columns, fill_value=0)
    aligned = aligned.apply(pd.to_numeric, errors="coerce").fillna(0.0)
    if scaler_obj is None:
        # No scaler: hand back the plain numeric matrix.
        return aligned.values.astype(np.float32)
    return scaler_obj.transform(aligned)
def predict_batch_from_df(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
    """Predict a binary intrusion label for every row of *df*.

    Uses the pretrained TensorFlow artifacts when all three are loaded.
    Otherwise falls back to training a throwaway LogisticRegression on the
    uploaded data itself, which requires a 'label' or 'label_bin' column.

    Returns:
        tuple: ``(result_df, status_message)``; ``result_df`` carries
        ``_pred_prob`` and ``_pred_class`` columns, or is empty on failure.
    """
    try:
        if MODEL is not None and SCALER is not None and COLUMNS is not None and MODEL_TYPE == "tensorflow":
            Xs = prepare_X_from_df(df, COLUMNS, SCALER)
            probs = MODEL.predict(Xs).ravel()
            preds = (probs >= 0.5).astype(int)
            out = df.copy()
            out["_pred_prob"] = probs
            out["_pred_class"] = preds
            return out, "Predictions from TensorFlow model"

        # Fallback: only possible when the upload carries ground-truth labels.
        if 'label' not in df.columns and 'label_bin' not in df.columns:
            return pd.DataFrame(), "Cannot fallback: artifacts missing and uploaded CSV does not contain 'label' or 'label_bin' to train a temporary model."

        # Mimic the notebook preprocessing: numeric passthrough + one-hot cats.
        cats = ['protocol_type', 'service', 'flag']
        col_names = df.columns.tolist()
        num_cols = [c for c in col_names if c not in cats + ['label', 'label_bin']]
        X_num = df[num_cols].apply(pd.to_numeric, errors='coerce').fillna(0.0)
        # BUG FIX: only encode categorical columns actually present — the
        # original indexed df[cats] unconditionally and raised KeyError on
        # CSVs missing any of protocol_type/service/flag.
        present_cats = [c for c in cats if c in df.columns]
        if present_cats:
            X_cat = pd.get_dummies(df[present_cats], drop_first=True)
            X = pd.concat([X_num, X_cat], axis=1)
        else:
            X = X_num
        # Prefer an explicit binary label; else derive one: "normal" -> 0, anything else -> 1.
        y = (df['label_bin'] if 'label_bin' in df.columns
             else df['label'].apply(lambda s: 0 if str(s).strip().lower() == "normal" else 1))
        # Minimal scaler + logistic regression, fit and scored on the same
        # data (demo only — not a validated model).
        scaler_local = StandardScaler()
        Xs = scaler_local.fit_transform(X)
        clf = LogisticRegression(max_iter=200)
        clf.fit(Xs, y)
        probs = clf.predict_proba(Xs)[:, 1]
        preds = (probs >= 0.5).astype(int)
        out = df.copy()
        out["_pred_prob"] = probs
        out["_pred_class"] = preds
        return out, "Trained temporary LogisticRegression on uploaded CSV (used 'label' or 'label_bin' for training)."
    except Exception as e:
        tb = traceback.format_exc()
        return pd.DataFrame(), f"Prediction error: {e}\n\n{tb}"
def predict_single(sample_text: str) -> str:
    """Predict one sample pasted as a JSON object or a headerless CSV row.

    The JSON form is preferred (keys must match columns.json). A CSV row is
    only accepted when its value count matches the loaded column list.

    Returns a human-readable result string; never raises (errors are
    reported as text).
    """
    try:
        if not sample_text:
            return "No input provided."
        # try JSON first
        try:
            d = json.loads(sample_text)
            if isinstance(d, dict):
                df = pd.DataFrame([d])
            else:
                return "JSON must represent an object/dict for single sample."
        except Exception:
            # Not JSON — try to parse as a single CSV row.
            try:
                # BUG FIX: the original called pd.compat.StringIO, which was
                # removed from pandas long ago and always raised here; use
                # the stdlib io.StringIO instead.
                df = pd.read_csv(io.StringIO(sample_text), header=None)
                # No header: values can only be mapped positionally when the
                # count matches the known column list.
                if COLUMNS is not None and df.shape[1] == len(COLUMNS):
                    df.columns = COLUMNS
                else:
                    return "CSV input detected but header/column count mismatch. Prefer JSON object keyed by column names."
            except Exception:
                return "Could not parse input. Paste a JSON object like {\"duration\":0, \"protocol_type\":\"tcp\", ...} or upload a CSV row with header."
        # Now we have df; run batch predict logic but for a single row
        if MODEL is not None and SCALER is not None and COLUMNS is not None and MODEL_TYPE == "tensorflow":
            Xs = prepare_X_from_df(df, COLUMNS, SCALER)
            prob = float(MODEL.predict(Xs)[0, 0])
            pred = int(prob >= 0.5)
            return f"Pred prob: {prob:.4f} — predicted class: {pred} (0=normal, 1=attack)"
        return "Model artifacts not present in Space. Upload `nsl_kdd_tf_model.h5`, `scaler.pkl`, and `columns.json` to use the TensorFlow model. Alternatively upload a labelled CSV to train a quick demo model."
    except Exception as e:
        tb = traceback.format_exc()
        return f"Error: {e}\n\n{tb}"
# Gradio UI components
# Builds the Blocks layout at import time and wires the two callbacks;
# `demo` is launched by the __main__ guard at the bottom of the file.
with gr.Blocks(title="NSL-KDD Intrusion Detection — Demo MVP") as demo:
    gr.Markdown("# NSL-KDD Intrusion Detection — Demo (MVP)\n"
                "Upload your artifacts (`nsl_kdd_tf_model.h5`, `scaler.pkl`, `columns.json`) to the Space to use the TensorFlow model.\n"
                "Or upload a labelled CSV (contains `label` or `label_bin`) and the app will train a quick logistic regression for demo.\n\n"
                "Columns expected: the original notebook used 41 numeric features with one-hot for `protocol_type`, `service`, `flag`.\n"
                )
    # Read-only status line computed once at startup; also updated by batch runs.
    status = gr.Textbox(label="Status / Artifact check", value=model_available_message(), interactive=False)
    with gr.Row():
        with gr.Column(scale=2):
            file_input = gr.File(label="Upload CSV for batch prediction or for training fallback", file_types=['.csv'])
            sample_input = gr.Textbox(label="Single-sample input (JSON object)", placeholder='{"duration":0, "protocol_type":"tcp", ...}', lines=6)
            predict_button = gr.Button("Predict single sample")
            batch_button = gr.Button("Run batch (on uploaded CSV)")
        with gr.Column(scale=1):
            out_table = gr.Dataframe(label="Batch predictions (if any)")
            single_out = gr.Textbox(label="Single sample result", interactive=False)
    # Example / help
    example_text = json.dumps({
        "duration": 0,
        "protocol_type": "tcp",
        "service": "http",
        "flag": "SF",
        "src_bytes": 181,
        "dst_bytes": 5450
    }, indent=2)
    gr.Markdown("**Example single-sample JSON (fill in more NSL-KDD fields if you have them):**")
    gr.Code(example_text, language="json")
    # Callbacks
    def on_predict_single(sample_text):
        # Thin wrapper so the Gradio event maps 1:1 onto predict_single().
        return predict_single(sample_text)
    def on_batch_predict(file_obj):
        # Read the uploaded CSV, run batch prediction, and return
        # (dataframe_for_display, status_message).
        if file_obj is None:
            return pd.DataFrame(), "No file uploaded."
        try:
            # read uploaded CSV into DataFrame
            # NOTE(review): assumes the Gradio File object exposes a .name
            # temp-file path — true for classic gr.File uploads; verify
            # against the installed gradio version.
            df = pd.read_csv(file_obj.name)
        except Exception:
            try:
                # fallback: try bytes
                df = pd.read_csv(file_obj)
            except Exception as e:
                return pd.DataFrame(), f"Could not read CSV: {e}"
        out_df, msg = predict_batch_from_df(df)
        if out_df.empty:
            return pd.DataFrame(), msg
        # Limit columns shown for readability
        display_df = out_df.copy()
        # move prediction columns to front if present
        for c in ["_pred_prob", "_pred_class"]:
            if c in display_df.columns:
                cols = [c] + [x for x in display_df.columns if x != c]
                display_df = display_df[cols]
        return display_df, msg
    predict_button.click(on_predict_single, inputs=[sample_input], outputs=[single_out])
    batch_button.click(on_batch_predict, inputs=[file_input], outputs=[out_table, status])
if __name__ == "__main__":
    # Bind to all interfaces and honour the PORT env var (container/Spaces convention).
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))