File size: 14,660 Bytes
d84b454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
754345f
 
 
 
d84b454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
754345f
 
d84b454
 
 
 
 
 
 
 
 
 
 
 
754345f
 
 
 
d84b454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
754345f
d84b454
 
 
 
 
 
 
 
 
754345f
d84b454
754345f
d84b454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
754345f
 
 
 
d84b454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
"""Regression tests for the 2026-05-03 infinite-compaction-loop bug.

Pod logs from prod-114 showed sessions stuck retrying compaction every
few seconds because a single oversized tool output in the untouched tail
kept the post-compact context above the 90% threshold:

    Context compacted: 200001 -> 215566 tokens
    Context compacted: 215566 -> 215572 tokens
    ContextWindowExceededError β€” forcing compaction
    ... (continues for 5+ minutes)

These tests cover three fixes:

1. ``_truncate_oversized`` replaces oversized message content with a
   placeholder and preserves all extended-thinking metadata fields.
2. ``compact()`` raises ``CompactionFailedError`` when the post-compact
   context is still over threshold.
3. ``_compact_and_notify`` catches the error, sets ``session.is_running
   = False``, and emits a ``session_terminated`` event so callers can
   exit the agent loop.

The P0 caught by PR #213 review (loop didn't actually exit on
``is_running = False``) would have been caught by an end-to-end
behavioral test of #3 β€” that gap is closed by the
``test_compact_and_notify_terminates_session`` case below.
"""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from litellm import Message

from agent.context_manager.manager import (
    CompactionFailedError,
    ContextManager,
    _MAX_TOKENS_PER_MESSAGE,
)


# ── helpers ────────────────────────────────────────────────────────────


def _make_cm(
    *,
    model_max_tokens: int = 100_000,
    compact_size: int = 1_000,
    untouched_messages: int = 5,
) -> ContextManager:
    """Build a bare ``ContextManager`` for tests without running ``__init__``.

    ``__new__`` is used deliberately so no tokenizer/model setup happens and
    every attribute the tests rely on is set explicitly below.
    """
    manager = ContextManager.__new__(ContextManager)
    attrs = {
        "system_prompt": "system",
        "model_max_tokens": model_max_tokens,
        "compact_size": compact_size,
        "running_context_usage": 0,
        "untouched_messages": untouched_messages,
        "items": [Message(role="system", content="system")],
        "on_message_added": None,
    }
    for name, value in attrs.items():
        setattr(manager, name, value)
    return manager


def _msg(role: str, content: str | None = "x", **extra) -> Message:
    """Shorthand for constructing a litellm ``Message`` in tests."""
    fields = {"role": role, "content": content, **extra}
    return Message(**fields)


# ── _truncate_oversized ────────────────────────────────────────────────


def test_truncate_oversized_skips_messages_below_threshold():
    """A message comfortably under the per-message cap passes through as-is."""
    manager = _make_cm()
    original = [_msg("user", "small content")]
    with patch("litellm.token_counter", return_value=100):
        result = manager._truncate_oversized(original, "anthropic/claude-opus-4-6")
    # Nothing was over the limit, so the list must come back unchanged.
    assert result == original


def test_truncate_oversized_replaces_content_above_threshold():
    """Oversized content is swapped for a placeholder naming the token count."""
    manager = _make_cm()
    oversized = "x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    simulated_tokens = _MAX_TOKENS_PER_MESSAGE * 2
    # token_counter is patched to report the oversized count for any message.
    with patch("litellm.token_counter", return_value=simulated_tokens):
        result = manager._truncate_oversized(
            [_msg("user", oversized)], "anthropic/claude-opus-4-6"
        )
    assert len(result) == 1
    truncated = result[0]
    assert truncated.content != oversized
    assert "[truncated for compaction" in truncated.content
    assert str(simulated_tokens) in truncated.content


