Spaces:
Sleeping
Sleeping
| import random | |
| from openenv.core.env_server.interfaces import Environment | |
| from models import MazeAction, MazeObservation, MazeState | |
| from mazelib import Maze | |
| from mazelib.generate.BacktrackingGenerator import BacktrackingGenerator | |
| from mazelib.solve.ShortestPath import ShortestPath | |
class MazeGenerator:
    """Build random mazes with mazelib and return them as walkable grids.

    Args:
        w_range: Inclusive ``(min, max)`` bounds for the maze width, drawn
            with ``random.randint``.
        h_range: Inclusive ``(min, max)`` bounds for the maze height, drawn
            with ``random.randint``.
    """

    def __init__(self, w_range=(3, 5), h_range=(3, 5)):
        self.w_range, self.h_range = w_range, h_range

    @staticmethod
    def _to_walkable(wall_grid, start, end):
        """Invert a wall grid (1 = wall) into a walkable grid (1 = open).

        The ``start`` and ``end`` cells are always marked open in the result.
        mazelib puts outer entrances on the border wall without carving the
        grid, so without this the goal cell would stay a wall and could never
        be entered.

        Args:
            wall_grid: 2-D iterable of 0/1 cells (nested lists or an array).
            start: ``(row, col)`` of the entrance.
            end: ``(row, col)`` of the exit.

        Returns:
            A new list of lists of ints; ``wall_grid`` is not modified.
        """
        walkable = [[1 - int(cell) for cell in row] for row in wall_grid]
        for r, c in (start, end):
            walkable[r][c] = 1
        return walkable

    def generate(self, max_attempts=100):
        """Generate a maze whose shortest solution is reasonably long.

        Mazes are regenerated until the first solution has at least
        ``(w + h) * 2 + randint(-3, 5)`` cells or ``max_attempts`` is used up;
        in the latter case the last generated maze is returned as-is.

        Args:
            max_attempts: Maximum number of mazes to try. At least one maze
                is always generated, even for values below 1.

        Returns:
            A ``(grid, start, end)`` tuple: ``grid`` is a list of lists where
            1 is an open cell and 0 is a wall, and ``start`` / ``end`` are
            ``[row, col]`` lists.
        """
        w, h = random.randint(*self.w_range), random.randint(*self.h_range)
        min_path_len = (w + h) * 2 + random.randint(-3, 5)

        m = Maze()
        # mazelib generators take (height, width), in that order.
        m.generator = BacktrackingGenerator(h, w)
        m.solver = ShortestPath()

        for _ in range(max(1, max_attempts)):
            m.generate()
            m.generate_entrances()
            m.solve()
            # Guard against an empty solution list before indexing into it.
            if m.solutions and len(m.solutions[0]) >= min_path_len:
                break

        grid = self._to_walkable(m.grid, m.start, m.end)
        return grid, list(m.start), list(m.end)
class MazeEnvironment(Environment):
    """Grid-based maze environment with random maze generation.

    The maze is a list of lists in which 1 is an open cell and 0 is a wall.
    Agent and goal positions are ``[row, col]`` lists. Each call to ``step``
    yields a reward of 10 when the agent stands on the goal and -1 otherwise.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Row/column offset for each accepted direction, in reporting order.
    _DELTAS = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
    }

    def __init__(self):
        super().__init__()
        self.generator = MazeGenerator()
        self._generate_new_maze()

    def _generate_new_maze(self):
        """Replace the maze, put the agent at its start and clear counters."""
        self._maze, self._agent_pos, self._goal_pos = self.generator.generate()
        self._step_count, self._episode_id = 0, None

    @property
    def row(self):
        """Number of rows in the current maze."""
        return len(self._maze)

    @property
    def col(self):
        """Number of columns in the current maze (length of the first row)."""
        return len(self._maze[0])

    @property
    def state(self) -> MazeState:
        """Snapshot of the maze, positions, episode id and step count."""
        return MazeState(
            maze=self._maze,
            agent_pos=self._agent_pos,
            goal_pos=self._goal_pos,
            episode_id=self._episode_id,
            step_count=self._step_count,
        )

    def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation:
        """Start a new episode on a freshly generated maze.

        Args:
            seed: Optional seed for the module-level ``random`` generator.
                Any value other than ``None`` is applied, including 0.
            episode_id: Identifier stored for the new episode.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The initial observation, with ``done=False`` and ``reward=0``.
        """
        # NOTE: this reseeds the process-wide RNG, which is shared by every
        # environment instance in the process.
        if seed is not None:
            random.seed(seed)
        self._generate_new_maze()
        self._episode_id = episode_id
        return self._observation(done=False, reward=0)

    def _observation(self, done, reward) -> MazeObservation:
        """Build an observation describing the current agent position."""
        return MazeObservation(
            position=self._agent_pos,
            grid_view=self._render(),
            valid_moves=self._get_valid_moves(),
            done=done,
            reward=reward,
        )

    def _is_open(self, i, j) -> bool:
        """Return True when ``(i, j)`` is inside the grid and not a wall."""
        return 0 <= i < self.row and 0 <= j < self.col and bool(self._maze[i][j])

    def _get_valid_moves(self) -> list:
        """List the directions that lead from the agent to an open cell."""
        i, j = self._agent_pos
        return [
            name
            for name, (di, dj) in self._DELTAS.items()
            if self._is_open(i + di, j + dj)
        ]

    def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation:
        """Apply one action and return the resulting observation.

        Unknown directions leave the agent in place but still count as a
        step.

        Args:
            action: Action whose ``direction`` is "up", "down", "left" or
                "right".
            timeout_s: Accepted for interface compatibility; unused.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The observation after the move; ``reward`` is 10 when the agent
            is on the goal and -1 otherwise.
        """
        if action.direction in ("up", "down", "left", "right"):
            self._move(action.direction)
        self._step_count += 1
        done = self._agent_pos == self._goal_pos
        return self._observation(done=done, reward=10 if done else -1)

    def _move(self, d):
        """Move the agent one cell in direction ``d`` if the target is open."""
        delta = self._DELTAS.get(d)
        if delta is None:
            return
        i, j = self._agent_pos[0] + delta[0], self._agent_pos[1] + delta[1]
        if self._is_open(i, j):
            self._agent_pos = [i, j]

    def _render(self) -> str:
        """Render the maze as text, one line per row.

        ``A`` marks the agent, ``G`` the goal, ``#`` a wall and ``.`` an open
        cell. Every cell is followed by a space and every row by a newline.
        """
        symbols = {0: '#', 1: '.'}
        lines = []
        for i, maze_row in enumerate(self._maze):
            cells = []
            for j, cell in enumerate(maze_row):
                if [i, j] == self._agent_pos:
                    cells.append("A")
                elif [i, j] == self._goal_pos:
                    cells.append("G")
                else:
                    cells.append(symbols[cell])
            lines.append("".join(mark + " " for mark in cells) + "\n")
        return "".join(lines)
| # force update | |