""" AgentOps Gym — Environment client. Wraps WebSocket communication with the environment server. The validator calls AgentOpsEnv.from_docker_image(IMAGE_NAME) which starts the Docker container and connects via WebSocket automatically. """ from typing import Dict, Any, Optional from openenv.core.env_client import EnvClient from openenv.core.client_types import StepResult from agentops_gym.models import ToolCall, AgentObservation, AgentState class AgentOpsEnv(EnvClient[ToolCall, AgentObservation, AgentState]): """Client for the AgentOps Gym environment.""" def _step_payload(self, action: ToolCall) -> Dict[str, Any]: """Serialize ToolCall to the JSON payload the server expects.""" payload = { "tool": action.tool, "parameters": action.parameters, } if action.reasoning is not None: payload["reasoning"] = action.reasoning return payload def _parse_result(self, payload: Dict[str, Any]) -> StepResult[AgentObservation]: """Parse server response into a typed StepResult.""" obs_data = payload.get("observation", {}) # Merge top-level done/reward into obs_data if not already there done = payload.get("done", obs_data.get("done", False)) reward = payload.get("reward", obs_data.get("reward", None)) # Strip unknown fields so Pydantic doesn't choke known = { "visible_files", "last_tool_result", "action_history", "step_count", "task_description", "message", "done", "reward", "metadata", } clean = {k: v for k, v in obs_data.items() if k in known} clean["done"] = done clean["reward"] = reward obs = AgentObservation(**clean) return StepResult( observation=obs, reward=reward, done=done, ) def _parse_state(self, payload: Dict[str, Any]) -> AgentState: """Parse server state response into typed AgentState.""" known = { "episode_id", "step_count", "task_id", "task_description", "difficulty", "max_steps", "visible_files", "discovered_files", "action_history", "current_reward", "completed", "grader_score", } clean = {k: v for k, v in payload.items() if k in known} return AgentState(**clean)