# AIstudioProxyAPI / browser_utils / page_controller.py
# Repository page residue: "hins111's picture" / "Upload 7 files" / "ac029f2 verified"
"""
PageController模块
封装了所有与Playwright页面直接交互的复杂逻辑。
"""
import asyncio
from typing import Callable, List, Dict, Any
from playwright.async_api import Page as AsyncPage, expect as expect_async, TimeoutError
from config import (
TEMPERATURE_INPUT_SELECTOR, MAX_OUTPUT_TOKENS_SELECTOR, STOP_SEQUENCE_INPUT_SELECTOR,
MAT_CHIP_REMOVE_BUTTON_SELECTOR, TOP_P_INPUT_SELECTOR, SUBMIT_BUTTON_SELECTOR,
CLEAR_CHAT_BUTTON_SELECTOR, CLEAR_CHAT_CONFIRM_BUTTON_SELECTOR, OVERLAY_SELECTOR,
PROMPT_TEXTAREA_SELECTOR, RESPONSE_CONTAINER_SELECTOR, RESPONSE_TEXT_SELECTOR,
EDIT_MESSAGE_BUTTON_SELECTOR
)
from config import (
CLICK_TIMEOUT_MS, WAIT_FOR_ELEMENT_TIMEOUT_MS, CLEAR_CHAT_VERIFY_TIMEOUT_MS,
DEFAULT_TEMPERATURE, DEFAULT_MAX_OUTPUT_TOKENS, DEFAULT_STOP_SEQUENCES, DEFAULT_TOP_P
)
from models import ClientDisconnectedError
from .operations import save_error_snapshot, _wait_for_response_completion, _get_final_response_content
class PageController:
    """Encapsulate every direct interaction with the AI Studio page.

    One instance is bound to a single Playwright page and a single request id.
    All log lines are prefixed with that request id. Long-running operations
    periodically call ``check_client_disconnected(stage)`` and abort with
    ``ClientDisconnectedError`` when it returns a truthy value.
    """

    # Upper bound for max output tokens when the model list gives no usable limit.
    _FALLBACK_MAX_OUTPUT_TOKENS = 65536

    def __init__(self, page: "AsyncPage", logger, req_id: str):
        """Store the page, the logger and the request id used in log prefixes."""
        self.page = page
        self.logger = logger
        self.req_id = req_id

    async def _check_disconnect(self, check_client_disconnected: Callable, stage: str):
        """Raise ``ClientDisconnectedError`` if the client has gone away.

        Args:
            check_client_disconnected: Callable taking the stage name and
                returning a truthy value when the client is disconnected.
            stage: Human-readable label of the current step (used in the error).
        """
        if check_client_disconnected(stage):
            raise ClientDisconnectedError(f"[{self.req_id}] Client disconnected at stage: {stage}")

    @staticmethod
    def _value_or_default(request_params: Dict[str, Any], key: str, default: Any) -> Any:
        """Return ``request_params[key]``, or ``default`` if missing or ``None``.

        ``dict.get(key, default)`` alone returns ``None`` when the key is
        present with an explicit ``None`` value, which would then crash the
        numeric clamping done by the ``_adjust_*`` methods.
        """
        value = request_params.get(key)
        return default if value is None else value

    @staticmethod
    def _normalize_stop_sequences(stop_sequences) -> set:
        """Turn a stop-sequence argument into a set of stripped, non-empty strings.

        Accepts ``None``, a single string, or a list. Non-string list items and
        blank strings are dropped. Any other input type yields an empty set.
        """
        if isinstance(stop_sequences, str):
            stripped = stop_sequences.strip()
            return {stripped} if stripped else set()
        if isinstance(stop_sequences, list):
            return {s.strip() for s in stop_sequences if isinstance(s, str) and s.strip()}
        return set()

    def _resolve_max_tokens_limit(self, model_id_to_use: str, parsed_model_list: list) -> int:
        """Return the max output token limit for the model.

        Looks up ``supported_max_output_tokens`` on the entry of
        ``parsed_model_list`` whose ``id`` equals ``model_id_to_use``. Falls
        back to ``_FALLBACK_MAX_OUTPUT_TOKENS`` when the model is unknown or
        the value is missing, unparsable or not positive.
        """
        fallback = self._FALLBACK_MAX_OUTPUT_TOKENS
        if not (model_id_to_use and parsed_model_list):
            return fallback
        current_model_data = next((m for m in parsed_model_list if m.get("id") == model_id_to_use), None)
        if not current_model_data or current_model_data.get("supported_max_output_tokens") is None:
            return fallback
        try:
            supported_tokens = int(current_model_data["supported_max_output_tokens"])
        except (ValueError, TypeError):
            self.logger.warning(f"[{self.req_id}] 模型 {model_id_to_use} supported_max_output_tokens 解析失败")
            return fallback
        if supported_tokens > 0:
            return supported_tokens
        self.logger.warning(f"[{self.req_id}] 模型 {model_id_to_use} supported_max_output_tokens 无效: {supported_tokens}")
        return fallback

    async def adjust_parameters(self, request_params: Dict[str, Any], page_params_cache: Dict[str, Any], params_cache_lock: asyncio.Lock, model_id_to_use: str, parsed_model_list: List[Dict[str, Any]], check_client_disconnected: Callable):
        """Apply temperature, max tokens, stop sequences and top-p to the page.

        Args:
            request_params: Request parameters; the keys read are
                ``temperature``, ``max_output_tokens``, ``stop`` and ``top_p``.
            page_params_cache: Cache of values last confirmed on the page.
            params_cache_lock: Lock guarding ``page_params_cache``.
            model_id_to_use: Model id used to look up the token limit.
            parsed_model_list: Model descriptors (dicts with ``id`` and
                optionally ``supported_max_output_tokens``).
            check_client_disconnected: Disconnect probe, see ``_check_disconnect``.

        Raises:
            ClientDisconnectedError: If the client disconnects at any stage.
        """
        self.logger.info(f"[{self.req_id}] 开始调整所有请求参数...")
        await self._check_disconnect(check_client_disconnected, "Start Parameter Adjustment")

        # Temperature (an explicit None falls back to the default).
        temp_to_set = self._value_or_default(request_params, 'temperature', DEFAULT_TEMPERATURE)
        await self._adjust_temperature(temp_to_set, page_params_cache, params_cache_lock, check_client_disconnected)
        await self._check_disconnect(check_client_disconnected, "After Temperature Adjustment")

        # Max output tokens (an explicit None falls back to the default).
        max_tokens_to_set = self._value_or_default(request_params, 'max_output_tokens', DEFAULT_MAX_OUTPUT_TOKENS)
        await self._adjust_max_tokens(max_tokens_to_set, page_params_cache, params_cache_lock, model_id_to_use, parsed_model_list, check_client_disconnected)
        await self._check_disconnect(check_client_disconnected, "After Max Tokens Adjustment")

        # Stop sequences (None is handled downstream and means "no stop sequences").
        stop_to_set = request_params.get('stop', DEFAULT_STOP_SEQUENCES)
        await self._adjust_stop_sequences(stop_to_set, page_params_cache, params_cache_lock, check_client_disconnected)
        await self._check_disconnect(check_client_disconnected, "After Stop Sequences Adjustment")

        # Top P (an explicit None falls back to the default).
        top_p_to_set = self._value_or_default(request_params, 'top_p', DEFAULT_TOP_P)
        await self._adjust_top_p(top_p_to_set, check_client_disconnected)
        await self._check_disconnect(check_client_disconnected, "End Parameter Adjustment")

    async def _adjust_temperature(self, temperature: float, page_params_cache: dict, params_cache_lock: asyncio.Lock, check_client_disconnected: Callable):
        """Set the temperature input on the page, clamped to [0, 2].

        Page interaction is skipped when the cached value already matches.
        The cache entry is updated only after the page value was verified and
        is dropped on any failure. Page errors are logged and swallowed
        (best effort); only ``ClientDisconnectedError`` propagates.
        """
        async with params_cache_lock:
            self.logger.info(f"[{self.req_id}] 检查并调整温度设置...")
            clamped_temp = max(0.0, min(2.0, temperature))
            if clamped_temp != temperature:
                self.logger.warning(f"[{self.req_id}] 请求的温度 {temperature} 超出范围 [0, 2],已调整为 {clamped_temp}")

            cached_temp = page_params_cache.get("temperature")
            if cached_temp is not None and abs(cached_temp - clamped_temp) < 0.001:
                self.logger.info(f"[{self.req_id}] 温度 ({clamped_temp}) 与缓存值 ({cached_temp}) 一致。跳过页面交互。")
                return

            self.logger.info(f"[{self.req_id}] 请求温度 ({clamped_temp}) 与缓存值 ({cached_temp}) 不一致或缓存中无值。需要与页面交互。")
            temp_input_locator = self.page.locator(TEMPERATURE_INPUT_SELECTOR)
            try:
                await expect_async(temp_input_locator).to_be_visible(timeout=5000)
                await self._check_disconnect(check_client_disconnected, "温度调整 - 输入框可见后")
                current_temp_str = await temp_input_locator.input_value(timeout=3000)
                await self._check_disconnect(check_client_disconnected, "温度调整 - 读取输入框值后")
                current_temp_float = float(current_temp_str)
                self.logger.info(f"[{self.req_id}] 页面当前温度: {current_temp_float}, 请求调整后温度: {clamped_temp}")

                if abs(current_temp_float - clamped_temp) < 0.001:
                    self.logger.info(f"[{self.req_id}] 页面当前温度 ({current_temp_float}) 与请求温度 ({clamped_temp}) 一致。更新缓存并跳过写入。")
                    page_params_cache["temperature"] = current_temp_float
                else:
                    self.logger.info(f"[{self.req_id}] 页面温度 ({current_temp_float}) 与请求温度 ({clamped_temp}) 不同,正在更新...")
                    await temp_input_locator.fill(str(clamped_temp), timeout=5000)
                    await self._check_disconnect(check_client_disconnected, "温度调整 - 填充输入框后")
                    # Give the page a moment to process the input before reading it back.
                    await asyncio.sleep(0.1)
                    new_temp_str = await temp_input_locator.input_value(timeout=3000)
                    new_temp_float = float(new_temp_str)
                    if abs(new_temp_float - clamped_temp) < 0.001:
                        self.logger.info(f"[{self.req_id}] ✅ 温度已成功更新为: {new_temp_float}。更新缓存。")
                        page_params_cache["temperature"] = new_temp_float
                    else:
                        self.logger.warning(f"[{self.req_id}] ⚠️ 温度更新后验证失败。页面显示: {new_temp_float}, 期望: {clamped_temp}。清除缓存中的温度。")
                        page_params_cache.pop("temperature", None)
                        await save_error_snapshot(f"temperature_verify_fail_{self.req_id}")
            except ClientDisconnectedError:
                # The page may already have been modified, so the cache can no
                # longer be trusted. A disconnect is not a page error: no snapshot.
                page_params_cache.pop("temperature", None)
                raise
            except ValueError as ve:
                self.logger.error(f"[{self.req_id}] 转换温度值为浮点数时出错. 错误: {ve}。清除缓存中的温度。")
                page_params_cache.pop("temperature", None)
                await save_error_snapshot(f"temperature_value_error_{self.req_id}")
            except Exception as pw_err:
                self.logger.error(f"[{self.req_id}] ❌ 操作温度输入框时发生错误: {pw_err}。清除缓存中的温度。")
                page_params_cache.pop("temperature", None)
                await save_error_snapshot(f"temperature_playwright_error_{self.req_id}")

    async def _adjust_max_tokens(self, max_tokens: int, page_params_cache: dict, params_cache_lock: asyncio.Lock, model_id_to_use: str, parsed_model_list: list, check_client_disconnected: Callable):
        """Set the max output tokens input on the page.

        The requested value is clamped to ``[1, limit]`` where ``limit`` comes
        from ``_resolve_max_tokens_limit``. Caching and error handling follow
        the same rules as ``_adjust_temperature``.
        """
        async with params_cache_lock:
            self.logger.info(f"[{self.req_id}] 检查并调整最大输出 Token 设置...")
            min_val_for_tokens = 1
            max_val_for_tokens_from_model = self._resolve_max_tokens_limit(model_id_to_use, parsed_model_list)

            clamped_max_tokens = max(min_val_for_tokens, min(max_val_for_tokens_from_model, max_tokens))
            if clamped_max_tokens != max_tokens:
                self.logger.warning(f"[{self.req_id}] 请求的最大输出 Tokens {max_tokens} 超出模型范围,已调整为 {clamped_max_tokens}")

            cached_max_tokens = page_params_cache.get("max_output_tokens")
            if cached_max_tokens is not None and cached_max_tokens == clamped_max_tokens:
                self.logger.info(f"[{self.req_id}] 最大输出 Tokens ({clamped_max_tokens}) 与缓存值一致。跳过页面交互。")
                return

            max_tokens_input_locator = self.page.locator(MAX_OUTPUT_TOKENS_SELECTOR)
            try:
                await expect_async(max_tokens_input_locator).to_be_visible(timeout=5000)
                await self._check_disconnect(check_client_disconnected, "最大输出Token调整 - 输入框可见后")
                current_max_tokens_str = await max_tokens_input_locator.input_value(timeout=3000)
                current_max_tokens_int = int(current_max_tokens_str)

                if current_max_tokens_int == clamped_max_tokens:
                    self.logger.info(f"[{self.req_id}] 页面当前最大输出 Tokens ({current_max_tokens_int}) 与请求值 ({clamped_max_tokens}) 一致。更新缓存并跳过写入。")
                    page_params_cache["max_output_tokens"] = current_max_tokens_int
                else:
                    self.logger.info(f"[{self.req_id}] 页面最大输出 Tokens ({current_max_tokens_int}) 与请求值 ({clamped_max_tokens}) 不同,正在更新...")
                    await max_tokens_input_locator.fill(str(clamped_max_tokens), timeout=5000)
                    await self._check_disconnect(check_client_disconnected, "最大输出Token调整 - 填充输入框后")
                    # Give the page a moment to process the input before reading it back.
                    await asyncio.sleep(0.1)
                    new_max_tokens_str = await max_tokens_input_locator.input_value(timeout=3000)
                    new_max_tokens_int = int(new_max_tokens_str)
                    if new_max_tokens_int == clamped_max_tokens:
                        self.logger.info(f"[{self.req_id}] ✅ 最大输出 Tokens 已成功更新为: {new_max_tokens_int}")
                        page_params_cache["max_output_tokens"] = new_max_tokens_int
                    else:
                        self.logger.warning(f"[{self.req_id}] ⚠️ 最大输出 Tokens 更新后验证失败。页面显示: {new_max_tokens_int}, 期望: {clamped_max_tokens}。清除缓存。")
                        page_params_cache.pop("max_output_tokens", None)
                        await save_error_snapshot(f"max_tokens_verify_fail_{self.req_id}")
            except ClientDisconnectedError:
                # Page state is unknown after an aborted update; invalidate the
                # cache but do not treat the disconnect as a page error.
                page_params_cache.pop("max_output_tokens", None)
                raise
            except (ValueError, TypeError) as ve:
                self.logger.error(f"[{self.req_id}] 转换最大输出 Tokens 值时出错: {ve}。清除缓存。")
                page_params_cache.pop("max_output_tokens", None)
                await save_error_snapshot(f"max_tokens_value_error_{self.req_id}")
            except Exception as e:
                self.logger.error(f"[{self.req_id}] ❌ 调整最大输出 Tokens 时出错: {e}。清除缓存。")
                page_params_cache.pop("max_output_tokens", None)
                await save_error_snapshot(f"max_tokens_error_{self.req_id}")

    async def _adjust_stop_sequences(self, stop_sequences, page_params_cache: dict, params_cache_lock: asyncio.Lock, check_client_disconnected: Callable):
        """Replace the stop-sequence chips on the page with the requested set.

        ``stop_sequences`` may be ``None``, a string or a list of strings (see
        ``_normalize_stop_sequences``). Existing chips are removed first, then
        each requested sequence is typed and confirmed with Enter. The cache
        is updated only when the old chips were removed without error.
        Page errors are logged and swallowed; only ``ClientDisconnectedError``
        propagates.
        """
        async with params_cache_lock:
            self.logger.info(f"[{self.req_id}] 检查并设置停止序列...")
            normalized_requested_stops = self._normalize_stop_sequences(stop_sequences)

            cached_stops_set = page_params_cache.get("stop_sequences")
            if cached_stops_set is not None and cached_stops_set == normalized_requested_stops:
                self.logger.info(f"[{self.req_id}] 请求的停止序列与缓存值一致。跳过页面交互。")
                return

            stop_input_locator = self.page.locator(STOP_SEQUENCE_INPUT_SELECTOR)
            remove_chip_buttons_locator = self.page.locator(MAT_CHIP_REMOVE_BUTTON_SELECTOR)
            try:
                # Remove the chips that are already present. The removal count
                # is capped so a page that never reaches zero cannot loop forever.
                initial_chip_count = await remove_chip_buttons_locator.count()
                removed_count = 0
                max_removals = initial_chip_count + 5
                clear_failed = False
                while await remove_chip_buttons_locator.count() > 0 and removed_count < max_removals:
                    await self._check_disconnect(check_client_disconnected, "停止序列清除 - 循环开始")
                    try:
                        await remove_chip_buttons_locator.first.click(timeout=2000)
                        removed_count += 1
                        await asyncio.sleep(0.15)
                    except Exception as remove_err:
                        # Best effort: stop clearing, but remember the failure so
                        # a possibly inconsistent page state is not cached.
                        self.logger.warning(f"[{self.req_id}] ⚠️ 点击停止序列移除按钮失败: {remove_err}。停止清除循环。")
                        clear_failed = True
                        break

                # Add the requested sequences.
                if normalized_requested_stops:
                    await expect_async(stop_input_locator).to_be_visible(timeout=5000)
                    for seq in normalized_requested_stops:
                        await stop_input_locator.fill(seq, timeout=3000)
                        await stop_input_locator.press("Enter", timeout=3000)
                        await asyncio.sleep(0.2)

                if clear_failed:
                    # Old chips may still be on the page; force a retry next time.
                    page_params_cache.pop("stop_sequences", None)
                    self.logger.warning(f"[{self.req_id}] ⚠️ 旧停止序列可能未完全清除,本次不更新缓存。")
                else:
                    page_params_cache["stop_sequences"] = normalized_requested_stops
                    self.logger.info(f"[{self.req_id}] ✅ 停止序列已成功设置。缓存已更新。")
            except ClientDisconnectedError:
                # Chips may be partially removed; invalidate the cache, no snapshot.
                page_params_cache.pop("stop_sequences", None)
                raise
            except Exception as e:
                self.logger.error(f"[{self.req_id}] ❌ 设置停止序列时出错: {e}")
                page_params_cache.pop("stop_sequences", None)
                await save_error_snapshot(f"stop_sequence_error_{self.req_id}")

    async def _adjust_top_p(self, top_p: float, check_client_disconnected: Callable):
        """Set the Top P input on the page, clamped to [0, 1].

        No cache is used: the page value is always read and rewritten only if
        it differs. Page errors are logged and swallowed; only
        ``ClientDisconnectedError`` propagates.
        """
        self.logger.info(f"[{self.req_id}] 检查并调整 Top P 设置...")
        clamped_top_p = max(0.0, min(1.0, top_p))
        if abs(clamped_top_p - top_p) > 1e-9:
            self.logger.warning(f"[{self.req_id}] 请求的 Top P {top_p} 超出范围 [0, 1],已调整为 {clamped_top_p}")

        top_p_input_locator = self.page.locator(TOP_P_INPUT_SELECTOR)
        try:
            await expect_async(top_p_input_locator).to_be_visible(timeout=5000)
            await self._check_disconnect(check_client_disconnected, "Top P 调整 - 输入框可见后")
            current_top_p_str = await top_p_input_locator.input_value(timeout=3000)
            current_top_p_float = float(current_top_p_str)

            if abs(current_top_p_float - clamped_top_p) > 1e-9:
                self.logger.info(f"[{self.req_id}] 页面 Top P ({current_top_p_float}) 与请求值 ({clamped_top_p}) 不同,正在更新...")
                await top_p_input_locator.fill(str(clamped_top_p), timeout=5000)
                await self._check_disconnect(check_client_disconnected, "Top P 调整 - 填充输入框后")
                # Verify that the new value was accepted.
                await asyncio.sleep(0.1)
                new_top_p_str = await top_p_input_locator.input_value(timeout=3000)
                new_top_p_float = float(new_top_p_str)
                if abs(new_top_p_float - clamped_top_p) <= 1e-9:
                    self.logger.info(f"[{self.req_id}] ✅ Top P 已成功更新为: {new_top_p_float}")
                else:
                    self.logger.warning(f"[{self.req_id}] ⚠️ Top P 更新后验证失败。页面显示: {new_top_p_float}, 期望: {clamped_top_p}")
                    await save_error_snapshot(f"top_p_verify_fail_{self.req_id}")
            else:
                self.logger.info(f"[{self.req_id}] 页面 Top P ({current_top_p_float}) 与请求值 ({clamped_top_p}) 一致,无需更改")
        except ClientDisconnectedError:
            # A disconnect is not a page error: propagate without a snapshot.
            raise
        except (ValueError, TypeError) as ve:
            self.logger.error(f"[{self.req_id}] 转换 Top P 值时出错: {ve}")
            await save_error_snapshot(f"top_p_value_error_{self.req_id}")
        except Exception as e:
            self.logger.error(f"[{self.req_id}] ❌ 调整 Top P 时出错: {e}")
            await save_error_snapshot(f"top_p_error_{self.req_id}")

    async def clear_chat_history(self, check_client_disconnected: Callable):
        """Clear the chat on the page and verify the result.

        Raises:
            ClientDisconnectedError: If the client disconnects.
            Exception: Any error raised while clearing is logged and re-raised.
                An error snapshot is saved unless the error is a disconnect.
        """
        self.logger.info(f"[{self.req_id}] 开始清空聊天记录...")
        await self._check_disconnect(check_client_disconnected, "Start Clear Chat")
        try:
            # Typically seen with the streaming proxy: the stream has ended but
            # the page is still generating. That locks the clear button while
            # the URL is still /new_chat, so clearing would be skipped and later
            # requests would hang. Therefore check the submit button first and
            # click it (at that point it acts as a stop button).
            submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
            try:
                self.logger.info(f"[{self.req_id}] 尝试检查发送按钮状态...")
                # Short timeout (1s): this is not a common step of the clear flow.
                await expect_async(submit_button_locator).to_be_enabled(timeout=1000)
                self.logger.info(f"[{self.req_id}] 发送按钮可用,尝试点击并等待1秒...")
                await submit_button_locator.click(timeout=CLICK_TIMEOUT_MS)
                await asyncio.sleep(1.0)
                self.logger.info(f"[{self.req_id}] 发送按钮点击并等待完成。")
            except Exception:
                # Button not enabled, timeout or another Playwright error:
                # expected in the normal case, so log and carry on.
                self.logger.info(f"[{self.req_id}] 发送按钮不可用或检查/点击时发生Playwright错误。符合预期,继续检查清空按钮。")

            clear_chat_button_locator = self.page.locator(CLEAR_CHAT_BUTTON_SELECTOR)
            confirm_button_locator = self.page.locator(CLEAR_CHAT_CONFIRM_BUTTON_SELECTOR)
            overlay_locator = self.page.locator(OVERLAY_SELECTOR)

            can_attempt_clear = False
            try:
                await expect_async(clear_chat_button_locator).to_be_enabled(timeout=3000)
                can_attempt_clear = True
                self.logger.info(f"[{self.req_id}] \"清空聊天\"按钮可用,继续清空流程。")
            except Exception as e_enable:
                is_new_chat_url = '/prompts/new_chat' in self.page.url.rstrip('/')
                if is_new_chat_url:
                    self.logger.info(f"[{self.req_id}] \"清空聊天\"按钮不可用 (预期,因为在 new_chat 页面)。跳过清空操作。")
                else:
                    self.logger.warning(f"[{self.req_id}] 等待\"清空聊天\"按钮可用失败: {e_enable}。清空操作可能无法执行。")

            await self._check_disconnect(check_client_disconnected, "清空聊天 - \"清空聊天\"按钮可用性检查后")

            if can_attempt_clear:
                await self._execute_chat_clear(clear_chat_button_locator, confirm_button_locator, overlay_locator, check_client_disconnected)
                await self._verify_chat_cleared(check_client_disconnected)
        except Exception as e_clear:
            self.logger.error(f"[{self.req_id}] 清空聊天过程中发生错误: {e_clear}")
            # str() guards against a non-string ``name`` attribute.
            is_disconnect = isinstance(e_clear, ClientDisconnectedError) or 'Disconnect' in str(getattr(e_clear, 'name', ''))
            if not is_disconnect:
                await save_error_snapshot(f"clear_chat_error_{self.req_id}")
            raise

    async def _execute_chat_clear(self, clear_chat_button_locator, confirm_button_locator, overlay_locator, check_client_disconnected: Callable):
        """Click through the clear-chat dialog and wait until it is gone.

        If the confirmation overlay is already visible, only the confirm
        button is clicked. Otherwise the clear button is clicked first and the
        overlay is awaited. The wait for the dialog to disappear is retried up
        to three times.

        Raises:
            ClientDisconnectedError: If the client disconnects.
            Exception: If the overlay never appears or the dialog never
                disappears (chained to the underlying timeout).
        """
        overlay_initially_visible = False
        try:
            if await overlay_locator.is_visible(timeout=1000):
                overlay_initially_visible = True
                self.logger.info(f"[{self.req_id}] 清空聊天确认遮罩层已可见。直接点击\"继续\"。")
        except TimeoutError:
            self.logger.info(f"[{self.req_id}] 清空聊天确认遮罩层初始不可见 (检查超时或未找到)。")
            overlay_initially_visible = False
        except Exception as e_vis_check:
            self.logger.warning(f"[{self.req_id}] 检查遮罩层可见性时发生错误: {e_vis_check}。假定不可见。")
            overlay_initially_visible = False

        await self._check_disconnect(check_client_disconnected, "清空聊天 - 初始遮罩层检查后")

        if overlay_initially_visible:
            self.logger.info(f"[{self.req_id}] 点击\"继续\"按钮 (遮罩层已存在): {CLEAR_CHAT_CONFIRM_BUTTON_SELECTOR}")
            await confirm_button_locator.click(timeout=CLICK_TIMEOUT_MS)
        else:
            self.logger.info(f"[{self.req_id}] 点击\"清空聊天\"按钮: {CLEAR_CHAT_BUTTON_SELECTOR}")
            await clear_chat_button_locator.click(timeout=CLICK_TIMEOUT_MS)
            await self._check_disconnect(check_client_disconnected, "清空聊天 - 点击\"清空聊天\"后")
            try:
                self.logger.info(f"[{self.req_id}] 等待清空聊天确认遮罩层出现: {OVERLAY_SELECTOR}")
                await expect_async(overlay_locator).to_be_visible(timeout=WAIT_FOR_ELEMENT_TIMEOUT_MS)
                self.logger.info(f"[{self.req_id}] 清空聊天确认遮罩层已出现。")
            except TimeoutError as overlay_timeout:
                error_msg = f"等待清空聊天确认遮罩层超时 (点击清空按钮后)。请求 ID: {self.req_id}"
                self.logger.error(error_msg)
                await save_error_snapshot(f"clear_chat_overlay_timeout_{self.req_id}")
                raise Exception(error_msg) from overlay_timeout

            await self._check_disconnect(check_client_disconnected, "清空聊天 - 遮罩层出现后")
            self.logger.info(f"[{self.req_id}] 点击\"继续\"按钮 (在对话框中): {CLEAR_CHAT_CONFIRM_BUTTON_SELECTOR}")
            await confirm_button_locator.click(timeout=CLICK_TIMEOUT_MS)

        await self._check_disconnect(check_client_disconnected, "清空聊天 - 点击\"继续\"后")

        # Wait for the dialog to disappear.
        max_retries_disappear = 3
        for attempt_disappear in range(max_retries_disappear):
            try:
                self.logger.info(f"[{self.req_id}] 等待清空聊天确认按钮/对话框消失 (尝试 {attempt_disappear + 1}/{max_retries_disappear})...")
                await expect_async(confirm_button_locator).to_be_hidden(timeout=CLEAR_CHAT_VERIFY_TIMEOUT_MS)
                await expect_async(overlay_locator).to_be_hidden(timeout=1000)
                self.logger.info(f"[{self.req_id}] ✅ 清空聊天确认对话框已成功消失。")
                break
            except TimeoutError as disappear_timeout:
                self.logger.warning(f"[{self.req_id}] ⚠️ 等待清空聊天确认对话框消失超时 (尝试 {attempt_disappear + 1}/{max_retries_disappear})。")
                if attempt_disappear < max_retries_disappear - 1:
                    await asyncio.sleep(1.0)
                    await self._check_disconnect(check_client_disconnected, f"清空聊天 - 重试消失检查 {attempt_disappear + 1} 前")
                    continue
                error_msg = f"达到最大重试次数。清空聊天确认对话框未消失。请求 ID: {self.req_id}"
                self.logger.error(error_msg)
                await save_error_snapshot(f"clear_chat_dialog_disappear_timeout_{self.req_id}")
                raise Exception(error_msg) from disappear_timeout
            except ClientDisconnectedError:
                self.logger.info(f"[{self.req_id}] 客户端在等待清空确认对话框消失时断开连接。")
                raise
            except Exception as other_err:
                self.logger.warning(f"[{self.req_id}] 等待清空确认对话框消失时发生其他错误: {other_err}")
                if attempt_disappear < max_retries_disappear - 1:
                    await asyncio.sleep(1.0)
                    continue
                raise

        # Every path through the loop body ends in break/continue/raise, so
        # this check only has an effect when it runs after the loop.
        await self._check_disconnect(check_client_disconnected, f"清空聊天 - 消失检查尝试 {attempt_disappear + 1} 后")

    async def _verify_chat_cleared(self, check_client_disconnected: Callable):
        """Check that the last response container is hidden after clearing.

        A failed verification is only logged as a warning; it does not raise.
        """
        last_response_container = self.page.locator(RESPONSE_CONTAINER_SELECTOR).last
        await asyncio.sleep(0.5)
        await self._check_disconnect(check_client_disconnected, "After Clear Post-Delay")
        try:
            await expect_async(last_response_container).to_be_hidden(timeout=CLEAR_CHAT_VERIFY_TIMEOUT_MS - 500)
            self.logger.info(f"[{self.req_id}] ✅ 聊天已成功清空 (验证通过 - 最后响应容器隐藏)。")
        except Exception as verify_err:
            self.logger.warning(f"[{self.req_id}] ⚠️ 警告: 清空聊天验证失败 (最后响应容器未隐藏): {verify_err}")

    async def submit_prompt(self, prompt: str, check_client_disconnected: Callable):
        """Fill the prompt textarea and submit it.

        The text is written via JavaScript, then the submit button is awaited.
        Submission is attempted with the keyboard shortcut first and falls
        back to clicking the submit button.

        Raises:
            ClientDisconnectedError: If the client disconnects.
            Exception: Any error is logged and re-raised. An error snapshot is
                saved unless the error is a disconnect.
        """
        self.logger.info(f"[{self.req_id}] 填充并提交提示 ({len(prompt)} chars)...")
        prompt_textarea_locator = self.page.locator(PROMPT_TEXTAREA_SELECTOR)
        autosize_wrapper_locator = self.page.locator('ms-prompt-input-wrapper ms-autosize-textarea')
        submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
        try:
            await expect_async(prompt_textarea_locator).to_be_visible(timeout=5000)
            await self._check_disconnect(check_client_disconnected, "After Input Visible")

            # Fill the text through JavaScript and fire the input/change events.
            await prompt_textarea_locator.evaluate(
                '''
                (element, text) => {
                    element.value = text;
                    element.dispatchEvent(new Event('input', { bubbles: true, cancelable: true }));
                    element.dispatchEvent(new Event('change', { bubbles: true, cancelable: true }));
                }
                ''',
                prompt
            )
            await autosize_wrapper_locator.evaluate('(element, text) => { element.setAttribute("data-value", text); }', prompt)
            await self._check_disconnect(check_client_disconnected, "After Input Fill")

            # Wait for the submit button to become enabled.
            wait_timeout_ms_submit_enabled = 40000
            try:
                await self._check_disconnect(check_client_disconnected, "填充提示后等待发送按钮启用 - 前置检查")
                await expect_async(submit_button_locator).to_be_enabled(timeout=wait_timeout_ms_submit_enabled)
                self.logger.info(f"[{self.req_id}] ✅ 发送按钮已启用。")
            except Exception as e_pw_enabled:
                self.logger.error(f"[{self.req_id}] ❌ 等待发送按钮启用超时或错误: {e_pw_enabled}")
                await save_error_snapshot(f"submit_button_enable_timeout_{self.req_id}")
                raise

            await self._check_disconnect(check_client_disconnected, "After Submit Button Enabled")
            await asyncio.sleep(0.3)

            # Try the keyboard shortcut first.
            submitted_successfully = await self._try_shortcut_submit(prompt_textarea_locator, check_client_disconnected)

            # Fall back to clicking the button if the shortcut did not work.
            if not submitted_successfully:
                self.logger.info(f"[{self.req_id}] 快捷键提交失败,尝试点击提交按钮...")
                try:
                    await submit_button_locator.click(timeout=5000)
                    self.logger.info(f"[{self.req_id}] ✅ 提交按钮点击完成。")
                except Exception as click_err:
                    self.logger.error(f"[{self.req_id}] ❌ 提交按钮点击失败: {click_err}")
                    await save_error_snapshot(f"submit_button_click_fail_{self.req_id}")
                    raise

            await self._check_disconnect(check_client_disconnected, "After Submit")
        except Exception as e_input_submit:
            self.logger.error(f"[{self.req_id}] 输入和提交过程中发生错误: {e_input_submit}")
            if not isinstance(e_input_submit, ClientDisconnectedError):
                await save_error_snapshot(f"input_submit_error_{self.req_id}")
            raise

    async def _try_shortcut_submit(self, prompt_textarea_locator, check_client_disconnected: Callable) -> bool:
        """Try to submit the prompt with Meta+Enter (macOS) or Control+Enter.

        The platform comes from the ``HOST_OS_FOR_SHORTCUT`` environment
        variable when it is ``Darwin``, ``Windows`` or ``Linux``; otherwise it
        is detected in the browser.

        Returns:
            True if one of the post-submit checks indicates success (or the
            checks themselves errored), False if the shortcut failed.

        Raises:
            ClientDisconnectedError: If the client disconnects. It is not
                reported as a failed shortcut, so the caller does not click the
                submit button for a client that is gone.
        """
        import os
        try:
            # Determine the operating system.
            host_os_from_launcher = os.environ.get('HOST_OS_FOR_SHORTCUT')
            is_mac_determined = False
            if host_os_from_launcher == "Darwin":
                is_mac_determined = True
            elif host_os_from_launcher in ["Windows", "Linux"]:
                is_mac_determined = False
            else:
                # Detect through the browser. navigator.userAgentData is not
                # available in every browser; there the expression yields ''
                # without throwing, so an empty result must also trigger the
                # userAgent fallback.
                try:
                    user_agent_data_platform = await self.page.evaluate("() => navigator.userAgentData?.platform || ''") or ""
                except Exception:
                    user_agent_data_platform = ""
                if not user_agent_data_platform:
                    user_agent_string = await self.page.evaluate("() => navigator.userAgent || ''")
                    user_agent_string_lower = (user_agent_string or "").lower()
                    if "macintosh" in user_agent_string_lower or "mac os x" in user_agent_string_lower:
                        user_agent_data_platform = "macOS"
                    else:
                        user_agent_data_platform = "Other"
                is_mac_determined = "mac" in user_agent_data_platform.lower()

            shortcut_modifier = "Meta" if is_mac_determined else "Control"
            shortcut_key = "Enter"
            self.logger.info(f"[{self.req_id}] 使用快捷键: {shortcut_modifier}+{shortcut_key}")

            await prompt_textarea_locator.focus(timeout=5000)
            await self._check_disconnect(check_client_disconnected, "After Input Focus")
            await asyncio.sleep(0.1)

            # Remember the textarea content before submitting, for verification.
            original_content = ""
            try:
                original_content = await prompt_textarea_locator.input_value(timeout=2000) or ""
            except Exception:
                # Still attempt the submit even if the content cannot be read.
                pass

            try:
                await self.page.keyboard.press(f'{shortcut_modifier}+{shortcut_key}')
            except Exception:
                # Fall back to pressing the keys step by step. The modifier is
                # released in ``finally`` so it cannot stay held on error.
                await self.page.keyboard.down(shortcut_modifier)
                try:
                    await asyncio.sleep(0.05)
                    await self.page.keyboard.press(shortcut_key)
                    await asyncio.sleep(0.05)
                finally:
                    await self.page.keyboard.up(shortcut_modifier)

            await self._check_disconnect(check_client_disconnected, "After Shortcut Press")

            # Give the submission some time to go through.
            await asyncio.sleep(2.0)

            # Verify the submission in several ways.
            submission_success = False
            try:
                # Method 1: the textarea was emptied.
                current_content = await prompt_textarea_locator.input_value(timeout=2000) or ""
                if original_content and not current_content.strip():
                    self.logger.info(f"[{self.req_id}] 验证方法1: 输入框已清空,快捷键提交成功")
                    submission_success = True

                # Method 2: the submit button is disabled.
                if not submission_success:
                    submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
                    try:
                        is_disabled = await submit_button_locator.is_disabled(timeout=2000)
                        if is_disabled:
                            self.logger.info(f"[{self.req_id}] 验证方法2: 提交按钮已禁用,快捷键提交成功")
                            submission_success = True
                    except Exception:
                        pass

                # Method 3: a response container is visible.
                if not submission_success:
                    try:
                        response_container = self.page.locator(RESPONSE_CONTAINER_SELECTOR)
                        container_count = await response_container.count()
                        if container_count > 0:
                            # Look at the last container only.
                            last_container = response_container.last
                            if await last_container.is_visible(timeout=1000):
                                self.logger.info(f"[{self.req_id}] 验证方法3: 检测到响应容器,快捷键提交成功")
                                submission_success = True
                    except Exception:
                        pass
            except Exception as verify_err:
                self.logger.warning(f"[{self.req_id}] 快捷键提交验证过程出错: {verify_err}")
                # Assume success on a verification error so the flow continues.
                submission_success = True

            if submission_success:
                self.logger.info(f"[{self.req_id}] ✅ 快捷键提交成功")
                return True
            self.logger.warning(f"[{self.req_id}] ⚠️ 快捷键提交验证失败")
            return False
        except ClientDisconnectedError:
            # Must not be reported as "shortcut failed".
            raise
        except Exception as shortcut_err:
            self.logger.warning(f"[{self.req_id}] 快捷键提交失败: {shortcut_err}")
            return False

    async def get_response(self, check_client_disconnected: Callable) -> str:
        """Wait for the model response and return its text.

        Returns:
            The final response content, or ``""`` when the content is empty
            (a snapshot is saved and the caller decides what to do).

        Raises:
            ClientDisconnectedError: If the client disconnects.
            Exception: Any error is logged and re-raised. An error snapshot is
                saved unless the error is a disconnect.
        """
        self.logger.info(f"[{self.req_id}] 等待并获取响应...")
        try:
            # Wait for the response element to appear.
            response_container_locator = self.page.locator(RESPONSE_CONTAINER_SELECTOR).last
            response_element_locator = response_container_locator.locator(RESPONSE_TEXT_SELECTOR)
            self.logger.info(f"[{self.req_id}] 等待响应元素附加到DOM...")
            await expect_async(response_element_locator).to_be_attached(timeout=90000)
            await self._check_disconnect(check_client_disconnected, "获取响应 - 响应元素已附加")

            # Wait for the response to complete.
            submit_button_locator = self.page.locator(SUBMIT_BUTTON_SELECTOR)
            edit_button_locator = self.page.locator(EDIT_MESSAGE_BUTTON_SELECTOR)
            input_field_locator = self.page.locator(PROMPT_TEXTAREA_SELECTOR)
            self.logger.info(f"[{self.req_id}] 等待响应完成...")
            completion_detected = await _wait_for_response_completion(
                self.page, input_field_locator, submit_button_locator, edit_button_locator, self.req_id, check_client_disconnected, None
            )
            if not completion_detected:
                self.logger.warning(f"[{self.req_id}] 响应完成检测失败,尝试获取当前内容")
            else:
                self.logger.info(f"[{self.req_id}] ✅ 响应完成检测成功")

            # Fetch the final response content.
            final_content = await _get_final_response_content(self.page, self.req_id, check_client_disconnected)
            if not final_content or not final_content.strip():
                self.logger.warning(f"[{self.req_id}] ⚠️ 获取到的响应内容为空")
                await save_error_snapshot(f"empty_response_{self.req_id}")
                # Do not raise: return empty content and let the caller handle it.
                return ""

            self.logger.info(f"[{self.req_id}] ✅ 成功获取响应内容 ({len(final_content)} chars)")
            return final_content
        except Exception as e:
            self.logger.error(f"[{self.req_id}] ❌ 获取响应时出错: {e}")
            if not isinstance(e, ClientDisconnectedError):
                await save_error_snapshot(f"get_response_error_{self.req_id}")
            raise