| """Tests for context compression persistence in the gateway. |
| |
| Verifies that when context compression fires during run_conversation(), |
| the compressed messages are properly persisted to both SQLite (via the |
| agent) and JSONL (via the gateway). |
| |
| Bug scenario (pre-fix): |
| 1. Gateway loads 200-message history, passes to agent |
| 2. Agent's run_conversation() compresses to ~30 messages mid-run |
| 3. _compress_context() resets _last_flushed_db_idx = 0 |
| 4. On exit, _flush_messages_to_session_db() calculates: |
| flush_from = max(len(conversation_history=200), _last_flushed_db_idx=0) = 200 |
| 5. messages[200:] is empty (only ~30 messages after compression) |
| 6. Nothing written to new session's SQLite — compressed context lost |
| 7. Gateway's history_offset was still 200, producing empty new_messages |
| 8. Fallback wrote only user/assistant pair — summary lost |
| """ |
|
|
| import os |
| import tempfile |
| from pathlib import Path |
| from unittest.mock import MagicMock, patch |
|
|
| import pytest |
|
|
|
|
| |
| |
| |
|
|
| class TestFlushAfterCompression: |
| """Verify that compressed messages are flushed to the new session's SQLite |
| even when conversation_history (from the original session) is longer than |
| the compressed messages list.""" |
|
|
| def _make_agent(self, session_db): |
| with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}): |
| from run_agent import AIAgent |
| agent = AIAgent( |
| api_key="test-key", |
| base_url="https://openrouter.ai/api/v1", |
| model="test/model", |
| quiet_mode=True, |
| session_db=session_db, |
| session_id="original-session", |
| skip_context_files=True, |
| skip_memory=True, |
| ) |
| return agent |
|
|
| def test_flush_after_compression_with_long_history(self): |
| """The actual bug: conversation_history longer than compressed messages. |
| |
| Before the fix, flush_from = max(len(conversation_history), 0) = 200, |
| but messages only has ~30 entries, so messages[200:] is empty. |
| After the fix, conversation_history is cleared to None after compression, |
| so flush_from = max(0, 0) = 0, and ALL compressed messages are written. |
| """ |
| from hermes_state import SessionDB |
|
|
| with tempfile.TemporaryDirectory() as tmpdir: |
| db_path = Path(tmpdir) / "test.db" |
| db = SessionDB(db_path=db_path) |
|
|
| agent = self._make_agent(db) |
|
|
| |
| original_history = [ |
| {"role": "user" if i % 2 == 0 else "assistant", |
| "content": f"message {i}"} |
| for i in range(200) |
| ] |
|
|
| |
| agent._flush_messages_to_session_db(original_history, []) |
| original_rows = db.get_messages("original-session") |
| assert len(original_rows) == 200 |
|
|
| |
| agent.session_id = "compressed-session" |
| db.create_session(session_id="compressed-session", source="test") |
| agent._last_flushed_db_idx = 0 |
|
|
| |
| compressed_messages = [ |
| {"role": "user", "content": "[CONTEXT COMPACTION] Summary of work..."}, |
| {"role": "user", "content": "What should we do next?"}, |
| {"role": "assistant", "content": "Let me check..."}, |
| {"role": "user", "content": "new question"}, |
| {"role": "assistant", "content": "new answer"}, |
| ] |
|
|
| |
| |
| |
| agent._flush_messages_to_session_db(compressed_messages, None) |
|
|
| new_rows = db.get_messages("compressed-session") |
| assert len(new_rows) == 5, ( |
| f"Expected 5 compressed messages in new session, got {len(new_rows)}. " |
| f"Compression persistence bug: messages not written to SQLite." |
| ) |
|
|
| def test_flush_with_stale_history_loses_messages(self): |
| """Demonstrates the bug condition: stale conversation_history causes data loss.""" |
| from hermes_state import SessionDB |
|
|
| with tempfile.TemporaryDirectory() as tmpdir: |
| db_path = Path(tmpdir) / "test.db" |
| db = SessionDB(db_path=db_path) |
|
|
| agent = self._make_agent(db) |
|
|
| |
| agent.session_id = "new-session" |
| db.create_session(session_id="new-session", source="test") |
| agent._last_flushed_db_idx = 0 |
|
|
| compressed = [ |
| {"role": "user", "content": "summary"}, |
| {"role": "assistant", "content": "continuing..."}, |
| ] |
|
|
| |
| stale_history = [{"role": "user", "content": f"msg{i}"} for i in range(100)] |
| agent._flush_messages_to_session_db(compressed, stale_history) |
|
|
| rows = db.get_messages("new-session") |
| |
| |
| assert len(rows) == 0, ( |
| "Expected 0 messages with stale conversation_history " |
| "(this test verifies the bug condition exists)" |
| ) |
|
|
|
|
| |
| |
| |
|
|
| class TestGatewayHistoryOffsetAfterSplit: |
| """Verify that when the agent creates a new session during compression, |
| the gateway uses history_offset=0 so all compressed messages are written |
| to the JSONL transcript.""" |
|
|
| def test_history_offset_zero_on_session_split(self): |
| """When agent.session_id differs from the original, history_offset must be 0.""" |
| |
| |
| |
|
|
| original_session_id = "session-abc" |
| agent_session_id = "session-compressed-xyz" |
| agent_history_len = 200 |
|
|
| |
| _session_was_split = (agent_session_id != original_session_id) |
| _effective_history_offset = 0 if _session_was_split else agent_history_len |
|
|
| assert _session_was_split is True |
| assert _effective_history_offset == 0 |
|
|
| def test_history_offset_preserved_without_split(self): |
| """When no compression happened, history_offset is the original length.""" |
| session_id = "session-abc" |
| agent_session_id = "session-abc" |
| agent_history_len = 200 |
|
|
| _session_was_split = (agent_session_id != session_id) |
| _effective_history_offset = 0 if _session_was_split else agent_history_len |
|
|
| assert _session_was_split is False |
| assert _effective_history_offset == 200 |
|
|
| def test_new_messages_extraction_after_split(self): |
| """After compression with offset=0, new_messages should be ALL agent messages.""" |
| |
| agent_messages = [ |
| {"role": "user", "content": "[CONTEXT COMPACTION] Summary..."}, |
| {"role": "user", "content": "recent question"}, |
| {"role": "assistant", "content": "recent answer"}, |
| {"role": "user", "content": "new question"}, |
| {"role": "assistant", "content": "new answer"}, |
| ] |
| history_offset = 0 |
|
|
| new_messages = agent_messages[history_offset:] if len(agent_messages) > history_offset else [] |
| assert len(new_messages) == 5, ( |
| f"Expected all 5 messages with offset=0, got {len(new_messages)}" |
| ) |
|
|
| def test_new_messages_empty_with_stale_offset(self): |
| """Demonstrates the bug: stale offset produces empty new_messages.""" |
| agent_messages = [ |
| {"role": "user", "content": "summary"}, |
| {"role": "assistant", "content": "answer"}, |
| ] |
| |
| history_offset = 200 |
|
|
| new_messages = agent_messages[history_offset:] if len(agent_messages) > history_offset else [] |
| assert len(new_messages) == 0, ( |
| "Expected 0 messages with stale offset=200 (demonstrates the bug)" |
| ) |
|
|