File size: 7,405 Bytes
4c8f878
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f6e1c5d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from dataclasses import dataclass
from typing import List


@dataclass
class TaskResult:
    """Outcome of grading a single task run.

    Instances are built by the ``grade_task_*`` functions in this module.
    """

    # Identifier of the graded task, e.g. "task_easy".
    task_name: str
    # Final grade; the graders in this module emit values within [0.01, 0.99].
    score: float
    # Number of actions taken; the graders set this to len(actions).
    steps_taken: int
    # The action names exactly as they were passed to the grader, in order.
    actions_taken: List[str]
    # Whether the run satisfied the grader's pass condition.
    success: bool
    # Human-readable explanation of how the score came about.
    reason: str


# Static catalogue of the available tasks, keyed by task name.
#
# Each entry carries:
#   name             - same value as the dictionary key
#   display_name     - human-readable title
#   difficulty       - "easy", "medium" or "hard"
#   scenario_id      - identifier of the scenario backing the task
#   description      - prose briefing for the task
#   max_steps        - step budget for the task
#   success_criteria - one-line summary of what counts as success
#
# NOTE: em dashes are written as the \u2014 escape so the source stays pure
# ASCII; the descriptions previously held a mis-decoded UTF-8 em dash.
TASKS = {
    "task_easy": {
        "name": "task_easy",
        "display_name": "Login Anomaly Triage",
        "difficulty": "easy",
        "scenario_id": "easy_false_positive_vpn",
        "description": (
            "A low-severity login alert has fired for an employee logging in "
            "from an unusual location. The agent must correctly identify this "
            "as a false positive (VPN usage) and choose to ignore it \u2014 without "
            "over-reacting and disrupting a legitimate user."
        ),
        "max_steps": 5,
        "success_criteria": "Agent correctly ignores the false-positive alert.",
    },
    "task_medium": {
        "name": "task_medium",
        "display_name": "Insider Threat Containment",
        "difficulty": "medium",
        "scenario_id": "medium_insider_threat",
        "description": (
            "A departing employee is actively exfiltrating confidential files "
            "on their final working days. The agent must investigate, contain "
            "the account, preserve evidence, and escalate \u2014 in a logical order."
        ),
        "max_steps": 8,
        "success_criteria": "Agent escalates after investigating and blocking the account.",
    },
    "task_hard": {
        "name": "task_hard",
        "display_name": "APT Lateral Movement Response",
        "difficulty": "hard",
        "scenario_id": "hard_apt_lateral_movement",
        "description": (
            "An Advanced Persistent Threat (APT) is actively moving laterally "
            "through the network using Cobalt Strike. The agent must identify "
            "the attack chain, isolate compromised hosts, block C2 communication, "
            "collect forensic evidence, and escalate to Tier-2 \u2014 all within "
            "12 steps on a noisy, high-stakes incident."
        ),
        "max_steps": 12,
        "success_criteria": "Agent isolates device, blocks IP, collects forensics, and escalates.",
    },
}


def _clamp(score: float) -> float:
    """Clamp score to strictly open interval (0.01, 0.99)."""
    return round(min(0.99, max(0.01, score)), 2)


