Spaces:
Paused
Paused
| from seleniumbase import Driver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import urlparse, parse_qs | |
| from datetime import datetime | |
| import time, random, json, os, requests | |
| # 配置 | |
| DEFAULT_CONFIG = { | |
| "total_accounts": 20, | |
| "mail_api": "https://mail.chatgpt.org.uk", | |
| "mail_key": "gpt-test", | |
| "output_dir": "gemini_accounts", | |
| "login_url": "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZC1hY2JkLThmOGY2ZDE0ODM1Mg", | |
| "headless": False | |
| } | |
| TOTAL_ACCOUNTS = DEFAULT_CONFIG["total_accounts"] | |
| MAIL_API = DEFAULT_CONFIG["mail_api"] | |
| MAIL_KEY = DEFAULT_CONFIG["mail_key"] | |
| OUTPUT_DIR = DEFAULT_CONFIG["output_dir"] | |
| LOGIN_URL = DEFAULT_CONFIG["login_url"] | |
| HEADLESS = DEFAULT_CONFIG["headless"] | |
| SCREENSHOT_DIR = "screenshots" | |
| # XPath | |
| XPATH = { | |
| "email_input": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input", | |
| "continue_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button", | |
| "verify_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button", | |
| } | |
| NAMES = ["James Smith", "John Johnson", "Robert Williams", "Michael Brown", "William Jones", | |
| "David Garcia", "Mary Miller", "Patricia Davis", "Jennifer Rodriguez", "Linda Martinez"] | |
| def log(msg, level="INFO"): print(f"[{level}] {msg}") | |
| def load_config(path="config.json"): | |
| cfg = dict(DEFAULT_CONFIG) | |
| if os.path.exists(path): | |
| with open(path, "r", encoding="utf-8") as f: | |
| user_cfg = json.load(f) | |
| if not isinstance(user_cfg, dict): | |
| raise ValueError("config.json 必须是 JSON 对象") | |
| cfg.update(expand_env_vars(user_cfg)) | |
| return cfg | |
| def expand_env_vars(value): | |
| if isinstance(value, dict): | |
| return {k: expand_env_vars(v) for k, v in value.items()} | |
| if isinstance(value, list): | |
| return [expand_env_vars(v) for v in value] | |
| if isinstance(value, str): | |
| return os.path.expandvars(value) | |
| return value | |
| def apply_config(cfg): | |
| global TOTAL_ACCOUNTS, MAIL_API, MAIL_KEY, OUTPUT_DIR, LOGIN_URL, HEADLESS | |
| TOTAL_ACCOUNTS = int(cfg.get("total_accounts", TOTAL_ACCOUNTS)) | |
| MAIL_API = cfg.get("mail_api", MAIL_API) | |
| MAIL_KEY = cfg.get("mail_key", MAIL_KEY) | |
| OUTPUT_DIR = cfg.get("output_dir", OUTPUT_DIR) | |
| LOGIN_URL = cfg.get("login_url", LOGIN_URL) | |
| HEADLESS = cfg.get("headless", HEADLESS) | |
| def create_email(): | |
| """创建临时邮箱""" | |
| try: | |
| r = requests.get(f"{MAIL_API}/api/generate-email", | |
| headers={"X-API-Key": MAIL_KEY}, timeout=30) | |
| if r.status_code == 200 and r.json().get('success'): | |
| email = r.json()['data']['email'] | |
| log(f"邮箱创建: {email}") | |
| return email | |
| except Exception as e: | |
| log(f"创建邮箱失败: {e}", "ERR") | |
| return None | |
| def get_code(email, driver, timeout=30): | |
| """获取验证码""" | |
| log(f"等待验证码 (最多{timeout}s)...") | |
| start = time.time() | |
| while time.time() - start < timeout: | |
| try: | |
| r = requests.get(f"{MAIL_API}/api/emails", params={"email": email}, | |
| headers={"X-API-Key": MAIL_KEY}, timeout=30) | |
| if r.status_code == 200: | |
| emails = r.json().get('data', {}).get('emails', []) | |
| if emails: | |
| html = emails[0].get('html_content') or emails[0].get('content', '') | |
| soup = BeautifulSoup(html, 'html.parser') | |
| span = soup.find('span', class_='verification-code') | |
| if span: | |
| code = span.get_text().strip() | |
| if len(code) == 6: | |
| log(f"验证码: {code}") | |
| return code | |
| except: pass | |
| print(f" 等待中... ({int(time.time()-start)}s)", end='\r') | |
| time.sleep(3) | |
| log("验证码超时", "ERR") | |
| return None | |
| def save_config(email, cookies, url): | |
| """保存配置""" | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| parsed = urlparse(url) | |
| path_parts = url.split('/') | |
| config_id = None | |
| for i, p in enumerate(path_parts): | |
| if p == 'cid' and i+1 < len(path_parts): | |
| config_id = path_parts[i+1] | |
| # 清理 config_id 结尾的 ?csesidx=xxx | |
| if config_id and '?' in config_id: | |
| config_id = config_id.split('?')[0] | |
| break | |
| cookie_dict = {c['name']: c for c in cookies} | |
| ses_cookie = cookie_dict.get('__Secure-C_SES', {}) | |
| data = { | |
| "id": email, | |
| "csesidx": parse_qs(parsed.query).get('csesidx', [None])[0], | |
| "config_id": config_id, | |
| "secure_c_ses": ses_cookie.get('value'), | |
| "host_c_oses": cookie_dict.get('__Host-C_OSES', {}).get('value'), | |
| "expires_at": datetime.fromtimestamp(ses_cookie.get('expiry', 0) - 43200).strftime('%Y-%m-%d %H:%M:%S') if ses_cookie.get('expiry') else None | |
| } | |
| with open(f"{OUTPUT_DIR}/{email}.json", 'w') as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| log(f"配置已保存: {email}.json") | |
| return data | |
| def save_error_screenshot(driver, email): | |
| """失败时保存截图""" | |
| safe_email = email or "unknown" | |
| os.makedirs(SCREENSHOT_DIR, exist_ok=True) | |
| filename = f"{safe_email}_err.png" | |
| path = os.path.join(SCREENSHOT_DIR, filename) | |
| try: | |
| driver.save_screenshot(path) | |
| log(f"失败截图已保存: {path}", "WARN") | |
| return filename | |
| except Exception as e: | |
| log(f"保存失败截图出错: {e}", "WARN") | |
| return None | |
| def register(driver): | |
| """注册单个账号""" | |
| email = create_email() | |
| if not email: return None, False, None | |
| wait = WebDriverWait(driver, 60) | |
| # 1. 访问登录页 | |
| driver.get(LOGIN_URL) | |
| time.sleep(5) | |
| # 2. 输入邮箱 | |
| log("输入邮箱...") | |
| inp = wait.until(EC.visibility_of_element_located((By.XPATH, XPATH["email_input"]))) | |
| inp.click(); time.sleep(0.3); inp.clear(); time.sleep(0.3) | |
| for c in email: inp.send_keys(c); time.sleep(0.05) | |
| log(f"邮箱: {email}, 实际值: {inp.get_attribute('value')}") | |
| time.sleep(1) | |
| # 3. 点击继续 | |
| btn = wait.until(EC.element_to_be_clickable((By.XPATH, XPATH["continue_btn"]))) | |
| driver.execute_script("arguments[0].click();", btn) | |
| log("点击继续") | |
| time.sleep(3) | |
| # 4. 获取验证码 | |
| code = get_code(email, driver) | |
| if not code: return email, False, None | |
| # 5. 输入验证码 | |
| time.sleep(2) | |
| log(f"输入验证码: {code}") | |
| try: | |
| pin = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='pinInput']"))) | |
| pin.click(); time.sleep(0.2) | |
| for c in code: pin.send_keys(c); time.sleep(0.1) | |
| except: | |
| try: | |
| span = driver.find_element(By.CSS_SELECTOR, "span[data-index='0']") | |
| span.click(); time.sleep(0.3) | |
| driver.switch_to.active_element.send_keys(code) | |
| except Exception as e: | |
| log(f"验证码输入失败: {e}", "ERR") | |
| return email, False, None | |
| # 6. 点击验证 | |
| time.sleep(1) | |
| try: | |
| vbtn = driver.find_element(By.XPATH, XPATH["verify_btn"]) | |
| driver.execute_script("arguments[0].click();", vbtn) | |
| except: | |
| for btn in driver.find_elements(By.TAG_NAME, "button"): | |
| if '验证' in btn.text: driver.execute_script("arguments[0].click();", btn); break | |
| log("点击验证") | |
| time.sleep(5) | |
| # 7. 输入姓名 | |
| try: | |
| name_inp = WebDriverWait(driver, 30).until(EC.visibility_of_element_located( | |
| (By.CSS_SELECTOR, "input[formcontrolname='fullName'], input[placeholder='全名'], input#mat-input-0"))) | |
| name = random.choice(NAMES) | |
| name_inp.clear(); time.sleep(0.3) | |
| for c in name: name_inp.send_keys(c); time.sleep(0.03) | |
| log(f"姓名: {name}") | |
| from selenium.webdriver.common.keys import Keys | |
| name_inp.send_keys(Keys.ENTER) | |
| except Exception as e: | |
| log(f"姓名输入异常: {e}", "WARN") | |
| # 8. 等待进入工作台 | |
| log("等待工作台...") | |
| time.sleep(6) | |
| for _ in range(30): | |
| if 'business.gemini.google' in driver.current_url and 'auth' not in driver.current_url: | |
| break | |
| time.sleep(2) | |
| time.sleep(3) | |
| # 9. 保存配置 | |
| config = save_config(email, driver.get_cookies(), driver.current_url) | |
| log(f"注册成功: {email}") | |
| return email, True, config | |
| def create_driver(): | |
| chrome_bin = os.getenv("CHROME_BIN") | |
| chromium_args = [ | |
| "--no-sandbox", | |
| "--disable-dev-shm-usage", | |
| "--disable-gpu", | |
| "--disable-software-rasterizer", | |
| "--window-size=1280,720", | |
| ] | |
| return Driver( | |
| uc=True, | |
| headless=HEADLESS, | |
| browser="chrome", | |
| binary_location=chrome_bin, | |
| chromium_arg=chromium_args, | |
| ) | |
| def register_one_account(): | |
| driver = create_driver() | |
| email = None | |
| screenshot = None | |
| try: | |
| email, ok, cfg = register(driver) | |
| if not ok: | |
| screenshot = save_error_screenshot(driver, email) | |
| return email, ok, cfg, screenshot | |
| except Exception: | |
| screenshot = save_error_screenshot(driver, email) | |
| raise | |
| finally: | |
| try: driver.quit() | |
| except: pass | |
| def main(): | |
| cfg = load_config() | |
| apply_config(cfg) | |
| print(f"\n{'='*50}\nGemini Business 批量注册 - 共 {TOTAL_ACCOUNTS} 个\n{'='*50}\n") | |
| driver = create_driver() | |
| success, fail, accounts = 0, 0, [] | |
| for i in range(TOTAL_ACCOUNTS): | |
| print(f"\n{'#'*40}\n注册 {i+1}/{TOTAL_ACCOUNTS}\n{'#'*40}\n") | |
| try: | |
| driver.current_url # 检查driver是否有效 | |
| except: | |
| driver = create_driver() | |
| email = None | |
| try: | |
| email, ok, cfg = register(driver) | |
| if ok: success += 1; accounts.append((email, cfg)) | |
| else: | |
| save_error_screenshot(driver, email) | |
| fail += 1 | |
| except Exception as e: | |
| log(f"异常: {e}", "ERR"); fail += 1 | |
| save_error_screenshot(driver, email) | |
| try: driver.quit() | |
| except: pass | |
| driver = create_driver() | |
| print(f"\n进度: {i+1}/{TOTAL_ACCOUNTS} | 成功: {success} | 失败: {fail}") | |
| if i < TOTAL_ACCOUNTS - 1: | |
| try: driver.delete_all_cookies() | |
| except: pass | |
| time.sleep(random.randint(3, 5)) | |
| try: driver.quit() | |
| except: pass | |
| print(f"\n{'='*50}\n完成! 成功: {success}, 失败: {fail}\n配置保存在: {OUTPUT_DIR}/\n{'='*50}") | |
| if __name__ == "__main__": | |
| main() | |