def test_truncate_oversized_preserves_thinking_blocks():
    """Extended-thinking metadata must survive truncation.

    Anthropic extended-thinking models reject the next request with
    ``Invalid signature in thinking block`` if a prior assistant message
    drops thinking_blocks, so truncation must carry the fields over.
    """
    manager = _make_cm()
    blocks = [{"type": "thinking", "thinking": "...", "signature": "abc123"}]
    oversized = Message(
        role="assistant", content="x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    )
    oversized.thinking_blocks = blocks
    oversized.reasoning_content = "deep thought"
    with patch("litellm.token_counter", return_value=_MAX_TOKENS_PER_MESSAGE * 2):
        result = manager._truncate_oversized([oversized], "anthropic/claude-opus-4-6")
    kept = result[0]
    assert getattr(kept, "thinking_blocks", None) == blocks
    assert getattr(kept, "reasoning_content", None) == "deep thought"


def test_truncate_oversized_never_touches_system_message():
    """The system prompt is the agent's instructions — never truncate it.

    Caught by the integration smoke test on PR #213: when items has fewer
    than ``untouched_messages`` entries, the slice math in ``compact()``
    can let ``items[0]`` (the system message) leak into the
    ``recent_messages`` list handed to ``_truncate_oversized``. The
    function must guard explicitly against this.
    """
    manager = _make_cm()
    protected = "x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    with patch("litellm.token_counter", return_value=_MAX_TOKENS_PER_MESSAGE * 2):
        result = manager._truncate_oversized(
            [_msg("system", protected)], "anthropic/claude-opus-4-6"
        )
    assert result[0].content == protected, "system message must never be truncated"


def test_truncate_oversized_resilient_to_token_counter_failure():
    """A token_counter blip must not drop the message.

    token_counter occasionally raises on edge-case content. Better to
    leave the message intact and let compaction handle it (or fail with
    CompactionFailedError) than to lose data.
    """
    manager = _make_cm()
    original = [_msg("user", "anything")]
    counter_error = Exception("counter blew up")
    with patch("litellm.token_counter", side_effect=counter_error):
        result = manager._truncate_oversized(original, "anthropic/claude-opus-4-6")
    assert result == original


# ── compact() raises CompactionFailedError ─────────────────────────────


@pytest.mark.asyncio
async def test_compact_raises_when_post_compact_still_over_threshold():
    """Core of the new behavior: a compaction call that achieves nothing
    must not be retried in a loop — raise so the caller can terminate
    the session.
    """
    cm = _make_cm(model_max_tokens=100_000)
    conversation = [
        ("system", "system"),
        ("user", "task"),
        ("assistant", "x" * 1000),
        ("user", "follow-up 1"),
        ("assistant", "reply 1"),
        ("user", "follow-up 2"),
        ("assistant", "reply 2"),
    ]
    cm.items = [Message(role=role, content=content) for role, content in conversation]
    # Over threshold from the start (90% of 100k = 90k).
    cm.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        # Tiny summary: the summarization call itself succeeds...
        return ("summary", 10)

    def fake_recompute(self, model_name):
        # ...but the recomputed usage stays above threshold, so compact()
        # must raise instead of declaring victory.
        self.running_context_usage = 95_000

    with (
        # Avoid real token_counter calls inside _truncate_oversized.
        patch("litellm.token_counter", return_value=100),
        patch(
            "agent.context_manager.manager.summarize_messages",
            side_effect=fake_summarize,
        ),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
        pytest.raises(CompactionFailedError),
    ):
        await cm.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )


@pytest.mark.asyncio
async def test_compact_does_not_duplicate_system_when_idx_is_zero():
    """Regression for the second P0 caught by bot review on PR #213.

    When ``len(items) == untouched_messages`` (the canonical 5-message
    early-compaction case: system + user-task + giant-tool-output +
    user-followup + assistant-reply), ``idx`` initialises to 0 and the
    walk-back ``while idx > 1`` loop never runs. Without an explicit
    clamp ``if idx < 1: idx = 1``, ``recent_messages = items[0:]``
    starts at the system message and the rebuild duplicates system +
    first-user. The Anthropic API rejects two system messages.
    """
    cm = _make_cm(model_max_tokens=100_000, untouched_messages=5)
    # Exactly 5 entries == untouched_messages, so idx initialises to 0.
    conversation = [
        ("system", "system"),
        ("user", "task"),
        # This reply would be the only message_to_summarize, but the idx
        # bug pulls it into recent_messages instead.
        ("assistant", "ok"),
        ("user", "followup"),
        ("assistant", "reply"),
    ]
    cm.items = [Message(role=role, content=content) for role, content in conversation]
    cm.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        return ("summary", 10)

    def fake_recompute(self, model_name):
        # Post-compact usage is healthy; this test is about message shape.
        self.running_context_usage = 5_000

    with (
        patch("litellm.token_counter", return_value=100),
        patch(
            "agent.context_manager.manager.summarize_messages",
            side_effect=fake_summarize,
        ),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
    ):
        await cm.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )

    # Critical assertion: the rebuild must leave exactly ONE system message.
    system_count = len([m for m in cm.items if m.role == "system"])
    assert system_count == 1, (
        f"Expected exactly 1 system message, found {system_count}. "
        f"Roles: {[m.role for m in cm.items]}"
    )
    # The first-user "task" message must also appear exactly once. Bot
    # review on PR #213 caught a follow-up bug: clamping idx=1 excludes
    # the system message but still overlaps with first_user_idx (also 1),
    # so first_user_msg lands in BOTH head and recent_messages — a
    # duplicate user message, which Anthropic rejects with a 400 (two
    # consecutive user roles).
    task_count = len(
        [m for m in cm.items if m.role == "user" and (m.content or "") == "task"]
    )
    assert task_count == 1, (
        f"Expected exactly 1 'task' user message, found {task_count}. "
        f"Roles+content: {[(m.role, (m.content or '')[:20]) for m in cm.items]}"
    )
    # Defense in depth: adjacent non-system roles must alternate, per the
    # Anthropic API contract. System counts separately.
    non_system = [m for m in cm.items if m.role != "system"]
    for i in range(1, len(non_system)):
        assert non_system[i].role != non_system[i - 1].role, (
            f"Two consecutive {non_system[i].role} messages at non-system "
            f"position {i - 1},{i} β€” Anthropic API rejects this. "
            f"Roles: {[m.role for m in cm.items]}"
        )


