interview / maze_env /env.py
Lee93whut
docs: clean up R3/R4 record and consolidate technical narrative
92423f0
"""MazeEnv 环境核心 —— 符合 OpenAI Gymnasium 标准接口(生产级)。
设计要点
--------
* **唯一随机源**:所有随机操作统一使用 Gymnasium 注入的 ``self.np_random``
句柄(``numpy.random.Generator``),通过 ``super().reset(seed=seed)``
初始化,严格禁止 ``numpy.random.*`` 全局函数或标准库 ``random``。
* **连通性保证**:``reset()`` 内嵌 BFS,确保生成的迷宫起点→终点绝对
可达,不可达时自动重新采样,直到满足条件。
* **终止语义区分**:
* ``terminated = True`` —— Agent 到达终点(任务成功完成)。
* ``truncated = True`` —— 超出 ``max_steps`` 步数上限(时间截断)。
* 二者严格互斥,不会同时为 ``True``。
* **奖励语义**:撞墙时同时扣除时间惩罚(``reward_step``)与撞墙惩罚
(``reward_wall_hit``),体现每一步都有时间成本。
典型用法::
from maze_env import MazeEnv, Action
env = MazeEnv.from_yaml("config.yaml")
obs, info = env.reset()
for _ in range(500):
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
if terminated or truncated:
obs, info = env.reset()
env.close()
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Optional, SupportsFloat
import numpy as np
import yaml
import gymnasium as gym
from gymnasium import spaces
from maze_env.actions import Action, DELTAS
from maze_env.generator import bfs_reachable, generate_maze
from maze_env.renderer import render_frame
__all__ = ["MazeEnv"]
class MazeEnv(gym.Env):
"""二维迷宫环境,遵循 OpenAI Gymnasium ``Env`` 标准接口(生产级)。
状态空间
--------
``Box(0, 1, shape=(4, N, N), dtype=np.float32)``
* 通道 0 —— 墙壁层:``1.0`` 表示墙,``0.0`` 表示可通行格子。
* 通道 1 —— Agent 层:Agent 当前所在格子为 ``1.0``,其余为 ``0.0``。
* 通道 2 —— 终点层:终点格子为 ``1.0``,其余为 ``0.0``。
* 通道 3 —— 访问历史层:本 episode 内已访问过的格子为 ``1.0``,未访问为 ``0.0``。
动作空间
--------
``Discrete(4)``:使用 :class:`~maze_env.actions.Action` 枚举或整数均可。
奖励设计
--------
* 到达终点:``+reward_goal``(默认 +100),``terminated=True``。
* 撞墙/越界:``reward_step + reward_wall_hit``(默认 −11),位置保持不变。
* 正常移动:``reward_step``(默认 −1)。
Note:
撞墙时**同时**扣除时间惩罚与撞墙惩罚(体现每步均有时间成本)。
Example:
>>> from maze_env import MazeEnv, Action
>>> env = MazeEnv(grid_size=6, obstacle_density=0.0, seed=0)
>>> obs, info = env.reset()
>>> obs.shape
(4, 6, 6)
>>> obs, reward, terminated, truncated, info = env.step(Action.RIGHT)
>>> info["agent_pos"]
(1, 2)
"""
metadata: dict[str, Any] = {"render_modes": ["human", "ansi"], "render_fps": 4}
# ------------------------------------------------------------------
# 构造与参数校验
# ------------------------------------------------------------------
def __init__(
self,
grid_size: int = 10,
obstacle_density: float = 0.25,
max_steps: int = 200,
seed: Optional[int] = None,
reward_goal: float = 100.0,
reward_wall_hit: float = -10.0,
reward_step: float = -1.0,
distance_shaping_alpha: float = 0.0,
render_mode: Optional[str] = None,
) -> None:
"""初始化迷宫环境。
Args:
grid_size: 迷宫边长 N,最小值为 4。
obstacle_density: 内部格子成为墙壁的概率,范围 ``[0.0, 1.0)``。
max_steps: 单幕最大步数,超出后触发 ``truncated=True``。
seed: 构造期随机种子;每次 ``reset()`` 也可独立传入。
reward_goal: 到达终点的奖励(建议为正数)。
reward_wall_hit: 撞墙惩罚(建议为负数)。
reward_step: 每步时间惩罚(建议为负数)。
distance_shaping_alpha: 距离 shaping 系数(默认 0.0 = 关闭)。
每步额外奖励 = alpha × (移动前曼哈顿距离 − 移动后曼哈顿距离),
靠近目标为正,远离为负;撞墙步不计入(位置未变)。
render_mode: 渲染模式,可选 ``"human"`` 或 ``"ansi"``。
Raises:
ValueError: 若 ``grid_size < 4``、``obstacle_density`` 越界、
``max_steps < 1``,或 ``render_mode`` 不在合法值列表中。
"""
super().__init__()
# ── 参数校验 ───────────────────────────────────────────────────
if grid_size < 4:
raise ValueError(f"grid_size 必须 >= 4,当前值:{grid_size}")
if not (0.0 <= obstacle_density < 1.0):
raise ValueError(
f"obstacle_density 必须在 [0.0, 1.0) 内,当前值:{obstacle_density}"
)
if max_steps < 1:
raise ValueError(f"max_steps 必须 >= 1,当前值:{max_steps}")
if render_mode is not None and render_mode not in self.metadata["render_modes"]:
raise ValueError(
f"不支持的 render_mode '{render_mode}',"
f"可选值:{self.metadata['render_modes']}"
)
# ── 超参数(不可变) ───────────────────────────────────────────
self.grid_size: int = grid_size
self.obstacle_density: float = obstacle_density
self.max_steps: int = max_steps
self.init_seed: Optional[int] = seed
self.reward_goal: float = reward_goal
self.reward_wall_hit: float = reward_wall_hit
self.reward_step: float = reward_step
self.distance_shaping_alpha: float = distance_shaping_alpha
self.render_mode: Optional[str] = render_mode
# ── 空间声明 ───────────────────────────────────────────────────
self.observation_space: spaces.Box = spaces.Box(
low=0.0,
high=1.0,
shape=(4, grid_size, grid_size), # ch0=wall, ch1=agent, ch2=goal, ch3=visited
dtype=np.float32,
)
self.action_space: spaces.Discrete = spaces.Discrete(len(Action))
# ── 运行时状态(占位,由 reset() 正式填充) ────────────────────
self._wall_map: np.ndarray = np.zeros(
(grid_size, grid_size), dtype=np.float32
)
self._visited_map: np.ndarray = np.zeros(
(grid_size, grid_size), dtype=np.float32
)
self._agent_pos: tuple[int, int] = (1, 1)
self._goal_pos: tuple[int, int] = (grid_size - 2, grid_size - 2)
self._step_count: int = 0
self._hit_wall_count: int = 0
self._episode_success: bool = False
# ------------------------------------------------------------------
# 公开只读属性(封装内部状态,供训练脚本等外部代码合法访问)
# ------------------------------------------------------------------
@property
def wall_map(self) -> np.ndarray:
"""当前幕的墙壁图,形状 ``(N, N)`` float32,1.0=墙,0.0=可通行。
返回只读视图(zero-copy),防止外部意外篡改环境内部状态。
若需要可写副本,请显式调用 ``.copy()``。
"""
view = self._wall_map.view()
view.flags.writeable = False
return view
@property
def goal_pos(self) -> tuple[int, int]:
"""当前幕的终点坐标 ``(row, col)``,只读。"""
return self._goal_pos
@property
def agent_pos(self) -> tuple[int, int]:
"""Agent 当前坐标 ``(row, col)``,只读。"""
return self._agent_pos
@classmethod
def from_config(
cls,
config: dict[str, Any],
render_mode: Optional[str] = None,
) -> "MazeEnv":
"""从已解析的配置字典创建环境实例。
配置格式::
maze:
grid_size: 10
obstacle_density: 0.25
max_steps: 200
rewards:
goal: 100
wall_hit: -10
step: -1
注:``maze.seed`` 不被此方法读取。需固定地图时,
请在创建实例后显式调用 ``env.reset(seed=X)``。
Args:
config: ``yaml.safe_load`` 等工具解析得到的字典。
render_mode: 渲染模式。
Returns:
配置好的 ``MazeEnv`` 实例。
"""
maze_cfg: dict[str, Any] = config["maze"]
reward_cfg: dict[str, Any] = config.get("rewards", {})
return cls(
grid_size=int(maze_cfg.get("grid_size", 10)),
obstacle_density=float(maze_cfg.get("obstacle_density", 0.25)),
max_steps=int(maze_cfg.get("max_steps", 200)),
# seed 不从 config 读取:调用方按需显式传入。
# config.yaml 中 maze.seed 仅用于 overfit 调试节,
# 透传此处会导致普通调用者意外锁死到同一张地图。
reward_goal=float(reward_cfg.get("goal", 100.0)),
reward_wall_hit=float(reward_cfg.get("wall_hit", -10.0)),
reward_step=float(reward_cfg.get("step", -1.0)),
distance_shaping_alpha=float(reward_cfg.get("distance_shaping_alpha", 0.0)),
render_mode=render_mode,
)
@classmethod
def from_yaml(
cls,
path: str | Path = "config.yaml",
render_mode: Optional[str] = None,
) -> "MazeEnv":
"""从 YAML 文件路径直接创建环境实例。
Args:
path: YAML 配置文件路径,默认 ``"config.yaml"``。
render_mode: 渲染模式。
Returns:
配置好的 ``MazeEnv`` 实例。
"""
with open(path, "r", encoding="utf-8") as fh:
cfg = yaml.safe_load(fh)
return cls.from_config(cfg, render_mode=render_mode)
# ------------------------------------------------------------------
# Gymnasium 核心接口
# ------------------------------------------------------------------
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[np.ndarray, dict[str, Any]]:
"""重置环境,生成新迷宫并将 Agent 放置到起点。
Args:
seed: 本幕随机种子。优先级:``reset(seed=X)`` > 构造期 ``seed``。
options: 可选注入字典,支持以下键:
* ``"wall_map"`` *(np.ndarray)* — 直接使用外部提供的墙壁图,
跳过随机生成(形状须为 ``(N, N)``,非零为墙)。
* ``"start"`` *(tuple[int,int])* — Agent 起点坐标,
默认 ``(1, 1)``。
* ``"goal"`` *(tuple[int,int])* — 终点坐标,
默认 ``(N-2, N-2)``。
注入外部地图时,调用方须自行保证起点→终点连通。
Returns:
``(observation, info)``:初始观测张量与 info 字典。
"""
effective_seed = seed if seed is not None else self.init_seed
super().reset(seed=effective_seed)
opts: dict[str, Any] = options or {}
# 重置幕级统计
self._step_count = 0
self._hit_wall_count = 0
self._episode_success = False
self._visited_map = np.zeros((self.grid_size, self.grid_size), dtype=np.float32)
self._agent_pos = opts.get("start", (1, 1))
self._goal_pos = opts.get("goal", (self.grid_size - 2, self.grid_size - 2))
# 起点标记为已访问
ar, ac = self._agent_pos
self._visited_map[ar, ac] = 1.0
if "wall_map" in opts:
# ── 外部注入地图(用于推理 / 可视化,跳过随机生成)────────────
wall_map = np.asarray(opts["wall_map"], dtype=np.float32)
expected = (self.grid_size, self.grid_size)
if wall_map.shape != expected:
raise ValueError(
f"注入的 wall_map 形状 {wall_map.shape} 与环境 "
f"grid_size={self.grid_size} 不匹配,期望 {expected}"
)
self._wall_map = wall_map
else:
# ── 随机生成,BFS 保证连通 ────────────────────────────────────
while True:
self._wall_map = generate_maze(
self.grid_size, self.obstacle_density, self.np_random
)
if bfs_reachable(self._wall_map, self._agent_pos, self._goal_pos):
break
return self._build_observation(), self._build_info()
def step(
self,
action: int,
) -> tuple[np.ndarray, SupportsFloat, bool, bool, dict[str, Any]]:
"""执行一步动作并返回转移结果。
Args:
action: 动作编号,合法值 ``{0,1,2,3}`` 或 :class:`Action` 枚举。
Returns:
``(observation, reward, terminated, truncated, info)``
Raises:
AssertionError: 若 ``action`` 不在合法动作空间内。
"""
assert self.action_space.contains(action), (
f"非法动作 {action!r},合法范围:{self.action_space}"
)
dr, dc = DELTAS[action]
cur_row, cur_col = self._agent_pos
new_row, new_col = cur_row + dr, cur_col + dc
N = self.grid_size
# 移动前的曼哈顿距离(用于距离 shaping)
gr, gc = self._goal_pos
dist_before: int = abs(cur_row - gr) + abs(cur_col - gc)
# 碰撞检测(显式 bool() 强转,避免 numpy.bool_ 与 Python bool 不一致)
hit_wall: bool = bool(
new_row < 0 or new_row >= N
or new_col < 0 or new_col >= N
or self._wall_map[new_row, new_col] == 1.0
)
if hit_wall:
self._hit_wall_count += 1
# 撞墙:时间惩罚 + 撞墙惩罚(体现每步均有时间成本);位置不变,不计入 shaping
reward: float = self.reward_step + self.reward_wall_hit
else:
self._agent_pos = (new_row, new_col)
reward = self.reward_step
# 距离 shaping:靠近目标为正,远离为负(仅有效移动步计入)
# 注:本项目 config 固定 distance_shaping_alpha=0.0,train.py 也未透传该字段,
# 故此 if 分支在当前训练/评估流程中永不执行,保留作为参数设计的可扩展点。
if self.distance_shaping_alpha != 0.0:
dist_after: int = abs(new_row - gr) + abs(new_col - gc)
reward += self.distance_shaping_alpha * (dist_before - dist_after)
# 更新访问地图(有效移动后标记新格子)
self._visited_map[new_row, new_col] = 1.0
self._step_count += 1
# 终止判断(terminated 与 truncated 严格互斥)
terminated: bool = self._agent_pos == self._goal_pos
if terminated:
reward += self.reward_goal
self._episode_success = True
truncated: bool = (not terminated) and (self._step_count >= self.max_steps)
info = self._build_info()
info["hit_wall"] = hit_wall # 本步撞墙标志(单步,非幕级)
if self.render_mode == "human":
self.render()
return self._build_observation(), float(reward), terminated, truncated, info
def render(self) -> Optional[str]:
"""渲染当前状态为 ASCII 网格。
Returns:
* ``"ansi"`` 模式:返回字符串。
* ``"human"`` 模式:打印到 stdout,返回 ``None``。
* ``None`` 模式:无操作,返回 ``None``。
"""
if self.render_mode is None:
return None
output = render_frame(
wall_map=self._wall_map,
agent_pos=self._agent_pos,
goal_pos=self._goal_pos,
step_count=self._step_count,
max_steps=self.max_steps,
hit_wall_count=self._hit_wall_count,
episode_success=self._episode_success,
)
if self.render_mode == "human":
print(output)
return None
return output
def close(self) -> None:
"""释放资源(当前无外部资源,保留以满足 Gymnasium 接口规范)。"""
# ------------------------------------------------------------------
# 私有辅助
# ------------------------------------------------------------------
def _build_observation(self) -> np.ndarray:
"""将当前状态编码为四通道观测张量 ``(4, N, N)``。
通道说明:
ch0 — wall_map:墙壁位置(1=墙,0=通路)
ch1 — agent_map:agent 当前位置(one-hot)
ch2 — goal_map:终点位置(one-hot)
ch3 — visited_map:本 episode 内已访问过的格子(二值,1=到达过,0=未到达)
"""
N = self.grid_size
obs = np.zeros((4, N, N), dtype=np.float32)
obs[0] = self._wall_map
ar, ac = self._agent_pos
obs[1, ar, ac] = 1.0
gr, gc = self._goal_pos
obs[2, gr, gc] = 1.0
obs[3] = self._visited_map
return obs
def _build_info(self) -> dict[str, Any]:
"""构建幕级统计 info 字典。
Returns:
包含 ``agent_pos``、``goal_pos``、``step_count``、
``hit_wall_count``、``success`` 五个字段的字典。
Note:
``step()`` 会在此基础上额外追加 ``"hit_wall": bool``(单步标志);
``reset()`` 返回的 info 不含该字段(初始无此概念),
调用方需注意两处 info 结构的微小差异。
"""
return {
"agent_pos": self._agent_pos,
"goal_pos": self._goal_pos,
"step_count": self._step_count,
"hit_wall_count": self._hit_wall_count,
"success": self._episode_success,
}