Spaces:
Running
Running
| """Tests that the Anthropic streaming finalizer logs requests for the feed. | |
| Without this, the streaming Anthropic path (which is what Claude Code uses) | |
| silently bypassed the request logger, leaving `/stats.recent_requests` and | |
| `/transformations/feed` permanently empty even when `--log-messages` was set. | |
| The non-streaming Anthropic path and the Bedrock streaming path were the | |
| only ones that called `self.logger.log(...)`. | |
| """ | |
| from unittest.mock import AsyncMock, MagicMock | |
| import httpx | |
| import pytest | |
| from headroom.proxy.request_logger import RequestLogger | |
| from headroom.proxy.server import HeadroomProxy | |
| def _build_proxy_with_real_logger(*, log_full_messages: bool) -> HeadroomProxy: | |
| """Build a HeadroomProxy with mocks for everything except the request logger, | |
| so we can assert what actually gets recorded.""" | |
| proxy = object.__new__(HeadroomProxy) | |
| proxy.http_client = MagicMock(spec=httpx.AsyncClient) | |
| proxy.metrics = MagicMock() | |
| proxy.metrics.record_request = AsyncMock(return_value=None) | |
| proxy.cost_tracker = MagicMock() | |
| proxy.cost_tracker.record_tokens.return_value = None | |
| proxy.memory_manager = None | |
| proxy.memory_handler = None | |
| proxy._config = MagicMock() | |
| proxy._config.log_full_messages = log_full_messages | |
| proxy._config.ccr_inject_tool = False | |
| proxy.config = proxy._config | |
| proxy.logger = RequestLogger(log_file=None, log_full_messages=log_full_messages) | |
| return proxy | |
| def _stream_state(output_tokens: int = 42) -> dict: | |
| return { | |
| "output_tokens": output_tokens, | |
| "total_bytes": 200, | |
| "ttfb_ms": 35.0, | |
| "input_tokens": 1000, | |
| "cache_read_input_tokens": 0, | |
| "cache_creation_input_tokens": 0, | |
| "cache_creation_ephemeral_5m_input_tokens": 0, | |
| "cache_creation_ephemeral_1h_input_tokens": 0, | |
| "sse_buffer": "", | |
| } | |
| async def test_finalize_stream_response_logs_request_for_feed(): | |
| proxy = _build_proxy_with_real_logger(log_full_messages=False) | |
| await proxy._finalize_stream_response( | |
| body={"messages": [{"role": "user", "content": "hi"}]}, | |
| provider="anthropic", | |
| model="claude-sonnet-4-6", | |
| request_id="req-stream-1", | |
| original_tokens=1000, | |
| optimized_tokens=600, | |
| tokens_saved=400, | |
| transforms_applied=["smart_crusher"], | |
| optimization_latency=12.0, | |
| stream_state=_stream_state(), | |
| start_time=0.0, | |
| tags={"stack": "wrap_claude"}, | |
| ) | |
| entries = proxy.logger.get_recent(10) | |
| assert len(entries) == 1, "streaming finalizer must log exactly one entry per request" | |
| entry = entries[0] | |
| assert entry["request_id"] == "req-stream-1" | |
| assert entry["provider"] == "anthropic" | |
| assert entry["model"] == "claude-sonnet-4-6" | |
| assert entry["input_tokens_original"] == 1000 | |
| assert entry["input_tokens_optimized"] == 600 | |
| assert entry["tokens_saved"] == 400 | |
| assert entry["savings_percent"] == pytest.approx(40.0) | |
| assert entry["transforms_applied"] == ["smart_crusher"] | |
| assert entry["tags"] == {"stack": "wrap_claude"} | |
| assert entry["cache_hit"] is False | |
| async def test_finalize_stream_response_includes_messages_when_log_full_messages_enabled(): | |
| proxy = _build_proxy_with_real_logger(log_full_messages=True) | |
| body = {"messages": [{"role": "user", "content": "hello"}]} | |
| await proxy._finalize_stream_response( | |
| body=body, | |
| provider="anthropic", | |
| model="claude-sonnet-4-6", | |
| request_id="req-stream-2", | |
| original_tokens=10, | |
| optimized_tokens=8, | |
| tokens_saved=2, | |
| transforms_applied=[], | |
| optimization_latency=1.0, | |
| stream_state=_stream_state(output_tokens=5), | |
| start_time=0.0, | |
| ) | |
| entries = proxy.logger.get_recent_with_messages(10) | |
| assert len(entries) == 1 | |
| assert entries[0]["request_messages"] == body["messages"] | |
| async def test_finalize_stream_response_omits_messages_when_log_full_messages_disabled(): | |
| proxy = _build_proxy_with_real_logger(log_full_messages=False) | |
| await proxy._finalize_stream_response( | |
| body={"messages": [{"role": "user", "content": "hello"}]}, | |
| provider="anthropic", | |
| model="claude-sonnet-4-6", | |
| request_id="req-stream-3", | |
| original_tokens=10, | |
| optimized_tokens=8, | |
| tokens_saved=2, | |
| transforms_applied=[], | |
| optimization_latency=1.0, | |
| stream_state=_stream_state(output_tokens=5), | |
| start_time=0.0, | |
| ) | |
| entries = proxy.logger.get_recent_with_messages(10) | |
| assert len(entries) == 1 | |
| assert entries[0]["request_messages"] is None | |
| async def test_finalize_stream_response_handles_zero_original_tokens(): | |
| proxy = _build_proxy_with_real_logger(log_full_messages=False) | |
| await proxy._finalize_stream_response( | |
| body={"messages": []}, | |
| provider="anthropic", | |
| model="claude-sonnet-4-6", | |
| request_id="req-stream-4", | |
| original_tokens=0, | |
| optimized_tokens=0, | |
| tokens_saved=0, | |
| transforms_applied=[], | |
| optimization_latency=0.0, | |
| stream_state=_stream_state(output_tokens=0), | |
| start_time=0.0, | |
| ) | |
| entries = proxy.logger.get_recent(10) | |
| assert len(entries) == 1 | |
| assert entries[0]["savings_percent"] == 0 | |
| async def test_finalize_stream_response_no_op_when_logger_disabled(): | |
| proxy = _build_proxy_with_real_logger(log_full_messages=False) | |
| proxy.logger = None # `--no-log-requests` would put us here | |
| # Should not raise. | |
| await proxy._finalize_stream_response( | |
| body={"messages": []}, | |
| provider="anthropic", | |
| model="claude-sonnet-4-6", | |
| request_id="req-stream-5", | |
| original_tokens=10, | |
| optimized_tokens=8, | |
| tokens_saved=2, | |
| transforms_applied=[], | |
| optimization_latency=1.0, | |
| stream_state=_stream_state(), | |
| start_time=0.0, | |
| ) | |