# Source: visual_memory/run_eval.py
# Uploaded by kdemon1011 via huggingface_hub (commit 599c9bd, verified)
#!/usr/bin/env python3
"""
Evaluation Runner — run an LLM agent against Visual Memory gym scenarios.
Single-gym version of the repo-level run_eval.py, tailored for the
visual_memory environment. No --gym flag needed.
Usage:
# Single model (backward compatible)
python run_eval.py --model gpt-5.4 --save --trajectory
# Multiple models in parallel
python run_eval.py --model gpt-5.4,claude-sonnet-4-6 --parallel-models 3 --save --trajectory
# Specific scenario
python run_eval.py --model gpt-5.4 --scenario directional_trap_8x8
# pass@k evaluation (run each scenario 10 times, report pass@1, pass@3, pass@8)
python run_eval.py --model gpt-5.4 --num-samples 10 --pass-k 1,3,8 --save
# Parallel scenarios (run 4 scenarios concurrently per model)
python run_eval.py --model gpt-5.4 --parallel-scenarios 4 --save
# Resume interrupted run
python run_eval.py --model gpt-5.4 --run-id my_run --resume --save --trajectory
# ATIF trajectory format (Harbor/Terminus-2 standard)
python run_eval.py --model gpt-5.4 --trajectory --trajectory-format atif
Prerequisites:
1. pip install -e .
2. docker build -t openenv-visual-memory -f server/Dockerfile .
3. docker run -d --name visual-memory -p 8000:8000 openenv-visual-memory
"""
import argparse
import json
import logging
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple
import numpy as np
# Indian Standard Time (UTC+05:30) — all run timestamps below use this zone.
IST = timezone(timedelta(hours=5, minutes=30))
from dotenv import load_dotenv
# Load API keys / config from a .env file sitting next to this script.
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"))
# Make sibling packages (agent/, rewards/, scenarios/) importable when this
# file is run as a script rather than as an installed module.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from openenv import AutoEnv
from agent.runner import AgentRunner
from rewards.base import RewardBreakdown
from rewards.checks import VisualMemoryChecker
from rewards.transforms import VisualMemoryStepTransform
from scenarios.definitions import VISUAL_MEMORY_SCENARIOS
logger = logging.getLogger(__name__)
# Gym identifier; also the package name whose openenv.yaml manifest is read.
GYM_NAME = "visual_memory"
# All results/trajectories are written under this directory.
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "outputs")
# ── Helpers ──
def _resolve_base_url() -> str:
    """Resolve the environment server base URL from the gym's openenv.yaml.

    Reads the ``port`` key from the packaged manifest and returns
    ``http://localhost:<port>``; falls back to port 8000 when the manifest
    cannot be read.

    Returns:
        Base URL string for the local environment server.
    """
    import importlib.resources
    try:
        # Imported inside the try so a missing PyYAML install also takes the
        # documented fallback instead of crashing with ImportError.
        import yaml
        ref = importlib.resources.files(GYM_NAME).joinpath("openenv.yaml")
        with importlib.resources.as_file(ref) as f:
            manifest = yaml.safe_load(f.read_text())
        port = manifest.get("port", 8000)
        return f"http://localhost:{port}"
    except Exception:
        logger.warning("Could not read openenv.yaml, defaulting to port 8000")
        return "http://localhost:8000"
def _fetch_gym_metadata(base_url: str) -> dict | None:
    """Best-effort fetch of the gym server's /metadata payload.

    Returns the parsed JSON dict (with the bulky ``readme_content`` field
    stripped), or None on any network/HTTP/parse failure.
    """
    import httpx
    try:
        response = httpx.get(f"{base_url}/metadata", timeout=5.0)
        response.raise_for_status()
        payload = response.json()
        # The README body is large and irrelevant to the runner; drop it.
        payload.pop("readme_content", None)
        return payload
    except Exception as e:
        logger.debug(f"Failed to fetch /metadata from {base_url}: {e}")
        return None
def divider(text: str = ""):
    """Print a 70-character '=' separator, optionally titled with *text*."""
    bar = "=" * 70
    print(f"\n{bar}")
    if text:
        print(f" {text}")
    print(bar)
def print_breakdown(breakdown: RewardBreakdown):
    """Pretty-print a reward breakdown: its summary, a blank line, then details."""
    summary_text = breakdown.summary()
    print(summary_text)
    print()
    details_line = f" Details: {breakdown.details}"
    print(details_line)
def _check_label(check: dict) -> str:
for key in ("min_score", "min_pct", "max_hits"):
if key in check and key != "type":
return str(check[key])
return check.get("type", "?")
def _short_json(obj, max_len=80):
s = json.dumps(obj, default=str)
return s if len(s) <= max_len else s[:max_len] + "..."
# ── pass@k Estimator ──
def pass_at_k_estimator(n: int, c: int, k: int) -> float:
    """
    Unbiased pass@k estimator (Chen et al., 2021 — HumanEval).

    n = total samples, c = correct samples, k = subset size.
    Returns P(at least one correct run in a uniformly random k-subset of n).
    """
    failures = n - c
    if failures < k:
        # Fewer failures than subset slots: every k-subset contains a success.
        return 1.0
    # P(all k picks are failures) = prod_{i=n-c+1..n} (1 - k/i); complement it.
    miss_factors = 1.0 - k / np.arange(failures + 1, n + 1)
    return 1.0 - np.prod(miss_factors)
# ── Checkpoint (Resume) ──
def _load_checkpoint(run_id: str, model: str) -> Tuple[Set[str], List[Dict]]:
    """Load completed scenario IDs and their results from a checkpoint file.

    Checks the in-progress ``.checkpoint.json`` first, then a finished
    ``.json`` trajectory. Returns ``(completed_ids, prior_results)``; both
    empty when neither file exists or the first readable one fails to parse.
    """
    # Mirror the filename sanitization used by every save path below.
    safe_model = model.replace("/", "_").replace(":", "_")
    checkpoint_path = os.path.join(
        OUTPUT_DIR, "trajectories", run_id, f"{safe_model}.checkpoint.json"
    )
    final_path = os.path.join(
        OUTPUT_DIR, "trajectories", run_id, f"{safe_model}.json"
    )
    for path in [checkpoint_path, final_path]:
        if not os.path.exists(path):
            continue
        try:
            with open(path) as f:
                data = json.load(f)
            completed = set()
            prior_results = []
            for s in data.get("scenarios", []):
                sid = s.get("scenario_id")
                reward = s.get("reward")
                # Only entries with a recorded reward count as completed;
                # errored scenarios (reward is None) get re-run on resume.
                if sid and reward is not None:
                    completed.add(sid)
                    prior_results.append({
                        "scenario": sid,
                        "total_reward": reward["total"],
                        "breakdown": RewardBreakdown(
                            structural=reward["structural"],
                            ground_truth=reward["ground_truth"],
                            efficiency=reward["efficiency"],
                            penalty=reward["penalty"],
                            total=reward["total"],
                        ),
                        "steps": s.get("total_steps", 0),
                        "elapsed": s.get("elapsed_s", 0),
                        # Flag so save paths emit the compact summary form
                        # instead of re-serializing per-step data.
                        "from_checkpoint": True,
                    })
            print(f" [{model}] Checkpoint loaded: {len(completed)} scenarios completed")
            return completed, prior_results
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning(f"Could not parse checkpoint {path}: {e}")
    return set(), []
