# --- browser_utils/operations.py --- # 浏览器页面操作相关功能模块 import asyncio import time import json import os import re import logging from typing import Optional, Any, List, Dict, Callable, Set from playwright.async_api import Page as AsyncPage, Locator, Error as PlaywrightAsyncError # 导入配置和模型 from config import * from models import ClientDisconnectedError logger = logging.getLogger("AIStudioProxyServer") async def get_raw_text_content(response_element: Locator, previous_text: str, req_id: str) -> str: """从响应元素获取原始文本内容""" raw_text = previous_text try: await response_element.wait_for(state='attached', timeout=1000) pre_element = response_element.locator('pre').last pre_found_and_visible = False try: await pre_element.wait_for(state='visible', timeout=250) pre_found_and_visible = True except PlaywrightAsyncError: pass if pre_found_and_visible: try: raw_text = await pre_element.inner_text(timeout=500) except PlaywrightAsyncError as pre_err: if DEBUG_LOGS_ENABLED: logger.debug(f"[{req_id}] (获取原始文本) 获取 pre 元素内部文本失败: {pre_err}") else: try: raw_text = await response_element.inner_text(timeout=500) except PlaywrightAsyncError as e_parent: if DEBUG_LOGS_ENABLED: logger.debug(f"[{req_id}] (获取原始文本) 获取响应元素内部文本失败: {e_parent}") except PlaywrightAsyncError as e_parent: if DEBUG_LOGS_ENABLED: logger.debug(f"[{req_id}] (获取原始文本) 响应元素未准备好: {e_parent}") except Exception as e_unexpected: logger.warning(f"[{req_id}] (获取原始文本) 意外错误: {e_unexpected}") if raw_text != previous_text: if DEBUG_LOGS_ENABLED: preview = raw_text[:100].replace('\n', '\\n') logger.debug(f"[{req_id}] (获取原始文本) 文本已更新,长度: {len(raw_text)},预览: '{preview}...'") return raw_text def _parse_userscript_models(script_content: str): """从油猴脚本中解析模型列表 - 使用JSON解析方式""" try: # 查找脚本版本号 version_pattern = r'const\s+SCRIPT_VERSION\s*=\s*[\'"]([^\'"]+)[\'"]' version_match = re.search(version_pattern, script_content) script_version = version_match.group(1) if version_match else "v1.6" # 查找 MODELS_TO_INJECT 数组的内容 models_array_pattern = r'const\s+MODELS_TO_INJECT\s*=\s*(\[.*?\]);' models_match = re.search(models_array_pattern, script_content, re.DOTALL) if not models_match: logger.warning("未找到 MODELS_TO_INJECT 数组") return [] models_js_code = models_match.group(1) # 将JavaScript数组转换为JSON格式 # 1. 替换模板字符串中的变量 models_js_code = models_js_code.replace('${SCRIPT_VERSION}', script_version) # 2. 移除JavaScript注释 models_js_code = re.sub(r'//.*?$', '', models_js_code, flags=re.MULTILINE) # 3. 将JavaScript对象转换为JSON格式 # 移除尾随逗号 models_js_code = re.sub(r',\s*([}\]])', r'\1', models_js_code) # 替换单引号为双引号 models_js_code = re.sub(r"(\w+):\s*'([^']*)'", r'"\1": "\2"', models_js_code) # 替换反引号为双引号 models_js_code = re.sub(r'(\w+):\s*`([^`]*)`', r'"\1": "\2"', models_js_code) # 确保属性名用双引号 models_js_code = re.sub(r'(\w+):', r'"\1":', models_js_code) # 4. 解析JSON import json models_data = json.loads(models_js_code) models = [] for model_obj in models_data: if isinstance(model_obj, dict) and 'name' in model_obj: models.append({ 'name': model_obj.get('name', ''), 'displayName': model_obj.get('displayName', ''), 'description': model_obj.get('description', '') }) logger.info(f"成功解析 {len(models)} 个模型从油猴脚本") return models except Exception as e: logger.error(f"解析油猴脚本模型列表失败: {e}") return [] def _get_injected_models(): """从油猴脚本中获取注入的模型列表,转换为API格式""" try: # 直接读取环境变量,避免复杂的导入 enable_injection = os.environ.get('ENABLE_SCRIPT_INJECTION', 'true').lower() in ('true', '1', 'yes') if not enable_injection: return [] # 获取脚本文件路径 script_path = os.environ.get('USERSCRIPT_PATH', 'browser_utils/more_modles.js') # 检查脚本文件是否存在 if not os.path.exists(script_path): # 脚本文件不存在,静默返回空列表 return [] # 读取油猴脚本内容 with open(script_path, 'r', encoding='utf-8') as f: script_content = f.read() # 从脚本中解析模型列表 models = _parse_userscript_models(script_content) if not models: return [] # 转换为API格式 injected_models = [] for model in models: model_name = model.get('name', '') if not model_name: continue # 跳过没有名称的模型 if model_name.startswith('models/'): simple_id = model_name[7:] # 移除 'models/' 前缀 else: simple_id = model_name display_name = model.get('displayName', model.get('display_name', simple_id)) description = model.get('description', f'Injected model: {simple_id}') # 注意:不再清理显示名称,保留原始的emoji和版本信息 model_entry = { "id": simple_id, "object": "model", "created": int(time.time()), "owned_by": "ai_studio_injected", "display_name": display_name, "description": description, "raw_model_path": model_name, "default_temperature": 1.0, "default_max_output_tokens": 65536, "supported_max_output_tokens": 65536, "default_top_p": 0.95, "injected": True # 标记为注入的模型 } injected_models.append(model_entry) return injected_models except Exception as e: # 静默处理错误,不输出日志,返回空列表 return [] async def _handle_model_list_response(response: Any): """处理模型列表响应""" # 需要访问全局变量 import server global_model_list_raw_json = getattr(server, 'global_model_list_raw_json', None) parsed_model_list = getattr(server, 'parsed_model_list', []) model_list_fetch_event = getattr(server, 'model_list_fetch_event', None) excluded_model_ids = getattr(server, 'excluded_model_ids', set()) if MODELS_ENDPOINT_URL_CONTAINS in response.url and response.ok: # 检查是否在登录流程中 launch_mode = os.environ.get('LAUNCH_MODE', 'debug') is_in_login_flow = launch_mode in ['debug'] and not getattr(server, 'is_page_ready', False) if is_in_login_flow: # 在登录流程中,静默处理,不输出干扰信息 pass # 静默处理,避免干扰用户输入 else: logger.info(f"捕获到潜在的模型列表响应来自: {response.url} (状态: {response.status})") try: data = await response.json() models_array_container = None if isinstance(data, list) and data: if isinstance(data[0], list) and data[0] and isinstance(data[0][0], list): if not is_in_login_flow: logger.info("检测到三层列表结构 data[0][0] is list. models_array_container 设置为 data[0]。") models_array_container = data[0] elif isinstance(data[0], list) and data[0] and isinstance(data[0][0], str): if not is_in_login_flow: logger.info("检测到两层列表结构 data[0][0] is str. models_array_container 设置为 data。") models_array_container = data elif isinstance(data[0], dict): if not is_in_login_flow: logger.info("检测到根列表,元素为字典。直接使用 data 作为 models_array_container。") models_array_container = data else: logger.warning(f"未知的列表嵌套结构。data[0] 类型: {type(data[0]) if data else 'N/A'}。data[0] 预览: {str(data[0])[:200] if data else 'N/A'}") elif isinstance(data, dict): if 'data' in data and isinstance(data['data'], list): models_array_container = data['data'] elif 'models' in data and isinstance(data['models'], list): models_array_container = data['models'] else: for key, value in data.items(): if isinstance(value, list) and len(value) > 0 and isinstance(value[0], (dict, list)): models_array_container = value logger.info(f"模型列表数据在 '{key}' 键下通过启发式搜索找到。") break if models_array_container is None: logger.warning("在字典响应中未能自动定位模型列表数组。") if model_list_fetch_event and not model_list_fetch_event.is_set(): model_list_fetch_event.set() return else: logger.warning(f"接收到的模型列表数据既不是列表也不是字典: {type(data)}") if model_list_fetch_event and not model_list_fetch_event.is_set(): model_list_fetch_event.set() return if models_array_container is not None: new_parsed_list = [] for entry_in_container in models_array_container: model_fields_list = None if isinstance(entry_in_container, dict): potential_id = entry_in_container.get('id', entry_in_container.get('model_id', entry_in_container.get('modelId'))) if potential_id: model_fields_list = entry_in_container else: model_fields_list = list(entry_in_container.values()) elif isinstance(entry_in_container, list): model_fields_list = entry_in_container else: logger.debug(f"Skipping entry of unknown type: {type(entry_in_container)}") continue if not model_fields_list: logger.debug("Skipping entry because model_fields_list is empty or None.") continue model_id_path_str = None display_name_candidate = "" description_candidate = "N/A" default_max_output_tokens_val = None default_top_p_val = None default_temperature_val = 1.0 supported_max_output_tokens_val = None current_model_id_for_log = "UnknownModelYet" try: if isinstance(model_fields_list, list): if not (len(model_fields_list) > 0 and isinstance(model_fields_list[0], (str, int, float))): logger.debug(f"Skipping list-based model_fields due to invalid first element: {str(model_fields_list)[:100]}") continue model_id_path_str = str(model_fields_list[0]) current_model_id_for_log = model_id_path_str.split('/')[-1] if model_id_path_str and '/' in model_id_path_str else model_id_path_str display_name_candidate = str(model_fields_list[3]) if len(model_fields_list) > 3 else "" description_candidate = str(model_fields_list[4]) if len(model_fields_list) > 4 else "N/A" if len(model_fields_list) > 6 and model_fields_list[6] is not None: try: val_int = int(model_fields_list[6]) default_max_output_tokens_val = val_int supported_max_output_tokens_val = val_int except (ValueError, TypeError): logger.warning(f"模型 {current_model_id_for_log}: 无法将列表索引6的值 '{model_fields_list[6]}' 解析为 max_output_tokens。") if len(model_fields_list) > 9 and model_fields_list[9] is not None: try: raw_top_p = float(model_fields_list[9]) if not (0.0 <= raw_top_p <= 1.0): logger.warning(f"模型 {current_model_id_for_log}: 原始 top_p值 {raw_top_p} (来自列表索引9) 超出 [0,1] 范围,将裁剪。") default_top_p_val = max(0.0, min(1.0, raw_top_p)) else: default_top_p_val = raw_top_p except (ValueError, TypeError): logger.warning(f"模型 {current_model_id_for_log}: 无法将列表索引9的值 '{model_fields_list[9]}' 解析为 top_p。") elif isinstance(model_fields_list, dict): model_id_path_str = str(model_fields_list.get('id', model_fields_list.get('model_id', model_fields_list.get('modelId')))) current_model_id_for_log = model_id_path_str.split('/')[-1] if model_id_path_str and '/' in model_id_path_str else model_id_path_str display_name_candidate = str(model_fields_list.get('displayName', model_fields_list.get('display_name', model_fields_list.get('name', '')))) description_candidate = str(model_fields_list.get('description', "N/A")) mot_parsed = model_fields_list.get('maxOutputTokens', model_fields_list.get('defaultMaxOutputTokens', model_fields_list.get('outputTokenLimit'))) if mot_parsed is not None: try: val_int = int(mot_parsed) default_max_output_tokens_val = val_int supported_max_output_tokens_val = val_int except (ValueError, TypeError): logger.warning(f"模型 {current_model_id_for_log}: 无法将字典值 '{mot_parsed}' 解析为 max_output_tokens。") top_p_parsed = model_fields_list.get('topP', model_fields_list.get('defaultTopP')) if top_p_parsed is not None: try: raw_top_p = float(top_p_parsed) if not (0.0 <= raw_top_p <= 1.0): logger.warning(f"模型 {current_model_id_for_log}: 原始 top_p值 {raw_top_p} (来自字典) 超出 [0,1] 范围,将裁剪。") default_top_p_val = max(0.0, min(1.0, raw_top_p)) else: default_top_p_val = raw_top_p except (ValueError, TypeError): logger.warning(f"模型 {current_model_id_for_log}: 无法将字典值 '{top_p_parsed}' 解析为 top_p。") temp_parsed = model_fields_list.get('temperature', model_fields_list.get('defaultTemperature')) if temp_parsed is not None: try: default_temperature_val = float(temp_parsed) except (ValueError, TypeError): logger.warning(f"模型 {current_model_id_for_log}: 无法将字典值 '{temp_parsed}' 解析为 temperature。") else: logger.debug(f"Skipping entry because model_fields_list is not list or dict: {type(model_fields_list)}") continue except Exception as e_parse_fields: logger.error(f"解析模型字段时出错 for entry {str(entry_in_container)[:100]}: {e_parse_fields}") continue if model_id_path_str and model_id_path_str.lower() != "none": simple_model_id_str = model_id_path_str.split('/')[-1] if '/' in model_id_path_str else model_id_path_str if simple_model_id_str in excluded_model_ids: if not is_in_login_flow: logger.info(f"模型 '{simple_model_id_str}' 在排除列表 excluded_model_ids 中,已跳过。") continue final_display_name_str = display_name_candidate if display_name_candidate else simple_model_id_str.replace("-", " ").title() model_entry_dict = { "id": simple_model_id_str, "object": "model", "created": int(time.time()), "owned_by": "ai_studio", "display_name": final_display_name_str, "description": description_candidate, "raw_model_path": model_id_path_str, "default_temperature": default_temperature_val, "default_max_output_tokens": default_max_output_tokens_val, "supported_max_output_tokens": supported_max_output_tokens_val, "default_top_p": default_top_p_val } new_parsed_list.append(model_entry_dict) else: logger.debug(f"Skipping entry due to invalid model_id_path: {model_id_path_str} from entry {str(entry_in_container)[:100]}") if new_parsed_list: # 尝试添加注入的模型到解析列表 injected_models = _get_injected_models() if injected_models: new_parsed_list.extend(injected_models) if not is_in_login_flow: logger.info(f"添加了 {len(injected_models)} 个注入的模型到API模型列表") server.parsed_model_list = sorted(new_parsed_list, key=lambda m: m.get('display_name', '').lower()) server.global_model_list_raw_json = json.dumps({"data": server.parsed_model_list, "object": "list"}) if DEBUG_LOGS_ENABLED: log_output = f"成功解析和更新模型列表。总共解析模型数: {len(server.parsed_model_list)}.\n" for i, item in enumerate(server.parsed_model_list[:min(3, len(server.parsed_model_list))]): log_output += f" Model {i+1}: ID={item.get('id')}, Name={item.get('display_name')}, Temp={item.get('default_temperature')}, MaxTokDef={item.get('default_max_output_tokens')}, MaxTokSup={item.get('supported_max_output_tokens')}, TopP={item.get('default_top_p')}\n" logger.info(log_output) if model_list_fetch_event and not model_list_fetch_event.is_set(): model_list_fetch_event.set() elif not server.parsed_model_list: logger.warning("解析后模型列表仍然为空。") if model_list_fetch_event and not model_list_fetch_event.is_set(): model_list_fetch_event.set() else: logger.warning("models_array_container 为 None,无法解析模型列表。") if model_list_fetch_event and not model_list_fetch_event.is_set(): model_list_fetch_event.set() except json.JSONDecodeError as json_err: logger.error(f"解析模型列表JSON失败: {json_err}. 响应 (前500字): {await response.text()[:500]}") except Exception as e_handle_list_resp: logger.exception(f"处理模型列表响应时发生未知错误: {e_handle_list_resp}") finally: if model_list_fetch_event and not model_list_fetch_event.is_set(): logger.info("处理模型列表响应结束,强制设置 model_list_fetch_event。") model_list_fetch_event.set() async def detect_and_extract_page_error(page: AsyncPage, req_id: str) -> Optional[str]: """检测并提取页面错误""" error_toast_locator = page.locator(ERROR_TOAST_SELECTOR).last try: await error_toast_locator.wait_for(state='visible', timeout=500) message_locator = error_toast_locator.locator('span.content-text') error_message = await message_locator.text_content(timeout=500) if error_message: logger.error(f"[{req_id}] 检测到并提取错误消息: {error_message}") return error_message.strip() else: logger.warning(f"[{req_id}] 检测到错误提示框,但无法提取消息。") return "检测到错误提示框,但无法提取特定消息。" except PlaywrightAsyncError: return None except Exception as e: logger.warning(f"[{req_id}] 检查页面错误时出错: {e}") return None async def save_error_snapshot(error_name: str = 'error'): """保存错误快照""" import server name_parts = error_name.split('_') req_id = name_parts[-1] if len(name_parts) > 1 and len(name_parts[-1]) == 7 else None base_error_name = error_name if not req_id else '_'.join(name_parts[:-1]) log_prefix = f"[{req_id}]" if req_id else "[无请求ID]" page_to_snapshot = server.page_instance if not server.browser_instance or not server.browser_instance.is_connected() or not page_to_snapshot or page_to_snapshot.is_closed(): logger.warning(f"{log_prefix} 无法保存快照 ({base_error_name}),浏览器/页面不可用。") return logger.info(f"{log_prefix} 尝试保存错误快照 ({base_error_name})...") timestamp = int(time.time() * 1000) error_dir = os.path.join(os.path.dirname(__file__), '..', 'errors_py') try: os.makedirs(error_dir, exist_ok=True) filename_suffix = f"{req_id}_{timestamp}" if req_id else f"{timestamp}" filename_base = f"{base_error_name}_{filename_suffix}" screenshot_path = os.path.join(error_dir, f"{filename_base}.png") html_path = os.path.join(error_dir, f"{filename_base}.html") try: await page_to_snapshot.screenshot(path=screenshot_path, full_page=True, timeout=15000) logger.info(f"{log_prefix} 快照已保存到: {screenshot_path}") except Exception as ss_err: logger.error(f"{log_prefix} 保存屏幕截图失败 ({base_error_name}): {ss_err}") try: content = await page_to_snapshot.content() f = None try: f = open(html_path, 'w', encoding='utf-8') f.write(content) logger.info(f"{log_prefix} HTML 已保存到: {html_path}") except Exception as write_err: logger.error(f"{log_prefix} 保存 HTML 失败 ({base_error_name}): {write_err}") finally: if f: try: f.close() logger.debug(f"{log_prefix} HTML 文件已正确关闭") except Exception as close_err: logger.error(f"{log_prefix} 关闭 HTML 文件时出错: {close_err}") except Exception as html_err: logger.error(f"{log_prefix} 获取页面内容失败 ({base_error_name}): {html_err}") except Exception as dir_err: logger.error(f"{log_prefix} 创建错误目录或保存快照时发生其他错误 ({base_error_name}): {dir_err}") async def get_response_via_edit_button( page: AsyncPage, req_id: str, check_client_disconnected: Callable ) -> Optional[str]: """通过编辑按钮获取响应""" logger.info(f"[{req_id}] (Helper) 尝试通过编辑按钮获取响应...") last_message_container = page.locator('ms-chat-turn').last edit_button = last_message_container.get_by_label("Edit") finish_edit_button = last_message_container.get_by_label("Stop editing") autosize_textarea_locator = last_message_container.locator('ms-autosize-textarea') actual_textarea_locator = autosize_textarea_locator.locator('textarea') try: logger.info(f"[{req_id}] - 尝试悬停最后一条消息以显示 'Edit' 按钮...") try: # 对消息容器执行悬停操作 await last_message_container.hover(timeout=CLICK_TIMEOUT_MS / 2) # 使用一半的点击超时作为悬停超时 await asyncio.sleep(0.3) # 等待悬停效果生效 check_client_disconnected("编辑响应 - 悬停后: ") except Exception as hover_err: logger.warning(f"[{req_id}] - (get_response_via_edit_button) 悬停最后一条消息失败 (忽略): {type(hover_err).__name__}") # 即使悬停失败,也继续尝试后续操作,Playwright的expect_async可能会处理 logger.info(f"[{req_id}] - 定位并点击 'Edit' 按钮...") try: from playwright.async_api import expect as expect_async await expect_async(edit_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) check_client_disconnected("编辑响应 - 'Edit' 按钮可见后: ") await edit_button.click(timeout=CLICK_TIMEOUT_MS) logger.info(f"[{req_id}] - 'Edit' 按钮已点击。") except Exception as edit_btn_err: logger.error(f"[{req_id}] - 'Edit' 按钮不可见或点击失败: {edit_btn_err}") await save_error_snapshot(f"edit_response_edit_button_failed_{req_id}") return None check_client_disconnected("编辑响应 - 点击 'Edit' 按钮后: ") await asyncio.sleep(0.3) check_client_disconnected("编辑响应 - 点击 'Edit' 按钮后延时后: ") logger.info(f"[{req_id}] - 从文本区域获取内容...") response_content = None textarea_failed = False try: await expect_async(autosize_textarea_locator).to_be_visible(timeout=CLICK_TIMEOUT_MS) check_client_disconnected("编辑响应 - autosize-textarea 可见后: ") try: data_value_content = await autosize_textarea_locator.get_attribute("data-value") check_client_disconnected("编辑响应 - get_attribute data-value 后: ") if data_value_content is not None: response_content = str(data_value_content) logger.info(f"[{req_id}] - 从 data-value 获取内容成功。") except Exception as data_val_err: logger.warning(f"[{req_id}] - 获取 data-value 失败: {data_val_err}") check_client_disconnected("编辑响应 - get_attribute data-value 错误后: ") if response_content is None: logger.info(f"[{req_id}] - data-value 获取失败或为None,尝试从内部 textarea 获取 input_value...") try: await expect_async(actual_textarea_locator).to_be_visible(timeout=CLICK_TIMEOUT_MS/2) input_val_content = await actual_textarea_locator.input_value(timeout=CLICK_TIMEOUT_MS/2) check_client_disconnected("编辑响应 - input_value 后: ") if input_val_content is not None: response_content = str(input_val_content) logger.info(f"[{req_id}] - 从 input_value 获取内容成功。") except Exception as input_val_err: logger.warning(f"[{req_id}] - 获取 input_value 也失败: {input_val_err}") check_client_disconnected("编辑响应 - input_value 错误后: ") if response_content is not None: response_content = response_content.strip() content_preview = response_content[:100].replace('\\n', '\\\\n') logger.info(f"[{req_id}] - ✅ 最终获取内容 (长度={len(response_content)}): '{content_preview}...'") else: logger.warning(f"[{req_id}] - 所有方法 (data-value, input_value) 内容获取均失败或返回 None。") textarea_failed = True except Exception as textarea_err: logger.error(f"[{req_id}] - 定位或处理文本区域时失败: {textarea_err}") textarea_failed = True response_content = None check_client_disconnected("编辑响应 - 获取文本区域错误后: ") if not textarea_failed: logger.info(f"[{req_id}] - 定位并点击 'Stop editing' 按钮...") try: await expect_async(finish_edit_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) check_client_disconnected("编辑响应 - 'Stop editing' 按钮可见后: ") await finish_edit_button.click(timeout=CLICK_TIMEOUT_MS) logger.info(f"[{req_id}] - 'Stop editing' 按钮已点击。") except Exception as finish_btn_err: logger.warning(f"[{req_id}] - 'Stop editing' 按钮不可见或点击失败: {finish_btn_err}") await save_error_snapshot(f"edit_response_finish_button_failed_{req_id}") check_client_disconnected("编辑响应 - 点击 'Stop editing' 后: ") await asyncio.sleep(0.2) check_client_disconnected("编辑响应 - 点击 'Stop editing' 后延时后: ") else: logger.info(f"[{req_id}] - 跳过点击 'Stop editing' 按钮,因为文本区域读取失败。") return response_content except ClientDisconnectedError: logger.info(f"[{req_id}] (Helper Edit) 客户端断开连接。") raise except Exception as e: logger.exception(f"[{req_id}] 通过编辑按钮获取响应过程中发生意外错误") await save_error_snapshot(f"edit_response_unexpected_error_{req_id}") return None async def get_response_via_copy_button( page: AsyncPage, req_id: str, check_client_disconnected: Callable ) -> Optional[str]: """通过复制按钮获取响应""" logger.info(f"[{req_id}] (Helper) 尝试通过复制按钮获取响应...") last_message_container = page.locator('ms-chat-turn').last more_options_button = last_message_container.get_by_label("Open options") copy_markdown_button = page.get_by_role("menuitem", name="Copy markdown") try: logger.info(f"[{req_id}] - 尝试悬停最后一条消息以显示选项...") await last_message_container.hover(timeout=CLICK_TIMEOUT_MS) check_client_disconnected("复制响应 - 悬停后: ") await asyncio.sleep(0.5) check_client_disconnected("复制响应 - 悬停后延时后: ") logger.info(f"[{req_id}] - 已悬停。") logger.info(f"[{req_id}] - 定位并点击 '更多选项' 按钮...") try: from playwright.async_api import expect as expect_async await expect_async(more_options_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) check_client_disconnected("复制响应 - 更多选项按钮可见后: ") await more_options_button.click(timeout=CLICK_TIMEOUT_MS) logger.info(f"[{req_id}] - '更多选项' 已点击 (通过 get_by_label)。") except Exception as more_opts_err: logger.error(f"[{req_id}] - '更多选项' 按钮 (通过 get_by_label) 不可见或点击失败: {more_opts_err}") await save_error_snapshot(f"copy_response_more_options_failed_{req_id}") return None check_client_disconnected("复制响应 - 点击更多选项后: ") await asyncio.sleep(0.5) check_client_disconnected("复制响应 - 点击更多选项后延时后: ") logger.info(f"[{req_id}] - 定位并点击 '复制 Markdown' 按钮...") copy_success = False try: await expect_async(copy_markdown_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) check_client_disconnected("复制响应 - 复制按钮可见后: ") await copy_markdown_button.click(timeout=CLICK_TIMEOUT_MS, force=True) copy_success = True logger.info(f"[{req_id}] - 已点击 '复制 Markdown' (通过 get_by_role)。") except Exception as copy_err: logger.error(f"[{req_id}] - '复制 Markdown' 按钮 (通过 get_by_role) 点击失败: {copy_err}") await save_error_snapshot(f"copy_response_copy_button_failed_{req_id}") return None if not copy_success: logger.error(f"[{req_id}] - 未能点击 '复制 Markdown' 按钮。") return None check_client_disconnected("复制响应 - 点击复制按钮后: ") await asyncio.sleep(0.5) check_client_disconnected("复制响应 - 点击复制按钮后延时后: ") logger.info(f"[{req_id}] - 正在读取剪贴板内容...") try: clipboard_content = await page.evaluate('navigator.clipboard.readText()') check_client_disconnected("复制响应 - 读取剪贴板后: ") if clipboard_content: content_preview = clipboard_content[:100].replace('\n', '\\\\n') logger.info(f"[{req_id}] - ✅ 成功获取剪贴板内容 (长度={len(clipboard_content)}): '{content_preview}...'") return clipboard_content else: logger.error(f"[{req_id}] - 剪贴板内容为空。") return None except Exception as clipboard_err: if "clipboard-read" in str(clipboard_err): logger.error(f"[{req_id}] - 读取剪贴板失败: 可能是权限问题。错误: {clipboard_err}") else: logger.error(f"[{req_id}] - 读取剪贴板失败: {clipboard_err}") await save_error_snapshot(f"copy_response_clipboard_read_failed_{req_id}") return None except ClientDisconnectedError: logger.info(f"[{req_id}] (Helper Copy) 客户端断开连接。") raise except Exception as e: logger.exception(f"[{req_id}] 复制响应过程中发生意外错误") await save_error_snapshot(f"copy_response_unexpected_error_{req_id}") return None async def _wait_for_response_completion( page: AsyncPage, prompt_textarea_locator: Locator, submit_button_locator: Locator, edit_button_locator: Locator, req_id: str, check_client_disconnected_func: Callable, current_chat_id: Optional[str], timeout_ms=RESPONSE_COMPLETION_TIMEOUT, initial_wait_ms=INITIAL_WAIT_MS_BEFORE_POLLING ) -> bool: """等待响应完成""" from playwright.async_api import TimeoutError logger.info(f"[{req_id}] (WaitV3) 开始等待响应完成... (超时: {timeout_ms}ms)") await asyncio.sleep(initial_wait_ms / 1000) # Initial brief wait start_time = time.time() wait_timeout_ms_short = 3000 # 3 seconds for individual element checks consecutive_empty_input_submit_disabled_count = 0 while True: try: check_client_disconnected_func("等待响应完成 - 循环开始") except ClientDisconnectedError: logger.info(f"[{req_id}] (WaitV3) 客户端断开连接,中止等待。") return False current_time_elapsed_ms = (time.time() - start_time) * 1000 if current_time_elapsed_ms > timeout_ms: logger.error(f"[{req_id}] (WaitV3) 等待响应完成超时 ({timeout_ms}ms)。") await save_error_snapshot(f"wait_completion_v3_overall_timeout_{req_id}") return False try: check_client_disconnected_func("等待响应完成 - 超时检查后") except ClientDisconnectedError: return False # --- 主要条件: 输入框空 & 提交按钮禁用 --- is_input_empty = await prompt_textarea_locator.input_value() == "" is_submit_disabled = False try: is_submit_disabled = await submit_button_locator.is_disabled(timeout=wait_timeout_ms_short) except TimeoutError: logger.warning(f"[{req_id}] (WaitV3) 检查提交按钮是否禁用超时。为本次检查假定其未禁用。") try: check_client_disconnected_func("等待响应完成 - 按钮状态检查后") except ClientDisconnectedError: return False if is_input_empty and is_submit_disabled: consecutive_empty_input_submit_disabled_count += 1 if DEBUG_LOGS_ENABLED: logger.debug(f"[{req_id}] (WaitV3) 主要条件满足: 输入框空,提交按钮禁用 (计数: {consecutive_empty_input_submit_disabled_count})。") # --- 最终确认: 编辑按钮可见 --- try: if await edit_button_locator.is_visible(timeout=wait_timeout_ms_short): logger.info(f"[{req_id}] (WaitV3) ✅ 响应完成: 输入框空,提交按钮禁用,编辑按钮可见。") return True # 明确完成 except TimeoutError: if DEBUG_LOGS_ENABLED: logger.debug(f"[{req_id}] (WaitV3) 主要条件满足后,检查编辑按钮可见性超时。") try: check_client_disconnected_func("等待响应完成 - 编辑按钮检查后") except ClientDisconnectedError: return False # 启发式完成: 如果主要条件持续满足,但编辑按钮仍未出现 if consecutive_empty_input_submit_disabled_count >= 3: # 例如,大约 1.5秒 (3 * 0.5秒轮询) logger.warning(f"[{req_id}] (WaitV3) 响应可能已完成 (启发式): 输入框空,提交按钮禁用,但在 {consecutive_empty_input_submit_disabled_count} 次检查后编辑按钮仍未出现。假定完成。后续若内容获取失败,可能与此有关。") return True # 启发式完成 else: # 主要条件 (输入框空 & 提交按钮禁用) 未满足 consecutive_empty_input_submit_disabled_count = 0 # 重置计数器 if DEBUG_LOGS_ENABLED: reasons = [] if not is_input_empty: reasons.append("输入框非空") if not is_submit_disabled: reasons.append("提交按钮非禁用") logger.debug(f"[{req_id}] (WaitV3) 主要条件未满足 ({', '.join(reasons)}). 继续轮询...") await asyncio.sleep(0.5) # 轮询间隔 async def _get_final_response_content( page: AsyncPage, req_id: str, check_client_disconnected: Callable ) -> Optional[str]: """获取最终响应内容""" logger.info(f"[{req_id}] (Helper GetContent) 开始获取最终响应内容...") response_content = await get_response_via_edit_button( page, req_id, check_client_disconnected ) if response_content is not None: logger.info(f"[{req_id}] (Helper GetContent) ✅ 成功通过编辑按钮获取内容。") return response_content logger.warning(f"[{req_id}] (Helper GetContent) 编辑按钮方法失败或返回空,回退到复制按钮方法...") response_content = await get_response_via_copy_button( page, req_id, check_client_disconnected ) if response_content is not None: logger.info(f"[{req_id}] (Helper GetContent) ✅ 成功通过复制按钮获取内容。") return response_content logger.error(f"[{req_id}] (Helper GetContent) 所有获取响应内容的方法均失败。") await save_error_snapshot(f"get_content_all_methods_failed_{req_id}") return None