Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Grid World Environment Implementation. | |
| A simple test environment that echoes back messages sent to it. | |
| Perfect for testing HTTP server infrastructure. | |
| """ | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| from grid_world.models import ( | |
| GridWorldAction, | |
| GridWorldObservation, | |
| GridWorldState, | |
| MoveAction, | |
| ) | |
| class GridWorldEnvironment(Environment): | |
| """ | |
| A simple echo environment that echoes back messages. | |
| This environment is designed for testing the HTTP server infrastructure. | |
| It maintains minimal state and simply echoes back whatever message it receives. | |
| Example: | |
| >>> env = GridWorldEnvironment() | |
| >>> obs = env.reset() | |
| >>> print(obs.echoed_message) # "Grid World environment ready!" | |
| >>> | |
| >>> obs = env.step(GridWorldAction(message="Hello")) | |
| >>> print(obs.echoed_message) # "Hello" | |
| >>> print(obs.message_length) # 5 | |
| """ | |
| # Enable concurrent WebSocket sessions. | |
| # Set to True if your environment isolates state between instances. | |
| # When True, multiple WebSocket clients can connect simultaneously, each | |
| # getting their own environment instance (when using factory mode in app.py). | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
| def __init__(self): | |
| """Initialize the grid_world environment.""" | |
| super().__init__() | |
| self.grid_size = 5 | |
| self.goal_pos = [4, 4] | |
| self._state = GridWorldState( | |
| agent_x=0, | |
| agent_y=0, | |
| goal_x=self.goal_pos[0], | |
| goal_y=self.goal_pos[1], | |
| grid_size=self.grid_size, | |
| episode_steps=0 | |
| ) | |
| def reset(self) -> GridWorldObservation: | |
| """ | |
| Reset the environment. | |
| Returns: | |
| GridWorldObservation | |
| """ | |
| self._state.episode_id = str(uuid4()) | |
| self._state.step_count = 0 | |
| # Update State | |
| self._state.agent_x = 0 | |
| self._state.agent_y = 0 | |
| self._state.episode_steps = 0 | |
| return GridWorldObservation( | |
| x=self._state.agent_x, | |
| y=self._state.agent_y, | |
| suggested_action=self._next_action(self._state.agent_x, self._state.agent_y), | |
| message="Welcome to Grid World! Goal is at [4, 4].", | |
| reward=None, | |
| done=False | |
| ) | |
| def step(self, action: GridWorldAction) -> GridWorldObservation: # type: ignore[override] | |
| """ | |
| Execute a step in the environment by echoing the message. | |
| Args: | |
| action: GridWorldAction | |
| Returns: | |
| GridWorldObservation | |
| """ | |
| self._state.step_count += 1 | |
| self._state.episode_steps += 1 | |
| current_x = self._state.agent_x | |
| current_y = self._state.agent_y | |
| move = action.action | |
| if move == MoveAction.UP: | |
| current_x -= 1 | |
| elif move == MoveAction.DOWN: | |
| current_x += 1 | |
| elif move == MoveAction.LEFT: | |
| current_y -= 1 | |
| elif move == MoveAction.RIGHT: | |
| current_y += 1 | |
| # Update state | |
| self._state.agent_x = current_x | |
| self._state.agent_y = current_y | |
| reward, done, message = self._reward(current_x, current_y) | |
| return GridWorldObservation( | |
| x=current_x, | |
| y=current_y, | |
| suggested_action=self._next_action(current_x, current_y), | |
| reward=reward, | |
| done=done, | |
| message=message | |
| ) | |
| def _next_action(self, current_x, current_y): | |
| if [current_x, current_y] == self.goal_pos: | |
| return None | |
| if current_x < self.goal_pos[0]: | |
| return MoveAction.DOWN | |
| if current_x > self.goal_pos[0]: | |
| return MoveAction.UP | |
| if current_y < self.goal_pos[1]: | |
| return MoveAction.RIGHT | |
| return MoveAction.LEFT | |
| def _reward(self, current_x, current_y): | |
| reward = 0.0 | |
| done = False | |
| message = "Keep going..." | |
| if [current_x, current_y] == self.goal_pos: | |
| reward = 1.0 | |
| done = True | |
| message = "Found the goal!" | |
| else: | |
| reward = -0.1 | |
| return reward, done, message | |
| def state(self) -> State: | |
| """ | |
| Get the current environment state. | |
| Returns: | |
| Current State with episode_id and step_count | |
| """ | |
| return self._state | |