santosh-iima's picture
Upload 91 files
beb8990 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Generic MCP Environment Client with WebSocket Support.
This module provides a generic client for connecting to any MCP Environment
server over WebSocket (for persistent sessions) or HTTP. It's fully reusable
across different MCP integrations.
"""
from typing import Any, Dict
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core.env_client import EnvClient
from .data_models import MCPAction, MCPObservation
class MCPEnvClient(EnvClient[MCPAction, MCPObservation, State]):
"""
Generic WebSocket client for MCP Environment.
This client connects to any MCPEnvironment server via WebSocket and provides
methods to interact with it: reset(), step(), and state access.
Each client maintains a persistent WebSocket connection for better performance.
Example:
>>> # Connect to a running server using context manager (recommended)
>>> with MCPEnvClient(base_url="http://localhost:8004") as client:
... result = client.reset()
... print(result.observation.metadata)
...
... # List tools
... result = client.step(MCPAction(action_type="ListToolsAction"))
... print(result.observation.tools_list)
...
... # Call a tool
... result = client.step(MCPAction(
... action_type="ToolCallAction",
... tool_name="create_resource",
... arguments={"name": "Test"}
... ))
... print(result.observation.tool_result)
... print(result.reward)
Example with manual cleanup:
>>> try:
... client = MCPEnvClient(base_url="http://localhost:8004")
... result = client.reset()
... result = client.step(MCPAction(action_type="ListToolsAction"))
... finally:
... client.close()
Example with Docker:
>>> # Automatically start container and connect
>>> client = MCPEnvClient.from_docker_image("calendar-env:latest")
>>> result = client.reset()
>>> result = client.step(MCPAction(action_type="ListToolsAction"))
"""
def _step_payload(self, action: MCPAction) -> Dict:
"""
Convert MCPAction to JSON payload for step request.
Args:
action: MCPAction instance
Returns:
Dictionary representation suitable for JSON encoding
"""
payload = {
"action_type": action.action_type,
}
if action.tool_name is not None:
payload["tool_name"] = action.tool_name
if action.arguments is not None:
payload["arguments"] = action.arguments
return payload
def _parse_result(self, payload: Dict) -> StepResult[MCPObservation]:
"""
Parse server response into StepResult[MCPObservation].
Args:
payload: JSON response from server
Returns:
StepResult with MCPObservation
"""
obs_data = payload.get("observation", {})
observation = MCPObservation(
success=obs_data.get("success", True),
error_message=obs_data.get("error_message"),
tools_list=obs_data.get("tools_list"),
tool_result=obs_data.get("tool_result"),
done=payload.get("done", False),
reward=payload.get("reward"),
metadata=obs_data.get("metadata", {}),
)
return StepResult(
observation=observation,
reward=payload.get("reward"),
done=payload.get("done", False),
)
def _parse_state(self, payload: Dict) -> State:
"""
Parse server response into State object.
Args:
payload: JSON response from /state endpoint
Returns:
State object with episode_id and step_count
"""
return State(
episode_id=payload.get("episode_id"),
step_count=payload.get("step_count", 0),
)