def grade_task_easy(actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Grade the easy task: triage of a false-positive login alert.

    The outcome is decided by the first matching rule:

    1. Any disruptive action (``block_account``, ``isolate_device``,
       ``escalate``, ``block_ip``) is an over-reaction: score 0.02, failure.
    2. ``ignore`` together with ``investigate``: success, 0.98 minus 0.05 for
       every action beyond the first two, never below 0.70.
    3. ``ignore`` without ``investigate``: success, 0.78.
    4. ``investigate`` without ``ignore``: failure, 0.40.
    5. Anything else: failure, 0.10 per action taken.

    Args:
        actions: Names of the actions the agent took, in order.
        final_score: Not used by this grader; part of the signature that
            ``run_grader`` calls every grader with.
        done: Not used by this grader; part of the signature that
            ``run_grader`` calls every grader with.

    Returns:
        A ``TaskResult`` for ``"task_easy"`` whose score went through
        ``_clamp``.
    """
    disruptive = ("block_account", "isolate_device", "escalate", "block_ip")
    investigated = "investigate" in actions
    ignored = "ignore" in actions
    overreacted = any(action in actions for action in disruptive)

    if overreacted:
        score = 0.02
        success = False
        # \u2014 is an em dash; this message previously held a mis-decoded one.
        reason = "Over-reacted to a false positive \u2014 disrupted a legitimate user."
    elif ignored and investigated:
        extra = max(0, len(actions) - 2)
        score = max(0.70, 0.98 - extra * 0.05)
        success = True
        reason = f"Correctly identified false positive after investigation. Steps: {len(actions)}"
    elif ignored:
        score = 0.78
        success = True
        reason = "Correctly ignored false positive (skipped investigation step)."
    elif investigated:
        score = 0.40
        success = False
        reason = "Investigated but failed to conclude this was a false positive."
    else:
        # No explicit floor needed here: _clamp below lifts 0.0 (no actions)
        # to 0.01 and caps long action lists at 0.99.
        score = 0.10 * len(actions)
        success = False
        reason = "Did not reach a conclusion."

    return TaskResult(
        task_name="task_easy",
        score=_clamp(score),
        steps_taken=len(actions),
        actions_taken=actions,
        success=success,
        reason=reason,
    )


def grade_task_medium(actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Grade the medium task: containment of an insider threat.

    Credit is additive: ``investigate`` 0.20, ``block_account`` 0.25,
    ``collect_forensics`` 0.20, ``escalate`` 0.25. A further 0.05 is granted
    when the first ``investigate`` precedes the first ``block_account``, which
    in turn precedes the first ``escalate``. Every action beyond the sixth
    costs 0.05. The total is passed through ``_clamp``; the run succeeds when
    the clamped score is at least 0.70.

    Args:
        actions: Names of the actions the agent took, in order.
        final_score: Not used by this grader.
        done: Not used by this grader.

    Returns:
        A ``TaskResult`` for ``"task_medium"``.
    """
    # (action name, credit when present, text reported when absent)
    rubric = (
        ("investigate", 0.20, "missing investigation"),
        ("block_account", 0.25, "account not blocked"),
        ("collect_forensics", 0.20, "no forensics collected"),
        ("escalate", 0.25, "not escalated"),
    )

    total = 0.0
    gaps = []
    for action_name, credit, gap_text in rubric:
        if action_name in actions:
            total += credit
        else:
            gaps.append(gap_text)

    # Ordering bonus, judged on the first occurrence of each step.
    ordered_steps = ("investigate", "block_account", "escalate")
    if all(step in actions for step in ordered_steps):
        first_inv, first_blk, first_esc = (actions.index(step) for step in ordered_steps)
        if first_inv < first_blk < first_esc:
            total += 0.05

    # Penalty for every step beyond the sixth.
    overflow = max(0, len(actions) - 6)
    total -= overflow * 0.05

    score = _clamp(total)

    if gaps:
        detail = f"Missing: {', '.join(gaps)}."
    else:
        detail = "All key actions taken."

    return TaskResult(
        task_name="task_medium",
        score=score,
        steps_taken=len(actions),
        actions_taken=actions,
        success=score >= 0.70,
        reason=f"Score {score:.2f}. " + detail,
    )


def grade_task_hard(actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Grade the hard task: response to APT lateral movement.

    An ``ignore`` action anywhere in the run is an immediate failure with the
    minimum score of 0.01. Otherwise credit is additive: ``investigate`` 0.15,
    ``isolate_device`` 0.20, ``block_ip`` 0.20, ``collect_forensics`` 0.20,
    ``escalate`` 0.15. A further 0.05 is granted when all five key actions are
    present and their first occurrences appear in that order. Every action
    beyond the eighth costs 0.04. The total is passed through ``_clamp``; the
    run succeeds when the clamped score is at least 0.65.

    Args:
        actions: Names of the actions the agent took, in order.
        final_score: Not used by this grader; part of the signature that
            ``run_grader`` calls every grader with.
        done: Not used by this grader; part of the signature that
            ``run_grader`` calls every grader with.

    Returns:
        A ``TaskResult`` for ``"task_hard"``.
    """
    if "ignore" in actions:
        return TaskResult(
            task_name="task_hard",
            score=0.01,
            steps_taken=len(actions),
            actions_taken=actions,
            success=False,
            # \u2014 is an em dash; this message previously held a mis-decoded one.
            reason="Critical APT incident ignored \u2014 catastrophic failure.",
        )

    # Key actions in their expected order, with the credit each one earns.
    weights = (
        ("investigate", 0.15),
        ("isolate_device", 0.20),
        ("block_ip", 0.20),
        ("collect_forensics", 0.20),
        ("escalate", 0.15),
    )
    key_actions = [name for name, _ in weights]

    score = 0.0
    for name, weight in weights:
        if name in actions:
            score += weight

    missing = [name for name in key_actions if name not in actions]

    # Ordering bonus. Judge order by the FIRST occurrence of each key action
    # (as grade_task_medium does), so that repeating a step later in the run,
    # e.g. a second "investigate", does not forfeit the bonus.
    if not missing:
        first_seen = [actions.index(name) for name in key_actions]
        if first_seen == sorted(first_seen):
            score += 0.05

    # Penalty for every step beyond the eighth.
    extra = max(0, len(actions) - 8)
    score -= extra * 0.04

    score = _clamp(score)
    success = score >= 0.65

    reason = (
        f"Score {score:.2f}. " +
        (f"Missing key actions: {', '.join(missing)}." if missing else "All critical actions taken.")
    )

    return TaskResult(
        task_name="task_hard",
        score=score,
        steps_taken=len(actions),
        actions_taken=actions,
        success=success,
        reason=reason,
    )


# Dispatch table used by run_grader: task name -> grading function.
# Every grader is called as grader(actions, final_score, done).
GRADERS = {
    "task_easy": grade_task_easy,
    "task_medium": grade_task_medium,
    "task_hard": grade_task_hard,
}


def run_grader(task_name: str, actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Look up the grader registered for *task_name* and run it.

    Args:
        task_name: Key into ``GRADERS``.
        actions: Names of the actions the agent took, in order.
        final_score: Forwarded unchanged to the grader.
        done: Forwarded unchanged to the grader.

    Returns:
        The ``TaskResult`` produced by the selected grader.

    Raises:
        ValueError: If *task_name* has no entry in ``GRADERS``.
    """
    grader = GRADERS.get(task_name)
    if grader is None:
        raise ValueError(f"Unknown task: {task_name}. Available: {list(GRADERS.keys())}")
    return grader(actions, final_score, done)