|
|
import json
|
|
|
import requests
|
|
|
from flask import redirect, url_for, session, flash
|
|
|
from functools import wraps
|
|
|
import utils.configs as configs
|
|
|
import utils.globals as globals
|
|
|
from utils.globals import *
|
|
|
from utils.tools import *
|
|
|
|
|
|
|
|
|
|
|
|
def _request_access_token(refresh_token, timeout=30):
    """Exchange an OAuth refresh token for a fresh access token.

    Sends a ``refresh_token`` grant to the Auth0 token endpoint and returns
    the ``access_token`` field of the JSON reply, or ``None`` when the reply
    carries no such field.

    Raises whatever ``requests`` raises on connection problems or timeouts,
    and whatever ``response.json()`` raises on a non-JSON body; the caller
    decides how to handle those.
    """
    url = "https://auth0.openai.com/oauth/token"
    headers = {"Content-Type": "application/json"}
    payload = {
        "redirect_uri": "com.openai.chat://auth0.openai.com/ios/com.openai.chat/callback",
        "grant_type": "refresh_token",
        "client_id": "pdlLIX2Y72MIl2rhLhTE9VV9bN905kBh",
        "refresh_token": refresh_token,
    }
    # Without a timeout a single stalled connection would block the whole
    # refresh run indefinitely.
    response = requests.post(
        url, headers=headers, data=json.dumps(payload), timeout=timeout
    )
    return response.json().get("access_token")


def refresh_access_tokens():
    """Refresh the access token of every entry in ``globals.chatToken``.

    For each entry that has a non-empty ``refresh_token``:

    * on success, the new access token is written to every user in
      ``globals.users`` whose ``bind_email`` equals the entry's ``email``
      (and pushed via ``set_seedmap``), the users are saved, and the entry
      gets ``access_token`` updated and ``status`` set to ``True``;
    * on any failure (no token in the reply, network error, malformed
      entry, ...), the entry gets ``status`` set to ``False`` and is
      appended to ``globals.failed_tokens``.

    Entries with an empty ``refresh_token`` are skipped untouched.
    ``globals.failed_tokens`` is reset at the start, and both token lists
    are persisted at the end.

    Returns:
        ``globals.chatToken`` (the same list object, updated in place).
    """
    globals.failed_tokens = []

    for token_info in globals.chatToken:
        refreshed = False
        try:
            refresh_token = token_info['refresh_token']
            if not refresh_token:
                # Nothing to refresh with; leave the entry as it is.
                continue

            bound_email = token_info['email']
            access_token = _request_access_token(refresh_token)
            if access_token:
                # Propagate the new token to every user bound to this account.
                for user in globals.users:
                    if user['bind_email'] == bound_email:
                        user['bind_token'] = access_token
                        set_seedmap(user['id'], access_token)
                save_users(globals.users)
                token_info['access_token'] = access_token
                refreshed = True
        except Exception:
            # Per-token boundary: one bad entry or failed request must not
            # abort the remaining refreshes. The failure is recorded below.
            refreshed = False

        token_info['status'] = refreshed
        if not refreshed:
            globals.failed_tokens.append(token_info)

    save_retoken(globals.chatToken)
    save_failed_tokens(globals.failed_tokens)

    return globals.chatToken
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getoauth(seed_token):
    """Build the token login URL for the configured ChatGPT domain.

    Args:
        seed_token: Value placed in the ``token`` query parameter. It is
            interpolated verbatim (no URL-encoding is applied).

    Returns:
        The string ``"{configs.domain_chatgpt}/?token={seed_token}"``.
    """
    # No request is made here, so the former
    # ``except requests.RequestException`` handler could never run.
    return f'{configs.domain_chatgpt}/?token={seed_token}'
