"""Batch-register accounts through a Camoufox-driven browser session.

The script creates a temporary mailbox via the configured mail API, walks the
login/verification pages at ``LOGIN_URL``, and stores the resulting session
cookies and URL identifiers as one JSON file per account in ``OUTPUT_DIR``.
"""

from datetime import datetime
import json
import os
import random
import time
from urllib.parse import urlparse, parse_qs

from bs4 import BeautifulSoup
from camoufox.sync_api import Camoufox
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
import requests

print("Main module loaded")

# Configuration defaults; values from config.json override these (see load_config).
DEFAULT_CONFIG = {
    "total_accounts": 20,
    "mail_api": "https://mail.chatgpt.org.uk",
    "mail_key": "gpt-test",
    "output_dir": "gemini_accounts",
    "login_url": "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZC1hY2JkLThmOGY2ZDE0ODM1Mg",
    "headless": True,
    "window_size": "1366,768",
    "lang": "zh-CN",
    "timezone": "Asia/Shanghai",
    "nav_timeout_ms": 120000,
    "config_wait_ms": 30000,
}

# Module-level settings; apply_config() rebinds them from the loaded config.
TOTAL_ACCOUNTS = DEFAULT_CONFIG["total_accounts"]
MAIL_API = DEFAULT_CONFIG["mail_api"]
MAIL_KEY = DEFAULT_CONFIG["mail_key"]
OUTPUT_DIR = DEFAULT_CONFIG["output_dir"]
LOGIN_URL = DEFAULT_CONFIG["login_url"]
HEADLESS = DEFAULT_CONFIG["headless"]
WINDOW_SIZE = DEFAULT_CONFIG["window_size"]
BROWSER_LANG = DEFAULT_CONFIG["lang"]
TIMEZONE = DEFAULT_CONFIG["timezone"]
NAV_TIMEOUT_MS = DEFAULT_CONFIG["nav_timeout_ms"]
CONFIG_WAIT_MS = DEFAULT_CONFIG["config_wait_ms"]
SCREENSHOT_DIR = "screenshots"

# Absolute XPath locators for the login form controls.
XPATH = {
    "email_input": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input",
    "continue_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button",
    "verify_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button",
}

# Pool of display names; one is picked at random per account.
NAMES = [
    "James Smith",
    "John Johnson",
    "Robert Williams",
    "Michael Brown",
    "William Jones",
    "David Garcia",
    "Mary Miller",
    "Patricia Davis",
    "Jennifer Rodriguez",
    "Linda Martinez",
]

# Fields that must all be present before an account config is written,
# in the order they are reported when missing.
_REQUIRED_FIELDS = ("secure_c_ses", "host_c_oses", "csesidx", "config_id")


def parse_window_size(value, fallback=(1366, 768)):
    """Parse a ``"W,H"`` or ``"WxH"`` size into a ``(width, height)`` tuple.

    Args:
        value: Size specification; converted with ``str()`` before parsing.
        fallback: Tuple returned when *value* cannot be parsed or either
            dimension is not a positive integer.

    Returns:
        ``(width, height)`` as ints, or *fallback*.
    """
    try:
        parts = str(value).lower().replace("x", ",").split(",")
        width = int(parts[0].strip())
        height = int(parts[1].strip())
    except (ValueError, IndexError):
        return fallback
    # A zero or negative viewport is never usable; treat it like a parse error.
    if width <= 0 or height <= 0:
        return fallback
    return width, height


class CamoufoxSession:
    """Own one Camoufox browser together with a single context and page.

    Attributes are ``camoufox`` (the context manager), ``browser``,
    ``context`` and ``page``. Call :meth:`close` when done.
    """

    def __init__(self, launch_options, context_options):
        """Launch the browser and open one context and one page.

        Args:
            launch_options: Keyword arguments forwarded to ``Camoufox``.
            context_options: Keyword arguments forwarded to ``new_context``.

        Raises:
            Whatever ``Camoufox`` / ``new_context`` / ``new_page`` raise; the
            browser is shut down first so a failed setup does not leak it.
        """
        self.camoufox = Camoufox(**launch_options)
        self.browser = self.camoufox.__enter__()
        try:
            self.context = self.browser.new_context(**context_options)
            self.page = self.context.new_page()
        except BaseException:
            # Setup failed after the browser started: release it, then re-raise.
            try:
                self.camoufox.__exit__(None, None, None)
            except Exception:
                pass
            raise

    def clear_cookies(self):
        """Clear the context's cookies; failures are logged, not raised."""
        try:
            self.context.clear_cookies()
        except Exception as e:
            # Best effort, but worth surfacing: stale cookies carry over to
            # the next registration attempt.
            log(f"清除Cookie失败: {e}", "WARN")

    def close(self):
        """Close the context and the browser, ignoring errors (best effort)."""
        try:
            self.context.close()
        except Exception:
            pass
        try:
            self.camoufox.__exit__(None, None, None)
        except Exception:
            pass


