| """Tests for agent/context_compressor.py — compression logic, thresholds, truncation fallback.""" |
|
|
| import pytest |
| from unittest.mock import patch, MagicMock |
|
|
| from agent.context_compressor import ContextCompressor, SUMMARY_PREFIX |
|
|
|
|
@pytest.fixture()
def compressor():
    """Build a quiet ContextCompressor while the context-length lookup is patched.

    ``get_model_context_length`` is patched to return 100000 for the duration
    of construction. The instance is configured with an 0.85 threshold and
    protects two messages at each end of the conversation.
    """
    settings = {
        "model": "test/model",
        "threshold_percent": 0.85,
        "protect_first_n": 2,
        "protect_last_n": 2,
        "quiet_mode": True,
    }
    with patch(
        "agent.context_compressor.get_model_context_length",
        return_value=100000,
    ):
        return ContextCompressor(**settings)
|
|
|
|
class TestShouldCompress:
    """Threshold checks for ``should_compress`` using the shared ``compressor`` fixture."""

    def test_below_threshold(self, compressor):
        """A recorded prompt size of 50000 tokens must not trigger compression."""
        compressor.last_prompt_tokens = 50000
        verdict = compressor.should_compress()
        assert verdict is False

    def test_above_threshold(self, compressor):
        """A recorded prompt size of 90000 tokens must trigger compression."""
        compressor.last_prompt_tokens = 90000
        verdict = compressor.should_compress()
        assert verdict is True

    def test_exact_threshold(self, compressor):
        """A prompt size sitting exactly on the boundary counts as needing compression."""
        # 85000 is 0.85 of the 100000-token window the fixture patches in;
        # presumably that is the compressor's threshold — confirm in the module.
        compressor.last_prompt_tokens = 85000
        verdict = compressor.should_compress()
        assert verdict is True

    def test_explicit_tokens(self, compressor):
        """An explicit ``prompt_tokens`` argument is judged the same way."""
        over = compressor.should_compress(prompt_tokens=90000)
        assert over is True
        under = compressor.should_compress(prompt_tokens=50000)
        assert under is False
|
|
|
|
|
|
class TestUpdateFromResponse:
    """Checks that ``update_from_response`` records usage counters on the compressor."""

    def test_updates_fields(self, compressor):
        """Prompt and completion token counts are taken from the usage payload."""
        usage = dict(prompt_tokens=5000, completion_tokens=1000, total_tokens=6000)
        compressor.update_from_response(usage)
        assert compressor.last_prompt_tokens == 5000
        assert compressor.last_completion_tokens == 1000

    def test_missing_fields_default_zero(self, compressor):
        """An empty usage payload leaves the prompt token count at zero."""
        empty_usage = {}
        compressor.update_from_response(empty_usage)
        assert compressor.last_prompt_tokens == 0
|
|
|
|
|
|
class TestCompress:
    """Checks of ``compress`` on small synthetic conversations (no LLM call is patched here)."""

    def _make_messages(self, n):
        """Return ``n`` messages alternating user/assistant, starting with user."""
        roles = ("user", "assistant")
        conversation = []
        for idx in range(n):
            conversation.append({"role": roles[idx % 2], "content": f"msg {idx}"})
        return conversation

    def test_too_few_messages_returns_unchanged(self, compressor):
        """A four-message conversation comes back equal to the input."""
        original = self._make_messages(4)
        assert compressor.compress(original) == original

    def test_truncation_fallback_no_client(self, compressor):
        """Compressing without a mocked summary call still shrinks the history."""
        system_turn = {"role": "system", "content": "System prompt"}
        original = [system_turn, *self._make_messages(10)]
        compacted = compressor.compress(original)
        assert len(compacted) < len(original)
        # The system message must remain the first entry after compaction.
        assert compacted[0]["role"] == "system"
        assert compressor.compression_count == 1

    def test_compression_increments_count(self, compressor):
        """Each ``compress`` call bumps ``compression_count`` by one."""
        original = self._make_messages(10)
        for expected_count in (1, 2):
            compressor.compress(original)
            assert compressor.compression_count == expected_count

    def test_protects_first_and_last(self, compressor):
        """The tail of the conversation survives compaction.

        NOTE(review): despite the name, only the last two messages are
        checked here; nothing is asserted about the head.
        """
        original = self._make_messages(10)
        compacted = compressor.compress(original)
        final_before, final_after = original[-1], compacted[-1]
        assert final_after["content"] == final_before["content"]
        # The penultimate message may carry extra text, so only containment
        # of its original content is required.
        assert original[-2]["content"] in compacted[-2]["content"]
