"""Regression tests for Anthropic prefix-cache stability in proxy mode."""

from __future__ import annotations

from types import SimpleNamespace
from typing import Any
from unittest.mock import AsyncMock

import httpx
import pytest

pytest.importorskip("fastapi")

from fastapi.testclient import TestClient

from headroom.proxy.handlers.anthropic import AnthropicHandlerMixin
from headroom.proxy.server import ProxyConfig, create_app

# Shared request constants used by every HTTP-level test below.
_MODEL = "claude-sonnet-4-6"
_ANTHROPIC_HEADERS = {"x-api-key": "test-key", "anthropic-version": "2023-06-01"}
_SORTED_TOOL_NAMES = ["alpha", "mu", "zeta"]


# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------


class _FakePrefixTracker:
    """In-memory stand-in for the per-session prefix tracker used by the proxy.

    Args:
        frozen_count: Value returned by ``get_frozen_message_count``.
        last_original_messages: Optional seed for the "previous original"
            history returned by ``get_last_original_messages``.
        last_forwarded_messages: Optional seed for the "previous forwarded"
            history returned by ``get_last_forwarded_messages``.
    """

    def __init__(
        self,
        frozen_count: int,
        last_original_messages: list[Any] | None = None,
        last_forwarded_messages: list[Any] | None = None,
    ) -> None:
        self._frozen_count = frozen_count
        self._cached_token_count = 0
        # Copy the seeds so callers' lists are never aliased by the fake.
        self._last_original_messages = list(last_original_messages or [])
        self._last_forwarded_messages = list(last_forwarded_messages or [])

    def get_frozen_message_count(self) -> int:
        """Return the fixed frozen-message count given at construction."""
        return self._frozen_count

    def get_last_original_messages(self) -> list[Any]:
        """Return a shallow copy of the last recorded original messages."""
        return self._last_original_messages.copy()

    def get_last_forwarded_messages(self) -> list[Any]:
        """Return a shallow copy of the last recorded forwarded messages."""
        return self._last_forwarded_messages.copy()

    def update_from_response(self, **kwargs: Any) -> None:
        """Record cache token totals and message history from keyword args.

        Reads ``cache_read_tokens``, ``cache_write_tokens``, ``messages`` and
        ``original_messages``. Missing or ``None`` values are treated as
        zero / empty; ``original_messages`` falls back to ``messages`` when it
        is absent or ``None``.
        """
        read_tokens = kwargs.get("cache_read_tokens") or 0
        write_tokens = kwargs.get("cache_write_tokens") or 0
        self._cached_token_count = read_tokens + write_tokens

        forwarded = kwargs.get("messages")
        if forwarded is None:
            forwarded = []
        original = kwargs.get("original_messages")
        if original is None:
            original = forwarded

        self._last_original_messages = list(original)
        self._last_forwarded_messages = list(forwarded)
        return None


class _FakeImageCompressor:
    """Image compressor fake that rewrites image payloads to a marker string."""

    def __init__(self) -> None:
        self.last_result = None

    def has_images(self, messages: Any) -> bool:
        """Always report that images are present."""
        return True

    def compress(self, messages: list[Any], provider: str = "anthropic") -> list[Any]:
        """Return a one-message list with every image block's data replaced.

        Asserts that it is called for the ``anthropic`` provider with exactly
        one message, which is how the tests detect that only a single turn was
        handed to the compressor.
        """
        assert provider == "anthropic"
        assert len(messages) == 1
        message = messages[0]
        compressed_blocks = [self._compress_block(block) for block in message["content"]]
        return [{**message, "content": compressed_blocks}]

    @staticmethod
    def _compress_block(block: Any) -> Any:
        """Replace an image block's ``data``; pass any other block through."""
        if isinstance(block, dict) and block.get("type") == "image":
            return {
                "type": "image",
                "source": {**block.get("source", {}), "data": "COMPRESSED_IMAGE_BYTES"},
            }
        return block


