Sentinel-MLOps-Agent / anomaly_detector.py
EATosin's picture
Upload 2 files
435d673 verified
import numpy as np
from collections import deque
class AnomalyDetector:
def __init__(self, window_size=20, threshold=2.5):
"""
Physics-based Anomaly Detection.
We keep a 'window' of recent data to establish a baseline.
If a new data point deviates by > 2.5 Standard Deviations (Z-Score),
we flag it as an anomaly.
:param window_size: How many recent data points to remember.
:param threshold: The Z-Score limit (Physics standard is usually 3-sigma, we use 2.5).
"""
self.window_size = window_size
self.threshold = threshold
# A deque is like a list that automatically drops old data when full
self.data_window = deque(maxlen=window_size)
def update(self, value):
"""
Ingest new data, update statistics, and check for drift.
Returns: (is_anomaly, message, current_z_score)
"""
# 1. Add new value to history
self.data_window.append(value)
# 2. Need enough data to establish a baseline (at least 5 points)
if len(self.data_window) < 5:
return False, "Initializing baseline...", 0.0
# 3. Calculate Physics Metrics (Mean & Standard Deviation)
mean = np.mean(self.data_window)
std_dev = np.std(self.data_window)
# Avoid division by zero if flatline
if std_dev == 0:
return False, "Stable", 0.0
# 4. Calculate Z-Score (How far is this point from the 'Normal'?)
z_score = (value - mean) / std_dev
# 5. Check Threshold
if abs(z_score) > self.threshold:
return True, f"CRITICAL: Anomaly Detected! Value {value} is {z_score:.2f}σ deviation.", z_score
return False, "Normal", z_score
# --- Quick Test Block (Runs only if you play this file directly) ---
if __name__ == "__main__":
print("🔬 Initializing Sentinel Detector...")
detector = AnomalyDetector()
# 1. Simulate Normal Traffic (Values around 50)
normal_data = [50, 52, 49, 51, 50, 48, 53, 50, 51, 49]
for val in normal_data:
detector.update(val)
print(f"Input: {val} | Status: OK")
# 2. Simulate a SUDDEN SPIKE (Anomaly)
spike = 120 # Massive jump
is_anomaly, msg, z = detector.update(spike)
print(f"\n⚠️ Input: {spike} | {msg}")