Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """Regression tests for the 2026-05-03 infinite-compaction-loop bug. | |
| Pod logs from prod-114 showed sessions stuck retrying compaction every | |
| few seconds because a single oversized tool output in the untouched tail | |
| kept the post-compact context above the 90% threshold: | |
| Context compacted: 200001 -> 215566 tokens | |
| Context compacted: 215566 -> 215572 tokens | |
| ContextWindowExceededError β forcing compaction | |
| ... (continues for 5+ minutes) | |
| These tests cover three fixes: | |
| 1. ``_truncate_oversized`` replaces oversized message content with a | |
| placeholder and preserves all extended-thinking metadata fields. | |
| 2. ``compact()`` raises ``CompactionFailedError`` when the post-compact | |
| context is still over threshold. | |
| 3. ``_compact_and_notify`` catches the error, sets ``session.is_running | |
| = False``, and emits a ``session_terminated`` event so callers can | |
| exit the agent loop. | |
| The P0 caught by PR #213 review (loop didn't actually exit on | |
| ``is_running = False``) would have been caught by an end-to-end | |
| behavioral test of #3 β that gap is closed by the | |
| ``test_compact_and_notify_terminates_session`` case below. | |
| """ | |
| from __future__ import annotations | |
| from unittest.mock import AsyncMock, MagicMock, patch | |
| import pytest | |
| from litellm import Message | |
| from agent.context_manager.manager import ( | |
| CompactionFailedError, | |
| ContextManager, | |
| _MAX_TOKENS_PER_MESSAGE, | |
| ) | |
# ── helpers ────────────────────────────────────────────────────────────
def _make_cm(
    *,
    model_max_tokens: int = 100_000,
    compact_size: int = 1_000,
    untouched_messages: int = 5,
) -> ContextManager:
    """Build a ``ContextManager`` for tests, bypassing ``__init__``.

    ``__new__`` is used so no real initialization (and whatever I/O or
    model setup it performs) runs; the attributes the tests rely on are
    assigned directly.
    """
    manager = ContextManager.__new__(ContextManager)
    initial_state = {
        "system_prompt": "system",
        "model_max_tokens": model_max_tokens,
        "compact_size": compact_size,
        "running_context_usage": 0,
        "untouched_messages": untouched_messages,
        "items": [Message(role="system", content="system")],
        "on_message_added": None,
    }
    for attr, value in initial_state.items():
        setattr(manager, attr, value)
    return manager
def _msg(role: str, content: str | None = "x", **extra) -> Message:
    """Shorthand for constructing a litellm ``Message`` in tests."""
    message = Message(role=role, content=content, **extra)
    return message
# ── _truncate_oversized ────────────────────────────────────────────────
def test_truncate_oversized_skips_messages_below_threshold():
    """A message under the per-message token limit passes through untouched."""
    manager = _make_cm()
    original = [_msg("user", "small content")]
    with patch("litellm.token_counter", return_value=100):
        result = manager._truncate_oversized(original, "anthropic/claude-opus-4-6")
    assert result == original
def test_truncate_oversized_replaces_content_above_threshold():
    """Oversized content is swapped for a placeholder naming the token count."""
    manager = _make_cm()
    oversized = "x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    simulated_tokens = _MAX_TOKENS_PER_MESSAGE * 2
    # token_counter reports the simulated big size for any message in this test.
    with patch("litellm.token_counter", return_value=simulated_tokens):
        result = manager._truncate_oversized(
            [_msg("user", oversized)], "anthropic/claude-opus-4-6"
        )
    assert len(result) == 1
    replaced = result[0].content
    assert replaced != oversized
    assert "[truncated for compaction" in replaced
    assert str(simulated_tokens) in replaced
def test_truncate_oversized_preserves_thinking_blocks():
    """Truncation must keep extended-thinking metadata.

    Anthropic extended-thinking models reject the next request with
    ``Invalid signature in thinking block`` if a prior assistant message
    drops thinking_blocks.
    """
    manager = _make_cm()
    oversized = "x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    thinking = [{"type": "thinking", "thinking": "...", "signature": "abc123"}]
    message = Message(role="assistant", content=oversized)
    message.thinking_blocks = thinking
    message.reasoning_content = "deep thought"
    with patch("litellm.token_counter", return_value=_MAX_TOKENS_PER_MESSAGE * 2):
        result = manager._truncate_oversized([message], "anthropic/claude-opus-4-6")
    truncated = result[0]
    assert getattr(truncated, "thinking_blocks", None) == thinking
    assert getattr(truncated, "reasoning_content", None) == "deep thought"
