import logging import time from asyncio import Lock, Queue from fastapi import Depends from fastapi.responses import JSONResponse from logging_utils import set_request_id from ..dependencies import get_logger, get_processing_lock, get_request_queue from ..error_utils import client_cancelled async def cancel_queued_request( req_id: str, request_queue: Queue, logger: logging.Logger ) -> bool: set_request_id(req_id) items_to_requeue = [] found = False try: while not request_queue.empty(): item = request_queue.get_nowait() if item.get("req_id") == req_id: logger.info("Found request in queue, marking as cancelled.") item["cancelled"] = True if (future := item.get("result_future")) and not future.done(): future.set_exception(client_cancelled(req_id)) found = True items_to_requeue.append(item) finally: for item in items_to_requeue: await request_queue.put(item) return found async def cancel_request( req_id: str, logger: logging.Logger = Depends(get_logger), request_queue: Queue = Depends(get_request_queue), ): set_request_id(req_id) logger.info("Received cancellation request.") if await cancel_queued_request(req_id, request_queue, logger): return JSONResponse( content={ "success": True, "message": f"Request {req_id} marked as cancelled.", } ) else: return JSONResponse( status_code=404, content={ "success": False, "message": f"Request {req_id} not found in queue.", }, ) async def get_queue_status( request_queue: Queue = Depends(get_request_queue), processing_lock: Lock = Depends(get_processing_lock), ): # Extract all items temporarily to inspect queue contents queue_items = [] try: while not request_queue.empty(): item = request_queue.get_nowait() queue_items.append(item) except Exception: pass finally: # Put all items back in original order for item in queue_items: await request_queue.put(item) queue_length = len(queue_items) return JSONResponse( content={ "queue_length": queue_length, "is_processing_locked": processing_lock.locked(), "items": sorted( [ { "req_id": item.get("req_id", "unknown"), "enqueue_time": item.get("enqueue_time", 0), "wait_time_seconds": round( time.time() - item.get("enqueue_time", 0), 2 ), "is_streaming": item.get("request_data").stream, "cancelled": item.get("cancelled", False), } for item in queue_items ], key=lambda x: x.get("enqueue_time", 0), ), } )