Spaces:
Paused
Paused
| """ | |
| 队列工作器模块 | |
| 处理请求队列中的任务 | |
| """ | |
| import asyncio | |
| import time | |
| from fastapi import HTTPException | |
| async def queue_worker(): | |
| """队列工作器,处理请求队列中的任务""" | |
| # 导入全局变量 | |
| from server import ( | |
| logger, request_queue, processing_lock, model_switching_lock, | |
| params_cache_lock | |
| ) | |
| logger.info("--- 队列 Worker 已启动 ---") | |
| # 检查并初始化全局变量 | |
| if request_queue is None: | |
| logger.info("初始化 request_queue...") | |
| from asyncio import Queue | |
| request_queue = Queue() | |
| if processing_lock is None: | |
| logger.info("初始化 processing_lock...") | |
| from asyncio import Lock | |
| processing_lock = Lock() | |
| if model_switching_lock is None: | |
| logger.info("初始化 model_switching_lock...") | |
| from asyncio import Lock | |
| model_switching_lock = Lock() | |
| if params_cache_lock is None: | |
| logger.info("初始化 params_cache_lock...") | |
| from asyncio import Lock | |
| params_cache_lock = Lock() | |
| was_last_request_streaming = False | |
| last_request_completion_time = 0 | |
| while True: | |
| request_item = None | |
| result_future = None | |
| req_id = "UNKNOWN" | |
| completion_event = None | |
| try: | |
| # 检查队列中的项目,清理已断开连接的请求 | |
| queue_size = request_queue.qsize() | |
| if queue_size > 0: | |
| checked_count = 0 | |
| items_to_requeue = [] | |
| processed_ids = set() | |
| while checked_count < queue_size and checked_count < 10: | |
| try: | |
| item = request_queue.get_nowait() | |
| item_req_id = item.get("req_id", "unknown") | |
| if item_req_id in processed_ids: | |
| items_to_requeue.append(item) | |
| continue | |
| processed_ids.add(item_req_id) | |
| if not item.get("cancelled", False): | |
| item_http_request = item.get("http_request") | |
| if item_http_request: | |
| try: | |
| if await item_http_request.is_disconnected(): | |
| logger.info(f"[{item_req_id}] (Worker Queue Check) 检测到客户端已断开,标记为取消。") | |
| item["cancelled"] = True | |
| item_future = item.get("result_future") | |
| if item_future and not item_future.done(): | |
| item_future.set_exception(HTTPException(status_code=499, detail=f"[{item_req_id}] Client disconnected while queued.")) | |
| except Exception as check_err: | |
| logger.error(f"[{item_req_id}] (Worker Queue Check) Error checking disconnect: {check_err}") | |
| items_to_requeue.append(item) | |
| checked_count += 1 | |
| except asyncio.QueueEmpty: | |
| break | |
| for item in items_to_requeue: | |
| await request_queue.put(item) | |
| # 获取下一个请求 | |
| try: | |
| request_item = await asyncio.wait_for(request_queue.get(), timeout=5.0) | |
| except asyncio.TimeoutError: | |
| # 如果5秒内没有新请求,继续循环检查 | |
| continue | |
| req_id = request_item["req_id"] | |
| request_data = request_item["request_data"] | |
| http_request = request_item["http_request"] | |
| result_future = request_item["result_future"] | |
| if request_item.get("cancelled", False): | |
| logger.info(f"[{req_id}] (Worker) 请求已取消,跳过。") | |
| if not result_future.done(): | |
| result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 请求已被用户取消")) | |
| request_queue.task_done() | |
| continue | |
| is_streaming_request = request_data.stream | |
| logger.info(f"[{req_id}] (Worker) 取出请求。模式: {'流式' if is_streaming_request else '非流式'}") | |
| # 流式请求间隔控制 | |
| current_time = time.time() | |
| if was_last_request_streaming and is_streaming_request and (current_time - last_request_completion_time < 1.0): | |
| delay_time = max(0.5, 1.0 - (current_time - last_request_completion_time)) | |
| logger.info(f"[{req_id}] (Worker) 连续流式请求,添加 {delay_time:.2f}s 延迟...") | |
| await asyncio.sleep(delay_time) | |
| if await http_request.is_disconnected(): | |
| logger.info(f"[{req_id}] (Worker) 客户端在等待锁时断开。取消。") | |
| if not result_future.done(): | |
| result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求")) | |
| request_queue.task_done() | |
| continue | |
| logger.info(f"[{req_id}] (Worker) 等待处理锁...") | |
| async with processing_lock: | |
| logger.info(f"[{req_id}] (Worker) 已获取处理锁。开始核心处理...") | |
| if await http_request.is_disconnected(): | |
| logger.info(f"[{req_id}] (Worker) 客户端在获取锁后断开。取消。") | |
| if not result_future.done(): | |
| result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求")) | |
| elif result_future.done(): | |
| logger.info(f"[{req_id}] (Worker) Future 在处理前已完成/取消。跳过。") | |
| else: | |
| # 调用实际的请求处理函数 | |
| try: | |
| from api_utils import _process_request_refactored | |
| returned_value = await _process_request_refactored( | |
| req_id, request_data, http_request, result_future | |
| ) | |
| completion_event, submit_btn_loc, client_disco_checker = None, None, None | |
| current_request_was_streaming = False | |
| if isinstance(returned_value, tuple) and len(returned_value) == 3: | |
| completion_event, submit_btn_loc, client_disco_checker = returned_value | |
| if completion_event is not None: | |
| current_request_was_streaming = True | |
| logger.info(f"[{req_id}] (Worker) _process_request_refactored returned stream info (event, locator, checker).") | |
| else: | |
| current_request_was_streaming = False | |
| logger.info(f"[{req_id}] (Worker) _process_request_refactored returned a tuple, but completion_event is None (likely non-stream or early exit).") | |
| elif returned_value is None: | |
| current_request_was_streaming = False | |
| logger.info(f"[{req_id}] (Worker) _process_request_refactored returned non-stream completion (None).") | |
| else: | |
| current_request_was_streaming = False | |
| logger.warning(f"[{req_id}] (Worker) _process_request_refactored returned unexpected type: {type(returned_value)}") | |
| # 关键修复:在锁内等待流式完成(与原始参考文件一致) | |
| if completion_event: | |
| logger.info(f"[{req_id}] (Worker) 等待流式生成器完成信号...") | |
| try: | |
| from server import RESPONSE_COMPLETION_TIMEOUT | |
| await asyncio.wait_for(completion_event.wait(), timeout=RESPONSE_COMPLETION_TIMEOUT/1000 + 60) | |
| logger.info(f"[{req_id}] (Worker) ✅ 流式生成器完成信号收到。") | |
| # 等待发送按钮禁用确认流式响应完全结束 | |
| if submit_btn_loc and client_disco_checker: | |
| logger.info(f"[{req_id}] (Worker) 流式响应完成,检查并处理发送按钮状态...") | |
| wait_timeout_ms = 30000 # 30 seconds | |
| try: | |
| from playwright.async_api import expect as expect_async | |
| from api_utils.request_processor import ClientDisconnectedError | |
| # 检查客户端连接状态 | |
| client_disco_checker("流式响应后按钮状态检查 - 前置检查: ") | |
| await asyncio.sleep(0.5) # 给UI一点时间更新 | |
| # 检查按钮是否仍然启用,如果启用则直接点击停止 | |
| logger.info(f"[{req_id}] (Worker) 检查发送按钮状态...") | |
| try: | |
| is_button_enabled = await submit_btn_loc.is_enabled(timeout=2000) | |
| logger.info(f"[{req_id}] (Worker) 发送按钮启用状态: {is_button_enabled}") | |
| if is_button_enabled: | |
| # 流式响应完成后按钮仍启用,直接点击停止 | |
| logger.info(f"[{req_id}] (Worker) 流式响应完成但按钮仍启用,主动点击按钮停止生成...") | |
| await submit_btn_loc.click(timeout=5000, force=True) | |
| logger.info(f"[{req_id}] (Worker) ✅ 发送按钮点击完成。") | |
| else: | |
| logger.info(f"[{req_id}] (Worker) 发送按钮已禁用,无需点击。") | |
| except Exception as button_check_err: | |
| logger.warning(f"[{req_id}] (Worker) 检查按钮状态失败: {button_check_err}") | |
| # 等待按钮最终禁用 | |
| logger.info(f"[{req_id}] (Worker) 等待发送按钮最终禁用...") | |
| await expect_async(submit_btn_loc).to_be_disabled(timeout=wait_timeout_ms) | |
| logger.info(f"[{req_id}] ✅ 发送按钮已禁用。") | |
| except Exception as e_pw_disabled: | |
| logger.warning(f"[{req_id}] ⚠️ 流式响应后按钮状态处理超时或错误: {e_pw_disabled}") | |
| from api_utils.request_processor import save_error_snapshot | |
| await save_error_snapshot(f"stream_post_submit_button_handling_timeout_{req_id}") | |
| except ClientDisconnectedError: | |
| logger.info(f"[{req_id}] 客户端在流式响应后按钮状态处理时断开连接。") | |
| elif current_request_was_streaming: | |
| logger.warning(f"[{req_id}] (Worker) 流式请求但 submit_btn_loc 或 client_disco_checker 未提供。跳过按钮禁用等待。") | |
| except asyncio.TimeoutError: | |
| logger.warning(f"[{req_id}] (Worker) ⚠️ 等待流式生成器完成信号超时。") | |
| if not result_future.done(): | |
| result_future.set_exception(HTTPException(status_code=504, detail=f"[{req_id}] Stream generation timed out waiting for completion signal.")) | |
| except Exception as ev_wait_err: | |
| logger.error(f"[{req_id}] (Worker) ❌ 等待流式完成事件时出错: {ev_wait_err}") | |
| if not result_future.done(): | |
| result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Error waiting for stream completion: {ev_wait_err}")) | |
| except Exception as process_err: | |
| logger.error(f"[{req_id}] (Worker) _process_request_refactored execution error: {process_err}") | |
| if not result_future.done(): | |
| result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Request processing error: {process_err}")) | |
| logger.info(f"[{req_id}] (Worker) 释放处理锁。") | |
| # 在释放处理锁后立即执行清空操作 | |
| try: | |
| # 清空流式队列缓存 | |
| from api_utils import clear_stream_queue | |
| await clear_stream_queue() | |
| # 清空聊天历史(对于所有模式:流式和非流式) | |
| if submit_btn_loc and client_disco_checker: | |
| from server import page_instance, is_page_ready | |
| if page_instance and is_page_ready: | |
| from browser_utils.page_controller import PageController | |
| page_controller = PageController(page_instance, logger, req_id) | |
| logger.info(f"[{req_id}] (Worker) 执行聊天历史清空({'流式' if completion_event else '非流式'}模式)...") | |
| await page_controller.clear_chat_history(client_disco_checker) | |
| logger.info(f"[{req_id}] (Worker) ✅ 聊天历史清空完成。") | |
| else: | |
| logger.info(f"[{req_id}] (Worker) 跳过聊天历史清空:缺少必要参数(submit_btn_loc: {bool(submit_btn_loc)}, client_disco_checker: {bool(client_disco_checker)})") | |
| except Exception as clear_err: | |
| logger.error(f"[{req_id}] (Worker) 清空操作时发生错误: {clear_err}", exc_info=True) | |
| was_last_request_streaming = is_streaming_request | |
| last_request_completion_time = time.time() | |
| except asyncio.CancelledError: | |
| logger.info("--- 队列 Worker 被取消 ---") | |
| if result_future and not result_future.done(): | |
| result_future.cancel("Worker cancelled") | |
| break | |
| except Exception as e: | |
| logger.error(f"[{req_id}] (Worker) ❌ 处理请求时发生意外错误: {e}", exc_info=True) | |
| if result_future and not result_future.done(): | |
| result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] 服务器内部错误: {e}")) | |
| finally: | |
| if request_item: | |
| request_queue.task_done() | |
| logger.info("--- 队列 Worker 已停止 ---") |