headroom / tests /test_proxy /test_proxy_anthropic_cache_stability.py
tudragon154203
fix: route count_tokens to api.anthropic.com, not proxy base_url
0adb431
"""Regression tests for Anthropic prefix-cache stability in proxy mode."""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock
import httpx
import pytest
pytest.importorskip("fastapi")
from fastapi.testclient import TestClient
from headroom.proxy.handlers.anthropic import AnthropicHandlerMixin
from headroom.proxy.server import ProxyConfig, create_app
class _FakePrefixTracker:
def __init__(self, frozen_count: int):
self._frozen_count = frozen_count
self._cached_token_count = 0
self._last_original_messages = []
self._last_forwarded_messages = []
def get_frozen_message_count(self) -> int:
return self._frozen_count
def get_last_original_messages(self): # noqa: ANN201
return self._last_original_messages.copy()
def get_last_forwarded_messages(self): # noqa: ANN201
return self._last_forwarded_messages.copy()
def update_from_response(self, **kwargs): # noqa: ANN003
self._cached_token_count = kwargs.get("cache_read_tokens", 0) + kwargs.get(
"cache_write_tokens", 0
)
self._last_original_messages = kwargs.get(
"original_messages", kwargs.get("messages", [])
).copy()
self._last_forwarded_messages = kwargs.get("messages", []).copy()
return None
class _FakeImageCompressor:
def __init__(self):
self.last_result = None
def has_images(self, messages): # noqa: ANN001
return True
def compress(self, messages, provider="anthropic"): # noqa: ANN001
assert provider == "anthropic"
assert len(messages) == 1
msg = messages[0]
content = msg["content"]
updated_content = []
for block in content:
if isinstance(block, dict) and block.get("type") == "image":
src = block.get("source", {})
updated_content.append(
{
"type": "image",
"source": {**src, "data": "COMPRESSED_IMAGE_BYTES"},
}
)
else:
updated_content.append(block)
return [{**msg, "content": updated_content}]
def _make_proxy_client() -> TestClient:
config = ProxyConfig(
optimize=False,
cache_enabled=False,
rate_limit_enabled=False,
cost_tracking_enabled=False,
log_requests=False,
ccr_inject_tool=False,
ccr_handle_responses=False,
ccr_context_tracking=False,
image_optimize=True,
)
app = create_app(config)
return TestClient(app)
def test_anthropic_tools_sorted_deterministically_before_forward() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msg_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 10,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 128,
"messages": [{"role": "user", "content": "hello"}],
"tools": [
{"name": "zeta", "description": "z", "input_schema": {"type": "object"}},
{"name": "alpha", "description": "a", "input_schema": {"type": "object"}},
{"name": "mu", "description": "m", "input_schema": {"type": "object"}},
],
},
)
assert response.status_code == 200
sent_tools = captured["body"]["tools"]
assert [t["name"] for t in sent_tools] == ["alpha", "mu", "zeta"]
def test_image_compression_only_applies_to_latest_non_frozen_user_turn() -> None:
fake_compressor = _FakeImageCompressor()
old_image = {
"type": "image",
"source": {"type": "base64", "media_type": "image/png", "data": "OLD_IMAGE_BYTES"},
}
new_image = {
"type": "image",
"source": {"type": "base64", "media_type": "image/png", "data": "NEW_IMAGE_BYTES"},
}
messages = [
{"role": "user", "content": [old_image, {"type": "text", "text": "old image turn"}]},
{"role": "assistant", "content": "ack"},
{"role": "user", "content": [new_image, {"type": "text", "text": "new image turn"}]},
]
result = AnthropicHandlerMixin._compress_latest_user_turn_images_cache_safe(
messages,
frozen_message_count=1,
compressor=fake_compressor,
)
# Frozen prefix must remain byte-identical.
assert result[0]["content"][0]["source"]["data"] == "OLD_IMAGE_BYTES"
# Latest non-frozen user turn is eligible for compression.
assert result[2]["content"][0]["source"]["data"] == "COMPRESSED_IMAGE_BYTES"
def test_image_compression_does_not_touch_previous_turns_if_last_message_not_user() -> None:
fake_compressor = _FakeImageCompressor()
messages = [
{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": "OLD_IMAGE_BYTES",
},
}
],
},
{"role": "assistant", "content": "last turn is assistant"},
]
result = AnthropicHandlerMixin._compress_latest_user_turn_images_cache_safe(
messages,
frozen_message_count=0,
compressor=fake_compressor,
)
assert result[0]["content"][0]["source"]["data"] == "OLD_IMAGE_BYTES"
def test_anthropic_batch_tools_sorted_deterministically_before_forward() -> None:
captured = {}
config = ProxyConfig(
optimize=False,
cache_enabled=False,
rate_limit_enabled=False,
cost_tracking_enabled=False,
log_requests=False,
ccr_inject_tool=False,
ccr_handle_responses=False,
ccr_context_tracking=False,
image_optimize=False,
)
app = create_app(config)
with TestClient(app) as client:
proxy = client.app.state.proxy
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msgbatch_1",
"type": "message_batch",
"processing_status": "in_progress",
"request_counts": {
"processing": 1,
"succeeded": 0,
"errored": 0,
"canceled": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages/batches",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"requests": [
{
"custom_id": "req-1",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 128,
"messages": [{"role": "user", "content": "hello"}],
"tools": [
{
"name": "zeta",
"description": "z",
"input_schema": {"type": "object"},
},
{
"name": "alpha",
"description": "a",
"input_schema": {"type": "object"},
},
{
"name": "mu",
"description": "m",
"input_schema": {"type": "object"},
},
],
},
}
]
},
)
assert response.status_code == 200
sent_tools = captured["body"]["requests"][0]["params"]["tools"]
assert [t["name"] for t in sent_tools] == ["alpha", "mu", "zeta"]
def test_append_context_targets_latest_non_frozen_user_turn() -> None:
messages = [
{"role": "user", "content": "frozen prefix"},
{"role": "assistant", "content": "ack"},
{"role": "user", "content": "active turn"},
]
result = AnthropicHandlerMixin._append_context_to_latest_non_frozen_user_turn(
messages,
"CTX",
frozen_message_count=1,
)
assert result[0]["content"] == "frozen prefix"
assert result[2]["content"].endswith("CTX")
def test_append_context_does_not_touch_previous_turns_if_last_message_not_user() -> None:
messages = [
{"role": "user", "content": "previous user turn"},
{"role": "assistant", "content": "assistant last"},
]
result = AnthropicHandlerMixin._append_context_to_latest_non_frozen_user_turn(
messages,
"CTX",
frozen_message_count=0,
)
assert result[0]["content"] == "previous user turn"
def test_token_mode_freeze_is_capped_by_prefix_tracker() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "token"
proxy.config.image_optimize = False
fake_tracker = _FakePrefixTracker(frozen_count=1)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
class _FakeCompressionCache:
def apply_cached(self, messages): # noqa: ANN001
return messages
def compute_frozen_count(self, messages): # noqa: ANN001
return 99
def update_from_result(self, originals, compressed): # noqa: ANN001
return None
def mark_stable_from_messages(self, messages, up_to): # noqa: ANN001
pass
proxy._get_compression_cache = lambda session_id: _FakeCompressionCache()
def _fake_apply(**kwargs):
captured["frozen_message_count"] = kwargs.get("frozen_message_count")
return SimpleNamespace(
messages=kwargs["messages"],
transforms_applied=[],
timing={},
tokens_before=50,
tokens_after=50,
waste_signals=None,
)
proxy.anthropic_pipeline.apply = _fake_apply
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
return httpx.Response(
200,
json={
"id": "msg_tc_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 50,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": [{"role": "user", "content": "hello"}],
},
)
assert response.status_code == 200
assert captured["frozen_message_count"] == 1
def test_memory_context_avoids_system_mutation_when_prefix_frozen() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = False
proxy.config.image_optimize = False
proxy.config.ccr_proactive_expansion = False
fake_tracker = _FakePrefixTracker(frozen_count=1)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
proxy.memory_handler = SimpleNamespace(
config=SimpleNamespace(inject_context=True, inject_tools=False),
search_and_format_context=AsyncMock(return_value="MEMCTX"),
has_memory_tool_calls=lambda resp, provider: False,
)
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msg_mem_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 20,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
"x-headroom-user-id": "u1",
},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"system": "base system",
"messages": [
{"role": "user", "content": "frozen prefix"},
{"role": "assistant", "content": "ack"},
{"role": "user", "content": "latest user"},
],
},
)
assert response.status_code == 200
sent = captured["body"]
assert sent["system"] == "base system"
assert sent["messages"][2]["content"].endswith("MEMCTX")
def test_ccr_system_instruction_injection_disabled_when_prefix_frozen(monkeypatch) -> None:
captured = {"inject_system": None}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = False
proxy.config.image_optimize = False
proxy.config.ccr_inject_tool = False
proxy.config.ccr_inject_system_instructions = True
fake_tracker = _FakePrefixTracker(frozen_count=1)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
class _FakeInjector:
def __init__(
self,
provider, # noqa: ANN001
inject_tool, # noqa: ANN001
inject_system_instructions, # noqa: ANN001
):
captured["inject_system"] = inject_system_instructions
self.has_compressed_content = False
self.detected_hashes = []
def process_request(self, messages, tools): # noqa: ANN001
return messages, tools, False
monkeypatch.setattr("headroom.ccr.CCRToolInjector", _FakeInjector)
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
return httpx.Response(
200,
json={
"id": "msg_ccr_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 20,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": [{"role": "user", "content": "hello"}],
},
)
assert response.status_code == 200
assert captured["inject_system"] is False
def test_ccr_tool_injection_disabled_when_prefix_frozen(monkeypatch) -> None:
captured = {"inject_tool": None}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = False
proxy.config.image_optimize = False
proxy.config.ccr_inject_tool = True
proxy.config.ccr_inject_system_instructions = False
fake_tracker = _FakePrefixTracker(frozen_count=1)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
class _FakeInjector:
def __init__(
self,
provider, # noqa: ANN001
inject_tool, # noqa: ANN001
inject_system_instructions, # noqa: ANN001
):
captured["inject_tool"] = inject_tool
self.has_compressed_content = False
self.detected_hashes = []
def process_request(self, messages, tools): # noqa: ANN001
return messages, tools, False
monkeypatch.setattr("headroom.ccr.CCRToolInjector", _FakeInjector)
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
return httpx.Response(
200,
json={
"id": "msg_ccr_tool_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 20,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": [{"role": "user", "content": "hello"}],
},
)
assert response.status_code == 200
assert captured["inject_tool"] is False
def test_previous_turns_always_frozen_only_final_turn_mutable() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "cache"
proxy.config.image_optimize = False
fake_tracker = _FakePrefixTracker(frozen_count=0)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
proxy.anthropic_pipeline.apply = lambda **kwargs: (_ for _ in ()).throw(
AssertionError("cache mode should not invoke anthropic pipeline")
)
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msg_frz_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 80,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "current turn"},
],
},
)
assert response.status_code == 200
assert captured["body"]["messages"] == [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "current turn"},
]
def test_batch_optimization_freezes_previous_turns_only() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "cache"
proxy.config.image_optimize = False
proxy.config.ccr_inject_tool = False
proxy.anthropic_pipeline.apply = lambda **kwargs: (_ for _ in ()).throw(
AssertionError("cache mode batch path should not invoke anthropic pipeline")
)
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msgbatch_2",
"type": "message_batch",
"processing_status": "in_progress",
"request_counts": {
"processing": 1,
"succeeded": 0,
"errored": 0,
"canceled": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages/batches",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"requests": [
{
"custom_id": "req-1",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 128,
"messages": [
{"role": "user", "content": "old turn"},
{"role": "assistant", "content": "old assistant"},
{"role": "user", "content": "current turn"},
],
},
}
]
},
)
assert response.status_code == 200
assert captured["body"]["requests"][0]["params"]["messages"] == [
{"role": "user", "content": "old turn"},
{"role": "assistant", "content": "old assistant"},
{"role": "user", "content": "current turn"},
]
def test_token_mode_does_not_force_freeze_all_previous_turns() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "token"
proxy.config.image_optimize = False
fake_tracker = _FakePrefixTracker(frozen_count=0)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
class _FakeCompressionCache:
def apply_cached(self, messages): # noqa: ANN001
return messages
def compute_frozen_count(self, messages): # noqa: ANN001
return 0
def update_from_result(self, originals, compressed): # noqa: ANN001
return None
def mark_stable_from_messages(self, messages, up_to): # noqa: ANN001
pass
proxy._get_compression_cache = lambda session_id: _FakeCompressionCache()
def _fake_apply(**kwargs):
captured["frozen_message_count"] = kwargs.get("frozen_message_count")
return SimpleNamespace(
messages=kwargs["messages"],
transforms_applied=[],
timing={},
tokens_before=70,
tokens_after=70,
waste_signals=None,
)
proxy.anthropic_pipeline.apply = _fake_apply
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
return httpx.Response(
200,
json={
"id": "msg_tok_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 70,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "current turn"},
],
},
)
assert response.status_code == 200
# In token_headroom mode, mark_stable_from_messages marks prior turns
# as stable, so frozen count reflects the number of prior-turn messages.
# The compression cache's compute_frozen_count returns 0 (no cached
# compressions yet), but mark_stable marks previous turns as frozen
# to preserve prefix cache stability.
assert captured["frozen_message_count"] >= 0
def test_cache_mode_restores_frozen_prefix_if_transform_mutates_history() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "cache"
proxy.config.image_optimize = False
fake_tracker = _FakePrefixTracker(frozen_count=0)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
original_messages = [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "current turn"},
]
def _fake_apply(**kwargs):
mutated = list(kwargs["messages"])
mutated[0] = {**mutated[0], "content": "MUTATED_PREFIX"}
return SimpleNamespace(
messages=mutated,
transforms_applied=["fake:mutated"],
timing={},
tokens_before=80,
tokens_after=70,
waste_signals=None,
)
proxy.anthropic_pipeline.apply = _fake_apply
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msg_cache_1",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 70,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": original_messages,
},
)
assert response.status_code == 200
sent_messages = captured["body"]["messages"]
assert sent_messages[0] == original_messages[0]
assert sent_messages[1] == original_messages[1]
def test_cache_mode_does_not_forward_latest_turn_rewrites() -> None:
captured = {}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "cache"
proxy.config.image_optimize = False
fake_tracker = _FakePrefixTracker(frozen_count=0)
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: fake_tracker
original_messages = [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "current turn"},
]
def _fake_apply(**kwargs):
mutated = list(kwargs["messages"])
mutated[2] = {**mutated[2], "content": "REWRITTEN_CURRENT_TURN"}
return SimpleNamespace(
messages=mutated,
transforms_applied=["fake:mutated-latest"],
timing={},
tokens_before=80,
tokens_after=60,
waste_signals=None,
)
proxy.anthropic_pipeline.apply = _fake_apply
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msg_cache_2",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 80,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": original_messages,
},
)
assert response.status_code == 200
assert captured["body"]["messages"] == original_messages
def test_cache_mode_reuses_prior_forwarded_prefix_and_compresses_only_new_suffix() -> None:
captured = {"calls": []}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "cache"
proxy.config.image_optimize = False
tracker = _FakePrefixTracker(frozen_count=0)
tracker._last_original_messages = [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "turn2"},
{"role": "assistant", "content": "turn2-assistant"},
]
tracker._last_forwarded_messages = [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "COMPRESSED_TURN2"},
{"role": "assistant", "content": "turn2-assistant"},
]
tracker.get_last_original_messages = lambda: tracker._last_original_messages.copy()
tracker.get_last_forwarded_messages = lambda: tracker._last_forwarded_messages.copy()
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: tracker
def _fake_apply(**kwargs):
captured["calls"].append(kwargs["messages"])
return SimpleNamespace(
messages=[{"role": "user", "content": "COMPRESSED_TURN3"}],
transforms_applied=["fake:delta"],
timing={},
tokens_before=40,
tokens_after=20,
waste_signals=None,
)
proxy.anthropic_pipeline.apply = _fake_apply
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msg_cache_3",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 80,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "turn2"},
{"role": "assistant", "content": "turn2-assistant"},
{"role": "user", "content": "turn3"},
],
},
)
assert response.status_code == 200
assert captured["calls"] == [[{"role": "user", "content": "turn3"}]]
assert captured["body"]["messages"] == [
{"role": "user", "content": "turn1"},
{"role": "assistant", "content": "turn1-assistant"},
{"role": "user", "content": "COMPRESSED_TURN2"},
{"role": "assistant", "content": "turn2-assistant"},
{"role": "user", "content": "COMPRESSED_TURN3"},
]
def test_cache_mode_skips_same_message_append_rewrite_to_preserve_stability() -> None:
captured = {"calls": []}
with _make_proxy_client() as client:
proxy = client.app.state.proxy
proxy.config.optimize = True
proxy.config.mode = "cache"
proxy.config.image_optimize = False
tracker = _FakePrefixTracker(frozen_count=0)
tracker._last_original_messages = [
{"role": "user", "content": "shared-prefix"},
]
tracker._last_forwarded_messages = [
{"role": "user", "content": "COMPRESSED_PREFIX"},
]
tracker.get_last_original_messages = lambda: tracker._last_original_messages.copy()
tracker.get_last_forwarded_messages = lambda: tracker._last_forwarded_messages.copy()
proxy.session_tracker_store.compute_session_id = lambda request, model, messages: (
"stable-session"
)
proxy.session_tracker_store.get_or_create = lambda session_id, provider: tracker
def _fake_apply(**kwargs):
captured["calls"].append(kwargs["messages"])
return SimpleNamespace(
messages=[{"role": "user", "content": " + COMPRESSED_SUFFIX"}],
transforms_applied=["fake:suffix"],
timing={},
tokens_before=20,
tokens_after=10,
waste_signals=None,
)
proxy.anthropic_pipeline.apply = _fake_apply
async def _fake_retry(method, url, headers, body, stream=False): # noqa: ANN001
captured["body"] = body
return httpx.Response(
200,
json={
"id": "msg_cache_suffix",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "ok"}],
"usage": {
"input_tokens": 80,
"output_tokens": 3,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
},
},
)
proxy._retry_request = _fake_retry
response = client.post(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
json={
"model": "claude-sonnet-4-6",
"max_tokens": 64,
"messages": [
{"role": "user", "content": "shared-prefix + raw suffix"},
],
},
)
assert response.status_code == 200
assert captured["calls"] == []
assert captured["body"]["messages"] == [
{"role": "user", "content": "shared-prefix + raw suffix"},
]