# usar/server/simulation/collapse_engine.py
# PranovRaghavendhra's picture
# USAR OpenEnv v1.0 - Urban Search and Rescue
# ce614ef
"""
collapse_engine.py — Manages collapse site states and secondary event simulation.
Models:
- Site state progression (active → cleared / collapsed / evacuated)
- Secondary collapse risk (aftershocks increase it over time)
- Communication blackouts (partial observation simulation)
- Rescue progress computation per team/debris type
- Event generation (aftershocks, trapped radio calls, hazmat alerts)
Based on:
- FEMA USAR Field Operations Guide (2006)
- INSARAG Guidelines Vol II — Operations (2020)
- Elnashai et al. (2004): Seismic risk assessment models
"""
from __future__ import annotations
import math
import random
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from ..models import DebrisType, EventNotification, SiteStatus, SiteObservation
from .survivor_model import SiteSurvivorPool
# ---------------------------------------------------------------------------
# Site clearance rates (rescues per step per team)
# These reflect real USAR operational rates from INSARAG guidelines
# ---------------------------------------------------------------------------
# Baseline rescues per team per step, keyed by debris type. This is the
# starting value that Collapsesite.compute_rescue_batch_size scales by
# specialization, equipment, air support, fatigue and random variation.
BASE_RESCUE_RATE: Dict[DebrisType, float] = {
    DebrisType.LIGHT: 3.5,  # ~3–4 survivors per step (30 min)
    DebrisType.MODERATE: 1.8,  # ~1–2 survivors per step
    DebrisType.HEAVY: 0.6,  # <1 per step without heavy equipment
}
# Applied only when the team has heavy equipment AND the site's debris is HEAVY.
HEAVY_EQUIPMENT_MULTIPLIER = 2.8  # Heavy machinery boosts heavy debris rescue rate
# Applied while a site has air_support_applied set and heavy_equipment_used unset.
AIR_SUPPORT_BONUS_RATE = 2.0  # Helicopter clears debris / drops rescuers
# ---------------------------------------------------------------------------
# Collapse site
# ---------------------------------------------------------------------------
@dataclass
class Collapsesite:
    """Mutable record of one collapse site and the rules that evolve it.

    Holds the site's identity, debris class, status, survivor pool and risk
    level, and provides the per-step calculations used by the engine:
    observation export, rescue batch sizing, secondary collapse rolls and
    clearance detection.
    """

    # --- identity and static characteristics ---
    site_id: int
    name: str
    debris_type: DebrisType
    status: SiteStatus
    survivor_pool: SiteSurvivorPool
    secondary_collapse_risk: float
    distance_from_staging: float  # in steps (travel time)
    requires_air_support: bool  # heavy debris with no ground access
    # --- mutable operational state ---
    assigned_team_id: Optional[int] = None
    rescue_progress: float = 0.0  # fraction 0–1
    heavy_equipment_used: bool = False
    air_support_applied: bool = False
    secondary_collapsed: bool = False

    def to_observation(self, elapsed_hours: float) -> SiteObservation:
        """Build the agent-facing snapshot of this site.

        Args:
            elapsed_hours: Simulation time in hours, forwarded to the survivor
                pool's average survival estimate and to the decay-rate estimate.

        Returns:
            A ``SiteObservation`` with probabilities, progress and risk
            rounded to three decimals.
        """
        survivors = self.survivor_pool
        trapped = survivors.alive_count
        critical = survivors.critical_count
        avg_survival = round(survivors.average_survival_probability(elapsed_hours), 3)
        decay = self._compute_decay_rate(elapsed_hours)
        return SiteObservation(
            site_id=self.site_id,
            name=self.name,
            status=self.status,
            debris_type=self.debris_type,
            trapped_survivors=trapped,
            critical_count=critical,
            survival_probability=avg_survival,
            decay_rate=decay,
            rescue_progress=round(self.rescue_progress, 3),
            assigned_team_id=self.assigned_team_id,
            requires_air_support=self.requires_air_support,
            secondary_collapse_risk=round(self.secondary_collapse_risk, 3),
            distance_from_staging=self.distance_from_staging,
            survivors_rescued=survivors.rescued_count,
        )

    def _compute_decay_rate(self, elapsed_hours: float) -> float:
        """Estimate the per-step drop in survival probability.

        A phase-dependent base rate is picked from the elapsed time and then
        scaled up by as much as 50% according to the share of living survivors
        that are critical. The result is rounded to four decimals.
        """
        # (exclusive upper bound in hours, base rate) — first match wins.
        phases = ((6, 0.015), (24, 0.08), (72, 0.03))
        base = 0.005  # rate once every phase bound has been passed
        for upper_bound, phase_rate in phases:
            if elapsed_hours < upper_bound:
                base = phase_rate
                break

        survivors = self.survivor_pool
        alive = survivors.alive_count
        # Guard the division: an empty pool contributes no critical share.
        crit_frac = survivors.critical_count / alive if alive > 0 else 0.0
        return round(base * (1 + 0.5 * crit_frac), 4)

    def compute_rescue_batch_size(
        self,
        team_efficiency: float,
        team_specialization: str,
        has_heavy_equipment: bool,
        rng: random.Random,
    ) -> int:
        """Return how many survivors a team extracts from this site this step.

        Args:
            team_efficiency: Used as a *fatigue* level despite its name — a
                larger value lowers the rate (factor ``1 - 0.4 * value``).
            team_specialization: Specialization tag; ``"heavy_rescue"``,
                ``"medical"`` and ``"swift_water"`` adjust the rate.
            has_heavy_equipment: Whether the team brings heavy machinery.
            rng: Random source for the ±20% variation.

        Returns:
            0 when the site is not active or nobody is alive; otherwise the
            rate rounded *up*, capped at the living count and floored at 0.
            Because of the round-up, any positive rate yields at least one
            rescue.
        """
        if self.status != SiteStatus.ACTIVE or self.survivor_pool.alive_count == 0:
            return 0

        survivors = self.survivor_pool
        rate = BASE_RESCUE_RATE[self.debris_type]
        on_heavy_debris = self.debris_type == DebrisType.HEAVY

        # Specialization: at most one adjustment applies, checked in this order.
        if team_specialization == "heavy_rescue" and on_heavy_debris:
            rate *= 1.5
        elif team_specialization == "medical" and survivors.critical_count > 0:
            rate *= 1.3
        elif team_specialization == "swift_water":
            rate *= 0.85  # slight penalty on land sites

        # Heavy machinery only helps on heavy debris.
        if has_heavy_equipment and on_heavy_debris:
            rate *= HEAVY_EQUIPMENT_MULTIPLIER

        # Air support boost; applied on every call while the flags hold.
        if self.air_support_applied and not self.heavy_equipment_used:
            rate *= AIR_SUPPORT_BONUS_RATE

        # Fatigue penalty (the argument is treated as fatigue, see Args).
        rate *= (1.0 - 0.4 * team_efficiency)

        # Stochastic variation ±20%.
        noisy_rate = rate * rng.uniform(0.8, 1.2)

        capped = min(int(math.ceil(noisy_rate)), survivors.alive_count)
        return max(0, capped)

    def apply_secondary_collapse_check(
        self,
        aftershock_magnitude: float,
        rng: random.Random,
    ) -> bool:
        """Roll for a secondary collapse at this site.

        The site's current risk is scaled by ``1 + 2 * aftershock_magnitude``
        and capped at 0.95. On a hit the site becomes ``COLLAPSED``,
        ``secondary_collapsed`` is set, and every survivor not yet rescued is
        marked dead.

        Returns:
            True if the site collapsed on this call, False otherwise
            (including when the site was not active to begin with).
        """
        if self.status != SiteStatus.ACTIVE:
            return False

        collapse_probability = min(
            0.95,
            self.secondary_collapse_risk * (1 + aftershock_magnitude * 2.0),
        )
        if not rng.random() < collapse_probability:
            return False

        self.status = SiteStatus.COLLAPSED
        self.secondary_collapsed = True
        # Everyone still under the rubble is lost.
        for survivor in self.survivor_pool.survivors:
            if not survivor.rescued:
                survivor.alive = False
        return True

    def check_if_cleared(self):
        """Flip an active site to ``CLEARED`` once the pool reports nobody alive.

        Also sets ``rescue_progress`` to 1.0. Sites in any other status are
        left untouched.
        """
        if self.survivor_pool.alive_count != 0:
            return
        if self.status != SiteStatus.ACTIVE:
            return
        self.status = SiteStatus.CLEARED
        self.rescue_progress = 1.0
