# File size: 4,758 Bytes
# 0fd2603
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.ensemble import IsolationForest
from pyod.models.hbos import HBOS
from pyod.models.ecod import ECOD
from pyod.models.lof import LOF
import gradio as gr
# Step 1: Generate a synthetic network-log dataset (1,000 rows).
users = [f"user_{i}" for i in range(1, 11)]  # 10 unique users
protocols = ["TCP", "UDP"]
actions = ["allow", "deny"]
np.random.seed(42)  # deterministic dataset for reproducible demos
data = [
    [
        # np.random.randint's upper bound is exclusive, so use 60 to cover
        # the full minute range 0-59 (59 was previously unreachable).
        f"2024-12-01T12:{np.random.randint(0, 60):02}:00Z",
        f"192.168.1.{np.random.randint(1, 255)}",  # host octet 1-254
        f"10.0.0.{np.random.randint(1, 255)}",
        np.random.randint(100, 10000),             # bytes transferred
        np.random.choice(protocols),
        np.random.randint(1024, 65535),            # ephemeral src_port
        np.random.randint(1, 65535),               # dest_port
        np.random.choice(actions),
        round(np.random.uniform(0.1, 10.0), 2),    # duration in seconds
        np.random.randint(1, 1000),                # packet count
        np.random.choice(users),                   # user
    ]
    for _ in range(1000)
]
columns = ["timestamp", "src_ip", "dest_ip", "bytes", "protocol", "src_port", "dest_port", "action", "duration", "packets", "user"]
df = pd.DataFrame(data, columns=columns)
# Preprocess the dataset.
# BUG FIX: Series.view() is deprecated since pandas 2.0 and removed in
# pandas 3.0; astype('int64') is the supported way to get epoch
# nanoseconds from a datetime64 column (then // 10**9 -> seconds).
df["timestamp_unix"] = pd.to_datetime(df["timestamp"]).astype('int64') // 10**9
# Encode categorical features as integer indices; the raw columns are
# kept so the original string values remain available.
label_encoders = {}
for column in ["protocol", "action", "user"]:
    le = LabelEncoder()
    df[column + "_index"] = le.fit_transform(df[column])
    label_encoders[column] = le  # retained for a later inverse_transform
# Normalize numerical features to [0, 1] so distance-based detectors
# (e.g. LOF) are not dominated by large-magnitude columns.
scaler = MinMaxScaler()
feature_columns = ["timestamp_unix", "bytes", "src_port", "dest_port", "duration", "packets", "protocol_index", "action_index", "user_index"]
df[feature_columns] = scaler.fit_transform(df[feature_columns])
# Human-readable labels for plot axes / UI display.
feature_mapping = {
    "timestamp_unix": "Timestamp (Unix)",
    "bytes": "Bytes Transferred",
    "src_port": "Source Port",
    "dest_port": "Destination Port",
    "duration": "Duration",
    "packets": "Number of Packets",
    "protocol_index": "Protocol (TCP/UDP)",
    "action_index": "Action (Allow/Deny)",
    "user_index": "User",
}
# Step 2: Function to visualize anomalies
def visualize_anomalies(feature1, feature2, sample_size):
    """Fit four anomaly detectors on a sample of ``df`` and plot their labels.

    Parameters
    ----------
    feature1, feature2 : str
        Names from ``feature_columns`` used as the x / y plot axes.
        All features are still used for fitting; these two are plot-only.
    sample_size : int
        Number of rows to sample (capped at ``len(df)``).

    Returns
    -------
    matplotlib.figure.Figure
        One scatter subplot per detector, colored 0 = inlier, 1 = outlier.

    Raises
    ------
    ValueError
        If either feature name is not in ``feature_columns``.
    """
    if feature1 not in feature_columns or feature2 not in feature_columns:
        raise ValueError("Selected features are not valid.")
    # Sample deterministically so repeated runs produce the same plot.
    sample_size = min(sample_size, len(df))
    sampled_df = df.sample(sample_size, random_state=42)
    X = sampled_df[feature_columns].values
    # Initialize anomaly detection models (identical contamination so the
    # expected outlier fraction is comparable across detectors).
    models = {
        "Isolation Forest": IsolationForest(contamination=0.1, random_state=42),
        "HBOS": HBOS(contamination=0.1),
        "ECOD": ECOD(contamination=0.1),
        "LOF": LOF(contamination=0.1),
    }
    # Train models and collect predictions, normalized to 0/1.
    predictions = {}
    for name, model in models.items():
        model.fit(X)
        raw = model.predict(X)
        # BUG FIX: sklearn's IsolationForest labels outliers -1 and
        # inliers +1, while the pyod detectors return 1 (outlier) and
        # 0 (inlier). Normalize everything to 0 = inlier, 1 = outlier so
        # the color scale means the same thing in every subplot.
        if isinstance(model, IsolationForest):
            raw = (raw == -1).astype(int)
        predictions[name] = raw
    # Visualize results side by side with a shared y axis.
    fig, axes = plt.subplots(1, len(models), figsize=(20, 5), sharey=True)
    feature1_index = feature_columns.index(feature1)
    feature2_index = feature_columns.index(feature2)
    for i, (name, preds) in enumerate(predictions.items()):
        axes[i].scatter(X[:, feature1_index], X[:, feature2_index], c=preds, cmap="coolwarm", s=10)
        axes[i].set_title(name)
        axes[i].set_xlabel(feature_mapping[feature1])
        axes[i].set_ylabel(feature_mapping[feature2])
    plt.suptitle("Comparison of Anomaly Detection Algorithms")
    plt.tight_layout()
    return fig
# Build the Gradio interface for comparing the anomaly-detection algorithms.
with gr.Blocks() as demo:
    gr.Markdown("### Anomaly Detection Algorithm Comparison")
    with gr.Row():
        with gr.Column():
            # Axis selectors: internal feature-column names are the choices;
            # visualize_anomalies validates them against feature_columns.
            dd_feature1 = gr.Dropdown(choices=list(feature_mapping.keys()), label="Feature 1")
            dd_feature2 = gr.Dropdown(choices=list(feature_mapping.keys()), label="Feature 2")
            slider_samples = gr.Slider(minimum=10, maximum=1000, step=10, value=500, label="Number of Samples")
            btn_visualize = gr.Button("Visualize")
        plot_panel = gr.Plot(label="Visualization Results")
    # Wire the button to the plotting function.
    btn_visualize.click(
        fn=visualize_anomalies,
        inputs=[dd_feature1, dd_feature2, slider_samples],
        outputs=plot_panel,
    )
demo.launch()