File size: 19,037 Bytes
fe0625d a91b194 fe0625d 92423f0 fe0625d a91b194 fe0625d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 | """MazeEnv 环境核心 —— 符合 OpenAI Gymnasium 标准接口(生产级)。
设计要点
--------
* **唯一随机源**:所有随机操作统一使用 Gymnasium 注入的 ``self.np_random``
句柄(``numpy.random.Generator``),通过 ``super().reset(seed=seed)``
初始化,严格禁止 ``numpy.random.*`` 全局函数或标准库 ``random``。
* **连通性保证**:``reset()`` 内嵌 BFS,确保生成的迷宫起点→终点绝对
可达,不可达时自动重新采样,直到满足条件。
* **终止语义区分**:
* ``terminated = True`` —— Agent 到达终点(任务成功完成)。
* ``truncated = True`` —— 超出 ``max_steps`` 步数上限(时间截断)。
* 二者严格互斥,不会同时为 ``True``。
* **奖励语义**:撞墙时同时扣除时间惩罚(``reward_step``)与撞墙惩罚
(``reward_wall_hit``),体现每一步都有时间成本。
典型用法::
from maze_env import MazeEnv, Action
env = MazeEnv.from_yaml("config.yaml")
obs, info = env.reset()
for _ in range(500):
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
if terminated or truncated:
obs, info = env.reset()
env.close()
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Optional, SupportsFloat
import numpy as np
import yaml
import gymnasium as gym
from gymnasium import spaces
from maze_env.actions import Action, DELTAS
from maze_env.generator import bfs_reachable, generate_maze
from maze_env.renderer import render_frame
__all__ = ["MazeEnv"]
class MazeEnv(gym.Env):
"""二维迷宫环境,遵循 OpenAI Gymnasium ``Env`` 标准接口(生产级)。
状态空间
--------
``Box(0, 1, shape=(4, N, N), dtype=np.float32)``
* 通道 0 —— 墙壁层:``1.0`` 表示墙,``0.0`` 表示可通行格子。
* 通道 1 —— Agent 层:Agent 当前所在格子为 ``1.0``,其余为 ``0.0``。
* 通道 2 —— 终点层:终点格子为 ``1.0``,其余为 ``0.0``。
* 通道 3 —— 访问历史层:本 episode 内已访问过的格子为 ``1.0``,未访问为 ``0.0``。
动作空间
--------
``Discrete(4)``:使用 :class:`~maze_env.actions.Action` 枚举或整数均可。
奖励设计
--------
* 到达终点:``+reward_goal``(默认 +100),``terminated=True``。
* 撞墙/越界:``reward_step + reward_wall_hit``(默认 −11),位置保持不变。
* 正常移动:``reward_step``(默认 −1)。
Note:
撞墙时**同时**扣除时间惩罚与撞墙惩罚(体现每步均有时间成本)。
Example:
>>> from maze_env import MazeEnv, Action
>>> env = MazeEnv(grid_size=6, obstacle_density=0.0, seed=0)
>>> obs, info = env.reset()
>>> obs.shape
(4, 6, 6)
>>> obs, reward, terminated, truncated, info = env.step(Action.RIGHT)
>>> info["agent_pos"]
(1, 2)
"""
metadata: dict[str, Any] = {"render_modes": ["human", "ansi"], "render_fps": 4}
# ------------------------------------------------------------------
# 构造与参数校验
# ------------------------------------------------------------------
def __init__(
self,
grid_size: int = 10,
obstacle_density: float = 0.25,
max_steps: int = 200,
seed: Optional[int] = None,
reward_goal: float = 100.0,
reward_wall_hit: float = -10.0,
reward_step: float = -1.0,
distance_shaping_alpha: float = 0.0,
render_mode: Optional[str] = None,
) -> None:
"""初始化迷宫环境。
Args:
grid_size: 迷宫边长 N,最小值为 4。
obstacle_density: 内部格子成为墙壁的概率,范围 ``[0.0, 1.0)``。
max_steps: 单幕最大步数,超出后触发 ``truncated=True``。
seed: 构造期随机种子;每次 ``reset()`` 也可独立传入。
reward_goal: 到达终点的奖励(建议为正数)。
reward_wall_hit: 撞墙惩罚(建议为负数)。
reward_step: 每步时间惩罚(建议为负数)。
distance_shaping_alpha: 距离 shaping 系数(默认 0.0 = 关闭)。
每步额外奖励 = alpha × (移动前曼哈顿距离 − 移动后曼哈顿距离),
靠近目标为正,远离为负;撞墙步不计入(位置未变)。
render_mode: 渲染模式,可选 ``"human"`` 或 ``"ansi"``。
Raises:
ValueError: 若 ``grid_size < 4``、``obstacle_density`` 越界、
``max_steps < 1``,或 ``render_mode`` 不在合法值列表中。
"""
super().__init__()
# ── 参数校验 ───────────────────────────────────────────────────
if grid_size < 4:
raise ValueError(f"grid_size 必须 >= 4,当前值:{grid_size}")
if not (0.0 <= obstacle_density < 1.0):
raise ValueError(
f"obstacle_density 必须在 [0.0, 1.0) 内,当前值:{obstacle_density}"
)
if max_steps < 1:
raise ValueError(f"max_steps 必须 >= 1,当前值:{max_steps}")
if render_mode is not None and render_mode not in self.metadata["render_modes"]:
raise ValueError(
f"不支持的 render_mode '{render_mode}',"
f"可选值:{self.metadata['render_modes']}"
)
# ── 超参数(不可变) ───────────────────────────────────────────
self.grid_size: int = grid_size
self.obstacle_density: float = obstacle_density
self.max_steps: int = max_steps
self.init_seed: Optional[int] = seed
self.reward_goal: float = reward_goal
self.reward_wall_hit: float = reward_wall_hit
self.reward_step: float = reward_step
self.distance_shaping_alpha: float = distance_shaping_alpha
self.render_mode: Optional[str] = render_mode
# ── 空间声明 ───────────────────────────────────────────────────
self.observation_space: spaces.Box = spaces.Box(
low=0.0,
high=1.0,
shape=(4, grid_size, grid_size), # ch0=wall, ch1=agent, ch2=goal, ch3=visited
dtype=np.float32,
)
self.action_space: spaces.Discrete = spaces.Discrete(len(Action))
# ── 运行时状态(占位,由 reset() 正式填充) ────────────────────
self._wall_map: np.ndarray = np.zeros(
(grid_size, grid_size), dtype=np.float32
)
self._visited_map: np.ndarray = np.zeros(
(grid_size, grid_size), dtype=np.float32
)
self._agent_pos: tuple[int, int] = (1, 1)
self._goal_pos: tuple[int, int] = (grid_size - 2, grid_size - 2)
self._step_count: int = 0
self._hit_wall_count: int = 0
self._episode_success: bool = False
# ------------------------------------------------------------------
# 公开只读属性(封装内部状态,供训练脚本等外部代码合法访问)
# ------------------------------------------------------------------
@property
def wall_map(self) -> np.ndarray:
"""当前幕的墙壁图,形状 ``(N, N)`` float32,1.0=墙,0.0=可通行。
返回只读视图(zero-copy),防止外部意外篡改环境内部状态。
若需要可写副本,请显式调用 ``.copy()``。
"""
view = self._wall_map.view()
view.flags.writeable = False
return view
@property
def goal_pos(self) -> tuple[int, int]:
"""当前幕的终点坐标 ``(row, col)``,只读。"""
return self._goal_pos
@property
def agent_pos(self) -> tuple[int, int]:
"""Agent 当前坐标 ``(row, col)``,只读。"""
return self._agent_pos
@classmethod
def from_config(
cls,
config: dict[str, Any],
render_mode: Optional[str] = None,
) -> "MazeEnv":
"""从已解析的配置字典创建环境实例。
配置格式::
maze:
grid_size: 10
obstacle_density: 0.25
max_steps: 200
rewards:
goal: 100
wall_hit: -10
step: -1
注:``maze.seed`` 不被此方法读取。需固定地图时,
请在创建实例后显式调用 ``env.reset(seed=X)``。
Args:
config: ``yaml.safe_load`` 等工具解析得到的字典。
render_mode: 渲染模式。
Returns:
配置好的 ``MazeEnv`` 实例。
"""
maze_cfg: dict[str, Any] = config["maze"]
reward_cfg: dict[str, Any] = config.get("rewards", {})
return cls(
grid_size=int(maze_cfg.get("grid_size", 10)),
obstacle_density=float(maze_cfg.get("obstacle_density", 0.25)),
max_steps=int(maze_cfg.get("max_steps", 200)),
# seed 不从 config 读取:调用方按需显式传入。
# config.yaml 中 maze.seed 仅用于 overfit 调试节,
# 透传此处会导致普通调用者意外锁死到同一张地图。
reward_goal=float(reward_cfg.get("goal", 100.0)),
reward_wall_hit=float(reward_cfg.get("wall_hit", -10.0)),
reward_step=float(reward_cfg.get("step", -1.0)),
distance_shaping_alpha=float(reward_cfg.get("distance_shaping_alpha", 0.0)),
render_mode=render_mode,
)
@classmethod
def from_yaml(
cls,
path: str | Path = "config.yaml",
render_mode: Optional[str] = None,
) -> "MazeEnv":
"""从 YAML 文件路径直接创建环境实例。
Args:
path: YAML 配置文件路径,默认 ``"config.yaml"``。
render_mode: 渲染模式。
Returns:
配置好的 ``MazeEnv`` 实例。
"""
with open(path, "r", encoding="utf-8") as fh:
cfg = yaml.safe_load(fh)
return cls.from_config(cfg, render_mode=render_mode)
# ------------------------------------------------------------------
# Gymnasium 核心接口
# ------------------------------------------------------------------
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[np.ndarray, dict[str, Any]]:
"""重置环境,生成新迷宫并将 Agent 放置到起点。
Args:
seed: 本幕随机种子。优先级:``reset(seed=X)`` > 构造期 ``seed``。
options: 可选注入字典,支持以下键:
* ``"wall_map"`` *(np.ndarray)* — 直接使用外部提供的墙壁图,
跳过随机生成(形状须为 ``(N, N)``,非零为墙)。
* ``"start"`` *(tuple[int,int])* — Agent 起点坐标,
默认 ``(1, 1)``。
* ``"goal"`` *(tuple[int,int])* — 终点坐标,
默认 ``(N-2, N-2)``。
注入外部地图时,调用方须自行保证起点→终点连通。
Returns:
``(observation, info)``:初始观测张量与 info 字典。
"""
effective_seed = seed if seed is not None else self.init_seed
super().reset(seed=effective_seed)
opts: dict[str, Any] = options or {}
# 重置幕级统计
self._step_count = 0
self._hit_wall_count = 0
self._episode_success = False
self._visited_map = np.zeros((self.grid_size, self.grid_size), dtype=np.float32)
self._agent_pos = opts.get("start", (1, 1))
self._goal_pos = opts.get("goal", (self.grid_size - 2, self.grid_size - 2))
# 起点标记为已访问
ar, ac = self._agent_pos
self._visited_map[ar, ac] = 1.0
if "wall_map" in opts:
# ── 外部注入地图(用于推理 / 可视化,跳过随机生成)────────────
wall_map = np.asarray(opts["wall_map"], dtype=np.float32)
expected = (self.grid_size, self.grid_size)
if wall_map.shape != expected:
raise ValueError(
f"注入的 wall_map 形状 {wall_map.shape} 与环境 "
f"grid_size={self.grid_size} 不匹配,期望 {expected}"
)
self._wall_map = wall_map
else:
# ── 随机生成,BFS 保证连通 ────────────────────────────────────
while True:
self._wall_map = generate_maze(
self.grid_size, self.obstacle_density, self.np_random
)
if bfs_reachable(self._wall_map, self._agent_pos, self._goal_pos):
break
return self._build_observation(), self._build_info()
def step(
self,
action: int,
) -> tuple[np.ndarray, SupportsFloat, bool, bool, dict[str, Any]]:
"""执行一步动作并返回转移结果。
Args:
action: 动作编号,合法值 ``{0,1,2,3}`` 或 :class:`Action` 枚举。
Returns:
``(observation, reward, terminated, truncated, info)``
Raises:
AssertionError: 若 ``action`` 不在合法动作空间内。
"""
assert self.action_space.contains(action), (
f"非法动作 {action!r},合法范围:{self.action_space}"
)
dr, dc = DELTAS[action]
cur_row, cur_col = self._agent_pos
new_row, new_col = cur_row + dr, cur_col + dc
N = self.grid_size
# 移动前的曼哈顿距离(用于距离 shaping)
gr, gc = self._goal_pos
dist_before: int = abs(cur_row - gr) + abs(cur_col - gc)
# 碰撞检测(显式 bool() 强转,避免 numpy.bool_ 与 Python bool 不一致)
hit_wall: bool = bool(
new_row < 0 or new_row >= N
or new_col < 0 or new_col >= N
or self._wall_map[new_row, new_col] == 1.0
)
if hit_wall:
self._hit_wall_count += 1
# 撞墙:时间惩罚 + 撞墙惩罚(体现每步均有时间成本);位置不变,不计入 shaping
reward: float = self.reward_step + self.reward_wall_hit
else:
self._agent_pos = (new_row, new_col)
reward = self.reward_step
# 距离 shaping:靠近目标为正,远离为负(仅有效移动步计入)
# 注:本项目 config 固定 distance_shaping_alpha=0.0,train.py 也未透传该字段,
# 故此 if 分支在当前训练/评估流程中永不执行,保留作为参数设计的可扩展点。
if self.distance_shaping_alpha != 0.0:
dist_after: int = abs(new_row - gr) + abs(new_col - gc)
reward += self.distance_shaping_alpha * (dist_before - dist_after)
# 更新访问地图(有效移动后标记新格子)
self._visited_map[new_row, new_col] = 1.0
self._step_count += 1
# 终止判断(terminated 与 truncated 严格互斥)
terminated: bool = self._agent_pos == self._goal_pos
if terminated:
reward += self.reward_goal
self._episode_success = True
truncated: bool = (not terminated) and (self._step_count >= self.max_steps)
info = self._build_info()
info["hit_wall"] = hit_wall # 本步撞墙标志(单步,非幕级)
if self.render_mode == "human":
self.render()
return self._build_observation(), float(reward), terminated, truncated, info
def render(self) -> Optional[str]:
"""渲染当前状态为 ASCII 网格。
Returns:
* ``"ansi"`` 模式:返回字符串。
* ``"human"`` 模式:打印到 stdout,返回 ``None``。
* ``None`` 模式:无操作,返回 ``None``。
"""
if self.render_mode is None:
return None
output = render_frame(
wall_map=self._wall_map,
agent_pos=self._agent_pos,
goal_pos=self._goal_pos,
step_count=self._step_count,
max_steps=self.max_steps,
hit_wall_count=self._hit_wall_count,
episode_success=self._episode_success,
)
if self.render_mode == "human":
print(output)
return None
return output
def close(self) -> None:
"""释放资源(当前无外部资源,保留以满足 Gymnasium 接口规范)。"""
# ------------------------------------------------------------------
# 私有辅助
# ------------------------------------------------------------------
def _build_observation(self) -> np.ndarray:
"""将当前状态编码为四通道观测张量 ``(4, N, N)``。
通道说明:
ch0 — wall_map:墙壁位置(1=墙,0=通路)
ch1 — agent_map:agent 当前位置(one-hot)
ch2 — goal_map:终点位置(one-hot)
ch3 — visited_map:本 episode 内已访问过的格子(二值,1=到达过,0=未到达)
"""
N = self.grid_size
obs = np.zeros((4, N, N), dtype=np.float32)
obs[0] = self._wall_map
ar, ac = self._agent_pos
obs[1, ar, ac] = 1.0
gr, gc = self._goal_pos
obs[2, gr, gc] = 1.0
obs[3] = self._visited_map
return obs
def _build_info(self) -> dict[str, Any]:
"""构建幕级统计 info 字典。
Returns:
包含 ``agent_pos``、``goal_pos``、``step_count``、
``hit_wall_count``、``success`` 五个字段的字典。
Note:
``step()`` 会在此基础上额外追加 ``"hit_wall": bool``(单步标志);
``reset()`` 返回的 info 不含该字段(初始无此概念),
调用方需注意两处 info 结构的微小差异。
"""
return {
"agent_pos": self._agent_pos,
"goal_pos": self._goal_pos,
"step_count": self._step_count,
"hit_wall_count": self._hit_wall_count,
"success": self._episode_success,
}
|