"""Tests for stronger autonomous planning execution.""" import asyncio from unittest.mock import patch import pytest from maris_core.autonomous.agent import StartRequest, StatusRequest, get_status, start_session from maris_core.autonomous.planner import Planner from maris_core.memory_context import ConversationMemoryStore BACKGROUND_TASK_SETTLE_SECONDS = 0.3 def test_planner_decompose_splits_goal_without_regex_backtracking() -> None: planner = Planner() steps = planner.decompose("Ievāc datus un tad salīdzini rezultātus -> pārbaudi kopsavilkumu") assert [step["action"] for step in steps[1:3]] == [ "Ievāc datus", "salīdzini rezultātus", ] @pytest.mark.asyncio async def test_autonomous_session_executes_dependent_tasks_over_status_polls() -> None: with patch("maris_core.autonomous.agent.get_pipeline", return_value=None): started = await start_session(StartRequest(session_id="session-1", goal="Uztaisīt plānu")) updated = await get_status(StatusRequest(session_id="session-1")) assert started.tasks assert started.tasks[0].status == "completed" assert started.events assert started.checkpoints assert started.agent_roles assert updated.progress_percent >= started.progress_percent assert updated.tasks[1].depends_on == [updated.tasks[0].id] assert updated.replay_cursor >= started.replay_cursor assert updated.resume_token == "resume:session-1" @pytest.mark.asyncio async def test_autonomous_task_retries_before_failing() -> None: with ( patch("maris_core.autonomous.agent.get_pipeline", return_value=None), patch( "maris_core.autonomous.agent._execute_task", side_effect=RuntimeError("pagaidu kļūda"), ) as execute_task_mock, ): started = await start_session(StartRequest(session_id="session-2", goal="Debugot servisu")) await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS) retried = await get_status(StatusRequest(session_id="session-2")) assert execute_task_mock.await_count == 2 assert started.tasks[0].status == "retrying" assert started.tasks[0].attempts == 1 assert retried.tasks[0].status == "failed" assert retried.tasks[0].attempts == 2 assert retried.approvals assert retried.approvals[0].status == "needs_intervention" assert any(event.event_type == "task.failed_attempt" for event in retried.events) @pytest.mark.asyncio async def test_autonomous_session_keeps_running_without_status_polls() -> None: async def fake_execute_task( task: dict[str, object], goal: str, tasks: list[dict[str, object]], *, persona_id: str | None = None, ) -> dict[str, object]: del goal, tasks, persona_id return {"summary": f"Pabeigts: {task['description']}", "artifacts": {}, "metrics": {}} with ( patch("maris_core.autonomous.agent.get_pipeline", return_value=None), patch("maris_core.autonomous.agent._execute_task", side_effect=fake_execute_task), ): started = await start_session( StartRequest(session_id="session-auto", goal="Uztaisīt plānu") ) assert started.tasks[0].status == "completed" await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS) current = await get_status(StatusRequest(session_id="session-auto")) assert current.status == "completed" assert current.progress_percent == 100 assert all(task.status == "completed" for task in current.tasks) assert any(event.event_type == "session.completed" for event in current.events) @pytest.mark.asyncio async def test_autonomous_session_preserves_selected_persona() -> None: captured_messages: list[dict[str, str]] = [] def fake_pipeline(messages, max_new_tokens, temperature): # type: ignore[no-untyped-def] nonlocal captured_messages captured_messages = messages return [ { "generated_text": messages + [ { "role": "assistant", "content": "1. Nosaki prioritātes\n2. Sadalīt roadmap soļos", } ] } ] with patch("maris_core.autonomous.agent.get_pipeline", return_value=fake_pipeline): started = await start_session( StartRequest(session_id="session-3", goal="Uztaisi roadmap", persona_id="strategist") ) assert started.persona_id == "strategist" assert started.persona_title == "Systems Strategist" assert ( "Plāno ar aktīvo personu 'Systems Strategist' un tās prioritātēm." in captured_messages[0]["content"] ) assert "Persona režīms: Systems Strategist." in (started.tasks[0].result or "") assert started.agent_roles[0].id == "planner" @pytest.mark.asyncio async def test_autonomous_planning_uses_shared_session_memory() -> None: captured_messages: list[dict[str, str]] = [] memory = ConversationMemoryStore() memory.remember_message( "session-memory", "assistant", "Iepriekš vienojāmies prioritizēt API stabilitāti un observability.", ) def fake_pipeline(messages, max_new_tokens, temperature): # type: ignore[no-untyped-def] nonlocal captured_messages captured_messages = messages return [ { "generated_text": messages + [{"role": "assistant", "content": "1. Stabilizē API\n2. Pievieno monitoringu"}] } ] with ( patch("maris_core.autonomous.agent.get_pipeline", return_value=fake_pipeline), patch("maris_core.autonomous.agent.memory_store", memory), ): started = await start_session( StartRequest(session_id="session-memory", goal="Turpinām API stabilitātes roadmap") ) assert "Saistītā sesijas atmiņa" in captured_messages[0]["content"] assert any(event.event_type == "memory.context_loaded" for event in started.events)