import random

from openenv.core.env_server.interfaces import Environment
from models import MazeAction, MazeObservation, MazeState
from mazelib import Maze
from mazelib.generate.BacktrackingGenerator import BacktrackingGenerator
from mazelib.solve.ShortestPath import ShortestPath


class MazeGenerator:
    """Build random mazes (with a start and an end cell) on top of mazelib."""

    def __init__(self, w_range=(3, 5), h_range=(3, 5)):
        """Store the inclusive ranges the two size arguments are drawn from.

        Args:
            w_range: ``(low, high)`` bounds passed to ``random.randint`` for
                the first size argument of the mazelib generator.
            h_range: ``(low, high)`` bounds passed to ``random.randint`` for
                the second size argument of the mazelib generator.
        """
        self.w_range, self.h_range = w_range, h_range

    def generate(self, max_attempts=100):
        """Generate a maze whose solution is reasonably long.

        Mazes are regenerated until the first solution reported by the solver
        has at least ``min_path_len`` steps, or until ``max_attempts`` mazes
        have been tried; in the latter case the last maze is returned as a
        best effort.

        Args:
            max_attempts: Upper bound on the number of mazes to try. At least
                one maze is always generated, even for values below 1.

        Returns:
            A ``(grid, start, end)`` tuple. ``grid`` is a list of rows in
            which ``1`` is an open cell and ``0`` is a wall; ``start`` and
            ``end`` are ``[row, col]`` lists.
        """
        w, h = random.randint(*self.w_range), random.randint(*self.h_range)
        # Target length scales with the maze size, with some random slack.
        min_path_len = (w + h) * 2 + random.randint(-3, 5)

        m = Maze()
        m.generator = BacktrackingGenerator(w, h)
        m.solver = ShortestPath()

        # max(1, ...) guarantees m.grid / m.start / m.end are populated below.
        for _ in range(max(1, max_attempts)):
            m.generate()
            m.generate_entrances()
            m.solve()
            # Guard against an empty solution list before indexing it.
            if m.solutions and len(m.solutions[0]) >= min_path_len:
                break

        # Invert the library grid so that 1 means "open" and 0 means "wall".
        grid = (1 - m.grid).tolist()
        start, end = list(m.start), list(m.end)

        # NOTE(review): mazelib's default entrances appear to sit on the outer
        # wall, i.e. on cells that are walls in the grid. The environment only
        # lets the agent step onto open cells, so the entrance cells are
        # forced open here; this is a no-op if they were open already.
        for r, c in (start, end):
            grid[r][c] = 1

        return grid, start, end


class MazeEnvironment(Environment):
    """Grid-based maze environment with random maze generation."""

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # (row, col) offset for every supported direction, in the order in which
    # valid moves are reported.
    _DELTAS = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
    }
    _DIRECTIONS = tuple(_DELTAS)

    def __init__(self):
        """Create the maze generator and an initial maze."""
        # Let the base class set up whatever state it expects to exist.
        super().__init__()
        self.generator = MazeGenerator()
        self._generate_new_maze()

    def _generate_new_maze(self):
        """Replace the current maze and reset the per-episode counters."""
        self._maze, self._agent_pos, self._goal_pos = self.generator.generate()
        self._step_count, self._episode_id = 0, None

    @property
    def row(self):
        """Number of rows in the current maze."""
        return len(self._maze)

    @property
    def col(self):
        """Number of columns in the current maze (length of the first row)."""
        return len(self._maze[0])

    @property
    def state(self) -> MazeState:
        """Snapshot of the maze, agent/goal positions and episode counters."""
        return MazeState(
            maze=self._maze,
            agent_pos=self._agent_pos,
            goal_pos=self._goal_pos,
            episode_id=self._episode_id,
            step_count=self._step_count,
        )

    def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation:
        """Start a new episode on a freshly generated maze.

        Args:
            seed: Optional seed for the module-level ``random`` generator.
                Any value other than ``None`` is applied, including ``0``.
            episode_id: Identifier stored for the new episode.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The initial observation, with ``done=False`` and ``reward=0``.
        """
        # `is not None` rather than truthiness so that seed=0 is honoured.
        # The module-level RNG is seeded because MazeGenerator draws from it.
        if seed is not None:
            random.seed(seed)
        self._generate_new_maze()
        self._episode_id = episode_id
        return self._observe(done=False, reward=0)

    def _is_open(self, i, j) -> bool:
        """Return True if ``(i, j)`` lies inside the maze and is not a wall."""
        return (
            0 <= i < self.row
            and 0 <= j < self.col
            and bool(self._maze[i][j])
        )

    def _get_valid_moves(self) -> list:
        """List the directions that lead from the agent to an open cell."""
        i, j = self._agent_pos
        return [
            name
            for name, (di, dj) in self._DELTAS.items()
            if self._is_open(i + di, j + dj)
        ]

    def _observe(self, done, reward) -> MazeObservation:
        """Build an observation of the current position, view and moves."""
        return MazeObservation(
            position=self._agent_pos,
            grid_view=self._render(),
            valid_moves=self._get_valid_moves(),
            done=done,
            reward=reward,
        )

    def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation:
        """Apply one action and report the resulting observation.

        Unknown directions leave the agent in place but still count as a
        step. The reward is ``10`` when the agent is on the goal after the
        move and ``-1`` otherwise.

        Args:
            action: Action whose ``direction`` is one of ``"up"``, ``"down"``,
                ``"left"`` or ``"right"``.
            timeout_s: Accepted for interface compatibility; unused here.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The observation after the move.
        """
        if action.direction in self._DIRECTIONS:
            self._move(action.direction)
        self._step_count += 1
        done = self._agent_pos == self._goal_pos
        return self._observe(done=done, reward=10 if done else -1)

    def _move(self, d):
        """Move the agent one cell in direction ``d`` if the target is open."""
        i, j = self._agent_pos
        di, dj = self._DELTAS.get(d, (0, 0))
        i, j = i + di, j + dj
        if self._is_open(i, j):
            self._agent_pos = [i, j]

    def _render(self) -> str:
        """Render the maze as text, one row per line.

        ``A`` marks the agent, ``G`` the goal, ``.`` an open cell and ``#`` a
        wall. Every cell is followed by a space and every row by a newline.
        """
        symbols = {0: '#', 1: '.'}
        lines = []
        for i, row in enumerate(self._maze):
            cells = []
            for j, cell in enumerate(row):
                if [i, j] == self._agent_pos:
                    cells.append("A ")
                elif [i, j] == self._goal_pos:
                    cells.append("G ")
                else:
                    cells.append(symbols[cell] + " ")
            lines.append("".join(cells) + "\n")
        return "".join(lines)