def log(msg, level="INFO"):
    """Print *msg* prefixed with ``[level]``."""
    print(f"[{level}] {msg}")


def load_config(path="config.json"):
    """Return ``DEFAULT_CONFIG`` overlaid with the JSON object stored at *path*.

    String values in the user config have environment variables expanded.
    A missing file yields a copy of the defaults.

    Raises:
        ValueError: If the file's top-level JSON value is not an object.
    """
    cfg = dict(DEFAULT_CONFIG)
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            user_cfg = json.load(f)
        if not isinstance(user_cfg, dict):
            raise ValueError("config.json 必须是 JSON 对象")
        cfg.update(expand_env_vars(user_cfg))
    return cfg


def expand_env_vars(value):
    """Recursively apply ``os.path.expandvars`` to every string in *value*.

    Dicts and lists are rebuilt with expanded contents; other types are
    returned unchanged.
    """
    if isinstance(value, dict):
        return {k: expand_env_vars(v) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env_vars(v) for v in value]
    if isinstance(value, str):
        return os.path.expandvars(value)
    return value


def _as_bool(value, default):
    """Interpret *value* as a boolean, returning *default* when unclear.

    Accepts real booleans, numbers, and the usual textual spellings (config
    values may be strings after environment-variable expansion).
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return default
    if isinstance(value, str):
        text = value.strip().lower()
        if text in ("1", "true", "yes", "on"):
            return True
        if text in ("0", "false", "no", "off"):
            return False
        return default
    return bool(value)


def apply_config(cfg):
    """Rebind the module-level settings from the mapping *cfg*.

    Keys absent from *cfg* leave the current value in place.
    """
    global TOTAL_ACCOUNTS, MAIL_API, MAIL_KEY, OUTPUT_DIR, LOGIN_URL, HEADLESS, WINDOW_SIZE, BROWSER_LANG, TIMEZONE, NAV_TIMEOUT_MS, CONFIG_WAIT_MS
    TOTAL_ACCOUNTS = int(cfg.get("total_accounts", TOTAL_ACCOUNTS))
    MAIL_API = cfg.get("mail_api", MAIL_API)
    MAIL_KEY = cfg.get("mail_key", MAIL_KEY)
    OUTPUT_DIR = cfg.get("output_dir", OUTPUT_DIR)
    LOGIN_URL = cfg.get("login_url", LOGIN_URL)
    # Honour the "headless" key (default True) instead of forcing True.
    HEADLESS = _as_bool(cfg.get("headless", HEADLESS), HEADLESS)
    WINDOW_SIZE = cfg.get("window_size", WINDOW_SIZE)
    BROWSER_LANG = cfg.get("lang", BROWSER_LANG)
    TIMEZONE = cfg.get("timezone", TIMEZONE)
    NAV_TIMEOUT_MS = int(cfg.get("nav_timeout_ms", NAV_TIMEOUT_MS))
    CONFIG_WAIT_MS = int(cfg.get("config_wait_ms", CONFIG_WAIT_MS))


def create_email():
    """Create a temporary mailbox through the mail API.

    Returns:
        The new email address, or ``None`` if the request failed or the API
        did not report success (the reason is logged).
    """
    try:
        r = requests.get(
            f"{MAIL_API}/api/generate-email",
            headers={"X-API-Key": MAIL_KEY},
            timeout=30,
        )
        if r.status_code == 200:
            payload = r.json()
            if payload.get('success'):
                email = payload['data']['email']
                log(f"邮箱创建: {email}")
                return email
        # Reached on a non-200 status or success == False.
        log(f"创建邮箱失败: HTTP {r.status_code}", "ERR")
    except Exception as e:
        log(f"创建邮箱失败: {e}", "ERR")
    return None


def get_code(email, timeout=30):
    """Poll the mailbox of *email* for a 6-character verification code.

    The newest message's HTML is searched for
    ``<span class="verification-code">``. Polls every 3 seconds.

    Args:
        email: Mailbox address to query.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The code as a string, or ``None`` on timeout.
    """
    log(f"等待验证码 (最多{timeout}s)...")
    start = time.monotonic()
    last_error = None
    while time.monotonic() - start < timeout:
        try:
            r = requests.get(
                f"{MAIL_API}/api/emails",
                params={"email": email},
                headers={"X-API-Key": MAIL_KEY},
                timeout=30,
            )
            if r.status_code == 200:
                emails = r.json().get('data', {}).get('emails', [])
                if emails:
                    # Fall back to '' so a null body never reaches the parser.
                    html = emails[0].get('html_content') or emails[0].get('content') or ''
                    soup = BeautifulSoup(html, 'html.parser')
                    span = soup.find('span', class_='verification-code')
                    if span:
                        code = span.get_text().strip()
                        if len(code) == 6:
                            log(f"验证码: {code}")
                            return code
        except Exception as e:
            # Transient failures are retried; remember the last one so a
            # timeout can say why nothing arrived.
            last_error = e
        print(f" 等待中... ({int(time.monotonic()-start)}s)", end='\r')
        time.sleep(3)
    if last_error is not None:
        log(f"获取验证码时出错: {last_error}", "WARN")
    log("验证码超时", "ERR")
    return None


def _extract_config_id(url):
    """Return the URL segment that follows the first ``cid`` segment, or None.

    Any ``?query`` suffix on that segment is stripped.
    """
    parts = url.split('/')
    for i, part in enumerate(parts[:-1]):
        if part == 'cid':
            return parts[i + 1].split('?')[0] or None
    return None


def _read_config_fields(page, context):
    """Collect the values save_config needs from the page URL and cookies.

    Returns:
        ``(fields, ses_cookie)`` where *fields* maps ``csesidx``,
        ``config_id``, ``secure_c_ses`` and ``host_c_oses`` to their current
        values (``None`` when absent) and *ses_cookie* is the raw
        ``__Secure-C_SES`` cookie dict (empty if not set).
    """
    url = page.url
    cookie_dict = {c['name']: c for c in context.cookies()}
    ses_cookie = cookie_dict.get('__Secure-C_SES', {})
    host_cookie = cookie_dict.get('__Host-C_OSES', {})
    fields = {
        "csesidx": parse_qs(urlparse(url).query).get('csesidx', [None])[0],
        "config_id": _extract_config_id(url),
        "secure_c_ses": ses_cookie.get('value'),
        "host_c_oses": host_cookie.get('value'),
    }
    return fields, ses_cookie


def save_config(email, page, context, timeout_ms=None):
    """Wait for the session fields to appear, then write them to JSON.

    Polls once per second until the two session cookies, the ``csesidx``
    query parameter and the ``cid`` URL segment are all present, then writes
    ``<OUTPUT_DIR>/<email>.json``.

    Args:
        email: Account address; used as the record id and the file name.
        page: Page whose URL is inspected.
        context: Browser context whose cookies are inspected.
        timeout_ms: Maximum wait in milliseconds; defaults to
            ``CONFIG_WAIT_MS``.

    Returns:
        The saved dict, or ``None`` if the fields did not all appear in time
        (the missing ones are logged).
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    wait_ms = CONFIG_WAIT_MS if timeout_ms is None else timeout_ms
    log(f"等待配置数据 (最多{wait_ms}ms)...")
    start = time.monotonic()
    while (time.monotonic() - start) * 1000 < wait_ms:
        fields, ses_cookie = _read_config_fields(page, context)
        if all(fields[name] for name in _REQUIRED_FIELDS):
            expiry = ses_cookie.get('expiry', None)
            if expiry is None:
                expiry = ses_cookie.get('expires', None)
            expires_at = None
            if expiry and expiry > 0:
                # Recorded expiry is 43200 s (12 h) before the cookie's own
                # expiry — presumably a safety margin; confirm with consumers.
                expires_at = datetime.fromtimestamp(expiry - 43200).strftime('%Y-%m-%d %H:%M:%S')
            data = {
                "id": email,
                "csesidx": fields["csesidx"],
                "config_id": fields["config_id"],
                "secure_c_ses": fields["secure_c_ses"],
                "host_c_oses": fields["host_c_oses"],
                "expires_at": expires_at,
            }
            log(f"配置数据已就绪 ({time.monotonic() - start:.1f}s)")
            # Explicit UTF-8: ensure_ascii=False may emit non-ASCII text.
            with open(os.path.join(OUTPUT_DIR, f"{email}.json"), 'w', encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            log(f"配置已保存: {email}.json")
            return data
        time.sleep(1)

    # Timed out: report the page state and exactly which fields are absent,
    # using the same extraction as the polling loop above.
    log(f"保存配置: url={page.url}")
    log(f"Cookie键: {list({c['name'] for c in context.cookies()})}")
    fields, _ = _read_config_fields(page, context)
    missing = [name for name in _REQUIRED_FIELDS if not fields[name]]
    log(f"配置不完整,缺失字段: {', '.join(missing)},跳过: {email}", "WARN")
    return None


def save_error_screenshot(page, email):
    """Save a full-page screenshot of *page* after a failed attempt.

    Args:
        page: Page to capture.
        email: Account address used in the file name; when falsy, a
            timestamped ``unknown_...`` name is used so that successive
            failures do not overwrite each other.

    Returns:
        The screenshot file name (inside ``SCREENSHOT_DIR``), or ``None`` if
        the capture failed.
    """
    safe_email = email or f"unknown_{datetime.now():%Y%m%d_%H%M%S}"
    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
    filename = f"{safe_email}_err.png"
    path = os.path.join(SCREENSHOT_DIR, filename)
    try:
        page.screenshot(path=path, full_page=True)
        log(f"失败截图已保存: {path}", "WARN")
        return filename
    except Exception as e:
        log(f"保存失败截图出错: {e}", "WARN")
        return None


def register(page, context):
    """Register a single account using *page* and *context*.

    Returns:
        ``(email, ok, config)``: *email* is ``None`` if no mailbox could be
        created; *ok* is True only when the config was saved, in which case
        *config* is the saved dict (otherwise ``None``).

    Raises:
        Playwright errors from navigation or from locating the email field
        and continue button propagate to the caller.
    """
    email = create_email()
    if not email:
        return None, False, None

    # 1. Open the login page.
    log(f"访问登录页: {LOGIN_URL}")
    page.goto(LOGIN_URL, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS)
    log(f"页面已加载: {page.url}")
    time.sleep(5)

    # 2. Type the email address.
    log("输入邮箱...")
    inp = page.locator(f"xpath={XPATH['email_input']}")
    inp.wait_for(state="visible", timeout=60000)
    inp.click()
    time.sleep(0.3)
    inp.fill("")
    page.keyboard.type(email, delay=50)
    actual_value = inp.input_value()
    log(f"邮箱: {email}, 实际值: {actual_value}")
    time.sleep(1)

    # 3. Click "continue".
    page.locator(f"xpath={XPATH['continue_btn']}").click()
    log("点击继续")
    time.sleep(3)
    log(f"继续后URL: {page.url}")

    # 4. Fetch the verification code from the mailbox.
    code = get_code(email)
    if not code:
        return email, False, None

    # 5. Type the verification code.
    time.sleep(2)
    log(f"输入验证码: {code}")
    try:
        pin = page.wait_for_selector("input[name='pinInput']", timeout=60000)
        pin.click()
        time.sleep(0.2)
        page.keyboard.type(code, delay=100)
    except PlaywrightTimeoutError:
        # Fallback: focus the first code cell and type from there.
        try:
            page.locator("span[data-index='0']").click()
            time.sleep(0.3)
            page.keyboard.type(code, delay=100)
        except Exception as e:
            log(f"验证码输入失败: {e}", "ERR")
            return email, False, None

    # 6. Click "verify" (XPath first, then a text match). Best effort: the
    #    flow continues even if neither click works.
    time.sleep(1)
    try:
        page.locator(f"xpath={XPATH['verify_btn']}").click()
    except Exception:
        try:
            page.locator("button", has_text="验证").first.click()
        except Exception as e:
            log(f"点击验证按钮失败: {e}", "WARN")
    log("点击验证")
    time.sleep(5)
    log(f"验证后URL: {page.url}")

    # 7. Fill in a display name.
    try:
        name_inp = page.wait_for_selector(
            "input[formcontrolname='fullName'], input[placeholder='全名'], input#mat-input-0",
            timeout=30000,
        )
        name = random.choice(NAMES)
        name_inp.fill("")
        time.sleep(0.3)
        page.keyboard.type(name, delay=30)
        log(f"姓名: {name}")
        name_inp.press("Enter")
    except Exception as e:
        log(f"姓名输入异常: {e}", "WARN")

    # 8. Wait until the URL has left the auth host (up to 30 checks, 2 s apart).
    log("等待工作台...")
    time.sleep(6)
    for _ in range(30):
        if 'business.gemini.google' in page.url and 'auth' not in page.url:
            break
        time.sleep(2)
    time.sleep(3)
    log(f"最终URL: {page.url}")

    # 9. Persist the session config.
    config = save_config(email, page, context)
    if config:
        log(f"注册成功: {email}")
        return email, True, config
    return email, False, None


def create_session():
    """Build a :class:`CamoufoxSession` from the module-level settings.

    Sets the process ``TZ`` to ``TIMEZONE`` when configured, sizes the window
    and viewport from ``WINDOW_SIZE``, uses the executable named by the
    ``CAMOUFOX_EXECUTABLE`` environment variable if set, and applies
    ``NAV_TIMEOUT_MS`` as the page's default timeouts.
    """
    if TIMEZONE:
        os.environ["TZ"] = TIMEZONE
        if hasattr(time, "tzset"):  # tzset is not available on every platform
            time.tzset()
    window_width, window_height = parse_window_size(WINDOW_SIZE)
    camoufox_exec = os.getenv("CAMOUFOX_EXECUTABLE")

    launch_options = {
        "headless": HEADLESS,
        "window": (window_width, window_height),
    }
    if BROWSER_LANG:
        launch_options["locale"] = BROWSER_LANG
    if camoufox_exec:
        launch_options["executable_path"] = camoufox_exec

    context_options = {
        "viewport": {"width": window_width, "height": window_height},
    }
    if BROWSER_LANG:
        context_options["locale"] = BROWSER_LANG
    if TIMEZONE:
        context_options["timezone_id"] = TIMEZONE

    session = CamoufoxSession(launch_options, context_options)
    session.page.set_default_timeout(NAV_TIMEOUT_MS)
    session.page.set_default_navigation_timeout(NAV_TIMEOUT_MS)
    return session


def register_one_account():
    """Register one account in a fresh session that is always closed.

    Returns:
        ``(email, ok, config, screenshot)`` where *screenshot* is the error
        screenshot file name when the attempt failed, else ``None``.

    Raises:
        Any exception from :func:`register`, after a screenshot is attempted.
    """
    session = create_session()
    email = None
    screenshot = None
    try:
        email, ok, cfg = register(session.page, session.context)
        if not ok:
            screenshot = save_error_screenshot(session.page, email)
        return email, ok, cfg, screenshot
    except Exception:
        screenshot = save_error_screenshot(session.page, email)
        raise
    finally:
        session.close()


def _page_usable(session):
    """Return True if the session's page reports it is still open."""
    try:
        return not session.page.is_closed()
    except Exception:
        return False


def main():
    """Load the config and register ``TOTAL_ACCOUNTS`` accounts in sequence.

    One browser session is reused across accounts (cookies cleared between
    attempts) and replaced when its page has closed or an attempt raised.
    The session is closed on exit, including on interruption.
    """
    cfg = load_config()
    apply_config(cfg)
    print(f"\n{'='*50}\nGemini Business 批量注册 - 共 {TOTAL_ACCOUNTS} 个\n{'='*50}\n")

    session = create_session()
    success, fail, accounts = 0, 0, []
    try:
        for i in range(TOTAL_ACCOUNTS):
            print(f"\n{'#'*40}\n注册 {i+1}/{TOTAL_ACCOUNTS}\n{'#'*40}\n")

            if not _page_usable(session):
                session.close()
                session = create_session()

            email = None
            try:
                email, ok, account_cfg = register(session.page, session.context)
                if ok:
                    success += 1
                    accounts.append((email, account_cfg))
                else:
                    save_error_screenshot(session.page, email)
                    fail += 1
            except Exception as e:
                log(f"异常: {e}", "ERR")
                fail += 1
                save_error_screenshot(session.page, email)
                # The page state is unknown after an exception; start clean.
                session.close()
                session = create_session()

            print(f"\n进度: {i+1}/{TOTAL_ACCOUNTS} | 成功: {success} | 失败: {fail}")

            if i < TOTAL_ACCOUNTS - 1:
                session.clear_cookies()
                time.sleep(random.randint(3, 5))
    finally:
        # Runs on normal completion, errors and Ctrl-C alike.
        session.close()

    print(f"\n{'='*50}\n完成! 成功: {success}, 失败: {fail}\n配置保存在: {OUTPUT_DIR}/\n{'='*50}")


if __name__ == "__main__":
    main()