File size: 2,893 Bytes
d639380
7be0b4f
 
d639380
7be0b4f
d639380
 
7be0b4f
d639380
 
7be0b4f
 
 
 
 
d639380
7be0b4f
 
 
 
 
 
 
 
d639380
 
7be0b4f
 
 
 
 
d639380
7be0b4f
 
d639380
 
 
 
 
 
7be0b4f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d639380
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7be0b4f
d639380
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
import os
import random

import ua_generator
from fastapi import HTTPException

import chatgpt.globals as globals
from chatgpt.refreshToken import rt2ac
from utils.Logger import logger
from utils.config import authorization_list, random_token

os.environ['PYTHONHASHSEED'] = '0'
random.seed(0)


def get_req_token(req_token, seed=None):
    available_token_list = list(set(globals.token_list) - set(globals.error_token_list))
    length = len(available_token_list)
    if seed and length > 0:
        req_token = globals.token_list[hash(seed) % length]
        while req_token in globals.error_token_list:
            req_token = random.choice(globals.token_list)
        return req_token

    if req_token in authorization_list:
        if len(available_token_list) > 0:
            if random_token:
                req_token = random.choice(available_token_list)
                return req_token
            else:
                globals.count += 1
                globals.count %= length
                return available_token_list[globals.count]
        else:
            return None
    else:
        return req_token


def get_ua(req_token):
    user_agent = globals.user_agent_map.get(req_token, "")
    # token为空,免登录用户,则随机生成ua
    if not user_agent:
        ua = ua_generator.generate(device='desktop', browser=('chrome', 'edge'), platform=('windows', 'macos'))
        return {
            "User-Agent": ua.text,
            "Sec-Ch-Ua-Platform": ua.platform,
            "Sec-Ch-Ua": ua.ch.brands,
            "Sec-Ch-Ua-Mobile": ua.ch.mobile,
            "impersonate": random.choice(globals.impersonate_list),
        }
    else:
        return user_agent


async def verify_token(req_token):
    if not req_token:
        if authorization_list:
            logger.error("Unauthorized with empty token.")
            raise HTTPException(status_code=401)
        else:
            return None
    else:
        if req_token.startswith("eyJhbGciOi") or req_token.startswith("fk-"):
            access_token = req_token
            return access_token
        elif len(req_token) == 45:
            try:
                access_token = await rt2ac(req_token, force_refresh=False)
                return access_token
            except HTTPException as e:
                raise HTTPException(status_code=e.status_code, detail=e.detail)
        else:
            return req_token


async def refresh_all_tokens(force_refresh=False):
    for token in list(set(globals.token_list) - set(globals.error_token_list)):
        if len(token) == 45:
            try:
                await asyncio.sleep(2)
                await rt2ac(token, force_refresh=force_refresh)
            except HTTPException:
                pass
    logger.info("All tokens refreshed.")