""" collapse_engine.py — Manages collapse site states and secondary event simulation. Models: - Site state progression (active → cleared / collapsed / evacuated) - Secondary collapse risk (aftershocks increase it over time) - Communication blackouts (partial observation simulation) - Rescue progress computation per team/debris type - Event generation (aftershocks, trapped radio calls, hazmat alerts) Based on: - FEMA USAR Field Operations Guide (2006) - INSARAG Guidelines Vol II — Operations (2020) - Elnashai et al. (2004): Seismic risk assessment models """ from __future__ import annotations import math import random from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple from ..models import DebrisType, EventNotification, SiteStatus, SiteObservation from .survivor_model import SiteSurvivorPool # --------------------------------------------------------------------------- # Site clearance rates (rescues per step per team) # These reflect real USAR operational rates from INSARAG guidelines # --------------------------------------------------------------------------- BASE_RESCUE_RATE: Dict[DebrisType, float] = { DebrisType.LIGHT: 3.5, # ~3–4 survivors per step (30 min) DebrisType.MODERATE: 1.8, # ~1–2 survivors per step DebrisType.HEAVY: 0.6, # <1 per step without heavy equipment } HEAVY_EQUIPMENT_MULTIPLIER = 2.8 # Heavy machinery boosts heavy debris rescue rate AIR_SUPPORT_BONUS_RATE = 2.0 # Helicopter clears debris / drops rescuers # --------------------------------------------------------------------------- # Collapse site # --------------------------------------------------------------------------- @dataclass class Collapsesite: """Full state of a single collapse site.""" site_id: int name: str debris_type: DebrisType status: SiteStatus survivor_pool: SiteSurvivorPool secondary_collapse_risk: float distance_from_staging: float # in steps (travel time) requires_air_support: bool # heavy debris with no ground access assigned_team_id: Optional[int] = None rescue_progress: float = 0.0 # fraction 0–1 heavy_equipment_used: bool = False air_support_applied: bool = False secondary_collapsed: bool = False def to_observation(self, elapsed_hours: float) -> SiteObservation: pool = self.survivor_pool return SiteObservation( site_id=self.site_id, name=self.name, status=self.status, debris_type=self.debris_type, trapped_survivors=pool.alive_count, critical_count=pool.critical_count, survival_probability=round(pool.average_survival_probability(elapsed_hours), 3), decay_rate=self._compute_decay_rate(elapsed_hours), rescue_progress=round(self.rescue_progress, 3), assigned_team_id=self.assigned_team_id, requires_air_support=self.requires_air_support, secondary_collapse_risk=round(self.secondary_collapse_risk, 3), distance_from_staging=self.distance_from_staging, survivors_rescued=pool.rescued_count, ) def _compute_decay_rate(self, elapsed_hours: float) -> float: """Approximate per-step survival probability drop.""" # Phase-dependent decay rate if elapsed_hours < 6: base = 0.015 elif elapsed_hours < 24: base = 0.08 elif elapsed_hours < 72: base = 0.03 else: base = 0.005 # Critical survivors decay faster pool = self.survivor_pool if pool.alive_count > 0: crit_frac = pool.critical_count / pool.alive_count else: crit_frac = 0.0 return round(base * (1 + 0.5 * crit_frac), 4) def compute_rescue_batch_size( self, team_efficiency: float, team_specialization: str, has_heavy_equipment: bool, rng: random.Random, ) -> int: """ Calculate how many survivors a team rescues this step. Incorporates debris type, team fatigue, specialization, and equipment. """ if self.status != SiteStatus.ACTIVE: return 0 if self.survivor_pool.alive_count == 0: return 0 base_rate = BASE_RESCUE_RATE[self.debris_type] # Specialization bonus if team_specialization == "heavy_rescue" and self.debris_type == DebrisType.HEAVY: base_rate *= 1.5 elif team_specialization == "medical" and self.survivor_pool.critical_count > 0: base_rate *= 1.3 elif team_specialization == "swift_water": base_rate *= 0.85 # slight penalty on land sites # Equipment bonus if has_heavy_equipment and self.debris_type == DebrisType.HEAVY: base_rate *= HEAVY_EQUIPMENT_MULTIPLIER # Air support applied (one-time boost) if self.air_support_applied and not self.heavy_equipment_used: base_rate *= AIR_SUPPORT_BONUS_RATE # Team fatigue penalty base_rate *= (1.0 - 0.4 * team_efficiency) # efficiency here = fatigue # Stochastic variation ±20% rate = base_rate * rng.uniform(0.8, 1.2) # Cannot exceed alive count count = min(int(math.ceil(rate)), self.survivor_pool.alive_count) return max(0, count) def apply_secondary_collapse_check( self, aftershock_magnitude: float, rng: random.Random, ) -> bool: """ Check if a secondary collapse occurs this step. Returns True if collapsed (catastrophic event). """ if self.status != SiteStatus.ACTIVE: return False # Aftershock increases risk triggered_risk = self.secondary_collapse_risk * (1 + aftershock_magnitude * 2.0) triggered_risk = min(0.95, triggered_risk) if rng.random() < triggered_risk: self.status = SiteStatus.COLLAPSED self.secondary_collapsed = True # All unrescued survivors are killed for s in self.survivor_pool.survivors: if not s.rescued: s.alive = False return True return False def check_if_cleared(self): """Mark site as cleared if all accessible survivors are rescued.""" if self.survivor_pool.alive_count == 0 and self.status == SiteStatus.ACTIVE: self.status = SiteStatus.CLEARED self.rescue_progress = 1.0 # --------------------------------------------------------------------------- # Collapse Engine # --------------------------------------------------------------------------- class CollapseEngine: """ Manages the full collection of collapse sites in one episode. Handles aftershock generation, secondary collapses, and site progression. """ def __init__(self, sites: List[Collapsesite], rng: random.Random): self.sites = {s.site_id: s for s in sites} self.rng = rng self.aftershock_sequence: List[float] = [] # Pre-generated magnitudes self.current_step = 0 self.elapsed_hours = 0.0 self.step_hours = 0.5 # Each step = 30 minutes self._generate_aftershock_sequence(100) def _generate_aftershock_sequence(self, length: int): """ Pre-generate aftershock magnitudes using Gutenberg-Richter law. Most aftershocks are small; rare ones are significant. """ for _ in range(length): # Omori-Utsu decay: aftershocks become less frequent/intense over time step_factor = max(0.1, 1.0 - self.current_step * 0.008) if self.rng.random() < 0.15 * step_factor: # Aftershock occurs — magnitude from GR distribution magnitude = self.rng.expovariate(1.5) * step_factor magnitude = min(magnitude, 1.0) else: magnitude = 0.0 self.aftershock_sequence.append(magnitude) def step(self) -> Tuple[List[EventNotification], float]: """ Advance all sites by one timestep. Returns (events_generated, aftershock_magnitude_this_step). """ self.elapsed_hours += self.step_hours self.current_step += 1 events: List[EventNotification] = [] # Get aftershock for this step if self.current_step < len(self.aftershock_sequence): aftershock = self.aftershock_sequence[self.current_step] else: aftershock = 0.0 if aftershock > 0.3: events.append(EventNotification( event_type="aftershock", site_id=None, description=f"Aftershock detected (magnitude {aftershock:.2f} relative). Secondary collapse risk elevated.", severity=aftershock, )) # Process each active site for site in self.sites.values(): if site.status != SiteStatus.ACTIVE: continue # Update secondary collapse risk (creep up slowly over time) site.secondary_collapse_risk = min( 0.90, site.secondary_collapse_risk + 0.005 * self.elapsed_hours / 24.0, ) # Secondary collapse check collapsed = site.apply_secondary_collapse_check(aftershock, self.rng) if collapsed: events.append(EventNotification( event_type="secondary_collapse", site_id=site.site_id, description=f"SECONDARY COLLAPSE at {site.name}! All unrescued survivors are lost. Site is inaccessible.", severity=1.0, )) continue # Apply survival decay to all unrescued survivors deaths = site.survivor_pool.apply_time_decay(self.elapsed_hours, self.rng) if deaths > 0: events.append(EventNotification( event_type="survivor_death", site_id=site.site_id, description=f"{deaths} survivor(s) at {site.name} did not survive the wait.", severity=min(1.0, deaths * 0.2), )) # Random radio contact from survivors (partial information) if self.rng.random() < 0.08 and site.survivor_pool.alive_count > 0: events.append(EventNotification( event_type="survivor_radio_contact", site_id=site.site_id, description=f"Radio contact from {site.name}: {site.survivor_pool.alive_count} survivors confirmed alive. Critical: {site.survivor_pool.critical_count}.", severity=0.2, )) site.check_if_cleared() return events, aftershock def get_site(self, site_id: int) -> Optional[Collapsesite]: return self.sites.get(site_id) def all_sites_observation(self, elapsed_hours: float) -> List[SiteObservation]: return [s.to_observation(elapsed_hours) for s in self.sites.values()] @property def total_alive(self) -> int: return sum(s.survivor_pool.alive_count for s in self.sites.values()) @property def total_rescued(self) -> int: return sum(s.survivor_pool.rescued_count for s in self.sites.values()) @property def total_lost(self) -> int: return sum( sum(1 for sv in s.survivor_pool.survivors if not sv.alive and not sv.rescued) for s in self.sites.values() ) @property def total_initial_survivors(self) -> int: return sum(len(s.survivor_pool.survivors) for s in self.sites.values())