# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Maze OpenEnv Environment Implementation.

A grid maze environment where an agent navigates from start to goal.
"""

from uuid import uuid4

from openenv.core.env_server.interfaces import Environment

from models import MazeAction, MazeObservation, MazeState

# (row delta, column delta) for every direction the environment accepts.
# Kept in one place so `step` (validation) and `_move` (displacement) cannot
# drift apart.
_DIRECTION_DELTAS = {
    "up": (-1, 0),
    "down": (1, 0),
    "left": (0, -1),
    "right": (0, 1),
}
_VALID_DIRECTIONS = tuple(_DIRECTION_DELTAS)


class MazeEnvironment(Environment):
    """
    A grid maze environment where an agent navigates to a goal.

    The agent can move up/down/left/right through walkable cells (1)
    and cannot pass through walls (0). Positions are ``[row, column]``
    pairs indexing into the maze grid.

    Rewards:
        +10 for reaching the goal
        -1 for each step (encourages efficiency)
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        """Create the fixed 4x4 maze with the agent at the top-left cell."""
        super().__init__()
        self._maze = [
            [1, 0, 1, 1],
            [1, 1, 0, 1],
            [1, 1, 0, 1],
            [1, 1, 1, 1],
        ]
        self._agent_pos = [0, 0]
        self._goal_pos = [3, 3]
        self._step_count = 0
        self._episode_id = None
        # Derive the dimensions from the grid itself so that editing the maze
        # literal cannot leave the bounds check or renderer out of sync.
        self.row = len(self._maze)
        self.col = len(self._maze[0]) if self._maze else 0

    @property
    def state(self) -> MazeState:
        """Return a snapshot of the maze, positions, episode id and step count.

        The lists are copied so that mutating the returned snapshot cannot
        alter the environment's internal state (observations already copy
        the agent position in the same way).
        """
        return MazeState(
            maze=[cells.copy() for cells in self._maze],
            agent_pos=self._agent_pos.copy(),
            goal_pos=self._goal_pos.copy(),
            episode_id=self._episode_id,
            step_count=self._step_count,
        )

    def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation:
        """Start a new episode with the agent back at ``[0, 0]``.

        Args:
            seed: Accepted for interface compatibility; unused because the
                maze layout is fixed.
            episode_id: Identifier for the new episode. A random UUID4 string
                is generated when this is falsy.
            **kwargs: Ignored.

        Returns:
            The initial observation, with ``done=False`` and ``reward=0``.
        """
        self._agent_pos = [0, 0]
        self._step_count = 0
        self._episode_id = episode_id or str(uuid4())
        return MazeObservation(
            position=self._agent_pos.copy(),
            grid_view=self._render(),
            done=False,
            reward=0,
        )

    def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation:
        """Apply one action and return the resulting observation.

        An unrecognised direction leaves the agent in place, does not
        increment the step count, and is penalised with a reward of -1.
        A recognised direction always counts as a step, even when a wall or
        the grid edge blocks the move.

        NOTE(review): nothing here prevents stepping after the goal has been
        reached; a blocked move while standing on the goal yields +10 again.
        Callers are presumably expected to reset once ``done`` is True.

        Args:
            action: Action whose ``direction`` is one of "up", "down",
                "left" or "right".
            timeout_s: Accepted for interface compatibility; unused.
            **kwargs: Ignored.

        Returns:
            Observation with the agent position, the rendered grid, whether
            the goal has been reached, and the reward (+10 at the goal,
            otherwise -1).
        """
        if action.direction not in _VALID_DIRECTIONS:
            return MazeObservation(
                position=self._agent_pos.copy(),
                grid_view=self._render(),
                done=False,
                reward=-1,
            )

        self._move(action.direction)
        self._step_count += 1

        done = self._agent_pos == self._goal_pos
        reward = 10 if done else -1
        return MazeObservation(
            position=self._agent_pos.copy(),
            grid_view=self._render(),
            done=done,
            reward=reward,
        )

    def _is_valid(self, x: int, y: int) -> bool:
        """Return True if row ``x``, column ``y`` is inside the grid and not a wall."""
        return 0 <= x < self.row and 0 <= y < self.col and self._maze[x][y] != 0

    def _move(self, direction: str) -> bool:
        """Try to move the agent one cell in ``direction``.

        An unknown direction is treated as a zero displacement, so the agent
        stays put and the call still reports success.

        Returns:
            True if the target cell was valid and the agent position was
            updated, False if the move was blocked.
        """
        d_row, d_col = _DIRECTION_DELTAS.get(direction, (0, 0))
        target_row = self._agent_pos[0] + d_row
        target_col = self._agent_pos[1] + d_col
        if not self._is_valid(target_row, target_col):
            return False
        self._agent_pos = [target_row, target_col]
        return True

    def _render(self) -> str:
        """Render the grid as text, one newline-terminated line per row.

        Each cell is followed by a single space: ``A`` marks the agent,
        ``G`` the goal, ``#`` a wall and ``.`` a walkable cell. The agent
        marker takes precedence when it stands on the goal.
        """
        symbols = {0: "#", 1: "."}
        rendered_rows = []
        for i in range(self.row):
            cells = []
            for j in range(self.col):
                here = [i, j]
                if here == self._agent_pos:
                    cells.append("A")
                elif here == self._goal_pos:
                    cells.append("G")
                else:
                    cells.append(symbols[self._maze[i][j]])
            rendered_rows.append("".join(f"{cell} " for cell in cells) + "\n")
        return "".join(rendered_rows)