Spaces:
Sleeping
Sleeping
| from camoufox.sync_api import Camoufox | |
| from playwright.sync_api import TimeoutError as PlaywrightTimeoutError | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import urlparse, parse_qs | |
| from datetime import datetime | |
| import time, random, json, os, requests | |
| print("Main module loaded") | |
| # 配置 | |
| DEFAULT_CONFIG = { | |
| "total_accounts": 20, | |
| "mail_api": "https://mail.chatgpt.org.uk", | |
| "mail_key": "gpt-test", | |
| "output_dir": "gemini_accounts", | |
| "login_url": "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZC1hY2JkLThmOGY2ZDE0ODM1Mg", | |
| "headless": True, | |
| "window_size": "1366,768", | |
| "lang": "zh-CN", | |
| "timezone": "Asia/Shanghai", | |
| "nav_timeout_ms": 120000, | |
| "config_wait_ms": 30000 | |
| } | |
| TOTAL_ACCOUNTS = DEFAULT_CONFIG["total_accounts"] | |
| MAIL_API = DEFAULT_CONFIG["mail_api"] | |
| MAIL_KEY = DEFAULT_CONFIG["mail_key"] | |
| OUTPUT_DIR = DEFAULT_CONFIG["output_dir"] | |
| LOGIN_URL = DEFAULT_CONFIG["login_url"] | |
| HEADLESS = DEFAULT_CONFIG["headless"] | |
| WINDOW_SIZE = DEFAULT_CONFIG["window_size"] | |
| BROWSER_LANG = DEFAULT_CONFIG["lang"] | |
| TIMEZONE = DEFAULT_CONFIG["timezone"] | |
| NAV_TIMEOUT_MS = DEFAULT_CONFIG["nav_timeout_ms"] | |
| CONFIG_WAIT_MS = DEFAULT_CONFIG["config_wait_ms"] | |
| SCREENSHOT_DIR = "screenshots" | |
| # XPath | |
| XPATH = { | |
| "email_input": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input", | |
| "continue_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button", | |
| "verify_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button", | |
| } | |
| NAMES = ["James Smith", "John Johnson", "Robert Williams", "Michael Brown", "William Jones", | |
| "David Garcia", "Mary Miller", "Patricia Davis", "Jennifer Rodriguez", "Linda Martinez"] | |
| def parse_window_size(value, fallback=(1366, 768)): | |
| try: | |
| parts = str(value).lower().replace("x", ",").split(",") | |
| width = int(parts[0].strip()) | |
| height = int(parts[1].strip()) | |
| return width, height | |
| except Exception: | |
| return fallback | |
| class CamoufoxSession: | |
| def __init__(self, launch_options, context_options): | |
| self.camoufox = Camoufox(**launch_options) | |
| self.browser = self.camoufox.__enter__() | |
| self.context = self.browser.new_context(**context_options) | |
| self.page = self.context.new_page() | |
| def clear_cookies(self): | |
| try: | |
| self.context.clear_cookies() | |
| except Exception: | |
| pass | |
| def close(self): | |
| try: | |
| self.context.close() | |
| except Exception: | |
| pass | |
| try: | |
| self.camoufox.__exit__(None, None, None) | |
| except Exception: | |
| pass | |
| def log(msg, level="INFO"): print(f"[{level}] {msg}") | |
| def load_config(path="config.json"): | |
| cfg = dict(DEFAULT_CONFIG) | |
| if os.path.exists(path): | |
| with open(path, "r", encoding="utf-8") as f: | |
| user_cfg = json.load(f) | |
| if not isinstance(user_cfg, dict): | |
| raise ValueError("config.json 必须是 JSON 对象") | |
| cfg.update(expand_env_vars(user_cfg)) | |
| return cfg | |
| def expand_env_vars(value): | |
| if isinstance(value, dict): | |
| return {k: expand_env_vars(v) for k, v in value.items()} | |
| if isinstance(value, list): | |
| return [expand_env_vars(v) for v in value] | |
| if isinstance(value, str): | |
| return os.path.expandvars(value) | |
| return value | |
| def apply_config(cfg): | |
| global TOTAL_ACCOUNTS, MAIL_API, MAIL_KEY, OUTPUT_DIR, LOGIN_URL, HEADLESS, WINDOW_SIZE, BROWSER_LANG, TIMEZONE, NAV_TIMEOUT_MS, CONFIG_WAIT_MS | |
| TOTAL_ACCOUNTS = int(cfg.get("total_accounts", TOTAL_ACCOUNTS)) | |
| MAIL_API = cfg.get("mail_api", MAIL_API) | |
| MAIL_KEY = cfg.get("mail_key", MAIL_KEY) | |
| OUTPUT_DIR = cfg.get("output_dir", OUTPUT_DIR) | |
| LOGIN_URL = cfg.get("login_url", LOGIN_URL) | |
| HEADLESS = True | |
| WINDOW_SIZE = cfg.get("window_size", WINDOW_SIZE) | |
| BROWSER_LANG = cfg.get("lang", BROWSER_LANG) | |
| TIMEZONE = cfg.get("timezone", TIMEZONE) | |
| NAV_TIMEOUT_MS = int(cfg.get("nav_timeout_ms", NAV_TIMEOUT_MS)) | |
| CONFIG_WAIT_MS = int(cfg.get("config_wait_ms", CONFIG_WAIT_MS)) | |
| def create_email(): | |
| """创建临时邮箱""" | |
| try: | |
| r = requests.get(f"{MAIL_API}/api/generate-email", | |
| headers={"X-API-Key": MAIL_KEY}, timeout=30) | |
| if r.status_code == 200 and r.json().get('success'): | |
| email = r.json()['data']['email'] | |
| log(f"邮箱创建: {email}") | |
| return email | |
| except Exception as e: | |
| log(f"创建邮箱失败: {e}", "ERR") | |
| return None | |
| def get_code(email, timeout=30): | |
| """获取验证码""" | |
| log(f"等待验证码 (最多{timeout}s)...") | |
| start = time.time() | |
| while time.time() - start < timeout: | |
| try: | |
| r = requests.get(f"{MAIL_API}/api/emails", params={"email": email}, | |
| headers={"X-API-Key": MAIL_KEY}, timeout=30) | |
| if r.status_code == 200: | |
| emails = r.json().get('data', {}).get('emails', []) | |
| if emails: | |
| html = emails[0].get('html_content') or emails[0].get('content', '') | |
| soup = BeautifulSoup(html, 'html.parser') | |
| span = soup.find('span', class_='verification-code') | |
| if span: | |
| code = span.get_text().strip() | |
| if len(code) == 6: | |
| log(f"验证码: {code}") | |
| return code | |
| except: pass | |
| print(f" 等待中... ({int(time.time()-start)}s)", end='\r') | |
| time.sleep(3) | |
| log("验证码超时", "ERR") | |
| return None | |
| def save_config(email, page, context, timeout_ms=None): | |
| """保存配置,轮询等待关键字段""" | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| wait_ms = CONFIG_WAIT_MS if timeout_ms is None else timeout_ms | |
| log(f"等待配置数据 (最多{wait_ms}ms)...") | |
| start = time.time() | |
| while (time.time() - start) * 1000 < wait_ms: | |
| url = page.url | |
| parsed = urlparse(url) | |
| path_parts = url.split('/') | |
| config_id = None | |
| for i, p in enumerate(path_parts): | |
| if p == 'cid' and i+1 < len(path_parts): | |
| config_id = path_parts[i+1] | |
| if config_id and '?' in config_id: | |
| config_id = config_id.split('?')[0] | |
| break | |
| cookies = context.cookies() | |
| cookie_dict = {c['name']: c for c in cookies} | |
| ses_cookie = cookie_dict.get('__Secure-C_SES', {}) | |
| host_cookie = cookie_dict.get('__Host-C_OSES', {}) | |
| csesidx = parse_qs(parsed.query).get('csesidx', [None])[0] | |
| if ses_cookie.get('value') and host_cookie.get('value') and csesidx and config_id: | |
| expiry = ses_cookie.get('expiry', None) | |
| if expiry is None: | |
| expiry = ses_cookie.get('expires', None) | |
| expires_at = None | |
| if expiry and expiry > 0: | |
| expires_at = datetime.fromtimestamp(expiry - 43200).strftime('%Y-%m-%d %H:%M:%S') | |
| data = { | |
| "id": email, | |
| "csesidx": csesidx, | |
| "config_id": config_id, | |
| "secure_c_ses": ses_cookie.get('value'), | |
| "host_c_oses": host_cookie.get('value'), | |
| "expires_at": expires_at | |
| } | |
| log(f"配置数据已就绪 ({time.time() - start:.1f}s)") | |
| with open(f"{OUTPUT_DIR}/{email}.json", 'w') as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| log(f"配置已保存: {email}.json") | |
| return data | |
| time.sleep(1) | |
| log(f"保存配置: url={page.url}") | |
| log(f"Cookie键: {list({c['name'] for c in context.cookies()})}") | |
| parsed = urlparse(page.url) | |
| missing = [] | |
| cookie_dict = {c['name']: c for c in context.cookies()} | |
| if not cookie_dict.get('__Secure-C_SES', {}).get('value'): missing.append('secure_c_ses') | |
| if not cookie_dict.get('__Host-C_OSES', {}).get('value'): missing.append('host_c_oses') | |
| if not parse_qs(parsed.query).get('csesidx', [None])[0]: missing.append('csesidx') | |
| if '/cid/' not in page.url: missing.append('config_id') | |
| log(f"配置不完整,缺失字段: {', '.join(missing)},跳过: {email}", "WARN") | |
| return None | |
| def save_error_screenshot(page, email): | |
| """失败时保存截图""" | |
| safe_email = email or "unknown" | |
| os.makedirs(SCREENSHOT_DIR, exist_ok=True) | |
| filename = f"{safe_email}_err.png" | |
| path = os.path.join(SCREENSHOT_DIR, filename) | |
| try: | |
| page.screenshot(path=path, full_page=True) | |
| log(f"失败截图已保存: {path}", "WARN") | |
| return filename | |
| except Exception as e: | |
| log(f"保存失败截图出错: {e}", "WARN") | |
| return None | |
| def register(page, context): | |
| """注册单个账号""" | |
| email = create_email() | |
| if not email: return None, False, None | |
| # 1. 访问登录页 | |
| log(f"访问登录页: {LOGIN_URL}") | |
| page.goto(LOGIN_URL, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS) | |
| log(f"页面已加载: {page.url}") | |
| time.sleep(5) | |
| # 2. 输入邮箱 | |
| log("输入邮箱...") | |
| inp = page.locator(f"xpath={XPATH['email_input']}") | |
| inp.wait_for(state="visible", timeout=60000) | |
| inp.click() | |
| time.sleep(0.3) | |
| inp.fill("") | |
| page.keyboard.type(email, delay=50) | |
| actual_value = inp.input_value() | |
| log(f"邮箱: {email}, 实际值: {actual_value}") | |
| time.sleep(1) | |
| # 3. 点击继续 | |
| page.locator(f"xpath={XPATH['continue_btn']}").click() | |
| log("点击继续") | |
| time.sleep(3) | |
| log(f"继续后URL: {page.url}") | |
| # 4. 获取验证码 | |
| code = get_code(email) | |
| if not code: return email, False, None | |
| # 5. 输入验证码 | |
| time.sleep(2) | |
| log(f"输入验证码: {code}") | |
| try: | |
| pin = page.wait_for_selector("input[name='pinInput']", timeout=60000) | |
| pin.click() | |
| time.sleep(0.2) | |
| page.keyboard.type(code, delay=100) | |
| except PlaywrightTimeoutError: | |
| try: | |
| page.locator("span[data-index='0']").click() | |
| time.sleep(0.3) | |
| page.keyboard.type(code, delay=100) | |
| except Exception as e: | |
| log(f"验证码输入失败: {e}", "ERR") | |
| return email, False, None | |
| # 6. 点击验证 | |
| time.sleep(1) | |
| try: | |
| page.locator(f"xpath={XPATH['verify_btn']}").click() | |
| except Exception: | |
| try: | |
| page.locator("button", has_text="验证").first.click() | |
| except Exception: | |
| pass | |
| log("点击验证") | |
| time.sleep(5) | |
| log(f"验证后URL: {page.url}") | |
| # 7. 输入姓名 | |
| try: | |
| name_inp = page.wait_for_selector( | |
| "input[formcontrolname='fullName'], input[placeholder='全名'], input#mat-input-0", | |
| timeout=30000, | |
| ) | |
| name = random.choice(NAMES) | |
| name_inp.fill("") | |
| time.sleep(0.3) | |
| page.keyboard.type(name, delay=30) | |
| log(f"姓名: {name}") | |
| name_inp.press("Enter") | |
| except Exception as e: | |
| log(f"姓名输入异常: {e}", "WARN") | |
| # 8. 等待进入工作台 | |
| log("等待工作台...") | |
| time.sleep(6) | |
| for _ in range(30): | |
| if 'business.gemini.google' in page.url and 'auth' not in page.url: | |
| break | |
| time.sleep(2) | |
| time.sleep(3) | |
| log(f"最终URL: {page.url}") | |
| # 9. 保存配置 | |
| config = save_config(email, page, context) | |
| if config: | |
| log(f"注册成功: {email}") | |
| return email, True, config | |
| return email, False, None | |
| def create_session(): | |
| if TIMEZONE: | |
| os.environ["TZ"] = TIMEZONE | |
| if hasattr(time, "tzset"): | |
| time.tzset() | |
| window_width, window_height = parse_window_size(WINDOW_SIZE) | |
| camoufox_exec = os.getenv("CAMOUFOX_EXECUTABLE") | |
| launch_options = { | |
| "headless": HEADLESS, | |
| "window": (window_width, window_height), | |
| } | |
| if BROWSER_LANG: | |
| launch_options["locale"] = BROWSER_LANG | |
| if camoufox_exec: | |
| launch_options["executable_path"] = camoufox_exec | |
| context_options = { | |
| "viewport": {"width": window_width, "height": window_height}, | |
| } | |
| if BROWSER_LANG: | |
| context_options["locale"] = BROWSER_LANG | |
| if TIMEZONE: | |
| context_options["timezone_id"] = TIMEZONE | |
| session = CamoufoxSession(launch_options, context_options) | |
| session.page.set_default_timeout(NAV_TIMEOUT_MS) | |
| session.page.set_default_navigation_timeout(NAV_TIMEOUT_MS) | |
| return session | |
| def register_one_account(): | |
| session = create_session() | |
| email = None | |
| screenshot = None | |
| try: | |
| email, ok, cfg = register(session.page, session.context) | |
| if not ok: | |
| screenshot = save_error_screenshot(session.page, email) | |
| return email, ok, cfg, screenshot | |
| except Exception: | |
| screenshot = save_error_screenshot(session.page, email) | |
| raise | |
| finally: | |
| session.close() | |
| def main(): | |
| cfg = load_config() | |
| apply_config(cfg) | |
| print(f"\n{'='*50}\nGemini Business 批量注册 - 共 {TOTAL_ACCOUNTS} 个\n{'='*50}\n") | |
| session = create_session() | |
| success, fail, accounts = 0, 0, [] | |
| for i in range(TOTAL_ACCOUNTS): | |
| print(f"\n{'#'*40}\n注册 {i+1}/{TOTAL_ACCOUNTS}\n{'#'*40}\n") | |
| try: | |
| if session.page.is_closed(): | |
| raise RuntimeError("page已关闭") | |
| except: | |
| session.close() | |
| session = create_session() | |
| email = None | |
| try: | |
| email, ok, cfg = register(session.page, session.context) | |
| if ok: success += 1; accounts.append((email, cfg)) | |
| else: | |
| save_error_screenshot(session.page, email) | |
| fail += 1 | |
| except Exception as e: | |
| log(f"异常: {e}", "ERR"); fail += 1 | |
| save_error_screenshot(session.page, email) | |
| session.close() | |
| session = create_session() | |
| print(f"\n进度: {i+1}/{TOTAL_ACCOUNTS} | 成功: {success} | 失败: {fail}") | |
| if i < TOTAL_ACCOUNTS - 1: | |
| session.clear_cookies() | |
| time.sleep(random.randint(3, 5)) | |
| session.close() | |
| print(f"\n{'='*50}\n完成! 成功: {success}, 失败: {fail}\n配置保存在: {OUTPUT_DIR}/\n{'='*50}") | |
| if __name__ == "__main__": | |
| main() | |