def _build_scenario_entry(r: Dict, scenario) -> Dict:
    """Build a trajectory scenario entry from a result dict.

    Serializes the scenario metadata, per-step tool calls (when an episode
    object is present), outcome checks, the reward breakdown, and — for
    pass@k runs — the per-sample summaries.
    """
    entry = {
        "scenario_id": r.get("scenario", getattr(scenario, "id", "unknown")),
        "elapsed_s": round(r.get("elapsed", 0), 2),
    }
    # Scenario may be None (e.g. unknown ID); only then is metadata omitted.
    if scenario:
        entry["prompt"] = scenario.prompt
        entry["expected_tools"] = scenario.expected_tools
        entry["max_steps"] = scenario.max_steps
    episode = r.get("episode")
    if episode:
        steps = []
        for i, step in enumerate(episode.steps, 1):
            result_data = step.result
            # Tool results are often JSON strings; inline the parsed form
            # when possible, otherwise keep the raw string.
            if isinstance(result_data, str):
                try:
                    result_data = json.loads(result_data)
                except (json.JSONDecodeError, TypeError):
                    pass
            steps.append({
                "step": i,
                "timestamp": step.timestamp,
                "tool_name": step.tool_name,
                "arguments": step.arguments,
                "success": step.success,
                "result": result_data,
                "error": step.error,
                "elapsed_s": round(step.elapsed, 3),
            })
        entry["steps"] = steps
        entry["total_steps"] = len(steps)
    else:
        # No episode (e.g. hard failure): fall back to the recorded count.
        entry["steps"] = []
        entry["total_steps"] = r.get("steps", 0)
    if r.get("error"):
        entry["error"] = r["error"]
    outcome_results = r.get("outcome_results", [])
    if outcome_results and scenario:
        checks = []
        # Pair each check definition with its pass/fail result, in order.
        for check_def, passed in zip(scenario.outcome_checks, outcome_results):
            checks.append({"check": check_def, "passed": passed})
        entry["outcome_checks"] = checks
    bd = r.get("breakdown")
    if bd:
        entry["reward"] = {
            "structural": round(bd.structural, 4),
            "ground_truth": round(bd.ground_truth, 4),
            "efficiency": round(bd.efficiency, 4),
            "penalty": round(bd.penalty, 4),
            "total": round(bd.total, 4),
        }
    else:
        # reward=None marks an errored scenario; _load_checkpoint re-runs these.
        entry["reward"] = None
    # pass@k multi-sample data
    if "samples" in r:
        entry["num_samples"] = r.get("n", 1)
        entry["correct_count"] = r.get("c", 0)
        entry["pass_at_k"] = r.get("pass_at_k", {})
        entry["samples"] = []
        for sample in r["samples"]:
            sample_entry = {
                "sample_idx": sample.get("sample_idx", 0),
                "success": sample.get("success", False),
                "total_reward": sample.get("total_reward", 0),
                "steps": sample.get("steps", 0),
                "elapsed": round(sample.get("elapsed", 0), 2),
            }
            if sample.get("error"):
                sample_entry["error"] = sample["error"]
            entry["samples"].append(sample_entry)
    return entry
def _save_checkpoint(
    run_id: str,
    model: str,
    all_results: List[Dict],
    scenarios: list,
    temperature: float,
    reward_mode: str,
    gym_version: str,
):
    """Incrementally save checkpoint after each scenario.

    The written schema must stay in lockstep with ``_load_checkpoint``:
    entries carrying a non-None "reward" are treated as completed on resume.
    """
    safe_model = model.replace("/", "_").replace(":", "_")
    traj_dir = os.path.join(OUTPUT_DIR, "trajectories", run_id)
    os.makedirs(traj_dir, exist_ok=True)
    checkpoint_path = os.path.join(traj_dir, f"{safe_model}.checkpoint.json")
    scenario_map = {s.id: s for s in scenarios}
    checkpoint = {
        "run_id": run_id,
        "model": model,
        "gym": GYM_NAME,
        "gym_version": gym_version,
        "timestamp": datetime.now(IST).isoformat(),
        "temperature": temperature,
        "reward_mode": reward_mode,
        "total_scenarios": len(all_results),
        "scenarios": [],
    }
    for r in all_results:
        sid = r.get("scenario")
        if r.get("from_checkpoint"):
            # Entry was itself loaded from a checkpoint: only the summary
            # fields survive, so re-emit the compact form.
            checkpoint["scenarios"].append({
                "scenario_id": sid,
                "elapsed_s": r.get("elapsed", 0),
                "total_steps": r.get("steps", 0),
                "reward": {
                    "structural": r["breakdown"].structural,
                    "ground_truth": r["breakdown"].ground_truth,
                    "efficiency": r["breakdown"].efficiency,
                    "penalty": r["breakdown"].penalty,
                    "total": r["breakdown"].total,
                } if r.get("breakdown") else None,
            })
        else:
            entry = _build_scenario_entry(r, scenario_map.get(sid))
            checkpoint["scenarios"].append(entry)
    with open(checkpoint_path, "w") as f:
        json.dump(checkpoint, f, indent=2, default=str)
