|
|
|
|
| """
|
| OpenAI 自动注册脚本(异步最终版,含弹窗关闭)
|
| """
|
| import json
|
| import os
|
| import re
|
| import sys
|
| import time
|
| import random
|
| import string
|
| import secrets
|
| import hashlib
|
| import base64
|
| import argparse
|
| import asyncio
|
| from pathlib import Path
|
| from datetime import datetime, timedelta
|
| from dataclasses import dataclass
|
| from typing import Any, Dict, Optional, List, Tuple
|
|
|
| import urllib.parse
|
| import urllib.request
|
| import urllib.error
|
|
|
| from curl_cffi import requests
|
| from playwright.async_api import async_playwright
|
|
|
|
|
|
|
|
|
| OUT_DIR = Path(__file__).parent.resolve()
|
| TOKEN_DIR = OUT_DIR / "tokens"
|
| TOKEN_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
| AUTH_URL = "https://auth.openai.com/oauth/authorize"
|
| TOKEN_URL = "https://auth.openai.com/oauth/token"
|
| CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
|
| DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback"
|
| DEFAULT_SCOPE = "openid email profile offline_access"
|
|
|
|
|
|
|
|
|
| def _gen_password() -> str:
|
| alphabet = string.ascii_letters + string.digits
|
| special = "!@#$%^&*.-"
|
| base = [
|
| random.choice(string.ascii_lowercase),
|
| random.choice(string.ascii_uppercase),
|
| random.choice(string.digits),
|
| random.choice(special),
|
| ]
|
| base += [random.choice(alphabet + special) for _ in range(12)]
|
| random.shuffle(base)
|
| return "".join(base)
|
|
|
| def _random_name() -> str:
|
| letters = string.ascii_lowercase
|
| n = random.randint(5, 9)
|
| s = ''.join(random.choice(letters) for _ in range(n))
|
| return s.capitalize()
|
|
|
| def _random_birthdate() -> str:
|
| start = datetime(1970,1,1); end = datetime(1999,12,31)
|
| delta = end - start
|
| d = start + timedelta(days=random.randrange(delta.days + 1))
|
| return d.strftime('%Y-%m-%d')
|
|
|
| def _b64url_no_pad(raw: bytes) -> str:
|
| return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
|
|
|
| def _sha256_b64url_no_pad(s: str) -> str:
|
| return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
|
|
|
| def _random_state(nbytes: int = 16) -> str:
|
| return secrets.token_urlsafe(nbytes)
|
|
|
| def _pkce_verifier() -> str:
|
| return secrets.token_urlsafe(64)
|
|
|
| def _parse_callback_url(callback_url: str) -> Dict[str, Any]:
|
| candidate = callback_url.strip()
|
| if not candidate:
|
| return {"code": "","state": "","error": "","error_description": ""}
|
| if "://" not in candidate:
|
| if candidate.startswith("?"):
|
| candidate = f"http://localhost{candidate}"
|
| elif any(ch in candidate for ch in "/?#") or ":" in candidate:
|
| candidate = f"http://{candidate}"
|
| elif "=" in candidate:
|
| candidate = f"http://localhost/?{candidate}"
|
| parsed = urllib.parse.urlparse(candidate)
|
| query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
|
| fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
|
| for key, values in fragment.items():
|
| if key not in query or not query[key] or not (query[key][0] or "").strip():
|
| query[key] = values
|
| def get1(k: str) -> str:
|
| v = query.get(k, [""])
|
| return (v[0] or "").strip()
|
| code = get1("code"); state = get1("state")
|
| error = get1("error"); error_description = get1("error_description")
|
| if code and not state and "#" in code:
|
| code, state = code.split("#",1)
|
| if not error and error_description:
|
| error, error_description = error_description, ""
|
| return {"code": code,"state": state,"error": error,"error_description": error_description}
|
|
|
| def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
|
| if not id_token or id_token.count(".") < 2:
|
| return {}
|
| payload_b64 = id_token.split(".")[1]
|
| pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
|
| try:
|
| payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii"))
|
| return json.loads(payload.decode("utf-8"))
|
| except Exception:
|
| return {}
|
|
|
| def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
|
| raw = (seg or "").strip()
|
| if not raw: return {}
|
| pad = "=" * ((4 - (len(raw) % 4)) % 4)
|
| try:
|
| decoded = base64.urlsafe_b64decode((raw + pad).encode("ascii"))
|
| return json.loads(decoded.decode("utf-8"))
|
| except Exception:
|
| return {}
|
|
|
| def _to_int(v: Any) -> int:
|
| try: return int(v)
|
| except (TypeError, ValueError): return 0
|
|
|
| def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
|
| body = urllib.parse.urlencode(data).encode("utf-8")
|
| req = urllib.request.Request(
|
| url, data=body, method="POST",
|
| headers={"Content-Type": "application/x-www-form-urlencoded","Accept": "application/json"},
|
| )
|
| try:
|
| with urllib.request.urlopen(req, timeout=timeout) as resp:
|
| raw = resp.read()
|
| if resp.status != 200:
|
| raise RuntimeError(f"token exchange failed: {resp.status}: {raw.decode('utf-8','replace')}")
|
| return json.loads(raw.decode("utf-8"))
|
| except urllib.error.HTTPError as exc:
|
| raw = exc.read()
|
| raise RuntimeError(f"token exchange failed: {exc.code}: {raw.decode('utf-8','replace')}") from exc
|
|
|
| @dataclass(frozen=True)
|
| class OAuthStart:
|
| auth_url: str
|
| state: str
|
| code_verifier: str
|
| redirect_uri: str
|
|
|
| def generate_oauth_url(*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE) -> OAuthStart:
|
| state = _random_state()
|
| code_verifier = _pkce_verifier()
|
| code_challenge = _sha256_b64url_no_pad(code_verifier)
|
| params = {
|
| "client_id": CLIENT_ID,
|
| "response_type": "code",
|
| "redirect_uri": redirect_uri,
|
| "scope": scope,
|
| "state": state,
|
| "code_challenge": code_challenge,
|
| "code_challenge_method": "S256",
|
| "prompt": "login",
|
| "id_token_add_organizations": "true",
|
| "codex_cli_simplified_flow": "true",
|
| }
|
| auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
|
| return OAuthStart(auth_url=auth_url, state=state, code_verifier=code_verifier, redirect_uri=redirect_uri)
|
|
|
| def fetch_sentinel_token(*, flow: str, did: str, proxies: Any = None) -> Optional[str]:
|
| try:
|
| body = json.dumps({"p": "", "id": did, "flow": flow})
|
| resp = requests.post(
|
| "https://sentinel.openai.com/backend-api/sentinel/req",
|
| headers={
|
| "origin": "https://sentinel.openai.com",
|
| "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
|
| "content-type": "text/plain;charset=UTF-8",
|
| },
|
| data=body,
|
| proxies=proxies,
|
| impersonate="chrome",
|
| timeout=15,
|
| )
|
| if resp.status_code != 200:
|
| print(f"[Error] Sentinel flow={flow} 状态码: {resp.status_code}")
|
| try:
|
| print(resp.text)
|
| except Exception:
|
| pass
|
| return None
|
| return resp.json().get("token")
|
| except Exception as e:
|
| print(f"[Error] Sentinel flow={flow} 获取失败: {e}")
|
| return None
|
|
|
| def submit_callback_url(*, callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str = DEFAULT_REDIRECT_URI) -> str:
|
| cb = _parse_callback_url(callback_url)
|
| if cb["error"]:
|
| desc = cb["error_description"]
|
| raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
|
| if not cb["code"]:
|
| raise ValueError("callback url missing ?code=")
|
| if not cb["state"]:
|
| raise ValueError("callback url missing ?state=")
|
| if cb["state"] != expected_state:
|
| raise ValueError("state mismatch")
|
|
|
| token_resp = _post_form(
|
| TOKEN_URL,
|
| {
|
| "grant_type": "authorization_code",
|
| "client_id": CLIENT_ID,
|
| "code": cb["code"],
|
| "redirect_uri": redirect_uri,
|
| "code_verifier": code_verifier,
|
| },
|
| )
|
| access_token = (token_resp.get("access_token") or "").strip()
|
| refresh_token = (token_resp.get("refresh_token") or "").strip()
|
| id_token = (token_resp.get("id_token") or "").strip()
|
| expires_in = _to_int(token_resp.get("expires_in"))
|
|
|
| claims = _jwt_claims_no_verify(id_token)
|
| email = str(claims.get("email") or "").strip()
|
| auth_claims = claims.get("https://api.openai.com/auth") or {}
|
| account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
|
|
|
| now = int(time.time())
|
| expired_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0)))
|
| now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
|
|
|
| config = {
|
| "id_token": id_token,
|
| "access_token": access_token,
|
| "refresh_token": refresh_token,
|
| "account_id": account_id,
|
| "last_refresh": now_rfc3339,
|
| "email": email,
|
| "type": "codex",
|
| "expired": expired_rfc3339,
|
| }
|
| return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
|
|
|
|
|
|
|
|
|
| async def generate_email_by_click(mail_page) -> Tuple[str, str]:
|
| """关闭“我知道了”弹窗,然后点击生成按钮,返回 (email, token)"""
|
| try:
|
| close_btn = await mail_page.query_selector('button.announcement-close-btn, button:has-text("我知道了")')
|
| if close_btn:
|
| is_visible = await close_btn.is_visible()
|
| if is_visible:
|
| await close_btn.click()
|
| print("[*] 已点击“我知道了”关闭按钮")
|
| await asyncio.sleep(1)
|
| except Exception as e:
|
| print(f"[*] 关闭弹窗异常(可忽略): {e}")
|
|
|
| button_selectors = [
|
| 'button.action-btn',
|
| 'button:has-text("随机生成")',
|
| 'button:has-text("Generate")',
|
| 'button:has-text("刷新")',
|
| 'button:has-text("新邮箱")',
|
| '.generate-btn',
|
| '#generate'
|
| ]
|
| button = None
|
| for selector in button_selectors:
|
| try:
|
| button = await mail_page.wait_for_selector(selector, timeout=5000)
|
| print(f"找到按钮: {selector}")
|
| break
|
| except:
|
| continue
|
|
|
| if not button:
|
| raise RuntimeError("未找到生成按钮,请检查页面元素")
|
|
|
| async with mail_page.expect_response(lambda response: "/api/generate-email" in response.url and response.status == 200) as response_info:
|
| await button.click()
|
| response = await response_info.value
|
| data = await response.json()
|
| if data.get('success'):
|
| email = data['data']['email']
|
| token = data['auth']['token']
|
| return email, token
|
| else:
|
| raise RuntimeError(f"生成邮箱失败: {data.get('error')}")
|
|
|
| async def playwright_fetch_code(mail_page, email: str, token: str, timeout: int = 180) -> str:
|
| """在邮箱页面轮询获取验证码,返回验证码字符串(增强版)"""
|
| try:
|
| await mail_page.evaluate("""
|
| if (typeof refreshInbox === 'function') {
|
| refreshInbox();
|
| }
|
| """)
|
| refresh_btn = await mail_page.query_selector('button:has-text("手动刷新")')
|
| if refresh_btn:
|
| await refresh_btn.click()
|
| print("[*] 点击了手动刷新按钮")
|
| await asyncio.sleep(2)
|
| except Exception as e:
|
| print(f"[*] 首次刷新异常: {e}")
|
|
|
| start_time = time.time()
|
| attempt = 0
|
| while time.time() - start_time < timeout:
|
| attempt += 1
|
| try:
|
| await mail_page.evaluate("refreshInbox?.()")
|
| refresh_btn = await mail_page.query_selector('button:has-text("手动刷新")')
|
| if refresh_btn:
|
| await refresh_btn.click()
|
| except:
|
| pass
|
|
|
| result = await mail_page.evaluate("""
|
| async ([email, token]) => {
|
| try {
|
| const response = await fetch(`/api/emails?email=${encodeURIComponent(email)}`, {
|
| method: 'GET',
|
| headers: {
|
| 'Accept': 'application/json',
|
| 'X-Inbox-Token': token
|
| }
|
| });
|
| if (!response.ok) {
|
| return { success: false, error: `HTTP ${response.status}` };
|
| }
|
| return await response.json();
|
| } catch (error) {
|
| return { success: false, error: error.toString() };
|
| }
|
| }
|
| """, [email, token])
|
|
|
| if not result.get('success'):
|
| print(f"[*] 第 {attempt} 次获取邮件列表失败: {result.get('error')}")
|
| await asyncio.sleep(3)
|
| continue
|
|
|
| emails = result.get('data', {}).get('emails', [])
|
| emails.sort(key=lambda x: x.get('timestamp', 0), reverse=True)
|
| print(f"[*] 第 {attempt} 次获取到 {len(emails)} 封邮件")
|
|
|
| for idx, mail in enumerate(emails):
|
| subject = mail.get('subject', '')
|
| content = mail.get('html_content', '') or mail.get('content', '') or mail.get('text', '')
|
| print(f" 邮件 {idx+1} 主题: {subject[:50]}")
|
| print(f" 内容预览: {content[:100].replace(chr(10), ' ')}")
|
|
|
| patterns = [
|
| (r'(?<!\d)(\d{6})(?!\d)', 1),
|
| (r'代码为\s*(\d{6})', 1),
|
| (r'verification code[:\s]*(\d{6})', 1),
|
| (r'is[:\s]*(\d{6})', 1),
|
| (r'<[^>]*>(\d)[^>]*>(\d)[^>]*>(\d)[^>]*>(\d)[^>]*>(\d)[^>]*>(\d)', 6),
|
| ]
|
| for pattern, group_count in patterns:
|
| if group_count == 6:
|
| match = re.search(pattern, content, re.DOTALL)
|
| if match:
|
| code = ''.join(match.groups())
|
| if len(code) == 6 and code.isdigit():
|
| print(f"[✓] 从分隔数字中提取验证码: {code}")
|
| return code
|
| else:
|
| match = re.search(pattern, subject + ' ' + content, re.IGNORECASE)
|
| if match:
|
| code = match.group(1)
|
| if len(code) == 6 and code.isdigit():
|
| print(f"[✓] 提取验证码: {code}")
|
| return code
|
|
|
| print("[*] 未找到验证码,等待后重试...")
|
| await asyncio.sleep(3)
|
|
|
| return ""
|
|
|
|
|
|
|
|
|
| async def async_run_registration(proxy: Optional[str]) -> Optional[Tuple[str, str, str]]:
|
| proxies = None
|
| if proxy:
|
| proxies = {"http": proxy, "https": proxy}
|
|
|
| async with async_playwright() as p:
|
| launch_args = [
|
| '--disable-blink-features=AutomationControlled',
|
| '--no-sandbox',
|
| '--disable-gpu',
|
| '--disable-dev-shm-usage',
|
| ]
|
| browser = await p.chromium.launch(headless=True, args=launch_args)
|
| context_kwargs = {}
|
| if proxy:
|
| context_kwargs["proxy"] = {"server": proxy}
|
| context = await browser.new_context(**context_kwargs)
|
| mail_page = await context.new_page()
|
|
|
| await mail_page.goto("https://mail.chatgpt.org.uk/", timeout=60000)
|
| await mail_page.wait_for_load_state("networkidle")
|
|
|
| email, token = await generate_email_by_click(mail_page)
|
| print(f"[*] 成功获取临时邮箱: {email}")
|
|
|
| s = requests.Session(proxies=proxies, impersonate="chrome")
|
| s.headers.update({"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"})
|
|
|
| try:
|
| trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
|
| loc_re = re.search(r"^loc=(.+)$", trace.text, re.MULTILINE)
|
| loc = loc_re.group(1) if loc_re else None
|
| print(f"[*] 当前 IP 所在地: {loc}")
|
| if loc in ("CN", "HK"):
|
| raise RuntimeError("代理所在地不支持 CN/HK")
|
| except Exception as e:
|
| print(f"[Error] 网络连接检查失败: {e}")
|
| await browser.close()
|
| return None
|
|
|
| oauth = generate_oauth_url()
|
| url = oauth.auth_url
|
|
|
| try:
|
| resp = s.get(url, timeout=15)
|
| did = s.cookies.get("oai-did")
|
| print(f"[*] Device ID: {did}")
|
|
|
| signup_body = json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "signup"})
|
| sen_req_body = json.dumps({"p": "", "id": did, "flow": "authorize_continue"})
|
|
|
| sen_resp = requests.post(
|
| "https://sentinel.openai.com/backend-api/sentinel/req",
|
| headers={
|
| "origin": "https://sentinel.openai.com",
|
| "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
|
| "content-type": "text/plain;charset=UTF-8",
|
| "user-agent": s.headers["user-agent"],
|
| },
|
| data=sen_req_body,
|
| proxies=proxies,
|
| impersonate="chrome",
|
| timeout=15,
|
| )
|
| print(f"[*] sentinel authorize_continue 状态: {sen_resp.status_code}")
|
| if sen_resp.status_code != 200:
|
| print(f"[Error] Sentinel 异常拦截")
|
| await browser.close()
|
| return None
|
|
|
| sen_token = sen_resp.json().get("token")
|
| sentinel = json.dumps({"p": "", "t": "", "c": sen_token, "id": did, "flow": "authorize_continue"}) if sen_token else None
|
|
|
| so_token = fetch_sentinel_token(flow="oauth_create_account", did=did, proxies=proxies)
|
| print(f"[*] sentinel oauth_create_account token: {bool(so_token)}")
|
|
|
| signup_headers = {
|
| "referer": "https://auth.openai.com/create-account",
|
| "accept": "application/json",
|
| "content-type": "application/json",
|
| }
|
| if sentinel:
|
| signup_headers["openai-sentinel-token"] = sentinel
|
| signup_resp = s.post(
|
| "https://auth.openai.com/api/accounts/authorize/continue",
|
| headers=signup_headers,
|
| data=signup_body,
|
| )
|
| print(f"[*] authorize/continue 状态: {signup_resp.status_code}")
|
| if signup_resp.status_code != 200:
|
| print("[Error] 注册表单提交失败")
|
| await browser.close()
|
| return None
|
|
|
| password = _gen_password()
|
| register_headers = {
|
| "referer": "https://auth.openai.com/create-account/password",
|
| "accept": "application/json",
|
| "content-type": "application/json",
|
| }
|
| if sentinel:
|
| register_headers["openai-sentinel-token"] = sentinel
|
| reg_resp = s.post(
|
| "https://auth.openai.com/api/accounts/user/register",
|
| headers=register_headers,
|
| data=json.dumps({"password": password, "username": email}),
|
| )
|
| print(f"[*] 设置密码状态: {reg_resp.status_code}")
|
| if reg_resp.status_code != 200:
|
| print("[Error] 设置密码失败")
|
| await browser.close()
|
| return None
|
|
|
| for attempt in range(3):
|
| try:
|
| send_headers = {
|
| "referer": "https://auth.openai.com/create-account/password",
|
| "accept": "application/json",
|
| }
|
| send_resp = s.get(
|
| "https://auth.openai.com/api/accounts/email-otp/send",
|
| headers=send_headers,
|
| timeout=30,
|
| )
|
| print(f"[*] 触发发送验证码状态: {send_resp.status_code}")
|
| if send_resp.status_code == 200:
|
| break
|
| except Exception as e:
|
| print(f"[Warn] send 调用异常 (尝试 {attempt+1}/3): {e}")
|
| await asyncio.sleep(2)
|
| else:
|
| print("[Error] 触发验证码发送失败,重试耗尽")
|
| await browser.close()
|
| return None
|
|
|
| code = await playwright_fetch_code(mail_page, email, token, timeout=180)
|
| if not code:
|
| print("[Error] 未能收到验证码")
|
| await browser.close()
|
| return None
|
| print(f"[*] 收到验证码: {code}")
|
|
|
| code_body = json.dumps({"code": code})
|
| validate_headers = {
|
| "referer": "https://auth.openai.com/email-verification",
|
| "accept": "application/json",
|
| "content-type": "application/json",
|
| }
|
| if sentinel:
|
| validate_headers["openai-sentinel-token"] = sentinel
|
|
|
| code_resp = s.post(
|
| "https://auth.openai.com/api/accounts/email-otp/validate",
|
| headers=validate_headers,
|
| data=code_body,
|
| )
|
| print(f"[*] 验证码校验状态: {code_resp.status_code}")
|
| if code_resp.status_code != 200:
|
| print("[Error] 验证码校验失败")
|
| await browser.close()
|
| return None
|
|
|
| create_account_body = json.dumps({"name": _random_name(), "birthdate": _random_birthdate()})
|
| create_headers = {
|
| "referer": "https://auth.openai.com/about-you",
|
| "accept": "application/json",
|
| "content-type": "application/json",
|
| }
|
| if so_token:
|
| create_headers["openai-sentinel-so-token"] = so_token
|
|
|
| create_account_resp = s.post(
|
| "https://auth.openai.com/api/accounts/create_account",
|
| headers=create_headers,
|
| data=create_account_body,
|
| )
|
| print(f"[*] 账户创建状态: {create_account_resp.status_code}")
|
| if create_account_resp.status_code != 200:
|
| print("[Error] 账户创建失败")
|
| await browser.close()
|
| return None
|
|
|
| auth_cookie = s.cookies.get("oai-client-auth-session")
|
| if not auth_cookie:
|
| print("[Error] 未能获取到授权 Cookie")
|
| await browser.close()
|
| return None
|
|
|
| auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
|
| workspaces = auth_json.get("workspaces") or []
|
| if not workspaces:
|
| print("[Error] 授权 Cookie 里没有 workspace 信息")
|
| await browser.close()
|
| return None
|
| workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
|
| if not workspace_id:
|
| print("[Error] 无法解析 workspace_id")
|
| await browser.close()
|
| return None
|
|
|
| select_body = json.dumps({"workspace_id": workspace_id})
|
| select_resp = s.post(
|
| "https://auth.openai.com/api/accounts/workspace/select",
|
| headers={
|
| "referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
|
| "content-type": "application/json",
|
| },
|
| data=select_body,
|
| )
|
| if select_resp.status_code != 200:
|
| print(f"[Error] 选择 workspace 失败")
|
| await browser.close()
|
| return None
|
|
|
| continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip()
|
| if not continue_url:
|
| print("[Error] workspace/select 响应缺少 continue_url")
|
| await browser.close()
|
| return None
|
|
|
| current_url = continue_url
|
| for _ in range(6):
|
| final_resp = s.get(current_url, allow_redirects=False, timeout=15)
|
| location = final_resp.headers.get("Location") or ""
|
| if final_resp.status_code not in [301, 302, 303, 307, 308]:
|
| break
|
| if not location:
|
| break
|
| next_url = urllib.parse.urljoin(current_url, location)
|
| if "code=" in next_url and "state=" in next_url:
|
| token_json = submit_callback_url(
|
| callback_url=next_url,
|
| code_verifier=oauth.code_verifier,
|
| redirect_uri=oauth.redirect_uri,
|
| expected_state=oauth.state,
|
| )
|
| await browser.close()
|
| return token_json, email, password
|
| current_url = next_url
|
|
|
| print("[Error] 未能在重定向链中捕获到最终 Callback URL")
|
| await browser.close()
|
| return None
|
|
|
| except Exception as e:
|
| print(f"[Error] 运行时发生错误: {e}")
|
| await browser.close()
|
| return None
|
|
|
|
|
|
|
|
|
| async def main():
|
| parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本(异步最终版)")
|
| parser.add_argument("--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890")
|
| parser.add_argument("--once", action="store_true", help="只运行一次")
|
| parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
|
| parser.add_argument("--sleep-max", type=int, default=30, help="循环模式最长等待秒数")
|
| args = parser.parse_args()
|
|
|
| sleep_min = max(1, args.sleep_min)
|
| sleep_max = max(sleep_min, args.sleep_max)
|
|
|
| count = 0
|
| print("[Info] OpenAI 自动注册脚本 (异步最终版) 启动")
|
| OUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
| while True:
|
| count += 1
|
| print(f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<")
|
| try:
|
| result = await async_run_registration(args.proxy)
|
| if result:
|
| token_json, email, password = result
|
| try:
|
| t_data = json.loads(token_json)
|
| fname_email = t_data.get("email", "unknown").replace("@", "_")
|
| except Exception:
|
| fname_email = "unknown"
|
|
|
| file_path = TOKEN_DIR / f"token_{fname_email}_{int(time.time())}.json"
|
| try:
|
| file_path.write_text(token_json, encoding="utf-8")
|
| print(f"[*] 成功! Token 已保存至: {file_path}")
|
| except Exception as e:
|
| print(f"[Error] 保存 token 失败: {e}")
|
|
|
| acc_file = TOKEN_DIR / "accounts.txt"
|
| try:
|
| with open(acc_file, "a", encoding="utf-8") as f:
|
| f.write(f"{email}----{password}\n")
|
| except Exception as e:
|
| print(f"[Error] 保存账号信息失败: {e}")
|
| else:
|
| print("[-] 本次注册失败。")
|
| except Exception as e:
|
| print(f"[Error] 发生未捕获异常: {e}")
|
|
|
| if args.once:
|
| break
|
| wait_time = random.randint(sleep_min, sleep_max)
|
| print(f"[*] 休息 {wait_time} 秒...")
|
| await asyncio.sleep(wait_time)
|
|
|
| if __name__ == "__main__":
|
| asyncio.run(main()) |