""" Fire and smoke simulation for Pyre. Cellular automaton model with per-episode variability: - Variable number of ignition sources (2–4) - Variable spread rate (p_spread) - Wind direction (8 directions + CALM) biases spread: 2× downwind, 0.5× upwind - Humidity suppresses ignition probability - Closed doors reduce spread to ~15% of normal - Walls are completely impassable to fire - Smoke propagates faster than fire, weakly through doors - Burning cells accumulate a timer; after BURNOUT_TICKS they become obstacles - Per-cell fuel_map scales ignition probability and intensity gain (office rooms burn faster) - Per-cell ventilation_map replaces the global SMOKE_DECAY constant (open areas clear faster) Wind directions (borrowed from wildfire reference): N, NE, E, SE, S, SW, W, NW, CALM """ import random from typing import List, Optional, Tuple # Cell type constants (mirrors models.py) FLOOR = 0 WALL = 1 DOOR_OPEN = 2 DOOR_CLOSED = 3 EXIT = 4 OBSTACLE = 5 # Fire intensity thresholds FIRE_IGNITION = 0.1 FIRE_BURNING = 0.3 FIRE_INTENSITY_GAIN = 0.15 BURNOUT_TICKS = 5 # Door fire reduction factor DOOR_CLOSED_FIRE_FACTOR = 0.15 # Smoke parameters SMOKE_SPREAD_RATE = 0.20 SMOKE_DOOR_FACTOR = 0.4 SMOKE_DECAY = 0.02 # Smoke level thresholds SMOKE_NONE = 0.2 SMOKE_LIGHT = 0.5 SMOKE_MODERATE = 0.8 # Fire intensity at which an exit cell is considered blocked EXIT_BLOCKED_FIRE_THRESHOLD = 0.5 # Wind direction vectors (dx, dy in grid coords — positive y = south) WIND_DIRS = { "N": (0, -1), "NE": (1, -1), "E": (1, 0), "SE": (1, 1), "S": (0, 1), "SW": (-1, 1), "W": (-1, 0), "NW": (-1, -1), "CALM": (0, 0), } _CARDINAL = [(0, -1), (0, 1), (-1, 0), (1, 0)] # N, S, W, E def smoke_level_label(density: float) -> str: if density < SMOKE_NONE: return "none" if density < SMOKE_LIGHT: return "light" if density < SMOKE_MODERATE: return "moderate" return "heavy" def _idx(x: int, y: int, w: int) -> int: return y * w + x def _in_bounds(x: int, y: int, w: int, h: int) -> bool: return 0 <= x < w and 0 <= y < h def _wind_multiplier(dx: int, dy: int, wind_x: int, wind_y: int) -> float: """Return spread multiplier based on direction relative to wind. Downwind (dot > 0) → 2×, upwind (dot < 0) → 0.5×, crosswind → 1×. For diagonal wind components each cardinal direction gets a partial boost. """ if wind_x == 0 and wind_y == 0: return 1.0 dot = dx * wind_x + dy * wind_y if dot > 0: return 2.0 elif dot < 0: return 0.5 else: return 1.0 class FireSim: """Cellular automaton for fire and smoke dynamics. All variable parameters are set at construction time so each episode gets its own FireSim instance with unique fire behaviour. """ def __init__( self, w: int, h: int, rng: random.Random, p_spread: float = 0.25, wind_dir: str = "CALM", humidity: float = 0.25, burnout_ticks: int = BURNOUT_TICKS, fuel_map: Optional[List[float]] = None, ventilation_map: Optional[List[float]] = None, ): self.w = w self.h = h self.rng = rng self.p_spread = p_spread self.wind_dir = wind_dir self.humidity = humidity self.burnout_ticks = burnout_ticks # None → uniform fuel and ventilation (backward-compatible) self._fuel_map = fuel_map self._ventilation_map = ventilation_map wind_vec = WIND_DIRS.get(wind_dir, (0, 0)) self._wind_x = wind_vec[0] self._wind_y = wind_vec[1] # Humidity suppresses ignition: effective spread = p_spread × (1 - humidity) self._effective_spread = p_spread * max(0.0, 1.0 - humidity) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def step( self, cell_grid: List[int], fire_grid: List[float], smoke_grid: List[float], burn_timers: List[int], ) -> List[Tuple[int, int]]: """Advance fire and smoke by one step. Mutates fire_grid, smoke_grid, burn_timers in place. May mutate cell_grid (burned-out cells become obstacles). Returns list of (x, y) cells that burned out this step. """ w, h = self.w, self.h burned_out: List[Tuple[int, int]] = [] # --- Phase 1: Compute fire ignitions --- ignite: List[bool] = [False] * (w * h) for y in range(h): for x in range(w): i = _idx(x, y, w) ct = cell_grid[i] if fire_grid[i] < FIRE_BURNING: continue for dx, dy in _CARDINAL: nx, ny = x + dx, y + dy if not _in_bounds(nx, ny, w, h): continue ni = _idx(nx, ny, w) nct = cell_grid[ni] if nct in (WALL, OBSTACLE): continue if fire_grid[ni] > 0: continue # Base spread probability if nct == DOOR_CLOSED: p = self._effective_spread * DOOR_CLOSED_FIRE_FACTOR else: p = self._effective_spread # Wind multiplier p *= _wind_multiplier(dx, dy, self._wind_x, self._wind_y) # Fuel in the target cell scales ignition probability if self._fuel_map is not None: p *= self._fuel_map[ni] p = min(1.0, p) if self.rng.random() < p: ignite[ni] = True # --- Phase 2: Apply ignitions and advance existing fire --- new_fire = fire_grid[:] new_burn_timers = burn_timers[:] for y in range(h): for x in range(w): i = _idx(x, y, w) ct = cell_grid[i] if ct in (WALL, OBSTACLE): continue if fire_grid[i] > 0: intensity_gain = FIRE_INTENSITY_GAIN if self._fuel_map is not None: intensity_gain *= self._fuel_map[i] new_fire[i] = min(1.0, fire_grid[i] + intensity_gain) if fire_grid[i] >= FIRE_BURNING: new_burn_timers[i] = burn_timers[i] + 1 if new_burn_timers[i] >= self.burnout_ticks and new_fire[i] >= 1.0: cell_grid[i] = OBSTACLE new_fire[i] = 0.0 new_burn_timers[i] = 0 burned_out.append((x, y)) elif ignite[i]: new_fire[i] = FIRE_IGNITION new_burn_timers[i] = 0 fire_grid[:] = new_fire burn_timers[:] = new_burn_timers # --- Phase 3: Smoke spread --- self._spread_smoke(cell_grid, fire_grid, smoke_grid) return burned_out # ------------------------------------------------------------------ # Private # ------------------------------------------------------------------ def _spread_smoke( self, cell_grid: List[int], fire_grid: List[float], smoke_grid: List[float], ) -> None: w, h = self.w, self.h new_smoke = smoke_grid[:] for y in range(h): for x in range(w): i = _idx(x, y, w) ct = cell_grid[i] if ct in (WALL, OBSTACLE): continue if fire_grid[i] >= FIRE_BURNING: new_smoke[i] = min(1.0, smoke_grid[i] + 0.3) for dx, dy in _CARDINAL: nx, ny = x + dx, y + dy if not _in_bounds(nx, ny, w, h): continue ni = _idx(nx, ny, w) nct = cell_grid[ni] if nct in (WALL, OBSTACLE): continue if smoke_grid[i] > smoke_grid[ni]: diff = smoke_grid[i] - smoke_grid[ni] rate = SMOKE_SPREAD_RATE if nct == DOOR_CLOSED: rate *= SMOKE_DOOR_FACTOR transfer = min(diff * rate, diff * 0.5) new_smoke[ni] = min(1.0, new_smoke[ni] + transfer) decay = ( self._ventilation_map[i] if self._ventilation_map is not None else SMOKE_DECAY ) new_smoke[i] = max(0.0, new_smoke[i] - decay) smoke_grid[:] = new_smoke