# maze_openenv/server/maze_openenv_environment.py
# Hosting-page residue kept for provenance: "tripathysagar's picture";
# commit message "Upload folder using huggingface_hub"; revision b2936e2 (verified).
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Maze OpenEnv Environment Implementation.
A grid maze environment where an agent navigates from start to goal.
"""
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from models import MazeAction, MazeObservation, MazeState
class MazeEnvironment(Environment):
    """
    A grid maze environment where an agent navigates to a goal.

    The agent can move up/down/left/right through walkable cells (1)
    and cannot pass through walls (0). Positions are ``[row, col]`` pairs
    indexing into the maze grid, with ``[0, 0]`` being the first cell of the
    first row.

    Rewards:
        +10 for reaching the goal
        -1 for each step (encourages efficiency)

    An action whose direction is not one of the four known names is also
    penalized with -1, but it neither moves the agent nor advances the step
    counter.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Direction name -> (row delta, col delta). This is the single source of
    # truth for both action validation in ``step`` and movement in ``_move``.
    _DIRECTION_DELTAS = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
    }

    def __init__(self):
        """Create the fixed maze and place the agent on the start cell."""
        super().__init__()
        self._maze = [
            [1, 0, 1, 1],
            [1, 1, 0, 1],
            [1, 1, 0, 1],
            [1, 1, 1, 1],
        ]
        self._agent_pos = [0, 0]
        self._goal_pos = [3, 3]
        self._step_count = 0
        self._episode_id = None
        # Derive the grid size from the maze itself so that editing the
        # literal above cannot leave the bounds checks out of sync.
        self.row = len(self._maze)
        self.col = len(self._maze[0]) if self._maze else 0

    @property
    def state(self) -> MazeState:
        """Return a snapshot of the maze, positions, episode id and step count.

        The lists are copied so that mutating the returned state cannot
        alter the environment's internal bookkeeping.
        """
        return MazeState(
            maze=[list(cells) for cells in self._maze],
            agent_pos=list(self._agent_pos),
            goal_pos=list(self._goal_pos),
            episode_id=self._episode_id,
            step_count=self._step_count,
        )

    def reset(self, seed=None, episode_id=None, **kwargs) -> MazeObservation:
        """Start a new episode with the agent back at ``[0, 0]``.

        Args:
            seed: Accepted for interface compatibility; not used, since the
                maze layout is fixed.
            episode_id: Identifier for the new episode. A random UUID string
                is generated when this is missing or falsy.
            **kwargs: Ignored.

        Returns:
            The initial observation, with ``done=False`` and ``reward=0``.
        """
        self._agent_pos = [0, 0]
        self._step_count = 0
        self._episode_id = episode_id or str(uuid4())
        return self._observation(done=False, reward=0)

    def step(self, action: MazeAction, timeout_s=None, **kwargs) -> MazeObservation:
        """Apply one action and return the resulting observation.

        Args:
            action: Action whose ``direction`` is expected to be one of
                ``"up"``, ``"down"``, ``"left"`` or ``"right"``.
            timeout_s: Accepted for interface compatibility; not used.
            **kwargs: Ignored.

        Returns:
            An observation with reward +10 and ``done=True`` when the agent
            is on the goal cell after the move, otherwise reward -1 and
            ``done=False``. A move blocked by a wall or the grid edge still
            counts as a step.
        """
        if action.direction not in self._DIRECTION_DELTAS:
            # Unknown direction: penalize, but leave the position and the
            # step counter untouched, and always report done=False.
            return self._observation(done=False, reward=-1)

        self._move(action.direction)
        self._step_count += 1
        done = self._agent_pos == self._goal_pos
        return self._observation(done=done, reward=10 if done else -1)

    def _observation(self, done: bool, reward: int) -> MazeObservation:
        """Build an observation of the current position and rendered grid."""
        return MazeObservation(
            position=self._agent_pos.copy(),
            grid_view=self._render(),
            done=done,
            reward=reward,
        )

    def _is_valid(self, x: int, y: int) -> bool:
        """Return True if row ``x``, column ``y`` is inside the grid and not a wall."""
        return 0 <= x < self.row and 0 <= y < self.col and self._maze[x][y] != 0

    def _move(self, direction: str) -> bool:
        """Try to move the agent one cell in ``direction``.

        Returns:
            True if the target cell was valid and the agent now occupies it,
            False if the move was blocked and the agent stayed in place. An
            unrecognized direction maps to a zero offset, i.e. the agent's
            current cell.
        """
        d_row, d_col = self._DIRECTION_DELTAS.get(direction, (0, 0))
        target_row = self._agent_pos[0] + d_row
        target_col = self._agent_pos[1] + d_col
        if self._is_valid(target_row, target_col):
            self._agent_pos = [target_row, target_col]
            return True
        return False

    def _render(self) -> str:
        """Render the grid as text, one row per line.

        Each cell is followed by a single space and each row by a newline.
        ``A`` marks the agent (taking precedence over the goal marker),
        ``G`` the goal, ``#`` a wall and ``.`` a walkable cell.
        """
        symbols = {0: '#', 1: '.'}
        rendered_rows = []
        for i in range(self.row):
            glyphs = []
            for j in range(self.col):
                here = [i, j]
                if here == self._agent_pos:
                    glyphs.append("A")
                elif here == self._goal_pos:
                    glyphs.append("G")
                else:
                    glyphs.append(symbols[self._maze[i][j]])
            rendered_rows.append("".join(glyph + " " for glyph in glyphs) + "\n")
        return "".join(rendered_rows)