| from __future__ import annotations |
|
|
| import json |
| from types import SimpleNamespace |
|
|
| import pytest |
|
|
| from jarvis.runtime_operator_control import handle_operator_control |
|
|
|
|
| def _runtime_stub() -> SimpleNamespace: |
| return SimpleNamespace( |
| _voice_controller=lambda: SimpleNamespace(), |
| _operator_available_actions=lambda: [], |
| _parse_control_bool=lambda value: value if isinstance(value, bool) else None, |
| ) |
|
|
|
|
| def _tool_payload(payload: dict) -> dict: |
| return {"content": [{"type": "text", "text": json.dumps(payload)}]} |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_lists_pending_approvals(monkeypatch) -> None: |
| runtime = _runtime_stub() |
|
|
| async def _fake_home_orchestrator(args: dict) -> dict: |
| assert args["action"] == "approval_list" |
| return _tool_payload( |
| { |
| "action": "approval_list", |
| "pending_count": 1, |
| "status_counts": {"pending": 1}, |
| "approvals": [{"approval_id": "approval-1", "status": "pending"}], |
| } |
| ) |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.home_orchestrator", _fake_home_orchestrator) |
|
|
| result = await handle_operator_control(runtime, "list_pending_approvals", {"limit": 5}) |
| assert result["ok"] is True |
| assert result["pending_count"] == 1 |
| assert result["approvals"][0]["approval_id"] == "approval-1" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_resolve_approval_with_execute(monkeypatch) -> None: |
| runtime = _runtime_stub() |
| calls: list[dict] = [] |
|
|
| async def _fake_home_orchestrator(args: dict) -> dict: |
| calls.append(dict(args)) |
| if args["action"] == "approval_resolve": |
| return _tool_payload( |
| { |
| "action": "approval_resolve", |
| "resolved": True, |
| "approved": True, |
| "execution_ticket": "ticket-123", |
| } |
| ) |
| if args["action"] == "execute": |
| return _tool_payload({"action": "execute", "live_executed_count": 1}) |
| raise AssertionError(f"unexpected action: {args}") |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.home_orchestrator", _fake_home_orchestrator) |
|
|
| result = await handle_operator_control( |
| runtime, |
| "resolve_approval", |
| { |
| "approval_id": "approval-7", |
| "approved": True, |
| "execute": True, |
| }, |
| ) |
| assert result["ok"] is True |
| assert result["approval"]["resolved"] is True |
| assert result["execution"]["live_executed_count"] == 1 |
| assert calls[0]["action"] == "approval_resolve" |
| assert calls[0]["__operator_identity"] == "operator" |
| assert calls[1]["action"] == "execute" |
| assert calls[1]["execution_ticket"] == "ticket-123" |
| assert calls[1]["resolver_id"] == "operator" |
| assert calls[1]["requester_id"] == "operator" |
| assert calls[1]["__operator_identity"] == "operator" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_resolve_approval_uses_operator_identity_not_payload(monkeypatch) -> None: |
| runtime = _runtime_stub() |
| calls: list[dict] = [] |
|
|
| async def _fake_home_orchestrator(args: dict) -> dict: |
| calls.append(dict(args)) |
| if args["action"] == "approval_resolve": |
| return _tool_payload({"action": "approval_resolve", "resolved": True, "approved": True}) |
| raise AssertionError(f"unexpected action: {args}") |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.home_orchestrator", _fake_home_orchestrator) |
|
|
| result = await handle_operator_control( |
| runtime, |
| "resolve_approval", |
| { |
| "approval_id": "approval-7", |
| "approved": True, |
| "resolver_id": "spoofed-user", |
| "__operator_identity": "session-abc123", |
| }, |
| ) |
| assert result["ok"] is True |
| assert calls[0]["resolver_id"] == "session-abc123" |
| assert calls[0]["__operator_identity"] == "session-abc123" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_dead_letter_status_and_replay(monkeypatch) -> None: |
| runtime = _runtime_stub() |
|
|
| async def _fake_dead_letter_list(args: dict) -> dict: |
| assert args["status"] == "open" |
| return _tool_payload({"pending_count": 2, "failed_count": 1}) |
|
|
| async def _fake_dead_letter_replay(args: dict) -> dict: |
| assert args["status"] == "open" |
| assert args["dry_run"] is True |
| return _tool_payload({"attempted_count": 2, "failed_count": 0}) |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.dead_letter_list", _fake_dead_letter_list) |
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.dead_letter_replay", _fake_dead_letter_replay) |
|
|
| status = await handle_operator_control(runtime, "dead_letter_status", {"status_filter": "open"}) |
| replay = await handle_operator_control( |
| runtime, |
| "dead_letter_replay", |
| {"status_filter": "open", "dry_run": True}, |
| ) |
| assert status["ok"] is True |
| assert status["dead_letter_queue"]["pending_count"] == 2 |
| assert replay["ok"] is True |
| assert replay["dead_letter_replay"]["attempted_count"] == 2 |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_resolve_approval_execute_requires_execution_ticket(monkeypatch) -> None: |
| runtime = _runtime_stub() |
|
|
| async def _fake_home_orchestrator(args: dict) -> dict: |
| if args["action"] == "approval_resolve": |
| return _tool_payload({"action": "approval_resolve", "resolved": True, "approved": True}) |
| raise AssertionError(f"unexpected action: {args}") |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.home_orchestrator", _fake_home_orchestrator) |
|
|
| result = await handle_operator_control( |
| runtime, |
| "resolve_approval", |
| { |
| "approval_id": "approval-7", |
| "approved": True, |
| "execute": True, |
| }, |
| ) |
| assert result["ok"] is False |
| assert result["error"] == "execution_ticket_missing" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_lists_autonomy_replans(monkeypatch) -> None: |
| runtime = _runtime_stub() |
|
|
| async def _fake_planner_engine(args: dict) -> dict: |
| assert args["action"] == "autonomy_status" |
| return _tool_payload( |
| { |
| "action": "autonomy_status", |
| "needs_replan_count": 1, |
| "retry_pending_count": 2, |
| "failure_taxonomy": {"condition_equals_mismatch": 3}, |
| "task_progress": [ |
| {"id": "deferred-1", "status": "needs_replan", "needs_replan": True}, |
| {"id": "deferred-2", "status": "scheduled", "needs_replan": False}, |
| ], |
| } |
| ) |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.planner_engine", _fake_planner_engine) |
|
|
| result = await handle_operator_control(runtime, "list_autonomy_replans", {"limit": 10}) |
| assert result["ok"] is True |
| assert result["needs_replan_count"] == 1 |
| assert result["retry_pending_count"] == 2 |
| assert result["tasks"] == [{"id": "deferred-1", "status": "needs_replan", "needs_replan": True}] |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_apply_autonomy_replan_uses_operator_identity(monkeypatch) -> None: |
| runtime = _runtime_stub() |
| calls: list[dict] = [] |
|
|
| async def _fake_planner_engine(args: dict) -> dict: |
| calls.append(dict(args)) |
| assert args["action"] == "autonomy_replan" |
| return _tool_payload({"action": "autonomy_replan", "task_id": args["task_id"]}) |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.planner_engine", _fake_planner_engine) |
|
|
| result = await handle_operator_control( |
| runtime, |
| "apply_autonomy_replan", |
| { |
| "task_id": "deferred-9", |
| "plan_steps": ["step-a", "step-b"], |
| "step_contracts": [{}, {}], |
| "reset_progress": True, |
| "resolver_id": "spoofed", |
| "__operator_identity": "session-abc123", |
| "notes": "operator fix", |
| }, |
| ) |
| assert result["ok"] is True |
| assert result["autonomy_replan"]["task_id"] == "deferred-9" |
| assert calls[0]["resolver_id"] == "session-abc123" |
| assert calls[0]["task_id"] == "deferred-9" |
| assert calls[0]["plan_steps"] == ["step-a", "step-b"] |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_copilot_actions(monkeypatch) -> None: |
| runtime = _runtime_stub() |
|
|
| async def _fake_system_status(args: dict) -> dict: |
| assert args == {} |
| return _tool_payload( |
| { |
| "expansion": { |
| "proactive": {"approval_pending_count": 1}, |
| "planner_engine": { |
| "autonomy_needs_replan_count": 1, |
| "autonomy_slo": {"alert_count": 1}, |
| }, |
| }, |
| "dead_letter_queue": {"pending_count": 2}, |
| } |
| ) |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.system_status", _fake_system_status) |
|
|
| result = await handle_operator_control(runtime, "copilot_actions", {}) |
| assert result["ok"] is True |
| action_ids = { |
| row["action_id"] |
| for row in result["actions"] |
| if isinstance(row, dict) and isinstance(row.get("action_id"), str) |
| } |
| assert "pending_approvals" in action_ids |
| assert "autonomy_replans" in action_ids |
| assert "dead_letter_replay_dry_run" in action_ids |
| assert "autonomy_slo_alerts" in action_ids |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_operator_control_copilot_execute_dispatches(monkeypatch) -> None: |
| runtime = _runtime_stub() |
|
|
| async def _fake_system_status(args: dict) -> dict: |
| assert args == {} |
| return _tool_payload( |
| { |
| "expansion": {"proactive": {"approval_pending_count": 0}, "planner_engine": {}}, |
| "dead_letter_queue": {"pending_count": 1}, |
| } |
| ) |
|
|
| async def _fake_dead_letter_replay(args: dict) -> dict: |
| assert args["dry_run"] is True |
| return _tool_payload({"attempted_count": 1, "failed_count": 0}) |
|
|
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.system_status", _fake_system_status) |
| monkeypatch.setattr("jarvis.runtime_operator_control.service_tools.dead_letter_replay", _fake_dead_letter_replay) |
|
|
| result = await handle_operator_control( |
| runtime, |
| "copilot_execute", |
| {"action_id": "dead_letter_replay_dry_run"}, |
| ) |
| assert result["ok"] is True |
| assert result["dead_letter_replay"]["attempted_count"] == 1 |
|
|