class _FakeCompressionCache:
    """Compression-cache fake whose ``compute_frozen_count`` is a constant."""

    def __init__(self, frozen_count: int) -> None:
        self._frozen_count = frozen_count

    def apply_cached(self, messages: Any) -> Any:
        return messages

    def compute_frozen_count(self, messages: Any) -> int:
        return self._frozen_count

    def update_from_result(self, originals: Any, compressed: Any) -> None:
        return None

    def mark_stable_from_messages(self, messages: Any, up_to: Any) -> None:
        pass


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_proxy_client(*, image_optimize: bool = True) -> TestClient:
    """Build a ``TestClient`` for a proxy app with optional features disabled.

    Args:
        image_optimize: Value for ``ProxyConfig.image_optimize``.
    """
    config = ProxyConfig(
        optimize=False,
        cache_enabled=False,
        rate_limit_enabled=False,
        cost_tracking_enabled=False,
        log_requests=False,
        ccr_inject_tool=False,
        ccr_handle_responses=False,
        ccr_context_tracking=False,
        image_optimize=image_optimize,
    )
    return TestClient(create_app(config))


def _message_payload(message_id: str, *, input_tokens: int) -> dict[str, Any]:
    """Return a minimal Messages-API style response body with zero cache usage."""
    return {
        "id": message_id,
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": "ok"}],
        "usage": {
            "input_tokens": input_tokens,
            "output_tokens": 3,
            "cache_read_input_tokens": 0,
            "cache_creation_input_tokens": 0,
        },
    }


def _batch_payload(batch_id: str) -> dict[str, Any]:
    """Return a minimal message-batch response body."""
    return {
        "id": batch_id,
        "type": "message_batch",
        "processing_status": "in_progress",
        "request_counts": {
            "processing": 1,
            "succeeded": 0,
            "errored": 0,
            "canceled": 0,
        },
    }


def _install_fake_upstream(
    proxy: Any,
    payload: dict[str, Any],
    captured: dict[str, Any] | None = None,
) -> None:
    """Replace ``proxy._retry_request`` with a stub answering HTTP 200 + *payload*.

    When *captured* is given, the ``body`` argument the proxy passed to
    ``_retry_request`` is stored under ``captured["body"]``.
    """

    async def _fake_retry(
        method: Any, url: Any, headers: Any, body: Any, stream: bool = False
    ) -> httpx.Response:
        if captured is not None:
            captured["body"] = body
        return httpx.Response(200, json=payload)

    proxy._retry_request = _fake_retry


def _pin_session_tracker(proxy: Any, tracker: Any) -> None:
    """Make the proxy's session tracker store always hand out *tracker*."""
    store = proxy.session_tracker_store
    store.compute_session_id = lambda request, model, messages: "stable-session"
    store.get_or_create = lambda session_id, provider: tracker


def _enable_optimizer(proxy: Any, *, mode: str) -> None:
    """Turn on optimization in *mode* with image optimization switched off."""
    proxy.config.optimize = True
    proxy.config.mode = mode
    proxy.config.image_optimize = False


def _pipeline_result(
    messages: Any,
    *,
    transforms_applied: list[str],
    tokens_before: int,
    tokens_after: int,
) -> SimpleNamespace:
    """Build the result object the fake pipelines return from ``apply``."""
    return SimpleNamespace(
        messages=messages,
        transforms_applied=transforms_applied,
        timing={},
        tokens_before=tokens_before,
        tokens_after=tokens_after,
        waste_signals=None,
    )


def _forbid_pipeline(proxy: Any, reason: str) -> None:
    """Make any call to ``proxy.anthropic_pipeline.apply`` raise ``AssertionError``."""

    def _unexpected_apply(**kwargs: Any) -> None:
        raise AssertionError(reason)

    proxy.anthropic_pipeline.apply = _unexpected_apply


def _recording_pipeline(captured: dict[str, Any], *, tokens: int) -> Any:
    """Return a pass-through ``apply`` that records ``frozen_message_count``."""

    def _fake_apply(**kwargs: Any) -> SimpleNamespace:
        captured["frozen_message_count"] = kwargs.get("frozen_message_count")
        return _pipeline_result(
            kwargs["messages"],
            transforms_applied=[],
            tokens_before=tokens,
            tokens_after=tokens,
        )

    return _fake_apply


