#!/usr/bin/env python3
"""
AgentOps Gym — Baseline inference script.

Follows the exact pattern from the official OpenEnv sample inference.py.

STDOUT FORMAT (one record per line):
    [START] task=<task_id> env=<benchmark> model=<model_name>
    [STEP]  step=<n> action=<action> reward=<0.00> done=<true|false> error=<msg|null>
    [END]   success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...>
"""

import asyncio
import json
import os
import re
from typing import Dict, List, Optional

# Load .env if present (local dev only)
try:
    from dotenv import load_dotenv

    load_dotenv()
except ImportError:
    pass

from openai import OpenAI

# Ensure package importable from any working directory
import pathlib, sys as _sys

_root = pathlib.Path(__file__).resolve().parent
_parent = _root.parent
for _p in (_root, _parent):
    if str(_p) not in _sys.path:
        _sys.path.insert(0, str(_p))

from agentops_gym.client import AgentOpsEnv
from agentops_gym.models import ToolCall

# ---------------------------------------------------------------------------
# Configuration — exactly matching the official sample pattern
# ---------------------------------------------------------------------------
IMAGE_NAME = os.getenv("IMAGE_NAME")
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")  # HF_TOKEN first
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"

BENCHMARK = "agentops-gym"
MAX_STEPS = 10
TEMPERATURE = 0.3
MAX_TOKENS = 1024
SUCCESS_SCORE_THRESHOLD = 0.5
ALL_TASKS = ["task_1", "task_2", "task_3", "task_4"]

# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
SYSTEM_PROMPT = """\
You are an expert software engineer agent. You solve coding tasks by calling tools.

Available tools:
  FileRead — Read a file. Parameters: {"filename": "path/to/file.py"}
  FileWrite — Write a file. Parameters: {"filename": "...", "content": "..."}
  Grep — Search files. Parameters: {"pattern": "regex_or_string"}
  Bash — Simulated shell. Parameters: {"command": "lint main.py"}
  WebSearch — Search docs. Parameters: {"query": "python lru_cache"}
  TodoWrite — Record a plan. Parameters: {"plan": "1. Do X\\n2. Do Y"}

RULES:
1. Respond ONLY with a single JSON object — no markdown, no extra text.
2. Format: {"tool": "ToolName", "parameters": {...}, "reasoning": "why"}
3. Minimise total tool calls — efficiency matters.
4. For hard tasks: call TodoWrite FIRST to plan, then act.
5. Never repeat the exact same tool + parameters consecutively.

Example:
{"tool": "Grep", "parameters": {"pattern": "def fetch"}, "reasoning": "Find function"}
"""


# ---------------------------------------------------------------------------
# Stdout log helpers — must match spec exactly
# ---------------------------------------------------------------------------
def _single_line(value: object) -> str:
    """Return ``str(value)`` with every CR/LF replaced by a space."""
    return re.sub(r"[\r\n]", " ", str(value))


def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` record that opens an episode."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` record.

    Args:
        step: 1-based step index within the episode.
        action: Human-readable action; flattened to one line and cut to 200 chars.
        reward: Reward for this step, printed with two decimals.
        done: Whether the episode ended on this step.
        error: Error message, or ``None``/empty to print ``null``.
    """
    action_val = _single_line(action)[:200]
    # Exception messages are frequently multi-line; flatten them as well so a
    # record never spills onto a second line.
    error_val = _single_line(error) if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action_val} "
        f"reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` record that closes an episode.

    Args:
        success: Whether the episode counts as solved.
        steps: Number of steps taken.
        score: Final score; clamped to [0.001, 0.999] before printing.
        rewards: Per-step rewards, printed comma-separated with two decimals.
    """
    # Score must be strictly between 0 and 1
    score = max(0.001, min(0.999, score))
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def build_prompt(obs_data: Dict, history: List[str]) -> str:
    """Build the user prompt for the next model turn.

    Args:
        obs_data: Observation as a plain dict. Keys read here:
            ``task_description``, ``visible_files``, ``last_tool_result``,
            ``message``, ``metadata`` and ``step_count``; all are optional.
        history: One-line summaries of earlier steps; only the last three
            are included.

    Returns:
        The prompt text, sections joined by newlines.
    """
    parts = [f"TASK: {obs_data.get('task_description', '')}"]
    parts.append(f"\nVisible files: {obs_data.get('visible_files', [])}")

    last = obs_data.get("last_tool_result")
    if last:
        # Cap tool output so a large file dump cannot dominate the prompt.
        parts.append(f"\nLast tool result:\n{str(last)[:1500]}")
    if history:
        parts.append(f"\nHistory (last 3): {history[-3:]}")
    if obs_data.get("message"):
        parts.append(f"\nEnv message: {obs_data['message']}")

    # ``or {}`` also covers a "metadata" key that is present but None.
    meta = obs_data.get("metadata") or {}
    parts.append(
        f"\nStep {obs_data.get('step_count', 0)}, "
        f"steps remaining: {meta.get('steps_remaining', '?')}"
    )
    parts.append("\nRespond with a single JSON tool call:")
    return "\n".join(parts)


