| import pandas as pd |
| import numpy as np |
| import matplotlib.pyplot as plt |
| from sklearn.preprocessing import MinMaxScaler, LabelEncoder |
| from sklearn.ensemble import IsolationForest |
| from pyod.models.hbos import HBOS |
| from pyod.models.ecod import ECOD |
| from pyod.models.lof import LOF |
| import gradio as gr |
|
|
| |
| users = [f"user_{i}" for i in range(1, 11)] |
| protocols = ["TCP", "UDP"] |
| actions = ["allow", "deny"] |
|
|
| np.random.seed(42) |
| data = [ |
| [ |
| f"2024-12-01T12:{np.random.randint(0, 59):02}:00Z", |
| f"192.168.1.{np.random.randint(1, 255)}", |
| f"10.0.0.{np.random.randint(1, 255)}", |
| np.random.randint(100, 10000), |
| np.random.choice(protocols), |
| np.random.randint(1024, 65535), |
| np.random.randint(1, 65535), |
| np.random.choice(actions), |
| round(np.random.uniform(0.1, 10.0), 2), |
| np.random.randint(1, 1000), |
| np.random.choice(users), |
| ] |
| for _ in range(1000) |
| ] |
| columns = ["timestamp", "src_ip", "dest_ip", "bytes", "protocol", "src_port", "dest_port", "action", "duration", "packets", "user"] |
|
|
# Assemble the working DataFrame from the synthetic rows.
df = pd.DataFrame(data, columns=columns)


# Derive a numeric Unix-epoch (seconds) column for the models.
# Series.view() is deprecated since pandas 2.2 and removed in 3.0;
# astype('int64') is the supported way to get epoch nanoseconds.
df["timestamp_unix"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**9
|
|
| |
# Integer-encode each categorical column, keeping the fitted encoder so the
# numeric indices can later be mapped back to their original string labels.
label_encoders = {}
for categorical in ("protocol", "action", "user"):
    encoder = LabelEncoder()
    df[f"{categorical}_index"] = encoder.fit_transform(df[categorical])
    label_encoders[categorical] = encoder
|
|
| |
# Min-max scale every model feature into [0, 1] so no single dimension
# dominates the distance/density based detectors.
feature_columns = [
    "timestamp_unix", "bytes", "src_port", "dest_port", "duration",
    "packets", "protocol_index", "action_index", "user_index",
]
scaler = MinMaxScaler()
df[feature_columns] = scaler.fit_transform(df[feature_columns])
|
|
| |
# Human-readable display label for each model feature, keyed by DataFrame
# column name. Insertion order drives the order of the dropdown choices.
feature_mapping = dict(
    [
        ("timestamp_unix", "Timestamp (Unix)"),
        ("bytes", "Bytes Transferred"),
        ("src_port", "Source Port"),
        ("dest_port", "Destination Port"),
        ("duration", "Duration"),
        ("packets", "Number of Packets"),
        ("protocol_index", "Protocol (TCP/UDP)"),
        ("action_index", "Action (Allow/Deny)"),
        ("user_index", "User"),
    ]
)
|
|
| |
def visualize_anomalies(feature1, feature2, sample_size):
    """Fit four anomaly detectors on a random sample and plot them side by side.

    Parameters:
        feature1: column name from ``feature_columns`` for the x-axis.
        feature2: column name from ``feature_columns`` for the y-axis.
        sample_size: number of rows to sample (capped at ``len(df)``).

    Returns:
        A matplotlib Figure with one scatter subplot per detector, colored
        by predicted label (0 = normal, 1 = anomaly for every model).

    Raises:
        ValueError: if either feature name is not a known feature column.
    """
    if feature1 not in feature_columns or feature2 not in feature_columns:
        raise ValueError("Selected features are not valid.")

    # Cap at the available rows; fixed seed keeps the plot reproducible.
    sample_size = min(sample_size, len(df))
    sampled_df = df.sample(sample_size, random_state=42)
    X = sampled_df[feature_columns].values

    models = {
        "Isolation Forest": IsolationForest(contamination=0.1, random_state=42),
        "HBOS": HBOS(contamination=0.1),
        "ECOD": ECOD(contamination=0.1),
        "LOF": LOF(contamination=0.1),
    }

    # Fit each model and normalize its labels to 0 = normal, 1 = anomaly.
    # sklearn's IsolationForest.predict returns -1 (anomaly) / +1 (normal),
    # while the PyOD models return 0 (normal) / 1 (anomaly); without this
    # step the subplot colors would mean opposite things across models.
    predictions = {}
    for name, model in models.items():
        model.fit(X)
        preds = model.predict(X)
        if name == "Isolation Forest":
            preds = (preds == -1).astype(int)
        predictions[name] = preds

    fig, axes = plt.subplots(1, len(models), figsize=(20, 5), sharey=True)
    feature1_index = feature_columns.index(feature1)
    feature2_index = feature_columns.index(feature2)

    for i, (name, preds) in enumerate(predictions.items()):
        axes[i].scatter(X[:, feature1_index], X[:, feature2_index], c=preds, cmap="coolwarm", s=10)
        axes[i].set_title(name)
        axes[i].set_xlabel(feature_mapping[feature1])
        axes[i].set_ylabel(feature_mapping[feature2])

    plt.suptitle("Comparison of Anomaly Detection Algorithms")
    plt.tight_layout()
    return fig
|
|
| |
# --- Gradio UI --------------------------------------------------------------
# Two feature dropdowns, a sample-size slider, and a plot output wired to
# visualize_anomalies. Dropdowns get default values: without them Gradio
# submits None on the first click and visualize_anomalies raises ValueError.
with gr.Blocks() as demo:
    gr.Markdown("### Anomaly Detection Algorithm Comparison")
    with gr.Row():
        with gr.Column():
            feature1_dropdown = gr.Dropdown(
                choices=list(feature_mapping.keys()),
                value="bytes",
                label="Feature 1",
            )
            feature2_dropdown = gr.Dropdown(
                choices=list(feature_mapping.keys()),
                value="duration",
                label="Feature 2",
            )
            sample_slider = gr.Slider(
                minimum=10,
                maximum=1000,
                step=10,
                value=500,
                label="Number of Samples",
            )
            submit_button = gr.Button("Visualize")

    plot_output = gr.Plot(label="Visualization Results")

    submit_button.click(
        fn=visualize_anomalies,
        inputs=[feature1_dropdown, feature2_dropdown, sample_slider],
        outputs=plot_output,
    )


# Launch only when run as a script, so the module stays importable.
if __name__ == "__main__":
    demo.launch()