# AIstudioProxyAPI / tests/integration/test_queue_fifo.py
# Author: peijun1 — commit a5784e9 ("Deploy AI Studio Proxy API to Hugging Face Spaces")
# Original file size: 15.3 kB
"""
Integration tests for queue FIFO (First-In-First-Out) ordering.
These tests use REAL asyncio.Queue from server_state to verify actual queue
behavior, ensuring requests are processed in the correct order.
Test Strategy:
- Use real_server_state fixture (real asyncio.Queue, real asyncio.Lock)
- Verify FIFO ordering is maintained
- Test concurrent request submission
- Verify queue processing order matches submission order
Coverage Target: Queue processing order integrity
"""
import asyncio
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
# Test stub for QueueManager used in these tests
class QueueManager:
    """Functional stub of the server's queue manager, used by these tests.

    Tests assign the attributes directly instead of going through ``__init__``.
    """

    # asyncio.Queue of request item dicts; assigned by each test.
    request_queue: Any = None
    # Mirrors the real manager's attribute; not used by this stub.
    processing_lock: Any = None
    # Optional logger; only ``.error`` is called, when a disconnect check fails.
    logger: Any = None

    async def check_queue_disconnects(self) -> None:
        """Cancel queued requests whose client has disconnected, keeping FIFO order.

        Drains the items currently in ``request_queue``. For each item that is
        not already cancelled and carries an ``http_request``,
        ``check_client_connection`` is awaited. When it reports a falsy value,
        the item is flagged ``cancelled`` and its pending ``result_future`` (if
        any) receives a ``client_disconnected`` exception. Every drained item,
        cancelled or not, is then put back in its original relative order.

        An ``Exception`` raised while checking one item is logged (when a
        logger is set) and does not stop the scan.
        """
        from api_utils.client_connection import check_client_connection
        from api_utils.error_utils import client_disconnected

        queue = self.request_queue
        drained = []
        try:
            # Snapshot the size up front so the loop always terminates.
            for _ in range(queue.qsize()):
                try:
                    item = queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                # The re-put() below increments the queue's unfinished-task
                # counter again; balance this get so queue.join() accounting
                # is unchanged by the scan.
                queue.task_done()
                drained.append(item)

                if item.get("cancelled", False):
                    continue
                http_request = item.get("http_request")
                if not http_request:
                    continue
                req_id = item.get("req_id")
                try:
                    if not await check_client_connection(req_id, http_request):
                        item["cancelled"] = True
                        future = item.get("result_future")
                        if future and not future.done():
                            future.set_exception(
                                client_disconnected(
                                    req_id,
                                    "Client disconnected while queued.",
                                )
                            )
                except Exception as e:
                    # Best effort: one failing check must not abort the scan.
                    if self.logger:
                        self.logger.error(
                            f"[{req_id}] Error in disconnect check: {e}"
                        )
        finally:
            # Re-queue in drain order even if the scan is interrupted (e.g. by
            # task cancellation while awaiting a check), so no request is lost.
            for item in drained:
                await queue.put(item)
