Spaces:
Sleeping
Sleeping
| """ | |
| AgentOps Gym β FastAPI application. | |
| Exposes the OpenEnv-compatible HTTP + WebSocket API via openenv-core's | |
| create_app(), plus custom endpoints: /tasks, /grader, /health. | |
| A persistent singleton environment handles HTTP /reset and /step (for | |
| the baseline script and interactive testing). WebSocket connections each | |
| get their own AgentOpsEnvironment instance (via create_app factory pattern). | |
| """ | |
| import threading | |
| import logging | |
| import os | |
| from typing import Optional | |
| from fastapi.responses import JSONResponse | |
| try: | |
| from openenv.core.env_server.http_server import create_app | |
| except ImportError: | |
| raise ImportError("openenv is required. Install with 'pip install openenv-core'") | |
| try: | |
| from agentops_gym.models import ToolCall, AgentObservation | |
| from agentops_gym.server.environment import AgentOpsEnvironment, get_last_grader_result | |
| from agentops_gym.server.tasks import TASK_REGISTRY | |
| except (ModuleNotFoundError, ImportError): | |
| from models import ToolCall, AgentObservation | |
| from server.environment import AgentOpsEnvironment, get_last_grader_result | |
| from server.tasks import TASK_REGISTRY | |
| logger = logging.getLogger(__name__) | |
| app = create_app( | |
| AgentOpsEnvironment, | |
| ToolCall, | |
| AgentObservation, | |
| env_name="agentops-gym", | |
| ) | |
| _env = AgentOpsEnvironment() | |
| _env_lock = threading.Lock() | |
| def _serialize(obs: AgentObservation) -> dict: | |
| return obs.model_dump() if hasattr(obs, "model_dump") else obs.dict() | |
| app.router.routes = [ | |
| r for r in app.router.routes | |
| if not (hasattr(r, "path") and r.path in ("/reset", "/step")) | |
| ] | |
| async def stateful_reset(request: dict = None): | |
| """Reset environment for a new episode. Pass {'task_id': 'task_1'} etc.""" | |
| import asyncio | |
| request = request or {} | |
| task_id = request.get("task_id", "task_1") | |
| def _do(): | |
| with _env_lock: | |
| obs = _env.reset(task_id=task_id) | |
| return _serialize(obs) | |
| loop = asyncio.get_event_loop() | |
| obs_dict = await loop.run_in_executor(None, _do) | |
| return {"observation": obs_dict, "reward": 0.0, "done": False} | |
| async def stateful_step(request: dict = None): | |
| """Execute one tool call. | |
| Accepts two body shapes: | |
| 1. {"action": {"tool": "...", "parameters": {...}}} β inference script | |
| 2. {"tool": "...", "parameters": {...}} β direct curl | |
| """ | |
| import asyncio | |
| request = request or {} | |
| if "action" in request: | |
| action_data = request["action"] | |
| else: | |
| action_data = request | |
| tool = action_data.get("tool", "") | |
| parameters = action_data.get("parameters", {}) | |
| reasoning = action_data.get("reasoning", "") | |
| if not tool: | |
| return JSONResponse( | |
| status_code=400, | |
| content={"error": "'tool' field is required. Body must be {'action': {'tool': '...', 'parameters': {...}}}"}, | |
| ) | |
| def _do(): | |
| with _env_lock: | |
| obs = _env.step(ToolCall(tool=tool, parameters=parameters, reasoning=reasoning)) | |
| return _serialize(obs) | |
| loop = asyncio.get_event_loop() | |
| obs_dict = await loop.run_in_executor(None, _do) | |
| return { | |
| "observation": obs_dict, | |
| "reward": obs_dict.get("reward", 0.0), | |
| "done": obs_dict.get("done", False), | |
| } | |
| async def list_tasks(): | |
| """List all available tasks with metadata.""" | |
| tasks = [] | |
| for tid, t in TASK_REGISTRY.items(): | |
| tasks.append({ | |
| "id": tid, | |
| "name": t["name"], | |
| "difficulty": t["difficulty"], | |
| "description": t["description"], | |
| "max_steps": t["max_steps"], | |
| "optimal_steps": t["optimal_steps"], | |
| }) | |
| return { | |
| "tasks": tasks, | |
| "action_schema": { | |
| "tool": "string β one of FileRead|FileWrite|Grep|Bash|WebSearch|TodoWrite", | |
| "parameters": "dict β tool-specific params", | |
| "reasoning": "string (optional) β agent's reasoning", | |
| }, | |
| } | |
| async def grader_score(): | |
| """Return the grader score for the last completed episode.""" | |
| result = get_last_grader_result() | |
| if result is None: | |
| return JSONResponse( | |
| status_code=404, | |
| content={"error": "No episode graded yet. Complete an episode first."}, | |
| ) | |
| return result | |
| async def health(): | |
| return {"status": "ok", "env": "agentops-gym"} | |
| def main(): | |
| """Entry point for running the AgentOps Gym server.""" | |
| import uvicorn | |
| import os | |
| host = os.getenv("HOST", "0.0.0.0") | |
| port = int(os.getenv("PORT", "8000")) | |
| uvicorn.run(app, host=host, port=int(port)) | |
| if __name__ == "__main__": | |
| main() | |