"""
Per-task deterministic graders for the PLL Cyberattack Detection OpenEnv.
Each grader takes an episode history and returns a score clamped to [0.01, 0.99].
Graders are deterministic given the same episode data.
"""
from typing import List, Dict, Any, Optional
def grade_task_easy(history: List[Dict[str, Any]], attack_start_step: int) -> float:
    """
    Task 1 — Sinusoidal FDI Detection (Easy).

    Scores how quickly the agent first raises a detection while the attack
    is active. Only the first history entry with both ``attack_active`` and
    ``attack_detected`` truthy is considered.

    Scoring, with ``delay = first_correct_detection_step - attack_start_step``:
        delay <= 20        -> 0.99
        20 < delay <= 100  -> linear decay from 0.99 down to 0.5
        100 < delay <= 420 -> 0.2
        delay > 420        -> 0.01
        never detected     -> 0.01

    Args:
        history: Episode entries; each must provide the keys ``"step"``,
            ``"attack_active"`` and ``"attack_detected"``.
        attack_start_step: Step at which the attack begins.

    Returns:
        A score within [0.01, 0.99].
    """
    detection_step = None
    for record in history:
        current_step = record["step"]
        is_active = record["attack_active"]
        is_flagged = record["attack_detected"]
        if not (is_active and is_flagged):
            continue
        # The earliest correct detection is the only one that matters.
        detection_step = current_step
        break

    if detection_step is None:
        return 0.01

    latency = detection_step - attack_start_step
    if latency <= 20:
        raw_score = 0.99
    elif latency <= 100:
        # Interpolate between 0.99 (latency 20) and 0.5 (latency 100).
        raw_score = 0.99 - 0.49 * (latency - 20) / 80.0
    elif latency <= 420:
        raw_score = 0.2
    else:
        raw_score = 0.01

    # Keep the result strictly inside the open interval (0, 1).
    return max(0.01, min(0.99, raw_score))
def grade_task_medium(history: List[Dict[str, Any]], attack_start_step: int) -> float:
    """
    Task 2 — Multi-Attack Classification (Medium).

    Combines classification accuracy after the attack starts with a bonus
    for an early first correct classification.

    Scoring:
        base_score  = correct classifications / entries with
                      step >= attack_start_step
        early_bonus = 0.4 * max(0, 1 - first_correct_step / 100),
                      or 0 if no classification was ever correct
        score       = base_score * 0.6 + early_bonus, clamped to [0.01, 0.99]

    If no entry has ``step >= attack_start_step`` the score is 0.01.

    Args:
        history: Episode entries; each must provide ``"step"``, and entries
            at or after the attack start must also provide
            ``"true_attack_type"`` and ``"agent_attack_type"``.
        attack_start_step: Step at which the attack begins.

    Returns:
        A score within [0.01, 0.99].
    """
    post_attack_total = 0
    hits = 0
    earliest_hit_step = None

    for record in history:
        current_step = record["step"]
        if current_step < attack_start_step:
            # Pre-attack entries do not take part in grading.
            continue

        post_attack_total += 1
        predicted = record["agent_attack_type"]
        actual = record["true_attack_type"]
        if predicted == actual:
            hits += 1
            if earliest_hit_step is None:
                earliest_hit_step = current_step

    if post_attack_total == 0:
        return 0.01

    accuracy = hits / post_attack_total

    # NOTE(review): the bonus uses the absolute step index, not the number of
    # steps since attack onset — confirm this is the intended reference point.
    if earliest_hit_step is None:
        early_bonus = 0.0
    else:
        early_bonus = 0.4 * max(0.0, 1.0 - earliest_hit_step / 100.0)

    combined = accuracy * 0.6 + early_bonus
    return max(0.01, min(0.99, combined))
def grade_task_hard(
    history: List[Dict[str, Any]],
    loss_of_lock_step: Optional[int],
    attack_start_step: int,
) -> float:
    """
    Task 3 — Stealthy Low-and-Slow Attack (Hard).

    Rewards detecting the attack before loss of lock and penalises
    detections raised before the attack has started.

    Base score, using the first entry where the attack is both active and
    detected:
        never detected                         -> 0.0
        detected before loss_of_lock_step      -> 1 - first_detection_step / loss_of_lock_step
        detected at/after loss_of_lock_step    -> 0.3
        detected and loss_of_lock_step is None -> 0.3

    A penalty of 0.2 is subtracted for every entry that reports a detection
    while the attack is inactive and ``step < attack_start_step``. The
    result is clamped to [0.01, 0.99].

    Args:
        history: Episode entries; each must provide the keys ``"step"``,
            ``"attack_active"`` and ``"attack_detected"``.
        loss_of_lock_step: Step at which lock was lost, or ``None`` if it
            never was.
        attack_start_step: Step at which the attack begins.

    Returns:
        A score within [0.01, 0.99].
    """
    detection_step = None
    false_alarms = 0

    for record in history:
        current_step = record["step"]
        is_active = record["attack_active"]
        is_flagged = record["attack_detected"]

        if not is_flagged:
            continue
        if is_active:
            # Remember only the earliest correct detection.
            if detection_step is None:
                detection_step = current_step
        elif current_step < attack_start_step:
            # Detections with no active attack count as false alarms only
            # when they occur before the attack begins.
            false_alarms += 1

    if detection_step is None:
        base = 0.0
    elif loss_of_lock_step is not None and detection_step < loss_of_lock_step:
        # NOTE(review): the ratio uses absolute step indices rather than
        # steps since attack onset — confirm this is intended.
        base = 1.0 - detection_step / loss_of_lock_step
    else:
        # Detected too late, or lock was never lost.
        base = 0.3

    penalised = base - 0.2 * false_alarms
    return max(0.01, min(0.99, penalised))
|