|
|
|
|
class TestGenerateSummaryNoneContent:
    """Regression: content=None (from tool-call-only assistant messages) must not crash."""

    def test_none_content_does_not_crash(self):
        """``_generate_summary`` copes with assistant turns whose content is None."""
        choice = MagicMock()
        choice.message.content = "[CONTEXT SUMMARY]: tool calls happened"
        llm_reply = MagicMock()
        llm_reply.choices = [choice]

        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            engine = ContextCompressor(model="test", quiet_mode=True)

        # An assistant turn that carries only tool calls and no text.
        tool_only_turn = {
            "role": "assistant",
            "content": None,
            "tool_calls": [{"function": {"name": "search"}}],
        }
        history = [
            {"role": "user", "content": "do something"},
            tool_only_turn,
            {"role": "tool", "content": "result"},
            {"role": "assistant", "content": None},
            {"role": "user", "content": "thanks"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_reply):
            text = engine._generate_summary(history)

        assert isinstance(text, str)
        assert text.startswith(SUMMARY_PREFIX)

    def test_none_content_in_system_message_compress(self):
        """System message with content=None should not crash during compress."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            engine = ContextCompressor(
                model="test",
                quiet_mode=True,
                protect_first_n=2,
                protect_last_n=2,
            )

        roles = ("user", "assistant")
        history = [{"role": "system", "content": None}]
        history.extend(
            {"role": roles[i % 2], "content": f"msg {i}"} for i in range(10)
        )

        compacted = engine.compress(history)
        assert len(compacted) < len(history)
|
|
|
|
class TestNonStringContent:
    """Regression: content as dict (e.g., llama.cpp tool calls) must not crash."""

    @staticmethod
    def _reply(content):
        """Return a mock LLM response whose first choice's message carries *content*."""
        choice = MagicMock()
        choice.message.content = content
        reply = MagicMock()
        reply.choices = [choice]
        return reply

    @staticmethod
    def _exchange():
        """Return the two-message user/assistant conversation shared by these tests."""
        return [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

    def test_dict_content_coerced_to_string(self):
        """A dict returned as the LLM content still yields a prefixed string summary."""
        reply = self._reply({"text": "some summary"})

        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            engine = ContextCompressor(model="test", quiet_mode=True)

        history = self._exchange()
        with patch("agent.context_compressor.call_llm", return_value=reply):
            text = engine._generate_summary(history)

        assert isinstance(text, str)
        assert text.startswith(SUMMARY_PREFIX)

    def test_none_content_coerced_to_empty(self):
        """A None LLM content produces a summary consisting of the bare prefix."""
        reply = self._reply(None)

        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            engine = ContextCompressor(model="test", quiet_mode=True)

        history = self._exchange()
        with patch("agent.context_compressor.call_llm", return_value=reply):
            text = engine._generate_summary(history)

        assert text is not None
        assert text == SUMMARY_PREFIX

    def test_summary_call_does_not_force_temperature(self):
        """The summary request must not pass a ``temperature`` keyword to ``call_llm``."""
        reply = self._reply("ok")

        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            engine = ContextCompressor(model="test", quiet_mode=True)

        history = self._exchange()
        with patch("agent.context_compressor.call_llm", return_value=reply) as llm_spy:
            engine._generate_summary(history)

        passed_kwargs = llm_spy.call_args.kwargs
        assert "temperature" not in passed_kwargs

    def test_summary_call_passes_live_main_runtime(self):
        """The constructor's runtime settings are forwarded as ``main_runtime``."""
        reply = self._reply("ok")

        # The same mapping is used to build the compressor and as the
        # expected ``main_runtime`` keyword argument.
        runtime = {
            "model": "gpt-5.4",
            "provider": "openai-codex",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "api_key": "codex-token",
            "api_mode": "codex_responses",
        }
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            engine = ContextCompressor(quiet_mode=True, **runtime)

        history = self._exchange()
        with patch("agent.context_compressor.call_llm", return_value=reply) as llm_spy:
            engine._generate_summary(history)

        assert llm_spy.call_args.kwargs["main_runtime"] == runtime
|
|
|
|
class TestSummaryFailureCooldown:
    """A failed summary call should suppress an immediate retry."""

    def test_summary_failure_enters_cooldown_and_skips_retry(self):
        """Two back-to-back summary attempts after a failure call the LLM only once."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            engine = ContextCompressor(model="test", quiet_mode=True)

        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

        failing_llm = patch(
            "agent.context_compressor.call_llm",
            side_effect=Exception("boom"),
        )
        with failing_llm as llm_spy:
            outcomes = [engine._generate_summary(history) for _ in range(2)]

        # Both attempts report failure as None ...
        assert all(outcome is None for outcome in outcomes)
        # ... but the second one never reached the (mocked) LLM.
        assert llm_spy.call_count == 1
|
|
|
|
class TestSummaryPrefixNormalization:
    """``_with_summary_prefix`` must yield exactly one current-style prefix."""

    def test_legacy_prefix_is_replaced(self):
        """A "[CONTEXT SUMMARY]:" lead-in is swapped for ``SUMMARY_PREFIX``."""
        legacy_text = "[CONTEXT SUMMARY]: did work"
        expected = "{}\ndid work".format(SUMMARY_PREFIX)
        assert ContextCompressor._with_summary_prefix(legacy_text) == expected

    def test_existing_new_prefix_is_not_duplicated(self):
        """Text already starting with ``SUMMARY_PREFIX`` is returned unchanged."""
        already_prefixed = "{}\ndid work".format(SUMMARY_PREFIX)
        normalized = ContextCompressor._with_summary_prefix(already_prefixed)
        assert normalized == already_prefixed
|
|
|
|
class TestCompressWithClient:
    """Exercise ``compress`` when the summarising LLM call is mocked to succeed.

    Every test patches ``agent.context_compressor.call_llm`` to return a
    canned response and then inspects the compacted message list. The
    private helpers below hold the setup and assertions shared by the tests.
    """

    @staticmethod
    def _llm_response(content):
        """Return a mock LLM response whose first choice's message carries *content*."""
        response = MagicMock()
        response.choices = [MagicMock()]
        response.choices[0].message.content = content
        return response

    @staticmethod
    def _build_compressor(**options):
        """Create a quiet compressor for model "test" with the context length patched to 100000."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            return ContextCompressor(model="test", quiet_mode=True, **options)

    @staticmethod
    def _compress(compressor, messages, response):
        """Run ``compressor.compress(messages)`` with ``call_llm`` returning *response*."""
        with patch("agent.context_compressor.call_llm", return_value=response):
            return compressor.compress(messages)

    @staticmethod
    def _summary_messages(result):
        """Return the messages whose string content starts with ``SUMMARY_PREFIX``.

        ``content`` may be ``None`` (e.g. tool-call-only turns), so it is
        coerced to an empty string before the prefix check.
        """
        return [
            msg for msg in result
            if (msg.get("content") or "").startswith(SUMMARY_PREFIX)
        ]

    @staticmethod
    def _assert_roles_alternate(result):
        """Fail if two adjacent user/assistant messages share the same role.

        Pairs involving any other role (system, tool) are ignored.
        """
        chat_roles = ("user", "assistant")
        for i in range(1, len(result)):
            r1 = result[i - 1].get("role")
            r2 = result[i].get("role")
            if r1 in chat_roles and r2 in chat_roles:
                assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"

    def test_system_content_list_gets_compression_note_without_crashing(self):
        """A list-valued system content stays a list and gains the compression note."""
        response = self._llm_response("summary text")
        compressor = self._build_compressor(protect_first_n=2, protect_last_n=2)

        msgs = [
            {"role": "system", "content": [{"type": "text", "text": "system prompt"}]},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},
            {"role": "assistant", "content": "msg 4"},
            {"role": "user", "content": "msg 5"},
            {"role": "assistant", "content": "msg 6"},
            {"role": "user", "content": "msg 7"},
        ]

        result = self._compress(compressor, msgs, response)

        system_content = result[0]["content"]
        assert isinstance(system_content, list)
        assert any(
            isinstance(block, dict)
            and "compacted into a handoff summary" in block.get("text", "")
            for block in system_content
        )

    def test_summarization_path(self):
        """A successful summary call yields a shorter history containing a prefixed summary."""
        response = self._llm_response("[CONTEXT SUMMARY]: stuff happened")
        compressor = self._build_compressor(protect_first_n=2, protect_last_n=2)

        msgs = [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(10)
        ]
        result = self._compress(compressor, msgs, response)

        # At least one message must carry the summary prefix.
        assert self._summary_messages(result)
        assert len(result) < len(msgs)

    def test_summarization_does_not_split_tool_call_pairs(self):
        """Every surviving assistant tool call still has its tool result in the output."""
        response = self._llm_response("[CONTEXT SUMMARY]: compressed middle")
        compressor = self._build_compressor(protect_first_n=3, protect_last_n=4)

        msgs = [
            {"role": "user", "content": "Could you address the reviewer comments in PR#71"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"id": "call_a", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
                    {"id": "call_b", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
                ],
            },
            {"role": "tool", "tool_call_id": "call_a", "content": "output a"},
            {"role": "tool", "tool_call_id": "call_b", "content": "output b"},
            {"role": "user", "content": "later 1"},
            {"role": "assistant", "content": "later 2"},
            {"role": "tool", "tool_call_id": "call_x", "content": "later output"},
            {"role": "assistant", "content": "later 3"},
            {"role": "user", "content": "later 4"},
        ]

        result = self._compress(compressor, msgs, response)

        answered_ids = {
            msg.get("tool_call_id")
            for msg in result
            if msg.get("role") == "tool" and msg.get("tool_call_id")
        }
        for msg in result:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    assert tc["id"] in answered_ids

    def test_summary_role_avoids_consecutive_user_messages(self):
        """Summary role should alternate with the last head message to avoid consecutive same-role messages."""
        response = self._llm_response("[CONTEXT SUMMARY]: stuff happened")
        compressor = self._build_compressor(protect_first_n=2, protect_last_n=2)

        # With two protected head messages the head ends on the assistant
        # turn "msg 1", so the summary is expected to take the user role.
        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        result = self._compress(compressor, msgs, response)

        summary_msg = self._summary_messages(result)
        assert len(summary_msg) == 1
        assert summary_msg[0]["role"] == "user"

    def test_summary_role_avoids_consecutive_user_when_head_ends_with_user(self):
        """When last head message is 'user', summary must be 'assistant' to avoid two consecutive user messages."""
        response = self._llm_response("[CONTEXT SUMMARY]: stuff happened")
        compressor = self._build_compressor(protect_first_n=3, protect_last_n=2)

        # Three protected head messages: system, user, user.
        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        result = self._compress(compressor, msgs, response)

        summary_msg = self._summary_messages(result)
        assert len(summary_msg) == 1
        assert summary_msg[0]["role"] == "assistant"

    def test_summary_role_flips_to_avoid_tail_collision(self):
        """When summary role collides with the first tail message but flipping
        doesn't collide with head, the role should be flipped."""
        response = self._llm_response("summary text")
        compressor = self._build_compressor(protect_first_n=2, protect_last_n=2)

        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "", "tool_calls": [
                {"id": "call_1", "type": "function", "function": {"name": "t", "arguments": "{}"}},
            ]},
            {"role": "tool", "tool_call_id": "call_1", "content": "result 1"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        result = self._compress(compressor, msgs, response)

        self._assert_roles_alternate(result)

    def test_double_collision_merges_summary_into_tail(self):
        """When neither role avoids collision with both neighbors, the summary
        should be merged into the first tail message rather than creating a
        standalone message that breaks role alternation.

        Common scenario: head ends with 'assistant', tail starts with 'user'.
        summary='user' collides with tail, summary='assistant' collides with head.
        """
        response = self._llm_response("summary text")
        compressor = self._build_compressor(protect_first_n=3, protect_last_n=3)

        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},
            {"role": "assistant", "content": "msg 4"},
            {"role": "user", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
            {"role": "user", "content": "msg 8"},
        ]
        result = self._compress(compressor, msgs, response)

        self._assert_roles_alternate(result)

        # The summary text must have been folded into the "msg 6" message.
        first_tail = [m for m in result if "msg 6" in (m.get("content") or "")]
        assert len(first_tail) == 1
        assert "summary text" in first_tail[0]["content"]

    def test_double_collision_merges_summary_into_list_tail_content(self):
        """Structured tail content should accept a merged summary without TypeError."""
        response = self._llm_response("summary text")
        compressor = self._build_compressor(protect_first_n=3, protect_last_n=3)

        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},
            {"role": "assistant", "content": "msg 4"},
            {"role": "user", "content": "msg 5"},
            {"role": "user", "content": [{"type": "text", "text": "msg 6"}]},
            {"role": "assistant", "content": "msg 7"},
            {"role": "user", "content": "msg 8"},
        ]

        result = self._compress(compressor, msgs, response)

        merged_tail = next(
            m for m in result
            if m.get("role") == "user" and isinstance(m.get("content"), list)
        )
        assert isinstance(merged_tail["content"], list)
        # The summary is expected in the first block, with the original
        # "msg 6" block still present somewhere in the list.
        assert "summary text" in merged_tail["content"][0]["text"]
        assert any(
            isinstance(block, dict) and block.get("text") == "msg 6"
            for block in merged_tail["content"]
        )

    def test_double_collision_user_head_assistant_tail(self):
        """Reverse double collision: head ends with 'user', tail starts with 'assistant'.
        summary='assistant' collides with tail, 'user' collides with head → merge."""
        response = self._llm_response("summary text")
        compressor = self._build_compressor(protect_first_n=2, protect_last_n=2)

        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},
            {"role": "assistant", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        result = self._compress(compressor, msgs, response)

        self._assert_roles_alternate(result)

        # The summary text must have been folded into the "msg 5" message.
        first_tail = [m for m in result if "msg 5" in (m.get("content") or "")]
        assert len(first_tail) == 1
        assert "summary text" in first_tail[0]["content"]

    def test_no_collision_scenarios_still_work(self):
        """Verify that the common no-collision cases (head=assistant/tail=assistant,
        head=user/tail=user) still produce a standalone summary message."""
        response = self._llm_response("summary text")
        compressor = self._build_compressor(protect_first_n=2, protect_last_n=2)

        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        result = self._compress(compressor, msgs, response)

        summary_msgs = self._summary_messages(result)
        assert len(summary_msgs) == 1, "should have a standalone summary message"
        assert summary_msgs[0]["role"] == "user"

    def test_summarization_does_not_start_tail_with_tool_outputs(self):
        """Every surviving tool result still has its originating assistant tool call."""
        response = self._llm_response("[CONTEXT SUMMARY]: compressed middle")
        compressor = self._build_compressor(protect_first_n=2, protect_last_n=3)

        msgs = [
            {"role": "user", "content": "earlier 1"},
            {"role": "assistant", "content": "earlier 2"},
            {"role": "user", "content": "earlier 3"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"id": "call_c", "type": "function", "function": {"name": "search_files", "arguments": "{}"}},
                ],
            },
            {"role": "tool", "tool_call_id": "call_c", "content": "output c"},
            {"role": "user", "content": "latest user"},
        ]

        result = self._compress(compressor, msgs, response)

        called_ids = {
            tc["id"]
            for msg in result
            if msg.get("role") == "assistant" and msg.get("tool_calls")
            for tc in msg["tool_calls"]
        }
        for msg in result:
            if msg.get("role") == "tool" and msg.get("tool_call_id"):
                assert msg["tool_call_id"] in called_ids