_JSON_DECODER = json.JSONDecoder()


def _first_tool_object(text: str) -> Optional[Dict]:
    """Return the first JSON object in *text* that has a ``"tool"`` key.

    Decoding is attempted at every ``{`` so an object surrounded by prose, or
    one containing nested objects, is still recovered.
    """
    pos = text.find("{")
    while pos != -1:
        try:
            obj, _ = _JSON_DECODER.raw_decode(text, pos)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict) and "tool" in obj:
            return obj
        pos = text.find("{", pos + 1)
    return None


def extract_tool_call(text: str) -> Optional[Dict]:
    """Extract a tool-call dict from raw model output.

    A fenced code block (optionally tagged ``json``) is tried first, then the
    full text. Only JSON *objects* with a ``"tool"`` key are accepted.

    Args:
        text: Raw model response.

    Returns:
        The parsed tool call, or ``None`` when none can be found.
    """
    text = text.strip()
    candidates = []
    if "```" in text:
        for block in text.split("```"):
            block = block.strip()
            # Drop a leading "json" language tag (prefix, not character set).
            if block.startswith("json"):
                block = block[len("json"):].strip()
            if block.startswith("{"):
                candidates.append(block)
                break
    candidates.append(text)

    for candidate in candidates:
        found = _first_tool_object(candidate)
        if found is not None:
            return found
    return None


def get_model_action(client: OpenAI, obs_data: Dict, history: List[str]) -> Optional[Dict]:
    """Ask the model for the next tool call.

    Args:
        client: OpenAI-compatible client.
        obs_data: Current observation as a dict (see ``build_prompt``).
        history: One-line summaries of earlier steps.

    Returns:
        The parsed tool-call dict, or ``None`` when the request fails or the
        reply holds no usable tool call. Failures are printed as ``[DEBUG]``
        lines rather than raised, so the caller can fall back.
    """
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": build_prompt(obs_data, history)},
            ],
            max_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
        )
        raw = (completion.choices[0].message.content or "").strip()
        return extract_tool_call(raw)
    except Exception as e:
        print(f"[DEBUG] LLM error: {e}", flush=True)
        return None


def _observation_to_dict(observation) -> Dict:
    """Convert an observation model to a plain dict.

    Uses ``model_dump()`` when the object has it and ``dict()`` otherwise
    (presumably pydantic v2 vs. v1 — confirm against agentops_gym.models).
    """
    if hasattr(observation, "model_dump"):
        return observation.model_dump()
    return observation.dict()