def _rewriting_pipeline(
    index: int,
    content: str,
    *,
    transform: str,
    tokens_before: int,
    tokens_after: int,
) -> Any:
    """Return an ``apply`` that overwrites the content of message *index*."""

    def _fake_apply(**kwargs: Any) -> SimpleNamespace:
        rewritten = list(kwargs["messages"])
        rewritten[index] = {**rewritten[index], "content": content}
        return _pipeline_result(
            rewritten,
            transforms_applied=[transform],
            tokens_before=tokens_before,
            tokens_after=tokens_after,
        )

    return _fake_apply


def _delta_pipeline(
    captured: dict[str, Any],
    content: str,
    *,
    transform: str,
    tokens_before: int,
    tokens_after: int,
) -> Any:
    """Return an ``apply`` that logs its input and emits one fixed user message."""

    def _fake_apply(**kwargs: Any) -> SimpleNamespace:
        captured["calls"].append(kwargs["messages"])
        return _pipeline_result(
            [{"role": "user", "content": content}],
            transforms_applied=[transform],
            tokens_before=tokens_before,
            tokens_after=tokens_after,
        )

    return _fake_apply


def _unsorted_tools() -> list[dict[str, Any]]:
    """Return three tool definitions in deliberately non-alphabetical order."""
    return [
        {"name": name, "description": description, "input_schema": {"type": "object"}}
        for name, description in (("zeta", "z"), ("alpha", "a"), ("mu", "m"))
    ]


def _three_turn_conversation() -> list[dict[str, str]]:
    """Return a fresh user/assistant/user conversation used by several tests."""
    return [
        {"role": "user", "content": "turn1"},
        {"role": "assistant", "content": "turn1-assistant"},
        {"role": "user", "content": "current turn"},
    ]


def _image_block(data: str) -> dict[str, Any]:
    """Return a base64 PNG image content block carrying *data*."""
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": "image/png", "data": data},
    }


def _post_messages(
    client: TestClient,
    messages: list[Any],
    *,
    max_tokens: int = 64,
    system: str | None = None,
    tools: list[Any] | None = None,
    extra_headers: dict[str, str] | None = None,
) -> httpx.Response:
    """POST a request to ``/v1/messages`` with the shared model and headers."""
    body: dict[str, Any] = {"model": _MODEL, "max_tokens": max_tokens}
    if system is not None:
        body["system"] = system
    body["messages"] = messages
    if tools is not None:
        body["tools"] = tools
    return client.post(
        "/v1/messages",
        headers={**_ANTHROPIC_HEADERS, **(extra_headers or {})},
        json=body,
    )


def _post_batch(
    client: TestClient,
    messages: list[Any],
    *,
    tools: list[Any] | None = None,
) -> httpx.Response:
    """POST a single-request batch (``custom_id`` ``req-1``) to the batches route."""
    params: dict[str, Any] = {"model": _MODEL, "max_tokens": 128, "messages": messages}
    if tools is not None:
        params["tools"] = tools
    return client.post(
        "/v1/messages/batches",
        headers=dict(_ANTHROPIC_HEADERS),
        json={"requests": [{"custom_id": "req-1", "params": params}]},
    )


