Eshit's picture
Deploy to HF Space
363abf3
"""
Fire spread engine for the Wildfire Containment Simulator.
Implements a Rothermel-inspired cellular automaton where each burning cell
attempts to ignite its 8 neighbors based on fuel, wind, slope, moisture,
and suppression factors.
"""
from __future__ import annotations
import math
import numpy as np
from .models import FireState, FuelType
from .grid import Grid
# Base ignition rates by fuel type (tuned for balanced gameplay)
BASE_RATES: dict[FuelType, float] = {
FuelType.GRASS: 0.25,
FuelType.SHRUB: 0.18,
FuelType.TIMBER: 0.12,
FuelType.URBAN: 0.12, # 0.5x effective (applied separately)
FuelType.WATER: 0.0,
FuelType.ROAD: 0.0,
}
# Burn duration (steps before burnout) by fuel type
BURN_DURATION: dict[FuelType, int] = {
FuelType.GRASS: 4,
FuelType.SHRUB: 6,
FuelType.TIMBER: 10,
FuelType.URBAN: 8,
FuelType.WATER: 0,
FuelType.ROAD: 0,
}
# Intensity multiplier for urban structures
URBAN_INTENSITY_MULT = 2.0
URBAN_IGNITION_MULT = 0.5
# 8-neighbor offsets (row_delta, col_delta)
NEIGHBORS = [
(-1, -1), (-1, 0), (-1, 1),
(0, -1), (0, 1),
(1, -1), (1, 0), (1, 1),
]
class FireSpreadEngine:
"""
Manages fire propagation across the grid each simulation step.
The spread model computes per-neighbor ignition probabilities using:
P(ignite) = base_rate * fuel_factor * wind_factor * slope_factor
* (1 - moisture) * (1 - suppression) * tier_scale
"""
# Tier-based difficulty scaling for spread rate
# Tuned so that: random agent ~0.2-0.4, heuristic ~0.6-0.8 on easy
TIER_SPREAD_SCALE = {
"easy": 1.0,
"medium": 0.7,
"hard": 0.55,
}
def __init__(self, grid: Grid, rng: np.random.Generator):
self.grid = grid
self.rng = rng
self.cell_size_m = 100.0 # Each cell represents 100m x 100m
self.tier_scale = self.TIER_SPREAD_SCALE.get(grid.config.tier_name, 0.5)
def spread_step(self, wind_speed: float, wind_dir_deg: float) -> list[str]:
"""
Execute one step of fire spread.
1. For each BURNING cell, attempt to ignite neighbors.
2. Update intensities (grow/decay).
3. Transition cells that have exhausted fuel to BURNED_OUT.
Returns a list of event strings for the observation log.
"""
events: list[str] = []
grid = self.grid
# Collect currently burning cells (snapshot to avoid iteration issues)
burning_cells = []
for r in range(grid.rows):
for c in range(grid.cols):
if grid.dynamic_grid[r][c].fire_state == FireState.BURNING:
burning_cells.append((r, c))
# Phase 1: Attempt ignition of neighbors
new_ignitions: list[tuple[int, int, float]] = []
for r, c in burning_cells:
source_dyn = grid.dynamic_grid[r][c]
source_static = grid.static_grid[r][c]
for dr, dc in NEIGHBORS:
nr, nc = r + dr, c + dc
if not grid._in_bounds(nr, nc):
continue
target_static = grid.static_grid[nr][nc]
target_dyn = grid.dynamic_grid[nr][nc]
# Skip non-ignitable cells
if target_static.fuel_type in (FuelType.WATER, FuelType.ROAD):
continue
if target_dyn.fire_state != FireState.UNBURNED:
continue
# Compute ignition probability
prob = self._compute_ignition_prob(
source_r=r, source_c=c,
target_r=nr, target_c=nc,
source_intensity=source_dyn.fire_intensity,
wind_speed=wind_speed,
wind_dir_deg=wind_dir_deg,
)
if self.rng.random() < prob:
# Initial intensity depends on source and fuel
init_intensity = 0.2 + source_dyn.fire_intensity * 0.3
new_ignitions.append((nr, nc, init_intensity))
# Apply new ignitions
for nr, nc, intensity in new_ignitions:
if grid.dynamic_grid[nr][nc].fire_state == FireState.UNBURNED:
grid.ignite_cell(nr, nc, intensity)
if grid.static_grid[nr][nc].is_populated:
pop = grid.static_grid[nr][nc].population
events.append(f"FIRE reached populated cell ({nr},{nc}) with {pop} people!")
# Phase 2: Update intensities and burn timers
for r in range(grid.rows):
for c in range(grid.cols):
dyn = grid.dynamic_grid[r][c]
static = grid.static_grid[r][c]
if dyn.fire_state == FireState.BURNING:
dyn.time_burning += 1
max_dur = BURN_DURATION.get(static.fuel_type, 6)
# Intensity curve: ramp up, peak, decay
peak_step = max_dur // 3
if dyn.time_burning <= peak_step:
# Ramp up
growth = 0.15 * static.fuel_load
if static.fuel_type == FuelType.URBAN:
growth *= URBAN_INTENSITY_MULT
dyn.fire_intensity = min(1.0, dyn.fire_intensity + growth)
elif dyn.time_burning <= 2 * peak_step:
# Peak / plateau
pass
else:
# Decay
decay = 0.1
dyn.fire_intensity = max(0.05, dyn.fire_intensity - decay)
# Apply suppression reduction
if dyn.suppression_level > 0:
dyn.fire_intensity = max(0.0, dyn.fire_intensity - dyn.suppression_level * 0.1)
# Check for burnout
if dyn.time_burning >= max_dur or dyn.fire_intensity <= 0.0:
dyn.fire_state = FireState.BURNED_OUT
dyn.fire_intensity = 0.0
events.append(f"Cell ({r},{c}) burned out.")
# Transition to ember if intensity low
elif dyn.fire_intensity < 0.15 and dyn.time_burning > peak_step:
dyn.fire_state = FireState.EMBER
elif dyn.fire_state == FireState.EMBER:
dyn.time_burning += 1
dyn.fire_intensity = max(0.0, dyn.fire_intensity - 0.05)
max_dur = BURN_DURATION.get(static.fuel_type, 6)
if dyn.time_burning >= max_dur + 3 or dyn.fire_intensity <= 0.0:
dyn.fire_state = FireState.BURNED_OUT
dyn.fire_intensity = 0.0
if new_ignitions:
events.append(f"{len(new_ignitions)} new cell(s) ignited this step.")
return events
def _compute_ignition_prob(
self,
source_r: int, source_c: int,
target_r: int, target_c: int,
source_intensity: float,
wind_speed: float,
wind_dir_deg: float,
) -> float:
"""Compute probability of fire spreading from source to target cell."""
target_static = self.grid.static_grid[target_r][target_c]
target_dyn = self.grid.dynamic_grid[target_r][target_c]
# Base rate by fuel type
base = BASE_RATES.get(target_static.fuel_type, 0.0)
if base <= 0:
return 0.0
# Urban ignition penalty
if target_static.fuel_type == FuelType.URBAN:
base *= URBAN_IGNITION_MULT
# Fuel factor
fuel_factor = target_static.fuel_load
# Source intensity factor (hotter fires spread faster)
intensity_factor = 0.5 + source_intensity * 0.5
# Wind factor
wind_factor = self._compute_wind_factor(
source_r, source_c, target_r, target_c,
wind_speed, wind_dir_deg
)
# Slope factor (fire travels uphill faster)
slope_factor = self._compute_slope_factor(source_r, source_c, target_r, target_c)
# Moisture dampening
moisture_factor = 1.0 - target_dyn.moisture
# Suppression dampening
suppression_factor = 1.0 - target_dyn.suppression_level
prob = (base * fuel_factor * intensity_factor * wind_factor
* slope_factor * moisture_factor * suppression_factor
* self.tier_scale)
return float(np.clip(prob, 0.0, 0.95)) # Cap at 95%
def _compute_wind_factor(
self,
sr: int, sc: int, tr: int, tc: int,
wind_speed: float, wind_dir_deg: float,
) -> float:
"""
Wind factor: fire spreads faster downwind.
wind_dir_deg is the direction wind blows FROM (meteorological convention).
So fire spreads in the opposite direction.
"""
if wind_speed < 1.0:
return 1.0
# Direction from source to target
dr = tr - sr
dc = tc - sc
spread_angle = math.atan2(dc, -dr) # -dr because row increases downward
# Wind blows FROM wind_dir, so fire spreads TOWARD wind_dir + 180
wind_rad = math.radians(wind_dir_deg + 180)
angle_diff = spread_angle - wind_rad
cos_diff = math.cos(angle_diff)
# Scale: 1.0 at crosswind, up to 2.5 downwind, down to 0.3 upwind
factor = 1.0 + cos_diff * min(wind_speed / 40.0, 1.5)
return max(0.3, factor)
def _compute_slope_factor(
self, sr: int, sc: int, tr: int, tc: int
) -> float:
"""Slope factor: fire accelerates uphill."""
source_elev = self.grid.static_grid[sr][sc].elevation_m
target_elev = self.grid.static_grid[tr][tc].elevation_m
elev_diff = target_elev - source_elev
# Positive diff = uphill = faster spread
factor = 1.0 + 0.3 * max(0.0, elev_diff / self.cell_size_m)
# Slight slowdown going downhill
if elev_diff < 0:
factor = max(0.7, 1.0 + 0.1 * elev_diff / self.cell_size_m)
return float(np.clip(factor, 0.5, 2.0))