| from __future__ import annotations |
|
|
| from collections import deque |
| import json |
| from pathlib import Path |
| from types import SimpleNamespace |
|
|
| import pytest |
|
|
| from jarvis.runtime_constants import ( |
| VALID_CONTROL_PRESETS, |
| VALID_OPERATOR_AUTH_MODES, |
| ) |
| from jarvis.runtime_operator_status import ( |
| normalize_operator_auth_mode, |
| operator_auth_risk, |
| operator_status_provider, |
| ) |
|
|
|
|
| def _collect_paths(value: object, *, prefix: str = "") -> set[str]: |
| paths: set[str] = set() |
| if isinstance(value, dict): |
| for key, nested in value.items(): |
| key_text = str(key) |
| path = f"{prefix}.{key_text}" if prefix else key_text |
| paths.add(path) |
| paths.update(_collect_paths(nested, prefix=path)) |
| return paths |
| if isinstance(value, list): |
| list_path = f"{prefix}[]" if prefix else "[]" |
| paths.add(list_path) |
| if value: |
| paths.update(_collect_paths(value[0], prefix=list_path)) |
| return paths |
| return paths |
|
|
|
|
| def test_normalize_operator_auth_mode_defaults_to_token_for_invalid() -> None: |
| assert normalize_operator_auth_mode("bad-mode", valid_modes=VALID_OPERATOR_AUTH_MODES) == "token" |
| assert normalize_operator_auth_mode("SESSION", valid_modes=VALID_OPERATOR_AUTH_MODES) == "session" |
|
|
|
|
| @pytest.mark.parametrize( |
| ("mode", "token_configured", "expected"), |
| [ |
| ("off", False, "high"), |
| ("off", True, "high"), |
| ("token", False, "high"), |
| ("token", True, "medium"), |
| ("session", False, "high"), |
| ("session", True, "low"), |
| ], |
| ) |
| def test_operator_auth_risk_matrix(mode: str, token_configured: bool, expected: str) -> None: |
| assert operator_auth_risk(auth_mode=mode, token_configured=token_configured) == expected |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_status_provider_shapes_payload_and_risk() -> None: |
| runtime = SimpleNamespace() |
| runtime.config = SimpleNamespace( |
| operator_server_enabled=True, |
| operator_server_host="127.0.0.1", |
| operator_server_port=8080, |
| operator_auth_mode="session", |
| operator_auth_token="tok", |
| persona_style="friendly", |
| backchannel_style="expressive", |
| ) |
| runtime._conversation_traces = deque([{"turn_id": 11}], maxlen=10) |
| runtime._episodic_timeline = deque([{"episode_id": 7}], maxlen=10) |
| runtime._active_control_preset = "quiet_hours" |
| runtime._personality_preview_snapshot = {"persona_style": "composed"} |
| runtime._runtime_profile_snapshot = lambda: {"wake_mode": "wake_word"} |
| runtime._runtime_invariant_snapshot = lambda: {"total_violations": 0} |
| runtime._operator_conversation_trace_provider = lambda limit=1: [{"turn_id": 11}] |
| runtime._operator_episodic_timeline_provider = lambda limit=20: [{"episode_id": 7}] |
|
|
| async def _system_status(_: dict[str, object]) -> dict[str, object]: |
| return {"content": [{"text": '{"ok": true, "service": "jarvis"}'}]} |
|
|
| status = await operator_status_provider( |
| runtime, |
| valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES, |
| valid_control_presets=VALID_CONTROL_PRESETS, |
| system_status_fn=_system_status, |
| ) |
|
|
| assert status["ok"] is True |
| assert status["operator"]["auth_mode"] == "session" |
| assert status["operator"]["auth_risk"] == "low" |
| assert status["conversation_trace"]["latest_turn_id"] == 11 |
| assert status["episodic_timeline"]["latest_episode_id"] == 7 |
| assert status["operator_controls"]["active_control_preset"] == "quiet_hours" |
| assert status["operator_controls"]["runtime_profile"]["wake_mode"] == "wake_word" |
| recommendations = status["operator_recommendations"] |
| assert recommendations["severity"] == "low" |
| assert recommendations["count"] >= 1 |
| assert recommendations["recommended"][0]["code"] == "healthy" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_status_provider_normalizes_invalid_auth_mode() -> None: |
| runtime = SimpleNamespace() |
| runtime.config = SimpleNamespace( |
| operator_server_enabled=True, |
| operator_server_host="0.0.0.0", |
| operator_server_port=8080, |
| operator_auth_mode="bad-mode", |
| operator_auth_token="", |
| persona_style="friendly", |
| backchannel_style="balanced", |
| ) |
| runtime._conversation_traces = deque([], maxlen=10) |
| runtime._episodic_timeline = deque([], maxlen=10) |
| runtime._active_control_preset = "custom" |
| runtime._personality_preview_snapshot = None |
| runtime._runtime_profile_snapshot = lambda: {} |
| runtime._runtime_invariant_snapshot = lambda: {} |
| runtime._operator_conversation_trace_provider = lambda limit=1: [] |
| runtime._operator_episodic_timeline_provider = lambda limit=20: [] |
|
|
| async def _system_status(_: dict[str, object]) -> dict[str, object]: |
| return {"content": [{"text": "{}"}]} |
|
|
| status = await operator_status_provider( |
| runtime, |
| valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES, |
| valid_control_presets=VALID_CONTROL_PRESETS, |
| system_status_fn=_system_status, |
| ) |
|
|
| assert status["operator"]["auth_mode"] == "token" |
| assert status["operator"]["auth_risk"] == "high" |
| recommendations = status["operator_recommendations"] |
| assert recommendations["severity"] in {"medium", "high"} |
| assert any( |
| row["code"] == "operator_auth_risk" |
| for row in recommendations["recommended"] |
| ) |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_status_provider_recommends_on_health_and_checkpoint_signals() -> None: |
| runtime = SimpleNamespace() |
| runtime.config = SimpleNamespace( |
| operator_server_enabled=True, |
| operator_server_host="127.0.0.1", |
| operator_server_port=8080, |
| operator_auth_mode="token", |
| operator_auth_token="set-token", |
| persona_style="friendly", |
| backchannel_style="balanced", |
| ) |
| runtime._conversation_traces = deque([], maxlen=10) |
| runtime._episodic_timeline = deque([], maxlen=10) |
| runtime._active_control_preset = "custom" |
| runtime._personality_preview_snapshot = None |
| runtime._runtime_profile_snapshot = lambda: {} |
| runtime._runtime_invariant_snapshot = lambda: {"total_violations": 3} |
| runtime._operator_conversation_trace_provider = lambda limit=1: [] |
| runtime._operator_episodic_timeline_provider = lambda limit=20: [] |
|
|
| async def _system_status(_: dict[str, object]) -> dict[str, object]: |
| payload = { |
| "health": {"health_level": "degraded", "reasons": ["memory_error"]}, |
| "plan_preview": {"pending_count": 2}, |
| "expansion": { |
| "planner_engine": { |
| "autonomy_waiting_checkpoint_count": 1, |
| "autonomy_backlog_step_count": 3, |
| "autonomy_needs_replan_count": 1, |
| "autonomy_retry_pending_count": 2, |
| }, |
| "proactive": {"approval_pending_count": 1}, |
| }, |
| "voice_attention": { |
| "multimodal_grounding": { |
| "confidence_band": "low", |
| "overall_confidence": 0.2, |
| } |
| }, |
| "dead_letter_queue": {"pending_count": 2}, |
| "recovery_journal": {"interrupted_count": 1, "unresolved_count": 1}, |
| } |
| return {"content": [{"text": str(payload).replace("'", '"')}]} |
|
|
| status = await operator_status_provider( |
| runtime, |
| valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES, |
| valid_control_presets=VALID_CONTROL_PRESETS, |
| system_status_fn=_system_status, |
| ) |
|
|
| recommendations = status["operator_recommendations"] |
| assert recommendations["severity"] in {"medium", "high"} |
| codes = {row["code"] for row in recommendations["recommended"]} |
| assert "runtime_health_degraded" in codes |
| assert "pending_previews" in codes |
| assert "autonomy_waiting_checkpoint" in codes |
| assert "autonomy_backlog_steps" in codes |
| assert "autonomy_needs_replan" in codes |
| assert "autonomy_retry_pending" in codes |
| assert "approval_queue_pending" in codes |
| assert "dead_letter_pending" in codes |
| assert "recovery_unresolved" in codes |
| assert "multimodal_low_confidence" in codes |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_status_snapshot_contract_paths_stable() -> None: |
| runtime = SimpleNamespace() |
| runtime.config = SimpleNamespace( |
| operator_server_enabled=True, |
| operator_server_host="127.0.0.1", |
| operator_server_port=8080, |
| operator_auth_mode="session", |
| operator_auth_token="tok", |
| persona_style="friendly", |
| backchannel_style="balanced", |
| ) |
| runtime._conversation_traces = deque([{"turn_id": 11}], maxlen=10) |
| runtime._episodic_timeline = deque([{"episode_id": 7}], maxlen=10) |
| runtime._active_control_preset = "custom" |
| runtime._personality_preview_snapshot = {"persona_style": "composed"} |
| runtime._runtime_profile_snapshot = lambda: {"wake_mode": "wake_word"} |
| runtime._runtime_invariant_snapshot = lambda: {"total_violations": 0} |
| runtime._operator_conversation_trace_provider = lambda limit=1: [{"turn_id": 11}] |
| runtime._operator_episodic_timeline_provider = lambda limit=20: [{"episode_id": 7}] |
|
|
| async def _system_status(_: dict[str, object]) -> dict[str, object]: |
| return {"content": [{"text": '{"ok": true, "service": "jarvis"}'}]} |
|
|
| status = await operator_status_provider( |
| runtime, |
| valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES, |
| valid_control_presets=VALID_CONTROL_PRESETS, |
| system_status_fn=_system_status, |
| ) |
|
|
| snapshot_path = Path(__file__).resolve().parents[1] / "docs" / "evals" / "operator-status-snapshot.json" |
| snapshot = json.loads(snapshot_path.read_text(encoding="utf-8")) |
| required_paths = _collect_paths(snapshot) |
| actual_paths = _collect_paths(status) |
| missing = sorted(required_paths - actual_paths) |
| assert not missing |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_status_recommendation_codes_stress_contract() -> None: |
| runtime = SimpleNamespace() |
| runtime.config = SimpleNamespace( |
| operator_server_enabled=True, |
| operator_server_host="127.0.0.1", |
| operator_server_port=8080, |
| operator_auth_mode="token", |
| operator_auth_token="", |
| persona_style="friendly", |
| backchannel_style="balanced", |
| ) |
| runtime._conversation_traces = deque([], maxlen=10) |
| runtime._episodic_timeline = deque([], maxlen=10) |
| runtime._active_control_preset = "custom" |
| runtime._personality_preview_snapshot = None |
| runtime._runtime_profile_snapshot = lambda: {} |
| runtime._runtime_invariant_snapshot = lambda: {"total_violations": 3} |
| runtime._operator_conversation_trace_provider = lambda limit=1: [] |
| runtime._operator_episodic_timeline_provider = lambda limit=20: [] |
|
|
| async def _system_status(_: dict[str, object]) -> dict[str, object]: |
| payload = { |
| "health": {"health_level": "degraded", "reasons": ["memory_error"]}, |
| "plan_preview": {"pending_count": 2}, |
| "expansion": { |
| "planner_engine": { |
| "autonomy_waiting_checkpoint_count": 1, |
| "autonomy_backlog_step_count": 2, |
| "autonomy_needs_replan_count": 1, |
| "autonomy_retry_pending_count": 1, |
| }, |
| "proactive": {"approval_pending_count": 1}, |
| }, |
| "voice_attention": {"multimodal_grounding": {"confidence_band": "low"}}, |
| "dead_letter_queue": {"pending_count": 1}, |
| "recovery_journal": {"interrupted_count": 1, "unresolved_count": 0}, |
| } |
| return {"content": [{"text": json.dumps(payload)}]} |
|
|
| status = await operator_status_provider( |
| runtime, |
| valid_operator_auth_modes=VALID_OPERATOR_AUTH_MODES, |
| valid_control_presets=VALID_CONTROL_PRESETS, |
| system_status_fn=_system_status, |
| ) |
|
|
| codes = {row["code"] for row in status["operator_recommendations"]["recommended"]} |
| required_codes = { |
| "operator_auth_risk", |
| "runtime_health_degraded", |
| "health_reasons_present", |
| "runtime_invariants", |
| "pending_previews", |
| "autonomy_waiting_checkpoint", |
| "autonomy_backlog_steps", |
| "autonomy_needs_replan", |
| "autonomy_retry_pending", |
| "approval_queue_pending", |
| "dead_letter_pending", |
| "recovery_unresolved", |
| "multimodal_low_confidence", |
| } |
| assert required_codes.issubset(codes) |
|
|