# forensic-shell / inference.py
# Uploaded by yashppawar via huggingface_hub (commit 62567eb, verified).
"""
Inference script for the ForensicShell OpenEnv environment.
Runs a policy (LLM-backed by default, or heuristic via --mock-policy) through
all three difficulty tiers of the ForensicShell environment and emits stdout
logs in the mandatory hackathon format:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...>
Required env vars (defaults ONLY for API_BASE_URL and MODEL_NAME):
API_BASE_URL OpenAI-compatible LLM endpoint
MODEL_NAME Model identifier
HF_TOKEN API key for the LLM
LOCAL_IMAGE_NAME Optional — Docker image when using from_docker_image()
"""
import argparse
import asyncio
import os
from typing import List, Optional
from openai import OpenAI
from forensic_shell import ForensicShellAction, ForensicShellEnv
from forensic_shell.agents.llm_policy import (
LLMPolicy,
MockPolicy,
PolicyProtocol,
action_to_str,
)
from forensic_shell.models import ForensicReport
# --- Required environment variables (per hackathon spec) -----------------
# Defaults are set ONLY for API_BASE_URL and MODEL_NAME (NOT for HF_TOKEN).
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# No default on purpose: a missing HF_TOKEN makes main() fall back to MockPolicy.
HF_TOKEN = os.getenv("HF_TOKEN")
# Optional — only used when connecting via from_docker_image()
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
# --------------------------------------------------------------------------
FORENSIC_BASE_URL = os.getenv("FORENSIC_BASE_URL") # optional: connect to a running server instead of Docker
BENCHMARK = os.getenv("FORENSIC_BENCHMARK", "forensic_shell")  # env name reported in [START] lines
MAX_STEPS_PER_TASK = 14  # hard cap on policy steps per episode
SUCCESS_THRESHOLD = 0.5  # terminal reward >= this counts the episode as a success
# Phase 2 grader rejects scores of exactly 0.0 or 1.0. The spec language
# "[0, 1]" is interpreted as the OPEN interval (0, 1) by the hackathon
# validator, so every reported score is clamped into (SCORE_FLOOR, SCORE_CEIL).
# We use 0.01 / 0.99 (not 0.001 / 0.999) so the .2f-rounded entries in the
# rewards=... field never collapse back to 0.00 or 1.00 either.
SCORE_FLOOR = 0.01
SCORE_CEIL = 0.99
# The three difficulty tiers, run in order by main().
TASK_IDS: List[str] = ["t1_login", "t2_modified", "t3_timeline"]
# Re-export parse_action for backward-compat with tests/test_parser.py that
# still imports it from this module by path.
from forensic_shell.agents.llm_policy import parse_action # noqa: E402,F401
# ---------------------------------------------------------------------------
# Structured stdout logging
# ---------------------------------------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
    """Emit the mandatory [START] line announcing a new task episode."""
    line = "[START] task={} env={} model={}".format(task, env, model)
    print(line, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one [STEP] line in the mandatory hackathon stdout format.

    Newlines and carriage returns inside the action are flattened to spaces
    so the entry always occupies exactly one log line; a falsy error renders
    as the literal string "null".
    """
    one_line = action.translate(str.maketrans("\n\r", "  "))
    entry = (
        f"[STEP] step={step} action={one_line} reward={reward:.2f} "
        f"done={'true' if done else 'false'} error={error if error else 'null'}"
    )
    print(entry, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the final [END] summary line with the per-step reward trail."""
    flag = "true" if success else "false"
    joined = ",".join(format(r, ".2f") for r in rewards)
    print(
        "[END] success=%s steps=%s score=%.3f rewards=%s" % (flag, steps, score, joined),
        flush=True,
    )
# ---------------------------------------------------------------------------
# Episode driver
# ---------------------------------------------------------------------------
async def _drive_one_task(
    policy: PolicyProtocol,
    task_id: str,
    image: str,
) -> None:
    """
    Run one task episode with the exact lifecycle ordering the hackathon spec
    requires:
    log_start -> env up -> reset -> step loop (log_step ...)
    -> env.close() -> log_end
    [END] is ALWAYS emitted from the outermost finally, even if env bring-up,
    the episode itself, or env teardown raises. Bring-up / episode / teardown
    are each caught in their own try block with distinct labels so a container
    shutdown timeout (cosmetic, happens after submit_report) is not mislabeled
    as an episode failure.

    Args:
        policy: Action-producing policy (LLM-backed or MockPolicy); its
            ``name`` is reported as the model in the [START] line.
        task_id: One of TASK_IDS, forwarded to ``client.reset``.
        image: Docker image tag; only used when FORENSIC_BASE_URL is unset.
    """
    history: List[str] = []          # text summaries of past steps, fed back to the policy
    rewards: List[float] = []        # per-step rewards, reported in the [END] line
    steps_taken = 0
    score = 0.0
    success = False
    client: Optional[ForensicShellEnv] = None
    entered = False                  # True only after __aenter__ succeeded
    log_start(task=task_id, env=BENCHMARK, model=policy.name)
    try:
        # --- env bring-up --------------------------------------------------
        # Prefer an already-running HTTP server when FORENSIC_BASE_URL is set;
        # otherwise spin up a container from the given image.
        try:
            if FORENSIC_BASE_URL:
                client = ForensicShellEnv(base_url=FORENSIC_BASE_URL)
            else:
                client = await ForensicShellEnv.from_docker_image(image)
            await client.__aenter__()
            entered = True
        except Exception as e:
            # Bring-up failure is logged but not raised: the finally block
            # below still emits a (failed) [END] line for this task.
            print(
                f"[DEBUG] env bring-up failed for {task_id}: "
                f"{type(e).__name__}: {e}",
                flush=True,
            )
            client = None
            entered = False
        # --- episode -------------------------------------------------------
        if client is not None and entered:
            try:
                result = await client.reset(task_id=task_id)
                observation = result.observation
                if not result.done:
                    for step in range(1, MAX_STEPS_PER_TASK + 1):
                        action = policy.act(
                            observation=observation,
                            history=history,
                            step=step,
                        )
                        result = await client.step(action)
                        observation = result.observation
                        # Defensive: a missing reward counts as 0.0.
                        reward = float(result.reward or 0.0)
                        done = bool(result.done)
                        rewards.append(reward)
                        steps_taken = step
                        log_step(
                            step=step,
                            action=action_to_str(action),
                            reward=reward,
                            done=done,
                            error=observation.action_error,
                        )
                        # Record a one-line summary so the policy can see its
                        # own prior actions and their outcomes.
                        history.append(
                            f"step {step}: {action_to_str(action)} "
                            f"-> reward {reward:.2f} "
                            f"err={observation.action_error or 'none'}"
                        )
                        if done:
                            break
                # Ensure we always have at least one reward entry, and clamp
                # the terminal reward into the strict open interval (0, 1)
                # required by the Phase 2 validator. The terminal reward IS
                # the task score, so clamping both the list entry and the
                # reported score keeps them consistent.
                if not rewards:
                    rewards.append(SCORE_FLOOR)
                rewards[-1] = max(SCORE_FLOOR, min(SCORE_CEIL, rewards[-1]))
                score = rewards[-1]
                success = score >= SUCCESS_THRESHOLD
            except Exception as e:
                # Episode errors leave score/success at their failure defaults.
                print(
                    f"[DEBUG] episode error for {task_id}: "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )
        # --- env teardown (reached before log_end so [END] is AFTER close) -
        if client is not None and entered:
            try:
                await client.__aexit__(None, None, None)
            except Exception as e:
                # Cosmetic: docker stop can exceed the 10s grace period, which
                # raises TimeoutExpired even though the container terminated
                # via SIGKILL as expected. This does NOT affect the score.
                print(
                    f"[DEBUG] env teardown warning for {task_id} "
                    f"(non-fatal, score already captured): "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )
    finally:
        # [END] always emitted, and always after env.close() (teardown runs above)
        log_end(
            success=success, steps=steps_taken, score=score, rewards=rewards
        )
async def main(use_mock: bool) -> None:
    """Build a policy, resolve the environment image, and run every task tier.

    Falls back to MockPolicy when --mock-policy was passed or HF_TOKEN is
    missing; otherwise drives an OpenAI-compatible LLM endpoint.
    """
    policy: PolicyProtocol
    if use_mock:
        policy = MockPolicy()
    elif not HF_TOKEN:
        print("[DEBUG] WARNING: HF_TOKEN not set; falling back to MockPolicy.", flush=True)
        policy = MockPolicy()
    else:
        llm_client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
        policy = LLMPolicy(client=llm_client, model=MODEL_NAME, name=MODEL_NAME)
    # Resolve image name — the hackathon sample spec documents LOCAL_IMAGE_NAME
    # while the sample code uses IMAGE_NAME. Accept either. Per the
    # pre-submission checklist: NO default value — the grader must provide it
    # (or FORENSIC_BASE_URL must be set to use the HTTP path instead).
    image = next((tag for tag in (LOCAL_IMAGE_NAME, os.getenv("IMAGE_NAME")) if tag), "")
    if not (FORENSIC_BASE_URL or image):
        raise RuntimeError(
            "Neither LOCAL_IMAGE_NAME (or IMAGE_NAME) nor FORENSIC_BASE_URL is set. "
            "Set LOCAL_IMAGE_NAME to the built docker image tag, or FORENSIC_BASE_URL "
            "to a running OpenEnv server URL."
        )
    for tier in TASK_IDS:
        await _drive_one_task(policy, tier, image)
if __name__ == "__main__":
    # CLI entry point: a single optional flag selects the offline MockPolicy.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--mock-policy",
        action="store_true",
        help="Use the rule-based MockPolicy (no LLM calls, no network).",
    )
    ns = cli.parse_args()
    asyncio.run(main(use_mock=ns.mock_policy))