# agentops-gym / server / inference.py
# Revanth-ml's picture
# Upload folder using huggingface_hub
# 3be06e2 verified
from __future__ import annotations
import json
import os
import re
import sys
import time
from typing import Any, Dict, List, Optional
import requests
from openai import OpenAI
# Load a local .env file when python-dotenv is installed. The package is
# optional, so only the import itself is guarded: an ImportError raised from
# inside load_dotenv() is a real problem and is no longer silently swallowed.
try:
    from dotenv import load_dotenv
except ImportError:
    pass
else:
    load_dotenv()
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# --- Values taken from the process environment --------------------------------
# IMAGE_NAME is read here but not referenced elsewhere in the visible code.
IMAGE_NAME = os.environ.get("IMAGE_NAME")
# HF_TOKEN wins; OPENAI_API_KEY is the fallback when it is unset or empty.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("OPENAI_API_KEY")
# Base URL handed to the OpenAI-compatible client.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# Base URL of the environment server (/reset, /step, /grader, /health).
BASE_URL = os.environ.get("ENV_BASE_URL", "http://localhost:8000")

# --- Fixed settings ----------------------------------------------------------
BENCHMARK = "agentops-gym"
MAX_STEPS = 10  # upper bound on tool calls per episode
TEMPERATURE = 0.3
MAX_TOKENS = 600  # completion budget per model call
ALL_TASKS = [f"task_{index}" for index in range(1, 5)]
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
# System message sent with every model call. It lists the tool names with
# their parameter shapes and pins the reply format to a single JSON object,
# which is what extract_tool_call() later parses.
# The dashes below are real em dashes (U+2014); they had been mis-encoded.
SYSTEM_PROMPT = """\
You are an expert software engineer agent. You solve coding tasks by calling tools.
Available tools:
FileRead — Read a file. Parameters: {"filename": "path/to/file.py"}
FileWrite — Write/overwrite. Parameters: {"filename": "...", "content": "..."}
Grep — Search all files. Parameters: {"pattern": "regex_or_string"}
Bash — Simulated shell. Parameters: {"command": "lint main.py"}
WebSearch — Search docs. Parameters: {"query": "python lru_cache"}
TodoWrite — Record a plan. Parameters: {"plan": "1. Do X\\n2. Do Y"}
RULES:
1. Respond ONLY with a single JSON object — no markdown, no extra text.
2. Format exactly: {"tool": "ToolName", "parameters": {...}, "reasoning": "why"}
3. Be efficient — minimize total tool calls.
4. For hard tasks: call TodoWrite FIRST to plan, then act.
5. Never repeat the exact same tool + parameters twice in a row.
Example:
{"tool": "Grep", "parameters": {"pattern": "def fetch"}, "reasoning": "Find the function"}
"""
# ---------------------------------------------------------------------------
# Mandatory stdout log helpers
# ---------------------------------------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` record that opens an episode's log.

    Args:
        task: Task identifier being run.
        env: Benchmark / environment name.
        model: Name of the model driving the episode.
    """
    fields = " ".join((f"task={task}", f"env={env}", f"model={model}"))
    # flush so the record is visible immediately, even when stdout is piped
    print(f"[START] {fields}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` record on a single line.

    Args:
        step: 1-based step number.
        action: Description of the action taken; newlines are replaced by
            spaces and the text is cut to 200 characters.
        reward: Step reward, printed with two decimals.
        done: Whether the episode ended on this step (printed lowercase).
        error: Error text, or ``None``/empty, which is printed as ``null``.
    """
    action_short = str(action).replace("\n", " ")[:200]
    if error:
        # Exception messages can span several lines; flatten them the same way
        # as the action so the record always stays on one line.
        err_val = str(error).replace("\r", " ").replace("\n", " ")
    else:
        err_val = "null"
    print(
        f"[STEP] step={step} action={action_short} "
        f"reward={reward:.2f} done={str(done).lower()} error={err_val}",
        flush=True,
    )
