jarvis / tests /test_runtime_operator_status.py
Jonathan Haas
Add LLM memory quality eval gate and promote OpenAI-agent readiness updates
1265f9a
Raw
History Blame Contribute Delete
12.5 kB
from __future__ import annotations
from collections import deque
import json
from pathlib import Path
from types import SimpleNamespace
import pytest
from jarvis.runtime_constants import (
VALID_CONTROL_PRESETS,
VALID_OPERATOR_AUTH_MODES,
)
from jarvis.runtime_operator_status import (
normalize_operator_auth_mode,
operator_auth_risk,
operator_status_provider,
)
def _collect_paths(value: object, *, prefix: str = "") -> set[str]:
paths: set[str] = set()
if isinstance(value, dict):
for key, nested in value.items():
key_text = str(key)
path = f"{prefix}.{key_text}" if prefix else key_text
paths.add(path)
paths.update(_collect_paths(nested, prefix=path))
return paths
if isinstance(value, list):
list_path = f"{prefix}[]" if prefix else "[]"
paths.add(list_path)
if value:
paths.update(_collect_paths(value[0], prefix=list_path))
return paths
return paths
def test_normalize_operator_auth_mode_defaults_to_token_for_invalid() -> None:
    """Check that "bad-mode" normalizes to "token" and "SESSION" to "session"."""
    fallback = normalize_operator_auth_mode("bad-mode", valid_modes=VALID_OPERATOR_AUTH_MODES)
    assert fallback == "token"

    lowered = normalize_operator_auth_mode("SESSION", valid_modes=VALID_OPERATOR_AUTH_MODES)
    assert lowered == "session"
@pytest.mark.parametrize(
    ("mode", "token_configured", "expected"),
    [
        # "off" is expected to be high risk with or without a token.
        ("off", False, "high"),
        ("off", True, "high"),
        # "token" is expected to drop to medium once a token is configured.
        ("token", False, "high"),
        ("token", True, "medium"),
        # "session" is expected to drop to low once a token is configured.
        ("session", False, "high"),
        ("session", True, "low"),
    ],
)
def test_operator_auth_risk_matrix(mode: str, token_configured: bool, expected: str) -> None:
    """Check the risk level returned for each auth-mode / token combination."""
    risk = operator_auth_risk(auth_mode=mode, token_configured=token_configured)
    assert risk == expected
@pytest.mark.asyncio
async def test_operator_status_provider_shapes_payload_and_risk() -> None:
    """Check the status payload built from a session-mode runtime stub.

    The stub has a token configured, one conversation trace, one episodic
    entry and zero invariant violations; the test asserts the resulting
    operator, trace, timeline, control and recommendation fields.
    """

    async def _system_status(_: dict[str, object]) -> dict[str, object]:
        # Canned system-status response carrying a JSON text payload.
        return {"content": [{"text": '{"ok": true, "service": "jarvis"}'}]}

    config = SimpleNamespace(
        operator_server_enabled=True,
        operator_server_host="127.0.0.1",
        operator_server_port=8080,
        operator_auth_mode="session",
        operator_auth_token="tok",
        persona_style="friendly",
        backchannel_style="expressive",
    )
    runtime = SimpleNamespace(
        config=config,
        _conversation_traces=deque([{"turn_id": 11}], maxlen=10),
        _episodic_timeline=deque([{"episode_id": 7}], maxlen=10),
        _active_control_preset="quiet_hours",
        _personality_preview_snapshot={"persona_style": "composed"},
        _runtime_profile_snapshot=lambda: {"wake_mode": "wake_word"},
        _runtime_invariant_snapshot=lambda: {"total_violations": 0},
        _operator_conversation_trace_provider=lambda limit=1: [{"turn_id": 11}],
        _operator_episodic_timeline_provider=lambda limit=20: [{"episode_id": 7}],
    )

    status = await operator_status_provider(
        runtime,
        valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES,
        valid_control_presets=VALID_CONTROL_PRESETS,
        system_status_fn=_system_status,
    )

    assert status["ok"] is True

    operator = status["operator"]
    assert operator["auth_mode"] == "session"
    assert operator["auth_risk"] == "low"

    assert status["conversation_trace"]["latest_turn_id"] == 11
    assert status["episodic_timeline"]["latest_episode_id"] == 7

    controls = status["operator_controls"]
    assert controls["active_control_preset"] == "quiet_hours"
    assert controls["runtime_profile"]["wake_mode"] == "wake_word"

    recommendations = status["operator_recommendations"]
    assert recommendations["severity"] == "low"
    assert recommendations["count"] >= 1
    first_entry = recommendations["recommended"][0]
    assert first_entry["code"] == "healthy"
