| | |
| | |
| | |
| | |
| | |
| |
|
| | """ |
| | Generic MCP Environment Client with WebSocket Support. |
| | |
| | This module provides a generic client for connecting to any MCP Environment |
| | server over WebSocket (for persistent sessions) or HTTP. It's fully reusable |
| | across different MCP integrations. |
| | """ |
| |
|
| | from typing import Any, Dict |
| |
|
| | from openenv.core.client_types import StepResult |
| | from openenv.core.env_server.types import State |
| | from openenv.core.env_client import EnvClient |
| |
|
| | from .data_models import MCPAction, MCPObservation |
| |
|
| |
|
class MCPEnvClient(EnvClient[MCPAction, MCPObservation, State]):
    """
    Generic WebSocket client for MCP Environment.

    This client connects to any MCPEnvironment server via WebSocket and provides
    methods to interact with it: reset(), step(), and state access.
    Each client maintains a persistent WebSocket connection for better performance.

    This class only supplies the (de)serialization hooks used by the base
    ``EnvClient``: ``_step_payload``, ``_parse_result`` and ``_parse_state``.

    Example:
        >>> # Connect to a running server using context manager (recommended)
        >>> with MCPEnvClient(base_url="http://localhost:8004") as client:
        ...     result = client.reset()
        ...     print(result.observation.metadata)
        ...
        ...     # List tools
        ...     result = client.step(MCPAction(action_type="ListToolsAction"))
        ...     print(result.observation.tools_list)
        ...
        ...     # Call a tool
        ...     result = client.step(MCPAction(
        ...         action_type="ToolCallAction",
        ...         tool_name="create_resource",
        ...         arguments={"name": "Test"}
        ...     ))
        ...     print(result.observation.tool_result)
        ...     print(result.reward)

    Example with manual cleanup:
        >>> try:
        ...     client = MCPEnvClient(base_url="http://localhost:8004")
        ...     result = client.reset()
        ...     result = client.step(MCPAction(action_type="ListToolsAction"))
        ... finally:
        ...     client.close()

    Example with Docker:
        >>> # Automatically start container and connect
        >>> client = MCPEnvClient.from_docker_image("calendar-env:latest")
        >>> result = client.reset()
        >>> result = client.step(MCPAction(action_type="ListToolsAction"))
    """

    def _step_payload(self, action: MCPAction) -> Dict[str, Any]:
        """
        Convert MCPAction to JSON payload for step request.

        ``tool_name`` and ``arguments`` are only included when they are not
        ``None``, so the payload carries just the fields the action sets.

        Args:
            action: MCPAction instance

        Returns:
            Dictionary representation suitable for JSON encoding
        """
        payload: Dict[str, Any] = {
            "action_type": action.action_type,
        }

        if action.tool_name is not None:
            payload["tool_name"] = action.tool_name

        if action.arguments is not None:
            payload["arguments"] = action.arguments

        return payload

    def _parse_result(self, payload: Dict[str, Any]) -> StepResult[MCPObservation]:
        """
        Parse server response into StepResult[MCPObservation].

        A missing or null ``"observation"`` entry is treated as an empty
        observation, and a missing or null ``"metadata"`` entry as an empty
        dict. ``reward`` and ``done`` are read from the top level of the
        payload and mirrored onto both the observation and the StepResult.

        Args:
            payload: JSON response from server

        Returns:
            StepResult with MCPObservation
        """
        # dict.get(key, default) only falls back when the key is absent; an
        # explicit null would come through as None and break the .get() calls
        # below, so normalise any falsy value to an empty dict.
        obs_data = payload.get("observation") or {}

        # Read once so the observation and the StepResult cannot disagree.
        reward = payload.get("reward")
        done = payload.get("done", False)

        observation = MCPObservation(
            success=obs_data.get("success", True),
            error_message=obs_data.get("error_message"),
            tools_list=obs_data.get("tools_list"),
            tool_result=obs_data.get("tool_result"),
            done=done,
            reward=reward,
            # Same null-vs-missing normalisation as for the observation itself.
            metadata=obs_data.get("metadata") or {},
        )

        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: Dict[str, Any]) -> State:
        """
        Parse server response into State object.

        Args:
            payload: JSON response from /state endpoint

        Returns:
            State object with episode_id (None when absent from the payload)
            and step_count (0 when absent from the payload)
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )