altreg / reg.py
0Time's picture
update
2f61603 verified
Raw
History Blame Contribute Delete
19.6 kB
import json
import os
import re
import time
import uuid
import random
import string
import secrets
import hashlib
import base64
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Any, Dict, Optional
import urllib.parse
import urllib.request
import urllib.error
from curl_cffi import requests
from curl_cffi.requests import Session
# 配置输出目录和请求UA
OUT_DIR = Path(__file__).parent.resolve()
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
# ========== 1. Mail.tm 临时邮箱处理模块 ==========
def rstr(n=10):
return "".join(random.choices(string.ascii_lowercase + string.digits, k=n))
def mreq(mt, pt, js=None, tk=None, proxies=None):
hdrs = {
"content-type": "application/json",
"accept": "application/json",
"user-agent": UA,
"pragma": "no-cache"
}
if tk:
hdrs["authorization"] = f"Bearer {tk}"
try:
with Session(proxies=proxies) as s:
return s.request(mt, f"https://api.mail.tm{pt}", json=js, headers=hdrs, timeout=20)
except:
return None
def getotp(tk, proxies=None):
for _ in range(60):
r = mreq("GET", "/messages", tk=tk, proxies=proxies)
if r and r.status_code == 200:
try:
dat = r.json()
except:
time.sleep(8); continue
msgs = dat.get("hydra:member", []) if isinstance(dat, dict) else dat
if not isinstance(msgs, list): msgs = []
for m in msgs:
if not isinstance(m, dict): continue
sb = m.get("subject", "")
intro = m.get("intro", "")
if "OpenAI" in sb or "ChatGPT" in sb or "code" in intro:
rb = mreq("GET", f"/messages/{m.get('id')}", tk=tk, proxies=proxies)
if rb and rb.status_code == 200:
txt = rb.json().get("text", "")
mt = re.search(r"(\d{6})", txt) or re.search(r"(\d{6})", sb)
if mt:
return mt.group(1)
time.sleep(8)
return None
def setup_mail_tm(proxies=None):
"""动态获取 mail.tm 邮箱并返回所需数据"""
mail_pw = "at41rvxgptye"
# 动态获取当前可用的邮箱域名
domain_res = mreq("GET", "/domains", proxies=proxies)
if not domain_res or domain_res.status_code != 200:
print(" [!] 无法获取可用邮箱域名")
return None, None, None
try:
js_data = domain_res.json()
if isinstance(js_data, list):
domains_data = js_data
elif isinstance(js_data, dict):
domains_data = js_data.get("hydra:member", js_data.get("hydra:collection", []))
else:
domains_data = []
if not domains_data:
print(" [!] 域名列表为空")
return None, None, None
active_domain = domains_data[0].get("domain")
except Exception as e:
print(f" [!] 解析域名失败: {e}")
return None, None, None
email = f"{rstr(10)}@{active_domain}"
openai_password = _gen_password() # 为 OpenAI 账户生成高强度密码
# 注册 mail.tm 邮箱
r = mreq("POST", "/accounts", {"address": email, "password": mail_pw}, proxies=proxies)
if not r or r.status_code not in [200, 201]:
print(f" [!] 邮箱注册被拒: {r.text if r else '无响应'}")
return None, None, None
# 获取 mail.tm 的 Token
r = mreq("POST", "/token", {"address": email, "password": mail_pw}, proxies=proxies)
if not r or r.status_code != 200:
print(" [!] 获取邮箱 Token 失败")
return None, None, None
mail_token = r.json().get("token")
if not mail_token:
return None, None, None
# 定义提取验证码的闭包函数
def fetch_code():
print(" [*] 正在等待验证码 (最多等待约8分钟)...")
return getotp(mail_token, proxies=proxies)
return email, openai_password, fetch_code
# ========== 2. OpenAI OAuth2 授权与环境生成模块 ==========
AUTH_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback"
DEFAULT_SCOPE = "openid email profile offline_access"
def _gen_password() -> str:
alphabet = string.ascii_letters + string.digits
special = "!@#$%^&*.-"
base = [
random.choice(string.ascii_lowercase),
random.choice(string.ascii_uppercase),
random.choice(string.digits),
random.choice(special),
]
base += [random.choice(alphabet + special) for _ in range(12)]
random.shuffle(base)
return "".join(base)
def _random_name() -> str:
return ''.join(random.choice(string.ascii_lowercase) for _ in range(random.randint(5, 9))).capitalize()
def _random_birthdate() -> str:
start = datetime(1970,1,1)
end = datetime(1999,12,31)
d = start + timedelta(days=random.randrange((end - start).days + 1))
return d.strftime('%Y-%m-%d')
def _b64url_no_pad(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
def _sha256_b64url_no_pad(s: str) -> str:
return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
def _random_state(nbytes: int = 16) -> str:
return secrets.token_urlsafe(nbytes)
def _pkce_verifier() -> str:
return secrets.token_urlsafe(64)
def _parse_callback_url(callback_url: str) -> Dict[str, Any]:
candidate = callback_url.strip()
if not candidate:
return {"code": "","state": "","error": "","error_description": ""}
if "://" not in candidate:
if candidate.startswith("?"): candidate = f"http://localhost{candidate}"
elif any(ch in candidate for ch in "/?#") or ":" in candidate: candidate = f"http://{candidate}"
elif "=" in candidate: candidate = f"http://localhost/?{candidate}"
parsed = urllib.parse.urlparse(candidate)
query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
for key, values in fragment.items():
if key not in query or not query[key] or not (query[key][0] or "").strip():
query[key] = values
def get1(k: str) -> str:
v = query.get(k, [""])
return (v[0] or "").strip()
code = get1("code"); state = get1("state")
error = get1("error"); error_description = get1("error_description")
if code and not state and "#" in code:
code, state = code.split("#",1)
if not error and error_description:
error, error_description = error_description, ""
return {"code": code,"state": state,"error": error,"error_description": error_description}
def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
if not id_token or id_token.count(".") < 2: return {}
payload_b64 = id_token.split(".")[1]
pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
try:
return json.loads(base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii")).decode("utf-8"))
except: return {}
def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
raw = (seg or "").strip()
if not raw: return {}
pad = "=" * ((4 - (len(raw) % 4)) % 4)
try: return json.loads(base64.urlsafe_b64decode((raw + pad).encode("ascii")).decode("utf-8"))
except: return {}
def _to_int(v: Any) -> int:
try: return int(v)
except: return 0
def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
body = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(
url, data=body, method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded","Accept": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
if resp.status != 200: raise RuntimeError(f"token exchange failed: {resp.status}")
return json.loads(raw.decode("utf-8"))
except urllib.error.HTTPError as exc:
raise RuntimeError(f"token exchange failed: {exc.code}") from exc
@dataclass(frozen=True)
class OAuthStart:
auth_url: str
state: str
code_verifier: str
redirect_uri: str
def generate_oauth_url(*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE) -> OAuthStart:
state = _random_state()
code_verifier = _pkce_verifier()
code_challenge = _sha256_b64url_no_pad(code_verifier)
params = {
"client_id": CLIENT_ID, "response_type": "code", "redirect_uri": redirect_uri,
"scope": scope, "state": state, "code_challenge": code_challenge,
"code_challenge_method": "S256", "prompt": "login",
"id_token_add_organizations": "true", "codex_cli_simplified_flow": "true",
}
auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
return OAuthStart(auth_url=auth_url, state=state, code_verifier=code_verifier, redirect_uri=redirect_uri)
def fetch_sentinel_token(*, flow: str, did: str, proxies: Any = None) -> Optional[str]:
"""获取 OpenAI 最新的反爬 Token (Sentinel)"""
try:
body = json.dumps({"p": "", "id": did, "flow": flow})
resp = requests.post(
"https://sentinel.openai.com/backend-api/sentinel/req",
headers={
"origin": "https://sentinel.openai.com",
"referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
"content-type": "text/plain;charset=UTF-8",
"user-agent": UA
},
data=body, proxies=proxies, impersonate="chrome120", timeout=15,
)
if resp.status_code != 200: return None
return resp.json().get("token")
except: return None
def submit_callback_url(*, callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str = DEFAULT_REDIRECT_URI) -> str:
"""提取重定向中的 Code 并换取最终的 Access / Refresh Token"""
cb = _parse_callback_url(callback_url)
if cb["error"]: raise RuntimeError(f"oauth error: {cb['error']}")
if not cb["code"] or not cb["state"]: raise ValueError("callback missing code/state")
if cb["state"] != expected_state: raise ValueError("state mismatch")
token_resp = _post_form(TOKEN_URL, {
"grant_type": "authorization_code", "client_id": CLIENT_ID,
"code": cb["code"], "redirect_uri": redirect_uri, "code_verifier": code_verifier,
})
access_token = (token_resp.get("access_token") or "").strip()
refresh_token = (token_resp.get("refresh_token") or "").strip()
id_token = (token_resp.get("id_token") or "").strip()
expires_in = _to_int(token_resp.get("expires_in"))
claims = _jwt_claims_no_verify(id_token)
email = str(claims.get("email") or "").strip()
auth_claims = claims.get("https://api.openai.com/auth") or {}
account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
now = int(time.time())
expired_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0)))
now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
config = {
"id_token": id_token, "access_token": access_token, "refresh_token": refresh_token,
"account_id": account_id, "last_refresh": now_rfc3339, "email": email,
"type": "codex", "expired": expired_rfc3339,
}
return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
# ========== 3. 核心注册与提取流程 ==========
def run(proxy: Optional[str]) -> Optional[tuple[str, str, str]]:
proxies = {"http": proxy, "https": proxy} if proxy else None
s = requests.Session(proxies=proxies, impersonate="chrome120")
s.headers.update({"user-agent": UA})
print(f"[*] 初始化请求,准备获取临时邮箱...")
mail_data = setup_mail_tm(proxies)
if not mail_data or not mail_data[0]:
print("[Error] 获取 mail.tm 邮箱失败")
return None
email, password, code_fetcher = mail_data
print(f"[*] 成功获取邮箱: {email}")
print(f"[*] 生成高强度密码: {password}")
oauth = generate_oauth_url()
try:
# 第一步:进入 OAuth
resp = s.get(oauth.auth_url, timeout=15)
print("DEBUG oauth", resp.status_code,str(resp.url),str(resp.headers)[:400],str(resp.text)[:300], s.cookies)
did = s.cookies.get("oai-did")
if not did:
print("[Error] 未能获取到 OpenAI Device ID (oai-did)")
return None
# 第二步:获取 Sentinel Token (authorize_continue)
sen_token = fetch_sentinel_token(flow="authorize_continue", did=did, proxies=proxies)
sentinel = json.dumps({"p": "", "t": "", "c": sen_token, "id": did, "flow": "authorize_continue"}) if sen_token else None
# 第三步:获取 Sentinel SO Token (oauth_create_account)
so_token = fetch_sentinel_token(flow="oauth_create_account", did=did, proxies=proxies)
# 第四步:提交邮箱授权
signup_headers = {"referer": "https://auth.openai.com/create-account", "accept": "application/json", "content-type": "application/json"}
if sentinel: signup_headers["openai-sentinel-token"] = sentinel
signup_resp = s.post("https://auth.openai.com/api/accounts/authorize/continue", headers=signup_headers, data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "signup"}))
if signup_resp.status_code != 200:
print(f"[Error] 提交邮箱失败: {signup_resp.status_code}")
return None
# 第五步:设置密码
register_headers = {"referer": "https://auth.openai.com/create-account/password", "accept": "application/json", "content-type": "application/json"}
if sentinel: register_headers["openai-sentinel-token"] = sentinel
reg_resp = s.post("https://auth.openai.com/api/accounts/user/register", headers=register_headers, data=json.dumps({"password": password, "username": email}))
if reg_resp.status_code != 200:
print(f"[Error] 设置密码失败: {reg_resp.status_code}")
return None
# 第六步:触发并提取验证码
s.get("https://auth.openai.com/api/accounts/email-otp/send", headers=register_headers, timeout=15)
code = code_fetcher()
if not code:
print("[Error] 验证码等待超时或提取失败")
return None
print(f"[*] 成功提取验证码: {code}")
# 第七步:校验验证码
validate_headers = {"referer": "https://auth.openai.com/email-verification", "accept": "application/json", "content-type": "application/json"}
if sentinel: validate_headers["openai-sentinel-token"] = sentinel
code_resp = s.post("https://auth.openai.com/api/accounts/email-otp/validate", headers=validate_headers, data=json.dumps({"code": code}))
if code_resp.status_code != 200:
print(f"[Error] 验证码校验失败: {code_resp.status_code}")
return None
# 第八步:完成账号注册填写
create_headers = {"referer": "https://auth.openai.com/about-you", "accept": "application/json", "content-type": "application/json"}
if so_token: create_headers["openai-sentinel-so-token"] = so_token
create_resp = s.post("https://auth.openai.com/api/accounts/create_account", headers=create_headers, data=json.dumps({"name": _random_name(), "birthdate": _random_birthdate()}))
if create_resp.status_code != 200:
print(f"[Error] 账户信息填写失败: {create_resp.status_code}")
return None
# 第九步:选择工作区 Workspace
auth_cookie = s.cookies.get("oai-client-auth-session")
if not auth_cookie: return None
auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
workspace_id = str((auth_json.get("workspaces") or [{}])[0].get("id") or "").strip()
select_resp = s.post("https://auth.openai.com/api/accounts/workspace/select", headers={"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", "content-type": "application/json"}, data=json.dumps({"workspace_id": workspace_id}))
if select_resp.status_code != 200: return None
continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip()
# 第十步:拦截重定向,提取终极 Token
current_url = continue_url
for _ in range(6):
final_resp = s.get(current_url, allow_redirects=False, timeout=15)
location = final_resp.headers.get("Location") or ""
if final_resp.status_code not in [301, 302, 303, 307, 308] or not location:
break
next_url = urllib.parse.urljoin(current_url, location)
if "code=" in next_url and "state=" in next_url:
token_json = submit_callback_url(callback_url=next_url, code_verifier=oauth.code_verifier, redirect_uri=oauth.redirect_uri, expected_state=oauth.state)
return token_json, email, password
current_url = next_url
print("[Error] 未能在重定向链中捕获到最终 Token")
return None
except Exception as e:
print(f"[Error] 运行时异常: {e}")
return None
# ========== 4. 主程序轮询与保存 ==========
def main():
parser = argparse.ArgumentParser(description="OpenAI 完美融合自动化注册脚本 (By Gemini)")
parser.add_argument("--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890")
parser.add_argument("--once", action="store_true", help="只运行一次")
args = parser.parse_args()
count = 0
print("========================================")
print("🚀 OpenAI 终极注册机 (带 Token 提取及 mail.tm) ")
print("========================================")
OUT_DIR.mkdir(parents=True, exist_ok=True)
while True:
count += 1
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<")
run_result = run(args.proxy)
if run_result:
token_json, email, password = run_result
fname_email = email.replace("@", "_")
# 保存机制 1:单独保存 Token JSON 文件
tokens_dir = OUT_DIR / "tokens"
tokens_dir.mkdir(parents=True, exist_ok=True)
file_path = tokens_dir / f"token_{fname_email}_{int(time.time())}.json"
file_path.write_text(token_json, encoding="utf-8")
print(f"[🎉] 成功获取 Token!已保存至: {file_path}")
# 保存机制 2:汇总账号密码信息
acc_file = tokens_dir / "accounts.txt"
with open(acc_file, "a", encoding="utf-8") as f:
f.write(f"{email}----{password}\n")
print(f"[📝] 账号已追加至: {acc_file}")
else:
print("[-] 本次注册流程断开。")
if args.once:
break
wait_time = random.randint(5, 15)
print(f"[*] 冷却 {wait_time} 秒...")
time.sleep(wait_time)
if __name__ == "__main__":
main()