maze_env / server /maze_env_environment.py
tripathysagar's picture
Upload folder using huggingface_hub
ff46cf7 verified
from openenv.core.env_server.interfaces import Environment
from models import MazeAction, MazeObservation, MazeState
class MazeEnvironment(Environment):
"""
A grid-based maze environment for RL agents.
The agent starts at [0,0] and must navigate to the goal at [3,3].
Grid values: 1 = open path, 0 = wall.
Rewards: +10 for reaching goal, -1 per step.
"""
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self):
self._maze = [
[1, 0, 1, 1],
[1, 1, 0, 1],
[1, 1, 0, 1],
[1, 1, 1, 1],
]
self._agent_pos = [0, 0]
self._goal_pos = [3, 3]
self._step_count = 0
self._episode_id = None
self.row = 4
self.col = 4
@property
def state(self) -> MazeState:
"Return current environment state."
return MazeState(
maze=self._maze, agent_pos=self._agent_pos, goal_pos=self._goal_pos,
episode_id=self._episode_id, step_count=self._step_count
)
def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation:
"Reset the environment to initial state and return starting observation."
self._agent_pos = [0, 0]
self._step_count = 0
self._episode_id = episode_id
return MazeObservation(position=self._agent_pos, grid_view=self._render(), done=False, reward=0)
def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation:
if action.direction not in ["up", "down", "left", "right"]:
return MazeObservation(position=self._agent_pos, grid_view=self._render(), done=False, reward=0)
self._move(action.direction)
self._step_count += 1
done = self._agent_pos == self._goal_pos
reward = 10 if done else -1
return MazeObservation(position=self._agent_pos, grid_view=self._render(), done=done, reward=reward)
def _is_valid(self, x: int, y: int) -> bool:
"Check if position (x, y) is within bounds and not a wall."
return 0 <= x < self.row and 0 <= y < self.col and self._maze[x][y] != 0
def _move(self, direction: str) -> bool:
"Move agent in direction if valid. Returns True if move succeeded."
i, j = self._agent_pos
if direction == 'up': i -= 1
elif direction == 'down': i += 1
elif direction == 'left': j -= 1
elif direction == 'right': j += 1
if self._is_valid(i, j):
self._agent_pos = [i, j]
return True
return False
def _render(self) -> str:
"Return string visualization of maze with A=agent, G=goal, #=wall, .=open."
symbols = {0: '#', 1: '.'}
result = ""
for i in range(self.row):
line = ""
for j in range(self.col):
if [i, j] == self._agent_pos: line += "A "
elif [i, j] == self._goal_pos: line += "G "
else: line += symbols[self._maze[i][j]] + " "
result += line + "\n"
return result