"""OpenAI-powered baseline runner for the warehouse fulfillment environment.

The runner asks an OpenAI model for one command per environment step, plays
every task in ``TASKS`` for one or more seeds, and formats the graded results
as a plain-text report.
"""

from __future__ import annotations

import json
import os
from typing import Any, Dict, List

from .env import WarehouseFulfillmentEnv
from .graders import grade_episode
from .models import BaselineCommand, WarehouseObservation, WarehouseState, model_to_dict
from .tasks import TASKS

try:
    from openai import OpenAI
except ImportError:
    # The dependency is optional at import time; the run_* entry points
    # raise a descriptive error when it is missing.
    OpenAI = None


SYSTEM_PROMPT = """You control a warehouse fulfillment robot.
Return exactly one JSON object with:
- command: one of turn_left, turn_right, move_forward, scan_bin, pick_item, pack_item, recharge, rest, wait
- rationale: a short sentence

Objective:
- Complete the customer order as efficiently as possible.
- Use scans before picks when the task requires verified bins.
- Recharge before battery depletion if needed.
- Avoid invalid actions and unnecessary wandering.

Advanced mechanics (active on harder tasks):
- Obstacles: some cells are impassable. If front_cell says "obstacle", turn to find another route.
- Item weight: items have weight. If an item exceeds your carry capacity, you cannot pick it. Heavier items drain more battery while moving.
- Stamina: movement costs stamina. When stamina hits 0, movement costs double battery. Use the "rest" action at the rest area to restore stamina.
- Money: packing correct items earns money; wrong packs lose money. Hit the profit target if set.
"""

_MISSING_OPENAI_MESSAGE = "The openai package is required. Install project dependencies first."


def _observation_prompt(observation: WarehouseObservation, state: WarehouseState) -> str:
    """Serialize the current observation and a state summary as the user message.

    Args:
        observation: Latest observation returned by the environment.
        state: Current environment state.

    Returns:
        A pretty-printed JSON document with sorted keys. Only the six most
        recent entries of ``state.action_history`` are included.
    """
    return json.dumps(
        {
            "task_id": state.task_id,
            "difficulty": state.difficulty,
            "mission": observation.mission,
            "observation": model_to_dict(observation),
            "state_summary": {
                "step_count": state.step_count,
                "max_steps": state.max_steps,
                "battery_level": state.battery_level,
                "carrying": state.carrying,
                "scanned_bins": state.scanned_bins,
                "completion_ratio": state.completion_ratio,
                "recent_actions": state.action_history[-6:],
            },
        },
        indent=2,
        sort_keys=True,
    )


def _response_schema() -> Dict[str, Any]:
    """Build the ``text.format`` payload requesting a ``BaselineCommand`` JSON object."""
    # NOTE(review): strict JSON-schema output is understood to require
    # "additionalProperties": false and every property listed as required --
    # confirm that BaselineCommand's generated schema satisfies this.
    return {
        "type": "json_schema",
        "name": "warehouse_action",
        "strict": True,
        "schema": BaselineCommand.model_json_schema(),
    }


def _pick_action(
    client: Any,
    model: str,
    observation: WarehouseObservation,
    state: WarehouseState,
) -> BaselineCommand:
    """Ask the model for the next command.

    Args:
        client: Object exposing ``responses.create`` (an ``OpenAI`` client).
        model: Model name forwarded to the API.
        observation: Latest observation returned by the environment.
        state: Current environment state.

    Returns:
        The command parsed from the model's JSON reply.

    Raises:
        RuntimeError: If the response carries no text output.
        json.JSONDecodeError: If the text output is not valid JSON.
    """
    response = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": _observation_prompt(observation, state)},
        ],
        text={"format": _response_schema()},
    )
    # ``output_text`` may be absent *or* present but None; both must reach the
    # explicit error below rather than fail on ``None.strip()``.
    content = (getattr(response, "output_text", None) or "").strip()
    if not content:
        raise RuntimeError("OpenAI response did not contain text output.")
    payload = json.loads(content)
    return BaselineCommand(**payload)


def run_task(task_id: str, model: str, seed: int = 7) -> Dict[str, Any]:
    """Play one episode of ``task_id`` with the model choosing every action.

    Args:
        task_id: Identifier of the task to run.
        model: Model name forwarded to the API.
        seed: Seed passed to the environment constructor and to ``reset``.

    Returns:
        A dict with ``task_id``, ``reward`` (rounded to 4 places), ``score``
        (from ``grade_episode``), ``steps`` and ``success`` (1.0 or 0.0).

    Raises:
        RuntimeError: If the ``openai`` package is not installed.
    """
    if OpenAI is None:
        # Without this guard a direct call fails with an opaque
        # "'NoneType' object is not callable".
        raise RuntimeError(_MISSING_OPENAI_MESSAGE)

    env = WarehouseFulfillmentEnv(task_id=task_id, seed=seed)
    observation = env.reset(task_id=task_id, seed=seed)
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    done = False
    while not done:
        state = env.state()
        action = _pick_action(client, model, observation, state)
        observation, _reward, done, _info = env.step(action)

    final_state = env.state()
    return {
        "task_id": task_id,
        "reward": round(final_state.total_reward, 4),
        "score": grade_episode(final_state),
        "steps": float(final_state.step_count),
        "success": 1.0 if final_state.success else 0.0,
    }