def test_truncate_oversized_never_touches_system_message():
    """The system prompt is the agent's instructions -- never truncate it.

    Caught by the integration smoke test on PR #213: when items has fewer
    than ``untouched_messages`` entries, the slice math in ``compact()``
    can let ``items[0]`` (the system message) leak into the
    ``recent_messages`` list that gets passed to ``_truncate_oversized``.
    The function must guard explicitly against this.
    """
    manager = _make_cm()
    huge_system = "x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    with patch("litellm.token_counter", return_value=_MAX_TOKENS_PER_MESSAGE * 2):
        result = manager._truncate_oversized(
            [_msg("system", huge_system)], "anthropic/claude-opus-4-6"
        )
    assert result[0].content == huge_system, "system message must never be truncated"
def test_truncate_oversized_resilient_to_token_counter_failure():
    """A token_counter blip must NOT drop the message.

    token_counter occasionally raises on edge-case content. Better to
    leave the message in place and let compaction handle it (or fail with
    CompactionFailedError) than to lose data.
    """
    manager = _make_cm()
    original = [_msg("user", "anything")]
    with patch("litellm.token_counter", side_effect=Exception("counter blew up")):
        result = manager._truncate_oversized(original, "anthropic/claude-opus-4-6")
    assert result == original
# ── compact() raises CompactionFailedError ─────────────────────────────
# NOTE(review): async tests need this marker unless pytest-asyncio runs in
# asyncio_mode=auto; without it pytest skips the coroutine with a warning
# and this regression never actually executes.
@pytest.mark.asyncio
async def test_compact_raises_when_post_compact_still_over_threshold():
    """The whole point of the new behavior: don't loop on a useless
    compaction call. Raise so the caller can terminate the session.
    """
    cm = _make_cm(model_max_tokens=100_000)
    # Build a context that's "over threshold" from the start.
    cm.items = [
        Message(role="system", content="system"),
        Message(role="user", content="task"),
        Message(role="assistant", content="x" * 1000),
        Message(role="user", content="follow-up 1"),
        Message(role="assistant", content="reply 1"),
        Message(role="user", content="follow-up 2"),
        Message(role="assistant", content="reply 2"),
    ]
    cm.running_context_usage = 95_000  # over threshold (90% of 100k = 90k)

    # Mock summarize_messages to return a tiny summary; mock _recompute_usage
    # to keep the running_context_usage above threshold so compact() raises.
    async def fake_summarize(*args, **kwargs):
        return ("summary", 10)

    def fake_recompute(self, model_name):
        # Simulate post-compact still over threshold.
        self.running_context_usage = 95_000

    with (
        patch(
            "agent.context_manager.manager.summarize_messages",
            side_effect=fake_summarize,
        ),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
        # Avoid token_counter calls in _truncate_oversized.
        patch("litellm.token_counter", return_value=100),
    ):
        with pytest.raises(CompactionFailedError):
            await cm.compact(
                model_name="anthropic/claude-opus-4-6",
                tool_specs=None,
                hf_token=None,
                session=None,
            )
# NOTE(review): marker required unless pytest-asyncio asyncio_mode=auto is
# configured; otherwise this async regression is silently skipped.
@pytest.mark.asyncio
async def test_compact_does_not_duplicate_system_when_idx_is_zero():
    """Regression for the second P0 caught by bot review on PR #213.

    When ``len(items) == untouched_messages`` (the canonical 5-message
    early-compaction case: system + user-task + giant-tool-output +
    user-followup + assistant-reply), ``idx`` initialises to 0 and the
    walk-back ``while idx > 1`` loop is a no-op. Without an explicit
    clamp ``if idx < 1: idx = 1``, ``recent_messages = items[0:]``
    starts at the system message, and the rebuild duplicates system +
    first-user. Anthropic API rejects two system messages.
    """
    cm = _make_cm(model_max_tokens=100_000, untouched_messages=5)
    cm.items = [
        Message(role="system", content="system"),
        Message(role="user", content="task"),
        # "ok" would be the only message_to_summarize, but the idx bug
        # pulls it into recent instead.
        Message(role="assistant", content="ok"),
        Message(role="user", content="followup"),
        Message(role="assistant", content="reply"),
    ]  # exactly 5 = untouched_messages, so idx initialises to 0
    cm.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        return ("summary", 10)

    def fake_recompute(self, model_name):
        self.running_context_usage = 5_000

    with (
        patch(
            "agent.context_manager.manager.summarize_messages",
            side_effect=fake_summarize,
        ),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
        patch("litellm.token_counter", return_value=100),
    ):
        await cm.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )

    # Critical assertion: only ONE system message in items.
    system_count = sum(1 for m in cm.items if m.role == "system")
    assert system_count == 1, (
        f"Expected exactly 1 system message, found {system_count}. "
        f"Roles: {[m.role for m in cm.items]}"
    )

    # And the first-user "task" message must also appear exactly once.
    # Bot review on PR #213 caught a follow-up bug: clamping idx=1
    # excludes the system but still overlaps with first_user_idx (also 1),
    # so first_user_msg ends up in BOTH head and recent_messages ->
    # duplicate user message -> Anthropic 400 (two consecutive user roles).
    task_count = sum(
        1 for m in cm.items if m.role == "user" and (m.content or "") == "task"
    )
    assert task_count == 1, (
        f"Expected exactly 1 'task' user message, found {task_count}. "
        f"Roles+content: {[(m.role, (m.content or '')[:20]) for m in cm.items]}"
    )

    # Defense in depth: no two consecutive same-role messages (Anthropic
    # API contract). System counts separately.
    non_system = [m for m in cm.items if m.role != "system"]
    for i in range(1, len(non_system)):
        assert non_system[i].role != non_system[i - 1].role, (
            f"Two consecutive {non_system[i].role} messages at non-system "
            f"position {i - 1},{i} -- Anthropic API rejects this. "
            f"Roles: {[m.role for m in cm.items]}"
        )
