|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
from sklearn.preprocessing import MinMaxScaler, LabelEncoder |
|
|
from sklearn.ensemble import IsolationForest |
|
|
from pyod.models.hbos import HBOS |
|
|
from pyod.models.ecod import ECOD |
|
|
from pyod.models.lof import LOF |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
# Vocabularies used to synthesize the categorical fields of the fake log.
users = [f"user_{i}" for i in range(1, 11)]
protocols = ["TCP", "UDP"]
actions = ["allow", "deny"]

# Seed NumPy's global RNG so the synthetic dataset is reproducible.
np.random.seed(42)

# One row per simulated connection, in the same order as `columns` below.
# NOTE: np.random.randint excludes its upper bound, so every `high` here is
# one past the largest value that should be produced.
data = [
    [
        f"2024-12-01T12:{np.random.randint(0, 60):02}:00Z",  # minute 00-59
        f"192.168.1.{np.random.randint(1, 255)}",  # host octet 1-254
        f"10.0.0.{np.random.randint(1, 255)}",  # host octet 1-254
        np.random.randint(100, 10000),  # bytes
        np.random.choice(protocols),
        np.random.randint(1024, 65536),  # src_port 1024-65535
        np.random.randint(1, 65536),  # dest_port 1-65535
        np.random.choice(actions),
        round(np.random.uniform(0.1, 10.0), 2),  # duration
        np.random.randint(1, 1000),  # packets
        np.random.choice(users),
    ]
    for _ in range(1000)
]
columns = [
    "timestamp",
    "src_ip",
    "dest_ip",
    "bytes",
    "protocol",
    "src_port",
    "dest_port",
    "action",
    "duration",
    "packets",
    "user",
]

# Raw (unscaled, unencoded) dataset; later steps add derived columns to it.
df = pd.DataFrame(data, columns=columns)
|
|
|
|
|
|
|
|
# Convert the ISO-8601 timestamp strings to whole seconds since the Unix
# epoch. Subtracting the epoch and floor-dividing by one second avoids the
# deprecated Series.view() and does not assume nanosecond resolution.
df["timestamp_unix"] = (
    pd.to_datetime(df["timestamp"], utc=True) - pd.Timestamp("1970-01-01", tz="UTC")
) // pd.Timedelta("1s")
|
|
|
|
|
|
|
|
# Integer-encode each categorical column into a new "<column>_index" column.
# The fitted encoder for every column is kept in `label_encoders`, keyed by
# the original column name.
label_encoders = {}
for column in ("protocol", "action", "user"):
    le = LabelEncoder()
    df[f"{column}_index"] = le.fit_transform(df[column])
    label_encoders[column] = le
|
|
|
|
|
|
|
|
# Numeric columns fed to the detectors. The order matters: it defines the
# column order of the feature matrix built from `df[feature_columns]`.
scaler = MinMaxScaler()
feature_columns = [
    "timestamp_unix",
    "bytes",
    "src_port",
    "dest_port",
    "duration",
    "packets",
    "protocol_index",
    "action_index",
    "user_index",
]

