jarvis / tests /test_runtime_operator_server.py
Jonathan Haas
refactor: extract operator server runtime helpers from main
6c481d8
Raw
History Blame Contribute Delete
3.8 kB
from __future__ import annotations
from types import SimpleNamespace
import pytest
from jarvis.runtime_operator_server import (
operator_events_provider,
operator_metrics_provider,
start_operator_server,
startup_diagnostics_provider,
stop_operator_server,
)
class _FakeObservability:
def __init__(self) -> None:
self.events: list[tuple[str, dict[str, object]]] = []
def prometheus_metrics(self) -> str:
return "metric_a 1\n"
def recent_events(self, *, limit: int = 100) -> list[dict[str, object]]:
return [{"name": "event", "limit": limit}]
def record_event(self, name: str, payload: dict[str, object]) -> None:
self.events.append((name, payload))
class _FakeServer:
def __init__(self, **kwargs) -> None:
self.kwargs = kwargs
self.started = False
self.stopped = False
async def start(self) -> None:
self.started = True
async def stop(self) -> None:
self.stopped = True
def _runtime_stub() -> SimpleNamespace:
cfg = SimpleNamespace(
startup_warnings=["warn-a"],
operator_server_enabled=True,
operator_server_host="127.0.0.1",
operator_server_port=8777,
webhook_inbound_enabled=True,
webhook_inbound_token="inbound-token",
webhook_auth_token="auth-token",
operator_auth_mode="token",
operator_auth_token="operator-token",
)
runtime = SimpleNamespace(
config=cfg,
_operator_server=None,
_observability=_FakeObservability(),
_startup_blockers=lambda: ["block-a"],
_startup_diagnostics_provider=lambda: ["warn-a", "BLOCKER: block-a"],
_operator_status_provider=lambda: None,
_operator_control_handler=lambda action, payload: None,
_operator_control_schema=lambda: {},
_operator_metrics_provider=lambda: "",
_operator_events_provider=lambda: [],
_operator_conversation_trace_provider=lambda limit=20: [],
)
return runtime
def test_startup_diagnostics_provider_merges_warnings_and_blockers() -> None:
runtime = _runtime_stub()
items = startup_diagnostics_provider(runtime)
assert items == ["warn-a", "BLOCKER: block-a"]
def test_operator_metrics_and_events_provider_use_observability() -> None:
runtime = _runtime_stub()
assert operator_metrics_provider(runtime) == "metric_a 1\n"
assert operator_events_provider(runtime) == [{"name": "event", "limit": 100}]
@pytest.mark.asyncio
async def test_start_operator_server_sets_handle_and_wires_inbound_callback() -> None:
runtime = _runtime_stub()
recorded: list[dict[str, object]] = []
logger = SimpleNamespace(warning=lambda *_args, **_kwargs: None)
def _record_inbound(**kwargs) -> int:
recorded.append({str(k): v for k, v in kwargs.items()})
return 42
await start_operator_server(
runtime,
operator_server_class=_FakeServer,
record_inbound_webhook_event_fn=_record_inbound,
logger=logger,
)
assert isinstance(runtime._operator_server, _FakeServer)
assert runtime._operator_server.started is True
callback = runtime._operator_server.kwargs["inbound_callback"]
result = callback({"x": 1}, {"h": "v"}, "/inbound", "test")
assert result == 42
assert recorded == [
{"payload": {"x": 1}, "headers": {"h": "v"}, "path": "/inbound", "source": "test"}
]
assert runtime._observability.events
assert runtime._observability.events[0][0] == "operator_server_started"
@pytest.mark.asyncio
async def test_stop_operator_server_clears_handle() -> None:
runtime = _runtime_stub()
runtime._operator_server = _FakeServer()
await stop_operator_server(runtime)
assert runtime._operator_server is None