"""Interactive comparison of anomaly detectors on synthetic network logs.

The script builds a random, reproducible network-log dataset, scales its
features to [0, 1], and exposes a Gradio UI that fits four outlier detectors
on a sample of the data and plots their verdicts side by side.
"""

import matplotlib

# Gradio runs callbacks outside the main thread; a non-interactive backend
# keeps pyplot from touching GUI toolkits there.
matplotlib.use("Agg")

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.ensemble import IsolationForest
from pyod.models.hbos import HBOS
from pyod.models.ecod import ECOD
from pyod.models.lof import LOF
import gradio as gr

# Expected share of outliers handed to every detector.
CONTAMINATION = 0.1

# Step 1: Generate synthetic network logs dataset
users = [f"user_{i}" for i in range(1, 11)]  # 10 unique users
protocols = ["TCP", "UDP"]
actions = ["allow", "deny"]

np.random.seed(42)
# NOTE: np.random.randint excludes its upper bound, hence 60 / 65536 below.
data = [
    [
        f"2024-12-01T12:{np.random.randint(0, 60):02}:00Z",  # minute 0-59
        f"192.168.1.{np.random.randint(1, 255)}",  # host octet 1-254
        f"10.0.0.{np.random.randint(1, 255)}",  # host octet 1-254
        np.random.randint(100, 10000),  # bytes
        np.random.choice(protocols),
        np.random.randint(1024, 65536),  # src_port
        np.random.randint(1, 65536),  # dest_port
        np.random.choice(actions),
        round(np.random.uniform(0.1, 10.0), 2),  # duration
        np.random.randint(1, 1000),  # packets
        np.random.choice(users),  # user
    ]
    for _ in range(1000)
]
columns = [
    "timestamp",
    "src_ip",
    "dest_ip",
    "bytes",
    "protocol",
    "src_port",
    "dest_port",
    "action",
    "duration",
    "packets",
    "user",
]
df = pd.DataFrame(data, columns=columns)

# Preprocess the dataset
# Whole seconds since the Unix epoch. Subtracting the epoch and dividing by a
# one-second Timedelta does not depend on the datetime resolution pandas
# picks, unlike reinterpreting the raw integers.
_timestamps = pd.to_datetime(df["timestamp"], utc=True)
df["timestamp_unix"] = (
    _timestamps - pd.Timestamp("1970-01-01", tz="UTC")
) // pd.Timedelta(seconds=1)

# Encode categorical features
label_encoders = {}
for column in ["protocol", "action", "user"]:
    le = LabelEncoder()
    df[column + "_index"] = le.fit_transform(df[column])
    label_encoders[column] = le

# Normalize numerical features
scaler = MinMaxScaler()
feature_columns = [
    "timestamp_unix",
    "bytes",
    "src_port",
    "dest_port",
    "duration",
    "packets",
    "protocol_index",
    "action_index",
    "user_index",
]
df[feature_columns] = scaler.fit_transform(df[feature_columns])

# Map feature column names to actual names
feature_mapping = {
    "timestamp_unix": "Timestamp (Unix)",
    "bytes": "Bytes Transferred",
    "src_port": "Source Port",
    "dest_port": "Destination Port",
    "duration": "Duration",
    "packets": "Number of Packets",
    "protocol_index": "Protocol (TCP/UDP)",
    "action_index": "Action (Allow/Deny)",
    "user_index": "User",
}


def _outlier_labels(model, X):
    """Fit *model* on *X* and return 0/1 labels for its rows (1 = outlier)."""
    model.fit(X)
    if isinstance(model, IsolationForest):
        # scikit-learn reports +1 for inliers and -1 for outliers; convert to
        # the 0/1 convention used by the PyOD detectors so colours agree.
        return (model.predict(X) == -1).astype(int)
    # PyOD detectors expose the labels of the data they were fitted on.
    return model.labels_


# Step 2: Function to visualize anomalies
def visualize_anomalies(feature1, feature2, sample_size):
    """Fit four anomaly detectors on a data sample and plot their verdicts.

    Every detector is trained on all columns in ``feature_columns``; the two
    selected features only choose the axes of the scatter plots.

    Args:
        feature1: Name of the feature on the x-axis; must be one of
            ``feature_columns``.
        feature2: Name of the feature on the y-axis; must be one of
            ``feature_columns``.
        sample_size: Number of rows to sample from ``df``. Truncated to an
            integer and capped at the dataset size.

    Returns:
        A matplotlib figure with one scatter plot per detector, where
        inliers are drawn in blue and outliers in red.

    Raises:
        ValueError: If either feature name is unknown or ``sample_size`` is
            not positive.
    """
    # Validate features
    if feature1 not in feature_columns or feature2 not in feature_columns:
        raise ValueError("Selected features are not valid.")

    # Sample the dataset. The slider may deliver a float, which
    # DataFrame.sample rejects, so coerce before use.
    sample_size = min(int(sample_size), len(df))
    if sample_size < 1:
        raise ValueError("Sample size must be positive.")
    sampled_df = df.sample(sample_size, random_state=42)
    X = sampled_df[feature_columns].values

    # Initialize anomaly detection models
    models = {
        "Isolation Forest": IsolationForest(
            contamination=CONTAMINATION, random_state=42
        ),
        "HBOS": HBOS(contamination=CONTAMINATION),
        "ECOD": ECOD(contamination=CONTAMINATION),
        "LOF": LOF(contamination=CONTAMINATION),
    }

    # Train models and collect predictions (0 for inliers, 1 for outliers)
    predictions = {
        name: _outlier_labels(model, X) for name, model in models.items()
    }

    # Visualize results
    fig, axes = plt.subplots(1, len(models), figsize=(20, 5), sharey=True)
    x_values = X[:, feature_columns.index(feature1)]
    y_values = X[:, feature_columns.index(feature2)]

    for ax, (name, labels) in zip(axes, predictions.items()):
        # Pin the colour scale so 0 and 1 keep the same colours even when a
        # detector assigns a single label to every point.
        ax.scatter(
            x_values,
            y_values,
            c=labels,
            cmap="coolwarm",
            vmin=0,
            vmax=1,
            s=10,
        )
        ax.set_title(name)
        ax.set_xlabel(feature_mapping[feature1])
        ax.set_ylabel(feature_mapping[feature2])

    fig.suptitle("Comparison of Anomaly Detection Algorithms")
    fig.tight_layout()
    # Unregister the figure from pyplot so repeated requests do not pile up
    # open figures; the Figure object itself remains renderable.
    plt.close(fig)
    return fig


# Create Gradio Interface for Anomaly Detection Algorithm Comparison
demo = gr.Blocks()

with demo:
    gr.Markdown("### Anomaly Detection Algorithm Comparison")
    with gr.Row():
        with gr.Column():
            # Defaults keep the first click from failing on an empty choice.
            feature1_dropdown = gr.Dropdown(
                choices=list(feature_mapping.keys()),
                value="bytes",
                label="Feature 1",
            )
            feature2_dropdown = gr.Dropdown(
                choices=list(feature_mapping.keys()),
                value="packets",
                label="Feature 2",
            )
            sample_slider = gr.Slider(
                minimum=10,
                maximum=1000,
                step=10,
                value=500,
                label="Number of Samples",
            )
            submit_button = gr.Button("Visualize")
        plot_output = gr.Plot(label="Visualization Results")

    submit_button.click(
        fn=visualize_anomalies,
        inputs=[feature1_dropdown, feature2_dropdown, sample_slider],
        outputs=plot_output,
    )

# Only start the web server when executed as a script, so the module can be
# imported without side effects.
if __name__ == "__main__":
    demo.launch()