""" PyreEnvironment — single-agent crisis navigation environment. Orchestrates: - Floor plan generation (floor_plan.py) - Fire/smoke dynamics with per-episode variability (fire_sim.py) - Narrative observation rendering (narrative.py) - Composite reward rubrics (rubrics.py) Per-episode randomization (makes each episode unique): - Template selected in rotation - Number of fire ignition sources: varies by difficulty - Fire spread rate: varies by difficulty - Wind direction: varies by difficulty - Humidity: varies by difficulty - Agent spawn position: random from template spawn options - Fire start positions: random floor cells far from exits and agent Difficulty levels (set via the `difficulty` param in /reset): easy — 1 source, slow spread, CALM wind, high humidity, 200 max steps medium — 2–4 sources, moderate spread, any wind, moderate humidity, 150 steps (default) hard — 3–5 sources, fast spread, always windy, low humidity, 100 steps Done conditions: - Agent evacuated through an unblocked exit → success - Agent health reaches 0 (overwhelmed by smoke/fire) → failure - step_count >= max_steps → timeout """ import os import random import uuid from typing import Any, Dict, List, Optional, Tuple from openenv.core.env_server.interfaces import Environment try: from ..models import PyreAction, PyreMapState, PyreObservation, PyreState except (ImportError, ModuleNotFoundError): from models import PyreAction, PyreMapState, PyreObservation, PyreState from .fire_sim import FireSim, FIRE_BURNING, smoke_level_label, WIND_DIRS from .floor_plan import generate_episode, generate_procedural_floor_plan, template_names from .narrative import build_look_result, build_narrative_observation, compute_visible_cells from .rubrics import make_per_step_rubrics, make_episode_end_rubrics, bfs_exit_dist, unblocked_exits, BFS_INF # Cell type constants FLOOR = 0 WALL = 1 DOOR_OPEN = 2 DOOR_CLOSED = 3 EXIT = 4 OBSTACLE = 5 _CARDINAL_DELTA = {"north": (0, -1), "south": (0, 1), "west": (-1, 0), "east": (1, 0)} # Exit is blocked if fire intensity at exit cell exceeds this EXIT_FIRE_THRESHOLD = 0.5 # Health damage per step (applied after each step) DAMAGE_LIGHT_SMOKE = 0.5 DAMAGE_MODERATE_SMOKE = 2.0 DAMAGE_HEAVY_SMOKE = 5.0 DAMAGE_ON_FIRE = 10.0 # --------------------------------------------------------------------------- # Difficulty presets — override fire randomisation ranges per episode # --------------------------------------------------------------------------- # Each preset is a dict of kwargs passed to reset(); ranges are (min, max) tuples. _DIFFICULTY_PRESETS: Dict[str, Dict] = { "easy": { "n_sources_range": (1, 1), "p_spread_range": (0.10, 0.20), "humidity_range": (0.30, 0.50), "wind_choices": ["CALM"], "max_steps_override": 200, # Map layout: fixed 16×16 templates (stable topology aids early learning) "use_procedural": False, "grid_dims": (16, 16), "n_rooms_range": None, }, "medium": { "n_sources_range": (2, 4), "p_spread_range": (0.15, 0.40), "humidity_range": (0.10, 0.45), "wind_choices": list(WIND_DIRS.keys()), "max_steps_override": None, # use env default # Map layout: fixed 16×16 templates (same 3 buildings, fire varies) "use_procedural": False, "grid_dims": (16, 16), "n_rooms_range": None, }, "hard": { "n_sources_range": (3, 5), "p_spread_range": (0.30, 0.55), "humidity_range": (0.05, 0.20), "wind_choices": [d for d in WIND_DIRS.keys() if d != "CALM"], "max_steps_override": 100, # Map layout: procedurally generated 20×24 building — every episode unique "use_procedural": True, "grid_dims": (20, 24), "n_rooms_range": (6, 10), }, } def _idx(x: int, y: int, w: int) -> int: return y * w + x def _in_bounds(x: int, y: int, w: int, h: int) -> bool: return 0 <= x < w and 0 <= y < h def _manhattan(x1, y1, x2, y2): return abs(x1 - x2) + abs(y1 - y2) class PyreEnvironment(Environment): """First-person crisis navigation environment — single agent. The agent is inside a burning building with partial observability. It must navigate to safety before its health depletes. Fire behaviour (spread rate, wind direction, ignition count) varies each episode, forcing the agent to reason from observations rather than memorising fixed patterns. """ SUPPORTS_CONCURRENT_SESSIONS: bool = True def __init__( self, max_steps: int = 150, base_seed: int = 42, ): super().__init__() self.max_steps = int(os.environ.get("PYRE_MAX_STEPS", max_steps)) self.base_seed = int(os.environ.get("PYRE_SEED", base_seed)) self._state: Optional[PyreState] = None self._fire_sim: Optional[FireSim] = None self._rng: Optional[random.Random] = None self._per_step_rubrics = make_per_step_rubrics() self._episode_rubrics = make_episode_end_rubrics() self._episode_counter = 0 self._last_feedback = "" # Episode-scoped reward tracking (reset each episode) self._visited_cells: set = set() self._min_exit_dist_reached: int = BFS_INF self._rewarded_doors: set = set() # ------------------------------------------------------------------ # OpenEnv API # ------------------------------------------------------------------ def reset(self, seed: Optional[int] = None, difficulty: str = "medium", **kwargs) -> PyreObservation: """Start a new episode with fresh randomized fire parameters. Args: seed: Optional integer seed for full reproducibility. difficulty: "easy" | "medium" | "hard" — scales fire behaviour and episode length. Pass via POST /reset body: {"difficulty": "easy"} """ fire_seed = seed if seed is not None else (self.base_seed + self._episode_counter * 37) self._episode_counter += 1 self._rng = random.Random(fire_seed) # --- Resolve difficulty preset --- preset = _DIFFICULTY_PRESETS.get(difficulty.lower(), _DIFFICULTY_PRESETS["medium"]) n_min, n_max = preset["n_sources_range"] sp_min, sp_max = preset["p_spread_range"] hm_min, hm_max = preset["humidity_range"] wind_pool = preset["wind_choices"] effective_max_steps = preset["max_steps_override"] or self.max_steps # --- Randomize fire behaviour for this episode --- n_sources = self._rng.randint(n_min, n_max) p_spread = round(self._rng.uniform(sp_min, sp_max), 3) humidity = round(self._rng.uniform(hm_min, hm_max), 3) wind_dir = self._rng.choice(wind_pool) # --- Floor plan --- if preset["use_procedural"]: pw, ph = preset["grid_dims"] floor_plan = generate_procedural_floor_plan( w=pw, h=ph, rng=self._rng, n_rooms_range=preset["n_rooms_range"], ) agent_start = self._rng.choice(floor_plan.agent_spawn_options) # Randomise some doors closed at episode start (same 30% rule as generate_episode) for dpos in floor_plan.door_positions: if self._rng.random() < 0.3: floor_plan.cell_grid[_idx(dpos[0], dpos[1], pw)] = DOOR_CLOSED template_name = floor_plan.name else: templates = template_names() template_name = templates[self._episode_counter % len(templates)] floor_plan, _, _, agent_start = generate_episode( template_name, npc_count=0, seed=fire_seed ) w, h = floor_plan.w, floor_plan.h n_cells = w * h fire_grid = [0.0] * n_cells smoke_grid = [0.0] * n_cells burn_timers = [0] * n_cells # --- Place multiple fire sources --- floor_cells = [ (x, y) for y in range(h) for x in range(w) if floor_plan.cell_grid[_idx(x, y, w)] == FLOOR ] fire_candidates = [ pos for pos in floor_cells if all( _manhattan(pos[0], pos[1], ex[0], ex[1]) >= floor_plan.fire_min_exit_dist for ex in floor_plan.exit_positions ) and _manhattan(pos[0], pos[1], agent_start[0], agent_start[1]) >= 4 ] if not fire_candidates: fire_candidates = [ pos for pos in floor_cells if pos != agent_start and pos not in [(e[0], e[1]) for e in floor_plan.exit_positions] ] n_sources = min(n_sources, len(fire_candidates)) self._rng.shuffle(fire_candidates) fire_sources = fire_candidates[:n_sources] for fx, fy in fire_sources: # Start smoldering (0.1) so the agent has a few steps to observe # before fire reaches spreading intensity (>= 0.3). fire_grid[_idx(fx, fy, w)] = 0.1 # --- Door registry --- door_registry: Dict[str, List[int]] = {} for j, (dx, dy) in enumerate(floor_plan.door_positions): door_registry[f"door_{j + 1}"] = [dx, dy] self._last_feedback = "Episode started. Assess your surroundings." self._difficulty = difficulty.lower() self._state = PyreState.model_construct( episode_id=str(uuid.uuid4()), step_count=0, grid_w=w, grid_h=h, template_name=template_name, cell_grid=floor_plan.cell_grid, fire_grid=fire_grid, smoke_grid=smoke_grid, burn_timers=burn_timers, exit_positions=[[ex[0], ex[1]] for ex in floor_plan.exit_positions], door_registry=door_registry, zone_map=floor_plan.zone_map, agent_x=agent_start[0], agent_y=agent_start[1], agent_alive=True, agent_evacuated=False, agent_health=100.0, max_steps=effective_max_steps, fire_seed=fire_seed, fire_sources_count=n_sources, fire_spread_rate=p_spread, wind_dir=wind_dir, humidity=humidity, ) # Reset episode-scoped reward tracking self._visited_cells = {(self._state.agent_x, self._state.agent_y)} self._min_exit_dist_reached = BFS_INF self._rewarded_doors = set() self._fire_sim = FireSim( w=w, h=h, rng=self._rng, p_spread=p_spread, wind_dir=wind_dir, humidity=humidity, fuel_map=floor_plan.fuel_map, ventilation_map=floor_plan.ventilation_map, ) obs_data = build_narrative_observation( step_count=0, agent_x=agent_start[0], agent_y=agent_start[1], agent_alive=True, agent_evacuated=False, agent_health=100.0, cell_grid=floor_plan.cell_grid, fire_grid=fire_grid, smoke_grid=smoke_grid, exit_positions=[[ex[0], ex[1]] for ex in floor_plan.exit_positions], door_registry=door_registry, zone_map=floor_plan.zone_map, last_action_feedback=self._last_feedback, wind_dir=wind_dir, w=w, h=h, ) obs_data["map_state"] = self._build_map_state(self._state) return PyreObservation(**obs_data) def step(self, action: PyreAction, **kwargs) -> PyreObservation: """Execute one action, advance simulation, return observation + reward.""" st = self._state if st is None: st = self._make_default_state() prev_agent_x = st.agent_x prev_agent_y = st.agent_y prev_health = st.agent_health # --- Execute action --- feedback = self._execute_action(action, st) self._last_feedback = feedback # --- Check self-evacuation (must be unblocked exit) --- if st.agent_alive and not st.agent_evacuated: agent_cell = st.cell_grid[_idx(st.agent_x, st.agent_y, st.grid_w)] if agent_cell == EXIT: fire_at_exit = st.fire_grid[_idx(st.agent_x, st.agent_y, st.grid_w)] if fire_at_exit < EXIT_FIRE_THRESHOLD: st.agent_evacuated = True feedback = "You step through the exit and escape the building!" self._last_feedback = feedback else: feedback = "The exit is engulfed in flames — you can't get through!" self._last_feedback = feedback # --- Advance fire simulation --- self._fire_sim.step(st.cell_grid, st.fire_grid, st.smoke_grid, st.burn_timers) # --- Apply health damage from smoke/fire --- health_damage = 0.0 if st.agent_alive and not st.agent_evacuated: ai = _idx(st.agent_x, st.agent_y, st.grid_w) smoke = st.smoke_grid[ai] fire = st.fire_grid[ai] smoke_label = smoke_level_label(smoke) if smoke_label == "heavy": health_damage += DAMAGE_HEAVY_SMOKE elif smoke_label == "moderate": health_damage += DAMAGE_MODERATE_SMOKE elif smoke_label == "light": health_damage += DAMAGE_LIGHT_SMOKE if fire >= FIRE_BURNING: health_damage += DAMAGE_ON_FIRE st.agent_health = max(0.0, st.agent_health - health_damage) if st.agent_health <= 0: st.agent_alive = False self._last_feedback = "You collapse — overwhelmed by fire and smoke." st.step_count += 1 # --- Episode-scoped reward tracking --- # Detect first visit to current cell (before adding to visited set) is_new_cell = (st.agent_x, st.agent_y) not in self._visited_cells self._visited_cells.add((st.agent_x, st.agent_y)) # Track closest approach to any exit for NearMissBonus _exits = unblocked_exits(st.exit_positions, st.fire_grid, st.grid_w) if not _exits: _exits = st.exit_positions _cur_dist = bfs_exit_dist(st.agent_x, st.agent_y, _exits, st.cell_grid, st.grid_w, st.grid_h) if _cur_dist < self._min_exit_dist_reached: self._min_exit_dist_reached = _cur_dist # --- Done check --- done = self._check_done(st) # --- Reward --- reward = self._compute_reward( action=action.action, target_id=action.target_id, door_state=action.door_state, prev_agent_x=prev_agent_x, prev_agent_y=prev_agent_y, health_damage=health_damage, is_new_cell=is_new_cell, st=st, done=done, ) # --- Build observation --- obs_data = build_narrative_observation( step_count=st.step_count, agent_x=st.agent_x, agent_y=st.agent_y, agent_alive=st.agent_alive, agent_evacuated=st.agent_evacuated, agent_health=st.agent_health, cell_grid=st.cell_grid, fire_grid=st.fire_grid, smoke_grid=st.smoke_grid, exit_positions=st.exit_positions, door_registry=st.door_registry, zone_map=st.zone_map, last_action_feedback=self._last_feedback, wind_dir=st.wind_dir, w=st.grid_w, h=st.grid_h, ) obs_data["done"] = done obs_data["reward"] = reward obs_data["metadata"] = { "agent_health": st.agent_health, "step": st.step_count, "wind_dir": st.wind_dir, "fire_spread_rate": st.fire_spread_rate, "fire_sources": st.fire_sources_count, "humidity": st.humidity, "difficulty": getattr(self, "_difficulty", "medium"), } obs_data["map_state"] = self._build_map_state(st) return PyreObservation(**obs_data) @property def state(self) -> PyreState: if self._state is None: self._state = self._make_default_state() return self._state # ------------------------------------------------------------------ # Action execution # ------------------------------------------------------------------ def _execute_action(self, action: PyreAction, st: PyreState) -> str: act = action.action.strip().lower() if act == "move": return self._action_move(action, st) elif act == "door": return self._action_door(action, st) elif act == "look": return self._action_look(action, st) elif act == "wait": return "You wait and listen to the building." else: return f"Unknown action '{act}'. Nothing happened." def _action_move(self, action: PyreAction, st: PyreState) -> str: direction = (action.direction or "").lower() delta = _CARDINAL_DELTA.get(direction) if delta is None: return f"Invalid direction '{direction}'." nx, ny = st.agent_x + delta[0], st.agent_y + delta[1] if not _in_bounds(nx, ny, st.grid_w, st.grid_h): return "You walk into the outer wall — blocked." ct = st.cell_grid[_idx(nx, ny, st.grid_w)] if ct in (WALL, OBSTACLE): return "Blocked by wall or debris." if ct == DOOR_CLOSED: return f"The door to the {direction} is closed. Open it first." st.agent_x = nx st.agent_y = ny smoke = st.smoke_grid[_idx(nx, ny, st.grid_w)] fire = st.fire_grid[_idx(nx, ny, st.grid_w)] suffix = "" if smoke > 0.5: suffix = " The smoke is thick here." if fire > 0.1: suffix += " You feel intense heat." return f"You move {direction}.{suffix}" def _action_look(self, action: PyreAction, st: PyreState) -> str: direction = (action.direction or "").strip().lower() if not direction: return "look requires a direction: north, south, east, or west." return build_look_result( direction=direction, agent_x=st.agent_x, agent_y=st.agent_y, cell_grid=st.cell_grid, fire_grid=st.fire_grid, smoke_grid=st.smoke_grid, zone_map=st.zone_map, door_registry=st.door_registry, w=st.grid_w, h=st.grid_h, ) def _action_door(self, action: PyreAction, st: PyreState) -> str: target_id = action.target_id door_state = (action.door_state or "").strip().lower() if not target_id: return "door requires a target_id (door ID)." if door_state not in ("open", "close"): return "door requires door_state='open' or door_state='close'." if target_id not in st.door_registry: return f"Door '{target_id}' not found." dx, dy = st.door_registry[target_id] if _manhattan(st.agent_x, st.agent_y, dx, dy) > 2: return f"Door '{target_id}' is too far away." ct = st.cell_grid[_idx(dx, dy, st.grid_w)] if ct not in (DOOR_OPEN, DOOR_CLOSED): return f"'{target_id}' is not a door." if door_state == "close": if ct == DOOR_CLOSED: return f"Door '{target_id}' is already closed." st.cell_grid[_idx(dx, dy, st.grid_w)] = DOOR_CLOSED return f"You close door '{target_id}'. It may slow the fire." else: if ct == DOOR_OPEN: return f"Door '{target_id}' is already open." st.cell_grid[_idx(dx, dy, st.grid_w)] = DOOR_OPEN return f"You open door '{target_id}'." # ------------------------------------------------------------------ # Done check # ------------------------------------------------------------------ def _check_done(self, st: PyreState) -> bool: if not st.agent_alive: return True if st.agent_evacuated: return True if st.step_count >= st.max_steps: return True return False # ------------------------------------------------------------------ # Reward # ------------------------------------------------------------------ def _compute_reward( self, action: str, target_id: Optional[str], door_state: Optional[str], prev_agent_x: int, prev_agent_y: int, health_damage: float, is_new_cell: bool, st: PyreState, done: bool, ) -> float: kwargs = dict( action=action, target_id=target_id, door_state=door_state, prev_agent_x=prev_agent_x, prev_agent_y=prev_agent_y, agent_x=st.agent_x, agent_y=st.agent_y, exit_positions=st.exit_positions, cell_grid=st.cell_grid, fire_grid=st.fire_grid, smoke_grid=st.smoke_grid, w=st.grid_w, h=st.grid_h, door_registry=st.door_registry, done=done, agent_evacuated=st.agent_evacuated, agent_alive=st.agent_alive, agent_health=st.agent_health, health_damage=health_damage, remaining_steps=max(0, st.max_steps - st.step_count), is_new_cell=is_new_cell, min_exit_dist_reached=self._min_exit_dist_reached, rewarded_doors=self._rewarded_doors, ) total = 0.0 for rubric in self._per_step_rubrics: total += rubric.score(**kwargs) if done: for rubric in self._episode_rubrics: total += rubric.score(**kwargs) return round(total, 4) # ------------------------------------------------------------------ # Map state builder # ------------------------------------------------------------------ def _build_map_state(self, st: PyreState) -> PyreMapState: """Assemble the full numerical grid snapshot for UI / visualization.""" if st.agent_alive and not st.agent_evacuated: visible = compute_visible_cells( st.agent_x, st.agent_y, st.cell_grid, st.smoke_grid, st.grid_w, st.grid_h, ) visible_cells = [[x, y] for x, y in sorted(visible)] else: visible_cells = [] return PyreMapState( grid_w=st.grid_w, grid_h=st.grid_h, template_name=st.template_name, episode_id=st.episode_id or "", step_count=st.step_count, max_steps=st.max_steps, cell_grid=list(st.cell_grid), fire_grid=list(st.fire_grid), smoke_grid=list(st.smoke_grid), agent_x=st.agent_x, agent_y=st.agent_y, agent_alive=st.agent_alive, agent_evacuated=st.agent_evacuated, agent_health=st.agent_health, visible_cells=visible_cells, exit_positions=list(st.exit_positions), door_registry=dict(st.door_registry), fire_spread_rate=st.fire_spread_rate, wind_dir=st.wind_dir, humidity=st.humidity, ) # ------------------------------------------------------------------ # Defaults # ------------------------------------------------------------------ def _make_default_state(self) -> PyreState: return PyreState.model_construct( episode_id="", step_count=0, grid_w=16, grid_h=16, template_name="", cell_grid=[0] * 256, fire_grid=[0.0] * 256, smoke_grid=[0.0] * 256, burn_timers=[0] * 256, exit_positions=[], door_registry={}, zone_map={}, agent_x=0, agent_y=0, agent_alive=True, agent_evacuated=False, agent_health=100.0, max_steps=self.max_steps, fire_seed=0, fire_sources_count=2, fire_spread_rate=0.25, wind_dir="CALM", humidity=0.25, )