# headroom/tests/test_compression/test_compression_cache.py
# tudragon154203
# fix: route count_tokens to api.anthropic.com, not proxy base_url
# 0adb431
"""Tests for CompressionCache with LRU eviction."""
from __future__ import annotations
import pytest
from headroom.cache.compression_cache import CompressionCache
@pytest.fixture
def cache() -> CompressionCache:
    """Provide a fresh ``CompressionCache`` constructed with no arguments."""
    default_cache = CompressionCache()
    return default_cache
@pytest.fixture
def small_cache() -> CompressionCache:
    """Provide a ``CompressionCache`` built with ``max_entries=3`` for eviction tests."""
    capacity = 3
    return CompressionCache(max_entries=capacity)
class TestCompressionCache:
    """Exercise CompressionCache storage, hashing, stats, and LRU eviction."""

    def test_cache_miss_returns_none(self, cache: CompressionCache) -> None:
        """A hash that was never stored looks up as ``None``."""
        h = CompressionCache.content_hash("some content")
        assert cache.get_compressed(h) is None

    def test_store_and_retrieve(self, cache: CompressionCache) -> None:
        """A stored compressed value is returned for the same content hash."""
        content = "hello world this is a long message"
        h = CompressionCache.content_hash(content)
        cache.store_compressed(h, "hello world...compressed", tokens_saved=15)
        assert cache.get_compressed(h) == "hello world...compressed"

    def test_different_content_different_hash(self) -> None:
        """Two different content strings produce different hashes."""
        h1 = CompressionCache.content_hash("content A")
        h2 = CompressionCache.content_hash("content B")
        assert h1 != h2

    def test_overwrite_same_hash(self, cache: CompressionCache) -> None:
        """Storing twice under one hash keeps the most recently stored value."""
        h = CompressionCache.content_hash("some content")
        cache.store_compressed(h, "v1", tokens_saved=10)
        cache.store_compressed(h, "v2", tokens_saved=20)
        assert cache.get_compressed(h) == "v2"

    def test_stats_tracking(self, cache: CompressionCache) -> None:
        """get_stats reports hits, misses, entry count, and tokens saved."""
        h = CompressionCache.content_hash("content")
        cache.store_compressed(h, "compressed", tokens_saved=5)
        # One hit
        cache.get_compressed(h)
        # One miss
        cache.get_compressed("nonexistent")
        stats = cache.get_stats()
        assert stats["hits"] == 1
        assert stats["misses"] == 1
        assert stats["entries"] == 1
        assert stats["tokens_saved"] == 5

    def test_eviction_at_max_entries(self, small_cache: CompressionCache) -> None:
        """Exceeding max_entries evicts only the oldest entry."""
        h1 = CompressionCache.content_hash("a")
        h2 = CompressionCache.content_hash("b")
        h3 = CompressionCache.content_hash("c")
        h4 = CompressionCache.content_hash("d")
        small_cache.store_compressed(h1, "ca", tokens_saved=1)
        small_cache.store_compressed(h2, "cb", tokens_saved=1)
        small_cache.store_compressed(h3, "cc", tokens_saved=1)
        # Adding a 4th should evict the oldest (h1)
        small_cache.store_compressed(h4, "cd", tokens_saved=1)
        assert small_cache.get_compressed(h1) is None
        # Every other entry must survive: exactly one eviction is expected.
        assert small_cache.get_compressed(h2) == "cb"
        assert small_cache.get_compressed(h3) == "cc"
        assert small_cache.get_compressed(h4) == "cd"

    def test_access_refreshes_lru(self, small_cache: CompressionCache) -> None:
        """Reading an entry protects it from being the next one evicted."""
        h1 = CompressionCache.content_hash("a")
        h2 = CompressionCache.content_hash("b")
        h3 = CompressionCache.content_hash("c")
        h4 = CompressionCache.content_hash("d")
        small_cache.store_compressed(h1, "ca", tokens_saved=1)
        small_cache.store_compressed(h2, "cb", tokens_saved=1)
        small_cache.store_compressed(h3, "cc", tokens_saved=1)
        # Access h1 to refresh it
        small_cache.get_compressed(h1)
        # Adding h4 should evict h2 (oldest untouched), not h1
        small_cache.store_compressed(h4, "cd", tokens_saved=1)
        assert small_cache.get_compressed(h1) == "ca"
        assert small_cache.get_compressed(h2) is None
        # h3 is now the oldest survivor; it must not have been evicted with h2.
        assert small_cache.get_compressed(h3) == "cc"
        assert small_cache.get_compressed(h4) == "cd"

    def test_content_hash_list_content(self) -> None:
        """content_hash handles Anthropic-format list content."""
        list_content = [
            {"type": "text", "text": "hello"},
            {"type": "text", "text": "world"},
        ]
        h = CompressionCache.content_hash(list_content)
        assert isinstance(h, str)
        assert len(h) == 16
        # Same content produces same hash
        assert CompressionCache.content_hash(list_content) == h

    def test_content_hash_string_length(self) -> None:
        """Hashing a plain string yields a 16-character hash."""
        h = CompressionCache.content_hash("test")
        assert len(h) == 16