@pytest.mark.asyncio
async def test_compact_succeeds_when_post_compact_under_threshold():
    """Happy path: effective compaction completes without raising."""
    cm = _make_cm(model_max_tokens=100_000)
    conversation = [
        ("system", "system"),
        ("user", "task"),
        ("assistant", "x" * 1000),
        ("user", "follow-up"),
        ("assistant", "reply"),
        ("user", "follow-up 2"),
        ("assistant", "reply 2"),
    ]
    cm.items = [Message(role=role, content=content) for role, content in conversation]
    cm.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        return ("summary", 10)

    def fake_recompute(self, model_name):
        # Compaction worked: usage drops well under the 90% threshold.
        self.running_context_usage = 5_000

    with (
        patch("litellm.token_counter", return_value=100),
        patch(
            "agent.context_manager.manager.summarize_messages",
            side_effect=fake_summarize,
        ),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
    ):
        await cm.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )
    assert cm.running_context_usage == 5_000


# ── _compact_and_notify behavior on CompactionFailedError ──────────────


@pytest.mark.asyncio
async def test_compact_and_notify_terminates_session_on_failure():
    """PR #213's P0 bug-class: setting ``is_running = False`` is only
    effective if the agent loop checks it. Assert that the flag IS set
    AND a ``session_terminated`` event is emitted, so a follow-up
    assertion in the agent loop test catches the loop-exit.
    """
    from agent.core.agent_loop import _compact_and_notify

    session = MagicMock()
    session.session_id = "sess-123"
    session.is_running = True
    session.config.model_name = "anthropic/claude-opus-4-6"
    session.hf_token = None
    session.tool_router.get_tool_specs_for_llm.return_value = []
    session.send_event = AsyncMock()

    context_manager = MagicMock()
    context_manager.running_context_usage = 95_000
    context_manager.compaction_threshold = 90_000
    context_manager.model_max_tokens = 100_000
    context_manager.items = []
    context_manager.needs_compaction = True
    # compact() fails permanently — _compact_and_notify must terminate.
    context_manager.compact = AsyncMock(
        side_effect=CompactionFailedError("ineffective")
    )
    session.context_manager = context_manager

    await _compact_and_notify(session)

    assert session.is_running is False, (
        "_compact_and_notify must set is_running=False so the agent loop "
        "can exit. P0 caught by bot review on PR #213 was that the loop "
        "didn't actually check this flag."
    )
    assert session.send_event.await_count == 1
    terminated = session.send_event.await_args.args[0]
    assert terminated.event_type == "session_terminated"
    assert terminated.data["reason"] == "compaction_failed"
    assert terminated.data["context_usage"] == 95_000


@pytest.mark.asyncio
async def test_compact_and_notify_passes_through_on_success():
    """Successful compaction: no termination event, is_running stays True."""
    from agent.core.agent_loop import _compact_and_notify

    session = MagicMock()
    session.session_id = "sess-456"
    session.is_running = True
    session.config.model_name = "anthropic/claude-opus-4-6"
    session.hf_token = None
    session.tool_router.get_tool_specs_for_llm.return_value = []
    session.send_event = AsyncMock()

    context_manager = MagicMock()
    context_manager.running_context_usage = 5_000
    context_manager.compaction_threshold = 90_000
    context_manager.model_max_tokens = 100_000
    context_manager.items = []
    context_manager.needs_compaction = False
    context_manager.compact = AsyncMock(return_value=None)  # compaction succeeds
    session.context_manager = context_manager

    # old_usage == new_usage here, so the "compacted" event is also skipped.
    await _compact_and_notify(session)

    assert session.is_running is True
    # Whatever else was emitted, none of it may be a termination event.
    emitted = [call.args[0] for call in session.send_event.await_args_list]
    assert all(ev.event_type != "session_terminated" for ev in emitted)