Spaces:
Sleeping
Sleeping
| """ | |
| PyreEnvironment — single-agent crisis navigation environment. | |
| Orchestrates: | |
| - Floor plan generation (floor_plan.py) | |
| - Fire/smoke dynamics with per-episode variability (fire_sim.py) | |
| - Narrative observation rendering (narrative.py) | |
| - Composite reward rubrics (rubrics.py) | |
| Per-episode randomization (makes each episode unique): | |
| - Template selected in rotation (easy/medium); hard generates a procedural floor plan instead | |
| - Number of fire ignition sources: varies by difficulty | |
| - Fire spread rate: varies by difficulty | |
| - Wind direction: varies by difficulty | |
| - Humidity: varies by difficulty | |
| - Agent spawn position: random from template spawn options | |
| - Fire start positions: random floor cells far from exits and agent | |
| Difficulty levels (set via the `difficulty` param in /reset): | |
| easy — 1 source, slow spread, CALM wind, high humidity, 200 max steps | |
| medium — 2–4 sources, moderate spread, any wind, moderate humidity, 150 steps (default) | |
| hard — 3–5 sources, fast spread, always windy, low humidity, 100 steps | |
| Done conditions: | |
| - Agent evacuated through an unblocked exit → success | |
| - Agent health reaches 0 (overwhelmed by smoke/fire) → failure | |
| - step_count >= max_steps → timeout | |
| """ | |
| import os | |
| import random | |
| import uuid | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from openenv.core.env_server.interfaces import Environment | |
| try: | |
| from ..models import PyreAction, PyreMapState, PyreObservation, PyreState | |
| except (ImportError, ModuleNotFoundError): | |
| from models import PyreAction, PyreMapState, PyreObservation, PyreState | |
| from .fire_sim import FireSim, FIRE_BURNING, smoke_level_label, WIND_DIRS | |
| from .floor_plan import generate_episode, generate_procedural_floor_plan, template_names | |
| from .narrative import build_look_result, build_narrative_observation, compute_visible_cells | |
| from .rubrics import make_per_step_rubrics, make_episode_end_rubrics, bfs_exit_dist, unblocked_exits, BFS_INF | |
# Cell-type codes stored in the flat ``cell_grid`` list.
FLOOR, WALL, DOOR_OPEN, DOOR_CLOSED, EXIT, OBSTACLE = range(6)

# Grid offset (dx, dy) for each cardinal direction; y decreases going north.
_CARDINAL_DELTA = {
    "north": (0, -1),
    "south": (0, 1),
    "west": (-1, 0),
    "east": (1, 0),
}

# An exit cell whose fire intensity is at or above this value is blocked.
EXIT_FIRE_THRESHOLD = 0.5

# Health removed per step, chosen by the hazard present at the agent's cell.
DAMAGE_LIGHT_SMOKE = 0.5
DAMAGE_MODERATE_SMOKE = 2.0
DAMAGE_HEAVY_SMOKE = 5.0
DAMAGE_ON_FIRE = 10.0
# ---------------------------------------------------------------------------
# Difficulty presets
# ---------------------------------------------------------------------------
# One entry per difficulty name. reset() looks the entry up and draws the
# episode's fire parameters from it; every "*_range" value is a (min, max)
# tuple, and a max_steps_override of None means "use the environment default".
_DIFFICULTY_PRESETS: Dict[str, Dict] = {
    "easy": dict(
        n_sources_range=(1, 1),
        p_spread_range=(0.10, 0.20),
        humidity_range=(0.30, 0.50),
        wind_choices=["CALM"],
        max_steps_override=200,
        # Map layout: fixed 16×16 templates (stable topology aids early learning)
        use_procedural=False,
        grid_dims=(16, 16),
        n_rooms_range=None,
    ),
    "medium": dict(
        n_sources_range=(2, 4),
        p_spread_range=(0.15, 0.40),
        humidity_range=(0.10, 0.45),
        wind_choices=list(WIND_DIRS),
        max_steps_override=None,
        # Map layout: fixed 16×16 templates; only the fire varies
        use_procedural=False,
        grid_dims=(16, 16),
        n_rooms_range=None,
    ),
    "hard": dict(
        n_sources_range=(3, 5),
        p_spread_range=(0.30, 0.55),
        humidity_range=(0.05, 0.20),
        # Every wind direction except CALM
        wind_choices=[name for name in WIND_DIRS if name != "CALM"],
        max_steps_override=100,
        # Map layout: procedurally generated 20×24 building
        use_procedural=True,
        grid_dims=(20, 24),
        n_rooms_range=(6, 10),
    ),
}
def _idx(x: int, y: int, w: int) -> int:
    """Return the row-major flat index of cell (x, y) in a grid of width ``w``."""
    row_start = w * y
    return row_start + x
