File size: 2,358 Bytes
435d673
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import numpy as np
from collections import deque

class AnomalyDetector:
    def __init__(self, window_size=20, threshold=2.5):
        """
        Physics-based Anomaly Detection.
        We keep a 'window' of recent data to establish a baseline.
        If a new data point deviates by > 2.5 Standard Deviations (Z-Score),
        we flag it as an anomaly.
        
        :param window_size: How many recent data points to remember.
        :param threshold: The Z-Score limit (Physics standard is usually 3-sigma, we use 2.5).
        """
        self.window_size = window_size
        self.threshold = threshold
        # A deque is like a list that automatically drops old data when full
        self.data_window = deque(maxlen=window_size)

    def update(self, value):
        """
        Ingest new data, update statistics, and check for drift.
        Returns: (is_anomaly, message, current_z_score)
        """
        # 1. Add new value to history
        self.data_window.append(value)
        
        # 2. Need enough data to establish a baseline (at least 5 points)
        if len(self.data_window) < 5:
            return False, "Initializing baseline...", 0.0

        # 3. Calculate Physics Metrics (Mean & Standard Deviation)
        mean = np.mean(self.data_window)
        std_dev = np.std(self.data_window)

        # Avoid division by zero if flatline
        if std_dev == 0:
            return False, "Stable", 0.0

        # 4. Calculate Z-Score (How far is this point from the 'Normal'?)
        z_score = (value - mean) / std_dev

        # 5. Check Threshold
        if abs(z_score) > self.threshold:
            return True, f"CRITICAL: Anomaly Detected! Value {value} is {z_score:.2f}σ deviation.", z_score
        
        return False, "Normal", z_score

# --- Quick Test Block (Runs only if you play this file directly) ---
if __name__ == "__main__":
    print("🔬 Initializing Sentinel Detector...")
    detector = AnomalyDetector()
    
    # 1. Simulate Normal Traffic (Values around 50)
    normal_data = [50, 52, 49, 51, 50, 48, 53, 50, 51, 49]
    for val in normal_data:
        detector.update(val)
        print(f"Input: {val} | Status: OK")

    # 2. Simulate a SUDDEN SPIKE (Anomaly)
    spike = 120 # Massive jump
    is_anomaly, msg, z = detector.update(spike)
    print(f"\n⚠️ Input: {spike} | {msg}")