"""Consumer-side integration smoke checks for mock or real adapters. Examples: python -m pulse_physiology_env.integration_smoke python -m pulse_physiology_env.integration_smoke --backend-class pulse_physiology_env.server.adapters:MockPulseAdapter """ from __future__ import annotations import argparse import importlib import inspect from pathlib import Path from pulse_physiology_env.models import ( PulsePhysiologyObservation, ToolAction, ToolResult, ) from pulse_physiology_env.patient_state import PatientState from pulse_physiology_env.real_backend import RealPulseBackend from pulse_physiology_env.server.pathology_architect import PathologyArchitect from pulse_physiology_env.server.mock_scenarios import DEFAULT_MOCK_SCENARIO_ID from pulse_physiology_env.tool_catalog import KNOWN_TOOL_NAMES from pulse_physiology_env.tool_parser import parse_tool_action REQUIRED_OBSERVATION_FIELDS = { "scenario_id", "patient_id", "sim_time_s", "heart_rate_bpm", "systolic_bp_mmhg", "diastolic_bp_mmhg", "spo2", "respiration_rate_bpm", "blood_volume_ml", "mental_status", "active_alerts", "done", } REQUIRED_ENVELOPE_FIELDS = {"observation", "reward", "done", "metadata", "tool_result", "error"} class _ConstructorNoScenarioBackend: """Regression-only backend stub with a constructor that accepts no scenario args.""" def __init__(self) -> None: self.default_scenario_id = DEFAULT_MOCK_SCENARIO_ID def set_default_scenario(self, scenario_id: str) -> None: """Set the backend's default scenario after construction.""" self.default_scenario_id = scenario_id def reset(self, scenario_id: str | None = None, **kwargs): """Proxy reset calls into a fresh mock backend instance.""" del kwargs from pulse_physiology_env.server.adapters import MockPulseAdapter backend = MockPulseAdapter(default_scenario_id=scenario_id or self.default_scenario_id) return backend.reset(scenario_id or self.default_scenario_id) def step(self, action): """Proxy one step through a fresh mock backend instance.""" from pulse_physiology_env.server.adapters import MockPulseAdapter backend = MockPulseAdapter(default_scenario_id=self.default_scenario_id) backend.reset(self.default_scenario_id) return backend.step(action) class _FakeRealEnvironment: """Regression-only OpenEnv-like environment used to test the real wrapper shape.""" def __init__(self) -> None: self._state = PatientState( scenario_id="baseline_stable", patient_id="real_wrapper_fixture", heart_rate_bpm=72.0, systolic_bp_mmhg=118.0, diastolic_bp_mmhg=76.0, mean_arterial_pressure_mmhg=90.0, spo2=0.98, respiration_rate_bpm=14.0, blood_volume_ml=5500.0, ) def reset(self, seed: int | None = None, episode_id: str | None = None, **kwargs): """Return a deterministic observation shaped like the real runtime.""" del seed, episode_id scenario_id = str(kwargs.get("scenario_id") or self._state.scenario_id) self._state = self._state.model_copy(update={"scenario_id": scenario_id, "sim_time_s": 0.0}) return self._build_observation( reward=0.0, tool_result=None, error=None, step_count=0, ) def step(self, action: ToolAction, timeout_s: float | None = None, **kwargs): """Advance the fake environment by one action and return an observation.""" del timeout_s, kwargs self._state = self._state.model_copy(update={"sim_time_s": self._state.sim_time_s + 30.0}) return self._build_observation( reward=0.5, tool_result=ToolResult( tool_name=action.tool_name, success=True, message="Fake real environment step succeeded.", state_changed=action.tool_name == "advance_time", changed_fields=["sim_time_s"] if action.tool_name == "advance_time" else [], ), error=None, step_count=1, ) def close(self) -> None: """Match the optional close hook used by the real environment.""" return None def _build_observation( self, *, reward: float, tool_result, error, step_count: int, ) -> PulsePhysiologyObservation: """Build a real-style observation payload for wrapper regression checks.""" raw_tools = list(KNOWN_TOOL_NAMES) + ["perform_cpr", "apply_nasal_cannula"] metadata = { "step_count": step_count, "available_tools": raw_tools, "scenario_description": "Fake real environment for wrapper regression.", } return PulsePhysiologyObservation.from_patient_state( self._state, reward=reward, available_tools=raw_tools, tool_result=tool_result, error=error, metadata=metadata, ) def _set_scenario_if_supported(backend, scenario_id: str) -> None: """Set the default scenario when a backend exposes a dedicated mutator hook. The mock and real adapters may temporarily diverge in constructor shape during integration, so the smoke test cannot assume every backend accepts ``default_scenario_id`` at construction time. """ for method_name in ("set_default_scenario_id", "set_default_scenario", "set_scenario"): method = getattr(backend, method_name, None) if callable(method): method(scenario_id) return def _load_backend(backend_class_path: str, scenario_id: str): """Instantiate the requested backend while tolerating constructor differences.""" module_name, class_name = backend_class_path.split(":", 1) module = importlib.import_module(module_name) backend_cls = getattr(module, class_name) init_signature = inspect.signature(backend_cls) init_parameters = init_signature.parameters if "default_scenario_id" in init_parameters: return backend_cls(default_scenario_id=scenario_id) backend = backend_cls() _set_scenario_if_supported(backend, scenario_id) return backend def _assert(condition: bool, message: str) -> None: """Abort the smoke run with a concise message when an assertion fails.""" if not condition: raise SystemExit(message) def _check_response_shape(result, label: str) -> None: """Validate that a backend response matches the consumer-side envelope contract.""" payload = result.model_dump() _assert(REQUIRED_ENVELOPE_FIELDS <= set(payload), f"{label}: missing top-level response keys") observation = payload["observation"] _assert(REQUIRED_OBSERVATION_FIELDS <= set(observation), f"{label}: missing required observation fields") _assert(payload["done"] == observation["done"], f"{label}: done mismatch between envelope and observation") _assert(isinstance(payload["metadata"].get("available_tools"), list), f"{label}: metadata.available_tools must be a list") def _regression_check_constructor_flexibility() -> None: """Ensure backend loading works even when the constructor omits scenario kwargs.""" backend = _load_backend( "pulse_physiology_env.integration_smoke:_ConstructorNoScenarioBackend", DEFAULT_MOCK_SCENARIO_ID, ) _assert( getattr(backend, "default_scenario_id", None) == DEFAULT_MOCK_SCENARIO_ID, "constructor_flexibility: scenario setter fallback did not prime the backend", ) def _regression_check_real_backend_wrapper() -> None: """Ensure the real-backend wrapper converts raw observations into response envelopes.""" backend = RealPulseBackend( default_scenario_id=DEFAULT_MOCK_SCENARIO_ID, environment_factory=_FakeRealEnvironment, ) reset_result = backend.reset(DEFAULT_MOCK_SCENARIO_ID) _check_response_shape(reset_result, "real_wrapper_reset") _assert( reset_result.observation.available_tools == list(KNOWN_TOOL_NAMES), "real_wrapper_reset: available_tools should be filtered to the frozen consumer tool set", ) _assert( set(reset_result.observation.metadata.get("raw_available_tools", [])) >= {"perform_cpr", "apply_nasal_cannula"}, "real_wrapper_reset: raw runtime tools should remain visible in metadata", ) step_result = backend.step(ToolAction(tool_name="get_vitals", arguments={})) _check_response_shape(step_result, "real_wrapper_step") _assert(step_result.tool_result is not None, "real_wrapper_step: tool_result should be preserved") def _regression_check_readme_frontmatter() -> None: """Ensure README frontmatter uses a readable emoji entry without mojibake.""" readme_path = Path(__file__).resolve().parent / "README.md" frontmatter = readme_path.read_text(encoding="utf-8").splitlines()[:5] _assert( any(line.startswith("emoji: ") for line in frontmatter), "readme_frontmatter: expected an emoji frontmatter entry in README frontmatter", ) _assert( not any("ðŸ" in line or "ðŸ" in line for line in frontmatter), "readme_frontmatter: found mojibake in README frontmatter", ) def _regression_check_argument_normalization() -> None: """Ensure harmless formatting noise is normalized instead of penalized.""" oxygen_action = parse_tool_action( '{"tool_name":"Give Oxygen","arguments":{"flow_lpm":"2LPM"}}' ) _assert(oxygen_action.tool_name == "give_oxygen", "argument_normalization: tool_name should canonicalize") _assert( oxygen_action.arguments["flow_lpm"] == 2.0, "argument_normalization: oxygen flow should coerce to float", ) bleed_action = parse_tool_action( '{"tool_name":"control_bleeding","arguments":{"method":"Tourniquet"}}' ) _assert( bleed_action.arguments["method"] == "tourniquet", "argument_normalization: choice values should canonicalize", ) pressor_action = parse_tool_action( '{"tool_name":"give_pressor","arguments":{"stop":"false"}}' ) _assert( pressor_action.arguments["stop"] is False, "argument_normalization: boolean-like strings should coerce to booleans", ) def _regression_check_stacked_pathology_blueprints() -> None: """Ensure generated pathology authoring supports stacked injury combos.""" architect = PathologyArchitect() blueprint = architect.build_blueprint( patient_id="hassan", injury_types=["tension_pneumothorax", "hemorrhagic_shock", "cardiac_tamponade"], severity=0.7, ) _assert( blueprint.injury_type == "polytrauma", "stacked_pathology: multi-injury combos should summarize as polytrauma", ) _assert( blueprint.injury_types == ("tension_pneumothorax", "hemorrhagic_shock", "cardiac_tamponade"), "stacked_pathology: injury_types should preserve the requested combo ordering", ) action_names = [step["action"] for step in blueprint.setup_actions] _assert( action_names.count("set_tension_pneumothorax") == 1 and action_names.count("set_pericardial_effusion") == 1 and action_names.count("set_hemorrhage") >= 1 and action_names[-1] == "advance_time", "stacked_pathology: combined setup actions should include each injury plus a final deterioration window", ) def _regression_check_runtime_effects() -> None: """Ensure observation noise and time pressure can be enabled on mock resets.""" from pulse_physiology_env.server.adapters import MockPulseAdapter backend = MockPulseAdapter(default_scenario_id="respiratory_distress") reset_result = backend.reset( "respiratory_distress", observation_noise_level=1.0, time_pressure_enabled=True, ) observation_noise = reset_result.observation.metadata.get("observation_noise", {}) time_pressure = reset_result.observation.metadata.get("time_pressure", {}) _assert( observation_noise.get("enabled") is True and observation_noise.get("noise_level", 0.0) >= 0.9, "runtime_effects: observation noise metadata should be present on noisy resets", ) _assert( time_pressure.get("enabled") is True and "deterioration_multiplier" in time_pressure, "runtime_effects: time pressure metadata should be present on configured resets", ) def main() -> None: """Run lightweight contract checks against a mock or real backend implementation.""" parser = argparse.ArgumentParser() parser.add_argument( "--backend-class", default="pulse_physiology_env.server.adapters:MockPulseAdapter", help="Module path to the adapter class in module:Class format.", ) parser.add_argument("--scenario", default=DEFAULT_MOCK_SCENARIO_ID) args = parser.parse_args() backend = _load_backend(args.backend_class, args.scenario) print("Integration smoke check\n") print(f"backend_class: {args.backend_class}") print(f"scenario: {args.scenario}\n") _regression_check_constructor_flexibility() print("PASS constructor flexibility") _regression_check_real_backend_wrapper() print("PASS real backend wrapper shape") _regression_check_readme_frontmatter() print("PASS README frontmatter encoding") _regression_check_argument_normalization() print("PASS argument normalization") _regression_check_stacked_pathology_blueprints() print("PASS stacked pathology blueprints") _regression_check_runtime_effects() print("PASS runtime effects") reset_result = backend.reset(args.scenario) _check_response_shape(reset_result, "reset") print("PASS reset envelope") valid_action = ToolAction(tool_name="get_vitals", arguments={}) valid_result = backend.step(valid_action) _check_response_shape(valid_result, "valid_step") _assert(valid_result.tool_result is not None, "valid_step: tool_result must be present") print("PASS valid step envelope") invalid_tool_result = backend.step(ToolAction(tool_name="not_a_real_tool", arguments={})) _check_response_shape(invalid_tool_result, "invalid_tool") _assert(invalid_tool_result.error is not None, "invalid_tool: structured error expected") _assert( invalid_tool_result.error.code in {"UNKNOWN_TOOL", "UNSUPPORTED_TOOL"}, "invalid_tool: expected UNKNOWN_TOOL or UNSUPPORTED_TOOL", ) print("PASS unknown tool handling") invalid_arg_result = backend.step(ToolAction(tool_name="advance_time", arguments={"seconds": -5})) _check_response_shape(invalid_arg_result, "invalid_argument") _assert(invalid_arg_result.error is not None, "invalid_argument: structured error expected") _assert(invalid_arg_result.error.code == "INVALID_ARGUMENT", "invalid_argument: expected INVALID_ARGUMENT") print("PASS invalid argument handling") available_tools = valid_result.metadata.available_tools _assert( set(available_tools).issubset(set(KNOWN_TOOL_NAMES)), "available tools must stay within the known consumer tool set", ) print("PASS available tool contract") print("\nIntegration smoke passed.") if __name__ == "__main__": main()