LSTM-forecaster / utils /anomaly_detection.py
nkapila6's picture
Upload 373 files
964e116 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on 2024-07-24 13:04:10 Wednesday
@author: Nikhil Kapila
"""
from pyod.models.knn import KNN
import plotly.graph_objects as go
import numpy as np
import pandas as pd
def sliding_windows(data: pd.DataFrame, lookback: int = 30) -> tuple[np.ndarray, np.ndarray, pd.Index]:
    """Split a time-indexed frame into overlapping lookback windows and their next-step targets.

    Window ``i`` holds rows ``i .. i + lookback - 1`` and is paired with the
    row at position ``i + lookback`` as its target, so ``len(data) - lookback``
    windows are produced.

    Args:
        data: Rows ordered in time. A DataFrame yields windows of shape
            ``(lookback, n_columns)``; a Series yields 1-D windows.
        lookback: Number of consecutive rows in each window. Must be >= 0.

    Returns:
        A tuple ``(X, y, timestamps)`` where ``X`` stacks the windows, ``y``
        holds the row that follows each window, and ``timestamps`` holds the
        index labels of those target rows. When ``data`` has ``lookback`` rows
        or fewer, all three are empty but keep their trailing dimensions
        (e.g. ``X.shape == (0, lookback, n_columns)``).

    Raises:
        ValueError: If ``lookback`` is negative.
    """
    if lookback < 0:
        # A negative lookback would make the slices below wrap around and
        # produce ragged, meaningless windows.
        raise ValueError(f"lookback must be non-negative, got {lookback}")

    # Convert once instead of going through .iloc for every window/target row.
    values = data.to_numpy()
    feature_shape = values.shape[1:]
    n_windows = len(values) - lookback

    if n_windows <= 0:
        # Not enough rows for a single window: return correctly shaped empties
        # so callers can still rely on the array rank.
        empty_X = np.empty((0, lookback) + feature_shape, dtype=values.dtype)
        empty_y = np.empty((0,) + feature_shape, dtype=values.dtype)
        return empty_X, empty_y, data.index[:0]

    X = np.array([values[start:start + lookback] for start in range(n_windows)])
    # The target of window i is row i + lookback, i.e. everything from
    # position `lookback` onward; copy so the result does not alias `data`.
    y = values[lookback:].copy()
    timestamps = data.index[lookback:]
    return X, y, timestamps
def detect_anomalies_with_sliding_windows(data: pd.DataFrame, lookback: int = 30) -> pd.DataFrame:
    """Flag anomalous lookback windows of a time series with a KNN outlier detector.

    The series is cut into overlapping windows via ``sliding_windows``; each
    window is flattened into one feature vector and a default-configured
    ``KNN`` detector is fitted on all of them.

    Args:
        data: Time-indexed frame holding the series to analyse.
        lookback: Number of consecutive rows per window. Must be >= 1 and
            smaller than ``len(data)``.

    Returns:
        A DataFrame indexed by ``'Timestamp'`` (the index label of the row
        that follows each window) with columns ``'Is_Anomaly'`` (0 normal,
        1 anomaly), ``'Anomaly_Score'`` and ``'electricity'`` (the value of
        that following row).

    Raises:
        ValueError: If ``lookback`` is less than 1 or ``data`` does not have
            more than ``lookback`` rows, i.e. no window can be built.
    """
    if lookback < 1:
        raise ValueError(f"lookback must be at least 1, got {lookback}")
    if len(data) <= lookback:
        # Fail early with a clear message instead of a reshape error on an
        # empty window array further down.
        raise ValueError(
            f"need more than {lookback} rows to build a window, got {len(data)}"
        )

    X, y, timestamps = sliding_windows(data, lookback)
    # One row per window: (n_windows, lookback * n_features).
    X_flat = X.reshape(X.shape[0], -1)

    clf = KNN()
    clf.fit(X_flat)

    # Use the fitted training labels rather than clf.predict(X_flat): labels_
    # is the thresholded form of decision_scores_, so the flag and the score
    # reported for each window stay consistent. Re-querying the model with its
    # own training points lets every point match itself as a neighbour.
    is_anomaly = clf.labels_  # 0 for normal, 1 for anomaly
    anomaly_scores = clf.decision_scores_

    # NOTE: y.flatten() yields one value per window only when `data` has a
    # single column; multi-column input produces mismatched lengths here.
    anomalies = pd.DataFrame({
        'Timestamp': timestamps,
        'Is_Anomaly': is_anomaly,
        'Anomaly_Score': anomaly_scores,
        'electricity': y.flatten(),
    }).set_index('Timestamp')
    return anomalies
def plotly_anomaly(anomalies):
    """Build a Plotly figure of the series with flagged points highlighted.

    Args:
        anomalies: Frame with an ``'Is_Anomaly'`` column (rows equal to 1 are
            highlighted) and an ``'electricity'`` column plotted against the
            frame's index.

    Returns:
        A ``go.Figure`` with a line trace of every row and a red marker trace
        of the flagged rows.
    """
    # Select the flagged rows once and reuse them for both axes.
    flagged = anomalies[anomalies['Is_Anomaly'] == 1]

    series_trace = go.Scatter(
        x=anomalies.index,
        y=anomalies['electricity'],
        mode='lines',
        name='Normal',
    )
    anomaly_trace = go.Scatter(
        x=flagged.index,
        y=flagged['electricity'],
        mode='markers',
        name='Anomaly',
        marker={'color': 'red'},
    )

    fig = go.Figure()
    # Line first, markers second, so the anomaly markers are drawn on top.
    for trace in (series_trace, anomaly_trace):
        fig.add_trace(trace)

    layout_options = {
        'title': 'Time Series of Energy Usage with Anomalies Highlighted',
        'xaxis_title': 'Time',
        'yaxis_title': 'Energy Usage',
        'legend_title': 'Legend',
    }
    fig.update_layout(**layout_options)
    return fig