import gradio as gr
import numpy as np
import pandas as pd
import tensorflow as tf
import joblib
import pickle
import os
# --- 1. CONFIGURATION AND FILE LOADING ---
# Paths to the trained artifacts (upload your improved model next to this script).
MODEL_PATH = 'improved_intrusion_detection_model.h5'
SCALER_PATH = 'standard_scaler.pkl'
FEATURE_NAMES_PATH = 'feature_names.pkl'

# The 41 raw KDD connection features the user supplies, in dataset order.
# NOTE: this list is defined manually from the KDD dataset structure; the
# 'feature_names.pkl' file contains the FINAL 119 (post-one-hot) column names.
RAW_41_FEATURES = [
    'duration', 'protocol_type', 'service', 'flag',
    'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot',
    'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell',
    'su_attempted', 'num_root', 'num_file_creations', 'num_shells',
    'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login',
    'count', 'srv_count', 'serror_rate', 'srv_serror_rate',
    'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate',
    'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count',
    'dst_host_same_srv_rate', 'dst_host_diff_srv_rate',
    'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate',
    'dst_host_serror_rate', 'dst_host_srv_serror_rate',
    'dst_host_rerror_rate', 'dst_host_srv_rerror_rate',
]

# The three string-valued columns that get one-hot encoded.
CATEGORICAL_COLS = ['protocol_type', 'service', 'flag']
# Everything else is numeric; order follows RAW_41_FEATURES.
_CATEGORICAL_SET = set(CATEGORICAL_COLS)
NUMERICAL_COLS = [name for name in RAW_41_FEATURES if name not in _CATEGORICAL_SET]
try:
    # Load the trained Keras CNN from disk.
    model = tf.keras.models.load_model(MODEL_PATH)
    # Load the StandardScaler fitted at training time (must be the same
    # object used to scale the 119-feature training matrix).
    scaler = joblib.load(SCALER_PATH)
    # feature_names.pkl holds the FINAL 119 column names, including the
    # one-hot-encoded columns, in training order. The .tolist() call implies
    # it was pickled as a pandas Index / numpy array -- TODO confirm.
    with open(FEATURE_NAMES_PATH, 'rb') as f:
        FINAL_119_COLUMNS = pickle.load(f).tolist()
    # --- Derived Configuration ---
    # Sanity check: the scaler must have been fitted on exactly the columns
    # listed in feature_names.pkl, otherwise transform() would misalign.
    if scaler.n_features_in_ != len(FINAL_119_COLUMNS):
        raise ValueError(f"Scaler expects {scaler.n_features_in_} features, but feature_names.pkl has {len(FINAL_119_COLUMNS)}. Check file consistency.")
except (FileNotFoundError, ValueError) as e:
    # NOTE(review): a missing .h5 may surface as OSError from h5py rather
    # than FileNotFoundError, in which case this message is skipped and the
    # error propagates raw -- verify against the deployed TF version.
    # Either way the app cannot run without its artifacts, so re-raise.
    print(f"FATAL ERROR: Failed to load required file or file inconsistent: {e}")
    print("Please ensure your improved model (.h5) and all .pkl files are in the same folder.")
    raise
