| import asyncio | |
| import json | |
| import os | |
| import random | |
| import ua_generator | |
| from fastapi import HTTPException | |
| import chatgpt.globals as globals | |
| from chatgpt.refreshToken import rt2ac | |
| from utils.Logger import logger | |
| from utils.config import authorization_list, random_token | |
| os.environ['PYTHONHASHSEED'] = '0' | |
| random.seed(0) | |
| def get_req_token(req_token, seed=None): | |
| available_token_list = list(set(globals.token_list) - set(globals.error_token_list)) | |
| length = len(available_token_list) | |
| if seed and length > 0: | |
| req_token = globals.token_list[hash(seed) % length] | |
| while req_token in globals.error_token_list: | |
| req_token = random.choice(globals.token_list) | |
| return req_token | |
| if req_token in authorization_list: | |
| if len(available_token_list) > 0: | |
| if random_token: | |
| req_token = random.choice(available_token_list) | |
| return req_token | |
| else: | |
| globals.count += 1 | |
| globals.count %= length | |
| return available_token_list[globals.count] | |
| else: | |
| return None | |
| else: | |
| return req_token | |
| def get_ua(req_token): | |
| user_agent = globals.user_agent_map.get(req_token, "") | |
| user_agent = {k.lower(): v for k, v in user_agent.items()} | |
| if not user_agent: | |
| if not req_token: | |
| ua = ua_generator.generate(device='desktop', browser=('chrome', 'edge'), platform=('windows', 'macos')) | |
| return { | |
| "user-agent": ua.text, | |
| "sec-ch-ua-platform": ua.platform, | |
| "sec-ch-ua": ua.ch.brands, | |
| "sec-ch-ua-mobile": ua.ch.mobile, | |
| "impersonate": random.choice(globals.impersonate_list), | |
| } | |
| else: | |
| ua = ua_generator.generate(device='desktop', browser=('chrome', 'edge'), platform=('windows', 'macos')) | |
| user_agent = { | |
| "user-agent": ua.text, | |
| "sec-ch-ua-platform": ua.platform, | |
| "sec-ch-ua": ua.ch.brands, | |
| "sec-ch-ua-mobile": ua.ch.mobile, | |
| "impersonate": random.choice(globals.impersonate_list), | |
| } | |
| globals.user_agent_map[req_token] = user_agent | |
| with open(globals.USER_AGENTS_FILE, "a", encoding="utf-8") as f: | |
| f.write(json.dumps({req_token: user_agent}, indent=4)) | |
| return user_agent | |
| else: | |
| return user_agent | |
| async def verify_token(req_token): | |
| if not req_token: | |
| if authorization_list: | |
| logger.error("Unauthorized with empty token.") | |
| raise HTTPException(status_code=401) | |
| else: | |
| return None | |
| else: | |
| if req_token.startswith("eyJhbGciOi") or req_token.startswith("fk-"): | |
| access_token = req_token | |
| return access_token | |
| elif len(req_token) == 45: | |
| try: | |
| access_token = await rt2ac(req_token, force_refresh=False) | |
| return access_token | |
| except HTTPException as e: | |
| raise HTTPException(status_code=e.status_code, detail=e.detail) | |
| else: | |
| return req_token | |
| async def refresh_all_tokens(force_refresh=False): | |
| for token in list(set(globals.token_list) - set(globals.error_token_list)): | |
| if len(token) == 45: | |
| try: | |
| await asyncio.sleep(2) | |
| await rt2ac(token, force_refresh=force_refresh) | |
| except HTTPException: | |
| pass | |
| logger.info("All tokens refreshed.") | |