"""Tests for autonomous persistence and real executor adapters.""" from __future__ import annotations from pathlib import Path from types import SimpleNamespace from unittest.mock import patch import pytest from maris_core.autonomous.agent import ( StartRequest, StatusRequest, _sessions, get_status, start_session, ) from maris_core.autonomous.executor import AutonomousTaskExecutor from maris_core.autonomous.session_store import AutonomousSessionStore @pytest.mark.asyncio async def test_autonomous_session_recovers_from_persistent_store(tmp_path: Path) -> None: store = AutonomousSessionStore(db_path=str(tmp_path / "autonomous-state.db")) with ( patch("maris_core.autonomous.agent.session_store", store), patch("maris_core.autonomous.agent.get_pipeline", return_value=None), ): started = await start_session( StartRequest(session_id="persisted-session", goal="Uztaisīt plānu") ) _sessions.clear() recovered = await get_status(StatusRequest(session_id="persisted-session")) assert started.session_id == "persisted-session" assert recovered.tasks assert recovered.tasks[0].status == "completed" assert recovered.resume_token == "resume:persisted-session" audit_records = await store.load_audit_records("persisted-session") assert audit_records assert audit_records[0]["record_type"] == "session.started" @pytest.mark.asyncio async def test_executor_runs_reasoning_with_runtime_generate() -> None: executor = AutonomousTaskExecutor() task = { "id": "task-1", "description": "Nosaki prioritātes", "tool": "reasoning", "depends_on": [], } fake_response = SimpleNamespace( response="Konkrēts darba rezultāts", model="test-model", tokens_used=42, latency_ms=17, prompt_messages=2, memory_matches=1, ) with patch("maris_core.autonomous.executor.generate", return_value=fake_response): result = await executor.execute(task, "Uztaisīt roadmap", [task], persona_id="strategist") assert "Konkrēts darba rezultāts" in result.summary assert "Systems Strategist" in result.summary assert result.artifacts["model"] == "test-model" assert result.metrics["latency_ms"] == 17 @pytest.mark.asyncio async def test_executor_runs_browser_automation_workflow() -> None: executor = AutonomousTaskExecutor() task = { "id": "task-browser", "description": "Atver https://example.com un nolasīt saturu", "tool": "browser_automation", "depends_on": [], } with ( patch( "maris_core.autonomous.executor.start_browser_session", return_value=SimpleNamespace(session_id="browser-session"), ), patch( "maris_core.autonomous.executor.navigate_browser", return_value=SimpleNamespace(url="https://example.com", title="Example"), ), patch( "maris_core.autonomous.executor.extract_browser_text", return_value=SimpleNamespace(text="Sveiks no browser"), ), patch( "maris_core.autonomous.executor.screenshot_browser", return_value=SimpleNamespace(image_base64="aW1hZ2U="), ), patch("maris_core.autonomous.executor.close_browser_session"), ): result = await executor.execute(task, "Iegūt lapas saturu", [task]) assert "https://example.com" in result.summary assert result.artifacts["text"] == "Sveiks no browser" assert result.metrics["extracted_text_length"] == len("Sveiks no browser")