rita / register /register.py
Spooker's picture
Upload 2943 files
c6393bd verified
"""
rita-register — Rita.ai 批量自动注册工具
注册流程 (accountapi.gosplit.net):
1. POST /authorize/authenticate → 初始化会话
2. POST /authorize/sign_process → 提交邮箱 + agree (可能触发 need_captcha)
3. [YesCaptcha 过 reCAPTCHA Enterprise] → 如果 step2 返回 need_captcha=1
4. POST /authorize/emailCode (captcha) → 发送邮箱 OTP (首次需要 captcha token)
5. 等待邮箱验证码 (GPTMail / YYDS Mail)
6. POST /authorize/emailCode (resend) → 重发验证码 (无需 captcha,会话已验证)
7. POST /authorize/code_sign → 提交 OTP → 注册成功
8. POST /authorize/authenticate → 获取 session token
9. POST /user/silent_edit → 设置密码
依赖: curl_cffi (随机浏览器指纹)
"""
import json
import os
import re
import time
import random
import string
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests as stdlib_requests # 仅用于 YesCaptcha / upload
from curl_cffi import requests as curl_requests
# ===================== Config =====================
def _load_config() -> dict:
defaults = {
"total_accounts": 3,
"max_workers": 2,
"password": "@qazwsx123456",
"mail_provider": "gptmail",
"gptmail_api_base": "https://mail.chatgpt.org.uk",
"gptmail_api_key": "gpt-test",
"yydsmail_api_base": "https://maliapi.215.im/v1",
"yydsmail_api_key": "",
"yescaptcha_client_key": "",
"yescaptcha_website_key": "6Lej6N4hAAAAANgkiQRXxLrlue_J_y035Dm6UhPk",
"yescaptcha_website_url": "https://account.rita.ai",
"yescaptcha_task_type": "NoCaptchaTaskProxyless",
"rita_account_api": "https://accountapi.gosplit.net",
"rita_origin": "https://account.rita.ai",
"rita_redirect_uri": "https://www.rita.ai/zh/ai-chat",
"rita_language": "zh",
"proxy": "",
"mail_use_proxy": False,
"upload_enabled": True,
"upload_api_url": "http://localhost:10089/api/accounts/batch",
"output_file": "registered_accounts.txt",
}
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
if os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
file_config = json.load(f)
file_config = {k: v for k, v in file_config.items() if not k.startswith("_comment")}
defaults.update(file_config)
except Exception as e:
print(f"[WARN] 加载 config.json 失败: {e}")
# Env overrides
for key in list(defaults.keys()):
env_val = os.environ.get(key.upper())
if env_val is not None:
if isinstance(defaults[key], bool):
defaults[key] = env_val.lower() in ("1", "true", "yes")
elif isinstance(defaults[key], int):
try:
defaults[key] = int(env_val)
except ValueError:
pass
else:
defaults[key] = env_val
return defaults
CFG = _load_config()
_print_lock = threading.Lock()
_file_lock = threading.Lock()
def log(msg, tag=""):
prefix = f"[{tag}] " if tag else ""
with _print_lock:
print(f"[{time.strftime('%H:%M:%S')}] {prefix}{msg}", flush=True)
# ===================== 浏览器指纹 =====================
_CHROME_PROFILES = [
# 只保留 curl_cffi 支持且验证过可通过 Rita 风控的版本
{"major": 120, "impersonate": "chrome120", "build": 6099, "patch": (109, 234),
"sec_ch_ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"'},
]
_OS_PROFILES = [
# 锁定 Windows — 验证过可通过 Rita 风控
{"platform": "Windows NT 10.0; Win64; x64", "sec_ch_ua_platform": '"Windows"'},
]
_ACCEPT_LANGUAGES = [
"zh-CN,zh;q=0.9,en;q=0.8",
"zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
"en-US,en;q=0.9",
]
def _random_fingerprint():
"""返回 (impersonate, ua, headers_dict)"""
chrome = random.choice(_CHROME_PROFILES)
os_p = random.choice(_OS_PROFILES)
major = chrome["major"]
build = chrome["build"]
patch = random.randint(*chrome["patch"])
full_ver = f"{major}.0.{build}.{patch}"
ua = (
f"Mozilla/5.0 ({os_p['platform']}) "
f"AppleWebKit/537.36 (KHTML, like Gecko) "
f"Chrome/{full_ver} Safari/537.36"
)
hdrs = {
"User-Agent": ua,
"Accept-Language": random.choice(_ACCEPT_LANGUAGES),
"sec-ch-ua": chrome["sec_ch_ua"],
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": os_p["sec_ch_ua_platform"],
}
return chrome["impersonate"], ua, hdrs
# ===================== Mail Providers =====================
def _extract_code(text: str) -> str | None:
"""从邮件内容提取 4-6 位验证码"""
if not text:
return None
patterns = [
r"(?:verification|verify|code|验证码|验证|代码)[\s::]*(\d{4,6})",
r">\s*(\d{4,6})\s*<",
r"\b(\d{4,6})\b",
]
for pat in patterns:
matches = re.findall(pat, text, re.IGNORECASE)
for code in matches:
if len(code) >= 4 and not code.startswith("0"):
return code
return None
class GPTMailProvider:
"""GPTMail (mail.chatgpt.org.uk)"""
def __init__(self, impersonate: str):
self.base = CFG["gptmail_api_base"].rstrip("/")
self.api_key = CFG["gptmail_api_key"]
self.imp = impersonate
self.session = curl_requests.Session(impersonate=self.imp)
if CFG.get("mail_use_proxy") and CFG.get("proxy"):
self.session.proxies = {"http": CFG["proxy"], "https": CFG["proxy"]}
def _headers(self) -> dict:
h = {"Accept": "application/json"}
if self.api_key:
h["X-API-Key"] = self.api_key
return h
def create_email(self) -> tuple[str, dict]:
r = self.session.post(
f"{self.base}/api/generate-email",
headers=self._headers(), json={}, timeout=15, impersonate=self.imp,
)
r.raise_for_status()
data = r.json()
email = data.get("data", {}).get("email", "")
if not email:
raise Exception(f"GPTMail 创建失败: {data}")
return email, {"email": email}
def wait_for_code(self, ctx: dict, timeout: int = 120) -> str | None:
email = ctx["email"]
start = time.time()
while time.time() - start < timeout:
try:
r = self.session.get(
f"{self.base}/api/emails", params={"email": email},
headers=self._headers(), timeout=15, impersonate=self.imp,
)
if r.status_code == 200:
emails_list = r.json().get("data", {}).get("emails", [])
for mail in emails_list:
code = _extract_code(mail.get("subject", ""))
if code:
return code
mail_id = mail.get("id")
if mail_id:
detail = self._detail(mail_id)
if detail:
content = detail.get("data", {}).get("content", "")
html = detail.get("data", {}).get("html_content", "")
code = _extract_code(content) or _extract_code(html)
if code:
return code
except Exception:
pass
time.sleep(3)
return None
def _detail(self, mail_id: str):
try:
r = self.session.get(
f"{self.base}/api/email/{mail_id}",
headers=self._headers(), timeout=15, impersonate=self.imp,
)
return r.json() if r.status_code == 200 else None
except Exception:
return None
class YYDSMailProvider:
"""YYDS Mail (maliapi.215.im)"""
def __init__(self, impersonate: str):
self.base = CFG["yydsmail_api_base"].rstrip("/")
self.api_key = CFG["yydsmail_api_key"]
self.imp = impersonate
self.session = curl_requests.Session(impersonate=self.imp)
if CFG.get("mail_use_proxy") and CFG.get("proxy"):
self.session.proxies = {"http": CFG["proxy"], "https": CFG["proxy"]}
self._domains: list[str] = []
def _headers(self, token: str = "") -> dict:
h = {"Accept": "application/json", "Content-Type": "application/json"}
if token:
h["Authorization"] = f"Bearer {token}"
elif self.api_key:
h["X-API-Key"] = self.api_key
return h
def _fetch_domains(self) -> list[str]:
if self._domains:
return self._domains
try:
r = self.session.get(f"{self.base}/domains", headers=self._headers(), timeout=15, impersonate=self.imp)
if r.status_code == 200:
raw = r.json()
data = raw if isinstance(raw, list) else raw.get("data", [])
names = [d.get("domain") or d if isinstance(d, str) else d.get("domain", "") for d in data]
self._domains = [n for n in names if n]
except Exception:
pass
return self._domains
def create_email(self) -> tuple[str, dict]:
domains = self._fetch_domains()
if not domains:
raise Exception("YYDS Mail: 无可用域名")
domain = random.choice(domains)
prefix = "".join(random.choices(string.ascii_lowercase + string.digits, k=random.randint(8, 12)))
r = self.session.post(
f"{self.base}/accounts", headers=self._headers(),
json={"address": prefix, "domain": domain}, timeout=15, impersonate=self.imp,
)
if r.status_code not in (200, 201):
raise Exception(f"YYDS Mail 创建失败: {r.status_code} {r.text[:200]}")
resp = r.json()
data = resp.get("data", resp) if isinstance(resp, dict) else resp
email = data.get("address", f"{prefix}@{domain}")
token = data.get("token", "")
if not token:
raise Exception(f"YYDS Mail 未返回 token")
return email, {"token": token}
def wait_for_code(self, ctx: dict, timeout: int = 120) -> str | None:
token = ctx["token"]
start = time.time()
while time.time() - start < timeout:
try:
r = self.session.get(
f"{self.base}/messages", headers=self._headers(token),
timeout=15, impersonate=self.imp,
)
if r.status_code == 200:
resp = r.json()
msgs = resp if isinstance(resp, list) else (resp.get("data", {}).get("messages", []) if isinstance(resp.get("data"), dict) else resp.get("data", []))
for msg in (msgs or []):
msg_id = msg.get("id")
if not msg_id:
continue
detail = self._detail(token, msg_id)
if detail:
content = detail.get("text", "") or detail.get("html", "")
if isinstance(content, list):
content = " ".join(content)
code = _extract_code(content)
if code:
return code
except Exception:
pass
time.sleep(3)
return None
def _detail(self, token: str, msg_id: str):
try:
r = self.session.get(
f"{self.base}/messages/{msg_id}", headers=self._headers(token),
timeout=15, impersonate=self.imp,
)
if r.status_code == 200:
resp = r.json()
return resp.get("data", resp) if isinstance(resp, dict) else resp
except Exception:
pass
return None
def get_mail_provider(impersonate: str):
if CFG.get("mail_provider", "").lower() == "yydsmail":
return YYDSMailProvider(impersonate)
return GPTMailProvider(impersonate)
# ===================== YesCaptcha =====================
# ===================== YesCaptcha =====================
# YesCaptcha 支持的任务类型,按优先级尝试
_RECAPTCHA_TASK_TYPES = [
"RecaptchaV2EnterpriseTaskProxyless", # 企业版 (需要完整参数)
"NoCaptchaTaskProxyless", # 标准 V2 兜底
]
def solve_recaptcha(tag: str = "") -> str:
"""
用 YesCaptcha 过 reCAPTCHA Enterprise.
返回 g-recaptcha-response token.
HAR 解析出的完整参数:
sitekey: 6Lej6N4hAAAAANgkiQRXxLrlue_J_y035Dm6UhPk
subdomain: account.rita.ai:443
version: kUYUkUlSyqkjTSMaN2w3RaOh
language: zh-CN
策略: 依次尝试多个任务类型,直到成功。确保每种类型都有足够的 API 调用次数。
"""
client_key = CFG.get("yescaptcha_client_key", "")
if not client_key:
raise Exception("yescaptcha_client_key 未配置")
website_key = CFG.get("yescaptcha_website_key", "6Lej6N4hAAAAANgkiQRXxLrlue_J_y035Dm6UhPk")
website_url = CFG.get("yescaptcha_website_url", "https://account.rita.ai")
# 优先使用配置的 task_type,否则依次尝试 _RECAPTCHA_TASK_TYPES
task_types_to_try = [
CFG.get("yescaptcha_task_type", _RECAPTCHA_TASK_TYPES[0])
] + [t for t in _RECAPTCHA_TASK_TYPES if t != CFG.get("yescaptcha_task_type", "")]
# 去重
seen = set()
task_types_to_try = [x for x in task_types_to_try if not (x in seen or seen.add(x))]
last_error = None
for task_type in task_types_to_try:
log(f"[YesCaptcha] 尝试任务类型: {task_type}", tag)
try:
token = _solve_one_type(
client_key, website_key, website_url, task_type, tag
)
return token
except Exception as e:
last_error = e
log(f"[YesCaptcha] {task_type} 失败: {e}", tag)
raise Exception(f"All YesCaptcha task types failed. Last error: {last_error}")
def _solve_one_type(
client_key: str, website_key: str, website_url: str,
task_type: str, tag: str
) -> str:
"""用指定任务类型执行一次完整的 YesCaptcha 验证流程."""
# 根据任务类型构造不同的 task payload
task_payload = {
"websiteURL": website_url,
"websiteKey": website_key,
"type": task_type,
}
# reCAPTCHA Enterprise 需要额外参数 (from HAR)
if task_type == "RecaptchaV2EnterpriseTaskProxyless":
task_payload["enterprisePayload"] = {
"s": "ENTERPRISE", # 明确告知是 Enterprise 版
"co": "aHR0cHM6Ly9hY2NvdW50LnJpdGEuYWk6NDQz",
"hl": "zh-CN",
}
task_payload["apiDomain"] = "https://www.google.com/recaptcha/enterprise.js"
log(f"[YesCaptcha] 创建任务 (type={task_type})...", tag)
# Create task
create_payload = {
"clientKey": client_key,
"task": task_payload,
}
r = stdlib_requests.post(
"https://api.yescaptcha.com/createTask",
json=create_payload,
timeout=30,
)
result = r.json()
log(f"[YesCaptcha] createTask 响应: errorId={result.get('errorId', 0)} taskId={result.get('taskId', 'N/A')}", tag)
if result.get("errorId", 0) != 0:
raise Exception(
f"[{result.get('errorCode')}] {result.get('errorDescription', result)}"
)
task_id = result.get("taskId")
if not task_id:
raise Exception(f"无 taskId: {result}")
# Poll for result (间隔 1s; 最多 120s)
max_attempts = 120
for attempt in range(max_attempts):
time.sleep(1)
try:
r = stdlib_requests.post(
"https://api.yescaptcha.com/getTaskResult",
json={"clientKey": client_key, "taskId": task_id},
timeout=15,
)
result = r.json()
if result.get("errorId", 0) != 0:
raise Exception(
f"[{result.get('errorCode')}] {result.get('errorDescription', result)}"
)
status = result.get("status", "")
if status == "ready":
solution = result.get("solution", {})
# YesCaptcha 返回字段兼容: gRecaptchaResponse / g-recaptcha-response / token
token = (
solution.get("gRecaptchaResponse") or
solution.get("g-recaptcha-response") or
solution.get("token", "")
)
if token:
log(f"[YesCaptcha] OK ({len(token)} chars, {attempt + 1}s)", tag)
return token
else:
log(f"[YesCaptcha] ready 但无 token, keys={list(solution.keys())}", tag)
raise Exception(f"ready 但无 token: {result}")
if attempt % 10 == 0:
log(f"[YesCaptcha] 等待... status={status} ({attempt + 1}s)", tag)
except stdlib_requests.RequestException as e:
if attempt % 10 == 0:
log(f"[YesCaptcha] 网络错误: {e} ({attempt + 1}s)", tag)
raise Exception(f"YesCaptcha {task_type} 超时 ({max_attempts}s)")
# ===================== Rita Registration =====================
class RitaRegistration:
"""
Rita.ai 注册流程 (基于 HAR 抓包还原):
1. sign_process(email+agree) → 提交邮箱,返回 need_captcha=1
2. solve_recaptcha → 过 reCAPTCHA Enterprise
3. sign_process(captcha) → 提交 captcha 完成邮箱验证
4. emailCode(captcha) → 首次发送 OTP (需要 captcha token)
5. wait OTP → 从临时邮箱获取验证码
6. emailCode(resend) → 重发 OTP (无需 captcha,会话已验证)
7. code_sign → 提交 OTP 完成注册
8. authenticate → 获取 session token
9. silent_edit → 设置密码
"""
MAX_RESEND_ATTEMPTS = 2 # 最多重发验证码次数
MAX_CAPTCHA_ATTEMPTS = 4 # 最多 captcha 提交次数 (首次 + 3次重试)
def __init__(self, tag: str = ""):
self.tag = tag
self.api = CFG["rita_account_api"].rstrip("/")
self.origin = CFG["rita_origin"]
self.redirect_uri = CFG["rita_redirect_uri"]
self.language = CFG["rita_language"]
# 随机浏览器指纹
self.impersonate, self.ua, self.fp_headers = _random_fingerprint()
self.session = curl_requests.Session(impersonate=self.impersonate)
proxy = CFG.get("proxy", "")
if proxy:
self.session.proxies = {"http": proxy, "https": proxy}
# State
self.token: str = ""
self.visitorid: str = ""
def _log(self, msg):
log(msg, self.tag)
def _post(self, path: str, payload: dict) -> dict:
hdrs = {
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"Origin": self.origin,
"Referer": f"{self.origin}/",
**self.fp_headers,
}
if self.token:
hdrs["token"] = self.token
if self.visitorid:
hdrs["visitorid"] = self.visitorid
r = self.session.post(
f"{self.api}{path}", json=payload, headers=hdrs,
timeout=30, impersonate=self.impersonate,
)
try:
return r.json()
except Exception:
return {"_raw": r.text[:500], "_status": r.status_code}
def _update_auth(self, resp: dict):
"""从响应中提取 token / visitorid"""
if not isinstance(resp, dict):
return
data = resp.get("data", resp)
if not isinstance(data, dict):
return
for key in ("token", "access_token", "session_token"):
t = data.get(key, "")
if t and isinstance(t, str) and len(t) > 8:
self.token = t
self._log(f" → token: {t[:8]}***{t[-4:]}")
break
for key in ("visitorid", "visitor_id"):
v = data.get(key, "")
if v and isinstance(v, str) and len(v) > 8:
self.visitorid = v
self._log(f" → visitorid: {v[:8]}***")
break
def _delay(self, lo=0.3, hi=0.8):
time.sleep(random.uniform(lo, hi))
# ---- API steps ----
def step_sign_process(self, email: str, captcha_token: str = "") -> dict:
"""
POST /authorize/sign_process
- 首次调用 (无 captcha): 提交邮箱 + agree,触发 need_captcha=1
- 带 captcha 调用: 完成验证,建立会话
"""
payload = {
"redirect_uri": self.redirect_uri,
"language": self.language,
"email": email,
"agree": 1,
}
if captcha_token:
payload["g-recaptcha-response"] = captcha_token
resp = self._post("/authorize/sign_process", payload)
self._update_auth(resp)
return resp
def step_email_code(self, email: str, captcha_token: str = "") -> dict:
"""
POST /authorize/emailCode — 发送 / 重发邮箱验证码
- 首次: 需要 g-recaptcha-response
- 重发: 无需 captcha (会话已通过验证)
"""
payload = {
"email": email,
"language": self.language,
"redirect_uri": self.redirect_uri,
}
if captcha_token:
payload["g-recaptcha-response"] = captcha_token
resp = self._post("/authorize/emailCode", payload)
self._update_auth(resp)
return resp
def step_code_sign(self, email: str, code: str) -> dict:
"""POST /authorize/code_sign — 提交 OTP 完成注册"""
resp = self._post("/authorize/code_sign", {
"email": email,
"code": code,
"redirect_uri": self.redirect_uri,
"language": self.language,
"agreeTC": 1,
})
self._update_auth(resp)
return resp
def step_authenticate(self) -> dict:
"""POST /authorize/authenticate — 获取 session token"""
resp = self._post("/authorize/authenticate", {"redirect_uri": self.redirect_uri})
self._update_auth(resp)
return resp
def step_set_password(self, password: str) -> dict:
"""POST /user/silent_edit — 设置密码"""
resp = self._post("/user/silent_edit", {
"password": password,
"language": self.language,
})
self._update_auth(resp)
return resp
def _wait_otp(self, mail_provider, mail_ctx: dict, timeout: int = 90) -> str | None:
"""等待邮箱验证码"""
return mail_provider.wait_for_code(mail_ctx, timeout=timeout)
# ---- Full flow with OTP retry ----
def register(self, email: str, password: str, mail_provider, mail_ctx: dict) -> dict:
result = {"ok": False, "token": "", "error": ""}
captcha_token: str = ""
try:
# === Phase 1: 邮箱验证 + reCAPTCHA ===
# 1. 提交邮箱 (无 captcha → 返回 need_captcha=1)
self._log("Step 1: sign_process (email + agree)")
resp1 = self.step_sign_process(email)
self._log(f" → code={resp1.get('code', '?')} need_captcha={resp1.get('data', {}).get('need_captcha', 0)}")
# 人性化等待: Rita 风控检测连续快速操作
time.sleep(random.uniform(2.5, 4.5))
# 2. 过 reCAPTCHA (最多尝试 MAX_CAPTCHA_ATTEMPTS 次)
for cap_attempt in range(1, self.MAX_CAPTCHA_ATTEMPTS + 1):
if cap_attempt > 1:
self._log(f" [WARN] captcha 提交失败 (第{cap_attempt-1}次),重新求解...")
# 失败后等待更长时间再重试
time.sleep(random.uniform(5.0, 8.0))
self._log(f"Step 2b: solve reCAPTCHA (attempt {cap_attempt})")
else:
self._log("Step 2: solve reCAPTCHA")
captcha_token = solve_recaptcha(self.tag)
# captcha 解出后,等待短暂时间再提交(模拟人类操作延迟)
time.sleep(random.uniform(1.5, 3.5))
# 3. 提交 sign_process + captcha
if cap_attempt == 1:
self._log("Step 3: sign_process (with captcha)")
else:
self._log(f"Step 3b: sign_process (with captcha, attempt {cap_attempt})")
resp3 = self.step_sign_process(email, captcha_token)
resp3_code = resp3.get("code", -1)
resp3_type = resp3.get("type", "")
self._log(f" → code={resp3_code} type={resp3_type} msg={resp3.get('message', '')}")
if resp3_code == 0 and resp3_type == "success":
# captcha 验证成功!
self._log(" → captcha 验证成功!")
break
elif cap_attempt >= self.MAX_CAPTCHA_ATTEMPTS:
result["error"] = f"captcha 验证失败 ({self.MAX_CAPTCHA_ATTEMPTS} 次尝试): code={resp3_code} {resp3.get('message', '')}"
return result
# else: continue to next captcha attempt
self._delay(0.5, 1.5)
# === Phase 2: 发送 OTP + 等待验证码 ===
# 4. 通过 emailCode 发送验证码 (使用最新验证通过的 captcha_token)
self._log("Step 4: emailCode (send OTP)")
resp4 = self.step_email_code(email, captcha_token)
resp4_code = resp4.get("code", -1)
self._log(f" → code={resp4_code} type={resp4.get('type', '?')}")
# emailCode 即使返回错误也可能已发送了 OTP(HAR 中成功流程也返回非0 code)
if resp4_code != 0 and resp4.get("type") != "success":
self._log(f" [WARN] emailCode 返回异常,继续等待 OTP...")
self._delay(1.0, 2.5)
# 5. 等待 OTP (带重发机制)
otp_code = None
for attempt in range(1 + self.MAX_RESEND_ATTEMPTS):
if attempt == 0:
self._log(f"Step 5: 等待验证码邮件 (最多 90s)...")
else:
# 重发: 直接调用 emailCode,无需 captcha (会话已验证)
self._log(f" [WARN] 第 {attempt} 次重发验证码...")
resp_resend = self.step_email_code(email)
resp_r_code = resp_resend.get("code", -1)
self._log(f" → resend code={resp_r_code}")
# 等待一段时间让邮件到达
time.sleep(random.uniform(2.0, 4.0))
otp_code = self._wait_otp(mail_provider, mail_ctx, timeout=90)
if otp_code:
break
self._log(f" [WARN] 第 {attempt + 1} 次等待 OTP 超时")
if not otp_code:
result["error"] = f"验证码获取超时 (已重发 {self.MAX_RESEND_ATTEMPTS} 次)"
return result
self._log(f" [MAIL] OTP: {otp_code}")
self._delay()
# === Phase 3: 提交 OTP + 获取 Token ===
# 6. 提交 OTP
self._log(f"Step 6: code_sign (code={otp_code})")
resp6 = self.step_code_sign(email, otp_code)
resp6_code = resp6.get("code", -1)
self._log(f" → code={resp6_code} type={resp6.get('type', '?')}")
# OTP 验证失败?尝试重发一次再验证
if resp6_code != 0 and resp6.get("type") != "success":
self._log(f" [WARN] OTP 验证失败,尝试重发...")
time.sleep(random.uniform(3.0, 5.0))
self.step_email_code(email)
time.sleep(random.uniform(3.0, 5.0))
# 再等一次 OTP
otp_code2 = self._wait_otp(mail_provider, mail_ctx, timeout=60)
if otp_code2 and otp_code2 != otp_code:
self._log(f" [MAIL] 新 OTP: {otp_code2},重新提交...")
resp6 = self.step_code_sign(email, otp_code2)
resp6_code = resp6.get("code", -1)
self._log(f" → retry code={resp6_code} type={resp6.get('type', '?')}")
if resp6_code != 0 and resp6.get("type") != "success":
result["error"] = f"OTP 验证失败: {resp6.get('message', resp6)}"
return result
self._delay()
# 7. 获取 session token
self._log("Step 7: authenticate (get token)")
resp7 = self.step_authenticate()
self._log(f" → code={resp7.get('code', '?')}")
self._delay()
# 8. 设置密码
self._log("Step 8: set password")
resp8 = self.step_set_password(password)
self._log(f" → code={resp8.get('code', '?')}")
if not self.token:
result["error"] = "流程完成但未获取 token"
return result
result["ok"] = True
result["token"] = self.token
return result
except Exception as e:
result["error"] = str(e)
self._log(f" [FAIL] 异常: {e}")
traceback.print_exc()
return result
# ===================== Upload to rita2api =====================
def upload_to_rita2api(accounts: list[dict]) -> bool:
url = CFG.get("upload_api_url", "")
if not url or not accounts:
return False
try:
r = stdlib_requests.post(
url, json={"accounts": accounts},
headers={"Content-Type": "application/json"}, timeout=15,
)
if r.status_code in (200, 201):
data = r.json()
log(f"[OK] 已上传 {data.get('added', len(accounts))} 个账号到 rita2api")
return True
log(f"[FAIL] 上传失败: {r.status_code} {r.text[:200]}")
return False
except Exception as e:
log(f"[FAIL] 上传异常: {e}")
return False
# ===================== Single Task =====================
def _register_one(idx: int, total: int) -> tuple[bool, dict | None, str]:
tag = f"{idx}/{total}"
# Random fingerprint for this task
impersonate, ua, fp_hdrs = _random_fingerprint()
mail = get_mail_provider(impersonate)
reg = RitaRegistration(tag=tag)
password = CFG.get("password", "@qazwsx123456")
try:
log(f"[MAIL] 创建临时邮箱...", tag)
email, mail_ctx = mail.create_email()
short = email.split("@")[0][:12]
reg.tag = short
log(f"[MAIL] {email}", short)
log(f"[ROCKET] 开始注册", short)
result = reg.register(email, password, mail, mail_ctx)
if result["ok"]:
account = {
"token": result["token"],
"name": f"auto-{short}",
}
with _file_lock:
with open(CFG["output_file"], "a", encoding="utf-8") as f:
f.write(f"{email}----{password}----token={result['token']}\n")
log(f"[OK] 注册成功!", short)
return True, account, ""
else:
log(f"[FAIL] 失败: {result['error']}", short)
return False, None, result["error"]
except Exception as e:
log(f"[FAIL] 异常: {e}", tag)
traceback.print_exc()
return False, None, str(e)
# ===================== Batch =====================
def run_batch():
total = CFG.get("total_accounts", 3)
max_workers = min(CFG.get("max_workers", 2), total)
upload_enabled = CFG.get("upload_enabled", True)
proxy = CFG.get("proxy", "")
print("\n" + "=" * 60)
print(" Rita.ai 批量自动注册工具")
print(f" 数量: {total} | 并发: {max_workers}")
print(f" 邮箱: {CFG.get('mail_provider', 'gptmail')}")
print(f" 代理: {proxy or '无'}{' (邮箱也走代理)' if proxy and CFG.get('mail_use_proxy') else ''}")
print(f" 验证码: YesCaptcha ({'[OK]' if CFG.get('yescaptcha_client_key') else '[MISSING]'})")
print(f" 上传: {'[OK] ' + CFG.get('upload_api_url', '') if upload_enabled else '[SKIP]'}")
print("=" * 60 + "\n")
if not CFG.get("yescaptcha_client_key"):
print("[WARN] 警告: yescaptcha_client_key 未配置,注册将失败!")
input("按 Enter 继续...")
success_count = 0
fail_count = 0
registered: list[dict] = []
start = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(_register_one, i, total): i for i in range(1, total + 1)}
for future in as_completed(futures):
try:
ok, account, err = future.result()
if ok and account:
success_count += 1
registered.append(account)
else:
fail_count += 1
except Exception as e:
fail_count += 1
log(f"线程异常: {e}")
elapsed = time.time() - start
print("\n" + "=" * 60)
print(f" 完成! 耗时 {elapsed:.1f}s")
print(f" 成功: {success_count} | 失败: {fail_count} | 总数: {total}")
if success_count:
print(f" 输出: {CFG['output_file']}")
print("=" * 60)
if upload_enabled and registered:
print(f"\n[UPLOAD] 上传 {len(registered)} 个账号到 rita2api...")
upload_to_rita2api(registered)
return success_count, fail_count
# ===================== Main =====================
def main():
print("=" * 60)
print(" Rita.ai 批量自动注册工具")
print("=" * 60)
count_input = input(f"\n注册数量 (默认 {CFG['total_accounts']}): ").strip()
if count_input.isdigit() and int(count_input) > 0:
CFG["total_accounts"] = int(count_input)
workers_input = input(f"并发数 (默认 {CFG['max_workers']}): ").strip()
if workers_input.isdigit() and int(workers_input) > 0:
CFG["max_workers"] = int(workers_input)
if not CFG.get("proxy"):
proxy_input = input("代理地址 (留空=不使用): ").strip()
if proxy_input:
CFG["proxy"] = proxy_input
run_batch()
if __name__ == "__main__":
main()