# Copyright (c) Ajay Bandiwaddar — OpenEnv Hackathon Round 1
"""
Queue Engine -- Core RL State Machine for Queue Doctor.

Every serve_patient() or wait() call advances the simulation by 1 step.
Resource errors (no ICU bed, insufficient doctors) do NOT advance the step.
Serving an unknown patient id is penalised (-0.1) and DOES advance the step.

Key design decisions:
1. Queue sorted by REPORTED severity -- misreporting genuinely deceives the
   agent (realistic: triage nurses rely on self-report initially).
2. wait() penalty only fires for SERVABLE patients -- patients that cannot be
   served due to resource constraints (ICU full, insufficient doctors) do not
   contribute to the wait penalty. This prevents double-counting: unservable
   patients already penalize via the hard grader's missed_emergency counter
   at finalize_episode().
"""

import random
from typing import Dict, List, Optional, Tuple

from .models import Patient, SEVERITY_NAMES


class QueueEngine:
    """Deterministic (or seeded-stochastic) hospital queue simulator."""

    def __init__(self, task_config: dict, seed: Optional[int] = None):
        """
        Build the engine from a task configuration.

        Args:
            task_config: Mapping with the keys read here: "max_steps",
                "num_doctors", "arrivals" (list of per-patient dicts) and,
                optionally, "icu_beds" (defaults to 0).
            seed: When given, arrival steps and severities are jittered with
                a private ``random.Random(seed)``. When ``None`` the schedule
                is used exactly as configured.
        """
        self.task_config = task_config
        self.max_steps = task_config["max_steps"]
        self.num_doctors = task_config["num_doctors"]
        self.icu_capacity = task_config.get("icu_beds", 0)
        self.seed = seed

        self.step = 0
        self.queue: List[Patient] = []
        self.served: List[Dict] = []
        self.available_doctors = self.num_doctors
        self.available_icu = self.icu_capacity
        self.missed_emergencies = 0
        self.cumulative_reward = 0.0
        self.step_rewards: List[float] = []
        self.events_log: List[str] = []

        rng = random.Random(seed) if seed is not None else None
        self._arrival_schedule: Dict[int, List[Patient]] = (
            self._build_arrival_schedule(rng)
        )
        # Step-0 arrivals are queued immediately, with wait_time untouched.
        self._process_arrivals(0)

    def _build_arrival_schedule(
        self, rng: Optional[random.Random]
    ) -> Dict[int, List[Patient]]:
        """Turn ``task_config["arrivals"]`` into a step -> patients mapping.

        With an ``rng``, each arrival has a 10% chance of shifting by one
        step and a 15% chance of shifting by one severity level. The order
        of RNG calls is fixed so a given seed always yields the same
        schedule.
        """
        schedule: Dict[int, List[Patient]] = {}
        for a in self.task_config["arrivals"]:
            step_offset = 0
            sev_offset = 0
            if rng is not None:
                if rng.random() < 0.10:
                    step_offset = rng.choice([-1, 1])
                if rng.random() < 0.15:
                    sev_offset = rng.choice([-1, 1])
                # Never push a non-emergency below the valid severity range.
                if a["severity"] != 1 and a["severity"] + sev_offset < 1:
                    sev_offset = 0
                # Never downgrade a configured severity-1 patient.
                if a["severity"] == 1 and sev_offset == 1:
                    sev_offset = 0

            arrival_step = max(0, a["step"] + step_offset)
            true_sev = max(1, min(5, a["severity"] + sev_offset))

            reported_sev = a.get("reported_severity", a["severity"])
            # Only an honest report follows the jitter; a configured
            # misreport keeps its reported value.
            if reported_sev == a["severity"] and sev_offset != 0:
                reported_sev = max(1, min(5, reported_sev + sev_offset))

            schedule.setdefault(arrival_step, []).append(Patient(
                patient_id=a["patient_id"],
                severity=true_sev,
                reported_severity=reported_sev,
                arrival_step=arrival_step,
                deterioration_countdown=a.get("deterioration_countdown", -1),
                requires_icu=a.get("requires_icu", False),
                requires_specialist=a.get("requires_specialist", False),
            ))
        return schedule

    @staticmethod
    def _doctors_needed(p: Patient) -> int:
        """Return how many doctors a patient occupies (2 for specialist cases)."""
        return 2 if p.requires_specialist else 1

    def _can_serve(self, p: Patient) -> bool:
        """Check if patient can be served given current resources."""
        return (
            self.available_doctors >= self._doctors_needed(p)
            and (not p.requires_icu or self.available_icu >= 1)
        )

    def get_state(self) -> dict:
        """Return the observation dict for the current step.

        The queue is ordered by reported severity (ascending), then by
        longest wait first. ICU fields are included only when the task has
        ICU capacity.
        """
        # Sort by REPORTED severity -- misreporting genuinely deceives agent
        sorted_queue = sorted(
            self.queue,
            key=lambda p: (p.reported_severity, -p.wait_time),
        )
        state = {
            "step": self.step,
            "max_steps": self.max_steps,
            "steps_remaining": self.max_steps - self.step,
            "queue": [self._patient_dict(p) for p in sorted_queue],
            "queue_length": len(self.queue),
            "available_doctors": self.available_doctors,
            "patients_served": len(self.served),
            "missed_emergencies": self.missed_emergencies,
            "cumulative_reward": round(self.cumulative_reward, 4),
            "done": self.step >= self.max_steps,
        }
        if self.icu_capacity > 0:
            state["available_icu_beds"] = self.available_icu
            state["total_icu_beds"] = self.icu_capacity
        state["triage_advisory"] = self._build_advisory(sorted_queue)
        return state

    def _patient_dict(self, p: Patient) -> dict:
        """Serialise a patient and annotate whether it can be served now."""
        d = p.to_dict()
        can_serve = self._can_serve(p)
        d["can_serve_now"] = can_serve
        if not can_serve:
            doctors_needed = self._doctors_needed(p)
            # Same ICU test as _can_serve / serve_patient (< 1, not == 0).
            if p.requires_icu and self.available_icu < 1:
                d["cannot_serve_reason"] = "No ICU beds available"
            elif self.available_doctors < doctors_needed:
                d["cannot_serve_reason"] = (
                    f"Needs {doctors_needed} doctors, "
                    f"only {self.available_doctors} available"
                )
        return d

    def serve_patient(self, patient_id: str) -> Tuple[float, dict, List[str]]:
        """
        Serve a patient.

        Time handling:
          * Successful serve: advances time by 1 step.
          * Unknown ``patient_id``: penalised -0.1 and advances time by 1 step.
          * Resource errors (doctors / ICU): reward 0.0, time does NOT advance.

        Returns:
            ``(reward, state, events)`` where ``state`` is ``get_state()``
            after the action and ``events`` are human-readable log lines.
        """
        patient = next(
            (p for p in self.queue if p.patient_id == patient_id), None
        )
        if patient is None:
            self.step += 1
            # Keep the step's arrival/deterioration events instead of
            # discarding them: the caller must still learn what happened.
            step_events = self._advance_step()
            self.step_rewards.append(-0.1)
            self.cumulative_reward -= 0.1
            return -0.1, self.get_state(), [
                f"Error: Patient {patient_id} not found in queue."
            ] + step_events

        doctors_needed = self._doctors_needed(patient)
        if self.available_doctors < doctors_needed:
            return 0.0, self.get_state(), [
                f"Cannot serve {patient_id}: needs {doctors_needed} doctors, "
                f"{self.available_doctors} available. Choose another patient."
            ]
        if patient.requires_icu and self.available_icu < 1:
            return 0.0, self.get_state(), [
                f"Cannot admit {patient_id}: no ICU beds available. "
                f"Choose a non-ICU patient."
            ]

        self.queue.remove(patient)
        self.available_doctors -= doctors_needed
        if patient.requires_icu:
            # ICU beds are consumed here and never released by this class.
            self.available_icu -= 1

        reward = self._compute_reward(patient)
        self.cumulative_reward += reward

        resource_note = ""
        if patient.requires_specialist:
            resource_note = " [2 doctors used]"
        if patient.requires_icu:
            resource_note += (
                f" [ICU bed consumed, "
                f"{self.available_icu}/{self.icu_capacity} remaining]"
            )

        self.served.append({
            "patient_id": patient.patient_id,
            "true_severity": patient.severity,
            "reported_severity": patient.reported_severity,
            "wait_time": patient.wait_time,
            "reward": round(reward, 4),
            "served_step": self.step,
        })
        events = [
            f"Step {self.step}: Served {patient_id} "
            f"(reported={patient.reported_severity}, true={patient.severity}, "
            f"waited {patient.wait_time} steps) -> reward {reward:.3f}{resource_note}"
        ]

        self.step += 1
        # Doctors are freed again at every step boundary.
        self.available_doctors = self.num_doctors
        events.extend(self._advance_step())
        self.step_rewards.append(reward)
        return reward, self.get_state(), events

    def wait(self) -> Tuple[float, dict, List[str]]:
        """
        Skip step. Penalized only for SERVABLE patients waiting.

        Key fix: patients that cannot be served due to resource constraints
        (ICU full, insufficient doctors) do NOT contribute to the wait
        penalty. They already penalize via missed_emergencies at
        finalize_episode(). Counting them twice would make waiting with
        ICU-blocked patients catastrophically punishing even when the agent
        has no alternative.

        Penalty tiers (based on highest reported severity among SERVABLE
        patients):
            Severity-1 servable in queue: -0.30 per patient
            Severity 2-3 servable:        -0.10
            Any servable patient:         -0.05
            No servable patients:          0.00 (agent has no valid action)

        Returns:
            ``(penalty, state, events)``; always advances time by 1 step.
        """
        penalty = 0.0
        events: List[str] = []

        # Only count SERVABLE patients for the penalty
        servable_patients = [p for p in self.queue if self._can_serve(p)]

        if servable_patients:
            emergencies = sum(
                1 for p in servable_patients if p.reported_severity == 1
            )
            urgent = sum(
                1 for p in servable_patients if 2 <= p.reported_severity <= 3
            )
            if emergencies > 0:
                penalty = -0.3 * emergencies
                events.append(
                    f"Waited with {emergencies} IMMEDIATE servable patient(s)! "
                    f"Penalty: {penalty:.2f}"
                )
            elif urgent > 0:
                penalty = -0.1
                events.append(
                    f"Waited with {urgent} urgent servable patient(s). Penalty: -0.10"
                )
            else:
                penalty = -0.05
                events.append("Waited with non-urgent servable patients. Penalty: -0.05")
        elif self.queue:
            # Patients present but ALL resource-blocked -- no penalty
            # (agent physically cannot serve anyone; missed_emergencies tracks this)
            blocked_count = len(self.queue)
            events.append(
                f"Step {self.step}: {blocked_count} patient(s) present but all "
                f"resource-blocked -- no penalty. Missed emergencies tracked at finalize."
            )
        else:
            events.append(f"Step {self.step}: Queue empty -- no penalty.")

        self.cumulative_reward += penalty
        self.step += 1
        events.extend(self._advance_step())
        self.step_rewards.append(penalty)
        return penalty, self.get_state(), events

    def _advance_step(self) -> List[str]:
        """Apply end-of-step effects for the (already incremented) step.

        In order: enqueue scheduled arrivals, tick deterioration countdowns,
        age every queued patient by one step (counting queued true
        severity-1 patients as missed emergencies), and free all doctors.

        Returns:
            Event strings describing arrivals and deteriorations.
        """
        events: List[str] = []

        new_arrivals = self._arrival_schedule.get(self.step, [])
        if new_arrivals:
            self.queue.extend(new_arrivals)
            names = ", ".join(
                f"{p.patient_id}(reported={p.reported_severity})"
                for p in new_arrivals
            )
            events.append(f"Step {self.step}: New arrivals -- {names}")
            if len(new_arrivals) >= 4:
                events.append(
                    f"MASS CASUALTY EVENT: "
                    f"{len(new_arrivals)} patients arrived simultaneously!"
                )

        for patient in list(self.queue):
            if patient.deterioration_countdown > 0:
                patient.deterioration_countdown -= 1
                if patient.deterioration_countdown == 0:
                    old_sev = patient.severity
                    patient.severity = max(1, patient.severity - 1)
                    # Deterioration makes the true severity visible.
                    patient.reported_severity = patient.severity
                    patient.condition = (
                        "critical" if patient.severity == 1 else "at_risk"
                    )
                    # -1 disables any further deterioration.
                    patient.deterioration_countdown = -1
                    events.append(
                        f"DETERIORATION: {patient.patient_id} worsened "
                        f"severity {old_sev}->{patient.severity}"
                    )

        # NOTE(review): patients that arrived in this very step are aged too,
        # so mid-episode arrivals start at wait_time 1 while step-0 arrivals
        # start at 0 -- confirm this asymmetry is intended.
        for patient in self.queue:
            patient.wait_time += 1
            if patient.severity == 1:
                self.missed_emergencies += 1

        self.available_doctors = self.num_doctors
        return events

    def _process_arrivals(self, step: int) -> None:
        """Enqueue the patients scheduled for ``step`` without ageing them."""
        self.queue.extend(self._arrival_schedule.get(step, []))

    def _compute_reward(self, patient: Patient) -> float:
        """
        Manchester Triage System reward using TRUE severity.
        1 step = approx 10 minutes of real time.

        Severity 1 uses a fixed table (1.00 / 0.60 / 0.20, then 0.00);
        severities 2-5 decay linearly with wait time and are floored at 0.
        """
        w = patient.wait_time
        s = patient.severity
        if s == 1:
            if w == 0:
                return 1.00
            if w == 1:
                return 0.60
            if w == 2:
                return 0.20
            return 0.00
        elif s == 2:
            return max(0.0, 1.00 - w * 0.125)
        elif s == 3:
            return max(0.0, 0.85 - w * 0.071)
        elif s == 4:
            return max(0.0, 0.60 - w * 0.040)
        else:
            return max(0.0, 0.40 - w * 0.020)

    def _build_advisory(self, sorted_queue: list) -> str:
        """Advisory based on REPORTED severity. Excluded from LLM prompt."""
        if not sorted_queue:
            return "Queue is empty."

        parts: List[str] = []
        emergencies = [p for p in self.queue if p.reported_severity == 1]
        deteriorating = [
            p for p in self.queue
            if p.deterioration_countdown > 0 and p.deterioration_countdown <= 2
        ]

        if emergencies:
            ids = ", ".join(p.patient_id for p in emergencies)
            parts.append(f"IMMEDIATE (reported): {ids}")
        if deteriorating:
            ids = ", ".join(
                f"{p.patient_id}(in {p.deterioration_countdown} steps)"
                for p in deteriorating
            )
            parts.append(f"DETERIORATING SOON: {ids}")
        if self.icu_capacity > 0 and self.available_icu == 0:
            icu_waiting = [p for p in self.queue if p.requires_icu]
            if icu_waiting:
                parts.append(
                    f"ICU FULL: {len(icu_waiting)} patient(s) cannot be admitted."
                )

        if not parts:
            # Nothing urgent to flag: point at the head of the sorted queue.
            top = sorted_queue[0]
            parts.append(
                f"Highest priority: {top.patient_id} "
                f"(reported sev={top.reported_severity}, "
                f"waited {top.wait_time} steps)"
            )
        return " | ".join(parts)