File size: 4,925 Bytes
20bc5e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Adaptive Physics-informed cyberattack detector for the PLL OpenEnv.

Uses residual-based and pattern-based features derived from the
observation windows to detect, classify, and recommend protective
actions. The detector builds a baseline from the first 20 observations
(warmup window) of the episode to normalize the features individually.
"""

import numpy as np
from typing import Dict, Any

from src.models import Observation


class AdaptiveDetector:
    """Z-score anomaly detector calibrated on a per-episode warm-up window.

    For every observation four raw features are computed:

    * ``vq_ratio``       - mean(|vq|) / (mean(|vd|) + 1e-6)
    * ``omega_var``      - variance of the omega window
    * ``vd_var``         - variance of the vd window
    * ``symmetry_ratio`` - |va + vb + vc| / (|va| + |vb| + |vc| + 1e-6)

    While ``observation.step < warmup_steps`` the features are only recorded.
    On the first later observation their mean / standard deviation become the
    baseline, and from then on each feature is turned into a z-score and
    combined into a weighted anomaly score.

    The observation is duck-typed; it must expose ``vq_window``,
    ``vd_window``, ``omega_window``, ``raw_voltages`` (three values) and
    ``step``.
    """

    # Score above which an attack is reported.
    DETECTION_THRESHOLD = 5.0
    # Score thresholds for protective actions 3 and 2 respectively.
    ACTION_HIGH_THRESHOLD = 6
    ACTION_MEDIUM_THRESHOLD = 3
    # Lower bound for baseline standard deviations (avoids division by zero).
    MIN_STD = 1e-6

    def __init__(self, warmup_steps: int = 20):
        """Create an uncalibrated detector.

        Args:
            warmup_steps: Observations with ``step`` below this value are
                used only to build the baseline (default 20).
        """
        self.warmup_steps = warmup_steps
        self.reset()

    def reset(self) -> None:
        """Discard the collected baseline so a new episode can be calibrated."""
        # Baseline collections
        self.r1_history = []
        self.r3_history = []
        self.r4_history = []
        self.r5_history = []

        # Calibrated statistics
        self.mean_R1 = 0.0
        self.std_R1 = self.MIN_STD
        self.mean_R3 = 0.0
        self.std_R3 = self.MIN_STD
        self.mean_R4 = 0.0
        self.std_R4 = self.MIN_STD
        self.mean_R5 = 0.0
        self.std_R5 = self.MIN_STD

        self.is_calibrated = False
        # Step of the previous observation; used to notice episode restarts.
        self._last_step = None

    @staticmethod
    def _neutral_result() -> Dict[str, Any]:
        """Return the "nothing detected" result used before calibration."""
        return {
            "attack_detected": False,
            "attack_type": 0,
            "confidence": 0.0,
            "protective_action": 0,
            "score": 0.0,
            "baseline_score": 0.0
        }

    def _baseline_stats(self, samples):
        """Return ``(mean, std)`` of *samples* with the std floored at MIN_STD."""
        return float(np.mean(samples)), max(float(np.std(samples)), self.MIN_STD)

    def _calibrate(self) -> None:
        """Freeze the warm-up histories into baseline statistics."""
        self.mean_R1, self.std_R1 = self._baseline_stats(self.r1_history)
        self.mean_R3, self.std_R3 = self._baseline_stats(self.r3_history)
        self.mean_R4, self.std_R4 = self._baseline_stats(self.r4_history)
        self.mean_R5, self.std_R5 = self._baseline_stats(self.r5_history)
        self.is_calibrated = True

    @staticmethod
    def _classify(vq, r3: float) -> int:
        """Pick an attack type (1-4) for an observation already flagged."""
        if r3 > 2:
            return 1  # sinusoidal
        vq_trend = float(np.mean(np.diff(vq))) if len(vq) > 1 else 0.0
        if abs(vq_trend) > 0.01:
            return 2  # ramp
        vq_spike = float(np.max(np.abs(vq)))
        if vq_spike > 0.1:
            return 3  # pulse
        return 4  # stealthy

    def detect(self, observation) -> Dict[str, Any]:
        """
        Run physics-informed anomaly detection on the current observation.

        Args:
            observation: Object exposing ``vq_window``, ``vd_window``,
                ``omega_window``, ``raw_voltages`` and ``step``.

        Returns:
            Dict with keys ``attack_detected`` (bool), ``attack_type`` (int,
            0 when nothing is detected, otherwise 1-4), ``confidence``
            (float), ``protective_action`` (int, 0/2/3), ``score`` (float)
            and ``baseline_score`` (always 0.0). During warm-up, or when no
            baseline could be collected, every field is zero / False.
        """
        step = observation.step

        # A step counter that moves backwards means a new episode started;
        # drop the old baseline instead of judging the new episode against it.
        if self._last_step is not None and step < self._last_step:
            self.reset()
        self._last_step = step

        vq = np.asarray(observation.vq_window, dtype=np.float64)
        vd = np.asarray(observation.vd_window, dtype=np.float64)
        omega = np.asarray(observation.omega_window, dtype=np.float64)
        va, vb, vc = observation.raw_voltages

        # ---- Step 1: Feature Extraction --------------------------------
        vq_mean = float(np.mean(np.abs(vq)))
        vd_mean = float(np.mean(np.abs(vd)))
        vq_ratio = vq_mean / (vd_mean + 1e-6)

        omega_var = float(np.var(omega))
        vd_var = float(np.var(vd))

        abs_v_sum = abs(va) + abs(vb) + abs(vc) + 1e-6
        symmetry_ratio = float(abs(va + vb + vc) / abs_v_sum)

        # ---- Step 2: Baseline Calibration ------------------------------
        if step < self.warmup_steps:
            self.r1_history.append(vq_ratio)
            self.r3_history.append(omega_var)
            self.r4_history.append(vd_var)
            self.r5_history.append(symmetry_ratio)
            return self._neutral_result()

        if not self.is_calibrated:
            if not self.r1_history:
                # No warm-up samples were seen (first call came after the
                # warm-up window). Statistics of an empty list would be NaN
                # and poison every later score, so stay uncalibrated.
                return self._neutral_result()
            self._calibrate()

        # ---- Step 3: Normalized Features ------------------------------
        R1 = (vq_ratio - self.mean_R1) / self.std_R1
        R3 = (omega_var - self.mean_R3) / self.std_R3
        R4 = (vd_var - self.mean_R4) / self.std_R4
        R5 = (symmetry_ratio - self.mean_R5) / self.std_R5

        # ---- Step 4: Score --------------------------------------------
        score = 0.4 * R1 + 0.2 * R3 + 0.2 * R5 + 0.2 * R4

        # ---- Step 5: Detection ----------------------------------------
        attack_detected = score > self.DETECTION_THRESHOLD
        # NOTE(review): a detected attack implies score / threshold > 1, so
        # this always saturates at 1.0 - confirm whether a graded confidence
        # was intended.
        confidence = (
            min(1.0, score / self.DETECTION_THRESHOLD) if attack_detected else 0.0
        )

        # ---- Step 6: Classification -----------------------------------
        attack_type = self._classify(vq, R3) if attack_detected else 0

        # ---- Step 7: Protective Action --------------------------------
        # NOTE(review): action 2 is issued for 3 < score <= 5 although no
        # attack is reported in that range - confirm this is intentional.
        if score > self.ACTION_HIGH_THRESHOLD:
            protective_action = 3
        elif score > self.ACTION_MEDIUM_THRESHOLD:
            protective_action = 2
        else:
            # score <= 3 is always below the detection threshold.
            protective_action = 0

        return {
            "attack_detected": bool(attack_detected),
            "attack_type": int(attack_type),
            "confidence": float(confidence),
            "protective_action": int(protective_action),
            "score": float(score),
            "baseline_score": 0.0
        }