"""
collapse_engine.py — Manages collapse site states and secondary event simulation.
Models:
- Site state progression (active → cleared / collapsed / evacuated)
- Secondary collapse risk (aftershocks increase it over time)
- Communication blackouts (partial observation simulation)
- Rescue progress computation per team/debris type
- Event generation (aftershocks, trapped radio calls, hazmat alerts)
Based on:
- FEMA USAR Field Operations Guide (2006)
- INSARAG Guidelines Vol II — Operations (2020)
- Elnashai et al. (2004): Seismic risk assessment models
"""
from __future__ import annotations
import math
import random
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from ..models import DebrisType, EventNotification, SiteStatus, SiteObservation
from .survivor_model import SiteSurvivorPool
# ---------------------------------------------------------------------------
# Site clearance rates (rescues per step per team)
# These reflect real USAR operational rates from INSARAG guidelines
# ---------------------------------------------------------------------------
# Baseline number of survivors one team extracts per step, keyed by debris
# class. A step is 30 minutes of simulated time.
BASE_RESCUE_RATE: Dict[DebrisType, float] = {
    DebrisType.LIGHT: 3.5,     # roughly 3–4 survivors per step
    DebrisType.MODERATE: 1.8,  # roughly 1–2 survivors per step
    DebrisType.HEAVY: 0.6,     # under 1 per step when no heavy equipment is present
}

# Multiplier applied to the heavy-debris rate when heavy machinery is available.
HEAVY_EQUIPMENT_MULTIPLIER = 2.8

# Multiplier applied while air support (helicopter debris clearing / rescuer
# drops) is in effect at a site.
AIR_SUPPORT_BONUS_RATE = 2.0
# ---------------------------------------------------------------------------
# Collapse site
# ---------------------------------------------------------------------------
@dataclass
class Collapsesite:
    """State of one collapse site together with its per-step rules.

    Bundles the site's fixed description (debris class, travel distance,
    access requirement), its survivor pool, and the flags that change while
    an episode runs. The methods compute the observation snapshot, the
    number of survivors a team extracts in a step, and whether a secondary
    collapse or a clearance happens.
    """

    site_id: int
    name: str
    debris_type: DebrisType
    status: SiteStatus
    survivor_pool: SiteSurvivorPool
    secondary_collapse_risk: float  # collapse probability before aftershock scaling
    distance_from_staging: float  # travel time, expressed in steps
    requires_air_support: bool  # heavy debris with no ground access
    assigned_team_id: Optional[int] = None
    rescue_progress: float = 0.0  # fraction in the range 0–1
    heavy_equipment_used: bool = False
    air_support_applied: bool = False
    secondary_collapsed: bool = False

    def to_observation(self, elapsed_hours: float) -> SiteObservation:
        """Build the ``SiteObservation`` snapshot for this site.

        Args:
            elapsed_hours: Simulated hours since the start of the episode;
                forwarded to the survivor pool and the decay-rate estimate.

        Returns:
            A ``SiteObservation`` whose survival probability, rescue
            progress and collapse risk are rounded to three decimals.
        """
        survivors = self.survivor_pool
        mean_survival = survivors.average_survival_probability(elapsed_hours)
        return SiteObservation(
            site_id=self.site_id,
            name=self.name,
            status=self.status,
            debris_type=self.debris_type,
            trapped_survivors=survivors.alive_count,
            critical_count=survivors.critical_count,
            survival_probability=round(mean_survival, 3),
            decay_rate=self._compute_decay_rate(elapsed_hours),
            rescue_progress=round(self.rescue_progress, 3),
            assigned_team_id=self.assigned_team_id,
            requires_air_support=self.requires_air_support,
            secondary_collapse_risk=round(self.secondary_collapse_risk, 3),
            distance_from_staging=self.distance_from_staging,
            survivors_rescued=survivors.rescued_count,
        )

    def _compute_decay_rate(self, elapsed_hours: float) -> float:
        """Estimate the per-step drop in survival probability.

        The base rate depends on which time window ``elapsed_hours`` falls
        in, and is scaled up by as much as 50% according to the fraction of
        living survivors who are critical. The result is rounded to four
        decimals.
        """
        # (exclusive upper bound in hours, base rate) — first match wins.
        phase_rates = ((6, 0.015), (24, 0.08), (72, 0.03))
        base = 0.005  # rate used once every window above has been passed
        for upper_bound, phase_rate in phase_rates:
            if elapsed_hours < upper_bound:
                base = phase_rate
                break

        survivors = self.survivor_pool
        critical_share = (
            survivors.critical_count / survivors.alive_count
            if survivors.alive_count > 0
            else 0.0
        )
        return round(base * (1 + 0.5 * critical_share), 4)

    def compute_rescue_batch_size(
        self,
        team_efficiency: float,
        team_specialization: str,
        has_heavy_equipment: bool,
        rng: random.Random,
    ) -> int:
        """Return how many survivors a team extracts from this site this step.

        Args:
            team_efficiency: Despite the name this is used as a fatigue
                level: the rate is multiplied by ``1 - 0.4 * team_efficiency``.
            team_specialization: ``"heavy_rescue"``, ``"medical"`` and
                ``"swift_water"`` adjust the rate; any other value leaves
                it unchanged.
            has_heavy_equipment: Whether the team brings heavy machinery;
                only matters on heavy debris.
            rng: Source of the ±20% random variation (one ``uniform`` draw).

        Returns:
            0 when the site is not active or nobody is alive; otherwise the
            rate rounded up, capped at the number of living survivors and
            never negative.
        """
        if self.status != SiteStatus.ACTIVE or self.survivor_pool.alive_count == 0:
            return 0

        on_heavy_debris = self.debris_type == DebrisType.HEAVY

        # Only the first matching specialization rule applies.
        if team_specialization == "heavy_rescue" and on_heavy_debris:
            specialization_factor = 1.5
        elif team_specialization == "medical" and self.survivor_pool.critical_count > 0:
            specialization_factor = 1.3
        elif team_specialization == "swift_water":
            specialization_factor = 0.85  # slight penalty on land sites
        else:
            specialization_factor = 1.0

        rate = BASE_RESCUE_RATE[self.debris_type] * specialization_factor

        if has_heavy_equipment and on_heavy_debris:
            rate *= HEAVY_EQUIPMENT_MULTIPLIER

        # Air-support boost is skipped once heavy equipment has been used here.
        if self.air_support_applied and not self.heavy_equipment_used:
            rate *= AIR_SUPPORT_BONUS_RATE

        # Fatigue penalty, then the stochastic ±20% spread.
        rate *= 1.0 - 0.4 * team_efficiency
        rate *= rng.uniform(0.8, 1.2)

        rescued_now = min(math.ceil(rate), self.survivor_pool.alive_count)
        return max(0, rescued_now)

    def apply_secondary_collapse_check(
        self,
        aftershock_magnitude: float,
        rng: random.Random,
    ) -> bool:
        """Roll for a secondary collapse at this site.

        The site's collapse risk is scaled by ``1 + 2 * aftershock_magnitude``
        and capped at 0.95. On a collapse the site becomes ``COLLAPSED``,
        ``secondary_collapsed`` is set, and every survivor not yet rescued
        is marked dead.

        Returns:
            True if the site collapsed during this call. False otherwise,
            including when the site was not active (no random draw is made
            in that case).
        """
        if self.status != SiteStatus.ACTIVE:
            return False

        scaled_risk = min(
            0.95,
            self.secondary_collapse_risk * (1 + aftershock_magnitude * 2.0),
        )
        if rng.random() >= scaled_risk:
            return False

        self.status = SiteStatus.COLLAPSED
        self.secondary_collapsed = True
        # Everyone still inside is lost.
        for survivor in self.survivor_pool.survivors:
            if not survivor.rescued:
                survivor.alive = False
        return True

    def check_if_cleared(self):
        """Flip an active site to ``CLEARED`` once its pool reports nobody alive.

        Also sets ``rescue_progress`` to 1.0. Sites in any other status are
        left untouched.
        """
        if self.status != SiteStatus.ACTIVE:
            return
        if self.survivor_pool.alive_count != 0:
            return
        self.status = SiteStatus.CLEARED
        self.rescue_progress = 1.0
