Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from collections import deque | |
| class AnomalyDetector: | |
| def __init__(self, window_size=20, threshold=2.5): | |
| """ | |
| Physics-based Anomaly Detection. | |
| We keep a 'window' of recent data to establish a baseline. | |
| If a new data point deviates by > 2.5 Standard Deviations (Z-Score), | |
| we flag it as an anomaly. | |
| :param window_size: How many recent data points to remember. | |
| :param threshold: The Z-Score limit (Physics standard is usually 3-sigma, we use 2.5). | |
| """ | |
| self.window_size = window_size | |
| self.threshold = threshold | |
| # A deque is like a list that automatically drops old data when full | |
| self.data_window = deque(maxlen=window_size) | |
| def update(self, value): | |
| """ | |
| Ingest new data, update statistics, and check for drift. | |
| Returns: (is_anomaly, message, current_z_score) | |
| """ | |
| # 1. Add new value to history | |
| self.data_window.append(value) | |
| # 2. Need enough data to establish a baseline (at least 5 points) | |
| if len(self.data_window) < 5: | |
| return False, "Initializing baseline...", 0.0 | |
| # 3. Calculate Physics Metrics (Mean & Standard Deviation) | |
| mean = np.mean(self.data_window) | |
| std_dev = np.std(self.data_window) | |
| # Avoid division by zero if flatline | |
| if std_dev == 0: | |
| return False, "Stable", 0.0 | |
| # 4. Calculate Z-Score (How far is this point from the 'Normal'?) | |
| z_score = (value - mean) / std_dev | |
| # 5. Check Threshold | |
| if abs(z_score) > self.threshold: | |
| return True, f"CRITICAL: Anomaly Detected! Value {value} is {z_score:.2f}σ deviation.", z_score | |
| return False, "Normal", z_score | |
| # --- Quick Test Block (Runs only if you play this file directly) --- | |
| if __name__ == "__main__": | |
| print("🔬 Initializing Sentinel Detector...") | |
| detector = AnomalyDetector() | |
| # 1. Simulate Normal Traffic (Values around 50) | |
| normal_data = [50, 52, 49, 51, 50, 48, 53, 50, 51, 49] | |
| for val in normal_data: | |
| detector.update(val) | |
| print(f"Input: {val} | Status: OK") | |
| # 2. Simulate a SUDDEN SPIKE (Anomaly) | |
| spike = 120 # Massive jump | |
| is_anomaly, msg, z = detector.update(spike) | |
| print(f"\n⚠️ Input: {spike} | {msg}") |