Spaces:
Running
Running
| """Tests for ratelimit header forwarding in streaming responses. | |
| Verifies that anthropic-ratelimit-* headers from the upstream API response | |
| are forwarded to the client in StreamingResponse, even in SSE streaming mode. | |
| This was a bug where non-streaming responses correctly forwarded all headers | |
| via dict(response.headers), but streaming responses used StreamingResponse | |
| without passing any upstream headers — silently dropping ratelimit info. | |
| """ | |
| import json | |
| from unittest.mock import AsyncMock, MagicMock | |
| import httpx | |
| import pytest | |
| import headroom.proxy.handlers.streaming as streaming_module | |
| from headroom.proxy.server import HeadroomProxy | |
class TestStreamingRatelimitHeaderForwarding:
    """Test that upstream ratelimit headers are forwarded in streaming responses.

    Every test builds a ``HeadroomProxy`` whose collaborators are mocks, wires
    a fake upstream (``http_client.build_request`` / ``http_client.send``) and
    awaits ``proxy._stream_response`` with one canned Anthropic request.  The
    shared wiring and the canned request live in private helpers so each test
    body only shows what is specific to its scenario.
    """

    # Canned request values shared by every test in this class.
    _UPSTREAM_URL = "https://api.anthropic.com/v1/messages"
    _MODEL = "claude-sonnet-4-20250514"
    # Prefix shared by all ratelimit headers emitted by the mock upstream.
    _RATELIMIT_PREFIX = "anthropic-ratelimit-"

    # ------------------------------------------------------------------ #
    # Helpers
    # ------------------------------------------------------------------ #

    def _create_mock_proxy(self):
        """Create a HeadroomProxy with mocked internals for unit testing.

        The instance is allocated with ``object.__new__`` so that
        ``HeadroomProxy.__init__`` does not run; only the attributes assigned
        below exist on the returned object.
        """
        proxy = object.__new__(HeadroomProxy)
        proxy.http_client = MagicMock(spec=httpx.AsyncClient)

        # Metrics: the recording methods are awaited by the code under test,
        # so they must be AsyncMock rather than plain MagicMock attributes.
        proxy.metrics = MagicMock()
        proxy.metrics.record_request = AsyncMock(return_value=None)
        proxy.metrics.record_failed = AsyncMock(return_value=None)

        proxy.cost_tracker = MagicMock()
        proxy.cost_tracker.estimate_cost.return_value = 0.001
        proxy.cost_tracker.record_request.return_value = None

        proxy.stats = {
            "requests_total": 0,
            "requests_optimized": 0,
            "tokens": {"original": 0, "optimized": 0, "saved": 0},
            "cost": {"total_usd": 0, "savings_usd": 0},
            "errors": 0,
            "active_requests": 0,
            "requests_per_model": {},
        }

        proxy.memory_manager = None
        proxy.memory_handler = None

        # Config is exposed under both ``_config`` and ``config``.
        proxy._config = MagicMock()
        proxy._config.memory_enabled = False
        proxy._config.ccr_inject_tool = False
        proxy._config.retry_max_attempts = 3
        # NOTE(review): zero delays presumably keep the retry test from
        # sleeping between attempts -- confirm against the retry loop.
        proxy._config.retry_base_delay_ms = 0
        proxy._config.retry_max_delay_ms = 0
        proxy.config = proxy._config

        proxy._parse_sse_usage_from_buffer = MagicMock(return_value=None)
        return proxy

    def _create_mock_upstream_response(self, extra_headers=None):
        """Create a mock httpx streaming response with ratelimit headers.

        Args:
            extra_headers: Optional mapping merged over the default headers
                (it can add new headers or override the defaults).

        Returns:
            An ``AsyncMock`` with ``status_code`` 200, an ``httpx.Headers``
            object, an ``aiter_bytes`` async generator that yields one SSE
            chunk, and an ``aclose`` ``AsyncMock``.
        """
        mock_response = AsyncMock()
        headers = {
            "content-type": "text/event-stream",
            "anthropic-ratelimit-tokens-limit": "80000",
            "anthropic-ratelimit-tokens-remaining": "75000",
            "anthropic-ratelimit-tokens-reset": "2026-03-25T12:00:00Z",
            "anthropic-ratelimit-requests-limit": "60",
            "anthropic-ratelimit-requests-remaining": "59",
            "anthropic-ratelimit-requests-reset": "2026-03-25T12:00:00Z",
            "anthropic-ratelimit-input-tokens-limit": "50000",
            "anthropic-ratelimit-input-tokens-remaining": "48000",
            "anthropic-ratelimit-input-tokens-reset": "2026-03-25T12:00:00Z",
            "anthropic-ratelimit-output-tokens-limit": "30000",
            "anthropic-ratelimit-output-tokens-remaining": "27000",
            "anthropic-ratelimit-output-tokens-reset": "2026-03-25T12:00:00Z",
            # Non-ratelimit headers that should NOT be forwarded
            "x-request-id": "req-12345",
            "cf-ray": "abc123",
        }
        if extra_headers:
            headers.update(extra_headers)
        mock_response.headers = httpx.Headers(headers)
        mock_response.status_code = 200

        # Simulate a simple SSE stream
        sse_data = (
            b'event: message_start\ndata: {"type":"message_start","message":{"id":"msg_01"}}\n\n'
            b'event: message_stop\ndata: {"type":"message_stop"}\n\n'
        )

        # The real ``httpx.Response.aiter_bytes`` accepts an optional
        # ``chunk_size``; tolerate arguments so the mock does not raise
        # TypeError if the handler passes one.
        async def aiter_bytes(*_args, **_kwargs):
            yield sse_data

        mock_response.aiter_bytes = aiter_bytes
        mock_response.aclose = AsyncMock()
        return mock_response

    def _wire_upstream(self, proxy, send):
        """Install the fake upstream on ``proxy.http_client``.

        Args:
            proxy: Object returned by ``_create_mock_proxy``.
            send: The ``AsyncMock`` to use as ``http_client.send``; configure
                it with ``return_value`` for a response or ``side_effect``
                for an error / custom coroutine.
        """
        proxy.http_client.build_request = MagicMock(return_value=MagicMock())
        proxy.http_client.send = send

    async def _call_stream_response(self, proxy, request_id, headers=None):
        """Await ``proxy._stream_response`` with the canned streaming request.

        Args:
            proxy: Object returned by ``_create_mock_proxy``.
            request_id: Value passed through as ``request_id``.
            headers: Request headers; defaults to ``{"x-api-key": "sk-test"}``.

        Returns:
            Whatever ``proxy._stream_response`` returns.
        """
        if headers is None:
            headers = {"x-api-key": "sk-test"}
        return await proxy._stream_response(
            url=self._UPSTREAM_URL,
            headers=headers,
            body={
                "model": self._MODEL,
                "max_tokens": 100,
                "stream": True,
                "messages": [{"role": "user", "content": "hi"}],
            },
            provider="anthropic",
            model=self._MODEL,
            request_id=request_id,
            original_tokens=10,
            optimized_tokens=10,
            tokens_saved=0,
            transforms_applied=[],
            tags={},
            optimization_latency=0.0,
        )

    # ------------------------------------------------------------------ #
    # Header forwarding
    # ------------------------------------------------------------------ #

    async def test_ratelimit_headers_forwarded_in_streaming(self):
        """Ratelimit headers from upstream should appear in the StreamingResponse."""
        proxy = self._create_mock_proxy()
        mock_response = self._create_mock_upstream_response()
        self._wire_upstream(proxy, AsyncMock(return_value=mock_response))

        result = await self._call_stream_response(
            proxy,
            request_id="test-123",
            headers={"x-api-key": "sk-test", "anthropic-version": "2023-06-01"},
        )

        # Check every ratelimit header the upstream sent, not just a sample,
        # so that dropping any single one is caught.
        expected = {
            name: value
            for name, value in mock_response.headers.items()
            if name.startswith(self._RATELIMIT_PREFIX)
        }
        assert expected, "mock upstream must expose ratelimit headers"
        for name, value in expected.items():
            assert result.headers.get(name) == value, name

    async def test_non_ratelimit_headers_not_forwarded(self):
        """Only ratelimit headers should be forwarded, not arbitrary upstream headers."""
        proxy = self._create_mock_proxy()
        mock_response = self._create_mock_upstream_response()
        self._wire_upstream(proxy, AsyncMock(return_value=mock_response))

        result = await self._call_stream_response(proxy, request_id="test-456")

        # Non-ratelimit headers should NOT be in the response
        assert result.headers.get("x-request-id") is None
        assert result.headers.get("cf-ray") is None

    async def test_no_ratelimit_headers_still_works(self):
        """When upstream has no ratelimit headers, streaming should still work."""
        proxy = self._create_mock_proxy()
        mock_response = self._create_mock_upstream_response()
        # Remove all ratelimit headers
        mock_response.headers = httpx.Headers(
            {
                "content-type": "text/event-stream",
                "x-request-id": "req-999",
            }
        )
        self._wire_upstream(proxy, AsyncMock(return_value=mock_response))

        result = await self._call_stream_response(proxy, request_id="test-789")

        # Should still return a valid StreamingResponse
        assert result.media_type == "text/event-stream"
        # No ratelimit headers to forward
        assert result.headers.get("anthropic-ratelimit-tokens-limit") is None

    # ------------------------------------------------------------------ #
    # Upstream HTTP errors
    # ------------------------------------------------------------------ #

    async def test_upstream_http_error_preserves_status_body_and_metrics(self, monkeypatch):
        """Upstream non-200 streaming responses should preserve status/body and metrics."""
        error_body = b'{"error":{"message":"capacity exhausted"}}'
        proxy = self._create_mock_proxy()
        mock_response = self._create_mock_upstream_response()
        mock_response.status_code = 503
        mock_response.headers = httpx.Headers(
            {
                "content-type": "application/json",
                "content-encoding": "gzip",
                "content-length": "42",
            }
        )
        mock_response.aread = AsyncMock(return_value=error_body)
        self._wire_upstream(proxy, AsyncMock(return_value=mock_response))

        # Swap the module logger for a mock so the warning call can be asserted.
        fake_logger = MagicMock()
        monkeypatch.setattr(streaming_module, "logger", fake_logger)

        result = await self._call_stream_response(proxy, request_id="test-http-error")

        assert result.status_code == 503
        assert result.body == error_body
        # The upstream ``content-encoding`` must not be echoed to the client.
        assert result.headers.get("content-encoding") is None
        fake_logger.warning.assert_any_call(
            "[%s] Forwarding upstream streaming error status=%s url=%s",
            "test-http-error",
            503,
            self._UPSTREAM_URL,
        )
        proxy.metrics.record_request.assert_awaited_once()
        proxy.cost_tracker.record_tokens.assert_called_once()
        mock_response.aclose.assert_awaited_once()

    async def test_upstream_http_error_closes_response_when_body_read_fails(self, monkeypatch):
        """Reading a streaming error body should still close the upstream response."""
        read_error = RuntimeError("boom")
        proxy = self._create_mock_proxy()
        mock_response = self._create_mock_upstream_response()
        mock_response.status_code = 502
        mock_response.aread = AsyncMock(side_effect=read_error)
        self._wire_upstream(proxy, AsyncMock(return_value=mock_response))

        fake_logger = MagicMock()
        monkeypatch.setattr(streaming_module, "logger", fake_logger)

        result = await self._call_stream_response(
            proxy, request_id="test-http-error-read-fail"
        )

        assert result.status_code == 502
        assert result.headers.get("content-type") == "application/json"
        assert b"Failed to read upstream error response body" in result.body
        # The logged error must be the very exception instance raised by aread().
        fake_logger.warning.assert_any_call(
            "[%s] Failed reading upstream error body status=%s url=%s error=%s",
            "test-http-error-read-fail",
            502,
            self._UPSTREAM_URL,
            read_error,
        )
        mock_response.aclose.assert_awaited_once()

    # ------------------------------------------------------------------ #
    # Connection-level failures
    # ------------------------------------------------------------------ #

    async def test_connect_error_returns_sse_error(self):
        """Connection errors should return an SSE error event (not crash)."""
        proxy = self._create_mock_proxy()
        self._wire_upstream(
            proxy, AsyncMock(side_effect=httpx.ConnectError("Connection refused"))
        )

        result = await self._call_stream_response(proxy, request_id="test-error")

        # Should return a StreamingResponse with error SSE event
        assert result.media_type == "text/event-stream"

        # Consume the generator to get the error event
        chunks = [chunk async for chunk in result.body_iterator]
        assert len(chunks) == 1
        raw = chunks[0].decode("utf-8")
        assert "event: error" in raw

        # maxsplit=1 keeps the payload intact even if the error message
        # itself happens to contain the text "data: ".
        error_data = json.loads(raw.split("data: ", 1)[1].strip())
        assert error_data["error"]["type"] == "connection_error"
        assert "Connection refused" in error_data["error"]["message"]

    async def test_connect_timeout_retries_before_returning_stream(self):
        """Transient connect timeouts should retry before failing the stream."""
        proxy = self._create_mock_proxy()
        mock_response = self._create_mock_upstream_response()
        attempts = {"count": 0}

        async def flaky_send(*args, **kwargs):
            # First attempt times out; every later attempt succeeds.
            attempts["count"] += 1
            if attempts["count"] == 1:
                raise httpx.ConnectTimeout("timed out")
            return mock_response

        self._wire_upstream(proxy, AsyncMock(side_effect=flaky_send))

        result = await self._call_stream_response(proxy, request_id="test-retry")

        chunks = [chunk async for chunk in result.body_iterator]
        assert attempts["count"] == 2
        assert chunks