|
|
|
|
|
|
|
|
|
def login_required(f):
    """Decorator that only lets logged-in sessions reach the wrapped view.

    When the session has no truthy ``logged_in`` value, a warning is flashed
    and the client is redirected to the ``login`` endpoint; otherwise the
    wrapped view is called with the original arguments.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)

        flash('请先登录。', 'warning')
        login_endpoint = url_for('login')
        return redirect(login_endpoint)

    return guarded_view
|
|
|
|
|
|
|
|
|
def admin_required(f):
    """Decorator that only lets logged-in admin sessions reach the wrapped view.

    Checks are made in order:

    1. no truthy ``logged_in`` in the session -> flash a warning and
       redirect to the ``login`` endpoint;
    2. session ``role`` is not ``'admin'`` -> flash a danger message and
       redirect to the ``index`` endpoint;
    3. otherwise call the wrapped view with the original arguments.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if not session.get('logged_in'):
            message, category, endpoint = '请先登录。', 'warning', 'login'
        elif session.get('role') != 'admin':
            message, category, endpoint = '需要管理员权限。', 'danger', 'index'
        else:
            return f(*args, **kwargs)

        # Rejected: tell the user why, then send them elsewhere.
        flash(message, category)
        return redirect(url_for(endpoint))

    return guarded_view
|
|
|
|
|
|
|
|
|
def set_seedmap(user_id, token):
    """POST a seed/token pair to the ``/seedtoken`` endpoint.

    The request goes to ``{configs.domain_chatgpt}/seedtoken`` with a Bearer
    ``configs.authorization`` header and a JSON body of
    ``{"seed": user_id, "token": token}``.

    Args:
        user_id: Value sent as ``seed``.
        token: Value sent as ``token``.

    Returns:
        The HTTP status code of the response.

    Raises:
        requests.RequestException: On connection failure or when the server
            does not answer within the timeout.
    """
    url = f'{configs.domain_chatgpt}/seedtoken'
    headers = {
        "Authorization": f"Bearer {configs.authorization}",
        "Content-Type": "application/json",
    }
    payload = {
        "seed": user_id,
        "token": token,
    }
    # A timeout keeps an unresponsive server from blocking the caller forever.
    response = requests.post(
        url, headers=headers, data=json.dumps(payload), timeout=30
    )
    return response.status_code
|
|
|
|
|
|
|
|
|
|
|
|
def del_seedmap(user_id):
    """Send a DELETE for a seed to the ``/seedtoken`` endpoint.

    The request goes to ``{configs.domain_chatgpt}/seedtoken`` with a Bearer
    ``configs.authorization`` header and a JSON body of ``{"seed": user_id}``.

    Args:
        user_id: Value sent as ``seed``.

    Returns:
        The HTTP status code of the response.

    Raises:
        requests.RequestException: On connection failure or when the server
            does not answer within the timeout.
    """
    url = f'{configs.domain_chatgpt}/seedtoken'
    headers = {
        "Authorization": f"Bearer {configs.authorization}",
        "Content-Type": "application/json",
    }
    payload = {
        "seed": user_id,
    }
    # A timeout keeps an unresponsive server from blocking the caller forever.
    response = requests.delete(
        url, headers=headers, data=json.dumps(payload), timeout=30
    )
    return response.status_code
|
|
|
|
|
|
|
|
|
def get_claude_login_url(session_key, uname):
    """Request a login URL from the ``oauth_token`` management endpoint.

    POSTs ``session_key``, ``unique_name`` and a fixed ``expires_in`` of 3600
    as JSON to ``{configs.domain_claude}/manage-api/auth/oauth_token``.

    Args:
        session_key: Value sent as ``session_key``.
        uname: Value sent as ``unique_name``.

    Returns:
        The ``login_url`` from the JSON reply, prefixed with the configured
        domain when it does not already start with ``http``. ``None`` when
        the request fails, times out, answers with a non-200 status, or the
        reply is not a JSON object with a string ``login_url``.
    """
    domain = configs.domain_claude
    endpoint = f'{domain}/manage-api/auth/oauth_token'
    headers = {'Content-Type': 'application/json'}
    payload = {
        'session_key': session_key,
        'unique_name': uname,
        "expires_in": 3600,
    }

    try:
        # A timeout keeps an unresponsive server from blocking the caller.
        response = requests.post(
            endpoint, headers=headers, data=json.dumps(payload), timeout=30
        )
        if response.status_code != 200:
            return None
        response_data = response.json()
    except (requests.RequestException, ValueError):
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException subclass.
        return None

    if not isinstance(response_data, dict) or 'login_url' not in response_data:
        return None

    login_url = response_data['login_url']
    if not isinstance(login_url, str):
        return None

    # Relative paths are resolved against the configured domain.
    if not login_url.startswith('http'):
        return f'{domain}{login_url}'
    return login_url