Spaces:
Paused
Paused
File size: 3,136 Bytes
a5784e9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | import logging
import time
from asyncio import Lock, Queue
from fastapi import Depends
from fastapi.responses import JSONResponse
from logging_utils import set_request_id
from ..dependencies import get_logger, get_processing_lock, get_request_queue
from ..error_utils import client_cancelled
async def cancel_queued_request(
    req_id: str, request_queue: Queue, logger: logging.Logger
) -> bool:
    """Mark every queued item whose ``req_id`` matches as cancelled.

    The queue is drained with ``get_nowait()``, each item is inspected, and
    every item (matching or not) is put back in its original order, so the
    queue holds the same items afterwards.

    Args:
        req_id: Identifier of the request to cancel.
        request_queue: Queue of pending items. Each item is accessed like a
            dict: ``"req_id"`` and ``"result_future"`` are read and
            ``"cancelled"`` is written.
        logger: Logger that records when a matching item is found.

    Returns:
        ``True`` if at least one queued item matched ``req_id``, else
        ``False``.
    """
    # NOTE(review): presumably binds req_id to the logging context; the
    # helper lives in logging_utils and is not visible here.
    set_request_id(req_id)
    drained = []
    found = False
    try:
        while not request_queue.empty():
            item = request_queue.get_nowait()
            # Record the item *before* inspecting it: if anything below
            # raises, the ``finally`` block must still put it back instead
            # of silently dropping it from the queue.
            drained.append(item)
            if item.get("req_id") != req_id:
                continue
            logger.info("Found request in queue, marking as cancelled.")
            item["cancelled"] = True
            future = item.get("result_future")
            if future and not future.done():
                # Complete the still-pending future with the exception
                # object produced by client_cancelled().
                future.set_exception(client_cancelled(req_id))
            found = True
    finally:
        for item in drained:
            await request_queue.put(item)
            # put() counts the item as a brand-new unfinished task even
            # though it was only taken out for inspection. Balance it right
            # away so repeated scans do not inflate the counter that
            # Queue.join() waits on. Doing this after the put() keeps the
            # counter from ever touching zero in between.
            request_queue.task_done()
    return found
async def cancel_request(
    req_id: str,
    logger: logging.Logger = Depends(get_logger),
    request_queue: Queue = Depends(get_request_queue),
):
    """Handle a request to cancel the queued request ``req_id``.

    Delegates the queue scan to ``cancel_queued_request`` and turns its
    boolean result into a JSON response.

    Args:
        req_id: Identifier of the request to cancel.
        logger: Logger resolved through ``Depends(get_logger)``.
        request_queue: Queue resolved through ``Depends(get_request_queue)``.

    Returns:
        A ``JSONResponse`` with ``success`` true (default status code) when
        a matching item was found, otherwise a 404 ``JSONResponse`` with
        ``success`` false.
    """
    set_request_id(req_id)
    logger.info("Received cancellation request.")

    was_cancelled = await cancel_queued_request(req_id, request_queue, logger)

    # Guard clause: nothing in the queue carried this id.
    if not was_cancelled:
        not_found_body = {
            "success": False,
            "message": f"Request {req_id} not found in queue.",
        }
        return JSONResponse(status_code=404, content=not_found_body)

    cancelled_body = {
        "success": True,
        "message": f"Request {req_id} marked as cancelled.",
    }
    return JSONResponse(content=cancelled_body)
async def get_queue_status(
    request_queue: Queue = Depends(get_request_queue),
    processing_lock: Lock = Depends(get_processing_lock),
):
    """Report the current contents of the request queue as JSON.

    The queue is drained with ``get_nowait()`` to take a snapshot and every
    item is then put back in its original order.

    Args:
        request_queue: Queue resolved through ``Depends(get_request_queue)``.
            Items are read like dicts (``req_id``, ``enqueue_time``,
            ``request_data``, ``cancelled``).
        processing_lock: Lock resolved through
            ``Depends(get_processing_lock)``; only its ``locked()`` state is
            reported.

    Returns:
        A ``JSONResponse`` containing ``queue_length``,
        ``is_processing_locked`` and ``items`` (one summary per queued item,
        sorted by ``enqueue_time``).
    """
    # Extract all items temporarily to inspect queue contents
    queue_items = []
    try:
        while not request_queue.empty():
            queue_items.append(request_queue.get_nowait())
    except Exception:
        # Best effort: still report whatever was drained, but leave a trace
        # instead of swallowing the failure silently.
        logging.getLogger(__name__).exception(
            "Failed while snapshotting the request queue; "
            "reporting partial contents."
        )
    finally:
        # Put all items back in original order
        for item in queue_items:
            await request_queue.put(item)
            # put() registers the item as a new unfinished task although it
            # was only removed for inspection; balance it so status polling
            # does not inflate the counter that Queue.join() waits on.
            request_queue.task_done()

    # Read the clock once so every wait time uses the same reference point.
    now = time.time()
    summaries = [
        {
            "req_id": item.get("req_id", "unknown"),
            "enqueue_time": item.get("enqueue_time", 0),
            "wait_time_seconds": round(now - item.get("enqueue_time", 0), 2),
            # An item without request_data (or without a ``stream``
            # attribute) is reported as non-streaming instead of raising
            # AttributeError and failing the whole status call.
            "is_streaming": getattr(item.get("request_data"), "stream", False),
            "cancelled": item.get("cancelled", False),
        }
        for item in queue_items
    ]
    summaries.sort(key=lambda entry: entry["enqueue_time"])

    return JSONResponse(
        content={
            "queue_length": len(queue_items),
            "is_processing_locked": processing_lock.locked(),
            "items": summaries,
        }
    )
|