Spaces:
Sleeping
Sleeping
| from openenv.core.env_server.interfaces import Environment | |
| from models import MazeAction, MazeObservation, MazeState | |
| class MazeEnvironment(Environment): | |
| """ | |
| A grid-based maze environment for RL agents. | |
| The agent starts at [0,0] and must navigate to the goal at [3,3]. | |
| Grid values: 1 = open path, 0 = wall. | |
| Rewards: +10 for reaching goal, -1 per step. | |
| """ | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
| def __init__(self): | |
| self._maze = [ | |
| [1, 0, 1, 1], | |
| [1, 1, 0, 1], | |
| [1, 1, 0, 1], | |
| [1, 1, 1, 1], | |
| ] | |
| self._agent_pos = [0, 0] | |
| self._goal_pos = [3, 3] | |
| self._step_count = 0 | |
| self._episode_id = None | |
| self.row = 4 | |
| self.col = 4 | |
| def state(self) -> MazeState: | |
| "Return current environment state." | |
| return MazeState( | |
| maze=self._maze, agent_pos=self._agent_pos, goal_pos=self._goal_pos, | |
| episode_id=self._episode_id, step_count=self._step_count | |
| ) | |
| def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation: | |
| "Reset the environment to initial state and return starting observation." | |
| self._agent_pos = [0, 0] | |
| self._step_count = 0 | |
| self._episode_id = episode_id | |
| return MazeObservation(position=self._agent_pos, grid_view=self._render(), done=False, reward=0) | |
| def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation: | |
| if action.direction not in ["up", "down", "left", "right"]: | |
| return MazeObservation(position=self._agent_pos, grid_view=self._render(), done=False, reward=0) | |
| self._move(action.direction) | |
| self._step_count += 1 | |
| done = self._agent_pos == self._goal_pos | |
| reward = 10 if done else -1 | |
| return MazeObservation(position=self._agent_pos, grid_view=self._render(), done=done, reward=reward) | |
| def _is_valid(self, x: int, y: int) -> bool: | |
| "Check if position (x, y) is within bounds and not a wall." | |
| return 0 <= x < self.row and 0 <= y < self.col and self._maze[x][y] != 0 | |
| def _move(self, direction: str) -> bool: | |
| "Move agent in direction if valid. Returns True if move succeeded." | |
| i, j = self._agent_pos | |
| if direction == 'up': i -= 1 | |
| elif direction == 'down': i += 1 | |
| elif direction == 'left': j -= 1 | |
| elif direction == 'right': j += 1 | |
| if self._is_valid(i, j): | |
| self._agent_pos = [i, j] | |
| return True | |
| return False | |
| def _render(self) -> str: | |
| "Return string visualization of maze with A=agent, G=goal, #=wall, .=open." | |
| symbols = {0: '#', 1: '.'} | |
| result = "" | |
| for i in range(self.row): | |
| line = "" | |
| for j in range(self.col): | |
| if [i, j] == self._agent_pos: line += "A " | |
| elif [i, j] == self._goal_pos: line += "G " | |
| else: line += symbols[self._maze[i][j]] + " " | |
| result += line + "\n" | |
| return result | |