Spaces:
Running
Running
| """Integration tests for IntelligentContextManager in the proxy server. | |
| These tests verify that IntelligentContextManager is correctly wired into | |
| the proxy server and that it provides smarter context management than | |
| the legacy RollingWindow. | |
| Tests cover: | |
| 1. Configuration options work correctly | |
| 2. IntelligentContextManager is used when enabled (default) | |
| 3. RollingWindow is used when intelligent_context=False | |
| 4. Score-based dropping works differently than age-based | |
| 5. TOIN integration provides learned patterns | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| import pytest | |
| from headroom.config import IntelligentContextConfig | |
| from headroom.proxy.server import HeadroomProxy, ProxyConfig | |
| from headroom.tokenizer import Tokenizer | |
| from headroom.tokenizers import EstimatingTokenCounter | |
| from headroom.transforms import IntelligentContextManager, RollingWindow | |
| # ============================================================================= | |
| # Test Fixtures | |
| # ============================================================================= | |
@pytest.fixture
def tokenizer() -> Tokenizer:
    """Provide a ``Tokenizer`` wrapping an ``EstimatingTokenCounter``.

    Registered as a pytest fixture so that tests in this module can request
    it simply by naming a ``tokenizer`` parameter.
    """
    return Tokenizer(EstimatingTokenCounter())
@pytest.fixture
def simple_messages() -> list[dict[str, Any]]:
    """Provide a short five-message conversation (system + two user/assistant turns).

    Registered as a pytest fixture so that tests can request it by naming a
    ``simple_messages`` parameter. A fresh list is built on every call, so
    tests cannot leak mutations into each other.
    """
    return [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"},
        {"role": "assistant", "content": "Hi there! How can I help?"},
        {"role": "user", "content": "Tell me about Python."},
        {"role": "assistant", "content": "Python is a programming language."},
    ]
@pytest.fixture
def messages_with_tools() -> list[dict[str, Any]]:
    """Provide a conversation containing one assistant tool call and its response.

    The assistant message at index 2 issues tool call ``call_1`` and the
    ``tool`` message at index 3 answers it via ``tool_call_id``. Registered
    as a pytest fixture so that tests can request it by naming a
    ``messages_with_tools`` parameter.
    """
    return [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Search for something."},
        {
            "role": "assistant",
            "content": "Let me search.",
            "tool_calls": [
                {
                    "id": "call_1",
                    "type": "function",
                    "function": {"name": "search", "arguments": "{}"},
                }
            ],
        },
        {"role": "tool", "tool_call_id": "call_1", "content": '{"results": ["item1", "item2"]}'},
        {"role": "assistant", "content": "Found results."},
        {"role": "user", "content": "Thanks!"},
        {"role": "assistant", "content": "You're welcome!"},
    ]
@pytest.fixture
def long_messages() -> list[dict[str, Any]]:
    """Provide a long conversation intended to exceed small token limits.

    The result is one padded system message followed by 20 user/assistant
    exchanges (41 messages in total), each exchange carrying 500 characters
    of filler per message. Registered as a pytest fixture so that tests can
    request it by naming a ``long_messages`` parameter.
    """
    messages: list[dict[str, Any]] = [
        {"role": "system", "content": "You are a helpful assistant. " * 50}
    ]
    for i in range(20):
        messages.extend(
            (
                {"role": "user", "content": f"Question {i}: " + "x" * 500},
                {"role": "assistant", "content": f"Answer {i}: " + "y" * 500},
            )
        )
    return messages
| # ============================================================================= | |
| # Test ProxyConfig | |
| # ============================================================================= | |
class TestProxyConfigIntelligentContext:
    """Checks the intelligent-context options exposed on ProxyConfig."""

    def test_intelligent_context_enabled_by_default(self):
        """A default ProxyConfig has intelligent_context set to True."""
        assert ProxyConfig().intelligent_context is True

    def test_intelligent_context_scoring_enabled_by_default(self):
        """A default ProxyConfig has intelligent_context_scoring set to True."""
        assert ProxyConfig().intelligent_context_scoring is True

    def test_intelligent_context_compress_first_enabled_by_default(self):
        """A default ProxyConfig has intelligent_context_compress_first set to True."""
        assert ProxyConfig().intelligent_context_compress_first is True

    def test_can_disable_intelligent_context(self):
        """Passing intelligent_context=False is reflected on the config."""
        disabled = ProxyConfig(intelligent_context=False)
        assert disabled.intelligent_context is False

    def test_can_disable_scoring(self):
        """Passing intelligent_context_scoring=False is reflected on the config."""
        unscored = ProxyConfig(intelligent_context_scoring=False)
        assert unscored.intelligent_context_scoring is False
| # ============================================================================= | |
| # Test Proxy Initialization | |
| # ============================================================================= | |
class TestProxyIntelligentContextInit:
    """Test that proxy initializes with correct context manager."""

    @staticmethod
    def _count_transforms(proxy, transform_type) -> int:
        """Return how many transforms in the proxy's Anthropic pipeline are ``transform_type``."""
        return sum(
            isinstance(transform, transform_type)
            for transform in proxy.anthropic_pipeline.transforms
        )

    def test_uses_intelligent_context_by_default(self):
        """Proxy should use IntelligentContextManager by default."""
        # intelligent_context is deliberately NOT passed here: this test is
        # about the default wiring, and an explicit True would mask a
        # regression in the default value.
        proxy = HeadroomProxy(ProxyConfig(optimize=True))

        # The reported status and the actual pipeline contents must agree.
        assert proxy._context_manager_status == "intelligent"
        assert self._count_transforms(proxy, IntelligentContextManager) == 1

    def test_uses_rolling_window_when_disabled(self):
        """Proxy should use RollingWindow when intelligent_context=False."""
        proxy = HeadroomProxy(ProxyConfig(optimize=True, intelligent_context=False))

        assert proxy._context_manager_status == "rolling_window"
        # RollingWindow must replace, not accompany, IntelligentContextManager.
        assert self._count_transforms(proxy, RollingWindow) == 1
        assert self._count_transforms(proxy, IntelligentContextManager) == 0

    def test_smart_routing_mode_uses_intelligent_context(self):
        """Smart routing mode should also use IntelligentContextManager."""
        proxy = HeadroomProxy(
            ProxyConfig(optimize=True, smart_routing=True, intelligent_context=True)
        )

        assert proxy._context_manager_status == "intelligent"
        assert self._count_transforms(proxy, IntelligentContextManager) == 1

    def test_legacy_mode_uses_intelligent_context(self):
        """Legacy (non-smart-routing) mode should also use IntelligentContextManager."""
        proxy = HeadroomProxy(
            ProxyConfig(optimize=True, smart_routing=False, intelligent_context=True)
        )

        assert proxy._context_manager_status == "intelligent"
        assert self._count_transforms(proxy, IntelligentContextManager) == 1