# ── Results & Trajectory Saving ──
def save_results_to_markdown(
    results: List[Dict[str, Any]],
    model: str,
    output_path: str,
    total_elapsed: float,
    temperature: float,
    run_id: str = "",
    reward_mode: str = "custom",
    gym_version: str = "unknown",
    num_samples: int = 1,
    pass_k_values: Optional[List[int]] = None,
):
    """Append one model's results as a markdown section to the run report.

    The file is opened in append mode so multiple models of the same run
    share a single report; the document header is written only when the
    file does not yet exist. pass@k mode (num_samples > 1) emits a pass@k
    table, otherwise a per-scenario reward-breakdown table.
    """
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    timestamp = datetime.now(IST).strftime("%Y-%m-%d %H:%M:%S")
    is_new_file = not os.path.exists(output_path)
    is_passk = num_samples > 1 and pass_k_values
    with open(output_path, "a") as f:
        if is_new_file:
            # One-time document header for the whole run.
            f.write(f"# Visual Memory Gym — Evaluation Results\n\n")
            f.write(f"**Run ID**: `{run_id}` \n")
            f.write(f"**Gym Version**: `{gym_version}`\n\n")
            f.write(f"Evaluation results for the **visual_memory** gym across different LLM models.\n\n")
            if is_passk:
                f.write(f"**Mode**: pass@k (n={num_samples}, k={pass_k_values})\n\n")
            if reward_mode == "openenv":
                f.write(f"**Reward Mode**: `openenv` — per-step rewards from `rewards/transforms.py` + ground truth\n\n")
            else:
                f.write(f"**Reward Mode**: `custom` — episode-level rewards from `rewards/base.py`\n\n")
            f.write(f"Trajectories: `outputs/trajectories/{run_id}/`\n\n")
            f.write(f"---\n\n")
        safe_model = model.replace("/", "_").replace(":", "_")
        # Per-model section header.
        f.write(f"## Model: `{model}`\n\n")
        f.write(f"- **Date**: {timestamp}\n")
        f.write(f"- **Temperature**: {temperature}\n")
        f.write(f"- **Reward Mode**: {reward_mode}\n")
        if is_passk:
            f.write(f"- **Samples per scenario**: {num_samples}\n")
        f.write(f"- **Total Time**: {total_elapsed:.1f}s\n")
        f.write(f"- **Trajectory**: `outputs/trajectories/{run_id}/{safe_model}.json`\n\n")
        if is_passk:
            # pass@k table: one column per requested k value.
            k_headers = " | ".join(f"pass@{k}" for k in pass_k_values)
            f.write(f"| Scenario | n | c | {k_headers} | Best Reward | Avg Steps |\n")
            k_divs = " | ".join(":---:" for _ in pass_k_values)
            f.write(f"|---|:---:|:---:|{k_divs}|:---:|:---:|\n")
            for r in results:
                n = r.get("n", 1)
                c = r.get("c", 0)
                pass_at = r.get("pass_at_k", {})
                # pass_at_k is keyed by the stringified k (JSON round-trip safe).
                k_vals = " | ".join(f"{pass_at.get(str(k), 0.0):.2f}" for k in pass_k_values)
                best = r.get("best_reward", 0.0)
                avg_steps = r.get("avg_steps", 0)
                f.write(
                    f"| {r['scenario']} "
                    f"| {n} | {c} | {k_vals} "
                    f"| {best:.2f} | {avg_steps:.1f} |\n"
                )
            # NOTE(review): this reports 0.0 per scenario when 1 was not in
            # --pass-k, since only requested k values are computed — confirm.
            all_pass1 = [r.get("pass_at_k", {}).get("1", 0.0) for r in results]
            avg_pass1 = sum(all_pass1) / len(all_pass1) if all_pass1 else 0.0
            f.write(f"\n**Average pass@1: {avg_pass1:.2f}**\n\n")
        else:
            # Single-sample table; openenv mode has no efficiency column.
            if reward_mode == "openenv":
                f.write(f"| Scenario | Quality | Ground Truth | Penalty | **Total** | Steps | Time |\n")
                f.write(f"|---|:---:|:---:|:---:|:---:|:---:|:---:|\n")
            else:
                f.write(f"| Scenario | Structural | Ground Truth | Efficiency | Penalty | **Total** | Steps | Time |\n")
                f.write(f"|---|:---:|:---:|:---:|:---:|:---:|:---:|:---:|\n")
            total_reward = 0.0
            for r in results:
                bd = r.get("breakdown")
                if bd:
                    if reward_mode == "openenv":
                        f.write(
                            f"| {r['scenario']} "
                            f"| {bd.structural:.2f} "
                            f"| {bd.ground_truth:.2f} "
                            f"| {bd.penalty:.2f} "
                            f"| **{bd.total:.2f}** "
                            f"| {r['steps']} "
                            f"| {r['elapsed']:.1f}s |\n"
                        )
                    else:
                        f.write(
                            f"| {r['scenario']} "
                            f"| {bd.structural:.2f} "
                            f"| {bd.ground_truth:.2f} "
                            f"| {bd.efficiency:.2f} "
                            f"| {bd.penalty:.2f} "
                            f"| **{bd.total:.2f}** "
                            f"| {r['steps']} "
                            f"| {r['elapsed']:.1f}s |\n"
                        )
                    total_reward += bd.total
                else:
                    # Errored scenario: dash out the reward columns.
                    cols = "| — | — | — " if reward_mode == "openenv" else "| — | — | — | — "
                    f.write(
                        f"| {r['scenario']} "
                        f"{cols}"
                        f"| **ERROR** "
                        f"| {r['steps']} "
                        f"| {r['elapsed']:.1f}s |\n"
                    )
            # Errored scenarios contribute 0 to the numerator but still count
            # in the denominator.
            avg = total_reward / len(results) if results else 0.0
            f.write(f"\n**Average Reward: {avg:.2f}**\n\n")
        f.write(f"---\n\n")
    logger.info(f"Results saved to {output_path}")
def save_trajectory(
    results: List[Dict[str, Any]],
    scenarios: list,
    model: str,
    temperature: float,
    total_elapsed: float,
    run_id: str = "",
    reward_mode: str = "custom",
    gym_version: str = "unknown",
    num_samples: int = 1,
    pass_k_values: Optional[List[int]] = None,
):
    """Write the full native-format trajectory JSON for one model's run.

    Serializes run metadata plus one entry per scenario (compact form for
    checkpoint-resumed entries, full step detail otherwise), computes the
    average reward, and deletes the now-redundant checkpoint file.

    Returns:
        Path of the written trajectory JSON file.
    """
    run_ts = datetime.now(IST).isoformat()
    safe_model = model.replace("/", "_").replace(":", "_")
    filename = f"{safe_model}.json"
    traj_dir = os.path.join(OUTPUT_DIR, "trajectories", run_id)
    os.makedirs(traj_dir, exist_ok=True)
    filepath = os.path.join(traj_dir, filename)
    scenario_map = {s.id: s for s in scenarios}
    trajectory = {
        "run_id": run_id or "untagged",
        "model": model,
        "gym": GYM_NAME,
        "gym_version": gym_version,
        "timestamp": run_ts,
        "temperature": temperature,
        "reward_mode": reward_mode,
        "total_elapsed_s": round(total_elapsed, 2),
        "total_scenarios": len(results),
    }
    if num_samples > 1:
        trajectory["num_samples"] = num_samples
        trajectory["pass_k_values"] = pass_k_values or [1]
    trajectory["scenarios"] = []
    for r in results:
        sid = r.get("scenario")
        scenario = scenario_map.get(sid)
        if r.get("from_checkpoint"):
            # Resumed entries only carry summary fields (no episode object),
            # so emit the compact form rather than _build_scenario_entry.
            trajectory["scenarios"].append({
                "scenario_id": sid,
                "elapsed_s": r.get("elapsed", 0),
                "total_steps": r.get("steps", 0),
                "reward": {
                    "structural": r["breakdown"].structural,
                    "ground_truth": r["breakdown"].ground_truth,
                    "efficiency": r["breakdown"].efficiency,
                    "penalty": r["breakdown"].penalty,
                    "total": r["breakdown"].total,
                } if r.get("breakdown") else None,
                "from_checkpoint": True,
            })
        else:
            entry = _build_scenario_entry(r, scenario)
            trajectory["scenarios"].append(entry)
    # Average over scenarios that produced a reward (errors excluded).
    totals = [
        s["reward"]["total"]
        for s in trajectory["scenarios"]
        if s.get("reward")
    ]
    trajectory["avg_reward"] = round(sum(totals) / len(totals), 4) if totals else 0.0
    with open(filepath, "w") as f:
        json.dump(trajectory, f, indent=2, default=str)
    print(f"\n Trajectory saved: {filepath}")
    logger.info(f"Trajectory saved to {filepath}")
    # Clean up checkpoint file now that full trajectory is written
    checkpoint_path = os.path.join(traj_dir, f"{safe_model}.checkpoint.json")
    if os.path.exists(checkpoint_path):
        os.remove(checkpoint_path)
    return filepath
