Spaces:
Paused
Paused
File size: 15,819 Bytes
469e046 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
"""
队列工作器模块
处理请求队列中的任务
"""
import asyncio
import time
from fastapi import HTTPException
async def queue_worker():
    """Consume the shared request queue forever, processing one request at a time.

    Queue items are dicts carrying ``req_id``, ``request_data``,
    ``http_request`` and ``result_future``, plus an optional ``cancelled``
    flag set by the disconnect scan. Outcomes are reported to the waiting
    caller through ``result_future``:

    - cancelled or disconnected clients get ``HTTPException(499)``;
    - unexpected worker errors get ``HTTPException(500)``.

    Core processing runs while holding ``processing_lock``. Streaming
    requests keep the lock until their completion event fires. The loop only
    ends when the worker task is cancelled.
    """
    from server import logger

    logger.info("--- 队列 Worker 已启动 ---")
    request_queue, processing_lock = _ensure_shared_primitives(logger)

    was_last_request_streaming = False
    last_request_completion_time = 0

    while True:
        request_item = None
        result_future = None
        req_id = "UNKNOWN"
        # Reset the per-request handles on every iteration. A request that
        # never reaches the processor (disconnect, already-done future,
        # processor error) must not hit an unbound name, or reuse the
        # previous request's handles, in the cleanup step.
        completion_event = None
        submit_btn_loc = None
        client_disco_checker = None
        try:
            await _mark_disconnected_queued_requests(request_queue, logger)

            try:
                request_item = await asyncio.wait_for(request_queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                # Nothing arrived within 5 s; loop around and re-scan the queue.
                continue

            req_id = request_item["req_id"]
            request_data = request_item["request_data"]
            http_request = request_item["http_request"]
            result_future = request_item["result_future"]

            if request_item.get("cancelled", False):
                logger.info(f"[{req_id}] (Worker) 请求已取消,跳过。")
                if not result_future.done():
                    result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 请求已被用户取消"))
                # task_done() is issued exactly once, by the finally clause below.
                continue

            is_streaming_request = request_data.stream
            logger.info(f"[{req_id}] (Worker) 取出请求。模式: {'流式' if is_streaming_request else '非流式'}")

            # Space out back-to-back streaming requests: if the previous
            # streaming request finished less than 1 s ago, wait at least 0.5 s.
            current_time = time.time()
            if was_last_request_streaming and is_streaming_request and (current_time - last_request_completion_time < 1.0):
                delay_time = max(0.5, 1.0 - (current_time - last_request_completion_time))
                logger.info(f"[{req_id}] (Worker) 连续流式请求,添加 {delay_time:.2f}s 延迟...")
                await asyncio.sleep(delay_time)

            if await http_request.is_disconnected():
                logger.info(f"[{req_id}] (Worker) 客户端在等待锁时断开。取消。")
                if not result_future.done():
                    result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
                # task_done() is issued exactly once, by the finally clause below.
                continue

            logger.info(f"[{req_id}] (Worker) 等待处理锁...")
            async with processing_lock:
                logger.info(f"[{req_id}] (Worker) 已获取处理锁。开始核心处理...")
                if await http_request.is_disconnected():
                    logger.info(f"[{req_id}] (Worker) 客户端在获取锁后断开。取消。")
                    if not result_future.done():
                        result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
                elif result_future.done():
                    logger.info(f"[{req_id}] (Worker) Future 在处理前已完成/取消。跳过。")
                else:
                    completion_event, submit_btn_loc, client_disco_checker = await _process_and_await_stream(
                        req_id, request_data, http_request, result_future, logger
                    )
                logger.info(f"[{req_id}] (Worker) 释放处理锁。")

            # Cleanup runs immediately after the processing lock is released.
            await _clear_after_request(req_id, completion_event, submit_btn_loc, client_disco_checker, logger)

            was_last_request_streaming = is_streaming_request
            last_request_completion_time = time.time()
        except asyncio.CancelledError:
            logger.info("--- 队列 Worker 被取消 ---")
            if result_future and not result_future.done():
                result_future.cancel("Worker cancelled")
            break
        except Exception as e:
            logger.error(f"[{req_id}] (Worker) ❌ 处理请求时发生意外错误: {e}", exc_info=True)
            if result_future and not result_future.done():
                result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] 服务器内部错误: {e}"))
        finally:
            # Exactly one task_done() per item obtained via request_queue.get().
            if request_item is not None:
                request_queue.task_done()
    logger.info("--- 队列 Worker 已停止 ---")


def _ensure_shared_primitives(logger):
    """Create any of the server's shared asyncio primitives that are still ``None``.

    New objects are stored back on the ``server`` module, not bound to a local
    name. Code that later reads ``server.request_queue`` (or the locks) then
    sees the same objects this worker uses.

    Returns:
        tuple: ``(request_queue, processing_lock)`` as currently held by ``server``.
    """
    import server

    factories = (
        ("request_queue", asyncio.Queue),
        ("processing_lock", asyncio.Lock),
        ("model_switching_lock", asyncio.Lock),
        ("params_cache_lock", asyncio.Lock),
    )
    for attr_name, factory in factories:
        if getattr(server, attr_name) is None:
            logger.info(f"初始化 {attr_name}...")
            setattr(server, attr_name, factory())
    return server.request_queue, server.processing_lock


