Chat2API / chatgpt /authorization.py
Rfym21's picture
Upload 24 files
a7df830 verified
raw
history blame
3.65 kB
import asyncio
import json
import os
import random
import ua_generator
from fastapi import HTTPException
import chatgpt.globals as globals
from chatgpt.refreshToken import rt2ac
from utils.Logger import logger
from utils.config import authorization_list, random_token
os.environ['PYTHONHASHSEED'] = '0'
random.seed(0)
def get_req_token(req_token, seed=None):
available_token_list = list(set(globals.token_list) - set(globals.error_token_list))
length = len(available_token_list)
if seed and length > 0:
req_token = globals.token_list[hash(seed) % length]
while req_token in globals.error_token_list:
req_token = random.choice(globals.token_list)
return req_token
if req_token in authorization_list:
if len(available_token_list) > 0:
if random_token:
req_token = random.choice(available_token_list)
return req_token
else:
globals.count += 1
globals.count %= length
return available_token_list[globals.count]
else:
return None
else:
return req_token
def get_ua(req_token):
user_agent = globals.user_agent_map.get(req_token, "")
user_agent = {k.lower(): v for k, v in user_agent.items()}
if not user_agent:
if not req_token:
ua = ua_generator.generate(device='desktop', browser=('chrome', 'edge'), platform=('windows', 'macos'))
return {
"user-agent": ua.text,
"sec-ch-ua-platform": ua.platform,
"sec-ch-ua": ua.ch.brands,
"sec-ch-ua-mobile": ua.ch.mobile,
"impersonate": random.choice(globals.impersonate_list),
}
else:
ua = ua_generator.generate(device='desktop', browser=('chrome', 'edge'), platform=('windows', 'macos'))
user_agent = {
"user-agent": ua.text,
"sec-ch-ua-platform": ua.platform,
"sec-ch-ua": ua.ch.brands,
"sec-ch-ua-mobile": ua.ch.mobile,
"impersonate": random.choice(globals.impersonate_list),
}
globals.user_agent_map[req_token] = user_agent
with open(globals.USER_AGENTS_FILE, "a", encoding="utf-8") as f:
f.write(json.dumps({req_token: user_agent}, indent=4))
return user_agent
else:
return user_agent
async def verify_token(req_token):
if not req_token:
if authorization_list:
logger.error("Unauthorized with empty token.")
raise HTTPException(status_code=401)
else:
return None
else:
if req_token.startswith("eyJhbGciOi") or req_token.startswith("fk-"):
access_token = req_token
return access_token
elif len(req_token) == 45:
try:
access_token = await rt2ac(req_token, force_refresh=False)
return access_token
except HTTPException as e:
raise HTTPException(status_code=e.status_code, detail=e.detail)
else:
return req_token
async def refresh_all_tokens(force_refresh=False):
for token in list(set(globals.token_list) - set(globals.error_token_list)):
if len(token) == 45:
try:
await asyncio.sleep(2)
await rt2ac(token, force_refresh=force_refresh)
except HTTPException:
pass
logger.info("All tokens refreshed.")