ApoorvaBadoni committed on
Commit
f6e1c5d
·
1 Parent(s): 2a82d53

Fix: clamp grader scores to [0.01, 0.99] (strictly inside (0, 1))

Browse files
Files changed (1) hide show
  1. tasks.py +34 -102
tasks.py CHANGED
@@ -1,48 +1,23 @@
1
- # Copyright (c) Meta Platforms, Inc. and affiliates.
2
- # All rights reserved.
3
- #
4
- # This source code is licensed under the BSD-style license found in the
5
- # LICENSE file in the root directory of this source tree.
6
-
7
- """
8
- SOC Environment — Task Definitions with Agent Graders.
9
-
10
- Each task is a concrete, named objective with:
11
- - A fixed scenario (deterministic — same seed every run)
12
- - A grader function that scores agent performance 0.0–1.0
13
- - Clear success/failure criteria
14
-
15
- Tasks:
16
- task_easy — Login anomaly triage (false positive detection)
17
- task_medium — Insider threat response (multi-step containment)
18
- task_hard — APT lateral movement response (chained reasoning)
19
- """
20
-
21
  from dataclasses import dataclass
22
- from typing import Any, Dict, List, Optional
23
 
24
 
25
  @dataclass
26
  class TaskResult:
27
  task_name: str
28
- score: float # 0.0–1.0
29
  steps_taken: int
30
  actions_taken: List[str]
31
  success: bool
32
  reason: str
33
 
34
 
