Spaces:
Paused
Paused
| """ | |
| Integration tests for queue FIFO (First-In-First-Out) ordering. | |
| These tests use REAL asyncio.Queue from server_state to verify actual queue | |
| behavior, ensuring requests are processed in the correct order. | |
| Test Strategy: | |
| - Use real_server_state fixture (real asyncio.Queue, real asyncio.Lock) | |
| - Verify FIFO ordering is maintained | |
| - Test concurrent request submission | |
| - Verify queue processing order matches submission order | |
| Coverage Target: Queue processing order integrity | |
| """ | |
| import asyncio | |
| from typing import Any | |
| from unittest.mock import AsyncMock, MagicMock | |
| import pytest | |
| # Test stub for QueueManager used in these tests | |
| class QueueManager: | |
| """Functional stub class for queue management in tests.""" | |
| request_queue: Any = None | |
| processing_lock: Any = None | |
| logger: Any = None | |
| async def check_queue_disconnects(self) -> Any: | |
| """Logic for checking disconnects in the queue.""" | |
| from api_utils.client_connection import check_client_connection | |
| from api_utils.error_utils import client_disconnected | |
| queue_size = self.request_queue.qsize() | |
| items_to_requeue = [] | |
| for _ in range(queue_size): | |
| try: | |
| item = self.request_queue.get_nowait() | |
| item_req_id = item.get("req_id") | |
| if not item.get("cancelled", False): | |
| item_http_req = item.get("http_request") | |
| if item_http_req: | |
| try: | |
| if not await check_client_connection( | |
| item_req_id, item_http_req | |
| ): | |
| item["cancelled"] = True | |
| item_fut = item.get("result_future") | |
| if item_fut and not item_fut.done(): | |
| item_fut.set_exception( | |
| client_disconnected( | |
| item_req_id, | |
| "Client disconnected while queued.", | |
| ) | |
| ) | |
| except Exception as e: | |
| if self.logger: | |
| self.logger.error( | |
| f"[{item_req_id}] Error in disconnect check: {e}" | |
| ) | |
| items_to_requeue.append(item) | |
| except asyncio.QueueEmpty: | |
| break | |
| for item in items_to_requeue: | |
| await self.request_queue.put(item) | |
| class TestQueueFIFOOrdering: | |
| """Integration tests for queue FIFO ordering with real asyncio.Queue.""" | |
| async def test_queue_processes_requests_in_fifo_order(self, real_server_state): | |
| """ | |
| Test that queue processes requests in submission order (FIFO). | |
| Uses REAL asyncio.Queue to verify actual ordering behavior. | |
| """ | |
| queue = real_server_state.request_queue | |
| processing_order = [] | |
| # Create 5 requests | |
| requests = [] | |
| for i in range(5): | |
| item = { | |
| "req_id": f"req-{i}", | |
| "request_data": MagicMock(), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| } | |
| requests.append(item) | |
| await queue.put(item) | |
| # Process requests and track order | |
| while not queue.empty(): | |
| item = await queue.get() | |
| processing_order.append(item["req_id"]) | |
| queue.task_done() | |
| # Verify FIFO ordering | |
| expected_order = [f"req-{i}" for i in range(5)] | |
| assert processing_order == expected_order | |
| async def test_concurrent_submission_maintains_order(self, real_server_state): | |
| """ | |
| Test that concurrent request submissions maintain submission order. | |
| Verifies queue.put() is thread-safe and maintains order. | |
| """ | |
| queue = real_server_state.request_queue | |
| submission_log = [] | |
| async def submit_request(req_id: str): | |
| """Submit a request and log submission.""" | |
| submission_log.append(req_id) | |
| item = { | |
| "req_id": req_id, | |
| "request_data": MagicMock(), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| } | |
| await queue.put(item) | |
| # Small delay to simulate real submission timing | |
| await asyncio.sleep(0.01) | |
| # Submit 10 requests concurrently | |
| tasks = [asyncio.create_task(submit_request(f"req-{i}")) for i in range(10)] | |
| await asyncio.gather(*tasks) | |
| # Process and verify order matches submission | |
| processing_order = [] | |
| while not queue.empty(): | |
| item = await queue.get() | |
| processing_order.append(item["req_id"]) | |
| # Processing order should match submission order | |
| assert processing_order == submission_log | |
| async def test_queue_with_mixed_priorities_still_fifo(self, real_server_state): | |
| """ | |
| Test that even with different request types, FIFO is maintained. | |
| Different models, streaming vs non-streaming - all should be FIFO. | |
| """ | |
| queue = real_server_state.request_queue | |
| # Create diverse request types | |
| requests = [ | |
| { | |
| "req_id": "req-0-streaming", | |
| "request_data": MagicMock(stream=True, model="gemini-1.5-pro"), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| }, | |
| { | |
| "req_id": "req-1-non-streaming", | |
| "request_data": MagicMock(stream=False, model="gemini-1.5-pro"), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| }, | |
| { | |
| "req_id": "req-2-flash", | |
| "request_data": MagicMock(stream=True, model="gemini-1.5-flash"), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| }, | |
| { | |
| "req_id": "req-3-pro-large", | |
| "request_data": MagicMock(stream=False, model="gemini-1.5-pro"), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| }, | |
| ] | |
| # Add all requests | |
| for req in requests: | |
| await queue.put(req) | |
| # Process and verify order | |
| processing_order = [] | |
| while not queue.empty(): | |
| item = await queue.get() | |
| processing_order.append(item["req_id"]) | |
| expected_order = [r["req_id"] for r in requests] | |
| assert processing_order == expected_order | |
| class TestQueueWithDisconnects: | |
| """Integration tests for queue behavior when clients disconnect.""" | |
| async def test_cancelled_requests_maintain_queue_order(self, real_server_state): | |
| """ | |
| Test that cancelled requests don't disrupt queue ordering. | |
| Cancelled items should be skipped but not affect FIFO order. | |
| """ | |
| queue = real_server_state.request_queue | |
| queue_manager = QueueManager() | |
| queue_manager.request_queue = queue | |
| # Create 5 requests, mark 2nd and 4th as cancelled | |
| requests = [] | |
| for i in range(5): | |
| item = { | |
| "req_id": f"req-{i}", | |
| "request_data": MagicMock(), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": i in [1, 3], # Cancel req-1 and req-3 | |
| } | |
| requests.append(item) | |
| await queue.put(item) | |
| # Process queue | |
| processed = [] | |
| cancelled = [] | |
| while not queue.empty(): | |
| item = await queue.get() | |
| if item["cancelled"]: | |
| cancelled.append(item["req_id"]) | |
| else: | |
| processed.append(item["req_id"]) | |
| queue.task_done() | |
| # Verify cancelled items were detected | |
| assert cancelled == ["req-1", "req-3"] | |
| # Verify non-cancelled items processed in order | |
| assert processed == ["req-0", "req-2", "req-4"] | |
| async def test_check_queue_disconnects_preserves_order(self, real_server_state): | |
| """ | |
| Test that check_queue_disconnects maintains FIFO order. | |
| When checking disconnects, items should be re-queued in same order. | |
| """ | |
| queue = real_server_state.request_queue | |
| queue_manager = QueueManager() | |
| queue_manager.request_queue = queue | |
| queue_manager.logger = MagicMock() | |
| # Create 3 items, 2nd is disconnected | |
| item1 = { | |
| "req_id": "req-0", | |
| "http_request": MagicMock(), | |
| "cancelled": False, | |
| "result_future": asyncio.Future(), | |
| } | |
| item1["http_request"].is_disconnected = AsyncMock(return_value=False) | |
| item2 = { | |
| "req_id": "req-1", | |
| "http_request": MagicMock(), | |
| "cancelled": False, | |
| "result_future": asyncio.Future(), | |
| } | |
| item2["http_request"].is_disconnected = AsyncMock(return_value=True) | |
| item3 = { | |
| "req_id": "req-2", | |
| "http_request": MagicMock(), | |
| "cancelled": False, | |
| "result_future": asyncio.Future(), | |
| } | |
| item3["http_request"].is_disconnected = AsyncMock(return_value=False) | |
| # Add to queue | |
| await queue.put(item1) | |
| await queue.put(item2) | |
| await queue.put(item3) | |
| # Check disconnects | |
| await queue_manager.check_queue_disconnects() | |
| # Verify order maintained (disconnected item should be cancelled) | |
| assert item2["cancelled"] is True | |
| # Verify queue still has items in correct order | |
| order = [] | |
| while not queue.empty(): | |
| item = await queue.get() | |
| order.append(item["req_id"]) | |
| assert order == ["req-0", "req-1", "req-2"] | |
| class TestQueuePerformance: | |
| """Performance tests for queue operations.""" | |
| async def test_large_queue_maintains_fifo(self, real_server_state): | |
| """ | |
| Test FIFO ordering with large queue (100 items). | |
| Verifies performance doesn't affect correctness. | |
| """ | |
| queue = real_server_state.request_queue | |
| # Add 100 requests | |
| for i in range(100): | |
| item = { | |
| "req_id": f"req-{i:03d}", | |
| "request_data": MagicMock(), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| } | |
| await queue.put(item) | |
| # Process all | |
| processing_order = [] | |
| while not queue.empty(): | |
| item = await queue.get() | |
| processing_order.append(item["req_id"]) | |
| queue.task_done() | |
| # Verify all 100 in correct order | |
| expected_order = [f"req-{i:03d}" for i in range(100)] | |
| assert processing_order == expected_order | |
| async def test_queue_performance_with_concurrent_access(self, real_server_state): | |
| """ | |
| Test queue maintains FIFO under concurrent access. | |
| Multiple tasks submitting and processing simultaneously. | |
| """ | |
| queue = real_server_state.request_queue | |
| submitted = [] | |
| processed = [] | |
| submit_lock = asyncio.Lock() | |
| process_lock = asyncio.Lock() | |
| async def submitter(start_id: int, count: int): | |
| """Submit requests concurrently.""" | |
| for i in range(count): | |
| req_id = f"req-{start_id + i}" | |
| async with submit_lock: | |
| submitted.append(req_id) | |
| item = { | |
| "req_id": req_id, | |
| "request_data": MagicMock(), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| } | |
| await queue.put(item) | |
| async def processor(): | |
| """Process requests concurrently.""" | |
| while True: | |
| try: | |
| item = await asyncio.wait_for(queue.get(), timeout=0.1) | |
| async with process_lock: | |
| processed.append(item["req_id"]) | |
| queue.task_done() | |
| except asyncio.TimeoutError: | |
| break | |
| # Start 3 submitters and 2 processors concurrently | |
| submitters = [ | |
| asyncio.create_task(submitter(0, 10)), | |
| asyncio.create_task(submitter(10, 10)), | |
| asyncio.create_task(submitter(20, 10)), | |
| ] | |
| # Wait for all submissions | |
| await asyncio.gather(*submitters) | |
| # Start processors | |
| processors = [ | |
| asyncio.create_task(processor()), | |
| asyncio.create_task(processor()), | |
| ] | |
| # Wait for all processing | |
| await asyncio.gather(*processors) | |
| # Verify all 30 items processed | |
| assert len(processed) == 30 | |
| # Verify order matches submission | |
| assert processed == submitted | |
| class TestQueueRecovery: | |
| """Tests for queue recovery after errors.""" | |
| async def test_queue_continues_after_processing_error(self, real_server_state): | |
| """ | |
| Test that queue processing continues after error in one request. | |
| Error in req-1 should not affect req-2 processing order. | |
| """ | |
| queue = real_server_state.request_queue | |
| processing_log = [] | |
| # Add 3 requests | |
| items = [] | |
| for i in range(3): | |
| item = { | |
| "req_id": f"req-{i}", | |
| "request_data": MagicMock(), | |
| "http_request": MagicMock(), | |
| "result_future": asyncio.Future(), | |
| "cancelled": False, | |
| } | |
| items.append(item) | |
| await queue.put(item) | |
| # Process with error on req-1 | |
| while not queue.empty(): | |
| item = await queue.get() | |
| processing_log.append(item["req_id"]) | |
| if item["req_id"] == "req-1": | |
| # Simulate processing error | |
| item["result_future"].set_exception( | |
| Exception("Processing failed for req-1") | |
| ) | |
| else: | |
| # Normal processing | |
| item["result_future"].set_result("Success") | |
| queue.task_done() | |
| # Verify all 3 processed in order despite error | |
| assert processing_log == ["req-0", "req-1", "req-2"] | |
| # Retrieve exceptions from futures to prevent warnings | |
| for item in items: | |
| if item["result_future"].done(): | |
| try: | |
| item["result_future"].exception() | |
| except Exception: | |
| pass | |