class TestCompressionCacheFrozenCount:
    """Cover compute_frozen_count, stable-hash marking, and compression deferral."""

    @staticmethod
    def _tool_result_message(tool_use_id: str, content: str) -> dict[str, object]:
        """Return a user message whose content is a single tool_result block."""
        block = {"type": "tool_result", "tool_use_id": tool_use_id, "content": content}
        return {"role": "user", "content": [block]}

    def test_empty_cache_returns_zero(self, cache: CompressionCache) -> None:
        """An empty message list yields a frozen count of zero."""
        frozen = cache.compute_frozen_count([])
        assert frozen == 0

    def test_user_assistant_always_stable(self, cache: CompressionCache) -> None:
        """Plain-text user and assistant turns are all counted as frozen."""
        conversation = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi there"},
            {"role": "user", "content": "how are you"},
        ]
        assert cache.compute_frozen_count(conversation) == 3

    def test_tool_result_with_cache_hit_is_stable(self, cache: CompressionCache) -> None:
        """A tool_result whose content has a cached compression is counted as frozen."""
        tool_content = "tool output data"
        key = CompressionCache.content_hash(tool_content)
        cache.store_compressed(key, "compressed tool output", tokens_saved=5)
        tool_call = {
            "role": "assistant",
            "content": [{"type": "tool_use", "id": "t1", "name": "my_tool", "input": {}}],
        }
        conversation = [
            {"role": "user", "content": "do something"},
            tool_call,
            self._tool_result_message("t1", tool_content),
        ]
        assert cache.compute_frozen_count(conversation) == 3

    def test_tool_result_cache_miss_stops_frozen(self, cache: CompressionCache) -> None:
        """An uncached tool_result ends the frozen prefix at the message before it."""
        conversation = [
            {"role": "user", "content": "hello"},
            self._tool_result_message("t1", "uncached stuff"),
            {"role": "user", "content": "follow up"},
        ]
        assert cache.compute_frozen_count(conversation) == 1

    def test_frozen_count_with_dropped_messages(self, cache: CompressionCache) -> None:
        """The frozen prefix covers a cached tool_result but stops at an uncached one."""
        cached_content = "cached tool output"
        key = CompressionCache.content_hash(cached_content)
        cache.store_compressed(key, "compressed", tokens_saved=3)
        conversation = [
            {"role": "user", "content": "start"},
            self._tool_result_message("t1", cached_content),
            self._tool_result_message("t2", "not cached"),
        ]
        assert cache.compute_frozen_count(conversation) == 2

    def test_stable_hash_allows_frozen_count_past_uncached_tool_result(
        self, cache: CompressionCache
    ) -> None:
        """A tool_result marked stable does not end the frozen-count walk."""
        tool_content = "excluded Read output — big file contents"
        key = CompressionCache.content_hash(tool_content)
        cache.mark_stable(key)
        conversation = [
            {"role": "user", "content": "hello"},
            self._tool_result_message("t1", tool_content),
            {"role": "user", "content": "follow up"},
        ]
        # Unmarked, the walk would halt at index 1 and report 1;
        # marked stable, it runs through all three messages.
        assert cache.compute_frozen_count(conversation) == 3

    def test_update_from_result_identical_content_marks_stable(
        self, cache: CompressionCache
    ) -> None:
        """update_from_result marks the hash stable when original equals compressed."""
        tool_content = "unchanged tool output"
        originals = [
            {"role": "user", "content": "hi"},
            self._tool_result_message("t1", tool_content),
        ]
        # Equal to the originals but built separately: nothing was compressed.
        compressed = [
            {"role": "user", "content": "hi"},
            self._tool_result_message("t1", tool_content),
        ]
        cache.update_from_result(originals, compressed)
        key = CompressionCache.content_hash(tool_content)
        assert key in cache._stable_hashes
        # The frozen-count walk should now continue past this tool_result.
        conversation = [
            {"role": "user", "content": "hello"},
            self._tool_result_message("t1", tool_content),
            {"role": "user", "content": "more stuff"},
        ]
        assert cache.compute_frozen_count(conversation) == 3

    def test_mark_stable_from_messages(self, cache: CompressionCache) -> None:
        """mark_stable_from_messages records tool_result hashes within the given count."""
        content_a = "tool output A"
        content_b = "tool output B"
        conversation = [
            {"role": "user", "content": "hi"},
            self._tool_result_message("t1", content_a),
            self._tool_result_message("t2", content_b),
        ]
        # Only the first two messages (indices 0 and 1) are passed as in range.
        cache.mark_stable_from_messages(conversation, 2)
        hash_a = CompressionCache.content_hash(content_a)
        hash_b = CompressionCache.content_hash(content_b)
        assert hash_a in cache._stable_hashes
        assert hash_b not in cache._stable_hashes  # index 2 lies outside the count

    def test_should_defer_compression_new_content(self, cache: CompressionCache) -> None:
        """Content seen for the first time is deferred."""
        key = CompressionCache.content_hash("brand new content")
        decision = cache.should_defer_compression(key, ttl_seconds=300, batch_window=30)
        assert decision is True

    def test_should_defer_compression_near_ttl(self, cache: CompressionCache) -> None:
        """Content whose age is close to the TTL is not deferred."""
        import time

        key = CompressionCache.content_hash("old content")
        # Backdate first_seen: 280s old against TTL=300 and window=30.
        age_seconds = 280
        cache._first_seen[key] = time.time() - age_seconds
        decision = cache.should_defer_compression(key, ttl_seconds=300, batch_window=30)
        assert decision is False
