"""Gradio front end for an adversarially trained network-intrusion classifier.

The module loads a saved Keras model at import time and exposes a small web UI
in which a user pastes one comma-separated feature vector and receives
per-class confidences.
"""

from typing import Dict, List, Optional, Union

import gradio as gr
import numpy as np
import pandas as pd
import tensorflow as tf

# 1. Load the trained .keras model.
# Loaded once at import time so every request reuses the same model object.
model = tf.keras.models.load_model("adversarially_trained_model.keras")
print("output_shape:", model.output_shape)  # e.g., (None, 5)

CLASS_NAMES = ["normal", "dos", "probe", "r2l", "u2r"]

# Raw feature columns that are cast to float before scaling.
_NUMERIC_COLS = [
    "duration", "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell",
    "su_attempted", "num_root", "num_file_creations", "num_shells",
    "num_access_files", "num_outbound_cmds", "is_host_login",
    "is_guest_login", "count", "srv_count", "serror_rate", "srv_serror_rate",
    "rerror_rate", "srv_rerror_rate", "same_srv_rate", "diff_srv_rate",
    "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
]

# Raw feature columns that are one-hot encoded.
_CATEGORICAL_COLS = ["protocol_type", "service", "flag"]


def _default_category(df_reference: pd.DataFrame, col: str):
    """Return the most frequent value of ``col`` in the reference, or 'unknown'."""
    if col in df_reference.columns:
        modes = df_reference[col].mode()
        # mode() is empty for an empty or all-NaN column.
        if not modes.empty:
            return modes.iloc[0]
    return "unknown"


def _encoded_columns(df_reference: pd.DataFrame) -> List[str]:
    """Return the column layout of the encoded feature matrix.

    NOTE(review): the callers of ``preprocess_single_record`` are not visible
    here, so it is unclear whether ``df_reference`` is the raw training frame
    or the already one-hot-encoded one. Both are handled: a frame without the
    raw categorical columns is taken to be encoded already and its column
    order is used verbatim; otherwise the layout is derived by applying the
    same numeric + one-hot transform to the reference. Confirm this matches
    the layout the scaler was fitted on.
    """
    raw_categoricals = [c for c in _CATEGORICAL_COLS if c in df_reference.columns]
    if not raw_categoricals:
        return list(df_reference.columns)
    numeric = [c for c in _NUMERIC_COLS if c in df_reference.columns]
    dummies = pd.get_dummies(df_reference[raw_categoricals]).columns
    return [*numeric, *dummies]


def preprocess_single_record(record: dict, scaler, df_reference: pd.DataFrame) -> np.ndarray:
    """Turn one raw traffic record into a scaled feature row.

    Args:
        record: Mapping of raw column name to value. Missing numeric columns
            default to 0.0; missing categorical columns default to the most
            frequent value in ``df_reference`` (or 'unknown').
        scaler: Fitted object exposing ``transform``; it receives the encoded
            single-row DataFrame.
        df_reference: Training-time frame used to fix the output column layout
            (see ``_encoded_columns``).

    Returns:
        Whatever ``scaler.transform`` returns for the single encoded row.
    """
    df_input = pd.DataFrame([record])

    # Fill in any raw columns the caller did not supply.
    for col in _NUMERIC_COLS:
        if col not in df_input.columns:
            df_input[col] = 0.0
    for col in _CATEGORICAL_COLS:
        if col not in df_input.columns:
            df_input[col] = _default_category(df_reference, col)

    df_numeric = df_input[_NUMERIC_COLS].astype(float)
    df_categorical = pd.get_dummies(df_input[_CATEGORICAL_COLS], dtype=float)
    df_final = pd.concat([df_numeric, df_categorical], axis=1)

    # Align with the training layout: categories unseen in this record become
    # 0, and categories unseen during training are dropped.
    df_final = df_final.reindex(columns=_encoded_columns(df_reference), fill_value=0)

    return scaler.transform(df_final)


