Spaces:
Running
Running
| """Tests for CompressionCache with LRU eviction.""" | |
| from __future__ import annotations | |
| import pytest | |
| from headroom.cache.compression_cache import CompressionCache | |
@pytest.fixture
def cache() -> CompressionCache:
    """Provide a ``CompressionCache`` built with its default arguments.

    Registered as a pytest fixture so that test methods declaring a ``cache``
    parameter receive the instance returned here; without the decorator
    pytest cannot resolve that parameter.
    """
    return CompressionCache()
@pytest.fixture
def small_cache() -> CompressionCache:
    """Provide a ``CompressionCache`` limited to three entries.

    Registered as a pytest fixture so that test methods declaring a
    ``small_cache`` parameter receive the instance returned here. The small
    ``max_entries`` value lets the eviction tests overflow the cache with
    only four stores.
    """
    return CompressionCache(max_entries=3)
class TestCompressionCache:
    """Store/lookup, statistics, eviction and hashing checks for CompressionCache."""

    def test_cache_miss_returns_none(self, cache: CompressionCache) -> None:
        """A hash that was never stored must look up as ``None``."""
        unknown_key = CompressionCache.content_hash("some content")
        assert cache.get_compressed(unknown_key) is None

    def test_store_and_retrieve(self, cache: CompressionCache) -> None:
        """A stored value is returned unchanged for the same hash."""
        key = CompressionCache.content_hash("hello world this is a long message")
        stored_value = "hello world...compressed"
        cache.store_compressed(key, stored_value, tokens_saved=15)
        assert cache.get_compressed(key) == stored_value

    def test_different_content_different_hash(self) -> None:
        """Two distinct strings must not hash to the same value."""
        hash_a, hash_b = (
            CompressionCache.content_hash(text) for text in ("content A", "content B")
        )
        assert hash_a != hash_b

    def test_overwrite_same_hash(self, cache: CompressionCache) -> None:
        """Storing twice under one hash keeps the most recent value."""
        key = CompressionCache.content_hash("some content")
        for version, saved in (("v1", 10), ("v2", 20)):
            cache.store_compressed(key, version, tokens_saved=saved)
        assert cache.get_compressed(key) == "v2"

    def test_stats_tracking(self, cache: CompressionCache) -> None:
        """One store, one hit and one miss are reflected in ``get_stats``."""
        key = CompressionCache.content_hash("content")
        cache.store_compressed(key, "compressed", tokens_saved=5)
        cache.get_compressed(key)  # the single hit
        cache.get_compressed("nonexistent")  # the single miss
        stats = cache.get_stats()
        expected = {"hits": 1, "misses": 1, "entries": 1, "tokens_saved": 5}
        for field, value in expected.items():
            assert stats[field] == value

    def test_eviction_at_max_entries(self, small_cache: CompressionCache) -> None:
        """Storing a fourth entry in the three-entry cache drops the oldest one."""
        first, second, third, fourth = (
            CompressionCache.content_hash(letter) for letter in "abcd"
        )
        for key, payload in ((first, "ca"), (second, "cb"), (third, "cc")):
            small_cache.store_compressed(key, payload, tokens_saved=1)
        # This store overflows the cache; the oldest entry (first) is expected to go.
        small_cache.store_compressed(fourth, "cd", tokens_saved=1)
        assert small_cache.get_compressed(first) is None
        assert small_cache.get_compressed(second) == "cb"
        assert small_cache.get_compressed(fourth) == "cd"

    def test_access_refreshes_lru(self, small_cache: CompressionCache) -> None:
        """Reading an entry protects it from being the next one evicted."""
        first, second, third, fourth = (
            CompressionCache.content_hash(letter) for letter in "abcd"
        )
        for key, payload in ((first, "ca"), (second, "cb"), (third, "cc")):
            small_cache.store_compressed(key, payload, tokens_saved=1)
        # Touch the oldest entry so that it becomes the most recently used.
        small_cache.get_compressed(first)
        # The overflow should now remove the oldest untouched entry (second).
        small_cache.store_compressed(fourth, "cd", tokens_saved=1)
        assert small_cache.get_compressed(first) == "ca"
        assert small_cache.get_compressed(second) is None
        assert small_cache.get_compressed(fourth) == "cd"

    def test_content_hash_list_content(self) -> None:
        """content_hash accepts Anthropic-format list content and is repeatable."""
        blocks = [
            {"type": "text", "text": "hello"},
            {"type": "text", "text": "world"},
        ]
        digest = CompressionCache.content_hash(blocks)
        assert isinstance(digest, str)
        assert len(digest) == 16
        # Hashing the same list again must give the same digest.
        assert CompressionCache.content_hash(blocks) == digest

    def test_content_hash_string_length(self) -> None:
        """The hash of a plain string is 16 characters long."""
        digest = CompressionCache.content_hash("test")
        assert len(digest) == 16
