Spaces:
Paused
Paused
| """Tests for agent/context_compressor.py — compression logic, thresholds, truncation fallback.""" | |
| import pytest | |
| from unittest.mock import patch, MagicMock | |
| from agent.context_compressor import ContextCompressor, SUMMARY_PREFIX | |
@pytest.fixture()
def compressor():
    """Create a ContextCompressor with mocked dependencies.

    ``agent.context_compressor.get_model_context_length`` is patched to return
    100000 while the instance is constructed.  The function is registered as a
    pytest fixture because the test classes in this module request it by
    argument name (``def test_x(self, compressor)``); without the decorator
    pytest cannot resolve that argument.
    """
    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(
            model="test/model",
            threshold_percent=0.85,
            protect_first_n=2,
            protect_last_n=2,
            quiet_mode=True,
        )
        return c
class TestShouldCompress:
    """``should_compress`` decisions for the shared ``compressor`` fixture.

    The fixture is built with a mocked 100000-token context length and
    ``threshold_percent=0.85``.
    """

    def test_below_threshold(self, compressor):
        """A recorded prompt size of 50000 tokens is expected not to trigger."""
        compressor.last_prompt_tokens = 50000
        decision = compressor.should_compress()
        assert decision is False

    def test_above_threshold(self, compressor):
        """A recorded prompt size of 90000 tokens is expected to trigger."""
        compressor.last_prompt_tokens = 90000
        decision = compressor.should_compress()
        assert decision is True

    def test_exact_threshold(self, compressor):
        """A prompt size of exactly 85000 tokens is expected to trigger."""
        compressor.last_prompt_tokens = 85000
        decision = compressor.should_compress()
        assert decision is True

    def test_explicit_tokens(self, compressor):
        """An explicit ``prompt_tokens`` argument is judged on its own value."""
        over_limit = compressor.should_compress(prompt_tokens=90000)
        assert over_limit is True
        under_limit = compressor.should_compress(prompt_tokens=50000)
        assert under_limit is False
class TestUpdateFromResponse:
    """``update_from_response`` copies usage numbers onto the compressor."""

    def test_updates_fields(self, compressor):
        """Prompt and completion token counts are stored from the usage dict."""
        usage = {
            "prompt_tokens": 5000,
            "completion_tokens": 1000,
            "total_tokens": 6000,
        }
        compressor.update_from_response(usage)
        assert compressor.last_prompt_tokens == 5000
        assert compressor.last_completion_tokens == 1000

    def test_missing_fields_default_zero(self, compressor):
        """An empty usage dict is expected to leave the prompt count at zero."""
        empty_usage = {}
        compressor.update_from_response(empty_usage)
        assert compressor.last_prompt_tokens == 0
class TestCompress:
    """``compress`` exercised through the shared ``compressor`` fixture."""

    def _make_messages(self, n):
        """Return ``n`` messages alternating user/assistant with content ``"msg <i>"``."""
        history = []
        for idx in range(n):
            speaker = "assistant" if idx % 2 else "user"
            history.append({"role": speaker, "content": f"msg {idx}"})
        return history

    def test_too_few_messages_returns_unchanged(self, compressor):
        """Four messages are too few to compress, so the input comes back as-is."""
        # protect_first=2 + protect_last=2 + 1 = 5 needed
        short_history = self._make_messages(4)
        outcome = compressor.compress(short_history)
        assert outcome == short_history

    def test_truncation_fallback_no_client(self, compressor):
        """Compression still shortens the history and keeps the system message first."""
        # compressor has client=None, so should use truncation fallback
        system_msg = {"role": "system", "content": "System prompt"}
        history = [system_msg, *self._make_messages(10)]
        outcome = compressor.compress(history)
        assert len(outcome) < len(history)
        # Should keep system message and last N
        assert outcome[0]["role"] == "system"
        assert compressor.compression_count == 1

    def test_compression_increments_count(self, compressor):
        """Each ``compress`` call bumps ``compression_count`` by one."""
        history = self._make_messages(10)
        for expected_count in (1, 2):
            compressor.compress(history)
            assert compressor.compression_count == expected_count

    def test_protects_first_and_last(self, compressor):
        """The trailing messages survive compression."""
        history = self._make_messages(10)
        outcome = compressor.compress(history)
        # First 2 messages should be preserved (protect_first_n=2)
        # Last 2 messages should be preserved (protect_last_n=2)
        final_message = outcome[-1]
        assert final_message["content"] == history[-1]["content"]
        # The second-to-last tail message may have the summary merged
        # into it when a double-collision prevents a standalone summary
        # (head=assistant, tail=user in this fixture).  Verify the
        # original content is present in either case.
        penultimate_message = outcome[-2]
        assert history[-2]["content"] in penultimate_message["content"]
