EATosin commited on
Commit
435d673
·
verified ·
1 Parent(s): 85efc72

Upload 2 files

Browse files
Files changed (2) hide show
  1. anomaly_detector.py +63 -0
  2. main.py +68 -0
anomaly_detector.py ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import numpy as np
2
+ from collections import deque
3
+
4
+ class AnomalyDetector:
5
+ def __init__(self, window_size=20, threshold=2.5):
6
+ """
7
+ Physics-based Anomaly Detection.
8
+ We keep a 'window' of recent data to establish a baseline.
9
+ If a new data point deviates by > 2.5 Standard Deviations (Z-Score),
10
+ we flag it as an anomaly.
11
+
12
+ :param window_size: How many recent data points to remember.
13
+ :param threshold: The Z-Score limit (Physics standard is usually 3-sigma, we use 2.5).
14
+ """
15
+ self.window_size = window_size
16
+ self.threshold = threshold
17
+ # A deque is like a list that automatically drops old data when full
18
+ self.data_window = deque(maxlen=window_size)
19
+
20
+ def update(self, value):
21
+ """
22
+ Ingest new data, update statistics, and check for drift.
23
+ Returns: (is_anomaly, message, current_z_score)
24
+ """
25
+ # 1. Add new value to history
26
+ self.data_window.append(value)
27
+
28
+ # 2. Need enough data to establish a baseline (at least 5 points)
29
+ if len(self.data_window) < 5:
30
+ return False, "Initializing baseline...", 0.0
31
+
32
+ # 3. Calculate Physics Metrics (Mean & Standard Deviation)
33
+ mean = np.mean(self.data_window)
34
+ std_dev = np.std(self.data_window)
35
+
36
+ # Avoid division by zero if flatline
37
+ if std_dev == 0:
38
+ return False, "Stable", 0.0
39
+
40
+ # 4. Calculate Z-Score (How far is this point from the 'Normal'?)
41
+ z_score = (value - mean) / std_dev
42
+
43
+ # 5. Check Threshold
44
+ if abs(z_score) > self.threshold:
45
+ return True, f"CRITICAL: Anomaly Detected! Value {value} is {z_score:.2f}σ deviation.", z_score
46
+
47
+ return False, "Normal", z_score
48
+
49
+ # --- Quick Test Block (Runs only if you play this file directly) ---
50
+ if __name__ == "__main__":
51
+ print("🔬 Initializing Sentinel Detector...")
52
+ detector = AnomalyDetector()
53
+
54
+ # 1. Simulate Normal Traffic (Values around 50)
55
+ normal_data = [50, 52, 49, 51, 50, 48, 53, 50, 51, 49]
56
+ for val in normal_data:
57
+ detector.update(val)
58
+ print(f"Input: {val} | Status: OK")
59
+
60
+ # 2. Simulate a SUDDEN SPIKE (Anomaly)
61
+ spike = 120 # Massive jump
62
+ is_anomaly, msg, z = detector.update(spike)
63
+ print(f"\n⚠️ Input: {spike} | {msg}")
main.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, HTTPException
2
+ from pydantic import BaseModel
3
+ import uvicorn
4
+ import logging
5
+
6
+ # Import our custom modules
7
+ from anomaly_detector import AnomalyDetector
8
+ from rag_agent import SentinelAgent
9
+
10
+ # 1. Initialize the App
11
+ app = FastAPI(
12
+ title="Sentinel MLOps Agent",
13
+ description="Autonomous Anomaly Detection & RAG Investigation API",
14
+ version="1.0"
15
+ )
16
+
17
+ # 2. Load the Engines
18
+ print("🔋 Starting Sentinel Engines...")
19
+ detector = AnomalyDetector()
20
+ agent = SentinelAgent()
21
+
22
+ # 3. Define the Data Format (Validation)
23
+ class MetricData(BaseModel):
24
+ timestamp: str
25
+ service_name: str
26
+ cpu_usage: float
27
+
28
+ # 4. The API Endpoint
29
+ @app.post("/monitor")
30
+ async def monitor_system(data: MetricData):
31
+ """
32
+ Receives live system data.
33
+ - If Normal: Returns OK.
34
+ - If Anomaly: Triggers Gemini 2.5 to investigate.
35
+ """
36
+ # Step A: Check for Physics/Math Anomaly
37
+ is_anomaly, msg, z_score = detector.update(data.cpu_usage)
38
+
39
+ if not is_anomaly:
40
+ return {
41
+ "status": "Healthy",
42
+ "message": msg,
43
+ "z_score": z_score
44
+ }
45
+
46
+ # Step B: If Anomaly, Wake up the Agent
47
+ print(f"🚨 ALERT: Anomaly on {data.service_name} detected!")
48
+
49
+ report = agent.investigate(
50
+ anomaly_value=data.cpu_usage,
51
+ z_score=z_score
52
+ )
53
+
54
+ # Return the Full Incident Report
55
+ return {
56
+ "status": "CRITICAL",
57
+ "service": data.service_name,
58
+ "deviation": f"{z_score:.2f} sigma",
59
+ "investigation_report": report
60
+ }
61
+
62
+ @app.get("/")
63
+ def home():
64
+ return {"message": "Sentinel AI Agent is Active. Send POST to /monitor"}
65
+
66
+ # 5. Run the Server (Mobile Friendly)
67
+ if __name__ == "__main__":
68
+ uvicorn.run(app, host="0.0.0.0", port=8000)