Spaces:
Paused
Paused
| """ | |
| Token 刷新模块 | |
| 支持 Session Token 和 OAuth Refresh Token 两种刷新方式 | |
| """ | |
| import logging | |
| import json | |
| import time | |
| from typing import Optional, Dict, Any, Tuple | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta | |
| from curl_cffi import requests as cffi_requests | |
| from ...config.settings import get_settings | |
| from ...database.session import get_db | |
| from ...database import crud | |
| from ...database.models import Account | |
| logger = logging.getLogger(__name__) | |
@dataclass
class TokenRefreshResult:
    """Outcome of a single token refresh attempt.

    Declared as a dataclass so that it can be constructed with keyword
    arguments, e.g. ``TokenRefreshResult(success=False)``. Only ``success``
    is required; every other field defaults to an "empty" value and is
    filled in by the refresh routines.
    """

    # True only when a new access token was obtained.
    success: bool
    # The newly issued access token ("" on failure).
    access_token: str = ""
    # The refresh token to keep using; only populated by the OAuth refresh path.
    refresh_token: str = ""
    # Expiry of the new access token; None when it could not be determined.
    expires_at: Optional[datetime] = None
    # Human-readable failure description ("" on success).
    error_message: str = ""
class TokenRefreshManager:
    """
    Refresh and validate access tokens.

    Two refresh strategies are supported; ``refresh_account`` tries them in
    this order:

    1. Session-token refresh: GET ``SESSION_URL`` with the
       ``__Secure-next-auth.session-token`` cookie set.
    2. OAuth refresh-token grant: POST to ``TOKEN_URL``.

    Every HTTP session created for a request is closed as soon as the
    request has completed.
    """

    # Session and OAuth token endpoints.
    SESSION_URL = "https://chatgpt.com/api/auth/session"
    TOKEN_URL = "https://auth.openai.com/oauth/token"

    # Lifetime (seconds) assumed when the OAuth response carries no usable
    # "expires_in" value.
    DEFAULT_EXPIRES_IN = 3600

    def __init__(self, proxy_url: Optional[str] = None):
        """
        Initialise the manager.

        Args:
            proxy_url: Optional proxy URL passed to every HTTP session.
        """
        self.proxy_url = proxy_url
        self.settings = get_settings()

    def _create_session(self) -> cffi_requests.Session:
        """Create a new HTTP session that impersonates Chrome 120 and uses the configured proxy."""
        return cffi_requests.Session(impersonate="chrome120", proxy=self.proxy_url)

    @staticmethod
    def _parse_expiry(expires_str: Any) -> Optional[datetime]:
        """Parse the ISO-8601 ``expires`` value of a session response; return None if absent or invalid."""
        if not expires_str:
            return None
        try:
            # A trailing "Z" is rewritten to "+00:00" so fromisoformat accepts it.
            return datetime.fromisoformat(expires_str.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError) as e:
            # Best effort: an unparseable expiry must not fail the refresh.
            logger.warning("Ignoring unparseable session expiry %r: %s", expires_str, e)
            return None

    def refresh_by_session_token(self, session_token: str) -> TokenRefreshResult:
        """
        Refresh the access token using a session token.

        Args:
            session_token: Value for the ``__Secure-next-auth.session-token`` cookie.

        Returns:
            TokenRefreshResult: On success carries ``access_token`` and, when
            the response contains a parseable ``expires`` field, ``expires_at``.
            On any failure ``success`` is False and ``error_message`` is set;
            exceptions are caught and reported through the result.
        """
        result = TokenRefreshResult(success=False)
        try:
            session = self._create_session()
            try:
                # Attach the session token as a cookie for the chatgpt.com domain.
                session.cookies.set(
                    "__Secure-next-auth.session-token",
                    session_token,
                    domain=".chatgpt.com",
                    path="/"
                )
                # Query the session endpoint.
                response = session.get(
                    self.SESSION_URL,
                    headers={
                        "accept": "application/json",
                        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
                    },
                    timeout=30
                )
            finally:
                # Always release the connection, even when the request raised.
                session.close()

            if response.status_code != 200:
                result.error_message = f"Session token 刷新失败: HTTP {response.status_code}"
                logger.warning(result.error_message)
                return result

            data = response.json()

            access_token = data.get("accessToken")
            if not access_token:
                result.error_message = "Session token 刷新失败: 未找到 accessToken"
                logger.warning(result.error_message)
                return result

            # NOTE(review): a value ending in "Z"/"+00:00" parses to a
            # timezone-aware datetime, whereas refresh_by_oauth_token produces
            # a naive one via datetime.utcnow() - confirm callers handle both.
            expires_at = self._parse_expiry(data.get("expires"))

            result.success = True
            result.access_token = access_token
            result.expires_at = expires_at
            logger.info(f"Session token 刷新成功,过期时间: {expires_at}")
            return result
        except Exception as e:
            # Boundary handler: report every failure through the result object.
            result.error_message = f"Session token 刷新异常: {str(e)}"
            logger.error(result.error_message)
            return result

    def refresh_by_oauth_token(
        self,
        refresh_token: str,
        client_id: Optional[str] = None
    ) -> TokenRefreshResult:
        """
        Refresh the access token using an OAuth refresh token.

        Args:
            refresh_token: The OAuth refresh token.
            client_id: OAuth client ID; falls back to
                ``settings.openai_client_id`` when empty.

        Returns:
            TokenRefreshResult: On success carries ``access_token``,
            ``refresh_token`` (the one from the response, or the supplied one
            if the response has none) and ``expires_at`` (now + ``expires_in``
            seconds, as a naive UTC datetime). On any failure ``success`` is
            False and ``error_message`` is set; exceptions are caught and
            reported through the result.
        """
        result = TokenRefreshResult(success=False)
        try:
            # Use the supplied client_id or the configured default.
            client_id = client_id or self.settings.openai_client_id
            token_data = {
                "client_id": client_id,
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "redirect_uri": self.settings.openai_redirect_uri
            }

            session = self._create_session()
            try:
                response = session.post(
                    self.TOKEN_URL,
                    headers={
                        "content-type": "application/x-www-form-urlencoded",
                        "accept": "application/json"
                    },
                    data=token_data,
                    timeout=30
                )
            finally:
                # Always release the connection, even when the request raised.
                session.close()

            if response.status_code != 200:
                result.error_message = f"OAuth token 刷新失败: HTTP {response.status_code}"
                logger.warning(f"{result.error_message}, 响应: {response.text[:200]}")
                return result

            data = response.json()

            access_token = data.get("access_token")
            if not access_token:
                result.error_message = "OAuth token 刷新失败: 未找到 access_token"
                logger.warning(result.error_message)
                return result

            # Keep the current refresh token when the response omits it or
            # returns null/empty.
            new_refresh_token = data.get("refresh_token") or refresh_token

            # A missing, null or non-numeric "expires_in" must not turn a
            # successful refresh into a failure; fall back to the default.
            try:
                expires_in = float(data.get("expires_in", self.DEFAULT_EXPIRES_IN))
            except (TypeError, ValueError):
                expires_in = self.DEFAULT_EXPIRES_IN
            expires_at = datetime.utcnow() + timedelta(seconds=expires_in)

            result.success = True
            result.access_token = access_token
            result.refresh_token = new_refresh_token
            result.expires_at = expires_at
            logger.info(f"OAuth token 刷新成功,过期时间: {expires_at}")
            return result
        except Exception as e:
            # Boundary handler: report every failure through the result object.
            result.error_message = f"OAuth token 刷新异常: {str(e)}"
            logger.error(result.error_message)
            return result

    def refresh_account(self, account: Account) -> TokenRefreshResult:
        """
        Refresh the token of an account.

        Order of attempts:
        1. Session-token refresh, when ``account.session_token`` is set.
        2. OAuth refresh, when ``account.refresh_token`` is set.

        Args:
            account: The account object; ``session_token``, ``refresh_token``,
                ``client_id`` and ``email`` are read from it.

        Returns:
            TokenRefreshResult: The first successful result; otherwise the
            result of the last attempt that was made, or a generic failure
            when the account has neither token.
        """
        session_result = None

        # Prefer the session token.
        if account.session_token:
            logger.info(f"尝试使用 Session Token 刷新账号 {account.email}")
            session_result = self.refresh_by_session_token(account.session_token)
            if session_result.success:
                return session_result
            logger.warning("Session Token 刷新失败,尝试 OAuth 刷新")

        # Fall back to the OAuth refresh token.
        if account.refresh_token:
            logger.info(f"尝试使用 OAuth Refresh Token 刷新账号 {account.email}")
            return self.refresh_by_oauth_token(
                refresh_token=account.refresh_token,
                client_id=account.client_id
            )

        # A session refresh was attempted and failed, and there is no OAuth
        # fallback: surface the real failure instead of claiming that the
        # account has no tokens at all.
        if session_result is not None:
            return session_result

        # No refresh method available.
        return TokenRefreshResult(
            success=False,
            error_message="账号没有可用的刷新方式(缺少 session_token 和 refresh_token)"
        )

    def validate_token(self, access_token: str) -> Tuple[bool, Optional[str]]:
        """
        Check whether an access token is accepted by the backend.

        Sends a GET to ``https://chatgpt.com/backend-api/me`` with the token
        as a bearer credential and maps the HTTP status to a verdict.

        Args:
            access_token: The access token to check.

        Returns:
            Tuple[bool, Optional[str]]: ``(True, None)`` on HTTP 200,
            otherwise ``(False, <error message>)``. Exceptions are caught and
            reported as a ``False`` verdict.
        """
        try:
            session = self._create_session()
            try:
                response = session.get(
                    "https://chatgpt.com/backend-api/me",
                    headers={
                        "authorization": f"Bearer {access_token}",
                        "accept": "application/json"
                    },
                    timeout=30
                )
            finally:
                # Always release the connection, even when the request raised.
                session.close()

            status = response.status_code
            if status == 200:
                return True, None
            if status == 401:
                return False, "Token 无效或已过期"
            if status == 403:
                return False, "账号可能被封禁"
            return False, f"验证失败: HTTP {status}"
        except Exception as e:
            return False, f"验证异常: {str(e)}"