# Rescale every feature column in place with the fitted min-max scaler.
df[feature_columns] = scaler.fit_transform(df[feature_columns])
|
|
|
|
|
|
|
|
# Human-readable axis labels, keyed by feature column name.
feature_mapping = {
    "timestamp_unix": "Timestamp (Unix)",
    "bytes": "Bytes Transferred",
    "src_port": "Source Port",
    "dest_port": "Destination Port",
    "duration": "Duration",
    "packets": "Number of Packets",
    "protocol_index": "Protocol (TCP/UDP)",
    "action_index": "Action (Allow/Deny)",
    "user_index": "User",
}
|
|
|
|
|
|
|
|
def visualize_anomalies(feature1, feature2, sample_size):
    """Plot the points flagged by four anomaly detectors on two features.

    A random sample of ``df`` is drawn, each detector is fitted on all
    columns in ``feature_columns``, and one scatter panel per detector is
    drawn with ``feature1`` on the x-axis and ``feature2`` on the y-axis.

    Args:
        feature1: Name of the x-axis feature; must be in ``feature_columns``.
        feature2: Name of the y-axis feature; must be in ``feature_columns``.
        sample_size: Number of rows to sample; capped at ``len(df)``.
            Integer-valued floats (e.g. from a UI slider) are accepted.

    Returns:
        The matplotlib figure holding one subplot per detector. Points are
        coloured by a 0/1 flag where 1 means "anomaly" for every detector.

    Raises:
        ValueError: If either feature name is not in ``feature_columns``.
    """
    if feature1 not in feature_columns or feature2 not in feature_columns:
        raise ValueError("Selected features are not valid.")

    # Sliders may deliver a float; DataFrame.sample needs an integer count.
    sample_size = min(int(sample_size), len(df))
    sampled_df = df.sample(sample_size, random_state=42)
    X = sampled_df[feature_columns].values

    models = {
        "Isolation Forest": IsolationForest(contamination=0.1, random_state=42),
        "HBOS": HBOS(contamination=0.1),
        "ECOD": ECOD(contamination=0.1),
        "LOF": LOF(contamination=0.1),
    }

    # Fit every detector on the same sample and normalise its labels to
    # 0 = inlier, 1 = anomaly. scikit-learn's IsolationForest reports
    # outliers as -1 and inliers as +1, whereas the PyOD detectors already
    # use 0/1; without this step the colours mean opposite things per panel.
    predictions = {}
    for name, model in models.items():
        model.fit(X)
        raw = np.asarray(model.predict(X))
        if isinstance(model, IsolationForest):
            predictions[name] = (raw == -1).astype(int)
        else:
            predictions[name] = raw.astype(int)

    fig, axes = plt.subplots(1, len(models), figsize=(20, 5), sharey=True)
    x_values = X[:, feature_columns.index(feature1)]
    y_values = X[:, feature_columns.index(feature2)]

    for ax, (name, flags) in zip(axes, predictions.items()):
        # Pin the colour scale so 0/1 map to the same colours in every
        # panel, even if a detector flags no points at all.
        ax.scatter(x_values, y_values, c=flags, cmap="coolwarm", vmin=0, vmax=1, s=10)
        ax.set_title(name)
        ax.set_xlabel(feature_mapping[feature1])
        ax.set_ylabel(feature_mapping[feature2])

    # Use the figure's own methods rather than pyplot's "current figure",
    # which is global state shared between concurrent requests.
    fig.suptitle("Comparison of Anomaly Detection Algorithms")
    fig.tight_layout()

    # Unregister the figure from pyplot so repeated calls do not accumulate
    # open figures; the returned Figure object can still be rendered/saved.
    plt.close(fig)
    return fig
|
|
|
|
|
|
|
|
# Gradio UI: two feature pickers and a sample-size slider feeding
# visualize_anomalies, whose figure is shown in a plot component.
demo = gr.Blocks()
with demo:
    gr.Markdown("### Anomaly Detection Algorithm Comparison")
    with gr.Row():
        with gr.Column():
            # Pre-select two distinct features so that pressing "Visualize"
            # before touching the dropdowns does not pass None to the
            # callback (which would raise ValueError).
            feature1_dropdown = gr.Dropdown(
                choices=list(feature_mapping.keys()),
                value=list(feature_mapping.keys())[0],
                label="Feature 1",
            )
            feature2_dropdown = gr.Dropdown(
                choices=list(feature_mapping.keys()),
                value=list(feature_mapping.keys())[1],
                label="Feature 2",
            )
            sample_slider = gr.Slider(
                minimum=10,
                maximum=1000,
                step=10,
                value=500,
                label="Number of Samples",
            )
            submit_button = gr.Button("Visualize")

        plot_output = gr.Plot(label="Visualization Results")

    submit_button.click(
        fn=visualize_anomalies,
        inputs=[feature1_dropdown, feature2_dropdown, sample_slider],
        outputs=plot_output,
    )

# Start the web server when the script is executed.
demo.launch()