"""Tests for conversational memory retrieval.""" from maris_core.memory_context import ConversationMemoryStore def test_memory_store_retrieves_relevant_session_context() -> None: store = ConversationMemoryStore() store.remember_message("session-a", "user", "Man vajag Python API klientu ar retry loģiku.") store.remember_message("session-a", "assistant", "Iepriekš sagatavoju API klienta struktūru.") store.remember_message("session-b", "assistant", "Nesaistīts mūzikas ieteikums.") matches = store.retrieve_relevant_context("session-a", "Uztaisi retry API klientu Pythonā") assert matches assert "API klient" in matches[0].content assert all("mūzikas" not in match.content for match in matches[:2]) def test_memory_store_persists_sessions_to_disk(tmp_path) -> None: storage_path = tmp_path / "memory.json" store = ConversationMemoryStore(storage_path=str(storage_path)) store.remember_message("session-persist", "user", "Saglabā šo sarunas faktu.") reloaded = ConversationMemoryStore(storage_path=str(storage_path)) matches = reloaded.retrieve_relevant_context("session-persist", "sarunas faktu") assert storage_path.exists() assert matches assert matches[0].content == "Saglabā šo sarunas faktu." def test_memory_store_summarizes_recent_session_backbone() -> None: store = ConversationMemoryStore() store.remember_message( "session-s", "user", "Mēs plānojam incident response procesu visai komandai." ) store.remember_message( "session-s", "assistant", "Prioritātes bija alerting ownership, eskalācija un postmortem disciplīna.", ) summary = store.summarize_session("session-s") assert summary assert any("incident response" in item.lower() for item in summary) assert any("prioritātes" in item.lower() for item in summary) def test_memory_store_summarizes_user_focus_with_goal_labels() -> None: store = ConversationMemoryStore() store.remember_message( "session-focus", "user", "Es gribu uzbūvēt AI asistentu, kas atceras manu iepriekšējo kontekstu.", ) store.remember_message( "session-focus", "user", "Man svarīgi, lai atbildes paliek uzticamas un pamatotas ar faktiem.", ) summary = store.summarize_user_focus("session-focus", query="Kā panākt uzticamu AI atmiņu?") assert summary assert any(item.startswith("Mērķis:") for item in summary) assert any(item.startswith("Priekšroka:") for item in summary) assert any("ai asistentu" in item.lower() for item in summary) def test_memory_store_summarizes_active_threads() -> None: store = ConversationMemoryStore() store.remember_message( "session-thread", "user", "Kā man uzbūvēt uzticamu AI asistentu ar ilgtermiņa atmiņu?", ) store.remember_message( "session-thread", "user", "Turpinām ar nākamajiem 3 soļiem un prioritātēm.", ) summary = store.summarize_active_threads( "session-thread", query="Kādi ir nākamie soļi uzticamam AI asistentam?", ) assert summary assert any(item.startswith("Atvērtais jautājums:") for item in summary) assert any(item.startswith("Aktīvais virziens:") for item in summary) assert any("nākamajiem 3 soļiem" in item.lower() for item in summary) def test_memory_store_continuation_query_recalls_recent_session_context() -> None: store = ConversationMemoryStore() store.remember_message( "session-continuation", "user", "Mēs būvējam core assistant ar memory recall un tool grounding.", ) store.remember_message( "session-continuation", "assistant", "Līdz šim prioritātes bija hallucination samazināšana un stabilāka tool lietošana.", ) matches = store.retrieve_relevant_context("session-continuation", "Turpinām šo pašu virzienu.") assert matches assert any("hallucination" in match.content.lower() for match in matches)