| # ============================================================================= | |
| # Test IntelligentContextManager Configuration | |
| # ============================================================================= | |
class TestIntelligentContextManagerConfig:
    """Checks that ProxyConfig options reach the IntelligentContextManager config."""

    @staticmethod
    def _icm_of(proxy):
        """Return the first IntelligentContextManager in the proxy's Anthropic pipeline."""
        return next(
            transform
            for transform in proxy.anthropic_pipeline.transforms
            if isinstance(transform, IntelligentContextManager)
        )

    def test_keep_last_turns_passed_correctly(self):
        """keep_last_turns set on ProxyConfig shows up on the manager's config."""
        proxy = HeadroomProxy(ProxyConfig(intelligent_context=True, keep_last_turns=5))
        manager = self._icm_of(proxy)
        assert manager.config.keep_last_turns == 5

    def test_scoring_disabled_when_configured(self):
        """Disabling scoring turns off both importance scoring and TOIN integration."""
        proxy = HeadroomProxy(
            ProxyConfig(intelligent_context=True, intelligent_context_scoring=False)
        )
        manager_config = self._icm_of(proxy).config
        assert manager_config.use_importance_scoring is False
        assert manager_config.toin_integration is False

    def test_compress_first_threshold_set_correctly(self):
        """compress_threshold is 0.10 with compress_first on and 0.0 with it off."""
        # compress_first enabled
        proxy_on = HeadroomProxy(
            ProxyConfig(intelligent_context=True, intelligent_context_compress_first=True)
        )
        manager_on = self._icm_of(proxy_on)
        assert manager_on.config.compress_threshold == 0.10

        # compress_first disabled
        proxy_off = HeadroomProxy(
            ProxyConfig(intelligent_context=True, intelligent_context_compress_first=False)
        )
        manager_off = self._icm_of(proxy_off)
        assert manager_off.config.compress_threshold == 0.0