def save_trajectory_atif(
    results: List[Dict[str, Any]],
    scenarios: list,
    model: str,
    temperature: float,
    total_elapsed: float,
    run_id: str = "",
    reward_mode: str = "custom",
    gym_version: str = "unknown",
    token_usage: Optional[Dict[str, int]] = None,
):
    """Save trajectory in ATIF v1.4 format (Harbor/Terminus-2 standard).

    Flattens all scenarios into one global step stream: each scenario
    contributes a user step (the prompt) followed by one assistant step per
    tool call; the scenario's total reward is attached to its last step.
    Checkpoint-resumed results are skipped (they carry no episode data).

    Returns:
        Path of the written ATIF JSON file.
    """
    safe_model = model.replace("/", "_").replace(":", "_")
    filename = f"{safe_model}_atif.json"
    traj_dir = os.path.join(OUTPUT_DIR, "trajectories", run_id)
    os.makedirs(traj_dir, exist_ok=True)
    filepath = os.path.join(traj_dir, filename)
    scenario_map = {s.id: s for s in scenarios}
    atif_steps = []
    step_id = 0  # Global, monotonically increasing across all scenarios.
    for r in results:
        if r.get("from_checkpoint"):
            continue
        sid = r.get("scenario")
        scenario = scenario_map.get(sid)
        episode = r.get("episode")
        if not episode:
            continue
        step_id += 1
        # User turn: the scenario prompt (or the bare ID if unresolvable).
        atif_steps.append({
            "step_id": step_id,
            "timestamp": episode.steps[0].timestamp if episode.steps else None,
            "role": "user",
            "message": scenario.prompt if scenario else sid,
            "tool_calls": [],
            "observation": None,
        })
        for step in episode.steps:
            step_id += 1
            result_data = step.result
            # Inline parsed JSON tool results when possible.
            if isinstance(result_data, str):
                try:
                    result_data = json.loads(result_data)
                except (json.JSONDecodeError, TypeError):
                    pass
            atif_steps.append({
                "step_id": step_id,
                "timestamp": step.timestamp,
                "role": "assistant",
                "message": None,
                "tool_calls": [{
                    "id": f"call_{step_id}",
                    "function_name": step.tool_name,
                    "arguments": step.arguments,
                }],
                "observation": {
                    "content": result_data,
                    "success": step.success,
                    "error": step.error,
                },
                "duration_ms": round(step.elapsed * 1000),
            })
        bd = r.get("breakdown")
        if bd and atif_steps:
            # Attach the episode reward to the scenario's final step.
            atif_steps[-1]["reward"] = bd.total
    usage = token_usage or {}
    trajectory = {
        "schema_version": "ATIF-v1.4",
        "session_id": run_id,
        "started_at": datetime.now(IST).isoformat(),
        "agent": {
            "name": f"openenv-{GYM_NAME}",
            "model_name": model,
            "temperature": temperature,
        },
        "environment": {
            "gym": GYM_NAME,
            "version": gym_version,
            "reward_mode": reward_mode,
        },
        "steps": atif_steps,
        "final_metrics": {
            "total_steps": step_id,
            "total_wall_time_s": round(total_elapsed, 2),
            "total_prompt_tokens": usage.get("prompt_tokens", 0),
            "total_completion_tokens": usage.get("completion_tokens", 0),
            "total_cost_usd": 0.0,
            "custom": {
                # NOTE(review): checkpoint-resumed results lack "total_reward"
                # and count as 0 here, skewing the average — confirm intended.
                "avg_reward": round(
                    sum(r.get("total_reward", 0) for r in results) / max(len(results), 1), 4
                ),
                "total_scenarios": len(results),
            },
        },
    }
    with open(filepath, "w") as f:
        json.dump(trajectory, f, indent=2, default=str)
    print(f"\n ATIF trajectory saved: {filepath}")
    logger.info(f"ATIF trajectory saved to {filepath}")
    return filepath
# ── Scenario Execution ──
# Substrings (matched against exception type name or message) identifying
# transient WebSocket failures that warrant a reconnect-and-retry.
WS_RETRY_ERRORS = ("ConnectionClosed", "ConnectionClosedOK", "ConnectionClosedError", "sent 1000")
# Number of reconnect attempts after the initial try.
MAX_WS_RETRIES = 3
def _run_scenario_with_retries(
    scenario,
    runner: AgentRunner,
    checker,
    env_client,
    connect_fn,
    model: str,
) -> Dict[str, Any]:
    """Execute a single scenario with WebSocket retry logic. Returns a result dict.

    On a WebSocket-class failure (see WS_RETRY_ERRORS) the old client is
    closed, a fresh (client, runner) pair is obtained from *connect_fn* after
    a linear backoff, and the scenario is retried up to MAX_WS_RETRIES times.
    Any other exception — or retry exhaustion — yields an error result dict
    with breakdown=None.

    NOTE(review): a client created here via connect_fn is only closed by a
    *subsequent* retry; the caller's finally still holds the original client
    reference, so the last reconnected client may leak — confirm ownership.
    """
    start = time.time()
    last_error = None
    for attempt in range(MAX_WS_RETRIES + 1):
        try:
            if attempt > 0:
                logger.info(f"[{model}] Reconnecting (attempt {attempt + 1}) for {scenario.id}")
                print(f" [{model}] Reconnecting WebSocket (attempt {attempt + 1})...")
                # Best-effort close of the broken client before reconnecting.
                try:
                    env_client.__exit__(None, None, None)
                except Exception:
                    pass
                time.sleep(2 * attempt)  # linear backoff: 2s, 4s, 6s
                env_client, runner = connect_fn()
            episode, breakdown = runner.run_scenario(scenario, checker)
            elapsed = time.time() - start
            # Give the checker the episode before ground-truth verification.
            if hasattr(checker, "set_episode"):
                checker.set_episode(episode)
            outcome_results = checker.check_all(scenario.outcome_checks)
            return {
                "scenario": scenario.id,
                "total_reward": breakdown.total,
                "breakdown": breakdown,
                "steps": len(episode.steps),
                "elapsed": elapsed,
                "episode": episode,
                "outcome_results": outcome_results,
            }
        except Exception as e:
            last_error = e
            # Match by exception type name or message substring.
            is_ws_error = any(tok in type(e).__name__ or tok in str(e) for tok in WS_RETRY_ERRORS)
            if is_ws_error and attempt < MAX_WS_RETRIES:
                logger.warning(f"[{model}] WebSocket error on {scenario.id}: {e}")
                continue
            break
    # Fell through: non-WS error or retries exhausted.
    elapsed = time.time() - start
    logger.exception(f"[{model}] Scenario {scenario.id} failed")
    return {
        "scenario": scenario.id,
        "total_reward": 0.0,
        "breakdown": None,
        "steps": 0,
        "elapsed": elapsed,
        "error": str(last_error),
    }
