chatgpt2api / scripts /verify_oauth_refresh.py
peijun1's picture
clean deploy
dc03b36
Raw
History Blame Contribute Delete
4.21 kB
"""验证 OAuth 账号的自动刷新是否生效。
用法(在容器内,工作目录 /app):
uv run python scripts/verify_oauth_refresh.py # 只读诊断,安全
uv run python scripts/verify_oauth_refresh.py --force # 真正触发一次刷新
只读模式:列出每个账号的 access_token 剩余有效期、是否带 refresh_token、
上次刷新时间与上次刷新错误,判断"有没有料可刷"。
--force :对每个带 refresh_token 的账号强制走一次 refresh_access_token(force=True),
直接验证 refresh_token + 本项目 client_id 能否从 OpenAI 换出新 access_token。
这会真实轮换 access_token(新 token 有效,不损坏账号)。
"""
from __future__ import annotations
import sys
from services.account_service import account_service
def _fmt_remaining(seconds: int | None) -> str:
if seconds is None:
return "无法解析 exp"
if seconds <= 0:
return f"已过期 {(-seconds) // 3600}h"
return f"{seconds // 3600}h{(seconds % 3600) // 60}m 后过期"
def diagnose() -> list[str]:
"""只读:打印每个账号的刷新就绪状态,返回带 refresh_token 的 token 列表。"""
tokens = account_service.list_tokens()
print(f"账号总数: {len(tokens)}\n")
refreshable: list[str] = []
for token in tokens:
account = account_service.get_account(token) or {}
remaining = account_service._token_expires_in(token)
has_rt = bool(str(account.get("refresh_token") or "").strip())
if has_rt:
refreshable.append(token)
print(f"- email={account.get('email') or '(未知)'}")
print(f" access_token[:20] = {token[:20]}...")
print(f" 距过期 = {_fmt_remaining(remaining)}")
print(f" refresh_token = {'有 ✅' if has_rt else '无 ❌(无法自动刷新)'}")
print(f" last_token_refresh_at = {account.get('last_token_refresh_at')}")
print(f" last_token_refresh_error = {account.get('last_token_refresh_error')}")
print()
return refreshable
def force_refresh(tokens: list[str]) -> None:
"""对每个账号 force 刷新一次,并对比前后状态判断成败。"""
if not tokens:
print("没有带 refresh_token 的账号,无法验证刷新。")
return
print("=" * 60)
print(f"开始对 {len(tokens)} 个账号 force 刷新(真实调用 OpenAI)...\n")
ok = 0
for token in tokens:
before = account_service.get_account(token) or {}
new_token = account_service.refresh_access_token(token, force=True, event="manual_verify")
after = account_service.get_account(new_token) or {}
err = str(after.get("last_token_refresh_error") or "").strip()
rotated = new_token != token
success = bool(new_token) and not err
if success:
ok += 1
print(f"- email={before.get('email') or '(未知)'}")
print(f" 旧 access_token[:20] = {token[:20]}...")
print(f" 新 access_token[:20] = {new_token[:20]}...")
print(f" token 是否轮换 = {'是' if rotated else '否(exp 未到刷新窗口时可能返回原值)'}")
print(f" last_token_refresh_at = {after.get('last_token_refresh_at')}")
print(f" last_token_refresh_error = {after.get('last_token_refresh_error') or '无'}")
print(f" >>> 刷新结果 = {'成功 ✅' if success else '失败 ❌'}")
print()
print("=" * 60)
print(f"汇总: {ok}/{len(tokens)} 个账号刷新成功")
if ok == len(tokens):
print("✅ 自动刷新机制对这些账号完全可用——refresh_token 与 client_id 匹配。")
else:
print("❌ 有账号刷新失败,看上面的 last_token_refresh_error,或 docker logs 里的 [oauth-login]/refresh 日志。")
def main() -> None:
do_force = "--force" in sys.argv[1:]
refreshable = diagnose()
if do_force:
force_refresh(refreshable)
else:
print("提示: 加 --force 参数可真正触发一次刷新以验证能否成功。")
if __name__ == "__main__":
main()