Spaces:
Sleeping
Sleeping
| """验证 OAuth 账号的自动刷新是否生效。 | |
| 用法(在容器内,工作目录 /app): | |
| uv run python scripts/verify_oauth_refresh.py # 只读诊断,安全 | |
| uv run python scripts/verify_oauth_refresh.py --force # 真正触发一次刷新 | |
| 只读模式:列出每个账号的 access_token 剩余有效期、是否带 refresh_token、 | |
| 上次刷新时间与上次刷新错误,判断"有没有料可刷"。 | |
| --force :对每个带 refresh_token 的账号强制走一次 refresh_access_token(force=True), | |
| 直接验证 refresh_token + 本项目 client_id 能否从 OpenAI 换出新 access_token。 | |
| 这会真实轮换 access_token(新 token 有效,不损坏账号)。 | |
| """ | |
| from __future__ import annotations | |
| import sys | |
| from services.account_service import account_service | |
| def _fmt_remaining(seconds: int | None) -> str: | |
| if seconds is None: | |
| return "无法解析 exp" | |
| if seconds <= 0: | |
| return f"已过期 {(-seconds) // 3600}h" | |
| return f"{seconds // 3600}h{(seconds % 3600) // 60}m 后过期" | |
| def diagnose() -> list[str]: | |
| """只读:打印每个账号的刷新就绪状态,返回带 refresh_token 的 token 列表。""" | |
| tokens = account_service.list_tokens() | |
| print(f"账号总数: {len(tokens)}\n") | |
| refreshable: list[str] = [] | |
| for token in tokens: | |
| account = account_service.get_account(token) or {} | |
| remaining = account_service._token_expires_in(token) | |
| has_rt = bool(str(account.get("refresh_token") or "").strip()) | |
| if has_rt: | |
| refreshable.append(token) | |
| print(f"- email={account.get('email') or '(未知)'}") | |
| print(f" access_token[:20] = {token[:20]}...") | |
| print(f" 距过期 = {_fmt_remaining(remaining)}") | |
| print(f" refresh_token = {'有 ✅' if has_rt else '无 ❌(无法自动刷新)'}") | |
| print(f" last_token_refresh_at = {account.get('last_token_refresh_at')}") | |
| print(f" last_token_refresh_error = {account.get('last_token_refresh_error')}") | |
| print() | |
| return refreshable | |
| def force_refresh(tokens: list[str]) -> None: | |
| """对每个账号 force 刷新一次,并对比前后状态判断成败。""" | |
| if not tokens: | |
| print("没有带 refresh_token 的账号,无法验证刷新。") | |
| return | |
| print("=" * 60) | |
| print(f"开始对 {len(tokens)} 个账号 force 刷新(真实调用 OpenAI)...\n") | |
| ok = 0 | |
| for token in tokens: | |
| before = account_service.get_account(token) or {} | |
| new_token = account_service.refresh_access_token(token, force=True, event="manual_verify") | |
| after = account_service.get_account(new_token) or {} | |
| err = str(after.get("last_token_refresh_error") or "").strip() | |
| rotated = new_token != token | |
| success = bool(new_token) and not err | |
| if success: | |
| ok += 1 | |
| print(f"- email={before.get('email') or '(未知)'}") | |
| print(f" 旧 access_token[:20] = {token[:20]}...") | |
| print(f" 新 access_token[:20] = {new_token[:20]}...") | |
| print(f" token 是否轮换 = {'是' if rotated else '否(exp 未到刷新窗口时可能返回原值)'}") | |
| print(f" last_token_refresh_at = {after.get('last_token_refresh_at')}") | |
| print(f" last_token_refresh_error = {after.get('last_token_refresh_error') or '无'}") | |
| print(f" >>> 刷新结果 = {'成功 ✅' if success else '失败 ❌'}") | |
| print() | |
| print("=" * 60) | |
| print(f"汇总: {ok}/{len(tokens)} 个账号刷新成功") | |
| if ok == len(tokens): | |
| print("✅ 自动刷新机制对这些账号完全可用——refresh_token 与 client_id 匹配。") | |
| else: | |
| print("❌ 有账号刷新失败,看上面的 last_token_refresh_error,或 docker logs 里的 [oauth-login]/refresh 日志。") | |
| def main() -> None: | |
| do_force = "--force" in sys.argv[1:] | |
| refreshable = diagnose() | |
| if do_force: | |
| force_refresh(refreshable) | |
| else: | |
| print("提示: 加 --force 参数可真正触发一次刷新以验证能否成功。") | |
| if __name__ == "__main__": | |
| main() | |