def _run_scenario_n_samples(
    scenario,
    n: int,
    pass_threshold: float,
    pass_k_values: List[int],
    model: str,
    base_url: str,
    temperature: float,
    max_tokens: int,
    reward_mode: str,
) -> Dict[str, Any]:
    """Run a single scenario n times for pass@k evaluation.

    A sample counts as a success when its ground-truth score (fraction of
    passed outcome checks) reaches *pass_threshold*. Returns a result dict
    with per-sample records, pass@k estimates, and the best sample's
    breakdown/episode promoted to the top level for the normal save paths.

    NOTE(review): all n samples reuse one (client, runner) pair; if the retry
    helper reconnects mid-sample, the replacement pair is not propagated back
    to this loop or to the finally-close below — confirm intended.
    """
    samples = []
    correct_count = 0
    def _connect():
        # Fresh env client + agent runner (also used by retry reconnects).
        client = AutoEnv.from_env(GYM_NAME, base_url=base_url)
        client.__enter__()
        xform = VisualMemoryStepTransform() if reward_mode == "openenv" else None
        rnr = AgentRunner(
            model=model,
            env_client=client,
            temperature=temperature,
            max_tokens=max_tokens,
            reward_mode=reward_mode,
            transform=xform,
        )
        return client, rnr
    env_client, runner = _connect()
    checker = VisualMemoryChecker()
    try:
        for sample_idx in range(n):
            result = _run_scenario_with_retries(
                scenario, runner, checker, env_client, _connect, model,
            )
            # Ground-truth score = fraction of outcome checks that passed.
            gt_score = 0.0
            outcome_results = result.get("outcome_results", [])
            if outcome_results:
                gt_score = sum(outcome_results) / len(outcome_results)
            is_success = gt_score >= pass_threshold
            if is_success:
                correct_count += 1
            result["sample_idx"] = sample_idx
            result["ground_truth_score"] = gt_score
            result["success"] = is_success
            samples.append(result)
            status = "PASS" if is_success else "FAIL"
            print(
                f" [{model}] {scenario.id} sample {sample_idx + 1}/{n}: "
                f"{status} (gt={gt_score:.2f}, reward={result['total_reward']:.2f}, "
                f"{result['steps']} steps, {result['elapsed']:.1f}s)"
            )
    finally:
        try:
            env_client.__exit__(None, None, None)
        except Exception:
            pass
    # Only k values that fit within n samples are estimable.
    pass_at_k = {}
    for k in pass_k_values:
        if k <= n:
            pass_at_k[str(k)] = round(pass_at_k_estimator(n, correct_count, k), 4)
    # Promote the highest-reward sample so downstream save paths (which read
    # breakdown/steps/episode from the top level) show the best attempt.
    best_sample = max(samples, key=lambda s: s.get("total_reward", 0.0))
    return {
        "scenario": scenario.id,
        "n": n,
        "c": correct_count,
        "pass_at_k": pass_at_k,
        "samples": samples,
        "best_reward": best_sample.get("total_reward", 0.0),
        "avg_steps": sum(s.get("steps", 0) for s in samples) / max(len(samples), 1),
        "total_reward": best_sample.get("total_reward", 0.0),
        "breakdown": best_sample.get("breakdown"),
        "steps": best_sample.get("steps", 0),
        "elapsed": sum(s.get("elapsed", 0) for s in samples),
        "episode": best_sample.get("episode"),
        "outcome_results": best_sample.get("outcome_results", []),
    }
# ── Model Workers ──
def _run_single_model(
    model: str,
    base_url: str,
    scenarios: list,
    temperature: float,
    max_tokens: int,
    reward_mode: str,
    run_id: str,
    save: bool,
    trajectory: bool,
    verbose: bool,
    gym_version: str = "unknown",
    num_samples: int = 1,
    pass_k_values: Optional[List[int]] = None,
    pass_threshold: float = 0.5,
    parallel_scenarios: int = 1,
    resume: bool = False,
    trajectory_format: str = "native",
) -> Dict[str, Any]:
    """Run all scenarios for one model (compact, parallel-capable worker).

    Supports resume from checkpoint, pass@k multi-sampling, and running
    scenarios concurrently via a thread pool. Saves markdown results and
    trajectory JSON per the flags, then returns
    {"model", "results", "elapsed"}.
    """
    model_start = time.time()
    # Resume: load checkpoint
    completed_ids: Set[str] = set()
    prior_results: List[Dict] = []
    if resume:
        completed_ids, prior_results = _load_checkpoint(run_id, model)
    pending = [s for s in scenarios if s.id not in completed_ids]
    if not pending:
        print(f" [{model}] All scenarios already completed (checkpoint)")
        model_elapsed = time.time() - model_start
        return {"model": model, "results": prior_results, "elapsed": model_elapsed}
    if completed_ids:
        print(f" [{model}] Resuming: {len(pending)} remaining of {len(scenarios)} scenarios")
    # Shared across scenario workers; guarded by results_lock when parallel.
    model_results = list(prior_results)
    results_lock = threading.Lock()
    is_passk = num_samples > 1
    def _connect():
        # Fresh env client + agent runner pair (one per scenario worker).
        client = AutoEnv.from_env(GYM_NAME, base_url=base_url)
        client.__enter__()
        xform = VisualMemoryStepTransform() if reward_mode == "openenv" else None
        rnr = AgentRunner(
            model=model,
            env_client=client,
            temperature=temperature,
            max_tokens=max_tokens,
            reward_mode=reward_mode,
            transform=xform,
        )
        return client, rnr
    def _run_one_scenario(scenario, idx, total):
        """Run a single scenario (optionally n samples) and append to results."""
        print(f"\n [{model}] Scenario {idx}/{total}: {scenario.id}")
        if is_passk:
            # pass@k path manages its own client lifecycle per scenario.
            result = _run_scenario_n_samples(
                scenario, n=num_samples, pass_threshold=pass_threshold,
                pass_k_values=pass_k_values or [1], model=model,
                base_url=base_url, temperature=temperature,
                max_tokens=max_tokens, reward_mode=reward_mode,
            )
            pk = result.get("pass_at_k", {})
            pk_str = ", ".join(f"pass@{k}={pk.get(str(k), 0):.2f}" for k in (pass_k_values or [1]))
            print(
                f" [{model}] {scenario.id}: {result['c']}/{result['n']} correct → {pk_str}"
            )
        else:
            env_client, runner = _connect()
            checker = VisualMemoryChecker()
            try:
                result = _run_scenario_with_retries(
                    scenario, runner, checker, env_client, _connect, model,
                )
                reward_str = f"{result['total_reward']:.2f}" if result.get("breakdown") else "ERROR"
                print(
                    f" [{model}] {scenario.id}: {reward_str} "
                    f"({result['steps']} steps, {result['elapsed']:.1f}s)"
                )
            finally:
                try:
                    env_client.__exit__(None, None, None)
                except Exception:
                    pass
        # Append + checkpoint atomically w.r.t. other scenario threads.
        with results_lock:
            model_results.append(result)
            _save_checkpoint(
                run_id, model, model_results, scenarios,
                temperature, reward_mode, gym_version,
            )
        return result
    if parallel_scenarios > 1 and len(pending) > 1:
        # Cap parallelism by env var to avoid overloading the env server.
        max_concurrent = int(os.getenv("MAX_CONCURRENT_ENVS", "8"))
        max_workers = min(parallel_scenarios, len(pending), max_concurrent)
        print(f" [{model}] Running {len(pending)} scenarios with {max_workers} parallel workers")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {}
            # Display index continues from the checkpoint-completed count.
            for idx, scenario in enumerate(pending, len(completed_ids) + 1):
                future = executor.submit(
                    _run_one_scenario, scenario, idx, len(scenarios),
                )
                futures[future] = scenario
            for future in as_completed(futures):
                scenario = futures[future]
                try:
                    future.result()
                except Exception as e:
                    # Worker itself crashed (not a scenario-level failure,
                    # which _run_scenario_with_retries already converts).
                    print(f" [{model}] {scenario.id}: ERROR - {e}")
                    logger.exception(f"Scenario {scenario.id} failed")
                    with results_lock:
                        model_results.append({
                            "scenario": scenario.id,
                            "total_reward": 0.0,
                            "breakdown": None,
                            "steps": 0,
                            "elapsed": 0.0,
                            "error": str(e),
                        })
    else:
        if not is_passk:
            # Sequential single-sample path: reuse one connection for all
            # scenarios instead of reconnecting per scenario.
            env_client, runner = _connect()
            checker = VisualMemoryChecker()
            try:
                for idx, scenario in enumerate(pending, len(completed_ids) + 1):
                    print(f"\n [{model}] Scenario {idx}/{len(scenarios)}: {scenario.id}")
                    result = _run_scenario_with_retries(
                        scenario, runner, checker, env_client, _connect, model,
                    )
                    reward_str = f"{result['total_reward']:.2f}" if result.get("breakdown") else "ERROR"
                    print(
                        f" [{model}] {scenario.id}: {reward_str} "
                        f"({result['steps']} steps, {result['elapsed']:.1f}s)"
                    )
                    model_results.append(result)
                    _save_checkpoint(
                        run_id, model, model_results, scenarios,
                        temperature, reward_mode, gym_version,
                    )
            finally:
                try:
                    env_client.__exit__(None, None, None)
                except Exception:
                    pass
        else:
            # Sequential pass@k path.
            for idx, scenario in enumerate(pending, len(completed_ids) + 1):
                _run_one_scenario(scenario, idx, len(scenarios))
    model_elapsed = time.time() - model_start
    if save:
        output_path = os.path.join(OUTPUT_DIR, "results", f"{run_id}.md")
        save_results_to_markdown(
            results=model_results,
            model=model,
            output_path=output_path,
            total_elapsed=model_elapsed,
            temperature=temperature,
            run_id=run_id,
            reward_mode=reward_mode,
            gym_version=gym_version,
            num_samples=num_samples,
            pass_k_values=pass_k_values,
        )
    if trajectory:
        save_trajectory(
            results=model_results,
            scenarios=scenarios,
            model=model,
            temperature=temperature,
            total_elapsed=model_elapsed,
            run_id=run_id,
            reward_mode=reward_mode,
            gym_version=gym_version,
            num_samples=num_samples,
            pass_k_values=pass_k_values,
        )
        if trajectory_format == "atif":
            save_trajectory_atif(
                results=model_results,
                scenarios=scenarios,
                model=model,
                temperature=temperature,
                total_elapsed=model_elapsed,
                run_id=run_id,
                reward_mode=reward_mode,
                gym_version=gym_version,
            )
    return {
        "model": model,
        "results": model_results,
        "elapsed": model_elapsed,
    }
