ehhhh / main.py
StarrySkyWorld's picture
feat(main): 替换SeleniumBase为Playwright并增加超时设置
86d4562
from camoufox.sync_api import Camoufox
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
from datetime import datetime
import time, random, json, os, requests
print("Main module loaded")

# Configuration defaults. load_config() starts from a copy of this mapping
# and overlays whatever an optional JSON config file provides.
DEFAULT_CONFIG = {
    "total_accounts": 20,
    "mail_api": "https://mail.chatgpt.org.uk",
    "mail_key": "gpt-test",
    "output_dir": "gemini_accounts",
    "login_url": "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZC1hY2JkLThmOGY2ZDE0ODM1Mg",
    "headless": True,
    "window_size": "1366,768",
    "lang": "zh-CN",
    "timezone": "Asia/Shanghai",
    "nav_timeout_ms": 120000,
    "config_wait_ms": 30000,
}

# Module-level settings, seeded from the defaults above. apply_config()
# rebinds them once the effective configuration is known.
TOTAL_ACCOUNTS = DEFAULT_CONFIG["total_accounts"]
MAIL_API = DEFAULT_CONFIG["mail_api"]
MAIL_KEY = DEFAULT_CONFIG["mail_key"]
OUTPUT_DIR = DEFAULT_CONFIG["output_dir"]
LOGIN_URL = DEFAULT_CONFIG["login_url"]
HEADLESS = DEFAULT_CONFIG["headless"]
WINDOW_SIZE = DEFAULT_CONFIG["window_size"]
BROWSER_LANG = DEFAULT_CONFIG["lang"]
TIMEZONE = DEFAULT_CONFIG["timezone"]
NAV_TIMEOUT_MS = DEFAULT_CONFIG["nav_timeout_ms"]
CONFIG_WAIT_MS = DEFAULT_CONFIG["config_wait_ms"]

# Directory that receives screenshots taken after a failed registration.
SCREENSHOT_DIR = "screenshots"

# Absolute XPaths of the login-form controls used by register().
XPATH = dict(
    email_input="/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input",
    continue_btn="/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button",
    verify_btn="/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button",
)

# Pool of display names; register() picks one at random per account.
NAMES = [
    "James Smith",
    "John Johnson",
    "Robert Williams",
    "Michael Brown",
    "William Jones",
    "David Garcia",
    "Mary Miller",
    "Patricia Davis",
    "Jennifer Rodriguez",
    "Linda Martinez",
]
def parse_window_size(value, fallback=(1366, 768)):
    """Parse a window size into a ``(width, height)`` tuple of ints.

    Args:
        value: Either a string such as ``"1366,768"`` or ``"1366x768"``
            (case-insensitive, surrounding spaces allowed), or a list/tuple
            such as ``[1366, 768]`` as produced by a JSON array. Components
            after the second one are ignored.
        fallback: Value returned when ``value`` cannot be used.

    Returns:
        ``(width, height)`` when both components parse as positive integers,
        otherwise ``fallback``.
    """
    if isinstance(value, (list, tuple)):
        # A JSON config may give the size as an array instead of a string.
        parts = [str(item) for item in value]
    else:
        parts = str(value).lower().replace("x", ",").split(",")
    try:
        width = int(parts[0].strip())
        height = int(parts[1].strip())
    except (IndexError, ValueError):
        return fallback
    # A zero or negative dimension is never a usable window size.
    if width <= 0 or height <= 0:
        return fallback
    return width, height
