# NOTE: page-viewer residue, not part of the module: Spaces: Sleeping | File size: 7,405 Bytes | 4c8f878 f6e1c5d
from dataclasses import dataclass
from typing import List
@dataclass
class TaskResult:
    """Outcome of grading a single task run."""

    # Identifier of the task that was graded (e.g. "task_easy").
    task_name: str
    # Numeric score assigned by the grader.
    score: float
    # Number of actions the agent took.
    steps_taken: int
    # The agent's actions, in the order they were taken.
    actions_taken: List[str]
    # Whether the grader considers the run a success.
    success: bool
    # Human-readable explanation of the score.
    reason: str
# Registry of task definitions, keyed by task name. Each entry carries display
# metadata, the scenario it maps to, the step budget, and a prose statement of
# what counts as success.
TASKS = {
    "task_easy": {
        "name": "task_easy",
        "display_name": "Login Anomaly Triage",
        "difficulty": "easy",
        "scenario_id": "easy_false_positive_vpn",
        "description": (
            "A low-severity login alert has fired for an employee logging in "
            "from an unusual location. The agent must correctly identify this "
            "as a false positive (VPN usage) and choose to ignore it — without "
            "over-reacting and disrupting a legitimate user."
        ),
        "max_steps": 5,
        "success_criteria": "Agent correctly ignores the false-positive alert.",
    },
    "task_medium": {
        "name": "task_medium",
        "display_name": "Insider Threat Containment",
        "difficulty": "medium",
        "scenario_id": "medium_insider_threat",
        "description": (
            "A departing employee is actively exfiltrating confidential files "
            "on their final working days. The agent must investigate, contain "
            "the account, preserve evidence, and escalate — in a logical order."
        ),
        "max_steps": 8,
        "success_criteria": "Agent escalates after investigating and blocking the account.",
    },
    "task_hard": {
        "name": "task_hard",
        "display_name": "APT Lateral Movement Response",
        "difficulty": "hard",
        "scenario_id": "hard_apt_lateral_movement",
        "description": (
            "An Advanced Persistent Threat (APT) is actively moving laterally "
            "through the network using Cobalt Strike. The agent must identify "
            "the attack chain, isolate compromised hosts, block C2 communication, "
            "collect forensic evidence, and escalate to Tier-2 — all within "
            "12 steps on a noisy, high-stakes incident."
        ),
        "max_steps": 12,
        "success_criteria": "Agent isolates device, blocks IP, collects forensics, and escalates.",
    },
}
def _clamp(score: float) -> float:
"""Clamp score to strictly open interval (0.01, 0.99)."""
return round(min(0.99, max(0.01, score)), 2)
def grade_task_easy(actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Grade the false-positive login triage task.

    Scoring rules, checked in this order:

    * any containment/escalation action (``block_account``, ``isolate_device``,
      ``escalate``, ``block_ip``) is an over-reaction: 0.02, failure;
    * ``ignore`` together with ``investigate``: 0.98 minus 0.05 per action
      beyond the first two, never below 0.70, success;
    * ``ignore`` alone: 0.78, success;
    * ``investigate`` alone: 0.40, failure;
    * anything else: 0.10 per action taken, failure.

    Args:
        actions: Action names taken by the agent, in order.
        final_score: Not used by this grader; accepted so all graders share
            one signature.
        done: Not used by this grader; accepted so all graders share one
            signature.

    Returns:
        A ``TaskResult`` whose score has been passed through ``_clamp``.
    """
    overreaction_actions = ("block_account", "isolate_device", "escalate", "block_ip")
    investigated = "investigate" in actions
    ignored = "ignore" in actions
    overreacted = any(name in actions for name in overreaction_actions)

    if overreacted:
        score = 0.02
        success = False
        reason = "Over-reacted to a false positive — disrupted a legitimate user."
    elif ignored and investigated:
        # Two actions (investigate + ignore) is the ideal path; each extra
        # action costs 0.05 but a correct conclusion never drops below 0.70.
        extra = max(0, len(actions) - 2)
        score = max(0.70, 0.98 - extra * 0.05)
        success = True
        reason = f"Correctly identified false positive after investigation. Steps: {len(actions)}"
    elif ignored:
        score = 0.78
        success = True
        reason = "Correctly ignored false positive (skipped investigation step)."
    elif investigated:
        score = 0.40
        success = False
        reason = "Investigated but failed to conclude this was a false positive."
    else:
        # An empty action list gives 0.0 here, which _clamp raises to 0.01.
        # NOTE(review): this branch is bounded only by _clamp, so a long run
        # of other actions can outscore the branches above — confirm intended.
        score = 0.10 * len(actions)
        success = False
        reason = "Did not reach a conclusion."

    return TaskResult(
        task_name="task_easy",
        score=_clamp(score),
        steps_taken=len(actions),
        actions_taken=actions,
        success=success,
        reason=reason,
    )
def grade_task_medium(actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Grade the insider-threat containment task.

    Credit is given for each key action present (investigate 0.20,
    block_account 0.25, collect_forensics 0.20, escalate 0.25), plus 0.05 when
    the first investigate precedes the first block_account, which precedes the
    first escalate. Each action beyond six costs 0.05. The run succeeds when
    the clamped score is at least 0.70.

    Args:
        actions: Action names taken by the agent, in order.
        final_score: Not used by this grader.
        done: Not used by this grader.

    Returns:
        A ``TaskResult`` carrying the clamped score and a summary of any
        missing key actions.
    """
    # (action name, credit, text reported when the action is absent)
    rubric = (
        ("investigate", 0.20, "missing investigation"),
        ("block_account", 0.25, "account not blocked"),
        ("collect_forensics", 0.20, "no forensics collected"),
        ("escalate", 0.25, "not escalated"),
    )

    total = 0.0
    gaps = []
    for action_name, credit, gap_text in rubric:
        if action_name in actions:
            total += credit
        else:
            gaps.append(gap_text)

    # Ordering bonus: compare first occurrences, and only when all three
    # ordered actions were actually taken.
    ordered_names = ("investigate", "block_account", "escalate")
    first_positions = [actions.index(n) for n in ordered_names if n in actions]
    if len(first_positions) == 3:
        pos_inv, pos_blk, pos_esc = first_positions
        if pos_inv < pos_blk < pos_esc:
            total += 0.05

    # Penalise runs longer than six actions.
    surplus = max(0, len(actions) - 6)
    total -= surplus * 0.05

    score = _clamp(total)
    success = score >= 0.70

    if gaps:
        detail = f"Missing: {', '.join(gaps)}."
    else:
        detail = "All key actions taken."
    reason = f"Score {score:.2f}. " + detail

    return TaskResult(
        task_name="task_medium",
        score=score,
        steps_taken=len(actions),
        actions_taken=actions,
        success=success,
        reason=reason,
    )
def grade_task_hard(actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Grade the APT lateral-movement response task.

    Taking ``ignore`` at any point is an immediate failure with score 0.01.
    Otherwise credit is given per key action present (investigate 0.15,
    isolate_device 0.20, block_ip 0.20, collect_forensics 0.20, escalate 0.15),
    plus 0.05 when all five were taken and their first occurrences follow that
    order. Each action beyond eight costs 0.04. The run succeeds when the
    clamped score is at least 0.65.

    Args:
        actions: Action names taken by the agent, in order.
        final_score: Not used by this grader.
        done: Not used by this grader.

    Returns:
        A ``TaskResult`` carrying the clamped score and a list of any missing
        key actions.
    """
    if "ignore" in actions:
        return TaskResult(
            task_name="task_hard",
            score=0.01,
            steps_taken=len(actions),
            actions_taken=actions,
            success=False,
            reason="Critical APT incident ignored — catastrophic failure.",
        )

    # Canonical response order; also the order in which credit is summed.
    key_actions = ["investigate", "isolate_device", "block_ip", "collect_forensics", "escalate"]
    credit = {
        "investigate": 0.15,
        "isolate_device": 0.20,
        "block_ip": 0.20,
        "collect_forensics": 0.20,
        "escalate": 0.15,
    }

    score = 0.0
    for name in key_actions:
        if name in actions:
            score += credit[name]

    # Ordering bonus. Judge order by the FIRST occurrence of each key action so
    # that repeating one (e.g. investigating twice) does not forfeit the bonus;
    # dict.fromkeys de-duplicates while keeping first-seen order.
    first_seen = list(dict.fromkeys(a for a in actions if a in key_actions))
    if first_seen == key_actions:
        score += 0.05

    # Penalise runs longer than eight actions.
    extra = max(0, len(actions) - 8)
    score -= extra * 0.04

    score = _clamp(score)
    success = score >= 0.65

    missing = [name for name in key_actions if name not in actions]
    reason = (
        f"Score {score:.2f}. " +
        (f"Missing key actions: {', '.join(missing)}." if missing else "All critical actions taken.")
    )
    return TaskResult(
        task_name="task_hard",
        score=score,
        steps_taken=len(actions),
        actions_taken=actions,
        success=success,
        reason=reason,
    )
# Dispatch table: task name -> grading function for that task.
GRADERS = dict(
    task_easy=grade_task_easy,
    task_medium=grade_task_medium,
    task_hard=grade_task_hard,
)
def run_grader(task_name: str, actions: List[str], final_score: float, done: bool) -> TaskResult:
    """Grade a run by dispatching to the grader registered for *task_name*.

    Args:
        task_name: Key into ``GRADERS``.
        actions: Action names taken by the agent, in order.
        final_score: Passed through to the selected grader unchanged.
        done: Passed through to the selected grader unchanged.

    Returns:
        The ``TaskResult`` produced by the selected grader.

    Raises:
        ValueError: If *task_name* has no registered grader.
    """
    grader = GRADERS.get(task_name)
    if grader is None:
        raise ValueError(f"Unknown task: {task_name}. Available: {list(GRADERS.keys())}")
    return grader(actions, final_score, done)