File size: 11,075 Bytes
93e3c04
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9f61557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93e3c04
 
 
9f61557
 
 
 
93e3c04
 
 
 
 
 
 
 
 
 
 
9f61557
93e3c04
 
 
9f61557
93e3c04
 
9f61557
93e3c04
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
"""
Token pool management with load balancing and round-robin mechanism
"""

import os
import time
import threading
from typing import List, Optional, Dict, Any, Set
from dataclasses import dataclass, field


def debug_log(message: str, *args) -> None:
    """Print a ``[DEBUG]``-prefixed message when debug logging is enabled.

    Debug logging is controlled by ``settings.DEBUG_LOGGING``. When the
    settings module cannot be loaded (or lacks that attribute) the message
    is printed unconditionally, so early start-up problems stay visible.

    Args:
        message: Text to print. When ``args`` are given it is treated as a
            printf-style template (``message % args``).
        *args: Optional values interpolated into ``message``. If they do not
            match the template, the raw ``message`` is printed instead of
            raising.
    """
    # Imported lazily to avoid a circular import with the config module.
    try:
        from app.core.config import settings
        enabled = settings.DEBUG_LOGGING
    except Exception:
        # Settings unavailable or incomplete: fall back to always logging.
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate.
        enabled = True

    if not enabled:
        return

    text = message
    if args:
        try:
            text = message % args
        except (TypeError, ValueError):
            # Template/argument mismatch: logging must never raise.
            text = message

    print(f"[DEBUG] {text}")


@dataclass
class TokenInfo:
    """Per-token state tracked by TokenManager (failures, activity, usage)."""
    # The raw token string; also the key used to look the entry up.
    token: str
    # Failures recorded by TokenManager.mark_token_failed since the last
    # success or reset.
    failure_count: int = 0
    # Set to False by TokenManager once failure_count reaches its
    # max_failures; inactive tokens are skipped by get_next_token.
    is_active: bool = True
    # time.time() of the most recent recorded failure; None if no failure
    # has been recorded (or after TokenManager.reset_all_tokens).
    last_failure_time: Optional[float] = None
    # time.time() of the last time get_next_token handed this token out;
    # None if it has not been handed out yet.
    last_used_time: Optional[float] = None


