| """Tests for the async-memory Honcho improvements. |
| |
| Covers: |
| - write_frequency parsing (async / turn / session / int) |
| - resolve_session_name with session_title |
| - HonchoSessionManager.save() routing per write_frequency |
| - async writer thread lifecycle and retry |
| - flush_all() drains pending messages |
| - shutdown() joins the thread |
| """ |
|
|
| import json |
| import queue |
| import threading |
| import time |
| from pathlib import Path |
| from unittest.mock import MagicMock, patch, call |
|
|
| import pytest |
|
|
| from plugins.memory.honcho.client import HonchoClientConfig |
| from plugins.memory.honcho.session import ( |
| HonchoSession, |
| HonchoSessionManager, |
| _ASYNC_SHUTDOWN, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| def _make_session(**kwargs) -> HonchoSession: |
| return HonchoSession( |
| key=kwargs.get("key", "cli:test"), |
| user_peer_id=kwargs.get("user_peer_id", "eri"), |
| assistant_peer_id=kwargs.get("assistant_peer_id", "hermes"), |
| honcho_session_id=kwargs.get("honcho_session_id", "cli-test"), |
| messages=kwargs.get("messages", []), |
| ) |
|
|
|
|
| def _make_manager(write_frequency="turn") -> HonchoSessionManager: |
| cfg = HonchoClientConfig( |
| write_frequency=write_frequency, |
| api_key="test-key", |
| enabled=True, |
| ) |
| mgr = HonchoSessionManager(config=cfg) |
| mgr._honcho = MagicMock() |
| return mgr |
|
|
|
|
| |
| |
| |
|
|
| class TestWriteFrequencyParsing: |
| def test_string_async(self, tmp_path): |
| cfg_file = tmp_path / "config.json" |
| cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "async"})) |
| cfg = HonchoClientConfig.from_global_config(config_path=cfg_file) |
| assert cfg.write_frequency == "async" |
|
|
| def test_string_turn(self, tmp_path): |
| cfg_file = tmp_path / "config.json" |
| cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "turn"})) |
| cfg = HonchoClientConfig.from_global_config(config_path=cfg_file) |
| assert cfg.write_frequency == "turn" |
|
|
| def test_string_session(self, tmp_path): |
| cfg_file = tmp_path / "config.json" |
| cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "session"})) |
| cfg = HonchoClientConfig.from_global_config(config_path=cfg_file) |
| assert cfg.write_frequency == "session" |
|
|
| def test_integer_frequency(self, tmp_path): |
| cfg_file = tmp_path / "config.json" |
| cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": 5})) |
| cfg = HonchoClientConfig.from_global_config(config_path=cfg_file) |
| assert cfg.write_frequency == 5 |
|
|
| def test_integer_string_coerced(self, tmp_path): |
| cfg_file = tmp_path / "config.json" |
| cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "3"})) |
| cfg = HonchoClientConfig.from_global_config(config_path=cfg_file) |
| assert cfg.write_frequency == 3 |
|
|
| def test_host_block_overrides_root(self, tmp_path): |
| cfg_file = tmp_path / "config.json" |
| cfg_file.write_text(json.dumps({ |
| "apiKey": "k", |
| "writeFrequency": "turn", |
| "hosts": {"hermes": {"writeFrequency": "session"}}, |
| })) |
| cfg = HonchoClientConfig.from_global_config(config_path=cfg_file) |
| assert cfg.write_frequency == "session" |
|
|
| def test_defaults_to_async(self, tmp_path): |
| cfg_file = tmp_path / "config.json" |
| cfg_file.write_text(json.dumps({"apiKey": "k"})) |
| cfg = HonchoClientConfig.from_global_config(config_path=cfg_file) |
| assert cfg.write_frequency == "async" |
|
|
|
|
| |
| |
| |
|
|
| class TestResolveSessionNameTitle: |
| def test_manual_override_beats_title(self): |
| cfg = HonchoClientConfig(sessions={"/my/project": "manual-name"}) |
| result = cfg.resolve_session_name("/my/project", session_title="the-title") |
| assert result == "manual-name" |
|
|
| def test_title_beats_dirname(self): |
| cfg = HonchoClientConfig() |
| result = cfg.resolve_session_name("/some/dir", session_title="my-project") |
| assert result == "my-project" |
|
|
| def test_title_with_peer_prefix(self): |
| cfg = HonchoClientConfig(peer_name="eri", session_peer_prefix=True) |
| result = cfg.resolve_session_name("/some/dir", session_title="aeris") |
| assert result == "eri-aeris" |
|
|
| def test_title_sanitized(self): |
| cfg = HonchoClientConfig() |
| result = cfg.resolve_session_name("/some/dir", session_title="my project/name!") |
| |
| assert result == "my-project-name" |
|
|
| def test_title_all_invalid_chars_falls_back_to_dirname(self): |
| cfg = HonchoClientConfig() |
| result = cfg.resolve_session_name("/some/dir", session_title="!!! ###") |
| |
| assert result == "dir" |
|
|
| def test_none_title_falls_back_to_dirname(self): |
| cfg = HonchoClientConfig() |
| result = cfg.resolve_session_name("/some/dir", session_title=None) |
| assert result == "dir" |
|
|
| def test_empty_title_falls_back_to_dirname(self): |
| cfg = HonchoClientConfig() |
| result = cfg.resolve_session_name("/some/dir", session_title="") |
| assert result == "dir" |
|
|
| def test_per_session_uses_session_id(self): |
| cfg = HonchoClientConfig(session_strategy="per-session") |
| result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd") |
| assert result == "20260309_175514_9797dd" |
|
|
| def test_per_session_with_peer_prefix(self): |
| cfg = HonchoClientConfig(session_strategy="per-session", peer_name="eri", session_peer_prefix=True) |
| result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd") |
| assert result == "eri-20260309_175514_9797dd" |
|
|
| def test_per_session_no_id_falls_back_to_dirname(self): |
| cfg = HonchoClientConfig(session_strategy="per-session") |
| result = cfg.resolve_session_name("/some/dir", session_id=None) |
| assert result == "dir" |
|
|
| def test_title_beats_session_id(self): |
| cfg = HonchoClientConfig(session_strategy="per-session") |
| result = cfg.resolve_session_name("/some/dir", session_title="my-title", session_id="20260309_175514_9797dd") |
| assert result == "my-title" |
|
|
| def test_manual_beats_session_id(self): |
| cfg = HonchoClientConfig(session_strategy="per-session", sessions={"/some/dir": "pinned"}) |
| result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd") |
| assert result == "pinned" |
|
|
| def test_global_strategy_returns_workspace(self): |
| cfg = HonchoClientConfig(session_strategy="global", workspace_id="my-workspace") |
| result = cfg.resolve_session_name("/some/dir") |
| assert result == "my-workspace" |
|
|
|
|
| |
| |
| |
|
|
| class TestSaveRouting: |
| def _make_session_with_message(self, mgr=None): |
| sess = _make_session() |
| sess.add_message("user", "hello") |
| sess.add_message("assistant", "hi") |
| if mgr: |
| mgr._cache[sess.key] = sess |
| return sess |
|
|
| def test_turn_flushes_immediately(self): |
| mgr = _make_manager(write_frequency="turn") |
| sess = self._make_session_with_message(mgr) |
| with patch.object(mgr, "_flush_session") as mock_flush: |
| mgr.save(sess) |
| mock_flush.assert_called_once_with(sess) |
|
|
| def test_session_mode_does_not_flush(self): |
| mgr = _make_manager(write_frequency="session") |
| sess = self._make_session_with_message(mgr) |
| with patch.object(mgr, "_flush_session") as mock_flush: |
| mgr.save(sess) |
| mock_flush.assert_not_called() |
|
|
| def test_async_mode_enqueues(self): |
| mgr = _make_manager(write_frequency="async") |
| sess = self._make_session_with_message(mgr) |
| with patch.object(mgr, "_flush_session") as mock_flush: |
| mgr.save(sess) |
| |
| mock_flush.assert_not_called() |
| assert not mgr._async_queue.empty() |
|
|
| def test_int_frequency_flushes_on_nth_turn(self): |
| mgr = _make_manager(write_frequency=3) |
| sess = self._make_session_with_message(mgr) |
| with patch.object(mgr, "_flush_session") as mock_flush: |
| mgr.save(sess) |
| mgr.save(sess) |
| assert mock_flush.call_count == 0 |
| mgr.save(sess) |
| assert mock_flush.call_count == 1 |
|
|
| def test_int_frequency_skips_other_turns(self): |
| mgr = _make_manager(write_frequency=5) |
| sess = self._make_session_with_message(mgr) |
| with patch.object(mgr, "_flush_session") as mock_flush: |
| for _ in range(4): |
| mgr.save(sess) |
| assert mock_flush.call_count == 0 |
| mgr.save(sess) |
| assert mock_flush.call_count == 1 |
|
|
|
|
| |
| |
| |
|
|
| class TestFlushAll: |
| def test_flushes_all_cached_sessions(self): |
| mgr = _make_manager(write_frequency="session") |
| s1 = _make_session(key="s1", honcho_session_id="s1") |
| s2 = _make_session(key="s2", honcho_session_id="s2") |
| s1.add_message("user", "a") |
| s2.add_message("user", "b") |
| mgr._cache = {"s1": s1, "s2": s2} |
|
|
| with patch.object(mgr, "_flush_session") as mock_flush: |
| mgr.flush_all() |
| assert mock_flush.call_count == 2 |
|
|
| def test_flush_all_drains_async_queue(self): |
| mgr = _make_manager(write_frequency="async") |
| sess = _make_session() |
| sess.add_message("user", "pending") |
| mgr._async_queue.put(sess) |
|
|
| with patch.object(mgr, "_flush_session") as mock_flush: |
| mgr.flush_all() |
| |
| assert mock_flush.call_count >= 1 |
|
|
| def test_flush_all_tolerates_errors(self): |
| mgr = _make_manager(write_frequency="session") |
| sess = _make_session() |
| mgr._cache = {"key": sess} |
| with patch.object(mgr, "_flush_session", side_effect=RuntimeError("oops")): |
| |
| mgr.flush_all() |
|
|
|
|
| |
| |
| |
|
|
| class TestAsyncWriterThread: |
| def test_thread_started_on_async_mode(self): |
| mgr = _make_manager(write_frequency="async") |
| assert mgr._async_thread is not None |
| assert mgr._async_thread.is_alive() |
| mgr.shutdown() |
|
|
| def test_no_thread_for_turn_mode(self): |
| mgr = _make_manager(write_frequency="turn") |
| assert mgr._async_thread is None |
| assert mgr._async_queue is None |
|
|
| def test_shutdown_joins_thread(self): |
| mgr = _make_manager(write_frequency="async") |
| assert mgr._async_thread.is_alive() |
| mgr.shutdown() |
| assert not mgr._async_thread.is_alive() |
|
|
| def test_async_writer_calls_flush(self): |
| mgr = _make_manager(write_frequency="async") |
| sess = _make_session() |
| sess.add_message("user", "async msg") |
|
|
| flushed = [] |
|
|
| def capture(s): |
| flushed.append(s) |
| return True |
|
|
| mgr._flush_session = capture |
| mgr._async_queue.put(sess) |
| |
| deadline = time.time() + 2.0 |
| while not flushed and time.time() < deadline: |
| time.sleep(0.05) |
|
|
| mgr.shutdown() |
| assert len(flushed) == 1 |
| assert flushed[0] is sess |
|
|
| def test_shutdown_sentinel_stops_loop(self): |
| mgr = _make_manager(write_frequency="async") |
| thread = mgr._async_thread |
| mgr.shutdown() |
| thread.join(timeout=3) |
| assert not thread.is_alive() |
|
|
|
|
| |
| |
| |
|
|
| class TestAsyncWriterRetry: |
| def test_retries_once_on_failure(self): |
| mgr = _make_manager(write_frequency="async") |
| sess = _make_session() |
| sess.add_message("user", "msg") |
|
|
| call_count = [0] |
|
|
| def flaky_flush(s): |
| call_count[0] += 1 |
| if call_count[0] == 1: |
| raise ConnectionError("network blip") |
| |
|
|
| mgr._flush_session = flaky_flush |
|
|
| with patch("time.sleep"): |
| mgr._async_queue.put(sess) |
| deadline = time.time() + 3.0 |
| while call_count[0] < 2 and time.time() < deadline: |
| time.sleep(0.05) |
|
|
| mgr.shutdown() |
| assert call_count[0] == 2 |
|
|
| def test_drops_after_two_failures(self): |
| mgr = _make_manager(write_frequency="async") |
| sess = _make_session() |
| sess.add_message("user", "msg") |
|
|
| call_count = [0] |
|
|
| def always_fail(s): |
| call_count[0] += 1 |
| raise RuntimeError("always broken") |
|
|
| mgr._flush_session = always_fail |
|
|
| with patch("time.sleep"): |
| mgr._async_queue.put(sess) |
| deadline = time.time() + 3.0 |
| while call_count[0] < 2 and time.time() < deadline: |
| time.sleep(0.05) |
|
|
| mgr.shutdown() |
| |
| assert call_count[0] == 2 |
| assert not mgr._async_thread.is_alive() |
|
|
| def test_retries_when_flush_reports_failure(self): |
| mgr = _make_manager(write_frequency="async") |
| sess = _make_session() |
| sess.add_message("user", "msg") |
|
|
| call_count = [0] |
|
|
| def fail_then_succeed(_session): |
| call_count[0] += 1 |
| return call_count[0] > 1 |
|
|
| mgr._flush_session = fail_then_succeed |
|
|
| with patch("time.sleep"): |
| mgr._async_queue.put(sess) |
| deadline = time.time() + 3.0 |
| while call_count[0] < 2 and time.time() < deadline: |
| time.sleep(0.05) |
|
|
| mgr.shutdown() |
| assert call_count[0] == 2 |
|
|
|
|
| class TestMemoryFileMigrationTargets: |
| def test_soul_upload_targets_ai_peer(self, tmp_path): |
| mgr = _make_manager(write_frequency="turn") |
| session = _make_session( |
| key="cli:test", |
| user_peer_id="custom-user", |
| assistant_peer_id="custom-ai", |
| honcho_session_id="cli-test", |
| ) |
| mgr._cache[session.key] = session |
|
|
| user_peer = MagicMock(name="user-peer") |
| ai_peer = MagicMock(name="ai-peer") |
| mgr._peers_cache[session.user_peer_id] = user_peer |
| mgr._peers_cache[session.assistant_peer_id] = ai_peer |
|
|
| honcho_session = MagicMock() |
| mgr._sessions_cache[session.honcho_session_id] = honcho_session |
|
|
| (tmp_path / "MEMORY.md").write_text("memory facts", encoding="utf-8") |
| (tmp_path / "USER.md").write_text("user profile", encoding="utf-8") |
| (tmp_path / "SOUL.md").write_text("ai identity", encoding="utf-8") |
|
|
| uploaded = mgr.migrate_memory_files(session.key, str(tmp_path)) |
|
|
| assert uploaded is True |
| assert honcho_session.upload_file.call_count == 3 |
|
|
| peer_by_upload_name = {} |
| for call_args in honcho_session.upload_file.call_args_list: |
| payload = call_args.kwargs["file"] |
| peer_by_upload_name[payload[0]] = call_args.kwargs["peer"] |
|
|
| assert peer_by_upload_name["consolidated_memory.md"] is user_peer |
| assert peer_by_upload_name["user_profile.md"] is user_peer |
| assert peer_by_upload_name["agent_soul.md"] is ai_peer |
|
|
|
|
| |
| |
| |
|
|
| class TestNewConfigFieldDefaults: |
| def test_write_frequency_default(self): |
| cfg = HonchoClientConfig() |
| assert cfg.write_frequency == "async" |
|
|
| def test_write_frequency_set(self): |
| cfg = HonchoClientConfig(write_frequency="turn") |
| assert cfg.write_frequency == "turn" |
|
|
|
|
| class TestPrefetchCacheAccessors: |
| def test_set_and_pop_context_result(self): |
| mgr = _make_manager(write_frequency="turn") |
| payload = {"representation": "Known user", "card": "prefers concise replies"} |
|
|
| mgr.set_context_result("cli:test", payload) |
|
|
| assert mgr.pop_context_result("cli:test") == payload |
| assert mgr.pop_context_result("cli:test") == {} |
|
|
|
|