class TestGenerateSummaryNoneContent:
    """Regression: content=None (from tool-call-only assistant messages) must not crash."""

    def test_none_content_does_not_crash(self):
        """``_generate_summary`` accepts messages whose content is ``None``."""
        choice = MagicMock()
        choice.message.content = "[CONTEXT SUMMARY]: tool calls happened"
        llm_reply = MagicMock()
        llm_reply.choices = [choice]

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            subject = ContextCompressor(model="test", quiet_mode=True)

        tool_only_turn = {
            "role": "assistant",
            "content": None,
            "tool_calls": [
                {"function": {"name": "search"}}
            ],
        }
        history = [
            {"role": "user", "content": "do something"},
            tool_only_turn,
            {"role": "tool", "content": "result"},
            {"role": "assistant", "content": None},
            {"role": "user", "content": "thanks"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_reply):
            summary = subject._generate_summary(history)

        assert isinstance(summary, str)
        assert summary.startswith(SUMMARY_PREFIX)

    def test_none_content_in_system_message_compress(self):
        """System message with content=None should not crash during compress."""
        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            subject = ContextCompressor(
                model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2
            )

        history = [{"role": "system", "content": None}]
        for idx in range(10):
            speaker = "user" if idx % 2 == 0 else "assistant"
            history.append({"role": speaker, "content": f"msg {idx}"})

        outcome = subject.compress(history)
        assert len(outcome) < len(history)
class TestNonStringContent:
    """Regression: content as dict (e.g., llama.cpp tool calls) must not crash."""

    def test_dict_content_coerced_to_string(self):
        """A dict returned as LLM content still yields a prefixed string summary."""
        choice = MagicMock()
        choice.message.content = {"text": "some summary"}
        llm_reply = MagicMock()
        llm_reply.choices = [choice]

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            subject = ContextCompressor(model="test", quiet_mode=True)

        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_reply):
            summary = subject._generate_summary(history)

        assert isinstance(summary, str)
        assert summary.startswith(SUMMARY_PREFIX)

    def test_none_content_coerced_to_empty(self):
        """``None`` returned as LLM content yields exactly the summary prefix."""
        choice = MagicMock()
        choice.message.content = None
        llm_reply = MagicMock()
        llm_reply.choices = [choice]

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            subject = ContextCompressor(model="test", quiet_mode=True)

        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_reply):
            summary = subject._generate_summary(history)

        # None content → empty string → standardized compaction handoff prefix added
        assert summary is not None
        assert summary == SUMMARY_PREFIX

    def test_summary_call_does_not_force_temperature(self):
        """The summary request must not pass a ``temperature`` keyword to ``call_llm``."""
        choice = MagicMock()
        choice.message.content = "ok"
        llm_reply = MagicMock()
        llm_reply.choices = [choice]

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            subject = ContextCompressor(model="test", quiet_mode=True)

        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_reply) as llm_spy:
            subject._generate_summary(history)

        sent_kwargs = llm_spy.call_args.kwargs
        assert "temperature" not in sent_kwargs

    def test_summary_call_passes_live_main_runtime(self):
        """The constructor's runtime settings are forwarded as ``main_runtime``."""
        choice = MagicMock()
        choice.message.content = "ok"
        llm_reply = MagicMock()
        llm_reply.choices = [choice]

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            subject = ContextCompressor(
                model="gpt-5.4",
                provider="openai-codex",
                base_url="https://chatgpt.com/backend-api/codex",
                api_key="codex-token",
                api_mode="codex_responses",
                quiet_mode=True,
            )

        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_reply) as llm_spy:
            subject._generate_summary(history)

        expected_runtime = {
            "model": "gpt-5.4",
            "provider": "openai-codex",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "api_key": "codex-token",
            "api_mode": "codex_responses",
        }
        assert llm_spy.call_args.kwargs["main_runtime"] == expected_runtime
