autgen / main.py
StarrySkyWorld's picture
refactor: 替换 undetected-chromedriver 为 seleniumbase,并简化 VNC 相关配置
c94202e
from seleniumbase import Driver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
from datetime import datetime
import time, random, json, os, requests
# Configuration defaults. load_config() copies this mapping and lets
# config.json override individual keys.
DEFAULT_CONFIG = {
    "total_accounts": 20,
    "mail_api": "https://mail.chatgpt.org.uk",
    "mail_key": "gpt-test",
    "output_dir": "gemini_accounts",
    "login_url": "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZC1hY2JkLThmOGY2ZDE0ODM1Mg",
    "headless": False,
}

# Module-level settings read by the rest of the script. They start at the
# defaults above and are rebound by apply_config().
TOTAL_ACCOUNTS = DEFAULT_CONFIG["total_accounts"]  # number of registrations to attempt
MAIL_API = DEFAULT_CONFIG["mail_api"]              # base URL of the temporary-mail service
MAIL_KEY = DEFAULT_CONFIG["mail_key"]              # sent as the X-API-Key header
OUTPUT_DIR = DEFAULT_CONFIG["output_dir"]          # where per-account JSON files are written
LOGIN_URL = DEFAULT_CONFIG["login_url"]            # page opened at the start of each registration
HEADLESS = DEFAULT_CONFIG["headless"]              # forwarded to the browser driver

# Directory that receives screenshots taken after a failed registration.
SCREENSHOT_DIR = "screenshots"
# Absolute XPath locators used by register() for the login form controls.
XPATH = {
    # E-mail address text field.
    "email_input": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input",
    # Button clicked after the e-mail address has been typed.
    "continue_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button",
    # Button clicked after the verification code has been typed.
    "verify_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button",
}

# Pool of full names; register() picks one at random for each account.
NAMES = [
    "James Smith",
    "John Johnson",
    "Robert Williams",
    "Michael Brown",
    "William Jones",
    "David Garcia",
    "Mary Miller",
    "Patricia Davis",
    "Jennifer Rodriguez",
    "Linda Martinez",
]
def log(msg, level="INFO"):
    """Print ``msg`` to stdout, prefixed with ``level`` in square brackets."""
    line = f"[{level}] {msg}"
    print(line)
def load_config(path="config.json"):
    """Build the effective configuration dictionary.

    The result starts as a shallow copy of ``DEFAULT_CONFIG``. When ``path``
    exists it is read as UTF-8 JSON, its string values are passed through
    ``expand_env_vars``, and the resulting keys override the defaults.

    Args:
        path: Location of the optional JSON configuration file.

    Returns:
        dict: Defaults merged with any overrides from the file.

    Raises:
        ValueError: If the top level of the JSON document is not an object.
    """
    merged = dict(DEFAULT_CONFIG)
    if not os.path.exists(path):
        # No user file: the defaults are the whole configuration.
        return merged

    with open(path, "r", encoding="utf-8") as handle:
        overrides = json.load(handle)

    if not isinstance(overrides, dict):
        raise ValueError("config.json 必须是 JSON 对象")

    merged.update(expand_env_vars(overrides))
    return merged
def expand_env_vars(value):
    """Recursively expand environment-variable references inside ``value``.

    Strings are passed through ``os.path.expandvars``; lists and dict values
    are processed element by element (dict keys are left untouched). Any
    other type is returned unchanged.

    Args:
        value: A str, list, dict, or arbitrary other object.

    Returns:
        A new structure of the same shape with every string expanded.
    """
    if isinstance(value, str):
        return os.path.expandvars(value)
    if isinstance(value, list):
        return list(map(expand_env_vars, value))
    if isinstance(value, dict):
        expanded = {}
        for key, item in value.items():
            expanded[key] = expand_env_vars(item)
        return expanded
    # Numbers, booleans, None, ... carry no variables to expand.
    return value
