Rfym21 commited on
Commit
07c206b
·
verified ·
1 Parent(s): 1b32100

Delete auth_utils.py

Browse files
Files changed (1) hide show
  1. auth_utils.py +0 -114
auth_utils.py DELETED
@@ -1,114 +0,0 @@
1
- import logging
2
- import re
3
- from typing import Dict, Any, Optional
4
-
5
- import requests
6
- from requests.exceptions import RequestException
7
-
8
- # 常量定义
9
- _BASE_URL = "https://chat.notdiamond.ai"
10
- _API_BASE_URL = "https://spuckhogycrxcbomznwo.supabase.co"
11
- _USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
12
-
13
- class AuthManager:
14
- """
15
- AuthManager类用于管理身份验证过程,包括获取API密钥、用户信息和处理刷新令牌等操作。
16
- """
17
-
18
- def __init__(self, email: str, password: str):
19
- self._email: str = email
20
- self._password: str = password
21
- self._api_key: str = ""
22
- self._user_info: Dict[str, Any] = {}
23
- self._refresh_token: str = ""
24
- self._session: requests.Session = requests.session()
25
-
26
- self._logger: logging.Logger = logging.getLogger(__name__)
27
- logging.basicConfig(level=logging.INFO)
28
-
29
- def login(self) -> None:
30
- """使用电子邮件和密码进行用户登录,并获取用户信息。"""
31
- url = f"{_API_BASE_URL}/auth/v1/token?grant_type=password"
32
- headers = self._get_headers(with_content_type=True)
33
- data = {
34
- "email": self._email,
35
- "password": self._password,
36
- "gotrue_meta_security": {}
37
- }
38
-
39
- try:
40
- response = self._make_request('POST', url, headers=headers, json=data)
41
- self._user_info = response.json()
42
- self._refresh_token = self._user_info.get('refresh_token', '')
43
- self._log_values()
44
- except RequestException as e:
45
- self._logger.error(f"\033[91m登录请求错误: {e}\033[0m")
46
-
47
- def refresh_user_token(self) -> None:
48
- """使用刷新令牌来请求一个新的访问令牌并更新实例变量。"""
49
- url = f"{_API_BASE_URL}/auth/v1/token?grant_type=refresh_token"
50
- headers = self._get_headers(with_content_type=True)
51
- data = {"refresh_token": self._refresh_token}
52
-
53
- try:
54
- response = self._make_request('POST', url, headers=headers, json=data)
55
- self._user_info = response.json()
56
- self._refresh_token = self._user_info.get('refresh_token', '')
57
- self._log_values()
58
- except RequestException as e:
59
- self._logger.error(f"刷新令牌请求错误: {e}")
60
-
61
- def get_jwt_value(self) -> str:
62
- """返回访问令牌。"""
63
- return self._user_info.get('access_token', '')
64
-
65
- def _log_values(self) -> None:
66
- """记录刷新令牌到日志中。"""
67
- self._logger.info(f"\033[92mRefresh Token: {self._refresh_token}\033[0m")
68
-
69
- def _fetch_apikey(self) -> str:
70
- """获取API密钥。"""
71
- if self._api_key:
72
- return self._api_key
73
-
74
- try:
75
- login_url = f"{_BASE_URL}/login"
76
- response = self._make_request('GET', login_url)
77
-
78
- match = re.search(r'<script src="(/_next/static/chunks/app/layout-[^"]+\.js)"', response.text)
79
- if not match:
80
- raise ValueError("未找到匹配的脚本标签")
81
-
82
- js_url = f"{_BASE_URL}{match.group(1)}"
83
- js_response = self._make_request('GET', js_url)
84
-
85
- api_key_match = re.search(r'\("https://spuckhogycrxcbomznwo\.supabase\.co","([^"]+)"\)', js_response.text)
86
- if not api_key_match:
87
- raise ValueError("未能匹配API key")
88
-
89
- self._api_key = api_key_match.group(1)
90
- return self._api_key
91
-
92
- except (RequestException, ValueError) as e:
93
- self._logger.error(f"获取API密钥时发生错误: {e}")
94
- return ""
95
-
96
- def _get_headers(self, with_content_type: bool = False) -> Dict[str, str]:
97
- """生成请求头。"""
98
- headers = {
99
- 'apikey': self._fetch_apikey(),
100
- 'user-agent': _USER_AGENT
101
- }
102
- if with_content_type:
103
- headers['Content-Type'] = 'application/json'
104
- return headers
105
-
106
- def _make_request(self, method: str, url: str, **kwargs) -> requests.Response:
107
- """发送HTTP请求并处理异常。"""
108
- try:
109
- response = self._session.request(method, url, **kwargs)
110
- response.raise_for_status()
111
- return response
112
- except RequestException as e:
113
- self._logger.error(f"请求错误 ({method} {url}): {e}")
114
- raise