class TestCompressionCacheFrozenCount:
    """Checks for compute_frozen_count, stable-hash marking and deferral."""

    def test_empty_cache_returns_zero(self, cache: CompressionCache) -> None:
        """An empty message list has a frozen count of zero."""
        frozen = cache.compute_frozen_count([])
        assert frozen == 0

    def test_user_assistant_always_stable(self, cache: CompressionCache) -> None:
        """Plain-text user/assistant turns all count as frozen."""
        conversation = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi there"},
            {"role": "user", "content": "how are you"},
        ]
        assert cache.compute_frozen_count(conversation) == 3

    def test_tool_result_with_cache_hit_is_stable(self, cache: CompressionCache) -> None:
        """A tool_result whose content is cached does not stop the count."""
        tool_content = "tool output data"
        cache.store_compressed(
            CompressionCache.content_hash(tool_content),
            "compressed tool output",
            tokens_saved=5,
        )
        tool_use_block = {"type": "tool_use", "id": "t1", "name": "my_tool", "input": {}}
        tool_result_block = {"type": "tool_result", "tool_use_id": "t1", "content": tool_content}
        conversation = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": [tool_use_block]},
            {"role": "user", "content": [tool_result_block]},
        ]
        assert cache.compute_frozen_count(conversation) == 3

    def test_tool_result_cache_miss_stops_frozen(self, cache: CompressionCache) -> None:
        """An uncached tool_result ends the frozen prefix before itself."""
        uncached_block = {"type": "tool_result", "tool_use_id": "t1", "content": "uncached stuff"}
        conversation = [
            {"role": "user", "content": "hello"},
            {"role": "user", "content": [uncached_block]},
            {"role": "user", "content": "follow up"},
        ]
        assert cache.compute_frozen_count(conversation) == 1

    def test_frozen_count_with_dropped_messages(self, cache: CompressionCache) -> None:
        """The count covers a cached tool_result but stops at the next uncached one."""
        cached_content = "cached tool output"
        cache.store_compressed(
            CompressionCache.content_hash(cached_content), "compressed", tokens_saved=3
        )
        cached_block = {"type": "tool_result", "tool_use_id": "t1", "content": cached_content}
        uncached_block = {"type": "tool_result", "tool_use_id": "t2", "content": "not cached"}
        conversation = [
            {"role": "user", "content": "start"},
            {"role": "user", "content": [cached_block]},
            {"role": "user", "content": [uncached_block]},
        ]
        assert cache.compute_frozen_count(conversation) == 2

    def test_stable_hash_allows_frozen_count_past_uncached_tool_result(
        self, cache: CompressionCache
    ) -> None:
        """Tool_results marked stable should not stop the frozen count walk."""
        tool_content = "excluded Read output — big file contents"
        cache.mark_stable(CompressionCache.content_hash(tool_content))
        stable_block = {"type": "tool_result", "tool_use_id": "t1", "content": tool_content}
        conversation = [
            {"role": "user", "content": "hello"},
            {"role": "user", "content": [stable_block]},
            {"role": "user", "content": "follow up"},
        ]
        # Were the hash not marked stable, the walk would halt at index 1 (count 1);
        # with the mark in place it is expected to cover all three messages.
        assert cache.compute_frozen_count(conversation) == 3

    def test_update_from_result_identical_content_marks_stable(
        self, cache: CompressionCache
    ) -> None:
        """When orig == compressed, update_from_result marks the hash as stable."""
        tool_content = "unchanged tool output"

        def tool_result_message() -> dict:
            # Build fresh dicts on every call so the lists below share no containers.
            return {
                "role": "user",
                "content": [{"type": "tool_result", "tool_use_id": "t1", "content": tool_content}],
            }

        originals = [{"role": "user", "content": "hi"}, tool_result_message()]
        # The "compressed" side is equal to the originals: no compression happened.
        compressed = [{"role": "user", "content": "hi"}, tool_result_message()]
        cache.update_from_result(originals, compressed)

        digest = CompressionCache.content_hash(tool_content)
        assert digest in cache._stable_hashes

        # The frozen count is now expected to walk past this tool_result.
        conversation = [
            {"role": "user", "content": "hello"},
            tool_result_message(),
            {"role": "user", "content": "more stuff"},
        ]
        assert cache.compute_frozen_count(conversation) == 3

    def test_mark_stable_from_messages(self, cache: CompressionCache) -> None:
        """mark_stable_from_messages records hashes for tool_results."""
        content_a = "tool output A"
        content_b = "tool output B"
        block_a = {"type": "tool_result", "tool_use_id": "t1", "content": content_a}
        block_b = {"type": "tool_result", "tool_use_id": "t2", "content": content_b}
        conversation = [
            {"role": "user", "content": "hi"},
            {"role": "user", "content": [block_a]},
            {"role": "user", "content": [block_b]},
        ]
        # Only the first two messages (indices 0 and 1) are passed as the prefix.
        cache.mark_stable_from_messages(conversation, 2)
        hash_a = CompressionCache.content_hash(content_a)
        hash_b = CompressionCache.content_hash(content_b)
        assert hash_a in cache._stable_hashes
        assert hash_b not in cache._stable_hashes  # index 2 lies outside the prefix

    def test_should_defer_compression_new_content(self, cache: CompressionCache) -> None:
        """First-time content should be deferred."""
        digest = CompressionCache.content_hash("brand new content")
        decision = cache.should_defer_compression(digest, ttl_seconds=300, batch_window=30)
        assert decision is True

    def test_should_defer_compression_near_ttl(self, cache: CompressionCache) -> None:
        """Content near TTL boundary should NOT be deferred."""
        import time

        digest = CompressionCache.content_hash("old content")
        # Pretend the content was first seen 280s ago (TTL is 300s, window is 30s).
        cache._first_seen[digest] = time.time() - 280
        decision = cache.should_defer_compression(digest, ttl_seconds=300, batch_window=30)
        assert decision is False
