Spaces:
Sleeping
Sleeping
File size: 2,358 Bytes
435d673 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | import numpy as np
from collections import deque
class AnomalyDetector:
    """Rolling Z-score anomaly detector over a bounded window of recent values.

    Each new value is compared against the mean and standard deviation of the
    values that were already in the window *before* it arrived. The value is
    then stored so that it contributes to the baseline of later readings.
    """

    # Fewest previously seen points required before a value is scored.
    MIN_BASELINE_POINTS = 5

    def __init__(self, window_size=20, threshold=2.5):
        """
        Physics-based Anomaly Detection.

        We keep a 'window' of recent data to establish a baseline.
        If a new data point deviates from that baseline by more than
        `threshold` standard deviations (Z-Score), we flag it as an anomaly.

        :param window_size: How many recent data points to remember. Must be
            at least 5 (MIN_BASELINE_POINTS): a smaller window can never hold
            enough points to form a baseline, so every update would report
            "Initializing baseline...".
        :param threshold: The Z-Score limit (Physics standard is usually
            3-sigma, we use 2.5).
        """
        self.window_size = window_size
        self.threshold = threshold
        # A deque is like a list that automatically drops old data when full
        self.data_window = deque(maxlen=window_size)

    def update(self, value):
        """
        Score a new value against the current baseline, then store it.

        :param value: The newest numeric reading.
        :returns: (is_anomaly, message, current_z_score) where message is one
            of "Initializing baseline...", "Stable", "Normal" or a
            "CRITICAL: ..." description, and current_z_score is a float
            (0.0 while initializing or stable, +/-inf when a flat baseline is
            broken).
        """
        # 1. Snapshot the baseline BEFORE storing the new value. If the value
        #    under test were part of its own baseline it would inflate the
        #    standard deviation and cap |z| at (n - 1) / sqrt(n), hiding even
        #    huge spikes while the window is still short.
        baseline = list(self.data_window)

        # 2. Add the new value to history (it joins the baseline for later
        #    readings, whether or not it is flagged below).
        self.data_window.append(value)

        # 3. Need enough prior data to establish a baseline
        if len(baseline) < self.MIN_BASELINE_POINTS:
            return False, "Initializing baseline...", 0.0

        # 4. Calculate Physics Metrics (Mean & population Standard Deviation)
        mean = np.mean(baseline)
        std_dev = np.std(baseline)
        deviation = value - mean

        # 5. Calculate Z-Score (How far is this point from the 'Normal'?)
        if std_dev == 0:
            # Flat baseline: avoid dividing by zero. Any departure from a
            # perfectly constant signal is infinitely many sigmas away.
            if deviation > 0:
                z_score = float("inf")
            elif deviation < 0:
                z_score = float("-inf")
            else:
                return False, "Stable", 0.0
        else:
            z_score = float(deviation / std_dev)

        # 6. Check Threshold
        if abs(z_score) > self.threshold:
            return True, f"CRITICAL: Anomaly Detected! Value {value} is {z_score:.2f}σ deviation.", z_score
        return False, "Normal", z_score
# --- Quick Test Block (runs only when this file is executed directly) ---
if __name__ == "__main__":
    print("🔬 Initializing Sentinel Detector...")
    detector = AnomalyDetector()

    # 1. Simulate Normal Traffic (Values around 50).
    #    Report what the detector actually decided: "OK" unless the reading
    #    was flagged, in which case show the detector's own message.
    normal_data = [50, 52, 49, 51, 50, 48, 53, 50, 51, 49]
    for val in normal_data:
        flagged, detail, _ = detector.update(val)
        status = detail if flagged else "OK"
        print(f"Input: {val} | Status: {status}")

    # 2. Simulate a SUDDEN SPIKE (Anomaly)
    spike = 120  # Massive jump
    _, msg, _ = detector.update(spike)
    print(f"\n⚠️ Input: {spike} | {msg}")