# NOTE(review): stray "Spaces / Sleeping" Hugging Face Spaces page text was
# captured at the top of this file by the scrape; neutralized into a comment
# so the module parses.
"""
Inference script for the ForensicShell OpenEnv environment.

Runs a policy (LLM-backed by default, or heuristic via --mock-policy) through
all three difficulty tiers of the ForensicShell environment and emits stdout
logs in the mandatory hackathon format:

    [START] task=<task_name> env=<benchmark> model=<model_name>
    [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
    [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...>

Required env vars (defaults ONLY for API_BASE_URL and MODEL_NAME):
    API_BASE_URL      OpenAI-compatible LLM endpoint
    MODEL_NAME        Model identifier
    HF_TOKEN          API key for the LLM
    LOCAL_IMAGE_NAME  Optional — Docker image when using from_docker_image()
"""
import argparse
import asyncio
import os
from typing import List, Optional

from openai import OpenAI

from forensic_shell import ForensicShellAction, ForensicShellEnv
from forensic_shell.agents.llm_policy import (
    LLMPolicy,
    MockPolicy,
    PolicyProtocol,
    action_to_str,
)
from forensic_shell.models import ForensicReport
# --- Required environment variables (per hackathon spec) -------------------
# Only API_BASE_URL and MODEL_NAME carry defaults; HF_TOKEN deliberately has
# none (an unset token triggers the MockPolicy fallback in main()).
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")

# Optional — only consulted when connecting via from_docker_image().
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
# ---------------------------------------------------------------------------

# Optional: talk to an already-running OpenEnv server instead of spawning Docker.
FORENSIC_BASE_URL = os.getenv("FORENSIC_BASE_URL")
BENCHMARK = os.getenv("FORENSIC_BENCHMARK", "forensic_shell")

MAX_STEPS_PER_TASK = 14
SUCCESS_THRESHOLD = 0.5

# The Phase 2 grader rejects scores of exactly 0.0 or 1.0: its reading of the
# spec's "[0, 1]" is the OPEN interval (0, 1), so every reported score is
# clamped into [SCORE_FLOOR, SCORE_CEIL]. Using 0.01 / 0.99 (rather than
# 0.001 / 0.999) guarantees the .2f-rounded entries in the rewards=... field
# cannot collapse back to 0.00 or 1.00 either.
SCORE_FLOOR = 0.01
SCORE_CEIL = 0.99

TASK_IDS: List[str] = ["t1_login", "t2_modified", "t3_timeline"]

