# NIDS / app.py
# JasHugF's picture
# Update app.py
# 5301eea verified
import gradio as gr
import tensorflow as tf
import numpy as np
import pandas as pd
# 1. Restore the trained network.
# The .keras file is passed straight to Keras' loader; the path is relative to
# the working directory the app is started from.
MODEL_PATH = "adversarially_trained_model.keras"
model = tf.keras.models.load_model(MODEL_PATH)

# Log the output shape at startup so a mismatch with CLASS_NAMES is easy to
# spot, e.g. (None, 5).
print(f"output_shape: {model.output_shape}")

# Human-readable label for each model output index, in output order.
CLASS_NAMES = [
    "normal",
    "dos",
    "probe",
    "r2l",
    "u2r",
]
# Raw numeric feature names, in the order they are placed in the feature matrix.
_NUMERIC_FEATURES = (
    "duration", "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell",
    "su_attempted", "num_root", "num_file_creations", "num_shells",
    "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login",
    "count", "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate",
    "srv_rerror_rate", "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
    "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
    "dst_host_srv_serror_rate", "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
)

# Raw categorical feature names that get one-hot encoded.
_CATEGORICAL_FEATURES = ("protocol_type", "service", "flag")


def _reference_mode(df_reference, column):
    """Return the most frequent value of *column*, or 'unknown' if there is none."""
    modes = df_reference[column].mode()
    # mode() is empty for an empty or all-NaN column, so indexing it blindly fails.
    return modes.iloc[0] if not modes.empty else "unknown"


def _target_feature_columns(scaler, df_reference, reference_is_raw):
    """Return the ordered column names the scaler should receive."""
    # A scikit-learn transformer fitted on a DataFrame records the exact
    # training columns; when present that is the most reliable source.
    fitted_names = getattr(scaler, "feature_names_in_", None)
    if fitted_names is not None:
        return list(fitted_names)
    if reference_is_raw:
        # Derive the encoded layout from the raw reference: numeric features
        # first, then the one-hot columns, mirroring the concat order used
        # below. NOTE(review): assumes training encoded the data the same way.
        categorical = list(_CATEGORICAL_FEATURES)
        encoded = pd.get_dummies(df_reference[categorical], columns=categorical)
        return list(_NUMERIC_FEATURES) + list(encoded.columns)
    # Otherwise the reference is taken to already hold the encoded columns.
    return list(df_reference.columns)


def preprocess_single_record(record: dict, scaler, df_reference: pd.DataFrame) -> np.ndarray:
    """Turn one raw traffic record into a scaled, one-hot encoded feature row.

    The previous implementation used ``df_reference`` both as the raw schema
    (to select input columns) and as the encoded schema (to align the final
    matrix), which cannot both hold: it either raised ``KeyError`` on the
    categorical columns or dropped every one-hot column. This version accepts
    either kind of reference:

    * raw reference (contains ``protocol_type``, ``service`` and ``flag``):
      missing categorical values default to the reference mode and the
      encoded column layout is derived from the reference categories;
    * encoded reference (anything else): its columns are used as the final
      layout and missing categorical values become ``"unknown"``, i.e. an
      all-zero one-hot group.

    Args:
        record: Mapping of raw feature name to value. Missing numeric features
            default to ``0.0``; keys that are not known features are ignored.
        scaler: Object with a ``transform`` method. If it exposes
            ``feature_names_in_`` those names define the output columns.
        df_reference: Raw or encoded training frame, as described above.

    Returns:
        Whatever ``scaler.transform`` returns for the single-row frame
        (presumably a ``(1, n_features)`` array -- depends on the scaler).
    """
    numeric = list(_NUMERIC_FEATURES)
    categorical = list(_CATEGORICAL_FEATURES)
    reference_is_raw = all(col in df_reference.columns for col in categorical)

    # Complete the record with defaults before building the frame.
    row = dict(record)
    for col in numeric:
        row.setdefault(col, 0.0)
    for col in categorical:
        if col not in row:
            row[col] = _reference_mode(df_reference, col) if reference_is_raw else "unknown"
    df_input = pd.DataFrame([row])

    df_numeric = df_input[numeric].astype(float)
    # columns= forces encoding whatever the dtype; dtype=float keeps the
    # matrix numeric (newer pandas would otherwise emit booleans).
    df_categorical = pd.get_dummies(df_input[categorical], columns=categorical, dtype=float)
    df_final = pd.concat([df_numeric, df_categorical], axis=1)

    # Align with the training layout: unseen categories are dropped and
    # categories absent from this record become 0.
    target_columns = _target_feature_columns(scaler, df_reference, reference_is_raw)
    df_final = df_final.reindex(columns=target_columns, fill_value=0.0)

    return scaler.transform(df_final)
