| """ |
| pyre_evals.py — Proprietary-LLM baseline evaluation for the Pyre fire-evacuation environment. |
| |
| Tests how well a top LLM (GPT-5.4-nano, Claude 3.5, etc.) performs at navigating a burning building |
| to an exit under partial observability, time pressure, and a spreading-fire hazard. |
| |
| Because the model has never been trained on this task, its behaviour acts as a clean |
| diagnostic signal: |
| • High evacuated rate → the environment reward signal, observations, and action space are |
| legible to a capable model. |
| • Systematic failures → reveal bugs or design flaws (e.g. narrative is ambiguous, exits |
| are unreachable, action parsing is fragile, damage is too fast, etc.). |
| • Low think_rate → the prompt isn't eliciting step-by-step reasoning. |
| • Low parse_rate → the action schema is hard to follow. |
| |
| Metrics per episode |
| ─────────────────── |
| evacuated — 1 if agent reached a safe exit, 0 otherwise (primary signal) |
| cause_of_end — "evacuated" | "death" | "timeout" |
| final_health — agent health at episode end (0–100) |
| total_reward — cumulative step + episode-end reward |
| mean_step_reward — total_reward / steps_taken |
| steps_taken — steps used (compare to max_steps for the difficulty) |
| think_rate — fraction of steps where model emitted <think>…</think> |
| parse_rate — fraction of steps where a valid action was extracted |
| format_score_avg — mean format quality (0.0–1.0) across all steps |
| |
| Usage |
| ───── |
| # 1. Add keys to pyre_env/.env (never commit this file): |
| # |
| # OPENAI_API_KEY=sk-... |
| # ANTHROPIC_API_KEY=sk-ant-... |
| # |
| # 2. Start the Pyre server (from pyre_env/): |
| # uv run server |
| # |
| # 3. Run the evaluator (from pyre_env/): |
| python pyre_evals.py --env-url http://localhost:8000 |
| |
| # Multiple difficulties, more seeds: |
| python pyre_evals.py --difficulties easy medium hard --seeds 5 |
| |
| # Filter to a single difficulty, verbose logging: |
| python pyre_evals.py --difficulties medium --seeds 3 --verbose |
| |
| # Custom model and temperature: |
| python pyre_evals.py --model gpt-5.4-nano --temperature 0.0 |
| |
| # Anthropic provider: |
| python pyre_evals.py --provider anthropic --model claude-3-5-sonnet-20241022 |
| |
| # RITS endpoint: |
| python pyre_evals.py --provider rits --model meta-llama/llama-3-70b-instruct |
| |
| # Save per-episode LLM traces for debugging: |
| python pyre_evals.py --debug --verbose |
| |
| Dependencies |
| ──────────── |
| pip install langchain-openai langchain-anthropic requests python-dotenv |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| import logging |
| import re |
| import textwrap |
| from collections import defaultdict |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| import requests |
| from pyre_env import PyreEnv, PyreAction |
|
|
| try: |
| from dotenv import load_dotenv |
| except ImportError: |
| raise SystemExit( |
| "python-dotenv is required. Install it with: pip install python-dotenv" |
| ) |
|
|
| |
| |
| |
# Load variables from a .env file (if one is found) into the process
# environment so the os.getenv() lookups further down can see the API keys.
load_dotenv()


import os


# Process-wide logging setup: INFO level, records stamped HH:MM:SS.
# main() switches the "pyre_evals" logger to DEBUG when --verbose is passed.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("pyre_evals")
|
|
|
|
| |
| |
| |
|
|
# One entry per evaluation tier, in the order they are run and reported.
# ``max_steps`` is the per-episode step budget handed to run_episode();
# ``description`` is a human-readable summary of the tier.
DIFFICULTY_CONFIGS: List[Dict[str, Any]] = [
    dict(
        difficulty="easy",
        max_steps=200,
        description="1 fire source · slow spread · calm wind · high humidity",
    ),
    dict(
        difficulty="medium",
        max_steps=150,
        description="2–4 fire sources · moderate spread · any wind · moderate humidity",
    ),
    dict(
        difficulty="hard",
        max_steps=100,
        description="3–5 fire sources · fast spread · always windy · low humidity",
    ),
]


# Same config dicts, keyed by difficulty name for direct lookup.
DIFFICULTY_MAP: Dict[str, Dict[str, Any]] = {
    cfg["difficulty"]: cfg for cfg in DIFFICULTY_CONFIGS
}
|
|
|
|
| |
| |
| |
|
|
# System prompt sent as the SystemMessage on every LLM call in run_episode().
# The four actions listed under AVAILABLE ACTIONS are the same names that
# _VALID_ACTIONS accepts, and the <think>…</think> + single-JSON output format
# is what _parse_pyre_action() scores. The text is dedented and stripped once
# at import time.
SYSTEM_PROMPT = textwrap.dedent("""
    You are an agent trapped inside a burning building.
    Your goal is to navigate to an EXIT before your health reaches zero or time runs out.

    ENVIRONMENT RULES
    - You have partial vision: you cannot see through walls or dense smoke.
    - Fire and smoke spread each step — do NOT linger in hazardous areas.
    - Closing a door adjacent to active fire slows its spread (strategic move).
    - Your health drains faster in moderate/heavy smoke and on fire cells.
    - Exits may be BLOCKED if fire burns directly on them — check available hints.

    OUTPUT FORMAT (STRICT)
    You MUST reason inside <think>...</think> tags first, then emit EXACTLY ONE JSON object.
    Output NOTHING else — no extra text, no markdown fences, no second JSON block.

    <think>
    Brief reasoning: what can I see, where is danger, what is the safest next move?
    </think>
    {"action": "move", "direction": "north"}

    AVAILABLE ACTIONS
    - move : {"action": "move", "direction": "north|south|east|west"}
    - look : {"action": "look", "direction": "north|south|east|west"} ← scan 5 cells ahead
    - door : {"action": "door", "target_id": "door_X", "door_state": "open|close"}
    - wait : {"action": "wait"}

    REWARD SIGNAL (shown in history after each step)
    - Positive reward → you moved closer to an exit or played a smart move.
    - Negative reward → you moved into danger, stalled, or wasted a step.
    - Use the reward trend to judge if your current direction is working.

    STRATEGY TIPS
    - Use `look` to scout a direction before entering an unknown corridor.
    - Closing a door between you and fire buys time; re-open when clear.
    - Prefer moves that increase reward — progress toward the exit is rewarded.
    - If smoke is heavy, back away; your health drains fast in thick smoke.
    - Door IDs (e.g. door_3) appear in the Visible objects list — use them with the door action.
