google / src /services /token_manager.py
Admin
整合sora2api
22a3c56
raw
history blame
37.8 kB
"""Token management module"""
import jwt
import asyncio
import random
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from curl_cffi.requests import AsyncSession
from faker import Faker
from ..core.database import Database
from ..core.models import Token, TokenStats
from ..core.config import config
from .proxy_manager import ProxyManager
class TokenManager:
"""Token lifecycle manager"""
def __init__(self, db: Database):
self.db = db
self._lock = asyncio.Lock()
self.proxy_manager = ProxyManager(db)
self.fake = Faker()
async def decode_jwt(self, token: str) -> dict:
"""Decode JWT token without verification"""
try:
decoded = jwt.decode(token, options={"verify_signature": False})
return decoded
except Exception as e:
raise ValueError(f"Invalid JWT token: {str(e)}")
def _generate_random_username(self) -> str:
"""Generate a random username using faker
Returns:
A random username string
"""
# 生成真实姓名
first_name = self.fake.first_name()
last_name = self.fake.last_name()
# 去除姓名中的空格和特殊字符,只保留字母
first_name_clean = ''.join(c for c in first_name if c.isalpha())
last_name_clean = ''.join(c for c in last_name if c.isalpha())
# 生成1-4位随机数字
random_digits = str(random.randint(1, 9999))
# 随机选择用户名格式
format_choice = random.choice([
f"{first_name_clean}{last_name_clean}{random_digits}",
f"{first_name_clean}.{last_name_clean}{random_digits}",
f"{first_name_clean}{random_digits}",
f"{last_name_clean}{random_digits}",
f"{first_name_clean[0]}{last_name_clean}{random_digits}",
f"{first_name_clean}{last_name_clean[0]}{random_digits}"
])
# 转换为小写
return format_choice.lower()
async def get_user_info(self, access_token: str) -> dict:
"""Get user info from Sora API"""
proxy_url = await self.proxy_manager.get_proxy_url()
async with AsyncSession() as session:
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Origin": "https://sora.chatgpt.com",
"Referer": "https://sora.chatgpt.com/"
}
kwargs = {
"headers": headers,
"timeout": 30,
"impersonate": "chrome" # 自动生成 User-Agent 和浏览器指纹
}
if proxy_url:
kwargs["proxy"] = proxy_url
response = await session.get(
f"{config.sora_base_url}/me",
**kwargs
)
if response.status_code != 200:
raise ValueError(f"Failed to get user info: {response.status_code}")
return response.json()
async def get_subscription_info(self, token: str) -> Dict[str, Any]:
"""Get subscription information from Sora API
Returns:
{
"plan_type": "chatgpt_team",
"plan_title": "ChatGPT Business",
"subscription_end": "2025-11-13T16:58:21Z"
}
"""
print(f"🔍 开始获取订阅信息...")
proxy_url = await self.proxy_manager.get_proxy_url()
headers = {
"Authorization": f"Bearer {token}"
}
async with AsyncSession() as session:
url = "https://sora.chatgpt.com/backend/billing/subscriptions"
print(f"📡 请求 URL: {url}")
print(f"🔑 使用 Token: {token[:30]}...")
kwargs = {
"headers": headers,
"timeout": 30,
"impersonate": "chrome" # 自动生成 User-Agent 和浏览器指纹
}
if proxy_url:
kwargs["proxy"] = proxy_url
print(f"🌐 使用代理: {proxy_url}")
response = await session.get(url, **kwargs)
print(f"📥 响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"📦 响应数据: {data}")
# 提取第一个订阅信息
if data.get("data") and len(data["data"]) > 0:
subscription = data["data"][0]
plan = subscription.get("plan", {})
result = {
"plan_type": plan.get("id", ""),
"plan_title": plan.get("title", ""),
"subscription_end": subscription.get("end_ts", "")
}
print(f"✅ 订阅信息提取成功: {result}")
return result
print(f"⚠️ 响应数据中没有订阅信息")
return {
"plan_type": "",
"plan_title": "",
"subscription_end": ""
}
else:
error_msg = f"Failed to get subscription info: {response.status_code}"
print(f"❌ {error_msg}")
print(f"📄 响应内容: {response.text[:500]}")
raise Exception(error_msg)
async def get_sora2_invite_code(self, access_token: str) -> dict:
"""Get Sora2 invite code"""
proxy_url = await self.proxy_manager.get_proxy_url()
print(f"🔍 开始获取Sora2邀请码...")
async with AsyncSession() as session:
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json"
}
kwargs = {
"headers": headers,
"timeout": 30,
"impersonate": "chrome" # 自动生成 User-Agent 和浏览器指纹
}
if proxy_url:
kwargs["proxy"] = proxy_url
print(f"🌐 使用代理: {proxy_url}")
response = await session.get(
"https://sora.chatgpt.com/backend/project_y/invite/mine",
**kwargs
)
print(f"📥 响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"✅ Sora2邀请码获取成功: {data}")
return {
"supported": True,
"invite_code": data.get("invite_code"),
"redeemed_count": data.get("redeemed_count", 0),
"total_count": data.get("total_count", 0)
}
else:
# Check if it's 401 unauthorized
try:
error_data = response.json()
if error_data.get("error", {}).get("message", "").startswith("401"):
print(f"⚠️ Token不支持Sora2")
return {
"supported": False,
"invite_code": None
}
except:
pass
print(f"❌ 获取Sora2邀请码失败: {response.status_code}")
print(f"📄 响应内容: {response.text[:500]}")
return {
"supported": False,
"invite_code": None
}
async def get_sora2_remaining_count(self, access_token: str) -> dict:
"""Get Sora2 remaining video count
Returns:
{
"remaining_count": 27,
"rate_limit_reached": false,
"access_resets_in_seconds": 46833
}
"""
proxy_url = await self.proxy_manager.get_proxy_url()
print(f"🔍 开始获取Sora2剩余次数...")
async with AsyncSession() as session:
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json"
}
kwargs = {
"headers": headers,
"timeout": 30,
"impersonate": "chrome" # 自动生成 User-Agent 和浏览器指纹
}
if proxy_url:
kwargs["proxy"] = proxy_url
print(f"🌐 使用代理: {proxy_url}")
response = await session.get(
"https://sora.chatgpt.com/backend/nf/check",
**kwargs
)
print(f"📥 响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"✅ Sora2剩余次数获取成功: {data}")
rate_limit_info = data.get("rate_limit_and_credit_balance", {})
return {
"success": True,
"remaining_count": rate_limit_info.get("estimated_num_videos_remaining", 0),
"rate_limit_reached": rate_limit_info.get("rate_limit_reached", False),
"access_resets_in_seconds": rate_limit_info.get("access_resets_in_seconds", 0)
}
else:
print(f"❌ 获取Sora2剩余次数失败: {response.status_code}")
print(f"📄 响应内容: {response.text[:500]}")
return {
"success": False,
"remaining_count": 0,
"error": f"Failed to get remaining count: {response.status_code}"
}
async def check_username_available(self, access_token: str, username: str) -> bool:
"""Check if username is available
Args:
access_token: Access token for authentication
username: Username to check
Returns:
True if username is available, False otherwise
"""
proxy_url = await self.proxy_manager.get_proxy_url()
print(f"🔍 检查用户名是否可用: {username}")
async with AsyncSession() as session:
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
kwargs = {
"headers": headers,
"json": {"username": username},
"timeout": 30,
"impersonate": "chrome"
}
if proxy_url:
kwargs["proxy"] = proxy_url
print(f"🌐 使用代理: {proxy_url}")
response = await session.post(
"https://sora.chatgpt.com/backend/project_y/profile/username/check",
**kwargs
)
print(f"📥 响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
available = data.get("available", False)
print(f"✅ 用户名检查结果: available={available}")
return available
else:
print(f"❌ 用户名检查失败: {response.status_code}")
print(f"📄 响应内容: {response.text[:500]}")
return False
async def set_username(self, access_token: str, username: str) -> dict:
"""Set username for the account
Args:
access_token: Access token for authentication
username: Username to set
Returns:
User profile information after setting username
"""
proxy_url = await self.proxy_manager.get_proxy_url()
print(f"🔍 开始设置用户名: {username}")
async with AsyncSession() as session:
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
kwargs = {
"headers": headers,
"json": {"username": username},
"timeout": 30,
"impersonate": "chrome"
}
if proxy_url:
kwargs["proxy"] = proxy_url
print(f"🌐 使用代理: {proxy_url}")
response = await session.post(
"https://sora.chatgpt.com/backend/project_y/profile/username/set",
**kwargs
)
print(f"📥 响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"✅ 用户名设置成功: {data.get('username')}")
return data
else:
print(f"❌ 用户名设置失败: {response.status_code}")
print(f"📄 响应内容: {response.text[:500]}")
raise Exception(f"Failed to set username: {response.status_code}")
async def activate_sora2_invite(self, access_token: str, invite_code: str) -> dict:
"""Activate Sora2 with invite code"""
import uuid
proxy_url = await self.proxy_manager.get_proxy_url()
print(f"🔍 开始激活Sora2邀请码: {invite_code}")
print(f"🔑 Access Token 前缀: {access_token[:50]}...")
async with AsyncSession() as session:
# 生成设备ID
device_id = str(uuid.uuid4())
# 只设置必要的头,让 impersonate 处理其他
headers = {
"authorization": f"Bearer {access_token}",
"cookie": f"oai-did={device_id}"
}
print(f"🆔 设备ID: {device_id}")
print(f"📦 请求体: {{'invite_code': '{invite_code}'}}")
kwargs = {
"headers": headers,
"json": {"invite_code": invite_code},
"timeout": 30,
"impersonate": "chrome120" # 使用 chrome120 让库自动处理 UA 等头
}
if proxy_url:
kwargs["proxy"] = proxy_url
print(f"🌐 使用代理: {proxy_url}")
response = await session.post(
"https://sora.chatgpt.com/backend/project_y/invite/accept",
**kwargs
)
print(f"📥 响应状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"✅ Sora2激活成功: {data}")
return {
"success": data.get("success", False),
"already_accepted": data.get("already_accepted", False)
}
else:
print(f"❌ Sora2激活失败: {response.status_code}")
print(f"📄 响应内容: {response.text[:500]}")
raise Exception(f"Failed to activate Sora2: {response.status_code}")
async def st_to_at(self, session_token: str) -> dict:
"""Convert Session Token to Access Token"""
proxy_url = await self.proxy_manager.get_proxy_url()
async with AsyncSession() as session:
headers = {
"Cookie": f"__Secure-next-auth.session-token={session_token}",
"Accept": "application/json",
"Origin": "https://sora.chatgpt.com",
"Referer": "https://sora.chatgpt.com/"
}
kwargs = {
"headers": headers,
"timeout": 30,
"impersonate": "chrome" # 自动生成 User-Agent 和浏览器指纹
}
if proxy_url:
kwargs["proxy"] = proxy_url
response = await session.get(
"https://sora.chatgpt.com/api/auth/session",
**kwargs
)
if response.status_code != 200:
raise ValueError(f"Failed to convert ST to AT: {response.status_code}")
data = response.json()
return {
"access_token": data.get("accessToken"),
"email": data.get("user", {}).get("email"),
"expires": data.get("expires")
}
async def rt_to_at(self, refresh_token: str) -> dict:
"""Convert Refresh Token to Access Token"""
proxy_url = await self.proxy_manager.get_proxy_url()
async with AsyncSession() as session:
headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
kwargs = {
"headers": headers,
"json": {
"client_id": "app_LlGpXReQgckcGGUo2JrYvtJK",
"grant_type": "refresh_token",
"redirect_uri": "com.openai.chat://auth0.openai.com/ios/com.openai.chat/callback",
"refresh_token": refresh_token
},
"timeout": 30,
"impersonate": "chrome" # 自动生成 User-Agent 和浏览器指纹
}
if proxy_url:
kwargs["proxy"] = proxy_url
response = await session.post(
"https://auth.openai.com/oauth/token",
**kwargs
)
if response.status_code != 200:
raise ValueError(f"Failed to convert RT to AT: {response.status_code} - {response.text}")
data = response.json()
return {
"access_token": data.get("access_token"),
"refresh_token": data.get("refresh_token"),
"expires_in": data.get("expires_in")
}
async def add_token(self, token_value: str,
st: Optional[str] = None,
rt: Optional[str] = None,
remark: Optional[str] = None,
update_if_exists: bool = False,
image_enabled: bool = True,
video_enabled: bool = True) -> Token:
"""Add a new Access Token to database
Args:
token_value: Access Token
st: Session Token (optional)
rt: Refresh Token (optional)
remark: Remark (optional)
update_if_exists: If True, update existing token instead of raising error
image_enabled: Enable image generation (default: True)
video_enabled: Enable video generation (default: True)
Returns:
Token object
Raises:
ValueError: If token already exists and update_if_exists is False
"""
# Check if token already exists
existing_token = await self.db.get_token_by_value(token_value)
if existing_token:
if not update_if_exists:
raise ValueError(f"Token 已存在(邮箱: {existing_token.email})。如需更新,请先删除旧 Token 或使用更新功能。")
# Update existing token
return await self.update_existing_token(existing_token.id, token_value, st, rt, remark)
# Decode JWT to get expiry time and email
decoded = await self.decode_jwt(token_value)
# Extract expiry time from JWT
expiry_time = datetime.fromtimestamp(decoded.get("exp", 0)) if "exp" in decoded else None
# Extract email from JWT (OpenAI JWT format)
jwt_email = None
if "https://api.openai.com/profile" in decoded:
jwt_email = decoded["https://api.openai.com/profile"].get("email")
# Get user info from Sora API
try:
user_info = await self.get_user_info(token_value)
email = user_info.get("email", jwt_email or "")
name = user_info.get("name") or ""
except Exception as e:
# If API call fails, use JWT data
email = jwt_email or ""
name = email.split("@")[0] if email else ""
# Get subscription info from Sora API
plan_type = None
plan_title = None
subscription_end = None
try:
sub_info = await self.get_subscription_info(token_value)
plan_type = sub_info.get("plan_type")
plan_title = sub_info.get("plan_title")
# Parse subscription end time
if sub_info.get("subscription_end"):
from dateutil import parser
subscription_end = parser.parse(sub_info["subscription_end"])
except Exception as e:
# If API call fails, subscription info will be None
print(f"Failed to get subscription info: {e}")
# Get Sora2 invite code
sora2_supported = None
sora2_invite_code = None
sora2_redeemed_count = 0
sora2_total_count = 0
sora2_remaining_count = 0
try:
sora2_info = await self.get_sora2_invite_code(token_value)
sora2_supported = sora2_info.get("supported", False)
sora2_invite_code = sora2_info.get("invite_code")
sora2_redeemed_count = sora2_info.get("redeemed_count", 0)
sora2_total_count = sora2_info.get("total_count", 0)
# If Sora2 is supported, get remaining count
if sora2_supported:
try:
remaining_info = await self.get_sora2_remaining_count(token_value)
if remaining_info.get("success"):
sora2_remaining_count = remaining_info.get("remaining_count", 0)
print(f"✅ Sora2剩余次数: {sora2_remaining_count}")
except Exception as e:
print(f"Failed to get Sora2 remaining count: {e}")
except Exception as e:
# If API call fails, Sora2 info will be None
print(f"Failed to get Sora2 info: {e}")
# Check and set username if needed
try:
# Get fresh user info to check username
user_info = await self.get_user_info(token_value)
username = user_info.get("username")
# If username is null, need to set one
if username is None:
print(f"⚠️ 检测到用户名为null,需要设置用户名")
# Generate random username
max_attempts = 5
for attempt in range(max_attempts):
generated_username = self._generate_random_username()
print(f"🔄 尝试用户名 ({attempt + 1}/{max_attempts}): {generated_username}")
# Check if username is available
if await self.check_username_available(token_value, generated_username):
# Set the username
try:
await self.set_username(token_value, generated_username)
print(f"✅ 用户名设置成功: {generated_username}")
break
except Exception as e:
print(f"❌ 用户名设置失败: {e}")
if attempt == max_attempts - 1:
print(f"⚠️ 达到最大尝试次数,跳过用户名设置")
else:
print(f"⚠️ 用户名 {generated_username} 已被占用,尝试下一个")
if attempt == max_attempts - 1:
print(f"⚠️ 达到最大尝试次数,跳过用户名设置")
else:
print(f"✅ 用户名已设置: {username}")
except Exception as e:
print(f"⚠️ 用户名检查/设置过程中出错: {e}")
# Create token object
token = Token(
token=token_value,
email=email,
name=name,
st=st,
rt=rt,
remark=remark,
expiry_time=expiry_time,
is_active=True,
plan_type=plan_type,
plan_title=plan_title,
subscription_end=subscription_end,
sora2_supported=sora2_supported,
sora2_invite_code=sora2_invite_code,
sora2_redeemed_count=sora2_redeemed_count,
sora2_total_count=sora2_total_count,
sora2_remaining_count=sora2_remaining_count,
image_enabled=image_enabled,
video_enabled=video_enabled
)
# Save to database
token_id = await self.db.add_token(token)
token.id = token_id
return token
async def update_existing_token(self, token_id: int, token_value: str,
st: Optional[str] = None,
rt: Optional[str] = None,
remark: Optional[str] = None) -> Token:
"""Update an existing token with new information"""
# Decode JWT to get expiry time
decoded = await self.decode_jwt(token_value)
expiry_time = datetime.fromtimestamp(decoded.get("exp", 0)) if "exp" in decoded else None
# Get user info from Sora API
jwt_email = None
if "https://api.openai.com/profile" in decoded:
jwt_email = decoded["https://api.openai.com/profile"].get("email")
try:
user_info = await self.get_user_info(token_value)
email = user_info.get("email", jwt_email or "")
name = user_info.get("name", "")
except Exception as e:
email = jwt_email or ""
name = email.split("@")[0] if email else ""
# Get subscription info from Sora API
plan_type = None
plan_title = None
subscription_end = None
try:
sub_info = await self.get_subscription_info(token_value)
plan_type = sub_info.get("plan_type")
plan_title = sub_info.get("plan_title")
if sub_info.get("subscription_end"):
from dateutil import parser
subscription_end = parser.parse(sub_info["subscription_end"])
except Exception as e:
print(f"Failed to get subscription info: {e}")
# Update token in database
await self.db.update_token(
token_id=token_id,
token=token_value,
st=st,
rt=rt,
remark=remark,
expiry_time=expiry_time,
plan_type=plan_type,
plan_title=plan_title,
subscription_end=subscription_end
)
# Get updated token
updated_token = await self.db.get_token(token_id)
return updated_token
async def delete_token(self, token_id: int):
"""Delete a token"""
await self.db.delete_token(token_id)
async def update_token(self, token_id: int,
token: Optional[str] = None,
st: Optional[str] = None,
rt: Optional[str] = None,
remark: Optional[str] = None,
image_enabled: Optional[bool] = None,
video_enabled: Optional[bool] = None):
"""Update token (AT, ST, RT, remark, image_enabled, video_enabled)"""
# If token (AT) is updated, decode JWT to get new expiry time
expiry_time = None
if token:
try:
decoded = await self.decode_jwt(token)
expiry_time = datetime.fromtimestamp(decoded.get("exp", 0)) if "exp" in decoded else None
except Exception:
pass # If JWT decode fails, keep expiry_time as None
await self.db.update_token(token_id, token=token, st=st, rt=rt, remark=remark, expiry_time=expiry_time,
image_enabled=image_enabled, video_enabled=video_enabled)
async def get_active_tokens(self) -> List[Token]:
"""Get all active tokens (not cooled down)"""
return await self.db.get_active_tokens()
async def get_all_tokens(self) -> List[Token]:
"""Get all tokens"""
return await self.db.get_all_tokens()
async def update_token_status(self, token_id: int, is_active: bool):
"""Update token active status"""
await self.db.update_token_status(token_id, is_active)
async def enable_token(self, token_id: int):
"""Enable a token and reset error count"""
await self.db.update_token_status(token_id, True)
# Reset error count when enabling (in token_stats table)
await self.db.reset_error_count(token_id)
async def disable_token(self, token_id: int):
"""Disable a token"""
await self.db.update_token_status(token_id, False)
async def test_token(self, token_id: int) -> dict:
"""Test if a token is valid by calling Sora API and refresh Sora2 info"""
# Get token from database
token_data = await self.db.get_token(token_id)
if not token_data:
return {"valid": False, "message": "Token not found"}
try:
# Try to get user info from Sora API
user_info = await self.get_user_info(token_data.token)
# Refresh Sora2 invite code and counts
sora2_info = await self.get_sora2_invite_code(token_data.token)
sora2_supported = sora2_info.get("supported", False)
sora2_invite_code = sora2_info.get("invite_code")
sora2_redeemed_count = sora2_info.get("redeemed_count", 0)
sora2_total_count = sora2_info.get("total_count", 0)
sora2_remaining_count = 0
# If Sora2 is supported, get remaining count
if sora2_supported:
try:
remaining_info = await self.get_sora2_remaining_count(token_data.token)
if remaining_info.get("success"):
sora2_remaining_count = remaining_info.get("remaining_count", 0)
except Exception as e:
print(f"Failed to get Sora2 remaining count: {e}")
# Update token Sora2 info in database
await self.db.update_token_sora2(
token_id,
supported=sora2_supported,
invite_code=sora2_invite_code,
redeemed_count=sora2_redeemed_count,
total_count=sora2_total_count,
remaining_count=sora2_remaining_count
)
return {
"valid": True,
"message": "Token is valid",
"email": user_info.get("email"),
"username": user_info.get("username"),
"sora2_supported": sora2_supported,
"sora2_invite_code": sora2_invite_code,
"sora2_redeemed_count": sora2_redeemed_count,
"sora2_total_count": sora2_total_count,
"sora2_remaining_count": sora2_remaining_count
}
except Exception as e:
return {
"valid": False,
"message": f"Token is invalid: {str(e)}"
}
async def record_usage(self, token_id: int, is_video: bool = False):
"""Record token usage"""
await self.db.update_token_usage(token_id)
if is_video:
await self.db.increment_video_count(token_id)
else:
await self.db.increment_image_count(token_id)
async def record_error(self, token_id: int):
"""Record token error"""
await self.db.increment_error_count(token_id)
# Check if should ban
stats = await self.db.get_token_stats(token_id)
admin_config = await self.db.get_admin_config()
if stats and stats.error_count >= admin_config.error_ban_threshold:
await self.db.update_token_status(token_id, False)
async def record_success(self, token_id: int, is_video: bool = False):
"""Record successful request (reset error count)"""
await self.db.reset_error_count(token_id)
# Update Sora2 remaining count after video generation
if is_video:
try:
token_data = await self.db.get_token(token_id)
if token_data and token_data.sora2_supported:
remaining_info = await self.get_sora2_remaining_count(token_data.token)
if remaining_info.get("success"):
remaining_count = remaining_info.get("remaining_count", 0)
await self.db.update_token_sora2_remaining(token_id, remaining_count)
print(f"✅ 更新Token {token_id} 的Sora2剩余次数: {remaining_count}")
# If remaining count is 0, set cooldown
if remaining_count == 0:
reset_seconds = remaining_info.get("access_resets_in_seconds", 0)
if reset_seconds > 0:
cooldown_until = datetime.now() + timedelta(seconds=reset_seconds)
await self.db.update_token_sora2_cooldown(token_id, cooldown_until)
print(f"⏱️ Token {token_id} 剩余次数为0,设置冷却时间至: {cooldown_until}")
except Exception as e:
print(f"Failed to update Sora2 remaining count: {e}")
async def refresh_sora2_remaining_if_cooldown_expired(self, token_id: int):
"""Refresh Sora2 remaining count if cooldown has expired"""
try:
token_data = await self.db.get_token(token_id)
if not token_data or not token_data.sora2_supported:
return
# Check if Sora2 cooldown has expired
if token_data.sora2_cooldown_until and token_data.sora2_cooldown_until <= datetime.now():
print(f"🔄 Token {token_id} Sora2冷却已过期,正在刷新剩余次数...")
try:
remaining_info = await self.get_sora2_remaining_count(token_data.token)
if remaining_info.get("success"):
remaining_count = remaining_info.get("remaining_count", 0)
await self.db.update_token_sora2_remaining(token_id, remaining_count)
# Clear cooldown
await self.db.update_token_sora2_cooldown(token_id, None)
print(f"✅ Token {token_id} Sora2剩余次数已刷新: {remaining_count}")
except Exception as e:
print(f"Failed to refresh Sora2 remaining count: {e}")
except Exception as e:
print(f"Error in refresh_sora2_remaining_if_cooldown_expired: {e}")
async def auto_refresh_expiring_token(self, token_id: int) -> bool:
"""
Auto refresh token when expiry time is within 24 hours using ST or RT
Returns:
True if refresh successful, False otherwise
"""
try:
token_data = await self.db.get_token(token_id)
if not token_data:
return False
# Check if token is expiring within 24 hours
if not token_data.expiry_time:
return False # No expiry time set
time_until_expiry = token_data.expiry_time - datetime.now()
hours_until_expiry = time_until_expiry.total_seconds() / 3600
# Only refresh if expiry is within 24 hours (1440 minutes)
if hours_until_expiry > 24:
return False # Token not expiring soon
if hours_until_expiry < 0:
# Token already expired, still try to refresh
print(f"🔄 Token {token_id} 已过期,尝试自动刷新...")
else:
print(f"🔄 Token {token_id} 将在 {hours_until_expiry:.1f} 小时后过期,尝试自动刷新...")
# Priority: ST > RT
new_at = None
new_st = None
new_rt = None
if token_data.st:
# Try to refresh using ST
try:
print(f"📝 使用 ST 刷新 Token {token_id}...")
result = await self.st_to_at(token_data.st)
new_at = result.get("access_token")
# ST refresh doesn't return new ST, so keep the old one
new_st = token_data.st
print(f"✅ 使用 ST 刷新成功")
except Exception as e:
print(f"❌ 使用 ST 刷新失败: {e}")
new_at = None
if not new_at and token_data.rt:
# Try to refresh using RT
try:
print(f"📝 使用 RT 刷新 Token {token_id}...")
result = await self.rt_to_at(token_data.rt)
new_at = result.get("access_token")
new_rt = result.get("refresh_token", token_data.rt) # RT might be updated
print(f"✅ 使用 RT 刷新成功")
except Exception as e:
print(f"❌ 使用 RT 刷新失败: {e}")
new_at = None
if new_at:
# Update token with new AT
await self.update_token(token_id, token=new_at, st=new_st, rt=new_rt)
print(f"✅ Token {token_id} 已自动刷新")
return True
else:
# No ST or RT, disable token
print(f"⚠️ Token {token_id} 无法刷新(无 ST 或 RT),已禁用")
await self.disable_token(token_id)
return False
except Exception as e:
print(f"❌ 自动刷新 Token {token_id} 失败: {e}")
return False