File size: 5,665 Bytes
bab9c3b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
import json
import requests
from flask import redirect, url_for, session, flash
from functools import wraps
import utils.configs as configs
import utils.globals as globals
from utils.globals import *
from utils.tools import *
# 刷新 access_token 的主函数
def refresh_access_tokens():
globals.failed_tokens = []
# 遍历 refresh_token 列表
for token_info in globals.chatToken:
email = token_info['email']
refresh_token = token_info['refresh_token']
# 如果 refresh_token 为空,跳过这一行
if not refresh_token:
continue
try:
# 使用 POST 请求通过 refresh_token 获取 access_token
url = "https://auth0.openai.com/oauth/token"
headers = {"Content-Type": "application/json"}
data = {
"redirect_uri": "com.openai.chat://auth0.openai.com/ios/com.openai.chat/callback",
"grant_type": "refresh_token",
"client_id": "pdlLIX2Y72MIl2rhLhTE9VV9bN905kBh",
"refresh_token": refresh_token
}
response = requests.post(url, headers=headers, data=json.dumps(data))
response_data = response.json()
access_token = response_data.get("access_token")
if access_token: # 如果成功获取到 access_token
# 更新 access_token 和状态为 True
for i, user in enumerate(globals.users):
if user['bind_email'] == email:
globals.users[i]['bind_token'] = access_token
set_seedmap(globals.users[i]['id'],access_token)
save_users(globals.users)
token_info['access_token'] = access_token
token_info['status'] = True
else:
# 如果获取失败,设置状态为 False
token_info['status'] = False
globals.failed_tokens.append(token_info)
except Exception as e:
# 捕获请求错误并记录失败的 token,状态为 False
token_info['status'] = False
globals.failed_tokens.append(token_info)
# 保存更新后的 retoken 数据
save_retoken(globals.chatToken)
# 如果有失败的 token,记录到 failed_tokens.json
save_failed_tokens(globals.failed_tokens)
return globals.chatToken
# 获取登陆链接
def getoauth(seed_token):
domain = configs.domain_chatgpt
url = f'{domain}/?token={seed_token}'
try:
return url
except requests.RequestException as e:
return None
# 验证是否登录的装饰器
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
flash('请先登录。', 'warning')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
# 验证是否为管理员的装饰器
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
flash('请先登录。', 'warning')
return redirect(url_for('login'))
if session.get('role') != 'admin':
flash('需要管理员权限。', 'danger')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
# 设置网关seedmap
def set_seedmap(user_id,token):
domain = configs.domain_chatgpt
url = f'{domain}/seedtoken'
headers = {
"Authorization": f"Bearer {configs.authorization}",
"Content-Type": "application/json"
}
data = {
"seed": user_id,
"token": token,
}
response = requests.post(url, headers=headers, data=json.dumps(data))
return response.status_code
# 删除网关seedmap
def del_seedmap(user_id):
domain = configs.domain_chatgpt
url = f'{domain}/seedtoken'
headers = {
"Authorization": f"Bearer {configs.authorization}",
"Content-Type": "application/json"
}
data = {
"seed": user_id
}
response = requests.delete(url, headers=headers, data=json.dumps(data))
return response.status_code
# 获取Claude登陆链接
def get_claude_login_url(session_key,uname):
domain = configs.domain_claude
url = f'{domain}/manage-api/auth/oauth_token'
# 请求体参数
data = {
'session_key': session_key,
'unique_name': uname, # 生成唯一标识符
"expires_in": 3600 #过期时间1小时
}
# 设置请求头
headers = {'Content-Type': 'application/json'}
try:
# 发送 POST 请求
response = requests.post(url, headers=headers, data=json.dumps(data))
# 检查响应状态码是否为200
if response.status_code == 200:
response_data = response.json()
# 检查 'login_url' 是否存在
if 'login_url' in response_data:
login_url = response_data['login_url']
# 如果URL没有以http开头,拼接基础URL
if not login_url.startswith('http'):
return f'{domain}' + login_url
return login_url
# 如果状态码不是200或login_url不存在,返回None
return None
except requests.RequestException as e:
# 捕获异常并返回错误信息
return None |