| from __future__ import annotations |
|
|
| from types import SimpleNamespace |
|
|
| import pytest |
|
|
| from jarvis.runtime_operator_server import ( |
| operator_events_provider, |
| operator_metrics_provider, |
| start_operator_server, |
| startup_diagnostics_provider, |
| stop_operator_server, |
| ) |
|
|
|
|
| class _FakeObservability: |
| def __init__(self) -> None: |
| self.events: list[tuple[str, dict[str, object]]] = [] |
|
|
| def prometheus_metrics(self) -> str: |
| return "metric_a 1\n" |
|
|
| def recent_events(self, *, limit: int = 100) -> list[dict[str, object]]: |
| return [{"name": "event", "limit": limit}] |
|
|
| def record_event(self, name: str, payload: dict[str, object]) -> None: |
| self.events.append((name, payload)) |
|
|
|
|
| class _FakeServer: |
| def __init__(self, **kwargs) -> None: |
| self.kwargs = kwargs |
| self.started = False |
| self.stopped = False |
|
|
| async def start(self) -> None: |
| self.started = True |
|
|
| async def stop(self) -> None: |
| self.stopped = True |
|
|
|
|
| def _runtime_stub() -> SimpleNamespace: |
| cfg = SimpleNamespace( |
| startup_warnings=["warn-a"], |
| operator_server_enabled=True, |
| operator_server_host="127.0.0.1", |
| operator_server_port=8777, |
| webhook_inbound_enabled=True, |
| webhook_inbound_token="inbound-token", |
| webhook_auth_token="auth-token", |
| operator_auth_mode="token", |
| operator_auth_token="operator-token", |
| ) |
| runtime = SimpleNamespace( |
| config=cfg, |
| _operator_server=None, |
| _observability=_FakeObservability(), |
| _startup_blockers=lambda: ["block-a"], |
| _startup_diagnostics_provider=lambda: ["warn-a", "BLOCKER: block-a"], |
| _operator_status_provider=lambda: None, |
| _operator_control_handler=lambda action, payload: None, |
| _operator_control_schema=lambda: {}, |
| _operator_metrics_provider=lambda: "", |
| _operator_events_provider=lambda: [], |
| _operator_conversation_trace_provider=lambda limit=20: [], |
| ) |
| return runtime |
|
|
|
|
| def test_startup_diagnostics_provider_merges_warnings_and_blockers() -> None: |
| runtime = _runtime_stub() |
| items = startup_diagnostics_provider(runtime) |
| assert items == ["warn-a", "BLOCKER: block-a"] |
|
|
|
|
| def test_operator_metrics_and_events_provider_use_observability() -> None: |
| runtime = _runtime_stub() |
| assert operator_metrics_provider(runtime) == "metric_a 1\n" |
| assert operator_events_provider(runtime) == [{"name": "event", "limit": 100}] |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_start_operator_server_sets_handle_and_wires_inbound_callback() -> None: |
| runtime = _runtime_stub() |
| recorded: list[dict[str, object]] = [] |
| logger = SimpleNamespace(warning=lambda *_args, **_kwargs: None) |
|
|
| def _record_inbound(**kwargs) -> int: |
| recorded.append({str(k): v for k, v in kwargs.items()}) |
| return 42 |
|
|
| await start_operator_server( |
| runtime, |
| operator_server_class=_FakeServer, |
| record_inbound_webhook_event_fn=_record_inbound, |
| logger=logger, |
| ) |
|
|
| assert isinstance(runtime._operator_server, _FakeServer) |
| assert runtime._operator_server.started is True |
| callback = runtime._operator_server.kwargs["inbound_callback"] |
| result = callback({"x": 1}, {"h": "v"}, "/inbound", "test") |
| assert result == 42 |
| assert recorded == [ |
| {"payload": {"x": 1}, "headers": {"h": "v"}, "path": "/inbound", "source": "test"} |
| ] |
| assert runtime._observability.events |
| assert runtime._observability.events[0][0] == "operator_server_started" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_stop_operator_server_clears_handle() -> None: |
| runtime = _runtime_stub() |
| runtime._operator_server = _FakeServer() |
| await stop_operator_server(runtime) |
| assert runtime._operator_server is None |
|
|