|
|
import gradio as gr |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import tensorflow as tf |
|
|
import joblib |
|
|
import json |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
model = tf.keras.models.load_model('improved_intrusion_detection_model.h5') |
|
|
|
|
|
|
|
|
scaler = joblib.load('kdd_scaler_StandardScaler.joblib') |
|
|
|
|
|
with open('kdd_41_original_feature_names.json', 'r') as f: |
|
|
FEATURE_NAMES = json.load(f) |
|
|
|
|
|
with open('kdd_categorical_unique_values.json', 'r') as f: |
|
|
CATEGORICAL_MAPPING = json.load(f) |
|
|
|
|
|
except FileNotFoundError as e: |
|
|
print(f"Error loading required file: {e}. Ensure all files are in the same directory.") |
|
|
raise |
|
|
|
|
|
|
|
|
CATEGORICAL_COLS = list(CATEGORICAL_MAPPING.keys()) |
|
|
NUMERICAL_COLS = [col for col in FEATURE_NAMES if col not in CATEGORICAL_COLS] |
|
|
|
|
|
|
|
|
|
|
|
FINAL_COLUMNS = NUMERICAL_COLS |
|
|
for col in CATEGORICAL_COLS: |
|
|
for value in CATEGORICAL_MAPPING[col]: |
|
|
FINAL_COLUMNS.append(f'{col}_{value}') |
|
|
|
|
|
|
|
|
|
|
|
def predict_attack(*raw_input_values): |
|
|
""" |
|
|
Processes the 41 raw user inputs, prepares them for the model, and returns a prediction. |
|
|
""" |
|
|
if len(raw_input_values) != len(FEATURE_NAMES): |
|
|
return "Input Error: Expected 41 features, received {len(raw_input_values)}." |
|
|
|
|
|
|
|
|
raw_df = pd.DataFrame([raw_input_values], columns=FEATURE_NAMES) |
|
|
|
|
|
|
|
|
for col in NUMERICAL_COLS: |
|
|
|
|
|
try: |
|
|
raw_df[col] = pd.to_numeric(raw_df[col]) |
|
|
except ValueError: |
|
|
return f"Input Error: Non-numeric value detected in column: {col}" |
|
|
|
|
|
|
|
|
df_encoded = raw_df.copy() |
|
|
for col, unique_values in CATEGORICAL_MAPPING.items(): |
|
|
|
|
|
ohe_temp = pd.DataFrame(0, index=df_encoded.index, columns=[f'{col}_{val}' for val in unique_values]) |
|
|
|
|
|
|
|
|
user_value = df_encoded[col].iloc[0] |
|
|
ohe_col_name = f'{col}_{user_value}' |
|
|
if ohe_col_name in ohe_temp.columns: |
|
|
ohe_temp[ohe_col_name] = 1 |
|
|
|
|
|
|
|
|
df_encoded = df_encoded.drop(columns=[col]) |
|
|
df_encoded = pd.concat([df_encoded, ohe_temp], axis=1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
X_processed = df_encoded.reindex(columns=FINAL_COLUMNS, fill_value=0) |
|
|
|
|
|
|
|
|
X_array = X_processed.values.astype(np.float32) |
|
|
|
|
|
|
|
|
X_scaled = scaler.transform(X_array) |
|
|
|
|
|
|
|
|
X_cnn = X_scaled.reshape((1, X_scaled.shape[1], 1)) |
|
|
|
|
|
|
|
|
prediction = model.predict(X_cnn, verbose=0) |
|
|
|
|
|
|
|
|
|
|
|
probability = prediction[0][0] |
|
|
|
|
|
if probability > 0.5: |
|
|
result = f"π¨ ATTACK DETECTED! (Confidence: {probability:.2f})" |
|
|
color = "red" |
|
|
else: |
|
|
result = f"β
Normal Traffic (Confidence: {1 - probability:.2f})" |
|
|
color = "green" |
|
|
|
|
|
|
|
|
return f'<h1 style="color:{color}; font-size:24px;">{result}</h1>' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
input_components = [] |
|
|
for name in FEATURE_NAMES: |
|
|
if name in NUMERICAL_COLS: |
|
|
|
|
|
input_components.append(gr.Number(label=name, value=0)) |
|
|
elif name in CATEGORICAL_COLS: |
|
|
|
|
|
input_components.append(gr.Dropdown( |
|
|
label=name, |
|
|
choices=CATEGORICAL_MAPPING[name], |
|
|
value=CATEGORICAL_MAPPING[name][0] |
|
|
)) |
|
|
else: |
|
|
|
|
|
input_components.append(gr.Textbox(label=name, value="0")) |
|
|
|
|
|
|
|
|
iface = gr.Interface( |
|
|
fn=predict_attack, |
|
|
inputs=input_components, |
|
|
outputs=gr.HTML(label="Prediction Result"), |
|
|
title="Intrusion Detection System (KDD/NSL-KDD CNN)", |
|
|
description="Enter the 41 feature values of a network connection to detect if it is an attack or normal traffic. Use the attack patterns provided (e.g., Neptune DoS) to test the model." |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
iface.launch() |