# Hosting-page header (not code): uploaded by rtik007, commit "Create app.py" (0fd2603, verified)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.ensemble import IsolationForest
from pyod.models.hbos import HBOS
from pyod.models.ecod import ECOD
from pyod.models.lof import LOF
import gradio as gr
# Step 1: Generate synthetic network logs dataset
users = [f"user_{i}" for i in range(1, 11)]  # 10 unique users
protocols = ["TCP", "UDP"]
actions = ["allow", "deny"]

# Fixed seed so every start of the app builds the same synthetic dataset.
np.random.seed(42)

# NOTE: np.random.randint(low, high) never returns `high` itself, so the upper
# bounds for minutes and ports are one past the largest valid value
# (59 and 65535 respectively).
data = [
    [
        f"2024-12-01T12:{np.random.randint(0, 60):02}:00Z",  # minute 0-59
        f"192.168.1.{np.random.randint(1, 255)}",  # last octet 1-254
        f"10.0.0.{np.random.randint(1, 255)}",  # last octet 1-254
        np.random.randint(100, 10000),  # bytes
        np.random.choice(protocols),
        np.random.randint(1024, 65536),  # src_port 1024-65535
        np.random.randint(1, 65536),  # dest_port 1-65535
        np.random.choice(actions),
        round(np.random.uniform(0.1, 10.0), 2),  # duration
        np.random.randint(1, 1000),  # packets
        np.random.choice(users),  # user
    ]
    for _ in range(1000)
]

# Column order must match the order of the values in each generated row above.
columns = [
    "timestamp",
    "src_ip",
    "dest_ip",
    "bytes",
    "protocol",
    "src_port",
    "dest_port",
    "action",
    "duration",
    "packets",
    "user",
]
df = pd.DataFrame(data, columns=columns)
# Preprocess the dataset
# Convert the ISO-8601 timestamp strings to whole seconds since the Unix epoch.
# Subtracting the epoch and floor-dividing by one second does not depend on
# the datetime resolution pandas picks, and avoids the deprecated Series.view.
timestamps = pd.to_datetime(df["timestamp"], utc=True)
unix_epoch = pd.Timestamp("1970-01-01", tz="UTC")
df["timestamp_unix"] = (timestamps - unix_epoch) // pd.Timedelta(seconds=1)

# Encode categorical features
# Each fitted encoder is kept so the integer codes can be mapped back to the
# original labels later.
label_encoders = {}
for column in ["protocol", "action", "user"]:
    le = LabelEncoder()
    df[column + "_index"] = le.fit_transform(df[column])
    label_encoders[column] = le

# Normalize numerical features
# The scaled values overwrite the original columns in df.
scaler = MinMaxScaler()
feature_columns = [
    "timestamp_unix",
    "bytes",
    "src_port",
    "dest_port",
    "duration",
    "packets",
    "protocol_index",
    "action_index",
    "user_index",
]
df[feature_columns] = scaler.fit_transform(df[feature_columns])

# Map feature column names to actual names
# Keys are the entries of feature_columns; values are the axis labels shown
# on the plots.
feature_mapping = {
    "timestamp_unix": "Timestamp (Unix)",
    "bytes": "Bytes Transferred",
    "src_port": "Source Port",
    "dest_port": "Destination Port",
    "duration": "Duration",
    "packets": "Number of Packets",
    "protocol_index": "Protocol (TCP/UDP)",
    "action_index": "Action (Allow/Deny)",
    "user_index": "User",
}
# Step 2: Function to visualize anomalies
def visualize_anomalies(feature1, feature2, sample_size):
    """Fit four anomaly detectors on a sample of ``df`` and plot their labels.

    Every model is trained on all columns in ``feature_columns``; the two
    selected features only choose which axes the results are drawn on.

    Args:
        feature1: Column name from ``feature_columns`` used for the x axis.
        feature2: Column name from ``feature_columns`` used for the y axis.
        sample_size: Number of rows to sample from ``df``. Truncated to an
            integer and capped at ``len(df)``.

    Returns:
        A matplotlib figure with one scatter plot per model, where points
        labelled 0 are inliers and points labelled 1 are outliers.

    Raises:
        ValueError: If either feature is not in ``feature_columns``.
    """
    # Validate features
    if feature1 not in feature_columns or feature2 not in feature_columns:
        raise ValueError("Selected features are not valid.")

    # Sample the dataset. A UI slider can hand over a float, but
    # DataFrame.sample only accepts an integer row count.
    sample_size = min(int(sample_size), len(df))
    sampled_df = df.sample(sample_size, random_state=42)
    X = sampled_df[feature_columns].values

    # Initialize anomaly detection models
    models = {
        "Isolation Forest": IsolationForest(contamination=0.1, random_state=42),
        "HBOS": HBOS(contamination=0.1),
        "ECOD": ECOD(contamination=0.1),
        "LOF": LOF(contamination=0.1),
    }

    # Train models and collect predictions as 0 (inlier) / 1 (outlier)
    predictions = {}
    for name, model in models.items():
        model.fit(X)
        labels = np.asarray(model.predict(X))
        if isinstance(model, IsolationForest):
            # scikit-learn reports +1 for inliers and -1 for outliers, while
            # the PyOD models report 0 / 1. Convert so every panel uses the
            # same colour for outliers.
            labels = np.where(labels == -1, 1, 0)
        predictions[name] = labels

    # Visualize results
    fig, axes = plt.subplots(1, len(models), figsize=(20, 5), sharey=True)
    feature1_index = feature_columns.index(feature1)
    feature2_index = feature_columns.index(feature2)
    for ax, (name, preds) in zip(axes, predictions.items()):
        ax.scatter(
            X[:, feature1_index],
            X[:, feature2_index],
            c=preds,
            cmap="coolwarm",
            s=10,
            vmin=0,  # pin the colour scale so 0/1 map to the same colours
            vmax=1,  # in every panel
        )
        ax.set_title(name)
        ax.set_xlabel(feature_mapping[feature1])
        ax.set_ylabel(feature_mapping[feature2])

    # Use the figure's own methods rather than pyplot's global "current
    # figure", which another concurrent call could have replaced.
    fig.suptitle("Comparison of Anomaly Detection Algorithms")
    fig.tight_layout()

    # pyplot keeps every figure it creates until it is closed. Deregister it
    # so repeated calls do not accumulate figures; the returned object can
    # still be rendered.
    plt.close(fig)
    return fig
# Create Gradio Interface for Anomaly Detection Algorithm Comparison
# NOTE(review): the original indentation was lost, so the nesting of the
# layout containers below is a reconstruction; confirm against the deployed UI.
with gr.Blocks() as demo:
    gr.Markdown("### Anomaly Detection Algorithm Comparison")

    with gr.Row():
        # Input controls: two feature selectors, the sample size, and a
        # button that triggers the plot.
        with gr.Column():
            feature1_dropdown = gr.Dropdown(
                label="Feature 1",
                choices=list(feature_mapping),
            )
            feature2_dropdown = gr.Dropdown(
                label="Feature 2",
                choices=list(feature_mapping),
            )
            sample_slider = gr.Slider(
                label="Number of Samples",
                minimum=10,
                maximum=1000,
                step=10,
                value=500,
            )
            submit_button = gr.Button("Visualize")

        plot_output = gr.Plot(label="Visualization Results")

    # Clicking the button passes the three control values to
    # visualize_anomalies and shows the returned figure in the plot component.
    submit_button.click(
        fn=visualize_anomalies,
        inputs=[feature1_dropdown, feature2_dropdown, sample_slider],
        outputs=plot_output,
    )

demo.launch()