File size: 5,591 Bytes
9f3c33c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import time
from functools import partial

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
from matplotlib.figure import Figure
from sklearn import svm
from sklearn.covariance import EllipticEnvelope
from sklearn.datasets import make_blobs, make_moons
from sklearn.datasets import make_classification
from sklearn.ensemble import IsolationForest
from sklearn.kernel_approximation import Nystroem
from sklearn.linear_model import SGDOneClassSVM
from sklearn.metrics import roc_curve, auc
from sklearn.neighbors import LocalOutlierFactor
from sklearn.pipeline import make_pipeline


# Generate synthetic data with 20 features.
# The row layout is fixed: the first N_INLIERS rows come from
# make_classification, the last N_OUTLIERS rows are injected uniform noise.
N_INLIERS = 500
N_OUTLIERS = 50
N_FEATURES = 20

np.random.seed(42)
X, _ = make_classification(
    n_samples=N_INLIERS,
    n_features=N_FEATURES,
    n_informative=10,
    n_redundant=5,
    n_clusters_per_class=1,
    random_state=42,
)
outliers = np.random.uniform(low=-6, high=6, size=(N_OUTLIERS, N_FEATURES))  # Add outliers
X = np.vstack([X, outliers])

# Convert to DataFrame
columns = [f"Feature{i+1}" for i in range(N_FEATURES)]
df = pd.DataFrame(X, columns=columns)

# Fit Isolation Forest (df holds only the feature columns at this point)
iso_forest = IsolationForest(
    n_estimators=100,
    max_samples=256,
    contamination=0.1,
    random_state=42,
)
iso_forest.fit(df)

# Predict anomaly scores
anomaly_scores = iso_forest.decision_function(df)  # Negative values indicate anomalies
anomaly_labels = iso_forest.predict(df)  # -1 for anomaly, 1 for normal

# Add results to DataFrame
df["Anomaly_Score"] = anomaly_scores
df["Anomaly_Label"] = np.where(anomaly_labels == -1, "Anomaly", "Normal")

# Ground-truth labels (1 for anomaly, 0 for normal) for ROC curve.
# These must come from how the data was assembled, not from the model's own
# predictions; labels derived from the predictions would make any ROC/AUC
# computed against them circular.
true_labels = np.concatenate(
    [np.zeros(N_INLIERS, dtype=int), np.ones(N_OUTLIERS, dtype=int)]
)

# SHAP Explainability (restricted to the feature columns, since df now also
# carries the score/label columns added above)
explainer = shap.Explainer(iso_forest, df[columns])
shap_values = explainer(df[columns])


