Wildfire-Containment-Simulator / agents /heuristic_agent.py
Eshit's picture
Deploy to HF Space
363abf3
"""
Heuristic agent for the Wildfire Containment Simulator.
Uses a priority-based decision stack to select the best action each step:
1. EMERGENCY: Evacuate endangered crews
2. PROTECT POPULATION: Firebreak between fire and populated zones
3. AIR SUPPORT: Drop retardant on highest-intensity clusters
4. CONTAIN PERIMETER: Deploy/move crews to fire perimeter downwind
5. RECON: Reveal unknown regions (hard tier)
6. IDLE: Wait for situation to evolve
"""
from __future__ import annotations
import math
from typing import Optional
from env.models import (
Action, ActionType, Observation, Direction,
FireState, FuelType, IntensityBin, DIRECTION_DELTAS,
CellObservation, CrewState, TankerState,
)
class HeuristicAgent:
"""
Greedy heuristic agent that scores situations and picks the highest-value action.
This is the primary baseline deliverable for the hackathon submission.
"""
def __init__(self):
self.step_count = 0
def act(self, obs: Observation) -> Action:
"""Select the best action using the priority decision stack."""
self.step_count += 1
# ── Priority 0: DEPLOY — get undeployed crews onto the field first ──
action = self._initial_deployment(obs)
if action:
return action
# ── Priority 1: EMERGENCY — evacuate endangered crews ──
action = self._check_crew_emergency(obs)
if action:
return action
# ── Priority 2: PROTECT POPULATION — firebreak near populated zones ──
action = self._protect_population(obs)
if action:
return action
# ── Priority 3: AIR SUPPORT — retardant on hottest cluster ──
action = self._air_support(obs)
if action:
return action
# ── Priority 4: CONTAIN PERIMETER — deploy/move crews to fire edge ──
action = self._contain_perimeter(obs)
if action:
return action
# ── Priority 5: RECON — reveal unknown areas ──
action = self._recon(obs)
if action:
return action
# ── Priority 6: IDLE ──
return Action(action_type=ActionType.IDLE, reason="No high-value action available")
# ══════════════════════════════════════════════════
# PRIORITY 0: INITIAL DEPLOYMENT
# ══════════════════════════════════════════════════
def _initial_deployment(self, obs: Observation) -> Optional[Action]:
"""Deploy undeployed crews to gain visibility and start working."""
undeployed = [c for c in obs.resources.crews if c.is_active and not c.is_deployed]
if not undeployed:
return None
burning = self._get_burning_cells_list(obs)
tanker_ready = any(
t.is_active and t.cooldown_remaining == 0
for t in obs.resources.tankers
)
if burning and tanker_ready:
return None
if not burning and obs.resources.recon_budget > 0 and self._unknown_cell_count(obs) >= 20:
return None
crew = undeployed[0]
rows = len(obs.grid)
cols = len(obs.grid[0]) if rows > 0 else 0
# Strategy: deploy near known fire, or spread across grid for visibility
if burning:
# Deploy near fire but not too close
fr, fc = burning[0]
deploy_r, deploy_c = self._find_safe_deploy_near(obs, fr, fc)
if deploy_r is not None:
return Action(
action_type=ActionType.DEPLOY_CREW,
crew_id=crew.crew_id,
target_row=deploy_r,
target_col=deploy_c,
)
# No visible fire (fog-of-war) — spread crews across grid quadrants
crew_idx = 0
for i, c in enumerate(obs.resources.crews):
if c.crew_id == crew.crew_id:
crew_idx = i
break
# Place in different quadrants
quadrants = [
(rows // 4, cols // 4),
(rows // 4, 3 * cols // 4),
(3 * rows // 4, cols // 4),
(3 * rows // 4, 3 * cols // 4),
(rows // 2, cols // 2),
(rows // 2, cols // 4),
]
target_r, target_c = quadrants[crew_idx % len(quadrants)]
# Find safe cell near target
deploy_r, deploy_c = self._find_safe_deploy_near(obs, target_r, target_c)
if deploy_r is not None:
return Action(
action_type=ActionType.DEPLOY_CREW,
crew_id=crew.crew_id,
target_row=deploy_r,
target_col=deploy_c,
)
# Fallback: deploy at grid center
return Action(
action_type=ActionType.DEPLOY_CREW,
crew_id=crew.crew_id,
target_row=rows // 2,
target_col=cols // 2,
)
def _get_burning_cells_list(self, obs: Observation) -> list[tuple[int, int]]:
"""Get list of known burning cell coordinates."""
return [
(cell.row, cell.col)
for row in obs.grid for cell in row
if cell.fire_state in (FireState.BURNING, FireState.EMBER)
]
# ══════════════════════════════════════════════════
# PRIORITY 1: CREW EMERGENCY
# ══════════════════════════════════════════════════
def _check_crew_emergency(self, obs: Observation) -> Optional[Action]:
"""Move any crew that is adjacent to high-intensity fire."""
for crew in obs.resources.crews:
if not crew.is_active or not crew.is_deployed:
continue
# Check if crew's current cell or neighbors are dangerous
danger = self._cell_danger(obs, crew.row, crew.col)
if danger < 0.6:
continue
# Find safest adjacent direction to flee
best_dir = None
min_danger = danger
for d in Direction:
dr, dc = DIRECTION_DELTAS[d]
nr, nc = crew.row + dr, crew.col + dc
if not self._in_bounds(obs, nr, nc):
continue
cell = obs.grid[nr][nc]
if cell.fuel_type == FuelType.WATER:
continue
if cell.fire_state in (FireState.BURNING, FireState.EMBER):
continue
d_val = self._cell_danger(obs, nr, nc)
if d_val < min_danger:
min_danger = d_val
best_dir = d
if best_dir:
return Action(
action_type=ActionType.MOVE_CREW,
crew_id=crew.crew_id,
direction=best_dir,
)
return None
# ══════════════════════════════════════════════════
# PRIORITY 2: PROTECT POPULATION
# ══════════════════════════════════════════════════
def _protect_population(self, obs: Observation) -> Optional[Action]:
"""Build firebreaks between fire and populated zones."""
if obs.resources.firebreak_budget <= 0:
return None
# Find populated cells threatened by nearby fire
threatened = []
for row in obs.grid:
for cell in row:
if not cell.is_populated:
continue
if cell.fire_state in (FireState.BURNED_OUT, FireState.BURNING):
continue
# Check if fire is within 3 cells
fire_dist = self._nearest_fire_distance(obs, cell.row, cell.col)
if fire_dist is not None and fire_dist <= 5:
threatened.append((cell.row, cell.col, fire_dist))
if not threatened:
return None
# Sort by closest fire
threatened.sort(key=lambda x: x[2])
target_r, target_c, _ = threatened[0]
# Find a deployed crew that can build a firebreak toward the fire
fire_dir = self._direction_toward_fire(obs, target_r, target_c)
if fire_dir is None:
return None
# Find the crew closest to this populated cell
best_crew = self._find_closest_crew(obs, target_r, target_c, deployed_only=True)
if best_crew:
crew = best_crew
# If crew is adjacent to the target area, build firebreak
dist_to_target = abs(crew.row - target_r) + abs(crew.col - target_c)
if dist_to_target <= 2:
# Try to build firebreak in the direction fire is coming from
for d in self._prioritized_directions(obs, crew.row, crew.col, fire_dir):
dr, dc = DIRECTION_DELTAS[d]
nr, nc = crew.row + dr, crew.col + dc
if self._is_valid_firebreak(obs, nr, nc):
return Action(
action_type=ActionType.BUILD_FIREBREAK,
crew_id=crew.crew_id,
direction=d,
)
# Otherwise move crew toward the threatened area
move_dir = self._best_direction_toward(crew.row, crew.col, target_r, target_c, obs)
if move_dir:
return Action(
action_type=ActionType.MOVE_CREW,
crew_id=crew.crew_id,
direction=move_dir,
)
# Deploy an undeployed crew near the threatened population
undeployed = self._find_closest_crew(obs, target_r, target_c, deployed_only=False, undeployed_only=True)
if undeployed:
# Deploy near the threatened cell (between fire and population)
deploy_r, deploy_c = self._find_safe_deploy_near(obs, target_r, target_c)
if deploy_r is not None:
return Action(
action_type=ActionType.DEPLOY_CREW,
crew_id=undeployed.crew_id,
target_row=deploy_r,
target_col=deploy_c,
)
return None
# ══════════════════════════════════════════════════
# PRIORITY 3: AIR SUPPORT
# ══════════════════════════════════════════════════
def _air_support(self, obs: Observation) -> Optional[Action]:
"""Drop retardant on the highest-intensity fire cluster."""
available_tankers = [
t for t in obs.resources.tankers
if t.is_active and t.cooldown_remaining == 0
]
if not available_tankers:
return None
# Find highest-intensity burning cluster
best_target = self._find_hottest_cluster(obs)
if best_target is None:
return None
tr, tc = best_target
# Check smoke density at target
if obs.grid[tr][tc].smoke_density > 0.8:
return None
tanker = available_tankers[0]
return Action(
action_type=ActionType.DROP_RETARDANT,
tanker_id=tanker.tanker_id,
target_row=tr,
target_col=tc,
)
# ══════════════════════════════════════════════════
# PRIORITY 4: CONTAIN PERIMETER
# ══════════════════════════════════════════════════
def _contain_perimeter(self, obs: Observation) -> Optional[Action]:
"""Deploy or move crews to the fire perimeter, preferring the downwind side."""
# Find fire perimeter cells (unburned cells adjacent to fire)
perimeter = self._get_fire_perimeter_cells(obs)
if not perimeter:
return None
# Score perimeter cells: higher score = more valuable to defend
scored = []
for r, c in perimeter:
score = self._perimeter_cell_score(obs, r, c)
scored.append((r, c, score))
scored.sort(key=lambda x: -x[2])
# Get all deployed active crews
active_crews = [c for c in obs.resources.crews if c.is_active and c.is_deployed]
if not active_crews:
return None
# Cycle through crews round-robin based on step count
crew = active_crews[self.step_count % len(active_crews)]
# Find the best perimeter cell for THIS crew
best_target = None
best_score = -1
for target_r, target_c, score in scored[:10]:
if obs.grid[target_r][target_c].crew_present:
continue
# Prefer targets close to this crew
dist = abs(crew.row - target_r) + abs(crew.col - target_c)
adjusted_score = score - dist * 0.3 # Penalize distant targets
if adjusted_score > best_score:
best_score = adjusted_score
best_target = (target_r, target_c)
if best_target is None:
return None
target_r, target_c = best_target
dist = abs(crew.row - target_r) + abs(crew.col - target_c)
if dist <= 1:
# Adjacent — build firebreak if possible
if obs.resources.firebreak_budget > 0:
for d in Direction:
dr, dc = DIRECTION_DELTAS[d]
nr, nc = crew.row + dr, crew.col + dc
if nr == target_r and nc == target_c and self._is_valid_firebreak(obs, nr, nc):
return Action(
action_type=ActionType.BUILD_FIREBREAK,
crew_id=crew.crew_id,
direction=d,
)
# Move toward target
move_dir = self._best_direction_toward(crew.row, crew.col, target_r, target_c, obs)
if move_dir:
return Action(
action_type=ActionType.MOVE_CREW,
crew_id=crew.crew_id,
direction=move_dir,
)
return None
# ══════════════════════════════════════════════════
# PRIORITY 5: RECON
# ══════════════════════════════════════════════════
def _recon(self, obs: Observation) -> Optional[Action]:
"""Send recon flight over unknown areas. Conserve budget, space out usage."""
if obs.resources.recon_budget <= 0:
return None
# Count unknown cells
unknown_cells = []
for row in obs.grid:
for cell in row:
if cell.fire_state == FireState.UNKNOWN:
unknown_cells.append((cell.row, cell.col))
if len(unknown_cells) < 20:
return None
visible_fire = bool(self._get_burning_cells_list(obs))
early_blind_recon = not visible_fire and self.step_count <= 3
undeployed = [c for c in obs.resources.crews if c.is_active and not c.is_deployed]
if not early_blind_recon:
# Don't recon until all crews are deployed.
if undeployed:
return None
# Only recon every ~30 steps to conserve budget.
if self.step_count % 30 != 5:
return None
# Cluster unknown cells and pick a dense region
# Simple approach: find the unknown cell farthest from any deployed crew
crew_positions = [(c.row, c.col) for c in obs.resources.crews if c.is_active and c.is_deployed]
if not crew_positions:
rows = len(obs.grid)
cols = len(obs.grid[0]) if rows > 0 else 0
if rows >= 35 and cols >= 35:
target = (rows // 4, cols // 4)
if obs.resources.recon_budget < 3:
target = (rows // 2, 3 * cols // 4)
return Action(
action_type=ActionType.RECON_FLIGHT,
target_row=target[0],
target_col=target[1],
)
best_cell = min(
unknown_cells,
key=lambda p: abs(p[0] - rows // 2) + abs(p[1] - cols // 2),
)
return Action(
action_type=ActionType.RECON_FLIGHT,
target_row=best_cell[0],
target_col=best_cell[1],
)
best_cell = None
max_min_dist = -1
for ur, uc in unknown_cells:
min_dist = min(abs(ur - cr) + abs(uc - cc) for cr, cc in crew_positions)
if min_dist > max_min_dist:
max_min_dist = min_dist
best_cell = (ur, uc)
if best_cell:
return Action(
action_type=ActionType.RECON_FLIGHT,
target_row=best_cell[0],
target_col=best_cell[1],
)
return None
# ══════════════════════════════════════════════════
# HELPER METHODS
# ══════════════════════════════════════════════════
def _unknown_cell_count(self, obs: Observation) -> int:
return sum(
1
for row in obs.grid
for cell in row
if cell.fire_state == FireState.UNKNOWN
)
def _in_bounds(self, obs: Observation, r: int, c: int) -> bool:
return 0 <= r < len(obs.grid) and 0 <= c < len(obs.grid[0])
def _cell_danger(self, obs: Observation, r: int, c: int) -> float:
"""Compute danger level of a cell (0=safe, 1=deadly)."""
if not self._in_bounds(obs, r, c):
return 0.0
cell = obs.grid[r][c]
danger = 0.0
# Direct fire
if cell.fire_state == FireState.BURNING:
intensity_vals = {
IntensityBin.NONE: 0, IntensityBin.LOW: 0.3,
IntensityBin.MEDIUM: 0.5, IntensityBin.HIGH: 0.7,
IntensityBin.EXTREME: 0.95,
}
danger = max(danger, intensity_vals.get(cell.intensity_bin, 0.5))
# Adjacent fire
for d in Direction:
dr, dc = DIRECTION_DELTAS[d]
nr, nc = r + dr, c + dc
if self._in_bounds(obs, nr, nc):
n_cell = obs.grid[nr][nc]
if n_cell.fire_state == FireState.BURNING:
intensity_vals = {
IntensityBin.NONE: 0, IntensityBin.LOW: 0.15,
IntensityBin.MEDIUM: 0.3, IntensityBin.HIGH: 0.5,
IntensityBin.EXTREME: 0.7,
}
danger = max(danger, intensity_vals.get(n_cell.intensity_bin, 0.3))
return danger
def _nearest_fire_distance(self, obs: Observation, r: int, c: int) -> Optional[int]:
"""Manhattan distance to nearest burning cell. None if no fire visible."""
min_dist = None
for row in obs.grid:
for cell in row:
if cell.fire_state in (FireState.BURNING, FireState.EMBER):
dist = abs(cell.row - r) + abs(cell.col - c)
if min_dist is None or dist < min_dist:
min_dist = dist
return min_dist
def _direction_toward_fire(self, obs: Observation, r: int, c: int) -> Optional[Direction]:
"""Find direction from (r,c) toward nearest fire."""
closest = None
min_dist = float("inf")
for row in obs.grid:
for cell in row:
if cell.fire_state in (FireState.BURNING, FireState.EMBER):
dist = abs(cell.row - r) + abs(cell.col - c)
if dist < min_dist:
min_dist = dist
closest = (cell.row, cell.col)
if closest is None:
return None
dr = closest[0] - r
dc = closest[1] - c
return self._delta_to_direction(dr, dc)
def _delta_to_direction(self, dr: int, dc: int) -> Direction:
"""Convert row/col deltas to nearest Direction enum."""
# Normalize to -1/0/1
nr = 0 if dr == 0 else (1 if dr > 0 else -1)
nc = 0 if dc == 0 else (1 if dc > 0 else -1)
delta_map = {v: k for k, v in DIRECTION_DELTAS.items()}
return delta_map.get((nr, nc), Direction.N)
def _prioritized_directions(self, obs: Observation, r: int, c: int, primary: Direction) -> list[Direction]:
"""Return directions ordered by priority, starting with primary."""
dirs = [primary]
for d in Direction:
if d != primary:
dirs.append(d)
return dirs
def _find_closest_crew(
self, obs: Observation, r: int, c: int,
deployed_only: bool = False, undeployed_only: bool = False,
) -> Optional[CrewState]:
"""Find the closest active crew to position (r,c)."""
best = None
min_dist = float("inf")
for crew in obs.resources.crews:
if not crew.is_active:
continue
if deployed_only and not crew.is_deployed:
continue
if undeployed_only and crew.is_deployed:
continue
if crew.is_deployed:
dist = abs(crew.row - r) + abs(crew.col - c)
else:
dist = 0 # Undeployed crews can deploy anywhere
if dist < min_dist:
min_dist = dist
best = crew
return best
def _find_safe_deploy_near(self, obs: Observation, r: int, c: int) -> tuple[Optional[int], Optional[int]]:
"""Find a safe cell near (r,c) for crew deployment."""
rows = len(obs.grid)
cols = len(obs.grid[0]) if rows > 0 else 0
# Search in expanding rings
for radius in range(0, 6):
candidates = []
for dr in range(-radius, radius + 1):
for dc in range(-radius, radius + 1):
if abs(dr) + abs(dc) != radius and radius > 0:
continue
nr, nc = r + dr, c + dc
if 0 <= nr < rows and 0 <= nc < cols:
cell = obs.grid[nr][nc]
if (cell.fire_state in (FireState.UNBURNED, FireState.FIREBREAK, FireState.SUPPRESSED)
and cell.fuel_type != FuelType.WATER
and not cell.crew_present
and self._cell_danger(obs, nr, nc) < 0.5):
candidates.append((nr, nc))
if candidates:
# Pick the one closest to the fire (to be useful)
candidates.sort(key=lambda p: self._nearest_fire_distance(obs, p[0], p[1]) or 999)
return candidates[0]
return None, None
def _best_direction_toward(self, fr: int, fc: int, tr: int, tc: int, obs: Observation) -> Optional[Direction]:
"""Find the best safe direction to move from (fr,fc) toward (tr,tc)."""
best_dir = None
best_dist = abs(fr - tr) + abs(fc - tc)
for d in Direction:
dr, dc = DIRECTION_DELTAS[d]
nr, nc = fr + dr, fc + dc
if not self._in_bounds(obs, nr, nc):
continue
cell = obs.grid[nr][nc]
if cell.fuel_type == FuelType.WATER:
continue
if cell.fire_state in (FireState.BURNING, FireState.EMBER):
continue
if self._cell_danger(obs, nr, nc) >= 0.6:
continue
dist = abs(nr - tr) + abs(nc - tc)
if dist < best_dist:
best_dist = dist
best_dir = d
return best_dir
def _find_hottest_cluster(self, obs: Observation) -> Optional[tuple[int, int]]:
"""Find the burning cell with highest intensity, preferring clusters."""
rows = len(obs.grid)
cols = len(obs.grid[0]) if rows > 0 else 0
best = None
best_score = -1.0
for row in obs.grid:
for cell in row:
if cell.fire_state != FireState.BURNING:
continue
# Base score from intensity
intensity_vals = {
IntensityBin.NONE: 0, IntensityBin.LOW: 0.25,
IntensityBin.MEDIUM: 0.5, IntensityBin.HIGH: 0.75,
IntensityBin.EXTREME: 1.0,
}
score = intensity_vals.get(cell.intensity_bin, 0.5)
# Bonus for burning neighbors (cluster)
for d in Direction:
dr, dc = DIRECTION_DELTAS[d]
nr, nc = cell.row + dr, cell.col + dc
if 0 <= nr < rows and 0 <= nc < cols:
if obs.grid[nr][nc].fire_state == FireState.BURNING:
score += 0.1
# Bonus for proximity to populated cells
for d in Direction:
dr, dc = DIRECTION_DELTAS[d]
for dist in range(1, 4):
nr, nc = cell.row + dr * dist, cell.col + dc * dist
if 0 <= nr < rows and 0 <= nc < cols:
if obs.grid[nr][nc].is_populated:
score += 0.5 / dist
if score > best_score:
best_score = score
best = (cell.row, cell.col)
return best
def _get_fire_perimeter_cells(self, obs: Observation) -> list[tuple[int, int]]:
"""Get unburned cells adjacent to fire (the containment line)."""
rows = len(obs.grid)
cols = len(obs.grid[0]) if rows > 0 else 0
perimeter = set()
for row in obs.grid:
for cell in row:
if cell.fire_state not in (FireState.BURNING, FireState.EMBER):
continue
for d in [(-1, 0), (1, 0), (0, -1), (0, 1), (-1, -1), (-1, 1), (1, -1), (1, 1)]:
nr, nc = cell.row + d[0], cell.col + d[1]
if 0 <= nr < rows and 0 <= nc < cols:
n_cell = obs.grid[nr][nc]
if n_cell.fire_state == FireState.UNBURNED:
perimeter.add((nr, nc))
return list(perimeter)
def _perimeter_cell_score(self, obs: Observation, r: int, c: int) -> float:
"""Score a perimeter cell for defensive priority."""
score = 1.0
cell = obs.grid[r][c]
# Heavily prioritize cells near populated areas
pop_dist = self._nearest_populated_distance(obs, r, c)
if pop_dist is not None:
score += 5.0 / max(1, pop_dist)
# Prioritize downwind cells (fire will spread toward them)
wind_dir = obs.weather.wind_direction_deg
wind_rad = math.radians(wind_dir + 180) # Direction fire spreads
fire_dist = self._nearest_fire_distance(obs, r, c)
if fire_dist is not None and fire_dist <= 2:
score += 2.0
# Prioritize cells with high fuel load (will burn intensely if ignited)
fuel_vals = {FuelType.GRASS: 0.5, FuelType.SHRUB: 0.7, FuelType.TIMBER: 1.0, FuelType.URBAN: 1.5}
score += fuel_vals.get(cell.fuel_type, 0.3)
return score
def _nearest_populated_distance(self, obs: Observation, r: int, c: int) -> Optional[int]:
"""Manhattan distance to nearest populated cell."""
min_dist = None
for row in obs.grid:
for cell in row:
if cell.is_populated and cell.fire_state != FireState.BURNED_OUT:
dist = abs(cell.row - r) + abs(cell.col - c)
if min_dist is None or dist < min_dist:
min_dist = dist
return min_dist
def _is_valid_firebreak(self, obs: Observation, r: int, c: int) -> bool:
"""Check if cell is valid for firebreak construction."""
if not self._in_bounds(obs, r, c):
return False
cell = obs.grid[r][c]
return (cell.fire_state == FireState.UNBURNED
and cell.fuel_type not in (FuelType.WATER, FuelType.URBAN))