async def _mark_disconnected_queued_requests(request_queue, logger, max_checks=10):
    """Flag queued requests whose client has already disconnected as cancelled.

    The queue is snapshotted and restored with no ``await`` in between. This
    keeps FIFO order intact and prevents producers from interleaving.
    Cancelling the worker mid-scan cannot lose items either.

    Each re-put item is paired with one ``task_done()``, so the queue's
    unfinished-task counter is unchanged.

    Only the first ``max_checks`` distinct ``req_id`` values are examined.
    Items are dicts shared with the queue, so setting ``cancelled`` after
    the restore still affects the queued entry. A disconnected item's
    pending ``result_future`` is resolved with ``HTTPException(499)``.
    """
    snapshot = []
    while True:
        try:
            snapshot.append(request_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    for item in snapshot:
        request_queue.put_nowait(item)
        request_queue.task_done()

    checked_count = 0
    seen_ids = set()
    for item in snapshot:
        if checked_count >= max_checks:
            break
        item_req_id = item.get("req_id", "unknown")
        if item_req_id in seen_ids:
            continue
        seen_ids.add(item_req_id)
        checked_count += 1

        if item.get("cancelled", False):
            continue
        item_http_request = item.get("http_request")
        if not item_http_request:
            continue
        try:
            if await item_http_request.is_disconnected():
                logger.info(f"[{item_req_id}] (Worker Queue Check) 检测到客户端已断开,标记为取消。")
                item["cancelled"] = True
                item_future = item.get("result_future")
                if item_future and not item_future.done():
                    item_future.set_exception(HTTPException(status_code=499, detail=f"[{item_req_id}] Client disconnected while queued."))
        except Exception as check_err:
            logger.error(f"[{item_req_id}] (Worker Queue Check) Error checking disconnect: {check_err}")


async def _process_and_await_stream(req_id, request_data, http_request, result_future, logger):
    """Run the core request processor and, for streaming responses, wait for completion.

    Intended to be called while holding ``processing_lock``. A streaming
    response is awaited here, so the lock is not released until the stream
    has finished.

    Processor errors are logged and reported as ``HTTPException(500)`` on
    ``result_future`` (if still pending) rather than raised.

    Returns:
        tuple: ``(completion_event, submit_btn_loc, client_disco_checker)``.
        Every element is ``None`` unless the processor returned a 3-tuple.
    """
    completion_event, submit_btn_loc, client_disco_checker = None, None, None
    try:
        from api_utils import _process_request_refactored

        returned_value = await _process_request_refactored(
            req_id, request_data, http_request, result_future
        )
        if isinstance(returned_value, tuple) and len(returned_value) == 3:
            completion_event, submit_btn_loc, client_disco_checker = returned_value
            if completion_event is not None:
                logger.info(f"[{req_id}] (Worker) _process_request_refactored returned stream info (event, locator, checker).")
            else:
                logger.info(f"[{req_id}] (Worker) _process_request_refactored returned a tuple, but completion_event is None (likely non-stream or early exit).")
        elif returned_value is None:
            logger.info(f"[{req_id}] (Worker) _process_request_refactored returned non-stream completion (None).")
        else:
            logger.warning(f"[{req_id}] (Worker) _process_request_refactored returned unexpected type: {type(returned_value)}")

        # Wait for the stream to finish while the processing lock is still held.
        if completion_event:
            await _await_stream_completion(
                req_id, completion_event, submit_btn_loc, client_disco_checker, result_future, logger
            )
    except Exception as process_err:
        logger.error(f"[{req_id}] (Worker) _process_request_refactored execution error: {process_err}")
        if not result_future.done():
            result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Request processing error: {process_err}"))
    return completion_event, submit_btn_loc, client_disco_checker


async def _await_stream_completion(req_id, completion_event, submit_btn_loc, client_disco_checker, result_future, logger):
    """Wait for the stream generator's completion event, then settle the submit button.

    The timeout is ``server.RESPONSE_COMPLETION_TIMEOUT / 1000 + 60`` seconds
    (the division suggests the setting is in milliseconds — confirm in
    ``server``). Outcomes for a still-pending ``result_future``:

    - timeout: resolved with ``HTTPException(504)``;
    - any other error: resolved with ``HTTPException(500)``.
    """
    logger.info(f"[{req_id}] (Worker) 等待流式生成器完成信号...")
    try:
        from server import RESPONSE_COMPLETION_TIMEOUT

        await asyncio.wait_for(completion_event.wait(), timeout=RESPONSE_COMPLETION_TIMEOUT/1000 + 60)
        logger.info(f"[{req_id}] (Worker) ✅ 流式生成器完成信号收到。")
        if submit_btn_loc and client_disco_checker:
            await _settle_submit_button(req_id, submit_btn_loc, client_disco_checker, logger)
        else:
            logger.warning(f"[{req_id}] (Worker) 流式请求但 submit_btn_loc 或 client_disco_checker 未提供。跳过按钮禁用等待。")
    except asyncio.TimeoutError:
        logger.warning(f"[{req_id}] (Worker) ⚠️ 等待流式生成器完成信号超时。")
        if not result_future.done():
            result_future.set_exception(HTTPException(status_code=504, detail=f"[{req_id}] Stream generation timed out waiting for completion signal."))
    except Exception as ev_wait_err:
        logger.error(f"[{req_id}] (Worker) ❌ 等待流式完成事件时出错: {ev_wait_err}")
        if not result_future.done():
            result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Error waiting for stream completion: {ev_wait_err}"))


async def _settle_submit_button(req_id, submit_btn_loc, client_disco_checker, logger, wait_timeout_ms=30000):
    """After a stream completes, make sure the page's submit button ends up disabled.

    If the button is still enabled, it is force-clicked to stop generation.
    The function then waits up to ``wait_timeout_ms`` for the button to
    become disabled.

    Failure handling:

    - a ``ClientDisconnectedError`` (e.g. from ``client_disco_checker``) is
      logged;
    - any other failure is logged and an error snapshot is saved.
    """
    logger.info(f"[{req_id}] (Worker) 流式响应完成,检查并处理发送按钮状态...")
    # Imported before the try so that the except clauses below can always
    # resolve ClientDisconnectedError.
    from api_utils.request_processor import ClientDisconnectedError, save_error_snapshot

    try:
        from playwright.async_api import expect as expect_async

        # Raises if the client has gone away (presumably ClientDisconnectedError — confirm).
        client_disco_checker("流式响应后按钮状态检查 - 前置检查: ")
        await asyncio.sleep(0.5)  # give the UI a moment to update

        logger.info(f"[{req_id}] (Worker) 检查发送按钮状态...")
        try:
            is_button_enabled = await submit_btn_loc.is_enabled(timeout=2000)
            logger.info(f"[{req_id}] (Worker) 发送按钮启用状态: {is_button_enabled}")
            if is_button_enabled:
                # Stream finished but the button is still enabled: click it to stop generation.
                logger.info(f"[{req_id}] (Worker) 流式响应完成但按钮仍启用,主动点击按钮停止生成...")
                await submit_btn_loc.click(timeout=5000, force=True)
                logger.info(f"[{req_id}] (Worker) ✅ 发送按钮点击完成。")
            else:
                logger.info(f"[{req_id}] (Worker) 发送按钮已禁用,无需点击。")
        except Exception as button_check_err:
            logger.warning(f"[{req_id}] (Worker) 检查按钮状态失败: {button_check_err}")

        logger.info(f"[{req_id}] (Worker) 等待发送按钮最终禁用...")
        await expect_async(submit_btn_loc).to_be_disabled(timeout=wait_timeout_ms)
        logger.info(f"[{req_id}] ✅ 发送按钮已禁用。")
    except ClientDisconnectedError:
        # Must precede `except Exception`, otherwise it is unreachable.
        logger.info(f"[{req_id}] 客户端在流式响应后按钮状态处理时断开连接。")
    except Exception as e_pw_disabled:
        logger.warning(f"[{req_id}] ⚠️ 流式响应后按钮状态处理超时或错误: {e_pw_disabled}")
        await save_error_snapshot(f"stream_post_submit_button_handling_timeout_{req_id}")


async def _clear_after_request(req_id, completion_event, submit_btn_loc, client_disco_checker, logger):
    """Reset shared state after a request: the stream queue and, when possible, the chat history.

    The stream queue is always cleared. The chat history is cleared only when
    both of these hold:

    - the request produced a submit-button locator and a disconnect checker;
    - ``server`` reports a page instance that is ready.

    Exceptions are logged rather than raised.
    """
    try:
        from api_utils import clear_stream_queue

        await clear_stream_queue()

        # Chat history is cleared for both streaming and non-streaming modes.
        if submit_btn_loc and client_disco_checker:
            from server import page_instance, is_page_ready

            if page_instance and is_page_ready:
                from browser_utils.page_controller import PageController

                page_controller = PageController(page_instance, logger, req_id)
                logger.info(f"[{req_id}] (Worker) 执行聊天历史清空({'流式' if completion_event else '非流式'}模式)...")
                await page_controller.clear_chat_history(client_disco_checker)
                logger.info(f"[{req_id}] (Worker) ✅ 聊天历史清空完成。")
        else:
            logger.info(f"[{req_id}] (Worker) 跳过聊天历史清空:缺少必要参数(submit_btn_loc: {bool(submit_btn_loc)}, client_disco_checker: {bool(client_disco_checker)})")
    except Exception as clear_err:
        logger.error(f"[{req_id}] (Worker) 清空操作时发生错误: {clear_err}", exc_info=True)