Spaces:
Sleeping
Sleeping
| """ | |
| OpenAI-powered baseline runner for the warehouse fulfillment environment. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from typing import Any, Dict, List | |
| from .env import WarehouseFulfillmentEnv | |
| from .graders import grade_episode | |
| from .models import BaselineCommand, WarehouseObservation, WarehouseState, model_to_dict | |
| from .tasks import TASKS | |
| try: | |
| from openai import OpenAI | |
| except ImportError: | |
| OpenAI = None | |
# System prompt sent as the "system" message on every model call made by
# _pick_action. It lists the allowed command names and asks the model to reply
# with a single JSON object holding "command" and "rationale".
| SYSTEM_PROMPT = """You control a warehouse fulfillment robot. | |
| Return exactly one JSON object with: | |
| - command: one of turn_left, turn_right, move_forward, scan_bin, pick_item, pack_item, recharge, rest, wait | |
| - rationale: a short sentence | |
| Objective: | |
| - Complete the customer order as efficiently as possible. | |
| - Use scans before picks when the task requires verified bins. | |
| - Recharge before battery depletion if needed. | |
| - Avoid invalid actions and unnecessary wandering. | |
| Advanced mechanics (active on harder tasks): | |
| - Obstacles: some cells are impassable. If front_cell says "obstacle", turn to find another route. | |
| - Item weight: items have weight. If an item exceeds your carry capacity, you cannot pick it. | |
| Heavier items drain more battery while moving. | |
| - Stamina: movement costs stamina. When stamina hits 0, movement costs double battery. | |
| Use the "rest" action at the rest area to restore stamina. | |
| - Money: packing correct items earns money; wrong packs lose money. Hit the profit target if set. | |
| """ | |
def _observation_prompt(observation: WarehouseObservation, state: WarehouseState) -> str:
    """Build the JSON user message describing the current step.

    Args:
        observation: Latest observation; its ``mission`` is surfaced at the top
            level and the whole object is embedded via ``model_to_dict``.
        state: Environment state supplying the task id, difficulty and the
            values copied into ``state_summary``.

    Returns:
        A JSON string indented by two spaces with keys sorted alphabetically.
    """
    prompt_payload: Dict[str, Any] = {
        "task_id": state.task_id,
        "difficulty": state.difficulty,
        "mission": observation.mission,
        "observation": model_to_dict(observation),
    }
    prompt_payload["state_summary"] = {
        "step_count": state.step_count,
        "max_steps": state.max_steps,
        "battery_level": state.battery_level,
        "carrying": state.carrying,
        "scanned_bins": state.scanned_bins,
        "completion_ratio": state.completion_ratio,
        # Only the six most recent actions are included.
        "recent_actions": state.action_history[-6:],
    }
    return json.dumps(prompt_payload, indent=2, sort_keys=True)
def _response_schema() -> Dict[str, Any]:
    """Return the structured-output format descriptor for model calls.

    Returns:
        A dict of type ``json_schema`` named ``warehouse_action`` with strict
        mode enabled, whose ``schema`` is generated from ``BaselineCommand``.
    """
    format_spec: Dict[str, Any] = {
        "type": "json_schema",
        "name": "warehouse_action",
        "strict": True,
    }
    format_spec["schema"] = BaselineCommand.model_json_schema()
    return format_spec
def _pick_action(client: Any, model: str, observation: WarehouseObservation, state: WarehouseState) -> BaselineCommand:
    """Ask the model for the next command and parse its JSON reply.

    Args:
        client: Object exposing ``responses.create(...)`` (an OpenAI client).
        model: Model name forwarded to the API call.
        observation: Latest observation, serialized into the user message.
        state: Current environment state, serialized into the user message.

    Returns:
        The ``BaselineCommand`` built from the decoded JSON object.

    Raises:
        RuntimeError: If the response carries no text output.
        json.JSONDecodeError: If the text output is not valid JSON.
    """
    response = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": _observation_prompt(observation, state)},
        ],
        text={"format": _response_schema()},
    )
    # Treat a missing attribute and an explicit ``None`` alike: a getattr
    # default alone does not cover ``None``, and ``None.strip()`` would raise
    # AttributeError instead of the intended RuntimeError below.
    content = (getattr(response, "output_text", None) or "").strip()
    if not content:
        raise RuntimeError("OpenAI response did not contain text output.")
    payload = json.loads(content)
    return BaselineCommand(**payload)
def run_task(task_id: str, model: str, seed: int = 7) -> Dict[str, Any]:
    """Run one episode of a task with the model choosing every action.

    Args:
        task_id: Identifier of the task to run.
        model: Model name passed to each action request.
        seed: Seed used both to construct and to reset the environment.

    Returns:
        A dict with ``task_id``, ``reward`` (total reward rounded to 4 places),
        ``score`` (from ``grade_episode``), ``steps`` and ``success`` (1.0/0.0).

    Raises:
        RuntimeError: If the ``openai`` package could not be imported.
    """
    # The module falls back to ``OpenAI = None`` when the import fails; fail
    # with the same message run_baseline uses instead of an opaque
    # "'NoneType' object is not callable".
    if OpenAI is None:
        raise RuntimeError("The openai package is required. Install project dependencies first.")
    env = WarehouseFulfillmentEnv(task_id=task_id, seed=seed)
    observation = env.reset(task_id=task_id, seed=seed)
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
    done = False
    while not done:
        state = env.state()
        action = _pick_action(client, model, observation, state)
        # Per-step reward and info are unused; totals are read from the final state.
        observation, _reward, done, _info = env.step(action)
    final_state = env.state()
    return {
        "task_id": task_id,
        "reward": round(final_state.total_reward, 4),
        "score": grade_episode(final_state),
        "steps": float(final_state.step_count),
        "success": 1.0 if final_state.success else 0.0,
    }
def _aggregate_seed_runs(task_id: str, task_results: List[Dict[str, Any]], eval_seeds: List[int]) -> Dict[str, Any]:
    """Collapse the per-seed results of one task into a single summary row."""
    count = len(task_results)
    scores = [run["score"] for run in task_results]
    # Compute the mean once instead of once per element of the std sum.
    mean_score = sum(scores) / count
    # Population variance: divides by n, not n - 1.
    variance = sum((score - mean_score) ** 2 for score in scores) / count
    return {
        "task_id": task_id,
        "score": round(mean_score, 4),
        "score_std": round(variance ** 0.5, 4),
        "score_min": round(min(scores), 4),
        "score_max": round(max(scores), 4),
        "reward": round(sum(run["reward"] for run in task_results) / count, 4),
        "steps": round(sum(run["steps"] for run in task_results) / count, 1),
        "success": round(sum(run["success"] for run in task_results) / count, 2),
        "num_seeds": len(eval_seeds),
        # Copy so result rows do not alias each other or the caller's list.
        "seeds": list(eval_seeds),
    }


def run_baseline(model: str | None = None, seed: int = 7, seeds: List[int] | None = None) -> List[Dict[str, Any]]:
    """Run baseline evaluation across one or multiple seeds.

    Args:
        model: Model name to use (defaults to the OPENAI_MODEL env var, then
            "gpt-4.1-mini").
        seed: Single seed to use (ignored if seeds is provided).
        seeds: Seeds for multi-seed evaluation (e.g., [7, 42, 123, 456, 789]).

    Returns:
        List of results, one per task in TASKS. With exactly one seed each
        entry is the raw ``run_task`` result; with several seeds each entry
        holds the mean score/reward/steps/success plus score std/min/max,
        ``num_seeds`` and ``seeds``.

    Raises:
        ValueError: If seeds is provided but empty.
        RuntimeError: If the openai package is missing or OPENAI_API_KEY is
            not set.
    """
    eval_seeds = list(seeds) if seeds is not None else [seed]
    # An empty seed list would otherwise reach the averaging code and divide by zero.
    if not eval_seeds:
        raise ValueError("seeds must contain at least one seed.")
    if OpenAI is None:
        raise RuntimeError("The openai package is required. Install project dependencies first.")
    if not os.environ.get("OPENAI_API_KEY"):
        raise RuntimeError("OPENAI_API_KEY is not set.")
    resolved_model = model or os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")
    if len(eval_seeds) == 1:
        # Single-seed evaluation (backward compatible)
        return [run_task(task_id, model=resolved_model, seed=eval_seeds[0]) for task_id in TASKS]
    # Multi-seed evaluation
    results = []
    for task_id in TASKS:
        task_results = [run_task(task_id, model=resolved_model, seed=s) for s in eval_seeds]
        results.append(_aggregate_seed_runs(task_id, task_results, eval_seeds))
    return results
def format_report(results: List[Dict[str, Any]], model: str) -> str:
    """Render baseline results as a pipe-delimited plain-text report.

    Args:
        results: Rows as returned by ``run_baseline``. Rows carrying a
            ``score_std`` key are rendered in the multi-seed layout.
        model: Model name shown on the first line.

    Returns:
        The report lines joined with newlines. The last line is the mean of
        the per-row ``score`` values, or ``mean_score | n/a`` when there are
        no rows.
    """
    lines = [f"model | {model}"]
    # Check if multi-seed results (has score_std key)
    is_multiseed = bool(results) and "score_std" in results[0]
    if is_multiseed:
        lines.append(f"seeds | {results[0]['num_seeds']} seeds: {results[0]['seeds']}")
        lines.append("task_id | score (±std) | min | max | reward | steps | success")
        for result in results:
            lines.append(
                f"{result['task_id']} | {result['score']:.4f} (±{result['score_std']:.4f}) | "
                f"{result['score_min']:.4f} | {result['score_max']:.4f} | "
                f"{result['reward']:.4f} | {result['steps']:.1f} | {result['success']:.2f}"
            )
    else:
        lines.append("task_id | score | reward | steps | success")
        for result in results:
            lines.append(
                f"{result['task_id']} | {result['score']:.4f} | "
                f"{result['reward']:.4f} | {int(result['steps'])} | {int(result['success'])}"
            )
    if results:
        mean_score = sum(result["score"] for result in results) / len(results)
        lines.append(f"mean_score | {mean_score:.4f}")
    else:
        # Nothing to average: say so instead of dividing by zero.
        lines.append("mean_score | n/a")
    return "\n".join(lines)
if __name__ == "__main__":
    selected_model = os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")
    # Multi-seed evaluation: set EVAL_SEEDS env var or use default
    default_seeds_str = "7,42,123,456,789"
    eval_seeds_str = os.environ.get("EVAL_SEEDS", default_seeds_str)
    # Skip blank tokens so values such as "7,42," or "7,,42" do not crash int().
    eval_seeds = [int(token) for token in eval_seeds_str.split(",") if token.strip()]
    if not eval_seeds:
        # EVAL_SEEDS was set but held no seeds; fall back to the defaults
        # rather than running an evaluation over zero seeds.
        eval_seeds = [int(token) for token in default_seeds_str.split(",")]
    print(f"Running baseline with {len(eval_seeds)} seeds: {eval_seeds}")
    results = run_baseline(model=selected_model, seeds=eval_seeds)
    print(format_report(results, model=selected_model))