# NOTE(review): marker required unless pytest-asyncio asyncio_mode=auto is
# configured; otherwise this async test is silently skipped.
@pytest.mark.asyncio
async def test_compact_succeeds_when_post_compact_under_threshold():
    """Happy path: when compaction does its job, no exception raised."""
    cm = _make_cm(model_max_tokens=100_000)
    cm.items = [
        Message(role="system", content="system"),
        Message(role="user", content="task"),
        Message(role="assistant", content="x" * 1000),
        Message(role="user", content="follow-up"),
        Message(role="assistant", content="reply"),
        Message(role="user", content="follow-up 2"),
        Message(role="assistant", content="reply 2"),
    ]
    cm.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        return ("summary", 10)

    def fake_recompute(self, model_name):
        self.running_context_usage = 5_000  # well under threshold

    with (
        patch(
            "agent.context_manager.manager.summarize_messages",
            side_effect=fake_summarize,
        ),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
        patch("litellm.token_counter", return_value=100),
    ):
        await cm.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )
    assert cm.running_context_usage == 5_000
# ── _compact_and_notify behavior on CompactionFailedError ──────────────
# NOTE(review): marker required unless pytest-asyncio asyncio_mode=auto is
# configured; otherwise this async regression is silently skipped.
@pytest.mark.asyncio
async def test_compact_and_notify_terminates_session_on_failure():
    """PR #213's P0 bug-class: setting ``is_running = False`` is only
    effective if the agent loop checks it. This test asserts the flag IS
    set AND a ``session_terminated`` event is emitted, so a follow-up
    assertion in the agent loop test catches the loop-exit.
    """
    from agent.core.agent_loop import _compact_and_notify

    session = MagicMock()
    session.session_id = "sess-123"
    session.is_running = True
    session.config.model_name = "anthropic/claude-opus-4-6"
    session.hf_token = None
    session.tool_router.get_tool_specs_for_llm.return_value = []
    session.send_event = AsyncMock()

    cm = MagicMock()
    cm.running_context_usage = 95_000
    cm.compaction_threshold = 90_000
    cm.model_max_tokens = 100_000
    cm.items = []
    cm.needs_compaction = True
    cm.compact = AsyncMock(side_effect=CompactionFailedError("ineffective"))
    session.context_manager = cm

    await _compact_and_notify(session)

    assert session.is_running is False, (
        "_compact_and_notify must set is_running=False so the agent loop "
        "can exit. P0 caught by bot review on PR #213 was that the loop "
        "didn't actually check this flag."
    )
    assert session.send_event.await_count == 1
    event = session.send_event.await_args.args[0]
    assert event.event_type == "session_terminated"
    assert event.data["reason"] == "compaction_failed"
    assert event.data["context_usage"] == 95_000
# NOTE(review): marker required unless pytest-asyncio asyncio_mode=auto is
# configured; otherwise this async test is silently skipped.
@pytest.mark.asyncio
async def test_compact_and_notify_passes_through_on_success():
    """When compaction succeeds, no termination event, is_running stays True."""
    from agent.core.agent_loop import _compact_and_notify

    session = MagicMock()
    session.session_id = "sess-456"
    session.is_running = True
    session.config.model_name = "anthropic/claude-opus-4-6"
    session.hf_token = None
    session.tool_router.get_tool_specs_for_llm.return_value = []
    session.send_event = AsyncMock()

    cm = MagicMock()
    cm.running_context_usage = 5_000
    cm.compaction_threshold = 90_000
    cm.model_max_tokens = 100_000
    cm.items = []
    cm.needs_compaction = False
    cm.compact = AsyncMock(return_value=None)  # success
    session.context_manager = cm

    # Pretend old_usage == new_usage so the "compacted" event is also skipped.
    await _compact_and_notify(session)

    assert session.is_running is True
    # No session_terminated event emitted. Scan positional AND keyword args:
    # the previous `call.args[0]` raised IndexError whenever the event was
    # passed by keyword instead of positionally, masking the real assertion.
    for call in session.send_event.await_args_list:
        for emitted in list(call.args) + list(call.kwargs.values()):
            assert getattr(emitted, "event_type", None) != "session_terminated"