# Functions for Anomaly Detection Algorithms tab
def train_models(input_data, outliers_fraction, n_samples, clf_name):
    """Fit one anomaly detector on a 2-D toy dataset and plot its predictions.

    Args:
        input_data: Dataset name: "Central Blob", "Two Blobs",
            "Blob with Noise", "Moons" or "Noise".
        outliers_fraction: Fraction of ``n_samples`` generated as uniform
            noise in [-6, 6) and appended to the dataset. It is passed to the
            SVM variants as ``nu`` unchanged, and to the other estimators as
            ``contamination`` capped at 0.5.
        n_samples: Nominal sample count. The blob datasets use
            ``n_samples - n_outliers`` points; "Moons" and "Noise" use
            ``n_samples`` points; the uniform outliers are added on top.
        clf_name: Estimator name: "Robust covariance", "One-Class SVM",
            "One-Class SVM (SGD)", "Isolation Forest" or
            "Local Outlier Factor".

    Returns:
        A ``matplotlib.figure.Figure`` with the points coloured by predicted
        label and, for every estimator except Local Outlier Factor, the
        decision boundary drawn as a black contour. The title shows the
        fitting time.

    Raises:
        KeyError: If ``clf_name`` or ``input_data`` is not a known name.
    """
    # UI sliders may deliver floats; sample counts must be integers.
    n_samples = int(n_samples)
    n_outliers = int(outliers_fraction * n_samples)
    n_inliers = n_samples - n_outliers

    # scikit-learn only accepts contamination in (0, 0.5]; nu may go up to 1.
    contamination = min(outliers_fraction, 0.5)

    blobs_params = dict(random_state=0, n_samples=n_inliers, n_features=2)

    # Factories instead of instances: only the requested estimator/dataset is
    # actually built on each call.
    estimator_factories = {
        "Robust covariance": lambda: EllipticEnvelope(contamination=contamination),
        "One-Class SVM": lambda: svm.OneClassSVM(
            nu=outliers_fraction, kernel="rbf", gamma=0.1
        ),
        "One-Class SVM (SGD)": lambda: make_pipeline(
            Nystroem(gamma=0.1, random_state=42, n_components=150),
            SGDOneClassSVM(
                nu=outliers_fraction,
                shuffle=True,
                fit_intercept=True,
                random_state=42,
                tol=1e-6,
            ),
        ),
        "Isolation Forest": lambda: IsolationForest(
            contamination=contamination, random_state=42
        ),
        "Local Outlier Factor": lambda: LocalOutlierFactor(
            n_neighbors=35, contamination=contamination
        ),
    }
    dataset_factories = {
        "Central Blob": lambda: make_blobs(
            centers=[[0, 0], [0, 0]], cluster_std=0.5, **blobs_params
        )[0],
        "Two Blobs": lambda: make_blobs(
            centers=[[2, 2], [-2, -2]], cluster_std=[0.5, 0.5], **blobs_params
        )[0],
        "Blob with Noise": lambda: make_blobs(
            centers=[[2, 2], [-2, -2]], cluster_std=[1.5, 0.3], **blobs_params
        )[0],
        "Moons": lambda: 4.0
        * (
            make_moons(n_samples=n_samples, noise=0.05, random_state=0)[0]
            - np.array([0.5, 0.25])
        ),
        "Noise": lambda: 14.0 * (np.random.RandomState(42).rand(n_samples, 2) - 0.5),
    }

    clf = estimator_factories[clf_name]()
    X = dataset_factories[input_data]()

    # Append uniformly distributed outliers.
    rng = np.random.RandomState(42)
    X = np.concatenate([X, rng.uniform(low=-6, high=6, size=(n_outliers, 2))], axis=0)

    # Local Outlier Factor (in its default, non-novelty mode) has no predict();
    # its labels are only available through fit_predict().
    is_lof = clf_name == "Local Outlier Factor"

    # Fit exactly once and time that single fit.
    t0 = time.perf_counter()
    if is_lof:
        y_pred = clf.fit_predict(X)
    else:
        clf.fit(X)
    t1 = time.perf_counter()
    if not is_lof:
        y_pred = clf.predict(X)

    # Build a standalone Figure rather than going through pyplot's global
    # "current figure": callbacks may run concurrently, and pyplot-registered
    # figures stay alive until explicitly closed.
    fig = Figure(figsize=(10, 8))
    ax = fig.subplots()

    if not is_lof:
        # Predictions are -1/+1, so the level-0 contour traces the boundary.
        xx, yy = np.meshgrid(np.linspace(-7, 7, 150), np.linspace(-7, 7, 150))
        Z = clf.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)
        ax.contour(xx, yy, Z, levels=[0], linewidths=2, colors="black")

    # Map predictions -1 -> colors[0] and +1 -> colors[1].
    colors = np.array(["#377eb8", "#ff7f00"])
    ax.scatter(X[:, 0], X[:, 1], s=30, color=colors[(y_pred + 1) // 2])

    ax.set_xlim(-7, 7)
    ax.set_ylim(-7, 7)
    ax.set_xticks(())
    ax.set_yticks(())
    ax.set_title(f"{clf_name} (time: {t1 - t0:.2f}s)")
    return fig


# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Isolation Forest Anomaly Detection")

    with gr.Tab("Anomaly Detection Algorithms"):
        gr.Markdown("## Compare Anomaly Detection Algorithms")
        input_models = [
            "Robust covariance", "One-Class SVM", "One-Class SVM (SGD)", "Isolation Forest", "Local Outlier Factor"
        ]
        input_data = gr.Radio(
            choices=["Central Blob", "Two Blobs", "Blob with Noise", "Moons", "Noise"],
            value="Moons",
            label="Dataset Type"
        )
        n_samples = gr.Slider(
            minimum=100, maximum=500, step=25, value=300, label="Number of Samples"
        )
        outliers_fraction = gr.Slider(
            minimum=0.1, maximum=0.9, step=0.1, value=0.2, label="Outlier Fraction"
        )

        # Single source of truth for the callback inputs; the order must match
        # train_models' positional parameters
        # (input_data, outliers_fraction, n_samples).
        controls = [input_data, outliers_fraction, n_samples]

        # One plot per algorithm; each plot is redrawn whenever any control
        # changes. partial() binds clf_name immediately, so every plot keeps
        # its own algorithm.
        for clf_name in input_models:
            plot = gr.Plot(label=clf_name)
            fn = partial(train_models, clf_name=clf_name)
            for control in controls:
                control.change(fn=fn, inputs=controls, outputs=plot)
            # Render once on page load so the plots are not blank until the
            # user first touches a control.
            demo.load(fn=fn, inputs=controls, outputs=plot)

# Launch the Gradio app
demo.launch()