def _aggregate_seed_results(
    task_id: str,
    task_results: List[Dict[str, Any]],
    seeds: List[int],
) -> Dict[str, Any]:
    """Collapse the per-seed results of one task into summary statistics.

    ``task_results`` must be non-empty. ``score_std`` is the population
    standard deviation (divides by the number of results).
    """
    count = len(task_results)
    scores = [entry["score"] for entry in task_results]
    rewards = [entry["reward"] for entry in task_results]
    steps = [entry["steps"] for entry in task_results]
    successes = [entry["success"] for entry in task_results]

    # Compute the mean once instead of once per element of the variance sum.
    mean_score = sum(scores) / count
    variance = sum((value - mean_score) ** 2 for value in scores) / count

    return {
        "task_id": task_id,
        "score": round(mean_score, 4),
        "score_std": round(variance ** 0.5, 4),
        "score_min": round(min(scores), 4),
        "score_max": round(max(scores), 4),
        "reward": round(sum(rewards) / count, 4),
        "steps": round(sum(steps) / count, 1),
        "success": round(sum(successes) / count, 2),
        "num_seeds": len(seeds),
        # Copy so each result owns its list rather than sharing one object.
        "seeds": list(seeds),
    }


def run_baseline(
    model: str | None = None,
    seed: int = 7,
    seeds: List[int] | None = None,
) -> List[Dict[str, Any]]:
    """Run baseline evaluation across one or multiple seeds.

    Args:
        model: Model name to use (defaults to the OPENAI_MODEL env var, then
            "gpt-4.1-mini").
        seed: Single seed to use (ignored if seeds is provided).
        seeds: List of seeds for multi-seed evaluation
            (e.g., [7, 42, 123, 456, 789]).

    Returns:
        List of results, one per task. With exactly one seed each entry is the
        raw ``run_task`` result; with several seeds each entry holds the mean
        score/reward/steps/success plus score std/min/max and the seeds used.

    Raises:
        RuntimeError: If the ``openai`` package is missing or OPENAI_API_KEY
            is not set.
        ValueError: If ``seeds`` is provided but empty.
    """
    if OpenAI is None:
        raise RuntimeError(_MISSING_OPENAI_MESSAGE)
    if not os.environ.get("OPENAI_API_KEY"):
        raise RuntimeError("OPENAI_API_KEY is not set.")

    resolved_model = model or os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")
    eval_seeds = list(seeds) if seeds is not None else [seed]
    if not eval_seeds:
        # An empty list previously surfaced as a ZeroDivisionError while
        # averaging zero results.
        raise ValueError("seeds must contain at least one seed.")

    if len(eval_seeds) == 1:
        # Single-seed evaluation (backward compatible): raw per-task results.
        return [
            run_task(task_id, model=resolved_model, seed=eval_seeds[0])
            for task_id in TASKS
        ]

    # Multi-seed evaluation: one aggregated entry per task.
    results = []
    for task_id in TASKS:
        task_results = [
            run_task(task_id, model=resolved_model, seed=current_seed)
            for current_seed in eval_seeds
        ]
        results.append(_aggregate_seed_results(task_id, task_results, eval_seeds))
    return results


def format_report(results: List[Dict[str, Any]], model: str) -> str:
    """Render results from ``run_baseline`` as a pipe-separated text table.

    Args:
        results: Per-task results; entries containing ``score_std`` are
            treated as multi-seed aggregates.
        model: Model name shown in the report header.

    Returns:
        The report as a single newline-joined string. For empty ``results``
        the single-seed header is emitted and the mean is reported as "n/a".
    """
    lines = [f"model | {model}"]

    # Multi-seed results are recognised by the presence of the score_std key.
    is_multiseed = bool(results) and "score_std" in results[0]

    if is_multiseed:
        lines.append(f"seeds | {results[0]['num_seeds']} seeds: {results[0]['seeds']}")
        lines.append("task_id | score (±std) | min | max | reward | steps | success")
        for result in results:
            lines.append(
                f"{result['task_id']} | {result['score']:.4f} (±{result['score_std']:.4f}) | "
                f"{result['score_min']:.4f} | {result['score_max']:.4f} | "
                f"{result['reward']:.4f} | {result['steps']:.1f} | {result['success']:.2f}"
            )
    else:
        lines.append("task_id | score | reward | steps | success")
        for result in results:
            lines.append(
                f"{result['task_id']} | {result['score']:.4f} | "
                f"{result['reward']:.4f} | {int(result['steps'])} | {int(result['success'])}"
            )

    if results:
        mean_score = sum(result["score"] for result in results) / len(results)
        lines.append(f"mean_score | {mean_score:.4f}")
    else:
        # Averaging zero results used to raise ZeroDivisionError.
        lines.append("mean_score | n/a")
    return "\n".join(lines)


def _parse_seeds(raw: str) -> List[int]:
    """Parse a comma-separated seed list, skipping blank tokens.

    Raises:
        ValueError: If a non-blank token is not an integer.
    """
    return [int(token) for token in (part.strip() for part in raw.split(",")) if token]


def _main() -> None:
    """Run the baseline using OPENAI_MODEL / EVAL_SEEDS and print the report."""
    selected_model = os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")

    # Multi-seed evaluation: set EVAL_SEEDS env var or use default.
    eval_seeds = _parse_seeds(os.environ.get("EVAL_SEEDS", "7,42,123,456,789"))

    print(f"Running baseline with {len(eval_seeds)} seeds: {eval_seeds}")
    results = run_baseline(model=selected_model, seeds=eval_seeds)
    print(format_report(results, model=selected_model))


if __name__ == "__main__":
    _main()