"""
Inference script for the ForensicShell OpenEnv environment.

Runs a policy (LLM-backed by default, or heuristic via --mock-policy) through
all three difficulty tiers of the ForensicShell environment and emits stdout
logs in the mandatory hackathon format:

    [START] task= env= model=
    [STEP] step= action= reward=<0.00> done= error=
    [END] success= steps= score= rewards=

Required env vars (defaults ONLY for API_BASE_URL and MODEL_NAME):

    API_BASE_URL      OpenAI-compatible LLM endpoint
    MODEL_NAME        Model identifier
    HF_TOKEN          API key for the LLM
    LOCAL_IMAGE_NAME  Optional — Docker image when using from_docker_image()
"""
import argparse
import asyncio
import os
from typing import List, Optional

from openai import OpenAI

from forensic_shell import ForensicShellAction, ForensicShellEnv
from forensic_shell.agents.llm_policy import (
    LLMPolicy,
    MockPolicy,
    PolicyProtocol,
    action_to_str,
)
from forensic_shell.models import ForensicReport

# --- Required environment variables (per hackathon spec) -----------------
# Defaults are set ONLY for API_BASE_URL and MODEL_NAME (NOT for HF_TOKEN).
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")
# Optional — only used when connecting via from_docker_image()
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
# --------------------------------------------------------------------------

# Optional: connect to a running server instead of bringing up Docker.
FORENSIC_BASE_URL = os.getenv("FORENSIC_BASE_URL")
BENCHMARK = os.getenv("FORENSIC_BENCHMARK", "forensic_shell")
MAX_STEPS_PER_TASK = 14
SUCCESS_THRESHOLD = 0.5

# Phase 2 grader rejects scores of exactly 0.0 or 1.0. The spec language
# "[0, 1]" is interpreted as the OPEN interval (0, 1) by the hackathon
# validator, so every reported score is clamped into (SCORE_FLOOR, SCORE_CEIL).
# We use 0.01 / 0.99 (not 0.001 / 0.999) so the .2f-rounded entries in the
# rewards=... field never collapse back to 0.00 or 1.00 either.
SCORE_FLOOR = 0.01
SCORE_CEIL = 0.99

TASK_IDS: List[str] = ["t1_login", "t2_modified", "t3_timeline"]

# Re-export parse_action for backward-compat with tests/test_parser.py that
# still imports it from this module by path.
from forensic_shell.agents.llm_policy import parse_action  # noqa: E402,F401


# ---------------------------------------------------------------------------
# Structured stdout logging
# ---------------------------------------------------------------------------