class CamoufoxSession:
    """Own one Camoufox browser together with a single context and page.

    After construction the instance exposes ``camoufox`` (the launcher
    object), ``browser`` (what the launcher's ``__enter__`` returned),
    ``context`` and ``page``. It can also be used in a ``with`` statement,
    in which case ``close()`` runs on exit.
    """

    def __init__(self, launch_options, context_options):
        """Launch the browser and open one context with one page.

        Args:
            launch_options: Keyword arguments forwarded to ``Camoufox(...)``.
            context_options: Keyword arguments forwarded to
                ``browser.new_context(...)``.

        Raises:
            Whatever launching the browser or creating the context/page
            raises. If the failure happens after the browser has started,
            the browser is shut down before the exception propagates.
        """
        self.context = None
        self.page = None
        self.camoufox = Camoufox(**launch_options)
        self.browser = self.camoufox.__enter__()
        try:
            self.context = self.browser.new_context(**context_options)
            self.page = self.context.new_page()
        except BaseException:
            # The launcher was entered manually, so nothing else will call
            # its __exit__ if construction fails half-way.
            self.close()
            raise

    def __enter__(self):
        """Return the session itself so it can be used in ``with``."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the session; exceptions from the ``with`` body propagate."""
        self.close()
        return False

    def clear_cookies(self):
        """Clear the context's cookies; failures are ignored (best effort)."""
        try:
            self.context.clear_cookies()
        except Exception:
            pass

    def close(self):
        """Close the context, then shut the browser down.

        Both steps are best effort: an error in one step is ignored so the
        other still runs, and calling this on an already-closed or partly
        constructed session does not raise.
        """
        context = getattr(self, "context", None)
        if context is not None:
            try:
                context.close()
            except Exception:
                pass
        camoufox = getattr(self, "camoufox", None)
        if camoufox is not None:
            try:
                camoufox.__exit__(None, None, None)
            except Exception:
                pass
def log(msg, level="INFO"):
    """Print ``msg`` to stdout, prefixed with ``level`` in square brackets."""
    line = f"[{level}] {msg}"
    print(line)
def load_config(path="config.json"):
    """Build the effective configuration.

    Starts from a copy of ``DEFAULT_CONFIG``. When ``path`` exists it is
    read as UTF-8 JSON, passed through ``expand_env_vars`` and merged over
    the defaults.

    Args:
        path: Location of the optional JSON configuration file.

    Returns:
        A new dict; ``DEFAULT_CONFIG`` itself is not modified.

    Raises:
        ValueError: The file's top-level JSON value is not an object.
    """
    merged = DEFAULT_CONFIG.copy()
    if not os.path.exists(path):
        return merged

    with open(path, "r", encoding="utf-8") as handle:
        overrides = json.load(handle)

    if isinstance(overrides, dict):
        merged.update(expand_env_vars(overrides))
        return merged
    raise ValueError("config.json 必须是 JSON 对象")
def expand_env_vars(value):
    """Recursively expand environment-variable references inside ``value``.

    Strings go through ``os.path.expandvars``. Dict values and list items
    are processed recursively (dict keys are kept as they are). Any other
    value is returned unchanged.
    """
    if isinstance(value, str):
        return os.path.expandvars(value)
    if isinstance(value, list):
        return list(map(expand_env_vars, value))
    if isinstance(value, dict):
        expanded = {}
        for key, item in value.items():
            expanded[key] = expand_env_vars(item)
        return expanded
    return value
def apply_config(cfg):
    """Copy settings from ``cfg`` into the module-level globals.

    A key that is absent from ``cfg`` leaves the matching global at its
    current value. ``total_accounts``, ``nav_timeout_ms`` and
    ``config_wait_ms`` are converted with ``int()``.

    ``headless`` is honoured instead of being forced to ``True``: a real
    boolean is used as is, ``None`` keeps the current value, and a string
    (for example one produced by environment-variable expansion) is treated
    as false only when it is empty, ``0``, ``false``, ``no`` or ``off``.

    Args:
        cfg: Mapping of configuration keys to values.

    Raises:
        ValueError, TypeError: A numeric setting cannot be converted by
            ``int()``.
    """
    global TOTAL_ACCOUNTS, MAIL_API, MAIL_KEY, OUTPUT_DIR, LOGIN_URL, HEADLESS, WINDOW_SIZE, BROWSER_LANG, TIMEZONE, NAV_TIMEOUT_MS, CONFIG_WAIT_MS
    TOTAL_ACCOUNTS = int(cfg.get("total_accounts", TOTAL_ACCOUNTS))
    MAIL_API = cfg.get("mail_api", MAIL_API)
    MAIL_KEY = cfg.get("mail_key", MAIL_KEY)
    OUTPUT_DIR = cfg.get("output_dir", OUTPUT_DIR)
    LOGIN_URL = cfg.get("login_url", LOGIN_URL)

    headless = cfg.get("headless", HEADLESS)
    if headless is None:
        headless = HEADLESS
    if isinstance(headless, str):
        # bool("false") is True, so textual values need explicit parsing.
        headless = headless.strip().lower() not in ("", "0", "false", "no", "off")
    HEADLESS = bool(headless)

    WINDOW_SIZE = cfg.get("window_size", WINDOW_SIZE)
    BROWSER_LANG = cfg.get("lang", BROWSER_LANG)
    TIMEZONE = cfg.get("timezone", TIMEZONE)
    NAV_TIMEOUT_MS = int(cfg.get("nav_timeout_ms", NAV_TIMEOUT_MS))
    CONFIG_WAIT_MS = int(cfg.get("config_wait_ms", CONFIG_WAIT_MS))
