""" Gemini自动化登录模块(用于新账号注册) """ import os import json import random import re import string import time from datetime import datetime, timedelta, timezone from typing import Optional from urllib.parse import quote from DrissionPage import ChromiumPage, ChromiumOptions from core.base_task_service import TaskCancelledError # 常量 AUTH_HOME_URL = "https://auth.business.gemini.google/login" # Linux 下常见的 Chromium 路径 CHROMIUM_PATHS = [ "/usr/bin/chromium", "/usr/bin/chromium-browser", "/usr/bin/google-chrome", "/usr/bin/google-chrome-stable", ] # 注册时随机使用的真实英文姓名(避免明显的机器人特征) REGISTER_NAMES = [ "James Smith", "John Johnson", "Robert Williams", "Michael Brown", "William Jones", "David Garcia", "Mary Miller", "Patricia Davis", "Jennifer Rodriguez", "Linda Martinez", "Barbara Anderson", "Susan Thomas", "Jessica Jackson", "Sarah White", "Karen Harris", "Lisa Martin", "Nancy Thompson", "Betty Garcia", "Margaret Martinez", "Sandra Robinson", "Ashley Clark", "Dorothy Rodriguez", "Emma Lewis", "Olivia Lee", "Ava Walker", "Emily Hall", "Abigail Allen", "Madison Young", "Elizabeth Hernandez", "Charlotte King", ] # 常见桌面分辨率(避免固定 1280x800 成为指纹) COMMON_VIEWPORTS = [ (1366, 768), (1440, 900), (1536, 864), (1280, 720), (1920, 1080), (1600, 900), (1280, 800), (1360, 768), ] def _find_chromium_path() -> Optional[str]: """查找可用的 Chromium/Chrome 浏览器路径""" for path in CHROMIUM_PATHS: if os.path.isfile(path) and os.access(path, os.X_OK): return path return None class GeminiAutomation: """Gemini自动化登录""" def __init__( self, user_agent: str = "", proxy: str = "", headless: bool = True, timeout: int = 60, log_callback=None, ) -> None: self.user_agent = user_agent or self._get_ua() self.proxy = proxy self.headless = headless self.timeout = timeout self.log_callback = log_callback self._page = None self._user_data_dir = None self._last_send_error = "" def stop(self) -> None: """外部请求停止:尽力关闭浏览器实例。""" page = self._page if page: try: page.quit() except Exception: pass def login_and_extract(self, email: str, mail_client, is_new_account: bool = False) -> dict: """执行登录并提取配置""" page = None user_data_dir = None try: page = self._create_page() user_data_dir = getattr(page, 'user_data_dir', None) self._page = page self._user_data_dir = user_data_dir return self._run_flow(page, email, mail_client, is_new_account=is_new_account) except TaskCancelledError: raise except Exception as exc: self._log("error", f"automation error: {exc}") return {"success": False, "error": str(exc)} finally: if page: try: page.quit() except Exception: pass self._page = None self._cleanup_user_data(user_data_dir) self._user_data_dir = None def _create_page(self) -> ChromiumPage: """创建浏览器页面""" options = ChromiumOptions() # 自动检测 Chromium 浏览器路径(Linux/Docker 环境) chromium_path = _find_chromium_path() if chromium_path: options.set_browser_path(chromium_path) options.set_argument("--incognito") options.set_argument("--no-sandbox") options.set_argument("--disable-dev-shm-usage") options.set_argument("--disable-setuid-sandbox") options.set_argument("--disable-blink-features=AutomationControlled") # 随机窗口尺寸(避免固定分辨率成为指纹) vw, vh = random.choice(COMMON_VIEWPORTS) options.set_argument(f"--window-size={vw},{vh}") options.set_user_agent(self.user_agent) # 防止 WebRTC 泄露真实 IP(即使使用代理也可能暴露) options.set_argument("--disable-webrtc") options.set_argument("--enforce-webrtc-ip-handling-policy") options.set_pref("webrtc.ip_handling_policy", "disable_non_proxied_udp") options.set_pref("webrtc.multiple_routes_enabled", False) options.set_pref("webrtc.nonproxied_udp_enabled", False) # 语言设置(确保使用中文界面) options.set_argument("--lang=zh-CN") options.set_pref("intl.accept_languages", "zh-CN,zh") if self.proxy: options.set_argument(f"--proxy-server={self.proxy}") if self.headless: # 使用新版无头模式,更接近真实浏览器 options.set_argument("--headless=new") options.set_argument("--disable-gpu") options.set_argument("--no-first-run") options.set_argument("--disable-extensions") # 反检测参数 options.set_argument("--disable-infobars") options.set_argument("--enable-features=NetworkService,NetworkServiceInProcess") options.auto_port() page = ChromiumPage(options) page.set.timeouts(self.timeout) # 最小化 JS 注入:只设置 window.chrome(不使用 Object.defineProperty,避免被 reCAPTCHA 检测) # 注意:DrissionPage 不像 Selenium 那样暴露 navigator.webdriver,无需额外隐藏 try: page.run_cdp("Page.addScriptToEvaluateOnNewDocument", source=""" // 确保 window.chrome 存在(headless 模式下可能缺失) if (!window.chrome) { window.chrome = {runtime: {}, loadTimes: function(){return {}}, csi: function(){return {}}}; } """) except Exception: pass return page def _extract_xsrf_token(self, page) -> str: """从页面中提取真实的 XSRF Token(避免硬编码被标黑)""" try: html = page.html or "" # 尝试从 meta 标签提取 m = re.search(r'name=["\']xsrf-token["\']\s+content=["\']([^"\']+)["\']', html, re.IGNORECASE) if m: self._log("info", "🔑 从 meta 标签提取到 XSRF token") return m.group(1) # 尝试从隐藏 input 提取 m = re.search(r'name=["\']xsrfToken["\'][^>]*value=["\']([A-Za-z0-9_-]{20,})["\']', html) if m: self._log("info", "🔑 从 input 提取到 XSRF token") return m.group(1) # 尝试从 JS 变量提取 m = re.search(r'xsrfToken["\']?\s*[=:]\s*["\']([A-Za-z0-9_-]{20,})["\']', html) if m: self._log("info", "🔑 从 JS 提取到 XSRF token") return m.group(1) # 尝试从 URL 参数提取 m = re.search(r'xsrfToken=([A-Za-z0-9_-]{20,})', html) if m: self._log("info", "🔑 从 URL 参数提取到 XSRF token") return m.group(1) except Exception as e: self._log("warning", f"⚠️ XSRF token 提取异常: {e}") self._log("warning", "⚠️ 未能从页面提取 XSRF token,使用备用值") return "GXO_B0wnNhs6UQJZMcrSbTsbEEs" def _run_flow(self, page, email: str, mail_client, is_new_account: bool = False) -> dict: """执行登录流程(is_new_account=True 时启用注册专用的增强用户名处理)""" # 记录任务开始时间,用于邮件时间过滤(全流程固定,不随重发更新) from datetime import datetime task_start_time = datetime.now() # Step 1: 导航到登录页面 self._log("info", f"🌐 打开登录页面: {email}") page.get(AUTH_HOME_URL, timeout=self.timeout) time.sleep(random.uniform(2, 4)) # 从页面动态提取 XSRF token(避免硬编码被 Google 标黑) xsrf_token = self._extract_xsrf_token(page) # 设置 XSRF Cookie try: self._log("info", "🍪 设置 XSRF Cookie...") page.set.cookies({ "name": "__Host-AP_SignInXsrf", "value": xsrf_token, "url": AUTH_HOME_URL, "path": "/", "secure": True, }) except Exception as e: self._log("warning", f"⚠️ Cookie 设置失败: {e}") # Step 1.5: 通过 URL 方式提交邮箱(稳定,不触发风控) login_hint = quote(email, safe="") login_url = f"https://auth.business.gemini.google/login/email?continueUrl=https%3A%2F%2Fbusiness.gemini.google%2F&loginHint={login_hint}&xsrfToken={xsrf_token}" # 先启动网络监听,再导航(避免漏掉页面加载期间的请求) try: page.listen.start( targets=["batchexecute"], is_regex=False, method=("POST",), res_type=("XHR", "FETCH"), ) except Exception: pass self._log("info", "📧 使用 URL 方式提交邮箱...") page.get(login_url, timeout=self.timeout) time.sleep(random.uniform(3, 5)) # 模拟真实用户行为:页面加载后随机滚动 self._random_scroll(page) # Step 2: 检查当前页面状态 current_url = page.url self._log("info", f"📍 当前 URL: {current_url}") # 检测 signin-error 页面(极端情况,一般 URL 方式不会触发) if "signin-error" in current_url: self._log("error", "❌ 进入 signin-error 页面,可能是代理或网络问题") self._save_screenshot(page, "signin_error") return {"success": False, "error": "signin-error: token rejected by Google, try changing proxy"} has_business_params = "business.gemini.google" in current_url and "csesidx=" in current_url and "/cid/" in current_url if has_business_params: self._log("info", "✅ 已登录,提取配置") return self._extract_config(page, email) # 检测 403 Access Restricted(刷新/登录时账户可能已被封禁) access_error = self._check_access_restricted(page, email) if access_error: return access_error # Step 3: 点击发送验证码按钮(最多5轮,适度退避间隔) self._log("info", "📧 发送验证码...") max_send_rounds = 5 send_round_delays = [10, 10, 15, 15, 20] send_round = 0 while True: send_round += 1 if self._click_send_code_button(page): break if send_round >= max_send_rounds: self._log("error", "❌ 验证码发送失败(可能触发风控),建议更换代理IP") self._save_screenshot(page, "send_code_button_failed") return {"success": False, "error": "send code failed after retries"} delay = send_round_delays[min(send_round - 1, len(send_round_delays) - 1)] self._log("warning", f"⚠️ 发送失败,{delay}秒后重试 ({send_round}/{max_send_rounds})") time.sleep(delay) # Step 4: 等待验证码输入框出现 code_input = self._wait_for_code_input(page) if not code_input: self._log("error", "❌ 验证码输入框未出现") self._save_screenshot(page, "code_input_missing") return {"success": False, "error": "code input not found"} # Step 5: 轮询邮件获取验证码(云环境默认延长,支持环境变量覆盖) try: code_timeout = int(str(os.getenv("REGISTER_CODE_TIMEOUT_SECONDS", "60")).strip() or "60") except Exception: code_timeout = 60 try: code_interval = int(str(os.getenv("REGISTER_CODE_INTERVAL_SECONDS", "5")).strip() or "5") except Exception: code_interval = 5 code_timeout = max(15, code_timeout) code_interval = max(2, code_interval) self._log("info", f"📬 等待邮箱验证码... (timeout={code_timeout}s, interval={code_interval}s)") code = mail_client.poll_for_code(timeout=code_timeout, interval=code_interval, since_time=task_start_time) if not code: self._log("warning", "⚠️ 验证码超时,等待后重新发送...") time.sleep(random.uniform(12, 18)) # 尝试点击重新发送按钮 if self._click_resend_code_button(page): # 再次轮询验证码 code = mail_client.poll_for_code(timeout=code_timeout, interval=code_interval, since_time=task_start_time) if not code: self._log("error", "❌ 重新发送后仍未收到验证码") self._save_screenshot(page, "code_timeout_after_resend") return {"success": False, "error": "verification code timeout after resend"} else: self._log("error", "❌ 验证码超时且未找到重新发送按钮") self._save_screenshot(page, "code_timeout") return {"success": False, "error": "verification code timeout"} self._log("info", f"✅ 收到验证码: {code}") # Step 6: 输入验证码并提交 code_input = page.ele("css:input[jsname='ovqh0b']", timeout=3) or \ page.ele("css:input[type='tel']", timeout=2) if not code_input: self._log("error", "❌ 验证码输入框已失效") return {"success": False, "error": "code input expired"} # 尝试模拟人类输入,失败则降级到直接注入 self._log("info", "⌨️ 输入验证码...") if not self._simulate_human_input(code_input, code): self._log("warning", "⚠️ 模拟输入失败,降级为直接输入") code_input.input(code, clear=True) time.sleep(random.uniform(0.4, 0.8)) # 提交验证码:先回车,再找验证按钮兜底 self._log("info", "⏎ 提交验证码") code_input.input("\n") time.sleep(random.uniform(1, 2)) # 如果回车没触发,找验证按钮点击 if "verify-oob-code" in page.url: verify_btn = self._find_verify_button(page) if verify_btn: try: verify_btn.click() self._log("info", "✅ 已点击验证按钮(兜底)") except Exception: pass # [注册专用] 验证码提交后先等几秒让页面跳转,再检查 403 if is_new_account: time.sleep(3) access_error = self._check_access_restricted(page, email) if access_error: return access_error self._log("info", "📝 [注册] 验证码已提交,等待姓名输入页面...") if self._handle_username_setup(page, is_new_account=True): self._log("info", "✅ 姓名填写完成,等待工作台 URL...") if self._wait_for_business_params(page, timeout=45): self._log("info", "🎊 注册成功,提取配置...") return self._extract_config(page, email) # 姓名步骤失败或未出现,继续走通用流程兜底 self._log("info", "⚠️ 姓名步骤未完成,走通用流程兜底...") # Step 7: 等待页面自动重定向(提交验证码后 Google 会自动跳转) self._log("info", "⏳ 等待验证后跳转...") time.sleep(random.uniform(10, 15)) # 记录当前 URL 状态 current_url = page.url self._log("info", f"📍 验证后 URL: {current_url}") # 检查是否还停留在验证码页面(说明提交失败) if "verify-oob-code" in current_url: self._log("error", "❌ 验证码提交失败") self._save_screenshot(page, "verification_submit_failed") return {"success": False, "error": "verification code submission failed"} # Step 8: 处理协议页面(如果有) self._handle_agreement_page(page) # Step 8.5: 检测 403 Access Restricted 页面 access_error = self._check_access_restricted(page, email) if access_error: return access_error # Step 9: 检查是否已经在正确的页面 current_url = page.url has_business_params = "business.gemini.google" in current_url and "csesidx=" in current_url and "/cid/" in current_url if has_business_params: return self._extract_config(page, email) # Step 10: 如果不在正确的页面,尝试导航 if "business.gemini.google" not in current_url: page.get("https://business.gemini.google/", timeout=self.timeout) time.sleep(random.uniform(4, 7)) # Step 11: 检查是否需要设置用户名(仅登录刷新走此路径,注册已在早期处理) if not is_new_account and "cid" not in page.url: if self._handle_username_setup(page): time.sleep(random.uniform(4, 7)) # Step 12: 再次检测 403(导航后可能出现) access_error = self._check_access_restricted(page, email) if access_error: return access_error # Step 13: 等待 URL 参数生成(csesidx 和 cid) if not self._wait_for_business_params(page): page.refresh() time.sleep(random.uniform(4, 7)) if not self._wait_for_business_params(page): self._log("error", "❌ URL 参数生成失败") self._save_screenshot(page, "params_missing") return {"success": False, "error": "URL parameters not found"} # Step 13: 提取配置 self._log("info", "🎊 登录成功,提取配置...") return self._extract_config(page, email) def _click_send_code_button(self, page) -> bool: """点击发送验证码按钮(如果需要)""" time.sleep(random.uniform(1.5, 3)) max_send_attempts = 5 # 适度退避延迟序列(秒) retry_delays = [10, 10, 15, 15, 20] # 方法1: 直接通过ID查找 direct_btn = page.ele("#sign-in-with-email", timeout=5) if direct_btn: for attempt in range(1, max_send_attempts + 1): try: self._last_send_error = "" self._human_click(page, direct_btn) if self._verify_code_send_by_network(page) or self._verify_code_send_status(page): self._stop_listen(page) return True delay = retry_delays[min(attempt - 1, len(retry_delays) - 1)] if self._last_send_error == "captcha_check_failed": self._log("error", f"❌ 触发风控,建议更换代理IP ({attempt}/{max_send_attempts})") else: self._log("warning", f"⚠️ 发送失败,{delay}秒后重试 ({attempt}/{max_send_attempts})") time.sleep(delay) except Exception as e: self._log("warning", f"⚠️ 点击失败: {e}") self._stop_listen(page) return False # 方法2: 通过关键词查找 keywords = ["通过电子邮件发送验证码", "通过电子邮件发送", "email", "Email", "Send code", "Send verification", "Verification code"] try: buttons = page.eles("tag:button") for btn in buttons: text = (btn.text or "").strip() if text and any(kw in text for kw in keywords): for attempt in range(1, max_send_attempts + 1): try: self._last_send_error = "" self._human_click(page, btn) if self._verify_code_send_by_network(page) or self._verify_code_send_status(page): self._stop_listen(page) return True delay = retry_delays[min(attempt - 1, len(retry_delays) - 1)] if self._last_send_error == "captcha_check_failed": self._log("error", f"❌ 触发风控,建议更换代理IP ({attempt}/{max_send_attempts})") else: self._log("warning", f"⚠️ 发送失败,{delay}秒后重试 ({attempt}/{max_send_attempts})") time.sleep(delay) except Exception as e: self._log("warning", f"⚠️ 点击失败: {e}") self._stop_listen(page) return False except Exception as e: self._log("warning", f"⚠️ 搜索按钮异常: {e}") # 检查是否在 signin-error 页面(不应该继续尝试发送) if "signin-error" in (page.url or ""): self._stop_listen(page) self._log("error", "❌ 在 signin-error 页面,无法发送验证码") return False # 检查是否已经在验证码输入页面 code_input = page.ele("css:input[jsname='ovqh0b']", timeout=2) or page.ele("css:input[name='pinInput']", timeout=1) if code_input: self._stop_listen(page) self._log("info", "✅ 已在验证码输入页面") # 直接点击重新发送按钮(不管之前是否发送过) if self._click_resend_code_button(page): self._log("info", "✅ 已点击重新发送按钮") return True else: self._log("warning", "⚠️ 未找到重新发送按钮,继续流程") return True self._stop_listen(page) self._log("error", "❌ 未找到发送验证码按钮") return False def _stop_listen(self, page) -> None: """安全地停止网络监听""" try: if hasattr(page, 'listen') and page.listen: page.listen.stop() except Exception: pass def _verify_code_send_by_network(self, page) -> bool: """通过监听网络请求验证验证码是否成功发送""" try: time.sleep(1) packets = [] max_wait_seconds = 6 deadline = time.time() + max_wait_seconds try: while time.time() < deadline: got_any = False for packet in page.listen.steps(timeout=1, gap=1): packets.append(packet) got_any = True if got_any: time.sleep(0.2) else: break except Exception: return False if not packets: return False # 保存网络日志(仅用于调试) self._save_network_packets(packets) found_batchexecute = False found_batchexecute_error = False for packet in packets: try: url = str(packet.url) if hasattr(packet, 'url') else str(packet) if 'batchexecute' in url: found_batchexecute = True try: response = packet.response if hasattr(packet, 'response') else None if response and hasattr(response, 'raw_body'): body = response.raw_body raw_body_str = str(body) if "CAPTCHA_CHECK_FAILED" in raw_body_str: found_batchexecute_error = True self._last_send_error = "captcha_check_failed" elif "SendEmailOtpError" in raw_body_str: found_batchexecute_error = True self._last_send_error = "send_email_otp_error" except Exception: pass except Exception: continue if found_batchexecute: if found_batchexecute_error: return False return True else: return False except Exception: return False def _verify_code_send_status(self, page) -> bool: """检测页面提示判断是否发送成功""" time.sleep(random.uniform(1.5, 3)) try: success_keywords = ["验证码已发送", "code sent", "email sent", "check your email", "已发送"] error_keywords = [ "出了点问题", "something went wrong", "error", "failed", "try again", "稍后再试", "选择其他登录方法" ] selectors = [ "css:.zyTWof-gIZMF", "css:[role='alert']", "css:aside", ] for selector in selectors: try: elements = page.eles(selector, timeout=1) for elem in elements[:20]: text = (elem.text or "").strip() if not text: continue if any(kw in text for kw in error_keywords): return False if any(kw in text for kw in success_keywords): return True except Exception: continue return True except Exception: return True def _truncate_text(self, text: str, max_len: int = 2000) -> str: if text is None: return "" if len(text) <= max_len: return text return text[:max_len] + f"...(truncated, total={len(text)})" def _save_network_packets(self, packets) -> None: """保存网络日志(仅用于调试)""" try: from core.storage import _data_file_path base_dir = _data_file_path(os.path.join("logs", "network")) os.makedirs(base_dir, exist_ok=True) ts = datetime.now().strftime("%Y%m%d-%H%M%S") file_path = os.path.join(base_dir, f"network-{ts}.jsonl") def safe_str(value): try: return value if isinstance(value, str) else str(value) except Exception: return "" with open(file_path, "a", encoding="utf-8") as f: for packet in packets: try: req = packet.request if hasattr(packet, "request") else None resp = packet.response if hasattr(packet, "response") else None fail = packet.fail_info if hasattr(packet, "fail_info") else None item = { "url": safe_str(packet.url) if hasattr(packet, "url") else safe_str(packet), "method": safe_str(packet.method) if hasattr(packet, "method") else "UNKNOWN", "resourceType": safe_str(packet.resourceType) if hasattr(packet, "resourceType") else "", "is_failed": bool(packet.is_failed) if hasattr(packet, "is_failed") else False, "fail_info": safe_str(fail) if fail else "", "request": { "headers": req.headers if req and hasattr(req, "headers") else {}, "postData": req.postData if req and hasattr(req, "postData") else "", }, "response": { "status": resp.status if resp and hasattr(resp, "status") else 0, "headers": resp.headers if resp and hasattr(resp, "headers") else {}, "raw_body": resp.raw_body if resp and hasattr(resp, "raw_body") else "", }, } f.write(json.dumps(item, ensure_ascii=False) + "\n") except Exception as e: f.write(json.dumps({"error": safe_str(e)}, ensure_ascii=False) + "\n") except Exception: pass def _wait_for_code_input(self, page, timeout: int = 30): """等待验证码输入框出现""" selectors = [ "css:input[jsname='ovqh0b']", "css:input[type='tel']", "css:input[name='pinInput']", "css:input[autocomplete='one-time-code']", ] for _ in range(timeout // 2): for selector in selectors: try: el = page.ele(selector, timeout=1) if el: return el except Exception: continue time.sleep(2) return None def _simulate_human_input(self, element, text: str) -> bool: """模拟人类输入(逐字符输入,带非均匀延迟) Args: element: 输入框元素 text: 要输入的文本 Returns: bool: 是否成功 """ try: # 先点击输入框获取焦点 element.click() time.sleep(random.uniform(0.2, 0.5)) # 逐字符输入,模拟真实打字节奏 for i, char in enumerate(text): element.input(char) # 基础延迟 80-180ms(正常打字速度) delay = random.uniform(0.08, 0.18) # 每3-5个字符偶尔有更长的停顿(模拟犹豫/看屏幕) if i > 0 and random.random() < 0.2: delay += random.uniform(0.2, 0.5) time.sleep(delay) # 输入完成后停顿(模拟核对) time.sleep(random.uniform(0.3, 0.8)) return True except Exception: return False def _human_click(self, page, element) -> None: """模拟人类点击:先移动鼠标到元素附近,再点击""" try: # 尝试用 actions 链模拟鼠标移动 + 点击 page.actions.move_to(element) time.sleep(random.uniform(0.1, 0.3)) page.actions.click() except Exception: # 降级为直接点击 element.click() def _random_scroll(self, page) -> None: """模拟真实用户的页面滚动行为""" try: scroll_amount = random.randint(50, 200) page.run_js(f"window.scrollBy(0, {scroll_amount})") time.sleep(random.uniform(0.3, 0.8)) # 有时候滚回去一点 if random.random() < 0.3: page.run_js(f"window.scrollBy(0, -{random.randint(20, 80)})") time.sleep(random.uniform(0.2, 0.5)) except Exception: pass def _find_verify_button(self, page): """查找验证按钮(排除重新发送按钮)""" try: buttons = page.eles("tag:button") for btn in buttons: text = (btn.text or "").strip().lower() if text and "重新" not in text and "发送" not in text and "resend" not in text and "send" not in text: return btn except Exception: pass return None def _click_resend_code_button(self, page) -> bool: """点击重新发送验证码按钮""" time.sleep(random.uniform(1.5, 3)) # 查找包含重新发送关键词的按钮(与 _find_verify_button 相反) try: buttons = page.eles("tag:button") for btn in buttons: text = (btn.text or "").strip().lower() if text and ("重新" in text or "resend" in text): try: self._log("info", f"🔄 点击重新发送按钮") self._human_click(page, btn) time.sleep(random.uniform(1.5, 3)) return True except Exception: pass except Exception: pass return False def _check_access_restricted(self, page, email: str = "") -> dict | None: """检测 403 Access Restricted 页面,返回错误 dict 或 None""" domain = email.split("@")[1] if "@" in email else "unknown" error_msg = f"403 域名封禁 ({domain})" # 方法1: 搜索 h1 标签 try: h1 = page.ele("tag:h1", timeout=2) h1_text = h1.text if h1 else "" if h1_text and "Access Restricted" in h1_text: self._log("error", "⛔ 403 Access Restricted: email banned by Google") self._log("error", f"⛔ 403 访问受限,域名 {domain} 可能已被 Google 封禁") self._save_screenshot(page, "access_restricted_403") return {"success": False, "error": error_msg} except Exception: pass # 方法2: body 文本 try: body = page.ele("tag:body", timeout=2) body_text = (body.text or "")[:500] if body else "" if "Access Restricted" in body_text: self._log("error", "⛔ 403 Access Restricted: email banned by Google") self._log("error", f"⛔ 403 访问受限,域名 {domain} 可能已被 Google 封禁") self._save_screenshot(page, "access_restricted_403") return {"success": False, "error": error_msg} except Exception: pass # 方法3: page.html 源码 try: html = (page.html or "")[:2000] if "Access Restricted" in html: self._log("error", "⛔ 403 Access Restricted: email banned by Google") self._log("error", f"⛔ 403 访问受限,域名 {domain} 可能已被 Google 封禁") self._save_screenshot(page, "access_restricted_403") return {"success": False, "error": error_msg} except Exception: pass return None def _handle_agreement_page(self, page) -> None: """处理协议页面""" if "/admin/create" in page.url: agree_btn = page.ele("css:button.agree-button", timeout=5) if agree_btn: self._human_click(page, agree_btn) time.sleep(random.uniform(2, 4)) def _wait_for_cid(self, page, timeout: int = 10) -> bool: """等待URL包含cid""" for _ in range(timeout): if "cid" in page.url: return True time.sleep(1) return False def _wait_for_business_params(self, page, timeout: int = 30) -> bool: """等待业务页面参数生成(csesidx 和 cid)""" for _ in range(timeout): url = page.url if "csesidx=" in url and "/cid/" in url: return True time.sleep(1) return False def _handle_username_setup(self, page, is_new_account: bool = False) -> bool: """处理用户名设置页面(is_new_account=True 时启用按钮兜底和延长超时)""" current_url = page.url if "auth.business.gemini.google/login" in current_url: return False # 精准选择器(参考实际页面 DOM,优先级从高到低) selectors = [ "css:input[formcontrolname='fullName']", "css:input#mat-input-0", "css:input[placeholder='全名']", "css:input[placeholder='Full name']", "css:input[name='displayName']", "css:input[aria-label*='用户名' i]", "css:input[aria-label*='display name' i]", "css:input[type='text']", ] # 轮询等待输入框出现(最多30秒,每秒检查一次) # 与参考代码对齐:页面加载慢时不会过早放弃 username_input = None self._log("info", "⏳ 等待用户名输入框出现(最多30秒)...") for i in range(30): for selector in selectors: try: el = page.ele(selector, timeout=1) if el: username_input = el self._log("info", f"✅ 找到用户名输入框: {selector}") break except Exception: continue if username_input: break time.sleep(1) if not username_input: self._log("warning", "⚠️ 30秒内未找到用户名输入框,跳过此步骤") return False name = random.choice(REGISTER_NAMES) self._log("info", f"✏️ 输入姓名: {name}") try: # 清空输入框 username_input.click() time.sleep(random.uniform(0.2, 0.5)) username_input.clear() time.sleep(random.uniform(0.1, 0.3)) # 尝试模拟人类输入,失败则降级到直接注入 if not self._simulate_human_input(username_input, name): username_input.input(name) time.sleep(0.3) # 回车提交 username_input.input("\n") if is_new_account: # 注册专用:回车后等待1.5秒,若未跳转则用按钮兜底 time.sleep(random.uniform(1.5, 3)) if "cid" not in page.url: self._log("info", "⌨️ 回车未跳转,尝试点击提交按钮...") try: for btn in page.eles("tag:button"): try: if btn.is_displayed() and btn.is_enabled(): btn.click() self._log("info", "✅ 已点击提交按钮(兜底)") time.sleep(1) break except Exception: continue except Exception as e: self._log("warning", f"⚠️ 按钮兜底失败: {e}") # 注册专用:等待45秒,失败则刷新再等15秒 if not self._wait_for_cid(page, timeout=45): self._log("warning", "⚠️ 用户名提交后未检测到 cid 参数,尝试刷新...") page.refresh() time.sleep(random.uniform(2, 4)) if not self._wait_for_cid(page, timeout=15): self._log("error", "❌ 刷新后仍未检测到 cid 参数") self._save_screenshot(page, "step7_after_verify") return False else: # 登录刷新:原有30秒逻辑 if not self._wait_for_cid(page, timeout=30): self._log("warning", "⚠️ 用户名提交后未检测到 cid 参数") return False return True except Exception as e: self._log("warning", f"⚠️ 用户名设置异常: {e}") return False def _extract_config(self, page, email: str) -> dict: """提取配置(轮询等待 cookie 到位)""" try: if "cid/" not in page.url: page.get("https://business.gemini.google/", timeout=self.timeout) time.sleep(random.uniform(2, 4)) url = page.url if "cid/" not in url: return {"success": False, "error": "cid not found"} config_id = url.split("cid/")[1].split("?")[0].split("/")[0] csesidx = url.split("csesidx=")[1].split("&")[0] if "csesidx=" in url else "" # 轮询等待关键 cookie 到位(最多10秒) ses = None host = None ses_obj = None for _ in range(10): cookies = page.cookies() ses = next((c["value"] for c in cookies if c["name"] == "__Secure-C_SES"), None) host = next((c["value"] for c in cookies if c["name"] == "__Host-C_OSES"), None) ses_obj = next((c for c in cookies if c["name"] == "__Secure-C_SES"), None) if ses and host: break time.sleep(1) if not ses or not host: self._log("warning", f"⚠️ Cookie 不完整 (ses={'有' if ses else '无'}, host={'有' if host else '无'})") # 使用北京时区,确保时间计算正确(Cookie expiry 是 UTC 时间戳) beijing_tz = timezone(timedelta(hours=8)) if ses_obj and "expiry" in ses_obj: cookie_expire_beijing = datetime.fromtimestamp(ses_obj["expiry"], tz=beijing_tz) expires_at = (cookie_expire_beijing - timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S") else: expires_at = (datetime.now(beijing_tz) + timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S") config = { "id": email, "csesidx": csesidx, "config_id": config_id, "secure_c_ses": ses, "host_c_oses": host, "expires_at": expires_at, } # 提取试用期信息 trial_end = self._extract_trial_end(page, csesidx, config_id) if trial_end: config["trial_end"] = trial_end return {"success": True, "config": config} except Exception as e: return {"success": False, "error": str(e)} def _extract_trial_end(self, page, csesidx: str, config_id: str) -> Optional[str]: """从页面中提取试用期到期日期,不跳转到可能 400 的深层路径""" # re 已在文件顶部导入 try: self._log("info", "📅 获取试用期信息...") def _days_to_end_date(days: int) -> str: end_date = (datetime.now(timezone(timedelta(hours=8))) + timedelta(days=days)).strftime("%Y-%m-%d") self._log("info", f"📅 试用期剩余 {days} 天,到期日: {end_date}") return end_date def _search_page_source(source: str) -> Optional[str]: """在页面源码中搜索试用期信息""" # 格式1: "daysLeft":29 (JSON数据) m = re.search(r'"daysLeft"\s*:\s*(\d+)', source) if m: return _days_to_end_date(int(m.group(1))) # 格式2: "trialDaysRemaining":29 m = re.search(r'"trialDaysRemaining"\s*:\s*(\d+)', source) if m: return _days_to_end_date(int(m.group(1))) # 格式3: 日期数组 "[2026,3,25]" 形式 (batchexecute格式) m = re.search(r'\[(\d{4}),(\d{1,2}),(\d{1,2})\].*?\[(\d{4}),(\d{1,2}),(\d{1,2})\]', source) if m: # 取第二个日期(结束日期) try: end_date = f"{m.group(4):0>4}-{int(m.group(5)):02d}-{int(m.group(6)):02d}" # 简单校验年份合理 if 2025 <= int(m.group(4)) <= 2030: self._log("info", f"📅 试用期到期日: {end_date}") return end_date except Exception: pass # 格式4: "29 days left" 或 "还剩29天" m = re.search(r'(\d+)\s*days?\s*left', source, re.IGNORECASE) if m: return _days_to_end_date(int(m.group(1))) m = re.search(r'还剩\s*(\d+)\s*天', source) if m: return _days_to_end_date(int(m.group(1))) return None # ——— 方式1: 当前页面(刚登录完,不需要跳转)——— try: source = page.html result = _search_page_source(source or "") if result: return result except Exception: pass # ——— 方式2: 跳转到 /settings(不带 billing/plans 后缀,SPA可以处理)——— try: settings_url = f"https://business.gemini.google/cid/{config_id}/settings?csesidx={csesidx}" page.get(settings_url, timeout=self.timeout) time.sleep(random.uniform(1.5, 3)) source = page.html result = _search_page_source(source or "") if result: return result except Exception: pass # ——— 方式3: 跳转到主页(最保险)——— try: main_url = f"https://business.gemini.google/cid/{config_id}?csesidx={csesidx}" page.get(main_url, timeout=self.timeout) time.sleep(random.uniform(1.5, 3)) source = page.html result = _search_page_source(source or "") if result: return result except Exception: pass self._log("warning", "⚠️ 未能获取试用期信息(页面中未找到相关数据)") return None except Exception as e: self._log("warning", f"⚠️ 获取试用期失败: {e}") return None def _save_screenshot(self, page, name: str) -> None: """保存截图""" try: from core.storage import _data_file_path screenshot_dir = _data_file_path("automation") os.makedirs(screenshot_dir, exist_ok=True) path = os.path.join(screenshot_dir, f"{name}_{int(time.time())}.png") page.get_screenshot(path=path) except Exception: pass def _log(self, level: str, message: str) -> None: """记录日志""" if self.log_callback: try: self.log_callback(level, message) except TaskCancelledError: raise except Exception: pass def _cleanup_user_data(self, user_data_dir: Optional[str]) -> None: """清理浏览器用户数据目录""" if not user_data_dir: return try: import shutil if os.path.exists(user_data_dir): shutil.rmtree(user_data_dir, ignore_errors=True) except Exception: pass @staticmethod def _get_ua() -> str: """生成随机User-Agent(使用当前主流 Chrome 版本)""" major = random.choice([132, 133, 134, 135]) v = f"{major}.0.{random.randint(6800, 6950)}.{random.randint(50, 150)}" return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{v} Safari/537.36"