Spaces:
Running
Running
File size: 6,131 Bytes
0e9396b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Echo Environment Implementation.
A pure MCP environment that echoes back messages sent to it.
This demonstrates how to build an MCPEnvironment with inline FastMCP tools.
All interactions happen through MCP tools:
- `echo_message(message)`: Echo back the provided message
- `echo_with_length(message)`: Echo back the message with its length
Example:
>>> from openenv.core.env_server.mcp_types import ListToolsAction, CallToolAction
>>> env = EchoEnvironment()
>>> env.reset()
>>>
>>> # List available tools
>>> obs = env.step(ListToolsAction())
>>> print([t.name for t in obs.tools]) # ["echo_message", "echo_with_length"]
>>>
>>> # Call a tool
>>> obs = env.step(CallToolAction(tool_name="echo_message", arguments={"message": "Hi!"}))
>>> print(obs.result) # "Hi!"
"""
from typing import Any, Optional
from uuid import uuid4
# Support both in-repo and standalone imports
try:
# In-repo imports (when running from OpenEnv repository)
from openenv.core.env_server.mcp_environment import MCPEnvironment
from openenv.core.env_server.types import Action, Observation, State
except ImportError:
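# Standalone imports (when environment is standalone with openenv from pip)
# NOTE(review): these import paths are identical to the in-repo ones in the
# try branch above, so this fallback would fail with the same ImportError;
# confirm the intended standalone module path.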
from openenv.core.env_server.mcp_environment import MCPEnvironment
from openenv.core.env_server.types import Action, Observation, State
from fastmcp import FastMCP
class EchoEnvironment(MCPEnvironment):
    """
    MCP-only environment whose tools send the caller's message straight back.

    Two tools are registered on an inline FastMCP server:
    - `echo_message`: returns the message unchanged
    - `echo_with_length`: returns the message together with its length

    MCP actions (ListToolsAction, CallToolAction) are delegated to the
    MCPEnvironment base class; any other action ends up in `_step_impl`,
    which answers with an error observation.

    Example using MCPToolClient:
        >>> from openenv.core.mcp_client import MCPToolClient
        >>>
        >>> with MCPToolClient(base_url="http://localhost:8000") as env:
        ...     env.reset()
        ...     tools = env.list_tools()
        ...     print([t.name for t in tools])
        ...     result = env.call_tool("echo_message", message="Hello!")
        ...     print(result)
    """

    def __init__(self):
        """Build the FastMCP server, register both echo tools, and set up state."""
        server = FastMCP("echo_env")

        # NOTE(review): FastMCP presumably publishes each tool's name,
        # signature and docstring to clients, so the wording of the two tool
        # definitions below is kept as-is -- confirm before editing them.
        @server.tool
        def echo_message(message: str) -> str:
            """
            Echo back the provided message.
            Args:
                message: The message to echo back
            Returns:
                The same message that was provided
            """
            return message

        @server.tool
        def echo_with_length(message: str) -> dict:
            """
            Echo back the message with its length.
            Args:
                message: The message to echo back
            Returns:
                Dictionary with the message and its length
            """
            return {"message": message, "length": len(message)}

        # The base class receives the configured server instance.
        super().__init__(server)

        # Start with a fresh episode id and zeroed counters.
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count = 0

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Begin a new episode.

        Args:
            seed: Optional random seed; not used by this environment
            episode_id: Episode ID to adopt; a new UUID string is generated
                when this is omitted or falsy
            **kwargs: Extra reset options; not used here

        Returns:
            Observation with done=False, reward=0.0 and "ready" metadata
        """
        chosen_episode_id = episode_id if episode_id else str(uuid4())
        self._state = State(episode_id=chosen_episode_id, step_count=0)
        self._reset_count += 1

        ready_metadata = {"status": "ready", "message": "Echo environment ready!"}
        return Observation(done=False, reward=0.0, metadata=ready_metadata)

    def _step_impl(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Answer an action that is not one of the MCP action types.

        Only ListToolsAction and CallToolAction are meaningful for this
        environment, so every call here produces an error observation that
        names the offending action type.

        Args:
            action: The action that was submitted
            timeout_s: Optional timeout; not used
            **kwargs: Extra arguments; not used

        Returns:
            Observation whose metadata carries an "error" message
        """
        action_type_name = type(action).__name__
        error_text = (
            f"Unknown action type: {action_type_name}. "
            "Use ListToolsAction or CallToolAction for MCP interactions."
        )
        return Observation(
            done=False,
            reward=0.0,
            metadata={"error": error_text},
        )

    def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Run one step: bump the step counter, then defer to the base class.

        The counter is incremented before delegation, so it advances for
        every action passed in, whatever its type.

        Args:
            action: The action to execute (ListToolsAction or CallToolAction)
            timeout_s: Optional timeout forwarded to the base class
            **kwargs: Extra arguments forwarded to the base class

        Returns:
            Observation returned by the base class `step`
        """
        self._state.step_count = self._state.step_count + 1

        # Routing of MCP and non-MCP actions is left to the base class.
        observation = super().step(action, timeout_s=timeout_s, **kwargs)
        return observation

    @property
    def state(self) -> State:
        """
        Expose the State object tracked for the current episode.

        Returns:
            The State holding the current episode_id and step_count
        """
        return self._state
|