class TestCompressionCacheApplyAndUpdate:
    """Checks for apply_cached and update_from_result on message lists."""

    def test_apply_cached_swaps_tool_results(self, cache: CompressionCache) -> None:
        """A cached tool_result body is replaced by its stored compressed form."""
        original_content = "big tool output"
        cache.store_compressed(
            CompressionCache.content_hash(original_content), "small output", tokens_saved=5
        )
        tool_block = {"type": "tool_result", "tool_use_id": "t1", "content": original_content}
        conversation = [
            {"role": "user", "content": "hi"},
            {"role": "user", "content": [tool_block]},
        ]
        swapped = cache.apply_cached(conversation)
        assert swapped[1]["content"][0]["content"] == "small output"

    def test_apply_cached_preserves_uncached_messages(self, cache: CompressionCache) -> None:
        """Messages without cached content come back as the very same objects."""
        conversation = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "world"},
        ]
        returned = cache.apply_cached(conversation)
        assert returned[0] is conversation[0]
        assert returned[1] is conversation[1]

    def test_apply_cached_never_adds_messages(self, cache: CompressionCache) -> None:
        """A cache entry matching no message must not lengthen the result."""
        # This entry corresponds to none of the messages below.
        cache.store_compressed("orphan_hash", "orphan_value", tokens_saved=1)
        conversation = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
        ]
        returned = cache.apply_cached(conversation)
        assert len(returned) == len(conversation)

    def test_update_from_result_caches_changes(self, cache: CompressionCache) -> None:
        """A changed tool_result is cached under the hash of its original text."""
        originals, compressed = (
            [
                {
                    "role": "user",
                    "content": [{"type": "tool_result", "tool_use_id": "t1", "content": text}],
                }
            ]
            for text in ("original output", "compressed output")
        )
        cache.update_from_result(originals, compressed)
        digest = CompressionCache.content_hash("original output")
        assert cache.get_compressed(digest) == "compressed output"

    def test_update_from_result_ignores_unchanged(self, cache: CompressionCache) -> None:
        """An unchanged tool_result must not create a compressed cache entry."""
        originals, compressed = (
            [
                {
                    "role": "user",
                    "content": [{"type": "tool_result", "tool_use_id": "t1", "content": text}],
                }
            ]
            for text in ("same content", "same content")
        )
        cache.update_from_result(originals, compressed)
        digest = CompressionCache.content_hash("same content")
        assert cache.get_compressed(digest) is None

    def test_apply_does_not_modify_original_messages(self, cache: CompressionCache) -> None:
        """apply_cached must leave the caller's message dicts untouched."""
        original_content = "big tool output"
        cache.store_compressed(
            CompressionCache.content_hash(original_content), "small output", tokens_saved=5
        )
        source_message = {
            "role": "user",
            "content": [{"type": "tool_result", "tool_use_id": "t1", "content": original_content}],
        }
        cache.apply_cached([source_message])
        # The input message still carries the uncompressed text.
        assert source_message["content"][0]["content"] == original_content

    def test_openai_format_tool_result(self, cache: CompressionCache) -> None:
        """An OpenAI-style ``role: tool`` message is swapped without mutating the input."""
        original_content = "openai tool output"
        cache.store_compressed(
            CompressionCache.content_hash(original_content), "compressed openai", tokens_saved=4
        )
        conversation = [
            {"role": "tool", "tool_call_id": "tc1", "content": original_content},
        ]
        returned = cache.apply_cached(conversation)
        assert returned[0]["content"] == "compressed openai"
        # The input message still carries the uncompressed text.
        assert conversation[0]["content"] == original_content