|
|
|
|
class TestSummaryTargetRatio:
    """Verify that summary_target_ratio properly scales budgets with context window."""

    @staticmethod
    def _compressor_for(context_length, **options):
        """Build a quiet compressor with the context-length lookup patched to *context_length*."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=context_length,
        ):
            return ContextCompressor(model="test", quiet_mode=True, **options)

    def test_tail_budget_scales_with_context(self):
        """Tail token budget should be threshold_tokens * summary_target_ratio."""
        # 200K window -> 40K budget; 1M window -> 200K budget, both at ratio 0.40.
        cases = ((200_000, 40_000), (1_000_000, 200_000))
        for window, expected_budget in cases:
            engine = self._compressor_for(window, summary_target_ratio=0.40)
            assert engine.tail_token_budget == expected_budget

    def test_summary_cap_scales_with_context(self):
        """Max summary tokens should be 5% of context, capped at 12K."""
        cases = ((200_000, 10_000), (1_000_000, 12_000))
        for window, expected_cap in cases:
            engine = self._compressor_for(window)
            assert engine.max_summary_tokens == expected_cap

    def test_ratio_clamped(self):
        """Ratio should be clamped to [0.10, 0.80]."""
        too_low = self._compressor_for(100_000, summary_target_ratio=0.05)
        assert too_low.summary_target_ratio == 0.10

        too_high = self._compressor_for(100_000, summary_target_ratio=0.95)
        assert too_high.summary_target_ratio == 0.80

    def test_default_threshold_is_50_percent(self):
        """Default compression threshold should be 50%, with a 64K floor."""
        engine = self._compressor_for(100_000)
        assert engine.threshold_percent == 0.50
        # Half of 100K would be 50K, so the 64K floor is what applies here.
        assert engine.threshold_tokens == 64_000

    def test_threshold_floor_does_not_apply_above_128k(self):
        """On large-context models the 50% percentage is used directly."""
        engine = self._compressor_for(200_000)
        # Half of 200K already exceeds the floor.
        assert engine.threshold_tokens == 100_000

    def test_default_protect_last_n_is_20(self):
        """Default protect_last_n should be 20."""
        engine = self._compressor_for(100_000)
        assert engine.protect_last_n == 20
|
|
|
|
class TestTokenBudgetTailProtection:
    """Tests for token-budget-based tail protection (PR #6240).

    The core change: tail protection is now based on a token budget rather
    than a fixed message count. This prevents large tool outputs from
    blocking compaction.
    """

    @pytest.fixture()
    def budget_compressor(self):
        """Compressor with known token budget for tail protection tests."""
        settings = dict(
            model="test/model",
            threshold_percent=0.50,
            protect_first_n=2,
            protect_last_n=20,
            quiet_mode=True,
        )
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=200_000,
        ):
            return ContextCompressor(**settings)

    def test_large_tool_outputs_no_longer_block_compaction(self, budget_compressor):
        """The motivating scenario: 20 messages with large tool outputs should
        NOT prevent compaction. With message-count tail protection they would
        all be protected, leaving nothing to summarize."""
        engine = budget_compressor

        opening = [
            {"role": "user", "content": "Start task"},
            {"role": "assistant", "content": "On it"},
        ]
        # Ten tool-call / tool-result pairs, each result 5000 characters long.
        tool_rounds = [
            turn
            for i in range(10)
            for turn in (
                {
                    "role": "assistant", "content": None,
                    "tool_calls": [{"function": {"name": f"tool_{i}", "arguments": "{}"}}],
                },
                {
                    "role": "tool", "content": "x" * 5000,
                    "tool_call_id": f"call_{i}",
                },
            )
        ]
        closing = [
            {"role": "user", "content": "What's the status?"},
            {"role": "assistant", "content": "Here's what I found..."},
            {"role": "user", "content": "Continue"},
        ]
        history = opening + tool_rounds + closing

        cut = engine._find_tail_cut_by_tokens(history, engine.protect_first_n)
        tail_size = len(history) - cut

        # The protected tail must be smaller than the message-count limit of 20 ...
        assert tail_size < 20, f"Tail {tail_size} messages — large tool outputs are blocking compaction"
        # ... but never smaller than three messages.
        assert tail_size >= 3

    def test_min_tail_always_3_messages(self, budget_compressor):
        """Even with a tiny token budget, at least 3 messages are protected."""
        engine = budget_compressor
        # Shrink the budget to a value far below any realistic tail.
        engine.tail_token_budget = 10

        history = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "working on it"},
            {"role": "user", "content": "more work"},
            {"role": "assistant", "content": "done"},
            {"role": "user", "content": "thanks"},
        ]
        cut = engine._find_tail_cut_by_tokens(history, 2)
        tail_size = len(history) - cut
        assert tail_size >= 3, f"Tail is only {tail_size} messages, min should be 3"

    def test_soft_ceiling_allows_oversized_message(self, budget_compressor):
        """The 1.5x soft ceiling allows an oversized message to be included
        rather than splitting it."""
        engine = budget_compressor
        engine.tail_token_budget = 500

        # A single long assistant reply; presumably larger than the 500-token
        # budget — confirm against the compressor's token estimate.
        oversized_reply = {"role": "assistant", "content": "a" * 2400}
        history = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
            {"role": "user", "content": "read the file"},
            oversized_reply,
            {"role": "user", "content": "short"},
            {"role": "assistant", "content": "short reply"},
            {"role": "user", "content": "continue"},
        ]
        cut = engine._find_tail_cut_by_tokens(history, 2)

        # NOTE(review): only the three-message minimum is asserted here, not
        # whether the oversized reply itself ends up inside the tail.
        tail_size = len(history) - cut
        assert tail_size >= 3

    def test_small_conversation_still_compresses(self, budget_compressor):
        """With the new min of 8 messages (head=2 + 3 + 1 guard + 2 middle),
        a small but compressible conversation should still compress."""
        engine = budget_compressor

        roles = ("user", "assistant")
        history = [
            {"role": roles[i % 2], "content": f"Message {i}"} for i in range(9)
        ]

        # Stub the summary step so no LLM call is attempted.
        stub_summary = patch.object(
            engine, "_generate_summary", return_value="Summary of conversation"
        )
        with stub_summary:
            compacted = engine.compress(history, current_tokens=90_000)

        assert len(compacted) < len(history)

    def test_prune_with_token_budget(self, budget_compressor):
        """_prune_old_tool_results with protect_tail_tokens respects the budget."""
        engine = budget_compressor
        history = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "big.txt"}'}}]},
            {"role": "tool", "content": "x" * 10000, "tool_call_id": "c1"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "small.txt"}'}}]},
            {"role": "tool", "content": "y" * 10000, "tool_call_id": "c2"},
            {"role": "user", "content": "short recent message"},
            {"role": "assistant", "content": "short reply"},
        ]

        _, pruned = engine._prune_old_tool_results(
            history, protect_tail_count=2, protect_tail_tokens=1000,
        )

        # At least one of the two large tool results must have been pruned.
        assert pruned >= 1

    def test_prune_without_token_budget_uses_message_count(self, budget_compressor):
        """Without protect_tail_tokens, falls back to message-count behavior."""
        engine = budget_compressor
        history = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "tool", "arguments": "{}"}}]},
            {"role": "tool", "content": "x" * 5000, "tool_call_id": "c1"},
            {"role": "user", "content": "recent"},
            {"role": "assistant", "content": "reply"},
        ]

        _, pruned = engine._prune_old_tool_results(
            history, protect_tail_count=3,
        )

        # NOTE(review): this only checks that the call succeeds and reports a
        # count; the exact number pruned is not pinned down.
        assert isinstance(pruned, int)
|
|
|
|
class TestTruncateToolCallArgsJson:
    """Regression coverage for issue #11762.

    An earlier implementation cut ``function.arguments`` in the middle of a
    string and so produced invalid JSON. Strict providers (observed on
    MiniMax) answered with non-retryable 400s, which left long sessions stuck
    in a re-send loop. The helper exercised here must always return parseable
    JSON with the same shape as the input — shrunken, not corrupted.
    """

    def _helper(self):
        """Import and return ``_truncate_tool_call_args_json`` on demand."""
        from agent.context_compressor import (
            _truncate_tool_call_args_json as truncate_args,
        )

        return truncate_args

    def test_shrunken_args_remain_valid_json(self):
        """A payload over 500 chars comes back shorter and still parses."""
        import json

        truncate = self._helper()
        notes_path = "~/.hermes/skills/shopping/browser-setup-notes.md"
        raw_args = json.dumps(
            {
                "path": notes_path,
                "content": "# Shopping Browser Setup Notes\n\n" + "abc " * 400,
            }
        )
        assert len(raw_args) > 500

        compact_args = truncate(raw_args)
        decoded = json.loads(compact_args)

        assert decoded["path"] == notes_path
        assert decoded["content"].endswith("...[truncated]")
        assert len(compact_args) < len(raw_args)

    def test_non_json_arguments_pass_through(self):
        """Input that is not JSON is returned exactly as given."""
        truncate = self._helper()
        plain_text = "this is not json at all, " * 50

        returned = truncate(plain_text)

        assert returned == plain_text

    def test_short_string_leaves_unchanged(self):
        """Short string values decode to the same data after the helper runs."""
        import json

        truncate = self._helper()
        small_args = {"command": "ls -la", "cwd": "/tmp"}

        round_tripped = json.loads(truncate(json.dumps(small_args)))

        assert round_tripped == {"command": "ls -la", "cwd": "/tmp"}

    def test_nested_structures_are_walked(self):
        """Long strings inside nested lists and dicts are truncated too."""
        import json

        truncate = self._helper()
        nested_args = {
            "messages": [
                {"role": "user", "content": "x" * 500},
                {"role": "assistant", "content": "ok"},
            ],
            "meta": {"note": "y" * 500},
        }

        decoded = json.loads(truncate(json.dumps(nested_args)))
        turns = decoded["messages"]

        assert turns[0]["content"].endswith("...[truncated]")
        # The short sibling value must survive untouched.
        assert turns[1]["content"] == "ok"
        assert decoded["meta"]["note"].endswith("...[truncated]")

    def test_non_string_leaves_preserved(self):
        """Numbers, booleans, nulls and lists of numbers are left as they were."""
        import json

        truncate = self._helper()
        mixed_args = {
            "retries": 3,
            "enabled": True,
            "timeout": None,
            "items": [1, 2, 3],
            "note": "z" * 500,
        }

        decoded = json.loads(truncate(json.dumps(mixed_args)))

        assert decoded["retries"] == 3
        assert decoded["enabled"] is True
        assert decoded["timeout"] is None
        assert decoded["items"] == [1, 2, 3]
        # Only the long string leaf is expected to change.
        assert decoded["note"].endswith("...[truncated]")

    def test_scalar_json_string_gets_shrunk(self):
        """A top-level JSON string (no enclosing object) is truncated as well."""
        import json

        truncate = self._helper()
        scalar_args = json.dumps("q" * 500)

        decoded = json.loads(truncate(scalar_args))

        assert isinstance(decoded, str)
        assert decoded.endswith("...[truncated]")

    def test_unicode_preserved(self):
        """Non-ASCII characters appear literally in the helper's output.

        The input is built with ``json.dumps`` defaults, which escape
        non-ASCII characters, so the literal characters can only be present
        in the output if the helper re-serializes them unescaped.
        """
        import json

        truncate = self._helper()
        cjk_prefix = "非德满"
        raw_args = json.dumps({"content": cjk_prefix + ("a" * 500)})

        compact_args = truncate(raw_args)

        assert cjk_prefix in compact_args

    def test_pass3_emits_valid_json_for_downstream_provider(self):
        """End-to-end: pruning must leave tool-call arguments parseable.

        Pass 3 must never produce the exact failure payload that caused the
        400 loop (unterminated string, missing brace).
        """
        import json

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            pruner = ContextCompressor(
                model="test/model",
                threshold_percent=0.85,
                protect_first_n=1,
                protect_last_n=1,
                quiet_mode=True,
            )

        notes_path = "~/.hermes/skills/shopping/browser-setup-notes.md"
        oversized_body = "# Shopping Browser Setup Notes\n\n## Overview\n" + "x " * 400
        write_args = json.dumps(
            {
                "path": notes_path,
                "content": oversized_body,
            }
        )
        assert len(write_args) > 500

        write_call = {
            "id": "call_1",
            "type": "function",
            "function": {"name": "write_file", "arguments": write_args},
        }
        conversation = [
            {"role": "user", "content": "please write two files"},
            {"role": "assistant", "content": None, "tool_calls": [write_call]},
            {"role": "tool", "tool_call_id": "call_1",
             "content": '{"bytes_written": 727}'},
            {"role": "user", "content": "ok"},
            {"role": "assistant", "content": "done"},
        ]

        pruned_messages, _ = pruner._prune_old_tool_results(
            conversation, protect_tail_count=2
        )
        rewritten_args = pruned_messages[1]["tool_calls"][0]["function"]["arguments"]

        # json.loads raises on malformed input, so parsing is itself the check.
        decoded = json.loads(rewritten_args)
        assert decoded["path"] == notes_path
        assert decoded["content"].endswith("...[truncated]")
|
|