Spaces:
Sleeping
Sleeping
import gradio as gr
import tensorflow as tf
import numpy as np
import pandas as pd

# Load the trained classifier from disk.
# Module-level side effect: runs once at import time and fails fast if the
# .keras file is missing.
model = tf.keras.models.load_model("adversarially_trained_model.keras")
print("output_shape:", model.output_shape)  # e.g., (None, 5)

# Human-readable labels for the model's output units, in index order.
# NOTE(review): assumes the model was trained with exactly this class
# ordering — confirm against the training pipeline.
CLASS_NAMES = ["normal", "dos", "probe", "r2l", "u2r"]
def preprocess_single_record(record: dict, scaler, df_reference: pd.DataFrame) -> np.ndarray:
    """Turn one raw traffic record into a single scaled feature row.

    Missing features are backfilled from ``df_reference`` (0.0 for float
    columns, column mode otherwise), the three categorical features are
    one-hot encoded, the result is aligned to ``df_reference``'s columns,
    and finally passed through ``scaler.transform``.

    NOTE(review): the final reindex targets ``df_reference.columns`` — this
    only lines up with the dummy-encoded features if ``df_reference`` is the
    encoded training frame; verify which frame callers pass in.

    Args:
        record: Mapping of feature name -> raw value for one observation.
        scaler: Fitted object exposing ``transform`` (e.g. a sklearn scaler).
        df_reference: Frame whose columns/dtypes define the expected layout.

    Returns:
        2-D array of shape (1, n_reference_columns) from ``scaler.transform``.
    """
    # NSL-KDD-style numeric feature names expected in the input.
    numeric_cols = [
        "duration", "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
        "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell",
        "su_attempted", "num_root", "num_file_creations", "num_shells",
        "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login",
        "count", "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate",
        "srv_rerror_rate", "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
        "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
        "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
        "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
        "dst_host_srv_serror_rate", "dst_host_rerror_rate", "dst_host_srv_rerror_rate"
    ]

    frame = pd.DataFrame([record])

    # Backfill any reference column absent from the record: zeros for float
    # columns, the reference mode (or 'unknown') for everything else.
    for name in df_reference.columns:
        if name in frame.columns:
            continue
        ref_col = df_reference[name]
        if ref_col.dtype == 'float64':
            frame[name] = 0.0
        else:
            frame[name] = ref_col.mode()[0] if not ref_col.empty else 'unknown'

    # Impose the reference column order, then force numerics to float.
    frame = frame[df_reference.columns]
    frame[numeric_cols] = frame[numeric_cols].astype(float)

    # One-hot encode the categorical trio and rejoin with the numerics.
    dummies = pd.get_dummies(frame[["protocol_type", "service", "flag"]])
    combined = pd.concat([frame[numeric_cols], dummies], axis=1)

    # Align to the training-time column set; unseen columns become 0.
    aligned = combined.reindex(columns=df_reference.columns, fill_value=0)
    return scaler.transform(aligned)
# Prediction function wired into the Gradio interface.
def predict_from_array(input_text):
    """Parse a comma-separated feature string and classify it.

    Args:
        input_text: Comma-separated float values, one per model input
            feature (e.g. "1.5, 0.8, -2.3").

    Returns:
        dict: {class_name: probability} suitable for gr.Label on success,
        or a human-readable error string when the input cannot be parsed
        or reshaped.
    """
    # Parse the input string into a 1-D float array.
    try:
        float_values = [float(x.strip()) for x in input_text.split(',')]
        input_array = np.array(float_values)
    except ValueError as e:
        return f"Input Error: Please enter numbers only. Details: {e}"
    except Exception as e:
        return f"Error: {e}"

    # Keras models expect a batch dimension: reshape (n,) -> (1, n).
    try:
        input_array = input_array.reshape(1, -1)
    except ValueError as e:
        return f"Shape Error: Model expects a different number of features. Got {len(float_values)}. Details: {e}"

    # Run inference; take the first (only) row of the batched output.
    predictions = model.predict(input_array)
    scores = np.array(predictions[0]).reshape(-1)

    # Align class names to the model's actual output width: pad with
    # generated names if the model has more outputs, truncate if fewer.
    num_outputs = len(scores)
    if len(CLASS_NAMES) < num_outputs:
        aligned_class_names = CLASS_NAMES + [f"class_{i}" for i in range(len(CLASS_NAMES), num_outputs)]
    else:
        aligned_class_names = CLASS_NAMES[:num_outputs]

    # BUG FIX: the previous code applied softmax unconditionally, which
    # re-squashes (distorts) outputs that are already probabilities from a
    # softmax head. Only apply softmax when the scores are not already a
    # valid probability vector (non-negative, summing to ~1).
    if (scores < 0).any() or not np.isclose(scores.sum(), 1.0, atol=1e-3):
        exp_scores = np.exp(scores - np.max(scores))
        total = exp_scores.sum()
        # Fall back to the raw scores if softmax degenerates (overflow/zeros).
        prob_scores = exp_scores / total if np.isfinite(total) and total > 0 else scores
    else:
        prob_scores = scores

    return {aligned_class_names[i]: float(prob_scores[i]) for i in range(num_outputs)}
# --- Gradio UI ---
# Wires predict_from_array to a simple text-in / label-out interface.
demo = gr.Interface(
    fn=predict_from_array,
    # Free-text input: one comma-separated value per model feature.
    inputs=gr.Textbox(
        label="Input Features",
        placeholder="Enter comma-separated float values, e.g., 1.5, 0.8, -2.3, 4.0"
    ),
    # gr.Label renders the {class: probability} dict returned on success;
    # on a parse error the function returns a plain string instead.
    outputs=gr.Label(num_top_classes=3, label="Attack Type Predictions"),
    title="Network Intrusion Detection System (NIDS)",
    description="Adversarially trained model for network attack classification. Input network traffic features as comma-separated values to detect attack types: normal, dos, probe, r2l, u2r, or other."
    # NOTE(review): the description mentions an "other" class, but
    # CLASS_NAMES defines only five labels — confirm which is correct.
)

# Launch the app only when run as a script (not on import).
if __name__ == "__main__":
    demo.launch()