"""HF Space dashboard data and HTML for Pulse-ER.""" from __future__ import annotations import json import os from functools import lru_cache from typing import Any from pulse_physiology_env.demo_llm_policy import heuristic_infer_fn from pulse_physiology_env.episode_runner import EpisodeRunner, EpisodeTrace from pulse_physiology_env.eval_mock import score_policy, score_random_policy from pulse_physiology_env.policies import LLMPolicy, RandomPolicy, build_expert_policy, build_no_action_policy from pulse_physiology_env.server.adapters import KNOWN_TOOL_NAMES, MockPulseAdapter from pulse_physiology_env.server.pathology_architect import PathologyArchitect from pulse_physiology_env.server.tools import AVAILABLE_TOOL_NAMES, CLINICAL_45_TOOL_NAMES DEFAULT_SPACE_SCENARIO = "respiratory_distress" DEFAULT_SPACE_POLICY = "expert" RUNTIME_SETTINGS = { "observation_noise_level": 0.3, "time_pressure_enabled": True, "seed": 0, "max_steps": 8, } POLICY_META: dict[str, dict[str, str]] = { "expert": { "label": "Expert", "summary": "Deterministic-plus-adaptive baseline that follows authored playbooks and trauma heuristics.", }, "llm_demo": { "label": "LLM Demo", "summary": "Prompt-driven heuristic baseline that is stronger than random but still inconsistent on harder scenarios.", }, "random": { "label": "Random", "summary": "Uninformed tool caller that samples valid actions without clinical sequencing.", }, "no_action": { "label": "No Action", "summary": "Passive baseline that mostly advances time and lets the patient deteriorate untreated.", }, } SPACE_SCENARIO_META: dict[str, dict[str, str]] = { "baseline_stable": { "label": "Baseline Stable", "title": "Baseline Stability Console", "summary": ( "A low-acuity case that exposes policy restraint. The useful signal here is not dramatic rescue, " "but avoiding needless interventions while keeping the patient stable under noisy observations." ), "teaching_point": "assess first and avoid over-treatment in stable physiology", }, "respiratory_distress": { "label": "Respiratory Distress", "title": "Respiratory Rescue Console", "summary": ( "A reproducible respiratory deterioration case under observation noise and time pressure. " "The replay shows how the trained policy restores oxygenation and lowers work of breathing through ordered support." ), "teaching_point": "restore oxygenation early and do not wait through worsening hypoxemia", }, "hemorrhagic_shock": { "label": "Hemorrhagic Shock", "title": "Hemorrhagic Shock Console", "summary": ( "A circulation-first case where delayed action is punished quickly. " "This view is useful because it shows how strongly the environment separates hemorrhage control from passive or noisy policies." ), "teaching_point": "treat shock early, because delay compounds deterioration", }, } TOOL_GROUPS: list[dict[str, Any]] = [ { "name": "Assessment", "summary": "Initial reads, triage summaries, and deterioration checks.", "tools": [ "get_vitals", "get_respiratory_status", "check_deterioration", "summarize_state", "recommend_next_step", ], }, { "name": "Airway / Breathing", "summary": "Respiratory support, decompression, and patient positioning.", "tools": [ "give_oxygen", "airway_support", "needle_decompression", "position_patient", ], }, { "name": "Circulation", "summary": "Hemorrhage control, fluids, and pressor escalation.", "tools": [ "control_bleeding", "give_fluids", "give_pressor", ], }, { "name": "Diagnostics", "summary": "Delayed labs that become available after simulated time passes.", "tools": [ "get_blood_gas", "get_cbc", "get_bmp", ], }, { "name": "Procedures / Flow", "summary": "Procedural intervention and time control inside the episode loop.", "tools": [ "pericardiocentesis", "advance_time", ], }, ] def _patient_count() -> int: return len(PathologyArchitect().supported_patients()) def _injury_count() -> int: return len(PathologyArchitect().supported_injury_types()) def _combo_count() -> int: return len(PathologyArchitect().default_injury_combos()) def _repo_url() -> str: return os.environ.get("PULSE_ER_REPO_URL", "https://github.com/JatinYadav2006/Pulse-ER-env.git") def _training_link() -> str: return os.environ.get( "PULSE_ER_TRAINING_LINK", "https://github.com/JatinYadav2006/Pulse-ER-env/blob/kumarthegoat/train_grpo.py", ) def _trace_to_payload(trace: EpisodeTrace) -> dict[str, Any]: frames: list[dict[str, Any]] = [] initial = trace.initial_observation frames.append( { "step_index": -1, "tool_name": "reset", "message": f"Scenario loaded: {trace.scenario_id}", "reward": 0.0, "sim_time_s": initial.sim_time_s, "heart_rate_bpm": initial.heart_rate_bpm, "systolic_bp_mmhg": initial.systolic_bp_mmhg, "diastolic_bp_mmhg": initial.diastolic_bp_mmhg, "spo2": initial.spo2, "respiration_rate_bpm": initial.respiration_rate_bpm, "blood_volume_ml": initial.blood_volume_ml, "mental_status": getattr(initial.mental_status, "value", initial.mental_status), "active_alerts": list(initial.active_alerts), } ) for step in trace.steps: obs = step.observation frames.append( { "step_index": step.step_index, "tool_name": step.action.tool_name, "message": ( step.tool_result["message"] if step.tool_result is not None and "message" in step.tool_result else step.error["message"] if step.error is not None and "message" in step.error else step.action.tool_name ), "reward": step.reward, "sim_time_s": obs.sim_time_s, "heart_rate_bpm": obs.heart_rate_bpm, "systolic_bp_mmhg": obs.systolic_bp_mmhg, "diastolic_bp_mmhg": obs.diastolic_bp_mmhg, "spo2": obs.spo2, "respiration_rate_bpm": obs.respiration_rate_bpm, "blood_volume_ml": obs.blood_volume_ml, "mental_status": getattr(obs.mental_status, "value", obs.mental_status), "active_alerts": list(obs.active_alerts), } ) final_summary = trace.summary() return { "scenario_id": trace.scenario_id, "policy_name": trace.policy_name, "summary": final_summary, "events": list(trace.events), "frames": frames, "action_log": [ { "step_index": step.step_index, "tool_name": step.action.tool_name, "message": step.tool_result["message"] if step.tool_result else step.action.tool_name, "reward": step.reward, } for step in trace.steps ], "config": dict(RUNTIME_SETTINGS), } @lru_cache(maxsize=1) def get_policy_benchmark_payload() -> dict[str, Any]: expert = score_policy(lambda scenario_id: build_expert_policy(), "expert") llm_demo = score_policy( lambda scenario_id: LLMPolicy(infer_fn=heuristic_infer_fn, name="llm_demo"), "llm_demo", ) random_policy = score_random_policy() no_action = score_policy(lambda scenario_id: build_no_action_policy(), "no_action") comparison = [ {"label": "Expert", "value": expert.average_reward, "status": "good"}, { "label": "LLM Demo", "value": llm_demo.average_reward, "status": "good" if llm_demo.average_reward > 0 else "warn", }, {"label": "Random", "value": random_policy.average_reward, "status": "bad"}, {"label": "No Action", "value": no_action.average_reward, "status": "bad"}, ] per_scenario = { "expert": expert.per_scenario, "llm_demo": llm_demo.per_scenario, "random": random_policy.per_scenario, "no_action": no_action.per_scenario, } return {"comparison": comparison, "per_scenario": per_scenario} def _available_space_scenarios(benchmarks: dict[str, Any]) -> list[str]: available = list(benchmarks["per_scenario"]["expert"].keys()) ordered = [scenario_id for scenario_id in SPACE_SCENARIO_META if scenario_id in available] return ordered + [scenario_id for scenario_id in available if scenario_id not in ordered] def _normalize_space_scenario(scenario_id: str | None, benchmarks: dict[str, Any]) -> str: available = _available_space_scenarios(benchmarks) if scenario_id in available: return scenario_id return DEFAULT_SPACE_SCENARIO if DEFAULT_SPACE_SCENARIO in available else available[0] def _available_policy_names() -> list[str]: return list(POLICY_META.keys()) def _normalize_policy_name(policy_name: str | None) -> str: if policy_name in POLICY_META: return policy_name return DEFAULT_SPACE_POLICY def _build_policy(policy_name: str): if policy_name == "expert": return build_expert_policy() if policy_name == "llm_demo": return LLMPolicy(infer_fn=heuristic_infer_fn, name="llm_demo") if policy_name == "random": return RandomPolicy(seed=RUNTIME_SETTINGS["seed"]) if policy_name == "no_action": return build_no_action_policy() raise ValueError(f"Unsupported policy: {policy_name}") @lru_cache(maxsize=32) def get_demo_episode_payload( scenario_id: str = DEFAULT_SPACE_SCENARIO, policy_name: str = DEFAULT_SPACE_POLICY, ) -> dict[str, Any]: backend = MockPulseAdapter( default_scenario_id=scenario_id, observation_noise_level=RUNTIME_SETTINGS["observation_noise_level"], time_pressure_enabled=RUNTIME_SETTINGS["time_pressure_enabled"], seed=RUNTIME_SETTINGS["seed"], ) runner = EpisodeRunner(backend=backend, max_steps=RUNTIME_SETTINGS["max_steps"]) policy = _build_policy(_normalize_policy_name(policy_name)) try: trace = runner.run(policy=policy, scenario_id=scenario_id) finally: close_method = getattr(backend, "close", None) if callable(close_method): close_method() return _trace_to_payload(trace) def _build_primary_scenario(scenario_id: str, demo: dict[str, Any], benchmarks: dict[str, Any]) -> dict[str, str]: meta = SPACE_SCENARIO_META.get( scenario_id, { "label": scenario_id.replace("_", " ").title(), "title": f"{scenario_id.replace('_', ' ').title()} Console", "summary": "Deterministic mock replay for the selected scenario.", "teaching_point": "follow the measured physiology signal", }, ) frames = demo["frames"] first_observed = next((frame for frame in frames if frame["spo2"] is not None), frames[0]) final = demo["summary"] expert_rr = benchmarks["per_scenario"]["expert"][scenario_id] no_action_rr = benchmarks["per_scenario"]["no_action"][scenario_id] selected_policy = demo["policy_name"] selected_label = POLICY_META.get(selected_policy, {}).get("label", selected_policy.replace("_", " ").title()) return { "title": meta["title"], "tag": "EVALUATION CONSOLE", "summary": meta["summary"], "teaching_point": meta["teaching_point"], "naive_outcome": ( f"No-action baseline on {scenario_id}: {no_action_rr:+.3f} reward with progressive deterioration." ), "trained_outcome": ( f"Expert replay improves SpO2 from {first_observed['spo2'] * 100:.1f}% " f"to {final['spo2_percent']:.1f}% and finishes at {expert_rr:+.3f} reward." ), "selected_outcome": ( f"{selected_label} on {scenario_id} ended with {final['termination_reason']} at " f"{final['total_reward']:+.3f} reward after {final['num_steps']} steps." ), } def _build_research_highlights(benchmarks: dict[str, Any]) -> list[dict[str, str]]: return [ {"label": f"{len(KNOWN_TOOL_NAMES)} public tools", "value": "consumer contract exposed in the Space"}, {"label": f"{len(CLINICAL_45_TOOL_NAMES)} clinical tools", "value": "full clinical intervention surface in the real runtime"}, {"label": f"{len(AVAILABLE_TOOL_NAMES)} runtime names", "value": "combined real backend execution/alias surface"}, {"label": f"{_patient_count()} patient profiles", "value": "supported by PathologyArchitect"}, ] def _build_tool_surface() -> dict[str, Any]: return { "public_contract_count": len(KNOWN_TOOL_NAMES), "clinical_surface_count": len(CLINICAL_45_TOOL_NAMES), "runtime_name_count": len(AVAILABLE_TOOL_NAMES), "groups": TOOL_GROUPS, } def _build_engine_layers() -> list[dict[str, str]]: return [ { "title": "Physiology Core", "value": "Pulse 4.3.2", "detail": "Real backend runs on the Pulse physiology engine rather than a hand-scripted simulator.", }, { "title": "Reward Engine", "value": "Safety-shaped", "detail": "Dense reward, sequencing penalties, and terminal scoring drive clinically ordered behavior.", }, { "title": "ATLS Judge", "value": "Protocol scoring", "detail": "Action history can be graded into human-readable pass/fail protocol checks.", }, { "title": "Runtime Effects", "value": "Noise + time pressure", "detail": "Observation perturbations and deterioration pressure make the policy work under uncertainty.", }, { "title": "PathologyArchitect", "value": f"{_injury_count()} injury families", "detail": f"{_patient_count()} baseline patients and {_combo_count()} default combo ladders support generated trauma cases.", }, { "title": "Adversarial Evaluation", "value": "Breaking points", "detail": "Policies can be stress-tested with stacked injuries rather than judged on single static prompts.", }, ] def _build_benchmark_matrix(benchmarks: dict[str, Any]) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] for scenario_id in _available_space_scenarios(benchmarks): rows.append( { "scenario_id": scenario_id, "label": SPACE_SCENARIO_META.get(scenario_id, {}).get("label", scenario_id.replace("_", " ").title()), "expert": benchmarks["per_scenario"]["expert"][scenario_id], "llm_demo": benchmarks["per_scenario"]["llm_demo"][scenario_id], "random": benchmarks["per_scenario"]["random"][scenario_id], "no_action": benchmarks["per_scenario"]["no_action"][scenario_id], } ) return rows def _build_runtime_profile(demo: dict[str, Any]) -> list[dict[str, str]]: summary = demo["summary"] config = demo["config"] return [ {"label": "Seed", "value": str(config["seed"])}, {"label": "Noise", "value": f"{config['observation_noise_level']:.1f}"}, {"label": "Time pressure", "value": "enabled" if config["time_pressure_enabled"] else "disabled"}, {"label": "Step budget", "value": str(config["max_steps"])}, {"label": "Final mental status", "value": str(summary["mental_status"])}, {"label": "Alert count", "value": str(len(summary["active_alerts"]))}, ] def _build_policy_outcome(demo: dict[str, Any]) -> list[dict[str, str]]: summary = demo["summary"] systolic = summary["systolic_bp_mmhg"] diastolic = summary["diastolic_bp_mmhg"] map_proxy = ( f"{round((systolic + 2 * diastolic) / 3)} mmHg" if systolic is not None and diastolic is not None else "n/a" ) return [ {"label": "Termination", "value": summary["termination_reason"].replace("_", " ")}, {"label": "Total reward", "value": f"{summary['total_reward']:+.3f}"}, {"label": "Steps executed", "value": str(summary["num_steps"])}, {"label": "Final SpO2", "value": f"{summary['spo2_percent']:.1f}%" if summary["spo2_percent"] is not None else "n/a"}, {"label": "Final MAP proxy", "value": map_proxy}, {"label": "Final alerts", "value": str(len(summary["active_alerts"]))}, ] def get_dashboard_payload(scenario_id: str | None = None, policy_name: str | None = None) -> dict[str, Any]: benchmarks = get_policy_benchmark_payload() selected_scenario = _normalize_space_scenario(scenario_id, benchmarks) selected_policy = _normalize_policy_name(policy_name) demo = get_demo_episode_payload(selected_scenario, selected_policy) scenario = _build_primary_scenario(selected_scenario, demo, benchmarks) return { "hero": { "title": "Pulse-ER", "subtitle": "Physiology Evaluation Console", "description": ( "A trauma reinforcement-learning environment backed by Pulse physiology. " "This Space exposes more than a single replay: scenario controls, policy execution, " "tool surface, runtime profile, and evaluation stack are all surfaced directly so the environment reads like a real benchmark console." ), "badges": [ f"{len(KNOWN_TOOL_NAMES)}-tool public contract", f"{len(CLINICAL_45_TOOL_NAMES)}-tool clinical surface", "Pulse 4.3.2 runtime", ], }, "selected_scenario": selected_scenario, "selected_policy": selected_policy, "available_scenarios": [ { "id": scenario_key, "label": SPACE_SCENARIO_META.get(scenario_key, {}).get("label", scenario_key.replace("_", " ").title()), } for scenario_key in _available_space_scenarios(benchmarks) ], "available_policies": [ {"id": policy_key, "label": POLICY_META[policy_key]["label"], "summary": POLICY_META[policy_key]["summary"]} for policy_key in _available_policy_names() ], "scenario": scenario, "policy_comparison": benchmarks["comparison"], "benchmark_matrix": _build_benchmark_matrix(benchmarks), "research_highlights": _build_research_highlights(benchmarks), "tool_surface": _build_tool_surface(), "engine_layers": _build_engine_layers(), "runtime_profile": _build_runtime_profile(demo), "policy_outcome": _build_policy_outcome(demo), "demo_episode": demo, "links": { "repo_url": _repo_url(), "training_url": _training_link(), }, } def build_dashboard_html() -> str: payload = json.dumps(get_dashboard_payload()) return f""" Pulse-ER Physiology Evaluation Console
"""