| import json |
| import os |
| import re |
| import time |
| import uuid |
| import random |
| import string |
| import secrets |
| import hashlib |
| import base64 |
| import argparse |
| from pathlib import Path |
| from datetime import datetime, timedelta |
| from dataclasses import dataclass |
| from typing import Any, Dict, Optional |
| import urllib.parse |
| import urllib.request |
| import urllib.error |
|
|
| from curl_cffi import requests |
| from curl_cffi.requests import Session |
|
|
| |
| OUT_DIR = Path(__file__).parent.resolve() |
| UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
|
|
| |
|
|
| def rstr(n=10): |
| return "".join(random.choices(string.ascii_lowercase + string.digits, k=n)) |
|
|
| def mreq(mt, pt, js=None, tk=None, proxies=None): |
| hdrs = { |
| "content-type": "application/json", |
| "accept": "application/json", |
| "user-agent": UA, |
| "pragma": "no-cache" |
| } |
| if tk: |
| hdrs["authorization"] = f"Bearer {tk}" |
| try: |
| with Session(proxies=proxies) as s: |
| return s.request(mt, f"https://api.mail.tm{pt}", json=js, headers=hdrs, timeout=20) |
| except: |
| return None |
|
|
| def getotp(tk, proxies=None): |
| for _ in range(60): |
| r = mreq("GET", "/messages", tk=tk, proxies=proxies) |
| if r and r.status_code == 200: |
| try: |
| dat = r.json() |
| except: |
| time.sleep(8); continue |
| |
| msgs = dat.get("hydra:member", []) if isinstance(dat, dict) else dat |
| if not isinstance(msgs, list): msgs = [] |
| |
| for m in msgs: |
| if not isinstance(m, dict): continue |
| sb = m.get("subject", "") |
| intro = m.get("intro", "") |
| if "OpenAI" in sb or "ChatGPT" in sb or "code" in intro: |
| rb = mreq("GET", f"/messages/{m.get('id')}", tk=tk, proxies=proxies) |
| if rb and rb.status_code == 200: |
| txt = rb.json().get("text", "") |
| mt = re.search(r"(\d{6})", txt) or re.search(r"(\d{6})", sb) |
| if mt: |
| return mt.group(1) |
| time.sleep(8) |
| return None |
|
|
| def setup_mail_tm(proxies=None): |
| """动态获取 mail.tm 邮箱并返回所需数据""" |
| mail_pw = "at41rvxgptye" |
| |
| |
| domain_res = mreq("GET", "/domains", proxies=proxies) |
| if not domain_res or domain_res.status_code != 200: |
| print(" [!] 无法获取可用邮箱域名") |
| return None, None, None |
| |
| try: |
| js_data = domain_res.json() |
| if isinstance(js_data, list): |
| domains_data = js_data |
| elif isinstance(js_data, dict): |
| domains_data = js_data.get("hydra:member", js_data.get("hydra:collection", [])) |
| else: |
| domains_data = [] |
|
|
| if not domains_data: |
| print(" [!] 域名列表为空") |
| return None, None, None |
| |
| active_domain = domains_data[0].get("domain") |
| except Exception as e: |
| print(f" [!] 解析域名失败: {e}") |
| return None, None, None |
|
|
| email = f"{rstr(10)}@{active_domain}" |
| openai_password = _gen_password() |
| |
| |
| r = mreq("POST", "/accounts", {"address": email, "password": mail_pw}, proxies=proxies) |
| if not r or r.status_code not in [200, 201]: |
| print(f" [!] 邮箱注册被拒: {r.text if r else '无响应'}") |
| return None, None, None |
| |
| |
| r = mreq("POST", "/token", {"address": email, "password": mail_pw}, proxies=proxies) |
| if not r or r.status_code != 200: |
| print(" [!] 获取邮箱 Token 失败") |
| return None, None, None |
| |
| mail_token = r.json().get("token") |
| if not mail_token: |
| return None, None, None |
|
|
| |
| def fetch_code(): |
| print(" [*] 正在等待验证码 (最多等待约8分钟)...") |
| return getotp(mail_token, proxies=proxies) |
| |
| return email, openai_password, fetch_code |
|
|
|
|
| |
|
|
| AUTH_URL = "https://auth.openai.com/oauth/authorize" |
| TOKEN_URL = "https://auth.openai.com/oauth/token" |
| CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann" |
| DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback" |
| DEFAULT_SCOPE = "openid email profile offline_access" |
|
|
| def _gen_password() -> str: |
| alphabet = string.ascii_letters + string.digits |
| special = "!@#$%^&*.-" |
| base = [ |
| random.choice(string.ascii_lowercase), |
| random.choice(string.ascii_uppercase), |
| random.choice(string.digits), |
| random.choice(special), |
| ] |
| base += [random.choice(alphabet + special) for _ in range(12)] |
| random.shuffle(base) |
| return "".join(base) |
|
|
| def _random_name() -> str: |
| return ''.join(random.choice(string.ascii_lowercase) for _ in range(random.randint(5, 9))).capitalize() |
|
|
| def _random_birthdate() -> str: |
| start = datetime(1970,1,1) |
| end = datetime(1999,12,31) |
| d = start + timedelta(days=random.randrange((end - start).days + 1)) |
| return d.strftime('%Y-%m-%d') |
|
|
| def _b64url_no_pad(raw: bytes) -> str: |
| return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=") |
|
|
| def _sha256_b64url_no_pad(s: str) -> str: |
| return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest()) |
|
|
| def _random_state(nbytes: int = 16) -> str: |
| return secrets.token_urlsafe(nbytes) |
|
|
| def _pkce_verifier() -> str: |
| return secrets.token_urlsafe(64) |
|
|
| def _parse_callback_url(callback_url: str) -> Dict[str, Any]: |
| candidate = callback_url.strip() |
| if not candidate: |
| return {"code": "","state": "","error": "","error_description": ""} |
| if "://" not in candidate: |
| if candidate.startswith("?"): candidate = f"http://localhost{candidate}" |
| elif any(ch in candidate for ch in "/?#") or ":" in candidate: candidate = f"http://{candidate}" |
| elif "=" in candidate: candidate = f"http://localhost/?{candidate}" |
| parsed = urllib.parse.urlparse(candidate) |
| query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True) |
| fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True) |
| for key, values in fragment.items(): |
| if key not in query or not query[key] or not (query[key][0] or "").strip(): |
| query[key] = values |
| def get1(k: str) -> str: |
| v = query.get(k, [""]) |
| return (v[0] or "").strip() |
| code = get1("code"); state = get1("state") |
| error = get1("error"); error_description = get1("error_description") |
| if code and not state and "#" in code: |
| code, state = code.split("#",1) |
| if not error and error_description: |
| error, error_description = error_description, "" |
| return {"code": code,"state": state,"error": error,"error_description": error_description} |
|
|
| def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]: |
| if not id_token or id_token.count(".") < 2: return {} |
| payload_b64 = id_token.split(".")[1] |
| pad = "=" * ((4 - (len(payload_b64) % 4)) % 4) |
| try: |
| return json.loads(base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii")).decode("utf-8")) |
| except: return {} |
|
|
| def _decode_jwt_segment(seg: str) -> Dict[str, Any]: |
| raw = (seg or "").strip() |
| if not raw: return {} |
| pad = "=" * ((4 - (len(raw) % 4)) % 4) |
| try: return json.loads(base64.urlsafe_b64decode((raw + pad).encode("ascii")).decode("utf-8")) |
| except: return {} |
|
|
| def _to_int(v: Any) -> int: |
| try: return int(v) |
| except: return 0 |
|
|
| def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]: |
| body = urllib.parse.urlencode(data).encode("utf-8") |
| req = urllib.request.Request( |
| url, data=body, method="POST", |
| headers={"Content-Type": "application/x-www-form-urlencoded","Accept": "application/json"}, |
| ) |
| try: |
| with urllib.request.urlopen(req, timeout=timeout) as resp: |
| raw = resp.read() |
| if resp.status != 200: raise RuntimeError(f"token exchange failed: {resp.status}") |
| return json.loads(raw.decode("utf-8")) |
| except urllib.error.HTTPError as exc: |
| raise RuntimeError(f"token exchange failed: {exc.code}") from exc |
|
|
| @dataclass(frozen=True) |
| class OAuthStart: |
| auth_url: str |
| state: str |
| code_verifier: str |
| redirect_uri: str |
|
|
| def generate_oauth_url(*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE) -> OAuthStart: |
| state = _random_state() |
| code_verifier = _pkce_verifier() |
| code_challenge = _sha256_b64url_no_pad(code_verifier) |
| params = { |
| "client_id": CLIENT_ID, "response_type": "code", "redirect_uri": redirect_uri, |
| "scope": scope, "state": state, "code_challenge": code_challenge, |
| "code_challenge_method": "S256", "prompt": "login", |
| "id_token_add_organizations": "true", "codex_cli_simplified_flow": "true", |
| } |
| auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}" |
| return OAuthStart(auth_url=auth_url, state=state, code_verifier=code_verifier, redirect_uri=redirect_uri) |
|
|
| def fetch_sentinel_token(*, flow: str, did: str, proxies: Any = None) -> Optional[str]: |
| """获取 OpenAI 最新的反爬 Token (Sentinel)""" |
| try: |
| body = json.dumps({"p": "", "id": did, "flow": flow}) |
| resp = requests.post( |
| "https://sentinel.openai.com/backend-api/sentinel/req", |
| headers={ |
| "origin": "https://sentinel.openai.com", |
| "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", |
| "content-type": "text/plain;charset=UTF-8", |
| "user-agent": UA |
| }, |
| data=body, proxies=proxies, impersonate="chrome120", timeout=15, |
| ) |
| if resp.status_code != 200: return None |
| return resp.json().get("token") |
| except: return None |
|
|
| def submit_callback_url(*, callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str = DEFAULT_REDIRECT_URI) -> str: |
| """提取重定向中的 Code 并换取最终的 Access / Refresh Token""" |
| cb = _parse_callback_url(callback_url) |
| if cb["error"]: raise RuntimeError(f"oauth error: {cb['error']}") |
| if not cb["code"] or not cb["state"]: raise ValueError("callback missing code/state") |
| if cb["state"] != expected_state: raise ValueError("state mismatch") |
|
|
| token_resp = _post_form(TOKEN_URL, { |
| "grant_type": "authorization_code", "client_id": CLIENT_ID, |
| "code": cb["code"], "redirect_uri": redirect_uri, "code_verifier": code_verifier, |
| }) |
| |
| access_token = (token_resp.get("access_token") or "").strip() |
| refresh_token = (token_resp.get("refresh_token") or "").strip() |
| id_token = (token_resp.get("id_token") or "").strip() |
| expires_in = _to_int(token_resp.get("expires_in")) |
|
|
| claims = _jwt_claims_no_verify(id_token) |
| email = str(claims.get("email") or "").strip() |
| auth_claims = claims.get("https://api.openai.com/auth") or {} |
| account_id = str(auth_claims.get("chatgpt_account_id") or "").strip() |
|
|
| now = int(time.time()) |
| expired_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))) |
| now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)) |
|
|
| config = { |
| "id_token": id_token, "access_token": access_token, "refresh_token": refresh_token, |
| "account_id": account_id, "last_refresh": now_rfc3339, "email": email, |
| "type": "codex", "expired": expired_rfc3339, |
| } |
| return json.dumps(config, ensure_ascii=False, separators=(",", ":")) |
|
|
|
|
| |
|
|
| def run(proxy: Optional[str]) -> Optional[tuple[str, str, str]]: |
| proxies = {"http": proxy, "https": proxy} if proxy else None |
| s = requests.Session(proxies=proxies, impersonate="chrome120") |
| s.headers.update({"user-agent": UA}) |
|
|
| print(f"[*] 初始化请求,准备获取临时邮箱...") |
| mail_data = setup_mail_tm(proxies) |
| if not mail_data or not mail_data[0]: |
| print("[Error] 获取 mail.tm 邮箱失败") |
| return None |
| |
| email, password, code_fetcher = mail_data |
| print(f"[*] 成功获取邮箱: {email}") |
| print(f"[*] 生成高强度密码: {password}") |
|
|
| oauth = generate_oauth_url() |
| |
| try: |
| |
| resp = s.get(oauth.auth_url, timeout=15) |
| print("DEBUG oauth", resp.status_code,str(resp.url),str(resp.headers)[:400],str(resp.text)[:300], s.cookies) |
| did = s.cookies.get("oai-did") |
| if not did: |
| print("[Error] 未能获取到 OpenAI Device ID (oai-did)") |
| return None |
|
|
| |
| sen_token = fetch_sentinel_token(flow="authorize_continue", did=did, proxies=proxies) |
| sentinel = json.dumps({"p": "", "t": "", "c": sen_token, "id": did, "flow": "authorize_continue"}) if sen_token else None |
|
|
| |
| so_token = fetch_sentinel_token(flow="oauth_create_account", did=did, proxies=proxies) |
|
|
| |
| signup_headers = {"referer": "https://auth.openai.com/create-account", "accept": "application/json", "content-type": "application/json"} |
| if sentinel: signup_headers["openai-sentinel-token"] = sentinel |
| signup_resp = s.post("https://auth.openai.com/api/accounts/authorize/continue", headers=signup_headers, data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "signup"})) |
| if signup_resp.status_code != 200: |
| print(f"[Error] 提交邮箱失败: {signup_resp.status_code}") |
| return None |
|
|
| |
| register_headers = {"referer": "https://auth.openai.com/create-account/password", "accept": "application/json", "content-type": "application/json"} |
| if sentinel: register_headers["openai-sentinel-token"] = sentinel |
| reg_resp = s.post("https://auth.openai.com/api/accounts/user/register", headers=register_headers, data=json.dumps({"password": password, "username": email})) |
| if reg_resp.status_code != 200: |
| print(f"[Error] 设置密码失败: {reg_resp.status_code}") |
| return None |
|
|
| |
| s.get("https://auth.openai.com/api/accounts/email-otp/send", headers=register_headers, timeout=15) |
| code = code_fetcher() |
| if not code: |
| print("[Error] 验证码等待超时或提取失败") |
| return None |
| print(f"[*] 成功提取验证码: {code}") |
|
|
| |
| validate_headers = {"referer": "https://auth.openai.com/email-verification", "accept": "application/json", "content-type": "application/json"} |
| if sentinel: validate_headers["openai-sentinel-token"] = sentinel |
| code_resp = s.post("https://auth.openai.com/api/accounts/email-otp/validate", headers=validate_headers, data=json.dumps({"code": code})) |
| if code_resp.status_code != 200: |
| print(f"[Error] 验证码校验失败: {code_resp.status_code}") |
| return None |
|
|
| |
| create_headers = {"referer": "https://auth.openai.com/about-you", "accept": "application/json", "content-type": "application/json"} |
| if so_token: create_headers["openai-sentinel-so-token"] = so_token |
| create_resp = s.post("https://auth.openai.com/api/accounts/create_account", headers=create_headers, data=json.dumps({"name": _random_name(), "birthdate": _random_birthdate()})) |
| if create_resp.status_code != 200: |
| print(f"[Error] 账户信息填写失败: {create_resp.status_code}") |
| return None |
|
|
| |
| auth_cookie = s.cookies.get("oai-client-auth-session") |
| if not auth_cookie: return None |
| auth_json = _decode_jwt_segment(auth_cookie.split(".")[0]) |
| workspace_id = str((auth_json.get("workspaces") or [{}])[0].get("id") or "").strip() |
| |
| select_resp = s.post("https://auth.openai.com/api/accounts/workspace/select", headers={"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", "content-type": "application/json"}, data=json.dumps({"workspace_id": workspace_id})) |
| if select_resp.status_code != 200: return None |
| |
| continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip() |
|
|
| |
| current_url = continue_url |
| for _ in range(6): |
| final_resp = s.get(current_url, allow_redirects=False, timeout=15) |
| location = final_resp.headers.get("Location") or "" |
| if final_resp.status_code not in [301, 302, 303, 307, 308] or not location: |
| break |
| next_url = urllib.parse.urljoin(current_url, location) |
| if "code=" in next_url and "state=" in next_url: |
| token_json = submit_callback_url(callback_url=next_url, code_verifier=oauth.code_verifier, redirect_uri=oauth.redirect_uri, expected_state=oauth.state) |
| return token_json, email, password |
| current_url = next_url |
|
|
| print("[Error] 未能在重定向链中捕获到最终 Token") |
| return None |
|
|
| except Exception as e: |
| print(f"[Error] 运行时异常: {e}") |
| return None |
|
|
|
|
| |
|
|
| def main(): |
| parser = argparse.ArgumentParser(description="OpenAI 完美融合自动化注册脚本 (By Gemini)") |
| parser.add_argument("--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890") |
| parser.add_argument("--once", action="store_true", help="只运行一次") |
| args = parser.parse_args() |
|
|
| count = 0 |
| print("========================================") |
| print("🚀 OpenAI 终极注册机 (带 Token 提取及 mail.tm) ") |
| print("========================================") |
| |
| OUT_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| while True: |
| count += 1 |
| print(f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<") |
| run_result = run(args.proxy) |
| |
| if run_result: |
| token_json, email, password = run_result |
| fname_email = email.replace("@", "_") |
|
|
| |
| tokens_dir = OUT_DIR / "tokens" |
| tokens_dir.mkdir(parents=True, exist_ok=True) |
| file_path = tokens_dir / f"token_{fname_email}_{int(time.time())}.json" |
| file_path.write_text(token_json, encoding="utf-8") |
| print(f"[🎉] 成功获取 Token!已保存至: {file_path}") |
|
|
| |
| acc_file = tokens_dir / "accounts.txt" |
| with open(acc_file, "a", encoding="utf-8") as f: |
| f.write(f"{email}----{password}\n") |
| print(f"[📝] 账号已追加至: {acc_file}") |
| |
| else: |
| print("[-] 本次注册流程断开。") |
|
|
| if args.once: |
| break |
| |
| wait_time = random.randint(5, 15) |
| print(f"[*] 冷却 {wait_time} 秒...") |
| time.sleep(wait_time) |
|
|
| if __name__ == "__main__": |
| main() |
|
|