File size: 6,131 Bytes
0e9396b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Echo Environment Implementation.

A pure MCP environment that echoes back messages sent to it.
This demonstrates how to build an MCPEnvironment with inline FastMCP tools.

All interactions happen through MCP tools:
- `echo_message(message)`: Echo back the provided message
- `echo_with_length(message)`: Echo back the message with its length

Example:
    >>> from openenv.core.env_server.mcp_types import ListToolsAction, CallToolAction
    >>> env = EchoEnvironment()
    >>> env.reset()
    >>>
    >>> # List available tools
    >>> obs = env.step(ListToolsAction())
    >>> print([t.name for t in obs.tools])  # ["echo_message", "echo_with_length"]
    >>>
    >>> # Call a tool
    >>> obs = env.step(CallToolAction(tool_name="echo_message", arguments={"message": "Hi!"}))
    >>> print(obs.result)  # "Hi!"
"""

from typing import Any, Optional
from uuid import uuid4

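# Support both in-repo and standalone imports.
# NOTE(review): the try and except branches below import from identical module
# paths, so the fallback is currently a no-op — confirm the intended standalone path.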
try:
    # In-repo imports (when running from OpenEnv repository)
    from openenv.core.env_server.mcp_environment import MCPEnvironment
    from openenv.core.env_server.types import Action, Observation, State
except ImportError:
    # Standalone imports (when environment is standalone with openenv from pip)
    from openenv.core.env_server.mcp_environment import MCPEnvironment
    from openenv.core.env_server.types import Action, Observation, State

from fastmcp import FastMCP


class EchoEnvironment(MCPEnvironment):
    """
    Minimal MCP-only environment whose tools hand back the text they receive.

    Two tools are registered on an inline FastMCP server:
    - `echo_message`: returns the message unchanged
    - `echo_with_length`: returns the message together with its length

    Handling of ListToolsAction and CallToolAction comes from the
    MCPEnvironment base class. This class defines no action types of its
    own; non-MCP actions are answered by `_step_impl` with an error entry
    in the observation metadata.

    Example using MCPToolClient:
        >>> from openenv.core.mcp_client import MCPToolClient
        >>>
        >>> with MCPToolClient(base_url="http://localhost:8000") as env:
        ...     env.reset()
        ...     tools = env.list_tools()
        ...     print([t.name for t in tools])
        ...     result = env.call_tool("echo_message", message="Hello!")
        ...     print(result)
    """

    def __init__(self):
        """Build the FastMCP server, register the echo tools, and set up state."""
        server = FastMCP("echo_env")

        # NOTE: the tool functions' names, signatures and docstrings are kept
        # verbatim — presumably FastMCP derives the tool metadata from them.
        @server.tool
        def echo_message(message: str) -> str:
            """
            Echo back the provided message.

            Args:
                message: The message to echo back

            Returns:
                The same message that was provided
            """
            return message

        @server.tool
        def echo_with_length(message: str) -> dict:
            """
            Echo back the message with its length.

            Args:
                message: The message to echo back

            Returns:
                Dictionary with the message and its length
            """
            return dict(message=message, length=len(message))

        # The base class receives the fully configured server; our own
        # bookkeeping attributes are created only afterwards.
        super().__init__(server)
        initial_episode_id = str(uuid4())
        self._state = State(episode_id=initial_episode_id, step_count=0)
        self._reset_count = 0

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Start a fresh episode.

        Args:
            seed: Optional random seed; not used by this environment
            episode_id: ID for the new episode; a random UUID string is
                generated when this is None or empty
            **kwargs: Additional reset options; not used by this environment

        Returns:
            Observation with done=False, reward=0.0 and a "ready" status
            in its metadata
        """
        new_episode_id = episode_id if episode_id else str(uuid4())
        # Replace the whole State object so the step counter restarts at zero.
        self._state = State(episode_id=new_episode_id, step_count=0)
        self._reset_count += 1

        ready_info = {"status": "ready", "message": "Echo environment ready!"}
        return Observation(done=False, reward=0.0, metadata=ready_info)

    def _step_impl(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Answer an action that is not one of the MCP action types.

        Only ListToolsAction and CallToolAction are supported by this
        environment, so every action reaching this method is reported as
        unknown.

        Args:
            action: The action that was submitted
            timeout_s: Optional timeout; not used here
            **kwargs: Additional arguments; not used here

        Returns:
            Observation whose metadata carries an "error" message naming
            the action's type
        """
        action_kind = type(action).__name__
        error_text = (
            f"Unknown action type: {action_kind}. "
            "Use ListToolsAction or CallToolAction for MCP interactions."
        )
        return Observation(done=False, reward=0.0, metadata={"error": error_text})

    def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Run one step and count it.

        The step counter is bumped for every action, then the action is
        handed to the base class implementation.

        Args:
            action: The action to execute (ListToolsAction or CallToolAction)
            timeout_s: Optional timeout, forwarded to the base class
            **kwargs: Additional arguments, forwarded to the base class

        Returns:
            Observation produced by the base class `step`
        """
        # Counted before delegating, so the step is recorded even if the
        # base class call raises.
        current = self._state
        current.step_count += 1

        return super().step(action, timeout_s=timeout_s, **kwargs)

    @property
    def state(self) -> State:
        """
        Expose the current environment state.

        Returns:
            The State object holding episode_id and step_count
        """
        return self._state