"""
ForensicShell Environment — server-side implementation.

The environment pre-seeds a fake "breached" Linux filesystem in memory. The agent
interrogates it with structured read-only actions (list_dir, read_file, grep,
stat, find) and finishes the episode by submitting a ForensicReport via
action_type='submit_report'. A deterministic grader scores the report against
hidden ground truth and returns a reward in [0.0, 1.0] on the terminal step.
"""

import hashlib
import os
from fnmatch import fnmatchcase
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata, State

try:
    from ..models import ForensicShellAction, ForensicShellObservation
except ImportError:
    from models import ForensicShellAction, ForensicShellObservation

try:
    from .grader import grade
    from .scenario_generator import generate_scenario
    from .scenarios import DEFAULT_TASK_ID, SCENARIOS
except ImportError:
    from grader import grade  # type: ignore
    from scenario_generator import generate_scenario  # type: ignore
    from scenarios import DEFAULT_TASK_ID, SCENARIOS  # type: ignore


MAX_STEPS_PER_EPISODE = 30  # default fallback

# Difficulty-dependent step budgets. Easier tasks shouldn't reward aimless
# exploration; harder tasks with red herrings genuinely need more budget.
STEPS_BY_DIFFICULTY = {"easy": 15, "medium": 25, "hard": 35}

# Hand-authored task overrides (kept for backward compat with Day-1 baselines)
STEPS_BY_TASK = {"t1_login": 15, "t2_modified": 25, "t3_timeline": 35}

# Exploration shaping reward — small positive reward the first time the agent
# reads one of the scenario's "canonical forensic artifacts" (auth.log, bash
# histories, cron files, backdoor path, etc.). Capped so the terminal grader
# reward always dominates the trajectory return.
SHAPING_REWARD_PER_READ = 0.02
SHAPING_REWARD_CAP = 0.10

# Output limits for the read-only primitives.
DEFAULT_READ_LIMIT = 2048  # used when a read_file action carries no max_bytes
GREP_MAX_HITS = 100
FIND_MAX_RESULTS = 50

# Log/account files that count as canonical artifacts whenever the scenario
# filesystem actually contains them.
_CLASSIC_ARTIFACT_PATHS = (
    "/var/log/auth.log",
    "/var/log/auth.log.1",
    "/etc/passwd",
    "/etc/shadow",
)


def _canonical_artifacts(scenario: dict) -> Set[str]:
    """
    Pick out the set of paths in a scenario that a good investigator *should*
    read.

    The result is the union of:
      * every path in ground_truth["modified_files"],
      * each classic forensic path (auth logs, passwd, shadow) that exists in
        the scenario filesystem,
      * the compromised user's ~/.bash_history, when ground truth names a
        compromised user and that file exists in the scenario filesystem.

    Missing or None "ground_truth" / "filesystem" entries are treated as empty.
    """
    gt = scenario.get("ground_truth", {}) or {}
    fs = scenario.get("filesystem", {}) or {}

    paths: Set[str] = set(gt.get("modified_files", []) or [])
    paths.update(p for p in _CLASSIC_ARTIFACT_PATHS if p in fs)

    user = gt.get("compromised_user")
    if user:
        bash_history = f"/home/{user}/.bash_history"
        if bash_history in fs:
            paths.add(bash_history)
    return paths


def _as_bytes(content) -> bytes:
    """Return *content* as bytes; non-bytes values are stringified and UTF-8 encoded."""
    if isinstance(content, bytes):
        return content
    return str(content).encode("utf-8", errors="replace")


def _as_text(content) -> str:
    """
    Return *content* as text for display.

    Bytes are decoded as UTF-8. Bytes that are not valid UTF-8 are rendered as
    a short placeholder carrying the byte count, so the agent can tell a binary
    file apart from an empty one (use `stat` for its size and hash).
    """
    if isinstance(content, bytes):
        try:
            return content.decode("utf-8")
        except UnicodeDecodeError:
            return f"<binary content: {len(content)} bytes>"
    return str(content)


