Spaces:
Running
Running
| """Tests for proxy scalability features. | |
| These tests verify connection pooling, HTTP/2, and worker configuration. | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| from unittest.mock import patch | |
| import httpx | |
| import pytest | |
| class TestConnectionPoolConfig: | |
| """Test connection pool configuration.""" | |
| def test_httpx_limits_basic(self): | |
| """Test that httpx accepts our connection limits.""" | |
| limits = httpx.Limits( | |
| max_connections=500, | |
| max_keepalive_connections=100, | |
| ) | |
| assert limits.max_connections == 500 | |
| assert limits.max_keepalive_connections == 100 | |
| def test_httpx_limits_custom(self): | |
| """Test custom connection limits.""" | |
| limits = httpx.Limits( | |
| max_connections=1000, | |
| max_keepalive_connections=200, | |
| ) | |
| assert limits.max_connections == 1000 | |
| assert limits.max_keepalive_connections == 200 | |
| def test_httpx_timeout_config(self): | |
| """Test timeout configuration for proxy.""" | |
| timeout = httpx.Timeout( | |
| connect=10.0, | |
| read=300.0, | |
| write=300.0, | |
| pool=10.0, | |
| ) | |
| assert timeout.connect == 10.0 | |
| assert timeout.read == 300.0 | |
| assert timeout.write == 300.0 | |
| assert timeout.pool == 10.0 | |
| def test_async_client_with_limits(self): | |
| """Test AsyncClient accepts connection pool limits.""" | |
| async def _run(): | |
| limits = httpx.Limits( | |
| max_connections=500, | |
| max_keepalive_connections=100, | |
| ) | |
| async with httpx.AsyncClient( | |
| limits=limits, | |
| timeout=httpx.Timeout(10.0), | |
| ) as client: | |
| assert client is not None | |
| assert limits.max_connections == 500 | |
| assert limits.max_keepalive_connections == 100 | |
| asyncio.run(_run()) | |
| class TestHTTP2Config: | |
| """Test HTTP/2 configuration.""" | |
| def test_http2_requires_h2_package(self): | |
| """Test that http2=True requires h2 package.""" | |
| import importlib.util | |
| h2_available = importlib.util.find_spec("h2") is not None | |
| if h2_available: | |
| client = httpx.Client(http2=True) | |
| assert client._base_url is not None | |
| client.close() | |
| else: | |
| with pytest.raises(ImportError): | |
| httpx.Client(http2=True) | |
| def test_async_client_http2(self): | |
| """Test AsyncClient with HTTP/2 enabled.""" | |
| import importlib.util | |
| if not importlib.util.find_spec("h2"): | |
| pytest.skip("h2 package not installed") | |
| async def _run(): | |
| async with httpx.AsyncClient( | |
| http2=True, | |
| limits=httpx.Limits(max_connections=100), | |
| ) as client: | |
| assert client is not None | |
| asyncio.run(_run()) | |
| class TestProxyConfigDataclass: | |
| """Test ProxyConfig dataclass with new fields.""" | |
| def test_proxy_config_defaults(self): | |
| """Test default values for scalability settings.""" | |
| from dataclasses import dataclass | |
| class ProxyConfigTest: | |
| """Minimal proxy config for testing.""" | |
| host: str = "127.0.0.1" | |
| port: int = 8787 | |
| request_timeout_seconds: int = 300 | |
| connect_timeout_seconds: int = 10 | |
| max_connections: int = 500 | |
| max_keepalive_connections: int = 100 | |
| http2: bool = True | |
| config = ProxyConfigTest() | |
| assert config.max_connections == 500 | |
| assert config.max_keepalive_connections == 100 | |
| assert config.http2 is True | |
| def test_proxy_config_custom_values(self): | |
| """Test custom values for scalability settings.""" | |
| from dataclasses import dataclass | |
| class ProxyConfigTest: | |
| max_connections: int = 500 | |
| max_keepalive_connections: int = 100 | |
| http2: bool = True | |
| config = ProxyConfigTest( | |
| max_connections=1000, | |
| max_keepalive_connections=200, | |
| http2=False, | |
| ) | |
| assert config.max_connections == 1000 | |
| assert config.max_keepalive_connections == 200 | |
| assert config.http2 is False | |
| class TestConcurrencyPatterns: | |
| """Test async concurrency patterns used in proxy.""" | |
| def test_semaphore_for_backpressure(self): | |
| """Test semaphore pattern for limiting concurrent requests.""" | |
| async def _run(): | |
| semaphore = asyncio.Semaphore(3) | |
| active = [] | |
| completed = [] | |
| async def task(task_id: int): | |
| async with semaphore: | |
| active.append(task_id) | |
| assert len(active) <= 3 | |
| await asyncio.sleep(0.01) | |
| active.remove(task_id) | |
| completed.append(task_id) | |
| tasks = [task(i) for i in range(10)] | |
| await asyncio.gather(*tasks) | |
| assert len(completed) == 10 | |
| asyncio.run(_run()) | |
| def test_connection_reuse_pattern(self): | |
| """Test that single client instance is reused (not recreated).""" | |
| async def _run(): | |
| clients_created = [] | |
| class MockProxyWithClient: | |
| def __init__(self): | |
| self.http_client = None | |
| async def startup(self): | |
| self.http_client = httpx.AsyncClient( | |
| limits=httpx.Limits(max_connections=100), | |
| ) | |
| clients_created.append(self.http_client) | |
| async def shutdown(self): | |
| if self.http_client: | |
| await self.http_client.aclose() | |
| async def make_request(self, url: str): | |
| return self.http_client | |
| proxy = MockProxyWithClient() | |
| await proxy.startup() | |
| client1 = await proxy.make_request("http://example1.com") | |
| client2 = await proxy.make_request("http://example2.com") | |
| client3 = await proxy.make_request("http://example3.com") | |
| assert client1 is client2 is client3 | |
| assert len(clients_created) == 1 | |
| await proxy.shutdown() | |
| asyncio.run(_run()) | |
| class TestTimeoutOverrides: | |
| """Test per-request timeout overrides.""" | |
| def test_request_level_timeout_override(self): | |
| """Test that timeout can be overridden per-request.""" | |
| async def _run(): | |
| async with httpx.AsyncClient( | |
| timeout=httpx.Timeout(10.0), | |
| ): | |
| override_timeout = httpx.Timeout(120.0) | |
| assert override_timeout.read == 120.0 | |
| assert override_timeout.connect == 120.0 | |
| asyncio.run(_run()) | |
| class TestWorkerConfiguration: | |
| """Test worker process configuration.""" | |
| def test_uvicorn_workers_parameter(self): | |
| """Test that uvicorn accepts workers parameter.""" | |
| uvicorn = pytest.importorskip("uvicorn") | |
| config = uvicorn.Config( | |
| app="app:app", | |
| workers=4, | |
| limit_concurrency=1000, | |
| ) | |
| assert config.workers == 4 | |
| assert config.limit_concurrency == 1000 | |
| def test_single_worker_default(self): | |
| """Test that default is single worker (None).""" | |
| uvicorn = pytest.importorskip("uvicorn") | |
| config = uvicorn.Config(app="app:app") | |
| assert config.workers is None or config.workers == 1 | |
| def test_run_server_uses_import_string_for_multiple_workers(self, monkeypatch): | |
| from headroom.proxy.models import ProxyConfig | |
| from headroom.proxy.server import _MULTI_WORKER_CONFIG_ENV, run_server | |
| captured = {} | |
| config = ProxyConfig(host="0.0.0.0", port=8787, max_connections=200) | |
| def fake_run(app, **kwargs): | |
| captured["app"] = app | |
| captured["kwargs"] = kwargs | |
| monkeypatch.delenv(_MULTI_WORKER_CONFIG_ENV, raising=False) | |
| with patch("headroom.proxy.server.uvicorn.run", fake_run): | |
| run_server(config, workers=4, limit_concurrency=250) | |
| assert captured["app"] == "headroom.proxy.server:create_app_from_env" | |
| assert captured["kwargs"]["workers"] == 4 | |
| assert captured["kwargs"]["limit_concurrency"] == 250 | |
| assert captured["kwargs"]["factory"] is True | |
| payload = json.loads(os.environ[_MULTI_WORKER_CONFIG_ENV]) | |
| assert payload["host"] == "0.0.0.0" | |
| assert payload["port"] == 8787 | |
| assert payload["max_connections"] == 200 | |
| monkeypatch.delenv(_MULTI_WORKER_CONFIG_ENV, raising=False) | |