#!/usr/bin/env python3 """ Evaluation Runner — run an LLM agent against Visual Memory gym scenarios. Single-gym version of the repo-level run_eval.py, tailored for the visual_memory environment. No --gym flag needed. Usage: # Single model (backward compatible) python run_eval.py --model gpt-5.4 --save --trajectory # Multiple models in parallel python run_eval.py --model gpt-5.4,claude-sonnet-4-6 --parallel-models 3 --save --trajectory # Specific scenario python run_eval.py --model gpt-5.4 --scenario directional_trap_8x8 # pass@k evaluation (run each scenario 10 times, report pass@1, pass@3, pass@8) python run_eval.py --model gpt-5.4 --num-samples 10 --pass-k 1,3,8 --save # Parallel scenarios (run 4 scenarios concurrently per model) python run_eval.py --model gpt-5.4 --parallel-scenarios 4 --save # Resume interrupted run python run_eval.py --model gpt-5.4 --run-id my_run --resume --save --trajectory # ATIF trajectory format (Harbor/Terminus-2 standard) python run_eval.py --model gpt-5.4 --trajectory --trajectory-format atif Prerequisites: 1. pip install -e . 2. docker build -t openenv-visual-memory -f server/Dockerfile . 3. docker run -d --name visual-memory -p 8000:8000 openenv-visual-memory """ import argparse import json import logging import os import sys import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone, timedelta from typing import Any, Dict, List, Optional, Set, Tuple import numpy as np IST = timezone(timedelta(hours=5, minutes=30)) from dotenv import load_dotenv load_dotenv(os.path.join(os.path.dirname(__file__), ".env")) sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from openenv import AutoEnv from agent.runner import AgentRunner from rewards.base import RewardBreakdown from rewards.checks import VisualMemoryChecker from rewards.transforms import VisualMemoryStepTransform from scenarios.definitions import VISUAL_MEMORY_SCENARIOS logger = logging.getLogger(__name__) GYM_NAME = "visual_memory" OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "outputs") # ── Helpers ── def _resolve_base_url() -> str: import importlib.resources import yaml try: ref = importlib.resources.files(GYM_NAME).joinpath("openenv.yaml") with importlib.resources.as_file(ref) as f: manifest = yaml.safe_load(f.read_text()) port = manifest.get("port", 8000) return f"http://localhost:{port}" except Exception: logger.warning("Could not read openenv.yaml, defaulting to port 8000") return "http://localhost:8000" def _fetch_gym_metadata(base_url: str) -> dict | None: import httpx try: resp = httpx.get(f"{base_url}/metadata", timeout=5.0) resp.raise_for_status() data = resp.json() data.pop("readme_content", None) return data except Exception as e: logger.debug(f"Failed to fetch /metadata from {base_url}: {e}") return None def divider(text: str = ""): print(f"\n{'=' * 70}") if text: print(f" {text}") print(f"{'=' * 70}") def print_breakdown(breakdown: RewardBreakdown): print(breakdown.summary()) print() print(f" Details: {breakdown.details}") def _check_label(check: dict) -> str: for key in ("min_score", "min_pct", "max_hits"): if key in check and key != "type": return str(check[key]) return check.get("type", "?") def _short_json(obj, max_len=80): s = json.dumps(obj, default=str) return s if len(s) <= max_len else s[:max_len] + "..." # ── pass@k Estimator ── def pass_at_k_estimator(n: int, c: int, k: int) -> float: """ Unbiased estimator of pass@k (Chen et al., 2021 — HumanEval). n = total samples, c = correct samples, k = subset size. Returns P(at least 1 correct in a random k-subset of n runs). """ if n - c < k: return 1.0 return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1)) # ── Checkpoint (Resume) ── def _load_checkpoint(run_id: str, model: str) -> Tuple[Set[str], List[Dict]]: """Load completed scenario IDs and their results from a checkpoint file.""" safe_model = model.replace("/", "_").replace(":", "_") checkpoint_path = os.path.join( OUTPUT_DIR, "trajectories", run_id, f"{safe_model}.checkpoint.json" ) final_path = os.path.join( OUTPUT_DIR, "trajectories", run_id, f"{safe_model}.json" ) for path in [checkpoint_path, final_path]: if not os.path.exists(path): continue try: with open(path) as f: data = json.load(f) completed = set() prior_results = [] for s in data.get("scenarios", []): sid = s.get("scenario_id") reward = s.get("reward") if sid and reward is not None: completed.add(sid) prior_results.append({ "scenario": sid, "total_reward": reward["total"], "breakdown": RewardBreakdown( structural=reward["structural"], ground_truth=reward["ground_truth"], efficiency=reward["efficiency"], penalty=reward["penalty"], total=reward["total"], ), "steps": s.get("total_steps", 0), "elapsed": s.get("elapsed_s", 0), "from_checkpoint": True, }) print(f" [{model}] Checkpoint loaded: {len(completed)} scenarios completed") return completed, prior_results except (json.JSONDecodeError, KeyError) as e: logger.warning(f"Could not parse checkpoint {path}: {e}") return set(), [] def _build_scenario_entry(r: Dict, scenario) -> Dict: """Build a trajectory scenario entry from a result dict.""" entry = { "scenario_id": r.get("scenario", getattr(scenario, "id", "unknown")), "elapsed_s": round(r.get("elapsed", 0), 2), } if scenario: entry["prompt"] = scenario.prompt entry["expected_tools"] = scenario.expected_tools entry["max_steps"] = scenario.max_steps episode = r.get("episode") if episode: steps = [] for i, step in enumerate(episode.steps, 1): result_data = step.result if isinstance(result_data, str): try: result_data = json.loads(result_data) except (json.JSONDecodeError, TypeError): pass steps.append({ "step": i, "timestamp": step.timestamp, "tool_name": step.tool_name, "arguments": step.arguments, "success": step.success, "result": result_data, "error": step.error, "elapsed_s": round(step.elapsed, 3), }) entry["steps"] = steps entry["total_steps"] = len(steps) else: entry["steps"] = [] entry["total_steps"] = r.get("steps", 0) if r.get("error"): entry["error"] = r["error"] outcome_results = r.get("outcome_results", []) if outcome_results and scenario: checks = [] for check_def, passed in zip(scenario.outcome_checks, outcome_results): checks.append({"check": check_def, "passed": passed}) entry["outcome_checks"] = checks bd = r.get("breakdown") if bd: entry["reward"] = { "structural": round(bd.structural, 4), "ground_truth": round(bd.ground_truth, 4), "efficiency": round(bd.efficiency, 4), "penalty": round(bd.penalty, 4), "total": round(bd.total, 4), } else: entry["reward"] = None # pass@k multi-sample data if "samples" in r: entry["num_samples"] = r.get("n", 1) entry["correct_count"] = r.get("c", 0) entry["pass_at_k"] = r.get("pass_at_k", {}) entry["samples"] = [] for sample in r["samples"]: sample_entry = { "sample_idx": sample.get("sample_idx", 0), "success": sample.get("success", False), "total_reward": sample.get("total_reward", 0), "steps": sample.get("steps", 0), "elapsed": round(sample.get("elapsed", 0), 2), } if sample.get("error"): sample_entry["error"] = sample["error"] entry["samples"].append(sample_entry) return entry def _save_checkpoint( run_id: str, model: str, all_results: List[Dict], scenarios: list, temperature: float, reward_mode: str, gym_version: str, ): """Incrementally save checkpoint after each scenario.""" safe_model = model.replace("/", "_").replace(":", "_") traj_dir = os.path.join(OUTPUT_DIR, "trajectories", run_id) os.makedirs(traj_dir, exist_ok=True) checkpoint_path = os.path.join(traj_dir, f"{safe_model}.checkpoint.json") scenario_map = {s.id: s for s in scenarios} checkpoint = { "run_id": run_id, "model": model, "gym": GYM_NAME, "gym_version": gym_version, "timestamp": datetime.now(IST).isoformat(), "temperature": temperature, "reward_mode": reward_mode, "total_scenarios": len(all_results), "scenarios": [], } for r in all_results: sid = r.get("scenario") if r.get("from_checkpoint"): checkpoint["scenarios"].append({ "scenario_id": sid, "elapsed_s": r.get("elapsed", 0), "total_steps": r.get("steps", 0), "reward": { "structural": r["breakdown"].structural, "ground_truth": r["breakdown"].ground_truth, "efficiency": r["breakdown"].efficiency, "penalty": r["breakdown"].penalty, "total": r["breakdown"].total, } if r.get("breakdown") else None, }) else: entry = _build_scenario_entry(r, scenario_map.get(sid)) checkpoint["scenarios"].append(entry) with open(checkpoint_path, "w") as f: json.dump(checkpoint, f, indent=2, default=str) # ── Results & Trajectory Saving ── def save_results_to_markdown( results: List[Dict[str, Any]], model: str, output_path: str, total_elapsed: float, temperature: float, run_id: str = "", reward_mode: str = "custom", gym_version: str = "unknown", num_samples: int = 1, pass_k_values: Optional[List[int]] = None, ): os.makedirs(os.path.dirname(output_path), exist_ok=True) timestamp = datetime.now(IST).strftime("%Y-%m-%d %H:%M:%S") is_new_file = not os.path.exists(output_path) is_passk = num_samples > 1 and pass_k_values with open(output_path, "a") as f: if is_new_file: f.write(f"# Visual Memory Gym — Evaluation Results\n\n") f.write(f"**Run ID**: `{run_id}` \n") f.write(f"**Gym Version**: `{gym_version}`\n\n") f.write(f"Evaluation results for the **visual_memory** gym across different LLM models.\n\n") if is_passk: f.write(f"**Mode**: pass@k (n={num_samples}, k={pass_k_values})\n\n") if reward_mode == "openenv": f.write(f"**Reward Mode**: `openenv` — per-step rewards from `rewards/transforms.py` + ground truth\n\n") else: f.write(f"**Reward Mode**: `custom` — episode-level rewards from `rewards/base.py`\n\n") f.write(f"Trajectories: `outputs/trajectories/{run_id}/`\n\n") f.write(f"---\n\n") safe_model = model.replace("/", "_").replace(":", "_") f.write(f"## Model: `{model}`\n\n") f.write(f"- **Date**: {timestamp}\n") f.write(f"- **Temperature**: {temperature}\n") f.write(f"- **Reward Mode**: {reward_mode}\n") if is_passk: f.write(f"- **Samples per scenario**: {num_samples}\n") f.write(f"- **Total Time**: {total_elapsed:.1f}s\n") f.write(f"- **Trajectory**: `outputs/trajectories/{run_id}/{safe_model}.json`\n\n") if is_passk: k_headers = " | ".join(f"pass@{k}" for k in pass_k_values) f.write(f"| Scenario | n | c | {k_headers} | Best Reward | Avg Steps |\n") k_divs = " | ".join(":---:" for _ in pass_k_values) f.write(f"|---|:---:|:---:|{k_divs}|:---:|:---:|\n") for r in results: n = r.get("n", 1) c = r.get("c", 0) pass_at = r.get("pass_at_k", {}) k_vals = " | ".join(f"{pass_at.get(str(k), 0.0):.2f}" for k in pass_k_values) best = r.get("best_reward", 0.0) avg_steps = r.get("avg_steps", 0) f.write( f"| {r['scenario']} " f"| {n} | {c} | {k_vals} " f"| {best:.2f} | {avg_steps:.1f} |\n" ) all_pass1 = [r.get("pass_at_k", {}).get("1", 0.0) for r in results] avg_pass1 = sum(all_pass1) / len(all_pass1) if all_pass1 else 0.0 f.write(f"\n**Average pass@1: {avg_pass1:.2f}**\n\n") else: if reward_mode == "openenv": f.write(f"| Scenario | Quality | Ground Truth | Penalty | **Total** | Steps | Time |\n") f.write(f"|---|:---:|:---:|:---:|:---:|:---:|:---:|\n") else: f.write(f"| Scenario | Structural | Ground Truth | Efficiency | Penalty | **Total** | Steps | Time |\n") f.write(f"|---|:---:|:---:|:---:|:---:|:---:|:---:|:---:|\n") total_reward = 0.0 for r in results: bd = r.get("breakdown") if bd: if reward_mode == "openenv": f.write( f"| {r['scenario']} " f"| {bd.structural:.2f} " f"| {bd.ground_truth:.2f} " f"| {bd.penalty:.2f} " f"| **{bd.total:.2f}** " f"| {r['steps']} " f"| {r['elapsed']:.1f}s |\n" ) else: f.write( f"| {r['scenario']} " f"| {bd.structural:.2f} " f"| {bd.ground_truth:.2f} " f"| {bd.efficiency:.2f} " f"| {bd.penalty:.2f} " f"| **{bd.total:.2f}** " f"| {r['steps']} " f"| {r['elapsed']:.1f}s |\n" ) total_reward += bd.total else: cols = "| — | — | — " if reward_mode == "openenv" else "| — | — | — | — " f.write( f"| {r['scenario']} " f"{cols}" f"| **ERROR** " f"| {r['steps']} " f"| {r['elapsed']:.1f}s |\n" ) avg = total_reward / len(results) if results else 0.0 f.write(f"\n**Average Reward: {avg:.2f}**\n\n") f.write(f"---\n\n") logger.info(f"Results saved to {output_path}") def save_trajectory( results: List[Dict[str, Any]], scenarios: list, model: str, temperature: float, total_elapsed: float, run_id: str = "", reward_mode: str = "custom", gym_version: str = "unknown", num_samples: int = 1, pass_k_values: Optional[List[int]] = None, ): run_ts = datetime.now(IST).isoformat() safe_model = model.replace("/", "_").replace(":", "_") filename = f"{safe_model}.json" traj_dir = os.path.join(OUTPUT_DIR, "trajectories", run_id) os.makedirs(traj_dir, exist_ok=True) filepath = os.path.join(traj_dir, filename) scenario_map = {s.id: s for s in scenarios} trajectory = { "run_id": run_id or "untagged", "model": model, "gym": GYM_NAME, "gym_version": gym_version, "timestamp": run_ts, "temperature": temperature, "reward_mode": reward_mode, "total_elapsed_s": round(total_elapsed, 2), "total_scenarios": len(results), } if num_samples > 1: trajectory["num_samples"] = num_samples trajectory["pass_k_values"] = pass_k_values or [1] trajectory["scenarios"] = [] for r in results: sid = r.get("scenario") scenario = scenario_map.get(sid) if r.get("from_checkpoint"): trajectory["scenarios"].append({ "scenario_id": sid, "elapsed_s": r.get("elapsed", 0), "total_steps": r.get("steps", 0), "reward": { "structural": r["breakdown"].structural, "ground_truth": r["breakdown"].ground_truth, "efficiency": r["breakdown"].efficiency, "penalty": r["breakdown"].penalty, "total": r["breakdown"].total, } if r.get("breakdown") else None, "from_checkpoint": True, }) else: entry = _build_scenario_entry(r, scenario) trajectory["scenarios"].append(entry) totals = [ s["reward"]["total"] for s in trajectory["scenarios"] if s.get("reward") ] trajectory["avg_reward"] = round(sum(totals) / len(totals), 4) if totals else 0.0 with open(filepath, "w") as f: json.dump(trajectory, f, indent=2, default=str) print(f"\n Trajectory saved: {filepath}") logger.info(f"Trajectory saved to {filepath}") # Clean up checkpoint file now that full trajectory is written checkpoint_path = os.path.join(traj_dir, f"{safe_model}.checkpoint.json") if os.path.exists(checkpoint_path): os.remove(checkpoint_path) return filepath def save_trajectory_atif( results: List[Dict[str, Any]], scenarios: list, model: str, temperature: float, total_elapsed: float, run_id: str = "", reward_mode: str = "custom", gym_version: str = "unknown", token_usage: Optional[Dict[str, int]] = None, ): """Save trajectory in ATIF v1.4 format (Harbor/Terminus-2 standard).""" safe_model = model.replace("/", "_").replace(":", "_") filename = f"{safe_model}_atif.json" traj_dir = os.path.join(OUTPUT_DIR, "trajectories", run_id) os.makedirs(traj_dir, exist_ok=True) filepath = os.path.join(traj_dir, filename) scenario_map = {s.id: s for s in scenarios} atif_steps = [] step_id = 0 for r in results: if r.get("from_checkpoint"): continue sid = r.get("scenario") scenario = scenario_map.get(sid) episode = r.get("episode") if not episode: continue step_id += 1 atif_steps.append({ "step_id": step_id, "timestamp": episode.steps[0].timestamp if episode.steps else None, "role": "user", "message": scenario.prompt if scenario else sid, "tool_calls": [], "observation": None, }) for step in episode.steps: step_id += 1 result_data = step.result if isinstance(result_data, str): try: result_data = json.loads(result_data) except (json.JSONDecodeError, TypeError): pass atif_steps.append({ "step_id": step_id, "timestamp": step.timestamp, "role": "assistant", "message": None, "tool_calls": [{ "id": f"call_{step_id}", "function_name": step.tool_name, "arguments": step.arguments, }], "observation": { "content": result_data, "success": step.success, "error": step.error, }, "duration_ms": round(step.elapsed * 1000), }) bd = r.get("breakdown") if bd and atif_steps: atif_steps[-1]["reward"] = bd.total usage = token_usage or {} trajectory = { "schema_version": "ATIF-v1.4", "session_id": run_id, "started_at": datetime.now(IST).isoformat(), "agent": { "name": f"openenv-{GYM_NAME}", "model_name": model, "temperature": temperature, }, "environment": { "gym": GYM_NAME, "version": gym_version, "reward_mode": reward_mode, }, "steps": atif_steps, "final_metrics": { "total_steps": step_id, "total_wall_time_s": round(total_elapsed, 2), "total_prompt_tokens": usage.get("prompt_tokens", 0), "total_completion_tokens": usage.get("completion_tokens", 0), "total_cost_usd": 0.0, "custom": { "avg_reward": round( sum(r.get("total_reward", 0) for r in results) / max(len(results), 1), 4 ), "total_scenarios": len(results), }, }, } with open(filepath, "w") as f: json.dump(trajectory, f, indent=2, default=str) print(f"\n ATIF trajectory saved: {filepath}") logger.info(f"ATIF trajectory saved to {filepath}") return filepath # ── Scenario Execution ── WS_RETRY_ERRORS = ("ConnectionClosed", "ConnectionClosedOK", "ConnectionClosedError", "sent 1000") MAX_WS_RETRIES = 3 def _run_scenario_with_retries( scenario, runner: AgentRunner, checker, env_client, connect_fn, model: str, ) -> Dict[str, Any]: """Execute a single scenario with WebSocket retry logic. Returns a result dict.""" start = time.time() last_error = None for attempt in range(MAX_WS_RETRIES + 1): try: if attempt > 0: logger.info(f"[{model}] Reconnecting (attempt {attempt + 1}) for {scenario.id}") print(f" [{model}] Reconnecting WebSocket (attempt {attempt + 1})...") try: env_client.__exit__(None, None, None) except Exception: pass time.sleep(2 * attempt) env_client, runner = connect_fn() episode, breakdown = runner.run_scenario(scenario, checker) elapsed = time.time() - start if hasattr(checker, "set_episode"): checker.set_episode(episode) outcome_results = checker.check_all(scenario.outcome_checks) return { "scenario": scenario.id, "total_reward": breakdown.total, "breakdown": breakdown, "steps": len(episode.steps), "elapsed": elapsed, "episode": episode, "outcome_results": outcome_results, } except Exception as e: last_error = e is_ws_error = any(tok in type(e).__name__ or tok in str(e) for tok in WS_RETRY_ERRORS) if is_ws_error and attempt < MAX_WS_RETRIES: logger.warning(f"[{model}] WebSocket error on {scenario.id}: {e}") continue break elapsed = time.time() - start logger.exception(f"[{model}] Scenario {scenario.id} failed") return { "scenario": scenario.id, "total_reward": 0.0, "breakdown": None, "steps": 0, "elapsed": elapsed, "error": str(last_error), } def _run_scenario_n_samples( scenario, n: int, pass_threshold: float, pass_k_values: List[int], model: str, base_url: str, temperature: float, max_tokens: int, reward_mode: str, ) -> Dict[str, Any]: """Run a single scenario n times for pass@k evaluation.""" samples = [] correct_count = 0 def _connect(): client = AutoEnv.from_env(GYM_NAME, base_url=base_url) client.__enter__() xform = VisualMemoryStepTransform() if reward_mode == "openenv" else None rnr = AgentRunner( model=model, env_client=client, temperature=temperature, max_tokens=max_tokens, reward_mode=reward_mode, transform=xform, ) return client, rnr env_client, runner = _connect() checker = VisualMemoryChecker() try: for sample_idx in range(n): result = _run_scenario_with_retries( scenario, runner, checker, env_client, _connect, model, ) gt_score = 0.0 outcome_results = result.get("outcome_results", []) if outcome_results: gt_score = sum(outcome_results) / len(outcome_results) is_success = gt_score >= pass_threshold if is_success: correct_count += 1 result["sample_idx"] = sample_idx result["ground_truth_score"] = gt_score result["success"] = is_success samples.append(result) status = "PASS" if is_success else "FAIL" print( f" [{model}] {scenario.id} sample {sample_idx + 1}/{n}: " f"{status} (gt={gt_score:.2f}, reward={result['total_reward']:.2f}, " f"{result['steps']} steps, {result['elapsed']:.1f}s)" ) finally: try: env_client.__exit__(None, None, None) except Exception: pass pass_at_k = {} for k in pass_k_values: if k <= n: pass_at_k[str(k)] = round(pass_at_k_estimator(n, correct_count, k), 4) best_sample = max(samples, key=lambda s: s.get("total_reward", 0.0)) return { "scenario": scenario.id, "n": n, "c": correct_count, "pass_at_k": pass_at_k, "samples": samples, "best_reward": best_sample.get("total_reward", 0.0), "avg_steps": sum(s.get("steps", 0) for s in samples) / max(len(samples), 1), "total_reward": best_sample.get("total_reward", 0.0), "breakdown": best_sample.get("breakdown"), "steps": best_sample.get("steps", 0), "elapsed": sum(s.get("elapsed", 0) for s in samples), "episode": best_sample.get("episode"), "outcome_results": best_sample.get("outcome_results", []), } # ── Model Workers ── def _run_single_model( model: str, base_url: str, scenarios: list, temperature: float, max_tokens: int, reward_mode: str, run_id: str, save: bool, trajectory: bool, verbose: bool, gym_version: str = "unknown", num_samples: int = 1, pass_k_values: Optional[List[int]] = None, pass_threshold: float = 0.5, parallel_scenarios: int = 1, resume: bool = False, trajectory_format: str = "native", ) -> Dict[str, Any]: model_start = time.time() # Resume: load checkpoint completed_ids: Set[str] = set() prior_results: List[Dict] = [] if resume: completed_ids, prior_results = _load_checkpoint(run_id, model) pending = [s for s in scenarios if s.id not in completed_ids] if not pending: print(f" [{model}] All scenarios already completed (checkpoint)") model_elapsed = time.time() - model_start return {"model": model, "results": prior_results, "elapsed": model_elapsed} if completed_ids: print(f" [{model}] Resuming: {len(pending)} remaining of {len(scenarios)} scenarios") model_results = list(prior_results) results_lock = threading.Lock() is_passk = num_samples > 1 def _connect(): client = AutoEnv.from_env(GYM_NAME, base_url=base_url) client.__enter__() xform = VisualMemoryStepTransform() if reward_mode == "openenv" else None rnr = AgentRunner( model=model, env_client=client, temperature=temperature, max_tokens=max_tokens, reward_mode=reward_mode, transform=xform, ) return client, rnr def _run_one_scenario(scenario, idx, total): """Run a single scenario (optionally n samples) and append to results.""" print(f"\n [{model}] Scenario {idx}/{total}: {scenario.id}") if is_passk: result = _run_scenario_n_samples( scenario, n=num_samples, pass_threshold=pass_threshold, pass_k_values=pass_k_values or [1], model=model, base_url=base_url, temperature=temperature, max_tokens=max_tokens, reward_mode=reward_mode, ) pk = result.get("pass_at_k", {}) pk_str = ", ".join(f"pass@{k}={pk.get(str(k), 0):.2f}" for k in (pass_k_values or [1])) print( f" [{model}] {scenario.id}: {result['c']}/{result['n']} correct → {pk_str}" ) else: env_client, runner = _connect() checker = VisualMemoryChecker() try: result = _run_scenario_with_retries( scenario, runner, checker, env_client, _connect, model, ) reward_str = f"{result['total_reward']:.2f}" if result.get("breakdown") else "ERROR" print( f" [{model}] {scenario.id}: {reward_str} " f"({result['steps']} steps, {result['elapsed']:.1f}s)" ) finally: try: env_client.__exit__(None, None, None) except Exception: pass with results_lock: model_results.append(result) _save_checkpoint( run_id, model, model_results, scenarios, temperature, reward_mode, gym_version, ) return result if parallel_scenarios > 1 and len(pending) > 1: max_concurrent = int(os.getenv("MAX_CONCURRENT_ENVS", "8")) max_workers = min(parallel_scenarios, len(pending), max_concurrent) print(f" [{model}] Running {len(pending)} scenarios with {max_workers} parallel workers") with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {} for idx, scenario in enumerate(pending, len(completed_ids) + 1): future = executor.submit( _run_one_scenario, scenario, idx, len(scenarios), ) futures[future] = scenario for future in as_completed(futures): scenario = futures[future] try: future.result() except Exception as e: print(f" [{model}] {scenario.id}: ERROR - {e}") logger.exception(f"Scenario {scenario.id} failed") with results_lock: model_results.append({ "scenario": scenario.id, "total_reward": 0.0, "breakdown": None, "steps": 0, "elapsed": 0.0, "error": str(e), }) else: if not is_passk: env_client, runner = _connect() checker = VisualMemoryChecker() try: for idx, scenario in enumerate(pending, len(completed_ids) + 1): print(f"\n [{model}] Scenario {idx}/{len(scenarios)}: {scenario.id}") result = _run_scenario_with_retries( scenario, runner, checker, env_client, _connect, model, ) reward_str = f"{result['total_reward']:.2f}" if result.get("breakdown") else "ERROR" print( f" [{model}] {scenario.id}: {reward_str} " f"({result['steps']} steps, {result['elapsed']:.1f}s)" ) model_results.append(result) _save_checkpoint( run_id, model, model_results, scenarios, temperature, reward_mode, gym_version, ) finally: try: env_client.__exit__(None, None, None) except Exception: pass else: for idx, scenario in enumerate(pending, len(completed_ids) + 1): _run_one_scenario(scenario, idx, len(scenarios)) model_elapsed = time.time() - model_start if save: output_path = os.path.join(OUTPUT_DIR, "results", f"{run_id}.md") save_results_to_markdown( results=model_results, model=model, output_path=output_path, total_elapsed=model_elapsed, temperature=temperature, run_id=run_id, reward_mode=reward_mode, gym_version=gym_version, num_samples=num_samples, pass_k_values=pass_k_values, ) if trajectory: save_trajectory( results=model_results, scenarios=scenarios, model=model, temperature=temperature, total_elapsed=model_elapsed, run_id=run_id, reward_mode=reward_mode, gym_version=gym_version, num_samples=num_samples, pass_k_values=pass_k_values, ) if trajectory_format == "atif": save_trajectory_atif( results=model_results, scenarios=scenarios, model=model, temperature=temperature, total_elapsed=model_elapsed, run_id=run_id, reward_mode=reward_mode, gym_version=gym_version, ) return { "model": model, "results": model_results, "elapsed": model_elapsed, } def _run_single_model_detailed( model: str, base_url: str, scenarios: list, temperature: float, max_tokens: int, reward_mode: str, run_id: str, save: bool, trajectory: bool, gym_version: str = "unknown", num_samples: int = 1, pass_k_values: Optional[List[int]] = None, pass_threshold: float = 0.5, resume: bool = False, trajectory_format: str = "native", ) -> Dict[str, Any]: model_start = time.time() results = [] completed_ids: Set[str] = set() if resume: completed_ids, prior = _load_checkpoint(run_id, model) results = list(prior) pending = [s for s in scenarios if s.id not in completed_ids] if not pending: print(f" [{model}] All scenarios already completed (checkpoint)") return {"model": model, "results": results, "elapsed": time.time() - model_start} is_passk = num_samples > 1 if is_passk: for i, scenario in enumerate(pending, len(completed_ids) + 1): divider(f"Scenario {i}/{len(scenarios)}: {scenario.id}") print(f" Prompt: {scenario.prompt[:120]}...") print(f" Samples: {num_samples}") print() result = _run_scenario_n_samples( scenario, n=num_samples, pass_threshold=pass_threshold, pass_k_values=pass_k_values or [1], model=model, base_url=base_url, temperature=temperature, max_tokens=max_tokens, reward_mode=reward_mode, ) pk = result.get("pass_at_k", {}) print(f"\n -- pass@k Results --") print(f" Correct: {result['c']}/{result['n']}") for k in (pass_k_values or [1]): print(f" pass@{k}: {pk.get(str(k), 0.0):.4f}") results.append(result) _save_checkpoint( run_id, model, results, scenarios, temperature, reward_mode, gym_version, ) else: env_client = AutoEnv.from_env(GYM_NAME, base_url=base_url) env_client.__enter__() checker = VisualMemoryChecker() transform = VisualMemoryStepTransform() if reward_mode == "openenv" else None runner = AgentRunner( model=model, env_client=env_client, temperature=temperature, max_tokens=max_tokens, reward_mode=reward_mode, transform=transform, ) try: for i, scenario in enumerate(pending, len(completed_ids) + 1): divider(f"Scenario {i}/{len(scenarios)}: {scenario.id}") print(f" Prompt: {scenario.prompt[:120]}...") print(f" Expected tools: {scenario.expected_tools}") print(f" Max steps: {scenario.max_steps}") print() start = time.time() try: episode, breakdown = runner.run_scenario(scenario, checker) elapsed = time.time() - start print() print(" -- Agent Actions --") for step in episode.steps: status = "OK" if step.success else "FAIL" args_str = _short_json(step.arguments) print(f" [{status}] {step.tool_name}({args_str})") print(f" Steps taken: {len(episode.steps)}") if hasattr(checker, "set_episode"): checker.set_episode(episode) print() print(" -- Ground Truth Verification --") outcome_results = checker.check_all(scenario.outcome_checks) for check, score in zip(scenario.outcome_checks, outcome_results): status = "PASS" if score else "FAIL" label = _check_label(check) print(f" [{status}] {check['type']}: {label}") print() print(" -- Reward Breakdown --") print_breakdown(breakdown) print(f"\n Completed in {elapsed:.1f}s") result = { "scenario": scenario.id, "total_reward": breakdown.total, "breakdown": breakdown, "steps": len(episode.steps), "elapsed": elapsed, "episode": episode, "outcome_results": outcome_results, } results.append(result) except Exception as e: elapsed = time.time() - start print(f"\n ERROR: {e}") logger.exception(f"Scenario {scenario.id} failed") results.append({ "scenario": scenario.id, "total_reward": 0.0, "breakdown": None, "steps": 0, "elapsed": elapsed, "error": str(e), }) _save_checkpoint( run_id, model, results, scenarios, temperature, reward_mode, gym_version, ) finally: env_client.__exit__(None, None, None) logger.info("AutoEnv client disconnected.") model_elapsed = time.time() - model_start if save: output_path = os.path.join(OUTPUT_DIR, "results", f"{run_id}.md") save_results_to_markdown( results=results, model=model, output_path=output_path, total_elapsed=model_elapsed, temperature=temperature, run_id=run_id, reward_mode=reward_mode, gym_version=gym_version, num_samples=num_samples, pass_k_values=pass_k_values, ) print(f"\n Results saved: {output_path}") if trajectory: save_trajectory( results=results, scenarios=scenarios, model=model, temperature=temperature, total_elapsed=model_elapsed, run_id=run_id, reward_mode=reward_mode, gym_version=gym_version, num_samples=num_samples, pass_k_values=pass_k_values, ) if trajectory_format == "atif": save_trajectory_atif( results=results, scenarios=scenarios, model=model, temperature=temperature, total_elapsed=model_elapsed, run_id=run_id, reward_mode=reward_mode, gym_version=gym_version, ) return { "model": model, "results": results, "elapsed": model_elapsed, } # ── Main ── def main(): parser = argparse.ArgumentParser( description="Evaluate an LLM agent against Visual Memory gym scenarios.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Basic (backward compatible) python run_eval.py --model gpt-5.4 --save --trajectory python run_eval.py --model gpt-5.4 --scenario directional_trap_8x8 # pass@k python run_eval.py --model gpt-5.4 --num-samples 10 --pass-k 1,3,8 --save # Parallel scenarios python run_eval.py --model gpt-5.4 --parallel-scenarios 4 --save # Resume interrupted run python run_eval.py --model gpt-5.4 --run-id my_run --resume --save --trajectory # ATIF trajectory format python run_eval.py --model gpt-5.4 --trajectory --trajectory-format atif # Combined python run_eval.py --model gpt-5.4 --num-samples 10 --pass-k 1,3,8 \\ --parallel-scenarios 4 --run-id bench_v1 --resume --save \\ --trajectory --trajectory-format atif """, ) parser.add_argument( "--model", default=os.getenv("LLM_MODEL", "gpt-4o"), help="LiteLLM model string, or comma-separated for parallel mode " "(e.g., 'gpt-5.4' or 'gpt-5.4,claude-sonnet-4-6')", ) parser.add_argument( "--scenario", default=None, help="Run a specific scenario by ID (default: run all 10)", ) parser.add_argument( "--temperature", type=float, default=None, help="LLM sampling temperature (default: 0.0, or 0.8 in pass@k mode)", ) parser.add_argument( "--max-tokens", type=int, default=int(os.getenv("LLM_MAX_TOKENS", "1024")), help="Max tokens per LLM response (default: 1024)", ) parser.add_argument( "--save", action="store_true", default=True, help="Save results to outputs/results/.md (default: on)", ) parser.add_argument( "--no-save", action="store_false", dest="save", help="Disable saving results markdown", ) parser.add_argument( "--trajectory", action="store_true", default=True, help="Save detailed trajectory JSON to outputs/trajectories// (default: on)", ) parser.add_argument( "--no-trajectory", action="store_false", dest="trajectory", help="Disable saving trajectory JSON", ) parser.add_argument( "--trajectory-format", default="native", choices=["native", "atif"], help="Trajectory format: 'native' (default) or 'atif' (Harbor/ATIF v1.4)", ) parser.add_argument( "--run-id", default=None, help="Run identifier (default: auto-generated as run_YYYYMMDD_HHMM)", ) parser.add_argument( "--reward-mode", default="custom", choices=["custom", "openenv"], help="Reward mode: 'custom' (episode-level) or 'openenv' (per-step). Default: custom", ) parser.add_argument( "--parallel-models", type=int, default=1, help="Number of models to evaluate in parallel (default: 1)", ) parser.add_argument( "--parallel", type=int, default=None, help=argparse.SUPPRESS, # hidden alias for --parallel-models ) parser.add_argument( "--parallel-scenarios", type=int, default=1, help="Number of scenarios to run concurrently per model (default: 1)", ) parser.add_argument( "--num-samples", type=int, default=1, help="Number of samples per scenario for pass@k (default: 1 = single run)", ) parser.add_argument( "--pass-k", default="1", help="Comma-separated k values for pass@k (default: '1'). " "Requires --num-samples >= max(k).", ) parser.add_argument( "--pass-threshold", type=float, default=0.5, help="Min ground_truth score to count as 'pass' for pass@k (default: 0.5)", ) parser.add_argument( "--resume", action="store_true", help="Resume a previous run, skipping completed scenarios. Requires --run-id.", ) parser.add_argument( "--verbose", "-v", action="store_true", help="Enable debug logging", ) args = parser.parse_args() # Resolve --parallel alias parallel_models = args.parallel if args.parallel is not None else args.parallel_models models = [m.strip() for m in args.model.split(",") if m.strip()] pass_k_values = [int(k.strip()) for k in args.pass_k.split(",") if k.strip()] # Validate pass@k constraints if args.num_samples > 1: max_k = max(pass_k_values) if args.num_samples < max_k: print(f"Error: --num-samples ({args.num_samples}) must be >= max(--pass-k) ({max_k})") sys.exit(1) # Temperature: auto-set for pass@k mode if not explicitly provided if args.temperature is not None: temperature = args.temperature elif args.num_samples > 1: temperature = 0.8 print(f" pass@k mode: temperature auto-set to 0.8 for diverse sampling") else: temperature = float(os.getenv("LLM_TEMPERATURE", "0.0")) if args.run_id: run_id = args.run_id else: run_id = f"run_{datetime.now(IST).strftime('%Y%m%d_%H%M')}" if args.resume and not args.run_id: print("Warning: --resume without --run-id uses auto-generated ID (no checkpoint to load)") log_level = logging.DEBUG if args.verbose else logging.WARNING logging.basicConfig( level=log_level, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%H:%M:%S", ) if not args.verbose: logging.getLogger("LiteLLM").setLevel(logging.WARNING) logging.getLogger("litellm").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("openenv").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) base_url = _resolve_base_url() scenarios = VISUAL_MEMORY_SCENARIOS if args.scenario: scenarios = [s for s in scenarios if s.id == args.scenario] if not scenarios: available = [s.id for s in VISUAL_MEMORY_SCENARIOS] print(f"Error: Scenario '{args.scenario}' not found. Available: {available}") sys.exit(1) divider("AutoEnv Discovery") print(f" Discovering gym '{GYM_NAME}' via AutoEnv...") env_info = AutoEnv.get_env_info(GYM_NAME) print(f" Found: {env_info['name']} (package: {env_info['package']}, v{env_info['version']})") print(f" Base URL: {base_url} (auto-derived from openenv.yaml)") gym_metadata = _fetch_gym_metadata(base_url) if gym_metadata: print(f"\n -- Environment Metadata (GET {base_url}/metadata) --") print(f" Name: {gym_metadata.get('name', 'N/A')}") print(f" Version: {gym_metadata.get('version', 'N/A')}") print(f" Description: {gym_metadata.get('description', 'N/A')}") else: print(f"\n Warning: Could not fetch /metadata from {base_url} (server may not be running)") is_parallel_models = parallel_models > 1 and len(models) > 1 gym_version = gym_metadata.get("version", "unknown") if gym_metadata else "unknown" mode_parts = [] if is_parallel_models: mode_parts.append(f"{parallel_models} model workers") if args.parallel_scenarios > 1: mode_parts.append(f"{args.parallel_scenarios} scenario workers/model") if args.num_samples > 1: mode_parts.append(f"pass@k (n={args.num_samples}, k={pass_k_values})") if args.resume: mode_parts.append("resume") mode_str = ", ".join(mode_parts) if mode_parts else "Sequential" divider("LLM Evaluation Run") print(f" Gym: {GYM_NAME} (v{gym_version})") print(f" Models: {', '.join(models)}") print(f" Run ID: {run_id}") print(f" Mode: {mode_str}") print(f" Base URL: {base_url}") print(f" Scenarios: {len(scenarios)} of {len(VISUAL_MEMORY_SCENARIOS)}") print(f" Temperature: {temperature}") print(f" Reward Mode: {args.reward_mode}") if args.num_samples > 1: print(f" Samples/scn: {args.num_samples}") print(f" pass@k: {pass_k_values}") print(f" Threshold: {args.pass_threshold}") if args.trajectory: print(f" Traj Format: {args.trajectory_format}") print(f" Output Dir: {OUTPUT_DIR}") total_start = time.time() all_model_results = [] common_kwargs = dict( base_url=base_url, scenarios=scenarios, temperature=temperature, max_tokens=args.max_tokens, reward_mode=args.reward_mode, run_id=run_id, save=args.save, trajectory=args.trajectory, gym_version=gym_version, num_samples=args.num_samples, pass_k_values=pass_k_values, pass_threshold=args.pass_threshold, parallel_scenarios=args.parallel_scenarios, resume=args.resume, trajectory_format=args.trajectory_format, ) if is_parallel_models: divider(f"Parallel Evaluation ({len(models)} models, {parallel_models} workers)") max_workers = min(parallel_models, len(models)) with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {} for idx, model in enumerate(models): if idx > 0: time.sleep(3) future = executor.submit( _run_single_model, model=model, verbose=args.verbose, **common_kwargs, ) futures[future] = model for future in as_completed(futures): model = futures[future] try: result = future.result() all_model_results.append(result) print(f"\n {model} completed in {result['elapsed']:.1f}s") except Exception as e: print(f"\n {model} FAILED: {e}") logger.exception(f"Model {model} failed") all_model_results.append({ "model": model, "results": [], "elapsed": 0.0, "error": str(e), }) else: for model in models: if len(models) > 1: divider(f"Model: {model}") if len(models) == 1 and args.parallel_scenarios <= 1: result = _run_single_model_detailed( model=model, **{k: v for k, v in common_kwargs.items() if k not in ("parallel_scenarios",)}, ) else: result = _run_single_model( model=model, verbose=args.verbose, **common_kwargs, ) all_model_results.append(result) total_elapsed = time.time() - total_start is_passk = args.num_samples > 1 divider("Evaluation Summary") for mr in all_model_results: model = mr["model"] results = mr.get("results", []) model_elapsed = mr.get("elapsed", 0.0) if not results: print(f"\n Model: {model} -- FAILED ({mr.get('error', 'unknown')})") continue print(f"\n Model: {model}") print(f" Time: {model_elapsed:.1f}s") if is_passk: k_headers = " ".join(f"{'pass@' + str(k):>8}" for k in pass_k_values) print(f" {'Scenario':<35} {'n':>4} {'c':>4} {k_headers}") print(f" {'-' * 35} {'-' * 4} {'-' * 4} " + " ".join("-" * 8 for _ in pass_k_values)) all_pass_at = {str(k): [] for k in pass_k_values} for r in results: n = r.get("n", 1) c = r.get("c", 0) pk = r.get("pass_at_k", {}) k_vals = " ".join(f"{pk.get(str(k), 0.0):>8.4f}" for k in pass_k_values) print(f" {r['scenario']:<35} {n:>4} {c:>4} {k_vals}") for k in pass_k_values: all_pass_at[str(k)].append(pk.get(str(k), 0.0)) print(f" {'-' * 35} {'-' * 4} {'-' * 4} " + " ".join("-" * 8 for _ in pass_k_values)) avg_vals = " ".join( f"{sum(all_pass_at[str(k)]) / len(all_pass_at[str(k)]):>8.4f}" if all_pass_at[str(k)] else f"{'N/A':>8}" for k in pass_k_values ) print(f" {'AVERAGE':<35} {'':>4} {'':>4} {avg_vals}") else: print(f" {'Scenario':<35} {'Reward':>8} {'Steps':>6} {'Time':>6}") print(f" {'-' * 35} {'-' * 8} {'-' * 6} {'-' * 6}") for r in results: reward_str = f"{r['total_reward']:.2f}" if r.get("breakdown") else "ERROR" print(f" {r['scenario']:<35} {reward_str:>8} {r['steps']:>6} {r['elapsed']:>5.1f}s") total_reward = sum(r["total_reward"] for r in results) avg_reward = total_reward / len(results) if results else 0.0 print(f" {'-' * 35} {'-' * 8} {'-' * 6} {'-' * 6}") print(f" {'AVERAGE':<35} {avg_reward:>8.2f}") if len(models) > 1: print(f"\n Total time (all models): {total_elapsed:.1f}s") if is_parallel_models: seq_time = sum(mr.get("elapsed", 0.0) for mr in all_model_results) speedup = seq_time / total_elapsed if total_elapsed > 0 else 1.0 print(f" Sequential equivalent: {seq_time:.1f}s") print(f" Speedup: {speedup:.1f}x") if __name__ == "__main__": main()