# Source: gemini-2api / core / gemini_automation_uc.py
# (Hosting-page residue: uploader "xiaoyukkkk", commit "Upload 18 files", 6bf62af verified)
"""
Gemini automated login module (using undetected-chromedriver).
Stronger anti-detection capability; supports headless mode.
"""
import random
import string
import time
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import quote
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# Constants
# Root URL of the Gemini Business auth site; not referenced in the visible part of this module.
AUTH_HOME_URL = "https://auth.business.gemini.google/"
# Login page opened first by GeminiAutomationUC._run_flow; its continueUrl parameter is the
# percent-encoded form of https://business.gemini.google/.
LOGIN_URL = "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZS1hY2JkLThmOGY2ZDE0ODM1Mg"
# Not referenced in the visible part of this module; presumably a fallback XSRF token used by
# importers elsewhere — TODO confirm.
DEFAULT_XSRF_TOKEN = "KdLRzKwwBTD5wo8nUollAbY6cW0"
class GeminiAutomationUC:
    """Automate the Gemini Business e-mail-code login with undetected-chromedriver.

    The public entry point is :meth:`login_and_extract`. It starts a Chrome
    session, signs in with a one-time code fetched through ``mail_client`` and
    returns the session parameters read from the final URL and the cookies.
    """

    # Implicit wait (seconds) applied to every ``find_element`` call of the driver.
    _IMPLICIT_WAIT_SECONDS = 10
    # CSS selector of the verification-code input field.
    _PIN_INPUT_SELECTOR = "input[name='pinInput']"
    # Absolute XPath of the verify button on the code page.
    _VERIFY_BUTTON_XPATH = (
        "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button"
    )
    # Button captions accepted by the text-based verify-button fallback.
    _VERIFY_KEYWORDS = ("验证", "Verify")
    # Names of the cookies copied into the extracted config.
    _SESSION_COOKIE = "__Secure-C_SES"
    _HOST_COOKIE = "__Host-C_OSES"

    def __init__(
        self,
        user_agent: str = "",
        proxy: str = "",
        headless: bool = True,
        timeout: int = 60,
        log_callback=None,
    ) -> None:
        """Store the browser settings; no browser is started here.

        Args:
            user_agent: User-Agent string; a random one from :meth:`_get_ua`
                is used when empty.
            proxy: Value passed to Chrome's ``--proxy-server``; skipped when empty.
            headless: Whether to start Chrome with ``--headless=new``.
            timeout: Page-load timeout in seconds.
            log_callback: Optional callable ``(level, message)`` receiving log lines.
        """
        self.user_agent = user_agent or self._get_ua()
        self.proxy = proxy
        self.headless = headless
        self.timeout = timeout
        self.log_callback = log_callback
        self.driver = None

    def login_and_extract(self, email: str, mail_client) -> dict:
        """Run the whole login and return the extracted configuration.

        Args:
            email: Address typed into the login form.
            mail_client: Object exposing
                ``poll_for_code(timeout=..., interval=..., since_time=...)``.

        Returns:
            ``{"success": True, "config": {...}}`` on success, otherwise
            ``{"success": False, "error": "<message>"}``. Exceptions are caught
            and reported through the error form; the browser is always closed.
        """
        try:
            self._create_driver()
            return self._run_flow(email, mail_client)
        except Exception as exc:
            self._log("error", f"automation error: {exc}")
            return {"success": False, "error": str(exc)}
        finally:
            self._cleanup()

    def _create_driver(self):
        """Create the Chrome driver and store it in ``self.driver``."""
        options = uc.ChromeOptions()

        # Base arguments.
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-setuid-sandbox")
        options.add_argument("--window-size=1280,800")

        if self.proxy:
            options.add_argument(f"--proxy-server={self.proxy}")

        if self.headless:
            options.add_argument("--headless=new")
            options.add_argument("--disable-gpu")
            options.add_argument("--disable-dev-shm-usage")

        if self.user_agent:
            options.add_argument(f"--user-agent={self.user_agent}")

        self.driver = uc.Chrome(
            options=options,
            version_main=None,  # let undetected-chromedriver detect the Chrome version
            use_subprocess=True,
        )

        self.driver.set_page_load_timeout(self.timeout)
        # NOTE: this implicit wait also applies inside the explicit WebDriverWait
        # polls used below, so effective waits can exceed their nominal timeout.
        self.driver.implicitly_wait(self._IMPLICIT_WAIT_SECONDS)

    def _run_flow(self, email: str, mail_client) -> dict:
        """Drive the login pages step by step and return the result dict."""
        self._log("info", f"navigating to login page for {email}")
        self.driver.get(LOGIN_URL)
        time.sleep(3)

        # If the browser already landed on a business URL carrying both
        # parameters, skip the login steps.
        current_url = self.driver.current_url
        if (
            "business.gemini.google" in current_url
            and "csesidx=" in current_url
            and "/cid/" in current_url
        ):
            return self._extract_config(email)

        failure = self._enter_email(email)
        if failure:
            return failure

        # Captured before the continue click: _click_send_code_button accepts an
        # already-visible code input, so the mail may be sent by that click and
        # must not be older than ``since_time``.
        send_time = datetime.now()

        failure = self._click_continue()
        if failure:
            return failure

        self._log("info", "clicking send verification code button")
        if not self._click_send_code_button():
            return self._fail("send code button not found", "send_code_button_missing")

        if not self._wait_for_code_input():
            return self._fail("code input not found", "code_input_missing")

        self._log("info", "polling for verification code")
        code = mail_client.poll_for_code(timeout=40, interval=4, since_time=send_time)
        if not code:
            return self._fail("verification code timeout", "code_timeout")
        self._log("info", f"code received: {code}")

        failure = self._enter_code(code)
        if failure:
            return failure

        self._click_verify_button()
        time.sleep(5)

        self._handle_agreement_page()

        self._log("info", "navigating to business page")
        self.driver.get("https://business.gemini.google/")
        time.sleep(3)

        if "cid" not in self.driver.current_url and self._handle_username_setup():
            time.sleep(3)

        self._log("info", "waiting for URL parameters")
        if not self._wait_for_business_params():
            self._log("warning", "URL parameters not generated, trying refresh")
            self.driver.refresh()
            time.sleep(3)
            if not self._wait_for_business_params():
                return self._fail(
                    "URL parameters not found",
                    "params_missing",
                    log_message="URL parameters generation failed",
                )

        self._log("info", "login success")
        return self._extract_config(email)

    def _fail(self, error: str, screenshot: str, log_message: Optional[str] = None) -> dict:
        """Log an error, save a screenshot and build the failure result dict."""
        self._log("error", log_message or error)
        self._save_screenshot(screenshot)
        return {"success": False, "error": error}

    def _enter_email(self, email: str) -> Optional[dict]:
        """Type the e-mail address; return a failure dict, or None on success."""
        try:
            self._log("info", "entering email address")
            email_input = WebDriverWait(self.driver, 30).until(
                EC.element_to_be_clickable(
                    (
                        By.XPATH,
                        "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input",
                    )
                )
            )
            email_input.click()
            email_input.clear()
            # Typed one character at a time with a short pause between keys.
            for char in email:
                email_input.send_keys(char)
                time.sleep(0.02)
            time.sleep(0.5)
        except Exception as e:
            return self._fail(f"failed to enter email: {e}", "email_input_failed")
        return None

    def _click_continue(self) -> Optional[dict]:
        """Click the continue button; return a failure dict, or None on success."""
        try:
            continue_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable(
                    (
                        By.XPATH,
                        "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button",
                    )
                )
            )
            self.driver.execute_script("arguments[0].click();", continue_btn)
            time.sleep(2)
        except Exception as e:
            return self._fail(f"failed to click continue: {e}", "continue_button_failed")
        return None

    def _enter_code(self, code: str) -> Optional[dict]:
        """Type the verification code; return a failure dict, or None on success."""
        time.sleep(1)
        try:
            code_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, self._PIN_INPUT_SELECTOR))
            )
            code_input.click()
            time.sleep(0.1)
            for char in code:
                code_input.send_keys(char)
                time.sleep(0.05)
            return None
        except Exception as exc:
            self._log("warning", f"direct code input failed, trying fallback: {exc}")

        # Fallback: focus the first digit cell and type into the active element.
        try:
            span = self.driver.find_element(By.CSS_SELECTOR, "span[data-index='0']")
            span.click()
            time.sleep(0.2)
            self.driver.switch_to.active_element.send_keys(code)
        except Exception as e:
            return self._fail(f"failed to input code: {e}", "code_input_failed")
        return None

    def _click_verify_button(self) -> None:
        """Click the verify button if one can be found; failures are only logged."""
        time.sleep(0.5)
        verify_btn = self._find_verify_button()
        if verify_btn is None:
            self._log("warning", "verify button not found")
            return
        try:
            self.driver.execute_script("arguments[0].click();", verify_btn)
        except Exception as e:
            self._log("warning", f"failed to click verify button: {e}")

    def _click_send_code_button(self) -> bool:
        """Click the "send verification code" button if the page needs it.

        Returns:
            True when a button was clicked or the code input is already
            present; False when neither could be found.
        """
        time.sleep(2)

        # Method 1: look the button up by its element id.
        try:
            direct_btn = WebDriverWait(self.driver, 5).until(
                EC.element_to_be_clickable((By.ID, "sign-in-with-email"))
            )
            self.driver.execute_script("arguments[0].click();", direct_btn)
            time.sleep(2)
            return True
        except TimeoutException:
            pass

        # Method 2: first button whose caption contains one of the keywords.
        keywords = [
            "通过电子邮件发送验证码",
            "通过电子邮件发送",
            "email",
            "Email",
            "Send code",
            "Send verification",
            "Verification code",
        ]
        try:
            for btn in self.driver.find_elements(By.TAG_NAME, "button"):
                text = btn.text.strip() if btn.text else ""
                if text and any(kw in text for kw in keywords):
                    self.driver.execute_script("arguments[0].click();", btn)
                    time.sleep(2)
                    return True
        except Exception as exc:
            self._log("warning", f"keyword search for send code button failed: {exc}")

        # Method 3: the code input is already on the page, nothing to click.
        try:
            self.driver.find_element(By.CSS_SELECTOR, self._PIN_INPUT_SELECTOR)
            return True
        except NoSuchElementException:
            return False

    def _wait_for_code_input(self, timeout: int = 30):
        """Wait up to ``timeout`` seconds for the code input; None on timeout."""
        try:
            return WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, self._PIN_INPUT_SELECTOR))
            )
        except TimeoutException:
            return None

    def _find_code_input(self):
        """Return the code input element, or None when it is not present."""
        try:
            return self.driver.find_element(By.CSS_SELECTOR, self._PIN_INPUT_SELECTOR)
        except NoSuchElementException:
            return None

    def _find_verify_button(self):
        """Return the verify button, or None when it cannot be located.

        The fixed XPath is tried first; otherwise the first button whose
        caption contains one of ``_VERIFY_KEYWORDS`` is returned.
        """
        try:
            return self.driver.find_element(By.XPATH, self._VERIFY_BUTTON_XPATH)
        except Exception:
            pass  # fall through to the caption search below

        try:
            for btn in self.driver.find_elements(By.TAG_NAME, "button"):
                text = btn.text.strip()
                if text and any(kw in text for kw in self._VERIFY_KEYWORDS):
                    return btn
        except Exception as exc:
            self._log("warning", f"verify button search failed: {exc}")
        return None

    def _handle_agreement_page(self) -> None:
        """Click the agree button when the current URL is the ``/admin/create`` page."""
        if "/admin/create" not in self.driver.current_url:
            return
        try:
            agree_btn = WebDriverWait(self.driver, 5).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, "button.agree-button"))
            )
            agree_btn.click()
            time.sleep(2)
        except TimeoutException:
            pass  # no agree button appeared; continue with the flow

    def _wait_for_cid(self, timeout: int = 10) -> bool:
        """Poll once per second, ``timeout`` times, until the URL contains ``cid``."""
        for _ in range(timeout):
            if "cid" in self.driver.current_url:
                return True
            time.sleep(1)
        return False

    def _wait_for_business_params(self, timeout: int = 30) -> bool:
        """Poll once per second until the URL has both ``csesidx=`` and ``/cid/``."""
        for _ in range(timeout):
            url = self.driver.current_url
            if "csesidx=" in url and "/cid/" in url:
                self._log("info", f"business params ready: {url}")
                return True
            time.sleep(1)
        return False

    def _wait_for_username_input(self, attempts: int = 30):
        """Poll for a displayed name input; return it, or None after ``attempts`` tries."""
        selectors = [
            "input[formcontrolname='fullName']",
            "input[placeholder='全名']",
            "input[placeholder='Full name']",
            "input#mat-input-0",
            "input[type='text']",
            "input[name='displayName']",
        ]
        # Without this, every missing selector would block for the implicit
        # wait, stretching the poll far beyond one second per attempt.
        self.driver.implicitly_wait(0)
        try:
            for _ in range(attempts):
                for selector in selectors:
                    try:
                        candidate = self.driver.find_element(By.CSS_SELECTOR, selector)
                        if candidate.is_displayed():
                            return candidate
                    except Exception:
                        continue  # missing or stale element: try the next selector
                time.sleep(1)
            return None
        finally:
            self.driver.implicitly_wait(self._IMPLICIT_WAIT_SECONDS)

    def _handle_username_setup(self) -> bool:
        """Fill in a generated display name when the name form is shown.

        Returns:
            True when a name was typed and submitted with ENTER; False when the
            browser is on the login page, no input appeared, or typing failed.
        """
        if "auth.business.gemini.google/login" in self.driver.current_url:
            return False

        username_input = self._wait_for_username_input()
        if username_input is None:
            return False

        suffix = "".join(random.choices(string.ascii_letters + string.digits, k=3))
        username = f"Test{suffix}"

        try:
            from selenium.webdriver.common.keys import Keys

            username_input.click()
            time.sleep(0.2)
            username_input.clear()
            for char in username:
                username_input.send_keys(char)
                time.sleep(0.02)
            time.sleep(0.3)
            username_input.send_keys(Keys.ENTER)
            time.sleep(1)
            return True
        except Exception as exc:
            self._log("warning", f"failed to set username: {exc}")
            return False

    def _extract_config(self, email: str) -> dict:
        """Read the config id, ``csesidx`` and session cookies from the browser.

        Returns:
            ``{"success": True, "config": {...}}`` with keys ``id``, ``csesidx``,
            ``config_id``, ``secure_c_ses``, ``host_c_oses`` and ``expires_at``,
            or ``{"success": False, "error": "<message>"}``.
        """
        try:
            if "cid/" not in self.driver.current_url:
                self.driver.get("https://business.gemini.google/")
                time.sleep(3)

            url = self.driver.current_url
            if "cid/" not in url:
                return {"success": False, "error": "cid not found"}

            # The path segment after "cid/" is the config id.
            config_id = url.split("cid/")[1].split("?")[0].split("/")[0]
            csesidx = url.split("csesidx=")[1].split("&")[0] if "csesidx=" in url else ""

            cookies = self.driver.get_cookies()
            ses_cookie = next((c for c in cookies if c["name"] == self._SESSION_COOKIE), None)
            host_cookie = next((c for c in cookies if c["name"] == self._HOST_COOKIE), None)
            ses = ses_cookie["value"] if ses_cookie else None
            host = host_cookie["value"] if host_cookie else None
            if ses is None or host is None:
                self._log("warning", "session cookies missing from browser")

            # Expiry is reported 12 hours (43200 s) before the cookie's own
            # expiry, or 12 hours from now when the cookie carries none.
            if ses_cookie and "expiry" in ses_cookie:
                expires = datetime.fromtimestamp(ses_cookie["expiry"] - 43200)
            else:
                expires = datetime.now() + timedelta(hours=12)

            config = {
                "id": email,
                "csesidx": csesidx,
                "config_id": config_id,
                "secure_c_ses": ses,
                "host_c_oses": host,
                "expires_at": expires.strftime("%Y-%m-%d %H:%M:%S"),
            }
            return {"success": True, "config": config}
        except Exception as e:
            self._log("error", f"failed to extract config: {e}")
            return {"success": False, "error": str(e)}

    def _save_screenshot(self, name: str) -> None:
        """Best-effort screenshot to ``data/automation/<name>_<unix time>.png``."""
        if not self.driver:
            return
        try:
            import os

            screenshot_dir = os.path.join("data", "automation")
            os.makedirs(screenshot_dir, exist_ok=True)
            path = os.path.join(screenshot_dir, f"{name}_{int(time.time())}.png")
            self.driver.save_screenshot(path)
        except Exception as exc:
            # Screenshots are diagnostics only; never let them fail the flow.
            self._log("warning", f"failed to save screenshot: {exc}")

    def _cleanup(self) -> None:
        """Quit the browser, ignoring errors, and forget the driver."""
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception:
            pass  # the browser may already be gone
        finally:
            # Drop the reference so a dead driver is never reused or quit twice.
            self.driver = None

    def _log(self, level: str, message: str) -> None:
        """Forward a log line to ``log_callback``; callback errors are ignored."""
        if not self.log_callback:
            return
        try:
            self.log_callback(level, message)
        except Exception:
            pass  # logging must never break the automation

    @staticmethod
    def _get_ua() -> str:
        """Return a Windows Chrome User-Agent with a randomly chosen version."""
        v = random.choice(["120.0.0.0", "121.0.0.0", "122.0.0.0"])
        return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{v} Safari/537.36"