# --- Hugging Face Spaces page header (extraction artifact, preserved as comments) ---
# Space status: Sleeping
# File size: 5,780 Bytes
# Commit history: ddc5c50 7338d26 ddc5c50 5301eea ddc5c50
import gradio as gr
import tensorflow as tf
import numpy as np
import pandas as pd
# 1. Load the trained .keras model from disk.
# NOTE(review): this runs at import time — the app fails to start if the
# file is not present alongside this script.
model = tf.keras.models.load_model("adversarially_trained_model.keras")
print("output_shape:", model.output_shape) # e.g., (None, 5)
# Class labels in the order of the model's output units — assumed to match
# the label encoding used at training time; TODO confirm.
CLASS_NAMES = ["normal", "dos", "probe", "r2l", "u2r"]
def preprocess_single_record(record: dict, scaler, df_reference: pd.DataFrame) -> np.ndarray:
    """Preprocess one raw NSL-KDD-style record into a scaled feature row.

    Args:
        record: Mapping of raw feature name -> value; missing columns are
            filled from df_reference (0.0 for numeric, mode for categorical).
        scaler: Fitted scaler exposing ``transform`` (e.g. sklearn scaler).
        df_reference: Training-time reference frame whose columns define the
            expected feature set and final column order.

    Returns:
        2-D array of shape (1, n_features) ready for ``model.predict``.
    """
    # Hard-coded NSL-KDD numeric feature names (all non-categorical columns).
    numeric_cols = [
        "duration", "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
        "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell",
        "su_attempted", "num_root", "num_file_creations", "num_shells",
        "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login",
        "count", "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate",
        "srv_rerror_rate", "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
        "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
        "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
        "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
        "dst_host_srv_serror_rate", "dst_host_rerror_rate", "dst_host_srv_rerror_rate"
    ]
    # Single-row frame from the input dict.
    df_input = pd.DataFrame([record])
    # Fill columns absent from the input: 0.0 for numeric, reference mode
    # for categorical.
    for col in df_reference.columns:
        if col not in df_input.columns:
            # BUG FIX: the original compared dtype == 'float64' only, so
            # integer reference columns were (wrongly) filled with the mode.
            if pd.api.types.is_numeric_dtype(df_reference[col]):
                df_input[col] = 0.0
            else:
                # BUG FIX: guard on the *mode* result being empty (e.g. an
                # all-NaN column), not on the reference column itself —
                # mode()[0] on an empty Series raises IndexError.
                modes = df_reference[col].mode()
                df_input[col] = modes.iloc[0] if not modes.empty else 'unknown'
    # Enforce the training-time column order before encoding.
    df_input = df_input[df_reference.columns]
    df_input[numeric_cols] = df_input[numeric_cols].astype(float)
    # One-hot encode the three categorical features.
    df_categorical = pd.get_dummies(df_input[["protocol_type", "service", "flag"]])
    df_final = pd.concat([df_input[numeric_cols], df_categorical], axis=1)
    # Align with the reference columns; unseen dummy columns are dropped and
    # missing ones filled with 0.
    # NOTE(review): df_reference is used both as the *raw* schema (fill loop
    # above) and as the *encoded* schema here — confirm which one the caller
    # actually passes.
    df_final = df_final.reindex(columns=df_reference.columns, fill_value=0)
    # Scale with the training-time scaler and return the (1, n) array.
    return scaler.transform(df_final)
# 2. Prediction function wired to the Gradio UI.
def predict_from_array(input_text):
    """Parse a comma-separated feature string and classify it.

    Args:
        input_text: Comma-separated float values, one per model input feature.

    Returns:
        dict mapping class name -> probability (consumed by ``gr.Label``),
        or a human-readable error string when parsing/reshaping fails.
    """
    # 3. Parse the input string into a 1-D float array.
    try:
        float_values = [float(x.strip()) for x in input_text.split(',')]
        input_array = np.array(float_values)
    except ValueError as e:
        return f"Input Error: Please enter numbers only. Details: {e}"
    except Exception as e:
        return f"Error: {e}"
    # 4. Keras models expect a batch dimension: reshape to (1, num_features).
    try:
        input_array = input_array.reshape(1, -1)
    except ValueError as e:
        return f"Shape Error: Model expects a different number of features. Got {len(float_values)}. Details: {e}"
    # 5. Run inference; take the first (only) row and force it to 1-D.
    predictions = model.predict(input_array)
    scores = np.array(predictions[0]).reshape(-1)
    num_outputs = len(scores)
    # 6. Align class names to the model's output width: pad with generated
    # names if the model has more outputs than labels, else truncate.
    if len(CLASS_NAMES) < num_outputs:
        aligned_class_names = CLASS_NAMES + [f"class_{i}" for i in range(len(CLASS_NAMES), num_outputs)]
    else:
        aligned_class_names = CLASS_NAMES[:num_outputs]
    # BUG FIX: the original applied softmax unconditionally, which distorts
    # outputs that are already probabilities (double softmax, flattening the
    # reported confidences). Apply softmax only when the scores are not
    # already a valid distribution, matching the stated intent.
    if np.isfinite(scores).all() and (scores >= 0).all() and np.isclose(scores.sum(), 1.0):
        prob_scores = scores
    else:
        exp_scores = np.exp(scores - np.max(scores))
        prob_scores = exp_scores / exp_scores.sum()
    return {aligned_class_names[i]: float(prob_scores[i]) for i in range(num_outputs)}
# --- Gradio UI ---
# 7. Define the Gradio Interface: a single free-text input routed through
# predict_from_array to a Label widget.
demo = gr.Interface(
    fn=predict_from_array,
    # Input: comma-separated feature values as plain text.
    inputs=gr.Textbox(
        label="Input Features",
        placeholder="Enter comma-separated float values, e.g., 1.5, 0.8, -2.3, 4.0"
    ),
    # gr.Label renders the dict of class -> confidence returned by the
    # prediction function; only the top 3 classes are displayed.
    outputs=gr.Label(num_top_classes=3, label="Attack Type Predictions"),
    title="Network Intrusion Detection System (NIDS)",
    description="Adversarially trained model for network attack classification. Input network traffic features as comma-separated values to detect attack types: normal, dos, probe, r2l, u2r, or other."
)
# 8. Launch the app only when run as a script (not when imported).
if __name__ == "__main__":
    demo.launch()