@pytest.mark.integration
class TestQueueFIFOOrdering:
    """Integration tests for queue FIFO ordering with real asyncio.Queue."""

    async def test_queue_processes_requests_in_fifo_order(self, real_server_state):
        """
        Test that queue processes requests in submission order (FIFO).
        Uses REAL asyncio.Queue to verify actual ordering behavior.
        """
        queue = real_server_state.request_queue
        processing_order = []
        # Enqueue 5 requests
        for i in range(5):
            item = {
                "req_id": f"req-{i}",
                "request_data": MagicMock(),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            }
            await queue.put(item)
        # Drain the queue, recording the order items come out
        while not queue.empty():
            item = await queue.get()
            processing_order.append(item["req_id"])
            queue.task_done()
        # Verify FIFO ordering
        expected_order = [f"req-{i}" for i in range(5)]
        assert processing_order == expected_order

    async def test_concurrent_submission_maintains_order(self, real_server_state):
        """
        Test that concurrently scheduled submissions keep their logged order.

        Several tasks enqueue at once; the dequeue order must match the order
        in which the tasks logged their submissions. (asyncio.Queue is safe
        for concurrent coroutines on one event loop, not for OS threads.)
        """
        queue = real_server_state.request_queue
        submission_log = []

        async def submit_request(req_id: str):
            """Log a submission, then enqueue the matching request."""
            # Log and put back-to-back so the log order is the enqueue order
            # (put() only suspends when the queue is full).
            submission_log.append(req_id)
            item = {
                "req_id": req_id,
                "request_data": MagicMock(),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            }
            await queue.put(item)
            # Small delay to simulate real submission timing
            await asyncio.sleep(0.01)

        # Submit 10 requests concurrently
        tasks = [asyncio.create_task(submit_request(f"req-{i}")) for i in range(10)]
        await asyncio.gather(*tasks)
        # Process and verify order matches submission
        processing_order = []
        while not queue.empty():
            item = await queue.get()
            processing_order.append(item["req_id"])
            queue.task_done()
        # Processing order should match submission order
        assert processing_order == submission_log

    async def test_queue_with_mixed_priorities_still_fifo(self, real_server_state):
        """
        Test that even with different request types, FIFO is maintained.
        Different models, streaming vs non-streaming - all should be FIFO.
        """
        queue = real_server_state.request_queue
        # Create diverse request types
        requests = [
            {
                "req_id": "req-0-streaming",
                "request_data": MagicMock(stream=True, model="gemini-1.5-pro"),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            },
            {
                "req_id": "req-1-non-streaming",
                "request_data": MagicMock(stream=False, model="gemini-1.5-pro"),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            },
            {
                "req_id": "req-2-flash",
                "request_data": MagicMock(stream=True, model="gemini-1.5-flash"),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            },
            {
                "req_id": "req-3-pro-large",
                "request_data": MagicMock(stream=False, model="gemini-1.5-pro"),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            },
        ]
        # Add all requests
        for req in requests:
            await queue.put(req)
        # Process and verify order
        processing_order = []
        while not queue.empty():
            item = await queue.get()
            processing_order.append(item["req_id"])
            queue.task_done()
        expected_order = [r["req_id"] for r in requests]
        assert processing_order == expected_order
@pytest.mark.integration
class TestQueueWithDisconnects:
    """Integration tests for queue behavior when clients disconnect."""

    async def test_cancelled_requests_maintain_queue_order(self, real_server_state):
        """
        Test that cancelled requests don't disrupt queue ordering.
        Cancelled items should be skipped but not affect FIFO order.
        """
        queue = real_server_state.request_queue
        # Create 5 requests, mark 2nd and 4th as cancelled
        for i in range(5):
            item = {
                "req_id": f"req-{i}",
                "request_data": MagicMock(),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": i in [1, 3],  # Cancel req-1 and req-3
            }
            await queue.put(item)
        # Drain the queue, separating cancelled from live requests
        processed = []
        cancelled = []
        while not queue.empty():
            item = await queue.get()
            if item["cancelled"]:
                cancelled.append(item["req_id"])
            else:
                processed.append(item["req_id"])
            queue.task_done()
        # Verify cancelled items were detected
        assert cancelled == ["req-1", "req-3"]
        # Verify non-cancelled items processed in order
        assert processed == ["req-0", "req-2", "req-4"]

    async def test_check_queue_disconnects_preserves_order(self, real_server_state):
        """
        Test that check_queue_disconnects maintains FIFO order.
        When checking disconnects, items should be re-queued in same order.
        """
        queue = real_server_state.request_queue
        queue_manager = QueueManager()
        queue_manager.request_queue = queue
        queue_manager.logger = MagicMock()
        # Create 3 items, 2nd is disconnected
        item1 = {
            "req_id": "req-0",
            "http_request": MagicMock(),
            "cancelled": False,
            "result_future": asyncio.Future(),
        }
        item1["http_request"].is_disconnected = AsyncMock(return_value=False)
        item2 = {
            "req_id": "req-1",
            "http_request": MagicMock(),
            "cancelled": False,
            "result_future": asyncio.Future(),
        }
        item2["http_request"].is_disconnected = AsyncMock(return_value=True)
        item3 = {
            "req_id": "req-2",
            "http_request": MagicMock(),
            "cancelled": False,
            "result_future": asyncio.Future(),
        }
        item3["http_request"].is_disconnected = AsyncMock(return_value=False)
        # Add to queue
        await queue.put(item1)
        await queue.put(item2)
        await queue.put(item3)
        # Check disconnects
        await queue_manager.check_queue_disconnects()
        # Verify order maintained (disconnected item should be cancelled)
        assert item2["cancelled"] is True
        # The disconnected request's future must have been failed. Reading the
        # exception also prevents asyncio's "Future exception was never
        # retrieved" warning when the future is garbage-collected.
        disconnected_future = item2["result_future"]
        assert disconnected_future.done()
        assert disconnected_future.exception() is not None
        # Verify queue still has items in correct order
        order = []
        while not queue.empty():
            item = await queue.get()
            order.append(item["req_id"])
        assert order == ["req-0", "req-1", "req-2"]