# ---------------------------------------------------------------------------
# Collapse Engine
# ---------------------------------------------------------------------------
class CollapseEngine:
    """Owns every collapse site of one episode and advances them in time.

    Each call to :meth:`step` moves the clock forward by ``step_hours``,
    looks up the pre-generated aftershock for that step, and then, for each
    active site, raises its collapse risk, rolls for a secondary collapse,
    applies survivor time decay, possibly emits a radio-contact event, and
    checks for clearance.
    """

    def __init__(self, sites: List[Collapsesite], rng: random.Random):
        """Index ``sites`` by ``site_id`` and pre-generate 100 aftershock steps.

        Args:
            sites: The collapse sites taking part in the episode.
            rng: Random source shared by aftershock generation and by every
                per-step stochastic check.
        """
        self.sites = {s.site_id: s for s in sites}
        self.rng = rng
        # aftershock_sequence[i] is the magnitude used by the (i + 1)-th step() call.
        self.aftershock_sequence: List[float] = []
        self.current_step = 0
        self.elapsed_hours = 0.0
        self.step_hours = 0.5  # each step is 30 minutes
        self._generate_aftershock_sequence(100)

    def _generate_aftershock_sequence(self, length: int) -> None:
        """Append ``length`` pre-generated aftershock magnitudes.

        For every generated step a decay factor ``max(0.1, 1 - 0.008 * i)``
        is computed from that step's index ``i`` in the sequence, so later
        steps are both less likely to have an aftershock (probability
        ``0.15 * factor``) and have weaker ones (an ``expovariate(1.5)``
        draw scaled by the factor and capped at 1.0). Steps without an
        aftershock store 0.0.

        The factor is derived from the index of the step being generated,
        not from ``self.current_step``. The latter is still 0 when this
        runs from ``__init__``, which previously pinned the factor at 1.0
        and disabled the decay entirely. Calling the method again extends
        the sequence and continues the decay where it left off.
        """
        first_index = len(self.aftershock_sequence)
        for step_index in range(first_index, first_index + length):
            step_factor = max(0.1, 1.0 - step_index * 0.008)
            if self.rng.random() < 0.15 * step_factor:
                magnitude = min(self.rng.expovariate(1.5) * step_factor, 1.0)
            else:
                magnitude = 0.0
            self.aftershock_sequence.append(magnitude)

    def step(self) -> Tuple[List[EventNotification], float]:
        """Advance the clock and every active site by one timestep.

        Returns:
            ``(events, aftershock)``: the notifications generated this step
            (a global aftershock event first when the magnitude exceeds 0.3,
            then per-site events in site order) and the aftershock magnitude
            that was applied. Steps beyond the pre-generated sequence use a
            magnitude of 0.0.
        """
        self.elapsed_hours += self.step_hours
        self.current_step += 1
        events: List[EventNotification] = []

        # current_step is 1-based after the increment; the sequence is
        # 0-based, so the first step consumes entry 0 (it used to be skipped).
        sequence_index = self.current_step - 1
        if 0 <= sequence_index < len(self.aftershock_sequence):
            aftershock = self.aftershock_sequence[sequence_index]
        else:
            aftershock = 0.0

        if aftershock > 0.3:
            events.append(EventNotification(
                event_type="aftershock",
                site_id=None,
                description=(
                    f"Aftershock detected (magnitude {aftershock:.2f} relative). "
                    "Secondary collapse risk elevated."
                ),
                severity=aftershock,
            ))

        for site in self.sites.values():
            if site.status != SiteStatus.ACTIVE:
                continue
            events.extend(self._advance_site(site, aftershock))

        return events, aftershock

    def _advance_site(
        self,
        site: Collapsesite,
        aftershock: float,
    ) -> List[EventNotification]:
        """Run one step of risk creep, collapse check, decay and radio events for ``site``."""
        events: List[EventNotification] = []

        # Collapse risk creeps upward, faster the longer the episode runs; capped at 0.90.
        site.secondary_collapse_risk = min(
            0.90,
            site.secondary_collapse_risk + 0.005 * self.elapsed_hours / 24.0,
        )

        if site.apply_secondary_collapse_check(aftershock, self.rng):
            events.append(EventNotification(
                event_type="secondary_collapse",
                site_id=site.site_id,
                description=(
                    f"SECONDARY COLLAPSE at {site.name}! "
                    "All unrescued survivors are lost. Site is inaccessible."
                ),
                severity=1.0,
            ))
            # A collapsed site gets no further processing this step.
            return events

        deaths = site.survivor_pool.apply_time_decay(self.elapsed_hours, self.rng)
        if deaths > 0:
            events.append(EventNotification(
                event_type="survivor_death",
                site_id=site.site_id,
                description=f"{deaths} survivor(s) at {site.name} did not survive the wait.",
                severity=min(1.0, deaths * 0.2),
            ))

        # The random draw happens first so RNG consumption does not depend
        # on whether anyone is still alive.
        if self.rng.random() < 0.08 and site.survivor_pool.alive_count > 0:
            events.append(EventNotification(
                event_type="survivor_radio_contact",
                site_id=site.site_id,
                description=(
                    f"Radio contact from {site.name}: "
                    f"{site.survivor_pool.alive_count} survivors confirmed alive. "
                    f"Critical: {site.survivor_pool.critical_count}."
                ),
                severity=0.2,
            ))

        site.check_if_cleared()
        return events

    def get_site(self, site_id: int) -> Optional[Collapsesite]:
        """Return the site with ``site_id``, or ``None`` if it is unknown."""
        return self.sites.get(site_id)

    def all_sites_observation(self, elapsed_hours: float) -> List[SiteObservation]:
        """Return an observation snapshot for every site, in site order."""
        return [s.to_observation(elapsed_hours) for s in self.sites.values()]

    @property
    def total_alive(self) -> int:
        """Sum of ``alive_count`` over all survivor pools."""
        return sum(s.survivor_pool.alive_count for s in self.sites.values())

    @property
    def total_rescued(self) -> int:
        """Sum of ``rescued_count`` over all survivor pools."""
        return sum(s.survivor_pool.rescued_count for s in self.sites.values())

    @property
    def total_lost(self) -> int:
        """Number of survivors across all sites who are neither alive nor rescued."""
        return sum(
            1
            for s in self.sites.values()
            for sv in s.survivor_pool.survivors
            if not sv.alive and not sv.rescued
        )

    @property
    def total_initial_survivors(self) -> int:
        """Total number of survivor records across all sites."""
        return sum(len(s.survivor_pool.survivors) for s in self.sites.values())
|