# ---------------------------------------------------------------------------
# Collapse Engine
# ---------------------------------------------------------------------------
class CollapseEngine:
    """
    Manages the full collection of collapse sites in one episode.
    Handles aftershock generation, secondary collapses, and site progression.
    """

    def __init__(self, sites: List[Collapsesite], rng: random.Random):
        """Index the sites by id and pre-generate the aftershock schedule.

        Args:
            sites: Sites taking part in the episode; stored keyed by ``site_id``.
            rng: Random source shared by aftershock generation and per-step rolls.
        """
        self.sites = {s.site_id: s for s in sites}
        self.rng = rng
        self.aftershock_sequence: List[float] = []  # Pre-generated magnitudes
        self.current_step = 0
        self.elapsed_hours = 0.0
        self.step_hours = 0.5  # Each step = 30 minutes
        self._generate_aftershock_sequence(100)

    def _generate_aftershock_sequence(self, length: int) -> None:
        """
        Append ``length`` pre-generated aftershock magnitudes to the schedule.

        Each entry is 0.0 (no aftershock) or an exponentially distributed
        magnitude capped at 1.0, so most aftershocks are small and rare ones
        are significant. Both the chance of an aftershock and its magnitude are
        scaled by a decay factor that shrinks linearly with the entry's
        position in the schedule, floored at 0.1.
        """
        # The decay factor must follow the step each entry belongs to, not
        # ``self.current_step``: the schedule is built up front while
        # ``current_step`` is still 0, which would pin the factor at 1.0 and
        # disable the decay entirely. Offsetting by the current length keeps
        # the decay continuous if the schedule is extended later.
        first_index = len(self.aftershock_sequence)
        for step_index in range(first_index, first_index + length):
            # Aftershocks become less frequent/intense over time.
            step_factor = max(0.1, 1.0 - step_index * 0.008)
            if self.rng.random() < 0.15 * step_factor:
                # Aftershock occurs — draw its magnitude and apply the decay.
                magnitude = self.rng.expovariate(1.5) * step_factor
                magnitude = min(magnitude, 1.0)
            else:
                magnitude = 0.0
            self.aftershock_sequence.append(magnitude)

    def step(self) -> Tuple[List[EventNotification], float]:
        """
        Advance all sites by one timestep.
        Returns (events_generated, aftershock_magnitude_this_step).

        The magnitude for step ``k`` is read from ``aftershock_sequence[k]``
        after the step counter has been incremented, so entry 0 is never
        consumed; once the schedule is exhausted the magnitude is 0.0.
        For every site that is still active the step, in order: raises the
        secondary collapse risk, rolls for a secondary collapse, applies
        survival decay, may emit a radio-contact event, and checks clearance.
        """
        self.elapsed_hours += self.step_hours
        self.current_step += 1
        events: List[EventNotification] = []

        # Get aftershock for this step
        if self.current_step < len(self.aftershock_sequence):
            aftershock = self.aftershock_sequence[self.current_step]
        else:
            aftershock = 0.0

        # Only aftershocks above 0.3 are reported, but any non-zero magnitude
        # still feeds the collapse roll below.
        if aftershock > 0.3:
            events.append(EventNotification(
                event_type="aftershock",
                site_id=None,
                description=f"Aftershock detected (magnitude {aftershock:.2f} relative). Secondary collapse risk elevated.",
                severity=aftershock,
            ))

        # Process each active site
        for site in self.sites.values():
            if site.status != SiteStatus.ACTIVE:
                continue

            # Update secondary collapse risk (creep up slowly over time)
            site.secondary_collapse_risk = min(
                0.90,
                site.secondary_collapse_risk + 0.005 * self.elapsed_hours / 24.0,
            )

            # Secondary collapse check
            collapsed = site.apply_secondary_collapse_check(aftershock, self.rng)
            if collapsed:
                events.append(EventNotification(
                    event_type="secondary_collapse",
                    site_id=site.site_id,
                    description=f"SECONDARY COLLAPSE at {site.name}! All unrescued survivors are lost. Site is inaccessible.",
                    severity=1.0,
                ))
                # A collapsed site takes no further processing this step.
                continue

            # Apply survival decay to all unrescued survivors
            deaths = site.survivor_pool.apply_time_decay(self.elapsed_hours, self.rng)
            if deaths > 0:
                events.append(EventNotification(
                    event_type="survivor_death",
                    site_id=site.site_id,
                    description=f"{deaths} survivor(s) at {site.name} did not survive the wait.",
                    severity=min(1.0, deaths * 0.2),
                ))

            # Random radio contact from survivors (partial information).
            # The roll is drawn before the alive check, so it consumes one
            # random number per active site regardless of the outcome.
            if self.rng.random() < 0.08 and site.survivor_pool.alive_count > 0:
                events.append(EventNotification(
                    event_type="survivor_radio_contact",
                    site_id=site.site_id,
                    description=f"Radio contact from {site.name}: {site.survivor_pool.alive_count} survivors confirmed alive. Critical: {site.survivor_pool.critical_count}.",
                    severity=0.2,
                ))

            site.check_if_cleared()

        return events, aftershock

    def get_site(self, site_id: int) -> Optional[Collapsesite]:
        """Return the site with the given id, or None if it is unknown."""
        return self.sites.get(site_id)

    def all_sites_observation(self, elapsed_hours: float) -> List[SiteObservation]:
        """Return one observation per site, in the sites' stored order."""
        return [s.to_observation(elapsed_hours) for s in self.sites.values()]

    @property
    def total_alive(self) -> int:
        """Sum of every site pool's ``alive_count``."""
        return sum(s.survivor_pool.alive_count for s in self.sites.values())

    @property
    def total_rescued(self) -> int:
        """Sum of every site pool's ``rescued_count``."""
        return sum(s.survivor_pool.rescued_count for s in self.sites.values())

    @property
    def total_lost(self) -> int:
        """Number of survivors, across all sites, that are neither alive nor rescued."""
        return sum(
            sum(1 for sv in s.survivor_pool.survivors if not sv.alive and not sv.rescued)
            for s in self.sites.values()
        )

    @property
    def total_initial_survivors(self) -> int:
        """Total number of survivor records held by all site pools."""
        return sum(len(s.survivor_pool.survivors) for s in self.sites.values())