# File size: 6,065 Bytes | f440f03
"""Tests for stronger autonomous planning execution."""
import asyncio
from unittest.mock import patch
import pytest
from maris_core.autonomous.agent import StartRequest, StatusRequest, get_status, start_session
from maris_core.autonomous.planner import Planner
from maris_core.memory_context import ConversationMemoryStore
# Seconds the async tests in this module sleep (via asyncio.sleep) before
# reading the session status. Presumably sized so background task execution
# can settle first; revisit if these tests turn out to be timing-sensitive.
BACKGROUND_TASK_SETTLE_SECONDS = 0.3
def test_planner_decompose_splits_goal_without_regex_backtracking() -> None:
    """Check that ``Planner.decompose`` splits a chained goal into actions.

    The goal chains three clauses with "un tad" and "->". The steps at
    indices 1 and 2 of the result must carry the first two clauses as their
    ``action`` values.
    """
    goal = "Ievāc datus un tad salīdzini rezultātus -> pārbaudi kopsavilkumu"
    decomposed = Planner().decompose(goal)

    # Only the second and third steps are inspected, exactly as sliced here.
    actions = [entry["action"] for entry in decomposed[1:3]]
    assert actions == ["Ievāc datus", "salīdzini rezultātus"]
@pytest.mark.asyncio
async def test_autonomous_session_executes_dependent_tasks_over_status_polls() -> None:
    """Start a session without a pipeline, poll once, and compare both snapshots.

    ``get_pipeline`` is patched to return ``None`` while the session is
    started and its status is read. The start response must already expose
    tasks, events, checkpoints and agent roles; the polled status must not
    regress in progress or replay cursor and must chain the second task to
    the first.
    """
    session_id = "session-1"
    with patch("maris_core.autonomous.agent.get_pipeline", return_value=None):
        started = await start_session(
            StartRequest(session_id=session_id, goal="Uztaisīt plānu")
        )
        updated = await get_status(StatusRequest(session_id=session_id))

    # Snapshot returned by start_session.
    assert started.tasks
    assert started.tasks[0].status == "completed"
    assert started.events
    assert started.checkpoints
    assert started.agent_roles

    # Snapshot returned by the follow-up status poll.
    assert updated.progress_percent >= started.progress_percent
    dependent_task = updated.tasks[1]
    assert dependent_task.depends_on == [updated.tasks[0].id]
    assert updated.replay_cursor >= started.replay_cursor
    assert updated.resume_token == "resume:session-1"
@pytest.mark.asyncio
async def test_autonomous_task_retries_before_failing() -> None:
    """Make every task execution raise and check the retry-then-fail sequence.

    ``_execute_task`` is patched to raise ``RuntimeError`` on each call. The
    start response must show the first task as retrying after one attempt.
    After the settle delay, the polled status must show it failed after two
    attempts, with an approval flagged for intervention and a failed-attempt
    event recorded.
    """
    session_id = "session-2"
    with (
        patch("maris_core.autonomous.agent.get_pipeline", return_value=None),
        patch(
            "maris_core.autonomous.agent._execute_task",
            side_effect=RuntimeError("pagaidu kļūda"),
        ) as execute_task_mock,
    ):
        started = await start_session(
            StartRequest(session_id=session_id, goal="Debugot servisu")
        )
        # Stay inside the patch while waiting so later attempts hit the mock too.
        await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS)
        retried = await get_status(StatusRequest(session_id=session_id))

    assert execute_task_mock.await_count == 2

    first_attempt = started.tasks[0]
    assert first_attempt.status == "retrying"
    assert first_attempt.attempts == 1

    final_attempt = retried.tasks[0]
    assert final_attempt.status == "failed"
    assert final_attempt.attempts == 2

    assert retried.approvals
    assert retried.approvals[0].status == "needs_intervention"

    recorded_event_types = [event.event_type for event in retried.events]
    assert "task.failed_attempt" in recorded_event_types