| # ============================================================================= | |
| # Test Context Management Behavior | |
| # ============================================================================= | |
class TestIntelligentContextBehavior:
    """Test that IntelligentContextManager behaves correctly."""

    @staticmethod
    def _make_icm(keep_last_turns):
        """Build an enabled, system-preserving manager protecting ``keep_last_turns`` turns."""
        return IntelligentContextManager(
            config=IntelligentContextConfig(
                enabled=True,
                keep_system=True,
                keep_last_turns=keep_last_turns,
            )
        )

    @staticmethod
    def _last_user_content(messages):
        """Return the content of the last ``user`` message, or None if there is none."""
        return next(
            (msg["content"] for msg in reversed(messages) if msg.get("role") == "user"),
            None,
        )

    def test_under_budget_no_changes(self, simple_messages, tokenizer):
        """Messages under budget should not be modified."""
        result = self._make_icm(keep_last_turns=2).apply(
            simple_messages,
            tokenizer,
            model_limit=128000,  # Very high limit
            output_buffer=4000,
        )

        # Should not modify messages when under budget
        assert len(result.messages) == len(simple_messages)
        assert result.tokens_before == result.tokens_after

    def test_over_budget_drops_messages(self, long_messages, tokenizer):
        """Messages over budget should be dropped."""
        # Use a small limit to force dropping
        result = self._make_icm(keep_last_turns=2).apply(
            long_messages,
            tokenizer,
            model_limit=5000,
            output_buffer=1000,
        )

        # Should have fewer messages
        assert len(result.messages) < len(long_messages)
        assert result.tokens_after < result.tokens_before

    def test_protects_system_message(self, long_messages, tokenizer):
        """System message should never be dropped."""
        result = self._make_icm(keep_last_turns=1).apply(
            long_messages,
            tokenizer,
            model_limit=3000,
            output_buffer=500,
        )

        # System message should still be present
        system_messages = [m for m in result.messages if m.get("role") == "system"]
        assert len(system_messages) == 1

    def test_protects_last_turns(self, long_messages, tokenizer):
        """Last N turns should be protected."""
        result = self._make_icm(keep_last_turns=2).apply(
            long_messages,
            tokenizer,
            model_limit=5000,
            output_buffer=1000,
        )

        # The most recent user message must survive unchanged.
        assert self._last_user_content(long_messages) == self._last_user_content(
            result.messages
        )

    def test_tool_unit_atomicity(self, messages_with_tools, tokenizer):
        """Tool calls and responses should be dropped together."""
        # Small limit intended to force dropping; the invariant checked below
        # must hold whether or not anything was actually dropped.
        result = self._make_icm(keep_last_turns=1).apply(
            messages_with_tools,
            tokenizer,
            model_limit=500,
            output_buffer=100,
        )

        # Ids of every tool call still present in the output.
        surviving_call_ids = {
            tool_call.get("id")
            for msg in result.messages
            for tool_call in (msg.get("tool_calls") or [])
        }

        # Either a tool response was dropped, or the call it answers is still
        # present. Tool messages without a tool_call_id are not checked.
        orphaned = [
            msg["tool_call_id"]
            for msg in result.messages
            if msg.get("role") == "tool"
            and msg.get("tool_call_id")
            and msg["tool_call_id"] not in surviving_call_ids
        ]
        assert not orphaned, f"Tool responses without a matching tool call: {orphaned}"

    def test_inserts_dropped_context_marker(self, long_messages, tokenizer):
        """Should insert a marker when messages are dropped."""
        result = self._make_icm(keep_last_turns=2).apply(
            long_messages,
            tokenizer,
            model_limit=5000,
            output_buffer=1000,
        )

        # Check for dropped context marker (either standard or CCR-aware format).
        # Non-string content (e.g. structured blocks) is skipped.
        contents = (msg.get("content", "") for msg in result.messages)
        marker_found = any(
            isinstance(content, str)
            and (
                "headroom:dropped_context" in content
                or "Earlier context compressed:" in content
            )
            for content in contents
        )
        assert marker_found, "Dropped context marker should be inserted"