""").strip()
|
|
|
|
| |
| |
| |
|
|
| def _build_user_message(obs: Dict[str, Any], history: List[str]) -> str: |
| """Convert a raw observation dict + history into the LLM user message.""" |
|
|
| narrative = obs.get("narrative", "(no narrative)") |
| |
| |
| narrative = re.sub(r"\nAvailable actions:.*$", "", narrative, flags=re.MULTILINE) |
|
|
| |
| health = obs.get("agent_health", "?") |
| health_st = obs.get("health_status", "?") |
| location = obs.get("location_label", "?") |
| smoke = obs.get("smoke_level", "none") |
| fire_vis = obs.get("fire_visible", False) |
| fire_dir = obs.get("fire_direction") or "none" |
| wind = obs.get("wind_dir", "CALM") |
| elapsed = obs.get("elapsed_steps", 0) |
| last_fb = obs.get("last_action_feedback", "") |
| blocked_exits = obs.get("blocked_exit_ids", []) |
| visible_objs = obs.get("visible_objects", []) |
| audible = obs.get("audible_signals", []) |
|
|
| status_line = ( |
| f"Health: {health:.1f} | Status: {health_st} | Location: {location}\n" |
| f"Smoke: {smoke} | Fire visible: {fire_vis}" |
| + (f" (direction: {fire_dir})" if fire_vis else "") |
| + f"\nWind: {wind} | Steps elapsed: {elapsed}" |
| ) |
|
|
| if blocked_exits: |
| status_line += f"\nBLOCKED exits (fire on them): {', '.join(blocked_exits)}" |
|
|
| if visible_objs: |
| obj_strs = [ |
| f"{o.get('type','?')} '{o.get('id','?')}' {o.get('relative_pos','?')}" |
| + (f" [{o.get('state','')}]" if o.get("state") else "") |
| for o in visible_objs |
| ] |
| status_line += f"\nVisible objects: {'; '.join(obj_strs)}" |
|
|
| if audible: |
| status_line += f"\nSounds: {'; '.join(audible)}" |
|
|
| |
| history_str = "" |
| if history: |
| recent = history[-8:] |
| history_str = "=== RECENT ACTION HISTORY (action → feedback → reward → health) ===\n" + "\n".join(recent) + "\n\n" |
|
|
| return ( |
| f"=== CURRENT OBSERVATION ===\n{narrative}\n\n" |
| f"=== STATUS ===\n{status_line}\n\n" |
| + history_str |
| + "What is your next action? Respond with <think>...</think> then a single JSON action." |
| ) |
|
|
|
|
| |
| |
| |
|
|
| _VALID_ACTIONS = {"move", "door", "look", "wait"} |
| _VALID_DIRECTIONS = {"north", "south", "east", "west"} |
| _VALID_DOOR_STATES = {"open", "close"} |
|
|
| _FALLBACK_ACTION = {"action": "wait"} |
|
|
|
|
| def _validate_pyre_action(blob: Dict[str, Any]) -> Optional[Dict[str, Any]]: |
| """Return a sanitised action dict or None if the blob is unusable.""" |
| action = blob.get("action", "").strip().lower() |
| if action not in _VALID_ACTIONS: |
| return None |
|
|
| out: Dict[str, Any] = {"action": action} |
|
|
| if action in ("move", "look"): |
| direction = str(blob.get("direction", "")).strip().lower() |
| if direction not in _VALID_DIRECTIONS: |
| return None |
| out["direction"] = direction |
|
|
| elif action == "door": |
| tid = blob.get("target_id", "") or blob.get("target", "") |
| ds = str(blob.get("door_state", "")).strip().lower() |
| if not tid or ds not in _VALID_DOOR_STATES: |
| return None |
| out["target_id"] = str(tid) |
| out["door_state"] = ds |
|
|
| |
|
|
| return out |
|
|
|
|
def _parse_pyre_action(text: str) -> Tuple[Dict[str, Any], float]:
    """Extract a Pyre action from raw LLM text.

    Returns (action_dict, format_score) where format_score reflects output quality:
        1.0 — valid JSON + <think> tags
        0.7 — valid JSON, no <think>
        0.4 — partial JSON rescued via regex
        0.1 — action keyword found in raw text (last resort)
        0.0 — completely unparseable → {"action": "wait"} fallback

    The keyword fallback always picks the first match in reading order, so
    the result does not depend on set iteration order.
    """
    has_think = "<think>" in text and "</think>" in text

    def _checked(candidate: Any) -> Optional[Dict[str, Any]]:
        """Validate a decoded JSON value; None unless it is a usable action."""
        if not isinstance(candidate, dict):
            return None
        # Reject non-string action names here so a malformed blob can never
        # raise out of the parser and abort the episode.
        if not isinstance(candidate.get("action"), str):
            return None
        return _validate_pyre_action(candidate)

    # Tier 1: everything between the first "{" and the last "}" is one object.
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        try:
            blob = json.loads(text[start:end + 1])
        except json.JSONDecodeError:
            blob = None
        action = _checked(blob)
        if action is not None:
            return action, (1.0 if has_think else 0.7)

    # Tier 2: try each flat {...} fragment in order of appearance.
    for m in re.finditer(r"\{[^{}]+\}", text):
        try:
            blob = json.loads(m.group())
        except json.JSONDecodeError:
            continue
        action = _checked(blob)
        if action is not None:
            return action, 0.4

    # Tier 3: keyword heuristics on the lower-cased text.
    lower = text.lower()
    directions = "|".join(sorted(_VALID_DIRECTIONS))

    # "move north", or "move ... north" within the same sentence/line.
    move_m = re.search(rf"\bmove\b[^.\n]*?\b({directions})\b", lower)
    if move_m:
        return {"action": "move", "direction": move_m.group(1)}, 0.1

    look_m = re.search(rf"\blook\s+({directions})\b", lower)
    if look_m:
        return {"action": "look", "direction": look_m.group(1)}, 0.1

    # Door ids look like "door_3" in the prompt; also accept "door 3".
    # The lookahead skips the JSON key "door_state" in half-broken output.
    door_m = re.search(r"\bdoor(?:_(?!state\b)(\w+)|\s+(\d+)\b)", lower)
    if door_m:
        suffix = door_m.group(1) or door_m.group(2)
        ds = "close" if "clos" in lower else "open"
        # Keep the "door_" prefix so the id matches the advertised form.
        return {"action": "door", "target_id": f"door_{suffix}", "door_state": ds}, 0.1

    if "wait" in lower:
        return {"action": "wait"}, 0.1

    # Nothing recognisable: hand back a copy of the wait fallback.
    return dict(_FALLBACK_ACTION), 0.0
|
|
|
|
| |
| |
| |
|
|
| def _require_env(key: str, hint: str) -> str: |
| """Return the value of an env var or exit with a helpful message.""" |
| val = os.getenv(key, "") |
| if not val: |
| raise SystemExit( |
| f"Missing required environment variable: {key}\n" |
| f"Add it to your .env file: {key}={hint}" |
| ) |
| return val |
|
|
|
|
| def _build_llm(provider: str, model: str, temperature: float): |
| """Construct a LangChain chat model; credentials come from .env via load_dotenv().""" |
| if provider == "openai": |
| from langchain_openai import ChatOpenAI |
| api_key = _require_env("OPENAI_API_KEY", "sk-...") |
| return ChatOpenAI( |
| model=model, |
| temperature=temperature, |
| max_retries=3, |
| api_key=api_key, |
| ) |
|
|
| if provider == "anthropic": |
| from langchain_anthropic import ChatAnthropic |
| api_key = _require_env("ANTHROPIC_API_KEY", "sk-ant-...") |
| return ChatAnthropic( |
| model=model, |
| temperature=temperature, |
| max_retries=3, |
| api_key=api_key, |
| ) |
|
|
| if provider == "rits": |
| import httpx |
| from langchain_openai import ChatOpenAI |
| base_url = _require_env("RITS_URL", "https://inference.example.com/v1") |
| rits_key = _require_env("RITS_API_KEY", "<your-rits-key>") |
| |
| |
| return ChatOpenAI( |
| model=model, |
| temperature=temperature, |
| max_retries=2, |
| api_key="/", |
| base_url=base_url, |
| default_headers={"RITS_API_KEY": rits_key}, |
| http_client=httpx.Client(trust_env=False), |
| http_async_client=httpx.AsyncClient(trust_env=False), |
| ) |
|
|
| raise SystemExit(f"Unknown provider '{provider}'. Choose from: openai, anthropic, rits") |
|
|
|
|
| |
| |
| |
|
|
def _completion_text(content: Any) -> str:
    """Flatten a chat message's ``content`` into a single string.

    LangChain messages carry either a plain string or a list of content
    blocks (strings, or dicts holding a "text" entry). Non-text blocks are
    skipped so the result is always safe for substring checks and parsing.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: List[str] = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        return "".join(parts)
    return "" if content is None else str(content)