# ---------------------------------------------------------------------------
# Single episode runner
# ---------------------------------------------------------------------------
async def run_episode(env: AgentOpsEnv, client: OpenAI, task_id: str) -> Dict:
    """Run one task episode and emit its [START]/[STEP]/[END] records.

    Args:
        env: Environment client.
        client: OpenAI-compatible client used to pick actions.
        task_id: Task to reset the environment to.

    Returns:
        Dict with keys ``task_id``, ``score``, ``steps``, ``success`` and
        ``rewards``. Errors are reported as ``[DEBUG]`` lines, never raised;
        a failed episode gets the minimum score of 0.001.
    """
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.001
    success = False
    obs_data: Dict = {}

    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)

    try:
        result = await env.reset(seed=None, task_id=task_id)
        obs_data = _observation_to_dict(result.observation)

        for step in range(1, MAX_STEPS + 1):
            if result.done or obs_data.get("done", False):
                break

            # Fall back to a harmless search when the model gives nothing usable.
            tool_call = get_model_action(client, obs_data, history) or {
                "tool": "Grep",
                "parameters": {"pattern": "def "},
                "reasoning": "fallback",
            }
            tool = tool_call.get("tool", "Grep")
            params = tool_call.get("parameters", {})
            if not isinstance(params, dict):
                # e.g. the model sent "parameters": null or a bare string.
                params = {}
            reasoning = tool_call.get("reasoning", "")
            action_str = f"{tool}({json.dumps(params)})"

            try:
                result = await env.step(
                    ToolCall(tool=tool, parameters=params, reasoning=reasoning)
                )
            except Exception as e:
                # Count the failed step so [END] agrees with the [STEP]
                # records already printed.
                rewards.append(0.0)
                steps_taken = step
                log_step(step=step, action=action_str, reward=0.0, done=True, error=str(e))
                break

            obs_data = _observation_to_dict(result.observation)
            reward = float(result.reward or 0.0)
            done = bool(result.done)

            rewards.append(reward)
            steps_taken = step
            history.append(f"Step {step}: {action_str} → reward {reward:.2f}")
            log_step(step=step, action=action_str, reward=reward, done=done, error=None)

            if done:
                break

        # Prefer the grader's score; fall back to the cumulative reward when
        # it is missing or zero.
        meta = obs_data.get("metadata") or {}
        score = float(meta.get("grader_score") or 0.0)
        if score == 0.0:
            score = float(meta.get("cumulative_reward") or 0.0)
        score = max(0.001, min(0.999, score))
        success = score >= SUCCESS_SCORE_THRESHOLD

    except Exception as e:
        print(f"[DEBUG] Episode error for {task_id}: {e}", flush=True)
        score = 0.001

    finally:
        # Always close the episode record, even on failure.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return {
        "task_id": task_id,
        "score": score,
        "steps": steps_taken,
        "success": success,
        "rewards": rewards,
    }


# ---------------------------------------------------------------------------
# Main — exactly matching official sample pattern
# ---------------------------------------------------------------------------
async def async_main() -> None:
    """Run every task in ``ALL_TASKS`` and print a summary table."""
    # Use module-level API_KEY and API_BASE_URL — same as official sample
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)

    # from_docker_image is awaitable — same as official sample
    env = await AgentOpsEnv.from_docker_image(IMAGE_NAME)

    print("=" * 60, flush=True)
    print("AgentOps Gym — Baseline Inference", flush=True)
    print(f"Model: {MODEL_NAME} | Image: {IMAGE_NAME}", flush=True)
    print("=" * 60, flush=True)

    results = []
    try:
        async with env:
            for task_id in ALL_TASKS:
                print("─" * 40, flush=True)
                result = await run_episode(env, client, task_id)
                results.append(result)
    except Exception as e:
        # run_episode handles its own errors, so anything arriving here comes
        # from entering or leaving the environment context.
        print(f"[DEBUG] Cleanup error (non-fatal): {e}", flush=True)

    # Summary
    total = sum(r["score"] for r in results)
    solved = sum(1 for r in results if r["success"])
    avg = total / len(results) if results else 0.0

    print("=" * 60, flush=True)
    print("BASELINE SUMMARY", flush=True)
    print("=" * 60, flush=True)
    for r in results:
        status = "✅ PASS" if r["success"] else "❌ FAIL"
        print(f" {r['task_id']:>8} score={r['score']:.3f} steps={r['steps']:2d} {status}", flush=True)
    print(f"\n Average score: {avg:.3f}", flush=True)
    print(f" Solved: {solved} / {len(results)}", flush=True)
    print("=" * 60, flush=True)


if __name__ == "__main__":
    asyncio.run(async_main())