class TestSummaryFailureCooldown:
    """A failed summary call is expected to suppress an immediate retry."""

    def test_summary_failure_enters_cooldown_and_skips_retry(self):
        """Both attempts return ``None`` and ``call_llm`` is invoked only once."""
        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            subject = ContextCompressor(model="test", quiet_mode=True)

        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

        failure = Exception("boom")
        with patch("agent.context_compressor.call_llm", side_effect=failure) as llm_spy:
            first_attempt = subject._generate_summary(history)
            second_attempt = subject._generate_summary(history)

        assert first_attempt is None
        assert second_attempt is None
        assert llm_spy.call_count == 1
class TestSummaryPrefixNormalization:
    """``_with_summary_prefix`` normalises whichever prefix a summary arrives with."""

    def test_legacy_prefix_is_replaced(self):
        """The legacy ``[CONTEXT SUMMARY]:`` marker is swapped for ``SUMMARY_PREFIX``."""
        expected = f"{SUMMARY_PREFIX}\ndid work"
        normalized = ContextCompressor._with_summary_prefix("[CONTEXT SUMMARY]: did work")
        assert normalized == expected

    def test_existing_new_prefix_is_not_duplicated(self):
        """Text already starting with ``SUMMARY_PREFIX`` is returned unchanged."""
        already_prefixed = f"{SUMMARY_PREFIX}\ndid work"
        normalized = ContextCompressor._with_summary_prefix(already_prefixed)
        assert normalized == f"{SUMMARY_PREFIX}\ndid work"
