forensic-shell / server /forensic_shell_environment.py
yashppawar's picture
Upload folder using huggingface_hub
6f6baad verified
"""
ForensicShell Environment — server-side implementation.
The environment pre-seeds a fake "breached" Linux filesystem in memory. The agent
interrogates it with structured read-only actions (list_dir, read_file, grep, stat, find)
and finishes the episode by submitting a ForensicReport via action_type='submit_report'.
A deterministic grader scores the report against hidden ground truth and returns
a reward in [0.0, 1.0] on the terminal step.
"""
import hashlib
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata, State
try:
from ..models import ForensicShellAction, ForensicShellObservation
except ImportError:
from models import ForensicShellAction, ForensicShellObservation
try:
from .grader import grade
from .scenario_generator import generate_scenario
from .scenarios import DEFAULT_TASK_ID, SCENARIOS
except ImportError:
from grader import grade # type: ignore
from scenario_generator import generate_scenario # type: ignore
from scenarios import DEFAULT_TASK_ID, SCENARIOS # type: ignore
# Fallback step budget, used when neither the task id nor the scenario's
# difficulty label has an entry in the tables below.
MAX_STEPS_PER_EPISODE: int = 30

# Step budgets keyed by difficulty label. Easy tasks should not reward aimless
# exploration; hard tasks with red herrings genuinely need more room.
STEPS_BY_DIFFICULTY: Dict[str, int] = {
    "easy": 15,
    "medium": 25,
    "hard": 35,
}

# Per-task overrides for the hand-authored scenarios (kept for backward
# compatibility with the Day-1 baselines).
STEPS_BY_TASK: Dict[str, int] = {
    "t1_login": 15,
    "t2_modified": 25,
    "t3_timeline": 35,
}

