# maris-ai-master/core-python/tests/test_autonomous.py
# MarisUK's picture
# Maris AI model sync
# f440f03 verified
"""Tests for stronger autonomous planning execution."""
import asyncio
from unittest.mock import patch
import pytest
from maris_core.autonomous.agent import StartRequest, StatusRequest, get_status, start_session
from maris_core.autonomous.planner import Planner
from maris_core.memory_context import ConversationMemoryStore
# Delay in seconds that tests in this module await via ``asyncio.sleep`` between
# starting a session and polling its status. Presumably this gives background
# task execution time to settle -- confirm against the agent implementation.
BACKGROUND_TASK_SETTLE_SECONDS = 0.3
def test_planner_decompose_splits_goal_without_regex_backtracking() -> None:
    """Check that ``Planner.decompose`` splits a chained goal into actions.

    The goal chains three clauses with "un tad" and "->". The steps at
    indices 1 and 2 of the result must carry the first two clauses as
    their ``action`` values.
    """
    goal = "Ievāc datus un tad salīdzini rezultātus -> pārbaudi kopsavilkumu"
    expected_actions = ["Ievāc datus", "salīdzini rezultātus"]

    decomposed = Planner().decompose(goal)
    middle_actions = [entry["action"] for entry in decomposed[1:3]]

    assert middle_actions == expected_actions
@pytest.mark.asyncio
async def test_autonomous_session_executes_dependent_tasks_over_status_polls() -> None:
    """Start a session without a pipeline and compare it to a later status poll.

    With ``get_pipeline`` patched to return ``None``, the start response must
    already expose tasks (the first one completed), events, checkpoints and
    agent roles. The subsequent status poll must not regress progress or the
    replay cursor, must show the second task depending on the first, and must
    carry the expected resume token.
    """
    session_id = "session-1"

    with patch("maris_core.autonomous.agent.get_pipeline", return_value=None):
        start_request = StartRequest(session_id=session_id, goal="Uztaisīt plānu")
        initial = await start_session(start_request)
        status_request = StatusRequest(session_id=session_id)
        polled = await get_status(status_request)

    # State reported directly by start_session.
    assert initial.tasks
    assert initial.tasks[0].status == "completed"
    assert initial.events
    assert initial.checkpoints
    assert initial.agent_roles

    # State reported by the follow-up poll, relative to the start response.
    assert polled.progress_percent >= initial.progress_percent
    polled_tasks = polled.tasks
    assert polled_tasks[1].depends_on == [polled_tasks[0].id]
    assert polled.replay_cursor >= initial.replay_cursor
    assert polled.resume_token == "resume:session-1"
@pytest.mark.asyncio
async def test_autonomous_task_retries_before_failing() -> None:
    """Verify a task that always raises is retried once and then marked failed.

    ``_execute_task`` is patched to raise ``RuntimeError`` on every call. The
    start response must show the first task as "retrying" after one attempt.
    After waiting ``BACKGROUND_TASK_SETTLE_SECONDS``, the status must show it
    as "failed" after two attempts, with a "needs_intervention" approval and
    a "task.failed_attempt" event recorded.
    """
    session_id = "session-2"
    pipeline_patch = patch("maris_core.autonomous.agent.get_pipeline", return_value=None)
    execute_patch = patch(
        "maris_core.autonomous.agent._execute_task",
        side_effect=RuntimeError("pagaidu kļūda"),
    )

    with pipeline_patch, execute_patch as execute_task_mock:
        initial = await start_session(
            StartRequest(session_id=session_id, goal="Debugot servisu")
        )
        await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS)
        after_retry = await get_status(StatusRequest(session_id=session_id))

    assert execute_task_mock.await_count == 2

    first_at_start = initial.tasks[0]
    assert first_at_start.status == "retrying"
    assert first_at_start.attempts == 1

    first_after_retry = after_retry.tasks[0]
    assert first_after_retry.status == "failed"
    assert first_after_retry.attempts == 2

    assert after_retry.approvals
    assert after_retry.approvals[0].status == "needs_intervention"

    event_types = [entry.event_type for entry in after_retry.events]
    assert any(kind == "task.failed_attempt" for kind in event_types)