def run_episode(
    llm,
    env_url: str,
    seed: int,
    difficulty: str,
    max_steps: int,
    debug_dir: Optional[Path] = None,
) -> Dict[str, Any]:
    """Run one full episode using the PyreEnv sync client (WebSocket-based, stateful).

    The episode is self-contained: reset → loop(observe → LLM → act) → done.
    Uses PyreEnv.sync() so the WebSocket session persists across all steps of
    the episode — the HTTP /step endpoint is stateless and cannot be used for
    multi-step rollouts.

    Args:
        llm: Chat model exposing ``invoke(messages)``.
        env_url: Base URL of the Pyre server.
        seed: Seed passed to ``env.reset``.
        difficulty: Difficulty name passed to ``env.reset``.
        max_steps: Upper bound on LLM/env steps for this episode.
        debug_dir: When given, the initial server state and the raw LLM
            responses are written there (best effort).

    Returns:
        A metrics dict (evacuated, cause_of_end, final_health, rewards,
        step counts, think/parse rates, ``error=None``). If the episode
        raises, a short dict with ``error`` set and ``evacuated`` = 0.

    Notes:
        A failed LLM call or env step ends the episode early and records a
        -0.20 reward for that step. Each step's reward is also reduced by
        ``(1 - format_score) * 0.10``.
    """
    from langchain_core.messages import HumanMessage, SystemMessage

    step_rewards: List[float] = []
    history: List[str] = []
    llm_responses_log: List[str] = []
    steps_taken = 0
    think_steps = 0
    parsed_steps = 0
    fmt_scores: List[float] = []
    final_health = 0.0
    agent_evacuated = False

    try:
        with PyreEnv(base_url=env_url).sync() as env:
            result = env.reset(difficulty=difficulty, seed=seed)
            obs = result.observation
            done = result.done

            # Seed the end-of-episode stats from the reset observation, so an
            # episode that aborts before its first successful step is not
            # reported with health 0 (and therefore as a "death").
            initial_health = getattr(obs, "agent_health", None)
            if initial_health is not None:
                final_health = float(initial_health)
            agent_evacuated = bool(getattr(obs, "agent_evacuated", False))

            # Best-effort snapshot of the server state for debugging.
            if debug_dir is not None:
                try:
                    state_resp = requests.get(f"{env_url}/state", timeout=10)
                    if state_resp.ok:
                        debug_dir.mkdir(parents=True, exist_ok=True)
                        (debug_dir / f"{difficulty}_seed{seed}_init_state.json").write_text(
                            json.dumps(state_resp.json(), indent=2),
                            encoding="utf-8",
                        )
                except Exception as exc:
                    log.warning("Could not save initial state: %s", exc)

            for _step in range(max_steps):
                if done:
                    break

                obs_dict = obs.model_dump()

                user_msg = _build_user_message(obs_dict, history)
                messages = [
                    SystemMessage(content=SYSTEM_PROMPT),
                    HumanMessage(content=user_msg),
                ]

                try:
                    response = llm.invoke(messages)
                    # content may be a list of blocks rather than a string.
                    completion_text = _completion_text(response.content)
                except Exception as exc:
                    log.warning("LLM call failed at step %d: %s", _step + 1, exc)
                    step_rewards.append(-0.20)
                    break

                llm_responses_log.append(f"## Step {_step + 1}\n{completion_text}\n")

                has_think = "<think>" in completion_text and "</think>" in completion_text
                if has_think:
                    think_steps += 1

                # Counted once the model has answered, even if the env step
                # below fails.
                steps_taken += 1

                action_dict, fmt_score = _parse_pyre_action(completion_text)
                fmt_scores.append(fmt_score)

                if fmt_score > 0.0:
                    parsed_steps += 1
                else:
                    log.debug(" step=%d UNPARSEABLE — using wait fallback", _step + 1)

                log.debug(
                    " step=%d fmt=%.1f action=%s",
                    _step + 1, fmt_score, json.dumps(action_dict),
                )

                try:
                    result = env.step(PyreAction(**action_dict))
                    obs = result.observation
                    step_reward = float(result.reward or 0.0)
                    done = result.done
                except Exception as exc:
                    log.warning("Step failed at step %d: %s", _step + 1, exc)
                    step_rewards.append(-0.20)
                    break

                # Penalise sloppy formatting: 0 for a perfect answer, up to
                # -0.10 for an unparseable one.
                fmt_penalty = (1.0 - fmt_score) * -0.10
                step_rewards.append(step_reward + fmt_penalty)

                feedback = obs.last_action_feedback or ""
                history.append(
                    f"Step {_step + 1}: {json.dumps(action_dict)}"
                    + (f"\n → {feedback}" if feedback else "")
                    + f"\n reward: {step_reward:+.3f} health: {obs.agent_health:.1f}"
                )

                agent_evacuated = obs.agent_evacuated
                final_health = float(obs.agent_health)

    except Exception as exc:
        log.error("Episode failed (difficulty=%s seed=%d): %s", difficulty, seed, exc)
        return {
            "difficulty": difficulty, "seed": seed,
            "error": str(exc), "evacuated": 0,
        }

    if agent_evacuated:
        cause_of_end = "evacuated"
    elif final_health <= 0.0:
        cause_of_end = "death"
    else:
        # Also covers episodes cut short by an LLM or env-step failure.
        cause_of_end = "timeout"

    if debug_dir is not None and llm_responses_log:
        try:
            debug_dir.mkdir(parents=True, exist_ok=True)
            # Explicit UTF-8: model output is arbitrary text and must not
            # depend on the platform's default encoding.
            (debug_dir / f"{difficulty}_seed{seed}_llm_trace.md").write_text(
                "\n".join(llm_responses_log),
                encoding="utf-8",
            )
        except Exception as exc:
            log.warning("Could not save LLM trace: %s", exc)

    total_reward = sum(step_rewards)
    mean_step_reward = total_reward / max(len(step_rewards), 1)
    think_rate = think_steps / max(steps_taken, 1)
    parse_rate = parsed_steps / max(steps_taken, 1)
    fmt_score_avg = sum(fmt_scores) / max(len(fmt_scores), 1)

    return {
        "difficulty": difficulty,
        "seed": seed,
        "evacuated": int(agent_evacuated),
        "cause_of_end": cause_of_end,
        "final_health": round(final_health, 2),
        "total_reward": round(total_reward, 4),
        "mean_step_reward": round(mean_step_reward, 4),
        "steps_taken": steps_taken,
        "max_steps": max_steps,
        "think_rate": round(think_rate, 4),
        "parse_rate": round(parse_rate, 4),
        "format_score_avg": round(fmt_score_avg, 4),
        "error": None,
    }