# Backward-compat re-export: tests/test_parser.py still imports parse_action
# from this module by path.
from forensic_shell.agents.llm_policy import parse_action  # noqa: E402,F401
# ---------------------------------------------------------------------------
# Structured stdout logging
# ---------------------------------------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
    """Emit the episode-opening ``[START]`` line in the hackathon log format."""
    line = f"[START] task={task} env={env} model={model}"
    print(line, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one ``[STEP]`` line in the mandatory hackathon log format.

    Args:
        step: 1-based step index within the episode.
        action: Raw action string; newlines are flattened so the entry stays
            on a single log line.
        reward: Step reward, rendered with two decimals.
        done: Episode-termination flag, rendered as ``true``/``false``.
        error: Action error message; ``None`` or empty renders as the
            literal ``null``.
    """
    error_val = error if error else "null"
    # Bug fix: flatten BOTH action and error onto one line. The original only
    # sanitized `action`, so a multi-line error message would break the
    # line-oriented [STEP] format that the grader parses.
    error_val = error_val.replace("\n", " ").replace("\r", " ")
    done_val = "true" if done else "false"
    action_oneline = action.replace("\n", " ").replace("\r", " ")
    print(
        f"[STEP] step={step} action={action_oneline} reward={reward:.2f} "
        f"done={done_val} error={error_val}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the closing ``[END]`` line: outcome, step count, score, reward trace."""
    outcome = "true" if success else "false"
    trace = ",".join(format(r, ".2f") for r in rewards)
    line = f"[END] success={outcome} steps={steps} score={score:.3f} rewards={trace}"
    print(line, flush=True)
# ---------------------------------------------------------------------------
# Episode driver
# ---------------------------------------------------------------------------
async def _drive_one_task(
    policy: PolicyProtocol,
    task_id: str,
    image: str,
) -> None:
    """
    Run one task episode with the exact lifecycle ordering the hackathon spec
    requires:

        log_start -> env up -> reset -> step loop (log_step ...)
                  -> env.close() -> log_end

    [END] is ALWAYS emitted from the outermost finally, even if env bring-up,
    the episode itself, or env teardown raises. Bring-up / episode / teardown
    are each caught in their own try block with distinct labels so a container
    shutdown timeout (cosmetic, happens after submit_report) is not mislabeled
    as an episode failure.

    Args:
        policy: Action source; must expose ``.name`` and
            ``.act(observation, history, step)``.
        task_id: One of TASK_IDS, forwarded to ``env.reset()``.
        image: Docker image tag; ignored when FORENSIC_BASE_URL is set.
    """
    history: List[str] = []    # compact per-step summaries fed back to the policy
    rewards: List[float] = []  # one entry per executed step
    steps_taken = 0
    score = 0.0
    success = False
    client: Optional[ForensicShellEnv] = None
    entered = False            # True only once __aenter__ has succeeded

    log_start(task=task_id, env=BENCHMARK, model=policy.name)
    try:
        # --- env bring-up --------------------------------------------------
        try:
            if FORENSIC_BASE_URL:
                client = ForensicShellEnv(base_url=FORENSIC_BASE_URL)
            else:
                client = await ForensicShellEnv.from_docker_image(image)
            await client.__aenter__()
            entered = True
        except Exception as e:
            print(
                f"[DEBUG] env bring-up failed for {task_id}: "
                f"{type(e).__name__}: {e}",
                flush=True,
            )
            client = None
            entered = False

        # --- episode -------------------------------------------------------
        if client is not None and entered:
            try:
                result = await client.reset(task_id=task_id)
                observation = result.observation
                if not result.done:
                    for step in range(1, MAX_STEPS_PER_TASK + 1):
                        action = policy.act(
                            observation=observation,
                            history=history,
                            step=step,
                        )
                        result = await client.step(action)
                        observation = result.observation
                        reward = float(result.reward or 0.0)
                        done = bool(result.done)
                        rewards.append(reward)
                        steps_taken = step
                        log_step(
                            step=step,
                            action=action_to_str(action),
                            reward=reward,
                            done=done,
                            error=observation.action_error,
                        )
                        history.append(
                            f"step {step}: {action_to_str(action)} "
                            f"-> reward {reward:.2f} "
                            f"err={observation.action_error or 'none'}"
                        )
                        if done:
                            break
                # The terminal reward IS the task score; success is judged on
                # it. Clamping into the validator's open interval happens once,
                # in the outermost finally, so it also covers error paths.
                if rewards:
                    score = rewards[-1]
                success = score >= SUCCESS_THRESHOLD
            except Exception as e:
                print(
                    f"[DEBUG] episode error for {task_id}: "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )

        # --- env teardown (reached before log_end so [END] is AFTER close) -
        if client is not None and entered:
            try:
                await client.__aexit__(None, None, None)
            except Exception as e:
                # Cosmetic: docker stop can exceed the 10s grace period, which
                # raises TimeoutExpired even though the container terminated
                # via SIGKILL as expected. This does NOT affect the score.
                print(
                    f"[DEBUG] env teardown warning for {task_id} "
                    f"(non-fatal, score already captured): "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )
    finally:
        # [END] always emitted, and always after env.close() (teardown above).
        # BUG FIX: the original clamped score/rewards only on the happy path,
        # so a bring-up or mid-episode failure reported score=0.000 and an
        # empty (or 0.00-terminal) rewards list — exactly the values the
        # Phase 2 validator rejects per the SCORE_FLOOR/SCORE_CEIL comment.
        # Clamping here guarantees EVERY emitted [END] respects (0, 1).
        if not rewards:
            rewards.append(SCORE_FLOOR)
        rewards[-1] = max(SCORE_FLOOR, min(SCORE_CEIL, rewards[-1]))
        score = max(SCORE_FLOOR, min(SCORE_CEIL, score))
        log_end(
            success=success, steps=steps_taken, score=score, rewards=rewards
        )
async def main(use_mock: bool) -> None:
    """Build the policy, resolve the environment image, and run every task tier.

    Args:
        use_mock: When True, force the rule-based MockPolicy (no LLM calls).
    """
    policy: PolicyProtocol
    if use_mock:
        policy = MockPolicy()
    elif not HF_TOKEN:
        # No credentials: degrade gracefully to the heuristic policy.
        print("[DEBUG] WARNING: HF_TOKEN not set; falling back to MockPolicy.", flush=True)
        policy = MockPolicy()
    else:
        llm_client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
        policy = LLMPolicy(client=llm_client, model=MODEL_NAME, name=MODEL_NAME)

    # Resolve image name — the hackathon sample spec documents LOCAL_IMAGE_NAME
    # while the sample code uses IMAGE_NAME; accept either. Per the
    # pre-submission checklist there is deliberately NO default: the grader
    # must provide an image (or FORENSIC_BASE_URL must select the HTTP path).
    image = LOCAL_IMAGE_NAME or os.getenv("IMAGE_NAME") or ""
    if not image and not FORENSIC_BASE_URL:
        raise RuntimeError(
            "Neither LOCAL_IMAGE_NAME (or IMAGE_NAME) nor FORENSIC_BASE_URL is set. "
            "Set LOCAL_IMAGE_NAME to the built docker image tag, or FORENSIC_BASE_URL "
            "to a running OpenEnv server URL."
        )

    for task_id in TASK_IDS:
        await _drive_one_task(policy, task_id, image)
if __name__ == "__main__":
    # CLI entry point: a single flag selects the offline heuristic policy.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--mock-policy",
        action="store_true",
        help="Use the rule-based MockPolicy (no LLM calls, no network).",
    )
    cli_args = cli.parse_args()
    asyncio.run(main(use_mock=cli_args.mock_policy))