Spaces:
Paused
Paused
| """Tests for cross-loop client cache isolation fix (#2681). | |
| Verifies that _get_cached_client() returns different AsyncOpenAI clients | |
| when called from different event loops, preventing the httpx deadlock | |
| that occurs when a cached async client bound to loop A is reused on loop B. | |
| This test file is self-contained and does not import the full tool chain, | |
| so it can run without optional dependencies like firecrawl. | |
| """ | |
| import asyncio | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor | |
| from unittest.mock import patch, MagicMock | |
| from types import SimpleNamespace | |
| import pytest | |
| # --------------------------------------------------------------------------- | |
| # Minimal stubs so we can import _get_cached_client without the full tree | |
| # --------------------------------------------------------------------------- | |
| def _stub_resolve_provider_client(provider, model, async_mode, **kw): | |
| """Return a unique mock client each time, simulating AsyncOpenAI creation.""" | |
| client = MagicMock(name=f"client-{provider}-async={async_mode}") | |
| client.api_key = "test" | |
| client.base_url = kw.get("explicit_base_url", "http://localhost:8081/v1") | |
| return client, model or "test-model" | |
@pytest.fixture(autouse=True)
def _clean_client_cache():
    """Clear the auxiliary client cache before and after each test.

    Registered as an autouse fixture so pytest wraps every test in this
    module with it; without the decorator this generator would never be
    invoked and cached clients would leak from one test into the next.
    """
    # Imported inside the fixture so that merely collecting this module does
    # not import the project package.
    import agent.auxiliary_client as ac

    ac._client_cache.clear()
    yield
    # Teardown: drop anything the test cached so later tests start clean.
    ac._client_cache.clear()
class TestCrossLoopCacheIsolation:
    """Verify async clients are cached per-event-loop, not globally.

    Every test patches ``agent.auxiliary_client.resolve_provider_client`` with
    ``_stub_resolve_provider_client`` so that each cache miss produces a fresh
    mock client; object identity then distinguishes cache hits from misses.

    Clients and loops are compared by identity on live objects rather than by
    stored ``id()`` values, because an ``id()`` may be reused once the object
    it belonged to has been garbage-collected.
    """

    def test_same_loop_reuses_client(self):
        """Within a single event loop, the same client should be returned."""
        from agent.auxiliary_client import _get_cached_client

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client1, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                base_url="http://localhost:8081/v1")
                client2, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                base_url="http://localhost:8081/v1")
            assert client1 is client2, (
                "Same loop should return the same cached client"
            )
        finally:
            # Close the loop even when the assertion fails.
            loop.close()

    def test_different_loops_get_different_clients(self):
        """Different event loops must get separate client instances."""
        from agent.auxiliary_client import _get_cached_client

        # name -> (client, loop).  Holding the objects keeps both loops alive
        # for the whole test, so their identities cannot collide.
        results = {}

        def _get_client_on_new_loop(name):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                base_url="http://localhost:8081/v1")
            results[name] = (client, loop)
            # Don't close loop here — simulates real usage where loops persist

        t1 = threading.Thread(target=_get_client_on_new_loop, args=("a",))
        t2 = threading.Thread(target=_get_client_on_new_loop, args=("b",))
        # Run the threads one after the other, never concurrently.
        t1.start()
        t1.join()
        t2.start()
        t2.join()

        try:
            # An exception inside a thread does not propagate to the test.
            assert {"a", "b"} <= results.keys(), (
                "Worker thread failed before recording its client"
            )
            client_a, loop_a = results["a"]
            client_b, loop_b = results["b"]
            assert loop_a is not loop_b, "Test setup error: same loop on both threads"
            assert client_a is not client_b, (
                "Different event loops got the SAME cached client — this causes "
                "httpx cross-loop deadlocks in gateway mode (#2681)"
            )
        finally:
            # The loops only need to outlive the assertions; release them now.
            for _, leftover_loop in results.values():
                leftover_loop.close()

    def test_sync_clients_not_affected(self):
        """Sync clients (async_mode=False) should still be cached globally,
        since httpx.Client (sync) doesn't bind to an event loop."""
        from agent.auxiliary_client import _get_cached_client

        # name -> (client, loop); see the class docstring for why the objects
        # are stored instead of their id()s.
        results = {}

        def _get_sync_client(name):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client, _ = _get_cached_client("custom", "m1", async_mode=False,
                                                base_url="http://localhost:8081/v1")
            results[name] = (client, loop)

        t1 = threading.Thread(target=_get_sync_client, args=("a",))
        t2 = threading.Thread(target=_get_sync_client, args=("b",))
        t1.start()
        t1.join()
        t2.start()
        t2.join()

        try:
            assert {"a", "b"} <= results.keys(), (
                "Worker thread failed before recording its client"
            )
            assert results["a"][0] is results["b"][0], (
                "Sync clients should be shared across threads (no loop binding)"
            )
        finally:
            for _, leftover_loop in results.values():
                leftover_loop.close()

    def test_gateway_simulation_no_deadlock(self):
        """Simulate gateway mode: _run_async spawns a thread with asyncio.run(),
        which creates a new loop. The cached client must be created on THAT loop,
        not reused from a different one."""
        from agent.auxiliary_client import _get_cached_client

        # Simulate: first call on "gateway loop"
        gateway_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(gateway_loop)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                gateway_client, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                        base_url="http://localhost:8081/v1")

            # Simulate: _run_async spawns a thread with asyncio.run()
            worker_clients = []

            def _worker():
                async def _inner():
                    with patch("agent.auxiliary_client.resolve_provider_client",
                               side_effect=_stub_resolve_provider_client):
                        client, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                        base_url="http://localhost:8081/v1")
                    worker_clients.append(client)

                asyncio.run(_inner())

            t = threading.Thread(target=_worker)
            t.start()
            t.join()

            # Without this check a crashed worker would make the identity
            # assertion below pass vacuously.
            assert worker_clients, "Worker thread failed before obtaining a client"
            assert worker_clients[0] is not gateway_client, (
                "Worker thread (asyncio.run) got the gateway's cached client — "
                "this is the exact cross-loop scenario that causes httpx deadlocks. "
                "The cache key must include the event loop identity (#2681)"
            )
        finally:
            gateway_loop.close()

    def test_closed_loop_client_discarded(self):
        """A cached client whose loop has closed should be replaced."""
        from agent.auxiliary_client import _get_cached_client

        loop1 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop1)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client1, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                base_url="http://localhost:8081/v1")
        finally:
            # Closing loop1 is the scenario under test, not just cleanup.
            loop1.close()

        # New loop on same thread
        loop2 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop2)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client2, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                base_url="http://localhost:8081/v1")
            assert client1 is not client2, (
                "Client from closed loop should not be reused"
            )
        finally:
            loop2.close()