# --- 2. PREDICTION FUNCTION ---
def predict_attack(*raw_input_values):
    """
    Convert the 41 raw user inputs into the 119-feature model vector and predict.

    Pipeline: one-row DataFrame -> coerce numerics -> one-hot encode the
    3 categorical columns -> reindex to the training-time 119-column order
    (missing OHE columns filled with 0) -> StandardScaler -> reshape to
    (1, 119, 1) for the 1-D CNN -> sigmoid threshold at 0.5.

    Returns:
        str: an HTML snippet describing the verdict (output feeds gr.HTML).
    """
    # Guard: Gradio should always pass exactly 41 values; fail loudly if not.
    if len(raw_input_values) != len(RAW_41_FEATURES):
        # BUG FIX: the original returned a multi-line single-quoted f-string,
        # which is a SyntaxError; return a single HTML string instead.
        return (
            f'<h3 style="color: orange;">Input Error: Expected '
            f'{len(RAW_41_FEATURES)} features, received '
            f'{len(raw_input_values)}.</h3>'
        )
    # 1. Create a raw one-row DataFrame from the user input.
    raw_df = pd.DataFrame([raw_input_values], columns=RAW_41_FEATURES)
    # Coerce numeric columns; unparseable entries become 0.0 rather than NaN.
    for col in NUMERICAL_COLS:
        raw_df[col] = pd.to_numeric(raw_df[col], errors='coerce').fillna(0.0)
    # 2. One-hot encode the categorical columns.
    df_encoded = pd.get_dummies(raw_df, columns=CATEGORICAL_COLS, dtype=float)
    # 3. Align and reorder to the exact 119 training columns; categories the
    # user didn't trigger get a 0 column so the layout always matches.
    X_processed = df_encoded.reindex(columns=FINAL_119_COLUMNS, fill_value=0)
    X_array = X_processed.values.astype(np.float32)
    # 4. Standard scaling on the entire 119-feature vector.
    X_scaled = scaler.transform(X_array)
    # 5. Reshape for the CNN: (1, n_features, 1).
    X_cnn = X_scaled.reshape((1, X_scaled.shape[1], 1))
    # 6. Predict; binary classification with a 0.5 threshold on the sigmoid.
    prediction = model.predict(X_cnn, verbose=0)
    probability = float(prediction[0][0])
    if probability > 0.5:
        result = f"🚨 ATTACK DETECTED! (Probability: {probability*100:.2f}%)"
        color = "red"
    else:
        result = f"✅ Normal Traffic (Probability: {(1 - probability)*100:.2f}%)"
        color = "green"
    # BUG FIX: the original return was another broken multi-line f-string and
    # `color` was computed but never used; emit a colored HTML verdict.
    return f'<h3 style="color: {color};">{result}</h3>'
# --- 3. GRADIO INTERFACE SETUP ---
# Placeholder widgets for the categorical inputs: the unique categorical
# values were not shipped with the app, so dropdowns cover the common cases
# and 'service' (70+ distinct values) falls back to free text. For a robust
# deployed app, load the real category lists instead.
_DROPDOWNS = {
    'protocol_type': (['tcp', 'udp', 'icmp'], 'tcp'),
    'flag': (['SF', 'S0', 'REJ', 'RSTR', 'OTH'], 'SF'),
}

input_components = []
for feature in RAW_41_FEATURES:
    if feature in NUMERICAL_COLS:
        widget = gr.Number(label=feature, value=0.0)
    elif feature in _DROPDOWNS:
        choices, default = _DROPDOWNS[feature]
        widget = gr.Dropdown(label=feature, choices=choices, value=default)
    elif feature == 'service':
        widget = gr.Textbox(label=feature, value='http')
    else:
        # Unreachable with the current column lists; kept as a safe fallback.
        widget = gr.Textbox(label=feature, value='0')
    input_components.append(widget)

# Example Neptune DoS attack vector (41 values in RAW_41_FEATURES order).
example_attack_data = [
    0.0, 'tcp', 'private', 'S0', 0.0, 0.0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    255, 10, 1.0, 1.0, 0.0, 0.0, 0.04, 0.06, 0.0,
    255, 10, 0.04, 0.06, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0,
]
# Gradio Interface: 41 raw-feature inputs -> one HTML verdict panel.
_interface_config = dict(
    fn=predict_attack,
    inputs=input_components,
    outputs=gr.HTML(label="Prediction Result"),
    title="KDD Intrusion Detection System (CNN)",
    description="Enter the 41 raw features of a network connection. The model predicts if the traffic is 'normal' or an 'attack'.",
    examples=[example_attack_data],
)
iface = gr.Interface(**_interface_config)

# Launch only when executed directly (not on import).
if __name__ == "__main__":
    iface.launch(share=False)