|
|
|
|
| |
| |
| |
|
|
| def _avg(vals: List[float]) -> float: |
| return round(sum(vals) / len(vals), 4) if vals else 0.0 |
|
|
|
|
| def _pct(vals: List[float]) -> str: |
| return f"{_avg(vals) * 100:.0f}%" |
|
|
|
|
def print_summary(results: List[Dict[str, Any]], model_name: str) -> None:
    """Print a per-difficulty summary table and a short diagnosis section.

    Args:
        results: Episode dicts as returned by run_episode(). Entries with a
            truthy ``error`` are left out of the table; their count is
            reported separately.
        model_name: Label shown in the table title.

    Only difficulties listed in DIFFICULTY_CONFIGS get a row; a difficulty
    with no completed episode is shown as "n/a".
    """
    by_diff: Dict[str, List[Dict]] = defaultdict(list)
    failed = 0
    for r in results:
        if r.get("error"):
            failed += 1
        else:
            by_diff[r["difficulty"]].append(r)

    col = "{:<10} {:>8} {:>10} {:>10} {:>11} {:>9} {:>9} {:>9}"
    header = col.format(
        "Difficulty", "Evac%", "AvgHealth", "TotalRew",
        "MeanStpRew", "Steps/Max", "Think%", "Parse%",
    )
    sep = "=" * len(header)

    print(f"\n{'=' * len(header)}")
    print(f" PYRE EVAL — model: {model_name}")
    print(sep)
    print(header)
    print("-" * len(header))

    diagnosis: List[str] = []

    for cfg in DIFFICULTY_CONFIGS:
        diff = cfg["difficulty"]
        rows = by_diff.get(diff, [])
        if not rows:
            print(col.format(diff, "n/a", "n/a", "n/a", "n/a", "n/a", "n/a", "n/a"))
            continue

        evac_rate = _avg([r["evacuated"] for r in rows])
        avg_health = _avg([r["final_health"] for r in rows])
        avg_total_rew = _avg([r["total_reward"] for r in rows])
        avg_step_rew = _avg([r["mean_step_reward"] for r in rows])
        avg_steps = _avg([r["steps_taken"] for r in rows])
        avg_think = _avg([r["think_rate"] for r in rows])
        avg_parse = _avg([r["parse_rate"] for r in rows])
        max_steps = rows[0]["max_steps"]

        cause_counts: Dict[str, int] = defaultdict(int)
        for r in rows:
            cause_counts[r["cause_of_end"]] += 1

        print(col.format(
            diff,
            f"{evac_rate * 100:.0f}%",
            f"{avg_health:.1f}",
            f"{avg_total_rew:+.2f}",
            f"{avg_step_rew:+.3f}",
            f"{avg_steps:.0f}/{max_steps}",
            f"{avg_think * 100:.0f}%",
            f"{avg_parse * 100:.0f}%",
        ))

        death_n = cause_counts.get("death", 0)
        timeout_n = cause_counts.get("timeout", 0)
        evac_n = cause_counts.get("evacuated", 0)
        n = len(rows)  # rows is non-empty here, so the ratios below are safe

        if avg_parse < 0.80:
            diagnosis.append(
                f"[{diff}] Low parse rate ({avg_parse*100:.0f}%) — "
                "action schema may be unclear; consider simplifying JSON keys."
            )
        if avg_think < 0.50:
            diagnosis.append(
                f"[{diff}] Low think rate ({avg_think*100:.0f}%) — "
                "model is skipping <think> tags; prompt may need stronger CoT instruction."
            )
        if evac_n == 0 and diff == "easy":
            diagnosis.append(
                "[easy] Zero evacuations on easy difficulty — "
                "check exit reachability, narrative clarity, or BFS distance from spawn."
            )
        if death_n / n > 0.80:
            diagnosis.append(
                f"[{diff}] {death_n}/{n} episodes end in death — "
                "damage rates may be too high or smoke/fire proximity warnings are not clear."
            )
        if timeout_n / n > 0.80:
            diagnosis.append(
                f"[{diff}] {timeout_n}/{n} episodes time out — "
                "model may be looping/waiting; exits might be hard to locate from narratives."
            )
        if avg_step_rew < -0.3 and evac_rate < 0.1:
            diagnosis.append(
                f"[{diff}] Very negative mean step reward ({avg_step_rew:+.3f}) with near-zero "
                "success — model may be actively moving into fire; check DangerPenalty trigger conditions."
            )

    print(sep + "\n")

    # Errored episodes never reach the table; say so instead of hiding them.
    if failed:
        print(f" {failed}/{len(results)} episode(s) failed with an error and are excluded above.\n")

    if diagnosis:
        print("DIAGNOSTICS (environment design signals)")
        print("-" * 50)
        for d in diagnosis:
            print(f" • {d}")
        print()
    elif not by_diff:
        # With zero completed episodes there is no evidence either way, so do
        # not claim the environment looks healthy.
        print(" No completed episodes — nothing to diagnose.\n")
    else:
        print(" No red-flag patterns detected — environment appears legible to this model.\n")
