Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from typing import Any, Dict, List, Optional | |
| import requests | |
| from openai import OpenAI | |
| # Load .env file if present (works without it too) | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except ImportError: | |
| pass | |
| # --------------------------------------------------------------------------- | |
| # Configuration | |
| # --------------------------------------------------------------------------- | |
| IMAGE_NAME = os.getenv("IMAGE_NAME") | |
| API_KEY = os.getenv("HF_TOKEN") or os.getenv("OPENAI_API_KEY") | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") | |
| BASE_URL = os.getenv("ENV_BASE_URL", "http://localhost:8000") | |
| BENCHMARK = "agentops-gym" | |
| MAX_STEPS = 10 | |
| TEMPERATURE = 0.3 | |
| MAX_TOKENS = 600 | |
| ALL_TASKS = ["task_1", "task_2", "task_3", "task_4"] | |
| # --------------------------------------------------------------------------- | |
| # System prompt | |
| # --------------------------------------------------------------------------- | |
| SYSTEM_PROMPT = """\ | |
| You are an expert software engineer agent. You solve coding tasks by calling tools. | |
| Available tools: | |
| FileRead β Read a file. Parameters: {"filename": "path/to/file.py"} | |
| FileWrite β Write/overwrite. Parameters: {"filename": "...", "content": "..."} | |
| Grep β Search all files. Parameters: {"pattern": "regex_or_string"} | |
| Bash β Simulated shell. Parameters: {"command": "lint main.py"} | |
| WebSearch β Search docs. Parameters: {"query": "python lru_cache"} | |
| TodoWrite β Record a plan. Parameters: {"plan": "1. Do X\\n2. Do Y"} | |
| RULES: | |
| 1. Respond ONLY with a single JSON object β no markdown, no extra text. | |
| 2. Format exactly: {"tool": "ToolName", "parameters": {...}, "reasoning": "why"} | |
| 3. Be efficient β minimize total tool calls. | |
| 4. For hard tasks: call TodoWrite FIRST to plan, then act. | |
| 5. Never repeat the exact same tool + parameters twice in a row. | |
| Example: | |
| {"tool": "Grep", "parameters": {"pattern": "def fetch"}, "reasoning": "Find the function"} | |
| """ | |
| # --------------------------------------------------------------------------- | |
| # Mandatory stdout log helpers | |
| # --------------------------------------------------------------------------- | |
| def log_start(task: str, env: str, model: str) -> None: | |
| print(f"[START] task={task} env={env} model={model}", flush=True) | |
| def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: | |
| err_val = error if error else "null" | |
| action_short = str(action).replace("\n", " ")[:200] | |
| print( | |
| f"[STEP] step={step} action={action_short} " | |
| f"reward={reward:.2f} done={str(done).lower()} error={err_val}", | |
| flush=True, | |
| ) | |
| def log_end(success: bool, steps: int, rewards: List[float]) -> None: | |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) | |
| print( | |
| f"[END] success={str(success).lower()} steps={steps} rewards={rewards_str}", | |
| flush=True, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # HTTP helpers | |
| # --------------------------------------------------------------------------- | |
| def http_reset(task_id: str) -> Dict: | |
| """POST /reset and return the observation dict.""" | |
| resp = requests.post( | |
| f"{BASE_URL}/reset", | |
| json={"task_id": task_id}, | |
| timeout=30, | |
| ) | |
| resp.raise_for_status() | |
| return resp.json() | |
| def http_step(tool: str, parameters: Dict, reasoning: str = "") -> Dict: | |
| """POST /step with the correct body shape and return the response dict.""" | |
| body = { | |
| "action": { | |
| "tool": tool, | |
| "parameters": parameters, | |
| "reasoning": reasoning, | |
| } | |
| } | |
| resp = requests.post( | |
| f"{BASE_URL}/step", | |
| json=body, | |
| timeout=30, | |
| ) | |
| resp.raise_for_status() | |
| return resp.json() | |
| def http_grader() -> Dict: | |
| resp = requests.get(f"{BASE_URL}/grader", timeout=10) | |
| if resp.status_code == 200: | |
| return resp.json() | |
| return {} | |
| # --------------------------------------------------------------------------- | |
| # Prompt builder | |
| # --------------------------------------------------------------------------- | |
| def build_prompt(obs: Dict) -> str: | |
| parts = [f"TASK: {obs.get('task_description', '')}"] | |
| parts.append(f"\nVisible files: {obs.get('visible_files', [])}") | |
| last = obs.get("last_tool_result") | |
| if last: | |
| # Truncate long outputs | |
| parts.append(f"\nLast tool result:\n{str(last)[:1500]}") | |
| history = obs.get("action_history", []) | |
| if history: | |
| parts.append(f"\nHistory (last 3): {history[-3:]}") | |
| if obs.get("message"): | |
| parts.append(f"\nEnv message: {obs['message']}") | |
| meta = obs.get("metadata", {}) | |
| steps_rem = meta.get("steps_remaining", "?") | |
| parts.append(f"\nStep {obs.get('step_count', 0)}, steps remaining: {steps_rem}") | |
| parts.append("\nRespond with a single JSON tool call:") | |
| return "\n".join(parts) | |
| # --------------------------------------------------------------------------- | |
| # JSON extraction | |
| # --------------------------------------------------------------------------- | |
| def extract_tool_call(text: str) -> Optional[Dict]: | |
| """Extract a valid JSON tool call from model output.""" | |
| text = text.strip() | |
| # Strip markdown fences | |
| if "```" in text: | |
| for block in text.split("```"): | |
| block = block.strip().lstrip("json").strip() | |
| if block.startswith("{"): | |
| text = block | |
| break | |
| # Direct parse | |
| try: | |
| obj = json.loads(text) | |
| if "tool" in obj: | |
| return obj | |
| except json.JSONDecodeError: | |
| pass | |
| # Extract first {...} block | |
| m = re.search(r'\{[^{}]+\}', text, re.DOTALL) | |
| if m: | |
| try: | |
| obj = json.loads(m.group()) | |
| if "tool" in obj: | |
| return obj | |
| except json.JSONDecodeError: | |
| pass | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # Episode runner | |
| # --------------------------------------------------------------------------- | |
| def run_episode(client: OpenAI, task_id: str) -> Dict: | |
| log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME) | |
| rewards: List[float] = [] | |
| steps_taken = 0 | |
| score = 0.0 | |
| success = False | |
| error_msg = None | |
| try: | |
| # Reset | |
| reset_resp = http_reset(task_id) | |
| obs = reset_resp.get("observation", {}) | |
| for step in range(1, MAX_STEPS + 1): | |
| if reset_resp.get("done") or obs.get("done"): | |
| break | |
| # Ask the model | |
| prompt = build_prompt(obs) | |
| try: | |
| completion = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| max_tokens=MAX_TOKENS, | |
| temperature=TEMPERATURE, | |
| ) | |
| raw = (completion.choices[0].message.content or "").strip() | |
| except Exception as e: | |
| error_msg = f"LLM error: {e}" | |
| log_step(step=step, action="(llm_error)", reward=0.0, done=True, error=str(e)) | |
| break | |
| tool_call = extract_tool_call(raw) | |
| if tool_call is None: | |
| # Fallback: safe no-op grep | |
| tool_call = { | |
| "tool": "Grep", | |
| "parameters": {"pattern": "def "}, | |
| "reasoning": "fallback β could not parse model output", | |
| } | |
| tool = tool_call.get("tool", "Grep") | |
| params = tool_call.get("parameters", {}) | |
| reasoning = tool_call.get("reasoning", "") | |
| action_str = f"{tool}({json.dumps(params)})" | |
| # Execute | |
| try: | |
| step_resp = http_step(tool, params, reasoning) | |
| except requests.HTTPError as e: | |
| error_msg = str(e) | |
| log_step(step=step, action=action_short, reward=0.0, done=True, error=error_msg) | |
| break | |
| obs = step_resp.get("observation", {}) | |
| reward = float(step_resp.get("reward", 0.0) or 0.0) | |
| done = bool(step_resp.get("done", False)) | |
| rewards.append(reward) | |
| steps_taken = step | |
| log_step(step=step, action=action_str, reward=reward, done=done, error=None) | |
| if done: | |
| break | |
| # Fetch grader score | |
| grader = http_grader() | |
| score = float(grader.get("score", 0.0) or 0.0) | |
| success = score >= 0.5 | |
| except Exception as exc: | |
| print(f"[DEBUG] Episode error for {task_id}: {exc}", flush=True) | |
| finally: | |
| log_end(success=success, steps=steps_taken, rewards=rewards) | |
| return { | |
| "task_id": task_id, | |
| "score": score, | |
| "steps": steps_taken, | |
| "success": success, | |
| "rewards": rewards, | |
| } | |
| def main() -> None: | |
| if not API_KEY: | |
| print("ERROR: HF_TOKEN (or API_KEY) must be set.", file=sys.stderr) | |
| print(" export HF_TOKEN=hf_xxx", file=sys.stderr) | |
| sys.exit(1) | |
| for attempt in range(10): | |
| try: | |
| r = requests.get(f"{BASE_URL}/health", timeout=5) | |
| if r.status_code == 200: | |
| break | |
| except Exception: | |
| pass | |
| print(f"[DEBUG] Waiting for server... attempt {attempt+1}/10", flush=True) | |
| time.sleep(2) | |
| else: | |
| print("ERROR: Server did not become ready.", file=sys.stderr) | |
| sys.exit(1) | |
| client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) | |
| print("=" * 60, flush=True) | |
| print(f"AgentOps Gym β Baseline Inference", flush=True) | |
| print(f"Model: {MODEL_NAME} | Server: {BASE_URL}", flush=True) | |
| print("=" * 60, flush=True) | |
| results = [] | |
| for task_id in ALL_TASKS: | |
| print("β" * 40, flush=True) | |
| result = run_episode(client, task_id) | |
| results.append(result) | |
| print("=" * 60, flush=True) | |
| print("BASELINE SUMMARY", flush=True) | |
| print("=" * 60, flush=True) | |
| total = sum(r["score"] for r in results) | |
| solved = sum(1 for r in results if r["success"]) | |
| avg = total / len(results) if results else 0.0 | |
| for r in results: | |
| status = "β PASS" if r["success"] else "β FAIL" | |
| print(f" {r['task_id']:>8} score={r['score']:.3f} steps={r['steps']:2d} {status}", flush=True) | |
| print(f"\n Average score: {avg:.3f}", flush=True) | |
| print(f" Solved: {solved} / {len(results)}", flush=True) | |
| print("=" * 60, flush=True) | |
| if __name__ == "__main__": | |
| main() |