def _run_single_model_detailed(
    model: str,
    base_url: str,
    scenarios: list,
    temperature: float,
    max_tokens: int,
    reward_mode: str,
    run_id: str,
    save: bool,
    trajectory: bool,
    gym_version: str = "unknown",
    num_samples: int = 1,
    pass_k_values: Optional[List[int]] = None,
    pass_threshold: float = 0.5,
    resume: bool = False,
    trajectory_format: str = "native",
) -> Dict[str, Any]:
    """Run all scenarios for one model with verbose, step-by-step console output.

    Sequential counterpart of ``_run_single_model``: prints each agent action,
    ground-truth check, and reward breakdown as it happens. Supports resume
    and pass@k; saves markdown/trajectory per the flags and returns
    {"model", "results", "elapsed"}.
    """
    model_start = time.time()
    results = []
    completed_ids: Set[str] = set()
    if resume:
        completed_ids, prior = _load_checkpoint(run_id, model)
        results = list(prior)
    pending = [s for s in scenarios if s.id not in completed_ids]
    if not pending:
        print(f" [{model}] All scenarios already completed (checkpoint)")
        return {"model": model, "results": results, "elapsed": time.time() - model_start}
    is_passk = num_samples > 1
    if is_passk:
        # pass@k path: each scenario manages its own client lifecycle.
        for i, scenario in enumerate(pending, len(completed_ids) + 1):
            divider(f"Scenario {i}/{len(scenarios)}: {scenario.id}")
            print(f" Prompt: {scenario.prompt[:120]}...")
            print(f" Samples: {num_samples}")
            print()
            result = _run_scenario_n_samples(
                scenario, n=num_samples, pass_threshold=pass_threshold,
                pass_k_values=pass_k_values or [1], model=model,
                base_url=base_url, temperature=temperature,
                max_tokens=max_tokens, reward_mode=reward_mode,
            )
            pk = result.get("pass_at_k", {})
            print(f"\n -- pass@k Results --")
            print(f" Correct: {result['c']}/{result['n']}")
            for k in (pass_k_values or [1]):
                print(f" pass@{k}: {pk.get(str(k), 0.0):.4f}")
            results.append(result)
            _save_checkpoint(
                run_id, model, results, scenarios,
                temperature, reward_mode, gym_version,
            )
    else:
        # Single-sample path: one connection reused for the whole run.
        env_client = AutoEnv.from_env(GYM_NAME, base_url=base_url)
        env_client.__enter__()
        checker = VisualMemoryChecker()
        transform = VisualMemoryStepTransform() if reward_mode == "openenv" else None
        runner = AgentRunner(
            model=model, env_client=env_client, temperature=temperature,
            max_tokens=max_tokens, reward_mode=reward_mode, transform=transform,
        )
        try:
            for i, scenario in enumerate(pending, len(completed_ids) + 1):
                divider(f"Scenario {i}/{len(scenarios)}: {scenario.id}")
                print(f" Prompt: {scenario.prompt[:120]}...")
                print(f" Expected tools: {scenario.expected_tools}")
                print(f" Max steps: {scenario.max_steps}")
                print()
                start = time.time()
                try:
                    episode, breakdown = runner.run_scenario(scenario, checker)
                    elapsed = time.time() - start
                    print()
                    print(" -- Agent Actions --")
                    for step in episode.steps:
                        status = "OK" if step.success else "FAIL"
                        args_str = _short_json(step.arguments)
                        print(f" [{status}] {step.tool_name}({args_str})")
                    print(f" Steps taken: {len(episode.steps)}")
                    # Give the checker the episode before verification.
                    if hasattr(checker, "set_episode"):
                        checker.set_episode(episode)
                    print()
                    print(" -- Ground Truth Verification --")
                    outcome_results = checker.check_all(scenario.outcome_checks)
                    for check, score in zip(scenario.outcome_checks, outcome_results):
                        status = "PASS" if score else "FAIL"
                        label = _check_label(check)
                        print(f" [{status}] {check['type']}: {label}")
                    print()
                    print(" -- Reward Breakdown --")
                    print_breakdown(breakdown)
                    print(f"\n Completed in {elapsed:.1f}s")
                    result = {
                        "scenario": scenario.id,
                        "total_reward": breakdown.total,
                        "breakdown": breakdown,
                        "steps": len(episode.steps),
                        "elapsed": elapsed,
                        "episode": episode,
                        "outcome_results": outcome_results,
                    }
                    results.append(result)
                except Exception as e:
                    elapsed = time.time() - start
                    print(f"\n ERROR: {e}")
                    logger.exception(f"Scenario {scenario.id} failed")
                    results.append({
                        "scenario": scenario.id,
                        "total_reward": 0.0,
                        "breakdown": None,
                        "steps": 0,
                        "elapsed": elapsed,
                        "error": str(e),
                    })
                # Checkpoint after every scenario, successful or not.
                _save_checkpoint(
                    run_id, model, results, scenarios,
                    temperature, reward_mode, gym_version,
                )
        finally:
            env_client.__exit__(None, None, None)
            logger.info("AutoEnv client disconnected.")
    model_elapsed = time.time() - model_start
    if save:
        output_path = os.path.join(OUTPUT_DIR, "results", f"{run_id}.md")
        save_results_to_markdown(
            results=results, model=model, output_path=output_path,
            total_elapsed=model_elapsed, temperature=temperature,
            run_id=run_id, reward_mode=reward_mode, gym_version=gym_version,
            num_samples=num_samples, pass_k_values=pass_k_values,
        )
        print(f"\n Results saved: {output_path}")
    if trajectory:
        save_trajectory(
            results=results, scenarios=scenarios, model=model,
            temperature=temperature, total_elapsed=model_elapsed,
            run_id=run_id, reward_mode=reward_mode, gym_version=gym_version,
            num_samples=num_samples, pass_k_values=pass_k_values,
        )
        if trajectory_format == "atif":
            save_trajectory_atif(
                results=results, scenarios=scenarios, model=model,
                temperature=temperature, total_elapsed=model_elapsed,
                run_id=run_id, reward_mode=reward_mode, gym_version=gym_version,
            )
    return {
        "model": model,
        "results": results,
        "elapsed": model_elapsed,
    }
