Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| AgentOps Gym — Baseline inference script. | |
| Follows the exact pattern from the official OpenEnv sample inference.py. | |
| STDOUT FORMAT: | |
| [START] task=<task> env=<benchmark> model=<model> | |
| [STEP] step=<n> action=<str> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...> | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| from typing import Dict, List, Optional | |
| # Load .env if present (local dev only) | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except ImportError: | |
| pass | |
| from openai import OpenAI | |
| # Ensure package importable from any working directory | |
| import pathlib, sys as _sys | |
| _root = pathlib.Path(__file__).resolve().parent | |
| _parent = _root.parent | |
| for _p in (_root, _parent): | |
| if str(_p) not in _sys.path: | |
| _sys.path.insert(0, str(_p)) | |
| from agentops_gym.client import AgentOpsEnv | |
| from agentops_gym.models import ToolCall | |
| # --------------------------------------------------------------------------- | |
| # Configuration — exactly matching the official sample pattern | |
| # --------------------------------------------------------------------------- | |
# Docker image name passed to AgentOpsEnv.from_docker_image(); None when unset.
IMAGE_NAME = os.environ.get("IMAGE_NAME")
# Credential for the OpenAI-compatible client: HF_TOKEN wins, API_KEY is the fallback.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY")
# An unset or empty variable falls back to the default on the right.
API_BASE_URL = os.environ.get("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.environ.get("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"

# Value printed as env=<benchmark> on the [START] line.
BENCHMARK = "agentops-gym"
# Upper bound on environment steps per episode.
MAX_STEPS = 10
# Sampling settings forwarded to the chat-completion request.
TEMPERATURE = 0.3
MAX_TOKENS = 1024
# An episode counts as solved when its clamped score reaches this value.
SUCCESS_SCORE_THRESHOLD = 0.5
# Task ids run, in order, by async_main().
ALL_TASKS = [
    "task_1",
    "task_2",
    "task_3",
    "task_4",
]
| # --------------------------------------------------------------------------- | |
| # System prompt | |
| # --------------------------------------------------------------------------- | |
# System message sent with every model request. It lists the available tools
# and pins the reply format to a single JSON object so that
# extract_tool_call() can parse it.
SYSTEM_PROMPT = """\
You are an expert software engineer agent. You solve coding tasks by calling tools.
Available tools:
FileRead — Read a file. Parameters: {"filename": "path/to/file.py"}
FileWrite — Write a file. Parameters: {"filename": "...", "content": "..."}
Grep — Search files. Parameters: {"pattern": "regex_or_string"}
Bash — Simulated shell. Parameters: {"command": "lint main.py"}
WebSearch — Search docs. Parameters: {"query": "python lru_cache"}
TodoWrite — Record a plan. Parameters: {"plan": "1. Do X\\n2. Do Y"}
RULES:
1. Respond ONLY with a single JSON object — no markdown, no extra text.
2. Format: {"tool": "ToolName", "parameters": {...}, "reasoning": "why"}
3. Minimise total tool calls — efficiency matters.
4. For hard tasks: call TodoWrite FIRST to plan, then act.
5. Never repeat the exact same tool + parameters consecutively.
Example:
{"tool": "Grep", "parameters": {"pattern": "def fetch"}, "reasoning": "Find function"}
"""
| # --------------------------------------------------------------------------- | |
| # Stdout log helpers — must match spec exactly | |
| # --------------------------------------------------------------------------- | |
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens an episode's stdout record.

    Args:
        task: Task identifier shown as ``task=``.
        env: Benchmark name shown as ``env=``.
        model: Model name shown as ``model=``.
    """
    line = "[START] task={} env={} model={}".format(task, env, model)
    print(line, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one single-line ``[STEP]`` record for an environment step.

    Args:
        step: 1-based step number.
        action: Text describing the tool call; newlines become spaces and the
            result is cut to 200 characters.
        reward: Step reward, printed with two decimals.
        done: Whether the episode ended on this step; printed lower-case.
        error: Error text, or ``None``/empty for ``error=null``.
    """
    action_val = str(action).replace("\n", " ")[:200]
    if error:
        # Exception messages can span several lines; flatten them so the
        # record stays on one line like the rest of the [STEP] format.
        error_val = (
            str(error).replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
        )
    else:
        error_val = "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action_val} "
        f"reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` line that closes an episode's stdout record.

    Args:
        success: Printed lower-case as ``success=``.
        steps: Number of steps, printed as ``steps=``.
        score: Final score; clamped into [0.001, 0.999] before printing with
            three decimals.
        rewards: Per-step rewards, printed comma-separated with two decimals.
    """
    # The reported score has to stay strictly inside (0, 1).
    clamped = max(0.001, min(0.999, score))
    formatted_rewards = ",".join("{:.2f}".format(value) for value in rewards)
    fields = [
        f"success={str(success).lower()}",
        f"steps={steps}",
        f"score={clamped:.3f}",
        f"rewards={formatted_rewards}",
    ]
    print("[END] " + " ".join(fields), flush=True)
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def build_prompt(obs_data: Dict, history: List[str]) -> str:
    """Render the user message for the next model request.

    Args:
        obs_data: Observation as a plain dict. The keys read here are
            ``task_description``, ``visible_files``, ``last_tool_result``,
            ``message``, ``step_count`` and ``metadata``; each may be absent.
        history: One-line summaries of earlier steps; only the last three
            are included.

    Returns:
        The prompt text, with sections separated by blank lines.
    """
    # "metadata" can be present with a None value, which a .get() default
    # does not cover; `or {}` handles both the missing and the None case.
    meta = obs_data.get("metadata") or {}

    parts = [
        f"TASK: {obs_data.get('task_description', '')}",
        f"\nVisible files: {obs_data.get('visible_files', [])}",
    ]
    last = obs_data.get("last_tool_result")
    if last:
        # Cap the tool output so one large result cannot dominate the prompt.
        parts.append(f"\nLast tool result:\n{str(last)[:1500]}")
    if history:
        parts.append(f"\nHistory (last 3): {history[-3:]}")
    if obs_data.get("message"):
        parts.append(f"\nEnv message: {obs_data['message']}")
    parts.append(
        f"\nStep {obs_data.get('step_count', 0)}, "
        f"steps remaining: {meta.get('steps_remaining', '?')}"
    )
    parts.append("\nRespond with a single JSON tool call:")
    return "\n".join(parts)
def extract_tool_call(text: str) -> Optional[Dict]:
    """Pull the first JSON tool call out of a raw model reply.

    A tool call is a JSON object that has a ``"tool"`` key. The reply may be
    bare JSON, JSON inside a Markdown code fence, or JSON surrounded by prose.

    Args:
        text: Raw reply text from the model.

    Returns:
        The decoded object, or ``None`` when no object with a ``"tool"`` key
        can be found.
    """
    text = text.strip()

    # Prefer the first fenced block that looks like a JSON object.
    if "```" in text:
        for block in text.split("```"):
            block = block.strip()
            # Drop a leading "json" language tag as a prefix; str.lstrip("json")
            # would strip any run of the characters j/s/o/n instead.
            if block[:4].lower() == "json":
                block = block[4:].strip()
            if block.startswith("{"):
                text = block
                break

    # Fast path: the whole text is one JSON document.
    try:
        obj = json.loads(text)
    except json.JSONDecodeError:
        obj = None
    # Valid JSON may be a list, number or string; only objects qualify.
    if isinstance(obj, dict) and "tool" in obj:
        return obj

    # Fallback: try to decode an object starting at each "{". Unlike a
    # brace-free regex, this copes with the nested "parameters" object.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict) and "tool" in obj:
            return obj
        start = text.find("{", start + 1)
    return None
def get_model_action(client: OpenAI, obs_data: Dict, history: List[str]) -> Optional[Dict]:
    """Ask the model for the next tool call.

    Sends ``SYSTEM_PROMPT`` plus a prompt built from ``obs_data`` and
    ``history`` as one chat-completion request and parses the reply with
    ``extract_tool_call``.

    Returns:
        The parsed tool call, or ``None`` when the reply holds no tool call
        or anything in the request/parse path raises (the error is printed
        as a ``[DEBUG]`` line).
    """
    try:
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": build_prompt(obs_data, history)},
        ]
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            max_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
        )
        reply = response.choices[0].message.content
        if not reply:
            # A missing or empty content field is treated as an empty reply.
            reply = ""
        return extract_tool_call(reply.strip())
    except Exception as exc:
        print(f"[DEBUG] LLM error: {exc}", flush=True)
        return None
| # --------------------------------------------------------------------------- | |
| # Single episode runner | |
| # --------------------------------------------------------------------------- | |
def _observation_to_dict(observation) -> Dict:
    """Return the observation as a plain dict via ``model_dump()`` or ``dict()``."""
    if hasattr(observation, "model_dump"):
        return observation.model_dump()
    return observation.dict()


async def run_episode(env: AgentOpsEnv, client: OpenAI, task_id: str) -> Dict:
    """Run one task episode and report it on stdout.

    Resets ``env`` for ``task_id``, then for up to ``MAX_STEPS`` steps asks
    the model for a tool call, executes it with ``env.step`` and prints a
    ``[STEP]`` line. A ``[START]`` line is printed first and an ``[END]``
    line is always printed last, even when the episode fails.

    The score is read from the last observation's metadata: ``grader_score``,
    or ``cumulative_reward`` when that is missing or zero, clamped into
    [0.001, 0.999]. An exception outside ``env.step`` is printed as a
    ``[DEBUG]`` line and yields a score of 0.001.

    Args:
        env: Environment client to reset and step.
        client: OpenAI-compatible client used to query the model.
        task_id: Identifier of the task to run.

    Returns:
        A dict with keys ``task_id``, ``score``, ``steps``, ``success`` and
        ``rewards``.
    """
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.001
    success = False
    obs_data: Dict = {}
    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = await env.reset(seed=None, task_id=task_id)
        obs_data = _observation_to_dict(result.observation)
        for step in range(1, MAX_STEPS + 1):
            if result.done or obs_data.get("done", False):
                break
            # When the model gives no usable tool call, fall back to a Grep
            # so the episode still advances.
            tool_call = get_model_action(client, obs_data, history) or {
                "tool": "Grep",
                "parameters": {"pattern": "def "},
                "reasoning": "fallback",
            }
            tool = tool_call.get("tool", "Grep")
            params = tool_call.get("parameters", {})
            reasoning = tool_call.get("reasoning", "")
            action_str = f"{tool}({json.dumps(params)})"
            try:
                result = await env.step(
                    ToolCall(tool=tool, parameters=params, reasoning=reasoning)
                )
            except Exception as e:
                # A failed step ends the episode. It is logged, but it is not
                # added to `rewards` or counted in `steps_taken`.
                log_step(step=step, action=action_str, reward=0.0, done=True, error=str(e))
                break
            obs_data = _observation_to_dict(result.observation)
            reward = float(result.reward or 0.0)
            done = bool(result.done)
            rewards.append(reward)
            steps_taken = step
            history.append(f"Step {step}: {action_str} → reward {reward:.2f}")
            log_step(step=step, action=action_str, reward=reward, done=done, error=None)
            if done:
                break
        # "metadata" can be present with a None value; without `or {}` that
        # would raise here and discard the episode's real score.
        meta = obs_data.get("metadata") or {}
        score = float(meta.get("grader_score") or 0.0)
        if score == 0.0:
            score = float(meta.get("cumulative_reward") or 0.0)
        score = max(0.001, min(0.999, score))
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as e:
        print(f"[DEBUG] Episode error for {task_id}: {e}", flush=True)
        score = 0.001
    finally:
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
    return {
        "task_id": task_id,
        "score": score,
        "steps": steps_taken,
        "success": success,
        "rewards": rewards,
    }
| # --------------------------------------------------------------------------- | |
| # Main — exactly matching official sample pattern | |
| # --------------------------------------------------------------------------- | |
async def async_main() -> None:
    """Run every task in ``ALL_TASKS`` and print a summary table.

    Builds the OpenAI-compatible client from the module-level ``API_KEY`` and
    ``API_BASE_URL``, starts the environment from ``IMAGE_NAME``, runs one
    episode per task inside ``async with env`` and finally prints per-task
    scores, the average score and the solved count.
    """
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    # from_docker_image is a coroutine and has to be awaited.
    env = await AgentOpsEnv.from_docker_image(IMAGE_NAME)

    banner = "=" * 60
    print(banner, flush=True)
    print("AgentOps Gym — Baseline Inference", flush=True)
    print(f"Model: {MODEL_NAME} | Image: {IMAGE_NAME}", flush=True)
    print(banner, flush=True)

    results = []
    try:
        async with env:
            for task_id in ALL_TASKS:
                print("─" * 40, flush=True)
                result = await run_episode(env, client, task_id)
                results.append(result)
    except Exception as e:
        # Best effort: episodes that already finished are still summarised.
        print(f"[DEBUG] Cleanup error (non-fatal): {e}", flush=True)

    # Summary
    total = sum(r["score"] for r in results)
    solved = sum(1 for r in results if r["success"])
    avg = total / len(results) if results else 0.0
    print(banner, flush=True)
    print("BASELINE SUMMARY", flush=True)
    print(banner, flush=True)
    for r in results:
        status = "✓ PASS" if r["success"] else "✗ FAIL"
        print(f" {r['task_id']:>8} score={r['score']:.3f} steps={r['steps']:2d} {status}", flush=True)
    print(f"\n Average score: {avg:.3f}", flush=True)
    print(f" Solved: {solved} / {len(results)}", flush=True)
    print(banner, flush=True)


if __name__ == "__main__":
    asyncio.run(async_main())