class ForensicShellEnvironment(Environment):
    """
    Pre-seeded forensic investigation environment. Agent actions are read-only
    file operations over a synthetic filesystem kept in a Python dict.

    Episode flow:
        reset(task_id='t1_login' | 't2_modified' | 't3_timeline') -> initial obs
        step(list_dir|read_file|grep|stat|find)  -> obs with output
        step(submit_report(ForensicReport))      -> terminal obs with reward in [0,1]
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._task_id: str = DEFAULT_TASK_ID
        self._scenario: dict = SCENARIOS[DEFAULT_TASK_ID]
        self._fs: Dict[str, object] = {}
        self._done: bool = False
        self._steps_used: int = 0
        self._useful_read: set = set()        # paths already rewarded
        self._shaping_total: float = 0.0      # running sum, capped at SHAPING_REWARD_CAP
        self._canonical: set = set()          # per-episode canonical artifact set
        # Set here as well as in reset() so that step() called before the
        # first reset() does not fail on a missing attribute.
        self._max_steps: int = self._step_budget()

    # ---- episode lifecycle ---------------------------------------------------

    def _step_budget(self) -> int:
        """
        Step budget for the current task/scenario: a per-task override if one
        exists, else the budget for the scenario's difficulty label, else
        MAX_STEPS_PER_EPISODE.
        """
        diff_label = self._scenario.get("difficulty", "medium")
        return (
            STEPS_BY_TASK.get(self._task_id)
            or STEPS_BY_DIFFICULTY.get(diff_label)
            or MAX_STEPS_PER_EPISODE
        )

    def reset(
        self,
        task_id: Optional[str] = None,
        seed: Optional[int] = None,
        difficulty: Optional[int] = None,
        pattern: Optional[str] = None,
        **kwargs,
    ) -> ForensicShellObservation:
        """
        Load either a hand-authored scenario (by task_id) OR a procedurally
        generated one (by seed+difficulty+pattern). If seed is given, generator
        wins; otherwise fall back to task_id lookup, then the FORENSIC_TASK_ID
        environment variable, then DEFAULT_TASK_ID. An unknown task id also
        falls back to DEFAULT_TASK_ID.

        All per-episode state (filesystem copy, step counters, shaping
        bookkeeping, episode id) is re-initialised. Extra keyword arguments
        are accepted and ignored.

        Returns:
            The initial observation (done=False, reward=0.0) with the task
            description and the full step budget remaining.
        """
        if seed is not None:
            scenario = generate_scenario(
                seed=int(seed),
                difficulty=int(difficulty) if difficulty is not None else 3,
                pattern=pattern,
            )
            self._task_id = scenario["task_id"]
            self._scenario = scenario
        else:
            env_task = os.getenv("FORENSIC_TASK_ID")
            chosen = task_id or env_task or DEFAULT_TASK_ID
            if chosen not in SCENARIOS:
                chosen = DEFAULT_TASK_ID
            self._task_id = chosen
            self._scenario = SCENARIOS[chosen]

        # Shallow copy so the episode never mutates the scenario's own mapping.
        self._fs = dict(self._scenario["filesystem"])
        self._done = False
        self._steps_used = 0
        self._useful_read = set()
        self._shaping_total = 0.0
        self._canonical = _canonical_artifacts(self._scenario)
        self._state = State(episode_id=str(uuid4()), step_count=0)

        # Difficulty-dependent step budget
        diff_label = self._scenario.get("difficulty", "medium")
        self._max_steps = self._step_budget()

        return ForensicShellObservation(
            output=(
                f"ForensicShell ready. Task: {self._task_id} "
                f"({diff_label}).\n"
                f"Available actions: list_dir(path), read_file(path,max_bytes), "
                f"grep(pattern,path), stat(path), find(pattern,path), submit_report(report).\n"
                f"Start by listing /var/log or /home."
            ),
            task_id=self._task_id,
            task_description=self._scenario["description"],
            steps_remaining=self._max_steps,
            action_error=None,
            done=False,
            reward=0.0,
            metadata={
                "difficulty": diff_label,
                "max_steps": self._max_steps,
            },
        )

    # ---- action dispatch -----------------------------------------------------

    def step(self, action: ForensicShellAction) -> ForensicShellObservation:  # type: ignore[override]
        """
        Execute one agent action and return the resulting observation.

        Every call counts against the step budget, including calls made after
        the episode has ended. Investigation verbs return done=False; only
        read_file and grep can earn shaping reward, and only when they
        succeed. submit_report ends the episode with the grader's reward.
        Unknown verbs and unexpected exceptions are reported through
        action_error rather than raised.
        """
        self._state.step_count += 1
        self._steps_used += 1
        steps_remaining = max(0, self._max_steps - self._steps_used)

        # If already done, return a terminal obs (grace)
        if self._done:
            return self._obs(
                output="Episode already ended. Call reset() to start a new one.",
                steps_remaining=0,
                error="episode_done",
                done=True,
                reward=0.0,
            )

        # Hard cap on steps
        if self._steps_used > self._max_steps:
            self._done = True
            return self._obs(
                output="Step budget exhausted without a submitted report.",
                steps_remaining=0,
                error="step_budget_exhausted",
                done=True,
                reward=0.0,
            )

        verb = action.action_type
        try:
            if verb == "list_dir":
                out, err = self._do_list_dir(action.path or "/")
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)

            if verb == "read_file":
                path = action.path or ""
                out, err = self._do_read_file(path, action.max_bytes or DEFAULT_READ_LIMIT)
                shaped = self._award_shaping(path) if err is None else 0.0
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=shaped)

            if verb == "grep":
                path = action.path or ""
                out, err = self._do_grep(action.pattern or "", path)
                shaped = self._award_shaping(path) if err is None else 0.0
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=shaped)

            if verb == "stat":
                out, err = self._do_stat(action.path or "")
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)

            if verb == "find":
                out, err = self._do_find(action.pattern or "*", action.path or "/")
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)

            if verb == "submit_report":
                return self._do_submit_report(action, steps_remaining)

            return self._obs(
                output="",
                steps_remaining=steps_remaining,
                error=f"unknown action_type: {verb}",
                done=False,
                reward=0.0,
            )
        except Exception as e:  # pragma: no cover - defensive
            # Boundary handler: surface the failure to the agent instead of
            # crashing the server session.
            return self._obs(
                output="",
                steps_remaining=steps_remaining,
                error=f"internal_error: {type(e).__name__}: {e}",
                done=False,
                reward=0.0,
            )

    def _do_find(self, pattern: str, path: str) -> Tuple[str, Optional[str]]:
        """
        Recursive search: find files matching a glob pattern under a directory.

        The pattern is matched against each file's basename. Matching is
        case-sensitive on every host OS, because the simulated filesystem is a
        Linux one. Results are returned in sorted order, capped at
        FIND_MAX_RESULTS. Returns (output, None); an empty result is reported
        in the output text, not as an error.
        """
        path = path.rstrip("/") or "/"
        prefix = "/" if path == "/" else path + "/"

        matches: List[str] = []
        for fp in sorted(self._fs):
            # Accept the path itself (when it names a file) or anything below it.
            if fp != path and not fp.startswith(prefix):
                continue
            basename = fp.rsplit("/", 1)[-1]
            if fnmatchcase(basename, pattern):
                matches.append(fp)
                if len(matches) >= FIND_MAX_RESULTS:
                    break

        if not matches:
            return f"(no files matching {pattern!r} under {path})", None
        return "\n".join(matches), None

    # ---- shaping reward -----------------------------------------------------

    def _award_shaping(self, path: str) -> float:
        """
        Return +SHAPING_REWARD_PER_READ the first time the agent touches a
        canonical forensic artifact, capped so the cumulative shaping stays
        <= SHAPING_REWARD_CAP across the episode.
        """
        if not path or path not in self._canonical:
            return 0.0
        if path in self._useful_read:
            return 0.0
        # Small epsilon so float accumulation cannot sneak past the cap.
        if self._shaping_total + 1e-9 >= SHAPING_REWARD_CAP:
            return 0.0
        self._useful_read.add(path)
        grant = min(SHAPING_REWARD_PER_READ, SHAPING_REWARD_CAP - self._shaping_total)
        self._shaping_total += grant
        return float(grant)

    # ---- action primitives ---------------------------------------------------

    def _do_list_dir(self, path: str) -> Tuple[str, Optional[str]]:
        """
        List the immediate children (files and sub-directories) of *path*.

        Directories are implicit: they exist only as prefixes of file paths.
        Returns (listing, None), or ("", error) when nothing lives under *path*.
        """
        path = path.rstrip("/") or "/"
        prefix = "/" if path == "/" else path + "/"

        entries = set()
        for fp in self._fs:
            if not fp.startswith(prefix):
                continue
            rest = fp[len(prefix):]
            if not rest:
                continue
            # First path component below the prefix is the direct child.
            entries.add(rest.split("/", 1)[0])

        if not entries:
            return "", f"no such directory or empty: {path}"
        listing = "\n".join(sorted(entries))
        return f"{path}:\n{listing}", None

    def _do_read_file(self, path: str, max_bytes: int) -> Tuple[str, Optional[str]]:
        """
        Return the text of *path*, truncated to *max_bytes*.

        The limit is applied to the decoded text (characters); the truncation
        notice keeps its historical "bytes" wording. A zero or negative limit
        means no truncation.
        """
        if path not in self._fs:
            return "", f"no such file: {path}"
        text = _as_text(self._fs[path])
        if max_bytes and max_bytes > 0 and len(text) > max_bytes:
            text = text[:max_bytes] + f"\n... [truncated at {max_bytes} bytes]"
        return text, None

    def _do_grep(self, pattern: str, path: str) -> Tuple[str, Optional[str]]:
        """
        Plain substring search (not regex) for *pattern* in the lines of *path*.

        Hits are formatted as "<1-based line number>: <line>", capped at
        GREP_MAX_HITS. An empty pattern or a missing file is an error; zero
        matches is not.
        """
        if not pattern:
            return "", "empty pattern"
        if path not in self._fs:
            return "", f"no such file: {path}"

        text = _as_text(self._fs[path])
        hits: List[str] = []
        for i, line in enumerate(text.splitlines(), start=1):
            if pattern in line:
                hits.append(f"{i}: {line}")
                if len(hits) >= GREP_MAX_HITS:
                    break

        if not hits:
            return f"(no matches for {pattern!r} in {path})", None
        return "\n".join(hits), None

    def _do_stat(self, path: str) -> Tuple[str, Optional[str]]:
        """Report the path, size in bytes, and SHA-256 hex digest of a file's content."""
        if path not in self._fs:
            return "", f"no such file: {path}"
        raw = _as_bytes(self._fs[path])
        sha = hashlib.sha256(raw).hexdigest()
        return (
            f"path={path}\nsize={len(raw)}\nsha256={sha}",
            None,
        )

    def _do_submit_report(
        self, action: ForensicShellAction, steps_remaining: int
    ) -> ForensicShellObservation:
        """
        Grade the submitted report against the scenario's ground truth and end
        the episode. A submit without a report is a non-terminal error, so the
        agent can retry.
        """
        if action.report is None:
            return self._obs(
                output="submit_report requires a 'report' field.",
                steps_remaining=steps_remaining,
                error="missing_report",
                done=False,
                reward=0.0,
            )

        report_dict = action.report.model_dump(mode="json")
        truth = self._scenario["ground_truth"]
        reward = grade(self._task_id, report_dict, truth)
        # Mark done only after grading succeeded; if grade() raises, step()
        # reports an internal error and the episode stays open.
        self._done = True

        summary = (
            f"Report received for task {self._task_id}. "
            f"Reward: {reward:.3f}. Episode complete."
        )
        return self._obs(
            output=summary,
            steps_remaining=0,
            error=None,
            done=True,
            reward=reward,
            extra_metadata={"submitted_report": report_dict, "task_id": self._task_id},
        )

    # ---- obs helper ----------------------------------------------------------

    def _obs(
        self,
        output: str,
        steps_remaining: int,
        error: Optional[str],
        done: bool,
        reward: float,
        extra_metadata: Optional[Dict] = None,
    ) -> ForensicShellObservation:
        """
        Build an observation for the current task, attaching the task id and
        the running step count as metadata (plus any *extra_metadata*).
        """
        meta: Dict = {
            "task_id": self._task_id,
            "step": self._state.step_count,
        }
        if extra_metadata:
            meta.update(extra_metadata)
        return ForensicShellObservation(
            output=output,
            task_id=self._task_id,
            task_description=self._scenario.get("description", ""),
            steps_remaining=steps_remaining,
            action_error=error,
            done=done,
            reward=reward,
            metadata=meta,
        )

    # ---- state + metadata ----------------------------------------------------

    @property
    def state(self) -> State:
        """Current episode State (episode id and step count)."""
        return self._state

    def get_metadata(self) -> EnvironmentMetadata:
        """
        Override the OpenEnv default to populate the /metadata endpoint with a
        real name, description, embedded README, version, author, and docs URL
        — instead of the boilerplate auto-derived from the class name.

        The README is looked up one directory above this module's directory;
        if it is missing or unreadable, readme_content is None.
        """
        readme_path = Path(__file__).resolve().parent.parent / "README.md"
        readme_content: Optional[str]
        try:
            readme_content = readme_path.read_text(encoding="utf-8")
        except OSError:
            # Covers a missing file as well as permission / I/O failures.
            readme_content = None

        return EnvironmentMetadata(
            name="ForensicShell",
            description=(
                "Digital-forensics investigation environment for OpenEnv RL. The "
                "agent reads logs, hashes backdoors, and reconstructs attacker "
                "kill-chains across 5 attack patterns and 5 difficulty tiers. "
                "Procedural scenarios via deterministic seeds; deterministic "
                "graders return rewards in [0, 1] with partial credit (Jaccard, "
                "F1, Kendall-tau)."
            ),
            readme_content=readme_content,
            version="0.2.0",
            author="yashppawar",
            documentation_url="https://huggingface.co/spaces/yashppawar/forensic-shell",
        )