Spaces:
Sleeping
Sleeping
| """ | |
| collapse_engine.py — Manages collapse site states and secondary event simulation. | |
| Models: | |
| - Site state progression (active → cleared / collapsed / evacuated) | |
| - Secondary collapse risk (aftershocks increase it over time) | |
| - Communication blackouts (partial observation simulation) | |
| - Rescue progress computation per team/debris type | |
| - Event generation (aftershocks, trapped radio calls, hazmat alerts) | |
| Based on: | |
| - FEMA USAR Field Operations Guide (2006) | |
| - INSARAG Guidelines Vol II — Operations (2020) | |
| - Elnashai et al. (2004): Seismic risk assessment models | |
| """ | |
| from __future__ import annotations | |
| import math | |
| import random | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Tuple | |
| from ..models import DebrisType, EventNotification, SiteStatus, SiteObservation | |
| from .survivor_model import SiteSurvivorPool | |
| # --------------------------------------------------------------------------- | |
| # Site clearance rates (rescues per step per team) | |
| # These reflect real USAR operational rates from INSARAG guidelines | |
| # --------------------------------------------------------------------------- | |
| BASE_RESCUE_RATE: Dict[DebrisType, float] = { | |
| DebrisType.LIGHT: 3.5, # ~3–4 survivors per step (30 min) | |
| DebrisType.MODERATE: 1.8, # ~1–2 survivors per step | |
| DebrisType.HEAVY: 0.6, # <1 per step without heavy equipment | |
| } | |
| HEAVY_EQUIPMENT_MULTIPLIER = 2.8 # Heavy machinery boosts heavy debris rescue rate | |
| AIR_SUPPORT_BONUS_RATE = 2.0 # Helicopter clears debris / drops rescuers | |
| # --------------------------------------------------------------------------- | |
| # Collapse site | |
| # --------------------------------------------------------------------------- | |
| class Collapsesite: | |
| """Full state of a single collapse site.""" | |
| site_id: int | |
| name: str | |
| debris_type: DebrisType | |
| status: SiteStatus | |
| survivor_pool: SiteSurvivorPool | |
| secondary_collapse_risk: float | |
| distance_from_staging: float # in steps (travel time) | |
| requires_air_support: bool # heavy debris with no ground access | |
| assigned_team_id: Optional[int] = None | |
| rescue_progress: float = 0.0 # fraction 0–1 | |
| heavy_equipment_used: bool = False | |
| air_support_applied: bool = False | |
| secondary_collapsed: bool = False | |
| def to_observation(self, elapsed_hours: float) -> SiteObservation: | |
| pool = self.survivor_pool | |
| return SiteObservation( | |
| site_id=self.site_id, | |
| name=self.name, | |
| status=self.status, | |
| debris_type=self.debris_type, | |
| trapped_survivors=pool.alive_count, | |
| critical_count=pool.critical_count, | |
| survival_probability=round(pool.average_survival_probability(elapsed_hours), 3), | |
| decay_rate=self._compute_decay_rate(elapsed_hours), | |
| rescue_progress=round(self.rescue_progress, 3), | |
| assigned_team_id=self.assigned_team_id, | |
| requires_air_support=self.requires_air_support, | |
| secondary_collapse_risk=round(self.secondary_collapse_risk, 3), | |
| distance_from_staging=self.distance_from_staging, | |
| survivors_rescued=pool.rescued_count, | |
| ) | |
| def _compute_decay_rate(self, elapsed_hours: float) -> float: | |
| """Approximate per-step survival probability drop.""" | |
| # Phase-dependent decay rate | |
| if elapsed_hours < 6: | |
| base = 0.015 | |
| elif elapsed_hours < 24: | |
| base = 0.08 | |
| elif elapsed_hours < 72: | |
| base = 0.03 | |
| else: | |
| base = 0.005 | |
| # Critical survivors decay faster | |
| pool = self.survivor_pool | |
| if pool.alive_count > 0: | |
| crit_frac = pool.critical_count / pool.alive_count | |
| else: | |
| crit_frac = 0.0 | |
| return round(base * (1 + 0.5 * crit_frac), 4) | |
| def compute_rescue_batch_size( | |
| self, | |
| team_efficiency: float, | |
| team_specialization: str, | |
| has_heavy_equipment: bool, | |
| rng: random.Random, | |
| ) -> int: | |
| """ | |
| Calculate how many survivors a team rescues this step. | |
| Incorporates debris type, team fatigue, specialization, and equipment. | |
| """ | |
| if self.status != SiteStatus.ACTIVE: | |
| return 0 | |
| if self.survivor_pool.alive_count == 0: | |
| return 0 | |
| base_rate = BASE_RESCUE_RATE[self.debris_type] | |
| # Specialization bonus | |
| if team_specialization == "heavy_rescue" and self.debris_type == DebrisType.HEAVY: | |
| base_rate *= 1.5 | |
| elif team_specialization == "medical" and self.survivor_pool.critical_count > 0: | |
| base_rate *= 1.3 | |
| elif team_specialization == "swift_water": | |
| base_rate *= 0.85 # slight penalty on land sites | |
| # Equipment bonus | |
| if has_heavy_equipment and self.debris_type == DebrisType.HEAVY: | |
| base_rate *= HEAVY_EQUIPMENT_MULTIPLIER | |
| # Air support applied (one-time boost) | |
| if self.air_support_applied and not self.heavy_equipment_used: | |
| base_rate *= AIR_SUPPORT_BONUS_RATE | |
| # Team fatigue penalty | |
| base_rate *= (1.0 - 0.4 * team_efficiency) # efficiency here = fatigue | |
| # Stochastic variation ±20% | |
| rate = base_rate * rng.uniform(0.8, 1.2) | |
| # Cannot exceed alive count | |
| count = min(int(math.ceil(rate)), self.survivor_pool.alive_count) | |
| return max(0, count) | |
| def apply_secondary_collapse_check( | |
| self, | |
| aftershock_magnitude: float, | |
| rng: random.Random, | |
| ) -> bool: | |
| """ | |
| Check if a secondary collapse occurs this step. | |
| Returns True if collapsed (catastrophic event). | |
| """ | |
| if self.status != SiteStatus.ACTIVE: | |
| return False | |
| # Aftershock increases risk | |
| triggered_risk = self.secondary_collapse_risk * (1 + aftershock_magnitude * 2.0) | |
| triggered_risk = min(0.95, triggered_risk) | |
| if rng.random() < triggered_risk: | |
| self.status = SiteStatus.COLLAPSED | |
| self.secondary_collapsed = True | |
| # All unrescued survivors are killed | |
| for s in self.survivor_pool.survivors: | |
| if not s.rescued: | |
| s.alive = False | |
| return True | |
| return False | |
| def check_if_cleared(self): | |
| """Mark site as cleared if all accessible survivors are rescued.""" | |
| if self.survivor_pool.alive_count == 0 and self.status == SiteStatus.ACTIVE: | |
| self.status = SiteStatus.CLEARED | |
| self.rescue_progress = 1.0 | |
| # --------------------------------------------------------------------------- | |
| # Collapse Engine | |
| # --------------------------------------------------------------------------- | |
| class CollapseEngine: | |
| """ | |
| Manages the full collection of collapse sites in one episode. | |
| Handles aftershock generation, secondary collapses, and site progression. | |
| """ | |
| def __init__(self, sites: List[Collapsesite], rng: random.Random): | |
| self.sites = {s.site_id: s for s in sites} | |
| self.rng = rng | |
| self.aftershock_sequence: List[float] = [] # Pre-generated magnitudes | |
| self.current_step = 0 | |
| self.elapsed_hours = 0.0 | |
| self.step_hours = 0.5 # Each step = 30 minutes | |
| self._generate_aftershock_sequence(100) | |
| def _generate_aftershock_sequence(self, length: int): | |
| """ | |
| Pre-generate aftershock magnitudes using Gutenberg-Richter law. | |
| Most aftershocks are small; rare ones are significant. | |
| """ | |
| for _ in range(length): | |
| # Omori-Utsu decay: aftershocks become less frequent/intense over time | |
| step_factor = max(0.1, 1.0 - self.current_step * 0.008) | |
| if self.rng.random() < 0.15 * step_factor: | |
| # Aftershock occurs — magnitude from GR distribution | |
| magnitude = self.rng.expovariate(1.5) * step_factor | |
| magnitude = min(magnitude, 1.0) | |
| else: | |
| magnitude = 0.0 | |
| self.aftershock_sequence.append(magnitude) | |
| def step(self) -> Tuple[List[EventNotification], float]: | |
| """ | |
| Advance all sites by one timestep. | |
| Returns (events_generated, aftershock_magnitude_this_step). | |
| """ | |
| self.elapsed_hours += self.step_hours | |
| self.current_step += 1 | |
| events: List[EventNotification] = [] | |
| # Get aftershock for this step | |
| if self.current_step < len(self.aftershock_sequence): | |
| aftershock = self.aftershock_sequence[self.current_step] | |
| else: | |
| aftershock = 0.0 | |
| if aftershock > 0.3: | |
| events.append(EventNotification( | |
| event_type="aftershock", | |
| site_id=None, | |
| description=f"Aftershock detected (magnitude {aftershock:.2f} relative). Secondary collapse risk elevated.", | |
| severity=aftershock, | |
| )) | |
| # Process each active site | |
| for site in self.sites.values(): | |
| if site.status != SiteStatus.ACTIVE: | |
| continue | |
| # Update secondary collapse risk (creep up slowly over time) | |
| site.secondary_collapse_risk = min( | |
| 0.90, | |
| site.secondary_collapse_risk + 0.005 * self.elapsed_hours / 24.0, | |
| ) | |
| # Secondary collapse check | |
| collapsed = site.apply_secondary_collapse_check(aftershock, self.rng) | |
| if collapsed: | |
| events.append(EventNotification( | |
| event_type="secondary_collapse", | |
| site_id=site.site_id, | |
| description=f"SECONDARY COLLAPSE at {site.name}! All unrescued survivors are lost. Site is inaccessible.", | |
| severity=1.0, | |
| )) | |
| continue | |
| # Apply survival decay to all unrescued survivors | |
| deaths = site.survivor_pool.apply_time_decay(self.elapsed_hours, self.rng) | |
| if deaths > 0: | |
| events.append(EventNotification( | |
| event_type="survivor_death", | |
| site_id=site.site_id, | |
| description=f"{deaths} survivor(s) at {site.name} did not survive the wait.", | |
| severity=min(1.0, deaths * 0.2), | |
| )) | |
| # Random radio contact from survivors (partial information) | |
| if self.rng.random() < 0.08 and site.survivor_pool.alive_count > 0: | |
| events.append(EventNotification( | |
| event_type="survivor_radio_contact", | |
| site_id=site.site_id, | |
| description=f"Radio contact from {site.name}: {site.survivor_pool.alive_count} survivors confirmed alive. Critical: {site.survivor_pool.critical_count}.", | |
| severity=0.2, | |
| )) | |
| site.check_if_cleared() | |
| return events, aftershock | |
| def get_site(self, site_id: int) -> Optional[Collapsesite]: | |
| return self.sites.get(site_id) | |
| def all_sites_observation(self, elapsed_hours: float) -> List[SiteObservation]: | |
| return [s.to_observation(elapsed_hours) for s in self.sites.values()] | |
| def total_alive(self) -> int: | |
| return sum(s.survivor_pool.alive_count for s in self.sites.values()) | |
| def total_rescued(self) -> int: | |
| return sum(s.survivor_pool.rescued_count for s in self.sites.values()) | |
| def total_lost(self) -> int: | |
| return sum( | |
| sum(1 for sv in s.survivor_pool.survivors if not sv.alive and not sv.rescued) | |
| for s in self.sites.values() | |
| ) | |
| def total_initial_survivors(self) -> int: | |
| return sum(len(s.survivor_pool.survivors) for s in self.sites.values()) | |