def _as_bool(value):
    """Interpret ``value`` as a boolean, honouring common textual spellings.

    Strings equal (case-insensitively, ignoring surrounding whitespace) to
    "", "0", "false", "no" or "off" are False; every other value follows
    normal Python truthiness.
    """
    if isinstance(value, str):
        return value.strip().lower() not in ("", "0", "false", "no", "off")
    return bool(value)


def apply_config(cfg):
    """Copy settings from ``cfg`` into the module-level globals.

    Keys missing from ``cfg`` leave the corresponding global at its current
    value. ``total_accounts`` is coerced with ``int`` and ``headless`` with
    ``_as_bool``, because values that come from environment-variable
    expansion arrive as strings (a raw ``"false"`` would otherwise be truthy
    and switch headless mode on).

    Args:
        cfg: Mapping with any of the keys ``total_accounts``, ``mail_api``,
            ``mail_key``, ``output_dir``, ``login_url``, ``headless``.

    Raises:
        ValueError: If ``total_accounts`` cannot be converted to an int.
    """
    global TOTAL_ACCOUNTS, MAIL_API, MAIL_KEY, OUTPUT_DIR, LOGIN_URL, HEADLESS
    TOTAL_ACCOUNTS = int(cfg.get("total_accounts", TOTAL_ACCOUNTS))
    MAIL_API = cfg.get("mail_api", MAIL_API)
    MAIL_KEY = cfg.get("mail_key", MAIL_KEY)
    OUTPUT_DIR = cfg.get("output_dir", OUTPUT_DIR)
    LOGIN_URL = cfg.get("login_url", LOGIN_URL)
    HEADLESS = _as_bool(cfg.get("headless", HEADLESS))
def create_email():
    """Create a temporary mailbox through the mail API.

    Sends ``GET {MAIL_API}/api/generate-email`` with ``MAIL_KEY`` in the
    ``X-API-Key`` header and reads the address from ``data.email`` of the
    JSON response.

    Returns:
        str | None: The new e-mail address, or None when the request fails,
        the status is not 200, the response does not report success, or the
        payload has an unexpected shape. Every failure path is logged.
    """
    try:
        resp = requests.get(f"{MAIL_API}/api/generate-email",
                            headers={"X-API-Key": MAIL_KEY}, timeout=30)
        if resp.status_code != 200:
            log(f"创建邮箱失败: HTTP {resp.status_code}", "ERR")
            return None
        # Parse the body once and reuse it for both the success flag and data.
        payload = resp.json()
        if not payload.get('success'):
            log(f"创建邮箱失败: {payload!r}", "ERR")
            return None
        email = payload['data']['email']
    except Exception as e:
        # Best-effort boundary: network errors, invalid JSON and unexpected
        # payload shapes all end up here and are reported, not raised.
        log(f"创建邮箱失败: {e}", "ERR")
        return None
    log(f"邮箱创建: {email}")
    return email
def _extract_code(message):
    """Return the 6-character verification code found in ``message``, or None.

    ``message`` is one entry of the mail API's ``emails`` list. The code is
    read from a ``<span class="verification-code">`` element in its HTML.
    """
    html = message.get('html_content') or message.get('content') or ''
    span = BeautifulSoup(html, 'html.parser').find('span', class_='verification-code')
    if span is None:
        return None
    code = span.get_text().strip()
    return code if len(code) == 6 else None


def get_code(email, driver, timeout=30):
    """Poll the mailbox of ``email`` until a verification code arrives.

    Queries ``GET {MAIL_API}/api/emails`` about every 3 seconds and scans the
    returned messages in order for a verification code.

    Args:
        email: Address whose inbox is polled.
        driver: Unused; kept so existing callers keep working.
        timeout: Polling budget in seconds. A new request is only started
            while the elapsed time is below this value.

    Returns:
        str | None: The 6-character code, or None when the timeout elapses.
    """
    log(f"等待验证码 (最多{timeout}s)...")
    start = time.time()
    while time.time() - start < timeout:
        try:
            r = requests.get(f"{MAIL_API}/api/emails", params={"email": email},
                             headers={"X-API-Key": MAIL_KEY}, timeout=30)
            if r.status_code == 200:
                emails = r.json().get('data', {}).get('emails', [])
                # Check every message, not only the first, so an unrelated
                # mail at the head of the list cannot hide the code.
                for message in emails:
                    code = _extract_code(message)
                    if code:
                        log(f"验证码: {code}")
                        return code
        except Exception as e:
            # Keep polling on transient failures, but make them visible and
            # let KeyboardInterrupt / SystemExit propagate.
            log(f"获取验证码出错: {e}", "WARN")
        print(f" 等待中... ({int(time.time()-start)}s)", end='\r')
        time.sleep(3)
    log("验证码超时", "ERR")
    return None
