peijun1's picture
Deploy AI Studio Proxy API to Hugging Face Spaces
a5784e9
Raw
History Blame Contribute Delete
5.3 kB
import asyncio
import json
import time
from unittest.mock import MagicMock
import pytest
from fastapi.responses import JSONResponse
from api_utils.routers.queue import (
cancel_queued_request,
cancel_request,
get_queue_status,
)
@pytest.mark.asyncio
async def test_cancel_queued_request_found():
"""Test cancelling a request that is in the queue."""
req_id = "req_123"
queue = asyncio.Queue()
logger = MagicMock()
# Create a mock item
future = asyncio.Future()
item = {"req_id": req_id, "result_future": future, "cancelled": False}
await queue.put(item)
# Add another item
await queue.put({"req_id": "other_req"})
result = await cancel_queued_request(req_id, queue, logger)
assert result is True
assert item["cancelled"] is True
assert future.done()
try:
future.result()
except Exception as e:
assert "cancelled" in str(e).lower()
# Verify queue state
assert queue.qsize() == 2
# Verify items are put back
items = []
while not queue.empty():
items.append(await queue.get())
assert len(items) == 2
assert items[0]["req_id"] == req_id
assert items[1]["req_id"] == "other_req"
@pytest.mark.asyncio
async def test_cancel_queued_request_not_found():
"""Test cancelling a request that is not in the queue."""
req_id = "req_123"
queue = asyncio.Queue()
logger = MagicMock()
await queue.put({"req_id": "other_req"})
result = await cancel_queued_request(req_id, queue, logger)
assert result is False
assert queue.qsize() == 1
@pytest.mark.asyncio
async def test_cancel_request_endpoint_success():
"""Test cancel_request endpoint when request is found."""
req_id = "req_123"
queue = asyncio.Queue()
logger = MagicMock()
# Add item to queue
await queue.put({"req_id": req_id, "cancelled": False})
response = await cancel_request(req_id, logger, queue)
assert isinstance(response, JSONResponse)
assert response.status_code == 200
body = bytes(response.body).decode()
assert "success" in body
assert "marked as cancelled" in body
@pytest.mark.asyncio
async def test_cancel_request_endpoint_not_found():
"""Test cancel_request endpoint when request is not found."""
req_id = "req_123"
queue = asyncio.Queue()
logger = MagicMock()
response = await cancel_request(req_id, logger, queue)
assert isinstance(response, JSONResponse)
assert response.status_code == 404
body = bytes(response.body).decode()
assert "not found" in body
@pytest.mark.asyncio
async def test_get_queue_status():
"""Test get_queue_status endpoint."""
queue = asyncio.Queue()
lock = asyncio.Lock()
# Add items
mock_request = MagicMock()
mock_request.stream = True
item1 = {
"req_id": "req_1",
"enqueue_time": time.time() - 10,
"request_data": mock_request,
"cancelled": False,
}
item2 = {
"req_id": "req_2",
"enqueue_time": time.time() - 5,
"request_data": mock_request,
"cancelled": True,
}
await queue.put(item1)
await queue.put(item2)
# Lock the lock
await lock.acquire()
response = await get_queue_status(queue, lock)
assert isinstance(response, JSONResponse)
assert response.status_code == 200
import json
data = json.loads(bytes(response.body))
assert data["queue_length"] == 2
assert data["is_processing_locked"] is True
assert len(data["items"]) == 2
assert data["items"][0]["req_id"] == "req_1"
assert data["items"][1]["req_id"] == "req_2"
assert data["items"][0]["wait_time_seconds"] >= 10
lock.release()
@pytest.mark.asyncio
async def test_get_queue_status_empty():
"""Test get_queue_status with empty queue."""
queue = asyncio.Queue()
lock = asyncio.Lock()
response = await get_queue_status(queue, lock)
data = json.loads(bytes(response.body))
assert data["queue_length"] == 0
assert data["items"] == []
assert data["is_processing_locked"] is False
"""
Extended tests for api_utils/routers/queue.py - Coverage completion.
Focus: Cover the last 2 uncovered lines (62-63).
Strategy: Test exception handling when accessing queue._queue fails.
"""
from unittest.mock import PropertyMock
@pytest.mark.asyncio
async def test_get_queue_status_queue_access_exception():
"""
Test scenario: Accessing queue._queue throws exception
Expected: Return empty list, no interruption (lines 62-63)
"""
# Create a mock queue that raises exception when _queue is accessed
mock_queue = MagicMock(spec=asyncio.Queue)
type(mock_queue)._queue = PropertyMock(side_effect=Exception("Queue access error"))
mock_lock = MagicMock()
mock_lock.locked.return_value = False
# Execute: Call get_queue_status
response = await get_queue_status(mock_queue, mock_lock)
# Verify: Return success response, but queue_items is empty (line 63 executed)
assert response.status_code == 200
import json
data = json.loads(bytes(response.body))
# Verify: queue_length is 0 (because queue_items = [])
assert data["queue_length"] == 0
assert data["items"] == []
assert data["is_processing_locked"] is False