def _probe_ccr_injector_flags(
    monkeypatch: Any,
    *,
    config_inject_tool: bool,
    config_inject_system: bool,
    message_id: str,
) -> dict[str, Any]:
    """Run one request with a frozen prefix and report the injector's flags.

    Patches ``headroom.ccr.CCRToolInjector`` with a fake that records the
    ``inject_tool`` / ``inject_system_instructions`` values it is constructed
    with. Both entries stay ``None`` if the injector is never instantiated.
    """
    recorded: dict[str, Any] = {"inject_tool": None, "inject_system": None}

    class _FakeInjector:
        def __init__(
            self,
            provider: Any,
            inject_tool: Any,
            inject_system_instructions: Any,
        ) -> None:
            recorded["inject_tool"] = inject_tool
            recorded["inject_system"] = inject_system_instructions
            self.has_compressed_content = False
            self.detected_hashes = []

        def process_request(self, messages: Any, tools: Any) -> tuple[Any, Any, bool]:
            return messages, tools, False

    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        proxy.config.optimize = False
        proxy.config.image_optimize = False
        proxy.config.ccr_inject_tool = config_inject_tool
        proxy.config.ccr_inject_system_instructions = config_inject_system
        _pin_session_tracker(proxy, _FakePrefixTracker(frozen_count=1))
        monkeypatch.setattr("headroom.ccr.CCRToolInjector", _FakeInjector)
        _install_fake_upstream(proxy, _message_payload(message_id, input_tokens=20))

        response = _post_messages(client, [{"role": "user", "content": "hello"}])

        assert response.status_code == 200
    return recorded


# ---------------------------------------------------------------------------
# Tool ordering
# ---------------------------------------------------------------------------


def test_anthropic_tools_sorted_deterministically_before_forward() -> None:
    """Tools sent as zeta/alpha/mu reach ``_retry_request`` sorted by name."""
    captured: dict[str, Any] = {}
    with _make_proxy_client() as client:
        _install_fake_upstream(
            client.app.state.proxy,
            _message_payload("msg_1", input_tokens=10),
            captured,
        )

        response = _post_messages(
            client,
            [{"role": "user", "content": "hello"}],
            max_tokens=128,
            tools=_unsorted_tools(),
        )

        assert response.status_code == 200
        sent_tools = captured["body"]["tools"]
        assert [tool["name"] for tool in sent_tools] == _SORTED_TOOL_NAMES


def test_anthropic_batch_tools_sorted_deterministically_before_forward() -> None:
    """Batch requests also have their per-request tools sorted by name."""
    captured: dict[str, Any] = {}
    with _make_proxy_client(image_optimize=False) as client:
        _install_fake_upstream(client.app.state.proxy, _batch_payload("msgbatch_1"), captured)

        response = _post_batch(
            client,
            [{"role": "user", "content": "hello"}],
            tools=_unsorted_tools(),
        )

        assert response.status_code == 200
        sent_tools = captured["body"]["requests"][0]["params"]["tools"]
        assert [tool["name"] for tool in sent_tools] == _SORTED_TOOL_NAMES


# ---------------------------------------------------------------------------
# Image compression
# ---------------------------------------------------------------------------


def test_image_compression_only_applies_to_latest_non_frozen_user_turn() -> None:
    """With one frozen message, only the last user turn's image is compressed."""
    messages = [
        {
            "role": "user",
            "content": [_image_block("OLD_IMAGE_BYTES"), {"type": "text", "text": "old image turn"}],
        },
        {"role": "assistant", "content": "ack"},
        {
            "role": "user",
            "content": [_image_block("NEW_IMAGE_BYTES"), {"type": "text", "text": "new image turn"}],
        },
    ]

    result = AnthropicHandlerMixin._compress_latest_user_turn_images_cache_safe(
        messages,
        frozen_message_count=1,
        compressor=_FakeImageCompressor(),
    )

    # Frozen prefix must remain byte-identical.
    assert result[0]["content"][0]["source"]["data"] == "OLD_IMAGE_BYTES"
    # Latest non-frozen user turn is eligible for compression.
    assert result[2]["content"][0]["source"]["data"] == "COMPRESSED_IMAGE_BYTES"


def test_image_compression_does_not_touch_previous_turns_if_last_message_not_user() -> None:
    """When the final message is from the assistant, earlier images are untouched."""
    messages = [
        {"role": "user", "content": [_image_block("OLD_IMAGE_BYTES")]},
        {"role": "assistant", "content": "last turn is assistant"},
    ]

    result = AnthropicHandlerMixin._compress_latest_user_turn_images_cache_safe(
        messages,
        frozen_message_count=0,
        compressor=_FakeImageCompressor(),
    )

    assert result[0]["content"][0]["source"]["data"] == "OLD_IMAGE_BYTES"


