File size: 4,423 Bytes
81e328b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8a428a8
81e328b
 
 
 
8a428a8
81e328b
8a428a8
 
81e328b
8a428a8
81e328b
8a428a8
 
 
81e328b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8a428a8
81e328b
 
 
 
 
 
 
 
8a428a8
 
81e328b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01f8cd5
81e328b
 
 
 
 
 
01f8cd5
81e328b
 
 
 
 
 
 
 
 
 
01f8cd5
81e328b
8a428a8
81e328b
8a428a8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
Per-task deterministic graders for the PLL Cyberattack Detection OpenEnv.

Each grader takes an episode history and returns a score clamped to [0.01, 0.99].
Graders are deterministic given the same episode data.
"""

from typing import List, Dict, Any, Optional


def grade_task_easy(history: List[Dict[str, Any]], attack_start_step: int) -> float:
    """
    Task 1 — Sinusoidal FDI Detection (Easy).

    Scores how quickly the agent first raises a detection while the attack
    is active. Each history entry is read through its "step",
    "attack_active" and "attack_detected" keys.

    Scoring (delay = first correct detection step - attack_start_step):
      delay <= 20         -> 0.99
      20 < delay <= 100   -> linear decay from 0.99 down to 0.5
      100 < delay <= 420  -> 0.2
      delay > 420         -> 0.01
      never detected      -> 0.01

    Returns:
      A float inside [0.01, 0.99].
    """
    detection_step = None

    # Scan in order and stop at the first entry flagged during an active attack.
    for record in history:
        current_step = record["step"]
        is_active = record["attack_active"]
        is_flagged = record["attack_detected"]

        if is_active and is_flagged:
            detection_step = current_step
            break

    # No detection during the attack earns the floor score.
    if detection_step is None:
        return 0.01

    latency = detection_step - attack_start_step

    if latency <= 20:
        return 0.99

    if latency <= 100:
        # Straight line through (20, 0.99) and (100, 0.5).
        decayed = 0.99 - 0.49 * (latency - 20) / 80.0
        return max(0.01, min(0.99, decayed))

    if latency <= 420:
        return 0.2

    return 0.01


def grade_task_medium(history: List[Dict[str, Any]], attack_start_step: int) -> float:
    """
    Task 2 — Multi-Attack Classification (Medium).

    Only entries whose "step" is not below attack_start_step are graded.
    For those entries "agent_attack_type" is compared against
    "true_attack_type".

    Scoring:
      accuracy    = correct classifications / graded entries
      early_bonus = 0.4 * max(0, 1 - first_correct_step / 100)
                    (0 when no classification was ever correct)
      score       = accuracy * 0.6 + early_bonus, clamped to [0.01, 0.99]

    Returns 0.01 when no entry falls at or after attack_start_step.

    NOTE(review): the early bonus uses the absolute step index of the first
    correct classification, not its offset from attack_start_step — confirm
    this is intended.
    """
    graded = 0
    hits = 0
    earliest_hit = None

    for record in history:
        current_step = record["step"]
        if not current_step < attack_start_step:
            graded += 1
            actual = record["true_attack_type"]
            predicted = record["agent_attack_type"]

            if predicted == actual:
                hits += 1
                # Remember only the very first correct classification.
                if earliest_hit is None:
                    earliest_hit = current_step

    # Nothing to grade after the attack onset.
    if graded == 0:
        return 0.01

    accuracy = hits / graded
    early_bonus = (
        0.0
        if earliest_hit is None
        else 0.4 * max(0.0, 1.0 - earliest_hit / 100.0)
    )

    combined = accuracy * 0.6 + early_bonus
    return max(0.01, min(0.99, combined))


def grade_task_hard(
    history: List[Dict[str, Any]],
    loss_of_lock_step: Optional[int],
    attack_start_step: int,
) -> float:
    """
    Task 3 — Stealthy Low-and-Slow Attack (Hard).

    Each history entry is read through its "step", "attack_active" and
    "attack_detected" keys.

    Base score:
      first detection (during an active attack) before loss_of_lock_step:
          1 - first_detection_step / loss_of_lock_step
      detected at/after loss_of_lock_step, or detected when
      loss_of_lock_step is None:
          0.3
      never detected:
          0.0

    Penalty:
      0.2 is subtracted for every detection raised while the attack is
      inactive and the step is below attack_start_step.

    Returns:
      The penalised score clamped to [0.01, 0.99].
    """
    earliest_detection = None
    false_alarms = 0

    for record in history:
        current_step = record["step"]
        is_active = record["attack_active"]
        is_flagged = record["attack_detected"]

        if not is_flagged:
            continue

        if is_active:
            # Keep only the first detection made during the attack.
            if earliest_detection is None:
                earliest_detection = current_step
        elif current_step < attack_start_step:
            # Alarm raised with no attack active, before the onset.
            false_alarms += 1

    if earliest_detection is None:
        base = 0.0
    elif loss_of_lock_step is not None and earliest_detection < loss_of_lock_step:
        base = 1.0 - earliest_detection / loss_of_lock_step
    else:
        # Detected too late, or no loss of lock was recorded.
        base = 0.3

    penalised = base - 0.2 * false_alarms
    return min(0.99, max(0.01, penalised))