@pytest.mark.asyncio
async def test_autonomous_session_keeps_running_without_status_polls() -> None:
    """Verify a session reaches completion with only one status poll at the end.

    ``_execute_task`` is replaced by a stub that returns a result without
    awaiting anything. After starting the session and sleeping for
    ``BACKGROUND_TASK_SETTLE_SECONDS``, a single ``get_status`` call must
    report the session and all its tasks as completed at 100 percent, with a
    "session.completed" event.
    """

    async def succeed_immediately(
        task: dict[str, object],
        goal: str,
        tasks: list[dict[str, object]],
        *,
        persona_id: str | None = None,
    ) -> dict[str, object]:
        """Stand-in for ``_execute_task`` that builds a result from ``task`` only."""
        del goal, tasks, persona_id
        summary = f"Pabeigts: {task['description']}"
        return {"summary": summary, "artifacts": {}, "metrics": {}}

    session_id = "session-auto"
    pipeline_patch = patch("maris_core.autonomous.agent.get_pipeline", return_value=None)
    execute_patch = patch(
        "maris_core.autonomous.agent._execute_task",
        side_effect=succeed_immediately,
    )

    with pipeline_patch, execute_patch:
        initial = await start_session(
            StartRequest(session_id=session_id, goal="Uztaisīt plānu")
        )
        assert initial.tasks[0].status == "completed"
        # No status polls happen during this wait.
        await asyncio.sleep(BACKGROUND_TASK_SETTLE_SECONDS)
        final = await get_status(StatusRequest(session_id=session_id))

    assert final.status == "completed"
    assert final.progress_percent == 100
    assert all(item.status == "completed" for item in final.tasks)
    assert any(entry.event_type == "session.completed" for entry in final.events)
@pytest.mark.asyncio
async def test_autonomous_session_preserves_selected_persona() -> None:
    """Verify the persona chosen in the start request is carried through.

    A fake pipeline records the messages it receives and replies with a
    canned numbered list. The start response must echo the persona id and
    title, the first recorded message must contain the persona planning
    instruction, the first task result must mention the persona mode, and
    the first agent role must be "planner".
    """
    recorded_messages: list[dict[str, str]] = []

    def fake_pipeline(messages, max_new_tokens, temperature):  # type: ignore[no-untyped-def]
        """Record the incoming messages and append a canned assistant reply."""
        nonlocal recorded_messages
        recorded_messages = messages
        reply = {
            "role": "assistant",
            "content": "1. Nosaki prioritātes\n2. Sadalīt roadmap soļos",
        }
        return [{"generated_text": messages + [reply]}]

    pipeline_patch = patch(
        "maris_core.autonomous.agent.get_pipeline",
        return_value=fake_pipeline,
    )
    with pipeline_patch:
        request = StartRequest(
            session_id="session-3",
            goal="Uztaisi roadmap",
            persona_id="strategist",
        )
        response = await start_session(request)

    assert response.persona_id == "strategist"
    assert response.persona_title == "Systems Strategist"

    persona_instruction = "Plāno ar aktīvo personu 'Systems Strategist' un tās prioritātēm."
    assert persona_instruction in recorded_messages[0]["content"]

    first_result = response.tasks[0].result or ""
    assert "Persona režīms: Systems Strategist." in first_result
    assert response.agent_roles[0].id == "planner"
@pytest.mark.asyncio
async def test_autonomous_planning_uses_shared_session_memory() -> None:
    """Verify planning includes memory stored earlier for the same session id.

    A ``ConversationMemoryStore`` is seeded with one assistant message under
    "session-memory" and patched in as the agent's ``memory_store``. The
    first message handed to the fake pipeline must contain the
    "Saistītā sesijas atmiņa" marker, and the start response must include a
    "memory.context_loaded" event.
    """
    session_id = "session-memory"
    recorded_messages: list[dict[str, str]] = []

    seeded_store = ConversationMemoryStore()
    seeded_store.remember_message(
        session_id,
        "assistant",
        "Iepriekš vienojāmies prioritizēt API stabilitāti un observability.",
    )

    def fake_pipeline(messages, max_new_tokens, temperature):  # type: ignore[no-untyped-def]
        """Record the incoming messages and append a canned assistant reply."""
        nonlocal recorded_messages
        recorded_messages = messages
        reply = {"role": "assistant", "content": "1. Stabilizē API\n2. Pievieno monitoringu"}
        return [{"generated_text": messages + [reply]}]

    pipeline_patch = patch(
        "maris_core.autonomous.agent.get_pipeline",
        return_value=fake_pipeline,
    )
    memory_patch = patch("maris_core.autonomous.agent.memory_store", seeded_store)

    with pipeline_patch, memory_patch:
        response = await start_session(
            StartRequest(session_id=session_id, goal="Turpinām API stabilitātes roadmap")
        )

    first_prompt = recorded_messages[0]["content"]
    assert "Saistītā sesijas atmiņa" in first_prompt
    assert any(entry.event_type == "memory.context_loaded" for entry in response.events)