# ── Main ──
def main():
parser = argparse.ArgumentParser(
description="Evaluate an LLM agent against Visual Memory gym scenarios.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic (backward compatible)
python run_eval.py --model gpt-5.4 --save --trajectory
python run_eval.py --model gpt-5.4 --scenario directional_trap_8x8
# pass@k
python run_eval.py --model gpt-5.4 --num-samples 10 --pass-k 1,3,8 --save
# Parallel scenarios
python run_eval.py --model gpt-5.4 --parallel-scenarios 4 --save
# Resume interrupted run
python run_eval.py --model gpt-5.4 --run-id my_run --resume --save --trajectory
# ATIF trajectory format
python run_eval.py --model gpt-5.4 --trajectory --trajectory-format atif
# Combined
python run_eval.py --model gpt-5.4 --num-samples 10 --pass-k 1,3,8 \\
--parallel-scenarios 4 --run-id bench_v1 --resume --save \\
--trajectory --trajectory-format atif
""",
)
parser.add_argument(
"--model",
default=os.getenv("LLM_MODEL", "gpt-4o"),
help="LiteLLM model string, or comma-separated for parallel mode "
"(e.g., 'gpt-5.4' or 'gpt-5.4,claude-sonnet-4-6')",
)
parser.add_argument(
"--scenario",
default=None,
help="Run a specific scenario by ID (default: run all 10)",
)
parser.add_argument(
"--temperature",
type=float,
default=None,
help="LLM sampling temperature (default: 0.0, or 0.8 in pass@k mode)",
)
parser.add_argument(
"--max-tokens",
type=int,
default=int(os.getenv("LLM_MAX_TOKENS", "1024")),
help="Max tokens per LLM response (default: 1024)",
)
parser.add_argument(
"--save",
action="store_true",
default=True,
help="Save results to outputs/results/<run_id>.md (default: on)",
)
parser.add_argument(
"--no-save",
action="store_false",
dest="save",
help="Disable saving results markdown",
)
parser.add_argument(
"--trajectory",
action="store_true",
default=True,
help="Save detailed trajectory JSON to outputs/trajectories/<run_id>/ (default: on)",
)
parser.add_argument(
"--no-trajectory",
action="store_false",
dest="trajectory",
help="Disable saving trajectory JSON",
)
parser.add_argument(
"--trajectory-format",
default="native",
choices=["native", "atif"],
help="Trajectory format: 'native' (default) or 'atif' (Harbor/ATIF v1.4)",
)
parser.add_argument(
"--run-id",
default=None,
help="Run identifier (default: auto-generated as run_YYYYMMDD_HHMM)",
)
parser.add_argument(
"--reward-mode",
default="custom",
choices=["custom", "openenv"],
help="Reward mode: 'custom' (episode-level) or 'openenv' (per-step). Default: custom",
)
parser.add_argument(
"--parallel-models",
type=int,
default=1,
help="Number of models to evaluate in parallel (default: 1)",
)
parser.add_argument(
"--parallel",
type=int,
default=None,
help=argparse.SUPPRESS, # hidden alias for --parallel-models
)
parser.add_argument(
"--parallel-scenarios",
type=int,
default=1,
help="Number of scenarios to run concurrently per model (default: 1)",
)
parser.add_argument(
"--num-samples",
type=int,
default=1,
help="Number of samples per scenario for pass@k (default: 1 = single run)",
)
parser.add_argument(
"--pass-k",
default="1",
help="Comma-separated k values for pass@k (default: '1'). "
"Requires --num-samples >= max(k).",
)
parser.add_argument(
"--pass-threshold",
type=float,
default=0.5,
help="Min ground_truth score to count as 'pass' for pass@k (default: 0.5)",
)
parser.add_argument(
"--resume",
action="store_true",
help="Resume a previous run, skipping completed scenarios. Requires --run-id.",
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable debug logging",
)
args = parser.parse_args()
# Resolve --parallel alias
parallel_models = args.parallel if args.parallel is not None else args.parallel_models
models = [m.strip() for m in args.model.split(",") if m.strip()]
pass_k_values = [int(k.strip()) for k in args.pass_k.split(",") if k.strip()]
# Validate pass@k constraints
if args.num_samples > 1:
max_k = max(pass_k_values)
if args.num_samples < max_k:
print(f"Error: --num-samples ({args.num_samples}) must be >= max(--pass-k) ({max_k})")
sys.exit(1)
# Temperature: auto-set for pass@k mode if not explicitly provided
if args.temperature is not None:
temperature = args.temperature
elif args.num_samples > 1:
temperature = 0.8
print(f" pass@k mode: temperature auto-set to 0.8 for diverse sampling")
else:
temperature = float(os.getenv("LLM_TEMPERATURE", "0.0"))
if args.run_id:
run_id = args.run_id
else:
run_id = f"run_{datetime.now(IST).strftime('%Y%m%d_%H%M')}"
if args.resume and not args.run_id:
print("Warning: --resume without --run-id uses auto-generated ID (no checkpoint to load)")
log_level = logging.DEBUG if args.verbose else logging.WARNING
logging.basicConfig(
level=log_level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
if not args.verbose:
logging.getLogger("LiteLLM").setLevel(logging.WARNING)
logging.getLogger("litellm").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("openenv").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
base_url = _resolve_base_url()
scenarios = VISUAL_MEMORY_SCENARIOS
if args.scenario:
scenarios = [s for s in scenarios if s.id == args.scenario]
if not scenarios:
available = [s.id for s in VISUAL_MEMORY_SCENARIOS]
print(f"Error: Scenario '{args.scenario}' not found. Available: {available}")
sys.exit(1)
divider("AutoEnv Discovery")
print(f" Discovering gym '{GYM_NAME}' via AutoEnv...")
env_info = AutoEnv.get_env_info(GYM_NAME)
print(f" Found: {env_info['name']} (package: {env_info['package']}, v{env_info['version']})")
print(f" Base URL: {base_url} (auto-derived from openenv.yaml)")
gym_metadata = _fetch_gym_metadata(base_url)
if gym_metadata:
print(f"\n -- Environment Metadata (GET {base_url}/metadata) --")
print(f" Name: {gym_metadata.get('name', 'N/A')}")
print(f" Version: {gym_metadata.get('version', 'N/A')}")
print(f" Description: {gym_metadata.get('description', 'N/A')}")
else:
print(f"\n Warning: Could not fetch /metadata from {base_url} (server may not be running)")
is_parallel_models = parallel_models > 1 and len(models) > 1
gym_version = gym_metadata.get("version", "unknown") if gym_metadata else "unknown"
mode_parts = []
if is_parallel_models:
mode_parts.append(f"{parallel_models} model workers")
if args.parallel_scenarios > 1:
mode_parts.append(f"{args.parallel_scenarios} scenario workers/model")
if args.num_samples > 1:
mode_parts.append(f"pass@k (n={args.num_samples}, k={pass_k_values})")
if args.resume:
mode_parts.append("resume")
mode_str = ", ".join(mode_parts) if mode_parts else "Sequential"
divider("LLM Evaluation Run")
print(f" Gym: {GYM_NAME} (v{gym_version})")
print(f" Models: {', '.join(models)}")
print(f" Run ID: {run_id}")
print(f" Mode: {mode_str}")
print(f" Base URL: {base_url}")
print(f" Scenarios: {len(scenarios)} of {len(VISUAL_MEMORY_SCENARIOS)}")
print(f" Temperature: {temperature}")
print(f" Reward Mode: {args.reward_mode}")
if args.num_samples > 1:
print(f" Samples/scn: {args.num_samples}")
print(f" pass@k: {pass_k_values}")
print(f" Threshold: {args.pass_threshold}")
if args.trajectory:
print(f" Traj Format: {args.trajectory_format}")
print(f" Output Dir: {OUTPUT_DIR}")
total_start = time.time()
all_model_results = []
common_kwargs = dict(
base_url=base_url,
scenarios=scenarios,
temperature=temperature,
max_tokens=args.max_tokens,
reward_mode=args.reward_mode,
run_id=run_id,
save=args.save,
trajectory=args.trajectory,
gym_version=gym_version,
num_samples=args.num_samples,
pass_k_values=pass_k_values,
pass_threshold=args.pass_threshold,
parallel_scenarios=args.parallel_scenarios,
resume=args.resume,
trajectory_format=args.trajectory_format,
)
if is_parallel_models:
divider(f"Parallel Evaluation ({len(models)} models, {parallel_models} workers)")
max_workers = min(parallel_models, len(models))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {}
for idx, model in enumerate(models):
if idx > 0:
time.sleep(3)
future = executor.submit(
_run_single_model,
model=model,
verbose=args.verbose,
**common_kwargs,
)
futures[future] = model
for future in as_completed(futures):
model = futures[future]
try:
result = future.result()
all_model_results.append(result)
print(f"\n {model} completed in {result['elapsed']:.1f}s")
except Exception as e:
print(f"\n {model} FAILED: {e}")
logger.exception(f"Model {model} failed")
all_model_results.append({
"model": model,
"results": [],
"elapsed": 0.0,
"error": str(e),
})
else:
for model in models:
if len(models) > 1:
divider(f"Model: {model}")
if len(models) == 1 and args.parallel_scenarios <= 1:
result = _run_single_model_detailed(
model=model,
**{k: v for k, v in common_kwargs.items()
if k not in ("parallel_scenarios",)},
)
else:
result = _run_single_model(
model=model,
verbose=args.verbose,
**common_kwargs,
)
all_model_results.append(result)
total_elapsed = time.time() - total_start
is_passk = args.num_samples > 1
divider("Evaluation Summary")
for mr in all_model_results:
model = mr["model"]
results = mr.get("results", [])
model_elapsed = mr.get("elapsed", 0.0)
if not results:
print(f"\n Model: {model} -- FAILED ({mr.get('error', 'unknown')})")
continue
print(f"\n Model: {model}")
print(f" Time: {model_elapsed:.1f}s")
if is_passk:
k_headers = " ".join(f"{'pass@' + str(k):>8}" for k in pass_k_values)
print(f" {'Scenario':<35} {'n':>4} {'c':>4} {k_headers}")
print(f" {'-' * 35} {'-' * 4} {'-' * 4} " + " ".join("-" * 8 for _ in pass_k_values))
all_pass_at = {str(k): [] for k in pass_k_values}
for r in results:
n = r.get("n", 1)
c = r.get("c", 0)
pk = r.get("pass_at_k", {})
k_vals = " ".join(f"{pk.get(str(k), 0.0):>8.4f}" for k in pass_k_values)
print(f" {r['scenario']:<35} {n:>4} {c:>4} {k_vals}")
for k in pass_k_values:
all_pass_at[str(k)].append(pk.get(str(k), 0.0))
print(f" {'-' * 35} {'-' * 4} {'-' * 4} " + " ".join("-" * 8 for _ in pass_k_values))
avg_vals = " ".join(
f"{sum(all_pass_at[str(k)]) / len(all_pass_at[str(k)]):>8.4f}"
if all_pass_at[str(k)] else f"{'N/A':>8}"
for k in pass_k_values
)
print(f" {'AVERAGE':<35} {'':>4} {'':>4} {avg_vals}")
else:
print(f" {'Scenario':<35} {'Reward':>8} {'Steps':>6} {'Time':>6}")
print(f" {'-' * 35} {'-' * 8} {'-' * 6} {'-' * 6}")
for r in results:
reward_str = f"{r['total_reward']:.2f}" if r.get("breakdown") else "ERROR"
print(f" {r['scenario']:<35} {reward_str:>8} {r['steps']:>6} {r['elapsed']:>5.1f}s")
total_reward = sum(r["total_reward"] for r in results)
avg_reward = total_reward / len(results) if results else 0.0
print(f" {'-' * 35} {'-' * 8} {'-' * 6} {'-' * 6}")
print(f" {'AVERAGE':<35} {avg_reward:>8.2f}")
if len(models) > 1:
print(f"\n Total time (all models): {total_elapsed:.1f}s")
if is_parallel_models:
seq_time = sum(mr.get("elapsed", 0.0) for mr in all_model_results)
speedup = seq_time / total_elapsed if total_elapsed > 0 else 1.0
print(f" Sequential equivalent: {seq_time:.1f}s")
print(f" Speedup: {speedup:.1f}x")
# Script entry point: run the CLI evaluation when executed directly
# (no side effects on import).
if __name__ == "__main__":
    main()