gemini-business2api-github / core /gemini_automation.py
lijunke
Fix GPTMail code polling defaults and HF timeout stability
da3203c
"""
Gemini自动化登录模块(用于新账号注册)
"""
import os
import json
import random
import re
import string
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import quote
from DrissionPage import ChromiumPage, ChromiumOptions
from core.base_task_service import TaskCancelledError
# 常量
AUTH_HOME_URL = "https://auth.business.gemini.google/login"
# Linux 下常见的 Chromium 路径
CHROMIUM_PATHS = [
"/usr/bin/chromium",
"/usr/bin/chromium-browser",
"/usr/bin/google-chrome",
"/usr/bin/google-chrome-stable",
]
# 注册时随机使用的真实英文姓名(避免明显的机器人特征)
REGISTER_NAMES = [
"James Smith", "John Johnson", "Robert Williams", "Michael Brown", "William Jones",
"David Garcia", "Mary Miller", "Patricia Davis", "Jennifer Rodriguez", "Linda Martinez",
"Barbara Anderson", "Susan Thomas", "Jessica Jackson", "Sarah White", "Karen Harris",
"Lisa Martin", "Nancy Thompson", "Betty Garcia", "Margaret Martinez", "Sandra Robinson",
"Ashley Clark", "Dorothy Rodriguez", "Emma Lewis", "Olivia Lee", "Ava Walker",
"Emily Hall", "Abigail Allen", "Madison Young", "Elizabeth Hernandez", "Charlotte King",
]
# 常见桌面分辨率(避免固定 1280x800 成为指纹)
COMMON_VIEWPORTS = [
(1366, 768), (1440, 900), (1536, 864), (1280, 720),
(1920, 1080), (1600, 900), (1280, 800), (1360, 768),
]
def _find_chromium_path() -> Optional[str]:
"""查找可用的 Chromium/Chrome 浏览器路径"""
for path in CHROMIUM_PATHS:
if os.path.isfile(path) and os.access(path, os.X_OK):
return path
return None
class GeminiAutomation:
"""Gemini自动化登录"""
def __init__(
self,
user_agent: str = "",
proxy: str = "",
headless: bool = True,
timeout: int = 60,
log_callback=None,
) -> None:
self.user_agent = user_agent or self._get_ua()
self.proxy = proxy
self.headless = headless
self.timeout = timeout
self.log_callback = log_callback
self._page = None
self._user_data_dir = None
self._last_send_error = ""
def stop(self) -> None:
"""外部请求停止:尽力关闭浏览器实例。"""
page = self._page
if page:
try:
page.quit()
except Exception:
pass
def login_and_extract(self, email: str, mail_client, is_new_account: bool = False) -> dict:
"""执行登录并提取配置"""
page = None
user_data_dir = None
try:
page = self._create_page()
user_data_dir = getattr(page, 'user_data_dir', None)
self._page = page
self._user_data_dir = user_data_dir
return self._run_flow(page, email, mail_client, is_new_account=is_new_account)
except TaskCancelledError:
raise
except Exception as exc:
self._log("error", f"automation error: {exc}")
return {"success": False, "error": str(exc)}
finally:
if page:
try:
page.quit()
except Exception:
pass
self._page = None
self._cleanup_user_data(user_data_dir)
self._user_data_dir = None
def _create_page(self) -> ChromiumPage:
"""创建浏览器页面"""
options = ChromiumOptions()
# 自动检测 Chromium 浏览器路径(Linux/Docker 环境)
chromium_path = _find_chromium_path()
if chromium_path:
options.set_browser_path(chromium_path)
options.set_argument("--incognito")
options.set_argument("--no-sandbox")
options.set_argument("--disable-dev-shm-usage")
options.set_argument("--disable-setuid-sandbox")
options.set_argument("--disable-blink-features=AutomationControlled")
# 随机窗口尺寸(避免固定分辨率成为指纹)
vw, vh = random.choice(COMMON_VIEWPORTS)
options.set_argument(f"--window-size={vw},{vh}")
options.set_user_agent(self.user_agent)
# 防止 WebRTC 泄露真实 IP(即使使用代理也可能暴露)
options.set_argument("--disable-webrtc")
options.set_argument("--enforce-webrtc-ip-handling-policy")
options.set_pref("webrtc.ip_handling_policy", "disable_non_proxied_udp")
options.set_pref("webrtc.multiple_routes_enabled", False)
options.set_pref("webrtc.nonproxied_udp_enabled", False)
# 语言设置(确保使用中文界面)
options.set_argument("--lang=zh-CN")
options.set_pref("intl.accept_languages", "zh-CN,zh")
if self.proxy:
options.set_argument(f"--proxy-server={self.proxy}")
if self.headless:
# 使用新版无头模式,更接近真实浏览器
options.set_argument("--headless=new")
options.set_argument("--disable-gpu")
options.set_argument("--no-first-run")
options.set_argument("--disable-extensions")
# 反检测参数
options.set_argument("--disable-infobars")
options.set_argument("--enable-features=NetworkService,NetworkServiceInProcess")
options.auto_port()
page = ChromiumPage(options)
page.set.timeouts(self.timeout)
# 最小化 JS 注入:只设置 window.chrome(不使用 Object.defineProperty,避免被 reCAPTCHA 检测)
# 注意:DrissionPage 不像 Selenium 那样暴露 navigator.webdriver,无需额外隐藏
try:
page.run_cdp("Page.addScriptToEvaluateOnNewDocument", source="""
// 确保 window.chrome 存在(headless 模式下可能缺失)
if (!window.chrome) {
window.chrome = {runtime: {}, loadTimes: function(){return {}}, csi: function(){return {}}};
}
""")
except Exception:
pass
return page
def _extract_xsrf_token(self, page) -> str:
"""从页面中提取真实的 XSRF Token(避免硬编码被标黑)"""
try:
html = page.html or ""
# 尝试从 meta 标签提取
m = re.search(r'name=["\']xsrf-token["\']\s+content=["\']([^"\']+)["\']', html, re.IGNORECASE)
if m:
self._log("info", "🔑 从 meta 标签提取到 XSRF token")
return m.group(1)
# 尝试从隐藏 input 提取
m = re.search(r'name=["\']xsrfToken["\'][^>]*value=["\']([A-Za-z0-9_-]{20,})["\']', html)
if m:
self._log("info", "🔑 从 input 提取到 XSRF token")
return m.group(1)
# 尝试从 JS 变量提取
m = re.search(r'xsrfToken["\']?\s*[=:]\s*["\']([A-Za-z0-9_-]{20,})["\']', html)
if m:
self._log("info", "🔑 从 JS 提取到 XSRF token")
return m.group(1)
# 尝试从 URL 参数提取
m = re.search(r'xsrfToken=([A-Za-z0-9_-]{20,})', html)
if m:
self._log("info", "🔑 从 URL 参数提取到 XSRF token")
return m.group(1)
except Exception as e:
self._log("warning", f"⚠️ XSRF token 提取异常: {e}")
self._log("warning", "⚠️ 未能从页面提取 XSRF token,使用备用值")
return "GXO_B0wnNhs6UQJZMcrSbTsbEEs"
def _run_flow(self, page, email: str, mail_client, is_new_account: bool = False) -> dict:
"""执行登录流程(is_new_account=True 时启用注册专用的增强用户名处理)"""
# 记录任务开始时间,用于邮件时间过滤(全流程固定,不随重发更新)
from datetime import datetime
task_start_time = datetime.now()
# Step 1: 导航到登录页面
self._log("info", f"🌐 打开登录页面: {email}")
page.get(AUTH_HOME_URL, timeout=self.timeout)
time.sleep(random.uniform(2, 4))
# 从页面动态提取 XSRF token(避免硬编码被 Google 标黑)
xsrf_token = self._extract_xsrf_token(page)
# 设置 XSRF Cookie
try:
self._log("info", "🍪 设置 XSRF Cookie...")
page.set.cookies({
"name": "__Host-AP_SignInXsrf",
"value": xsrf_token,
"url": AUTH_HOME_URL,
"path": "/",
"secure": True,
})
except Exception as e:
self._log("warning", f"⚠️ Cookie 设置失败: {e}")
# Step 1.5: 通过 URL 方式提交邮箱(稳定,不触发风控)
login_hint = quote(email, safe="")
login_url = f"https://auth.business.gemini.google/login/email?continueUrl=https%3A%2F%2Fbusiness.gemini.google%2F&loginHint={login_hint}&xsrfToken={xsrf_token}"
# 先启动网络监听,再导航(避免漏掉页面加载期间的请求)
try:
page.listen.start(
targets=["batchexecute"],
is_regex=False,
method=("POST",),
res_type=("XHR", "FETCH"),
)
except Exception:
pass
self._log("info", "📧 使用 URL 方式提交邮箱...")
page.get(login_url, timeout=self.timeout)
time.sleep(random.uniform(3, 5))
# 模拟真实用户行为:页面加载后随机滚动
self._random_scroll(page)
# Step 2: 检查当前页面状态
current_url = page.url
self._log("info", f"📍 当前 URL: {current_url}")
# 检测 signin-error 页面(极端情况,一般 URL 方式不会触发)
if "signin-error" in current_url:
self._log("error", "❌ 进入 signin-error 页面,可能是代理或网络问题")
self._save_screenshot(page, "signin_error")
return {"success": False, "error": "signin-error: token rejected by Google, try changing proxy"}
has_business_params = "business.gemini.google" in current_url and "csesidx=" in current_url and "/cid/" in current_url
if has_business_params:
self._log("info", "✅ 已登录,提取配置")
return self._extract_config(page, email)
# 检测 403 Access Restricted(刷新/登录时账户可能已被封禁)
access_error = self._check_access_restricted(page, email)
if access_error:
return access_error
# Step 3: 点击发送验证码按钮(最多5轮,适度退避间隔)
self._log("info", "📧 发送验证码...")
max_send_rounds = 5
send_round_delays = [10, 10, 15, 15, 20]
send_round = 0
while True:
send_round += 1
if self._click_send_code_button(page):
break
if send_round >= max_send_rounds:
self._log("error", "❌ 验证码发送失败(可能触发风控),建议更换代理IP")
self._save_screenshot(page, "send_code_button_failed")
return {"success": False, "error": "send code failed after retries"}
delay = send_round_delays[min(send_round - 1, len(send_round_delays) - 1)]
self._log("warning", f"⚠️ 发送失败,{delay}秒后重试 ({send_round}/{max_send_rounds})")
time.sleep(delay)
# Step 4: 等待验证码输入框出现
code_input = self._wait_for_code_input(page)
if not code_input:
self._log("error", "❌ 验证码输入框未出现")
self._save_screenshot(page, "code_input_missing")
return {"success": False, "error": "code input not found"}
# Step 5: 轮询邮件获取验证码(云环境默认延长,支持环境变量覆盖)
try:
code_timeout = int(str(os.getenv("REGISTER_CODE_TIMEOUT_SECONDS", "60")).strip() or "60")
except Exception:
code_timeout = 60
try:
code_interval = int(str(os.getenv("REGISTER_CODE_INTERVAL_SECONDS", "5")).strip() or "5")
except Exception:
code_interval = 5
code_timeout = max(15, code_timeout)
code_interval = max(2, code_interval)
self._log("info", f"📬 等待邮箱验证码... (timeout={code_timeout}s, interval={code_interval}s)")
code = mail_client.poll_for_code(timeout=code_timeout, interval=code_interval, since_time=task_start_time)
if not code:
self._log("warning", "⚠️ 验证码超时,等待后重新发送...")
time.sleep(random.uniform(12, 18))
# 尝试点击重新发送按钮
if self._click_resend_code_button(page):
# 再次轮询验证码
code = mail_client.poll_for_code(timeout=code_timeout, interval=code_interval, since_time=task_start_time)
if not code:
self._log("error", "❌ 重新发送后仍未收到验证码")
self._save_screenshot(page, "code_timeout_after_resend")
return {"success": False, "error": "verification code timeout after resend"}
else:
self._log("error", "❌ 验证码超时且未找到重新发送按钮")
self._save_screenshot(page, "code_timeout")
return {"success": False, "error": "verification code timeout"}
self._log("info", f"✅ 收到验证码: {code}")
# Step 6: 输入验证码并提交
code_input = page.ele("css:input[jsname='ovqh0b']", timeout=3) or \
page.ele("css:input[type='tel']", timeout=2)
if not code_input:
self._log("error", "❌ 验证码输入框已失效")
return {"success": False, "error": "code input expired"}
# 尝试模拟人类输入,失败则降级到直接注入
self._log("info", "⌨️ 输入验证码...")
if not self._simulate_human_input(code_input, code):
self._log("warning", "⚠️ 模拟输入失败,降级为直接输入")
code_input.input(code, clear=True)
time.sleep(random.uniform(0.4, 0.8))
# 提交验证码:先回车,再找验证按钮兜底
self._log("info", "⏎ 提交验证码")
code_input.input("\n")
time.sleep(random.uniform(1, 2))
# 如果回车没触发,找验证按钮点击
if "verify-oob-code" in page.url:
verify_btn = self._find_verify_button(page)
if verify_btn:
try:
verify_btn.click()
self._log("info", "✅ 已点击验证按钮(兜底)")
except Exception:
pass
# [注册专用] 验证码提交后先等几秒让页面跳转,再检查 403
if is_new_account:
time.sleep(3)
access_error = self._check_access_restricted(page, email)
if access_error:
return access_error
self._log("info", "📝 [注册] 验证码已提交,等待姓名输入页面...")
if self._handle_username_setup(page, is_new_account=True):
self._log("info", "✅ 姓名填写完成,等待工作台 URL...")
if self._wait_for_business_params(page, timeout=45):
self._log("info", "🎊 注册成功,提取配置...")
return self._extract_config(page, email)
# 姓名步骤失败或未出现,继续走通用流程兜底
self._log("info", "⚠️ 姓名步骤未完成,走通用流程兜底...")
# Step 7: 等待页面自动重定向(提交验证码后 Google 会自动跳转)
self._log("info", "⏳ 等待验证后跳转...")
time.sleep(random.uniform(10, 15))
# 记录当前 URL 状态
current_url = page.url
self._log("info", f"📍 验证后 URL: {current_url}")
# 检查是否还停留在验证码页面(说明提交失败)
if "verify-oob-code" in current_url:
self._log("error", "❌ 验证码提交失败")
self._save_screenshot(page, "verification_submit_failed")
return {"success": False, "error": "verification code submission failed"}
# Step 8: 处理协议页面(如果有)
self._handle_agreement_page(page)
# Step 8.5: 检测 403 Access Restricted 页面
access_error = self._check_access_restricted(page, email)
if access_error:
return access_error
# Step 9: 检查是否已经在正确的页面
current_url = page.url
has_business_params = "business.gemini.google" in current_url and "csesidx=" in current_url and "/cid/" in current_url
if has_business_params:
return self._extract_config(page, email)
# Step 10: 如果不在正确的页面,尝试导航
if "business.gemini.google" not in current_url:
page.get("https://business.gemini.google/", timeout=self.timeout)
time.sleep(random.uniform(4, 7))
# Step 11: 检查是否需要设置用户名(仅登录刷新走此路径,注册已在早期处理)
if not is_new_account and "cid" not in page.url:
if self._handle_username_setup(page):
time.sleep(random.uniform(4, 7))
# Step 12: 再次检测 403(导航后可能出现)
access_error = self._check_access_restricted(page, email)
if access_error:
return access_error
# Step 13: 等待 URL 参数生成(csesidx 和 cid)
if not self._wait_for_business_params(page):
page.refresh()
time.sleep(random.uniform(4, 7))
if not self._wait_for_business_params(page):
self._log("error", "❌ URL 参数生成失败")
self._save_screenshot(page, "params_missing")
return {"success": False, "error": "URL parameters not found"}
# Step 13: 提取配置
self._log("info", "🎊 登录成功,提取配置...")
return self._extract_config(page, email)
def _click_send_code_button(self, page) -> bool:
"""点击发送验证码按钮(如果需要)"""
time.sleep(random.uniform(1.5, 3))
max_send_attempts = 5
# 适度退避延迟序列(秒)
retry_delays = [10, 10, 15, 15, 20]
# 方法1: 直接通过ID查找
direct_btn = page.ele("#sign-in-with-email", timeout=5)
if direct_btn:
for attempt in range(1, max_send_attempts + 1):
try:
self._last_send_error = ""
self._human_click(page, direct_btn)
if self._verify_code_send_by_network(page) or self._verify_code_send_status(page):
self._stop_listen(page)
return True
delay = retry_delays[min(attempt - 1, len(retry_delays) - 1)]
if self._last_send_error == "captcha_check_failed":
self._log("error", f"❌ 触发风控,建议更换代理IP ({attempt}/{max_send_attempts})")
else:
self._log("warning", f"⚠️ 发送失败,{delay}秒后重试 ({attempt}/{max_send_attempts})")
time.sleep(delay)
except Exception as e:
self._log("warning", f"⚠️ 点击失败: {e}")
self._stop_listen(page)
return False
# 方法2: 通过关键词查找
keywords = ["通过电子邮件发送验证码", "通过电子邮件发送", "email", "Email", "Send code", "Send verification", "Verification code"]
try:
buttons = page.eles("tag:button")
for btn in buttons:
text = (btn.text or "").strip()
if text and any(kw in text for kw in keywords):
for attempt in range(1, max_send_attempts + 1):
try:
self._last_send_error = ""
self._human_click(page, btn)
if self._verify_code_send_by_network(page) or self._verify_code_send_status(page):
self._stop_listen(page)
return True
delay = retry_delays[min(attempt - 1, len(retry_delays) - 1)]
if self._last_send_error == "captcha_check_failed":
self._log("error", f"❌ 触发风控,建议更换代理IP ({attempt}/{max_send_attempts})")
else:
self._log("warning", f"⚠️ 发送失败,{delay}秒后重试 ({attempt}/{max_send_attempts})")
time.sleep(delay)
except Exception as e:
self._log("warning", f"⚠️ 点击失败: {e}")
self._stop_listen(page)
return False
except Exception as e:
self._log("warning", f"⚠️ 搜索按钮异常: {e}")
# 检查是否在 signin-error 页面(不应该继续尝试发送)
if "signin-error" in (page.url or ""):
self._stop_listen(page)
self._log("error", "❌ 在 signin-error 页面,无法发送验证码")
return False
# 检查是否已经在验证码输入页面
code_input = page.ele("css:input[jsname='ovqh0b']", timeout=2) or page.ele("css:input[name='pinInput']", timeout=1)
if code_input:
self._stop_listen(page)
self._log("info", "✅ 已在验证码输入页面")
# 直接点击重新发送按钮(不管之前是否发送过)
if self._click_resend_code_button(page):
self._log("info", "✅ 已点击重新发送按钮")
return True
else:
self._log("warning", "⚠️ 未找到重新发送按钮,继续流程")
return True
self._stop_listen(page)
self._log("error", "❌ 未找到发送验证码按钮")
return False
def _stop_listen(self, page) -> None:
"""安全地停止网络监听"""
try:
if hasattr(page, 'listen') and page.listen:
page.listen.stop()
except Exception:
pass
def _verify_code_send_by_network(self, page) -> bool:
"""通过监听网络请求验证验证码是否成功发送"""
try:
time.sleep(1)
packets = []
max_wait_seconds = 6
deadline = time.time() + max_wait_seconds
try:
while time.time() < deadline:
got_any = False
for packet in page.listen.steps(timeout=1, gap=1):
packets.append(packet)
got_any = True
if got_any:
time.sleep(0.2)
else:
break
except Exception:
return False
if not packets:
return False
# 保存网络日志(仅用于调试)
self._save_network_packets(packets)
found_batchexecute = False
found_batchexecute_error = False
for packet in packets:
try:
url = str(packet.url) if hasattr(packet, 'url') else str(packet)
if 'batchexecute' in url:
found_batchexecute = True
try:
response = packet.response if hasattr(packet, 'response') else None
if response and hasattr(response, 'raw_body'):
body = response.raw_body
raw_body_str = str(body)
if "CAPTCHA_CHECK_FAILED" in raw_body_str:
found_batchexecute_error = True
self._last_send_error = "captcha_check_failed"
elif "SendEmailOtpError" in raw_body_str:
found_batchexecute_error = True
self._last_send_error = "send_email_otp_error"
except Exception:
pass
except Exception:
continue
if found_batchexecute:
if found_batchexecute_error:
return False
return True
else:
return False
except Exception:
return False
def _verify_code_send_status(self, page) -> bool:
"""检测页面提示判断是否发送成功"""
time.sleep(random.uniform(1.5, 3))
try:
success_keywords = ["验证码已发送", "code sent", "email sent", "check your email", "已发送"]
error_keywords = [
"出了点问题",
"something went wrong",
"error",
"failed",
"try again",
"稍后再试",
"选择其他登录方法"
]
selectors = [
"css:.zyTWof-gIZMF",
"css:[role='alert']",
"css:aside",
]
for selector in selectors:
try:
elements = page.eles(selector, timeout=1)
for elem in elements[:20]:
text = (elem.text or "").strip()
if not text:
continue
if any(kw in text for kw in error_keywords):
return False
if any(kw in text for kw in success_keywords):
return True
except Exception:
continue
return True
except Exception:
return True
def _truncate_text(self, text: str, max_len: int = 2000) -> str:
if text is None:
return ""
if len(text) <= max_len:
return text
return text[:max_len] + f"...(truncated, total={len(text)})"
def _save_network_packets(self, packets) -> None:
"""保存网络日志(仅用于调试)"""
try:
from core.storage import _data_file_path
base_dir = _data_file_path(os.path.join("logs", "network"))
os.makedirs(base_dir, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
file_path = os.path.join(base_dir, f"network-{ts}.jsonl")
def safe_str(value):
try:
return value if isinstance(value, str) else str(value)
except Exception:
return "<unprintable>"
with open(file_path, "a", encoding="utf-8") as f:
for packet in packets:
try:
req = packet.request if hasattr(packet, "request") else None
resp = packet.response if hasattr(packet, "response") else None
fail = packet.fail_info if hasattr(packet, "fail_info") else None
item = {
"url": safe_str(packet.url) if hasattr(packet, "url") else safe_str(packet),
"method": safe_str(packet.method) if hasattr(packet, "method") else "UNKNOWN",
"resourceType": safe_str(packet.resourceType) if hasattr(packet, "resourceType") else "",
"is_failed": bool(packet.is_failed) if hasattr(packet, "is_failed") else False,
"fail_info": safe_str(fail) if fail else "",
"request": {
"headers": req.headers if req and hasattr(req, "headers") else {},
"postData": req.postData if req and hasattr(req, "postData") else "",
},
"response": {
"status": resp.status if resp and hasattr(resp, "status") else 0,
"headers": resp.headers if resp and hasattr(resp, "headers") else {},
"raw_body": resp.raw_body if resp and hasattr(resp, "raw_body") else "",
},
}
f.write(json.dumps(item, ensure_ascii=False) + "\n")
except Exception as e:
f.write(json.dumps({"error": safe_str(e)}, ensure_ascii=False) + "\n")
except Exception:
pass
def _wait_for_code_input(self, page, timeout: int = 30):
"""等待验证码输入框出现"""
selectors = [
"css:input[jsname='ovqh0b']",
"css:input[type='tel']",
"css:input[name='pinInput']",
"css:input[autocomplete='one-time-code']",
]
for _ in range(timeout // 2):
for selector in selectors:
try:
el = page.ele(selector, timeout=1)
if el:
return el
except Exception:
continue
time.sleep(2)
return None
def _simulate_human_input(self, element, text: str) -> bool:
"""模拟人类输入(逐字符输入,带非均匀延迟)
Args:
element: 输入框元素
text: 要输入的文本
Returns:
bool: 是否成功
"""
try:
# 先点击输入框获取焦点
element.click()
time.sleep(random.uniform(0.2, 0.5))
# 逐字符输入,模拟真实打字节奏
for i, char in enumerate(text):
element.input(char)
# 基础延迟 80-180ms(正常打字速度)
delay = random.uniform(0.08, 0.18)
# 每3-5个字符偶尔有更长的停顿(模拟犹豫/看屏幕)
if i > 0 and random.random() < 0.2:
delay += random.uniform(0.2, 0.5)
time.sleep(delay)
# 输入完成后停顿(模拟核对)
time.sleep(random.uniform(0.3, 0.8))
return True
except Exception:
return False
def _human_click(self, page, element) -> None:
"""模拟人类点击:先移动鼠标到元素附近,再点击"""
try:
# 尝试用 actions 链模拟鼠标移动 + 点击
page.actions.move_to(element)
time.sleep(random.uniform(0.1, 0.3))
page.actions.click()
except Exception:
# 降级为直接点击
element.click()
def _random_scroll(self, page) -> None:
"""模拟真实用户的页面滚动行为"""
try:
scroll_amount = random.randint(50, 200)
page.run_js(f"window.scrollBy(0, {scroll_amount})")
time.sleep(random.uniform(0.3, 0.8))
# 有时候滚回去一点
if random.random() < 0.3:
page.run_js(f"window.scrollBy(0, -{random.randint(20, 80)})")
time.sleep(random.uniform(0.2, 0.5))
except Exception:
pass
def _find_verify_button(self, page):
"""查找验证按钮(排除重新发送按钮)"""
try:
buttons = page.eles("tag:button")
for btn in buttons:
text = (btn.text or "").strip().lower()
if text and "重新" not in text and "发送" not in text and "resend" not in text and "send" not in text:
return btn
except Exception:
pass
return None
def _click_resend_code_button(self, page) -> bool:
"""点击重新发送验证码按钮"""
time.sleep(random.uniform(1.5, 3))
# 查找包含重新发送关键词的按钮(与 _find_verify_button 相反)
try:
buttons = page.eles("tag:button")
for btn in buttons:
text = (btn.text or "").strip().lower()
if text and ("重新" in text or "resend" in text):
try:
self._log("info", f"🔄 点击重新发送按钮")
self._human_click(page, btn)
time.sleep(random.uniform(1.5, 3))
return True
except Exception:
pass
except Exception:
pass
return False
def _check_access_restricted(self, page, email: str = "") -> dict | None:
"""检测 403 Access Restricted 页面,返回错误 dict 或 None"""
domain = email.split("@")[1] if "@" in email else "unknown"
error_msg = f"403 域名封禁 ({domain})"
# 方法1: 搜索 h1 标签
try:
h1 = page.ele("tag:h1", timeout=2)
h1_text = h1.text if h1 else ""
if h1_text and "Access Restricted" in h1_text:
self._log("error", "⛔ 403 Access Restricted: email banned by Google")
self._log("error", f"⛔ 403 访问受限,域名 {domain} 可能已被 Google 封禁")
self._save_screenshot(page, "access_restricted_403")
return {"success": False, "error": error_msg}
except Exception:
pass
# 方法2: body 文本
try:
body = page.ele("tag:body", timeout=2)
body_text = (body.text or "")[:500] if body else ""
if "Access Restricted" in body_text:
self._log("error", "⛔ 403 Access Restricted: email banned by Google")
self._log("error", f"⛔ 403 访问受限,域名 {domain} 可能已被 Google 封禁")
self._save_screenshot(page, "access_restricted_403")
return {"success": False, "error": error_msg}
except Exception:
pass
# 方法3: page.html 源码
try:
html = (page.html or "")[:2000]
if "Access Restricted" in html:
self._log("error", "⛔ 403 Access Restricted: email banned by Google")
self._log("error", f"⛔ 403 访问受限,域名 {domain} 可能已被 Google 封禁")
self._save_screenshot(page, "access_restricted_403")
return {"success": False, "error": error_msg}
except Exception:
pass
return None
def _handle_agreement_page(self, page) -> None:
"""处理协议页面"""
if "/admin/create" in page.url:
agree_btn = page.ele("css:button.agree-button", timeout=5)
if agree_btn:
self._human_click(page, agree_btn)
time.sleep(random.uniform(2, 4))
def _wait_for_cid(self, page, timeout: int = 10) -> bool:
"""等待URL包含cid"""
for _ in range(timeout):
if "cid" in page.url:
return True
time.sleep(1)
return False
def _wait_for_business_params(self, page, timeout: int = 30) -> bool:
"""等待业务页面参数生成(csesidx 和 cid)"""
for _ in range(timeout):
url = page.url
if "csesidx=" in url and "/cid/" in url:
return True
time.sleep(1)
return False
def _handle_username_setup(self, page, is_new_account: bool = False) -> bool:
"""处理用户名设置页面(is_new_account=True 时启用按钮兜底和延长超时)"""
current_url = page.url
if "auth.business.gemini.google/login" in current_url:
return False
# 精准选择器(参考实际页面 DOM,优先级从高到低)
selectors = [
"css:input[formcontrolname='fullName']",
"css:input#mat-input-0",
"css:input[placeholder='全名']",
"css:input[placeholder='Full name']",
"css:input[name='displayName']",
"css:input[aria-label*='用户名' i]",
"css:input[aria-label*='display name' i]",
"css:input[type='text']",
]
# 轮询等待输入框出现(最多30秒,每秒检查一次)
# 与参考代码对齐:页面加载慢时不会过早放弃
username_input = None
self._log("info", "⏳ 等待用户名输入框出现(最多30秒)...")
for i in range(30):
for selector in selectors:
try:
el = page.ele(selector, timeout=1)
if el:
username_input = el
self._log("info", f"✅ 找到用户名输入框: {selector}")
break
except Exception:
continue
if username_input:
break
time.sleep(1)
if not username_input:
self._log("warning", "⚠️ 30秒内未找到用户名输入框,跳过此步骤")
return False
name = random.choice(REGISTER_NAMES)
self._log("info", f"✏️ 输入姓名: {name}")
try:
# 清空输入框
username_input.click()
time.sleep(random.uniform(0.2, 0.5))
username_input.clear()
time.sleep(random.uniform(0.1, 0.3))
# 尝试模拟人类输入,失败则降级到直接注入
if not self._simulate_human_input(username_input, name):
username_input.input(name)
time.sleep(0.3)
# 回车提交
username_input.input("\n")
if is_new_account:
# 注册专用:回车后等待1.5秒,若未跳转则用按钮兜底
time.sleep(random.uniform(1.5, 3))
if "cid" not in page.url:
self._log("info", "⌨️ 回车未跳转,尝试点击提交按钮...")
try:
for btn in page.eles("tag:button"):
try:
if btn.is_displayed() and btn.is_enabled():
btn.click()
self._log("info", "✅ 已点击提交按钮(兜底)")
time.sleep(1)
break
except Exception:
continue
except Exception as e:
self._log("warning", f"⚠️ 按钮兜底失败: {e}")
# 注册专用:等待45秒,失败则刷新再等15秒
if not self._wait_for_cid(page, timeout=45):
self._log("warning", "⚠️ 用户名提交后未检测到 cid 参数,尝试刷新...")
page.refresh()
time.sleep(random.uniform(2, 4))
if not self._wait_for_cid(page, timeout=15):
self._log("error", "❌ 刷新后仍未检测到 cid 参数")
self._save_screenshot(page, "step7_after_verify")
return False
else:
# 登录刷新:原有30秒逻辑
if not self._wait_for_cid(page, timeout=30):
self._log("warning", "⚠️ 用户名提交后未检测到 cid 参数")
return False
return True
except Exception as e:
self._log("warning", f"⚠️ 用户名设置异常: {e}")
return False
def _extract_config(self, page, email: str) -> dict:
"""提取配置(轮询等待 cookie 到位)"""
try:
if "cid/" not in page.url:
page.get("https://business.gemini.google/", timeout=self.timeout)
time.sleep(random.uniform(2, 4))
url = page.url
if "cid/" not in url:
return {"success": False, "error": "cid not found"}
config_id = url.split("cid/")[1].split("?")[0].split("/")[0]
csesidx = url.split("csesidx=")[1].split("&")[0] if "csesidx=" in url else ""
# 轮询等待关键 cookie 到位(最多10秒)
ses = None
host = None
ses_obj = None
for _ in range(10):
cookies = page.cookies()
ses = next((c["value"] for c in cookies if c["name"] == "__Secure-C_SES"), None)
host = next((c["value"] for c in cookies if c["name"] == "__Host-C_OSES"), None)
ses_obj = next((c for c in cookies if c["name"] == "__Secure-C_SES"), None)
if ses and host:
break
time.sleep(1)
if not ses or not host:
self._log("warning", f"⚠️ Cookie 不完整 (ses={'有' if ses else '无'}, host={'有' if host else '无'})")
# 使用北京时区,确保时间计算正确(Cookie expiry 是 UTC 时间戳)
beijing_tz = timezone(timedelta(hours=8))
if ses_obj and "expiry" in ses_obj:
cookie_expire_beijing = datetime.fromtimestamp(ses_obj["expiry"], tz=beijing_tz)
expires_at = (cookie_expire_beijing - timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S")
else:
expires_at = (datetime.now(beijing_tz) + timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S")
config = {
"id": email,
"csesidx": csesidx,
"config_id": config_id,
"secure_c_ses": ses,
"host_c_oses": host,
"expires_at": expires_at,
}
# 提取试用期信息
trial_end = self._extract_trial_end(page, csesidx, config_id)
if trial_end:
config["trial_end"] = trial_end
return {"success": True, "config": config}
except Exception as e:
return {"success": False, "error": str(e)}
def _extract_trial_end(self, page, csesidx: str, config_id: str) -> Optional[str]:
"""从页面中提取试用期到期日期,不跳转到可能 400 的深层路径"""
# re 已在文件顶部导入
try:
self._log("info", "📅 获取试用期信息...")
def _days_to_end_date(days: int) -> str:
end_date = (datetime.now(timezone(timedelta(hours=8))) + timedelta(days=days)).strftime("%Y-%m-%d")
self._log("info", f"📅 试用期剩余 {days} 天,到期日: {end_date}")
return end_date
def _search_page_source(source: str) -> Optional[str]:
"""在页面源码中搜索试用期信息"""
# 格式1: "daysLeft":29 (JSON数据)
m = re.search(r'"daysLeft"\s*:\s*(\d+)', source)
if m:
return _days_to_end_date(int(m.group(1)))
# 格式2: "trialDaysRemaining":29
m = re.search(r'"trialDaysRemaining"\s*:\s*(\d+)', source)
if m:
return _days_to_end_date(int(m.group(1)))
# 格式3: 日期数组 "[2026,3,25]" 形式 (batchexecute格式)
m = re.search(r'\[(\d{4}),(\d{1,2}),(\d{1,2})\].*?\[(\d{4}),(\d{1,2}),(\d{1,2})\]', source)
if m:
# 取第二个日期(结束日期)
try:
end_date = f"{m.group(4):0>4}-{int(m.group(5)):02d}-{int(m.group(6)):02d}"
# 简单校验年份合理
if 2025 <= int(m.group(4)) <= 2030:
self._log("info", f"📅 试用期到期日: {end_date}")
return end_date
except Exception:
pass
# 格式4: "29 days left" 或 "还剩29天"
m = re.search(r'(\d+)\s*days?\s*left', source, re.IGNORECASE)
if m:
return _days_to_end_date(int(m.group(1)))
m = re.search(r'还剩\s*(\d+)\s*天', source)
if m:
return _days_to_end_date(int(m.group(1)))
return None
# ——— 方式1: 当前页面(刚登录完,不需要跳转)———
try:
source = page.html
result = _search_page_source(source or "")
if result:
return result
except Exception:
pass
# ——— 方式2: 跳转到 /settings(不带 billing/plans 后缀,SPA可以处理)———
try:
settings_url = f"https://business.gemini.google/cid/{config_id}/settings?csesidx={csesidx}"
page.get(settings_url, timeout=self.timeout)
time.sleep(random.uniform(1.5, 3))
source = page.html
result = _search_page_source(source or "")
if result:
return result
except Exception:
pass
# ——— 方式3: 跳转到主页(最保险)———
try:
main_url = f"https://business.gemini.google/cid/{config_id}?csesidx={csesidx}"
page.get(main_url, timeout=self.timeout)
time.sleep(random.uniform(1.5, 3))
source = page.html
result = _search_page_source(source or "")
if result:
return result
except Exception:
pass
self._log("warning", "⚠️ 未能获取试用期信息(页面中未找到相关数据)")
return None
except Exception as e:
self._log("warning", f"⚠️ 获取试用期失败: {e}")
return None
def _save_screenshot(self, page, name: str) -> None:
"""保存截图"""
try:
from core.storage import _data_file_path
screenshot_dir = _data_file_path("automation")
os.makedirs(screenshot_dir, exist_ok=True)
path = os.path.join(screenshot_dir, f"{name}_{int(time.time())}.png")
page.get_screenshot(path=path)
except Exception:
pass
def _log(self, level: str, message: str) -> None:
"""记录日志"""
if self.log_callback:
try:
self.log_callback(level, message)
except TaskCancelledError:
raise
except Exception:
pass
def _cleanup_user_data(self, user_data_dir: Optional[str]) -> None:
"""清理浏览器用户数据目录"""
if not user_data_dir:
return
try:
import shutil
if os.path.exists(user_data_dir):
shutil.rmtree(user_data_dir, ignore_errors=True)
except Exception:
pass
@staticmethod
def _get_ua() -> str:
"""生成随机User-Agent(使用当前主流 Chrome 版本)"""
major = random.choice([132, 133, 134, 135])
v = f"{major}.0.{random.randint(6800, 6950)}.{random.randint(50, 150)}"
return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{v} Safari/537.36"