@pytest.mark.asyncio
async def test_operator_status_provider_normalizes_invalid_auth_mode() -> None:
    """Check the payload when the configured auth mode is "bad-mode".

    With an empty token, the reported mode is expected to be "token", the
    risk "high", and an ``operator_auth_risk`` recommendation to be present.
    """

    async def _system_status(_: dict[str, object]) -> dict[str, object]:
        # Empty JSON object as the system-status text payload.
        return {"content": [{"text": "{}"}]}

    config = SimpleNamespace(
        operator_server_enabled=True,
        operator_server_host="0.0.0.0",
        operator_server_port=8080,
        operator_auth_mode="bad-mode",
        operator_auth_token="",
        persona_style="friendly",
        backchannel_style="balanced",
    )
    runtime = SimpleNamespace(
        config=config,
        _conversation_traces=deque([], maxlen=10),
        _episodic_timeline=deque([], maxlen=10),
        _active_control_preset="custom",
        _personality_preview_snapshot=None,
        _runtime_profile_snapshot=lambda: {},
        _runtime_invariant_snapshot=lambda: {},
        _operator_conversation_trace_provider=lambda limit=1: [],
        _operator_episodic_timeline_provider=lambda limit=20: [],
    )

    status = await operator_status_provider(
        runtime,
        valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES,
        valid_control_presets=VALID_CONTROL_PRESETS,
        system_status_fn=_system_status,
    )

    operator = status["operator"]
    assert operator["auth_mode"] == "token"
    assert operator["auth_risk"] == "high"

    recommendations = status["operator_recommendations"]
    assert recommendations["severity"] in {"medium", "high"}
    auth_risk_flagged = any(
        entry["code"] == "operator_auth_risk"
        for entry in recommendations["recommended"]
    )
    assert auth_risk_flagged
@pytest.mark.asyncio
async def test_operator_status_provider_recommends_on_health_and_checkpoint_signals() -> None:
    """Check that degraded system signals surface as recommendation codes.

    The stubbed system status reports degraded health, pending previews,
    autonomy backlog counters, a pending approval, dead-letter entries,
    unresolved recovery items and a low multimodal confidence band. The test
    asserts the severity is "medium" or "high" and that each corresponding
    recommendation code is present.
    """
    runtime = SimpleNamespace()
    runtime.config = SimpleNamespace(
        operator_server_enabled=True,
        operator_server_host="127.0.0.1",
        operator_server_port=8080,
        operator_auth_mode="token",
        operator_auth_token="set-token",
        persona_style="friendly",
        backchannel_style="balanced",
    )
    runtime._conversation_traces = deque([], maxlen=10)
    runtime._episodic_timeline = deque([], maxlen=10)
    runtime._active_control_preset = "custom"
    runtime._personality_preview_snapshot = None
    runtime._runtime_profile_snapshot = lambda: {}
    runtime._runtime_invariant_snapshot = lambda: {"total_violations": 3}
    runtime._operator_conversation_trace_provider = lambda limit=1: []
    runtime._operator_episodic_timeline_provider = lambda limit=20: []

    async def _system_status(_: dict[str, object]) -> dict[str, object]:
        payload = {
            "health": {"health_level": "degraded", "reasons": ["memory_error"]},
            "plan_preview": {"pending_count": 2},
            "expansion": {
                "planner_engine": {
                    "autonomy_waiting_checkpoint_count": 1,
                    "autonomy_backlog_step_count": 3,
                    "autonomy_needs_replan_count": 1,
                    "autonomy_retry_pending_count": 2,
                },
                "proactive": {"approval_pending_count": 1},
            },
            "voice_attention": {
                "multimodal_grounding": {
                    "confidence_band": "low",
                    "overall_confidence": 0.2,
                }
            },
            "dead_letter_queue": {"pending_count": 2},
            "recovery_journal": {"interrupted_count": 1, "unresolved_count": 1},
        }
        # Serialize with the JSON encoder rather than rewriting quotes in the
        # dict repr: the repr is only accidentally valid JSON and would break
        # on booleans, None, or strings that contain a quote character.
        return {"content": [{"text": json.dumps(payload)}]}

    status = await operator_status_provider(
        runtime,
        valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES,
        valid_control_presets=VALID_CONTROL_PRESETS,
        system_status_fn=_system_status,
    )

    recommendations = status["operator_recommendations"]
    assert recommendations["severity"] in {"medium", "high"}

    codes = {row["code"] for row in recommendations["recommended"]}
    expected_codes = {
        "runtime_health_degraded",
        "pending_previews",
        "autonomy_waiting_checkpoint",
        "autonomy_backlog_steps",
        "autonomy_needs_replan",
        "autonomy_retry_pending",
        "approval_queue_pending",
        "dead_letter_pending",
        "recovery_unresolved",
        "multimodal_low_confidence",
    }
    # Report every absent code in one failure instead of stopping at the first.
    missing = sorted(expected_codes - codes)
    assert not missing, f"missing recommendation codes: {missing}"
