# Copyright (c) Ajay Bandiwaddar — OpenEnv Hackathon Round 1 """ Queue Doctor — OpenEnv MCPEnvironment. A genuine multi-step reinforcement learning environment for hospital emergency department triage. The agent makes sequential decisions each step — which patient to serve — and the environment state changes meaningfully in response (new arrivals, wait time increases, deterioration). This is a true Markov Decision Process: the agent's action at step N changes the state available at step N+1. A better policy produces measurably better outcomes across all three tasks. Stochasticity: start_task() accepts an optional seed parameter. When provided, small random perturbations are applied to patient attributes (severity ±1 with 15% probability, arrival step ±1 with 10% probability). This ensures each episode is distinct, prevents solution memorization, and produces non-zero score variance across runs (required by Phase 2). Episode workflow: list_tasks() → start_task(task_id, seed=) # seed optional → get_queue_state() → [serve_patient(patient_id) | wait()] × N steps → finalize_episode() """ import json from typing import Optional from uuid import uuid4 try: from openenv.core.env_server.mcp_environment import MCPEnvironment from openenv.core.env_server.types import Action, Observation, State except ImportError: from openenv.core.env_server.mcp_environment import MCPEnvironment from openenv.core.env_server.types import Action, Observation, State from fastmcp import FastMCP from .tasks import TASKS from .queue_engine import QueueEngine from .graders import GRADERS class QueueDoctorEnvironment(MCPEnvironment): """ Queue Doctor — Hospital Triage RL Environment. Three tasks of increasing difficulty: task_1_easy — Static queue, 1 doctor, 10 steps task_2_medium — Dynamic arrivals, 2 doctors, 20 steps task_3_hard — Mass casualty, deterioration, ICU, 3 doctors, 30 steps MCP tools: list_tasks() → task catalogue with metadata start_task(task_id, seed) → init episode (seed optional for stochasticity) get_queue_state() → observe current state (no time advance) serve_patient(patient_id) → treat patient, advance 1 step wait() → skip step (penalized), advance 1 step finalize_episode() → compute final normalized score get_current_state() → environment-level metadata """ def __init__(self): mcp = FastMCP("queue_doctor") @mcp.tool def list_tasks() -> str: """ List all available triage tasks with metadata. Returns task IDs, names, difficulty, resources, and descriptions. """ return json.dumps([ { "task_id": tid, "task_name": t["task_name"], "difficulty": t["difficulty"], "max_steps": t["max_steps"], "num_doctors": t["num_doctors"], "icu_beds": t.get("icu_beds", 0), "total_patients": len(t["arrivals"]), "description": t["description"], } for tid, t in TASKS.items() ], indent=2) @mcp.tool def start_task(task_id: str, seed: int = None) -> str: """ Initialize a task episode. Must be called before any actions. Args: task_id: One of 'task_1_easy', 'task_2_medium', 'task_3_hard' seed: Optional integer seed for episode randomization. When provided, small stochastic perturbations are applied to patient attributes (severity ±1 with 15% probability, arrival step ±1 with 10% probability). Use different seeds across runs to get score variance. Omit for the deterministic baseline episode. Returns task description, rules, initial queue state, and workflow. """ if task_id not in TASKS: return json.dumps({ "error": f"Unknown task_id '{task_id}'. " f"Valid: {list(TASKS.keys())}" }) self._active_task_id = task_id self._engine = QueueEngine(TASKS[task_id], seed=seed) self._state.step_count += 1 task = TASKS[task_id] initial_state = self._engine.get_state() return json.dumps({ "task_id": task_id, "task_name": task["task_name"], "difficulty": task["difficulty"], "description": task["description"], "max_steps": task["max_steps"], "num_doctors": task["num_doctors"], "icu_beds": task.get("icu_beds", 0), "seed": seed, "initial_queue": initial_state["queue"], "queue_length": initial_state["queue_length"], "triage_advisory": initial_state["triage_advisory"], "workflow": ( "1. Call get_queue_state() to observe current patients.\n" "2. Call serve_patient(patient_id) to treat a patient " " — this advances time by 1 step.\n" "3. OR call wait() to skip a step " " (penalized if patients are waiting).\n" "4. Repeat until done=true.\n" "5. Call finalize_episode() to get your final score." ), }, indent=2) @mcp.tool def get_queue_state() -> str: """ Observe the current emergency department state. Does NOT advance time. Returns: - Current step and steps remaining - All patients sorted by priority (severity, then wait time) - can_serve_now flag per patient (resource availability check) - Available doctors and ICU beds - Patients served and missed emergencies - Cumulative reward - Triage advisory (for inspection — not used by the inference agent) - done flag """ if self._engine is None: return json.dumps({ "error": "No active task. Call start_task(task_id) first." }) self._state.step_count += 1 return json.dumps(self._engine.get_state(), indent=2) @mcp.tool def serve_patient(patient_id: str) -> str: """ Assign a doctor to treat a patient. ADVANCES SIMULATION BY 1 STEP. After this action: - Patient removed from queue - Wait times increase for all remaining patients - New patients may arrive (deterministic or seeded schedule) - Deteriorating patients' countdowns decrease (Task 3) - Step counter increments Resource errors (no ICU bed, insufficient doctors) do NOT advance time — the agent receives an error message and must choose again. Args: patient_id: Patient ID (e.g. 'P001', 'P007') Returns step reward, updated queue state, and events log. """ if self._engine is None: return json.dumps({ "error": "No active task. Call start_task(task_id) first." }) if self._engine.step >= self._engine.max_steps: return json.dumps({ "error": "Episode complete. Call finalize_episode().", "done": True, }) reward, new_state, events = self._engine.serve_patient(patient_id) self._cumulative_reward += reward self._state.step_count += 1 return json.dumps({ "action": f"serve_patient({patient_id})", "step_reward": round(reward, 4), "events": events, "state": new_state, "done": new_state["done"], "hint": ( "Call finalize_episode() to get your final score." if new_state["done"] else "Continue serving patients or call finalize_episode() anytime." ), }, indent=2) @mcp.tool def wait() -> str: """ Skip this step without serving any patient. ADVANCES SIMULATION BY 1 STEP. Penalties: Emergency (severity 1) in queue: -0.30 per patient Urgent (severity 2-3) in queue: -0.10 Any patient in queue: -0.05 Empty queue: 0.00 Returns step penalty, updated queue state, and events log. """ if self._engine is None: return json.dumps({ "error": "No active task. Call start_task(task_id) first." }) if self._engine.step >= self._engine.max_steps: return json.dumps({ "error": "Episode complete. Call finalize_episode().", "done": True, }) penalty, new_state, events = self._engine.wait() self._cumulative_reward += penalty self._state.step_count += 1 return json.dumps({ "action": "wait()", "step_reward": round(penalty, 4), "events": events, "state": new_state, "done": new_state["done"], }, indent=2) @mcp.tool def finalize_episode() -> str: """ Finalize the current task and compute the final normalized score. Applies the principled grader to produce a score in [0, 1]. Grader weights are derived from published clinical literature — not tuned to hit target scores. Returns final score, component scores, and full episode statistics. """ if self._engine is None: return json.dumps({ "error": "No active task. Call start_task(task_id) first." }) task_id = self._active_task_id task = TASKS[task_id] result = GRADERS[task["grader"]](self._engine) self._finalized_tasks[task_id] = result["score"] done = len(self._finalized_tasks) >= len(TASKS) self._done = done self._state.step_count += 1 return json.dumps({ "task_id": task_id, "task_name": task["task_name"], "difficulty": task["difficulty"], **result, "episode_steps": self._engine.step, "patients_served": len(self._engine.served), "served_detail": self._engine.served, "tasks_completed": len(self._finalized_tasks), "tasks_total": len(TASKS), "all_done": done, }, indent=2) @mcp.tool def get_current_state() -> str: """Get environment-level metadata (episode state, not queue state).""" return json.dumps({ "episode_id": self._state.episode_id, "step_count": self._state.step_count, "active_task": self._active_task_id, "finalized_scores": self._finalized_tasks, "cumulative_reward": round(self._cumulative_reward, 4), "done": self._done, "tasks_available": list(TASKS.keys()), }, indent=2) super().__init__(mcp) self._state = State(episode_id=str(uuid4()), step_count=0) self._cumulative_reward: float = 0.0 self._done: bool = False self._active_task_id: Optional[str] = None self._engine: Optional[QueueEngine] = None self._finalized_tasks: dict = {} def reset(self, seed=None, episode_id=None, **kwargs) -> Observation: self._state = State(episode_id=episode_id or str(uuid4()), step_count=0) self._cumulative_reward = 0.0 self._done = False self._active_task_id = None self._engine = None self._finalized_tasks = {} return Observation( done=False, reward=0.0, metadata={ "status": "ready", "message": ( "Queue Doctor ready. " "Workflow: list_tasks() → start_task(task_id, seed=) → " "get_queue_state() → " "[serve_patient(patient_id) or wait()] × N → " "finalize_episode()" ), "tasks_available": list(TASKS.keys()), }, ) def _step_impl(self, action, timeout_s=None, **kwargs) -> Observation: return Observation( done=False, reward=0.0, metadata={ "error": f"Unknown action: {type(action).__name__}. Use MCP tools." }, ) def step(self, action, timeout_s=None, **kwargs) -> Observation: self._state.step_count += 1 return super().step(action, timeout_s=timeout_s, **kwargs) async def step_async(self, action, timeout_s=None, **kwargs) -> Observation: self._state.step_count += 1 return await super().step_async(action, timeout_s=timeout_s, **kwargs) @property def state(self) -> State: return self._state