| # ============================================================================= | |
| # Test Score-Based vs Age-Based Dropping | |
| # ============================================================================= | |
class TestScoreBasedDropping:
    """Exercises dropping with importance scoring switched on and off."""

    @staticmethod
    def _padded(prefix, filler_char):
        """Return ``prefix`` followed by 500 copies of ``filler_char``."""
        return prefix + filler_char * 500

    @staticmethod
    def _apply_tight_budget(conversation, tokenizer, *, use_importance_scoring):
        """Run a manager over ``conversation`` with model_limit=300 and output_buffer=50."""
        manager = IntelligentContextManager(
            config=IntelligentContextConfig(
                enabled=True,
                keep_system=True,
                keep_last_turns=1,
                use_importance_scoring=use_importance_scoring,
            )
        )
        return manager.apply(
            conversation,
            tokenizer,
            model_limit=300,  # Tight budget forces dropping
            output_buffer=50,
        )

    def test_scoring_enabled_uses_importance(self, tokenizer):
        """With scoring enabled, an over-budget conversation gets shortened."""
        # Every non-system message carries 500 characters of filler, which is
        # intended to push the conversation past the tight budget used below.
        conversation = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": self._padded("CRITICAL ERROR: ", "x")},  # High importance
            {"role": "assistant", "content": self._padded("I see the critical error. ", "y")},
            {"role": "user", "content": self._padded("Just a simple question. ", "z")},  # Low importance
            {"role": "assistant", "content": self._padded("Sure, I can help. ", "a")},
            {"role": "user", "content": self._padded("Another simple question. ", "b")},  # Low importance
            {"role": "assistant", "content": self._padded("Here's the answer. ", "c")},
        ]

        outcome = self._apply_tight_budget(
            conversation, tokenizer, use_importance_scoring=True
        )

        # Only the fact that something was dropped is asserted here; which
        # messages go first is not checked by this test.
        assert len(outcome.messages) < len(conversation)

    def test_scoring_disabled_uses_position(self, tokenizer):
        """With scoring disabled, an over-budget conversation still gets shortened."""
        # Every non-system message carries 500 characters of filler, which is
        # intended to push the conversation past the tight budget used below.
        conversation = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": self._padded("First message. ", "x")},
            {"role": "assistant", "content": self._padded("First response. ", "y")},
            {"role": "user", "content": self._padded("Second message. ", "z")},
            {"role": "assistant", "content": self._padded("Second response. ", "a")},
            {"role": "user", "content": self._padded("Third message. ", "b")},
            {"role": "assistant", "content": self._padded("Third response. ", "c")},
        ]

        outcome = self._apply_tight_budget(
            conversation, tokenizer, use_importance_scoring=False  # Position-based
        )

        # Only the fact that something was dropped is asserted here; the
        # drop order is not checked by this test.
        assert len(outcome.messages) < len(conversation)
| # ============================================================================= | |
| # Test TOIN Integration | |
| # ============================================================================= | |
class TestTOINIntegration:
    """Checks whether the proxy hands a TOIN instance to IntelligentContextManager."""

    @staticmethod
    def _icm_of(proxy):
        """Return the first IntelligentContextManager in the proxy's Anthropic pipeline."""
        return next(
            transform
            for transform in proxy.anthropic_pipeline.transforms
            if isinstance(transform, IntelligentContextManager)
        )

    def test_toin_passed_when_scoring_enabled(self):
        """With scoring enabled the manager's ``toin`` attribute is populated."""
        proxy = HeadroomProxy(
            ProxyConfig(intelligent_context=True, intelligent_context_scoring=True)
        )
        manager = self._icm_of(proxy)
        # TOIN should be set
        assert manager.toin is not None

    def test_toin_not_passed_when_scoring_disabled(self):
        """With scoring disabled the manager's ``toin`` attribute is None."""
        proxy = HeadroomProxy(
            ProxyConfig(intelligent_context=True, intelligent_context_scoring=False)
        )
        manager = self._icm_of(proxy)
        # TOIN should not be set
        assert manager.toin is None
| # ============================================================================= | |
| # Test Transforms Applied Tracking | |
| # ============================================================================= | |
class TestTransformsApplied:
    """Checks what IntelligentContextManager records in ``transforms_applied``."""

    @staticmethod
    def _two_turn_manager():
        """Build an enabled, system-preserving manager that protects the last two turns."""
        settings = IntelligentContextConfig(
            enabled=True,
            keep_system=True,
            keep_last_turns=2,
        )
        return IntelligentContextManager(config=settings)

    def test_reports_intelligent_cap_when_dropping(self, long_messages, tokenizer):
        """An over-budget run records an entry containing 'intelligent_cap'."""
        outcome = self._two_turn_manager().apply(
            long_messages,
            tokenizer,
            model_limit=5000,
            output_buffer=1000,
        )

        # At least one recorded transform name must mention intelligent_cap.
        matching = [name for name in outcome.transforms_applied if "intelligent_cap" in name]
        assert matching

    def test_no_transforms_when_under_budget(self, simple_messages, tokenizer):
        """An under-budget run records no transforms at all."""
        outcome = self._two_turn_manager().apply(
            simple_messages,
            tokenizer,
            model_limit=128000,
            output_buffer=4000,
        )

        # No transforms should be applied
        assert len(outcome.transforms_applied) == 0