"""
Inference script for the ForensicShell OpenEnv environment.
Runs a policy (LLM-backed by default, or heuristic via --mock-policy) through
all three difficulty tiers of the ForensicShell environment and emits stdout
logs in the mandatory hackathon format:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...>
Required env vars (defaults ONLY for API_BASE_URL and MODEL_NAME):
API_BASE_URL OpenAI-compatible LLM endpoint
MODEL_NAME Model identifier
HF_TOKEN API key for the LLM
LOCAL_IMAGE_NAME Optional — Docker image when using from_docker_image()
"""
import argparse
import asyncio
import os
from typing import List, Optional
from openai import OpenAI
from forensic_shell import ForensicShellAction, ForensicShellEnv
from forensic_shell.agents.llm_policy import (
LLMPolicy,
MockPolicy,
PolicyProtocol,
action_to_str,
)
from forensic_shell.models import ForensicReport
# --- Required environment variables (per hackathon spec) -----------------
# Defaults are set ONLY for API_BASE_URL and MODEL_NAME (NOT for HF_TOKEN).
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")
# Optional — only used when connecting via from_docker_image()
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
# --------------------------------------------------------------------------
FORENSIC_BASE_URL = os.getenv("FORENSIC_BASE_URL")  # optional: connect to a running server instead of Docker
BENCHMARK = os.getenv("FORENSIC_BENCHMARK", "forensic_shell")
# Hard cap on env.step() calls per episode; the loop breaks early on done.
MAX_STEPS_PER_TASK = 14
# A task counts as success when its (clamped) terminal score meets this bar.
SUCCESS_THRESHOLD = 0.5
# Phase 2 grader rejects scores of exactly 0.0 or 1.0. The spec language
# "[0, 1]" is interpreted as the OPEN interval (0, 1) by the hackathon
# validator, so every reported score is clamped into (SCORE_FLOOR, SCORE_CEIL).
# We use 0.01 / 0.99 (not 0.001 / 0.999) so the .2f-rounded entries in the
# rewards=... field never collapse back to 0.00 or 1.00 either.
SCORE_FLOOR = 0.01
SCORE_CEIL = 0.99
# The three difficulty tiers, run in order by main().
TASK_IDS: List[str] = ["t1_login", "t2_modified", "t3_timeline"]
# Re-export parse_action for backward-compat with tests/test_parser.py that
# still imports it from this module by path.
from forensic_shell.agents.llm_policy import parse_action  # noqa: E402,F401
# ---------------------------------------------------------------------------
# Structured stdout logging
# ---------------------------------------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
    """Emit the mandatory [START] marker for one task episode."""
    fields = f"task={task} env={env} model={model}"
    print(f"[START] {fields}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit the mandatory [STEP] line; the action is flattened to one line."""
    parts = [
        f"step={step}",
        # CR/LF would break the one-line-per-step log contract, so both
        # are mapped to spaces before printing.
        "action=" + action.replace("\r", " ").replace("\n", " "),
        f"reward={reward:.2f}",
        "done=" + ("true" if done else "false"),
        "error=" + (error if error else "null"),
    ]
    print("[STEP] " + " ".join(parts), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the mandatory [END] summary line for a task episode."""
    flag = "true" if success else "false"
    reward_field = ",".join(format(r, ".2f") for r in rewards)
    line = f"[END] success={flag} steps={steps} score={score:.3f} rewards={reward_field}"
    print(line, flush=True)
# ---------------------------------------------------------------------------
# Episode driver
# ---------------------------------------------------------------------------
async def _drive_one_task(
    policy: PolicyProtocol,
    task_id: str,
    image: str,
) -> None:
    """
    Run one task episode with the exact lifecycle ordering the hackathon spec
    requires:
        log_start -> env up -> reset -> step loop (log_step ...)
        -> env.close() -> log_end
    [END] is ALWAYS emitted from the outermost finally, even if env bring-up,
    the episode itself, or env teardown raises. Bring-up / episode / teardown
    are each caught in their own try block with distinct labels so a container
    shutdown timeout (cosmetic, happens after submit_report) is not mislabeled
    as an episode failure.

    Args:
        policy: Action source implementing PolicyProtocol (LLM or mock).
        task_id: One of TASK_IDS; passed to env.reset().
        image: Docker image tag; only used when FORENSIC_BASE_URL is unset.
    """
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    client: Optional[ForensicShellEnv] = None
    # Tracks whether __aenter__ succeeded, so teardown only runs on a
    # fully-entered context manager.
    entered = False
    log_start(task=task_id, env=BENCHMARK, model=policy.name)
    try:
        # --- env bring-up --------------------------------------------------
        try:
            if FORENSIC_BASE_URL:
                # HTTP path: talk to an already-running OpenEnv server.
                client = ForensicShellEnv(base_url=FORENSIC_BASE_URL)
            else:
                # Docker path: spin up a fresh container from the image.
                client = await ForensicShellEnv.from_docker_image(image)
            await client.__aenter__()
            entered = True
        except Exception as e:
            # Bring-up failure: skip the episode but still emit [END] below.
            print(
                f"[DEBUG] env bring-up failed for {task_id}: "
                f"{type(e).__name__}: {e}",
                flush=True,
            )
            client = None
            entered = False
        # --- episode -------------------------------------------------------
        if client is not None and entered:
            try:
                result = await client.reset(task_id=task_id)
                observation = result.observation
                if not result.done:
                    # step is 1-based so logs match the spec's step=<n>.
                    for step in range(1, MAX_STEPS_PER_TASK + 1):
                        action = policy.act(
                            observation=observation,
                            history=history,
                            step=step,
                        )
                        result = await client.step(action)
                        observation = result.observation
                        # reward may be None from the env; coerce to 0.0.
                        reward = float(result.reward or 0.0)
                        done = bool(result.done)
                        rewards.append(reward)
                        steps_taken = step
                        log_step(
                            step=step,
                            action=action_to_str(action),
                            reward=reward,
                            done=done,
                            error=observation.action_error,
                        )
                        # Compact transcript fed back to the policy next step.
                        history.append(
                            f"step {step}: {action_to_str(action)} "
                            f"-> reward {reward:.2f} "
                            f"err={observation.action_error or 'none'}"
                        )
                        if done:
                            break
                # Ensure we always have at least one reward entry, and clamp
                # the terminal reward into the strict open interval (0, 1)
                # required by the Phase 2 validator. The terminal reward IS
                # the task score, so clamping both the list entry and the
                # reported score keeps them consistent.
                if not rewards:
                    rewards.append(SCORE_FLOOR)
                rewards[-1] = max(SCORE_FLOOR, min(SCORE_CEIL, rewards[-1]))
                score = rewards[-1]
                success = score >= SUCCESS_THRESHOLD
            except Exception as e:
                # Episode failure: score/success keep their defaults (0.0/False).
                print(
                    f"[DEBUG] episode error for {task_id}: "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )
        # --- env teardown (reached before log_end so [END] is AFTER close) -
        if client is not None and entered:
            try:
                await client.__aexit__(None, None, None)
            except Exception as e:
                # Cosmetic: docker stop can exceed the 10s grace period, which
                # raises TimeoutExpired even though the container terminated
                # via SIGKILL as expected. This does NOT affect the score.
                print(
                    f"[DEBUG] env teardown warning for {task_id} "
                    f"(non-fatal, score already captured): "
                    f"{type(e).__name__}: {e}",
                    flush=True,
                )
    finally:
        # [END] always emitted, and always after env.close() (teardown runs above)
        log_end(
            success=success, steps=steps_taken, score=score, rewards=rewards
        )
async def main(use_mock: bool) -> None:
    """Build the policy, resolve the env image/URL, and run every task tier."""
    policy: PolicyProtocol
    if use_mock:
        policy = MockPolicy()
    elif not HF_TOKEN:
        # No credentials: degrade to the rule-based policy instead of crashing.
        print("[DEBUG] WARNING: HF_TOKEN not set; falling back to MockPolicy.", flush=True)
        policy = MockPolicy()
    else:
        llm_client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
        policy = LLMPolicy(client=llm_client, model=MODEL_NAME, name=MODEL_NAME)
    # Resolve image name — the hackathon sample spec documents LOCAL_IMAGE_NAME
    # while the sample code uses IMAGE_NAME. Accept either. Per the
    # pre-submission checklist: NO default value — the grader must provide it
    # (or FORENSIC_BASE_URL must be set to use the HTTP path instead).
    image = LOCAL_IMAGE_NAME or os.getenv("IMAGE_NAME") or ""
    if not (FORENSIC_BASE_URL or image):
        raise RuntimeError(
            "Neither LOCAL_IMAGE_NAME (or IMAGE_NAME) nor FORENSIC_BASE_URL is set. "
            "Set LOCAL_IMAGE_NAME to the built docker image tag, or FORENSIC_BASE_URL "
            "to a running OpenEnv server URL."
        )
    for task_id in TASK_IDS:
        await _drive_one_task(policy, task_id, image)
if __name__ == "__main__":
    # CLI entry point: --mock-policy skips the LLM entirely (offline runs).
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--mock-policy",
        action="store_true",
        help="Use the rule-based MockPolicy (no LLM calls, no network).",
    )
    namespace = cli.parse_args()
    asyncio.run(main(use_mock=namespace.mock_policy))