def _expected_feature_count() -> Optional[int]:
    """Return the model's flat feature count, or None when it cannot be read."""
    shape = getattr(model, "input_shape", None)
    # Only a plain (batch, features) input is validated up front; anything
    # else is left for model.predict to accept or reject.
    if isinstance(shape, tuple) and len(shape) == 2 and isinstance(shape[-1], int):
        return shape[-1]
    return None


def _to_probabilities(scores: np.ndarray) -> np.ndarray:
    """Return ``scores`` as a probability vector.

    Scores that already form a distribution (non-negative, summing to ~1) are
    returned unchanged so a model ending in a softmax layer is not softmaxed a
    second time. Other finite scores are treated as logits and passed through
    a numerically stable softmax. Empty or non-finite scores are returned
    as-is.
    """
    scores = np.asarray(scores, dtype=float).reshape(-1)
    if scores.size == 0 or not np.isfinite(scores).all():
        return scores
    if (scores >= 0).all() and np.isclose(scores.sum(), 1.0, atol=1e-3):
        return scores
    exp_scores = np.exp(scores - scores.max())
    return exp_scores / exp_scores.sum()


# 2. Prediction function used by the UI.
def predict_from_array(input_text: str) -> Union[Dict[str, float], str]:
    """Classify one comma-separated feature vector.

    Args:
        input_text: Comma-separated numbers, already preprocessed in the same
            way as the model's training data.

    Returns:
        A ``{class_name: confidence}`` dict on success, or a human-readable
        error string when the input cannot be parsed or does not fit the
        model. ``gr.Label`` accepts either form.
    """
    # 3. Parse the input string.
    try:
        float_values = [float(token.strip()) for token in input_text.split(",")]
    except ValueError as e:
        return f"Input Error: Please enter numbers only. Details: {e}"
    except (AttributeError, TypeError) as e:
        # input_text was not a string (e.g. None from a cleared textbox).
        return f"Error: {e}"

    # 4. Shape the values as a batch of one row: (1, num_features).
    input_array = np.asarray(float_values, dtype=np.float32).reshape(1, -1)

    # float() accepts "nan"/"inf", and very large values overflow float32;
    # neither produces a meaningful prediction.
    if not np.isfinite(input_array).all():
        return (
            "Input Error: Please enter numbers only. "
            "Details: non-finite values (nan/inf) are not allowed"
        )

    expected = _expected_feature_count()
    if expected is not None and input_array.shape[1] != expected:
        return (
            "Shape Error: Model expects a different number of features. "
            f"Got {len(float_values)}. Details: expected {expected}"
        )

    # 5. Make the prediction.
    try:
        predictions = model.predict(input_array, verbose=0)
    except (ValueError, tf.errors.InvalidArgumentError) as e:
        return (
            "Shape Error: Model expects a different number of features. "
            f"Got {len(float_values)}. Details: {e}"
        )
    scores = np.asarray(predictions[0]).reshape(-1)

    # 6. Format the output as class confidences.
    num_outputs = len(scores)
    # Pad with generic names if the model has more outputs than CLASS_NAMES,
    # truncate if it has fewer.
    padding = [f"class_{i}" for i in range(len(CLASS_NAMES), num_outputs)]
    aligned_class_names = (CLASS_NAMES + padding)[:num_outputs]

    prob_scores = _to_probabilities(scores)
    return {
        name: float(prob)
        for name, prob in zip(aligned_class_names, prob_scores)
    }


# --- Gradio UI ---
# 7. Define the Gradio interface.
demo = gr.Interface(
    fn=predict_from_array,
    inputs=gr.Textbox(
        label="Input Features",
        placeholder="Enter comma-separated float values, e.g., 1.5, 0.8, -2.3, 4.0",
    ),
    # gr.Label renders the {class: confidence} dict returned on success and
    # shows a plain string (the error messages) as a single label.
    outputs=gr.Label(num_top_classes=3, label="Attack Type Predictions"),
    title="Network Intrusion Detection System (NIDS)",
    description="Adversarially trained model for network attack classification. Input network traffic features as comma-separated values to detect attack types: normal, dos, probe, r2l, u2r, or other.",
)

# 8. Launch the app.
if __name__ == "__main__":
    demo.launch()