# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Map Environment Implementation.
Grid navigation with fog of war. Agent starts at top-left, goal at bottom-right.
Must navigate efficiently while only seeing adjacent cells.
"""
import random
from collections import deque
from typing import Dict, List, Set, Tuple
from uuid import uuid4
from models import MapAction, MapObservation
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
class MapEnvironment(Environment):
    """
    Grid navigation environment with fog of war.
    8x8 grid with ~18% walls. Agent sees only current cell and adjacent cells.
    Goal is to reach bottom-right corner from top-left efficiently.
    """
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    GRID_SIZE = 8
    MAX_MOVES = 50
    WALL_DENSITY = 0.18
    DIRECTIONS = {
        "up": (0, -1),
        "down": (0, 1),
        "left": (-1, 0),
        "right": (1, 0),
    }
    def __init__(self):
        """Set up a fresh environment; `reset()` must be called to build a grid."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._grid: List[List[str]] = []
        self._agent_pos: Tuple[int, int] = (0, 0)
        self._goal_pos: Tuple[int, int] = (self.GRID_SIZE - 1, self.GRID_SIZE - 1)
        self._moves: int = 0
        self._reached_goal: bool = False
        self._optimal_path_length: int = 0
        # Every cell the agent has ever seen; grows as the fog is lifted.
        self._revealed: Set[Tuple[int, int]] = set()
    def _generate_grid(self) -> bool:
        """
        Build a new random grid with walls.

        Returns:
            True when the grid admits a start-to-goal path, False otherwise
            (the caller retries until a solvable layout appears).
        """
        size = self.GRID_SIZE
        self._grid = [["empty"] * size for _ in range(size)]
        target_walls = int(size * size * self.WALL_DENSITY)
        walls: Set[Tuple[int, int]] = set()
        # Rejection-sample wall coordinates; start and goal are never walled.
        while len(walls) < target_walls:
            cx = random.randint(0, size - 1)
            cy = random.randint(0, size - 1)
            if (cx, cy) in ((0, 0), self._goal_pos):
                continue
            walls.add((cx, cy))
        for wx, wy in walls:
            self._grid[wy][wx] = "wall"
        gx, gy = self._goal_pos
        self._grid[gy][gx] = "goal"
        shortest = self._bfs_path_length()
        if shortest < 0:
            return False
        self._optimal_path_length = shortest
        return True
    def _bfs_path_length(self) -> int:
        """
        Breadth-first search from (0, 0) to the goal.

        Returns:
            Length of the shortest path, or -1 when the goal is unreachable.
        """
        frontier = deque([((0, 0), 0)])
        seen: Set[Tuple[int, int]] = {(0, 0)}
        while frontier:
            (cx, cy), steps = frontier.popleft()
            if (cx, cy) == self._goal_pos:
                return steps
            for dx, dy in self.DIRECTIONS.values():
                nx = cx + dx
                ny = cy + dy
                inside = 0 <= nx < self.GRID_SIZE and 0 <= ny < self.GRID_SIZE
                if inside and (nx, ny) not in seen and self._grid[ny][nx] != "wall":
                    seen.add((nx, ny))
                    frontier.append(((nx, ny), steps + 1))
        return -1
    def _get_visible_cells(self) -> List[Dict]:
        """Return the 3x3 neighborhood around the agent (clipped at the edges)."""
        ax, ay = self._agent_pos
        cells: List[Dict] = []
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                cx, cy = ax + dx, ay + dy
                if not (0 <= cx < self.GRID_SIZE and 0 <= cy < self.GRID_SIZE):
                    continue
                # The agent's own square is reported as "agent", masking the grid value.
                kind = "agent" if (cx, cy) == self._agent_pos else self._grid[cy][cx]
                cells.append({"x": cx, "y": cy, "type": kind})
                self._revealed.add((cx, cy))
        return cells
    def reset(self) -> MapObservation:
        """Start a new episode: fresh state, agent at origin, new solvable grid."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._agent_pos = (0, 0)
        self._moves = 0
        self._reached_goal = False
        self._revealed = set()
        # Keep regenerating until a layout with a valid path is produced.
        while not self._generate_grid():
            pass
        return MapObservation(
            visible_cells=self._get_visible_cells(),
            position={"x": 0, "y": 0},
            moves=0,
            max_moves=self.MAX_MOVES,
            grid_size=self.GRID_SIZE,
            reached_goal=False,
            done=False,
            reward=0.0,
        )
    def step(self, action: MapAction) -> MapObservation:  # type: ignore[override]
        """
        Attempt one move.

        Args:
            action: MapAction whose `direction` names one of up/down/left/right.
        Returns:
            MapObservation after the move; reward is -0.1 per step (including
            blocked or unrecognized moves) and 10.0 on reaching the goal.
        """
        self._state.step_count += 1
        self._moves += 1
        reward = -0.1
        delta = self.DIRECTIONS.get(action.direction.lower())
        if delta is not None:
            tx = self._agent_pos[0] + delta[0]
            ty = self._agent_pos[1] + delta[1]
            in_bounds = 0 <= tx < self.GRID_SIZE and 0 <= ty < self.GRID_SIZE
            # Walls and the board edge silently block the move.
            if in_bounds and self._grid[ty][tx] != "wall":
                self._agent_pos = (tx, ty)
                if self._agent_pos == self._goal_pos:
                    self._reached_goal = True
                    reward = 10.0
        episode_over = self._reached_goal or self._moves >= self.MAX_MOVES
        return MapObservation(
            visible_cells=self._get_visible_cells(),
            position={"x": self._agent_pos[0], "y": self._agent_pos[1]},
            moves=self._moves,
            max_moves=self.MAX_MOVES,
            grid_size=self.GRID_SIZE,
            reached_goal=self._reached_goal,
            done=episode_over,
            reward=reward,
        )
    @property
    def state(self) -> State:
        """Get the current environment state."""
        return self._state