chatgpt2api / services /codex_register_service.py
tx1538's picture
Upload 179 files
9d7ddb9 verified
Raw
History Blame
4.38 kB
from __future__ import annotations
import time
from services.codex_cpa_service import build_codex_upload_file
from services.cpa_push_service import upload_auth_file
from services.cpa_service import cpa_config, list_remote_files
from services import hero_sms_country_reputation as country_reputation
from services.register import openai_register
from services.register.openai_register import PlatformRegistrar
CPA_UPLOAD_VERIFY_ATTEMPTS = 3
CPA_UPLOAD_VERIFY_DELAY_SECONDS = 1.0
def _first_cpa_pool() -> dict:
pools = cpa_config.list_pools()
if not pools:
raise RuntimeError("未配置 CPA 号池,无法上传 Codex auth file")
return pools[0]
def _codex_tokens(result: dict) -> dict:
return {
"email": str(result.get("email") or "").strip(),
"access_token": str(result.get("access_token") or "").strip(),
"refresh_token": str(result.get("refresh_token") or "").strip(),
"id_token": str(result.get("id_token") or "").strip(),
}
def _hero_sms_meta(result: dict) -> dict:
hero_sms = result.get("hero_sms") if isinstance(result.get("hero_sms"), dict) else {}
return hero_sms if isinstance(hero_sms, dict) else {}
def _record_cpa_fail(result: dict, reason: str) -> None:
hero_sms = _hero_sms_meta(result)
if not hero_sms:
return
country_reputation.store.record_event(
hero_sms.get("country"),
"cpa_fail",
price=hero_sms.get("price"),
reason=reason,
)
def _verify_uploaded_file(pool: dict, filename: str) -> dict:
expected = str(filename or "").strip()
if not expected:
raise RuntimeError("CPA 上传验证失败: filename 为空")
last_files: list[dict] = []
attempts = max(1, int(CPA_UPLOAD_VERIFY_ATTEMPTS or 1))
delay = max(0.0, float(CPA_UPLOAD_VERIFY_DELAY_SECONDS or 0))
for attempt in range(1, attempts + 1):
last_files = list_remote_files(pool)
for item in last_files:
if str(item.get("name") or "").strip() == expected:
return {"verified": True, "attempt": attempt, "remote_count": len(last_files)}
if attempt < attempts and delay:
time.sleep(delay)
preview = ", ".join(str(item.get("name") or "").strip() for item in last_files[:8] if item.get("name"))
raise RuntimeError(
"CPA 上传后未在远端列表中确认: "
f"pool={pool.get('id') or '-'}, file={expected}, remote_count={len(last_files)}, preview={preview or '-'}"
)
def run_codex_registration(index: int, *, pool: dict | None = None) -> dict:
pool = pool or _first_cpa_pool()
start = time.time()
registrar = PlatformRegistrar(openai_register.config.get("proxy") or "")
try:
openai_register.step(index, "Codex CPA 注册任务启动")
result = registrar.register(index, profile=openai_register.codex_oauth_profile)
openai_register._record_mail_success(result)
filename, body = build_codex_upload_file(_codex_tokens(result))
try:
upload_auth_file(pool, filename, body)
verify_result = _verify_uploaded_file(pool, filename)
except Exception as exc:
_record_cpa_fail(result, f"codex_cpa_upload_failed:{openai_register.redact_sensitive_text(exc)}")
openai_register.step(index, f"Codex CPA 上传/确认失败: {openai_register.redact_sensitive_text(exc)}", "red")
raise
hero_sms = _hero_sms_meta(result)
if hero_sms:
country_reputation.store.record_event(
hero_sms.get("country"),
"cpa_success",
price=hero_sms.get("price"),
reason="codex_cpa_uploaded",
)
cost = time.time() - start
openai_register.step(index, f"Codex CPA 上传完成: pool={pool.get('id')}, file={filename}, 耗时{cost:.1f}s", "green")
return {
"ok": True,
"index": index,
"result": result,
"cpa": {
"pool_id": str(pool.get("id") or "").strip(),
"filename": filename,
"verified": bool(verify_result.get("verified")),
"verify_attempt": int(verify_result.get("attempt") or 0),
"remote_count": int(verify_result.get("remote_count") or 0),
},
}
finally:
registrar.close()