def create_email():
    """Create a temporary mailbox through the mail API.

    Sends ``GET {MAIL_API}/api/generate-email`` with the ``X-API-Key``
    header set to ``MAIL_KEY`` and a 30-second timeout.

    Returns:
        The address found at ``data.email`` in the JSON response, or
        ``None`` when the request fails, the status is not 200, the
        response does not report ``success``, or the body does not have
        the expected shape. Every failure path is logged.
    """
    try:
        r = requests.get(f"{MAIL_API}/api/generate-email",
                         headers={"X-API-Key": MAIL_KEY}, timeout=30)
        if r.status_code != 200:
            log(f"创建邮箱失败: HTTP {r.status_code}", "ERR")
            return None
        # Decode the body once instead of re-parsing it per access.
        payload = r.json()
        if not payload.get('success'):
            log(f"创建邮箱失败: {payload}", "ERR")
            return None
        email = payload['data']['email']
    except Exception as e:
        log(f"创建邮箱失败: {e}", "ERR")
        return None
    log(f"邮箱创建: {email}")
    return email
def _extract_verification_code(message):
    """Return the 6-character code found in one mail item, or ``None``."""
    # A present-but-null body would otherwise reach the parser as None.
    html = message.get('html_content') or message.get('content') or ''
    soup = BeautifulSoup(html, 'html.parser')
    span = soup.find('span', class_='verification-code')
    if span is None:
        return None
    code = span.get_text().strip()
    return code if len(code) == 6 else None


def get_code(email, timeout=30):
    """Poll the mail API until a verification code shows up.

    Every 3 seconds the inbox of ``email`` is fetched from
    ``{MAIL_API}/api/emails``; only the first listed message is inspected
    for a ``<span class="verification-code">`` whose text is 6 characters
    long.

    Args:
        email: Mailbox address to poll.
        timeout: Overall polling budget in seconds. A single HTTP request
            may still take up to its own 30-second timeout.

    Returns:
        The code as a string, or ``None`` once the budget is used up.
    """
    log(f"等待验证码 (最多{timeout}s)...")
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        try:
            r = requests.get(f"{MAIL_API}/api/emails", params={"email": email},
                             headers={"X-API-Key": MAIL_KEY}, timeout=30)
            if r.status_code == 200:
                emails = r.json().get('data', {}).get('emails', [])
                if emails:
                    code = _extract_verification_code(emails[0])
                    if code:
                        log(f"验证码: {code}")
                        return code
        except Exception as e:
            # Keep polling on transient network/parse errors, but let
            # KeyboardInterrupt and SystemExit through (a bare ``except``
            # would swallow them).
            log(f"获取验证码出错: {e}", "WARN")
        print(f" 等待中... ({int(time.monotonic()-start)}s)", end='\r')
        time.sleep(3)
    log("验证码超时", "ERR")
    return None
def _extract_config_id(url):
    """Return the path segment right after ``cid`` in ``url``, or ``None``."""
    # Only the path is examined, so query strings and fragments can never
    # leak into the identifier.
    segments = urlparse(url).path.split('/')
    for index, segment in enumerate(segments[:-1]):
        if segment == 'cid':
            # The first ``cid`` segment wins; an empty follower (URL ending
            # in ``cid/``) counts as missing.
            return segments[index + 1] or None
    return None


def _read_config_state(page, context):
    """Snapshot the four required fields plus the session cookie."""
    url = page.url
    cookies = {c['name']: c for c in context.cookies()}
    ses_cookie = cookies.get('__Secure-C_SES', {})
    host_cookie = cookies.get('__Host-C_OSES', {})
    fields = {
        'secure_c_ses': ses_cookie.get('value'),
        'host_c_oses': host_cookie.get('value'),
        'csesidx': parse_qs(urlparse(url).query).get('csesidx', [None])[0],
        'config_id': _extract_config_id(url),
    }
    return fields, ses_cookie


