| """ |
| Heuristic agent for the Wildfire Containment Simulator. |
| |
| Uses a priority-based decision stack to select the best action each step: |
| 1. EMERGENCY: Evacuate endangered crews |
| 2. PROTECT POPULATION: Firebreak between fire and populated zones |
| 3. AIR SUPPORT: Drop retardant on highest-intensity clusters |
| 4. CONTAIN PERIMETER: Deploy/move crews to fire perimeter downwind |
| 5. RECON: Reveal unknown regions (hard tier) |
| 6. IDLE: Wait for situation to evolve |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| from typing import Optional |
|
|
| from env.models import ( |
| Action, ActionType, Observation, Direction, |
| FireState, FuelType, IntensityBin, DIRECTION_DELTAS, |
| CellObservation, CrewState, TankerState, |
| ) |
|
|
|
|
| class HeuristicAgent: |
| """ |
| Greedy heuristic agent that scores situations and picks the highest-value action. |
| |
| This is the primary baseline deliverable for the hackathon submission. |
| """ |
|
|
| def __init__(self): |
| self.step_count = 0 |
|
|
| def act(self, obs: Observation) -> Action: |
| """Select the best action using the priority decision stack.""" |
| self.step_count += 1 |
|
|
| |
| action = self._initial_deployment(obs) |
| if action: |
| return action |
|
|
| |
| action = self._check_crew_emergency(obs) |
| if action: |
| return action |
|
|
| |
| action = self._protect_population(obs) |
| if action: |
| return action |
|
|
| |
| action = self._air_support(obs) |
| if action: |
| return action |
|
|
| |
| action = self._contain_perimeter(obs) |
| if action: |
| return action |
|
|
| |
| action = self._recon(obs) |
| if action: |
| return action |
|
|
| |
| return Action(action_type=ActionType.IDLE, reason="No high-value action available") |
|
|
| |
| |
| |
|
|
| def _initial_deployment(self, obs: Observation) -> Optional[Action]: |
| """Deploy undeployed crews to gain visibility and start working.""" |
| undeployed = [c for c in obs.resources.crews if c.is_active and not c.is_deployed] |
| if not undeployed: |
| return None |
|
|
| burning = self._get_burning_cells_list(obs) |
| tanker_ready = any( |
| t.is_active and t.cooldown_remaining == 0 |
| for t in obs.resources.tankers |
| ) |
| if burning and tanker_ready: |
| return None |
| if not burning and obs.resources.recon_budget > 0 and self._unknown_cell_count(obs) >= 20: |
| return None |
|
|
| crew = undeployed[0] |
| rows = len(obs.grid) |
| cols = len(obs.grid[0]) if rows > 0 else 0 |
|
|
| |
| if burning: |
| |
| fr, fc = burning[0] |
| deploy_r, deploy_c = self._find_safe_deploy_near(obs, fr, fc) |
| if deploy_r is not None: |
| return Action( |
| action_type=ActionType.DEPLOY_CREW, |
| crew_id=crew.crew_id, |
| target_row=deploy_r, |
| target_col=deploy_c, |
| ) |
|
|
| |
| crew_idx = 0 |
| for i, c in enumerate(obs.resources.crews): |
| if c.crew_id == crew.crew_id: |
| crew_idx = i |
| break |
|
|
| |
| quadrants = [ |
| (rows // 4, cols // 4), |
| (rows // 4, 3 * cols // 4), |
| (3 * rows // 4, cols // 4), |
| (3 * rows // 4, 3 * cols // 4), |
| (rows // 2, cols // 2), |
| (rows // 2, cols // 4), |
| ] |
| target_r, target_c = quadrants[crew_idx % len(quadrants)] |
|
|
| |
| deploy_r, deploy_c = self._find_safe_deploy_near(obs, target_r, target_c) |
| if deploy_r is not None: |
| return Action( |
| action_type=ActionType.DEPLOY_CREW, |
| crew_id=crew.crew_id, |
| target_row=deploy_r, |
| target_col=deploy_c, |
| ) |
|
|
| |
| return Action( |
| action_type=ActionType.DEPLOY_CREW, |
| crew_id=crew.crew_id, |
| target_row=rows // 2, |
| target_col=cols // 2, |
| ) |
|
|
| def _get_burning_cells_list(self, obs: Observation) -> list[tuple[int, int]]: |
| """Get list of known burning cell coordinates.""" |
| return [ |
| (cell.row, cell.col) |
| for row in obs.grid for cell in row |
| if cell.fire_state in (FireState.BURNING, FireState.EMBER) |
| ] |
|
|
| |
| |
| |
|
|
| def _check_crew_emergency(self, obs: Observation) -> Optional[Action]: |
| """Move any crew that is adjacent to high-intensity fire.""" |
| for crew in obs.resources.crews: |
| if not crew.is_active or not crew.is_deployed: |
| continue |
|
|
| |
| danger = self._cell_danger(obs, crew.row, crew.col) |
| if danger < 0.6: |
| continue |
|
|
| |
| best_dir = None |
| min_danger = danger |
| for d in Direction: |
| dr, dc = DIRECTION_DELTAS[d] |
| nr, nc = crew.row + dr, crew.col + dc |
| if not self._in_bounds(obs, nr, nc): |
| continue |
| cell = obs.grid[nr][nc] |
| if cell.fuel_type == FuelType.WATER: |
| continue |
| if cell.fire_state in (FireState.BURNING, FireState.EMBER): |
| continue |
| d_val = self._cell_danger(obs, nr, nc) |
| if d_val < min_danger: |
| min_danger = d_val |
| best_dir = d |
|
|
| if best_dir: |
| return Action( |
| action_type=ActionType.MOVE_CREW, |
| crew_id=crew.crew_id, |
| direction=best_dir, |
| ) |
|
|
| return None |
|
|
| |
| |
| |
|
|
| def _protect_population(self, obs: Observation) -> Optional[Action]: |
| """Build firebreaks between fire and populated zones.""" |
| if obs.resources.firebreak_budget <= 0: |
| return None |
|
|
| |
| threatened = [] |
| for row in obs.grid: |
| for cell in row: |
| if not cell.is_populated: |
| continue |
| if cell.fire_state in (FireState.BURNED_OUT, FireState.BURNING): |
| continue |
| |
| fire_dist = self._nearest_fire_distance(obs, cell.row, cell.col) |
| if fire_dist is not None and fire_dist <= 5: |
| threatened.append((cell.row, cell.col, fire_dist)) |
|
|
| if not threatened: |
| return None |
|
|
| |
| threatened.sort(key=lambda x: x[2]) |
| target_r, target_c, _ = threatened[0] |
|
|
| |
| fire_dir = self._direction_toward_fire(obs, target_r, target_c) |
| if fire_dir is None: |
| return None |
|
|
| |
| best_crew = self._find_closest_crew(obs, target_r, target_c, deployed_only=True) |
|
|
| if best_crew: |
| crew = best_crew |
| |
| dist_to_target = abs(crew.row - target_r) + abs(crew.col - target_c) |
| if dist_to_target <= 2: |
| |
| for d in self._prioritized_directions(obs, crew.row, crew.col, fire_dir): |
| dr, dc = DIRECTION_DELTAS[d] |
| nr, nc = crew.row + dr, crew.col + dc |
| if self._is_valid_firebreak(obs, nr, nc): |
| return Action( |
| action_type=ActionType.BUILD_FIREBREAK, |
| crew_id=crew.crew_id, |
| direction=d, |
| ) |
|
|
| |
| move_dir = self._best_direction_toward(crew.row, crew.col, target_r, target_c, obs) |
| if move_dir: |
| return Action( |
| action_type=ActionType.MOVE_CREW, |
| crew_id=crew.crew_id, |
| direction=move_dir, |
| ) |
|
|
| |
| undeployed = self._find_closest_crew(obs, target_r, target_c, deployed_only=False, undeployed_only=True) |
| if undeployed: |
| |
| deploy_r, deploy_c = self._find_safe_deploy_near(obs, target_r, target_c) |
| if deploy_r is not None: |
| return Action( |
| action_type=ActionType.DEPLOY_CREW, |
| crew_id=undeployed.crew_id, |
| target_row=deploy_r, |
| target_col=deploy_c, |
| ) |
|
|
| return None |
|
|
| |
| |
| |
|
|
| def _air_support(self, obs: Observation) -> Optional[Action]: |
| """Drop retardant on the highest-intensity fire cluster.""" |
| available_tankers = [ |
| t for t in obs.resources.tankers |
| if t.is_active and t.cooldown_remaining == 0 |
| ] |
| if not available_tankers: |
| return None |
|
|
| |
| best_target = self._find_hottest_cluster(obs) |
| if best_target is None: |
| return None |
|
|
| tr, tc = best_target |
| |
| if obs.grid[tr][tc].smoke_density > 0.8: |
| return None |
|
|
| tanker = available_tankers[0] |
| return Action( |
| action_type=ActionType.DROP_RETARDANT, |
| tanker_id=tanker.tanker_id, |
| target_row=tr, |
| target_col=tc, |
| ) |
|
|
| |
| |
| |
|
|
| def _contain_perimeter(self, obs: Observation) -> Optional[Action]: |
| """Deploy or move crews to the fire perimeter, preferring the downwind side.""" |
| |
| perimeter = self._get_fire_perimeter_cells(obs) |
| if not perimeter: |
| return None |
|
|
| |
| scored = [] |
| for r, c in perimeter: |
| score = self._perimeter_cell_score(obs, r, c) |
| scored.append((r, c, score)) |
| scored.sort(key=lambda x: -x[2]) |
|
|
| |
| active_crews = [c for c in obs.resources.crews if c.is_active and c.is_deployed] |
| if not active_crews: |
| return None |
|
|
| |
| crew = active_crews[self.step_count % len(active_crews)] |
|
|
| |
| best_target = None |
| best_score = -1 |
| for target_r, target_c, score in scored[:10]: |
| if obs.grid[target_r][target_c].crew_present: |
| continue |
| |
| dist = abs(crew.row - target_r) + abs(crew.col - target_c) |
| adjusted_score = score - dist * 0.3 |
| if adjusted_score > best_score: |
| best_score = adjusted_score |
| best_target = (target_r, target_c) |
|
|
| if best_target is None: |
| return None |
|
|
| target_r, target_c = best_target |
| dist = abs(crew.row - target_r) + abs(crew.col - target_c) |
|
|
| if dist <= 1: |
| |
| if obs.resources.firebreak_budget > 0: |
| for d in Direction: |
| dr, dc = DIRECTION_DELTAS[d] |
| nr, nc = crew.row + dr, crew.col + dc |
| if nr == target_r and nc == target_c and self._is_valid_firebreak(obs, nr, nc): |
| return Action( |
| action_type=ActionType.BUILD_FIREBREAK, |
| crew_id=crew.crew_id, |
| direction=d, |
| ) |
|
|
| |
| move_dir = self._best_direction_toward(crew.row, crew.col, target_r, target_c, obs) |
| if move_dir: |
| return Action( |
| action_type=ActionType.MOVE_CREW, |
| crew_id=crew.crew_id, |
| direction=move_dir, |
| ) |
|
|
| return None |
|
|
| |
| |
| |
|
|
| def _recon(self, obs: Observation) -> Optional[Action]: |
| """Send recon flight over unknown areas. Conserve budget, space out usage.""" |
| if obs.resources.recon_budget <= 0: |
| return None |
|
|
| |
| unknown_cells = [] |
| for row in obs.grid: |
| for cell in row: |
| if cell.fire_state == FireState.UNKNOWN: |
| unknown_cells.append((cell.row, cell.col)) |
|
|
| if len(unknown_cells) < 20: |
| return None |
|
|
| visible_fire = bool(self._get_burning_cells_list(obs)) |
| early_blind_recon = not visible_fire and self.step_count <= 3 |
| undeployed = [c for c in obs.resources.crews if c.is_active and not c.is_deployed] |
|
|
| if not early_blind_recon: |
| |
| if undeployed: |
| return None |
|
|
| |
| if self.step_count % 30 != 5: |
| return None |
|
|
| |
| |
| crew_positions = [(c.row, c.col) for c in obs.resources.crews if c.is_active and c.is_deployed] |
| if not crew_positions: |
| rows = len(obs.grid) |
| cols = len(obs.grid[0]) if rows > 0 else 0 |
| if rows >= 35 and cols >= 35: |
| target = (rows // 4, cols // 4) |
| if obs.resources.recon_budget < 3: |
| target = (rows // 2, 3 * cols // 4) |
| return Action( |
| action_type=ActionType.RECON_FLIGHT, |
| target_row=target[0], |
| target_col=target[1], |
| ) |
| best_cell = min( |
| unknown_cells, |
| key=lambda p: abs(p[0] - rows // 2) + abs(p[1] - cols // 2), |
| ) |
| return Action( |
| action_type=ActionType.RECON_FLIGHT, |
| target_row=best_cell[0], |
| target_col=best_cell[1], |
| ) |
|
|
| best_cell = None |
| max_min_dist = -1 |
| for ur, uc in unknown_cells: |
| min_dist = min(abs(ur - cr) + abs(uc - cc) for cr, cc in crew_positions) |
| if min_dist > max_min_dist: |
| max_min_dist = min_dist |
| best_cell = (ur, uc) |
|
|
| if best_cell: |
| return Action( |
| action_type=ActionType.RECON_FLIGHT, |
| target_row=best_cell[0], |
| target_col=best_cell[1], |
| ) |
|
|
| return None |
|
|
| |
| |
| |
|
|
| def _unknown_cell_count(self, obs: Observation) -> int: |
| return sum( |
| 1 |
| for row in obs.grid |
| for cell in row |
| if cell.fire_state == FireState.UNKNOWN |
| ) |
|
|
| def _in_bounds(self, obs: Observation, r: int, c: int) -> bool: |
| return 0 <= r < len(obs.grid) and 0 <= c < len(obs.grid[0]) |
|
|
| def _cell_danger(self, obs: Observation, r: int, c: int) -> float: |
| """Compute danger level of a cell (0=safe, 1=deadly).""" |
| if not self._in_bounds(obs, r, c): |
| return 0.0 |
|
|
| cell = obs.grid[r][c] |
| danger = 0.0 |
|
|
| |
| if cell.fire_state == FireState.BURNING: |
| intensity_vals = { |
| IntensityBin.NONE: 0, IntensityBin.LOW: 0.3, |
| IntensityBin.MEDIUM: 0.5, IntensityBin.HIGH: 0.7, |
| IntensityBin.EXTREME: 0.95, |
| } |
| danger = max(danger, intensity_vals.get(cell.intensity_bin, 0.5)) |
|
|
| |
| for d in Direction: |
| dr, dc = DIRECTION_DELTAS[d] |
| nr, nc = r + dr, c + dc |
| if self._in_bounds(obs, nr, nc): |
| n_cell = obs.grid[nr][nc] |
| if n_cell.fire_state == FireState.BURNING: |
| intensity_vals = { |
| IntensityBin.NONE: 0, IntensityBin.LOW: 0.15, |
| IntensityBin.MEDIUM: 0.3, IntensityBin.HIGH: 0.5, |
| IntensityBin.EXTREME: 0.7, |
| } |
| danger = max(danger, intensity_vals.get(n_cell.intensity_bin, 0.3)) |
|
|
| return danger |
|
|
| def _nearest_fire_distance(self, obs: Observation, r: int, c: int) -> Optional[int]: |
| """Manhattan distance to nearest burning cell. None if no fire visible.""" |
| min_dist = None |
| for row in obs.grid: |
| for cell in row: |
| if cell.fire_state in (FireState.BURNING, FireState.EMBER): |
| dist = abs(cell.row - r) + abs(cell.col - c) |
| if min_dist is None or dist < min_dist: |
| min_dist = dist |
| return min_dist |
|
|
| def _direction_toward_fire(self, obs: Observation, r: int, c: int) -> Optional[Direction]: |
| """Find direction from (r,c) toward nearest fire.""" |
| closest = None |
| min_dist = float("inf") |
| for row in obs.grid: |
| for cell in row: |
| if cell.fire_state in (FireState.BURNING, FireState.EMBER): |
| dist = abs(cell.row - r) + abs(cell.col - c) |
| if dist < min_dist: |
| min_dist = dist |
| closest = (cell.row, cell.col) |
| if closest is None: |
| return None |
|
|
| dr = closest[0] - r |
| dc = closest[1] - c |
| return self._delta_to_direction(dr, dc) |
|
|
| def _delta_to_direction(self, dr: int, dc: int) -> Direction: |
| """Convert row/col deltas to nearest Direction enum.""" |
| |
| nr = 0 if dr == 0 else (1 if dr > 0 else -1) |
| nc = 0 if dc == 0 else (1 if dc > 0 else -1) |
|
|
| delta_map = {v: k for k, v in DIRECTION_DELTAS.items()} |
| return delta_map.get((nr, nc), Direction.N) |
|
|
| def _prioritized_directions(self, obs: Observation, r: int, c: int, primary: Direction) -> list[Direction]: |
| """Return directions ordered by priority, starting with primary.""" |
| dirs = [primary] |
| for d in Direction: |
| if d != primary: |
| dirs.append(d) |
| return dirs |
|
|
| def _find_closest_crew( |
| self, obs: Observation, r: int, c: int, |
| deployed_only: bool = False, undeployed_only: bool = False, |
| ) -> Optional[CrewState]: |
| """Find the closest active crew to position (r,c).""" |
| best = None |
| min_dist = float("inf") |
| for crew in obs.resources.crews: |
| if not crew.is_active: |
| continue |
| if deployed_only and not crew.is_deployed: |
| continue |
| if undeployed_only and crew.is_deployed: |
| continue |
|
|
| if crew.is_deployed: |
| dist = abs(crew.row - r) + abs(crew.col - c) |
| else: |
| dist = 0 |
|
|
| if dist < min_dist: |
| min_dist = dist |
| best = crew |
|
|
| return best |
|
|
| def _find_safe_deploy_near(self, obs: Observation, r: int, c: int) -> tuple[Optional[int], Optional[int]]: |
| """Find a safe cell near (r,c) for crew deployment.""" |
| rows = len(obs.grid) |
| cols = len(obs.grid[0]) if rows > 0 else 0 |
|
|
| |
| for radius in range(0, 6): |
| candidates = [] |
| for dr in range(-radius, radius + 1): |
| for dc in range(-radius, radius + 1): |
| if abs(dr) + abs(dc) != radius and radius > 0: |
| continue |
| nr, nc = r + dr, c + dc |
| if 0 <= nr < rows and 0 <= nc < cols: |
| cell = obs.grid[nr][nc] |
| if (cell.fire_state in (FireState.UNBURNED, FireState.FIREBREAK, FireState.SUPPRESSED) |
| and cell.fuel_type != FuelType.WATER |
| and not cell.crew_present |
| and self._cell_danger(obs, nr, nc) < 0.5): |
| candidates.append((nr, nc)) |
| if candidates: |
| |
| candidates.sort(key=lambda p: self._nearest_fire_distance(obs, p[0], p[1]) or 999) |
| return candidates[0] |
|
|
| return None, None |
|
|
| def _best_direction_toward(self, fr: int, fc: int, tr: int, tc: int, obs: Observation) -> Optional[Direction]: |
| """Find the best safe direction to move from (fr,fc) toward (tr,tc).""" |
| best_dir = None |
| best_dist = abs(fr - tr) + abs(fc - tc) |
|
|
| for d in Direction: |
| dr, dc = DIRECTION_DELTAS[d] |
| nr, nc = fr + dr, fc + dc |
| if not self._in_bounds(obs, nr, nc): |
| continue |
| cell = obs.grid[nr][nc] |
| if cell.fuel_type == FuelType.WATER: |
| continue |
| if cell.fire_state in (FireState.BURNING, FireState.EMBER): |
| continue |
| if self._cell_danger(obs, nr, nc) >= 0.6: |
| continue |
|
|
| dist = abs(nr - tr) + abs(nc - tc) |
| if dist < best_dist: |
| best_dist = dist |
| best_dir = d |
|
|
| return best_dir |
|
|
| def _find_hottest_cluster(self, obs: Observation) -> Optional[tuple[int, int]]: |
| """Find the burning cell with highest intensity, preferring clusters.""" |
| rows = len(obs.grid) |
| cols = len(obs.grid[0]) if rows > 0 else 0 |
|
|
| best = None |
| best_score = -1.0 |
|
|
| for row in obs.grid: |
| for cell in row: |
| if cell.fire_state != FireState.BURNING: |
| continue |
|
|
| |
| intensity_vals = { |
| IntensityBin.NONE: 0, IntensityBin.LOW: 0.25, |
| IntensityBin.MEDIUM: 0.5, IntensityBin.HIGH: 0.75, |
| IntensityBin.EXTREME: 1.0, |
| } |
| score = intensity_vals.get(cell.intensity_bin, 0.5) |
|
|
| |
| for d in Direction: |
| dr, dc = DIRECTION_DELTAS[d] |
| nr, nc = cell.row + dr, cell.col + dc |
| if 0 <= nr < rows and 0 <= nc < cols: |
| if obs.grid[nr][nc].fire_state == FireState.BURNING: |
| score += 0.1 |
|
|
| |
| for d in Direction: |
| dr, dc = DIRECTION_DELTAS[d] |
| for dist in range(1, 4): |
| nr, nc = cell.row + dr * dist, cell.col + dc * dist |
| if 0 <= nr < rows and 0 <= nc < cols: |
| if obs.grid[nr][nc].is_populated: |
| score += 0.5 / dist |
|
|
| if score > best_score: |
| best_score = score |
| best = (cell.row, cell.col) |
|
|
| return best |
|
|
| def _get_fire_perimeter_cells(self, obs: Observation) -> list[tuple[int, int]]: |
| """Get unburned cells adjacent to fire (the containment line).""" |
| rows = len(obs.grid) |
| cols = len(obs.grid[0]) if rows > 0 else 0 |
| perimeter = set() |
|
|
| for row in obs.grid: |
| for cell in row: |
| if cell.fire_state not in (FireState.BURNING, FireState.EMBER): |
| continue |
| for d in [(-1, 0), (1, 0), (0, -1), (0, 1), (-1, -1), (-1, 1), (1, -1), (1, 1)]: |
| nr, nc = cell.row + d[0], cell.col + d[1] |
| if 0 <= nr < rows and 0 <= nc < cols: |
| n_cell = obs.grid[nr][nc] |
| if n_cell.fire_state == FireState.UNBURNED: |
| perimeter.add((nr, nc)) |
|
|
| return list(perimeter) |
|
|
| def _perimeter_cell_score(self, obs: Observation, r: int, c: int) -> float: |
| """Score a perimeter cell for defensive priority.""" |
| score = 1.0 |
|
|
| cell = obs.grid[r][c] |
|
|
| |
| pop_dist = self._nearest_populated_distance(obs, r, c) |
| if pop_dist is not None: |
| score += 5.0 / max(1, pop_dist) |
|
|
| |
| wind_dir = obs.weather.wind_direction_deg |
| wind_rad = math.radians(wind_dir + 180) |
| fire_dist = self._nearest_fire_distance(obs, r, c) |
| if fire_dist is not None and fire_dist <= 2: |
| score += 2.0 |
|
|
| |
| fuel_vals = {FuelType.GRASS: 0.5, FuelType.SHRUB: 0.7, FuelType.TIMBER: 1.0, FuelType.URBAN: 1.5} |
| score += fuel_vals.get(cell.fuel_type, 0.3) |
|
|
| return score |
|
|
| def _nearest_populated_distance(self, obs: Observation, r: int, c: int) -> Optional[int]: |
| """Manhattan distance to nearest populated cell.""" |
| min_dist = None |
| for row in obs.grid: |
| for cell in row: |
| if cell.is_populated and cell.fire_state != FireState.BURNED_OUT: |
| dist = abs(cell.row - r) + abs(cell.col - c) |
| if min_dist is None or dist < min_dist: |
| min_dist = dist |
| return min_dist |
|
|
| def _is_valid_firebreak(self, obs: Observation, r: int, c: int) -> bool: |
| """Check if cell is valid for firebreak construction.""" |
| if not self._in_bounds(obs, r, c): |
| return False |
| cell = obs.grid[r][c] |
| return (cell.fire_state == FireState.UNBURNED |
| and cell.fuel_type not in (FuelType.WATER, FuelType.URBAN)) |
|
|