pyre_env / server /pyre_env_environment.py
Krooz's picture
Upload folder using huggingface_hub
f5a6010 verified
"""
PyreEnvironment — single-agent crisis navigation environment.
Orchestrates:
- Floor plan generation (floor_plan.py)
- Fire/smoke dynamics with per-episode variability (fire_sim.py)
- Narrative observation rendering (narrative.py)
- Composite reward rubrics (rubrics.py)
Per-episode randomization (makes each episode unique):
- Template selected in rotation
- Number of fire ignition sources: varies by difficulty
- Fire spread rate: varies by difficulty
- Wind direction: varies by difficulty
- Humidity: varies by difficulty
- Agent spawn position: random from template spawn options
- Fire start positions: random floor cells far from exits and agent
Difficulty levels (set via the `difficulty` param in /reset):
easy — 1 source, slow spread, CALM wind, high humidity, 200 max steps
medium — 2–4 sources, moderate spread, any wind, moderate humidity, 150 steps (default)
hard — 3–5 sources, fast spread, always windy, low humidity, 100 steps
Done conditions:
- Agent evacuated through an unblocked exit → success
- Agent health reaches 0 (overwhelmed by smoke/fire) → failure
- step_count >= max_steps → timeout
"""
import os
import random
import uuid
from collections import deque
from typing import Any, Dict, List, Optional, Tuple
from openenv.core.env_server.interfaces import Environment
try:
from ..models import PyreAction, PyreMapState, PyreObservation, PyreState
except (ImportError, ModuleNotFoundError):
from models import PyreAction, PyreMapState, PyreObservation, PyreState
from .fire_sim import FireSim, FIRE_BURNING, smoke_level_label, WIND_DIRS
from .floor_plan import generate_episode, generate_procedural_floor_plan, template_names
from .narrative import build_look_result, build_narrative_observation, compute_visible_cells
from .rubrics import make_per_step_rubrics, make_episode_end_rubrics, bfs_exit_dist, unblocked_exits, BFS_INF
# Cell type constants
FLOOR = 0
WALL = 1
DOOR_OPEN = 2
DOOR_CLOSED = 3
EXIT = 4
OBSTACLE = 5
_CARDINAL_DELTA = {"north": (0, -1), "south": (0, 1), "west": (-1, 0), "east": (1, 0)}
# Exit is blocked if fire intensity at exit cell exceeds this
EXIT_FIRE_THRESHOLD = 0.5
# Health damage per step (applied after each step)
DAMAGE_LIGHT_SMOKE = 0.5
DAMAGE_MODERATE_SMOKE = 2.0
DAMAGE_HEAVY_SMOKE = 5.0
DAMAGE_ON_FIRE = 10.0
# ---------------------------------------------------------------------------
# Difficulty presets — override fire randomisation ranges per episode
# ---------------------------------------------------------------------------
# Each preset is a dict of kwargs passed to reset(); ranges are (min, max) tuples.
_DIFFICULTY_PRESETS: Dict[str, Dict] = {
"easy": {
"n_sources_range": (1, 1),
"p_spread_range": (0.10, 0.20),
"humidity_range": (0.30, 0.50),
"wind_choices": ["CALM"],
"max_steps_override": 200,
# Map layout: fixed 16×16 templates (stable topology aids early learning)
"use_procedural": False,
"grid_dims": (16, 16),
"n_rooms_range": None,
"template_pool": None,
},
"medium": {
"n_sources_range": (2, 4),
"p_spread_range": (0.15, 0.40),
"humidity_range": (0.10, 0.45),
"wind_choices": list(WIND_DIRS.keys()),
"max_steps_override": None, # use env default
# Map layout: fixed 16×16 templates (same 3 buildings, fire varies)
"use_procedural": False,
"grid_dims": (16, 16),
"n_rooms_range": None,
"template_pool": None,
},
"hard_fixed": {
"n_sources_range": (3, 5),
"p_spread_range": (0.30, 0.55),
"humidity_range": (0.05, 0.20),
"wind_choices": [d for d in WIND_DIRS.keys() if d != "CALM"],
"max_steps_override": 100,
# Bridge stage: hard fire dynamics on a fixed topology before the
# final jump to fully procedural buildings.
"use_procedural": False,
"grid_dims": (16, 16),
"n_rooms_range": None,
"template_pool": ["t_corridor"],
},
"hard": {
"n_sources_range": (3, 5),
"p_spread_range": (0.30, 0.55),
"humidity_range": (0.05, 0.20),
"wind_choices": [d for d in WIND_DIRS.keys() if d != "CALM"],
"max_steps_override": 100,
# Map layout: procedurally generated 20×24 building — every episode unique
"use_procedural": True,
"grid_dims": (20, 24),
"n_rooms_range": (6, 10),
"template_pool": None,
},
}
def _idx(x: int, y: int, w: int) -> int:
return y * w + x
def _in_bounds(x: int, y: int, w: int, h: int) -> bool:
return 0 <= x < w and 0 <= y < h
def _manhattan(x1, y1, x2, y2):
return abs(x1 - x2) + abs(y1 - y2)
def _bfs_first_step_toward_exit(
sx: int,
sy: int,
exits: List[List[int]],
cell_grid: List[int],
w: int,
h: int,
) -> Optional[str]:
"""Return the first cardinal move on a shortest path to any reachable exit.
Closed doors are treated as traversable for planning, matching the reward
BFS used elsewhere. Returns None when no exit is reachable.
"""
if not exits:
return None
exit_set = {(ex[0], ex[1]) for ex in exits}
if (sx, sy) in exit_set:
return None
queue: deque = deque([(sx, sy, None)])
visited = {(sx, sy)}
moves = ((0, -1, "north"), (0, 1, "south"), (-1, 0, "west"), (1, 0, "east"))
while queue:
cx, cy, first_dir = queue.popleft()
for dx, dy, dir_name in moves:
nx, ny = cx + dx, cy + dy
if not _in_bounds(nx, ny, w, h):
continue
if (nx, ny) in visited:
continue
ct = cell_grid[_idx(nx, ny, w)]
if ct in (WALL, OBSTACLE):
continue
next_first = dir_name if first_dir is None else first_dir
if (nx, ny) in exit_set:
return next_first
visited.add((nx, ny))
queue.append((nx, ny, next_first))
return None
class PyreEnvironment(Environment):
"""First-person crisis navigation environment — single agent.
The agent is inside a burning building with partial observability.
It must navigate to safety before its health depletes.
Fire behaviour (spread rate, wind direction, ignition count) varies
each episode, forcing the agent to reason from observations rather
than memorising fixed patterns.
"""
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(
self,
max_steps: int = 150,
base_seed: int = 42,
full_visibility: Optional[bool] = None,
):
super().__init__()
self.max_steps = int(os.environ.get("PYRE_MAX_STEPS", max_steps))
self.base_seed = int(os.environ.get("PYRE_SEED", base_seed))
if full_visibility is None:
full_visibility = os.environ.get("PYRE_FULL_VISIBILITY", "1").strip().lower() not in {"0", "false", "no"}
self.full_visibility = bool(full_visibility)
self._state: Optional[PyreState] = None
self._fire_sim: Optional[FireSim] = None
self._rng: Optional[random.Random] = None
self._per_step_rubrics = make_per_step_rubrics()
self._episode_rubrics = make_episode_end_rubrics()
self._episode_counter = 0
self._last_feedback = ""
# Episode-scoped reward tracking (reset each episode)
self._visited_cells: set = set()
self._min_exit_dist_reached: int = BFS_INF
self._rewarded_doors: set = set()
# ------------------------------------------------------------------
# OpenEnv API
# ------------------------------------------------------------------
def reset(self, seed: Optional[int] = None, difficulty: str = "medium", **kwargs) -> PyreObservation:
"""Start a new episode with fresh randomized fire parameters.
Args:
seed: Optional integer seed for full reproducibility.
difficulty: "easy" | "medium" | "hard_fixed" | "hard" — scales fire
behaviour, topology variation, and episode length.
Pass via POST /reset body: {"difficulty": "easy"}
"""
fire_seed = seed if seed is not None else (self.base_seed + self._episode_counter * 37)
self._episode_counter += 1
self._rng = random.Random(fire_seed)
# --- Resolve difficulty preset ---
preset = _DIFFICULTY_PRESETS.get(difficulty.lower(), _DIFFICULTY_PRESETS["medium"])
n_min, n_max = preset["n_sources_range"]
sp_min, sp_max = preset["p_spread_range"]
hm_min, hm_max = preset["humidity_range"]
wind_pool = preset["wind_choices"]
effective_max_steps = preset["max_steps_override"] or self.max_steps
# --- Randomize fire behaviour for this episode ---
n_sources = self._rng.randint(n_min, n_max)
p_spread = round(self._rng.uniform(sp_min, sp_max), 3)
humidity = round(self._rng.uniform(hm_min, hm_max), 3)
wind_dir = self._rng.choice(wind_pool)
# --- Floor plan ---
if preset["use_procedural"]:
pw, ph = preset["grid_dims"]
floor_plan = generate_procedural_floor_plan(
w=pw, h=ph, rng=self._rng,
n_rooms_range=preset["n_rooms_range"],
)
agent_start = self._rng.choice(floor_plan.agent_spawn_options)
# Randomise some doors closed at episode start (same 30% rule as generate_episode)
for dpos in floor_plan.door_positions:
if self._rng.random() < 0.3:
floor_plan.cell_grid[_idx(dpos[0], dpos[1], pw)] = DOOR_CLOSED
template_name = floor_plan.name
else:
templates = preset.get("template_pool") or template_names()
template_name = templates[self._episode_counter % len(templates)]
floor_plan, _, _, agent_start = generate_episode(
template_name, npc_count=0, seed=fire_seed
)
w, h = floor_plan.w, floor_plan.h
n_cells = w * h
fire_grid = [0.0] * n_cells
smoke_grid = [0.0] * n_cells
burn_timers = [0] * n_cells
# --- Place multiple fire sources ---
floor_cells = [
(x, y) for y in range(h) for x in range(w)
if floor_plan.cell_grid[_idx(x, y, w)] == FLOOR
]
fire_candidates = [
pos for pos in floor_cells
if all(
_manhattan(pos[0], pos[1], ex[0], ex[1]) >= floor_plan.fire_min_exit_dist
for ex in floor_plan.exit_positions
)
and _manhattan(pos[0], pos[1], agent_start[0], agent_start[1]) >= 4
]
if not fire_candidates:
fire_candidates = [
pos for pos in floor_cells
if pos != agent_start
and pos not in [(e[0], e[1]) for e in floor_plan.exit_positions]
]
n_sources = min(n_sources, len(fire_candidates))
self._rng.shuffle(fire_candidates)
fire_sources = fire_candidates[:n_sources]
for fx, fy in fire_sources:
# Start smoldering (0.1) so the agent has a few steps to observe
# before fire reaches spreading intensity (>= 0.3).
fire_grid[_idx(fx, fy, w)] = 0.1
# --- Door registry ---
door_registry: Dict[str, List[int]] = {}
for j, (dx, dy) in enumerate(floor_plan.door_positions):
door_registry[f"door_{j + 1}"] = [dx, dy]
self._last_feedback = "Episode started. Assess your surroundings."
self._difficulty = difficulty.lower()
self._state = PyreState.model_construct(
episode_id=str(uuid.uuid4()),
step_count=0,
grid_w=w,
grid_h=h,
template_name=template_name,
cell_grid=floor_plan.cell_grid,
fire_grid=fire_grid,
smoke_grid=smoke_grid,
burn_timers=burn_timers,
exit_positions=[[ex[0], ex[1]] for ex in floor_plan.exit_positions],
door_registry=door_registry,
zone_map=floor_plan.zone_map,
agent_x=agent_start[0],
agent_y=agent_start[1],
agent_alive=True,
agent_evacuated=False,
agent_health=100.0,
max_steps=effective_max_steps,
fire_seed=fire_seed,
fire_sources_count=n_sources,
fire_spread_rate=p_spread,
wind_dir=wind_dir,
humidity=humidity,
)
# Reset episode-scoped reward tracking
self._visited_cells = {(self._state.agent_x, self._state.agent_y)}
self._min_exit_dist_reached = BFS_INF
self._rewarded_doors = set()
self._fire_sim = FireSim(
w=w, h=h, rng=self._rng,
p_spread=p_spread,
wind_dir=wind_dir,
humidity=humidity,
fuel_map=floor_plan.fuel_map,
ventilation_map=floor_plan.ventilation_map,
)
obs_data = build_narrative_observation(
step_count=0,
agent_x=agent_start[0],
agent_y=agent_start[1],
agent_alive=True,
agent_evacuated=False,
agent_health=100.0,
cell_grid=floor_plan.cell_grid,
fire_grid=fire_grid,
smoke_grid=smoke_grid,
exit_positions=[[ex[0], ex[1]] for ex in floor_plan.exit_positions],
door_registry=door_registry,
zone_map=floor_plan.zone_map,
last_action_feedback=self._last_feedback,
wind_dir=wind_dir,
w=w, h=h,
visible_override=self._visible_set_for_state(self._state),
)
visible_set = self._visible_set_for_state(self._state)
obs_data["metadata"] = self._build_observation_metadata(self._state, visible_set)
obs_data["map_state"] = self._build_map_state(self._state, visible_set=visible_set)
return PyreObservation(**obs_data)
def _visible_set_for_state(self, st: PyreState) -> set:
"""Return the visibility set used for observations and encoded map_state."""
if not st.agent_alive or st.agent_evacuated:
return set()
if self.full_visibility:
return {(x, y) for y in range(st.grid_h) for x in range(st.grid_w)}
return compute_visible_cells(
st.agent_x, st.agent_y, st.cell_grid, st.smoke_grid, st.grid_w, st.grid_h,
)
def _build_observation_metadata(self, st: PyreState, visible_set: Optional[set] = None) -> Dict[str, Any]:
"""Metadata consumed by the trainer-side observation encoder."""
if visible_set is None:
visible_set = compute_visible_cells(
st.agent_x, st.agent_y, st.cell_grid, st.smoke_grid, st.grid_w, st.grid_h,
) if st.agent_alive and not st.agent_evacuated else set()
reachable_exits = unblocked_exits(st.exit_positions, st.fire_grid, st.grid_w)
exits_for_dist = reachable_exits if reachable_exits else st.exit_positions
nearest_exit_distance = bfs_exit_dist(
st.agent_x, st.agent_y, exits_for_dist, st.cell_grid, st.grid_w, st.grid_h,
)
nearest_exit_direction = _bfs_first_step_toward_exit(
st.agent_x, st.agent_y, exits_for_dist, st.cell_grid, st.grid_w, st.grid_h,
)
return {
"agent_health": st.agent_health,
"step": st.step_count,
"wind_dir": st.wind_dir,
"fire_spread_rate": st.fire_spread_rate,
"fire_sources": st.fire_sources_count,
"humidity": st.humidity,
"difficulty": getattr(self, "_difficulty", "medium"),
"nearest_exit_distance": nearest_exit_distance,
"nearest_exit_direction": nearest_exit_direction,
"reachable_exit_count": len(reachable_exits),
"visible_cell_count": len(visible_set),
}
def step(self, action: PyreAction, **kwargs) -> PyreObservation:
"""Execute one action, advance simulation, return observation + reward."""
st = self._state
if st is None:
st = self._make_default_state()
prev_agent_x = st.agent_x
prev_agent_y = st.agent_y
prev_health = st.agent_health
# --- Execute action ---
feedback = self._execute_action(action, st)
self._last_feedback = feedback
# --- Check self-evacuation (must be unblocked exit) ---
if st.agent_alive and not st.agent_evacuated:
agent_cell = st.cell_grid[_idx(st.agent_x, st.agent_y, st.grid_w)]
if agent_cell == EXIT:
fire_at_exit = st.fire_grid[_idx(st.agent_x, st.agent_y, st.grid_w)]
if fire_at_exit < EXIT_FIRE_THRESHOLD:
st.agent_evacuated = True
feedback = "You step through the exit and escape the building!"
self._last_feedback = feedback
else:
feedback = "The exit is engulfed in flames — you can't get through!"
self._last_feedback = feedback
# --- Advance fire simulation ---
self._fire_sim.step(st.cell_grid, st.fire_grid, st.smoke_grid, st.burn_timers)
# --- Apply health damage from smoke/fire ---
health_damage = 0.0
if st.agent_alive and not st.agent_evacuated:
ai = _idx(st.agent_x, st.agent_y, st.grid_w)
smoke = st.smoke_grid[ai]
fire = st.fire_grid[ai]
smoke_label = smoke_level_label(smoke)
if smoke_label == "heavy":
health_damage += DAMAGE_HEAVY_SMOKE
elif smoke_label == "moderate":
health_damage += DAMAGE_MODERATE_SMOKE
elif smoke_label == "light":
health_damage += DAMAGE_LIGHT_SMOKE
if fire >= FIRE_BURNING:
health_damage += DAMAGE_ON_FIRE
st.agent_health = max(0.0, st.agent_health - health_damage)
if st.agent_health <= 0:
st.agent_alive = False
self._last_feedback = "You collapse — overwhelmed by fire and smoke."
st.step_count += 1
# --- Episode-scoped reward tracking ---
# Detect first visit to current cell (before adding to visited set)
is_new_cell = (st.agent_x, st.agent_y) not in self._visited_cells
self._visited_cells.add((st.agent_x, st.agent_y))
# Track closest approach to any exit for NearMissBonus
_exits_reachable = unblocked_exits(st.exit_positions, st.fire_grid, st.grid_w)
_exits = _exits_reachable if _exits_reachable else st.exit_positions
_cur_dist = bfs_exit_dist(st.agent_x, st.agent_y, _exits, st.cell_grid, st.grid_w, st.grid_h)
if _cur_dist < self._min_exit_dist_reached:
self._min_exit_dist_reached = _cur_dist
# --- Done check ---
done = self._check_done(st)
# --- Reward ---
reward = self._compute_reward(
action=action.action,
target_id=action.target_id,
door_state=action.door_state,
prev_agent_x=prev_agent_x,
prev_agent_y=prev_agent_y,
health_damage=health_damage,
is_new_cell=is_new_cell,
st=st,
done=done,
)
# --- Build observation ---
obs_data = build_narrative_observation(
step_count=st.step_count,
agent_x=st.agent_x,
agent_y=st.agent_y,
agent_alive=st.agent_alive,
agent_evacuated=st.agent_evacuated,
agent_health=st.agent_health,
cell_grid=st.cell_grid,
fire_grid=st.fire_grid,
smoke_grid=st.smoke_grid,
exit_positions=st.exit_positions,
door_registry=st.door_registry,
zone_map=st.zone_map,
last_action_feedback=self._last_feedback,
wind_dir=st.wind_dir,
w=st.grid_w, h=st.grid_h,
visible_override=self._visible_set_for_state(st),
)
obs_data["done"] = done
obs_data["reward"] = reward
# Compute visible cells once so both metadata and map_state can use the count
_visible_set = self._visible_set_for_state(st)
obs_data["metadata"] = self._build_observation_metadata(st, _visible_set)
obs_data["map_state"] = self._build_map_state(st, visible_set=_visible_set)
return PyreObservation(**obs_data)
@property
def state(self) -> PyreState:
if self._state is None:
self._state = self._make_default_state()
return self._state
# ------------------------------------------------------------------
# Action execution
# ------------------------------------------------------------------
def _execute_action(self, action: PyreAction, st: PyreState) -> str:
act = action.action.strip().lower()
if act == "move":
return self._action_move(action, st)
elif act == "door":
return self._action_door(action, st)
elif act == "look":
return self._action_look(action, st)
elif act == "wait":
return "You wait and listen to the building."
else:
return f"Unknown action '{act}'. Nothing happened."
def _action_move(self, action: PyreAction, st: PyreState) -> str:
direction = (action.direction or "").lower()
delta = _CARDINAL_DELTA.get(direction)
if delta is None:
return f"Invalid direction '{direction}'."
nx, ny = st.agent_x + delta[0], st.agent_y + delta[1]
if not _in_bounds(nx, ny, st.grid_w, st.grid_h):
return "You walk into the outer wall — blocked."
ct = st.cell_grid[_idx(nx, ny, st.grid_w)]
if ct in (WALL, OBSTACLE):
return "Blocked by wall or debris."
if ct == DOOR_CLOSED:
return f"The door to the {direction} is closed. Open it first."
st.agent_x = nx
st.agent_y = ny
smoke = st.smoke_grid[_idx(nx, ny, st.grid_w)]
fire = st.fire_grid[_idx(nx, ny, st.grid_w)]
suffix = ""
if smoke > 0.5:
suffix = " The smoke is thick here."
if fire > 0.1:
suffix += " You feel intense heat."
return f"You move {direction}.{suffix}"
def _action_look(self, action: PyreAction, st: PyreState) -> str:
direction = (action.direction or "").strip().lower()
if not direction:
return "look requires a direction: north, south, east, or west."
return build_look_result(
direction=direction,
agent_x=st.agent_x,
agent_y=st.agent_y,
cell_grid=st.cell_grid,
fire_grid=st.fire_grid,
smoke_grid=st.smoke_grid,
zone_map=st.zone_map,
door_registry=st.door_registry,
w=st.grid_w,
h=st.grid_h,
)
def _action_door(self, action: PyreAction, st: PyreState) -> str:
target_id = action.target_id
door_state = (action.door_state or "").strip().lower()
if not target_id:
return "door requires a target_id (door ID)."
if door_state not in ("open", "close"):
return "door requires door_state='open' or door_state='close'."
if target_id not in st.door_registry:
return f"Door '{target_id}' not found."
dx, dy = st.door_registry[target_id]
if _manhattan(st.agent_x, st.agent_y, dx, dy) > 2:
return f"Door '{target_id}' is too far away."
ct = st.cell_grid[_idx(dx, dy, st.grid_w)]
if ct not in (DOOR_OPEN, DOOR_CLOSED):
return f"'{target_id}' is not a door."
if door_state == "close":
if ct == DOOR_CLOSED:
return f"Door '{target_id}' is already closed."
st.cell_grid[_idx(dx, dy, st.grid_w)] = DOOR_CLOSED
return f"You close door '{target_id}'. It may slow the fire."
else:
if ct == DOOR_OPEN:
return f"Door '{target_id}' is already open."
st.cell_grid[_idx(dx, dy, st.grid_w)] = DOOR_OPEN
return f"You open door '{target_id}'."
# ------------------------------------------------------------------
# Done check
# ------------------------------------------------------------------
def _check_done(self, st: PyreState) -> bool:
if not st.agent_alive:
return True
if st.agent_evacuated:
return True
if st.step_count >= st.max_steps:
return True
return False
# ------------------------------------------------------------------
# Reward
# ------------------------------------------------------------------
def _compute_reward(
self,
action: str,
target_id: Optional[str],
door_state: Optional[str],
prev_agent_x: int,
prev_agent_y: int,
health_damage: float,
is_new_cell: bool,
st: PyreState,
done: bool,
) -> float:
kwargs = dict(
action=action,
target_id=target_id,
door_state=door_state,
prev_agent_x=prev_agent_x,
prev_agent_y=prev_agent_y,
agent_x=st.agent_x,
agent_y=st.agent_y,
exit_positions=st.exit_positions,
cell_grid=st.cell_grid,
fire_grid=st.fire_grid,
smoke_grid=st.smoke_grid,
w=st.grid_w,
h=st.grid_h,
door_registry=st.door_registry,
done=done,
agent_evacuated=st.agent_evacuated,
agent_alive=st.agent_alive,
agent_health=st.agent_health,
health_damage=health_damage,
remaining_steps=max(0, st.max_steps - st.step_count),
is_new_cell=is_new_cell,
min_exit_dist_reached=self._min_exit_dist_reached,
rewarded_doors=self._rewarded_doors,
reachable_exit_count=len(unblocked_exits(st.exit_positions, st.fire_grid, st.grid_w)),
)
total = 0.0
for rubric in self._per_step_rubrics:
total += rubric.score(**kwargs)
if done:
for rubric in self._episode_rubrics:
total += rubric.score(**kwargs)
return round(total, 4)
# ------------------------------------------------------------------
# Map state builder
# ------------------------------------------------------------------
def _build_map_state(self, st: PyreState, visible_set: Optional[set] = None) -> PyreMapState:
"""Assemble the full numerical grid snapshot for UI / visualization.
Args:
visible_set: Pre-computed set of (x, y) visible cells. When provided
(e.g. from step()) the second compute_visible_cells call
is skipped. Pass None to compute fresh (used by reset()).
"""
if st.agent_alive and not st.agent_evacuated:
if visible_set is None:
visible_set = self._visible_set_for_state(st)
visible_cells = [[x, y] for x, y in sorted(visible_set)]
else:
visible_cells = []
return PyreMapState(
grid_w=st.grid_w,
grid_h=st.grid_h,
template_name=st.template_name,
episode_id=st.episode_id or "",
step_count=st.step_count,
max_steps=st.max_steps,
cell_grid=list(st.cell_grid),
fire_grid=list(st.fire_grid),
smoke_grid=list(st.smoke_grid),
agent_x=st.agent_x,
agent_y=st.agent_y,
agent_alive=st.agent_alive,
agent_evacuated=st.agent_evacuated,
agent_health=st.agent_health,
visible_cells=visible_cells,
exit_positions=list(st.exit_positions),
door_registry=dict(st.door_registry),
fire_spread_rate=st.fire_spread_rate,
wind_dir=st.wind_dir,
humidity=st.humidity,
)
# ------------------------------------------------------------------
# Defaults
# ------------------------------------------------------------------
def _make_default_state(self) -> PyreState:
return PyreState.model_construct(
episode_id="",
step_count=0,
grid_w=16,
grid_h=16,
template_name="",
cell_grid=[0] * 256,
fire_grid=[0.0] * 256,
smoke_grid=[0.0] * 256,
burn_timers=[0] * 256,
exit_positions=[],
door_registry={},
zone_map={},
agent_x=0,
agent_y=0,
agent_alive=True,
agent_evacuated=False,
agent_health=100.0,
max_steps=self.max_steps,
fire_seed=0,
fire_sources_count=2,
fire_spread_rate=0.25,
wind_dir="CALM",
humidity=0.25,
)