def refresh_account_token(account_id: int, proxy_url: Optional[str] = None) -> TokenRefreshResult:
    """
    Refresh the token of one account and persist the new values.

    Looks the account up by ID, runs ``TokenRefreshManager.refresh_account``
    on it and, if that succeeds, writes the new access token, the refresh
    time and (when present) the refresh token and expiry back through
    ``crud.update_account``.

    Args:
        account_id: ID of the account to refresh.
        proxy_url: Optional proxy URL for the HTTP requests.

    Returns:
        TokenRefreshResult: The refresh outcome, or a failure result when the
        account does not exist.
    """
    with get_db() as db:
        account = crud.get_account_by_id(db, account_id)
        if not account:
            return TokenRefreshResult(success=False, error_message="账号不存在")

        outcome = TokenRefreshManager(proxy_url=proxy_url).refresh_account(account)
        if not outcome.success:
            # Nothing to persist on failure.
            return outcome

        # Always-written columns first, then the optional ones that the
        # refresh actually produced.
        changes = {
            "access_token": outcome.access_token,
            "last_refresh": datetime.utcnow(),
        }
        for column in ("refresh_token", "expires_at"):
            value = getattr(outcome, column)
            if value:
                changes[column] = value

        crud.update_account(db, account_id, **changes)
        return outcome
def validate_account_token(account_id: int, proxy_url: Optional[str] = None) -> Tuple[bool, Optional[str]]:
    """
    Validate the stored access token of one account.

    Args:
        account_id: ID of the account to check.
        proxy_url: Optional proxy URL for the HTTP request.

    Returns:
        Tuple[bool, Optional[str]]: ``(is_valid, error_message)``. A missing
        account or a missing access token is reported as invalid without
        making any HTTP request; otherwise the verdict comes from
        ``TokenRefreshManager.validate_token``.
    """
    with get_db() as db:
        record = crud.get_account_by_id(db, account_id)
        if record and record.access_token:
            checker = TokenRefreshManager(proxy_url=proxy_url)
            return checker.validate_token(record.access_token)

        # Distinguish "no such account" from "account without a token".
        reason = "账号没有 access_token" if record else "账号不存在"
        return False, reason