35
- # ─────────────────────────────────────────────────────────────────────────────
36
- # TASK DEFINITIONS
37
- # Each task pins a specific scenario ID (deterministic) and defines its grader.
38
- # ─────────────────────────────────────────────────────────────────────────────
39
-
40
  TASKS = {
41
  "task_easy": {
42
  "name": "task_easy",
43
  "display_name": "Login Anomaly Triage",
44
  "difficulty": "easy",
45
- "scenario_id": "easy_false_positive_vpn", # pinned — always this scenario
46
  "description": (
47
  "A low-severity login alert has fired for an employee logging in "
48
  "from an unusual location. The agent must correctly identify this "
@@ -56,7 +31,7 @@ TASKS = {
56
  "name": "task_medium",
57
  "display_name": "Insider Threat Containment",
58
  "difficulty": "medium",
59
- "scenario_id": "medium_insider_threat", # pinned
60
  "description": (
61
  "A departing employee is actively exfiltrating confidential files "
62
  "on their final working days. The agent must investigate, contain "
@@ -69,7 +44,7 @@ TASKS = {
69
  "name": "task_hard",
70
  "display_name": "APT Lateral Movement Response",
71
  "difficulty": "hard",
72
- "scenario_id": "hard_apt_lateral_movement", # pinned
73
  "description": (
74
  "An Advanced Persistent Threat (APT) is actively moving laterally "
75
  "through the network using Cobalt Strike. The agent must identify "
@@ -83,51 +58,41 @@ TASKS = {
83
  }
84
 
85
 
86
- # ─────────────────────────────────────────────────────────────────────────────
87
- # GRADERS — deterministic, 0.0–1.0
88
- # ─────────────────────────────────────────────────────────────────────────────
 
89
 
90
  def grade_task_easy(actions: List[str], final_score: float, done: bool) -> TaskResult:
91
- """
92
- Grader for task_easy (false-positive VPN login).
93
-
94
- Scoring:
95
- 1.0 — investigated then ignored (perfect)
96
- 0.8 — ignored without investigating (correct but skipped diligence)
97
- 0.4 — investigated but did NOT ignore (missed false positive)
98
- 0.0 — took a destructive terminal action (block_account, isolate_device, escalate)
99
- linear deduction for extra steps
100
- """
101
  investigated = "investigate" in actions
102
  ignored = "ignore" in actions
103
  overreacted = any(a in actions for a in ["block_account", "isolate_device", "escalate", "block_ip"])
104
 
105
  if overreacted:
106
- score = 0.0
107
  success = False
108
  reason = "Over-reacted to a false positive — disrupted a legitimate user."
109
  elif ignored and investigated:
110
- # Perfect path — penalise slightly for extra steps
111
  extra = max(0, len(actions) - 2)
112
- score = max(0.7, 1.0 - extra * 0.05)
113
  success = True
114
  reason = f"Correctly identified false positive after investigation. Steps: {len(actions)}"
115
  elif ignored:
116
- score = 0.8
117
  success = True
118
  reason = "Correctly ignored false positive (skipped investigation step)."
119
  elif investigated:
120
- score = 0.4
121
  success = False
122
  reason = "Investigated but failed to conclude this was a false positive."
123
  else:
124
- score = 0.1 * len(actions) if actions else 0.0
125
  success = False
126
  reason = "Did not reach a conclusion."
127
 
128
  return TaskResult(
129
  task_name="task_easy",
130
- score=round(min(1.0, max(0.0, score)), 2),
131
  steps_taken=len(actions),
132
  actions_taken=actions,
133
  success=success,
@@ -136,46 +101,31 @@ def grade_task_easy(actions: List[str], final_score: float, done: bool) -> TaskR
136
 
137
 
138
  def grade_task_medium(actions: List[str], final_score: float, done: bool) -> TaskResult:
139
- """
140
- Grader for task_medium (insider threat).
141
-
142
- Ideal sequence: investigate → block_account → collect_forensics → escalate
143
-
144
- Scoring rubric:
145
- investigate present : +0.20
146
- block_account present : +0.25
147
- collect_forensics present : +0.20
148
- escalate as terminal : +0.25
149
- correct ordering bonus : +0.10
150
- step efficiency deduction : −0.05 per wasted step (beyond 6)
151
- """
152
  score = 0.0
153
  investigated = "investigate" in actions
154
  blocked = "block_account" in actions
155
  forensics = "collect_forensics" in actions
156
  escalated = "escalate" in actions
157
 
158
- if investigated: score += 0.20
159
- if blocked: score += 0.25
160
- if forensics: score += 0.20
161
- if escalated: score += 0.25
162
 
163
- # Ordering bonus: investigate before block before escalate
164
  try:
165
  idx_inv = actions.index("investigate")
166
  idx_blk = actions.index("block_account")
167
  idx_esc = actions.index("escalate")
168
  if idx_inv < idx_blk < idx_esc:
169
- score += 0.10
170
  except ValueError:
171
  pass
172
 
173
- # Efficiency deduction
174
  extra = max(0, len(actions) - 6)
175
  score -= extra * 0.05
176
 
177
- score = round(min(1.0, max(0.0, score)), 2)
178
- success = score >= 0.7
179
 
180
  reason_parts = []
181
  if not investigated: reason_parts.append("missing investigation")
@@ -199,56 +149,40 @@ def grade_task_medium(actions: List[str], final_score: float, done: bool) -> Tas
199
 
200
 
201
  def grade_task_hard(actions: List[str], final_score: float, done: bool) -> TaskResult:
202
- """
203
- Grader for task_hard (APT lateral movement).
204
-
205
- Ideal: investigate → isolate_device → block_ip → collect_forensics → escalate
206
-
207
- Scoring:
208
- investigate : +0.15
209
- isolate_device : +0.20
210
- block_ip : +0.20
211
- collect_forensics : +0.20
212
- escalate : +0.15
213
- order bonus (all 5 in correct relative order) : +0.10
214
- step efficiency (> 8 steps) : −0.04 per extra step
215
- penalty for ignoring a critical threat : −0.5 (floor 0)
216
- """
217
- score = 0.0
218
- investigated = "investigate" in actions
219
- isolated = "isolate_device" in actions
220
- blocked_ip = "block_ip" in actions
221
- forensics = "collect_forensics" in actions
222
- escalated = "escalate" in actions
223
- ignored = "ignore" in actions
224
-
225
  if ignored:
226
  return TaskResult(
227
  task_name="task_hard",
228
- score=0.0,
229
  steps_taken=len(actions),
230
  actions_taken=actions,
231
  success=False,
232
  reason="Critical APT incident ignored — catastrophic failure.",
233
  )
234
 
 
 
 
 
 
 
 
235
  if investigated: score += 0.15
236
  if isolated: score += 0.20
237
  if blocked_ip: score += 0.20
238
  if forensics: score += 0.20
239
  if escalated: score += 0.15
240
 
241
- # Order bonus
242
  key_actions = ["investigate", "isolate_device", "block_ip", "collect_forensics", "escalate"]
243
  present_in_order = [a for a in actions if a in key_actions]
244
  expected_order = [a for a in key_actions if a in actions]
245
  if present_in_order == expected_order and len(expected_order) == 5:
246
- score += 0.10
247
 
248
  extra = max(0, len(actions) - 8)
249
  score -= extra * 0.04
250
 
251
- score = round(min(1.0, max(0.0, score)), 2)
252
  success = score >= 0.65
253
 
254
  missing = [a for a in key_actions if a not in actions]
@@ -267,7 +201,6 @@ def grade_task_hard(actions: List[str], final_score: float, done: bool) -> TaskR
267
  )
268
 
269
 
270
- # Grader registry
271
  GRADERS = {
272
  "task_easy": grade_task_easy,
273
  "task_medium": grade_task_medium,
@@ -276,7 +209,6 @@ GRADERS = {
276
 
277
 
278
  def run_grader(task_name: str, actions: List[str], final_score: float, done: bool) -> TaskResult:
279
- """Run the grader for a named task and return a TaskResult."""
280
  if task_name not in GRADERS:
281
  raise ValueError(f"Unknown task: {task_name}. Available: {list(GRADERS.keys())}")
282
- return GRADERS[task_name](actions, final_score, done)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from dataclasses import dataclass
2
+ from typing import List
3
 
4
 
5
  @dataclass
6
  class TaskResult:
7
  task_name: str
8
+ score: float
9
  steps_taken: int
10
  actions_taken: List[str]
11
  success: bool
12
  reason: str
13
 
14
 
 
 
 
 
 
15
  TASKS = {
16
  "task_easy": {
17
  "name": "task_easy",
18
  "display_name": "Login Anomaly Triage",
19
  "difficulty": "easy",
20
+ "scenario_id": "easy_false_positive_vpn",
21
  "description": (
22
  "A low-severity login alert has fired for an employee logging in "
23
  "from an unusual location. The agent must correctly identify this "
 
31
  "name": "task_medium",
32
  "display_name": "Insider Threat Containment",
33
  "difficulty": "medium",
34
+ "scenario_id": "medium_insider_threat",
35
  "description": (
36
  "A departing employee is actively exfiltrating confidential files "
37
  "on their final working days. The agent must investigate, contain "
 
44
  "name": "task_hard",
45
  "display_name": "APT Lateral Movement Response",
46
  "difficulty": "hard",
47
+ "scenario_id": "hard_apt_lateral_movement",
48
  "description": (
49
  "An Advanced Persistent Threat (APT) is actively moving laterally "
50
  "through the network using Cobalt Strike. The agent must identify "
 
58
  }
59
 
60
 
61
+ def _clamp(score: float) -> float:
62
+ """Clamp score to strictly open interval (0.01, 0.99)."""
63
+ return round(min(0.99, max(0.01, score)), 2)
64
+
65
 
66
  def grade_task_easy(actions: List[str], final_score: float, done: bool) -> TaskResult:
 
 
 
 
 
 
 
 
 
 
67
  investigated = "investigate" in actions
68
  ignored = "ignore" in actions
69
  overreacted = any(a in actions for a in ["block_account", "isolate_device", "escalate", "block_ip"])
70
 
71
  if overreacted:
72
+ score = 0.02
73
  success = False
74
  reason = "Over-reacted to a false positive — disrupted a legitimate user."
75
  elif ignored and investigated:
 
76
  extra = max(0, len(actions) - 2)
77
+ score = max(0.70, 0.98 - extra * 0.05)
78
  success = True
79
  reason = f"Correctly identified false positive after investigation. Steps: {len(actions)}"
80
  elif ignored:
81
+ score = 0.78
82
  success = True
83
  reason = "Correctly ignored false positive (skipped investigation step)."
84
  elif investigated:
85
+ score = 0.40
86
  success = False
87
  reason = "Investigated but failed to conclude this was a false positive."
88
  else:
89
+ score = max(0.01, 0.10 * len(actions)) if actions else 0.01
90
  success = False
91
  reason = "Did not reach a conclusion."
92
 
93
  return TaskResult(
94
  task_name="task_easy",
95
+ score=_clamp(score),
96
  steps_taken=len(actions),
97
  actions_taken=actions,
98
  success=success,
 
101
 
102
 
103
  def grade_task_medium(actions: List[str], final_score: float, done: bool) -> TaskResult:
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  score = 0.0
105
  investigated = "investigate" in actions
106
  blocked = "block_account" in actions
107
  forensics = "collect_forensics" in actions
108
  escalated = "escalate" in actions
109
 
110
+ if investigated: score += 0.20
111
+ if blocked: score += 0.25
112
+ if forensics: score += 0.20
113
+ if escalated: score += 0.25
114
 
 
115
  try:
116
  idx_inv = actions.index("investigate")
117
  idx_blk = actions.index("block_account")
118
  idx_esc = actions.index("escalate")
119
  if idx_inv < idx_blk < idx_esc:
120
+ score += 0.05
121
  except ValueError:
122
  pass
123
 
 
124
  extra = max(0, len(actions) - 6)
125
  score -= extra * 0.05
126
 
127
+ score = _clamp(score)
128
+ success = score >= 0.70
129
 
130
  reason_parts = []
131
  if not investigated: reason_parts.append("missing investigation")
 
149
 
150
 
151
  def grade_task_hard(actions: List[str], final_score: float, done: bool) -> TaskResult:
152
+ ignored = "ignore" in actions
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
153
  if ignored:
154
  return TaskResult(
155
  task_name="task_hard",
156
+ score=0.01,
157
  steps_taken=len(actions),
158
  actions_taken=actions,
159
  success=False,
160
  reason="Critical APT incident ignored — catastrophic failure.",
161
  )
162
 
163
+ score = 0.0
164
+ investigated = "investigate" in actions
165
+ isolated = "isolate_device" in actions
166
+ blocked_ip = "block_ip" in actions
167
+ forensics = "collect_forensics" in actions
168
+ escalated = "escalate" in actions
169
+
170
  if investigated: score += 0.15
171
  if isolated: score += 0.20
172
  if blocked_ip: score += 0.20
173
  if forensics: score += 0.20
174
  if escalated: score += 0.15
175
 
 
176
  key_actions = ["investigate", "isolate_device", "block_ip", "collect_forensics", "escalate"]
177
  present_in_order = [a for a in actions if a in key_actions]
178
  expected_order = [a for a in key_actions if a in actions]
179
  if present_in_order == expected_order and len(expected_order) == 5:
180
+ score += 0.05
181
 
182
  extra = max(0, len(actions) - 8)
183
  score -= extra * 0.04
184
 
185
+ score = _clamp(score)
186
  success = score >= 0.65
187
 
188
  missing = [a for a in key_actions if a not in actions]
 
201
  )
202
 
203
 
 
204
  GRADERS = {
205
  "task_easy": grade_task_easy,
206
  "task_medium": grade_task_medium,
 
209
 
210
 
211
  def run_grader(task_name: str, actions: List[str], final_score: float, done: bool) -> TaskResult:
 
212
  if task_name not in GRADERS:
213
  raise ValueError(f"Unknown task: {task_name}. Available: {list(GRADERS.keys())}")
214
+ return GRADERS[task_name](actions, final_score, done)