Spaces:
Sleeping
Sleeping
File size: 4,724 Bytes
"""
AgentOps Gym — FastAPI application.
Exposes the OpenEnv-compatible HTTP + WebSocket API via openenv-core's
create_app(), plus custom endpoints: /tasks, /grader, /health.
A persistent singleton environment handles HTTP /reset and /step (for
the baseline script and interactive testing). WebSocket connections each
get their own AgentOpsEnvironment instance (via create_app factory pattern).
"""
import threading
import logging
import os
from typing import Optional
from fastapi.responses import JSONResponse
try:
from openenv.core.env_server.http_server import create_app
except ImportError:
raise ImportError("openenv is required. Install with 'pip install openenv-core'")
try:
from agentops_gym.models import ToolCall, AgentObservation
from agentops_gym.server.environment import AgentOpsEnvironment, get_last_grader_result
from agentops_gym.server.tasks import TASK_REGISTRY
except (ModuleNotFoundError, ImportError):
from models import ToolCall, AgentObservation
from server.environment import AgentOpsEnvironment, get_last_grader_result
from server.tasks import TASK_REGISTRY
# Module-level logger (not referenced by any code in this file's visible part).
logger = logging.getLogger(__name__)
# Build the OpenEnv-compatible app. The environment CLASS (not an instance) is
# passed so that, per the module docstring, each WebSocket connection can get
# its own AgentOpsEnvironment. ToolCall / AgentObservation are presumably the
# action and observation types — TODO confirm against openenv-core's
# create_app signature.
app = create_app(
    AgentOpsEnvironment,
    ToolCall,
    AgentObservation,
    env_name="agentops-gym",
)
# Single process-wide environment shared by the HTTP /reset and /step handlers
# in this module (created at import time).
_env = AgentOpsEnvironment()
# Serializes access to _env: the HTTP handlers run env calls on the default
# thread-pool executor, so concurrent requests could otherwise interleave on
# the same instance.
_env_lock = threading.Lock()
def _serialize(obs: AgentObservation) -> dict:
    """Convert an observation object into a plain dict.

    Uses ``model_dump()`` when the object provides it (presumably a pydantic
    v2 model) and falls back to ``.dict()`` otherwise (presumably pydantic v1).
    """
    if not hasattr(obs, "model_dump"):
        return obs.dict()
    return obs.model_dump()
# Remove any /reset and /step routes already registered on the app (presumably
# by create_app()) so the stateful handlers declared below own those paths.
# Routes that have no ``path`` attribute are always kept.
app.router.routes = [
    route
    for route in app.router.routes
    if getattr(route, "path", None) not in ("/reset", "/step")
]
@app.post("/reset")
async def stateful_reset(request: Optional[dict] = None):
    """Reset environment for a new episode. Pass {'task_id': 'task_1'} etc.

    Operates on the shared module-level HTTP environment. A missing or empty
    body falls back to ``task_1``.

    Returns:
        ``{"observation": <serialized observation>, "reward": 0.0,
        "done": False}``.
    """
    import asyncio

    request = request or {}
    task_id = request.get("task_id", "task_1")

    def _do():
        # _env is shared by every HTTP client: hold the lock across the reset
        # and serialization so a concurrent /step cannot interleave.
        with _env_lock:
            obs = _env.reset(task_id=task_id)
            return _serialize(obs)

    # Inside a coroutine, get_running_loop() is the documented way to obtain
    # the loop; get_event_loop() is discouraged there.
    loop = asyncio.get_running_loop()
    # _env.reset is a plain synchronous call; run it on the default thread
    # pool so it does not block the event loop.
    obs_dict = await loop.run_in_executor(None, _do)
    return {"observation": obs_dict, "reward": 0.0, "done": False}
