AIstudioProxyAPI / api_utils /queue_worker.py
hins111's picture
Upload 9 files
469e046 verified
"""
队列工作器模块
处理请求队列中的任务
"""
import asyncio
import time
from fastapi import HTTPException
async def queue_worker():
"""队列工作器,处理请求队列中的任务"""
# 导入全局变量
from server import (
logger, request_queue, processing_lock, model_switching_lock,
params_cache_lock
)
logger.info("--- 队列 Worker 已启动 ---")
# 检查并初始化全局变量
if request_queue is None:
logger.info("初始化 request_queue...")
from asyncio import Queue
request_queue = Queue()
if processing_lock is None:
logger.info("初始化 processing_lock...")
from asyncio import Lock
processing_lock = Lock()
if model_switching_lock is None:
logger.info("初始化 model_switching_lock...")
from asyncio import Lock
model_switching_lock = Lock()
if params_cache_lock is None:
logger.info("初始化 params_cache_lock...")
from asyncio import Lock
params_cache_lock = Lock()
was_last_request_streaming = False
last_request_completion_time = 0
while True:
request_item = None
result_future = None
req_id = "UNKNOWN"
completion_event = None
try:
# 检查队列中的项目,清理已断开连接的请求
queue_size = request_queue.qsize()
if queue_size > 0:
checked_count = 0
items_to_requeue = []
processed_ids = set()
while checked_count < queue_size and checked_count < 10:
try:
item = request_queue.get_nowait()
item_req_id = item.get("req_id", "unknown")
if item_req_id in processed_ids:
items_to_requeue.append(item)
continue
processed_ids.add(item_req_id)
if not item.get("cancelled", False):
item_http_request = item.get("http_request")
if item_http_request:
try:
if await item_http_request.is_disconnected():
logger.info(f"[{item_req_id}] (Worker Queue Check) 检测到客户端已断开,标记为取消。")
item["cancelled"] = True
item_future = item.get("result_future")
if item_future and not item_future.done():
item_future.set_exception(HTTPException(status_code=499, detail=f"[{item_req_id}] Client disconnected while queued."))
except Exception as check_err:
logger.error(f"[{item_req_id}] (Worker Queue Check) Error checking disconnect: {check_err}")
items_to_requeue.append(item)
checked_count += 1
except asyncio.QueueEmpty:
break
for item in items_to_requeue:
await request_queue.put(item)
# 获取下一个请求
try:
request_item = await asyncio.wait_for(request_queue.get(), timeout=5.0)
except asyncio.TimeoutError:
# 如果5秒内没有新请求,继续循环检查
continue
req_id = request_item["req_id"]
request_data = request_item["request_data"]
http_request = request_item["http_request"]
result_future = request_item["result_future"]
if request_item.get("cancelled", False):
logger.info(f"[{req_id}] (Worker) 请求已取消,跳过。")
if not result_future.done():
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 请求已被用户取消"))
request_queue.task_done()
continue
is_streaming_request = request_data.stream
logger.info(f"[{req_id}] (Worker) 取出请求。模式: {'流式' if is_streaming_request else '非流式'}")
# 流式请求间隔控制
current_time = time.time()
if was_last_request_streaming and is_streaming_request and (current_time - last_request_completion_time < 1.0):
delay_time = max(0.5, 1.0 - (current_time - last_request_completion_time))
logger.info(f"[{req_id}] (Worker) 连续流式请求,添加 {delay_time:.2f}s 延迟...")
await asyncio.sleep(delay_time)
if await http_request.is_disconnected():
logger.info(f"[{req_id}] (Worker) 客户端在等待锁时断开。取消。")
if not result_future.done():
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
request_queue.task_done()
continue
logger.info(f"[{req_id}] (Worker) 等待处理锁...")
async with processing_lock:
logger.info(f"[{req_id}] (Worker) 已获取处理锁。开始核心处理...")
if await http_request.is_disconnected():
logger.info(f"[{req_id}] (Worker) 客户端在获取锁后断开。取消。")
if not result_future.done():
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
elif result_future.done():
logger.info(f"[{req_id}] (Worker) Future 在处理前已完成/取消。跳过。")
else:
# 调用实际的请求处理函数
try:
from api_utils import _process_request_refactored
returned_value = await _process_request_refactored(
req_id, request_data, http_request, result_future
)
completion_event, submit_btn_loc, client_disco_checker = None, None, None
current_request_was_streaming = False
if isinstance(returned_value, tuple) and len(returned_value) == 3:
completion_event, submit_btn_loc, client_disco_checker = returned_value
if completion_event is not None:
current_request_was_streaming = True
logger.info(f"[{req_id}] (Worker) _process_request_refactored returned stream info (event, locator, checker).")
else:
current_request_was_streaming = False
logger.info(f"[{req_id}] (Worker) _process_request_refactored returned a tuple, but completion_event is None (likely non-stream or early exit).")
elif returned_value is None:
current_request_was_streaming = False
logger.info(f"[{req_id}] (Worker) _process_request_refactored returned non-stream completion (None).")
else:
current_request_was_streaming = False
logger.warning(f"[{req_id}] (Worker) _process_request_refactored returned unexpected type: {type(returned_value)}")
# 关键修复:在锁内等待流式完成(与原始参考文件一致)
if completion_event:
logger.info(f"[{req_id}] (Worker) 等待流式生成器完成信号...")
try:
from server import RESPONSE_COMPLETION_TIMEOUT
await asyncio.wait_for(completion_event.wait(), timeout=RESPONSE_COMPLETION_TIMEOUT/1000 + 60)
logger.info(f"[{req_id}] (Worker) ✅ 流式生成器完成信号收到。")
# 等待发送按钮禁用确认流式响应完全结束
if submit_btn_loc and client_disco_checker:
logger.info(f"[{req_id}] (Worker) 流式响应完成,检查并处理发送按钮状态...")
wait_timeout_ms = 30000 # 30 seconds
try:
from playwright.async_api import expect as expect_async
from api_utils.request_processor import ClientDisconnectedError
# 检查客户端连接状态
client_disco_checker("流式响应后按钮状态检查 - 前置检查: ")
await asyncio.sleep(0.5) # 给UI一点时间更新
# 检查按钮是否仍然启用,如果启用则直接点击停止
logger.info(f"[{req_id}] (Worker) 检查发送按钮状态...")
try:
is_button_enabled = await submit_btn_loc.is_enabled(timeout=2000)
logger.info(f"[{req_id}] (Worker) 发送按钮启用状态: {is_button_enabled}")
if is_button_enabled:
# 流式响应完成后按钮仍启用,直接点击停止
logger.info(f"[{req_id}] (Worker) 流式响应完成但按钮仍启用,主动点击按钮停止生成...")
await submit_btn_loc.click(timeout=5000, force=True)
logger.info(f"[{req_id}] (Worker) ✅ 发送按钮点击完成。")
else:
logger.info(f"[{req_id}] (Worker) 发送按钮已禁用,无需点击。")
except Exception as button_check_err:
logger.warning(f"[{req_id}] (Worker) 检查按钮状态失败: {button_check_err}")
# 等待按钮最终禁用
logger.info(f"[{req_id}] (Worker) 等待发送按钮最终禁用...")
await expect_async(submit_btn_loc).to_be_disabled(timeout=wait_timeout_ms)
logger.info(f"[{req_id}] ✅ 发送按钮已禁用。")
except Exception as e_pw_disabled:
logger.warning(f"[{req_id}] ⚠️ 流式响应后按钮状态处理超时或错误: {e_pw_disabled}")
from api_utils.request_processor import save_error_snapshot
await save_error_snapshot(f"stream_post_submit_button_handling_timeout_{req_id}")
except ClientDisconnectedError:
logger.info(f"[{req_id}] 客户端在流式响应后按钮状态处理时断开连接。")
elif current_request_was_streaming:
logger.warning(f"[{req_id}] (Worker) 流式请求但 submit_btn_loc 或 client_disco_checker 未提供。跳过按钮禁用等待。")
except asyncio.TimeoutError:
logger.warning(f"[{req_id}] (Worker) ⚠️ 等待流式生成器完成信号超时。")
if not result_future.done():
result_future.set_exception(HTTPException(status_code=504, detail=f"[{req_id}] Stream generation timed out waiting for completion signal."))
except Exception as ev_wait_err:
logger.error(f"[{req_id}] (Worker) ❌ 等待流式完成事件时出错: {ev_wait_err}")
if not result_future.done():
result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Error waiting for stream completion: {ev_wait_err}"))
except Exception as process_err:
logger.error(f"[{req_id}] (Worker) _process_request_refactored execution error: {process_err}")
if not result_future.done():
result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Request processing error: {process_err}"))
logger.info(f"[{req_id}] (Worker) 释放处理锁。")
# 在释放处理锁后立即执行清空操作
try:
# 清空流式队列缓存
from api_utils import clear_stream_queue
await clear_stream_queue()
# 清空聊天历史(对于所有模式:流式和非流式)
if submit_btn_loc and client_disco_checker:
from server import page_instance, is_page_ready
if page_instance and is_page_ready:
from browser_utils.page_controller import PageController
page_controller = PageController(page_instance, logger, req_id)
logger.info(f"[{req_id}] (Worker) 执行聊天历史清空({'流式' if completion_event else '非流式'}模式)...")
await page_controller.clear_chat_history(client_disco_checker)
logger.info(f"[{req_id}] (Worker) ✅ 聊天历史清空完成。")
else:
logger.info(f"[{req_id}] (Worker) 跳过聊天历史清空:缺少必要参数(submit_btn_loc: {bool(submit_btn_loc)}, client_disco_checker: {bool(client_disco_checker)})")
except Exception as clear_err:
logger.error(f"[{req_id}] (Worker) 清空操作时发生错误: {clear_err}", exc_info=True)
was_last_request_streaming = is_streaming_request
last_request_completion_time = time.time()
except asyncio.CancelledError:
logger.info("--- 队列 Worker 被取消 ---")
if result_future and not result_future.done():
result_future.cancel("Worker cancelled")
break
except Exception as e:
logger.error(f"[{req_id}] (Worker) ❌ 处理请求时发生意外错误: {e}", exc_info=True)
if result_future and not result_future.done():
result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] 服务器内部错误: {e}"))
finally:
if request_item:
request_queue.task_done()
logger.info("--- 队列 Worker 已停止 ---")