zhaoxiaozhao07 commited on
Commit
93e3c04
·
1 Parent(s): 962759f

feat(core): 增加 token 池管理和请求重试逻辑

Browse files

- 新增 token 池配置和管理逻辑,支持多 token 负载均衡
- 实现请求重试机制,包括处理网络错误、速率限制和服务器错误
- 优化上游 API 调用流程,增加错误处理和日志记录
- 调整流式响应处理逻辑,提高容错性和稳定性

app/api/admin.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
"""
Admin API endpoints for token management
"""

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer
from fastapi.security.http import HTTPAuthorizationCredentials
from typing import Dict, Any

from app.core.config import settings
from app.core.token_manager import token_manager

router = APIRouter(prefix="/admin", tags=["admin"])
security = HTTPBearer()


def verify_admin_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Verify admin authentication token"""
    supplied = credentials.credentials
    # Auth check disabled by configuration: accept whatever bearer value was sent.
    if settings.SKIP_AUTH_TOKEN:
        return supplied
    if supplied == settings.AUTH_TOKEN:
        return supplied
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )


@router.get("/token-stats")
async def get_token_stats(token: str = Depends(verify_admin_token)) -> Dict[str, Any]:
    """Get token pool statistics"""
    return token_manager.get_token_stats()


@router.post("/reload-tokens")
async def reload_tokens(token: str = Depends(verify_admin_token)) -> Dict[str, str]:
    """Force reload tokens from file"""
    try:
        token_manager.reload_tokens()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"重新加载失败: {str(e)}"
        )
    return {"message": "Token池已重新加载"}


@router.post("/reset-tokens")
async def reset_tokens(token: str = Depends(verify_admin_token)) -> Dict[str, str]:
    """Reset all tokens (clear failure counts)"""
    try:
        token_manager.reset_all_tokens()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"重置失败: {str(e)}"
        )
    return {"message": "所有token状态已重置"}
app/core/config.py CHANGED
@@ -32,6 +32,20 @@ class Settings(BaseSettings):
32
  SCAN_LIMIT: int = int(os.getenv("SCAN_LIMIT", "200000"))
33
  SKIP_AUTH_TOKEN: bool = os.getenv("SKIP_AUTH_TOKEN", "false").lower() == "true"
34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  # Browser Headers
36
  CLIENT_HEADERS: Dict[str, str] = {
37
  "Content-Type": "application/json",
 
32
  SCAN_LIMIT: int = int(os.getenv("SCAN_LIMIT", "200000"))
33
  SKIP_AUTH_TOKEN: bool = os.getenv("SKIP_AUTH_TOKEN", "false").lower() == "true"
34
 
35
+ # Token Pool Configuration
36
+ TOKEN_FILE_PATH: str = os.getenv("TOKEN_FILE_PATH", "./tokens.txt")
37
+ TOKEN_MAX_FAILURES: int = int(os.getenv("TOKEN_MAX_FAILURES", "3"))
38
+ TOKEN_RELOAD_INTERVAL: int = int(os.getenv("TOKEN_RELOAD_INTERVAL", "60"))
39
+
40
+ # Request Configuration
41
+ REQUEST_TIMEOUT: int = int(os.getenv("REQUEST_TIMEOUT", "120"))
42
+ CONNECTION_TIMEOUT: int = int(os.getenv("CONNECTION_TIMEOUT", "30"))
43
+ MAX_RETRIES: int = int(os.getenv("MAX_RETRIES", "3"))
44
+
45
+ # Proxy Configuration
46
+ HTTP_PROXY: Optional[str] = os.getenv("HTTP_PROXY")
47
+ HTTPS_PROXY: Optional[str] = os.getenv("HTTPS_PROXY")
48
+
49
  # Browser Headers
50
  CLIENT_HEADERS: Dict[str, str] = {
51
  "Content-Type": "application/json",
app/core/response_handlers.py CHANGED
@@ -15,6 +15,7 @@ from app.models.schemas import (
15
  UpstreamRequest, UpstreamData, UpstreamError, ModelItem
16
  )
17
  from app.utils.helpers import debug_log, call_upstream_api, transform_thinking_content
 
18
  from app.utils.sse_parser import SSEParser
19
  from app.utils.tools import extract_tool_invocations, remove_tool_json_content
20
 
@@ -61,11 +62,96 @@ class ResponseHandler:
61
 
62
  def _call_upstream(self) -> requests.Response:
63
  """Call upstream API with error handling"""
64
- try:
65
- return call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)
66
- except Exception as e:
67
- debug_log(f"调用上游失败: {e}")
68
- raise
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
  def _handle_upstream_error(self, response: requests.Response) -> None:
71
  """Handle upstream error response"""
@@ -108,28 +194,51 @@ class StreamResponseHandler(ResponseHandler):
108
  # Process stream
109
  debug_log("开始读取上游SSE流")
110
  sent_initial_answer = False
 
111
 
112
- with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
113
- for event in parser.iter_json_data(UpstreamData):
114
- upstream_data = event['data']
115
-
116
- # Check for errors
117
- if self._has_error(upstream_data):
118
- error = self._get_error(upstream_data)
119
- yield from handle_upstream_error(error)
120
- break
121
-
122
- debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {upstream_data.data.phase}, "
123
- f"内容长度: {len(upstream_data.data.delta_content)}, 完成: {upstream_data.data.done}")
124
-
125
- # Process content
126
- yield from self._process_content(upstream_data, sent_initial_answer)
127
-
128
- # Check if done
129
- if upstream_data.data.done or upstream_data.data.phase == "done":
130
- debug_log("检测到流结束信号")
131
- yield from self._send_end_chunk()
132
- break
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
133
 
134
  def _has_error(self, upstream_data: UpstreamData) -> bool:
135
  """Check if upstream data contains error"""
@@ -203,15 +312,16 @@ class StreamResponseHandler(ResponseHandler):
203
  parts = edit_content.split("</details>")
204
  return parts[1] if len(parts) > 1 else ""
205
 
206
- def _send_end_chunk(self) -> Generator[str, None, None]:
207
  """Send end chunk and DONE signal"""
208
  finish_reason = "stop"
209
 
210
- if self.has_tools:
211
  # Try to extract tool calls from buffered content
212
  self.tool_calls = extract_tool_invocations(self.buffered_content)
213
 
214
  if self.tool_calls:
 
215
  # Send tool calls with proper format
216
  for i, tc in enumerate(self.tool_calls):
217
  tool_call_delta = {
@@ -232,11 +342,21 @@ class StreamResponseHandler(ResponseHandler):
232
  # Send regular content
233
  trimmed_content = remove_tool_json_content(self.buffered_content)
234
  if trimmed_content:
 
235
  content_chunk = create_openai_response_chunk(
236
  model=settings.PRIMARY_MODEL,
237
  delta=Delta(content=trimmed_content)
238
  )
239
  yield f"data: {content_chunk.model_dump_json()}\n\n"
 
 
 
 
 
 
 
 
 
240
 
241
  # Send final chunk
242
  end_chunk = create_openai_response_chunk(
@@ -245,7 +365,7 @@ class StreamResponseHandler(ResponseHandler):
245
  )
246
  yield f"data: {end_chunk.model_dump_json()}\n\n"
247
  yield "data: [DONE]\n\n"
248
- debug_log("流式响应完成")
249
 
250
 
251
  class NonStreamResponseHandler(ResponseHandler):
@@ -272,23 +392,38 @@ class NonStreamResponseHandler(ResponseHandler):
272
  # Collect full response
273
  full_content = []
274
  debug_log("开始收集完整响应内容")
 
275
 
276
- with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
277
- for event in parser.iter_json_data(UpstreamData):
278
- upstream_data = event['data']
279
-
280
- if upstream_data.data.delta_content:
281
- content = upstream_data.data.delta_content
282
 
283
- if upstream_data.data.phase == "thinking":
284
- content = transform_thinking_content(content)
 
 
 
 
 
 
285
 
286
- if content:
287
- full_content.append(content)
288
-
289
- if upstream_data.data.done or upstream_data.data.phase == "done":
290
- debug_log("检测到完成信号,停止收集")
291
- break
 
 
 
 
 
 
 
 
 
 
292
 
293
  final_content = "".join(full_content)
294
  debug_log(f"内容收集完成,最终长度: {len(final_content)}")
 
15
  UpstreamRequest, UpstreamData, UpstreamError, ModelItem
16
  )
17
  from app.utils.helpers import debug_log, call_upstream_api, transform_thinking_content
18
+ from app.core.token_manager import token_manager
19
  from app.utils.sse_parser import SSEParser
20
  from app.utils.tools import extract_tool_invocations, remove_tool_json_content
21
 
 
62
 
63
  def _call_upstream(self) -> requests.Response:
64
  """Call upstream API with error handling"""
65
+ max_retries = settings.MAX_RETRIES
66
+ retry_count = 0
67
+
68
+ while retry_count < max_retries:
69
+ try:
70
+ debug_log(f"尝试调用上游API (第 {retry_count + 1}/{max_retries} 次)")
71
+ response = call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)
72
+
73
+ # Check if response is successful
74
+ if response.status_code == 200:
75
+ # Mark token as successful
76
+ token_manager.mark_token_success(self.auth_token)
77
+ debug_log("上游API调用成功")
78
+ return response
79
+ elif response.status_code in [401, 403]:
80
+ # Authentication/authorization error - mark token as failed
81
+ debug_log(f"Token认证失败 (状态码: {response.status_code}): {self.auth_token[:20]}...")
82
+ token_manager.mark_token_failed(self.auth_token)
83
+
84
+ # Try to get a new token
85
+ new_token = token_manager.get_next_token()
86
+ if new_token and new_token != self.auth_token:
87
+ debug_log(f"尝试使用新token: {new_token[:20]}...")
88
+ self.auth_token = new_token
89
+ retry_count += 1
90
+ continue
91
+ else:
92
+ debug_log("没有更多可用token")
93
+ return response
94
+ elif response.status_code in [429]:
95
+ # Rate limit - don't mark token as failed, just retry
96
+ debug_log(f"遇到速率限制 (状态码: {response.status_code}),等待后重试")
97
+ if retry_count < max_retries - 1:
98
+ import time
99
+ time.sleep(2 ** retry_count) # 指数退避
100
+ retry_count += 1
101
+ continue
102
+ else:
103
+ return response
104
+ elif response.status_code >= 500:
105
+ # Server error - retry without marking token as failed
106
+ debug_log(f"服务器错误 (状态码: {response.status_code}),稍后重试")
107
+ if retry_count < max_retries - 1:
108
+ import time
109
+ time.sleep(1)
110
+ retry_count += 1
111
+ continue
112
+ else:
113
+ return response
114
+ else:
115
+ # Other client errors, return response as-is
116
+ debug_log(f"客户端错误 (状态码: {response.status_code})")
117
+ return response
118
+
119
+ except Exception as e:
120
+ error_msg = str(e)
121
+ debug_log(f"调用上游失败 (尝试 {retry_count + 1}/{max_retries}): {error_msg}")
122
+
123
+ # 判断是否是连接问题还是token问题
124
+ is_connection_error = any(keyword in error_msg.lower() for keyword in [
125
+ 'connection', 'timeout', 'network', 'dns', 'socket', 'ssl'
126
+ ])
127
+
128
+ if is_connection_error:
129
+ debug_log("检测到网络连接问题,不标记token失败")
130
+ # 网络问题不标记token失败,直接重试
131
+ if retry_count < max_retries - 1:
132
+ import time
133
+ time.sleep(2) # 等待2秒后重试
134
+ retry_count += 1
135
+ continue
136
+ else:
137
+ raise Exception(f"网络连接问题,重试{max_retries}次后仍失败: {error_msg}")
138
+ else:
139
+ # 其他错误可能是token问题,标记失败并尝试新token
140
+ debug_log("检测到可能的token问题,标记token失败")
141
+ token_manager.mark_token_failed(self.auth_token)
142
+
143
+ # Try to get a new token
144
+ new_token = token_manager.get_next_token()
145
+ if new_token and new_token != self.auth_token and retry_count < max_retries - 1:
146
+ debug_log(f"尝试��用新token: {new_token[:20]}...")
147
+ self.auth_token = new_token
148
+ retry_count += 1
149
+ continue
150
+ else:
151
+ raise
152
+
153
+ # If we get here, all retries failed
154
+ raise Exception("所有重试尝试均失败")
155
 
156
  def _handle_upstream_error(self, response: requests.Response) -> None:
157
  """Handle upstream error response"""
 
194
  # Process stream
195
  debug_log("开始读取上游SSE流")
196
  sent_initial_answer = False
197
+ stream_ended_normally = False
198
 
199
+ try:
200
+ with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
201
+ for event in parser.iter_json_data(UpstreamData):
202
+ upstream_data = event['data']
203
+
204
+ # Check for errors
205
+ if self._has_error(upstream_data):
206
+ error = self._get_error(upstream_data)
207
+ yield from handle_upstream_error(error)
208
+ stream_ended_normally = True
209
+ break
210
+
211
+ debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {upstream_data.data.phase}, "
212
+ f"内容长度: {len(upstream_data.data.delta_content or '')}, 完成: {upstream_data.data.done}")
213
+
214
+ # Process content
215
+ yield from self._process_content(upstream_data, sent_initial_answer)
216
+
217
+ # Update sent_initial_answer flag if we sent content
218
+ if not sent_initial_answer and (upstream_data.data.delta_content or upstream_data.data.edit_content):
219
+ sent_initial_answer = True
220
+
221
+ # Check if done
222
+ if upstream_data.data.done or upstream_data.data.phase == "done":
223
+ debug_log("检测到流结束信号")
224
+ yield from self._send_end_chunk()
225
+ stream_ended_normally = True
226
+ break
227
+
228
+ except Exception as e:
229
+ debug_log(f"SSE流处理异常: {e}")
230
+ # 流异常结束,发送错误响应
231
+ if not stream_ended_normally:
232
+ error_chunk = create_openai_response_chunk(
233
+ model=settings.PRIMARY_MODEL,
234
+ delta=Delta(content=f"\n\n[系统提示: 连接中断,响应可能不完整]")
235
+ )
236
+ yield f"data: {error_chunk.model_dump_json()}\n\n"
237
+
238
+ # 确保流正常结束
239
+ if not stream_ended_normally:
240
+ debug_log("流未正常结束,发送结束信号")
241
+ yield from self._send_end_chunk(force_stop=True)
242
 
243
  def _has_error(self, upstream_data: UpstreamData) -> bool:
244
  """Check if upstream data contains error"""
 
312
  parts = edit_content.split("</details>")
313
  return parts[1] if len(parts) > 1 else ""
314
 
315
+ def _send_end_chunk(self, force_stop: bool = False) -> Generator[str, None, None]:
316
  """Send end chunk and DONE signal"""
317
  finish_reason = "stop"
318
 
319
+ if self.has_tools and not force_stop:
320
  # Try to extract tool calls from buffered content
321
  self.tool_calls = extract_tool_invocations(self.buffered_content)
322
 
323
  if self.tool_calls:
324
+ debug_log(f"检测到工具调用: {len(self.tool_calls)} 个")
325
  # Send tool calls with proper format
326
  for i, tc in enumerate(self.tool_calls):
327
  tool_call_delta = {
 
342
  # Send regular content
343
  trimmed_content = remove_tool_json_content(self.buffered_content)
344
  if trimmed_content:
345
+ debug_log(f"发送常规内容: {len(trimmed_content)} 字符")
346
  content_chunk = create_openai_response_chunk(
347
  model=settings.PRIMARY_MODEL,
348
  delta=Delta(content=trimmed_content)
349
  )
350
  yield f"data: {content_chunk.model_dump_json()}\n\n"
351
+ elif force_stop:
352
+ # 强制结束时,发送缓冲的内容(如果有)
353
+ if self.buffered_content:
354
+ debug_log(f"强制结束,发送缓冲内容: {len(self.buffered_content)} 字符")
355
+ content_chunk = create_openai_response_chunk(
356
+ model=settings.PRIMARY_MODEL,
357
+ delta=Delta(content=self.buffered_content)
358
+ )
359
+ yield f"data: {content_chunk.model_dump_json()}\n\n"
360
 
361
  # Send final chunk
362
  end_chunk = create_openai_response_chunk(
 
365
  )
366
  yield f"data: {end_chunk.model_dump_json()}\n\n"
367
  yield "data: [DONE]\n\n"
368
+ debug_log(f"流式响应完成 (finish_reason: {finish_reason})")
369
 
370
 
371
  class NonStreamResponseHandler(ResponseHandler):
 
392
  # Collect full response
393
  full_content = []
394
  debug_log("开始收集完整响应内容")
395
+ response_completed = False
396
 
397
+ try:
398
+ with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
399
+ for event in parser.iter_json_data(UpstreamData):
400
+ upstream_data = event['data']
 
 
401
 
402
+ if upstream_data.data.delta_content:
403
+ content = upstream_data.data.delta_content
404
+
405
+ if upstream_data.data.phase == "thinking":
406
+ content = transform_thinking_content(content)
407
+
408
+ if content:
409
+ full_content.append(content)
410
 
411
+ if upstream_data.data.done or upstream_data.data.phase == "done":
412
+ debug_log("检测到完成信号,停止收集")
413
+ response_completed = True
414
+ break
415
+
416
+ except Exception as e:
417
+ debug_log(f"非流式响应收集异常: {e}")
418
+ if not full_content:
419
+ # 如果没有收集到任何内容,抛出异常
420
+ raise HTTPException(status_code=502, detail=f"Response collection failed: {str(e)}")
421
+ else:
422
+ debug_log(f"部分内容收集成功,继续处理 ({len(full_content)} 个片段)")
423
+
424
+ if not response_completed and not full_content:
425
+ debug_log("响应未完成且无内容,可能是连接问题")
426
+ raise HTTPException(status_code=502, detail="Incomplete response from upstream")
427
 
428
  final_content = "".join(full_content)
429
  debug_log(f"内容收集完成,最终长度: {len(final_content)}")
app/core/token_manager.py ADDED
@@ -0,0 +1,233 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
"""
Token pool management with load balancing and round-robin mechanism
"""

import os
import time
import threading
from typing import List, Optional, Dict, Any
from dataclasses import dataclass


def debug_log(message: str, *args) -> None:
    """Log debug message if debug mode is enabled.

    Falls back to an unconditional print when settings cannot be imported
    (e.g. during early startup) or formatting fails.
    """
    # Import here to avoid circular import with app.core.config.
    try:
        from app.core.config import settings
        if settings.DEBUG_LOGGING:
            if args:
                print(f"[DEBUG] {message % args}")
            else:
                print(f"[DEBUG] {message}")
    except Exception:
        # Fallback if settings not available (never swallow KeyboardInterrupt
        # and friends, hence Exception rather than a bare except).
        print(f"[DEBUG] {message}")


@dataclass
class TokenInfo:
    """Token information with failure tracking."""
    token: str
    failure_count: int = 0          # consecutive failures since last success
    is_active: bool = True          # deactivated once failure_count >= max_failures
    last_failure_time: Optional[float] = None  # epoch seconds of last failure
    last_used_time: Optional[float] = None     # epoch seconds of last selection


class TokenManager:
    """Token pool manager with load balancing and failure handling.

    Tokens are read one-per-line from a text file (blank lines and lines
    starting with ``#`` are skipped) and handed out round-robin via
    :meth:`get_next_token`.  All mutation of the pool happens under an
    internal lock so callers may share one instance across threads.
    """

    def __init__(self, token_file_path: Optional[str] = None):
        try:
            from app.core.config import settings
            self.token_file_path = token_file_path or getattr(settings, 'TOKEN_FILE_PATH', './tokens.txt')
            self.max_failures = getattr(settings, 'TOKEN_MAX_FAILURES', 3)
            self.reload_interval = getattr(settings, 'TOKEN_RELOAD_INTERVAL', 60)
        except ImportError:
            # Fallback values if settings not available
            self.token_file_path = token_file_path or './tokens.txt'
            self.max_failures = 3
            self.reload_interval = 60

        self.tokens: List[TokenInfo] = []
        self.current_index = 0       # next round-robin starting slot
        self.last_reload_time = 0.0  # epoch seconds of last successful file load
        self._lock = threading.Lock()

        # Load tokens on initialization
        self._load_tokens()

    def _load_tokens(self) -> None:
        """Load tokens from file, preserving failure state of known tokens.

        Must NOT be called while holding ``self._lock`` (it acquires it).
        """
        try:
            if not os.path.exists(self.token_file_path):
                debug_log(f"Token文件不存在: {self.token_file_path}")
                # Fallback to BACKUP_TOKEN if file doesn't exist
                try:
                    from app.core.config import settings
                    if hasattr(settings, 'BACKUP_TOKEN') and settings.BACKUP_TOKEN:
                        with self._lock:
                            self.tokens = [TokenInfo(token=settings.BACKUP_TOKEN)]
                        debug_log("使用配置文件中的BACKUP_TOKEN作为备用")
                except ImportError:
                    pass
                return

            with open(self.token_file_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()

            # Read-modify-write of self.tokens happens entirely under the lock
            # so concurrent mark_token_failed/success calls can't interleave.
            with self._lock:
                existing = {t.token: t for t in self.tokens}
                new_tokens: List[TokenInfo] = []
                for line in lines:
                    token = line.strip()
                    if token and not token.startswith('#'):  # skip blanks/comments
                        # Reuse existing TokenInfo to preserve failure counts.
                        new_tokens.append(existing.get(token) or TokenInfo(token=token))

                if new_tokens:
                    self.tokens = new_tokens
                    # Reset index if it's out of bounds after the reload.
                    if self.current_index >= len(self.tokens):
                        self.current_index = 0
                    self.last_reload_time = time.time()
                    active_count = sum(1 for t in self.tokens if t.is_active)
                else:
                    active_count = 0

            if new_tokens:
                debug_log(f"成功加载 {len(new_tokens)} 个token")
                debug_log(f"活跃token数量: {active_count}")
            else:
                debug_log("Token文件为空或无有效token")

        except Exception as e:
            debug_log(f"加载token文件失败: {e}")

    def _should_reload(self) -> bool:
        """Check if tokens should be reloaded (reload interval elapsed)."""
        return time.time() - self.last_reload_time > self.reload_interval

    def get_next_token(self) -> Optional[str]:
        """Get next available token using round-robin with load balancing.

        Returns ``None`` when the pool is empty.  If every token is inactive,
        all failure counts are reset first (failures may have been transient).
        """
        # Reload tokens if needed (outside the lock; _load_tokens locks itself).
        if self._should_reload():
            self._load_tokens()

        with self._lock:
            if not self.tokens:
                debug_log("没有可用的token")
                return None

            active = {i for i, t in enumerate(self.tokens) if t.is_active}
            if not active:
                debug_log("没有活跃的token,尝试重置失败计数")
                # Reset all tokens if none are active (maybe temporary network issues)
                for info in self.tokens:
                    info.is_active = True
                    info.failure_count = 0
                active = set(range(len(self.tokens)))

            # Round-robin: pick the first active slot at or after current_index.
            for offset in range(len(self.tokens)):
                idx = (self.current_index + offset) % len(self.tokens)
                if idx in active:
                    self.current_index = (idx + 1) % len(self.tokens)
                    info = self.tokens[idx]
                    info.last_used_time = time.time()
                    debug_log(f"选择token[{idx}]: {info.token[:20]}...")
                    return info.token

            # Unreachable while `active` is non-empty; kept as a safety net.
            debug_log("无法找到可用的token")
            return None

    def mark_token_failed(self, token: str) -> None:
        """Mark a token as failed and deactivate it after max_failures."""
        with self._lock:
            for token_info in self.tokens:
                if token_info.token == token:
                    token_info.failure_count += 1
                    token_info.last_failure_time = time.time()

                    if token_info.failure_count >= self.max_failures:
                        token_info.is_active = False
                        debug_log(f"Token失效 (失败{token_info.failure_count}次): {token[:20]}...")
                    else:
                        debug_log(f"Token失败 ({token_info.failure_count}/{self.max_failures}): {token[:20]}...")
                    break

    def mark_token_success(self, token: str) -> None:
        """Mark a token as successful (reset failure count, reactivate)."""
        with self._lock:
            for token_info in self.tokens:
                if token_info.token == token:
                    if token_info.failure_count > 0:
                        debug_log(f"Token恢复正常: {token[:20]}...")
                    token_info.failure_count = 0
                    token_info.is_active = True
                    break

    def get_token_stats(self) -> Dict[str, Any]:
        """Get token pool statistics (counts plus per-token details)."""
        with self._lock:
            if not self.tokens:
                return {
                    "total": 0,
                    "active": 0,
                    "failed": 0,
                    "tokens": []
                }

            active_count = sum(1 for t in self.tokens if t.is_active)
            failed_count = len(self.tokens) - active_count

            token_details = [
                {
                    "index": i,
                    "token_preview": token_info.token[:20] + "...",
                    "is_active": token_info.is_active,
                    "failure_count": token_info.failure_count,
                    "last_failure_time": token_info.last_failure_time,
                    "last_used_time": token_info.last_used_time,
                }
                for i, token_info in enumerate(self.tokens)
            ]

            return {
                "total": len(self.tokens),
                "active": active_count,
                "failed": failed_count,
                "current_index": self.current_index,
                "last_reload_time": self.last_reload_time,
                "tokens": token_details
            }

    def reset_all_tokens(self) -> None:
        """Reset all tokens (clear failure counts and reactivate)."""
        with self._lock:
            for token_info in self.tokens:
                token_info.is_active = True
                token_info.failure_count = 0
                token_info.last_failure_time = None
        debug_log("已重置所有token状态")

    def reload_tokens(self) -> None:
        """Force reload tokens from file, ignoring the reload interval."""
        debug_log("强制重新加载token文件")
        self._load_tokens()


# Global token manager instance
token_manager = TokenManager()
app/utils/helpers.py CHANGED
@@ -11,6 +11,7 @@ import requests
11
  from fake_useragent import UserAgent
12
 
13
  from app.core.config import settings
 
14
 
15
  # 全局 UserAgent 实例,避免每次调用都创建新实例
16
  _user_agent_instance = None
@@ -153,16 +154,23 @@ def get_anonymous_token() -> str:
153
 
154
 
155
  def get_auth_token() -> str:
156
- """Get authentication token (anonymous or fixed)"""
157
  if settings.ANONYMOUS_MODE:
158
  try:
159
  token = get_anonymous_token()
160
  debug_log(f"匿名token获取成功: {token[:10]}...")
161
  return token
162
  except Exception as e:
163
- debug_log(f"匿名token获取失败,回退固定token: {e}")
164
 
165
- return settings.BACKUP_TOKEN
 
 
 
 
 
 
 
166
 
167
 
168
  def transform_thinking_content(content: str) -> str:
@@ -196,16 +204,56 @@ def call_upstream_api(
196
  headers = get_browser_headers(chat_id)
197
  headers["Authorization"] = f"Bearer {auth_token}"
198
 
 
 
 
 
199
  debug_log(f"调用上游API: {settings.API_ENDPOINT}")
200
- debug_log(f"上游请求体: {upstream_req.model_dump_json()}")
201
-
202
- response = requests.post(
203
- settings.API_ENDPOINT,
204
- json=upstream_req.model_dump(exclude_none=True),
205
- headers=headers,
206
- timeout=60.0,
207
- stream=True
208
- )
209
-
210
- debug_log(f"上游响应状态: {response.status_code}")
211
- return response
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  from fake_useragent import UserAgent
12
 
13
  from app.core.config import settings
14
+ from app.core.token_manager import token_manager
15
 
16
  # 全局 UserAgent 实例,避免每次调用都创建新实例
17
  _user_agent_instance = None
 
154
 
155
 
156
def get_auth_token() -> str:
    """Get authentication token (anonymous or from token pool)"""
    # Anonymous mode: prefer a freshly fetched anonymous token, falling
    # through to the pool when fetching fails.
    if settings.ANONYMOUS_MODE:
        try:
            anon = get_anonymous_token()
        except Exception as e:
            debug_log(f"匿名token获取失败,使用token池: {e}")
        else:
            debug_log(f"匿名token获取成功: {anon[:10]}...")
            return anon

    # Use token pool for load balancing; BACKUP_TOKEN is the last resort.
    pooled = token_manager.get_next_token()
    if not pooled:
        debug_log("token池无可用token,使用配置文件备用token")
        return settings.BACKUP_TOKEN
    debug_log(f"从token池获取token: {pooled[:10]}...")
    return pooled
174
 
175
 
176
  def transform_thinking_content(content: str) -> str:
 
204
  headers = get_browser_headers(chat_id)
205
  headers["Authorization"] = f"Bearer {auth_token}"
206
 
207
+ # 准备请求数据
208
+ request_data = upstream_req.model_dump(exclude_none=True)
209
+ request_json = upstream_req.model_dump_json()
210
+
211
  debug_log(f"调用上游API: {settings.API_ENDPOINT}")
212
+ debug_log(f"请求体大小: {len(request_json)} 字符")
213
+
214
+ # 如果请求体太大,只显示部分内容
215
+ if len(request_json) > 1000:
216
+ debug_log(f"上游请求体 (截断): {request_json[:500]}...{request_json[-200:]}")
217
+ else:
218
+ debug_log(f"上游请求体: {request_json}")
219
+
220
+ # 设置代理(如果配置了)
221
+ proxies = {}
222
+ if settings.HTTP_PROXY:
223
+ proxies['http'] = settings.HTTP_PROXY
224
+ if settings.HTTPS_PROXY:
225
+ proxies['https'] = settings.HTTPS_PROXY
226
+
227
+ try:
228
+ response = requests.post(
229
+ settings.API_ENDPOINT,
230
+ json=request_data,
231
+ headers=headers,
232
+ timeout=(settings.CONNECTION_TIMEOUT, settings.REQUEST_TIMEOUT),
233
+ stream=True,
234
+ proxies=proxies if proxies else None,
235
+ verify=True,
236
+ )
237
+
238
+ debug_log(f"上游响应状态: {response.status_code}")
239
+
240
+ # 检查响应头
241
+ if settings.DEBUG_LOGGING:
242
+ content_type = response.headers.get('content-type', 'unknown')
243
+ content_length = response.headers.get('content-length', 'unknown')
244
+ debug_log(f"响应类型: {content_type}, 长度: {content_length}")
245
+
246
+ return response
247
+
248
+ except requests.exceptions.Timeout as e:
249
+ debug_log(f"请求超时: {e}")
250
+ raise Exception(f"上游API请求超时: {e}")
251
+ except requests.exceptions.ConnectionError as e:
252
+ debug_log(f"连接错误: {e}")
253
+ raise Exception(f"上游API连接失败: {e}")
254
+ except requests.exceptions.RequestException as e:
255
+ debug_log(f"请求异常: {e}")
256
+ raise Exception(f"上游API请求失败: {e}")
257
+ except Exception as e:
258
+ debug_log(f"未知错误: {e}")
259
+ raise
deploy/Dockerfile DELETED
@@ -1,10 +0,0 @@
1
- FROM python:3.12-slim
2
-
3
- WORKDIR /app
4
-
5
- COPY requirements.txt .
6
- RUN pip install --no-cache-dir -r requirements.txt
7
-
8
- COPY . .
9
-
10
- CMD ["python", "main.py"]
 
 
 
 
 
 
 
 
 
 
 
deploy/docker-compose.yml DELETED
@@ -1,27 +0,0 @@
1
- version: '3.8'
2
-
3
- services:
4
- api-server:
5
- image: julienol/z-ai2api-python:latest
6
- container_name: z-ai-api-server
7
- ports:
8
- - "8084:8080"
9
- environment:
10
- # Auth Configuration
11
- - AUTH_TOKEN=sk-123456
12
- # 是否跳过api key验证
13
- - SKIP_AUTH_TOKEN=false
14
- # Server Configurations
15
- - DEBUG_LOGGING=true
16
- # Feature Configuration
17
- - THINKING_PROCESSING=think
18
- - ANONYMOUS_MODE=true
19
- - TOOL_SUPPORT=true
20
- - SCAN_LIMIT=200000
21
- restart: unless-stopped
22
- healthcheck:
23
- test: ["CMD", "curl", "-f", "http://localhost:8080/v1/models"]
24
- interval: 30s
25
- timeout: 10s
26
- retries: 3
27
- start_period: 10s
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
main.py CHANGED
@@ -10,6 +10,7 @@ from fastapi.middleware.cors import CORSMiddleware
10
 
11
  from app.core.config import settings
12
  from app.core import openai
 
13
  from app.utils.reload_config import RELOAD_CONFIG
14
 
15
  from granian import Granian
@@ -32,6 +33,7 @@ app.add_middleware(
32
 
33
  # Include API routers
34
  app.include_router(openai.router)
 
35
 
36
 
37
  @app.options("/")
 
10
 
11
  from app.core.config import settings
12
  from app.core import openai
13
+ from app.api import admin
14
  from app.utils.reload_config import RELOAD_CONFIG
15
 
16
  from granian import Granian
 
33
 
34
  # Include API routers
35
  app.include_router(openai.router)
36
+ app.include_router(admin.router)
37
 
38
 
39
  @app.options("/")