Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Agterm Environment Implementation. | |
| An environment that forwards each action's message to an interactive bash | |
| session managed by AGTerm and returns what AGTerm reads back from it. | |
| """ | |
| from uuid import uuid4 | |
| from openenv_core.env_server.interfaces import Environment | |
| from openenv_core.env_server.types import State | |
| from models import agtermAction, agtermObservation | |
| from agterm.agterm import AGTerm | |
class agtermEnvironment(Environment):
    """
    Environment that drives a shell through an AGTerm terminal session.

    The constructor creates an ``AGTerm`` for ``bash`` in interactive mode
    with ``"# "`` as its ready marker. Each ``step`` sends the action's
    ``message`` to that terminal and returns whatever AGTerm read back as
    the observation's ``result``.

    Reward scheme used by ``step``: a truthy result yields ``reward=1.0``
    and keeps the episode open; a falsy result yields ``reward=-1.0`` and
    marks the episode done.

    Example:
        >>> env = agtermEnvironment()
        >>> obs = env.reset()
        >>> print(obs.result)  # "Agterm environment ready!" if the terminal is alive
        >>>
        >>> obs = env.step(agtermAction(message="echo 'Hello world'"))
        >>> print(obs.result)  # whatever AGTerm read back for the command
    """

    def __init__(self):
        """Create the initial episode state and the AGTerm terminal session."""
        self._state = self._new_state()
        # Incremented on every reset(); not read anywhere in this class.
        self._reset_count = 0
        self.agterm = AGTerm("bash", interactive=True, ready_markers=["# "])

    @staticmethod
    def _new_state() -> State:
        """Build a State with a fresh random episode id and a zero step count."""
        return State(episode_id=str(uuid4()), step_count=0)

    def reset(self) -> agtermObservation:
        """
        Start a new episode and reset the terminal.

        Replaces the state with a new episode id and a zero step count,
        increments the reset counter, resets the AGTerm session and then
        calls ``read_until_ready`` with ``timeout_ms=2500`` and
        ``settle_ms=150``.

        Returns:
            agtermObservation with a ready message, ``done=False`` and
            ``reward=0.0`` when the terminal reports alive afterwards;
            otherwise a failure message with ``done=True`` and
            ``reward=-1.0``.
        """
        self._state = self._new_state()
        self._reset_count += 1

        terminal = self.agterm
        terminal.reset()
        terminal.read_until_ready(timeout_ms=2500, settle_ms=150)

        # Guard clause: a terminal that is not alive ends the episode at once.
        if not terminal.is_alive():
            return agtermObservation(
                result="Agterm environment failed to start.",
                done=True,
                reward=-1.0,
            )
        return agtermObservation(
            result="Agterm environment ready!",
            done=False,
            reward=0.0,
        )

    def step(self, action: agtermAction) -> agtermObservation:  # type: ignore[override]
        """
        Send the action's message to the terminal and report what came back.

        Args:
            action: agtermAction whose ``message`` is passed to AGTerm's
                ``send_and_read_until_ready`` (``timeout_ms=10000``,
                ``settle_ms=100``).

        Returns:
            agtermObservation whose ``result`` is the value returned by
            AGTerm. A truthy result gives ``reward=1.0`` and ``done=False``;
            a falsy result gives ``reward=-1.0`` and ``done=True``. The
            metadata records the message that was sent and the step number.
        """
        self._state.step_count += 1
        command = action.message
        output = self.agterm.send_and_read_until_ready(
            command, timeout_ms=10000, settle_ms=100
        )

        # Binary reward: any non-empty response counts as success; an empty
        # response is penalised and terminates the episode.
        got_output = bool(output)
        return agtermObservation(
            result=output,
            done=not got_output,
            reward=1.0 if got_output else -1.0,
            metadata={"original_message": command, "step": self._state.step_count},
        )

    # NOTE(review): this is a plain method, so callers must write
    # ``env.state()``. If the Environment base class declares ``state`` as a
    # property, this likely needs ``@property`` -- confirm against
    # openenv_core before changing, since that would alter the call syntax.
    def state(self) -> State:
        """
        Return the current episode state.

        Returns:
            The State object holding the current episode_id and step_count.
        """
        return self._state