def save_config(email, page, context, timeout_ms=None):
    """Wait for the session data to appear, then write it to a JSON file.

    Once per second the page URL and the context cookies are inspected
    until the ``__Secure-C_SES`` and ``__Host-C_OSES`` cookie values, the
    ``csesidx`` query parameter and the path segment after ``cid`` are all
    present. The check always runs at least once, so ``timeout_ms=0``
    means "check now, do not wait".

    Args:
        email: Account identifier; stored as ``id`` and used as file name.
        page: Object exposing the current URL as ``page.url``.
        context: Object whose ``cookies()`` returns a list of cookie dicts
            with ``name`` and ``value`` keys.
        timeout_ms: Maximum wait in milliseconds; ``None`` uses
            ``CONFIG_WAIT_MS``.

    Returns:
        The saved dict (``id``, ``csesidx``, ``config_id``,
        ``secure_c_ses``, ``host_c_oses``, ``expires_at``), or ``None``
        when the data stayed incomplete; the missing fields are logged.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    wait_ms = CONFIG_WAIT_MS if timeout_ms is None else timeout_ms
    log(f"等待配置数据 (最多{wait_ms}ms)...")
    start = time.time()
    while True:
        fields, ses_cookie = _read_config_state(page, context)
        if all(fields.values()):
            # The cookie dict may carry the timestamp as 'expiry' or 'expires'.
            expiry = ses_cookie.get('expiry', None)
            if expiry is None:
                expiry = ses_cookie.get('expires', None)
            expires_at = None
            if expiry and expiry > 0:
                # Recorded 12 hours (43200 s) before the cookie's own expiry,
                # in local time; presumably a safety margin.
                expires_at = datetime.fromtimestamp(expiry - 43200).strftime('%Y-%m-%d %H:%M:%S')
            data = {
                "id": email,
                "csesidx": fields['csesidx'],
                "config_id": fields['config_id'],
                "secure_c_ses": fields['secure_c_ses'],
                "host_c_oses": fields['host_c_oses'],
                "expires_at": expires_at
            }
            log(f"配置数据已就绪 ({time.time() - start:.1f}s)")
            # ensure_ascii=False may emit non-ASCII text, so the encoding
            # must not depend on the platform default.
            with open(os.path.join(OUTPUT_DIR, f"{email}.json"), 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            log(f"配置已保存: {email}.json")
            return data
        # Check the deadline after the attempt so the final sleep is always
        # followed by one more look.
        if (time.time() - start) * 1000 >= wait_ms:
            break
        time.sleep(1)
    log(f"保存配置: url={page.url}")
    log(f"Cookie键: {list({c['name'] for c in context.cookies()})}")
    missing = [name for name, value in fields.items() if not value]
    log(f"配置不完整,缺失字段: {', '.join(missing)},跳过: {email}", "WARN")
    return None
def save_error_screenshot(page, email):
    """Save a full-page screenshot after a failed registration.

    The image is written to ``SCREENSHOT_DIR`` as ``<email>_err.png``, or
    ``unknown_err.png`` when ``email`` is empty. This helper is called from
    failure paths and ``except`` handlers, so it never raises: any error,
    including failure to create the directory, is logged instead.

    Args:
        page: Object providing ``screenshot(path=..., full_page=...)``.
        email: Account address used for the file name; may be ``None``.

    Returns:
        The file name (without directory) on success, otherwise ``None``.
    """
    safe_email = email or "unknown"
    filename = f"{safe_email}_err.png"
    path = os.path.join(SCREENSHOT_DIR, filename)
    try:
        # Inside the try block so a directory error cannot mask the failure
        # the caller is in the middle of reporting.
        os.makedirs(SCREENSHOT_DIR, exist_ok=True)
        page.screenshot(path=path, full_page=True)
    except Exception as e:
        log(f"保存失败截图出错: {e}", "WARN")
        return None
    log(f"失败截图已保存: {path}", "WARN")
    return filename
def register(page, context):
    """Register a single account by driving the sign-up flow in ``page``.

    Steps: create a temporary mailbox, open ``LOGIN_URL``, type the email,
    click continue, fetch the verification code from the mailbox, type it,
    click verify, fill in a random name from ``NAMES``, wait for the URL to
    leave the auth host, then persist the session via ``save_config``.

    Args:
        page: Playwright page used for the whole flow.
        context: Browser context the page belongs to; its cookies are read
            by ``save_config``.

    Returns:
        ``(email, ok, config)``. ``email`` is ``None`` when no mailbox could
        be created; ``ok`` is ``True`` only when ``save_config`` returned
        data, in which case ``config`` is that dict (otherwise ``None``).

    Raises:
        Errors from opening the login page, locating the email field or
        clicking continue are not caught here and propagate to the caller.
    """
    email = create_email()
    if not email:
        return None, False, None

    # 1. Open the login page.
    log(f"访问登录页: {LOGIN_URL}")
    page.goto(LOGIN_URL, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS)
    log(f"页面已加载: {page.url}")
    time.sleep(5)

    # 2. Type the email address.
    log("输入邮箱...")
    inp = page.locator(f"xpath={XPATH['email_input']}")
    inp.wait_for(state="visible", timeout=60000)
    inp.click()
    time.sleep(0.3)
    inp.fill("")
    page.keyboard.type(email, delay=50)
    actual_value = inp.input_value()
    log(f"邮箱: {email}, 实际值: {actual_value}")
    time.sleep(1)

    # 3. Click "continue".
    page.locator(f"xpath={XPATH['continue_btn']}").click()
    log("点击继续")
    time.sleep(3)
    log(f"继续后URL: {page.url}")

    # 4. Fetch the verification code from the mailbox.
    code = get_code(email)
    if not code:
        return email, False, None

    # 5. Type the verification code.
    time.sleep(2)
    log(f"输入验证码: {code}")
    try:
        pin = page.wait_for_selector("input[name='pinInput']", timeout=60000)
        pin.click()
        time.sleep(0.2)
        page.keyboard.type(code, delay=100)
    except PlaywrightTimeoutError:
        # Fallback: focus the first code cell and type from there.
        try:
            page.locator("span[data-index='0']").click()
            time.sleep(0.3)
            page.keyboard.type(code, delay=100)
        except Exception as e:
            log(f"验证码输入失败: {e}", "ERR")
            return email, False, None

    # 6. Click "verify": XPath first, then a button containing "验证".
    time.sleep(1)
    verify_clicked = True
    try:
        page.locator(f"xpath={XPATH['verify_btn']}").click()
    except Exception:
        try:
            page.locator("button", has_text="验证").first.click()
        except Exception as e:
            # Still best effort (the flow continues), but the failure is
            # reported instead of being logged as a successful click.
            verify_clicked = False
            log(f"验证按钮点击失败: {e}", "WARN")
    if verify_clicked:
        log("点击验证")
    time.sleep(5)
    log(f"验证后URL: {page.url}")

    # 7. Fill in the full name and submit with Enter.
    try:
        name_inp = page.wait_for_selector(
            "input[formcontrolname='fullName'], input[placeholder='全名'], input#mat-input-0",
            timeout=30000,
        )
        name = random.choice(NAMES)
        name_inp.fill("")
        time.sleep(0.3)
        page.keyboard.type(name, delay=30)
        log(f"姓名: {name}")
        name_inp.press("Enter")
    except Exception as e:
        log(f"姓名输入异常: {e}", "WARN")

    # 8. Wait (up to 30 checks, 2 s apart) for the URL to leave the auth host.
    log("等待工作台...")
    time.sleep(6)
    for _ in range(30):
        if 'business.gemini.google' in page.url and 'auth' not in page.url:
            break
        time.sleep(2)
    time.sleep(3)
    log(f"最终URL: {page.url}")

    # 9. Persist the session data.
    config = save_config(email, page, context)
    if config:
        log(f"注册成功: {email}")
        return email, True, config
    return email, False, None
def create_session():
    """Start a ``CamoufoxSession`` configured from the module settings.

    When ``TIMEZONE`` is set it is exported as ``TZ`` (and ``time.tzset``
    is called where available) and passed to the context as
    ``timezone_id``. ``WINDOW_SIZE`` drives both the launch window and the
    context viewport, ``BROWSER_LANG`` is passed as ``locale`` to both, and
    the ``CAMOUFOX_EXECUTABLE`` environment variable, if set, becomes the
    launch ``executable_path``. The page's default and navigation timeouts
    are set to ``NAV_TIMEOUT_MS``.

    Returns:
        The ready-to-use ``CamoufoxSession``.
    """
    if TIMEZONE:
        os.environ["TZ"] = TIMEZONE
        tzset = getattr(time, "tzset", None)
        if tzset is not None:
            tzset()

    width, height = parse_window_size(WINDOW_SIZE)
    launch_options = {"headless": HEADLESS, "window": (width, height)}
    context_options = {"viewport": {"width": width, "height": height}}

    if BROWSER_LANG:
        launch_options["locale"] = BROWSER_LANG
        context_options["locale"] = BROWSER_LANG

    executable = os.getenv("CAMOUFOX_EXECUTABLE")
    if executable:
        launch_options["executable_path"] = executable

    if TIMEZONE:
        context_options["timezone_id"] = TIMEZONE

    session = CamoufoxSession(launch_options, context_options)
    page = session.page
    page.set_default_timeout(NAV_TIMEOUT_MS)
    page.set_default_navigation_timeout(NAV_TIMEOUT_MS)
    return session
def register_one_account():
    """Register one account in a fresh browser session.

    A new session is created for the attempt and always closed afterwards.
    A screenshot is taken when registration reports failure or raises.

    Returns:
        ``(email, ok, cfg, screenshot)`` where the first three values come
        from ``register`` and ``screenshot`` is the file name returned by
        ``save_error_screenshot`` on failure, or ``None`` on success.

    Raises:
        Any exception raised by ``register``, after the screenshot attempt.
    """
    session = create_session()
    email = None
    try:
        email, ok, cfg = register(session.page, session.context)
        screenshot = None if ok else save_error_screenshot(session.page, email)
        return email, ok, cfg, screenshot
    except Exception:
        save_error_screenshot(session.page, email)
        raise
    finally:
        session.close()
def main():
    """Run the batch: register ``TOTAL_ACCOUNTS`` accounts one after another.

    Loads and applies the configuration, then reuses one browser session
    across accounts. The session is replaced when its page is found closed
    or after an attempt raised; cookies are cleared and a random 3-5 second
    pause is taken between accounts. Progress and a final summary are
    printed. The browser is closed even when the loop is interrupted.
    """
    cfg = load_config()
    apply_config(cfg)
    print(f"\n{'='*50}\nGemini Business 批量注册 - 共 {TOTAL_ACCOUNTS} 个\n{'='*50}\n")
    session = create_session()
    success, fail = 0, 0
    try:
        for i in range(TOTAL_ACCOUNTS):
            print(f"\n{'#'*40}\n注册 {i+1}/{TOTAL_ACCOUNTS}\n{'#'*40}\n")

            # Replace the session when its page is closed or cannot even be
            # queried. Only ordinary errors are treated this way, so Ctrl+C
            # is not mistaken for a dead page.
            try:
                page_closed = session.page.is_closed()
            except Exception:
                page_closed = True
            if page_closed:
                session.close()
                session = create_session()

            email = None
            try:
                email, ok, _ = register(session.page, session.context)
                if ok:
                    success += 1
                else:
                    save_error_screenshot(session.page, email)
                    fail += 1
            except Exception as e:
                log(f"异常: {e}", "ERR")
                fail += 1
                save_error_screenshot(session.page, email)
                # Start the next account from a clean browser.
                session.close()
                session = create_session()

            print(f"\n进度: {i+1}/{TOTAL_ACCOUNTS} | 成功: {success} | 失败: {fail}")
            if i < TOTAL_ACCOUNTS - 1:
                session.clear_cookies()
                time.sleep(random.randint(3, 5))
    finally:
        # Runs on normal completion, on KeyboardInterrupt and on errors
        # from create_session(), so the browser never outlives the batch.
        session.close()
    print(f"\n{'='*50}\n完成! 成功: {success}, 失败: {fail}\n配置保存在: {OUTPUT_DIR}/\n{'='*50}")
# Run the batch registration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()