|
|
""" |
|
|
Token pool management with load balancing and round-robin mechanism |
|
|
""" |
|
|
|
|
|
import os |
|
|
import time |
|
|
import threading |
|
|
from typing import List, Optional, Dict, Any, Set |
|
|
from dataclasses import dataclass, field |
|
|
|
|
|
|
|
|
def debug_log(message: str, *args) -> None: |
|
|
"""Log debug message if debug mode is enabled""" |
|
|
|
|
|
try: |
|
|
from app.core.config import settings |
|
|
if settings.DEBUG_LOGGING: |
|
|
if args: |
|
|
print(f"[DEBUG] {message % args}") |
|
|
else: |
|
|
print(f"[DEBUG] {message}") |
|
|
except: |
|
|
|
|
|
print(f"[DEBUG] {message}") |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class TokenInfo: |
|
|
"""Token information with failure tracking""" |
|
|
token: str |
|
|
failure_count: int = 0 |
|
|
is_active: bool = True |
|
|
last_failure_time: Optional[float] = None |
|
|
last_used_time: Optional[float] = None |
|
|
|
|
|
|
|
|
class TokenManager: |
|
|
"""Token pool manager with load balancing and failure handling""" |
|
|
|
|
|
def __init__(self, token_file_path: str = None): |
|
|
try: |
|
|
from app.core.config import settings |
|
|
self.token_file_path = token_file_path or getattr(settings, 'TOKEN_FILE_PATH', './tokens.txt') |
|
|
self.max_failures = getattr(settings, 'TOKEN_MAX_FAILURES', 3) |
|
|
self.reload_interval = getattr(settings, 'TOKEN_RELOAD_INTERVAL', 60) |
|
|
except ImportError: |
|
|
|
|
|
self.token_file_path = token_file_path or './tokens.txt' |
|
|
self.max_failures = 3 |
|
|
self.reload_interval = 60 |
|
|
|
|
|
self.tokens: List[TokenInfo] = [] |
|
|
self.current_index = 0 |
|
|
self.last_reload_time = 0 |
|
|
self._lock = threading.Lock() |
|
|
|
|
|
|
|
|
self._load_tokens() |
|
|
|
|
|
def _load_tokens(self) -> None: |
|
|
"""Load tokens from file""" |
|
|
try: |
|
|
new_tokens = [] |
|
|
|
|
|
|
|
|
if os.path.exists(self.token_file_path): |
|
|
with open(self.token_file_path, 'r', encoding='utf-8') as f: |
|
|
lines = f.readlines() |
|
|
|
|
|
for line in lines: |
|
|
token = line.strip() |
|
|
if token and not token.startswith('#'): |
|
|
|
|
|
existing_token = next((t for t in self.tokens if t.token == token), None) |
|
|
if existing_token: |
|
|
new_tokens.append(existing_token) |
|
|
else: |
|
|
new_tokens.append(TokenInfo(token=token)) |
|
|
|
|
|
if new_tokens: |
|
|
debug_log(f"从tokens.txt文件加载了 {len(new_tokens)} 个token") |
|
|
else: |
|
|
debug_log("Token文件为空或无有效token") |
|
|
|
|
|
|
|
|
try: |
|
|
from app.core.config import settings |
|
|
if hasattr(settings, 'BACKUP_TOKEN') and settings.BACKUP_TOKEN: |
|
|
|
|
|
backup_tokens = [token.strip() for token in settings.BACKUP_TOKEN.split(',') if token.strip()] |
|
|
|
|
|
|
|
|
for backup_token in backup_tokens: |
|
|
|
|
|
existing_token = next((t for t in new_tokens if t.token == backup_token), None) |
|
|
if not existing_token: |
|
|
|
|
|
old_token = next((t for t in self.tokens if t.token == backup_token), None) |
|
|
if old_token: |
|
|
new_tokens.append(old_token) |
|
|
else: |
|
|
new_tokens.append(TokenInfo(token=backup_token)) |
|
|
|
|
|
debug_log(f"从BACKUP_TOKEN加载了 {len(backup_tokens)} 个token") |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
|
|
|
if not new_tokens: |
|
|
try: |
|
|
from app.core.config import settings |
|
|
if hasattr(settings, 'BACKUP_TOKEN') and settings.BACKUP_TOKEN: |
|
|
|
|
|
backup_tokens = [token.strip() for token in settings.BACKUP_TOKEN.split(',') if token.strip()] |
|
|
new_tokens = [TokenInfo(token=token) for token in backup_tokens] |
|
|
debug_log(f"仅使用BACKUP_TOKEN,共{len(backup_tokens)}个token") |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
if new_tokens: |
|
|
with self._lock: |
|
|
self.tokens = new_tokens |
|
|
|
|
|
if self.current_index >= len(self.tokens): |
|
|
self.current_index = 0 |
|
|
self.last_reload_time = time.time() |
|
|
|
|
|
debug_log(f"总共加载了 {len(self.tokens)} 个token") |
|
|
active_count = sum(1 for t in self.tokens if t.is_active) |
|
|
debug_log(f"活跃token数量: {active_count}") |
|
|
else: |
|
|
debug_log("没有找到任何可用的token") |
|
|
|
|
|
except Exception as e: |
|
|
debug_log(f"加载token失败: {e}") |
|
|
|
|
|
def _should_reload(self) -> bool: |
|
|
"""Check if tokens should be reloaded""" |
|
|
return time.time() - self.last_reload_time > self.reload_interval |
|
|
|
|
|
def get_next_token(self) -> Optional[str]: |
|
|
"""Get next available token using round-robin with load balancing""" |
|
|
|
|
|
if self._should_reload(): |
|
|
self._load_tokens() |
|
|
|
|
|
with self._lock: |
|
|
if not self.tokens: |
|
|
debug_log("没有可用的token") |
|
|
return None |
|
|
|
|
|
|
|
|
active_tokens = [i for i, t in enumerate(self.tokens) if t.is_active] |
|
|
|
|
|
if not active_tokens: |
|
|
debug_log("没有活跃的token,尝试重置失败计数") |
|
|
|
|
|
for token in self.tokens: |
|
|
token.is_active = True |
|
|
token.failure_count = 0 |
|
|
active_tokens = list(range(len(self.tokens))) |
|
|
|
|
|
|
|
|
attempts = 0 |
|
|
max_attempts = len(active_tokens) |
|
|
|
|
|
while attempts < max_attempts: |
|
|
|
|
|
token_index = None |
|
|
for i in range(len(self.tokens)): |
|
|
idx = (self.current_index + i) % len(self.tokens) |
|
|
if idx in active_tokens: |
|
|
token_index = idx |
|
|
break |
|
|
|
|
|
if token_index is not None: |
|
|
self.current_index = (token_index + 1) % len(self.tokens) |
|
|
token_info = self.tokens[token_index] |
|
|
token_info.last_used_time = time.time() |
|
|
debug_log(f"选择token[{token_index}]: {token_info.token[:20]}...") |
|
|
return token_info.token |
|
|
|
|
|
attempts += 1 |
|
|
|
|
|
debug_log("无法找到可用的token") |
|
|
return None |
|
|
|
|
|
def mark_token_failed(self, token: str) -> None: |
|
|
"""Mark a token as failed and deactivate if necessary""" |
|
|
with self._lock: |
|
|
for token_info in self.tokens: |
|
|
if token_info.token == token: |
|
|
token_info.failure_count += 1 |
|
|
token_info.last_failure_time = time.time() |
|
|
|
|
|
if token_info.failure_count >= self.max_failures: |
|
|
token_info.is_active = False |
|
|
debug_log(f"Token失效 (失败{token_info.failure_count}次): {token[:20]}...") |
|
|
else: |
|
|
debug_log(f"Token失败 ({token_info.failure_count}/{self.max_failures}): {token[:20]}...") |
|
|
break |
|
|
|
|
|
def mark_token_success(self, token: str) -> None: |
|
|
"""Mark a token as successful (reset failure count)""" |
|
|
with self._lock: |
|
|
for token_info in self.tokens: |
|
|
if token_info.token == token: |
|
|
if token_info.failure_count > 0: |
|
|
debug_log(f"Token恢复正常: {token[:20]}...") |
|
|
token_info.failure_count = 0 |
|
|
token_info.is_active = True |
|
|
break |
|
|
|
|
|
def get_token_stats(self) -> Dict[str, Any]: |
|
|
"""Get token pool statistics""" |
|
|
with self._lock: |
|
|
if not self.tokens: |
|
|
return { |
|
|
"total": 0, |
|
|
"active": 0, |
|
|
"failed": 0, |
|
|
"tokens": [] |
|
|
} |
|
|
|
|
|
active_count = sum(1 for t in self.tokens if t.is_active) |
|
|
failed_count = len(self.tokens) - active_count |
|
|
|
|
|
token_details = [] |
|
|
for i, token_info in enumerate(self.tokens): |
|
|
token_details.append({ |
|
|
"index": i, |
|
|
"token_preview": token_info.token[:20] + "...", |
|
|
"is_active": token_info.is_active, |
|
|
"failure_count": token_info.failure_count, |
|
|
"last_failure_time": token_info.last_failure_time, |
|
|
"last_used_time": token_info.last_used_time |
|
|
}) |
|
|
|
|
|
return { |
|
|
"total": len(self.tokens), |
|
|
"active": active_count, |
|
|
"failed": failed_count, |
|
|
"current_index": self.current_index, |
|
|
"last_reload_time": self.last_reload_time, |
|
|
"tokens": token_details |
|
|
} |
|
|
|
|
|
def reset_all_tokens(self) -> None: |
|
|
"""Reset all tokens (clear failure counts and reactivate)""" |
|
|
with self._lock: |
|
|
for token_info in self.tokens: |
|
|
token_info.is_active = True |
|
|
token_info.failure_count = 0 |
|
|
token_info.last_failure_time = None |
|
|
debug_log("已重置所有token状态") |
|
|
|
|
|
def reload_tokens(self) -> None: |
|
|
"""Force reload tokens from file""" |
|
|
debug_log("强制重新加载token文件") |
|
|
self._load_tokens() |
|
|
|
|
|
|
|
|
|
|
|
token_manager = TokenManager() |