| """Tests for stronger autonomous planning execution.""" |
|
|
| import asyncio |
| from unittest.mock import patch |
|
|
| import pytest |
|
|
| from maris_core.autonomous.agent import StartRequest, StatusRequest, get_status, start_session |
| from maris_core.autonomous.planner import Planner |
| from maris_core.memory_context import ConversationMemoryStore |
|
|
| BACKGROUND_TASK_SETTLE_SECONDS = 0.3 |
|
|
|
|
| def test_planner_decompose_splits_goal_without_regex_backtracking() -> None: |
| planner = Planner() |
|
|
| steps = planner.decompose("Ievāc datus un tad salīdzini rezultātus -> pārbaudi kopsavilkumu") |
|
|
| assert [step["action"] for step in steps[1:3]] == [ |
| "Ievāc datus", |
| "salīdzini rezultātus", |
| ] |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_autonomous_session_executes_dependent_tasks_over_status_polls() -> None: |
| with patch("maris_core.autonomous.agent.get_pipeline", return_value=None): |
| started = await start_session(StartRequest(session_id="session-1", goal="Uztaisīt plānu")) |
| updated = await get_status(StatusRequest(session_id="session-1")) |
|
|
| assert started.tasks |
| assert started.tasks[0].status == "completed" |
| assert started.events |
| assert started.checkpoints |
| assert started.agent_roles |
| assert updated.progress_percent >= started.progress_percent |
| assert updated.tasks[1].depends_on == [updated.tasks[0].id] |
| assert updated.replay_cursor >= started.replay_cursor |
| assert updated.resume_token == "resume:session-1" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_autonomous_task_retries_before_failing() -> None: |
| with ( |
| patch("maris_core.autonomous.agent.get_pipeline", return_value=None), |
| patch( |
| "maris_core.autonomous.agent._execute_task", |
| side_effect=RuntimeError("pagaidu kļūda"), |
| ) as execute_task_mock, |
| ): |
| started = await start_session(StartRequest(session_id="session-2", goal="Debugot servisu")) |
| await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS) |
| retried = await get_status(StatusRequest(session_id="session-2")) |
|
|
| assert execute_task_mock.await_count == 2 |
| assert started.tasks[0].status == "retrying" |
| assert started.tasks[0].attempts == 1 |
| assert retried.tasks[0].status == "failed" |
| assert retried.tasks[0].attempts == 2 |
| assert retried.approvals |
| assert retried.approvals[0].status == "needs_intervention" |
| assert any(event.event_type == "task.failed_attempt" for event in retried.events) |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_autonomous_session_keeps_running_without_status_polls() -> None: |
| async def fake_execute_task( |
| task: dict[str, object], |
| goal: str, |
| tasks: list[dict[str, object]], |
| *, |
| persona_id: str | None = None, |
| ) -> dict[str, object]: |
| del goal, tasks, persona_id |
| return {"summary": f"Pabeigts: {task['description']}", "artifacts": {}, "metrics": {}} |
|
|
| with ( |
| patch("maris_core.autonomous.agent.get_pipeline", return_value=None), |
| patch("maris_core.autonomous.agent._execute_task", side_effect=fake_execute_task), |
| ): |
| started = await start_session( |
| StartRequest(session_id="session-auto", goal="Uztaisīt plānu") |
| ) |
| assert started.tasks[0].status == "completed" |
|
|
| await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS) |
| current = await get_status(StatusRequest(session_id="session-auto")) |
|
|
| assert current.status == "completed" |
| assert current.progress_percent == 100 |
| assert all(task.status == "completed" for task in current.tasks) |
| assert any(event.event_type == "session.completed" for event in current.events) |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_autonomous_session_preserves_selected_persona() -> None: |
| captured_messages: list[dict[str, str]] = [] |
|
|
| def fake_pipeline(messages, max_new_tokens, temperature): |
| nonlocal captured_messages |
| captured_messages = messages |
| return [ |
| { |
| "generated_text": messages |
| + [ |
| { |
| "role": "assistant", |
| "content": "1. Nosaki prioritātes\n2. Sadalīt roadmap soļos", |
| } |
| ] |
| } |
| ] |
|
|
| with patch("maris_core.autonomous.agent.get_pipeline", return_value=fake_pipeline): |
| started = await start_session( |
| StartRequest(session_id="session-3", goal="Uztaisi roadmap", persona_id="strategist") |
| ) |
|
|
| assert started.persona_id == "strategist" |
| assert started.persona_title == "Systems Strategist" |
| assert ( |
| "Plāno ar aktīvo personu 'Systems Strategist' un tās prioritātēm." |
| in captured_messages[0]["content"] |
| ) |
| assert "Persona režīms: Systems Strategist." in (started.tasks[0].result or "") |
| assert started.agent_roles[0].id == "planner" |
|
|
|
|
| @pytest.mark.asyncio |
| async def test_autonomous_planning_uses_shared_session_memory() -> None: |
| captured_messages: list[dict[str, str]] = [] |
| memory = ConversationMemoryStore() |
| memory.remember_message( |
| "session-memory", |
| "assistant", |
| "Iepriekš vienojāmies prioritizēt API stabilitāti un observability.", |
| ) |
|
|
| def fake_pipeline(messages, max_new_tokens, temperature): |
| nonlocal captured_messages |
| captured_messages = messages |
| return [ |
| { |
| "generated_text": messages |
| + [{"role": "assistant", "content": "1. Stabilizē API\n2. Pievieno monitoringu"}] |
| } |
| ] |
|
|
| with ( |
| patch("maris_core.autonomous.agent.get_pipeline", return_value=fake_pipeline), |
| patch("maris_core.autonomous.agent.memory_store", memory), |
| ): |
| started = await start_session( |
| StartRequest(session_id="session-memory", goal="Turpinām API stabilitātes roadmap") |
| ) |
|
|
| assert "Saistītā sesijas atmiņa" in captured_messages[0]["content"] |
| assert any(event.event_type == "memory.context_loaded" for event in started.events) |
|
|