|
|
|
|
def save_csv(results: List[Dict[str, Any]], path: Path) -> None:
    """Write one CSV row per episode result to *path*.

    Args:
        results: Episode dicts as returned by run_episode(). Keys outside the
            fixed column list are ignored; missing keys become empty cells.
        path: Destination file; parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    fieldnames = [
        "difficulty", "seed", "evacuated", "cause_of_end",
        "final_health", "total_reward", "mean_step_reward",
        "steps_taken", "max_steps",
        "think_rate", "parse_rate", "format_score_avg", "error",
    ]
    # Explicit UTF-8: the "error" column holds arbitrary exception text, which
    # must not depend on the platform's default encoding.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(results)
    log.info("Results saved → %s", path)
|
|
|
|
| |
| |
| |
|
|
def main() -> None:
    """Command-line entry point: evaluate one model across difficulties and seeds.

    Parses arguments, checks that the Pyre server answers on ``/health``,
    builds the chat model, runs one episode per (difficulty, seed) pair,
    prints the summary table and writes a timestamped CSV into --output-dir.

    Raises:
        SystemExit: On invalid arguments, an unreachable server, or missing
            credentials.
    """
    parser = argparse.ArgumentParser(
        description="Proprietary-LLM baseline eval on the Pyre fire-evacuation environment",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--env-url", default="http://localhost:8000",
        help="Pyre server base URL",
    )
    parser.add_argument(
        "--provider", default="openai",
        choices=["openai", "anthropic", "rits"],
        help="LLM provider",
    )
    parser.add_argument(
        "--model", default="gpt-5.4-nano",
        help="Model name / slug for the chosen provider",
    )
    parser.add_argument(
        "--temperature", type=float, default=0.0,
        help="Sampling temperature (0 = deterministic greedy)",
    )
    parser.add_argument(
        "--difficulties", nargs="+",
        default=["easy", "medium", "hard"],
        choices=["easy", "medium", "hard"],
        help="Difficulty levels to evaluate",
    )
    parser.add_argument(
        "--seeds", type=int, default=3,
        help="Number of random seeds per difficulty level",
    )
    parser.add_argument(
        "--seed-start", type=int, default=1,
        help="Starting seed value (seeds = seed_start, seed_start+1, ...)",
    )
    parser.add_argument(
        "--output-dir", default="./outputs/pyre_evals",
        help="Directory for CSV results and debug traces",
    )
    parser.add_argument(
        "--debug", action="store_true",
        help="Save per-episode LLM traces and initial states to --output-dir/debug/",
    )
    parser.add_argument(
        "--verbose", action="store_true",
        help="Show DEBUG-level per-step logs",
    )
    args = parser.parse_args()

    # A non-positive seed count would silently run nothing and still write an
    # empty report; reject it up front.
    if args.seeds < 1:
        parser.error("--seeds must be at least 1")

    if args.verbose:
        log.setLevel(logging.DEBUG)

    # Iterating DIFFICULTY_CONFIGS keeps the canonical order and drops
    # duplicates given on the command line.
    diffs_to_run = [d for d in DIFFICULTY_CONFIGS if d["difficulty"] in args.difficulties]
    if not diffs_to_run:
        raise SystemExit(
            f"No matching difficulties. Available: {[d['difficulty'] for d in DIFFICULTY_CONFIGS]}"
        )

    log.info("Model : %s (provider=%s, temperature=%.1f)", args.model, args.provider, args.temperature)
    log.info("Env URL : %s", args.env_url)
    log.info("Difficulties: %s", [d["difficulty"] for d in diffs_to_run])
    log.info("Seeds : %d (%d–%d)", args.seeds, args.seed_start, args.seed_start + args.seeds - 1)

    # Fail fast if the environment server is down, before any LLM spend.
    try:
        health = requests.get(f"{args.env_url}/health", timeout=5)
        health.raise_for_status()
        log.info("Server : healthy ✓")
    except Exception as exc:
        raise SystemExit(f"Server not reachable at {args.env_url}: {exc}") from exc

    llm = _build_llm(args.provider, args.model, args.temperature)

    results: List[Dict[str, Any]] = []
    seeds = list(range(args.seed_start, args.seed_start + args.seeds))
    total = len(diffs_to_run) * len(seeds)
    done_count = 0
    debug_dir = Path(args.output_dir) / "debug" if args.debug else None

    for diff_cfg in diffs_to_run:
        difficulty = diff_cfg["difficulty"]
        max_steps = diff_cfg["max_steps"]

        for seed in seeds:
            done_count += 1
            log.info(
                "[%d/%d] difficulty=%-8s seed=%d",
                done_count, total, difficulty, seed,
            )

            result = run_episode(
                llm=llm,
                env_url=args.env_url,
                seed=seed,
                difficulty=difficulty,
                max_steps=max_steps,
                debug_dir=debug_dir,
            )
            results.append(result)

            if result.get("error"):
                log.warning(" ERROR: %s", result["error"])
            else:
                evac_sym = "✓ EVACUATED" if result["evacuated"] else f"✗ {result['cause_of_end'].upper()}"
                # %.0f rounds the percentages; %d would truncate float noise
                # (e.g. 0.57 * 100 → 56) and disagree with the summary table.
                log.info(
                    " %-14s health=%5.1f reward=%+.2f steps=%d/%d think=%.0f%% parse=%.0f%%",
                    evac_sym,
                    result["final_health"],
                    result["total_reward"],
                    result["steps_taken"],
                    max_steps,
                    result["think_rate"] * 100,
                    result["parse_rate"] * 100,
                )

    print_summary(results, model_name=f"{args.provider}/{args.model}")

    # Model names may contain "/" or "."; reduce them to a filename-safe slug.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_slug = re.sub(r"[^a-zA-Z0-9_-]", "_", args.model)
    csv_path = Path(args.output_dir) / f"pyre_{model_slug}_{timestamp}.csv"
    save_csv(results, csv_path)
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|