@app.post("/step")
async def stateful_step(request: Optional[dict] = None):
    """Execute one tool call against the shared HTTP environment.

    Accepts two body shapes:
      1. {"action": {"tool": "...", "parameters": {...}}} — inference script
      2. {"tool": "...", "parameters": {...}}             — direct curl

    An optional "reasoning" string is forwarded to ToolCall in either shape.

    Returns:
        ``{"observation": ..., "reward": ..., "done": ...}``, where reward and
        done are copied from the serialized observation (defaulting to 0.0 and
        False when absent), or a 400 JSONResponse for a malformed body.
    """
    import asyncio

    request = request or {}
    action_data = request["action"] if "action" in request else request
    # A non-object "action" (null, string, list, ...) would otherwise crash on
    # .get() below and surface as a 500; report it as a client error.
    if not isinstance(action_data, dict):
        return JSONResponse(
            status_code=400,
            content={"error": "'action' must be a JSON object. Body must be {'action': {'tool': '...', 'parameters': {...}}}"},
        )

    tool = action_data.get("tool", "")
    parameters = action_data.get("parameters", {})
    reasoning = action_data.get("reasoning", "")
    if not tool:
        return JSONResponse(
            status_code=400,
            content={"error": "'tool' field is required. Body must be {'action': {'tool': '...', 'parameters': {...}}}"},
        )

    # Build the action before taking the lock: construction touches no shared
    # state, and a validation failure is the caller's fault rather than a 500.
    # NOTE(review): assumes ToolCall validation errors subclass ValueError
    # (true for pydantic's ValidationError) — confirm against the models module.
    try:
        action = ToolCall(tool=tool, parameters=parameters, reasoning=reasoning)
    except ValueError as exc:
        return JSONResponse(
            status_code=400,
            content={"error": f"Invalid action: {exc}"},
        )

    def _do():
        # Serialize access to the shared environment across concurrent requests.
        with _env_lock:
            obs = _env.step(action)
            return _serialize(obs)

    # get_running_loop() is the preferred way to reach the loop from a coroutine.
    loop = asyncio.get_running_loop()
    # _env.step is synchronous; keep it off the event loop.
    obs_dict = await loop.run_in_executor(None, _do)
    return {
        "observation": obs_dict,
        "reward": obs_dict.get("reward", 0.0),
        "done": obs_dict.get("done", False),
    }
@app.get("/tasks")
async def list_tasks():
    """List all available tasks with metadata, plus the expected action schema.

    Each task entry holds its registry id followed by the ``name``,
    ``difficulty``, ``description``, ``max_steps`` and ``optimal_steps`` values
    read from TASK_REGISTRY, in registry iteration order.
    """
    summary_keys = ("name", "difficulty", "description", "max_steps", "optimal_steps")
    tasks = [
        {"id": task_id, **{key: spec[key] for key in summary_keys}}
        for task_id, spec in TASK_REGISTRY.items()
    ]
    # NOTE(review): the "β" in these runtime strings looks like a mis-encoded
    # em dash; left unchanged here because clients receive these bytes verbatim.
    action_schema = {
        "tool": "string β one of FileRead|FileWrite|Grep|Bash|WebSearch|TodoWrite",
        "parameters": "dict β tool-specific params",
        "reasoning": "string (optional) β agent's reasoning",
    }
    return {"tasks": tasks, "action_schema": action_schema}
@app.get("/grader")
async def grader_score():
    """Report the grader result recorded for the most recently graded episode.

    Responds with HTTP 404 and an error message while no result exists yet.
    """
    last_result = get_last_grader_result()
    if last_result is not None:
        return last_result
    return JSONResponse(
        status_code=404,
        content={"error": "No episode graded yet. Complete an episode first."},
    )
@app.get("/health")
async def health():
    """Liveness check: always reports status ok along with the env name."""
    return dict(status="ok", env="agentops-gym")
def main():
    """Entry point for running the AgentOps Gym server.

    Serves the module-level ``app`` with uvicorn. The bind host is read from
    the ``HOST`` environment variable (default ``0.0.0.0``) and the port from
    ``PORT`` (default ``8000``); a non-integer ``PORT`` raises ValueError.
    """
    # uvicorn is only needed when actually serving, so import it lazily.
    import uvicorn

    # ``os`` comes from the module-level import; no local re-import needed.
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "8000"))
    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    main()
|