# Exploration shaping: a small positive reward granted the first time the
# agent reads one of the scenario's "canonical forensic artifacts" (auth.log,
# bash histories, cron files, backdoor path, etc.). The per-episode total is
# capped so the terminal grader reward always dominates the trajectory return.
SHAPING_REWARD_PER_READ: float = 0.02
SHAPING_REWARD_CAP: float = 0.10
def _canonical_artifacts(scenario: dict) -> set:
"""
Pick out the set of paths in a scenario that a good investigator *should*
read. For hand-authored scenarios we use the ground-truth modified_files
plus a fixed set of classic forensic log paths. For generated scenarios we
also include the bash history of the compromised user.
"""
gt = scenario.get("ground_truth", {}) or {}
paths: set = set()
paths.update(gt.get("modified_files", []) or [])
for p in (
"/var/log/auth.log",
"/var/log/auth.log.1",
"/etc/passwd",
"/etc/shadow",
):
if p in scenario.get("filesystem", {}):
paths.add(p)
user = gt.get("compromised_user")
if user:
bh = f"/home/{user}/.bash_history"
if bh in scenario.get("filesystem", {}):
paths.add(bh)
return paths
def _as_bytes(content) -> bytes:
if isinstance(content, bytes):
return content
return str(content).encode("utf-8", errors="replace")
def _as_text(content) -> str:
if isinstance(content, bytes):
try:
return content.decode("utf-8")
except UnicodeDecodeError:
return f"<binary:{len(content)} bytes>"
return str(content)
class ForensicShellEnvironment(Environment):
    """
    Pre-seeded forensic investigation environment. Agent actions are read-only file
    operations over a synthetic filesystem kept in a Python dict.

    Episode flow:
        reset(task_id='t1_login' | 't2_modified' | 't3_timeline') -> initial obs
        step(list_dir|read_file|grep|stat|find) -> obs with output
        step(submit_report(ForensicReport)) -> terminal obs with reward in [0,1]
    """

    # NOTE(review): presumably read by the OpenEnv server to decide whether
    # several sessions may share this environment class — confirm upstream.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        """Create an environment in a safe pre-reset state (empty filesystem)."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._task_id: str = DEFAULT_TASK_ID
        self._scenario: dict = SCENARIOS[DEFAULT_TASK_ID]
        self._fs: Dict[str, object] = {}
        self._done: bool = False
        self._steps_used: int = 0
        # Initialised here (not only in reset) so that a step() issued before
        # the first reset() does not fail with AttributeError.
        self._max_steps: int = MAX_STEPS_PER_EPISODE
        self._useful_read: set = set()  # paths already rewarded
        self._shaping_total: float = 0.0  # running sum, capped at SHAPING_REWARD_CAP
        self._canonical: set = set()  # per-episode canonical artifact set

    # ---- episode lifecycle ---------------------------------------------------

    def reset(
        self,
        task_id: Optional[str] = None,
        seed: Optional[int] = None,
        difficulty: Optional[int] = None,
        pattern: Optional[str] = None,
        **kwargs,
    ) -> ForensicShellObservation:
        """
        Start a new episode and return the initial observation.

        Load either a hand-authored scenario (by task_id) OR a procedurally
        generated one (by seed+difficulty+pattern). If seed is given, the
        generator wins; otherwise fall back to task_id, then the
        FORENSIC_TASK_ID environment variable, then DEFAULT_TASK_ID. An unknown
        task id silently falls back to DEFAULT_TASK_ID.

        Args:
            task_id: Key into SCENARIOS; ignored when ``seed`` is given.
            seed: Seed for the scenario generator.
            difficulty: Generator difficulty; defaults to 3 when ``seed`` is
                given without a difficulty.
            pattern: Passed through to the scenario generator unchanged.
            **kwargs: Accepted and ignored.

        Returns:
            A non-terminal observation with reward 0.0 and the full step budget.
        """
        if seed is not None:
            scenario = generate_scenario(
                seed=int(seed),
                difficulty=int(difficulty) if difficulty is not None else 3,
                pattern=pattern,
            )
            self._task_id = scenario["task_id"]
            self._scenario = scenario
        else:
            env_task = os.getenv("FORENSIC_TASK_ID")
            chosen = task_id or env_task or DEFAULT_TASK_ID
            if chosen not in SCENARIOS:
                chosen = DEFAULT_TASK_ID
            self._task_id = chosen
            self._scenario = SCENARIOS[chosen]

        # Shallow copy so the episode never rebinds keys on the shared scenario dict.
        self._fs = dict(self._scenario["filesystem"])
        self._done = False
        self._steps_used = 0
        self._useful_read = set()
        self._shaping_total = 0.0
        self._canonical = _canonical_artifacts(self._scenario)
        self._state = State(episode_id=str(uuid4()), step_count=0)

        # Step budget: per-task override, then difficulty label, then fallback.
        diff_label = self._scenario.get("difficulty", "medium")
        self._max_steps = (
            STEPS_BY_TASK.get(self._task_id)
            or STEPS_BY_DIFFICULTY.get(diff_label)
            or MAX_STEPS_PER_EPISODE
        )

        return ForensicShellObservation(
            output=(
                f"ForensicShell ready. Task: {self._task_id} "
                f"({diff_label}).\n"
                "Available actions: list_dir(path), read_file(path,max_bytes), "
                "grep(pattern,path), stat(path), find(pattern,path), submit_report(report).\n"
                "Start by listing /var/log or /home."
            ),
            task_id=self._task_id,
            task_description=self._scenario["description"],
            steps_remaining=self._max_steps,
            action_error=None,
            done=False,
            reward=0.0,
            metadata={
                "difficulty": diff_label,
                "max_steps": self._max_steps,
            },
        )

    # ---- action dispatch -----------------------------------------------------

    def step(self, action: ForensicShellAction) -> ForensicShellObservation:  # type: ignore[override]
        """
        Execute one agent action and return the resulting observation.

        Both step counters are incremented on every call, including calls made
        after the episode has ended. A call on a finished episode returns a
        terminal ``episode_done`` observation; a call beyond the step budget
        ends the episode with ``step_budget_exhausted``. Otherwise the action is
        dispatched on ``action.action_type``. Only ``read_file`` and ``grep``
        can earn shaping reward, and only when they succeed; ``submit_report``
        ends the episode with the grader's reward. Any exception raised while
        handling the action is reported as a non-terminal ``internal_error``.
        """
        self._state.step_count += 1
        self._steps_used += 1
        steps_remaining = max(0, self._max_steps - self._steps_used)

        # If already done, return a terminal obs (grace)
        if self._done:
            return self._obs(
                output="Episode already ended. Call reset() to start a new one.",
                steps_remaining=0,
                error="episode_done",
                done=True,
                reward=0.0,
            )

        # Hard cap on steps
        if self._steps_used > self._max_steps:
            self._done = True
            return self._obs(
                output="Step budget exhausted without a submitted report.",
                steps_remaining=0,
                error="step_budget_exhausted",
                done=True,
                reward=0.0,
            )

        verb = action.action_type
        try:
            if verb == "list_dir":
                out, err = self._do_list_dir(action.path or "/")
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)
            if verb == "read_file":
                path = action.path or ""
                out, err = self._do_read_file(path, action.max_bytes or 2048)
                shaped = self._award_shaping(path) if err is None else 0.0
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=shaped)
            if verb == "grep":
                path = action.path or ""
                out, err = self._do_grep(action.pattern or "", path)
                shaped = self._award_shaping(path) if err is None else 0.0
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=shaped)
            if verb == "stat":
                out, err = self._do_stat(action.path or "")
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)
            if verb == "find":
                out, err = self._do_find(action.pattern or "*", action.path or "/")
                return self._obs(output=out, steps_remaining=steps_remaining, error=err, done=False, reward=0.0)
            if verb == "submit_report":
                return self._do_submit_report(action, steps_remaining)
            return self._obs(
                output="",
                steps_remaining=steps_remaining,
                error=f"unknown action_type: {verb}",
                done=False,
                reward=0.0,
            )
        except Exception as e:  # pragma: no cover - defensive
            # Deliberate catch-all: a handler bug is surfaced to the agent as an
            # action error rather than aborting the episode.
            return self._obs(
                output="",
                steps_remaining=steps_remaining,
                error=f"internal_error: {type(e).__name__}: {e}",
                done=False,
                reward=0.0,
            )

    def _do_find(self, pattern: str, path: str) -> Tuple[str, Optional[str]]:
        """
        Recursive search: find files matching a glob pattern under a directory.

        The glob is matched against each file's basename only. Results are
        sorted and limited to the first 50 matches. Returns ``(output, None)``;
        this action never reports an error, a miss yields an explanatory line.
        """
        from fnmatch import fnmatch

        path = path.rstrip("/") or "/"
        prefix = "/" if path == "/" else path + "/"
        matches: List[str] = []
        for fp in sorted(self._fs):
            # ``fp == path`` lets a search rooted at a file path match that file.
            if fp != path and not fp.startswith(prefix):
                continue
            # rsplit returns the whole string when it contains no "/".
            basename = fp.rsplit("/", 1)[-1]
            if fnmatch(basename, pattern):
                matches.append(fp)
                if len(matches) >= 50:
                    break
        if not matches:
            return f"(no files matching {pattern!r} under {path})", None
        return "\n".join(matches), None

    # ---- shaping reward -----------------------------------------------------

    def _award_shaping(self, path: str) -> float:
        """
        Return +SHAPING_REWARD_PER_READ the first time the agent touches a
        canonical forensic artifact, capped so the cumulative shaping stays
        <= SHAPING_REWARD_CAP across the episode. Returns 0.0 for non-canonical
        paths, repeat reads, or once the cap has been reached.
        """
        if not path or path not in self._canonical:
            return 0.0
        if path in self._useful_read:
            return 0.0
        # Small epsilon so accumulated float error cannot grant a sliver past the cap.
        if self._shaping_total + 1e-9 >= SHAPING_REWARD_CAP:
            return 0.0
        self._useful_read.add(path)
        grant = min(SHAPING_REWARD_PER_READ, SHAPING_REWARD_CAP - self._shaping_total)
        self._shaping_total += grant
        return float(grant)

    # ---- action primitives ---------------------------------------------------

    def _do_list_dir(self, path: str) -> Tuple[str, Optional[str]]:
        """
        List the immediate children of ``path``.

        Directories are implicit: an entry is the first path component below
        ``path`` of any stored file. Returns ``(listing, None)`` on success or
        ``("", error)`` when nothing lives under ``path``.
        """
        path = path.rstrip("/") or "/"
        prefix = "/" if path == "/" else path + "/"
        entries = set()
        for fp in self._fs:
            if not fp.startswith(prefix):
                continue
            rest = fp[len(prefix):]
            if not rest:
                continue
            entries.add(rest.split("/", 1)[0])
        if not entries:
            return "", f"no such directory or empty: {path}"
        listing = "\n".join(sorted(entries))
        return f"{path}:\n{listing}", None

    def _do_read_file(self, path: str, max_bytes: int) -> Tuple[str, Optional[str]]:
        """
        Return the text of ``path``, truncated to ``max_bytes``.

        The limit is applied to the decoded text, i.e. it counts characters. A
        falsy or non-positive ``max_bytes`` disables truncation. Returns
        ``(text, None)`` on success or ``("", error)`` for an unknown path.
        """
        if path not in self._fs:
            return "", f"no such file: {path}"
        text = _as_text(self._fs[path])
        # Require a positive limit: a negative one would slice from the END of
        # the text and report a nonsensical "truncated at -N bytes".
        if max_bytes and max_bytes > 0 and len(text) > max_bytes:
            text = text[:max_bytes] + f"\n... [truncated at {max_bytes} bytes]"
        return text, None

    def _do_grep(self, pattern: str, path: str) -> Tuple[str, Optional[str]]:
        """
        Return the lines of ``path`` that contain ``pattern``.

        Matching is a plain substring test, not a regular expression. Hits are
        formatted as ``"<1-based line no>: <line>"`` and limited to the first
        100. Returns ``("", error)`` for an empty pattern or unknown path.
        """
        if not pattern:
            return "", "empty pattern"
        if path not in self._fs:
            return "", f"no such file: {path}"
        text = _as_text(self._fs[path])
        hits: List[str] = []
        for i, line in enumerate(text.splitlines(), start=1):
            if pattern in line:
                hits.append(f"{i}: {line}")
                if len(hits) >= 100:
                    break
        if not hits:
            return f"(no matches for {pattern!r} in {path})", None
        return "\n".join(hits), None

    def _do_stat(self, path: str) -> Tuple[str, Optional[str]]:
        """
        Report the size in bytes and SHA-256 digest of ``path``.

        Returns ``(text, None)`` on success or ``("", error)`` for an unknown path.
        """
        if path not in self._fs:
            return "", f"no such file: {path}"
        raw = _as_bytes(self._fs[path])
        sha = hashlib.sha256(raw).hexdigest()
        return (
            f"path={path}\nsize={len(raw)}\nsha256={sha}",
            None,
        )

    def _do_submit_report(
        self, action: ForensicShellAction, steps_remaining: int
    ) -> ForensicShellObservation:
        """
        Grade the submitted report and end the episode.

        A missing report yields a non-terminal ``missing_report`` error so the
        agent can retry. Otherwise the report is dumped to a JSON-compatible
        dict, scored by ``grade`` against the scenario's ground truth, and
        returned in a terminal observation carrying the reward.
        """
        if action.report is None:
            return self._obs(
                output="submit_report requires a 'report' field.",
                steps_remaining=steps_remaining,
                error="missing_report",
                done=False,
                reward=0.0,
            )
        report_dict = action.report.model_dump(mode="json")
        truth = self._scenario["ground_truth"]
        reward = grade(self._task_id, report_dict, truth)
        self._done = True
        summary = (
            f"Report received for task {self._task_id}. "
            f"Reward: {reward:.3f}. Episode complete."
        )
        return self._obs(
            output=summary,
            steps_remaining=0,
            error=None,
            done=True,
            reward=reward,
            extra_metadata={"submitted_report": report_dict, "task_id": self._task_id},
        )

    # ---- obs helper ----------------------------------------------------------

    def _obs(
        self,
        output: str,
        steps_remaining: int,
        error: Optional[str],
        done: bool,
        reward: float,
        extra_metadata: Optional[Dict] = None,
    ) -> ForensicShellObservation:
        """
        Build an observation for the current task.

        Metadata always carries ``task_id`` and the current ``step`` count;
        ``extra_metadata`` entries are merged on top and may override them.
        """
        meta: Dict = {
            "task_id": self._task_id,
            "step": self._state.step_count,
        }
        if extra_metadata:
            meta.update(extra_metadata)
        return ForensicShellObservation(
            output=output,
            task_id=self._task_id,
            task_description=self._scenario.get("description", ""),
            steps_remaining=steps_remaining,
            action_error=error,
            done=done,
            reward=reward,
            metadata=meta,
        )

    # ---- state + metadata ----------------------------------------------------

    @property
    def state(self) -> State:
        """Current episode state (episode id and step count)."""
        return self._state

    def get_metadata(self) -> EnvironmentMetadata:
        """
        Override the OpenEnv default to populate the /metadata endpoint with a real
        name, description, embedded README, version, author, and docs URL — instead
        of the boilerplate auto-derived from the class name.

        The README is read from the directory above this file's directory; if it
        is missing, unreadable, or not valid UTF-8, ``readme_content`` is None.
        """
        readme_path = Path(__file__).resolve().parent.parent / "README.md"
        readme_content: Optional[str]
        try:
            # EAFP: a missing file raises FileNotFoundError (an OSError), so no
            # separate exists() check is needed; a badly encoded README must not
            # break the metadata endpoint either.
            readme_content = readme_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            readme_content = None
        return EnvironmentMetadata(
            name="ForensicShell",
            description=(
                "Digital-forensics investigation environment for OpenEnv RL. The "
                "agent reads logs, hashes backdoors, and reconstructs attacker "
                "kill-chains across 5 attack patterns and 5 difficulty tiers. "
                "Procedural scenarios via deterministic seeds; deterministic "
                "graders return rewards in [0, 1] with partial credit (Jaccard, "
                "F1, Kendall-tau)."
            ),
            readme_content=readme_content,
            version="0.2.0",
            author="yashppawar",
            documentation_url="https://huggingface.co/spaces/yashppawar/forensic-shell",
        )