Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Maze OpenEnv Environment Implementation. | |
| A grid maze environment where an agent navigates from start to goal. | |
| """ | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from models import MazeAction, MazeObservation, MazeState | |
class MazeEnvironment(Environment):
    """
    A grid maze environment where an agent navigates to a goal.

    The agent can move up/down/left/right through walkable cells (1)
    and cannot pass through walls (0). Positions are ``[row, col]``
    pairs indexing into the maze grid.

    Rewards:
        +10 for reaching the goal
        -1 for each step (encourages efficiency)
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # (row, col) offset applied to the agent for each accepted direction.
    _DELTAS = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
    }
    # Kept as a tuple so validation compares by equality, not by hashing.
    _DIRECTIONS = tuple(_DELTAS)

    def __init__(self):
        """Create the fixed maze with the agent at the top-left cell."""
        super().__init__()
        self._maze = [
            [1, 0, 1, 1],
            [1, 1, 0, 1],
            [1, 1, 0, 1],
            [1, 1, 1, 1],
        ]
        self._agent_pos = [0, 0]
        self._goal_pos = [3, 3]
        self._step_count = 0
        self._episode_id = None
        # Derive the dimensions from the grid so they cannot drift out of
        # sync with the maze literal above.
        self.row = len(self._maze)
        self.col = len(self._maze[0]) if self._maze else 0

    # NOTE(review): the OpenEnv ``Environment`` interface may declare
    # ``state`` as a property; confirm whether a ``@property`` decorator
    # is expected here. Left as a plain method to keep the current
    # interface unchanged.
    def state(self) -> MazeState:
        """Return a snapshot of the maze, positions and episode counters.

        The lists handed to ``MazeState`` are copies, so a caller mutating
        the snapshot cannot alter this environment's internal grid or
        positions.
        """
        return MazeState(
            maze=[list(cells) for cells in self._maze],
            agent_pos=self._agent_pos.copy(),
            goal_pos=self._goal_pos.copy(),
            episode_id=self._episode_id,
            step_count=self._step_count,
        )

    def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation:
        """Start a new episode with the agent back at ``[0, 0]``.

        Args:
            seed: Accepted but unused; the maze layout is fixed.
            episode_id: Identifier for the new episode. A random UUID
                string is generated when this is falsy.
            **kwargs: Accepted but unused.

        Returns:
            The initial observation, with ``done=False`` and ``reward=0``.
        """
        self._agent_pos = [0, 0]
        self._step_count = 0
        self._episode_id = episode_id or str(uuid4())
        return MazeObservation(
            position=self._agent_pos.copy(),
            grid_view=self._render(),
            done=False,
            reward=0,
        )

    def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation:
        """Apply one action and return the resulting observation.

        An unrecognised direction is penalised with ``-1`` but leaves the
        position and the step counter untouched. A recognised direction
        always counts as a step, even when a wall or the grid edge blocks
        the move.

        Args:
            action: Action whose ``direction`` is one of ``"up"``,
                ``"down"``, ``"left"`` or ``"right"``.
            timeout_s: Accepted but unused.
            **kwargs: Accepted but unused.

        Returns:
            The observation after the action: reward ``10`` and
            ``done=True`` when the agent stands on the goal, otherwise
            reward ``-1`` and ``done=False``.
        """
        if action.direction not in self._DIRECTIONS:
            return MazeObservation(
                position=self._agent_pos.copy(),
                grid_view=self._render(),
                done=False,
                reward=-1,
            )

        self._move(action.direction)
        self._step_count += 1

        done = self._agent_pos == self._goal_pos
        reward = 10 if done else -1
        return MazeObservation(
            position=self._agent_pos.copy(),
            grid_view=self._render(),
            done=done,
            reward=reward,
        )

    def _is_valid(self, x: int, y: int) -> bool:
        """Return True when ``(x, y)`` is inside the grid and not a wall."""
        return 0 <= x < self.row and 0 <= y < self.col and self._maze[x][y] != 0

    def _move(self, direction: str) -> bool:
        """Try to move the agent one cell in ``direction``.

        Returns:
            True if the target cell is valid and the agent position was
            updated, False if the move was blocked. An unknown direction
            uses a zero offset, so the agent stays put and True is
            returned.
        """
        d_row, d_col = self._DELTAS.get(direction, (0, 0))
        i = self._agent_pos[0] + d_row
        j = self._agent_pos[1] + d_col
        if self._is_valid(i, j):
            self._agent_pos = [i, j]
            return True
        return False

    def _render(self) -> str:
        """Render the grid as text, one newline-terminated line per row.

        Each cell is followed by a single space: ``A`` marks the agent,
        ``G`` the goal, ``#`` a wall and ``.`` a walkable cell. The agent
        marker takes precedence when it stands on the goal.
        """
        symbols = {0: '#', 1: '.'}
        lines = []
        for i in range(self.row):
            cells = []
            for j in range(self.col):
                here = [i, j]
                if here == self._agent_pos:
                    cells.append("A ")
                elif here == self._goal_pos:
                    cells.append("G ")
                else:
                    cells.append(symbols[self._maze[i][j]] + " ")
            lines.append("".join(cells) + "\n")
        return "".join(lines)