agentops-gym / server /app.py
Revanth-ml's picture
Upload folder using huggingface_hub
56724ad verified
"""
AgentOps Gym β€” FastAPI application.
Exposes the OpenEnv-compatible HTTP + WebSocket API via openenv-core's
create_app(), plus custom endpoints: /tasks, /grader, /health.
A persistent singleton environment handles HTTP /reset and /step (for
the baseline script and interactive testing). WebSocket connections each
get their own AgentOpsEnvironment instance (via create_app factory pattern).
"""
import threading
import logging
import os
from typing import Optional
from fastapi.responses import JSONResponse
try:
from openenv.core.env_server.http_server import create_app
except ImportError:
raise ImportError("openenv is required. Install with 'pip install openenv-core'")
try:
from agentops_gym.models import ToolCall, AgentObservation
from agentops_gym.server.environment import AgentOpsEnvironment, get_last_grader_result
from agentops_gym.server.tasks import TASK_REGISTRY
except (ModuleNotFoundError, ImportError):
from models import ToolCall, AgentObservation
from server.environment import AgentOpsEnvironment, get_last_grader_result
from server.tasks import TASK_REGISTRY
logger = logging.getLogger(__name__)
app = create_app(
AgentOpsEnvironment,
ToolCall,
AgentObservation,
env_name="agentops-gym",
)
_env = AgentOpsEnvironment()
_env_lock = threading.Lock()
def _serialize(obs: AgentObservation) -> dict:
return obs.model_dump() if hasattr(obs, "model_dump") else obs.dict()
app.router.routes = [
r for r in app.router.routes
if not (hasattr(r, "path") and r.path in ("/reset", "/step"))
]
@app.post("/reset")
async def stateful_reset(request: dict = None):
"""Reset environment for a new episode. Pass {'task_id': 'task_1'} etc."""
import asyncio
request = request or {}
task_id = request.get("task_id", "task_1")
def _do():
with _env_lock:
obs = _env.reset(task_id=task_id)
return _serialize(obs)
loop = asyncio.get_event_loop()
obs_dict = await loop.run_in_executor(None, _do)
return {"observation": obs_dict, "reward": 0.0, "done": False}
@app.post("/step")
async def stateful_step(request: dict = None):
"""Execute one tool call.
Accepts two body shapes:
1. {"action": {"tool": "...", "parameters": {...}}} ← inference script
2. {"tool": "...", "parameters": {...}} ← direct curl
"""
import asyncio
request = request or {}
if "action" in request:
action_data = request["action"]
else:
action_data = request
tool = action_data.get("tool", "")
parameters = action_data.get("parameters", {})
reasoning = action_data.get("reasoning", "")
if not tool:
return JSONResponse(
status_code=400,
content={"error": "'tool' field is required. Body must be {'action': {'tool': '...', 'parameters': {...}}}"},
)
def _do():
with _env_lock:
obs = _env.step(ToolCall(tool=tool, parameters=parameters, reasoning=reasoning))
return _serialize(obs)
loop = asyncio.get_event_loop()
obs_dict = await loop.run_in_executor(None, _do)
return {
"observation": obs_dict,
"reward": obs_dict.get("reward", 0.0),
"done": obs_dict.get("done", False),
}
@app.get("/tasks")
async def list_tasks():
"""List all available tasks with metadata."""
tasks = []
for tid, t in TASK_REGISTRY.items():
tasks.append({
"id": tid,
"name": t["name"],
"difficulty": t["difficulty"],
"description": t["description"],
"max_steps": t["max_steps"],
"optimal_steps": t["optimal_steps"],
})
return {
"tasks": tasks,
"action_schema": {
"tool": "string β€” one of FileRead|FileWrite|Grep|Bash|WebSearch|TodoWrite",
"parameters": "dict β€” tool-specific params",
"reasoning": "string (optional) β€” agent's reasoning",
},
}
@app.get("/grader")
async def grader_score():
"""Return the grader score for the last completed episode."""
result = get_last_grader_result()
if result is None:
return JSONResponse(
status_code=404,
content={"error": "No episode graded yet. Complete an episode first."},
)
return result
@app.get("/health")
async def health():
return {"status": "ok", "env": "agentops-gym"}
def main():
"""Entry point for running the AgentOps Gym server."""
import uvicorn
import os
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "8000"))
uvicorn.run(app, host=host, port=int(port))
if __name__ == "__main__":
main()