rita / auto_register.py
Spooker's picture
Upload 2943 files
c6393bd verified
"""
auto_register.py — Rita.ai 自动注册 & Token 获取模块
完整流程:
1. 使用 GPTMail 创建临时邮箱
2. 通过 accountapi.gosplit.net 注册 Rita.ai 账号
3. 使用 YesCaptcha 解决 reCAPTCHA Enterprise
4. 等待邮箱验证码并提交
5. 获取 token 并自动添加到 AccountManager
依赖: pip install requests curl_cffi
外部服务: YesCaptcha (reCAPTCHA), GPTMail (临时邮箱)
"""
import json
import os
import re
import time
import random
import string
import requests
import threading
from pathlib import Path
# curl_cffi for browser TLS fingerprint (anti-bot)
try:
from curl_cffi import requests as curl_requests
_HAS_CURL_CFFI = True
except ImportError:
_HAS_CURL_CFFI = False
# ===================== Configuration =====================
_DISABLE_SSL_VERIFY = os.getenv("DISABLE_SSL_VERIFY", "0") == "1"
# YesCaptcha config
YESCAPTCHA_KEY = os.getenv("YESCAPTCHA_KEY", "")
YESCAPTCHA_API = "https://api.yescaptcha.com"
# GPTMail config
GPTMAIL_API_KEY = os.getenv("GPTMAIL_API_KEY", "")
GPTMAIL_API_BASE = os.getenv("GPTMAIL_API_BASE", "https://mail.chatgpt.org.uk")
# YYDS Mail config
YYDSMAIL_API_KEY = os.getenv("YYDSMAIL_API_KEY", "")
YYDSMAIL_API_BASE = os.getenv("YYDSMAIL_API_BASE", "https://maliapi.215.im/v1")
# Rita.ai reCAPTCHA v2 sitekey (from account.rita.ai JS bundle)
RECAPTCHA_SITEKEY = "6Lej6N4hAAAAANgkiQRXxLrlue_J_y035Dm6UhPk"
RECAPTCHA_URL = "https://account.rita.ai"
# Gosplit auth API
GOSPLIT_API = "https://accountapi.gosplit.net"
# Auto-register settings
AUTO_REGISTER_ENABLED = os.getenv("AUTO_REGISTER_ENABLED", "0") == "1"
AUTO_REGISTER_MIN_ACTIVE = int(os.getenv("AUTO_REGISTER_MIN_ACTIVE", "2"))
AUTO_REGISTER_BATCH = int(os.getenv("AUTO_REGISTER_BATCH", "1"))
AUTO_REGISTER_PASSWORD = os.getenv("AUTO_REGISTER_PASSWORD", "@qazwsx123456")
# ===================== Logging =====================
_log_fn = None
def _log(msg, level="INFO"):
if _log_fn:
_log_fn(msg, level)
else:
print(f"[AutoRegister] {msg}")
# ===================== Live Config (DB > env) =====================
def _get_live_config() -> dict:
"""Read config from database first, fallback to module-level env vars."""
try:
from database import get_db
db = get_db()
return {
"YESCAPTCHA_KEY": db.get_config("YESCAPTCHA_KEY") or YESCAPTCHA_KEY,
"GPTMAIL_API_KEY": db.get_config("GPTMAIL_API_KEY") or GPTMAIL_API_KEY,
"GPTMAIL_API_BASE": db.get_config("GPTMAIL_API_BASE") or GPTMAIL_API_BASE,
"YYDSMAIL_API_KEY": db.get_config("YYDSMAIL_API_KEY") or YYDSMAIL_API_KEY,
"YYDSMAIL_API_BASE": db.get_config("YYDSMAIL_API_BASE") or YYDSMAIL_API_BASE,
"AUTO_REGISTER_ENABLED": db.get_config("AUTO_REGISTER_ENABLED", "0") == "1" or AUTO_REGISTER_ENABLED,
"AUTO_REGISTER_MIN_ACTIVE": int(db.get_config("AUTO_REGISTER_MIN_ACTIVE") or AUTO_REGISTER_MIN_ACTIVE),
"AUTO_REGISTER_BATCH": int(db.get_config("AUTO_REGISTER_BATCH") or AUTO_REGISTER_BATCH),
"AUTO_REGISTER_PASSWORD": db.get_config("AUTO_REGISTER_PASSWORD") or AUTO_REGISTER_PASSWORD,
"DISABLE_SSL_VERIFY": db.get_config("DISABLE_SSL_VERIFY", "0") == "1" or _DISABLE_SSL_VERIFY,
}
except Exception:
return {
"YESCAPTCHA_KEY": YESCAPTCHA_KEY,
"GPTMAIL_API_KEY": GPTMAIL_API_KEY,
"GPTMAIL_API_BASE": GPTMAIL_API_BASE,
"YYDSMAIL_API_KEY": YYDSMAIL_API_KEY,
"YYDSMAIL_API_BASE": YYDSMAIL_API_BASE,
"AUTO_REGISTER_ENABLED": AUTO_REGISTER_ENABLED,
"AUTO_REGISTER_MIN_ACTIVE": AUTO_REGISTER_MIN_ACTIVE,
"AUTO_REGISTER_BATCH": AUTO_REGISTER_BATCH,
"AUTO_REGISTER_PASSWORD": AUTO_REGISTER_PASSWORD,
"DISABLE_SSL_VERIFY": _DISABLE_SSL_VERIFY,
}
# ===================== GPTMail — Temporary Email =====================
def _gptmail_headers(api_key=""):
h = {"Accept": "application/json"}
key = api_key or _get_live_config()["GPTMAIL_API_KEY"]
if key:
h["X-API-Key"] = key
return h
def create_temp_email() -> tuple[str, None]:
"""Create a temporary email via GPTMail. Returns email address."""
cfg = _get_live_config()
resp = requests.post(
f"{cfg['GPTMAIL_API_BASE']}/api/generate-email",
headers=_gptmail_headers(cfg["GPTMAIL_API_KEY"]),
json={},
timeout=15,
verify=not cfg["DISABLE_SSL_VERIFY"],
)
resp.raise_for_status()
data = resp.json()
if not data.get("success"):
raise Exception(f"GPTMail create failed: {data}")
email = data["data"]["email"]
return email
def _gptmail_get_detail(api_base, headers, mail_id, ssl_verify):
"""Fetch a single email's detail from GPTMail. Returns parsed dict or None."""
try:
r = requests.get(
f"{api_base}/api/email/{mail_id}",
headers=headers, timeout=15, verify=ssl_verify,
)
return r.json() if r.status_code == 200 else None
except Exception:
return None
def wait_for_verification_code(email: str, timeout: int = 120) -> str | None:
"""Poll GPTMail for verification code.
Optimized: extract from subject first (fast path), then from content/html.
"""
cfg = _get_live_config()
hdrs = _gptmail_headers(cfg["GPTMAIL_API_KEY"])
api_base = cfg["GPTMAIL_API_BASE"]
ssl_verify = not cfg["DISABLE_SSL_VERIFY"]
start = time.time()
while time.time() - start < timeout:
try:
resp = requests.get(
f"{api_base}/api/emails",
params={"email": email},
headers=hdrs, timeout=15, verify=ssl_verify,
)
if resp.status_code == 200:
data = resp.json()
emails_list = data.get("data", {}).get("emails", [])
if not emails_list:
raw = data.get("data")
emails_list = raw if isinstance(raw, list) else []
for mail in emails_list:
# Fast path: try subject first (no extra request)
code = _extract_code(mail.get("subject", ""))
if code:
return code
# Slow path: fetch detail
mail_id = mail.get("id")
if not mail_id:
continue
detail = _gptmail_get_detail(api_base, hdrs, mail_id, ssl_verify)
if detail:
d = detail.get("data", {})
content = d.get("content", "") or ""
html = d.get("html_content", "") or ""
code = _extract_code(content) or _extract_code(html)
if code:
return code
except Exception as e:
_log(f"📧 Mail poll error: {e}", "DEBUG")
elapsed = int(time.time() - start)
_log(f"📧 Waiting for verification code... ({elapsed}s/{timeout}s)", "DEBUG")
time.sleep(OTP_POLL_INTERVAL)
return None
def _extract_code(content) -> str | None:
"""Extract 4-6 digit verification code from email content.
Handles str and list content. Multi-pattern matching.
"""
if not content:
return None
# Handle list content (some mail APIs return lists)
if isinstance(content, list):
content = " ".join(str(c) for c in content)
if not isinstance(content, str):
content = str(content)
patterns = [
r"(?:verification|verify|code|验证码|验证|代码)[\s::]*(\d{4,6})",
r">\s*(\d{4,6})\s*<",
r"\b(\d{4,6})\b",
]
for pat in patterns:
matches = re.findall(pat, content, re.IGNORECASE)
for code in matches:
if len(code) >= 4 and not code.startswith("0"):
return code
return None
# ===================== YesCaptcha — reCAPTCHA Enterprise Solver =====================
# 任务类型优先级:Enterprise 优先,标准 V2 兜底
_RECAPTCHA_TASK_TYPES = [
"RecaptchaV2EnterpriseTaskProxyless", # 企业版 (Rita.ai 实际使用)
"NoCaptchaTaskProxyless", # 标准 V2 兜底
]
def _solve_one_type(yescaptcha_key: str, task_type: str, ssl_verify: bool) -> str:
"""用指定任务类型执行一次完整的 YesCaptcha 验证流程."""
task_payload = {
"type": task_type,
"websiteURL": RECAPTCHA_URL,
"websiteKey": RECAPTCHA_SITEKEY,
}
# reCAPTCHA Enterprise 需要额外参数 (from HAR 抓包)
if task_type == "RecaptchaV2EnterpriseTaskProxyless":
task_payload["enterprisePayload"] = {
"s": "ENTERPRISE",
"co": "aHR0cHM6Ly9hY2NvdW50LnJpdGEuYWk6NDQz",
"hl": "zh-CN",
}
task_payload["apiDomain"] = "https://www.google.com/recaptcha/enterprise.js"
# Create task
create_resp = requests.post(
f"{YESCAPTCHA_API}/createTask",
json={"clientKey": yescaptcha_key, "task": task_payload},
timeout=30,
verify=ssl_verify,
)
create_resp.raise_for_status()
create_data = create_resp.json()
if create_data.get("errorId", 0) != 0:
raise Exception(f"[{create_data.get('errorCode')}] {create_data.get('errorDescription', create_data)}")
task_id = create_data.get("taskId")
if not task_id:
raise Exception(f"YesCaptcha create failed: {create_data}")
_log(f"🔐 reCAPTCHA task created: {task_id} (type={task_type})", "DEBUG")
# Poll for result (max 120s, interval 1s)
for attempt in range(120):
time.sleep(1)
try:
result_resp = requests.post(
f"{YESCAPTCHA_API}/getTaskResult",
json={"clientKey": yescaptcha_key, "taskId": task_id},
timeout=15,
verify=ssl_verify,
)
result_resp.raise_for_status()
result_data = result_resp.json()
if result_data.get("errorId", 0) != 0:
raise Exception(f"[{result_data.get('errorCode')}] {result_data.get('errorDescription', result_data)}")
status = result_data.get("status", "")
if status == "ready":
solution = result_data.get("solution", {})
token = (
solution.get("gRecaptchaResponse")
or solution.get("g-recaptcha-response")
or solution.get("token", "")
)
if token:
_log(f"🔐 reCAPTCHA solved! ({len(token)} chars, {attempt + 1}s)", "DEBUG")
return token
raise Exception(f"ready but no token: {result_data}")
if status == "failed":
raise Exception(f"YesCaptcha failed: {result_data}")
if attempt % 10 == 0:
_log(f"🔐 Waiting... status={status} ({attempt + 1}s)", "DEBUG")
except requests.RequestException as e:
if attempt % 10 == 0:
_log(f"🔐 Network error: {e} ({attempt + 1}s)", "DEBUG")
raise Exception(f"YesCaptcha timeout (120s) for {task_type}")
def solve_recaptcha() -> str | None:
"""Solve reCAPTCHA Enterprise via YesCaptcha.
Tries Enterprise task type first, falls back to standard V2.
Returns g-recaptcha-response token.
"""
cfg = _get_live_config()
yescaptcha_key = cfg["YESCAPTCHA_KEY"]
ssl_verify = not cfg["DISABLE_SSL_VERIFY"]
if not yescaptcha_key:
raise Exception("YESCAPTCHA_KEY not configured")
last_error = None
for task_type in _RECAPTCHA_TASK_TYPES:
_log(f"🔐 Trying task type: {task_type}", "DEBUG")
try:
return _solve_one_type(yescaptcha_key, task_type, ssl_verify)
except Exception as e:
last_error = e
_log(f"🔐 {task_type} failed: {e}", "WARNING")
raise Exception(f"All YesCaptcha task types failed. Last error: {last_error}")
# ===================== Browser Fingerprint (anti-bot) =====================
_CHROME_IMPERSONATE = "chrome120"
_CHROME_UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.6099.{patch} Safari/537.36"
)
_SEC_CH_UA = '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"'
_ACCEPT_LANGUAGES = [
"zh-CN,zh;q=0.9,en;q=0.8",
"zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
]
def _random_browser_headers() -> dict:
"""Generate Chrome-like browser headers to pass Rita anti-bot checks."""
patch = random.randint(109, 234)
ua = _CHROME_UA.format(patch=patch)
return {
"User-Agent": ua,
"Accept-Language": random.choice(_ACCEPT_LANGUAGES),
"sec-ch-ua": _SEC_CH_UA,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
}
def _create_rita_session(ssl_verify: bool = True):
"""Create an HTTP session with browser TLS fingerprint.
Uses curl_cffi for Chrome TLS fingerprint if available, falls back to requests.
"""
if _HAS_CURL_CFFI:
session = curl_requests.Session(impersonate=_CHROME_IMPERSONATE)
# curl_cffi doesn't use verify= in the same way, set it via env if needed
return session, _CHROME_IMPERSONATE
else:
_log("⚠️ curl_cffi not available, using plain requests (may be detected as bot)", "WARNING")
session = requests.Session()
session.verify = ssl_verify
return session, None
# ===================== Rita.ai Registration Flow =====================
def _gosplit_headers():
return {
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"Origin": "https://account.rita.ai",
"Referer": "https://account.rita.ai/",
**_random_browser_headers(),
}
def _update_session_headers(headers: dict, resp_data: dict):
"""Extract token/visitorid from API response and update headers.
Rita API requires these headers to maintain session state across steps.
"""
if not isinstance(resp_data, dict):
return
data = resp_data.get("data", resp_data)
if not isinstance(data, dict):
return
for key in ("token", "access_token", "session_token"):
t = data.get(key, "")
if t and isinstance(t, str) and len(t) > 8:
headers["token"] = t
_log(f" -> session token: {t[:8]}***{t[-4:]}", "DEBUG")
break
for key in ("visitorid", "visitor_id"):
v = data.get(key, "")
if v and isinstance(v, str) and len(v) > 8:
headers["visitorid"] = v
_log(f" -> visitorid: {v[:8]}***", "DEBUG")
break
# ===================== Registration Constants =====================
MAX_CAPTCHA_ATTEMPTS = 4 # captcha 提交最多重试次数
MAX_RESEND_ATTEMPTS = 2 # OTP 邮件最多重发次数
OTP_WAIT_TIMEOUT = 90 # 每轮 OTP 等待超时(秒)
OTP_POLL_INTERVAL = 3 # OTP 轮询间隔(秒)
def register_rita_account(email: str) -> dict:
"""
Full registration flow for Rita.ai.
Key: tracks session token/visitorid from each response and passes
them in subsequent request headers (required by Rita API).
Returns: {"token": "...", "email": "...", "ticket": "..."}
Raises: Exception on failure
"""
cfg = _get_live_config()
session, impersonate = _create_rita_session(ssl_verify=not cfg["DISABLE_SSL_VERIFY"])
headers = _gosplit_headers()
redirect_uri = "https://www.rita.ai/zh/ai-chat"
def _post(path, payload):
"""POST helper that auto-updates session headers from response."""
kwargs = {"headers": headers, "json": payload, "timeout": 30}
if impersonate:
kwargs["impersonate"] = impersonate
r = session.post(f"{GOSPLIT_API}{path}", **kwargs)
try:
resp = r.json()
except Exception:
resp = {"_raw": r.text[:500], "_status": r.status_code}
_update_session_headers(headers, resp)
return resp, r
# ---- Step 1: authenticate (init session) ----
_log("Step 1/6: authenticate (init)", "DEBUG")
resp, _ = _post("/authorize/authenticate", {"redirect_uri": redirect_uri})
# ---- Step 2: sign_process (email + agree) ----
_log(f"Step 2/6: sign_process (email={email})", "DEBUG")
resp, _ = _post("/authorize/sign_process", {
"redirect_uri": redirect_uri, "language": "zh",
"email": email, "agree": 1,
})
_log(f" code={resp.get('code', '?')} need_captcha={resp.get('data', {}).get('need_captcha', 0)}", "DEBUG")
# Human-like delay before captcha
time.sleep(random.uniform(2.0, 4.0))
# ---- Step 3: solve reCAPTCHA (with retry) ----
captcha_token = None
if resp.get("data", {}).get("need_captcha"):
for cap_attempt in range(1, MAX_CAPTCHA_ATTEMPTS + 1):
if cap_attempt > 1:
_log(f" captcha submit failed, retrying ({cap_attempt}/{MAX_CAPTCHA_ATTEMPTS})...", "WARNING")
time.sleep(random.uniform(5.0, 8.0))
_log(f"Step 3/6: solving reCAPTCHA (attempt {cap_attempt})...", "INFO")
try:
captcha_token = solve_recaptcha()
except Exception as e:
_log(f" reCAPTCHA solve error: {e}", "WARNING")
if cap_attempt >= MAX_CAPTCHA_ATTEMPTS:
raise Exception(f"reCAPTCHA failed after {MAX_CAPTCHA_ATTEMPTS} attempts: {e}")
continue
if not captcha_token:
if cap_attempt >= MAX_CAPTCHA_ATTEMPTS:
raise Exception(f"reCAPTCHA failed after {MAX_CAPTCHA_ATTEMPTS} attempts")
continue
# Submit captcha with human-like delay
time.sleep(random.uniform(1.5, 3.5))
resp, _ = _post("/authorize/sign_process", {
"redirect_uri": redirect_uri, "language": "zh",
"email": email, "agree": 1,
"g-recaptcha-response": captcha_token,
})
resp_code = resp.get("code", -1)
resp_type = resp.get("type", "")
_log(f" code={resp_code} type={resp_type}", "DEBUG")
if resp_code == 0 and resp_type == "success":
_log(" captcha verified!", "DEBUG")
break
elif cap_attempt >= MAX_CAPTCHA_ATTEMPTS:
raise Exception(f"captcha verification failed after {MAX_CAPTCHA_ATTEMPTS} attempts: {resp}")
else:
_log("Step 3/6: no captcha needed, skipped", "DEBUG")
time.sleep(random.uniform(0.5, 1.5))
# ---- Step 4: trigger emailCode (explicit send) ----
_log("Step 4/6: emailCode (send OTP)...", "INFO")
ec_payload = {"email": email, "redirect_uri": redirect_uri, "language": "zh"}
if captcha_token:
ec_payload["g-recaptcha-response"] = captcha_token
resp, _ = _post("/authorize/emailCode", ec_payload)
ec_code = resp.get("code", -1)
_log(f" code={ec_code} type={resp.get('type', '?')}", "DEBUG")
# emailCode may return non-0 but OTP was still sent
if ec_code != 0 and resp.get("type") != "success":
_log(" emailCode returned non-success, but OTP may still have been sent", "WARNING")
time.sleep(random.uniform(1.0, 2.5))
# ---- Step 5: wait for OTP with resend support ----
_log("Step 5/6: waiting for verification code...", "INFO")
otp_code = None
for attempt in range(1 + MAX_RESEND_ATTEMPTS):
if attempt > 0:
_log(f" Resending verification email ({attempt}/{MAX_RESEND_ATTEMPTS})...", "INFO")
# Resend: no captcha needed (session already verified)
resp, _ = _post("/authorize/emailCode", {
"email": email, "redirect_uri": redirect_uri, "language": "zh",
})
_log(f" resend code={resp.get('code', '?')}", "DEBUG")
time.sleep(random.uniform(2.0, 4.0))
otp_code = wait_for_verification_code(email, timeout=OTP_WAIT_TIMEOUT)
if otp_code:
_log(f" Got OTP: {otp_code}", "DEBUG")
break
_log(f" OTP wait timed out ({attempt + 1}/{1 + MAX_RESEND_ATTEMPTS})", "WARNING")
if not otp_code:
raise Exception(f"Verification code not received after {1 + MAX_RESEND_ATTEMPTS} attempts ({OTP_WAIT_TIMEOUT}s each)")
# ---- Submit OTP via code_sign ----
resp, r = _post("/authorize/code_sign", {
"email": email, "code": otp_code,
"redirect_uri": redirect_uri, "language": "zh", "agreeTC": 1,
})
# ---- OTP verification failure: resend and retry once ----
if resp.get("code") != 0 and resp.get("type") != "success":
_log(f" OTP verification failed (code={resp.get('code')}), resending...", "WARNING")
time.sleep(random.uniform(3.0, 5.0))
_post("/authorize/emailCode", {
"email": email, "redirect_uri": redirect_uri, "language": "zh",
})
time.sleep(random.uniform(3.0, 5.0))
new_code = wait_for_verification_code(email, timeout=OTP_WAIT_TIMEOUT)
if new_code and new_code != otp_code:
_log(f" New OTP: {new_code}, re-submitting...", "DEBUG")
resp, r = _post("/authorize/code_sign", {
"email": email, "code": new_code,
"redirect_uri": redirect_uri, "language": "zh", "agreeTC": 1,
})
if resp.get("code") != 0 and resp.get("type") != "success":
raise Exception(f"code_sign failed: {resp}")
# ---- Extract token ----
token = headers.get("token", "")
if not token:
data = resp.get("data", {})
token = data.get("token", "") if isinstance(data, dict) else ""
if not token:
token = session.cookies.get("token", "")
if not token:
for cookie in r.cookies:
if cookie.name == "token":
token = cookie.value
break
if not token:
raise Exception(f"No token in code_sign response: {resp}")
_log(f"Got token: {token[:8]}...", "SUCCESS")
# ---- Step 6: authenticate with token to get ticket ----
_log("Step 6/6: authenticate (get ticket)", "DEBUG")
headers["token"] = token
resp, _ = _post("/authorize/authenticate", {"redirect_uri": redirect_uri})
ticket = resp.get("data", {}).get("ticket", "") if isinstance(resp.get("data"), dict) else ""
# Set password silently
try:
_post("/user/silent_edit", {"password": cfg["AUTO_REGISTER_PASSWORD"], "language": "zh"})
except Exception:
pass
# ---- Step 7: activate account on api_v2 via ticket ----
# The gosplit registration creates the account in the auth system, but
# api_v2.rita.ai (the chat API) needs a separate activation via ticket.
# In browser flow, this happens when redirected to www.rita.ai/zh/ai-chat?ticket=xxx
if ticket:
_log("Step 7: activating account on api_v2 via ticket...", "DEBUG")
try:
activate_resp = requests.get(
f"https://www.rita.ai/zh/ai-chat",
params={"ticket": ticket},
headers={
**_random_browser_headers(),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://account.rita.ai/",
},
timeout=15,
allow_redirects=True,
verify=not cfg["DISABLE_SSL_VERIFY"],
)
_log(f" activate status={activate_resp.status_code}", "DEBUG")
except Exception as e:
_log(f" activate warning: {e}", "WARNING")
_log(f"Registration complete! token={token[:8]}..., ticket={ticket[:8] if ticket else 'N/A'}...", "SUCCESS")
return {"token": token, "email": email, "ticket": ticket}
# ===================== Orchestrator =====================
def auto_register_one(account_manager=None, upstream_url="", origin="") -> dict | None:
"""
Register one new Rita.ai account and optionally add it to AccountManager.
Returns: {"token": ..., "email": ..., "account_id": ...} or None on failure
"""
cfg = _get_live_config()
try:
# 1. Create temp email
_log("🔄 Creating temporary email...", "INFO")
email = create_temp_email()
_log(f"📧 Email: {email}", "INFO")
# 2. Register
_log("🔄 Starting registration...", "INFO")
result = register_rita_account(email)
token = result["token"]
# 3. Add to AccountManager
account_id = None
if account_manager:
name_part = email.split("@")[0]
acc = account_manager.add(
token=token, name=f"auto-{name_part}",
email=email, password=cfg["AUTO_REGISTER_PASSWORD"],
mail_provider="gptmail", mail_api_key=cfg["GPTMAIL_API_KEY"],
)
account_id = acc.id
_log(f"➕ Account added: {acc.name} ({acc.id})", "SUCCESS")
# Verify the new token works
if upstream_url and origin:
test = account_manager.test_account(acc.id, upstream_url, origin)
if test.get("ok"):
_log(f"✅ Token verified: {test.get('models', 0)} models available", "SUCCESS")
else:
_log(f"⚠️ Token test failed: {test}", "WARNING")
return {"token": token, "email": email, "account_id": account_id}
except Exception as e:
_log(f"❌ Auto-register failed: {e}", "ERROR")
return None
def auto_register_batch(count: int = 1, account_manager=None,
upstream_url="", origin="") -> list[dict]:
"""Register multiple accounts. Returns list of results."""
results = []
for i in range(count):
_log(f"📋 Registering account {i+1}/{count}...", "INFO")
result = auto_register_one(account_manager, upstream_url, origin)
if result:
results.append(result)
# Delay between registrations
if i < count - 1:
delay = random.uniform(5, 15)
_log(f"⏳ Waiting {delay:.0f}s before next registration...", "DEBUG")
time.sleep(delay)
return results
# ===================== Background Auto-Replenish =====================
_replenish_lock = threading.Lock()
def start_auto_replenish(account_manager, upstream_url: str, origin: str,
check_interval: int = 300, log_fn=None):
"""
Background thread: when active accounts drop below min_active,
automatically register new accounts to replenish the pool.
Now reads config from DB each loop iteration so changes take effect live.
"""
global _log_fn
if log_fn:
_log_fn = log_fn
# Check initial config from DB
cfg = _get_live_config()
if not cfg["AUTO_REGISTER_ENABLED"]:
_log("⏸ Auto-register disabled (set AUTO_REGISTER_ENABLED=1 to enable)", "INFO")
return
if not cfg["YESCAPTCHA_KEY"]:
_log("⚠️ Auto-register disabled: YESCAPTCHA_KEY not set", "WARNING")
return
if not cfg["GPTMAIL_API_KEY"] and not cfg["YYDSMAIL_API_KEY"]:
_log("⚠️ Auto-register disabled: no mail API key set", "WARNING")
return
def loop():
time.sleep(60)
_log(f"🔄 Auto-replenish started (interval={check_interval}s)", "INFO")
while True:
try:
# Re-read config each iteration so DB changes are picked up
live_cfg = _get_live_config()
min_active = live_cfg["AUTO_REGISTER_MIN_ACTIVE"]
batch_size = live_cfg["AUTO_REGISTER_BATCH"]
with _replenish_lock:
summary = account_manager.summary()
active = summary.get("active", 0)
if active < min_active:
need = min_active - active
to_create = min(need, batch_size)
_log(f"⚠️ Active accounts ({active}) below minimum ({min_active}), "
f"registering {to_create} new account(s)...", "WARNING")
auto_register_batch(to_create, account_manager, upstream_url, origin)
else:
_log(f"✅ Active accounts: {active} (min: {min_active})", "DEBUG")
except Exception as e:
_log(f"❌ Auto-replenish error: {e}", "ERROR")
time.sleep(check_interval)
t = threading.Thread(target=loop, daemon=True, name="auto-replenish")
t.start()
# ===================== Config Check =====================
def check_config() -> dict:
"""Check if auto-register dependencies are configured.
Reads from database for live config, falls back to module-level env vars.
"""
try:
from database import get_db
db = get_db()
yescaptcha_key = db.get_config("YESCAPTCHA_KEY") or YESCAPTCHA_KEY
gptmail_key = db.get_config("GPTMAIL_API_KEY") or GPTMAIL_API_KEY
gptmail_base = db.get_config("GPTMAIL_API_BASE") or GPTMAIL_API_BASE
yydsmail_key = db.get_config("YYDSMAIL_API_KEY") or YYDSMAIL_API_KEY
yydsmail_base = db.get_config("YYDSMAIL_API_BASE") or YYDSMAIL_API_BASE
auto_enabled = db.get_config("AUTO_REGISTER_ENABLED", "0") == "1" or AUTO_REGISTER_ENABLED
min_active = int(db.get_config("AUTO_REGISTER_MIN_ACTIVE") or AUTO_REGISTER_MIN_ACTIVE)
batch_size = int(db.get_config("AUTO_REGISTER_BATCH") or AUTO_REGISTER_BATCH)
except Exception:
# Fallback to env vars if DB is unavailable
yescaptcha_key = YESCAPTCHA_KEY
gptmail_key = GPTMAIL_API_KEY
gptmail_base = GPTMAIL_API_BASE
yydsmail_key = YYDSMAIL_API_KEY
yydsmail_base = YYDSMAIL_API_BASE
auto_enabled = AUTO_REGISTER_ENABLED
min_active = AUTO_REGISTER_MIN_ACTIVE
batch_size = AUTO_REGISTER_BATCH
return {
"auto_register_enabled": auto_enabled,
"yescaptcha_configured": bool(yescaptcha_key),
"gptmail_configured": bool(gptmail_key),
"gptmail_api_base": gptmail_base,
"yydsmail_configured": bool(yydsmail_key),
"yydsmail_api_base": yydsmail_base,
"recaptcha_sitekey": RECAPTCHA_SITEKEY,
"min_active_accounts": min_active,
"batch_size": batch_size,
"ready": bool(yescaptcha_key and (gptmail_key or yydsmail_key)),
}
# ===================== YYDS Mail Support =====================
def _yydsmail_create_email(api_key: str = "") -> tuple[str, str]:
"""Create a temporary email via YYDS Mail.
Returns: (email_address, session_token)
The session token is needed for subsequent message queries.
"""
cfg = _get_live_config()
key = api_key or cfg["YYDSMAIL_API_KEY"]
api_base = cfg["YYDSMAIL_API_BASE"]
ssl_verify = not cfg["DISABLE_SSL_VERIFY"]
if not key:
raise Exception("YYDSMAIL_API_KEY not configured")
headers = {"Accept": "application/json", "Content-Type": "application/json",
"X-API-Key": key}
# Fetch available domains
domains = []
try:
r = requests.get(f"{api_base}/domains", headers=headers,
timeout=15, verify=ssl_verify)
if r.status_code == 200:
raw = r.json()
data = raw if isinstance(raw, list) else raw.get("data", [])
domains = [d.get("domain") if isinstance(d, dict) else d
for d in data if (d.get("domain") if isinstance(d, dict) else d)]
except Exception as e:
_log(f"📧 YYDS Mail: fetch domains failed: {e}", "WARNING")
if not domains:
raise Exception("YYDS Mail: no domains available")
domain = random.choice(domains)
prefix = "".join(random.choices(string.ascii_lowercase + string.digits,
k=random.randint(8, 12)))
r = requests.post(
f"{api_base}/accounts",
headers=headers,
json={"address": prefix, "domain": domain},
timeout=15, verify=ssl_verify,
)
if r.status_code not in (200, 201):
raise Exception(f"YYDS Mail create failed: {r.status_code} {r.text[:200]}")
resp = r.json()
data = resp.get("data", resp) if isinstance(resp, dict) else resp
email = data.get("address", f"{prefix}@{domain}")
token = data.get("token", "")
if not token:
raise Exception("YYDS Mail: no token returned from account creation")
return email, token
def _yydsmail_wait_for_code(email: str, mail_api_key: str = "",
mail_token: str = "",
timeout: int = 120) -> str | None:
"""Poll YYDS Mail for verification code.
Uses the session token from _yydsmail_create_email (not the API key).
Falls back to api_key as Bearer if no mail_token.
"""
cfg = _get_live_config()
bearer = mail_token or mail_api_key or cfg["YYDSMAIL_API_KEY"]
if not bearer:
_log("⚠️ YYDS Mail: no token/key for message query", "WARNING")
return None
ssl_verify = not cfg["DISABLE_SSL_VERIFY"]
api_base = cfg["YYDSMAIL_API_BASE"]
headers = {"Accept": "application/json",
"Authorization": f"Bearer {bearer}"}
start = time.time()
while time.time() - start < timeout:
try:
resp = requests.get(
f"{api_base}/messages",
headers=headers, timeout=15, verify=ssl_verify,
)
if resp.status_code == 200:
raw = resp.json()
msgs = raw if isinstance(raw, list) else (
raw.get("data", {}).get("messages", [])
if isinstance(raw.get("data"), dict)
else raw.get("data", [])
)
for msg in (msgs or []):
msg_id = msg.get("id")
if not msg_id:
continue
# Fetch detail
try:
dr = requests.get(
f"{api_base}/messages/{msg_id}",
headers=headers, timeout=15, verify=ssl_verify,
)
if dr.status_code == 200:
detail = dr.json()
d = detail.get("data", detail) if isinstance(detail, dict) else detail
content = d.get("text", "") or d.get("html", "") or ""
code = _extract_code(content)
if code:
return code
except Exception:
pass
except Exception as e:
_log(f"📧 YYDS Mail poll error: {e}", "DEBUG")
elapsed = int(time.time() - start)
_log(f"📧 YYDS Mail waiting... ({elapsed}s/{timeout}s)", "DEBUG")
time.sleep(OTP_POLL_INTERVAL)
return None
def wait_for_code_by_provider(email: str, mail_provider: str = "",
mail_api_key: str = "",
timeout: int = 120) -> str | None:
"""Wait for verification code using the appropriate mail provider."""
provider = (mail_provider or "gptmail").lower().strip()
if provider == "yydsmail":
return _yydsmail_wait_for_code(email, mail_api_key=mail_api_key, timeout=timeout)
else:
# Default to GPTMail
return wait_for_verification_code(email, timeout)
# ===================== Token Refresh (Re-login existing account) =====================
def refresh_account_token(email: str, password: str = "",
mail_provider: str = "", mail_api_key: str = "") -> dict:
"""
Re-login an existing Rita.ai account to get a fresh token.
Optimized: explicit emailCode, resend on timeout, captcha retry.
Returns: {"token": "...", "email": "...", "ticket": "..."}
"""
cfg = _get_live_config()
session, impersonate = _create_rita_session(ssl_verify=not cfg["DISABLE_SSL_VERIFY"])
headers = _gosplit_headers()
redirect_uri = "https://www.rita.ai/zh/ai-chat"
def _session_post(url, **kwargs):
"""POST with optional impersonate for curl_cffi."""
if impersonate:
kwargs["impersonate"] = impersonate
return session.post(url, **kwargs)
# Step 1: authenticate (init)
_log(f"🔄 Refresh: authenticate (email={email})", "DEBUG")
r = _session_post(
f"{GOSPLIT_API}/authorize/authenticate",
headers=headers,
json={"redirect_uri": redirect_uri},
timeout=30,
)
r.raise_for_status()
# Step 2: sign_process (email + agree)
_log("🔄 Refresh: sign_process", "DEBUG")
r = _session_post(
f"{GOSPLIT_API}/authorize/sign_process",
headers=headers,
json={"redirect_uri": redirect_uri, "language": "zh",
"email": email, "agree": 1},
timeout=30,
)
r.raise_for_status()
data = r.json()
# Step 3: handle captcha if needed (with retry)
captcha_token = None
if data.get("data", {}).get("need_captcha"):
for cap_attempt in range(1, MAX_CAPTCHA_ATTEMPTS + 1):
if cap_attempt > 1:
time.sleep(random.uniform(3.0, 5.0))
_log(f"🔄 Refresh: solving reCAPTCHA (attempt {cap_attempt})...", "INFO")
try:
captcha_token = solve_recaptcha()
except Exception as e:
if cap_attempt >= MAX_CAPTCHA_ATTEMPTS:
raise Exception(f"reCAPTCHA failed for refresh: {e}")
continue
time.sleep(random.uniform(1.0, 2.0))
r = _session_post(
f"{GOSPLIT_API}/authorize/sign_process",
headers=headers,
json={"redirect_uri": redirect_uri, "language": "zh",
"email": email, "agree": 1,
"g-recaptcha-response": captcha_token},
timeout=30,
)
r.raise_for_status()
data = r.json()
if data.get("code") == 0 or data.get("type") == "success":
break
if cap_attempt >= MAX_CAPTCHA_ATTEMPTS:
raise Exception(f"Refresh captcha failed: {data}")
# Step 4: trigger emailCode
_log("🔄 Refresh: sending verification email...", "INFO")
try:
r = _session_post(
f"{GOSPLIT_API}/authorize/emailCode",
headers=headers,
json={"email": email, "redirect_uri": redirect_uri, "language": "zh",
**({"g-recaptcha-response": captcha_token} if captcha_token else {})},
timeout=30,
)
r.raise_for_status()
except Exception as e:
_log(f" ⚠️ emailCode failed: {e}, continuing...", "WARNING")
time.sleep(random.uniform(1.0, 2.0))
# Step 5: wait for OTP with resend
code = None
for attempt in range(1 + MAX_RESEND_ATTEMPTS):
if attempt > 0:
_log(f" 📧 Resending verification email ({attempt}/{MAX_RESEND_ATTEMPTS})...", "INFO")
try:
_session_post(
f"{GOSPLIT_API}/authorize/emailCode",
headers=headers,
json={"email": email, "redirect_uri": redirect_uri, "language": "zh"},
timeout=30,
)
except Exception:
pass
time.sleep(random.uniform(2.0, 3.0))
code = wait_for_code_by_provider(email, mail_provider, mail_api_key, timeout=OTP_WAIT_TIMEOUT)
if code:
break
_log(f" ⏰ OTP wait timed out ({attempt + 1}/{1 + MAX_RESEND_ATTEMPTS})", "WARNING")
if not code:
raise Exception(f"Failed to receive verification code for {email}")
_log(f"📧 Refresh: got code {code}", "DEBUG")
r = _session_post(
f"{GOSPLIT_API}/authorize/code_sign",
headers=headers,
json={"email": email, "code": code,
"redirect_uri": redirect_uri, "language": "zh", "agreeTC": 1},
timeout=30,
)
r.raise_for_status()
code_sign_data = r.json()
# OTP verification failure: resend once
if code_sign_data.get("code") != 0 and code_sign_data.get("type") != "success":
_log(" ⚠️ OTP verification failed, resending...", "WARNING")
time.sleep(random.uniform(2.0, 4.0))
try:
_session_post(
f"{GOSPLIT_API}/authorize/emailCode",
headers=headers,
json={"email": email, "redirect_uri": redirect_uri, "language": "zh"},
timeout=30,
)
except Exception:
pass
time.sleep(random.uniform(2.0, 3.0))
new_code = wait_for_code_by_provider(email, mail_provider, mail_api_key, timeout=OTP_WAIT_TIMEOUT)
if new_code and new_code != code:
r = _session_post(
f"{GOSPLIT_API}/authorize/code_sign",
headers=headers,
json={"email": email, "code": new_code,
"redirect_uri": redirect_uri, "language": "zh", "agreeTC": 1},
timeout=30,
)
r.raise_for_status()
code_sign_data = r.json()
if code_sign_data.get("code") != 0 and code_sign_data.get("type") != "success":
raise Exception(f"Refresh code_sign failed: {code_sign_data}")
# Extract token
token = code_sign_data.get("data", {}).get("token", "")
if not token:
try:
token = session.cookies.get("token", "")
except Exception:
pass
if not token:
for cookie in r.cookies:
if cookie.name == "token":
token = cookie.value
break
if not token:
raise Exception(f"No token in refresh response: {code_sign_data}")
# Step 6: authenticate with new token
_log("🔄 Refresh: authenticate (get ticket)", "DEBUG")
auth_headers = {**headers, "token": token}
r = _session_post(
f"{GOSPLIT_API}/authorize/authenticate",
headers=auth_headers,
json={"redirect_uri": redirect_uri},
timeout=30,
)
r.raise_for_status()
auth_data = r.json()
ticket = auth_data.get("data", {}).get("ticket", "")
_log(f"✅ Token refreshed! token={token[:8]}...", "SUCCESS")
return {"token": token, "email": email, "ticket": ticket}
# ===================== Ticket Re-login (use existing token) =====================
def relogin_for_ticket(token: str) -> dict:
"""
Re-authenticate with an existing token to get a fresh ticket.
This is the simplest flow — just call authenticate with the token header.
Returns: {"ticket": "..."} or raises Exception
"""
cfg = _get_live_config()
session, impersonate = _create_rita_session(ssl_verify=not cfg["DISABLE_SSL_VERIFY"])
headers = _gosplit_headers()
headers["token"] = token
def _session_post(url, **kwargs):
if impersonate:
kwargs["impersonate"] = impersonate
return session.post(url, **kwargs)
# Step 1: authorize/url
r = _session_post(
f"{GOSPLIT_API}/authorize/url",
headers=_gosplit_headers(),
json={"login_url": "https://account.rita.ai", "source_url": "https://www.rita.ai"},
timeout=15,
)
r.raise_for_status()
# Step 2: authenticate with token
r = _session_post(
f"{GOSPLIT_API}/authorize/authenticate",
headers=headers,
json={"redirect_uri": "https://www.rita.ai/zh/ai-chat"},
timeout=15,
)
r.raise_for_status()
data = r.json()
ticket = data.get("data", {}).get("ticket", "")
status = data.get("data", {}).get("status", False)
if ticket:
_log(f"🎫 Got ticket: {ticket[:8]}...", "SUCCESS")
return {"ticket": ticket}
elif status:
# status=true means token is valid but no ticket in response
return {"ticket": "", "message": "Token valid but no ticket returned"}
else:
raise Exception(f"Authentication failed: {data}")