# maze/server/maze_environment.py
# tripathysagar: "Upload folder using huggingface_hub" (commit bd61c6a, verified)
import random
from openenv.core.env_server.interfaces import Environment
from models import MazeAction, MazeObservation, MazeState
from mazelib import Maze
from mazelib.generate.BacktrackingGenerator import BacktrackingGenerator
from mazelib.solve.ShortestPath import ShortestPath
class MazeGenerator:
    """Build small random mazes with mazelib.

    Args:
        w_range: Inclusive ``(low, high)`` bounds for the first size
            argument handed to ``BacktrackingGenerator``.
        h_range: Inclusive ``(low, high)`` bounds for the second size
            argument handed to ``BacktrackingGenerator``.
    """

    def __init__(self, w_range=(3, 5), h_range=(3, 5)):
        self.w_range, self.h_range = w_range, h_range

    def generate(self, max_attempts=100):
        """Generate a maze whose solution is reasonably long.

        Mazes are regenerated until the first solution has at least
        ``min_path_len`` steps or ``max_attempts`` tries are used up; in the
        latter case the last maze generated is returned as-is.

        Args:
            max_attempts: Upper bound on generation attempts. Values below 1
                are treated as 1 so that a maze always exists to return.

        Returns:
            A ``(grid, start, end)`` tuple. ``grid`` is a list of rows in
            which 1 marks a walkable cell and 0 a wall (the inverse of the
            mazelib grid); ``start`` and ``end`` are ``[row, col]`` lists.
        """
        w, h = random.randint(*self.w_range), random.randint(*self.h_range)
        # Required solution length scales with maze size, with some jitter.
        min_path_len = (w + h) * 2 + random.randint(-3, 5)

        m = Maze()
        # NOTE(review): mazelib generators appear to take (height, width);
        # if so, w and h are swapped here - confirm before relying on
        # w_range/h_range being different.
        m.generator = BacktrackingGenerator(w, h)
        m.solver = ShortestPath()

        # Always try at least once, otherwise m.grid would still be unset.
        for _ in range(max(1, max_attempts)):
            m.generate()
            m.generate_entrances()
            m.solve()
            if len(m.solutions[0]) >= min_path_len:
                break

        # Flip the grid so that truthy cells are the walkable ones.
        grid = (1 - m.grid).tolist()
        start, end = list(m.start), list(m.end)

        # The entrances may sit on the outer wall (presumably mazelib leaves
        # those cells marked as wall - TODO confirm). Mark both as walkable so
        # the goal can actually be entered; a no-op if they already are.
        for r, c in (start, end):
            grid[r][c] = 1

        return grid, start, end
class MazeEnvironment(Environment):
    """Grid-based maze environment with random maze generation.

    The maze is stored as a list of rows in which a truthy cell is walkable
    and a falsy cell is a wall. Agent and goal positions are ``[row, col]``
    lists. A fresh maze is generated on construction and on every reset.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # (row, col) offset for every accepted direction; the insertion order is
    # also the order in which valid moves are reported.
    _DELTAS = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
    }

    def __init__(self):
        # NOTE(review): the base-class __init__ is not invoked here; confirm
        # that Environment does not require it.
        self.generator = MazeGenerator()
        self._generate_new_maze()

    def _generate_new_maze(self):
        """Replace the maze and reset the per-episode bookkeeping."""
        self._maze, self._agent_pos, self._goal_pos = self.generator.generate()
        self._step_count, self._episode_id = 0, None

    @property
    def row(self):
        """Number of rows in the current maze."""
        return len(self._maze)

    @property
    def col(self):
        """Number of columns in the current maze (length of the first row)."""
        return len(self._maze[0])

    @property
    def state(self) -> MazeState:
        """Snapshot of the maze, agent, goal, episode id and step count."""
        return MazeState(
            maze=self._maze,
            agent_pos=self._agent_pos,
            goal_pos=self._goal_pos,
            episode_id=self._episode_id,
            step_count=self._step_count,
        )

    def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation:
        """Start a new episode on a freshly generated maze.

        Args:
            seed: When not ``None``, seeds the module-level ``random``
                generator before the maze is built. ``0`` is a valid seed.
            episode_id: Identifier stored for the new episode.
            **kwargs: Accepted and ignored.

        Returns:
            The initial observation, with ``done=False`` and ``reward=0``.
        """
        # Compare against None explicitly: a truthiness test would silently
        # drop seed=0.
        if seed is not None:
            random.seed(seed)
        self._generate_new_maze()
        self._episode_id = episode_id
        return MazeObservation(
            position=self._agent_pos,
            grid_view=self._render(),
            valid_moves=self._get_valid_moves(),
            done=False,
            reward=0,
        )

    def _is_open(self, i, j) -> bool:
        """Return True when ``(i, j)`` is inside the grid and enterable.

        The goal cell always counts as enterable, even if the grid marks it
        as a wall, so that an episode can terminate.
        """
        if not (0 <= i < self.row and 0 <= j < self.col):
            return False
        return [i, j] == self._goal_pos or bool(self._maze[i][j])

    def _get_valid_moves(self) -> list:
        """List the direction names that would move the agent right now."""
        i, j = self._agent_pos
        return [
            name
            for name, (di, dj) in self._DELTAS.items()
            if self._is_open(i + di, j + dj)
        ]

    def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation:
        """Apply one action and return the resulting observation.

        Unknown directions leave the agent in place. The step counter is
        incremented on every call, whether or not the agent moved.

        Args:
            action: Action whose ``direction`` is read.
            timeout_s: Accepted and ignored.
            **kwargs: Accepted and ignored.

        Returns:
            Observation with ``reward=10`` and ``done=True`` when the agent
            is on the goal after the move, otherwise ``reward=-1``.
        """
        if action.direction in self._DELTAS:
            self._move(action.direction)
        self._step_count += 1
        done = self._agent_pos == self._goal_pos
        return MazeObservation(
            position=self._agent_pos,
            grid_view=self._render(),
            valid_moves=self._get_valid_moves(),
            done=done,
            reward=10 if done else -1,
        )

    def _move(self, d):
        """Move the agent one cell in direction ``d`` if the target is open."""
        delta = self._DELTAS.get(d)
        if delta is None:
            return
        i, j = self._agent_pos
        ni, nj = i + delta[0], j + delta[1]
        if self._is_open(ni, nj):
            # Rebind to a new list so earlier observations keep their value.
            self._agent_pos = [ni, nj]

    def _render(self) -> str:
        """Render the maze as text, one line per row.

        Each cell is followed by a space: ``A`` for the agent, ``G`` for the
        goal, ``.`` for a walkable cell and ``#`` for a wall. The agent takes
        precedence when it stands on the goal.
        """
        symbols = {0: '#', 1: '.'}
        lines = []
        for i in range(self.row):
            cells = []
            for j in range(self.col):
                if [i, j] == self._agent_pos:
                    cells.append("A")
                elif [i, j] == self._goal_pos:
                    cells.append("G")
                else:
                    cells.append(symbols[self._maze[i][j]])
            lines.append("".join(cell + " " for cell in cells) + "\n")
        return "".join(lines)
# force update