yashppawar commited on
Commit
6f6baad
·
verified ·
1 Parent(s): 401c6f8

Upload folder using huggingface_hub

Browse files
agents/llm_policy.py CHANGED
@@ -47,6 +47,7 @@ SYSTEM_PROMPT = textwrap.dedent(
47
  {"action_type": "read_file", "path": "/some/file", "max_bytes": 2048}
48
  {"action_type": "grep", "pattern": "substring", "path": "/some/file"}
49
  {"action_type": "stat", "path": "/some/file"}
 
50
  {"action_type": "submit_report","report": {
51
  "compromised_user": "alice",
52
  "initial_ip": "198.51.100.77",
@@ -63,7 +64,8 @@ SYSTEM_PROMPT = textwrap.dedent(
63
 
64
  Rules:
65
  - Output EXACTLY ONE JSON object. No commentary, no markdown.
66
- - Start with list_dir on /var/log and /home to orient yourself.
 
67
  - Read /var/log/auth.log to find the compromised user and source IP.
68
  - For medium/hard tasks, also find modified files and use 'stat' to
69
  compute the backdoor SHA256 (the stat action returns sha256).
@@ -150,6 +152,8 @@ def action_to_str(action: ForensicShellAction) -> str:
150
  return f"grep({action.pattern!r},{action.path!r})"
151
  if action.action_type == "stat":
152
  return f"stat({action.path!r})"
 
 
153
  if action.action_type == "submit_report":
154
  return "submit_report(...)"
155
  return action.action_type
 
47
  {"action_type": "read_file", "path": "/some/file", "max_bytes": 2048}
48
  {"action_type": "grep", "pattern": "substring", "path": "/some/file"}
49
  {"action_type": "stat", "path": "/some/file"}
50
+ {"action_type": "find", "pattern": "*.log", "path": "/var"}
51
  {"action_type": "submit_report","report": {
52
  "compromised_user": "alice",
53
  "initial_ip": "198.51.100.77",
 
64
 
65
  Rules:
66
  - Output EXACTLY ONE JSON object. No commentary, no markdown.
67
+ - Start with list_dir on /var/log and /home, or find('*', '/') to orient.
68
+ - Use find('*.sh', '/') to discover attacker scripts recursively.
69
  - Read /var/log/auth.log to find the compromised user and source IP.
70
  - For medium/hard tasks, also find modified files and use 'stat' to
71
  compute the backdoor SHA256 (the stat action returns sha256).
 
152
  return f"grep({action.pattern!r},{action.path!r})"
153
  if action.action_type == "stat":
154
  return f"stat({action.path!r})"
155
+ if action.action_type == "find":
156
+ return f"find({action.pattern!r},{action.path!r})"
157
  if action.action_type == "submit_report":
158
  return "submit_report(...)"
159
  return action.action_type
models.py CHANGED
@@ -47,7 +47,7 @@ class ForensicShellAction(Action):
47
  """Agent action. Use action_type to pick the verb; set only the fields that verb needs."""
48
 
49
  action_type: Literal[
50
- "list_dir", "read_file", "grep", "stat", "submit_report"
51
  ] = Field(..., description="Which verb to execute")
52
  path: Optional[str] = Field(
53
  default=None, description="Target path for list_dir / read_file / grep / stat"
 
47
  """Agent action. Use action_type to pick the verb; set only the fields that verb needs."""
48
 
49
  action_type: Literal[
50
+ "list_dir", "read_file", "grep", "stat", "find", "submit_report"
51
  ] = Field(..., description="Which verb to execute")
52
  path: Optional[str] = Field(
53
  default=None, description="Target path for list_dir / read_file / grep / stat"
openenv_forensic_shell.egg-info/SOURCES.txt CHANGED
@@ -1,6 +1,7 @@
1
  README.md
2
  __init__.py
3
  client.py
 
4
  models.py
5
  pyproject.toml
6
  ./__init__.py
 
1
  README.md
2
  __init__.py
3
  client.py
4
+ inference.py
5
  models.py
6
  pyproject.toml
7
  ./__init__.py
server/forensic_shell_environment.py CHANGED
@@ -33,7 +33,13 @@ except ImportError:
33
  from scenarios import DEFAULT_TASK_ID, SCENARIOS # type: ignore
34
 
35
 
36
- MAX_STEPS_PER_EPISODE = 30
 
 
 
 
 
 
37
 
38
  # Exploration shaping reward — small positive reward the first time the agent
39
  # reads one of the scenario's "canonical forensic artifacts" (auth.log, bash
@@ -147,23 +153,31 @@ class ForensicShellEnvironment(Environment):
147
  self._canonical = _canonical_artifacts(self._scenario)
148
  self._state = State(episode_id=str(uuid4()), step_count=0)
149
 
 
 
 
 
 
 
 
 
150
  return ForensicShellObservation(
151
  output=(
152
  f"ForensicShell ready. Task: {self._task_id} "
153
- f"({self._scenario.get('difficulty', 'unknown')}).\n"
154
  f"Available actions: list_dir(path), read_file(path,max_bytes), "
155
- f"grep(pattern,path), stat(path), submit_report(report).\n"
156
  f"Start by listing /var/log or /home."
157
  ),
158
  task_id=self._task_id,
159
  task_description=self._scenario["description"],
160
- steps_remaining=MAX_STEPS_PER_EPISODE,
161
  action_error=None,
162
  done=False,
163
  reward=0.0,
164
  metadata={
165
- "difficulty": self._scenario.get("difficulty", ""),
166
- "max_steps": MAX_STEPS_PER_EPISODE,
167
  },
168
  )
169
 
@@ -172,7 +186,7 @@ class ForensicShellEnvironment(Environment):
172
  def step(self, action: ForensicShellAction) -> ForensicShellObservation: # type: ignore[override]
173
  self._state.step_count += 1
174
  self._steps_used += 1
175
- steps_remaining = max(0, MAX_STEPS_PER_EPISODE - self._steps_used)
176
 
177
  # If already done, return a terminal obs (grace)
178
  if self._done:
@@ -185,7 +199,7 @@ class ForensicShellEnvironment(Environment):
185
  )
186
 
187
  # Hard cap on steps
188
- if self._steps_used > MAX_STEPS_PER_EPISODE:
189
  self._done = True
190
  return self._obs(
191
  output="Step budget exhausted without a submitted report.",
@@ -218,6 +232,10 @@ class ForensicShellEnvironment(Environment):
218
  out, err = self._do_stat(action.path or "")
219
  return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)
220
 
 
 
 
 
221
  if verb == "submit_report":
222
  return self._do_submit_report(action, steps_remaining)
223
 
@@ -238,6 +256,26 @@ class ForensicShellEnvironment(Environment):
238
  reward=0.0,
239
  )
240
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
241
  # ---- shaping reward -----------------------------------------------------
242
 
243
  def _award_shaping(self, path: str) -> float:
 
33
  from scenarios import DEFAULT_TASK_ID, SCENARIOS # type: ignore
34
 
35
 
36
+ MAX_STEPS_PER_EPISODE = 30 # default fallback
37
+
38
+ # Difficulty-dependent step budgets. Easier tasks shouldn't reward aimless
39
+ # exploration; harder tasks with red herrings genuinely need more budget.
40
+ STEPS_BY_DIFFICULTY = {"easy": 15, "medium": 25, "hard": 35}
41
+ # Hand-authored task overrides (kept for backward compat with Day-1 baselines)
42
+ STEPS_BY_TASK = {"t1_login": 15, "t2_modified": 25, "t3_timeline": 35}
43
 
44
  # Exploration shaping reward — small positive reward the first time the agent
45
  # reads one of the scenario's "canonical forensic artifacts" (auth.log, bash
 
153
  self._canonical = _canonical_artifacts(self._scenario)
154
  self._state = State(episode_id=str(uuid4()), step_count=0)
155
 
156
+ # Difficulty-dependent step budget
157
+ diff_label = self._scenario.get("difficulty", "medium")
158
+ self._max_steps = (
159
+ STEPS_BY_TASK.get(self._task_id)
160
+ or STEPS_BY_DIFFICULTY.get(diff_label)
161
+ or MAX_STEPS_PER_EPISODE
162
+ )
163
+
164
  return ForensicShellObservation(
165
  output=(
166
  f"ForensicShell ready. Task: {self._task_id} "
167
+ f"({diff_label}).\n"
168
  f"Available actions: list_dir(path), read_file(path,max_bytes), "
169
+ f"grep(pattern,path), stat(path), find(pattern,path), submit_report(report).\n"
170
  f"Start by listing /var/log or /home."
171
  ),
172
  task_id=self._task_id,
173
  task_description=self._scenario["description"],
174
+ steps_remaining=self._max_steps,
175
  action_error=None,
176
  done=False,
177
  reward=0.0,
178
  metadata={
179
+ "difficulty": diff_label,
180
+ "max_steps": self._max_steps,
181
  },
182
  )
183
 
 
186
  def step(self, action: ForensicShellAction) -> ForensicShellObservation: # type: ignore[override]
187
  self._state.step_count += 1
188
  self._steps_used += 1
189
+ steps_remaining = max(0, self._max_steps - self._steps_used)
190
 
191
  # If already done, return a terminal obs (grace)
192
  if self._done:
 
199
  )
200
 
201
  # Hard cap on steps
202
+ if self._steps_used > self._max_steps:
203
  self._done = True
204
  return self._obs(
205
  output="Step budget exhausted without a submitted report.",
 
232
  out, err = self._do_stat(action.path or "")
233
  return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)
234
 
235
+ if verb == "find":
236
+ out, err = self._do_find(action.pattern or "*", action.path or "/")
237
+ return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)
238
+
239
  if verb == "submit_report":
240
  return self._do_submit_report(action, steps_remaining)
241
 
 
256
  reward=0.0,
257
  )
258
 
259
+ def _do_find(self, pattern: str, path: str) -> Tuple[str, Optional[str]]:
260
+ """Recursive search: find files matching a glob pattern under a directory."""
261
+ from fnmatch import fnmatch
262
+
263
+ path = path.rstrip("/") or "/"
264
+ prefix = "/" if path == "/" else path + "/"
265
+ if path == "/":
266
+ prefix = "/"
267
+ matches: List[str] = []
268
+ for fp in sorted(self._fs.keys()):
269
+ if fp == path or fp.startswith(prefix):
270
+ basename = fp.rsplit("/", 1)[-1] if "/" in fp else fp
271
+ if fnmatch(basename, pattern):
272
+ matches.append(fp)
273
+ if len(matches) >= 50:
274
+ break
275
+ if not matches:
276
+ return f"(no files matching {pattern!r} under {path})", None
277
+ return "\n".join(matches), None
278
+
279
  # ---- shaping reward -----------------------------------------------------
280
 
281
  def _award_shaping(self, path: str) -> float:
server/grader.py CHANGED
@@ -4,6 +4,15 @@ Deterministic graders for ForensicShell tasks.
4
  Each grader takes a submitted ForensicReport (as dict) and the scenario ground-truth
5
  dict and returns a float in [0.0, 1.0]. Partial credit is awarded per correct subfield
6
  so the reward function has meaningful gradient, not just 0/1.
 
 
 
 
 
 
 
 
 
7
  """
8
 
9
  from typing import Dict, List
@@ -13,14 +22,25 @@ def _safe_str(x) -> str:
13
  return (x or "").strip().lower() if isinstance(x, str) else ""
14
 
15
 
16
- def _jaccard(a: List[str], b: List[str]) -> float:
17
- sa = {s.strip() for s in a if isinstance(s, str) and s.strip()}
18
- sb = {s.strip() for s in b if isinstance(s, str) and s.strip()}
19
- if not sa and not sb:
 
 
 
 
 
20
  return 1.0
21
- if not sa or not sb:
22
  return 0.0
23
- return len(sa & sb) / len(sa | sb)
 
 
 
 
 
 
24
 
25
 
26
  def _kendall_tau_normalized(pred_order: List[str], true_order: List[str]) -> float:
@@ -69,7 +89,8 @@ def _grade_t1_login(report: Dict, truth: Dict) -> float:
69
  def _grade_t2_modified(report: Dict, truth: Dict) -> float:
70
  user_ok = 1.0 if _safe_str(report.get("compromised_user")) == _safe_str(truth.get("compromised_user")) else 0.0
71
  ip_ok = 1.0 if _safe_str(report.get("initial_ip")) == _safe_str(truth.get("initial_ip")) else 0.0
72
- files_score = _jaccard(report.get("modified_files") or [], truth.get("modified_files") or [])
 
73
  sha_ok = 1.0 if _safe_str(report.get("backdoor_sha256")) == _safe_str(truth.get("backdoor_sha256")) else 0.0
74
  return 0.2 * user_ok + 0.2 * ip_ok + 0.3 * files_score + 0.3 * sha_ok
75
 
@@ -77,7 +98,8 @@ def _grade_t2_modified(report: Dict, truth: Dict) -> float:
77
  def _grade_t3_timeline(report: Dict, truth: Dict) -> float:
78
  user_ok = 1.0 if _safe_str(report.get("compromised_user")) == _safe_str(truth.get("compromised_user")) else 0.0
79
  ip_ok = 1.0 if _safe_str(report.get("initial_ip")) == _safe_str(truth.get("initial_ip")) else 0.0
80
- files_score = _jaccard(report.get("modified_files") or [], truth.get("modified_files") or [])
 
81
  sha_ok = 1.0 if _safe_str(report.get("backdoor_sha256")) == _safe_str(truth.get("backdoor_sha256")) else 0.0
82
 
83
  pred_timeline = report.get("timeline") or []
@@ -89,7 +111,7 @@ def _grade_t3_timeline(report: Dict, truth: Dict) -> float:
89
  pred_phases = [p for p in pred_phases if isinstance(p, str)]
90
  true_phases = [e["phase"] for e in true_timeline]
91
 
92
- # F1 over phase set
93
  pred_set = set(pred_phases)
94
  true_set = set(true_phases)
95
  if not pred_set and not true_set:
@@ -102,16 +124,20 @@ def _grade_t3_timeline(report: Dict, truth: Dict) -> float:
102
  recall = tp / len(true_set)
103
  phase_f1 = 0.0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
104
 
105
- # Ordering quality (only if there's overlap to order)
106
  order_score = _kendall_tau_normalized(pred_phases, true_phases)
107
 
 
 
 
 
 
108
  return (
109
  0.15 * user_ok
110
  + 0.15 * ip_ok
111
  + 0.15 * files_score
112
  + 0.15 * sha_ok
113
- + 0.20 * phase_f1
114
- + 0.20 * order_score
115
  )
116
 
117
 
@@ -125,9 +151,7 @@ GRADERS = {
125
  def _grade_generic(report: Dict, truth: Dict) -> float:
126
  """
127
  Dispatcher for procedurally generated scenarios. Picks the right sub-grader
128
- by inspecting which fields are present in the ground-truth dict. This keeps
129
- the grader agnostic to task_id naming and lets the generator add richer
130
- fields without touching this module.
131
  """
132
  if "timeline" in truth:
133
  return _grade_t3_timeline(report, truth)
 
4
  Each grader takes a submitted ForensicReport (as dict) and the scenario ground-truth
5
  dict and returns a float in [0.0, 1.0]. Partial credit is awarded per correct subfield
6
  so the reward function has meaningful gradient, not just 0/1.
7
+
8
+ Design choices:
9
+ - modified_files uses F0.5 (precision-weighted) instead of Jaccard: submitting
10
+ false-positive files (claiming an unmodified file was attacked) is penalized
11
+ more than missing a file. This mirrors real forensics where false positives
12
+ waste incident response effort.
13
+ - Timeline scoring is multiplicative (phase_F1 * ordering): having all 5 phases
14
+ in the wrong order scores 0, not ~0.30. Correct phases AND correct order
15
+ required for full credit.
16
  """
17
 
18
  from typing import Dict, List
 
22
  return (x or "").strip().lower() if isinstance(x, str) else ""
23
 
24
 
25
+ def _fbeta(pred: List[str], truth: List[str], beta: float = 0.5) -> float:
26
+ """
27
+ F-beta score over string sets. beta < 1 weighs precision more than recall.
28
+ F0.5 penalizes false positives (extra wrong files) 2x harder than false
29
+ negatives (missing files), matching real forensic triage priorities.
30
+ """
31
+ pred_set = {s.strip() for s in pred if isinstance(s, str) and s.strip()}
32
+ truth_set = {s.strip() for s in truth if isinstance(s, str) and s.strip()}
33
+ if not pred_set and not truth_set:
34
  return 1.0
35
+ if not pred_set or not truth_set:
36
  return 0.0
37
+ tp = len(pred_set & truth_set)
38
+ precision = tp / len(pred_set)
39
+ recall = tp / len(truth_set)
40
+ if precision + recall == 0:
41
+ return 0.0
42
+ beta2 = beta * beta
43
+ return (1 + beta2) * precision * recall / (beta2 * precision + recall)
44
 
45
 
46
  def _kendall_tau_normalized(pred_order: List[str], true_order: List[str]) -> float:
 
89
def _grade_t2_modified(report: Dict, truth: Dict) -> float:
    """Grade a t2-style report: compromised user, source IP, modified files, backdoor hash.

    Returns a partial-credit score in [0.0, 1.0].
    """
    def exact(field: str) -> float:
        # Case/whitespace-insensitive string equality via _safe_str.
        return 1.0 if _safe_str(report.get(field)) == _safe_str(truth.get(field)) else 0.0

    user_ok = exact("compromised_user")
    ip_ok = exact("initial_ip")
    sha_ok = exact("backdoor_sha256")
    # F0.5: precision-weighted — false positives penalized harder than false negatives
    files_score = _fbeta(report.get("modified_files") or [], truth.get("modified_files") or [], beta=0.5)
    return 0.2 * user_ok + 0.2 * ip_ok + 0.3 * files_score + 0.3 * sha_ok
96
 
 
98
  def _grade_t3_timeline(report: Dict, truth: Dict) -> float:
99
  user_ok = 1.0 if _safe_str(report.get("compromised_user")) == _safe_str(truth.get("compromised_user")) else 0.0
100
  ip_ok = 1.0 if _safe_str(report.get("initial_ip")) == _safe_str(truth.get("initial_ip")) else 0.0
101
+ # F0.5 for files (same precision-weighting as t2)
102
+ files_score = _fbeta(report.get("modified_files") or [], truth.get("modified_files") or [], beta=0.5)
103
  sha_ok = 1.0 if _safe_str(report.get("backdoor_sha256")) == _safe_str(truth.get("backdoor_sha256")) else 0.0
104
 
105
  pred_timeline = report.get("timeline") or []
 
111
  pred_phases = [p for p in pred_phases if isinstance(p, str)]
112
  true_phases = [e["phase"] for e in true_timeline]
113
 
114
+ # F1 over phase set (standard F1 — we don't precision-weight phases)
115
  pred_set = set(pred_phases)
116
  true_set = set(true_phases)
117
  if not pred_set and not true_set:
 
124
  recall = tp / len(true_set)
125
  phase_f1 = 0.0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
126
 
127
+ # Ordering quality
128
  order_score = _kendall_tau_normalized(pred_phases, true_phases)
129
 
130
+ # MULTIPLICATIVE timeline scoring: having all phases in wrong order gives
131
+ # F1=1.0 * tau=0.0 = 0.0, not the ~0.30 an additive scheme would produce.
132
+ # Correct phases AND correct order both required for full timeline credit.
133
+ timeline_score = phase_f1 * order_score
134
+
135
  return (
136
  0.15 * user_ok
137
  + 0.15 * ip_ok
138
  + 0.15 * files_score
139
  + 0.15 * sha_ok
140
+ + 0.40 * timeline_score
 
141
  )
142
 
143
 
 
151
  def _grade_generic(report: Dict, truth: Dict) -> float:
152
  """
153
  Dispatcher for procedurally generated scenarios. Picks the right sub-grader
154
+ by inspecting which fields are present in the ground-truth dict.
 
 
155
  """
156
  if "timeline" in truth:
157
  return _grade_t3_timeline(report, truth)