@pytest.mark.asyncio
async def test_operator_status_snapshot_contract_paths_stable() -> None:
    """Check that every key path in the stored snapshot exists in a live payload.

    The snapshot is read from ``docs/evals/operator-status-snapshot.json``,
    resolved two directory levels above this file. Extra paths in the live
    payload are allowed; only paths missing from it fail the test.
    """

    async def _system_status(_: dict[str, object]) -> dict[str, object]:
        # Canned system-status response carrying a JSON text payload.
        return {"content": [{"text": '{"ok": true, "service": "jarvis"}'}]}

    config = SimpleNamespace(
        operator_server_enabled=True,
        operator_server_host="127.0.0.1",
        operator_server_port=8080,
        operator_auth_mode="session",
        operator_auth_token="tok",
        persona_style="friendly",
        backchannel_style="balanced",
    )
    runtime = SimpleNamespace(
        config=config,
        _conversation_traces=deque([{"turn_id": 11}], maxlen=10),
        _episodic_timeline=deque([{"episode_id": 7}], maxlen=10),
        _active_control_preset="custom",
        _personality_preview_snapshot={"persona_style": "composed"},
        _runtime_profile_snapshot=lambda: {"wake_mode": "wake_word"},
        _runtime_invariant_snapshot=lambda: {"total_violations": 0},
        _operator_conversation_trace_provider=lambda limit=1: [{"turn_id": 11}],
        _operator_episodic_timeline_provider=lambda limit=20: [{"episode_id": 7}],
    )

    status = await operator_status_provider(
        runtime,
        valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES,
        valid_control_presets=VALID_CONTROL_PRESETS,
        system_status_fn=_system_status,
    )

    base_dir = Path(__file__).resolve().parents[1]
    snapshot_path = base_dir.joinpath("docs", "evals", "operator-status-snapshot.json")
    snapshot_text = snapshot_path.read_text(encoding="utf-8")
    snapshot = json.loads(snapshot_text)

    required_paths = _collect_paths(snapshot)
    actual_paths = _collect_paths(status)
    missing = sorted(path for path in required_paths if path not in actual_paths)
    assert not missing
@pytest.mark.asyncio
async def test_operator_status_recommendation_codes_stress_contract() -> None:
    """Check that a fully degraded stub yields the full set of recommendation codes.

    The runtime stub uses token auth with an empty token and reports three
    invariant violations; the system status payload flags health, preview,
    autonomy, approval, dead-letter, recovery and multimodal signals.
    """

    async def _system_status(_: dict[str, object]) -> dict[str, object]:
        planner_engine = {
            "autonomy_waiting_checkpoint_count": 1,
            "autonomy_backlog_step_count": 2,
            "autonomy_needs_replan_count": 1,
            "autonomy_retry_pending_count": 1,
        }
        payload = {
            "health": {"health_level": "degraded", "reasons": ["memory_error"]},
            "plan_preview": {"pending_count": 2},
            "expansion": {
                "planner_engine": planner_engine,
                "proactive": {"approval_pending_count": 1},
            },
            "voice_attention": {"multimodal_grounding": {"confidence_band": "low"}},
            "dead_letter_queue": {"pending_count": 1},
            "recovery_journal": {"interrupted_count": 1, "unresolved_count": 0},
        }
        return {"content": [{"text": json.dumps(payload)}]}

    config = SimpleNamespace(
        operator_server_enabled=True,
        operator_server_host="127.0.0.1",
        operator_server_port=8080,
        operator_auth_mode="token",
        operator_auth_token="",
        persona_style="friendly",
        backchannel_style="balanced",
    )
    runtime = SimpleNamespace(
        config=config,
        _conversation_traces=deque([], maxlen=10),
        _episodic_timeline=deque([], maxlen=10),
        _active_control_preset="custom",
        _personality_preview_snapshot=None,
        _runtime_profile_snapshot=lambda: {},
        _runtime_invariant_snapshot=lambda: {"total_violations": 3},
        _operator_conversation_trace_provider=lambda limit=1: [],
        _operator_episodic_timeline_provider=lambda limit=20: [],
    )

    status = await operator_status_provider(
        runtime,
        valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES,
        valid_control_presets=VALID_CONTROL_PRESETS,
        system_status_fn=_system_status,
    )

    recommended_rows = status["operator_recommendations"]["recommended"]
    codes = {entry["code"] for entry in recommended_rows}
    required_codes = {
        "operator_auth_risk",
        "runtime_health_degraded",
        "health_reasons_present",
        "runtime_invariants",
        "pending_previews",
        "autonomy_waiting_checkpoint",
        "autonomy_backlog_steps",
        "autonomy_needs_replan",
        "autonomy_retry_pending",
        "approval_queue_pending",
        "dead_letter_pending",
        "recovery_unresolved",
        "multimodal_low_confidence",
    }
    assert required_codes <= codes