File size: 2,366 Bytes
e2eb9d7
 
 
 
58ebeba
 
e2eb9d7
 
58ebeba
e2eb9d7
 
 
58ebeba
e2eb9d7
 
 
 
 
 
58ebeba
 
 
 
 
 
 
 
e2eb9d7
 
58ebeba
e2eb9d7
58ebeba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e2eb9d7
 
58ebeba
 
e2eb9d7
 
 
58ebeba
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""
AgentOps Gym — Environment client.

Wraps WebSocket communication with the environment server.
The validator calls AgentOpsEnv.from_docker_image(IMAGE_NAME) which
starts the Docker container and connects via WebSocket automatically.
"""

from typing import Dict, Any, Optional
from openenv.core.env_client import EnvClient
from openenv.core.client_types import StepResult

from agentops_gym.models import ToolCall, AgentObservation, AgentState


class AgentOpsEnv(EnvClient[ToolCall, AgentObservation, AgentState]):
    """Client for the AgentOps Gym environment.

    Defines how a ``ToolCall`` is serialized for the server and how the
    server's JSON payloads are converted back into ``AgentObservation`` and
    ``AgentState`` objects.
    """

    # Keys of the server's observation dict that are forwarded to
    # AgentObservation; every other key is dropped before construction.
    _OBSERVATION_FIELDS = frozenset({
        "visible_files", "last_tool_result", "action_history",
        "step_count", "task_description", "message",
        "done", "reward", "metadata",
    })

    # Keys of the server's state payload that are forwarded to AgentState.
    _STATE_FIELDS = frozenset({
        "episode_id", "step_count", "task_id", "task_description",
        "difficulty", "max_steps", "visible_files", "discovered_files",
        "action_history", "current_reward", "completed", "grader_score",
    })

    def _step_payload(self, action: ToolCall) -> Dict[str, Any]:
        """Serialize a ToolCall to the JSON payload the server expects.

        Args:
            action: The tool call to send; its ``tool``, ``parameters`` and
                ``reasoning`` attributes are read.

        Returns:
            A dict with ``"tool"`` and ``"parameters"`` keys, plus
            ``"reasoning"`` only when the action carries one.
        """
        payload: Dict[str, Any] = {
            "tool": action.tool,
            "parameters": action.parameters,
        }
        # Leave the key out entirely instead of sending an explicit null.
        if action.reasoning is not None:
            payload["reasoning"] = action.reasoning
        return payload

    def _parse_result(self, payload: Dict[str, Any]) -> StepResult[AgentObservation]:
        """Parse a server response into a typed StepResult.

        ``done`` and ``reward`` are taken from the top level of ``payload``
        when present and non-null, otherwise from the nested observation;
        ``done`` finally defaults to ``False`` and ``reward`` to ``None``.

        Args:
            payload: Decoded server response. May contain ``"observation"``
                (a dict), ``"done"`` and ``"reward"``.

        Returns:
            A StepResult whose observation, reward and done flag agree with
            each other.
        """
        # dict.get() only uses its default when the key is missing, so an
        # explicit ``"observation": null`` has to be normalized separately;
        # otherwise the lookups below would fail on None.
        obs_data = payload.get("observation") or {}

        # Top-level values win; a missing or null value falls back to the
        # observation-level one.
        done = payload.get("done")
        if done is None:
            done = obs_data.get("done")
        if done is None:
            done = False

        reward = payload.get("reward")
        if reward is None:
            reward = obs_data.get("reward")

        # Strip unknown fields so Pydantic doesn't choke.
        clean = {
            key: value
            for key, value in obs_data.items()
            if key in self._OBSERVATION_FIELDS
        }
        clean["done"] = done
        clean["reward"] = reward

        return StepResult(
            observation=AgentObservation(**clean),
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: Dict[str, Any]) -> AgentState:
        """Parse a server state response into a typed AgentState.

        Args:
            payload: Decoded state response from the server.

        Returns:
            An AgentState built from the recognized keys of ``payload``;
            unrecognized keys are ignored.
        """
        clean = {
            key: value
            for key, value in payload.items()
            if key in self._STATE_FIELDS
        }
        return AgentState(**clean)