class TestCompressWithClient:
    """``compress`` with ``agent.context_compressor.call_llm`` patched to a canned reply.

    The tests cover the summarisation path: a summary message is produced,
    tool-call/tool-result pairs stay together, and the role chosen for the
    summary (or merging it into the tail) keeps user/assistant turns alternating.
    """

    @staticmethod
    def _llm_reply(text):
        """Return a mock LLM response whose first choice carries ``text`` as content."""
        reply = MagicMock()
        reply.choices = [MagicMock()]
        reply.choices[0].message.content = text
        return reply

    @staticmethod
    def _summary_messages(result):
        """Return the messages whose content starts with ``SUMMARY_PREFIX``.

        ``content`` may be ``None`` (tool-call-only turns), hence the ``or ""``.
        """
        return [m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)]

    @staticmethod
    def _assert_roles_alternate(result):
        """Fail if two adjacent user/assistant messages share the same role.

        Pairs involving any other role (system, tool) are ignored.
        """
        chat_roles = ("user", "assistant")
        for i, (prev, cur) in enumerate(zip(result, result[1:]), start=1):
            r1 = prev.get("role")
            r2 = cur.get("role")
            if r1 in chat_roles and r2 in chat_roles:
                assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"

    def test_summarization_path(self):
        """Compression inserts a summary message and shortens the history."""
        mock_response = self._llm_reply("[CONTEXT SUMMARY]: stuff happened")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        msgs = [{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"} for i in range(10)]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Should have summary message in the middle
        assert self._summary_messages(result)
        assert len(result) < len(msgs)

    def test_summarization_does_not_split_tool_call_pairs(self):
        """Every assistant tool call kept in the result still has its tool answer."""
        mock_response = self._llm_reply("[CONTEXT SUMMARY]: compressed middle")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(
                model="test",
                quiet_mode=True,
                protect_first_n=3,
                protect_last_n=4,
            )

        msgs = [
            {"role": "user", "content": "Could you address the reviewer comments in PR#71"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"id": "call_a", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
                    {"id": "call_b", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
                ],
            },
            {"role": "tool", "tool_call_id": "call_a", "content": "output a"},
            {"role": "tool", "tool_call_id": "call_b", "content": "output b"},
            {"role": "user", "content": "later 1"},
            {"role": "assistant", "content": "later 2"},
            {"role": "tool", "tool_call_id": "call_x", "content": "later output"},
            {"role": "assistant", "content": "later 3"},
            {"role": "user", "content": "later 4"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        answered_ids = {
            msg.get("tool_call_id")
            for msg in result
            if msg.get("role") == "tool" and msg.get("tool_call_id")
        }
        for msg in result:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    assert tc["id"] in answered_ids

    def test_summary_role_avoids_consecutive_user_messages(self):
        """Summary role should alternate with the last head message to avoid consecutive same-role messages."""
        mock_response = self._llm_reply("[CONTEXT SUMMARY]: stuff happened")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Last head message (index 1) is "assistant" → summary should be "user".
        # With min_tail=3, tail = last 3 messages (indices 5-7).
        # head_last=assistant, tail_first=assistant → summary_role="user", no collision.
        # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        summary_msg = self._summary_messages(result)
        assert len(summary_msg) == 1
        assert summary_msg[0]["role"] == "user"

    def test_summary_role_avoids_consecutive_user_when_head_ends_with_user(self):
        """When last head message is 'user', summary must be 'assistant' to avoid two consecutive user messages."""
        mock_response = self._llm_reply("[CONTEXT SUMMARY]: stuff happened")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=2)

        # Last head message (index 2) is "user" → summary should be "assistant"
        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},  # last head — user
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        summary_msg = self._summary_messages(result)
        assert len(summary_msg) == 1
        assert summary_msg[0]["role"] == "assistant"

    def test_summary_role_flips_to_avoid_tail_collision(self):
        """When summary role collides with the first tail message but flipping
        doesn't collide with head, the role should be flipped."""
        mock_response = self._llm_reply("summary text")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Intended scenario: head ends with a tool message, tail starts with user
        # (index 6).  Default: tool → summary_role="user" → collides with tail.
        # Flip to "assistant" → tool→assistant is fine.
        # NOTE(review): with protect_first_n=2 the nominal head ends at index 1
        # (the assistant tool-call message); the tool result is at index 2.
        # Presumably compress() moves the boundary past the tool result —
        # confirm against the implementation.
        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "", "tool_calls": [
                {"id": "call_1", "type": "function", "function": {"name": "t", "arguments": "{}"}},
            ]},
            {"role": "tool", "tool_call_id": "call_1", "content": "result 1"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Verify no consecutive user or assistant messages
        self._assert_roles_alternate(result)

    def test_double_collision_merges_summary_into_tail(self):
        """When neither role avoids collision with both neighbors, the summary
        should be merged into the first tail message rather than creating a
        standalone message that breaks role alternation.

        Common scenario: head ends with 'assistant', tail starts with 'user'.
        summary='user' collides with tail, summary='assistant' collides with head.
        """
        mock_response = self._llm_reply("summary text")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=3)

        # Head: [system, user, assistant] → last head = assistant
        # Tail: [user, assistant, user] → first tail = user
        # summary_role="user" collides with tail, "assistant" collides with head → merge
        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},  # compressed
            {"role": "assistant", "content": "msg 4"},  # compressed
            {"role": "user", "content": "msg 5"},  # compressed
            {"role": "user", "content": "msg 6"},  # tail start
            {"role": "assistant", "content": "msg 7"},
            {"role": "user", "content": "msg 8"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Verify no consecutive user or assistant messages
        self._assert_roles_alternate(result)

        # The summary text should be merged into the first tail message
        first_tail = [m for m in result if "msg 6" in (m.get("content") or "")]
        assert len(first_tail) == 1
        assert "summary text" in first_tail[0]["content"]

    def test_double_collision_user_head_assistant_tail(self):
        """Reverse double collision: head ends with 'user', tail starts with 'assistant'.
        summary='assistant' collides with tail, 'user' collides with head → merge."""
        mock_response = self._llm_reply("summary text")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Head: [system, user] → last head = user
        # Tail: [assistant, user, assistant] → first tail = assistant
        # summary_role="assistant" collides with tail, "user" collides with head → merge
        # With min_tail=3, tail = last 3 messages (indices 5-7).
        # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},  # compressed
            {"role": "user", "content": "msg 3"},  # compressed
            {"role": "assistant", "content": "msg 4"},  # compressed
            {"role": "assistant", "content": "msg 5"},  # tail start
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Verify no consecutive user or assistant messages
        self._assert_roles_alternate(result)

        # The summary should be merged into the first tail message (assistant at index 5)
        first_tail = [m for m in result if "msg 5" in (m.get("content") or "")]
        assert len(first_tail) == 1
        assert "summary text" in first_tail[0]["content"]

    def test_no_collision_scenarios_still_work(self):
        """Verify that the common no-collision cases (head=assistant/tail=assistant,
        head=user/tail=user) still produce a standalone summary message."""
        mock_response = self._llm_reply("summary text")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Head=assistant, Tail=assistant → summary_role="user", no collision.
        # With min_tail=3, tail = last 3 messages (indices 5-7).
        # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        summary_msgs = self._summary_messages(result)
        assert len(summary_msgs) == 1, "should have a standalone summary message"
        assert summary_msgs[0]["role"] == "user"

    def test_summarization_does_not_start_tail_with_tool_outputs(self):
        """Every tool result kept in the output still has its originating assistant call."""
        mock_response = self._llm_reply("[CONTEXT SUMMARY]: compressed middle")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(
                model="test",
                quiet_mode=True,
                protect_first_n=2,
                protect_last_n=3,
            )

        msgs = [
            {"role": "user", "content": "earlier 1"},
            {"role": "assistant", "content": "earlier 2"},
            {"role": "user", "content": "earlier 3"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"id": "call_c", "type": "function", "function": {"name": "search_files", "arguments": "{}"}},
                ],
            },
            {"role": "tool", "tool_call_id": "call_c", "content": "output c"},
            {"role": "user", "content": "latest user"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        called_ids = {
            tc["id"]
            for msg in result
            if msg.get("role") == "assistant" and msg.get("tool_calls")
            for tc in msg["tool_calls"]
        }
        for msg in result:
            if msg.get("role") == "tool" and msg.get("tool_call_id"):
                assert msg["tool_call_id"] in called_ids
class TestSummaryTargetRatio:
    """Verify that summary_target_ratio properly scales budgets with context window."""

    def test_tail_budget_scales_with_context(self):
        """Tail token budget should be threshold_tokens * summary_target_ratio."""
        # context length → expected budget (context * 0.50 threshold * 0.40 ratio):
        #   200K → 40K, 1M → 200K
        cases = ((200_000, 40_000), (1_000_000, 200_000))
        for context_length, expected_budget in cases:
            with patch("agent.context_compressor.get_model_context_length", return_value=context_length):
                subject = ContextCompressor(model="test", quiet_mode=True, summary_target_ratio=0.40)
            assert subject.tail_token_budget == expected_budget

    def test_summary_cap_scales_with_context(self):
        """Max summary tokens should be 5% of context, capped at 12K."""
        # 200K * 0.05 = 10K; 1M would give 50K but is capped at the 12K ceiling
        cases = ((200_000, 10_000), (1_000_000, 12_000))
        for context_length, expected_cap in cases:
            with patch("agent.context_compressor.get_model_context_length", return_value=context_length):
                subject = ContextCompressor(model="test", quiet_mode=True)
            assert subject.max_summary_tokens == expected_cap

    def test_ratio_clamped(self):
        """Ratio should be clamped to [0.10, 0.80]."""
        cases = ((0.05, 0.10), (0.95, 0.80))
        for requested_ratio, clamped_ratio in cases:
            with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
                subject = ContextCompressor(
                    model="test", quiet_mode=True, summary_target_ratio=requested_ratio
                )
            assert subject.summary_target_ratio == clamped_ratio

    def test_default_threshold_is_50_percent(self):
        """Default compression threshold should be 50%, with a 64K floor."""
        with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
            subject = ContextCompressor(model="test", quiet_mode=True)
        assert subject.threshold_percent == 0.50
        # 50% of 100K = 50K, but the floor is 64K
        assert subject.threshold_tokens == 64_000

    def test_threshold_floor_does_not_apply_above_128k(self):
        """On large-context models the 50% percentage is used directly."""
        with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
            subject = ContextCompressor(model="test", quiet_mode=True)
        # 50% of 200K = 100K, which is above the 64K floor
        assert subject.threshold_tokens == 100_000

    def test_default_protect_last_n_is_20(self):
        """Default protect_last_n should be 20."""
        with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
            subject = ContextCompressor(model="test", quiet_mode=True)
        assert subject.protect_last_n == 20
class TestTokenBudgetTailProtection:
    """Tests for token-budget-based tail protection (PR #6240).

    The core change: tail protection is now based on a token budget rather
    than a fixed message count.  This prevents large tool outputs from
    blocking compaction.
    """

    @pytest.fixture
    def budget_compressor(self):
        """Compressor with known token budget for tail protection tests.

        Registered as a fixture because every test in this class requests
        ``budget_compressor`` by argument name.
        """
        with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
            c = ContextCompressor(
                model="test/model",
                threshold_percent=0.50,  # 100K threshold
                protect_first_n=2,
                protect_last_n=20,
                quiet_mode=True,
            )
            return c

    def test_large_tool_outputs_no_longer_block_compaction(self, budget_compressor):
        """The motivating scenario: 20 messages with large tool outputs should
        NOT prevent compaction.  With message-count tail protection they would
        all be protected, leaving nothing to summarize."""
        c = budget_compressor
        messages = [
            {"role": "user", "content": "Start task"},
            {"role": "assistant", "content": "On it"},
        ]
        # Add 20 messages with large tool outputs (~5K chars each ≈ 1250 tokens)
        for i in range(10):
            messages.append({
                "role": "assistant", "content": None,
                "tool_calls": [{"function": {"name": f"tool_{i}", "arguments": "{}"}}],
            })
            messages.append({
                "role": "tool", "content": "x" * 5000,
                "tool_call_id": f"call_{i}",
            })
        # Add 3 recent small messages
        messages.append({"role": "user", "content": "What's the status?"})
        messages.append({"role": "assistant", "content": "Here's what I found..."})
        messages.append({"role": "user", "content": "Continue"})

        # The tail cut should NOT protect all 20 tool messages
        head_end = c.protect_first_n
        cut = c._find_tail_cut_by_tokens(messages, head_end)
        tail_size = len(messages) - cut
        # With token budget, the tail should be much smaller than 20+
        assert tail_size < 20, f"Tail {tail_size} messages — large tool outputs are blocking compaction"
        # But at least 3 (hard minimum)
        assert tail_size >= 3

    def test_min_tail_always_3_messages(self, budget_compressor):
        """Even with a tiny token budget, at least 3 messages are protected."""
        c = budget_compressor
        # Override to a tiny budget
        c.tail_token_budget = 10
        messages = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "working on it"},
            {"role": "user", "content": "more work"},
            {"role": "assistant", "content": "done"},
            {"role": "user", "content": "thanks"},
        ]
        head_end = 2
        cut = c._find_tail_cut_by_tokens(messages, head_end)
        tail_size = len(messages) - cut
        assert tail_size >= 3, f"Tail is only {tail_size} messages, min should be 3"

    def test_soft_ceiling_allows_oversized_message(self, budget_compressor):
        """The 1.5x soft ceiling allows an oversized message to be included
        rather than splitting it."""
        c = budget_compressor
        # Set a small budget — 500 tokens
        c.tail_token_budget = 500
        messages = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
            {"role": "user", "content": "read the file"},
            # This message is ~600 tokens (> budget of 500, but < 1.5x = 750)
            {"role": "assistant", "content": "a" * 2400},
            {"role": "user", "content": "short"},
            {"role": "assistant", "content": "short reply"},
            {"role": "user", "content": "continue"},
        ]
        head_end = 2
        cut = c._find_tail_cut_by_tokens(messages, head_end)
        # The oversized message at index 3 should NOT be the cut point
        # because 1.5x ceiling = 750 tokens and accumulated would be ~610
        # (short msgs + oversized msg) which is < 750
        tail_size = len(messages) - cut
        assert tail_size >= 3

    def test_small_conversation_still_compresses(self, budget_compressor):
        """With the new min of 8 messages (head=2 + 3 + 1 guard + 2 middle),
        a small but compressible conversation should still compress."""
        c = budget_compressor
        # 9 messages: head(2) + 4 middle + 3 tail = compressible
        messages = [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"Message {i}"}
            for i in range(9)
        ]
        # Should not early-return (needs > protect_first_n + 3 + 1 = 6)
        # Mock the summary generation to avoid real API call
        with patch.object(c, "_generate_summary", return_value="Summary of conversation"):
            result = c.compress(messages, current_tokens=90_000)
        # Should have compressed (fewer messages than original)
        assert len(result) < len(messages)

    def test_prune_with_token_budget(self, budget_compressor):
        """_prune_old_tool_results with protect_tail_tokens respects the budget."""
        c = budget_compressor
        messages = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "big.txt"}'}}]},
            {"role": "tool", "content": "x" * 10000, "tool_call_id": "c1"},  # ~2500 tokens
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "small.txt"}'}}]},
            {"role": "tool", "content": "y" * 10000, "tool_call_id": "c2"},  # ~2500 tokens
            {"role": "user", "content": "short recent message"},
            {"role": "assistant", "content": "short reply"},
        ]
        # With a 1000-token budget, only the last couple messages should be protected
        _, pruned = c._prune_old_tool_results(
            messages, protect_tail_count=2, protect_tail_tokens=1000,
        )
        # At least one old tool result should have been pruned
        assert pruned >= 1

    def test_prune_without_token_budget_uses_message_count(self, budget_compressor):
        """Without protect_tail_tokens, falls back to message-count behavior."""
        c = budget_compressor
        messages = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "tool", "arguments": "{}"}}]},
            {"role": "tool", "content": "x" * 5000, "tool_call_id": "c1"},
            {"role": "user", "content": "recent"},
            {"role": "assistant", "content": "reply"},
        ]
        # protect_tail_count=3 means last 3 messages protected
        _, pruned = c._prune_old_tool_results(
            messages, protect_tail_count=3,
        )
        # The tool result at index 2 is the first message of the protected
        # tail (last 3 = indices 2, 3, 4), so whether it is pruned depends on
        # how the boundary is handled; only the return type is pinned here.
        assert isinstance(pruned, int)