Spaces:
Paused
Paused
| # --- browser_utils/operations.py --- | |
| # 浏览器页面操作相关功能模块 | |
| import asyncio | |
| import time | |
| import json | |
| import os | |
| import re | |
| import logging | |
| from typing import Optional, Any, List, Dict, Callable, Set | |
| from playwright.async_api import Page as AsyncPage, Locator, Error as PlaywrightAsyncError | |
| # 导入配置和模型 | |
| from config import * | |
| from models import ClientDisconnectedError | |
| logger = logging.getLogger("AIStudioProxyServer") | |
| async def get_raw_text_content(response_element: Locator, previous_text: str, req_id: str) -> str: | |
| """从响应元素获取原始文本内容""" | |
| raw_text = previous_text | |
| try: | |
| await response_element.wait_for(state='attached', timeout=1000) | |
| pre_element = response_element.locator('pre').last | |
| pre_found_and_visible = False | |
| try: | |
| await pre_element.wait_for(state='visible', timeout=250) | |
| pre_found_and_visible = True | |
| except PlaywrightAsyncError: | |
| pass | |
| if pre_found_and_visible: | |
| try: | |
| raw_text = await pre_element.inner_text(timeout=500) | |
| except PlaywrightAsyncError as pre_err: | |
| if DEBUG_LOGS_ENABLED: | |
| logger.debug(f"[{req_id}] (获取原始文本) 获取 pre 元素内部文本失败: {pre_err}") | |
| else: | |
| try: | |
| raw_text = await response_element.inner_text(timeout=500) | |
| except PlaywrightAsyncError as e_parent: | |
| if DEBUG_LOGS_ENABLED: | |
| logger.debug(f"[{req_id}] (获取原始文本) 获取响应元素内部文本失败: {e_parent}") | |
| except PlaywrightAsyncError as e_parent: | |
| if DEBUG_LOGS_ENABLED: | |
| logger.debug(f"[{req_id}] (获取原始文本) 响应元素未准备好: {e_parent}") | |
| except Exception as e_unexpected: | |
| logger.warning(f"[{req_id}] (获取原始文本) 意外错误: {e_unexpected}") | |
| if raw_text != previous_text: | |
| if DEBUG_LOGS_ENABLED: | |
| preview = raw_text[:100].replace('\n', '\\n') | |
| logger.debug(f"[{req_id}] (获取原始文本) 文本已更新,长度: {len(raw_text)},预览: '{preview}...'") | |
| return raw_text | |
| def _parse_userscript_models(script_content: str): | |
| """从油猴脚本中解析模型列表 - 使用JSON解析方式""" | |
| try: | |
| # 查找脚本版本号 | |
| version_pattern = r'const\s+SCRIPT_VERSION\s*=\s*[\'"]([^\'"]+)[\'"]' | |
| version_match = re.search(version_pattern, script_content) | |
| script_version = version_match.group(1) if version_match else "v1.6" | |
| # 查找 MODELS_TO_INJECT 数组的内容 | |
| models_array_pattern = r'const\s+MODELS_TO_INJECT\s*=\s*(\[.*?\]);' | |
| models_match = re.search(models_array_pattern, script_content, re.DOTALL) | |
| if not models_match: | |
| logger.warning("未找到 MODELS_TO_INJECT 数组") | |
| return [] | |
| models_js_code = models_match.group(1) | |
| # 将JavaScript数组转换为JSON格式 | |
| # 1. 替换模板字符串中的变量 | |
| models_js_code = models_js_code.replace('${SCRIPT_VERSION}', script_version) | |
| # 2. 移除JavaScript注释 | |
| models_js_code = re.sub(r'//.*?$', '', models_js_code, flags=re.MULTILINE) | |
| # 3. 将JavaScript对象转换为JSON格式 | |
| # 移除尾随逗号 | |
| models_js_code = re.sub(r',\s*([}\]])', r'\1', models_js_code) | |
| # 替换单引号为双引号 | |
| models_js_code = re.sub(r"(\w+):\s*'([^']*)'", r'"\1": "\2"', models_js_code) | |
| # 替换反引号为双引号 | |
| models_js_code = re.sub(r'(\w+):\s*`([^`]*)`', r'"\1": "\2"', models_js_code) | |
| # 确保属性名用双引号 | |
| models_js_code = re.sub(r'(\w+):', r'"\1":', models_js_code) | |
| # 4. 解析JSON | |
| import json | |
| models_data = json.loads(models_js_code) | |
| models = [] | |
| for model_obj in models_data: | |
| if isinstance(model_obj, dict) and 'name' in model_obj: | |
| models.append({ | |
| 'name': model_obj.get('name', ''), | |
| 'displayName': model_obj.get('displayName', ''), | |
| 'description': model_obj.get('description', '') | |
| }) | |
| logger.info(f"成功解析 {len(models)} 个模型从油猴脚本") | |
| return models | |
| except Exception as e: | |
| logger.error(f"解析油猴脚本模型列表失败: {e}") | |
| return [] | |
| def _get_injected_models(): | |
| """从油猴脚本中获取注入的模型列表,转换为API格式""" | |
| try: | |
| # 直接读取环境变量,避免复杂的导入 | |
| enable_injection = os.environ.get('ENABLE_SCRIPT_INJECTION', 'true').lower() in ('true', '1', 'yes') | |
| if not enable_injection: | |
| return [] | |
| # 获取脚本文件路径 | |
| script_path = os.environ.get('USERSCRIPT_PATH', 'browser_utils/more_modles.js') | |
| # 检查脚本文件是否存在 | |
| if not os.path.exists(script_path): | |
| # 脚本文件不存在,静默返回空列表 | |
| return [] | |
| # 读取油猴脚本内容 | |
| with open(script_path, 'r', encoding='utf-8') as f: | |
| script_content = f.read() | |
| # 从脚本中解析模型列表 | |
| models = _parse_userscript_models(script_content) | |
| if not models: | |
| return [] | |
| # 转换为API格式 | |
| injected_models = [] | |
| for model in models: | |
| model_name = model.get('name', '') | |
| if not model_name: | |
| continue # 跳过没有名称的模型 | |
| if model_name.startswith('models/'): | |
| simple_id = model_name[7:] # 移除 'models/' 前缀 | |
| else: | |
| simple_id = model_name | |
| display_name = model.get('displayName', model.get('display_name', simple_id)) | |
| description = model.get('description', f'Injected model: {simple_id}') | |
| # 注意:不再清理显示名称,保留原始的emoji和版本信息 | |
| model_entry = { | |
| "id": simple_id, | |
| "object": "model", | |
| "created": int(time.time()), | |
| "owned_by": "ai_studio_injected", | |
| "display_name": display_name, | |
| "description": description, | |
| "raw_model_path": model_name, | |
| "default_temperature": 1.0, | |
| "default_max_output_tokens": 65536, | |
| "supported_max_output_tokens": 65536, | |
| "default_top_p": 0.95, | |
| "injected": True # 标记为注入的模型 | |
| } | |
| injected_models.append(model_entry) | |
| return injected_models | |
| except Exception as e: | |
| # 静默处理错误,不输出日志,返回空列表 | |
| return [] | |
| async def _handle_model_list_response(response: Any): | |
| """处理模型列表响应""" | |
| # 需要访问全局变量 | |
| import server | |
| global_model_list_raw_json = getattr(server, 'global_model_list_raw_json', None) | |
| parsed_model_list = getattr(server, 'parsed_model_list', []) | |
| model_list_fetch_event = getattr(server, 'model_list_fetch_event', None) | |
| excluded_model_ids = getattr(server, 'excluded_model_ids', set()) | |
| if MODELS_ENDPOINT_URL_CONTAINS in response.url and response.ok: | |
| # 检查是否在登录流程中 | |
| launch_mode = os.environ.get('LAUNCH_MODE', 'debug') | |
| is_in_login_flow = launch_mode in ['debug'] and not getattr(server, 'is_page_ready', False) | |
| if is_in_login_flow: | |
| # 在登录流程中,静默处理,不输出干扰信息 | |
| pass # 静默处理,避免干扰用户输入 | |
| else: | |
| logger.info(f"捕获到潜在的模型列表响应来自: {response.url} (状态: {response.status})") | |
| try: | |
| data = await response.json() | |
| models_array_container = None | |
| if isinstance(data, list) and data: | |
| if isinstance(data[0], list) and data[0] and isinstance(data[0][0], list): | |
| if not is_in_login_flow: | |
| logger.info("检测到三层列表结构 data[0][0] is list. models_array_container 设置为 data[0]。") | |
| models_array_container = data[0] | |
| elif isinstance(data[0], list) and data[0] and isinstance(data[0][0], str): | |
| if not is_in_login_flow: | |
| logger.info("检测到两层列表结构 data[0][0] is str. models_array_container 设置为 data。") | |
| models_array_container = data | |
| elif isinstance(data[0], dict): | |
| if not is_in_login_flow: | |
| logger.info("检测到根列表,元素为字典。直接使用 data 作为 models_array_container。") | |
| models_array_container = data | |
| else: | |
| logger.warning(f"未知的列表嵌套结构。data[0] 类型: {type(data[0]) if data else 'N/A'}。data[0] 预览: {str(data[0])[:200] if data else 'N/A'}") | |
| elif isinstance(data, dict): | |
| if 'data' in data and isinstance(data['data'], list): | |
| models_array_container = data['data'] | |
| elif 'models' in data and isinstance(data['models'], list): | |
| models_array_container = data['models'] | |
| else: | |
| for key, value in data.items(): | |
| if isinstance(value, list) and len(value) > 0 and isinstance(value[0], (dict, list)): | |
| models_array_container = value | |
| logger.info(f"模型列表数据在 '{key}' 键下通过启发式搜索找到。") | |
| break | |
| if models_array_container is None: | |
| logger.warning("在字典响应中未能自动定位模型列表数组。") | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| return | |
| else: | |
| logger.warning(f"接收到的模型列表数据既不是列表也不是字典: {type(data)}") | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| return | |
| if models_array_container is not None: | |
| new_parsed_list = [] | |
| for entry_in_container in models_array_container: | |
| model_fields_list = None | |
| if isinstance(entry_in_container, dict): | |
| potential_id = entry_in_container.get('id', entry_in_container.get('model_id', entry_in_container.get('modelId'))) | |
| if potential_id: | |
| model_fields_list = entry_in_container | |
| else: | |
| model_fields_list = list(entry_in_container.values()) | |
| elif isinstance(entry_in_container, list): | |
| model_fields_list = entry_in_container | |
| else: | |
| logger.debug(f"Skipping entry of unknown type: {type(entry_in_container)}") | |
| continue | |
| if not model_fields_list: | |
| logger.debug("Skipping entry because model_fields_list is empty or None.") | |
| continue | |
| model_id_path_str = None | |
| display_name_candidate = "" | |
| description_candidate = "N/A" | |
| default_max_output_tokens_val = None | |
| default_top_p_val = None | |
| default_temperature_val = 1.0 | |
| supported_max_output_tokens_val = None | |
| current_model_id_for_log = "UnknownModelYet" | |
| try: | |
| if isinstance(model_fields_list, list): | |
| if not (len(model_fields_list) > 0 and isinstance(model_fields_list[0], (str, int, float))): | |
| logger.debug(f"Skipping list-based model_fields due to invalid first element: {str(model_fields_list)[:100]}") | |
| continue | |
| model_id_path_str = str(model_fields_list[0]) | |
| current_model_id_for_log = model_id_path_str.split('/')[-1] if model_id_path_str and '/' in model_id_path_str else model_id_path_str | |
| display_name_candidate = str(model_fields_list[3]) if len(model_fields_list) > 3 else "" | |
| description_candidate = str(model_fields_list[4]) if len(model_fields_list) > 4 else "N/A" | |
| if len(model_fields_list) > 6 and model_fields_list[6] is not None: | |
| try: | |
| val_int = int(model_fields_list[6]) | |
| default_max_output_tokens_val = val_int | |
| supported_max_output_tokens_val = val_int | |
| except (ValueError, TypeError): | |
| logger.warning(f"模型 {current_model_id_for_log}: 无法将列表索引6的值 '{model_fields_list[6]}' 解析为 max_output_tokens。") | |
| if len(model_fields_list) > 9 and model_fields_list[9] is not None: | |
| try: | |
| raw_top_p = float(model_fields_list[9]) | |
| if not (0.0 <= raw_top_p <= 1.0): | |
| logger.warning(f"模型 {current_model_id_for_log}: 原始 top_p值 {raw_top_p} (来自列表索引9) 超出 [0,1] 范围,将裁剪。") | |
| default_top_p_val = max(0.0, min(1.0, raw_top_p)) | |
| else: | |
| default_top_p_val = raw_top_p | |
| except (ValueError, TypeError): | |
| logger.warning(f"模型 {current_model_id_for_log}: 无法将列表索引9的值 '{model_fields_list[9]}' 解析为 top_p。") | |
| elif isinstance(model_fields_list, dict): | |
| model_id_path_str = str(model_fields_list.get('id', model_fields_list.get('model_id', model_fields_list.get('modelId')))) | |
| current_model_id_for_log = model_id_path_str.split('/')[-1] if model_id_path_str and '/' in model_id_path_str else model_id_path_str | |
| display_name_candidate = str(model_fields_list.get('displayName', model_fields_list.get('display_name', model_fields_list.get('name', '')))) | |
| description_candidate = str(model_fields_list.get('description', "N/A")) | |
| mot_parsed = model_fields_list.get('maxOutputTokens', model_fields_list.get('defaultMaxOutputTokens', model_fields_list.get('outputTokenLimit'))) | |
| if mot_parsed is not None: | |
| try: | |
| val_int = int(mot_parsed) | |
| default_max_output_tokens_val = val_int | |
| supported_max_output_tokens_val = val_int | |
| except (ValueError, TypeError): | |
| logger.warning(f"模型 {current_model_id_for_log}: 无法将字典值 '{mot_parsed}' 解析为 max_output_tokens。") | |
| top_p_parsed = model_fields_list.get('topP', model_fields_list.get('defaultTopP')) | |
| if top_p_parsed is not None: | |
| try: | |
| raw_top_p = float(top_p_parsed) | |
| if not (0.0 <= raw_top_p <= 1.0): | |
| logger.warning(f"模型 {current_model_id_for_log}: 原始 top_p值 {raw_top_p} (来自字典) 超出 [0,1] 范围,将裁剪。") | |
| default_top_p_val = max(0.0, min(1.0, raw_top_p)) | |
| else: | |
| default_top_p_val = raw_top_p | |
| except (ValueError, TypeError): | |
| logger.warning(f"模型 {current_model_id_for_log}: 无法将字典值 '{top_p_parsed}' 解析为 top_p。") | |
| temp_parsed = model_fields_list.get('temperature', model_fields_list.get('defaultTemperature')) | |
| if temp_parsed is not None: | |
| try: | |
| default_temperature_val = float(temp_parsed) | |
| except (ValueError, TypeError): | |
| logger.warning(f"模型 {current_model_id_for_log}: 无法将字典值 '{temp_parsed}' 解析为 temperature。") | |
| else: | |
| logger.debug(f"Skipping entry because model_fields_list is not list or dict: {type(model_fields_list)}") | |
| continue | |
| except Exception as e_parse_fields: | |
| logger.error(f"解析模型字段时出错 for entry {str(entry_in_container)[:100]}: {e_parse_fields}") | |
| continue | |
| if model_id_path_str and model_id_path_str.lower() != "none": | |
| simple_model_id_str = model_id_path_str.split('/')[-1] if '/' in model_id_path_str else model_id_path_str | |
| if simple_model_id_str in excluded_model_ids: | |
| if not is_in_login_flow: | |
| logger.info(f"模型 '{simple_model_id_str}' 在排除列表 excluded_model_ids 中,已跳过。") | |
| continue | |
| final_display_name_str = display_name_candidate if display_name_candidate else simple_model_id_str.replace("-", " ").title() | |
| model_entry_dict = { | |
| "id": simple_model_id_str, | |
| "object": "model", | |
| "created": int(time.time()), | |
| "owned_by": "ai_studio", | |
| "display_name": final_display_name_str, | |
| "description": description_candidate, | |
| "raw_model_path": model_id_path_str, | |
| "default_temperature": default_temperature_val, | |
| "default_max_output_tokens": default_max_output_tokens_val, | |
| "supported_max_output_tokens": supported_max_output_tokens_val, | |
| "default_top_p": default_top_p_val | |
| } | |
| new_parsed_list.append(model_entry_dict) | |
| else: | |
| logger.debug(f"Skipping entry due to invalid model_id_path: {model_id_path_str} from entry {str(entry_in_container)[:100]}") | |
| if new_parsed_list: | |
| # 尝试添加注入的模型到解析列表 | |
| injected_models = _get_injected_models() | |
| if injected_models: | |
| new_parsed_list.extend(injected_models) | |
| if not is_in_login_flow: | |
| logger.info(f"添加了 {len(injected_models)} 个注入的模型到API模型列表") | |
| server.parsed_model_list = sorted(new_parsed_list, key=lambda m: m.get('display_name', '').lower()) | |
| server.global_model_list_raw_json = json.dumps({"data": server.parsed_model_list, "object": "list"}) | |
| if DEBUG_LOGS_ENABLED: | |
| log_output = f"成功解析和更新模型列表。总共解析模型数: {len(server.parsed_model_list)}.\n" | |
| for i, item in enumerate(server.parsed_model_list[:min(3, len(server.parsed_model_list))]): | |
| log_output += f" Model {i+1}: ID={item.get('id')}, Name={item.get('display_name')}, Temp={item.get('default_temperature')}, MaxTokDef={item.get('default_max_output_tokens')}, MaxTokSup={item.get('supported_max_output_tokens')}, TopP={item.get('default_top_p')}\n" | |
| logger.info(log_output) | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| elif not server.parsed_model_list: | |
| logger.warning("解析后模型列表仍然为空。") | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| else: | |
| logger.warning("models_array_container 为 None,无法解析模型列表。") | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| except json.JSONDecodeError as json_err: | |
| logger.error(f"解析模型列表JSON失败: {json_err}. 响应 (前500字): {await response.text()[:500]}") | |
| except Exception as e_handle_list_resp: | |
| logger.exception(f"处理模型列表响应时发生未知错误: {e_handle_list_resp}") | |
| finally: | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| logger.info("处理模型列表响应结束,强制设置 model_list_fetch_event。") | |
| model_list_fetch_event.set() | |
| async def detect_and_extract_page_error(page: AsyncPage, req_id: str) -> Optional[str]: | |
| """检测并提取页面错误""" | |
| error_toast_locator = page.locator(ERROR_TOAST_SELECTOR).last | |
| try: | |
| await error_toast_locator.wait_for(state='visible', timeout=500) | |
| message_locator = error_toast_locator.locator('span.content-text') | |
| error_message = await message_locator.text_content(timeout=500) | |
| if error_message: | |
| logger.error(f"[{req_id}] 检测到并提取错误消息: {error_message}") | |
| return error_message.strip() | |
| else: | |
| logger.warning(f"[{req_id}] 检测到错误提示框,但无法提取消息。") | |
| return "检测到错误提示框,但无法提取特定消息。" | |
| except PlaywrightAsyncError: | |
| return None | |
| except Exception as e: | |
| logger.warning(f"[{req_id}] 检查页面错误时出错: {e}") | |
| return None | |
| async def save_error_snapshot(error_name: str = 'error'): | |
| """保存错误快照""" | |
| import server | |
| name_parts = error_name.split('_') | |
| req_id = name_parts[-1] if len(name_parts) > 1 and len(name_parts[-1]) == 7 else None | |
| base_error_name = error_name if not req_id else '_'.join(name_parts[:-1]) | |
| log_prefix = f"[{req_id}]" if req_id else "[无请求ID]" | |
| page_to_snapshot = server.page_instance | |
| if not server.browser_instance or not server.browser_instance.is_connected() or not page_to_snapshot or page_to_snapshot.is_closed(): | |
| logger.warning(f"{log_prefix} 无法保存快照 ({base_error_name}),浏览器/页面不可用。") | |
| return | |
| logger.info(f"{log_prefix} 尝试保存错误快照 ({base_error_name})...") | |
| timestamp = int(time.time() * 1000) | |
| error_dir = os.path.join(os.path.dirname(__file__), '..', 'errors_py') | |
| try: | |
| os.makedirs(error_dir, exist_ok=True) | |
| filename_suffix = f"{req_id}_{timestamp}" if req_id else f"{timestamp}" | |
| filename_base = f"{base_error_name}_{filename_suffix}" | |
| screenshot_path = os.path.join(error_dir, f"{filename_base}.png") | |
| html_path = os.path.join(error_dir, f"{filename_base}.html") | |
| try: | |
| await page_to_snapshot.screenshot(path=screenshot_path, full_page=True, timeout=15000) | |
| logger.info(f"{log_prefix} 快照已保存到: {screenshot_path}") | |
| except Exception as ss_err: | |
| logger.error(f"{log_prefix} 保存屏幕截图失败 ({base_error_name}): {ss_err}") | |
| try: | |
| content = await page_to_snapshot.content() | |
| f = None | |
| try: | |
| f = open(html_path, 'w', encoding='utf-8') | |
| f.write(content) | |
| logger.info(f"{log_prefix} HTML 已保存到: {html_path}") | |
| except Exception as write_err: | |
| logger.error(f"{log_prefix} 保存 HTML 失败 ({base_error_name}): {write_err}") | |
| finally: | |
| if f: | |
| try: | |
| f.close() | |
| logger.debug(f"{log_prefix} HTML 文件已正确关闭") | |
| except Exception as close_err: | |
| logger.error(f"{log_prefix} 关闭 HTML 文件时出错: {close_err}") | |
| except Exception as html_err: | |
| logger.error(f"{log_prefix} 获取页面内容失败 ({base_error_name}): {html_err}") | |
| except Exception as dir_err: | |
| logger.error(f"{log_prefix} 创建错误目录或保存快照时发生其他错误 ({base_error_name}): {dir_err}") | |
| async def get_response_via_edit_button( | |
| page: AsyncPage, | |
| req_id: str, | |
| check_client_disconnected: Callable | |
| ) -> Optional[str]: | |
| """通过编辑按钮获取响应""" | |
| logger.info(f"[{req_id}] (Helper) 尝试通过编辑按钮获取响应...") | |
| last_message_container = page.locator('ms-chat-turn').last | |
| edit_button = last_message_container.get_by_label("Edit") | |
| finish_edit_button = last_message_container.get_by_label("Stop editing") | |
| autosize_textarea_locator = last_message_container.locator('ms-autosize-textarea') | |
| actual_textarea_locator = autosize_textarea_locator.locator('textarea') | |
| try: | |
| logger.info(f"[{req_id}] - 尝试悬停最后一条消息以显示 'Edit' 按钮...") | |
| try: | |
| # 对消息容器执行悬停操作 | |
| await last_message_container.hover(timeout=CLICK_TIMEOUT_MS / 2) # 使用一半的点击超时作为悬停超时 | |
| await asyncio.sleep(0.3) # 等待悬停效果生效 | |
| check_client_disconnected("编辑响应 - 悬停后: ") | |
| except Exception as hover_err: | |
| logger.warning(f"[{req_id}] - (get_response_via_edit_button) 悬停最后一条消息失败 (忽略): {type(hover_err).__name__}") | |
| # 即使悬停失败,也继续尝试后续操作,Playwright的expect_async可能会处理 | |
| logger.info(f"[{req_id}] - 定位并点击 'Edit' 按钮...") | |
| try: | |
| from playwright.async_api import expect as expect_async | |
| await expect_async(edit_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) | |
| check_client_disconnected("编辑响应 - 'Edit' 按钮可见后: ") | |
| await edit_button.click(timeout=CLICK_TIMEOUT_MS) | |
| logger.info(f"[{req_id}] - 'Edit' 按钮已点击。") | |
| except Exception as edit_btn_err: | |
| logger.error(f"[{req_id}] - 'Edit' 按钮不可见或点击失败: {edit_btn_err}") | |
| await save_error_snapshot(f"edit_response_edit_button_failed_{req_id}") | |
| return None | |
| check_client_disconnected("编辑响应 - 点击 'Edit' 按钮后: ") | |
| await asyncio.sleep(0.3) | |
| check_client_disconnected("编辑响应 - 点击 'Edit' 按钮后延时后: ") | |
| logger.info(f"[{req_id}] - 从文本区域获取内容...") | |
| response_content = None | |
| textarea_failed = False | |
| try: | |
| await expect_async(autosize_textarea_locator).to_be_visible(timeout=CLICK_TIMEOUT_MS) | |
| check_client_disconnected("编辑响应 - autosize-textarea 可见后: ") | |
| try: | |
| data_value_content = await autosize_textarea_locator.get_attribute("data-value") | |
| check_client_disconnected("编辑响应 - get_attribute data-value 后: ") | |
| if data_value_content is not None: | |
| response_content = str(data_value_content) | |
| logger.info(f"[{req_id}] - 从 data-value 获取内容成功。") | |
| except Exception as data_val_err: | |
| logger.warning(f"[{req_id}] - 获取 data-value 失败: {data_val_err}") | |
| check_client_disconnected("编辑响应 - get_attribute data-value 错误后: ") | |
| if response_content is None: | |
| logger.info(f"[{req_id}] - data-value 获取失败或为None,尝试从内部 textarea 获取 input_value...") | |
| try: | |
| await expect_async(actual_textarea_locator).to_be_visible(timeout=CLICK_TIMEOUT_MS/2) | |
| input_val_content = await actual_textarea_locator.input_value(timeout=CLICK_TIMEOUT_MS/2) | |
| check_client_disconnected("编辑响应 - input_value 后: ") | |
| if input_val_content is not None: | |
| response_content = str(input_val_content) | |
| logger.info(f"[{req_id}] - 从 input_value 获取内容成功。") | |
| except Exception as input_val_err: | |
| logger.warning(f"[{req_id}] - 获取 input_value 也失败: {input_val_err}") | |
| check_client_disconnected("编辑响应 - input_value 错误后: ") | |
| if response_content is not None: | |
| response_content = response_content.strip() | |
| content_preview = response_content[:100].replace('\\n', '\\\\n') | |
| logger.info(f"[{req_id}] - ✅ 最终获取内容 (长度={len(response_content)}): '{content_preview}...'") | |
| else: | |
| logger.warning(f"[{req_id}] - 所有方法 (data-value, input_value) 内容获取均失败或返回 None。") | |
| textarea_failed = True | |
| except Exception as textarea_err: | |
| logger.error(f"[{req_id}] - 定位或处理文本区域时失败: {textarea_err}") | |
| textarea_failed = True | |
| response_content = None | |
| check_client_disconnected("编辑响应 - 获取文本区域错误后: ") | |
| if not textarea_failed: | |
| logger.info(f"[{req_id}] - 定位并点击 'Stop editing' 按钮...") | |
| try: | |
| await expect_async(finish_edit_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) | |
| check_client_disconnected("编辑响应 - 'Stop editing' 按钮可见后: ") | |
| await finish_edit_button.click(timeout=CLICK_TIMEOUT_MS) | |
| logger.info(f"[{req_id}] - 'Stop editing' 按钮已点击。") | |
| except Exception as finish_btn_err: | |
| logger.warning(f"[{req_id}] - 'Stop editing' 按钮不可见或点击失败: {finish_btn_err}") | |
| await save_error_snapshot(f"edit_response_finish_button_failed_{req_id}") | |
| check_client_disconnected("编辑响应 - 点击 'Stop editing' 后: ") | |
| await asyncio.sleep(0.2) | |
| check_client_disconnected("编辑响应 - 点击 'Stop editing' 后延时后: ") | |
| else: | |
| logger.info(f"[{req_id}] - 跳过点击 'Stop editing' 按钮,因为文本区域读取失败。") | |
| return response_content | |
| except ClientDisconnectedError: | |
| logger.info(f"[{req_id}] (Helper Edit) 客户端断开连接。") | |
| raise | |
| except Exception as e: | |
| logger.exception(f"[{req_id}] 通过编辑按钮获取响应过程中发生意外错误") | |
| await save_error_snapshot(f"edit_response_unexpected_error_{req_id}") | |
| return None | |
| async def get_response_via_copy_button( | |
| page: AsyncPage, | |
| req_id: str, | |
| check_client_disconnected: Callable | |
| ) -> Optional[str]: | |
| """通过复制按钮获取响应""" | |
| logger.info(f"[{req_id}] (Helper) 尝试通过复制按钮获取响应...") | |
| last_message_container = page.locator('ms-chat-turn').last | |
| more_options_button = last_message_container.get_by_label("Open options") | |
| copy_markdown_button = page.get_by_role("menuitem", name="Copy markdown") | |
| try: | |
| logger.info(f"[{req_id}] - 尝试悬停最后一条消息以显示选项...") | |
| await last_message_container.hover(timeout=CLICK_TIMEOUT_MS) | |
| check_client_disconnected("复制响应 - 悬停后: ") | |
| await asyncio.sleep(0.5) | |
| check_client_disconnected("复制响应 - 悬停后延时后: ") | |
| logger.info(f"[{req_id}] - 已悬停。") | |
| logger.info(f"[{req_id}] - 定位并点击 '更多选项' 按钮...") | |
| try: | |
| from playwright.async_api import expect as expect_async | |
| await expect_async(more_options_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) | |
| check_client_disconnected("复制响应 - 更多选项按钮可见后: ") | |
| await more_options_button.click(timeout=CLICK_TIMEOUT_MS) | |
| logger.info(f"[{req_id}] - '更多选项' 已点击 (通过 get_by_label)。") | |
| except Exception as more_opts_err: | |
| logger.error(f"[{req_id}] - '更多选项' 按钮 (通过 get_by_label) 不可见或点击失败: {more_opts_err}") | |
| await save_error_snapshot(f"copy_response_more_options_failed_{req_id}") | |
| return None | |
| check_client_disconnected("复制响应 - 点击更多选项后: ") | |
| await asyncio.sleep(0.5) | |
| check_client_disconnected("复制响应 - 点击更多选项后延时后: ") | |
| logger.info(f"[{req_id}] - 定位并点击 '复制 Markdown' 按钮...") | |
| copy_success = False | |
| try: | |
| await expect_async(copy_markdown_button).to_be_visible(timeout=CLICK_TIMEOUT_MS) | |
| check_client_disconnected("复制响应 - 复制按钮可见后: ") | |
| await copy_markdown_button.click(timeout=CLICK_TIMEOUT_MS, force=True) | |
| copy_success = True | |
| logger.info(f"[{req_id}] - 已点击 '复制 Markdown' (通过 get_by_role)。") | |
| except Exception as copy_err: | |
| logger.error(f"[{req_id}] - '复制 Markdown' 按钮 (通过 get_by_role) 点击失败: {copy_err}") | |
| await save_error_snapshot(f"copy_response_copy_button_failed_{req_id}") | |
| return None | |
| if not copy_success: | |
| logger.error(f"[{req_id}] - 未能点击 '复制 Markdown' 按钮。") | |
| return None | |
| check_client_disconnected("复制响应 - 点击复制按钮后: ") | |
| await asyncio.sleep(0.5) | |
| check_client_disconnected("复制响应 - 点击复制按钮后延时后: ") | |
| logger.info(f"[{req_id}] - 正在读取剪贴板内容...") | |
| try: | |
| clipboard_content = await page.evaluate('navigator.clipboard.readText()') | |
| check_client_disconnected("复制响应 - 读取剪贴板后: ") | |
| if clipboard_content: | |
| content_preview = clipboard_content[:100].replace('\n', '\\\\n') | |
| logger.info(f"[{req_id}] - ✅ 成功获取剪贴板内容 (长度={len(clipboard_content)}): '{content_preview}...'") | |
| return clipboard_content | |
| else: | |
| logger.error(f"[{req_id}] - 剪贴板内容为空。") | |
| return None | |
| except Exception as clipboard_err: | |
| if "clipboard-read" in str(clipboard_err): | |
| logger.error(f"[{req_id}] - 读取剪贴板失败: 可能是权限问题。错误: {clipboard_err}") | |
| else: | |
| logger.error(f"[{req_id}] - 读取剪贴板失败: {clipboard_err}") | |
| await save_error_snapshot(f"copy_response_clipboard_read_failed_{req_id}") | |
| return None | |
| except ClientDisconnectedError: | |
| logger.info(f"[{req_id}] (Helper Copy) 客户端断开连接。") | |
| raise | |
| except Exception as e: | |
| logger.exception(f"[{req_id}] 复制响应过程中发生意外错误") | |
| await save_error_snapshot(f"copy_response_unexpected_error_{req_id}") | |
| return None | |
| async def _wait_for_response_completion( | |
| page: AsyncPage, | |
| prompt_textarea_locator: Locator, | |
| submit_button_locator: Locator, | |
| edit_button_locator: Locator, | |
| req_id: str, | |
| check_client_disconnected_func: Callable, | |
| current_chat_id: Optional[str], | |
| timeout_ms=RESPONSE_COMPLETION_TIMEOUT, | |
| initial_wait_ms=INITIAL_WAIT_MS_BEFORE_POLLING | |
| ) -> bool: | |
| """等待响应完成""" | |
| from playwright.async_api import TimeoutError | |
| logger.info(f"[{req_id}] (WaitV3) 开始等待响应完成... (超时: {timeout_ms}ms)") | |
| await asyncio.sleep(initial_wait_ms / 1000) # Initial brief wait | |
| start_time = time.time() | |
| wait_timeout_ms_short = 3000 # 3 seconds for individual element checks | |
| consecutive_empty_input_submit_disabled_count = 0 | |
| while True: | |
| try: | |
| check_client_disconnected_func("等待响应完成 - 循环开始") | |
| except ClientDisconnectedError: | |
| logger.info(f"[{req_id}] (WaitV3) 客户端断开连接,中止等待。") | |
| return False | |
| current_time_elapsed_ms = (time.time() - start_time) * 1000 | |
| if current_time_elapsed_ms > timeout_ms: | |
| logger.error(f"[{req_id}] (WaitV3) 等待响应完成超时 ({timeout_ms}ms)。") | |
| await save_error_snapshot(f"wait_completion_v3_overall_timeout_{req_id}") | |
| return False | |
| try: | |
| check_client_disconnected_func("等待响应完成 - 超时检查后") | |
| except ClientDisconnectedError: | |
| return False | |
| # --- 主要条件: 输入框空 & 提交按钮禁用 --- | |
| is_input_empty = await prompt_textarea_locator.input_value() == "" | |
| is_submit_disabled = False | |
| try: | |
| is_submit_disabled = await submit_button_locator.is_disabled(timeout=wait_timeout_ms_short) | |
| except TimeoutError: | |
| logger.warning(f"[{req_id}] (WaitV3) 检查提交按钮是否禁用超时。为本次检查假定其未禁用。") | |
| try: | |
| check_client_disconnected_func("等待响应完成 - 按钮状态检查后") | |
| except ClientDisconnectedError: | |
| return False | |
| if is_input_empty and is_submit_disabled: | |
| consecutive_empty_input_submit_disabled_count += 1 | |
| if DEBUG_LOGS_ENABLED: | |
| logger.debug(f"[{req_id}] (WaitV3) 主要条件满足: 输入框空,提交按钮禁用 (计数: {consecutive_empty_input_submit_disabled_count})。") | |
| # --- 最终确认: 编辑按钮可见 --- | |
| try: | |
| if await edit_button_locator.is_visible(timeout=wait_timeout_ms_short): | |
| logger.info(f"[{req_id}] (WaitV3) ✅ 响应完成: 输入框空,提交按钮禁用,编辑按钮可见。") | |
| return True # 明确完成 | |
| except TimeoutError: | |
| if DEBUG_LOGS_ENABLED: | |
| logger.debug(f"[{req_id}] (WaitV3) 主要条件满足后,检查编辑按钮可见性超时。") | |
| try: | |
| check_client_disconnected_func("等待响应完成 - 编辑按钮检查后") | |
| except ClientDisconnectedError: | |
| return False | |
| # 启发式完成: 如果主要条件持续满足,但编辑按钮仍未出现 | |
| if consecutive_empty_input_submit_disabled_count >= 3: # 例如,大约 1.5秒 (3 * 0.5秒轮询) | |
| logger.warning(f"[{req_id}] (WaitV3) 响应可能已完成 (启发式): 输入框空,提交按钮禁用,但在 {consecutive_empty_input_submit_disabled_count} 次检查后编辑按钮仍未出现。假定完成。后续若内容获取失败,可能与此有关。") | |
| return True # 启发式完成 | |
| else: # 主要条件 (输入框空 & 提交按钮禁用) 未满足 | |
| consecutive_empty_input_submit_disabled_count = 0 # 重置计数器 | |
| if DEBUG_LOGS_ENABLED: | |
| reasons = [] | |
| if not is_input_empty: | |
| reasons.append("输入框非空") | |
| if not is_submit_disabled: | |
| reasons.append("提交按钮非禁用") | |
| logger.debug(f"[{req_id}] (WaitV3) 主要条件未满足 ({', '.join(reasons)}). 继续轮询...") | |
| await asyncio.sleep(0.5) # 轮询间隔 | |
| async def _get_final_response_content( | |
| page: AsyncPage, | |
| req_id: str, | |
| check_client_disconnected: Callable | |
| ) -> Optional[str]: | |
| """获取最终响应内容""" | |
| logger.info(f"[{req_id}] (Helper GetContent) 开始获取最终响应内容...") | |
| response_content = await get_response_via_edit_button( | |
| page, req_id, check_client_disconnected | |
| ) | |
| if response_content is not None: | |
| logger.info(f"[{req_id}] (Helper GetContent) ✅ 成功通过编辑按钮获取内容。") | |
| return response_content | |
| logger.warning(f"[{req_id}] (Helper GetContent) 编辑按钮方法失败或返回空,回退到复制按钮方法...") | |
| response_content = await get_response_via_copy_button( | |
| page, req_id, check_client_disconnected | |
| ) | |
| if response_content is not None: | |
| logger.info(f"[{req_id}] (Helper GetContent) ✅ 成功通过复制按钮获取内容。") | |
| return response_content | |
| logger.error(f"[{req_id}] (Helper GetContent) 所有获取响应内容的方法均失败。") | |
| await save_error_snapshot(f"get_content_all_methods_failed_{req_id}") | |
| return None |