Spaces:
Sleeping
Sleeping
| """ | |
| Fire and smoke simulation for Pyre. | |
| Cellular automaton model with per-episode variability: | |
| - Variable number of ignition sources (2–4) | |
| - Variable spread rate (p_spread) | |
| - Wind direction (8 directions + CALM) biases spread: 2× downwind, 0.5× upwind | |
| - Humidity suppresses ignition probability | |
| - Closed doors reduce spread to ~15% of normal | |
| - Walls are completely impassable to fire | |
| - Smoke propagates faster than fire, weakly through doors | |
| - Burning cells accumulate a timer; after BURNOUT_TICKS they become obstacles | |
| - Per-cell fuel_map scales ignition probability and intensity gain (office rooms burn faster) | |
| - Per-cell ventilation_map replaces the global SMOKE_DECAY constant (open areas clear faster) | |
| Wind directions (borrowed from wildfire reference): | |
| N, NE, E, SE, S, SW, W, NW, CALM | |
| """ | |
| import random | |
| from typing import List, Optional, Tuple | |
| # Cell type constants (mirrors models.py) | |
| FLOOR = 0 | |
| WALL = 1 | |
| DOOR_OPEN = 2 | |
| DOOR_CLOSED = 3 | |
| EXIT = 4 | |
| OBSTACLE = 5 | |
| # Fire intensity thresholds | |
| FIRE_IGNITION = 0.1 | |
| FIRE_BURNING = 0.3 | |
| FIRE_INTENSITY_GAIN = 0.15 | |
| BURNOUT_TICKS = 5 | |
| # Door fire reduction factor | |
| DOOR_CLOSED_FIRE_FACTOR = 0.15 | |
| # Smoke parameters | |
| SMOKE_SPREAD_RATE = 0.20 | |
| SMOKE_DOOR_FACTOR = 0.4 | |
| SMOKE_DECAY = 0.02 | |
| # Smoke level thresholds | |
| SMOKE_NONE = 0.2 | |
| SMOKE_LIGHT = 0.5 | |
| SMOKE_MODERATE = 0.8 | |
| # Fire intensity at which an exit cell is considered blocked | |
| EXIT_BLOCKED_FIRE_THRESHOLD = 0.5 | |
| # Wind direction vectors (dx, dy in grid coords — positive y = south) | |
| WIND_DIRS = { | |
| "N": (0, -1), | |
| "NE": (1, -1), | |
| "E": (1, 0), | |
| "SE": (1, 1), | |
| "S": (0, 1), | |
| "SW": (-1, 1), | |
| "W": (-1, 0), | |
| "NW": (-1, -1), | |
| "CALM": (0, 0), | |
| } | |
| _CARDINAL = [(0, -1), (0, 1), (-1, 0), (1, 0)] # N, S, W, E | |
| def smoke_level_label(density: float) -> str: | |
| if density < SMOKE_NONE: | |
| return "none" | |
| if density < SMOKE_LIGHT: | |
| return "light" | |
| if density < SMOKE_MODERATE: | |
| return "moderate" | |
| return "heavy" | |
| def _idx(x: int, y: int, w: int) -> int: | |
| return y * w + x | |
| def _in_bounds(x: int, y: int, w: int, h: int) -> bool: | |
| return 0 <= x < w and 0 <= y < h | |
| def _wind_multiplier(dx: int, dy: int, wind_x: int, wind_y: int) -> float: | |
| """Return spread multiplier based on direction relative to wind. | |
| Downwind (dot > 0) → 2×, upwind (dot < 0) → 0.5×, crosswind → 1×. | |
| For diagonal wind components each cardinal direction gets a partial boost. | |
| """ | |
| if wind_x == 0 and wind_y == 0: | |
| return 1.0 | |
| dot = dx * wind_x + dy * wind_y | |
| if dot > 0: | |
| return 2.0 | |
| elif dot < 0: | |
| return 0.5 | |
| else: | |
| return 1.0 | |
| class FireSim: | |
| """Cellular automaton for fire and smoke dynamics. | |
| All variable parameters are set at construction time so each episode | |
| gets its own FireSim instance with unique fire behaviour. | |
| """ | |
| def __init__( | |
| self, | |
| w: int, | |
| h: int, | |
| rng: random.Random, | |
| p_spread: float = 0.25, | |
| wind_dir: str = "CALM", | |
| humidity: float = 0.25, | |
| burnout_ticks: int = BURNOUT_TICKS, | |
| fuel_map: Optional[List[float]] = None, | |
| ventilation_map: Optional[List[float]] = None, | |
| ): | |
| self.w = w | |
| self.h = h | |
| self.rng = rng | |
| self.p_spread = p_spread | |
| self.wind_dir = wind_dir | |
| self.humidity = humidity | |
| self.burnout_ticks = burnout_ticks | |
| # None → uniform fuel and ventilation (backward-compatible) | |
| self._fuel_map = fuel_map | |
| self._ventilation_map = ventilation_map | |
| wind_vec = WIND_DIRS.get(wind_dir, (0, 0)) | |
| self._wind_x = wind_vec[0] | |
| self._wind_y = wind_vec[1] | |
| # Humidity suppresses ignition: effective spread = p_spread × (1 - humidity) | |
| self._effective_spread = p_spread * max(0.0, 1.0 - humidity) | |
| # ------------------------------------------------------------------ | |
| # Public API | |
| # ------------------------------------------------------------------ | |
| def step( | |
| self, | |
| cell_grid: List[int], | |
| fire_grid: List[float], | |
| smoke_grid: List[float], | |
| burn_timers: List[int], | |
| ) -> List[Tuple[int, int]]: | |
| """Advance fire and smoke by one step. | |
| Mutates fire_grid, smoke_grid, burn_timers in place. | |
| May mutate cell_grid (burned-out cells become obstacles). | |
| Returns list of (x, y) cells that burned out this step. | |
| """ | |
| w, h = self.w, self.h | |
| burned_out: List[Tuple[int, int]] = [] | |
| # --- Phase 1: Compute fire ignitions --- | |
| ignite: List[bool] = [False] * (w * h) | |
| for y in range(h): | |
| for x in range(w): | |
| i = _idx(x, y, w) | |
| ct = cell_grid[i] | |
| if fire_grid[i] < FIRE_BURNING: | |
| continue | |
| for dx, dy in _CARDINAL: | |
| nx, ny = x + dx, y + dy | |
| if not _in_bounds(nx, ny, w, h): | |
| continue | |
| ni = _idx(nx, ny, w) | |
| nct = cell_grid[ni] | |
| if nct in (WALL, OBSTACLE): | |
| continue | |
| if fire_grid[ni] > 0: | |
| continue | |
| # Base spread probability | |
| if nct == DOOR_CLOSED: | |
| p = self._effective_spread * DOOR_CLOSED_FIRE_FACTOR | |
| else: | |
| p = self._effective_spread | |
| # Wind multiplier | |
| p *= _wind_multiplier(dx, dy, self._wind_x, self._wind_y) | |
| # Fuel in the target cell scales ignition probability | |
| if self._fuel_map is not None: | |
| p *= self._fuel_map[ni] | |
| p = min(1.0, p) | |
| if self.rng.random() < p: | |
| ignite[ni] = True | |
| # --- Phase 2: Apply ignitions and advance existing fire --- | |
| new_fire = fire_grid[:] | |
| new_burn_timers = burn_timers[:] | |
| for y in range(h): | |
| for x in range(w): | |
| i = _idx(x, y, w) | |
| ct = cell_grid[i] | |
| if ct in (WALL, OBSTACLE): | |
| continue | |
| if fire_grid[i] > 0: | |
| intensity_gain = FIRE_INTENSITY_GAIN | |
| if self._fuel_map is not None: | |
| intensity_gain *= self._fuel_map[i] | |
| new_fire[i] = min(1.0, fire_grid[i] + intensity_gain) | |
| if fire_grid[i] >= FIRE_BURNING: | |
| new_burn_timers[i] = burn_timers[i] + 1 | |
| if new_burn_timers[i] >= self.burnout_ticks and new_fire[i] >= 1.0: | |
| cell_grid[i] = OBSTACLE | |
| new_fire[i] = 0.0 | |
| new_burn_timers[i] = 0 | |
| burned_out.append((x, y)) | |
| elif ignite[i]: | |
| new_fire[i] = FIRE_IGNITION | |
| new_burn_timers[i] = 0 | |
| fire_grid[:] = new_fire | |
| burn_timers[:] = new_burn_timers | |
| # --- Phase 3: Smoke spread --- | |
| self._spread_smoke(cell_grid, fire_grid, smoke_grid) | |
| return burned_out | |
| # ------------------------------------------------------------------ | |
| # Private | |
| # ------------------------------------------------------------------ | |
| def _spread_smoke( | |
| self, | |
| cell_grid: List[int], | |
| fire_grid: List[float], | |
| smoke_grid: List[float], | |
| ) -> None: | |
| w, h = self.w, self.h | |
| new_smoke = smoke_grid[:] | |
| for y in range(h): | |
| for x in range(w): | |
| i = _idx(x, y, w) | |
| ct = cell_grid[i] | |
| if ct in (WALL, OBSTACLE): | |
| continue | |
| if fire_grid[i] >= FIRE_BURNING: | |
| new_smoke[i] = min(1.0, smoke_grid[i] + 0.3) | |
| for dx, dy in _CARDINAL: | |
| nx, ny = x + dx, y + dy | |
| if not _in_bounds(nx, ny, w, h): | |
| continue | |
| ni = _idx(nx, ny, w) | |
| nct = cell_grid[ni] | |
| if nct in (WALL, OBSTACLE): | |
| continue | |
| if smoke_grid[i] > smoke_grid[ni]: | |
| diff = smoke_grid[i] - smoke_grid[ni] | |
| rate = SMOKE_SPREAD_RATE | |
| if nct == DOOR_CLOSED: | |
| rate *= SMOKE_DOOR_FACTOR | |
| transfer = min(diff * rate, diff * 0.5) | |
| new_smoke[ni] = min(1.0, new_smoke[ni] + transfer) | |
| decay = ( | |
| self._ventilation_map[i] | |
| if self._ventilation_map is not None | |
| else SMOKE_DECAY | |
| ) | |
| new_smoke[i] = max(0.0, new_smoke[i] - decay) | |
| smoke_grid[:] = new_smoke | |