usar / server /simulation /team_model.py
PranovRaghavendhra's picture
USAR OpenEnv v1.0 - Urban Search and Rescue
ce614ef
"""
team_model.py — Rescue team state management for USAR environment.
Models:
- Team fatigue accumulation and recovery
- Specialization-based efficiency multipliers
- Travel time between staging and sites
- Air support asset tracking
Based on FEMA USAR Team Classification Guidelines and
INSARAG Heavy/Medium Team Capability Assessment criteria.
"""
from __future__ import annotations
import random
from dataclasses import dataclass
from typing import Optional
from ..models import TeamObservation, TeamStatus
# ---------------------------------------------------------------------------
# Team specializations
# ---------------------------------------------------------------------------
SPECIALIZATIONS = {
"heavy_rescue": {
"description": "Structural collapse, concrete cutting, void space search",
"heavy_debris_bonus": 1.5,
"medical_bonus": 1.0,
"fatigue_rate": 0.08, # fatigues faster — physically demanding
},
"medical": {
"description": "Triage, field surgery, critical survivor stabilization",
"heavy_debris_bonus": 0.85,
"medical_bonus": 1.4,
"fatigue_rate": 0.05,
},
"swift_water": {
"description": "Flood rescue, rope access, confined space",
"heavy_debris_bonus": 0.90,
"medical_bonus": 1.1,
"fatigue_rate": 0.06,
},
}
# ---------------------------------------------------------------------------
# Rescue team
# ---------------------------------------------------------------------------
@dataclass
class RescueTeam:
"""A single USAR rescue team."""
team_id: int
name: str
specialization: str # "heavy_rescue" | "medical" | "swift_water"
fatigue_level: float = 0.0
status: TeamStatus = TeamStatus.IDLE
current_site_id: Optional[int] = None
steps_at_site: int = 0
steps_idle: int = 0
total_rescued: int = 0
@property
def spec_data(self) -> dict:
return SPECIALIZATIONS[self.specialization]
@property
def efficiency_multiplier(self) -> float:
"""Current performance — fatigue reduces efficiency."""
fatigue_penalty = self.fatigue_level * 0.6
return max(0.15, 1.0 - fatigue_penalty)
@property
def is_available(self) -> bool:
return self.status in (TeamStatus.IDLE, TeamStatus.FATIGUED)
def assign_to_site(self, site_id: int) -> bool:
"""
Assign team to a site.
Returns False if team is unavailable.
"""
if self.status == TeamStatus.FATIGUED:
# Can be assigned but operates at reduced efficiency
pass
elif self.status not in (TeamStatus.IDLE,):
return False
self.current_site_id = site_id
self.status = TeamStatus.DEPLOYED
self.steps_at_site = 0
self.steps_idle = 0
return True
def recall_to_staging(self):
"""Recall team to staging area for rest."""
self.current_site_id = None
self.status = TeamStatus.IDLE
self.steps_at_site = 0
def step_update(self, rng: random.Random):
"""Update team state for one timestep."""
if self.status == TeamStatus.DEPLOYED:
self.steps_at_site += 1
# Fatigue accumulates while deployed
fatigue_rate = self.spec_data["fatigue_rate"]
self.fatigue_level = min(1.0, self.fatigue_level + fatigue_rate)
# Exhausted teams become fatigued status
if self.fatigue_level >= 0.85:
self.status = TeamStatus.FATIGUED
elif self.status in (TeamStatus.IDLE, TeamStatus.FATIGUED):
self.steps_idle += 1
# Recovery: 4 steps of rest fully restores a team
recovery = 0.20
self.fatigue_level = max(0.0, self.fatigue_level - recovery)
if self.fatigue_level < 0.1:
self.status = TeamStatus.IDLE
elif self.status == TeamStatus.AIRBORNE:
# Helicopter deployment — returns next step
self.steps_at_site += 1
if self.steps_at_site >= 2:
self.status = TeamStatus.IDLE
self.steps_at_site = 0
def to_observation(self) -> TeamObservation:
return TeamObservation(
team_id=self.team_id,
name=self.name,
status=self.status,
current_site_id=self.current_site_id,
fatigue_level=round(self.fatigue_level, 3),
specialization=self.specialization,
steps_at_site=self.steps_at_site,
efficiency_multiplier=round(self.efficiency_multiplier, 3),
)
# ---------------------------------------------------------------------------
# Air support asset
# ---------------------------------------------------------------------------
@dataclass
class AirSupportAsset:
"""Single helicopter / air asset for USAR operations."""
available: bool = True
cooldown: int = 0 # steps until available again
total_uses: int = 0
def use(self, cooldown_steps: int = 3):
"""Deploy air support. Sets cooldown before next available."""
self.available = False
self.cooldown = cooldown_steps
self.total_uses += 1
def step_update(self):
"""Advance one timestep."""
if not self.available:
self.cooldown -= 1
if self.cooldown <= 0:
self.available = True
self.cooldown = 0