@pytest.mark.integration
class TestQueuePerformance:
    """Higher-volume and multi-task ordering tests for queue operations.

    These tests check correctness at scale; they do not measure timing.
    """

    async def test_large_queue_maintains_fifo(self, real_server_state):
        """
        Test FIFO ordering with large queue (100 items).
        Verifies ordering still holds at a larger queue size.
        """
        queue = real_server_state.request_queue
        # Add 100 requests
        for i in range(100):
            item = {
                "req_id": f"req-{i:03d}",
                "request_data": MagicMock(),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            }
            await queue.put(item)
        # Process all
        processing_order = []
        while not queue.empty():
            item = await queue.get()
            processing_order.append(item["req_id"])
            queue.task_done()
        # Verify all 100 in correct order
        expected_order = [f"req-{i:03d}" for i in range(100)]
        assert processing_order == expected_order

    async def test_queue_performance_with_concurrent_access(self, real_server_state):
        """
        Test queue maintains FIFO with multiple submitter and processor tasks.

        Three submitter tasks run concurrently. Once they have all finished,
        two processor tasks drain the queue concurrently. The processed order
        must equal the logged submission order.
        """
        queue = real_server_state.request_queue
        submitted = []
        processed = []
        submit_lock = asyncio.Lock()
        process_lock = asyncio.Lock()

        async def submitter(start_id: int, count: int):
            """Log and enqueue ``count`` requests starting at ``start_id``."""
            for i in range(count):
                req_id = f"req-{start_id + i}"
                item = {
                    "req_id": req_id,
                    "request_data": MagicMock(),
                    "http_request": MagicMock(),
                    "result_future": asyncio.Future(),
                    "cancelled": False,
                }
                # Log and put under one lock so the logged order always equals
                # the enqueue order, even if put() suspends on a full queue.
                async with submit_lock:
                    submitted.append(req_id)
                    await queue.put(item)

        async def processor():
            """Dequeue items until the queue stays empty for 0.1s."""
            while True:
                try:
                    item = await asyncio.wait_for(queue.get(), timeout=0.1)
                    async with process_lock:
                        processed.append(item["req_id"])
                    queue.task_done()
                except asyncio.TimeoutError:
                    break

        # Start 3 submitters concurrently
        submitters = [
            asyncio.create_task(submitter(0, 10)),
            asyncio.create_task(submitter(10, 10)),
            asyncio.create_task(submitter(20, 10)),
        ]
        # Wait for all submissions
        await asyncio.gather(*submitters)
        # Start processors
        processors = [
            asyncio.create_task(processor()),
            asyncio.create_task(processor()),
        ]
        # Wait for all processing
        await asyncio.gather(*processors)
        # Verify all 30 items processed
        assert len(processed) == 30
        # Verify order matches submission
        assert processed == submitted
@pytest.mark.integration
class TestQueueRecovery:
    """Tests for queue recovery after errors."""

    async def test_queue_continues_after_processing_error(self, real_server_state):
        """
        Test that queue processing continues after error in one request.

        An error in req-1 must not change the processing order, and must not
        affect the outcome of req-0 or req-2.
        """
        queue = real_server_state.request_queue
        processing_log = []
        # Add 3 requests
        items = []
        for i in range(3):
            item = {
                "req_id": f"req-{i}",
                "request_data": MagicMock(),
                "http_request": MagicMock(),
                "result_future": asyncio.Future(),
                "cancelled": False,
            }
            items.append(item)
            await queue.put(item)
        # Process with error on req-1
        while not queue.empty():
            item = await queue.get()
            processing_log.append(item["req_id"])
            if item["req_id"] == "req-1":
                # Simulate processing error
                item["result_future"].set_exception(
                    Exception("Processing failed for req-1")
                )
            else:
                # Normal processing
                item["result_future"].set_result("Success")
            queue.task_done()
        # Verify all 3 processed in order despite error
        assert processing_log == ["req-0", "req-1", "req-2"]
        # The failure stayed isolated to req-1: its neighbours succeeded.
        assert items[0]["result_future"].result() == "Success"
        assert items[2]["result_future"].result() == "Success"
        # Reading req-1's exception also prevents asyncio's "Future exception
        # was never retrieved" warning.
        failure = items[1]["result_future"].exception()
        assert str(failure) == "Processing failed for req-1"