def _in_bounds(x: int, y: int, w: int, h: int) -> bool:
    """Return True when (x, y) lies inside a ``w`` by ``h`` grid."""
    if x < 0 or y < 0:
        return False
    return x < w and y < h
def _manhattan(x1, y1, x2, y2):
    """Return the Manhattan (taxicab) distance between (x1, y1) and (x2, y2)."""
    dx = x1 - x2
    dy = y1 - y2
    return abs(dx) + abs(dy)
class PyreEnvironment(Environment):
    """First-person crisis navigation environment — single agent.

    The agent is inside a burning building with partial observability.
    It must navigate to safety before its health depletes.

    Fire behaviour (spread rate, wind direction, ignition count) varies
    each episode, forcing the agent to reason from observations rather
    than memorising fixed patterns.

    Lifecycle: ``reset()`` builds the floor plan, fire parameters and episode
    state; ``step()`` applies one action, advances the fire simulation,
    applies hazard damage and scores the step with the reward rubrics.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(
        self,
        max_steps: int = 150,
        base_seed: int = 42,
    ):
        """Create an environment with no active episode.

        Args:
            max_steps: Default step budget per episode. The ``PYRE_MAX_STEPS``
                environment variable, when set, takes precedence.
            base_seed: Base for per-episode seeds. The ``PYRE_SEED``
                environment variable, when set, takes precedence.
        """
        super().__init__()
        self.max_steps = int(os.environ.get("PYRE_MAX_STEPS", max_steps))
        self.base_seed = int(os.environ.get("PYRE_SEED", base_seed))
        self._state: Optional[PyreState] = None
        self._fire_sim: Optional[FireSim] = None
        self._rng: Optional[random.Random] = None
        self._per_step_rubrics = make_per_step_rubrics()
        self._episode_rubrics = make_episode_end_rubrics()
        self._episode_counter = 0
        self._last_feedback = ""
        # Name of the preset actually applied by the most recent reset().
        self._difficulty = "medium"
        # Episode-scoped reward tracking (reset each episode)
        self._visited_cells: set = set()
        self._min_exit_dist_reached: int = BFS_INF
        self._rewarded_doors: set = set()

    # ------------------------------------------------------------------
    # OpenEnv API
    # ------------------------------------------------------------------
    def reset(self, seed: Optional[int] = None, difficulty: str = "medium", **kwargs) -> PyreObservation:
        """Start a new episode with fresh randomized fire parameters.

        Args:
            seed: Optional integer seed for the episode's random draws. When
                omitted, a seed is derived from ``base_seed`` and the episode
                counter. Template selection for non-procedural presets still
                rotates with the episode counter.
            difficulty: "easy" | "medium" | "hard" — scales fire behaviour and
                episode length. Matching ignores case and surrounding
                whitespace; a missing or unrecognised value falls back to
                "medium".
                Pass via POST /reset body: {"difficulty": "easy"}
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            The initial observation, including the full map snapshot.
        """
        fire_seed = seed if seed is not None else (self.base_seed + self._episode_counter * 37)
        self._episode_counter += 1
        self._rng = random.Random(fire_seed)
        rng = self._rng

        # --- Resolve difficulty preset ---
        # Record the preset that is really applied, so step() metadata never
        # reports an unknown difficulty string that silently fell back.
        difficulty_key = str(difficulty or "medium").strip().lower()
        if difficulty_key not in _DIFFICULTY_PRESETS:
            difficulty_key = "medium"
        preset = _DIFFICULTY_PRESETS[difficulty_key]
        n_min, n_max = preset["n_sources_range"]
        sp_min, sp_max = preset["p_spread_range"]
        hm_min, hm_max = preset["humidity_range"]
        effective_max_steps = preset["max_steps_override"] or self.max_steps

        # --- Randomize fire behaviour for this episode ---
        # The order of these draws is part of the seeded behaviour; keep it.
        n_sources = rng.randint(n_min, n_max)
        p_spread = round(rng.uniform(sp_min, sp_max), 3)
        humidity = round(rng.uniform(hm_min, hm_max), 3)
        wind_dir = rng.choice(preset["wind_choices"])

        # --- Floor plan ---
        if preset["use_procedural"]:
            pw, ph = preset["grid_dims"]
            floor_plan = generate_procedural_floor_plan(
                w=pw, h=ph, rng=rng,
                n_rooms_range=preset["n_rooms_range"],
            )
            agent_start = rng.choice(floor_plan.agent_spawn_options)
            # Randomise some doors closed at episode start (30% chance each).
            # Index with the plan's own width, which is what cell_grid uses.
            for dpos in floor_plan.door_positions:
                if rng.random() < 0.3:
                    floor_plan.cell_grid[_idx(dpos[0], dpos[1], floor_plan.w)] = DOOR_CLOSED
            template_name = floor_plan.name
        else:
            templates = template_names()
            template_name = templates[self._episode_counter % len(templates)]
            floor_plan, _, _, agent_start = generate_episode(
                template_name, npc_count=0, seed=fire_seed
            )

        w, h = floor_plan.w, floor_plan.h
        n_cells = w * h
        fire_grid = [0.0] * n_cells
        smoke_grid = [0.0] * n_cells
        burn_timers = [0] * n_cells

        # --- Place multiple fire sources ---
        fire_sources = self._choose_fire_sources(floor_plan, agent_start, n_sources)
        n_sources = len(fire_sources)
        for fx, fy in fire_sources:
            # Start smoldering (0.1) so the agent has a few steps to observe
            # before fire reaches spreading intensity (>= 0.3).
            fire_grid[_idx(fx, fy, w)] = 0.1

        # --- Door registry: "door_<n>" -> [x, y], numbered from 1 ---
        door_registry: Dict[str, List[int]] = {}
        for j, (dx, dy) in enumerate(floor_plan.door_positions):
            door_registry[f"door_{j + 1}"] = [dx, dy]

        self._last_feedback = "Episode started. Assess your surroundings."
        self._difficulty = difficulty_key

        self._state = PyreState.model_construct(
            episode_id=str(uuid.uuid4()),
            step_count=0,
            grid_w=w,
            grid_h=h,
            template_name=template_name,
            cell_grid=floor_plan.cell_grid,
            fire_grid=fire_grid,
            smoke_grid=smoke_grid,
            burn_timers=burn_timers,
            exit_positions=[[ex[0], ex[1]] for ex in floor_plan.exit_positions],
            door_registry=door_registry,
            zone_map=floor_plan.zone_map,
            agent_x=agent_start[0],
            agent_y=agent_start[1],
            agent_alive=True,
            agent_evacuated=False,
            agent_health=100.0,
            max_steps=effective_max_steps,
            fire_seed=fire_seed,
            fire_sources_count=n_sources,
            fire_spread_rate=p_spread,
            wind_dir=wind_dir,
            humidity=humidity,
        )

        # Reset episode-scoped reward tracking
        self._visited_cells = {(self._state.agent_x, self._state.agent_y)}
        self._min_exit_dist_reached = BFS_INF
        self._rewarded_doors = set()

        # The simulator shares this episode's RNG.
        self._fire_sim = FireSim(
            w=w, h=h, rng=rng,
            p_spread=p_spread,
            wind_dir=wind_dir,
            humidity=humidity,
            fuel_map=floor_plan.fuel_map,
            ventilation_map=floor_plan.ventilation_map,
        )

        obs_data = build_narrative_observation(
            step_count=0,
            agent_x=agent_start[0],
            agent_y=agent_start[1],
            agent_alive=True,
            agent_evacuated=False,
            agent_health=100.0,
            cell_grid=floor_plan.cell_grid,
            fire_grid=fire_grid,
            smoke_grid=smoke_grid,
            exit_positions=[[ex[0], ex[1]] for ex in floor_plan.exit_positions],
            door_registry=door_registry,
            zone_map=floor_plan.zone_map,
            last_action_feedback=self._last_feedback,
            wind_dir=wind_dir,
            w=w, h=h,
        )
        obs_data["map_state"] = self._build_map_state(self._state)
        return PyreObservation(**obs_data)

    def _choose_fire_sources(
        self, floor_plan: Any, agent_start: Any, n_sources: int
    ) -> List[Tuple[int, int]]:
        """Pick up to ``n_sources`` distinct FLOOR cells to ignite.

        Preferred cells are at least ``floor_plan.fire_min_exit_dist`` from
        every exit and at least 4 from the agent (Manhattan distance). If none
        qualify, any FLOOR cell other than the agent's cell or an exit is
        eligible. Fewer cells are returned when not enough are eligible.
        """
        w, h = floor_plan.w, floor_plan.h
        ax, ay = agent_start[0], agent_start[1]
        floor_cells = [
            (x, y) for y in range(h) for x in range(w)
            if floor_plan.cell_grid[_idx(x, y, w)] == FLOOR
        ]
        candidates = [
            pos for pos in floor_cells
            if all(
                _manhattan(pos[0], pos[1], ex[0], ex[1]) >= floor_plan.fire_min_exit_dist
                for ex in floor_plan.exit_positions
            )
            and _manhattan(pos[0], pos[1], ax, ay) >= 4
        ]
        if not candidates:
            # Compare as tuples so the agent's cell is excluded even when the
            # spawn position is given as a list; build the exit set only once.
            agent_cell = (ax, ay)
            exit_cells = {(e[0], e[1]) for e in floor_plan.exit_positions}
            candidates = [
                pos for pos in floor_cells
                if pos != agent_cell and pos not in exit_cells
            ]
        count = min(n_sources, len(candidates))
        self._rng.shuffle(candidates)
        return candidates[:count]

    def step(self, action: PyreAction, **kwargs) -> PyreObservation:
        """Execute one action, advance simulation, return observation + reward.

        Order of effects: the action is applied, evacuation is checked, the
        fire simulation advances, hazard damage is applied at the agent's
        cell, the step counter increments, and finally the reward is computed.

        If no episode is active (``reset()`` has not been called), a default
        episode is started first.
        """
        # Without this, a step before reset() failed on the missing fire
        # simulation; starting a default episode keeps the call well-defined.
        if self._state is None or self._fire_sim is None:
            self.reset()
        st = self._state

        prev_agent_x = st.agent_x
        prev_agent_y = st.agent_y

        # --- Execute action ---
        self._last_feedback = self._execute_action(action, st)

        # --- Check self-evacuation (must be unblocked exit) ---
        self._resolve_evacuation(st)

        # --- Advance fire simulation ---
        self._fire_sim.step(st.cell_grid, st.fire_grid, st.smoke_grid, st.burn_timers)

        # --- Apply health damage from smoke/fire ---
        health_damage = self._apply_hazard_damage(st)

        st.step_count += 1

        # --- Episode-scoped reward tracking ---
        # Detect first visit to current cell (before adding to visited set)
        here = (st.agent_x, st.agent_y)
        is_new_cell = here not in self._visited_cells
        self._visited_cells.add(here)

        # Track closest approach to any exit for NearMissBonus; when every
        # exit is blocked, measure against all exits instead.
        _exits = unblocked_exits(st.exit_positions, st.fire_grid, st.grid_w)
        if not _exits:
            _exits = st.exit_positions
        _cur_dist = bfs_exit_dist(st.agent_x, st.agent_y, _exits, st.cell_grid, st.grid_w, st.grid_h)
        if _cur_dist < self._min_exit_dist_reached:
            self._min_exit_dist_reached = _cur_dist

        # --- Done check ---
        done = self._check_done(st)

        # --- Reward ---
        reward = self._compute_reward(
            action=action.action,
            target_id=action.target_id,
            door_state=action.door_state,
            prev_agent_x=prev_agent_x,
            prev_agent_y=prev_agent_y,
            health_damage=health_damage,
            is_new_cell=is_new_cell,
            st=st,
            done=done,
        )

        # --- Build observation ---
        obs_data = build_narrative_observation(
            step_count=st.step_count,
            agent_x=st.agent_x,
            agent_y=st.agent_y,
            agent_alive=st.agent_alive,
            agent_evacuated=st.agent_evacuated,
            agent_health=st.agent_health,
            cell_grid=st.cell_grid,
            fire_grid=st.fire_grid,
            smoke_grid=st.smoke_grid,
            exit_positions=st.exit_positions,
            door_registry=st.door_registry,
            zone_map=st.zone_map,
            last_action_feedback=self._last_feedback,
            wind_dir=st.wind_dir,
            w=st.grid_w, h=st.grid_h,
        )
        obs_data["done"] = done
        obs_data["reward"] = reward
        obs_data["metadata"] = {
            "agent_health": st.agent_health,
            "step": st.step_count,
            "wind_dir": st.wind_dir,
            "fire_spread_rate": st.fire_spread_rate,
            "fire_sources": st.fire_sources_count,
            "humidity": st.humidity,
            "difficulty": self._difficulty,
        }
        obs_data["map_state"] = self._build_map_state(st)
        return PyreObservation(**obs_data)

    def _resolve_evacuation(self, st: PyreState) -> None:
        """Mark the agent evacuated when it stands on an unblocked exit.

        An exit is usable only while its fire intensity is below
        ``EXIT_FIRE_THRESHOLD``; otherwise only the feedback message changes.
        """
        if not st.agent_alive or st.agent_evacuated:
            return
        cell = _idx(st.agent_x, st.agent_y, st.grid_w)
        if st.cell_grid[cell] != EXIT:
            return
        if st.fire_grid[cell] < EXIT_FIRE_THRESHOLD:
            st.agent_evacuated = True
            self._last_feedback = "You step through the exit and escape the building!"
        else:
            self._last_feedback = "The exit is engulfed in flames — you can't get through!"

    def _apply_hazard_damage(self, st: PyreState) -> float:
        """Apply this step's smoke/fire damage and return the amount dealt.

        Returns 0.0 without touching the state when the agent is already dead
        or evacuated. Sets ``agent_alive`` to False when health reaches zero.
        """
        if not st.agent_alive or st.agent_evacuated:
            return 0.0
        cell = _idx(st.agent_x, st.agent_y, st.grid_w)
        damage_by_smoke = {
            "heavy": DAMAGE_HEAVY_SMOKE,
            "moderate": DAMAGE_MODERATE_SMOKE,
            "light": DAMAGE_LIGHT_SMOKE,
        }
        health_damage = 0.0
        health_damage += damage_by_smoke.get(smoke_level_label(st.smoke_grid[cell]), 0.0)
        if st.fire_grid[cell] >= FIRE_BURNING:
            health_damage += DAMAGE_ON_FIRE
        st.agent_health = max(0.0, st.agent_health - health_damage)
        if st.agent_health <= 0:
            st.agent_alive = False
            self._last_feedback = "You collapse — overwhelmed by fire and smoke."
        return health_damage

    def state(self) -> PyreState:
        """Return the current episode state, creating a blank default if none exists."""
        if self._state is None:
            self._state = self._make_default_state()
        return self._state

    # ------------------------------------------------------------------
    # Action execution
    # ------------------------------------------------------------------
    def _execute_action(self, action: PyreAction, st: PyreState) -> str:
        """Dispatch ``action`` to its handler and return the feedback text.

        The action name is matched ignoring case and surrounding whitespace;
        a missing or unrecognised name changes nothing and reports that.
        """
        act = (action.action or "").strip().lower()
        if act == "move":
            return self._action_move(action, st)
        if act == "door":
            return self._action_door(action, st)
        if act == "look":
            return self._action_look(action, st)
        if act == "wait":
            return "You wait and listen to the building."
        return f"Unknown action '{act}'. Nothing happened."

    def _action_move(self, action: PyreAction, st: PyreState) -> str:
        """Move the agent one cell in a cardinal direction when the target is passable.

        Walls, obstacles, closed doors and the grid edge block the move and
        leave the agent where it is.
        """
        # Strip as well as lowercase, matching how "look" parses its direction.
        direction = (action.direction or "").strip().lower()
        delta = _CARDINAL_DELTA.get(direction)
        if delta is None:
            return f"Invalid direction '{direction}'."
        nx, ny = st.agent_x + delta[0], st.agent_y + delta[1]
        if not _in_bounds(nx, ny, st.grid_w, st.grid_h):
            return "You walk into the outer wall — blocked."
        target = _idx(nx, ny, st.grid_w)
        ct = st.cell_grid[target]
        if ct in (WALL, OBSTACLE):
            return "Blocked by wall or debris."
        if ct == DOOR_CLOSED:
            return f"The door to the {direction} is closed. Open it first."
        st.agent_x = nx
        st.agent_y = ny
        suffix = ""
        if st.smoke_grid[target] > 0.5:
            suffix = " The smoke is thick here."
        if st.fire_grid[target] > 0.1:
            suffix += " You feel intense heat."
        return f"You move {direction}.{suffix}"

    def _action_look(self, action: PyreAction, st: PyreState) -> str:
        """Return a description of what lies in the requested direction."""
        direction = (action.direction or "").strip().lower()
        if not direction:
            return "look requires a direction: north, south, east, or west."
        return build_look_result(
            direction=direction,
            agent_x=st.agent_x,
            agent_y=st.agent_y,
            cell_grid=st.cell_grid,
            fire_grid=st.fire_grid,
            smoke_grid=st.smoke_grid,
            zone_map=st.zone_map,
            door_registry=st.door_registry,
            w=st.grid_w,
            h=st.grid_h,
        )

    def _action_door(self, action: PyreAction, st: PyreState) -> str:
        """Open or close a registered door within Manhattan distance 2 of the agent."""
        target_id = action.target_id
        door_state = (action.door_state or "").strip().lower()
        if not target_id:
            return "door requires a target_id (door ID)."
        if door_state not in ("open", "close"):
            return "door requires door_state='open' or door_state='close'."
        if target_id not in st.door_registry:
            return f"Door '{target_id}' not found."
        dx, dy = st.door_registry[target_id]
        if _manhattan(st.agent_x, st.agent_y, dx, dy) > 2:
            return f"Door '{target_id}' is too far away."
        door_cell = _idx(dx, dy, st.grid_w)
        ct = st.cell_grid[door_cell]
        if ct not in (DOOR_OPEN, DOOR_CLOSED):
            return f"'{target_id}' is not a door."
        if door_state == "close":
            if ct == DOOR_CLOSED:
                return f"Door '{target_id}' is already closed."
            st.cell_grid[door_cell] = DOOR_CLOSED
            return f"You close door '{target_id}'. It may slow the fire."
        if ct == DOOR_OPEN:
            return f"Door '{target_id}' is already open."
        st.cell_grid[door_cell] = DOOR_OPEN
        return f"You open door '{target_id}'."

    # ------------------------------------------------------------------
    # Done check
    # ------------------------------------------------------------------
    def _check_done(self, st: PyreState) -> bool:
        """Return True when the agent is dead, has evacuated, or the step budget is spent."""
        return (
            not st.agent_alive
            or st.agent_evacuated
            or st.step_count >= st.max_steps
        )

    # ------------------------------------------------------------------
    # Reward
    # ------------------------------------------------------------------
    def _compute_reward(
        self,
        action: str,
        target_id: Optional[str],
        door_state: Optional[str],
        prev_agent_x: int,
        prev_agent_y: int,
        health_damage: float,
        is_new_cell: bool,
        st: PyreState,
        done: bool,
    ) -> float:
        """Sum the rubric scores for this step, rounded to 4 decimal places.

        Per-step rubrics always contribute; episode-end rubrics contribute
        only when ``done`` is True. Every rubric receives the same keyword
        context assembled from the action, the state and the episode trackers.
        """
        kwargs = dict(
            action=action,
            target_id=target_id,
            door_state=door_state,
            prev_agent_x=prev_agent_x,
            prev_agent_y=prev_agent_y,
            agent_x=st.agent_x,
            agent_y=st.agent_y,
            exit_positions=st.exit_positions,
            cell_grid=st.cell_grid,
            fire_grid=st.fire_grid,
            smoke_grid=st.smoke_grid,
            w=st.grid_w,
            h=st.grid_h,
            door_registry=st.door_registry,
            done=done,
            agent_evacuated=st.agent_evacuated,
            agent_alive=st.agent_alive,
            agent_health=st.agent_health,
            health_damage=health_damage,
            remaining_steps=max(0, st.max_steps - st.step_count),
            is_new_cell=is_new_cell,
            min_exit_dist_reached=self._min_exit_dist_reached,
            rewarded_doors=self._rewarded_doors,
        )
        total = 0.0
        for rubric in self._per_step_rubrics:
            total += rubric.score(**kwargs)
        if done:
            for rubric in self._episode_rubrics:
                total += rubric.score(**kwargs)
        return round(total, 4)

    # ------------------------------------------------------------------
    # Map state builder
    # ------------------------------------------------------------------
    def _build_map_state(self, st: PyreState) -> PyreMapState:
        """Assemble the full numerical grid snapshot for UI / visualization.

        Grids, exit positions and the door registry are shallow-copied.
        ``visible_cells`` is empty once the agent is dead or evacuated.
        """
        if st.agent_alive and not st.agent_evacuated:
            visible = compute_visible_cells(
                st.agent_x, st.agent_y,
                st.cell_grid, st.smoke_grid,
                st.grid_w, st.grid_h,
            )
            visible_cells = [[x, y] for x, y in sorted(visible)]
        else:
            visible_cells = []
        return PyreMapState(
            grid_w=st.grid_w,
            grid_h=st.grid_h,
            template_name=st.template_name,
            episode_id=st.episode_id or "",
            step_count=st.step_count,
            max_steps=st.max_steps,
            cell_grid=list(st.cell_grid),
            fire_grid=list(st.fire_grid),
            smoke_grid=list(st.smoke_grid),
            agent_x=st.agent_x,
            agent_y=st.agent_y,
            agent_alive=st.agent_alive,
            agent_evacuated=st.agent_evacuated,
            agent_health=st.agent_health,
            visible_cells=visible_cells,
            exit_positions=list(st.exit_positions),
            door_registry=dict(st.door_registry),
            fire_spread_rate=st.fire_spread_rate,
            wind_dir=st.wind_dir,
            humidity=st.humidity,
        )

    # ------------------------------------------------------------------
    # Defaults
    # ------------------------------------------------------------------
    def _make_default_state(self) -> PyreState:
        """Build a placeholder state: an empty 16×16 all-floor grid with no exits, doors or fire."""
        n_cells = 16 * 16
        return PyreState.model_construct(
            episode_id="",
            step_count=0,
            grid_w=16,
            grid_h=16,
            template_name="",
            cell_grid=[0] * n_cells,
            fire_grid=[0.0] * n_cells,
            smoke_grid=[0.0] * n_cells,
            burn_timers=[0] * n_cells,
            exit_positions=[],
            door_registry={},
            zone_map={},
            agent_x=0,
            agent_y=0,
            agent_alive=True,
            agent_evacuated=False,
            agent_health=100.0,
            max_steps=self.max_steps,
            fire_seed=0,
            fire_sources_count=2,
            fire_spread_rate=0.25,
            wind_dir="CALM",
            humidity=0.25,
        )