| """
|
| Gemini自动化登录模块(用于新账号注册)
|
| """
|
| import os
|
| import random
|
| import string
|
| import time
|
| from datetime import datetime, timedelta, timezone
|
| from typing import Optional
|
| from urllib.parse import quote
|
|
|
| from DrissionPage import ChromiumPage, ChromiumOptions
|
|
|
|
|
|
|
| AUTH_HOME_URL = "https://auth.business.gemini.google/"
|
| DEFAULT_XSRF_TOKEN = "KdLRzKwwBTD5wo8nUollAbY6cW0"
|
|
|
|
|
| CHROMIUM_PATHS = [
|
| "/usr/bin/chromium",
|
| "/usr/bin/chromium-browser",
|
| "/usr/bin/google-chrome",
|
| "/usr/bin/google-chrome-stable",
|
| ]
|
|
|
|
|
| def _find_chromium_path() -> Optional[str]:
|
| """查找可用的 Chromium/Chrome 浏览器路径"""
|
| for path in CHROMIUM_PATHS:
|
| if os.path.isfile(path) and os.access(path, os.X_OK):
|
| return path
|
| return None
|
|
|
|
|
| class GeminiAutomation:
|
| """Gemini自动化登录"""
|
|
|
| def __init__(
|
| self,
|
| user_agent: str = "",
|
| proxy: str = "",
|
| headless: bool = True,
|
| timeout: int = 60,
|
| log_callback=None,
|
| ) -> None:
|
| self.user_agent = user_agent or self._get_ua()
|
| self.proxy = proxy
|
| self.headless = headless
|
| self.timeout = timeout
|
| self.log_callback = log_callback
|
|
|
| def login_and_extract(self, email: str, mail_client) -> dict:
|
| """执行登录并提取配置"""
|
| page = None
|
| user_data_dir = None
|
| try:
|
| page = self._create_page()
|
| user_data_dir = getattr(page, 'user_data_dir', None)
|
| return self._run_flow(page, email, mail_client)
|
| except Exception as exc:
|
| self._log("error", f"automation error: {exc}")
|
| return {"success": False, "error": str(exc)}
|
| finally:
|
| if page:
|
| try:
|
| page.quit()
|
| except Exception:
|
| pass
|
| self._cleanup_user_data(user_data_dir)
|
|
|
| def _create_page(self) -> ChromiumPage:
|
| """创建浏览器页面"""
|
| options = ChromiumOptions()
|
|
|
|
|
| chromium_path = _find_chromium_path()
|
| if chromium_path:
|
| options.set_browser_path(chromium_path)
|
| self._log("info", f"using browser: {chromium_path}")
|
|
|
| options.set_argument("--incognito")
|
| options.set_argument("--no-sandbox")
|
| options.set_argument("--disable-dev-shm-usage")
|
| options.set_argument("--disable-setuid-sandbox")
|
| options.set_argument("--disable-blink-features=AutomationControlled")
|
| options.set_argument("--window-size=1280,800")
|
| options.set_user_agent(self.user_agent)
|
|
|
|
|
| options.set_argument("--lang=zh-CN")
|
| options.set_pref("intl.accept_languages", "zh-CN,zh")
|
|
|
| if self.proxy:
|
| options.set_argument(f"--proxy-server={self.proxy}")
|
|
|
| if self.headless:
|
|
|
| options.set_argument("--headless=new")
|
| options.set_argument("--disable-gpu")
|
| options.set_argument("--no-first-run")
|
| options.set_argument("--disable-extensions")
|
|
|
| options.set_argument("--disable-infobars")
|
| options.set_argument("--enable-features=NetworkService,NetworkServiceInProcess")
|
|
|
| options.auto_port()
|
| page = ChromiumPage(options)
|
| page.set.timeouts(self.timeout)
|
|
|
|
|
| if self.headless:
|
| try:
|
| page.run_cdp("Page.addScriptToEvaluateOnNewDocument", source="""
|
| Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
|
| Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
|
| Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']});
|
| window.chrome = {runtime: {}};
|
|
|
| // 额外的反检测措施
|
| Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1});
|
| Object.defineProperty(navigator, 'platform', {get: () => 'Win32'});
|
| Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'});
|
|
|
| // 隐藏 headless 特征
|
| Object.defineProperty(navigator, 'hardwareConcurrency', {get: () => 8});
|
| Object.defineProperty(navigator, 'deviceMemory', {get: () => 8});
|
|
|
| // 模拟真实的 permissions
|
| const originalQuery = window.navigator.permissions.query;
|
| window.navigator.permissions.query = (parameters) => (
|
| parameters.name === 'notifications' ?
|
| Promise.resolve({state: Notification.permission}) :
|
| originalQuery(parameters)
|
| );
|
| """)
|
| except Exception:
|
| pass
|
|
|
| return page
|
|
|
| def _run_flow(self, page, email: str, mail_client) -> dict:
|
| """执行登录流程"""
|
|
|
|
|
| from datetime import datetime
|
| send_time = datetime.now()
|
|
|
|
|
| self._log("info", f"navigating to login page for {email}")
|
|
|
| page.get(AUTH_HOME_URL, timeout=self.timeout)
|
| time.sleep(2)
|
|
|
|
|
| try:
|
| page.set.cookies({
|
| "name": "__Host-AP_SignInXsrf",
|
| "value": DEFAULT_XSRF_TOKEN,
|
| "url": AUTH_HOME_URL,
|
| "path": "/",
|
| "secure": True,
|
| })
|
|
|
| page.set.cookies({
|
| "name": "_GRECAPTCHA",
|
| "value": "09ABCL...",
|
| "url": "https://google.com",
|
| "path": "/",
|
| "secure": True,
|
| })
|
| except Exception as e:
|
| self._log("warning", f"failed to set cookies: {e}")
|
|
|
| login_hint = quote(email, safe="")
|
| login_url = f"https://auth.business.gemini.google/login/email?continueUrl=https%3A%2F%2Fbusiness.gemini.google%2F&loginHint={login_hint}&xsrfToken={DEFAULT_XSRF_TOKEN}"
|
| page.get(login_url, timeout=self.timeout)
|
| time.sleep(5)
|
|
|
|
|
| current_url = page.url
|
| has_business_params = "business.gemini.google" in current_url and "csesidx=" in current_url and "/cid/" in current_url
|
|
|
| if has_business_params:
|
| return self._extract_config(page, email)
|
|
|
|
|
| self._log("info", "clicking send verification code button")
|
| if not self._click_send_code_button(page):
|
| self._log("error", "send code button not found")
|
| self._save_screenshot(page, "send_code_button_missing")
|
| return {"success": False, "error": "send code button not found"}
|
|
|
|
|
| code_input = self._wait_for_code_input(page)
|
| if not code_input:
|
| self._log("error", "code input not found")
|
| self._save_screenshot(page, "code_input_missing")
|
| return {"success": False, "error": "code input not found"}
|
|
|
|
|
| self._log("info", "polling for verification code")
|
| code = mail_client.poll_for_code(timeout=40, interval=4, since_time=send_time)
|
|
|
| if not code:
|
| self._log("warning", "verification code timeout, trying to resend")
|
|
|
| send_time = datetime.now()
|
|
|
| if self._click_resend_code_button(page):
|
| self._log("info", "resend button clicked, waiting for new code")
|
|
|
| code = mail_client.poll_for_code(timeout=40, interval=4, since_time=send_time)
|
| if not code:
|
| self._log("error", "verification code timeout after resend")
|
| self._save_screenshot(page, "code_timeout_after_resend")
|
| return {"success": False, "error": "verification code timeout after resend"}
|
| else:
|
| self._log("error", "verification code timeout and resend button not found")
|
| self._save_screenshot(page, "code_timeout")
|
| return {"success": False, "error": "verification code timeout"}
|
|
|
| self._log("info", f"code received: {code}")
|
|
|
|
|
| code_input = page.ele("css:input[jsname='ovqh0b']", timeout=3) or \
|
| page.ele("css:input[type='tel']", timeout=2)
|
|
|
| if not code_input:
|
| self._log("error", "code input expired")
|
| return {"success": False, "error": "code input expired"}
|
|
|
| self._log("info", "inputting verification code")
|
| code_input.input(code, clear=True)
|
| time.sleep(0.5)
|
|
|
| verify_btn = page.ele("css:button[jsname='XooR8e']", timeout=3)
|
| if verify_btn:
|
| self._log("info", "clicking verify button (method 1)")
|
| verify_btn.click()
|
| else:
|
| verify_btn = self._find_verify_button(page)
|
| if verify_btn:
|
| self._log("info", "clicking verify button (method 2)")
|
| verify_btn.click()
|
| else:
|
| self._log("info", "pressing enter to submit")
|
| code_input.input("\n")
|
|
|
|
|
| self._log("info", "waiting for auto-redirect after verification")
|
| time.sleep(12)
|
|
|
|
|
| current_url = page.url
|
| self._log("info", f"current URL after verification: {current_url}")
|
|
|
|
|
| if "verify-oob-code" in current_url:
|
| self._log("error", "verification code submission failed, still on verification page")
|
| self._save_screenshot(page, "verification_submit_failed")
|
| return {"success": False, "error": "verification code submission failed"}
|
|
|
|
|
| self._handle_agreement_page(page)
|
|
|
|
|
| current_url = page.url
|
| has_business_params = "business.gemini.google" in current_url and "csesidx=" in current_url and "/cid/" in current_url
|
|
|
| if has_business_params:
|
|
|
| self._log("info", "already on business page with parameters")
|
| return self._extract_config(page, email)
|
|
|
|
|
| if "business.gemini.google" not in current_url:
|
| self._log("info", "navigating to business page")
|
| page.get("https://business.gemini.google/", timeout=self.timeout)
|
| time.sleep(5)
|
| current_url = page.url
|
| self._log("info", f"URL after navigation: {current_url}")
|
|
|
|
|
| if "cid" not in page.url:
|
| if self._handle_username_setup(page):
|
| time.sleep(5)
|
|
|
|
|
| self._log("info", "waiting for URL parameters")
|
| if not self._wait_for_business_params(page):
|
| self._log("warning", "URL parameters not generated, trying refresh")
|
| page.refresh()
|
| time.sleep(5)
|
| if not self._wait_for_business_params(page):
|
| self._log("error", "URL parameters generation failed")
|
| current_url = page.url
|
| self._log("error", f"final URL: {current_url}")
|
| self._save_screenshot(page, "params_missing")
|
| return {"success": False, "error": "URL parameters not found"}
|
|
|
|
|
| self._log("info", "login success")
|
| return self._extract_config(page, email)
|
|
|
| def _click_send_code_button(self, page) -> bool:
|
| """点击发送验证码按钮(如果需要)"""
|
| time.sleep(2)
|
|
|
|
|
| direct_btn = page.ele("#sign-in-with-email", timeout=5)
|
| if direct_btn:
|
| try:
|
| direct_btn.click()
|
| return True
|
| except Exception:
|
| pass
|
|
|
|
|
| keywords = ["通过电子邮件发送验证码", "通过电子邮件发送", "email", "Email", "Send code", "Send verification", "Verification code"]
|
| try:
|
| buttons = page.eles("tag:button")
|
| for btn in buttons:
|
| text = (btn.text or "").strip()
|
| if text and any(kw in text for kw in keywords):
|
| try:
|
| btn.click()
|
| return True
|
| except Exception:
|
| pass
|
| except Exception:
|
| pass
|
|
|
|
|
| code_input = page.ele("css:input[jsname='ovqh0b']", timeout=2) or page.ele("css:input[name='pinInput']", timeout=1)
|
| if code_input:
|
| return True
|
|
|
| return False
|
|
|
| def _wait_for_code_input(self, page, timeout: int = 30):
|
| """等待验证码输入框出现"""
|
| selectors = [
|
| "css:input[jsname='ovqh0b']",
|
| "css:input[type='tel']",
|
| "css:input[name='pinInput']",
|
| "css:input[autocomplete='one-time-code']",
|
| ]
|
| for _ in range(timeout // 2):
|
| for selector in selectors:
|
| try:
|
| el = page.ele(selector, timeout=1)
|
| if el:
|
| return el
|
| except Exception:
|
| continue
|
| time.sleep(2)
|
| return None
|
|
|
| def _find_verify_button(self, page):
|
| """查找验证按钮(排除重新发送按钮)"""
|
| try:
|
| buttons = page.eles("tag:button")
|
| for btn in buttons:
|
| text = (btn.text or "").strip().lower()
|
| if text and "重新" not in text and "发送" not in text and "resend" not in text and "send" not in text:
|
| return btn
|
| except Exception:
|
| pass
|
| return None
|
|
|
| def _click_resend_code_button(self, page) -> bool:
|
| """点击重新发送验证码按钮"""
|
| time.sleep(2)
|
|
|
|
|
| try:
|
| buttons = page.eles("tag:button")
|
| for btn in buttons:
|
| text = (btn.text or "").strip().lower()
|
| if text and ("重新" in text or "resend" in text):
|
| try:
|
| self._log("info", f"found resend button: {text}")
|
| btn.click()
|
| time.sleep(2)
|
| return True
|
| except Exception:
|
| pass
|
| except Exception:
|
| pass
|
|
|
| return False
|
|
|
| def _handle_agreement_page(self, page) -> None:
|
| """处理协议页面"""
|
| if "/admin/create" in page.url:
|
| agree_btn = page.ele("css:button.agree-button", timeout=5)
|
| if agree_btn:
|
| agree_btn.click()
|
| time.sleep(2)
|
|
|
| def _wait_for_cid(self, page, timeout: int = 10) -> bool:
|
| """等待URL包含cid"""
|
| for _ in range(timeout):
|
| if "cid" in page.url:
|
| return True
|
| time.sleep(1)
|
| return False
|
|
|
| def _wait_for_business_params(self, page, timeout: int = 30) -> bool:
|
| """等待业务页面参数生成(csesidx 和 cid)"""
|
| for _ in range(timeout):
|
| url = page.url
|
| if "csesidx=" in url and "/cid/" in url:
|
| self._log("info", f"business params ready: {url}")
|
| return True
|
| time.sleep(1)
|
| return False
|
|
|
| def _handle_username_setup(self, page) -> bool:
|
| """处理用户名设置页面"""
|
| current_url = page.url
|
|
|
| if "auth.business.gemini.google/login" in current_url:
|
| return False
|
|
|
| selectors = [
|
| "css:input[type='text']",
|
| "css:input[name='displayName']",
|
| "css:input[aria-label*='用户名' i]",
|
| "css:input[aria-label*='display name' i]",
|
| ]
|
|
|
| username_input = None
|
| for selector in selectors:
|
| try:
|
| username_input = page.ele(selector, timeout=2)
|
| if username_input:
|
| break
|
| except Exception:
|
| continue
|
|
|
| if not username_input:
|
| return False
|
|
|
| suffix = "".join(random.choices(string.ascii_letters + string.digits, k=3))
|
| username = f"Test{suffix}"
|
|
|
| try:
|
| username_input.click()
|
| time.sleep(0.2)
|
| username_input.clear()
|
| username_input.input(username)
|
| time.sleep(0.3)
|
|
|
| buttons = page.eles("tag:button")
|
| submit_btn = None
|
| for btn in buttons:
|
| text = (btn.text or "").strip().lower()
|
| if any(kw in text for kw in ["确认", "提交", "继续", "submit", "continue", "confirm", "save", "保存", "下一步", "next"]):
|
| submit_btn = btn
|
| break
|
|
|
| if submit_btn:
|
| submit_btn.click()
|
| else:
|
| username_input.input("\n")
|
|
|
| time.sleep(5)
|
| return True
|
| except Exception:
|
| return False
|
|
|
| def _extract_config(self, page, email: str) -> dict:
|
| """提取配置"""
|
| try:
|
| if "cid/" not in page.url:
|
| page.get("https://business.gemini.google/", timeout=self.timeout)
|
| time.sleep(3)
|
|
|
| url = page.url
|
| if "cid/" not in url:
|
| return {"success": False, "error": "cid not found"}
|
|
|
| config_id = url.split("cid/")[1].split("?")[0].split("/")[0]
|
| csesidx = url.split("csesidx=")[1].split("&")[0] if "csesidx=" in url else ""
|
|
|
| cookies = page.cookies()
|
| ses = next((c["value"] for c in cookies if c["name"] == "__Secure-C_SES"), None)
|
| host = next((c["value"] for c in cookies if c["name"] == "__Host-C_OSES"), None)
|
|
|
| ses_obj = next((c for c in cookies if c["name"] == "__Secure-C_SES"), None)
|
|
|
| beijing_tz = timezone(timedelta(hours=8))
|
| if ses_obj and "expiry" in ses_obj:
|
|
|
| cookie_expire_beijing = datetime.fromtimestamp(ses_obj["expiry"], tz=beijing_tz)
|
| expires_at = (cookie_expire_beijing - timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S")
|
| else:
|
| expires_at = (datetime.now(beijing_tz) + timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
| config = {
|
| "id": email,
|
| "csesidx": csesidx,
|
| "config_id": config_id,
|
| "secure_c_ses": ses,
|
| "host_c_oses": host,
|
| "expires_at": expires_at,
|
| }
|
| return {"success": True, "config": config}
|
| except Exception as e:
|
| return {"success": False, "error": str(e)}
|
|
|
| def _save_screenshot(self, page, name: str) -> None:
|
| """保存截图"""
|
| try:
|
| import os
|
| screenshot_dir = os.path.join("data", "automation")
|
| os.makedirs(screenshot_dir, exist_ok=True)
|
| path = os.path.join(screenshot_dir, f"{name}_{int(time.time())}.png")
|
| page.get_screenshot(path=path)
|
| except Exception:
|
| pass
|
|
|
| def _log(self, level: str, message: str) -> None:
|
| """记录日志"""
|
| if self.log_callback:
|
| try:
|
| self.log_callback(level, message)
|
| except Exception:
|
| pass
|
|
|
| def _cleanup_user_data(self, user_data_dir: Optional[str]) -> None:
|
| """清理浏览器用户数据目录"""
|
| if not user_data_dir:
|
| return
|
| try:
|
| import shutil
|
| if os.path.exists(user_data_dir):
|
| shutil.rmtree(user_data_dir, ignore_errors=True)
|
| except Exception:
|
| pass
|
|
|
| @staticmethod
|
| def _get_ua() -> str:
|
| """生成随机User-Agent"""
|
| v = random.choice(["120.0.0.0", "121.0.0.0", "122.0.0.0"])
|
| return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{v} Safari/537.36"
|
|
|