# 2. Create the prediction function
def _expected_feature_count(keras_model):
    """Return the flat feature count the model expects, or None if unknown."""
    shape = getattr(keras_model, "input_shape", None)
    # Only a single rank-2 input, (batch, features), can be checked against a
    # flat vector; multi-input or higher-rank models are left to Keras.
    if isinstance(shape, (tuple, list)) and len(shape) == 2 and isinstance(shape[1], int):
        return shape[1]
    return None


def _scores_to_probabilities(scores):
    """Return *scores* as probabilities, applying softmax only when needed."""
    # Already a distribution (e.g. the model ends in a softmax layer):
    # a second softmax would flatten the confidences, so keep it as is.
    if (scores >= 0).all() and np.isclose(scores.sum(), 1.0, atol=1e-3):
        return scores
    # Shift by the max for numerical stability before exponentiating.
    exp_scores = np.exp(scores - np.max(scores))
    if np.isfinite(exp_scores).all() and exp_scores.sum() > 0:
        return exp_scores / exp_scores.sum()
    # Non-finite scores cannot be normalised; report them unchanged.
    return scores


def predict_from_array(input_text):
    """Classify one record given as a comma-separated string of numbers.

    Args:
        input_text: Feature values separated by commas, e.g. ``"1.5, 0.8"``.

    Returns:
        A ``{class_name: probability}`` dict on success. On invalid input a
        human-readable error string starting with ``"Input Error:"``,
        ``"Error:"`` or ``"Shape Error:"`` is returned instead.

    Raises:
        Exceptions raised by ``model.predict`` itself are not caught.
    """
    # 3. Parse the input string: split on commas, strip whitespace, to float.
    try:
        float_values = [float(x.strip()) for x in input_text.split(',')]
    except ValueError as e:
        return f"Input Error: Please enter numbers only. Details: {e}"
    except (AttributeError, TypeError) as e:
        # Non-string input such as None.
        return f"Error: {e}"

    # 4. Keras expects a batch, so present the vector as one row:
    # shape (1, num_features).
    input_array = np.array(float_values).reshape(1, -1)

    # Check the feature count up front; reshape(1, -1) never fails on a
    # non-empty vector, so without this a wrong-length input only surfaces as
    # an exception from inside model.predict.
    expected = _expected_feature_count(model)
    if expected is not None and input_array.shape[1] != expected:
        return f"Shape Error: Model expects {expected} features. Got {len(float_values)}."

    # 5. Make the prediction (verbose=0 avoids a progress bar per request)
    # and flatten the single result row to 1-D.
    predictions = model.predict(input_array, verbose=0)
    scores = np.asarray(predictions[0]).reshape(-1)

    # 6. Align class names to the model outputs: pad with generic names when
    # the model has more outputs than labels, truncate when it has fewer.
    num_outputs = len(scores)
    if len(CLASS_NAMES) < num_outputs:
        aligned_class_names = CLASS_NAMES + [
            f"class_{i}" for i in range(len(CLASS_NAMES), num_outputs)
        ]
    else:
        aligned_class_names = CLASS_NAMES[:num_outputs]

    prob_scores = _scores_to_probabilities(scores)

    # Option A (classification): return {label: probability} for gr.Label.
    # For regression (Option B) return f"Predicted Value: {float(scores[0]):.4f}";
    # for raw output (Option C) return str(scores). Both need a gr.Textbox output.
    return {name: float(p) for name, p in zip(aligned_class_names, prob_scores)}
# --- Gradio UI ---
# 7. Build the interface from named components.

# Input: a single free-text box holding the comma-separated feature vector.
feature_input = gr.Textbox(
    placeholder="Enter comma-separated float values, e.g., 1.5, 0.8, -2.3, 4.0",
    label="Input Features",
)

# Output: gr.Label renders the {class: confidence} dict returned by the
# classification path (Option A). For regression (Option B) or raw output
# (Option C) swap this for gr.Textbox(label="Prediction").
prediction_output = gr.Label(
    label="Attack Type Predictions",
    num_top_classes=3,
)

demo = gr.Interface(
    fn=predict_from_array,
    inputs=feature_input,
    outputs=prediction_output,
    title="Network Intrusion Detection System (NIDS)",
    description="Adversarially trained model for network attack classification. Input network traffic features as comma-separated values to detect attack types: normal, dos, probe, r2l, u2r, or other.",
)

# 8. Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()