def save_config(email, cookies, url):
    """Persist the session data of a registered account as JSON.

    Writes ``{OUTPUT_DIR}/{email}.json`` (creating the directory if needed)
    containing the account id, the ``csesidx`` query parameter and the path
    segment following ``cid`` from ``url``, and the values of the
    ``__Secure-C_SES`` and ``__Host-C_OSES`` cookies.

    Args:
        email: Account e-mail address; also used as the file name stem.
        cookies: Iterable of cookie dicts, each with at least a ``name`` key
            (``value`` and ``expiry`` are read when present).
        url: URL of the page the browser ended on.

    Returns:
        dict: The data that was written to disk.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    parsed = urlparse(url)

    # Look for ".../cid/<config_id>/..." in the path component only, so that
    # query strings and fragments can never leak into (or be mistaken for)
    # the identifier.
    segments = parsed.path.split('/')
    config_id = None
    if 'cid' in segments:
        nxt = segments.index('cid') + 1
        if nxt < len(segments):
            config_id = segments[nxt] or None

    cookie_dict = {c['name']: c for c in cookies}
    ses_cookie = cookie_dict.get('__Secure-C_SES', {})

    expiry = ses_cookie.get('expiry')
    if expiry:
        # Recorded as local time, 43200 s (12 h) before the cookie's own
        # expiry -- presumably a safety margin; confirm with the consumer.
        expires_at = datetime.fromtimestamp(expiry - 43200).strftime('%Y-%m-%d %H:%M:%S')
    else:
        expires_at = None

    data = {
        "id": email,
        "csesidx": parse_qs(parsed.query).get('csesidx', [None])[0],
        "config_id": config_id,
        "secure_c_ses": ses_cookie.get('value'),
        "host_c_oses": cookie_dict.get('__Host-C_OSES', {}).get('value'),
        "expires_at": expires_at,
    }

    # ensure_ascii=False may emit non-ASCII text, so pin the file encoding
    # instead of relying on the platform default.
    with open(os.path.join(OUTPUT_DIR, f"{email}.json"), 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    log(f"配置已保存: {email}.json")
    return data
def save_error_screenshot(driver, email):
    """Capture a screenshot of the browser after a failed registration.

    The image is written to ``SCREENSHOT_DIR`` (created when missing) as
    ``<email>_err.png``; ``unknown`` replaces the address when ``email`` is
    empty or None.

    Args:
        driver: Browser driver exposing ``save_screenshot(path)``.
        email: Address of the account being registered, if known.

    Returns:
        str | None: The screenshot's file name (without directory), or None
        when saving failed.
    """
    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
    label = email if email else "unknown"
    filename = f"{label}_err.png"
    path = os.path.join(SCREENSHOT_DIR, filename)
    try:
        driver.save_screenshot(path)
        log(f"失败截图已保存: {path}", "WARN")
    except Exception as e:
        # Screenshots are diagnostics only; never let them abort the run.
        log(f"保存失败截图出错: {e}", "WARN")
        return None
    return filename
def _type_slowly(element, text, delay):
    """Send ``text`` to ``element`` one character at a time, pausing ``delay`` seconds after each."""
    for ch in text:
        element.send_keys(ch)
        time.sleep(delay)


def register(driver):
    """Register a single account using ``driver``.

    Creates a temporary mailbox, submits it on the login page, enters the
    verification code received by mail, fills in a random name from
    ``NAMES``, waits for the browser to leave the auth pages and finally
    saves the session via ``save_config``.

    Args:
        driver: Selenium-compatible browser driver.

    Returns:
        tuple: ``(email, ok, config)``. ``email`` is None when no mailbox
        could be created; ``ok`` is True only when the workspace page was
        reached and the config was saved; ``config`` is the dict returned by
        ``save_config`` on success and None otherwise.

    Raises:
        Exception: Selenium errors from the mandatory steps (loading the
        page, locating the e-mail field or the continue button) propagate
        to the caller.
    """
    from selenium.webdriver.common.keys import Keys

    email = create_email()
    if not email:
        return None, False, None

    wait = WebDriverWait(driver, 60)

    # 1. Open the login page.
    driver.get(LOGIN_URL)
    time.sleep(5)

    # 2. Type the e-mail address.
    log("输入邮箱...")
    inp = wait.until(EC.visibility_of_element_located((By.XPATH, XPATH["email_input"])))
    inp.click()
    time.sleep(0.3)
    inp.clear()
    time.sleep(0.3)
    _type_slowly(inp, email, 0.05)
    log(f"邮箱: {email}, 实际值: {inp.get_attribute('value')}")
    time.sleep(1)

    # 3. Click "continue" (via JavaScript).
    btn = wait.until(EC.element_to_be_clickable((By.XPATH, XPATH["continue_btn"])))
    driver.execute_script("arguments[0].click();", btn)
    log("点击继续")
    time.sleep(3)

    # 4. Fetch the verification code from the mailbox.
    code = get_code(email, driver)
    if not code:
        return email, False, None

    # 5. Type the verification code.
    time.sleep(2)
    log(f"输入验证码: {code}")
    try:
        pin = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='pinInput']")))
        pin.click()
        time.sleep(0.2)
        _type_slowly(pin, code, 0.1)
    except Exception:
        # Fallback: focus the first code cell and type into whatever element
        # becomes active.
        try:
            span = driver.find_element(By.CSS_SELECTOR, "span[data-index='0']")
            span.click()
            time.sleep(0.3)
            driver.switch_to.active_element.send_keys(code)
        except Exception as e:
            log(f"验证码输入失败: {e}", "ERR")
            return email, False, None

    # 6. Click "verify".
    time.sleep(1)
    try:
        vbtn = driver.find_element(By.XPATH, XPATH["verify_btn"])
        driver.execute_script("arguments[0].click();", vbtn)
    except Exception:
        # Fallback: click the first button whose label contains "验证".
        for candidate in driver.find_elements(By.TAG_NAME, "button"):
            if '验证' in candidate.text:
                driver.execute_script("arguments[0].click();", candidate)
                break
    log("点击验证")
    time.sleep(5)

    # 7. Fill in the full name (best effort: failures only produce a warning).
    try:
        name_inp = WebDriverWait(driver, 30).until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "input[formcontrolname='fullName'], input[placeholder='全名'], input#mat-input-0")))
        name = random.choice(NAMES)
        name_inp.clear()
        time.sleep(0.3)
        _type_slowly(name_inp, name, 0.03)
        log(f"姓名: {name}")
        name_inp.send_keys(Keys.ENTER)
    except Exception as e:
        log(f"姓名输入异常: {e}", "WARN")

    # 8. Wait until the browser is on business.gemini.google outside the
    #    auth pages (up to 30 checks, 2 s apart).
    log("等待工作台...")
    time.sleep(6)
    for _ in range(30):
        if 'business.gemini.google' in driver.current_url and 'auth' not in driver.current_url:
            break
        time.sleep(2)
    else:
        # Never left the auth flow: saving now would record a session that
        # was not established, so report failure instead of a false success.
        log("等待工作台超时", "ERR")
        return email, False, None
    time.sleep(3)

    # 9. Persist cookies and identifiers.
    config = save_config(email, driver.get_cookies(), driver.current_url)
    log(f"注册成功: {email}")
    return email, True, config
def create_driver():
    """Construct a new SeleniumBase Chrome ``Driver``.

    The browser binary is taken from the ``CHROME_BIN`` environment variable
    (None when unset), headless mode follows the module-level ``HEADLESS``
    setting, and a fixed list of Chromium command-line switches is applied.

    Returns:
        The driver object returned by ``seleniumbase.Driver``.
    """
    options = {
        "uc": True,
        "headless": HEADLESS,
        "browser": "chrome",
        "binary_location": os.getenv("CHROME_BIN"),
        "chromium_arg": [
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--disable-software-rasterizer",
            "--window-size=1280,720",
        ],
    }
    return Driver(**options)
def register_one_account():
    """Register one account in its own, freshly created browser session.

    A screenshot is saved when registration reports failure or raises. The
    browser is always closed before returning or propagating.

    Returns:
        tuple: ``(email, ok, config, screenshot)`` -- the three values from
        ``register`` plus the screenshot file name (None on success or when
        the screenshot could not be saved).

    Raises:
        Exception: Whatever ``register`` raised, after the screenshot
        attempt.
    """
    driver = create_driver()
    email = None
    screenshot = None
    try:
        email, ok, cfg = register(driver)
        if not ok:
            screenshot = save_error_screenshot(driver, email)
        return email, ok, cfg, screenshot
    except Exception:
        # The result is discarded because the exception is re-raised.
        save_error_screenshot(driver, email)
        raise
    finally:
        try:
            driver.quit()
        except Exception as e:
            # Closing is best effort, but must not swallow KeyboardInterrupt
            # or SystemExit the way a bare ``except`` would.
            log(f"关闭浏览器失败: {e}", "WARN")
def _quit_driver(driver):
    """Close ``driver``, logging (not raising) any ordinary error."""
    try:
        driver.quit()
    except Exception as e:
        log(f"关闭浏览器失败: {e}", "WARN")


def main():
    """Run the batch registration.

    Loads and applies the configuration, then attempts ``TOTAL_ACCOUNTS``
    registrations with a shared browser. The browser is replaced when it
    stops responding or when a registration raises; between accounts its
    cookies are cleared and the script pauses 3-5 seconds. The browser is
    closed even when the run is interrupted.
    """
    cfg = load_config()
    apply_config(cfg)

    print(f"\n{'='*50}\nGemini Business 批量注册 - 共 {TOTAL_ACCOUNTS} 个\n{'='*50}\n")

    driver = create_driver()
    success, fail = 0, 0
    try:
        for i in range(TOTAL_ACCOUNTS):
            print(f"\n{'#'*40}\n注册 {i+1}/{TOTAL_ACCOUNTS}\n{'#'*40}\n")

            # Probe the session; replace the driver if it no longer responds.
            try:
                driver.current_url
            except Exception:
                _quit_driver(driver)
                driver = create_driver()

            email = None
            try:
                email, ok, _ = register(driver)
                if ok:
                    success += 1
                else:
                    save_error_screenshot(driver, email)
                    fail += 1
            except Exception as e:
                log(f"异常: {e}", "ERR")
                fail += 1
                save_error_screenshot(driver, email)
                # Start the next attempt from a clean browser.
                _quit_driver(driver)
                driver = create_driver()

            print(f"\n进度: {i+1}/{TOTAL_ACCOUNTS} | 成功: {success} | 失败: {fail}")

            if i < TOTAL_ACCOUNTS - 1:
                try:
                    driver.delete_all_cookies()
                except Exception as e:
                    log(f"清除 cookies 失败: {e}", "WARN")
                time.sleep(random.randint(3, 5))
    finally:
        # Runs on normal completion and on Ctrl-C / unexpected errors alike,
        # so no browser process is left behind.
        _quit_driver(driver)

    print(f"\n{'='*50}\n完成! 成功: {success}, 失败: {fail}\n配置保存在: {OUTPUT_DIR}/\n{'='*50}")


if __name__ == "__main__":
    main()