Spaces:
Paused
Paused
File size: 16,872 Bytes
fd21f34 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Token池管理器
实现AUTH_TOKEN的轮询机制,提供负载均衡和容错功能
"""
import asyncio
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from threading import Lock
import httpx
from app.utils.logger import logger
@dataclass
class TokenStatus:
"""Token状态信息"""
token: str
is_available: bool = True
failure_count: int = 0
last_failure_time: float = 0.0
last_success_time: float = 0.0
total_requests: int = 0
successful_requests: int = 0
token_type: str = "unknown" # "user", "guest", "unknown"
@property
def success_rate(self) -> float:
"""成功率"""
if self.total_requests == 0:
return 1.0
return self.successful_requests / self.total_requests
@property
def is_healthy(self) -> bool:
"""
是否健康
健康的定义:
1. 必须是认证用户token (token_type = "user")
2. 当前可用 (is_available = True)
3. 成功率 >= 50% 或者总请求数 <= 3(新token容错)
注意:guest token不应该在AUTH_TOKENS中
"""
# guest token永远不健康
if self.token_type == "guest":
return False
# 未知类型token不健康
if self.token_type != "user":
return False
# 不可用的token不健康
if not self.is_available:
return False
# 对于认证用户token,基于成功率判断
# 新token或请求数很少时,给予容错
if self.total_requests <= 3:
return self.failure_count == 0
# 基于成功率判断健康状态
return self.success_rate >= 0.5
class TokenPool:
"""Token池管理器"""
def __init__(self, tokens: List[str], failure_threshold: int = 3, recovery_timeout: int = 1800):
"""
初始化Token池
Args:
tokens: token列表
failure_threshold: 失败阈值,超过此次数将标记为不可用
recovery_timeout: 恢复超时时间(秒),失败token在此时间后重新尝试
"""
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self._lock = Lock()
self._current_index = 0
# 初始化token状态
self.token_statuses: Dict[str, TokenStatus] = {}
original_count = len(tokens)
unique_tokens = []
# 去重处理
for token in tokens:
if token and token not in self.token_statuses: # 过滤空token和重复token
# 预设为认证用户token,因为这些是用户手动配置的token
self.token_statuses[token] = TokenStatus(token=token, token_type="user")
unique_tokens.append(token)
duplicate_count = original_count - len(unique_tokens)
if duplicate_count > 0:
logger.warning(f"⚠️ 检测到 {duplicate_count} 个重复token,已自动去重")
if not self.token_statuses:
logger.warning("⚠️ Token池为空,将依赖匿名模式")
# else:
# logger.info(f"🔧 初始化Token池,共 {len(self.token_statuses)} 个token")
def get_next_token(self) -> Optional[str]:
"""
获取下一个可用的token(轮询算法)
Returns:
可用的token,如果没有可用token则返回None
"""
with self._lock:
if not self.token_statuses:
return None
available_tokens = self._get_available_tokens()
if not available_tokens:
# 尝试恢复过期的失败token
self._try_recover_failed_tokens()
available_tokens = self._get_available_tokens()
if not available_tokens:
logger.warning("⚠️ 没有可用的token")
return None
# 轮询选择token
token = available_tokens[self._current_index % len(available_tokens)]
self._current_index = (self._current_index + 1) % len(available_tokens)
return token
def _get_available_tokens(self) -> List[str]:
"""
获取当前可用的认证用户token列表
返回满足以下条件的token:
1. is_available = True (可用状态)
2. token_type == "user" (认证用户token)
这确保轮询机制只会选择有效的认证用户token,跳过匿名用户token
"""
available_user_tokens = [
status.token for status in self.token_statuses.values()
if status.is_available and status.token_type == "user"
]
# 检查是否有匿名用户token并给出警告
if not available_user_tokens and self.token_statuses:
guest_tokens = [
status.token for status in self.token_statuses.values()
if status.token_type == "guest"
]
if guest_tokens:
logger.warning(f"⚠️ 检测到 {len(guest_tokens)} 个匿名用户token,轮询机制将跳过这些token")
return available_user_tokens
def _try_recover_failed_tokens(self):
"""尝试恢复失败的token"""
current_time = time.time()
recovered_count = 0
for status in self.token_statuses.values():
if (not status.is_available and
current_time - status.last_failure_time > self.recovery_timeout):
status.is_available = True
status.failure_count = 0
recovered_count += 1
logger.info(f"🔄 恢复失败token: {status.token[:20]}...")
if recovered_count > 0:
logger.info(f"✅ 恢复了 {recovered_count} 个失败的token")
def mark_token_success(self, token: str):
"""标记token使用成功"""
with self._lock:
if token in self.token_statuses:
status = self.token_statuses[token]
status.total_requests += 1
status.successful_requests += 1
status.last_success_time = time.time()
status.failure_count = 0 # 重置失败计数
if not status.is_available:
status.is_available = True
logger.info(f"✅ Token恢复可用: {token[:20]}...")
def mark_token_failure(self, token: str, error: Exception = None):
"""标记token使用失败"""
with self._lock:
if token in self.token_statuses:
status = self.token_statuses[token]
status.total_requests += 1
status.failure_count += 1
status.last_failure_time = time.time()
if status.failure_count >= self.failure_threshold:
status.is_available = False
logger.warning(f"🚫 Token已禁用: {token[:20]}... (失败 {status.failure_count} 次)")
def get_pool_status(self) -> Dict:
"""获取token池状态信息"""
with self._lock:
available_count = len(self._get_available_tokens())
total_count = len(self.token_statuses)
# 统计健康token数量
healthy_count = sum(1 for status in self.token_statuses.values() if status.is_healthy)
status_info = {
"total_tokens": total_count,
"available_tokens": available_count,
"unavailable_tokens": total_count - available_count,
"healthy_tokens": healthy_count,
"unhealthy_tokens": total_count - healthy_count,
"current_index": self._current_index,
"tokens": []
}
for token, status in self.token_statuses.items():
status_info["tokens"].append({
"token": f"{token[:10]}...{token[-10:]}",
"token_type": status.token_type,
"is_available": status.is_available,
"failure_count": status.failure_count,
"success_count": status.successful_requests,
"success_rate": f"{status.success_rate:.2%}",
"total_requests": status.total_requests,
"is_healthy": status.is_healthy,
"last_failure_time": status.last_failure_time,
"last_success_time": status.last_success_time
})
return status_info
def update_tokens(self, new_tokens: List[str]):
"""动态更新token列表"""
with self._lock:
# 保留现有token的状态信息
old_statuses = self.token_statuses.copy()
self.token_statuses.clear()
original_count = len(new_tokens)
unique_tokens = []
# 去重并添加新token,保留已存在token的状态
for token in new_tokens:
if token and token not in self.token_statuses: # 过滤空token和重复token
if token in old_statuses:
self.token_statuses[token] = old_statuses[token]
else:
# 预设为认证用户token,因为这些是用户手动配置的token
self.token_statuses[token] = TokenStatus(token=token, token_type="user")
unique_tokens.append(token)
# 记录去重信息
duplicate_count = original_count - len(unique_tokens)
if duplicate_count > 0:
logger.warning(f"⚠️ 更新时检测到 {duplicate_count} 个重复token,已自动去重")
# 重置索引
self._current_index = 0
logger.info(f"🔄 更新Token池,共 {len(self.token_statuses)} 个token")
async def health_check_token(self, token: str, auth_url: str = "https://chat.z.ai/api/v1/auths/") -> bool:
"""
异步健康检查单个token
使用Z.AI认证API验证token的有效性,通过检查响应内容判断token是否有效
Args:
token: 要检查的token
auth_url: 认证URL
Returns:
token是否健康
"""
try:
# 构建完整的请求头,模拟真实浏览器请求
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Authorization": f"Bearer {token}",
"Connection": "keep-alive",
"Content-Type": "application/json",
"DNT": "1",
"Referer": "https://chat.z.ai/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
"sec-ch-ua": '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "Windows"
}
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.get(auth_url, headers=headers)
# 验证token有效性并获取类型
token_type, is_healthy = self._validate_token_response(response)
# 更新token类型
if token in self.token_statuses:
self.token_statuses[token].token_type = token_type
if is_healthy:
self.mark_token_success(token)
else:
# 简化错误信息,只记录关键错误类型
if token_type == "guest":
error_msg = "匿名用户token"
elif response.status_code != 200:
error_msg = f"HTTP {response.status_code}"
else:
error_msg = "认证失败"
self.mark_token_failure(token, Exception(error_msg))
return is_healthy
except (httpx.TimeoutException, httpx.ConnectError, Exception) as e:
self.mark_token_failure(token, e)
return False
def _validate_token_response(self, response: httpx.Response) -> bool:
"""
基于Z.AI API响应中的role字段验证token类型
验证规则:
- role: "user" = 认证用户token(有效,可用于AUTH_TOKENS)
- role: "guest" = 匿名用户token(无效,不应在AUTH_TOKENS中)
- 无role字段或其他值 = 无效token
Args:
response: HTTP响应对象
Returns:
token是否为有效的认证用户token
"""
# 首先检查HTTP状态码
if response.status_code != 200:
return ("unknown", False)
try:
# 尝试解析JSON响应
response_data = response.json()
if not isinstance(response_data, dict):
return ("unknown", False)
# 检查是否包含错误信息
if "error" in response_data:
return ("unknown", False)
if "message" in response_data and "error" in response_data.get("message", "").lower():
return ("unknown", False)
# 核心验证:检查role字段
role = response_data.get("role")
if role == "user":
return ("user", True)
elif role == "guest":
if not hasattr(self, '_guest_token_warned'):
logger.warning("⚠️ 检测到匿名用户token,建议仅在AUTH_TOKENS中配置认证用户token")
self._guest_token_warned = True
return ("guest", False)
else:
return ("unknown", False)
except (ValueError, Exception):
return ("unknown", False)
async def health_check_all(self, auth_url: str = "https://chat.z.ai/api/v1/auths/"):
"""异步健康检查所有token"""
if not self.token_statuses:
logger.warning("⚠️ Token池为空,跳过健康检查")
return
total_tokens = len(self.token_statuses)
logger.info(f"🔍 开始Token池健康检查... (共 {total_tokens} 个token)")
# 并发执行所有token的健康检查
tasks = []
token_list = list(self.token_statuses.keys())
for token in token_list:
task = self.health_check_token(token, auth_url)
tasks.append(task)
# 执行并收集结果
results = await asyncio.gather(*tasks, return_exceptions=True)
# 统计结果
healthy_count = 0
failed_count = 0
exception_count = 0
for i, result in enumerate(results):
if result is True:
healthy_count += 1
elif result is False:
failed_count += 1
else:
# 异常情况
exception_count += 1
token = token_list[i]
logger.error(f"💥 Token {token[:20]}... 健康检查异常: {result}")
health_rate = (healthy_count / total_tokens) * 100 if total_tokens > 0 else 0
if healthy_count == 0 and total_tokens > 0:
logger.warning(f"⚠️ 健康检查完成: 0/{total_tokens} 个token健康 - 请检查token配置")
elif failed_count > 0:
logger.warning(f"⚠️ 健康检查完成: {healthy_count}/{total_tokens} 个token健康 ({health_rate:.1f}%)")
else:
logger.info(f"✅ 健康检查完成: {healthy_count}/{total_tokens} 个token健康")
if exception_count > 0:
logger.error(f"💥 {exception_count} 个token检查异常")
# 全局token池实例
_token_pool: Optional[TokenPool] = None
_pool_lock = Lock()
def get_token_pool() -> Optional[TokenPool]:
"""获取全局token池实例"""
return _token_pool
def initialize_token_pool(tokens: List[str], failure_threshold: int = 3, recovery_timeout: int = 1800) -> TokenPool:
"""初始化全局token池"""
global _token_pool
with _pool_lock:
_token_pool = TokenPool(tokens, failure_threshold, recovery_timeout)
return _token_pool
def update_token_pool(tokens: List[str]):
"""更新全局token池"""
global _token_pool
with _pool_lock:
if _token_pool:
_token_pool.update_tokens(tokens)
else:
_token_pool = TokenPool(tokens)
|