# ---------------------------------------------------------------------------
# Context appending
# ---------------------------------------------------------------------------


def test_append_context_targets_latest_non_frozen_user_turn() -> None:
    """Context is appended to the last user turn, not to the frozen first one."""
    messages = [
        {"role": "user", "content": "frozen prefix"},
        {"role": "assistant", "content": "ack"},
        {"role": "user", "content": "active turn"},
    ]

    result = AnthropicHandlerMixin._append_context_to_latest_non_frozen_user_turn(
        messages,
        "CTX",
        frozen_message_count=1,
    )

    assert result[0]["content"] == "frozen prefix"
    assert result[2]["content"].endswith("CTX")


def test_append_context_does_not_touch_previous_turns_if_last_message_not_user() -> None:
    """When the final message is from the assistant, the earlier user turn is kept."""
    messages = [
        {"role": "user", "content": "previous user turn"},
        {"role": "assistant", "content": "assistant last"},
    ]

    result = AnthropicHandlerMixin._append_context_to_latest_non_frozen_user_turn(
        messages,
        "CTX",
        frozen_message_count=0,
    )

    assert result[0]["content"] == "previous user turn"


# ---------------------------------------------------------------------------
# Token mode
# ---------------------------------------------------------------------------


def test_token_mode_freeze_is_capped_by_prefix_tracker() -> None:
    """Cache says 99 frozen, tracker says 1: the pipeline must be given 1."""
    captured: dict[str, Any] = {}
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="token")
        _pin_session_tracker(proxy, _FakePrefixTracker(frozen_count=1))
        proxy._get_compression_cache = lambda session_id: _FakeCompressionCache(frozen_count=99)
        proxy.anthropic_pipeline.apply = _recording_pipeline(captured, tokens=50)
        _install_fake_upstream(proxy, _message_payload("msg_tc_1", input_tokens=50))

        response = _post_messages(client, [{"role": "user", "content": "hello"}])

        assert response.status_code == 200
        assert captured["frozen_message_count"] == 1


def test_token_mode_does_not_force_freeze_all_previous_turns() -> None:
    """Token mode invokes the pipeline with a valid (non-negative) frozen count."""
    captured: dict[str, Any] = {}
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="token")
        _pin_session_tracker(proxy, _FakePrefixTracker(frozen_count=0))
        proxy._get_compression_cache = lambda session_id: _FakeCompressionCache(frozen_count=0)
        proxy.anthropic_pipeline.apply = _recording_pipeline(captured, tokens=70)
        _install_fake_upstream(proxy, _message_payload("msg_tok_1", input_tokens=70))

        response = _post_messages(client, _three_turn_conversation())

        assert response.status_code == 200
        # Both the fake compression cache and the fake tracker report 0 frozen
        # messages here. The handler may still treat prior turns as frozen on
        # its own (the existing expectation mentions mark_stable_from_messages),
        # so the exact count is handler-defined.
        # NOTE(review): this only pins "the pipeline ran and received a
        # non-negative count"; tighten to an exact value once the intended
        # count for this scenario is confirmed.
        frozen_count = captured["frozen_message_count"]
        assert frozen_count is not None
        assert frozen_count >= 0


# ---------------------------------------------------------------------------
# Memory context and CCR injection with a frozen prefix
# ---------------------------------------------------------------------------


