File size: 3,802 Bytes
from __future__ import annotations
from types import SimpleNamespace
import pytest
from jarvis.runtime_operator_server import (
operator_events_provider,
operator_metrics_provider,
start_operator_server,
startup_diagnostics_provider,
stop_operator_server,
)
class _FakeObservability:
    """In-memory stand-in for the runtime's observability object.

    Serves canned metrics and events, and keeps every recorded event in
    ``events`` so tests can inspect what was emitted.
    """

    def __init__(self) -> None:
        # (name, payload) pairs, in the order they were recorded.
        self.events: list[tuple[str, dict[str, object]]] = []

    def prometheus_metrics(self) -> str:
        """Return a fixed, single-line metrics text."""
        return "metric_a 1\n"

    def recent_events(self, *, limit: int = 100) -> list[dict[str, object]]:
        """Return one canned event that echoes back the requested ``limit``."""
        canned = {"name": "event", "limit": limit}
        return [canned]

    def record_event(self, name: str, payload: dict[str, object]) -> None:
        """Append ``(name, payload)`` to ``events``."""
        entry = (name, payload)
        self.events.append(entry)
class _FakeServer:
    """Operator-server double that stores its constructor kwargs and tracks start/stop calls."""

    def __init__(self, **kwargs) -> None:
        self.started = False
        self.stopped = False
        # Kept verbatim so tests can pull out wiring such as callbacks.
        self.kwargs = kwargs

    async def start(self) -> None:
        """Flag the server as started."""
        self.started = True

    async def stop(self) -> None:
        """Flag the server as stopped."""
        self.stopped = True
def _runtime_stub() -> SimpleNamespace:
    """Build a minimal runtime double for the operator-server helpers.

    The returned namespace carries a config with operator/webhook settings,
    a ``_FakeObservability`` instance, an unset ``_operator_server`` handle,
    and provider/handler callables that return canned values.
    """
    settings = {
        "startup_warnings": ["warn-a"],
        "operator_server_enabled": True,
        "operator_server_host": "127.0.0.1",
        "operator_server_port": 8777,
        "webhook_inbound_enabled": True,
        "webhook_inbound_token": "inbound-token",
        "webhook_auth_token": "auth-token",
        "operator_auth_mode": "token",
        "operator_auth_token": "operator-token",
    }
    return SimpleNamespace(
        config=SimpleNamespace(**settings),
        _operator_server=None,
        _observability=_FakeObservability(),
        _startup_blockers=lambda: ["block-a"],
        _startup_diagnostics_provider=lambda: ["warn-a", "BLOCKER: block-a"],
        _operator_status_provider=lambda: None,
        _operator_control_handler=lambda action, payload: None,
        _operator_control_schema=lambda: {},
        _operator_metrics_provider=lambda: "",
        _operator_events_provider=lambda: [],
        _operator_conversation_trace_provider=lambda limit=20: [],
    )
def test_startup_diagnostics_provider_merges_warnings_and_blockers() -> None:
    """Diagnostics list the stub's warning first, then its blocker prefixed with ``BLOCKER: ``."""
    expected = ["warn-a", "BLOCKER: block-a"]
    assert startup_diagnostics_provider(_runtime_stub()) == expected
def test_operator_metrics_and_events_provider_use_observability() -> None:
    """Both providers return the canned values served by the stub's fake observability."""
    stub = _runtime_stub()

    metrics_text = operator_metrics_provider(stub)
    assert metrics_text == "metric_a 1\n"

    events = operator_events_provider(stub)
    assert events == [{"name": "event", "limit": 100}]
@pytest.mark.asyncio
async def test_start_operator_server_sets_handle_and_wires_inbound_callback() -> None:
    """Starting the server stores a started handle and routes inbound webhooks to the recorder.

    Also checks that the first observability event is ``operator_server_started``.
    """
    stub = _runtime_stub()
    inbound_calls: list[dict[str, object]] = []

    def _ignore_warning(*_args, **_kwargs) -> None:
        return None

    def _record_inbound(**kwargs) -> int:
        inbound_calls.append({str(key): value for key, value in kwargs.items()})
        return 42

    await start_operator_server(
        stub,
        operator_server_class=_FakeServer,
        record_inbound_webhook_event_fn=_record_inbound,
        logger=SimpleNamespace(warning=_ignore_warning),
    )

    server = stub._operator_server
    assert isinstance(server, _FakeServer)
    assert server.started is True

    # Call the wired callback positionally; the recorder is expected to
    # receive the same values as keyword arguments.
    inbound = server.kwargs["inbound_callback"]
    assert inbound({"x": 1}, {"h": "v"}, "/inbound", "test") == 42
    assert inbound_calls == [
        {"payload": {"x": 1}, "headers": {"h": "v"}, "path": "/inbound", "source": "test"}
    ]

    recorded_events = stub._observability.events
    assert recorded_events
    assert recorded_events[0][0] == "operator_server_started"
@pytest.mark.asyncio
async def test_stop_operator_server_clears_handle() -> None:
    """Stopping shuts down the installed server and resets the runtime handle to ``None``."""
    runtime = _runtime_stub()
    # Keep our own reference: the runtime's handle is expected to be cleared,
    # so this is the only way to inspect the server afterwards.
    server = _FakeServer()
    runtime._operator_server = server

    await stop_operator_server(runtime)

    # Clearing the handle without stopping the server would leave it running.
    assert server.stopped is True
    assert runtime._operator_server is None
|