class TestCompressionCacheApplyAndUpdate:
    """Cover apply_cached substitution and update_from_result cache population."""

    def test_apply_cached_swaps_tool_results(self, cache: CompressionCache) -> None:
        """apply_cached replaces a cached tool_result's content with the stored value."""
        original_content = "big tool output"
        key = CompressionCache.content_hash(original_content)
        cache.store_compressed(key, "small output", tokens_saved=5)
        tool_block = {"type": "tool_result", "tool_use_id": "t1", "content": original_content}
        conversation = [
            {"role": "user", "content": "hi"},
            {"role": "user", "content": [tool_block]},
        ]
        swapped = cache.apply_cached(conversation)
        assert swapped[1]["content"][0]["content"] == "small output"

    def test_apply_cached_preserves_uncached_messages(self, cache: CompressionCache) -> None:
        """Messages with nothing cached come back as the very same objects."""
        user_turn = {"role": "user", "content": "hello"}
        assistant_turn = {"role": "assistant", "content": "world"}
        conversation = [user_turn, assistant_turn]
        outcome = cache.apply_cached(conversation)
        assert outcome[0] is conversation[0]
        assert outcome[1] is conversation[1]

    def test_apply_cached_never_adds_messages(self, cache: CompressionCache) -> None:
        """A cache entry matching no message leaves the message count unchanged."""
        # Seed the cache with an entry that no message below corresponds to.
        cache.store_compressed("orphan_hash", "orphan_value", tokens_saved=1)
        conversation = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
        ]
        outcome = cache.apply_cached(conversation)
        assert len(outcome) == len(conversation)

    def test_update_from_result_caches_changes(self, cache: CompressionCache) -> None:
        """A tool_result whose content changed is cached under the original's hash."""
        before_block = {"type": "tool_result", "tool_use_id": "t1", "content": "original output"}
        after_block = {"type": "tool_result", "tool_use_id": "t1", "content": "compressed output"}
        originals = [{"role": "user", "content": [before_block]}]
        compressed = [{"role": "user", "content": [after_block]}]
        cache.update_from_result(originals, compressed)
        key = CompressionCache.content_hash("original output")
        assert cache.get_compressed(key) == "compressed output"

    def test_update_from_result_ignores_unchanged(self, cache: CompressionCache) -> None:
        """A tool_result left unchanged produces no compressed cache entry."""
        before_block = {"type": "tool_result", "tool_use_id": "t1", "content": "same content"}
        after_block = {"type": "tool_result", "tool_use_id": "t1", "content": "same content"}
        originals = [{"role": "user", "content": [before_block]}]
        compressed = [{"role": "user", "content": [after_block]}]
        cache.update_from_result(originals, compressed)
        key = CompressionCache.content_hash("same content")
        assert cache.get_compressed(key) is None

    def test_apply_does_not_modify_original_messages(self, cache: CompressionCache) -> None:
        """apply_cached leaves the caller's message dict untouched."""
        original_content = "big tool output"
        key = CompressionCache.content_hash(original_content)
        cache.store_compressed(key, "small output", tokens_saved=5)
        msg = {
            "role": "user",
            "content": [{"type": "tool_result", "tool_use_id": "t1", "content": original_content}],
        }
        conversation = [msg]
        cache.apply_cached(conversation)
        # The input message must still hold the uncompressed content.
        assert msg["content"][0]["content"] == original_content

    def test_openai_format_tool_result(self, cache: CompressionCache) -> None:
        """A ``role: tool`` message with string content is swapped without mutation."""
        original_content = "openai tool output"
        key = CompressionCache.content_hash(original_content)
        cache.store_compressed(key, "compressed openai", tokens_saved=4)
        tool_message = {"role": "tool", "tool_call_id": "tc1", "content": original_content}
        conversation = [tool_message]
        outcome = cache.apply_cached(conversation)
        assert outcome[0]["content"] == "compressed openai"
        # The input list still carries the original content.
        assert conversation[0]["content"] == original_content