def test_memory_context_avoids_system_mutation_when_prefix_frozen() -> None:
    """Memory context lands on the last user turn; ``system`` stays unchanged."""
    captured: dict[str, Any] = {}
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        proxy.config.optimize = False
        proxy.config.image_optimize = False
        proxy.config.ccr_proactive_expansion = False
        _pin_session_tracker(proxy, _FakePrefixTracker(frozen_count=1))
        proxy.memory_handler = SimpleNamespace(
            config=SimpleNamespace(inject_context=True, inject_tools=False),
            search_and_format_context=AsyncMock(return_value="MEMCTX"),
            has_memory_tool_calls=lambda resp, provider: False,
        )
        _install_fake_upstream(proxy, _message_payload("msg_mem_1", input_tokens=20), captured)

        response = _post_messages(
            client,
            [
                {"role": "user", "content": "frozen prefix"},
                {"role": "assistant", "content": "ack"},
                {"role": "user", "content": "latest user"},
            ],
            system="base system",
            extra_headers={"x-headroom-user-id": "u1"},
        )

        assert response.status_code == 200
        sent = captured["body"]
        assert sent["system"] == "base system"
        assert sent["messages"][2]["content"].endswith("MEMCTX")


def test_ccr_system_instruction_injection_disabled_when_prefix_frozen(monkeypatch) -> None:
    """The injector is built with ``inject_system_instructions=False`` despite config."""
    recorded = _probe_ccr_injector_flags(
        monkeypatch,
        config_inject_tool=False,
        config_inject_system=True,
        message_id="msg_ccr_1",
    )

    assert recorded["inject_system"] is False


def test_ccr_tool_injection_disabled_when_prefix_frozen(monkeypatch) -> None:
    """The injector is built with ``inject_tool=False`` despite config."""
    recorded = _probe_ccr_injector_flags(
        monkeypatch,
        config_inject_tool=True,
        config_inject_system=False,
        message_id="msg_ccr_tool_1",
    )

    assert recorded["inject_tool"] is False


# ---------------------------------------------------------------------------
# Cache mode
# ---------------------------------------------------------------------------


def test_previous_turns_always_frozen_only_final_turn_mutable() -> None:
    """Cache mode forwards a three-turn conversation unchanged, pipeline unused."""
    captured: dict[str, Any] = {}
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="cache")
        _pin_session_tracker(proxy, _FakePrefixTracker(frozen_count=0))
        _forbid_pipeline(proxy, "cache mode should not invoke anthropic pipeline")
        _install_fake_upstream(proxy, _message_payload("msg_frz_1", input_tokens=80), captured)

        response = _post_messages(client, _three_turn_conversation())

        assert response.status_code == 200
        assert captured["body"]["messages"] == _three_turn_conversation()


def test_batch_optimization_freezes_previous_turns_only() -> None:
    """Cache-mode batch requests are forwarded unchanged, pipeline unused."""
    captured: dict[str, Any] = {}
    expected_messages = [
        {"role": "user", "content": "old turn"},
        {"role": "assistant", "content": "old assistant"},
        {"role": "user", "content": "current turn"},
    ]
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="cache")
        proxy.config.ccr_inject_tool = False
        _forbid_pipeline(proxy, "cache mode batch path should not invoke anthropic pipeline")
        _install_fake_upstream(proxy, _batch_payload("msgbatch_2"), captured)

        response = _post_batch(client, [dict(message) for message in expected_messages])

        assert response.status_code == 200
        assert captured["body"]["requests"][0]["params"]["messages"] == expected_messages


def test_cache_mode_restores_frozen_prefix_if_transform_mutates_history() -> None:
    """A pipeline rewrite of message 0 must not reach the forwarded prefix."""
    captured: dict[str, Any] = {}
    original_messages = _three_turn_conversation()
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="cache")
        _pin_session_tracker(proxy, _FakePrefixTracker(frozen_count=0))
        proxy.anthropic_pipeline.apply = _rewriting_pipeline(
            0,
            "MUTATED_PREFIX",
            transform="fake:mutated",
            tokens_before=80,
            tokens_after=70,
        )
        _install_fake_upstream(proxy, _message_payload("msg_cache_1", input_tokens=70), captured)

        response = _post_messages(client, original_messages)

        assert response.status_code == 200
        sent_messages = captured["body"]["messages"]
        assert sent_messages[0] == original_messages[0]
        assert sent_messages[1] == original_messages[1]


