# Source: jarvis/tests/test_operator_server.py
# Last commit: "feat: add observability budget telemetry and canary analytics" (bd2c25a), Jonathan Haas
# Size at that commit: 29.2 kB
from __future__ import annotations
import hashlib
import json
import aiohttp
import pytest
from jarvis.operator_server import OperatorServer
@pytest.mark.asyncio
async def test_operator_server_routes_and_control_log(tmp_path):
    """Exercise the dashboard, the read-only routes, and the control action log.

    The server is started with an empty operator auth token and inbound
    webhooks disabled; every request below is sent without credentials.
    Checked in order:

    * security headers, CSP nonces, and expected labels on the dashboard page;
    * ``invalid_json`` handling on ``/api/control``;
    * ``/api/status``, ``/metrics``, ``/events``, ``/api/conversation-trace``
      and ``/api/control-schema`` responses;
    * two control calls reach ``control_handler`` with their payloads intact,
      while ``/api/operator-actions`` returns them newest-first with the
      ``token`` field redacted and signatures chained.

    NOTE: ``tmp_path`` is requested as a fixture but not used in this body.
    """
    calls: list[tuple[str, dict]] = []

    async def status_provider():
        return {"status": "ok"}

    async def control_handler(action: str, payload: dict):
        # Record exactly what the server forwards so it can be compared later.
        calls.append((action, payload))
        return {"ok": True, "action": action}

    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=status_provider,
        diagnostics_provider=lambda: ["warn-1"],
        control_handler=control_handler,
        control_schema_provider=lambda: {"actions": {"set_mode": {"required": ["mode"]}}},
        metrics_provider=lambda: "jarvis_uptime_seconds 1\n",
        events_provider=lambda: [{"event_type": "x", "payload": {"a": 1}}],
        conversation_trace_provider=lambda limit=20: [{"turn_id": 1, "intent": "action"}][:limit],
        inbound_callback=lambda payload, headers, path, source: 7,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        # unsafe=True lets the cookie jar accept cookies from an IP-address host.
        async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
            # --- dashboard: security headers, CSP, and rendered labels ---
            dashboard_resp = await session.get(f"{base}/")
            dashboard = await dashboard_resp.text()
            assert dashboard_resp.headers["X-Content-Type-Options"] == "nosniff"
            assert dashboard_resp.headers["X-Frame-Options"] == "DENY"
            assert dashboard_resp.headers["Referrer-Policy"] == "no-referrer"
            csp = dashboard_resp.headers["Content-Security-Policy"]
            assert "unsafe-inline" not in csp
            assert "style-src 'self' 'nonce-" in csp
            assert "script-src 'self' 'nonce-" in csp
            assert "'unsafe-hashes'" in csp
            assert '<style nonce="' in dashboard
            assert '<script nonce="' in dashboard
            assert "@media (max-width: 920px)" in dashboard
            assert "Control Schema" in dashboard
            assert "Conversation Trace" in dashboard
            assert "Auth mode:" in dashboard
            assert "STT Confidence" in dashboard
            assert "Router Canary" in dashboard
            assert "Commit Preview" in dashboard
            assert "Operator Brief Profile" in dashboard
            assert "Preset Quiet Hours" in dashboard
            assert "Export Runtime Profile" in dashboard
            assert "List Pending Approvals" in dashboard
            assert "Approve + Execute" in dashboard
            assert "Dead-Letter Replay" in dashboard
            assert "Dead-Letter Dry-Run" in dashboard
            assert "dead_letter_replay" in dashboard
            assert "List Autonomy Replans" in dashboard
            assert "Apply Autonomy Replan" in dashboard
            assert "apply_autonomy_replan" in dashboard

            # --- malformed JSON body is rejected with a structured error ---
            bad_control = await session.post(
                f"{base}/api/control",
                data="{not-json",
                headers={"content-type": "application/json"},
            )
            assert bad_control.status == 400
            assert (await bad_control.json())["error"] == "invalid_json"

            # --- read-only endpoints ---
            status_resp = await session.get(f"{base}/api/status")
            assert status_resp.headers["Cache-Control"] == "no-store"
            status = await status_resp.json()
            assert status["status"] == "ok"

            metrics_text = await (await session.get(f"{base}/metrics")).text()
            assert "jarvis_uptime_seconds" in metrics_text

            events_text = await (await session.get(f"{base}/events?timeout_sec=0.6")).text()
            assert "event: runtime" in events_text

            trace = await (await session.get(f"{base}/api/conversation-trace?limit=5")).json()
            assert trace[0]["turn_id"] == 1

            control_schema = await (await session.get(f"{base}/api/control-schema")).json()
            assert "set_mode" in control_schema["actions"]

            # --- two control calls; the first carries a secret-looking field ---
            control = await (
                await session.post(
                    f"{base}/api/control",
                    json={
                        "action": "set_mode",
                        "payload": {"mode": "wake_word", "token": "secret-token"},
                    },
                )
            ).json()
            assert control["ok"] is True

            second = await (
                await session.post(
                    f"{base}/api/control",
                    json={"action": "set_mode", "payload": {"mode": "always_listening"}},
                )
            ).json()
            assert second["ok"] is True

            # --- action log: newest first, redacted, signature-chained ---
            actions = await (await session.get(f"{base}/api/operator-actions")).json()
            assert len(actions) == 2
            newest, oldest = actions[0], actions[1]
            assert oldest["action"] == "set_mode"
            assert oldest["payload"]["token"] == "***REDACTED***"
            assert "signature" in oldest
            assert oldest["signature_alg"] == "hmac-sha256"
            assert newest["previous_signature"] == oldest["signature"]

            # The handler itself must still have received the unredacted payloads.
            assert calls == [
                ("set_mode", {"mode": "wake_word", "token": "secret-token"}),
                ("set_mode", {"mode": "always_listening"}),
            ]
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_inbound_webhook_token_enforcement():
    """Verify which credentials the inbound webhook endpoint accepts.

    With inbound webhooks enabled and ``inbound_token="secret-token"``:

    * a request with no token gets 403;
    * a token supplied as a ``?token=`` query parameter gets 403;
    * ``Authorization: Bearer <token>`` gets 200;
    * ``X-Webhook-Token: <token>`` gets 200 and the response carries the
      callback's return value as ``event_id``.

    Only the two accepted requests are expected to reach the callback.
    """
    captured: list[dict] = []

    def callback(payload, headers, path, source):
        # Keep everything the server hands over so the test can inspect it.
        captured.append(
            {
                "payload": payload,
                "headers": headers,
                "path": path,
                "source": source,
            }
        )
        return 42

    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=callback,
        inbound_enabled=True,
        inbound_token="secret-token",
        operator_auth_token="",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
            # No credentials at all.
            forbidden = await session.post(f"{base}/api/webhook/inbound", json={"x": 1})
            assert forbidden.status == 403

            # Correct token, but passed in the query string: still rejected.
            query_forbidden = await session.post(
                f"{base}/api/webhook/inbound?token=secret-token",
                json={"x": 2},
            )
            assert query_forbidden.status == 403

            ok_bearer = await session.post(
                f"{base}/api/webhook/inbound",
                headers={"Authorization": "Bearer secret-token"},
                json={"event": "bearer"},
            )
            assert ok_bearer.status == 200

            ok_resp = await session.post(
                f"{base}/api/webhook/inbound",
                headers={"X-Webhook-Token": "secret-token"},
                json={"event": "done"},
            )
            assert ok_resp.status == 200
            payload = await ok_resp.json()
            assert payload == {"ok": True, "event_id": 42}

            # Rejected requests must not have invoked the callback.
            assert len(captured) == 2
            assert captured[0]["payload"] == {"event": "bearer"}
            assert captured[0]["path"] == "/api/webhook/inbound"
            assert captured[1]["payload"] == {"event": "done"}
            assert captured[1]["path"] == "/api/webhook/inbound"
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_inbound_requires_configured_token():
    """Inbound webhooks enabled with an empty token must answer 503."""
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=True,
        inbound_token="",
        operator_auth_token="",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            resp = await session.post(f"{base}/api/webhook/inbound", json={"x": 1})
            assert resp.status == 503
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_auth_protects_api_endpoints():
    """Check header-token protection of the API when an operator token is set.

    With ``operator_auth_token="op-secret"`` and no explicit auth mode:

    * the dashboard (``/``) is still served without credentials;
    * ``/api/status``, ``/api/control-schema`` and ``/api/conversation-trace``
      return 401 without a token;
    * a wrong ``X-Operator-Token`` returns 403;
    * the correct token is accepted via ``X-Operator-Token`` and via
      ``Authorization: Bearer``.
    """
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "jarvis_uptime_seconds 1\n",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            dashboard = await session.get(f"{base}/")
            assert dashboard.status == 200

            # Missing credentials -> 401 on each protected endpoint.
            unauth = await session.get(f"{base}/api/status")
            assert unauth.status == 401
            schema_unauth = await session.get(f"{base}/api/control-schema")
            assert schema_unauth.status == 401
            trace_unauth = await session.get(f"{base}/api/conversation-trace")
            assert trace_unauth.status == 401

            # Wrong credentials -> 403.
            denied = await session.get(f"{base}/api/status", headers={"X-Operator-Token": "wrong"})
            assert denied.status == 403

            # Correct credentials via the custom header.
            allowed = await session.get(f"{base}/api/status", headers={"X-Operator-Token": "op-secret"})
            assert allowed.status == 200
            payload = await allowed.json()
            assert payload["ok"] is True
            schema_allowed = await session.get(
                f"{base}/api/control-schema",
                headers={"X-Operator-Token": "op-secret"},
            )
            assert schema_allowed.status == 200
            trace_allowed = await session.get(
                f"{base}/api/conversation-trace",
                headers={"X-Operator-Token": "op-secret"},
            )
            assert trace_allowed.status == 200

            # Correct credentials via a Bearer Authorization header.
            allowed_bearer = await session.get(
                f"{base}/api/status",
                headers={"Authorization": "Bearer op-secret"},
            )
            assert allowed_bearer.status == 200
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_session_auth_login_logout_flow():
    """Walk through login and logout when ``operator_auth_mode="session"``.

    Expected sequence:

    * API access without a session returns 401, even when the correct token
      is sent in ``X-Operator-Token``;
    * logging in with a wrong token returns 403;
    * logging in with the right token returns 200, sets the
      ``jarvis_operator_session`` cookie, and reports the CSRF header name,
      a non-empty CSRF token, and an identity starting with ``session-``;
    * the API is then reachable using only the session cookie;
    * logout (sent with the CSRF header) returns 200, after which the API
      answers 401 again.
    """
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "jarvis_uptime_seconds 1\n",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
        operator_auth_mode="session",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        # unsafe=True lets the cookie jar keep the session cookie set by an IP-address host.
        async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
            unauthorized = await session.get(f"{base}/api/status")
            assert unauthorized.status == 401

            # A header token alone is not enough in session mode.
            header_only = await session.get(f"{base}/api/status", headers={"X-Operator-Token": "op-secret"})
            assert header_only.status == 401

            wrong_login = await session.post(f"{base}/api/session/login", json={"token": "wrong"})
            assert wrong_login.status == 403

            login = await session.post(f"{base}/api/session/login", json={"token": "op-secret"})
            assert login.status == 200
            payload = await login.json()
            assert payload["ok"] is True
            assert payload["mode"] == "session"
            assert "jarvis_operator_session" in login.cookies
            assert payload["csrf_header"] == "X-CSRF-Token"
            csrf_token = str(payload["csrf_token"]).strip()
            assert csrf_token
            assert str(payload.get("operator_identity", "")).startswith("session-")

            allowed = await session.get(f"{base}/api/status")
            assert allowed.status == 200

            logout = await session.post(
                f"{base}/api/session/logout",
                headers={"X-CSRF-Token": csrf_token},
            )
            assert logout.status == 200

            after_logout = await session.get(f"{base}/api/status")
            assert after_logout.status == 401
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_auth_off_mode_allows_unauthenticated_api():
    """With ``operator_auth_mode="off"``, ``/api/status`` and ``/metrics`` return 200 without credentials."""
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "jarvis_uptime_seconds 1\n",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="",
        operator_auth_mode="off",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            status = await session.get(f"{base}/api/status")
            assert status.status == 200
            metrics = await session.get(f"{base}/metrics")
            assert metrics.status == 200
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_session_login_throttles_repeated_failures():
    """Five failed session logins each return 403; the sixth attempt returns 429."""
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
        operator_auth_mode="session",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
            # Each of the first five bad logins is an ordinary rejection.
            for _ in range(5):
                bad = await session.post(f"{base}/api/session/login", json={"token": "wrong"})
                assert bad.status == 403

            # The next attempt is refused as rate-limited.
            blocked = await session.post(f"{base}/api/session/login", json={"token": "wrong"})
            assert blocked.status == 429
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_records_security_events_for_operator_auth_failures():
    """Failed token-mode auth attempts must show up as ``security_event`` actions.

    A request without a token (401) and one with a wrong token (403) are made,
    then ``/api/operator-actions`` is read with the valid token.  The
    ``security_event`` rows are expected to include the reasons
    ``token_missing`` and ``token_invalid``.
    """
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
        operator_auth_mode="token",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            missing = await session.get(f"{base}/api/status")
            assert missing.status == 401
            wrong = await session.get(f"{base}/api/status", headers={"X-Operator-Token": "wrong"})
            assert wrong.status == 403

            actions_resp = await session.get(
                f"{base}/api/operator-actions",
                headers={"X-Operator-Token": "op-secret"},
            )
            assert actions_resp.status == 200
            actions = await actions_resp.json()

            # Collect the distinct failure reasons; a null/missing payload counts as "".
            security = [row for row in actions if row.get("action") == "security_event"]
            reasons = {str((row.get("payload") or {}).get("reason", "")) for row in security}
            assert "token_missing" in reasons
            assert "token_invalid" in reasons
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_records_security_events_for_login_throttle():
    """Login throttling must be reflected in the ``security_event`` action rows.

    After five 403 logins and one 429, the server's private throttle state is
    cleared, a valid login is performed, and ``/api/operator-actions`` is read
    through the session.  The recorded reasons are expected to include
    ``token_invalid``, ``throttle_armed`` and ``throttled``.
    """
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
        operator_auth_mode="session",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
            for _ in range(5):
                bad = await session.post(f"{base}/api/session/login", json={"token": "wrong"})
                assert bad.status == 403
            blocked = await session.post(f"{base}/api/session/login", json={"token": "wrong"})
            assert blocked.status == 429

            # Reset the throttle bookkeeping directly; presumably this is what lets the
            # valid login below succeed right after the lockout (confirm in OperatorServer).
            server._login_blocked_until.clear()
            server._login_failures.clear()

            login = await session.post(f"{base}/api/session/login", json={"token": "op-secret"})
            assert login.status == 200

            actions_resp = await session.get(f"{base}/api/operator-actions")
            assert actions_resp.status == 200
            actions = await actions_resp.json()

            # Collect the distinct failure reasons; a null/missing payload counts as "".
            security = [row for row in actions if row.get("action") == "security_event"]
            reasons = {str((row.get("payload") or {}).get("reason", "")) for row in security}
            assert "token_invalid" in reasons
            assert "throttle_armed" in reasons
            assert "throttled" in reasons
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_injects_authenticated_identity_for_resolve_approval():
    """``resolve_approval`` must carry the server-derived operator identity.

    The client sends a payload containing its own ``__operator_identity`` and
    ``resolver_id``.  The handler is expected to receive ``__operator_identity``
    replaced by ``"token-"`` plus the first 12 hex characters of the SHA-256 of
    the operator token, while ``resolver_id`` arrives as sent.
    """
    calls: list[tuple[str, dict]] = []

    async def control_handler(action: str, payload: dict):
        # Snapshot the payload so later mutation by the server cannot affect the check.
        calls.append((action, dict(payload)))
        return {"ok": True}

    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=control_handler,
        control_schema_provider=lambda: {"actions": {"resolve_approval": {"required": ["approval_id", "approved"]}}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
        operator_auth_mode="token",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            resp = await session.post(
                f"{base}/api/control",
                headers={"X-Operator-Token": "op-secret"},
                json={
                    "action": "resolve_approval",
                    "payload": {
                        "approval_id": "approval-1",
                        "approved": True,
                        "resolver_id": "spoofed-user",
                        "__operator_identity": "spoofed-identity",
                    },
                },
            )
            assert resp.status == 200
            assert calls
            action, payload = calls[-1]
            assert action == "resolve_approval"
            expected_identity = "token-" + hashlib.sha256("op-secret".encode("utf-8")).hexdigest()[:12]
            # The client-supplied identity must have been overwritten.
            assert payload["__operator_identity"] == expected_identity
            assert payload["resolver_id"] == "spoofed-user"
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_injects_authenticated_identity_for_apply_autonomy_replan():
    """``apply_autonomy_replan`` must carry the server-derived operator identity.

    The client sends a payload containing its own ``__operator_identity`` and
    ``resolver_id``.  The handler is expected to receive ``__operator_identity``
    replaced by ``"token-"`` plus the first 12 hex characters of the SHA-256 of
    the operator token, while ``resolver_id`` arrives as sent.
    """
    calls: list[tuple[str, dict]] = []

    async def control_handler(action: str, payload: dict):
        # Snapshot the payload so later mutation by the server cannot affect the check.
        calls.append((action, dict(payload)))
        return {"ok": True}

    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=control_handler,
        control_schema_provider=lambda: {"actions": {"apply_autonomy_replan": {"required": ["task_id"]}}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
        operator_auth_mode="token",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            resp = await session.post(
                f"{base}/api/control",
                headers={"X-Operator-Token": "op-secret"},
                json={
                    "action": "apply_autonomy_replan",
                    "payload": {
                        "task_id": "deferred-1",
                        "resolver_id": "spoofed-user",
                        "__operator_identity": "spoofed-identity",
                    },
                },
            )
            assert resp.status == 200
            assert calls
            action, payload = calls[-1]
            assert action == "apply_autonomy_replan"
            expected_identity = "token-" + hashlib.sha256("op-secret".encode("utf-8")).hexdigest()[:12]
            # The client-supplied identity must have been overwritten.
            assert payload["__operator_identity"] == expected_identity
            assert payload["resolver_id"] == "spoofed-user"
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_control_maps_invalid_action_to_400():
    """A handler result with ``error == "invalid_action"`` must surface as HTTP 400.

    The stub handler always reports ``invalid_action``; the response body is
    expected to carry the same ``error`` value.
    """

    async def control_handler(action: str, payload: dict):
        return {"ok": False, "error": "invalid_action", "message": "unknown action"}

    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=control_handler,
        control_schema_provider=lambda: {"actions": {"set_mode": {"required": ["mode"]}}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            resp = await session.post(f"{base}/api/control", json={"action": "nope", "payload": {}})
            assert resp.status == 400
            payload = await resp.json()
            assert payload["error"] == "invalid_action"
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_session_mode_requires_csrf_for_control():
    """In session mode, ``/api/control`` must enforce the CSRF header.

    After a successful login, a control POST returns 401 without
    ``X-CSRF-Token``, 403 with a wrong value, and 200 with the token issued
    at login.
    """
    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True, "action": a}),
        control_schema_provider=lambda: {"actions": {}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="op-secret",
        operator_auth_mode="session",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        # unsafe=True lets the cookie jar keep the session cookie set by an IP-address host.
        async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
            login = await session.post(f"{base}/api/session/login", json={"token": "op-secret"})
            assert login.status == 200
            payload = await login.json()
            csrf_token = str(payload["csrf_token"]).strip()

            # Session cookie present, CSRF header absent.
            no_csrf = await session.post(f"{base}/api/control", json={"action": "noop", "payload": {}})
            assert no_csrf.status == 401

            # CSRF header present but wrong.
            bad_csrf = await session.post(
                f"{base}/api/control",
                headers={"X-CSRF-Token": "bad"},
                json={"action": "noop", "payload": {}},
            )
            assert bad_csrf.status == 403

            # CSRF header matches the token issued at login.
            good_csrf = await session.post(
                f"{base}/api/control",
                headers={"X-CSRF-Token": csrf_token},
                json={"action": "noop", "payload": {}},
            )
            assert good_csrf.status == 200
    finally:
        await server.stop()
@pytest.mark.asyncio
async def test_operator_server_audit_uses_tail_semantics(monkeypatch, tmp_path):
    """``/api/audit?limit=5`` must return the last five log rows, newest first.

    A 240-line JSONL audit log (timestamps 1..240) is written to ``tmp_path``
    and patched in as ``jarvis.operator_server.AUDIT_LOG``.  The response is
    expected to contain timestamps 240 down to 236, in that order.
    """
    log_path = tmp_path / "audit.jsonl"
    rows = [
        {"timestamp": idx, "tool": "x", "args": {"n": idx}}
        for idx in range(1, 241)
    ]
    # One JSON object per line, oldest entry first.
    log_path.write_text("".join(f"{json.dumps(row)}\n" for row in rows), encoding="utf-8")
    monkeypatch.setattr("jarvis.operator_server.AUDIT_LOG", log_path)

    server = OperatorServer(
        host="127.0.0.1",
        port=0,
        status_provider=lambda: _awaitable({"ok": True}),
        diagnostics_provider=lambda: [],
        control_handler=lambda a, p: _awaitable({"ok": True}),
        control_schema_provider=lambda: {"actions": {"set_mode": {"required": ["mode"]}}},
        metrics_provider=lambda: "",
        events_provider=lambda: [],
        inbound_callback=lambda payload, headers, path, source: 1,
        inbound_enabled=False,
        inbound_token="",
        operator_auth_token="",
    )
    await server.start()
    try:
        # port=0 asks the OS for a free port; read the bound port back from the site.
        assert server._site is not None
        sockets = getattr(server._site, "_server").sockets
        port = int(sockets[0].getsockname()[1])
        base = f"http://127.0.0.1:{port}"
        async with aiohttp.ClientSession() as session:
            resp = await session.get(f"{base}/api/audit?limit=5")
            assert resp.status == 200
            payload = await resp.json()
            assert len(payload) == 5
            timestamps = [int(item["timestamp"]) for item in payload]
            assert timestamps == [240, 239, 238, 237, 236]
    finally:
        await server.stop()
async def _awaitable(value):
return value