@pytest.mark.asyncio
async def test_autonomous_session_keeps_running_without_status_polls() -> None:
    """Check that a session reaches completion with only one status read.

    ``_execute_task`` is replaced by a stub that returns a summary
    immediately. After starting the session the test only sleeps for the
    settle delay, then reads the status once and expects the session to be
    fully completed.
    """

    async def instant_execute_task(
        task: dict[str, object],
        goal: str,
        tasks: list[dict[str, object]],
        *,
        persona_id: str | None = None,
    ) -> dict[str, object]:
        # The stub only needs the task itself; the other arguments are ignored.
        del goal, tasks, persona_id
        summary = f"Pabeigts: {task['description']}"
        return {"summary": summary, "artifacts": {}, "metrics": {}}

    session_id = "session-auto"
    with (
        patch("maris_core.autonomous.agent.get_pipeline", return_value=None),
        patch("maris_core.autonomous.agent._execute_task", side_effect=instant_execute_task),
    ):
        started = await start_session(
            StartRequest(session_id=session_id, goal="Uztaisīt plānu")
        )
        assert started.tasks[0].status == "completed"

        # No polling in between: just wait, then take a single status reading.
        await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS)
        current = await get_status(StatusRequest(session_id=session_id))

    assert current.status == "completed"
    assert current.progress_percent == 100

    task_statuses = [task.status for task in current.tasks]
    assert all(status == "completed" for status in task_statuses)

    event_types = [event.event_type for event in current.events]
    assert "session.completed" in event_types
@pytest.mark.asyncio
async def test_autonomous_session_preserves_selected_persona() -> None:
    """Check that the requested persona reaches the response and the prompt.

    A stub pipeline records the messages it receives and answers with a
    two-step plan. The started session must report the ``strategist`` persona
    and its title, the first captured message must contain the persona
    planning hint, and the first task result must mention the persona mode.
    """
    captured_messages: list[dict[str, str]] = []

    def stub_pipeline(messages, max_new_tokens, temperature):  # type: ignore[no-untyped-def]
        # Rebind (not copy) so the test inspects the exact list the agent passed.
        nonlocal captured_messages
        captured_messages = messages
        plan_reply = {
            "role": "assistant",
            "content": "1. Nosaki prioritātes\n2. Sadalīt roadmap soļos",
        }
        return [{"generated_text": messages + [plan_reply]}]

    request = StartRequest(
        session_id="session-3",
        goal="Uztaisi roadmap",
        persona_id="strategist",
    )
    with patch("maris_core.autonomous.agent.get_pipeline", return_value=stub_pipeline):
        started = await start_session(request)

    assert started.persona_id == "strategist"
    assert started.persona_title == "Systems Strategist"

    persona_hint = "Plāno ar aktīvo personu 'Systems Strategist' un tās prioritātēm."
    assert persona_hint in captured_messages[0]["content"]

    # A missing result is treated as an empty string for the containment check.
    first_result = started.tasks[0].result or ""
    assert "Persona režīms: Systems Strategist." in first_result
    assert started.agent_roles[0].id == "planner"
@pytest.mark.asyncio
async def test_autonomous_planning_uses_shared_session_memory() -> None:
    """Check that remembered session messages are fed into the planning prompt.

    A fresh ``ConversationMemoryStore`` is seeded with one assistant message
    for the session and patched in as the agent's ``memory_store``. The stub
    pipeline records its input; the first captured message must contain the
    session-memory marker and the session events must include a
    memory-context-loaded entry.
    """
    session_id = "session-memory"
    captured_messages: list[dict[str, str]] = []

    memory = ConversationMemoryStore()
    memory.remember_message(
        session_id,
        "assistant",
        "Iepriekš vienojāmies prioritizēt API stabilitāti un observability.",
    )

    def stub_pipeline(messages, max_new_tokens, temperature):  # type: ignore[no-untyped-def]
        # Rebind (not copy) so the test inspects the exact list the agent passed.
        nonlocal captured_messages
        captured_messages = messages
        plan_reply = {
            "role": "assistant",
            "content": "1. Stabilizē API\n2. Pievieno monitoringu",
        }
        return [{"generated_text": messages + [plan_reply]}]

    request = StartRequest(session_id=session_id, goal="Turpinām API stabilitātes roadmap")
    with (
        patch("maris_core.autonomous.agent.get_pipeline", return_value=stub_pipeline),
        patch("maris_core.autonomous.agent.memory_store", memory),
    ):
        started = await start_session(request)

    assert "Saistītā sesijas atmiņa" in captured_messages[0]["content"]

    event_types = [event.event_type for event in started.events]
    assert "memory.context_loaded" in event_types
|