| """ |
| Fire spread engine for the Wildfire Containment Simulator. |
| |
| Implements a Rothermel-inspired cellular automaton where each burning cell |
| attempts to ignite its 8 neighbors based on fuel, wind, slope, moisture, |
| and suppression factors. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
|
|
| import numpy as np |
|
|
| from .models import FireState, FuelType |
| from .grid import Grid |
|
|
|
|
| |
| BASE_RATES: dict[FuelType, float] = { |
| FuelType.GRASS: 0.25, |
| FuelType.SHRUB: 0.18, |
| FuelType.TIMBER: 0.12, |
| FuelType.URBAN: 0.12, |
| FuelType.WATER: 0.0, |
| FuelType.ROAD: 0.0, |
| } |
|
|
| |
| BURN_DURATION: dict[FuelType, int] = { |
| FuelType.GRASS: 4, |
| FuelType.SHRUB: 6, |
| FuelType.TIMBER: 10, |
| FuelType.URBAN: 8, |
| FuelType.WATER: 0, |
| FuelType.ROAD: 0, |
| } |
|
|
| |
| URBAN_INTENSITY_MULT = 2.0 |
| URBAN_IGNITION_MULT = 0.5 |
|
|
| |
| NEIGHBORS = [ |
| (-1, -1), (-1, 0), (-1, 1), |
| (0, -1), (0, 1), |
| (1, -1), (1, 0), (1, 1), |
| ] |
|
|
|
|
| class FireSpreadEngine: |
| """ |
| Manages fire propagation across the grid each simulation step. |
| |
| The spread model computes per-neighbor ignition probabilities using: |
| P(ignite) = base_rate * fuel_factor * wind_factor * slope_factor |
| * (1 - moisture) * (1 - suppression) * tier_scale |
| """ |
|
|
| |
| |
| TIER_SPREAD_SCALE = { |
| "easy": 1.0, |
| "medium": 0.7, |
| "hard": 0.55, |
| } |
|
|
| def __init__(self, grid: Grid, rng: np.random.Generator): |
| self.grid = grid |
| self.rng = rng |
| self.cell_size_m = 100.0 |
| self.tier_scale = self.TIER_SPREAD_SCALE.get(grid.config.tier_name, 0.5) |
|
|
| def spread_step(self, wind_speed: float, wind_dir_deg: float) -> list[str]: |
| """ |
| Execute one step of fire spread. |
| |
| 1. For each BURNING cell, attempt to ignite neighbors. |
| 2. Update intensities (grow/decay). |
| 3. Transition cells that have exhausted fuel to BURNED_OUT. |
| |
| Returns a list of event strings for the observation log. |
| """ |
| events: list[str] = [] |
| grid = self.grid |
|
|
| |
| burning_cells = [] |
| for r in range(grid.rows): |
| for c in range(grid.cols): |
| if grid.dynamic_grid[r][c].fire_state == FireState.BURNING: |
| burning_cells.append((r, c)) |
|
|
| |
| new_ignitions: list[tuple[int, int, float]] = [] |
|
|
| for r, c in burning_cells: |
| source_dyn = grid.dynamic_grid[r][c] |
| source_static = grid.static_grid[r][c] |
|
|
| for dr, dc in NEIGHBORS: |
| nr, nc = r + dr, c + dc |
| if not grid._in_bounds(nr, nc): |
| continue |
|
|
| target_static = grid.static_grid[nr][nc] |
| target_dyn = grid.dynamic_grid[nr][nc] |
|
|
| |
| if target_static.fuel_type in (FuelType.WATER, FuelType.ROAD): |
| continue |
| if target_dyn.fire_state != FireState.UNBURNED: |
| continue |
|
|
| |
| prob = self._compute_ignition_prob( |
| source_r=r, source_c=c, |
| target_r=nr, target_c=nc, |
| source_intensity=source_dyn.fire_intensity, |
| wind_speed=wind_speed, |
| wind_dir_deg=wind_dir_deg, |
| ) |
|
|
| if self.rng.random() < prob: |
| |
| init_intensity = 0.2 + source_dyn.fire_intensity * 0.3 |
| new_ignitions.append((nr, nc, init_intensity)) |
|
|
| |
| for nr, nc, intensity in new_ignitions: |
| if grid.dynamic_grid[nr][nc].fire_state == FireState.UNBURNED: |
| grid.ignite_cell(nr, nc, intensity) |
| if grid.static_grid[nr][nc].is_populated: |
| pop = grid.static_grid[nr][nc].population |
| events.append(f"FIRE reached populated cell ({nr},{nc}) with {pop} people!") |
|
|
| |
| for r in range(grid.rows): |
| for c in range(grid.cols): |
| dyn = grid.dynamic_grid[r][c] |
| static = grid.static_grid[r][c] |
|
|
| if dyn.fire_state == FireState.BURNING: |
| dyn.time_burning += 1 |
| max_dur = BURN_DURATION.get(static.fuel_type, 6) |
|
|
| |
| peak_step = max_dur // 3 |
| if dyn.time_burning <= peak_step: |
| |
| growth = 0.15 * static.fuel_load |
| if static.fuel_type == FuelType.URBAN: |
| growth *= URBAN_INTENSITY_MULT |
| dyn.fire_intensity = min(1.0, dyn.fire_intensity + growth) |
| elif dyn.time_burning <= 2 * peak_step: |
| |
| pass |
| else: |
| |
| decay = 0.1 |
| dyn.fire_intensity = max(0.05, dyn.fire_intensity - decay) |
|
|
| |
| if dyn.suppression_level > 0: |
| dyn.fire_intensity = max(0.0, dyn.fire_intensity - dyn.suppression_level * 0.1) |
|
|
| |
| if dyn.time_burning >= max_dur or dyn.fire_intensity <= 0.0: |
| dyn.fire_state = FireState.BURNED_OUT |
| dyn.fire_intensity = 0.0 |
| events.append(f"Cell ({r},{c}) burned out.") |
|
|
| |
| elif dyn.fire_intensity < 0.15 and dyn.time_burning > peak_step: |
| dyn.fire_state = FireState.EMBER |
|
|
| elif dyn.fire_state == FireState.EMBER: |
| dyn.time_burning += 1 |
| dyn.fire_intensity = max(0.0, dyn.fire_intensity - 0.05) |
| max_dur = BURN_DURATION.get(static.fuel_type, 6) |
| if dyn.time_burning >= max_dur + 3 or dyn.fire_intensity <= 0.0: |
| dyn.fire_state = FireState.BURNED_OUT |
| dyn.fire_intensity = 0.0 |
|
|
| if new_ignitions: |
| events.append(f"{len(new_ignitions)} new cell(s) ignited this step.") |
|
|
| return events |
|
|
| def _compute_ignition_prob( |
| self, |
| source_r: int, source_c: int, |
| target_r: int, target_c: int, |
| source_intensity: float, |
| wind_speed: float, |
| wind_dir_deg: float, |
| ) -> float: |
| """Compute probability of fire spreading from source to target cell.""" |
| target_static = self.grid.static_grid[target_r][target_c] |
| target_dyn = self.grid.dynamic_grid[target_r][target_c] |
|
|
| |
| base = BASE_RATES.get(target_static.fuel_type, 0.0) |
| if base <= 0: |
| return 0.0 |
|
|
| |
| if target_static.fuel_type == FuelType.URBAN: |
| base *= URBAN_IGNITION_MULT |
|
|
| |
| fuel_factor = target_static.fuel_load |
|
|
| |
| intensity_factor = 0.5 + source_intensity * 0.5 |
|
|
| |
| wind_factor = self._compute_wind_factor( |
| source_r, source_c, target_r, target_c, |
| wind_speed, wind_dir_deg |
| ) |
|
|
| |
| slope_factor = self._compute_slope_factor(source_r, source_c, target_r, target_c) |
|
|
| |
| moisture_factor = 1.0 - target_dyn.moisture |
|
|
| |
| suppression_factor = 1.0 - target_dyn.suppression_level |
|
|
| prob = (base * fuel_factor * intensity_factor * wind_factor |
| * slope_factor * moisture_factor * suppression_factor |
| * self.tier_scale) |
|
|
| return float(np.clip(prob, 0.0, 0.95)) |
|
|
| def _compute_wind_factor( |
| self, |
| sr: int, sc: int, tr: int, tc: int, |
| wind_speed: float, wind_dir_deg: float, |
| ) -> float: |
| """ |
| Wind factor: fire spreads faster downwind. |
| wind_dir_deg is the direction wind blows FROM (meteorological convention). |
| So fire spreads in the opposite direction. |
| """ |
| if wind_speed < 1.0: |
| return 1.0 |
|
|
| |
| dr = tr - sr |
| dc = tc - sc |
| spread_angle = math.atan2(dc, -dr) |
|
|
| |
| wind_rad = math.radians(wind_dir_deg + 180) |
|
|
| angle_diff = spread_angle - wind_rad |
| cos_diff = math.cos(angle_diff) |
|
|
| |
| factor = 1.0 + cos_diff * min(wind_speed / 40.0, 1.5) |
| return max(0.3, factor) |
|
|
| def _compute_slope_factor( |
| self, sr: int, sc: int, tr: int, tc: int |
| ) -> float: |
| """Slope factor: fire accelerates uphill.""" |
| source_elev = self.grid.static_grid[sr][sc].elevation_m |
| target_elev = self.grid.static_grid[tr][tc].elevation_m |
| elev_diff = target_elev - source_elev |
|
|
| |
| factor = 1.0 + 0.3 * max(0.0, elev_diff / self.cell_size_m) |
| |
| if elev_diff < 0: |
| factor = max(0.7, 1.0 + 0.1 * elev_diff / self.cell_size_m) |
|
|
| return float(np.clip(factor, 0.5, 2.0)) |
|
|