class TokenManager:
    """Token pool with round-robin selection and failure tracking.

    Tokens come from a text file (one token per line; blank lines and lines
    starting with ``#`` are ignored) and, when the application settings can
    be imported, from the comma-separated ``settings.BACKUP_TOKEN`` value.
    The pool is re-read lazily from ``get_next_token`` once
    ``reload_interval`` seconds have elapsed since the last reload.

    A token is deactivated once ``max_failures`` failures have been reported
    through ``mark_token_failed`` without an intervening
    ``mark_token_success``. If every token is inactive, the whole pool is
    reactivated on the next ``get_next_token`` call.

    Mutations of the pool are guarded by an internal ``threading.Lock``.
    """

    def __init__(self, token_file_path: Optional[str] = None):
        """Read configuration and perform the initial token load.

        Args:
            token_file_path: Path of the token file. When omitted, falls
                back to ``settings.TOKEN_FILE_PATH`` and finally to
                ``./tokens.txt``.
        """
        try:
            from app.core.config import settings
            self.token_file_path = token_file_path or getattr(settings, 'TOKEN_FILE_PATH', './tokens.txt')
            self.max_failures = getattr(settings, 'TOKEN_MAX_FAILURES', 3)
            self.reload_interval = getattr(settings, 'TOKEN_RELOAD_INTERVAL', 60)
        except ImportError:
            # Fallback values if settings are not available
            self.token_file_path = token_file_path or './tokens.txt'
            self.max_failures = 3
            self.reload_interval = 60

        self.tokens: List[TokenInfo] = []
        self.current_index = 0
        self.last_reload_time = 0
        self._lock = threading.Lock()

        # Load tokens on initialization
        self._load_tokens()

    @staticmethod
    def _parse_token_lines(lines: List[str]) -> List[str]:
        """Return the unique tokens in *lines*, in first-seen order.

        Surrounding whitespace is stripped; empty lines and ``#`` comment
        lines are skipped. Duplicates are dropped so that every token maps
        to exactly one ``TokenInfo`` and failure tracking cannot miss a copy.
        """
        seen: Set[str] = set()
        parsed: List[str] = []
        for line in lines:
            token = line.strip()
            if not token or token.startswith('#') or token in seen:
                continue
            seen.add(token)
            parsed.append(token)
        return parsed

    @staticmethod
    def _backup_tokens() -> Optional[List[str]]:
        """Return the tokens listed in ``settings.BACKUP_TOKEN``.

        Returns:
            The comma-separated values with whitespace stripped and empty
            entries removed, or ``None`` when the settings module cannot be
            imported or ``BACKUP_TOKEN`` is unset/empty.
        """
        try:
            from app.core.config import settings
        except ImportError:
            return None
        raw = getattr(settings, 'BACKUP_TOKEN', None)
        if not raw:
            return None
        return [part.strip() for part in raw.split(',') if part.strip()]

    def _defer_reload_if_pool_kept(self) -> None:
        """Restart the reload timer after an unsuccessful reload.

        Only applies while a previously loaded pool is still in use;
        otherwise every ``get_next_token`` call would re-read the file.
        With an empty pool the timer is left alone so the next request
        retries immediately.
        """
        with self._lock:
            if self.tokens:
                self.last_reload_time = time.time()

    def _load_tokens(self) -> None:
        """Rebuild the pool from the token file and ``BACKUP_TOKEN``.

        File tokens come first, followed by backup tokens not already
        present. A token that was already in the pool keeps its existing
        ``TokenInfo`` (failure count, active flag, timestamps). If no token
        is found, or loading raises, the current pool is left unchanged.
        Errors are logged, never raised.
        """
        try:
            # Snapshot the current entries so existing state can be reused;
            # the first entry for a given token string wins.
            with self._lock:
                known: Dict[str, TokenInfo] = {}
                for info in self.tokens:
                    known.setdefault(info.token, info)

            ordered: List[str] = []

            # First source: the token file
            if os.path.exists(self.token_file_path):
                with open(self.token_file_path, 'r', encoding='utf-8') as f:
                    ordered = self._parse_token_lines(f.readlines())

                if ordered:
                    debug_log(f"从tokens.txt文件加载了 {len(ordered)} 个token")
                else:
                    debug_log("Token文件为空或无有效token")

            # Second source: BACKUP_TOKEN from settings (comma-separated)
            backup = self._backup_tokens()
            if backup is not None:
                seen = set(ordered)
                for backup_token in backup:
                    if backup_token not in seen:
                        seen.add(backup_token)
                        ordered.append(backup_token)
                debug_log(f"从BACKUP_TOKEN加载了 {len(backup)} 个token")

            if not ordered:
                debug_log("没有找到任何可用的token")
                self._defer_reload_if_pool_kept()
                return

            new_tokens = [
                known[token] if token in known else TokenInfo(token=token)
                for token in ordered
            ]

            with self._lock:
                self.tokens = new_tokens
                # Reset index if it's out of bounds
                if self.current_index >= len(self.tokens):
                    self.current_index = 0
                self.last_reload_time = time.time()
                total = len(self.tokens)
                active_count = sum(1 for t in self.tokens if t.is_active)

            debug_log(f"总共加载了 {total} 个token")
            debug_log(f"活跃token数量: {active_count}")

        except Exception as e:
            debug_log(f"加载token失败: {e}")
            self._defer_reload_if_pool_kept()

    def _should_reload(self) -> bool:
        """Return True once ``reload_interval`` seconds have passed since the last reload."""
        return time.time() - self.last_reload_time > self.reload_interval

    def _next_active_index(self) -> Optional[int]:
        """Return the index of the next active token, or None if there is none.

        Scans the pool once, starting at ``current_index`` and wrapping
        around. The caller must hold ``self._lock``.
        """
        total = len(self.tokens)
        for offset in range(total):
            idx = (self.current_index + offset) % total
            if self.tokens[idx].is_active:
                return idx
        return None

    def get_next_token(self) -> Optional[str]:
        """Return the next active token in round-robin order.

        Triggers a reload first when the reload interval has elapsed. If
        every token is inactive, all tokens are reactivated and their
        failure counts cleared before selecting.

        Returns:
            The selected token string, or ``None`` when the pool is empty.
        """
        # Reload tokens if needed
        if self._should_reload():
            self._load_tokens()

        with self._lock:
            if not self.tokens:
                debug_log("没有可用的token")
                return None

            if not any(t.is_active for t in self.tokens):
                debug_log("没有活跃的token,尝试重置失败计数")
                # Reset all tokens if none are active (maybe temporary network issues)
                for token_info in self.tokens:
                    token_info.is_active = True
                    token_info.failure_count = 0

            token_index = self._next_active_index()
            if token_index is None:
                # Defensive: cannot happen after the reset above.
                debug_log("无法找到可用的token")
                return None

            # Advance the cursor past the chosen token for the next call.
            self.current_index = (token_index + 1) % len(self.tokens)
            token_info = self.tokens[token_index]
            token_info.last_used_time = time.time()
            debug_log(f"选择token[{token_index}]: {token_info.token[:20]}...")
            return token_info.token

    def mark_token_failed(self, token: str) -> None:
        """Record a failure for *token*; deactivate it at ``max_failures``.

        Unknown tokens are ignored.
        """
        with self._lock:
            for token_info in self.tokens:
                if token_info.token == token:
                    token_info.failure_count += 1
                    token_info.last_failure_time = time.time()

                    if token_info.failure_count >= self.max_failures:
                        token_info.is_active = False
                        debug_log(f"Token失效 (失败{token_info.failure_count}次): {token[:20]}...")
                    else:
                        debug_log(f"Token失败 ({token_info.failure_count}/{self.max_failures}): {token[:20]}...")
                    break

    def mark_token_success(self, token: str) -> None:
        """Record a success for *token*: clear its failure count and reactivate it.

        Unknown tokens are ignored.
        """
        with self._lock:
            for token_info in self.tokens:
                if token_info.token == token:
                    if token_info.failure_count > 0:
                        debug_log(f"Token恢复正常: {token[:20]}...")
                    token_info.failure_count = 0
                    token_info.is_active = True
                    break

    def get_token_stats(self) -> Dict[str, Any]:
        """Return a snapshot of pool statistics.

        Returns:
            A dict with ``total``, ``active``, ``failed`` and ``tokens`` (one
            entry per token, exposing only the first 20 characters of the
            token). For a non-empty pool it also contains ``current_index``
            and ``last_reload_time``.
        """
        with self._lock:
            if not self.tokens:
                return {
                    "total": 0,
                    "active": 0,
                    "failed": 0,
                    "tokens": []
                }

            active_count = sum(1 for t in self.tokens if t.is_active)
            failed_count = len(self.tokens) - active_count

            token_details = [
                {
                    "index": i,
                    "token_preview": token_info.token[:20] + "...",
                    "is_active": token_info.is_active,
                    "failure_count": token_info.failure_count,
                    "last_failure_time": token_info.last_failure_time,
                    "last_used_time": token_info.last_used_time
                }
                for i, token_info in enumerate(self.tokens)
            ]

            return {
                "total": len(self.tokens),
                "active": active_count,
                "failed": failed_count,
                "current_index": self.current_index,
                "last_reload_time": self.last_reload_time,
                "tokens": token_details
            }

    def reset_all_tokens(self) -> None:
        """Reactivate every token and clear its failure count and last failure time."""
        with self._lock:
            for token_info in self.tokens:
                token_info.is_active = True
                token_info.failure_count = 0
                token_info.last_failure_time = None
            debug_log("已重置所有token状态")

    def reload_tokens(self) -> None:
        """Force an immediate reload, regardless of the reload interval."""
        debug_log("强制重新加载token文件")
        self._load_tokens()


# Global token manager instance shared by importers of this module.
# It is created at import time; the constructor performs the initial token
# load, which reads the token file if it exists.
token_manager = TokenManager()