agentops-gym / inference.py
Revanth-ml's picture
Upload folder using huggingface_hub
b91d18e verified
#!/usr/bin/env python3
"""
AgentOps Gym — Baseline inference script.
Follows the exact pattern from the official OpenEnv sample inference.py.
STDOUT FORMAT:
[START] task=<task> env=<benchmark> model=<model>
[STEP] step=<n> action=<str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...>
"""
import asyncio
import json
import os
import re
from typing import Dict, List, Optional
# Load .env if present (local dev only)
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
from openai import OpenAI
# Ensure package importable from any working directory
import pathlib, sys as _sys
# Make the package importable regardless of the current working directory:
# both this script's directory and its parent go to the front of sys.path
# (each only if it is not already listed).
_root = pathlib.Path(__file__).resolve().parent
_parent = _root.parent
for _p in (_root, _parent):
    if str(_p) in _sys.path:
        continue
    _sys.path.insert(0, str(_p))
from agentops_gym.client import AgentOpsEnv
from agentops_gym.models import ToolCall
# ---------------------------------------------------------------------------
# Configuration — exactly matching the official sample pattern
# ---------------------------------------------------------------------------
# Docker image handed to AgentOpsEnv.from_docker_image(); None when unset.
IMAGE_NAME = os.getenv("IMAGE_NAME")
# Credential for the OpenAI-compatible client; HF_TOKEN wins when both are set.
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") # HF_TOKEN first
# Endpoint of the OpenAI-compatible API; an unset or empty variable falls back to the default.
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
# Model requested for every chat completion; also printed in the [START] record.
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
# Value reported as env=<benchmark> in the [START] record.
BENCHMARK = "agentops-gym"
# Upper bound on environment steps attempted per episode.
MAX_STEPS = 10
# Sampling temperature passed to the chat completion call.
TEMPERATURE = 0.3
# max_tokens passed to the chat completion call.
MAX_TOKENS = 1024
# An episode counts as a success when its clamped score is >= this value.
SUCCESS_SCORE_THRESHOLD = 0.5
# Task ids run, in this order, by async_main().
ALL_TASKS = ["task_1", "task_2", "task_3", "task_4"]
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
# System message sent with every chat completion request. It lists the tools
# the agent may call and pins the reply format to one JSON object, which is
# what extract_tool_call() parses. The separators are real em dashes; the
# previous text contained mis-decoded bytes in their place.
SYSTEM_PROMPT = """\
You are an expert software engineer agent. You solve coding tasks by calling tools.
Available tools:
FileRead — Read a file. Parameters: {"filename": "path/to/file.py"}
FileWrite — Write a file. Parameters: {"filename": "...", "content": "..."}
Grep — Search files. Parameters: {"pattern": "regex_or_string"}
Bash — Simulated shell. Parameters: {"command": "lint main.py"}
WebSearch — Search docs. Parameters: {"query": "python lru_cache"}
TodoWrite — Record a plan. Parameters: {"plan": "1. Do X\\n2. Do Y"}
RULES:
1. Respond ONLY with a single JSON object — no markdown, no extra text.
2. Format: {"tool": "ToolName", "parameters": {...}, "reasoning": "why"}
3. Minimise total tool calls — efficiency matters.
4. For hard tasks: call TodoWrite FIRST to plan, then act.
5. Never repeat the exact same tool + parameters consecutively.
Example:
{"tool": "Grep", "parameters": {"pattern": "def fetch"}, "reasoning": "Find function"}
"""
# ---------------------------------------------------------------------------
# Stdout log helpers — must match spec exactly
# ---------------------------------------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` record that opens an episode and flush stdout.

    Args:
        task: Task identifier shown as ``task=``.
        env: Benchmark name shown as ``env=``.
        model: Model name shown as ``model=``.
    """
    record = f"[START] task={task} env={env} model={model}"
    print(record, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` record on a single line and flush stdout.

    Args:
        step: 1-based step number.
        action: Description of the action taken; line breaks are replaced by
            spaces and the text is truncated to 200 characters.
        reward: Step reward, printed with two decimals.
        done: Whether the episode ended on this step (``true``/``false``).
        error: Error text, or a falsy value to print ``null``.
    """
    # Each record must stay on one line, so line breaks are neutralised in
    # BOTH free-text fields. Exception messages passed as ``error`` are often
    # multi-line and would otherwise split the record.
    action_val = str(action).replace("\r", " ").replace("\n", " ")[:200]
    if error:
        error_val = str(error).replace("\r", " ").replace("\n", " ")
    else:
        error_val = "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action_val} "
        f"reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` record that closes an episode and flush stdout.

    Args:
        success: Printed lower-cased (``true``/``false``).
        steps: Number of steps reported as ``steps=``.
        score: Final score; clamped into [0.001, 0.999] before printing with
            three decimals.
        rewards: Per-step rewards, printed comma-separated with two decimals.
    """
    # The reported score must lie strictly between 0 and 1.
    clamped = max(0.001, min(0.999, score))
    formatted_rewards = [f"{r:.2f}" for r in rewards]
    rewards_str = ",".join(formatted_rewards)
    success_val = str(success).lower()
    record = (
        f"[END] success={success_val} steps={steps} "
        f"score={clamped:.3f} rewards={rewards_str}"
    )
    print(record, flush=True)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def build_prompt(obs_data: Dict, history: List[str]) -> str:
    """Build the user message for the next model call from an observation.

    Args:
        obs_data: Observation as a plain dict. Keys read here:
            ``task_description``, ``visible_files``, ``last_tool_result``,
            ``message``, ``metadata`` (for ``steps_remaining``) and
            ``step_count``. Every key is optional.
        history: Summaries of earlier steps; only the last three are shown.

    Returns:
        The prompt text, sections joined by newlines.
    """
    parts = [f"TASK: {obs_data.get('task_description', '')}"]
    parts.append(f"\nVisible files: {obs_data.get('visible_files', [])}")

    last = obs_data.get("last_tool_result")
    if last:
        # Cap the tool output so one large result cannot dominate the prompt.
        parts.append(f"\nLast tool result:\n{str(last)[:1500]}")
    if history:
        parts.append(f"\nHistory (last 3): {history[-3:]}")
    if obs_data.get("message"):
        parts.append(f"\nEnv message: {obs_data['message']}")

    # "metadata" may be present with a None value; a plain .get default would
    # not cover that case and the lookup below would raise AttributeError.
    meta = obs_data.get("metadata") or {}
    parts.append(
        f"\nStep {obs_data.get('step_count', 0)}, "
        f"steps remaining: {meta.get('steps_remaining', '?')}"
    )
    parts.append("\nRespond with a single JSON tool call:")
    return "\n".join(parts)
def extract_tool_call(text: str) -> Optional[Dict]:
    """Extract a tool-call object from a raw model reply.

    The reply is expected to be a JSON object with a ``"tool"`` key, but models
    often wrap it in a Markdown code fence or in prose. Candidates are tried in
    this order:

    1. the whole text (or the first fenced block that starts with ``{``);
    2. every JSON object that can be decoded starting at a ``{`` in the text,
       scanning left to right.

    Args:
        text: Raw reply text.

    Returns:
        The first decoded ``dict`` containing a ``"tool"`` key, or ``None`` when
        no such object is found. Non-dict JSON (lists, numbers, strings) is
        never returned.
    """
    text = text.strip()
    if "```" in text:
        for block in text.split("```"):
            block = block.strip()
            # Drop a leading "json" language tag (prefix removal, not a
            # character-set strip).
            if block.startswith("json"):
                block = block[len("json"):].strip()
            if block.startswith("{"):
                text = block
                break

    try:
        obj = json.loads(text)
    except json.JSONDecodeError:
        obj = None
    if isinstance(obj, dict) and "tool" in obj:
        return obj

    # Fallback: decode an object at each "{" in turn. Unlike a brace-free
    # regex, this recovers calls whose "parameters" value is a nested object
    # even when the reply has text before or after the JSON.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict) and "tool" in obj:
            return obj
        start = text.find("{", start + 1)
    return None
def get_model_action(client: OpenAI, obs_data: Dict, history: List[str]) -> Optional[Dict]:
    """Ask the model for the next tool call.

    Sends the system prompt plus a prompt built from the current observation
    and step history, then parses the reply with ``extract_tool_call``.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        obs_data: Current observation as a plain dict.
        history: Summaries of earlier steps.

    Returns:
        The parsed tool call, or ``None`` when the reply holds no tool call or
        when any exception occurs (the exception is printed as a ``[DEBUG]``
        line and swallowed).
    """
    try:
        create_completion = client.chat.completions.create
        conversation = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": build_prompt(obs_data, history)},
        ]
        response = create_completion(
            model=MODEL_NAME,
            messages=conversation,
            max_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
        )
        reply = response.choices[0].message.content
        if not reply:
            # An empty or missing content field is treated as an empty reply.
            reply = ""
        return extract_tool_call(reply.strip())
    except Exception as e:
        print(f"[DEBUG] LLM error: {e}", flush=True)
        return None
# ---------------------------------------------------------------------------
# Single episode runner
# ---------------------------------------------------------------------------
def _observation_to_dict(observation) -> Dict:
    """Return *observation* as a plain dict.

    Uses ``model_dump()`` when the object provides it and falls back to
    ``dict()`` otherwise.
    """
    if hasattr(observation, "model_dump"):
        return observation.model_dump()
    return observation.dict()


async def run_episode(env: AgentOpsEnv, client: OpenAI, task_id: str) -> Dict:
    """Run one task episode and report it on stdout.

    Prints a ``[START]`` record, then up to ``MAX_STEPS`` ``[STEP]`` records
    (one per environment step), and always a final ``[END]`` record.

    Args:
        env: Environment client; ``reset`` and ``step`` are awaited on it.
        client: OpenAI-compatible client used to choose each action.
        task_id: Task to reset the environment to.

    Returns:
        A dict with ``task_id``, ``score`` (clamped into [0.001, 0.999]),
        ``steps``, ``success`` and ``rewards``. Exceptions raised while running
        the episode are printed as ``[DEBUG]`` lines and yield the minimum
        score instead of propagating.
    """
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.001
    success = False
    obs_data: Dict = {}

    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = await env.reset(seed=None, task_id=task_id)
        obs_data = _observation_to_dict(result.observation)

        for step in range(1, MAX_STEPS + 1):
            if result.done or obs_data.get("done", False):
                break

            tool_call = get_model_action(client, obs_data, history)
            if not isinstance(tool_call, dict):
                # No usable reply from the model: fall back to a harmless search.
                tool_call = {
                    "tool": "Grep",
                    "parameters": {"pattern": "def "},
                    "reasoning": "fallback",
                }

            # Model output is untrusted: a JSON null or a wrongly typed value
            # must not abort the episode, so fall back to safe defaults.
            tool = tool_call.get("tool") or "Grep"
            params = tool_call.get("parameters")
            if not isinstance(params, dict):
                params = {}
            reasoning = tool_call.get("reasoning") or ""
            action_str = f"{tool}({json.dumps(params)})"

            try:
                result = await env.step(
                    ToolCall(tool=tool, parameters=params, reasoning=reasoning)
                )
            except Exception as e:
                # NOTE(review): the failed step is logged but not added to
                # ``rewards`` / ``steps_taken`` — confirm this matches the spec.
                log_step(step=step, action=action_str, reward=0.0, done=True, error=str(e))
                break

            obs_data = _observation_to_dict(result.observation)
            reward = float(result.reward or 0.0)
            done = bool(result.done)
            rewards.append(reward)
            steps_taken = step
            history.append(f"Step {step}: {action_str} → reward {reward:.2f}")
            log_step(step=step, action=action_str, reward=reward, done=done, error=None)
            if done:
                break

        # "metadata" may be present but None; treat that like an empty mapping
        # instead of letting the lookup raise and zero the score.
        meta = obs_data.get("metadata") or {}
        score = float(meta.get("grader_score") or 0.0)
        if score == 0.0:
            # No grader score available: use the cumulative reward instead.
            score = float(meta.get("cumulative_reward") or 0.0)
        score = max(0.001, min(0.999, score))
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as e:
        print(f"[DEBUG] Episode error for {task_id}: {e}", flush=True)
        score = 0.001
    finally:
        # The [END] record is emitted even when the episode failed.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return {
        "task_id": task_id,
        "score": score,
        "steps": steps_taken,
        "success": success,
        "rewards": rewards,
    }
# ---------------------------------------------------------------------------
# Main — exactly matching official sample pattern
# ---------------------------------------------------------------------------
async def async_main() -> None:
    """Run every task in ``ALL_TASKS`` once and print a summary table.

    Creates the model client and the environment from the module-level
    configuration, runs the episodes sequentially inside the environment's
    async context, then prints per-task results, the average score and the
    number of tasks solved. Exceptions raised while entering, using or leaving
    the environment context are printed as a ``[DEBUG]`` line and do not stop
    the summary from being printed.
    """
    # Use module-level API_KEY and API_BASE_URL — same as official sample
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    # from_docker_image is awaitable — same as official sample
    env = await AgentOpsEnv.from_docker_image(IMAGE_NAME)

    print("=" * 60, flush=True)
    print("AgentOps Gym — Baseline Inference", flush=True)
    print(f"Model: {MODEL_NAME} | Image: {IMAGE_NAME}", flush=True)
    print("=" * 60, flush=True)

    results = []
    try:
        async with env:
            for task_id in ALL_TASKS:
                print("─" * 40, flush=True)
                result = await run_episode(env, client, task_id)
                results.append(result)
    except Exception as e:
        print(f"[DEBUG] Cleanup error (non-fatal): {e}", flush=True)

    # Summary
    total = sum(r["score"] for r in results)
    solved = sum(1 for r in results if r["success"])
    # Guard against division by zero when no episode completed.
    avg = total / len(results) if results else 0.0
    print("=" * 60, flush=True)
    print("BASELINE SUMMARY", flush=True)
    print("=" * 60, flush=True)
    for r in results:
        status = "✅ PASS" if r["success"] else "❌ FAIL"
        print(f" {r['task_id']:>8} score={r['score']:.3f} steps={r['steps']:2d} {status}", flush=True)
    print(f"\n Average score: {avg:.3f}", flush=True)
    print(f" Solved: {solved} / {len(results)}", flush=True)
    print("=" * 60, flush=True)
if __name__ == "__main__":
    # Script entry point: drive the async benchmark run to completion.
    main_coroutine = async_main()
    asyncio.run(main_coroutine)