def test_cache_mode_does_not_forward_latest_turn_rewrites() -> None:
    """A pipeline rewrite of the latest turn is not forwarded in cache mode."""
    captured: dict[str, Any] = {}
    original_messages = _three_turn_conversation()
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="cache")
        _pin_session_tracker(proxy, _FakePrefixTracker(frozen_count=0))
        proxy.anthropic_pipeline.apply = _rewriting_pipeline(
            2,
            "REWRITTEN_CURRENT_TURN",
            transform="fake:mutated-latest",
            tokens_before=80,
            tokens_after=60,
        )
        _install_fake_upstream(proxy, _message_payload("msg_cache_2", input_tokens=80), captured)

        response = _post_messages(client, original_messages)

        assert response.status_code == 200
        assert captured["body"]["messages"] == original_messages


def test_cache_mode_reuses_prior_forwarded_prefix_and_compresses_only_new_suffix() -> None:
    """Only the new turn goes through the pipeline; the old forwarded prefix is reused."""
    captured: dict[str, Any] = {"calls": []}
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="cache")
        tracker = _FakePrefixTracker(
            frozen_count=0,
            last_original_messages=[
                {"role": "user", "content": "turn1"},
                {"role": "assistant", "content": "turn1-assistant"},
                {"role": "user", "content": "turn2"},
                {"role": "assistant", "content": "turn2-assistant"},
            ],
            last_forwarded_messages=[
                {"role": "user", "content": "turn1"},
                {"role": "assistant", "content": "turn1-assistant"},
                {"role": "user", "content": "COMPRESSED_TURN2"},
                {"role": "assistant", "content": "turn2-assistant"},
            ],
        )
        _pin_session_tracker(proxy, tracker)
        proxy.anthropic_pipeline.apply = _delta_pipeline(
            captured,
            "COMPRESSED_TURN3",
            transform="fake:delta",
            tokens_before=40,
            tokens_after=20,
        )
        _install_fake_upstream(proxy, _message_payload("msg_cache_3", input_tokens=80), captured)

        response = _post_messages(
            client,
            [
                {"role": "user", "content": "turn1"},
                {"role": "assistant", "content": "turn1-assistant"},
                {"role": "user", "content": "turn2"},
                {"role": "assistant", "content": "turn2-assistant"},
                {"role": "user", "content": "turn3"},
            ],
        )

        assert response.status_code == 200
        assert captured["calls"] == [[{"role": "user", "content": "turn3"}]]
        assert captured["body"]["messages"] == [
            {"role": "user", "content": "turn1"},
            {"role": "assistant", "content": "turn1-assistant"},
            {"role": "user", "content": "COMPRESSED_TURN2"},
            {"role": "assistant", "content": "turn2-assistant"},
            {"role": "user", "content": "COMPRESSED_TURN3"},
        ]


def test_cache_mode_skips_same_message_append_rewrite_to_preserve_stability() -> None:
    """A message that grew in place is forwarded raw and never hits the pipeline."""
    captured: dict[str, Any] = {"calls": []}
    with _make_proxy_client() as client:
        proxy = client.app.state.proxy
        _enable_optimizer(proxy, mode="cache")
        tracker = _FakePrefixTracker(
            frozen_count=0,
            last_original_messages=[{"role": "user", "content": "shared-prefix"}],
            last_forwarded_messages=[{"role": "user", "content": "COMPRESSED_PREFIX"}],
        )
        _pin_session_tracker(proxy, tracker)
        proxy.anthropic_pipeline.apply = _delta_pipeline(
            captured,
            " + COMPRESSED_SUFFIX",
            transform="fake:suffix",
            tokens_before=20,
            tokens_after=10,
        )
        _install_fake_upstream(
            proxy, _message_payload("msg_cache_suffix", input_tokens=80), captured
        )

        response = _post_messages(
            client,
            [{"role": "user", "content": "shared-prefix + raw suffix"}],
        )

        assert response.status_code == 200
        assert captured["calls"] == []
        assert captured["body"]["messages"] == [
            {"role": "user", "content": "shared-prefix + raw suffix"},
        ]