def log_end(success: bool, steps: int, rewards: List[float]) -> None:
    """Print the ``[END]`` record that closes an episode's log.

    Args:
        success: Episode outcome, printed lowercase.
        steps: Number of steps taken.
        rewards: Per-step rewards, printed comma-separated with two decimals
            (an empty list yields an empty ``rewards=`` field).
    """
    formatted = [format(value, ".2f") for value in rewards]
    record = "[END] success={} steps={} rewards={}".format(
        str(success).lower(),
        steps,
        ",".join(formatted),
    )
    print(record, flush=True)
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def http_reset(task_id: str) -> Dict:
    """Start an episode for *task_id* with ``POST {BASE_URL}/reset``.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    url = f"{BASE_URL}/reset"
    payload = {"task_id": task_id}
    response = requests.post(url, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()
def http_step(tool: str, parameters: Dict, reasoning: str = "") -> Dict:
    """Submit one tool call with ``POST {BASE_URL}/step``.

    The request body nests the call under an ``"action"`` key.

    Args:
        tool: Tool name.
        parameters: Tool parameters.
        reasoning: Free-text justification forwarded with the action.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    action = {"tool": tool, "parameters": parameters, "reasoning": reasoning}
    response = requests.post(
        f"{BASE_URL}/step",
        json={"action": action},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
def http_grader() -> Dict:
    """Fetch ``GET {BASE_URL}/grader`` and return its JSON body.

    Any status other than 200 yields an empty dict instead of raising.
    """
    response = requests.get(f"{BASE_URL}/grader", timeout=10)
    if response.status_code != 200:
        return {}
    return response.json()
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def build_prompt(obs: Dict) -> str:
    """Render an observation dict into the user message for the model.

    Sections are emitted in a fixed order and joined with newlines: task
    description, visible files, last tool result (cut to 1500 characters),
    the last three history entries, the environment message, the step
    counter, and a closing instruction. Sections whose value is missing or
    falsy are skipped.

    Args:
        obs: Observation as returned by the environment server.

    Returns:
        The prompt text.
    """
    parts = [f"TASK: {obs.get('task_description', '')}"]
    parts.append(f"\nVisible files: {obs.get('visible_files', [])}")

    last = obs.get("last_tool_result")
    if last:
        # Truncate long outputs so one tool result cannot dominate the prompt.
        parts.append(f"\nLast tool result:\n{str(last)[:1500]}")

    history = obs.get("action_history", [])
    if history:
        parts.append(f"\nHistory (last 3): {history[-3:]}")

    if obs.get("message"):
        parts.append(f"\nEnv message: {obs['message']}")

    # "metadata" may be present but null in the JSON payload; treat anything
    # that is not a dict as "no metadata" rather than raising AttributeError.
    meta = obs.get("metadata")
    steps_rem = meta.get("steps_remaining", "?") if isinstance(meta, dict) else "?"
    parts.append(f"\nStep {obs.get('step_count', 0)}, steps remaining: {steps_rem}")

    parts.append("\nRespond with a single JSON tool call:")
    return "\n".join(parts)
# ---------------------------------------------------------------------------
# JSON extraction
# ---------------------------------------------------------------------------
def extract_tool_call(text: str) -> Optional[Dict]:
    """Extract the first JSON tool call embedded in model output.

    The text is scanned for ``{`` characters and a JSON value is decoded
    starting at each one, ignoring whatever follows it. This handles bare
    JSON, JSON inside markdown fences, JSON surrounded by prose, and objects
    with nested braces such as the ``parameters`` dict, which a flat
    ``{[^{}]+}`` regex cannot match.

    Args:
        text: Raw model reply.

    Returns:
        The first decoded JSON object that has a ``"tool"`` key, or ``None``
        when no such object is found. Non-object JSON (numbers, strings,
        lists) is never returned.
    """
    if not text:
        return None

    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        # Only a dict can be a tool call; the isinstance check also avoids
        # evaluating `"tool" in <scalar>`.
        if isinstance(candidate, dict) and "tool" in candidate:
            return candidate
        # Not a tool call here: retry from the next brace, which also reaches
        # a call nested inside a wrapper object.
        start = text.find("{", start + 1)
    return None
# ---------------------------------------------------------------------------
# Episode runner
# ---------------------------------------------------------------------------
def run_episode(client: OpenAI, task_id: str) -> Dict:
    """Run one episode of *task_id* and return its result summary.

    Flow: log ``[START]``, reset the environment, then for up to
    ``MAX_STEPS`` iterations ask the model for a tool call, send it to
    ``/step`` and log a ``[STEP]`` record. The loop ends when the environment
    reports ``done``, when the model call fails, or when ``/step`` returns an
    HTTP error. Afterwards the grader score is fetched; a score of at least
    0.5 counts as success. ``[END]`` is always logged.

    Any other exception is reported on a ``[DEBUG]`` line and the episode is
    returned with whatever was collected so far.

    Args:
        client: OpenAI-compatible client used for chat completions.
        task_id: Task identifier passed to ``/reset``.

    Returns:
        Dict with keys ``task_id``, ``score``, ``steps``, ``success`` and
        ``rewards``.
    """
    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    try:
        # Reset
        reset_resp = http_reset(task_id)
        obs = reset_resp.get("observation", {})

        for step in range(1, MAX_STEPS + 1):
            if reset_resp.get("done") or obs.get("done"):
                break

            # Ask the model
            prompt = build_prompt(obs)
            try:
                completion = client.chat.completions.create(
                    model=MODEL_NAME,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=MAX_TOKENS,
                    temperature=TEMPERATURE,
                )
                raw = (completion.choices[0].message.content or "").strip()
            except Exception as e:
                # Any client failure ends the episode; the grader is still queried.
                log_step(step=step, action="(llm_error)", reward=0.0, done=True, error=str(e))
                break

            tool_call = extract_tool_call(raw)
            if tool_call is None:
                # Fallback: safe no-op grep
                tool_call = {
                    "tool": "Grep",
                    "parameters": {"pattern": "def "},
                    "reasoning": "fallback — could not parse model output",
                }

            tool = tool_call.get("tool", "Grep")
            # A model may emit `"parameters": null`; send an empty dict instead.
            params = tool_call.get("parameters") or {}
            reasoning = tool_call.get("reasoning", "")
            action_str = f"{tool}({json.dumps(params)})"

            # Execute
            try:
                step_resp = http_step(tool, params, reasoning)
            except requests.HTTPError as e:
                # Log the attempted action; this handler previously referenced
                # an undefined name and raised NameError instead of logging.
                log_step(step=step, action=action_str, reward=0.0, done=True, error=str(e))
                break

            obs = step_resp.get("observation", {})
            reward = float(step_resp.get("reward", 0.0) or 0.0)
            done = bool(step_resp.get("done", False))
            rewards.append(reward)
            steps_taken = step
            log_step(step=step, action=action_str, reward=reward, done=done, error=None)
            if done:
                break

        # Fetch grader score
        grader = http_grader()
        score = float(grader.get("score", 0.0) or 0.0)
        success = score >= 0.5
    except Exception as exc:
        print(f"[DEBUG] Episode error for {task_id}: {exc}", flush=True)
    finally:
        log_end(success=success, steps=steps_taken, rewards=rewards)

    return {
        "task_id": task_id,
        "score": score,
        "steps": steps_taken,
        "success": success,
        "rewards": rewards,
    }
def main() -> None:
    """Run every task in ``ALL_TASKS`` once and print a summary table.

    Exits with status 1 when no API key is configured or when the
    environment server's ``/health`` endpoint does not return 200 within
    ten attempts (two seconds apart).
    """
    if not API_KEY:
        # Name the variable the configuration actually reads as a fallback.
        print("ERROR: HF_TOKEN (or OPENAI_API_KEY) must be set.", file=sys.stderr)
        print(" export HF_TOKEN=hf_xxx", file=sys.stderr)
        sys.exit(1)

    # Wait for the environment server to come up.
    for attempt in range(10):
        try:
            r = requests.get(f"{BASE_URL}/health", timeout=5)
            if r.status_code == 200:
                break
        except Exception:
            # Server not reachable yet: fall through to the retry below.
            pass
        print(f"[DEBUG] Waiting for server... attempt {attempt+1}/10", flush=True)
        time.sleep(2)
    else:
        # The loop ran out of attempts without hitting `break`.
        print("ERROR: Server did not become ready.", file=sys.stderr)
        sys.exit(1)

    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)

    print("=" * 60, flush=True)
    print("AgentOps Gym — Baseline Inference", flush=True)
    print(f"Model: {MODEL_NAME} | Server: {BASE_URL}", flush=True)
    print("=" * 60, flush=True)

    results = []
    for task_id in ALL_TASKS:
        print("─" * 40, flush=True)
        result = run_episode(client, task_id)
        results.append(result)

    print("=" * 60, flush=True)
    print("BASELINE SUMMARY", flush=True)
    print("=" * 60, flush=True)
    total = sum(r["score"] for r in results)
    solved = sum(1 for r in results if r["success"])
    avg = total / len(results) if results else 0.0
    for r in results:
        status = "✅ PASS" if r["success"] else "❌ FAIL"
        print(f" {r['task_id']:>8} score={r['score']:.3f} steps={r['steps']:2d} {status}", flush=True)
    print(f"\n Average score: {avg:.3f}", flush=True)
    print(f" Solved: {solved} / {len(results)}", flush=True)
    print("=" * 60, flush=True)


if __name__ == "__main__":
    main()