File size: 4,469 Bytes
9a62f96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import logging
import re
from typing import Dict, Any, Optional

import requests
from requests.exceptions import RequestException

# 常量定义
_BASE_URL = "https://chat.notdiamond.ai"
_API_BASE_URL = "https://spuckhogycrxcbomznwo.supabase.co"
_USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'

class AuthManager:
    """
    AuthManager类用于管理身份验证过程,包括获取API密钥、用户信息和处理刷新令牌等操作。
    """
    
    def __init__(self, email: str, password: str):
        self._email: str = email
        self._password: str = password
        self._api_key: str = ""
        self._user_info: Dict[str, Any] = {}
        self._refresh_token: str = ""
        self._session: requests.Session = requests.session()
        
        self._logger: logging.Logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO)
        
    def login(self) -> None:
        """使用电子邮件和密码进行用户登录,并获取用户信息。"""
        url = f"{_API_BASE_URL}/auth/v1/token?grant_type=password"
        headers = self._get_headers(with_content_type=True)
        data = {
            "email": self._email,
            "password": self._password,
            "gotrue_meta_security": {}
        }

        try:
            response = self._make_request('POST', url, headers=headers, json=data)
            self._user_info = response.json()
            self._refresh_token = self._user_info.get('refresh_token', '')
            self._log_values()
        except RequestException as e:
            self._logger.error(f"\033[91m登录请求错误: {e}\033[0m")

    def refresh_user_token(self) -> None:
        """使用刷新令牌来请求一个新的访问令牌并更新实例变量。"""
        url = f"{_API_BASE_URL}/auth/v1/token?grant_type=refresh_token"
        headers = self._get_headers(with_content_type=True)
        data = {"refresh_token": self._refresh_token}

        try:
            response = self._make_request('POST', url, headers=headers, json=data)
            self._user_info = response.json()
            self._refresh_token = self._user_info.get('refresh_token', '')
            self._log_values()
        except RequestException as e:
            self._logger.error(f"刷新令牌请求错误: {e}")

    def get_jwt_value(self) -> str:
        """返回访问令牌。"""
        return self._user_info.get('access_token', '')

    def _log_values(self) -> None:
        """记录刷新令牌到日志中。"""
        self._logger.info(f"\033[92mRefresh Token: {self._refresh_token}\033[0m")
        
    def _fetch_apikey(self) -> str:
        """获取API密钥。"""
        if self._api_key:
            return self._api_key

        try:
            login_url = f"{_BASE_URL}/login"
            response = self._make_request('GET', login_url)
            
            match = re.search(r'<script src="(/_next/static/chunks/app/layout-[^"]+\.js)"', response.text)
            if not match:
                raise ValueError("未找到匹配的脚本标签")

            js_url = f"{_BASE_URL}{match.group(1)}"
            js_response = self._make_request('GET', js_url)
            
            api_key_match = re.search(r'\("https://spuckhogycrxcbomznwo\.supabase\.co","([^"]+)"\)', js_response.text)
            if not api_key_match:
                raise ValueError("未能匹配API key")
            
            self._api_key = api_key_match.group(1)
            return self._api_key

        except (RequestException, ValueError) as e:
            self._logger.error(f"获取API密钥时发生错误: {e}")
            return ""

    def _get_headers(self, with_content_type: bool = False) -> Dict[str, str]:
        """生成请求头。"""
        headers = {
            'apikey': self._fetch_apikey(),
            'user-agent': _USER_AGENT
        }
        if with_content_type:
            headers['Content-Type'] = 'application/json'
        return headers

    def _make_request(self, method: str, url: str, **kwargs) -> requests.Response:
        """发送HTTP请求并处理异常。"""
        try:
            response = self._session.request(method, url, **kwargs)
            response.raise_for_status()
            return response
        except RequestException as e:
            self._logger.error(f"请求错误 ({method} {url}): {e}")
            raise