def log_start(task: str, env: str, model: str) -> None:
    """Emit the mandatory [START] line for one task episode."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool,
             error: Optional[str]) -> None:
    """Emit one mandatory [STEP] line.

    Both the action and the error text are flattened onto a single line:
    the grader parses [STEP] records line-by-line, so an embedded newline
    in either field would corrupt the log format.  (FIX: the original
    sanitized only ``action``; a multi-line ``action_error`` would have
    broken the one-line format.)
    """
    done_val = "true" if done else "false"
    action_oneline = action.replace("\n", " ").replace("\r", " ")
    error_val = (
        error.replace("\n", " ").replace("\r", " ") if error else "null"
    )
    print(
        f"[STEP] step={step} action={action_oneline} reward={reward:.2f} "
        f"done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the mandatory [END] summary line (rewards as comma-joined .2f)."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    success_val = "true" if success else "false"
    print(
        f"[END] success={success_val} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


# ---------------------------------------------------------------------------
# Episode driver
# ---------------------------------------------------------------------------

async def _drive_one_task(
    policy: PolicyProtocol,
    task_id: str,
    image: str,
) -> None:
    """
    Run one task episode with the exact lifecycle ordering the hackathon
    spec requires:

        log_start -> env up -> reset -> step loop (log_step ...)
        -> env.close() -> log_end

    [END] is ALWAYS emitted from the outermost finally, even if env
    bring-up, the episode itself, or env teardown raises.

    Bring-up / episode / teardown are each caught in their own try block
    with distinct labels so a container shutdown timeout (cosmetic, happens
    after submit_report) is not mislabeled as an episode failure.
    """
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    client: Optional[ForensicShellEnv] = None
    entered = False  # True only after __aenter__ succeeded

    log_start(task=task_id, env=BENCHMARK, model=policy.name)
    try:
        # --- env bring-up --------------------------------------------------
        try:
            if FORENSIC_BASE_URL:
                client = ForensicShellEnv(base_url=FORENSIC_BASE_URL)
            else:
                client = await ForensicShellEnv.from_docker_image(image)
            await client.__aenter__()
            entered = True
        except Exception as e:
            print(
                f"[DEBUG] env bring-up failed for {task_id}: "
                f"{type(e).__name__}: {e}",
                flush=True,
            )
            # FIX: if the client was constructed but __aenter__ failed, the
            # original simply dropped the reference, leaking any container
            # that from_docker_image() already started.  Best-effort release
            # here; errors are swallowed because we are already on a failure
            # path.  NOTE(review): assumes __aexit__ is safe on a client
            # that was never entered — confirm against ForensicShellEnv.
            if client is not None:
                try:
                    await client.__aexit__(None, None, None)
                except Exception:
                    pass
            client = None
            entered = False

        # --- episode -------------------------------------------------------
        if client is not None and entered:
            try:
                result = await client.reset(task_id=task_id)
                observation = result.observation
                if not result.done:
                    for step in range(1, MAX_STEPS_PER_TASK + 1):
                        action = policy.act(
                            observation=observation,
                            history=history,
                            step=step,
                        )
                        result = await client.step(action)
                        observation = result.observation
                        reward = float(result.reward or 0.0)
                        done = bool(result.done)
                        rewards.append(reward)
                        steps_taken = step
                        log_step(
                            step=step,
                            action=action_to_str(action),
                            reward=reward,
                            done=done,
                            error=observation.action_error,
                        )
                        # Compact per-step summary fed back to the policy as
                        # conversation history on subsequent act() calls.
                        history.append(
                            f"step {step}: {action_to_str(action)} "
                            f"-> reward {reward:.2f} "
                            f"err={observation.action_error or 'none'}"
                        )
                        if done:
                            break

                # Ensure we always have at least one reward entry, and clamp
                # the terminal reward into the strict open interval (0, 1)
                # required by the Phase 2 validator. The terminal reward IS
                # the task score, so clamping both the list entry and the
                # reported score keeps them consistent.
                if not rewards:
                    rewards.append(SCORE_FLOOR)
                rewards[-1] = max(SCORE_FLOOR, min(SCORE_CEIL, rewards[-1]))
                score = rewards[-1]
                success = score >= SUCCESS_THRESHOLD
            except Exception as e:
                print(
                    f"[DEBUG] episode error for {task_id}: "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )

        # --- env teardown (reached before log_end so [END] is AFTER close) -
        if client is not None and entered:
            try:
                await client.__aexit__(None, None, None)
            except Exception as e:
                # Cosmetic: docker stop can exceed the 10s grace period, which
                # raises TimeoutExpired even though the container terminated
                # via SIGKILL as expected. This does NOT affect the score.
                print(
                    f"[DEBUG] env teardown warning for {task_id} "
                    f"(non-fatal, score already captured): "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )
    finally:
        # [END] always emitted, and always after env.close() (teardown runs above)
        log_end(
            success=success, steps=steps_taken, score=score, rewards=rewards
        )


async def main(use_mock: bool) -> None:
    """Build the policy, resolve the env image, and run every task tier.

    Falls back to MockPolicy (with a [DEBUG] warning) when HF_TOKEN is
    missing so the run still produces the mandatory log lines.
    """
    if use_mock:
        policy: PolicyProtocol = MockPolicy()
    else:
        if not HF_TOKEN:
            print("[DEBUG] WARNING: HF_TOKEN not set; falling back to MockPolicy.", flush=True)
            policy = MockPolicy()
        else:
            llm_client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
            policy = LLMPolicy(client=llm_client, model=MODEL_NAME, name=MODEL_NAME)

    # Resolve image name — the hackathon sample spec documents LOCAL_IMAGE_NAME
    # while the sample code uses IMAGE_NAME. Accept either. Per the
    # pre-submission checklist: NO default value — the grader must provide it
    # (or FORENSIC_BASE_URL must be set to use the HTTP path instead).
    image = LOCAL_IMAGE_NAME or os.getenv("IMAGE_NAME") or ""
    if not FORENSIC_BASE_URL and not image:
        raise RuntimeError(
            "Neither LOCAL_IMAGE_NAME (or IMAGE_NAME) nor FORENSIC_BASE_URL is set. "
            "Set LOCAL_IMAGE_NAME to the built docker image tag, or FORENSIC_BASE_URL "
            "to a running OpenEnv server URL."
        )

    for task_id in TASK_IDS:
        await _drive_one_task(policy, task_id, image)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mock-policy",
        action="store_true",
        help="Use the rule-based MockPolicy (no LLM calls, no network).",
    )
    args = parser.parse_args()
    asyncio.run(main(use_mock=args.mock_policy))