clash-linux committed on
Commit
8fea9ab
·
verified ·
1 Parent(s): 832e5ec

Upload 32 files

Browse files
app/api/auth.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import HTTPException, Request
2
+
3
# Password-verification dependency
async def verify_password(request: Request, PASSWORD: str = None):
    """Validate that the request's Bearer token matches the configured password.

    Args:
        request: Incoming HTTP request; the token is read from the
            ``Authorization`` header.
        PASSWORD: Expected password. When falsy (None/empty), authentication
            is disabled and the check is skipped entirely.

    Raises:
        HTTPException: 401 when the header is missing, not a Bearer scheme,
            or the token does not match.
    """
    if PASSWORD:
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            raise HTTPException(
                status_code=401, detail="Unauthorized: Missing or invalid token")
        token = auth_header.split(" ")[1]
        # Constant-time comparison: a plain `!=` short-circuits on the first
        # differing byte and can leak the password via response timing.
        import secrets
        if not secrets.compare_digest(token, PASSWORD):
            raise HTTPException(
                status_code=401, detail="Unauthorized: Invalid token")
app/api/client_disconnect.py ADDED
@@ -0,0 +1,120 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import time
3
+ from fastapi import Request
4
+ from app.models import ChatCompletionRequest
5
+ from app.utils import create_error_response
6
+ from .logging_utils import log
7
+
8
# Client-disconnect watcher
async def check_client_disconnect(http_request: Request, current_api_key: str, request_type: str, model: str):
    """Poll the request until the client drops the connection, then return True.

    Runs forever while the client stays connected, checking twice per second.
    """
    poll_interval = 0.5
    while not await http_request.is_disconnected():
        await asyncio.sleep(poll_interval)
    log('info', "客户端连接已中断,等待API请求完成",
        extra={'key': current_api_key[:8], 'request_type': request_type,
               'model': model, 'error_message': '检测到客户端断开连接'})
    return True
17
+
18
# Handler invoked once the client has disconnected mid-request
async def handle_client_disconnect(
    gemini_task: asyncio.Task,
    chat_request: ChatCompletionRequest,
    request_type: str,
    current_api_key: str,
    response_cache_manager,
    cache_key: str = None,
    client_ip: str = None
):
    """Finish the in-flight Gemini task after the client has gone away.

    Waits (shielded) for ``gemini_task``, cleans up any cache entry keyed by
    ``cache_key``, and always returns a response object (real or error) —
    never None — even though the caller's client can no longer receive it.

    Args:
        gemini_task: The still-running task produced by run_gemini_completion.
        chat_request: Original chat request (used for model name / response build).
        request_type: Label used only for logging ('stream'/'non-stream').
        current_api_key: API key in use; first 8 chars are logged.
        response_cache_manager: Cache with a dict-like ``.cache`` and ``.get()``.
        cache_key: Optional cache key for this request's response.
        client_ip: Unused here beyond the (commented-out) cache call.
    """
    try:
        # Wait for the API task; shield() keeps our own cancellation from
        # propagating into it, so the upstream request can still complete.
        response_content = await asyncio.shield(gemini_task)

        # Empty result: either the task returned None or the wrapper's text
        # is empty. NOTE(review): assumes response objects expose `.text` —
        # confirm against the ResponseWrapper contract.
        if response_content is None or response_content.text == "":
            if response_content is None:
                log('info', "客户端断开后API任务返回None",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            else:
                extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'status_code': 204}
                log('info', "客户端断开后Gemini API 返回空响应", extra=extra_log)

            # Drop any existing cache entry, since the response is empty.
            if cache_key and cache_key in response_cache_manager.cache:
                log('info', f"因空响应,删除缓存: {cache_key[:8]}...",
                    extra={'cache_operation': 'remove-on-empty', 'request_type': request_type})
                del response_cache_manager.cache[cache_key]

            # Return an error response instead of None.
            return create_error_response(chat_request.model, "AI未返回任何内容,请重试")

        # Check for a pre-existing cache entry first.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"客户端断开但找到已存在缓存,将删除: {cache_key[:8]}...",
                extra={'cache_operation': 'disconnect-found-cache', 'request_type': request_type})

            # Delete defensively (the key may already have been evicted).
            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]

            # Intentionally does NOT return the cached value: a fresh
            # response is built from the just-completed task instead.

        # Build a new response object. Import is local to avoid a
        # module-level import cycle with app.utils.
        from app.utils.response import create_response
        response = create_response(chat_request, response_content)

        # The client is gone, so this response will never be delivered;
        # uncomment below to cache it for a later retry by the same key.
        # cache_response(response, cache_key, client_ip)

        return response
    except asyncio.CancelledError:
        # Even on cancellation, try to salvage a result from the task.
        log('info', "客户端断开后任务被取消,但我们仍会尝试完成",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Only a task that finished (and was not itself cancelled) has a result.
        if gemini_task.done() and not gemini_task.cancelled():
            try:
                response_content = gemini_task.result()

                # Same pre-existing-cache cleanup as the happy path above.
                cached_response, cache_hit = response_cache_manager.get(cache_key)
                if cache_hit:
                    log('info', f"任务被取消但找到已存在缓存,将删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'cancel-found-cache', 'request_type': request_type})

                    # Delete defensively.
                    if cache_key in response_cache_manager.cache:
                        del response_cache_manager.cache[cache_key]

                # Build (but do not cache) the response.
                from app.utils.response import create_response
                response = create_response(chat_request, response_content)
                return response
            except Exception as inner_e:
                log('error', f"客户端断开后从已完成任务获取结果失败: {str(inner_e)}",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Either the task never finished or fetching its result failed:
        # drop the cache entry and report an error.
        if cache_key and cache_key in response_cache_manager.cache:
            log('info', f"因任务获取结果失败,删除缓存: {cache_key[:8]}...",
                extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
            del response_cache_manager.cache[cache_key]

        # Return an error response instead of None.
        return create_error_response(chat_request.model, "请求处理过程中发生错误,请重试")
    except Exception as e:
        # Any other failure while awaiting/processing the API task.
        error_msg = str(e)
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': error_msg}
        log('error', f"客户端断开后处理API响应时出错: {error_msg}", extra=extra_log)

        # Drop the cache entry because of the error.
        if cache_key and cache_key in response_cache_manager.cache:
            log('info', f"因API响应错误,删除缓存: {cache_key[:8]}...",
                extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
            del response_cache_manager.cache[cache_key]

        # Return an error response instead of None.
        return create_error_response(chat_request.model, f"请求处理错误: {error_msg}")
app/api/gemini_handlers.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from app.models import ChatCompletionRequest
3
+ from app.services import GeminiClient
4
+ from .logging_utils import log
5
+
6
# Gemini completion-request runner
async def run_gemini_completion(
    gemini_client,
    chat_request: ChatCompletionRequest,
    contents,
    system_instruction,
    request_type: str,
    current_api_key: str,
    safety_settings,
    safety_settings_g2
):
    """Run a non-streaming Gemini request on a worker thread.

    The blocking ``complete_chat`` call is moved off the event loop with
    ``asyncio.to_thread`` and shielded so a client disconnect cannot cancel
    the upstream API call mid-flight.

    Returns the response object from ``gemini_client.complete_chat``; re-raises
    ``asyncio.CancelledError`` if the call could not be salvaged.
    """
    # Function-attribute flag: used below to emit the completion log only
    # once per process, not once per call.
    run_fn = run_gemini_completion

    try:
        # Run the blocking client call in a thread; wrapping it in its own
        # task lets us keep a handle to it across cancellation.
        response_future = asyncio.create_task(
            asyncio.to_thread(
                gemini_client.complete_chat,
                chat_request,
                contents,
                # g2 safety settings apply only to the 2.0 flash-exp models.
                safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                system_instruction
            )
        )

        # shield() keeps external cancellation from killing the API call.
        response_content = await asyncio.shield(response_future)

        # Log completion only on the first ever call (see run_fn above).
        if not hasattr(run_fn, 'logged_complete'):
            log('info', "非流式请求成功完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            run_fn.logged_complete = True
        return response_content
    except asyncio.CancelledError:
        # We were cancelled (typically: client disconnected). If the API call
        # is still in flight, wait for it anyway so the result isn't wasted.
        if 'response_future' in locals() and not response_future.done():
            try:
                # Shield again and wait for the in-flight call to finish.
                response_content = await asyncio.shield(response_future)
                log('info', "API请求在客户端断开后完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
                return response_content
            except Exception as e:
                extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': f'API请求在客户端断开后失败: {str(e)}'}
                log('info', "API调用因客户端断开而失败", extra=extra_log_gemini_cancel)
                raise

        # Task never started or already finished/failed: just log and
        # propagate the cancellation.
        extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': '客户端断开导致API调用取消'}
        log('info', "API调用因客户端断开而取消", extra=extra_log_gemini_cancel)
        raise
app/api/logging_utils.py ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from app.utils import format_log_message
3
+
4
+ # 获取logger
5
+ logger = logging.getLogger("my_logger")
6
+
7
# Unified logging helper
def log(level: str, message: str, **extra):
    """Emit *message* at *level* through the module logger.

    Arbitrary keyword arguments are gathered into a dict and passed to
    ``format_log_message`` as structured context.
    """
    level_name = level.lower()
    formatted = format_log_message(level.upper(), message, extra=extra)
    emit = getattr(logger, level_name)
    emit(formatted)
app/api/nonstream_handlers.py ADDED
@@ -0,0 +1,157 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from fastapi import HTTPException, status, Request
3
+ from app.models import ChatCompletionRequest
4
+ from app.services import GeminiClient
5
+ from app.utils import cache_response, update_api_call_stats
6
+ from .logging_utils import log
7
+ from .client_disconnect import check_client_disconnect, handle_client_disconnect
8
+ from .gemini_handlers import run_gemini_completion
9
+
10
# Non-streaming request processor
async def process_nonstream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: str,
    contents,
    system_instruction,
    current_api_key: str,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    cache_key: str = None,
    client_ip: str = None
):
    """Handle a non-streaming API request.

    Races the Gemini completion task against a client-disconnect watcher:
    if the client disconnects first, handling is delegated to
    handle_client_disconnect; otherwise the response is built, cached once,
    and the cache entry is immediately removed (single-use semantics).

    Returns a response object, or raises HTTPException on 408/other HTTP errors.
    """
    gemini_client = GeminiClient(current_api_key)

    # Task 1: the actual Gemini completion.
    gemini_task = asyncio.create_task(
        run_gemini_completion(
            gemini_client,
            chat_request,
            contents,
            system_instruction,
            request_type,
            current_api_key,
            safety_settings,
            safety_settings_g2
        )
    )

    # Task 2: watcher that resolves when the client disconnects.
    disconnect_task = asyncio.create_task(
        check_client_disconnect(
            http_request,
            current_api_key,
            request_type,
            chat_request.model
        )
    )

    try:
        # Race: whichever of {API completion, client disconnect} happens first.
        done, pending = await asyncio.wait(
            [gemini_task, disconnect_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        if disconnect_task in done:
            # Client left first; still let the API request finish so the
            # result can be handled/cached.
            return await handle_client_disconnect(
                gemini_task,
                chat_request,
                request_type,
                current_api_key,
                response_cache_manager,
                cache_key,
                client_ip
            )
        else:
            # API finished first; stop watching for disconnects.
            disconnect_task.cancel()

            # Task is already done, so this await returns (or raises) immediately.
            response_content = await gemini_task

            # If a cached response already exists for this key, return it
            # instead of building a new one (and consume the entry).
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
                    extra={'cache_operation': 'use-existing', 'request_type': request_type})

                # Delete defensively; entry may have been evicted meanwhile.
                if cache_key in response_cache_manager.cache:
                    del response_cache_manager.cache[cache_key]
                    log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

                return cached_response

            # Build the response. Local import avoids a circular import.
            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            # Store in the cache (also updates API-call stats).
            cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=current_api_key)

            # Immediately remove the entry so the cache is single-use.
            if cache_key and cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'store-and-remove', 'request_type': request_type})

            # Return the freshly built response.
            return response

    except asyncio.CancelledError:
        # The whole handler was cancelled (e.g. by the active-requests pool).
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message':"请求被取消"}
        log('info', "请求取消", extra=extra_log)

        # Prefer a cached result if one already exists.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
                extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})

            # Delete defensively (single-use cache).
            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

            return cached_response

        # No cache: try to ride out the in-flight API request.
        if not gemini_task.done():
            log('info', "请求取消但API请求尚未完成,继续等待...",
                extra={'key': current_api_key[:8], 'request_type': request_type})

            # shield() prevents this await from being cancelled again.
            response_content = await asyncio.shield(gemini_task)

            # Build the response (local import avoids a circular import).
            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            # Deliberately not cached; returned directly.
            return response
        else:
            # Task already finished; just take its result.
            response_content = gemini_task.result()

            # Build the response.
            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            # Deliberately not cached; returned directly.
            return response

    except HTTPException as e:
        if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
            # Client connection dropped: log with 408 and abort retries upstream.
            extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
                         'status_code': 408, 'error_message': '客户端连接中断'}
            log('error', "客户端连接中断,终止后续重试", extra=extra_log)
            raise
        else:
            raise
app/api/request_handlers.py ADDED
@@ -0,0 +1,165 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import json
3
+ from typing import Literal
4
+ from fastapi import HTTPException, Request, status
5
+ from fastapi.responses import StreamingResponse
6
+ from app.models import ChatCompletionRequest
7
+ from app.services import GeminiClient
8
+ from app.utils import protect_from_abuse, handle_gemini_error, handle_api_error
9
+ from .logging_utils import log
10
+ from .stream_handlers import process_stream_request
11
+ from .nonstream_handlers import process_nonstream_request
12
+
13
# Top-level request dispatcher
async def process_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: Literal['stream', 'non-stream'],
    key_manager,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    FAKE_STREAMING,
    FAKE_STREAMING_INTERVAL,
    MAX_REQUESTS_PER_MINUTE,
    MAX_REQUESTS_PER_DAY_PER_IP,
    cache_key: str = None,
    client_ip: str = None
):
    """Main entry for handling an API request (streaming or not).

    Validates the request, then rotates through the configured API keys
    (outer loop) with up to 3 server-error retries per key (inner loop),
    dispatching to process_stream_request / process_nonstream_request.
    When every key fails, returns an SSE error stream (for streaming
    requests) or raises HTTP 500 (for non-streaming).
    """
    # NOTE(review): module-level mutable state shared across requests —
    # racy under concurrency; confirm this is intentional.
    global current_api_key

    # Basic pre-request checks: rate limiting, then model validation.
    protect_from_abuse(
        http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
    if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
        error_msg = "无效的模型"
        extra_log = {'request_type': request_type, 'model': chat_request.model, 'status_code': 400, 'error_message': error_msg}
        log('error', error_msg, extra=extra_log)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)

    # Reset the per-request set of already-tried keys.
    key_manager.reset_tried_keys_for_request()

    # Convert OpenAI-style messages to Gemini format.
    # NOTE(review): convert_messages is called with the class itself as the
    # first (self) argument — confirm it is intended as an unbound call.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages)

    # One attempt per available API key (at least one attempt).
    retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1

    # Outer loop: rotate API keys.
    for attempt in range(1, retry_attempts + 1):
        # Fetch the next usable key.
        current_api_key = key_manager.get_available_key()

        # No key available: abandon further attempts.
        if current_api_key is None:
            log('warning', "没有可用的 API 密钥,跳过本次尝试",
                extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
            break

        # Log which key this attempt uses (first 8 chars only).
        log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {current_api_key[:8]}...",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Inner loop: up to 3 retries for transient server errors.
        server_error_retries = 3
        for server_retry in range(1, server_error_retries + 1):
            try:
                # Dispatch by request type.
                if chat_request.stream:
                    try:
                        return await process_stream_request(
                            chat_request,
                            http_request,
                            contents,
                            system_instruction,
                            current_api_key,
                            key_manager,
                            safety_settings,
                            safety_settings_g2,
                            api_call_stats,
                            FAKE_STREAMING,
                            FAKE_STREAMING_INTERVAL
                        )
                    except Exception as e:
                        # Record the streaming failure but do not answer the
                        # client yet — re-raise so the outer handler can
                        # classify the error and possibly switch keys.
                        error_detail = handle_gemini_error(e, current_api_key, key_manager)
                        log('error', f"流式请求失败: {error_detail}",
                            extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
                        # Propagate instead of returning an error response.
                        raise
                else:
                    return await process_nonstream_request(
                        chat_request,
                        http_request,
                        request_type,
                        contents,
                        system_instruction,
                        current_api_key,
                        response_cache_manager,
                        active_requests_manager,
                        safety_settings,
                        safety_settings_g2,
                        api_call_stats,
                        cache_key,
                        client_ip
                    )
            except HTTPException as e:
                if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
                    # Client gone: no point retrying with other keys.
                    log('error', "客户端连接中断",
                        extra={'key': current_api_key[:8], 'request_type': request_type,
                               'model': chat_request.model, 'status_code': 408})
                    raise
                else:
                    raise
            except Exception as e:
                # Classify the failure with the shared error handler; it
                # decides whether to retry, switch keys, and/or drop cache.
                error_result = await handle_api_error(
                    e,
                    current_api_key,
                    key_manager,
                    request_type,
                    chat_request.model,
                    server_retry - 1
                )

                # Drop the cached entry if the handler asked for it.
                if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

                if error_result.get('should_retry', False):
                    # Transient server error: retry same key (any backoff
                    # wait already happened inside handle_api_error).
                    continue
                elif error_result.get('should_switch_key', False) and attempt < retry_attempts:
                    # Break the inner loop to rotate to the next key.
                    log('info', f"API密钥 {current_api_key[:8]}... 失败,准备尝试下一个密钥",
                        extra={'key': current_api_key[:8], 'request_type': request_type})
                    break
                else:
                    # Unrecoverable error or retry budget exhausted.
                    break

    # All keys / retries failed.
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})

    # Streaming clients get the error as an SSE stream rather than an HTTP error.
    if chat_request.stream:
        async def error_generator():
            error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
            yield f"data: {error_json}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(error_generator(), media_type="text/event-stream")
    else:
        # Non-streaming clients get a standard HTTP 500.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)
app/api/routes.py CHANGED
@@ -1,33 +1,25 @@
1
  from fastapi import APIRouter, HTTPException, Request, Depends, status
2
  from fastapi.responses import JSONResponse, StreamingResponse
3
  from app.models import ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, ModelList
4
- from app.services import GeminiClient, ResponseWrapper
5
  from app.utils import (
6
- handle_gemini_error,
7
- protect_from_abuse,
8
- APIKeyManager,
9
- test_api_key,
10
- format_log_message,
11
- log_manager,
12
  generate_cache_key,
13
  cache_response,
14
  create_chat_response,
15
- create_error_response,
16
- handle_api_error,
17
- update_api_call_stats
18
  )
19
- import json
20
- import asyncio
21
- import time
22
- import logging
23
- import random
24
- from typing import Literal
25
  from app.config.settings import (
26
  api_call_stats,
27
  BLOCKED_MODELS
28
  )
29
- # 获取logger
30
- logger = logging.getLogger("my_logger")
 
 
 
 
 
 
31
 
32
  # 创建路由器
33
  router = APIRouter()
@@ -76,23 +68,9 @@ def init_router(
76
  MAX_REQUESTS_PER_MINUTE = _max_requests_per_minute
77
  MAX_REQUESTS_PER_DAY_PER_IP = _max_requests_per_day_per_ip
78
 
79
- # 日志记录函数
80
- def log(level: str, message: str, **extra):
81
- """简化日志记录的统一函数"""
82
- msg = format_log_message(level.upper(), message, extra=extra)
83
- getattr(logger, level.lower())(msg)
84
-
85
- # 密码验证依赖
86
- async def verify_password(request: Request):
87
- if PASSWORD:
88
- auth_header = request.headers.get("Authorization")
89
- if not auth_header or not auth_header.startswith("Bearer "):
90
- raise HTTPException(
91
- status_code=401, detail="Unauthorized: Missing or invalid token")
92
- token = auth_header.split(" ")[1]
93
- if token != PASSWORD:
94
- raise HTTPException(
95
- status_code=401, detail="Unauthorized: Invalid token")
96
 
97
  # API路由
98
  @router.get("/v1/models", response_model=ModelList)
@@ -102,13 +80,27 @@ def list_models():
102
  return ModelList(data=[{"id": model, "object": "model", "created": 1678888888, "owned_by": "organization-owner"} for model in filtered_models])
103
 
104
  @router.post("/v1/chat/completions", response_model=ChatCompletionResponse)
105
- async def chat_completions(request: ChatCompletionRequest, http_request: Request, _: None = Depends(verify_password)):
106
  # 获取客户端IP
107
  client_ip = http_request.client.host if http_request.client else "unknown"
108
 
109
  # 流式请求直接处理,不使用缓存
110
  if request.stream:
111
- return await process_request(request, http_request, "stream")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
 
113
  # 生成完整缓存键 - 用于精确匹配
114
  cache_key = generate_cache_key(request)
@@ -191,7 +183,23 @@ async def chat_completions(request: ChatCompletionRequest, http_request: Request
191
 
192
  # 创建请求处理任务
193
  process_task = asyncio.create_task(
194
- process_request(request, http_request, "non-stream", cache_key=cache_key, client_ip=client_ip)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
  )
196
 
197
  # 将任务添加到活跃请求池
@@ -213,706 +221,4 @@ async def chat_completions(request: ChatCompletionRequest, http_request: Request
213
  return cached_response
214
 
215
  # 重新抛出异常
216
- raise
217
-
218
- # 请求处理函数
219
- async def process_request(chat_request: ChatCompletionRequest, http_request: Request, request_type: Literal['stream', 'non-stream'], cache_key: str = None, client_ip: str = None):
220
- """处理API请求的主函数,根据需要处理流式或非流式请求"""
221
- global current_api_key
222
-
223
- # 请求前基本检查
224
- protect_from_abuse(
225
- http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
226
- if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
227
- error_msg = "无效的模型"
228
- extra_log = {'request_type': request_type, 'model': chat_request.model, 'status_code': 400, 'error_message': error_msg}
229
- log('error', error_msg, extra=extra_log)
230
- raise HTTPException(
231
- status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)
232
-
233
- # 重置已尝试的密钥
234
- key_manager.reset_tried_keys_for_request()
235
-
236
- # 转换消息格式
237
- contents, system_instruction = GeminiClient.convert_messages(
238
- GeminiClient, chat_request.messages)
239
-
240
- # 设置重试次数(使用可用API密钥数量作为最大重试次数)
241
- retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1
242
-
243
- # 尝试使用不同API密钥
244
- for attempt in range(1, retry_attempts + 1):
245
- # 获取下一个密钥
246
- current_api_key = key_manager.get_available_key()
247
-
248
- # 检查API密钥是否可用
249
- if current_api_key is None:
250
- log('warning', "没有可用的 API 密钥,跳过本次尝试",
251
- extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
252
- break
253
-
254
- # 记录当前尝试的密钥信息
255
- log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {current_api_key[:8]}...",
256
- extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
257
-
258
- # 服务器错误重试逻辑
259
- server_error_retries = 3
260
- for server_retry in range(1, server_error_retries + 1):
261
- try:
262
- # 根据请求类型分别处理
263
- if chat_request.stream:
264
- try:
265
- return await process_stream_request(
266
- chat_request,
267
- http_request,
268
- contents,
269
- system_instruction,
270
- current_api_key
271
- )
272
- except Exception as e:
273
- # 捕获流式请求的异常,但不立即返回错误
274
- # 记录错误并继续尝试下一个API密钥
275
- error_detail = handle_gemini_error(e, current_api_key, key_manager)
276
- log('error', f"流式请求失败: {error_detail}",
277
- extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
278
- # 不返回错误,而是抛出异常让外层循环处理
279
- raise
280
- else:
281
- return await process_nonstream_request(
282
- chat_request,
283
- http_request,
284
- request_type,
285
- contents,
286
- system_instruction,
287
- current_api_key,
288
- cache_key,
289
- client_ip
290
- )
291
- except HTTPException as e:
292
- if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
293
- log('error', "客户端连接中断",
294
- extra={'key': current_api_key[:8], 'request_type': request_type,
295
- 'model': chat_request.model, 'status_code': 408})
296
- raise
297
- else:
298
- raise
299
- except Exception as e:
300
- # 使用统一的API错误处理函数
301
- error_result = await handle_api_error(
302
- e,
303
- current_api_key,
304
- key_manager,
305
- request_type,
306
- chat_request.model,
307
- server_retry - 1
308
- )
309
-
310
- # 如果需要删除缓存,清除缓存
311
- if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
312
- log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
313
- extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
314
- del response_cache_manager.cache[cache_key]
315
-
316
- if error_result.get('should_retry', False):
317
- # 服务器错误需要重试(等待已在handle_api_error中完成)
318
- continue
319
- elif error_result.get('should_switch_key', False) and attempt < retry_attempts:
320
- # 跳出服务器错误重试循环,获取下一个可用密钥
321
- log('info', f"API密钥 {current_api_key[:8]}... 失败,准备尝试下一个密钥",
322
- extra={'key': current_api_key[:8], 'request_type': request_type})
323
- break
324
- else:
325
- # 无法处理的错误或已达到重试上限
326
- break
327
-
328
- # 如果所有尝试都失败
329
- msg = "所有API密钥均请求失败,请稍后重试"
330
- log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})
331
-
332
- # 对于流式请求,创建一个特殊的StreamingResponse返回错误
333
- if chat_request.stream:
334
- async def error_generator():
335
- error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
336
- yield f"data: {error_json}\n\n"
337
- yield "data: [DONE]\n\n"
338
-
339
- return StreamingResponse(error_generator(), media_type="text/event-stream")
340
- else:
341
- # 非流式请求使用标准HTTP异常
342
- raise HTTPException(
343
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)
344
-
345
- # 流式请求处理函数
346
- async def process_stream_request(
347
- chat_request: ChatCompletionRequest,
348
- http_request: Request,
349
- contents,
350
- system_instruction,
351
- current_api_key: str
352
- ) -> StreamingResponse:
353
- """处理流式API请求"""
354
-
355
- # 创建一个直接流式响应的生成器函数
356
- async def stream_response_generator():
357
- # 如果启用了假流式模式,使用随机遍历API密钥的方式
358
- if FAKE_STREAMING:
359
- # 创建一个队列用于在任务之间传递数据
360
- queue = asyncio.Queue()
361
- keep_alive_task = None
362
- api_request_task = None
363
-
364
- try:
365
- # 创建一个保持连接的任务,持续发送换行符
366
- async def keep_alive_sender():
367
- try:
368
- # 创建一个Gemini客户端用于发送保持连接的换行符
369
- keep_alive_client = GeminiClient(current_api_key)
370
-
371
- # 启动保持连接的生成器
372
- keep_alive_generator = keep_alive_client.stream_chat(
373
- chat_request,
374
- contents,
375
- safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
376
- system_instruction
377
- )
378
-
379
- # 持续发送换行符直到被取消
380
- async for line in keep_alive_generator:
381
- if line == "\n":
382
- # 将换行符格式化为SSE格式
383
- formatted_chunk = {
384
- "id": "chatcmpl-keepalive",
385
- "object": "chat.completion.chunk",
386
- "created": int(time.time()),
387
- "model": chat_request.model,
388
- "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
389
- }
390
- # 将格式化的换行符放入队列
391
- await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
392
- except asyncio.CancelledError:
393
- log('info', "保持连接任务被取消",
394
- extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
395
- raise
396
- except Exception as e:
397
- log('error', f"保持连接任务出错: {str(e)}",
398
- extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
399
- # 将错误放入队列
400
- await queue.put(None)
401
- raise
402
-
403
- # 创建一个任务来随机遍历API密钥并请求内容
404
- async def api_request_handler():
405
- success = False
406
- try:
407
- # 重置已尝试的密钥
408
- key_manager.reset_tried_keys_for_request()
409
-
410
- # 获取可用的API密钥
411
- available_keys = key_manager.api_keys.copy()
412
- random.shuffle(available_keys) # 随机打乱密钥顺序
413
-
414
- # 遍历所有API密钥尝试获取响应
415
- for attempt, api_key in enumerate(available_keys, 1):
416
- try:
417
- log('info', f"假流式模式: 尝试API密钥 {api_key[:8]}... ({attempt}/{len(available_keys)})",
418
- extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
419
-
420
- # 创建一个新的客户端使用当前API密钥
421
- non_stream_client = GeminiClient(api_key)
422
-
423
- # 使用非流式方式请求内容
424
- response_content = await asyncio.to_thread(
425
- non_stream_client.complete_chat,
426
- chat_request,
427
- contents,
428
- safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
429
- system_instruction
430
- )
431
-
432
- # 检查响应是否有效
433
- if response_content and response_content.text:
434
- log('info', f"假流式模式: API密钥 {api_key[:8]}... 成功获取响应",
435
- extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
436
-
437
- # 将完整响应分割成小块,模拟流式返回
438
- full_text = response_content.text
439
- chunk_size = max(len(full_text) // 10, 1) # 至少分成10块,每块至少1个字符
440
-
441
- for i in range(0, len(full_text), chunk_size):
442
- chunk = full_text[i:i+chunk_size]
443
- formatted_chunk = {
444
- "id": "chatcmpl-someid",
445
- "object": "chat.completion.chunk",
446
- "created": int(time.time()),
447
- "model": chat_request.model,
448
- "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
449
- }
450
- # 将格式化的内容块放入队列
451
- await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
452
-
453
- success = True
454
- # 更新API调用统计
455
- from app.utils.stats import update_api_call_stats
456
- update_api_call_stats(api_call_stats,api_key)
457
- break # 成功获取响应,退出循环
458
- else:
459
- log('warning', f"假流式模式: API密钥 {api_key[:8]}... 返回空响应",
460
- extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
461
- except Exception as e:
462
- error_detail = handle_gemini_error(e, api_key, key_manager)
463
- log('error', f"假流式模式: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
464
- extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
465
- # 继续尝试下一个API密钥
466
-
467
- # 如果所有API密钥都尝试失败
468
- if not success:
469
- error_msg = "所有API密钥均请求失败,请稍后重试"
470
- log('error', error_msg,
471
- extra={'key': 'ALL', 'request_type': 'fake-stream', 'model': chat_request.model})
472
-
473
- # 添加错误信息到队列
474
- error_json = {
475
- "id": "chatcmpl-error",
476
- "object": "chat.completion.chunk",
477
- "created": int(time.time()),
478
- "model": chat_request.model,
479
- "choices": [{"delta": {"content": f"\n\n[错误: {error_msg}]"}, "index": 0, "finish_reason": "error"}]
480
- }
481
- await queue.put(f"data: {json.dumps(error_json)}\n\n")
482
-
483
- # 添加完成标记到队列
484
- await queue.put("data: [DONE]\n\n")
485
- # 添加None表示队列结束
486
- await queue.put(None)
487
-
488
- except asyncio.CancelledError:
489
- log('info', "API请求任务被取消",
490
- extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
491
- # 添加None表示队列结束
492
- await queue.put(None)
493
- raise
494
- except Exception as e:
495
- log('error', f"API请求任务出错: {str(e)}",
496
- extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
497
- # 添加错误信息到队列
498
- error_json = {
499
- "id": "chatcmpl-error",
500
- "object": "chat.completion.chunk",
501
- "created": int(time.time()),
502
- "model": chat_request.model,
503
- "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
504
- }
505
- await queue.put(f"data: {json.dumps(error_json)}\n\n")
506
- await queue.put("data: [DONE]\n\n")
507
- # 添加None表示队列结束
508
- await queue.put(None)
509
- raise
510
-
511
- # 启动保持连接的任务
512
- keep_alive_task = asyncio.create_task(keep_alive_sender())
513
- # 启动API请求任务
514
- api_request_task = asyncio.create_task(api_request_handler())
515
-
516
- # 从队列中获取数据并发送给客户端
517
- while True:
518
- chunk = await queue.get()
519
- if chunk is None: # None表示队列结束
520
- break
521
- yield chunk
522
-
523
- # 如果API请求任务已完成,取消保持连接任务
524
- if api_request_task.done() and not keep_alive_task.done():
525
- keep_alive_task.cancel()
526
-
527
- except asyncio.CancelledError:
528
- log('info', "流式响应生成器被取消",
529
- extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
530
- # 取消所有任务
531
- if keep_alive_task and not keep_alive_task.done():
532
- keep_alive_task.cancel()
533
- if api_request_task and not api_request_task.done():
534
- api_request_task.cancel()
535
- except Exception as e:
536
- log('error', f"流式响应生成器出错: {str(e)}",
537
- extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
538
- # 取消所有任务
539
- if keep_alive_task and not keep_alive_task.done():
540
- keep_alive_task.cancel()
541
- if api_request_task and not api_request_task.done():
542
- api_request_task.cancel()
543
- # 发送错误信息给客户端
544
- error_json = {
545
- "id": "chatcmpl-error",
546
- "object": "chat.completion.chunk",
547
- "created": int(time.time()),
548
- "model": chat_request.model,
549
- "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
550
- }
551
- yield f"data: {json.dumps(error_json)}\n\n"
552
- yield "data: [DONE]\n\n"
553
- finally:
554
- # 确保所有任务都被取消
555
- if keep_alive_task and not keep_alive_task.done():
556
- keep_alive_task.cancel()
557
- if api_request_task and not api_request_task.done():
558
- api_request_task.cancel()
559
- else:
560
- # 原始流式请求处理逻辑
561
- gemini_client = GeminiClient(current_api_key)
562
- success = False
563
-
564
- try:
565
- # 直接迭代生成器并发送响应块
566
- async for chunk in gemini_client.stream_chat(
567
- chat_request,
568
- contents,
569
- safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
570
- system_instruction
571
- ):
572
- # 空字符串跳过
573
- if not chunk:
574
- continue
575
-
576
- formatted_chunk = {
577
- "id": "chatcmpl-someid",
578
- "object": "chat.completion.chunk",
579
- "created": int(time.time()),
580
- "model": chat_request.model,
581
- "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
582
- }
583
- success = True # 只要有一个chunk成功,就标记为成功
584
- yield f"data: {json.dumps(formatted_chunk)}\n\n"
585
-
586
- # 如果成功获取到响应,更新API调用统计
587
- if success:
588
- from app.utils.stats import update_api_call_stats
589
- update_api_call_stats(api_call_stats, current_api_key)
590
-
591
- yield "data: [DONE]\n\n"
592
-
593
- except asyncio.CancelledError:
594
- extra_log_cancel = {'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model, 'error_message': '客户端已断开连接'}
595
- log('info', "客户端连接已中断", extra=extra_log_cancel)
596
- except Exception as e:
597
- error_detail = handle_gemini_error(e, current_api_key, key_manager)
598
- log('error', f"流式请求失败: {error_detail}",
599
- extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
600
- # 发送错误信息给客户端
601
- error_json = {
602
- "id": "chatcmpl-error",
603
- "object": "chat.completion.chunk",
604
- "created": int(time.time()),
605
- "model": chat_request.model,
606
- "choices": [{"delta": {"content": f"\n\n[错误: {error_detail}]"}, "index": 0, "finish_reason": "error"}]
607
- }
608
- yield f"data: {json.dumps(error_json)}\n\n"
609
- yield "data: [DONE]\n\n"
610
- # 重新抛出异常,这样process_request可以捕获它
611
- raise e
612
-
613
- return StreamingResponse(stream_response_generator(), media_type="text/event-stream")
614
-
615
- # Gemini完成请求函数
616
- async def run_gemini_completion(
617
- gemini_client,
618
- chat_request: ChatCompletionRequest,
619
- contents,
620
- system_instruction,
621
- request_type: str,
622
- current_api_key: str
623
- ):
624
- """运行Gemini非流式请求"""
625
- # 记录函数调用状态
626
- run_fn = run_gemini_completion
627
-
628
- try:
629
- # 创建一个不会被客户端断开影响的任务
630
- response_future = asyncio.create_task(
631
- asyncio.to_thread(
632
- gemini_client.complete_chat,
633
- chat_request,
634
- contents,
635
- safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
636
- system_instruction
637
- )
638
- )
639
-
640
- # 使用shield防止任务被外部取消
641
- response_content = await asyncio.shield(response_future)
642
-
643
- # 只在第一次调用时记录完成日志
644
- if not hasattr(run_fn, 'logged_complete'):
645
- log('info', "非流式请求成功完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
646
- run_fn.logged_complete = True
647
- return response_content
648
- except asyncio.CancelledError:
649
- # 即使任务被取消,我们也确保正在进行的API请求能够完成
650
- if 'response_future' in locals() and not response_future.done():
651
- try:
652
- # 使用shield确保任务不被取消,并等待它完成
653
- response_content = await asyncio.shield(response_future)
654
- log('info', "API请求在客户端断开后完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
655
- return response_content
656
- except Exception as e:
657
- extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': f'API请求在客户端断开后失败: {str(e)}'}
658
- log('info', "API调用因客户端断开而失败", extra=extra_log_gemini_cancel)
659
- raise
660
-
661
- # 如果任务尚未开始或已经失败,记录日志
662
- extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': '客户端断开导致API调用取消'}
663
- log('info', "API调用因客户端断开而取消", extra=extra_log_gemini_cancel)
664
- raise
665
-
666
- # 客户端断开检测函数
667
- async def check_client_disconnect(http_request: Request, current_api_key: str, request_type: str, model: str):
668
- """检查客户端是否断开连接"""
669
- while True:
670
- if await http_request.is_disconnected():
671
- extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': model, 'error_message': '检测到客户端断开连接'}
672
- log('info', "客户端连接已中断,等待API请求完成", extra=extra_log)
673
- return True
674
- await asyncio.sleep(0.5)
675
-
676
- # 客户端断开处理函数
677
- async def handle_client_disconnect(
678
- gemini_task: asyncio.Task,
679
- chat_request: ChatCompletionRequest,
680
- request_type: str,
681
- current_api_key: str,
682
- cache_key: str = None,
683
- client_ip: str = None
684
- ):
685
- try:
686
- # 等待API任务完成,使用shield防止它被取消
687
- response_content = await asyncio.shield(gemini_task)
688
-
689
- # 检查响应文本是否为空
690
- if response_content is None or response_content.text == "":
691
- if response_content is None:
692
- log('info', "客户端断开后API任务返回None",
693
- extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
694
- else:
695
- extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'status_code': 204}
696
- log('info', "客户端断开后Gemini API 返回空响应", extra=extra_log)
697
-
698
- # 删除任何现有缓存,因为响应为空
699
- if cache_key and cache_key in response_cache_manager.cache:
700
- log('info', f"因空响应,删除缓存: {cache_key[:8]}...",
701
- extra={'cache_operation': 'remove-on-empty', 'request_type': request_type})
702
- del response_cache_manager.cache[cache_key]
703
-
704
- # 返回错误响应而不是None
705
- return create_error_response(chat_request.model, "AI未返回任何内容,请重试")
706
-
707
- # 首先检查是否有现有缓存
708
- cached_response, cache_hit = response_cache_manager.get(cache_key)
709
- if cache_hit:
710
- log('info', f"客户端断开但找到已存在缓存,将删除: {cache_key[:8]}...",
711
- extra={'cache_operation': 'disconnect-found-cache', 'request_type': request_type})
712
-
713
- # 安全删除缓存
714
- if cache_key in response_cache_manager.cache:
715
- del response_cache_manager.cache[cache_key]
716
-
717
- # 不返回缓存,而是创建新响应并缓存
718
-
719
- # 创建新响应
720
- from app.utils.response import create_response
721
- response = create_response(chat_request, response_content)
722
-
723
- # 客户端已断开,此响应不会实际发送,可以考虑将其缓存以供后续使用
724
- # 如果确实需要缓存,则可以取消下面的注释
725
- # cache_response(response, cache_key, client_ip)
726
-
727
- return response
728
- except asyncio.CancelledError:
729
- # 对于取消异常,仍然尝试继续完成任务
730
- log('info', "客户端断开后任务被取消,但我们仍会尝试完成",
731
- extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
732
-
733
- # 检查任务是否已经完成
734
- if gemini_task.done() and not gemini_task.cancelled():
735
- try:
736
- response_content = gemini_task.result()
737
-
738
- # 首先检查是否有现有缓存
739
- cached_response, cache_hit = response_cache_manager.get(cache_key)
740
- if cache_hit:
741
- log('info', f"任务被取消但找到已存在缓存,将删除: {cache_key[:8]}...",
742
- extra={'cache_operation': 'cancel-found-cache', 'request_type': request_type})
743
-
744
- # 安全删除缓存
745
- if cache_key in response_cache_manager.cache:
746
- del response_cache_manager.cache[cache_key]
747
-
748
- # 创建但不缓存响应
749
- from app.utils.response import create_response
750
- response = create_response(chat_request, response_content)
751
- return response
752
- except Exception as inner_e:
753
- log('error', f"客户端断开后从已完成任务获取结果失败: {str(inner_e)}",
754
- extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
755
-
756
- # 删除缓存,因为出现错误
757
- if cache_key and cache_key in response_cache_manager.cache:
758
- log('info', f"因任务获取结果失败,删除缓存: {cache_key[:8]}...",
759
- extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
760
- del response_cache_manager.cache[cache_key]
761
-
762
- # 创建错误响应而不是返回None
763
- return create_error_response(chat_request.model, "请求处理过程中发生错误,请重试")
764
- except Exception as e:
765
- # 处理API任务异常
766
- error_msg = str(e)
767
- extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': error_msg}
768
- log('error', f"客户端断开后处理API响应时出错: {error_msg}", extra=extra_log)
769
-
770
- # 删除缓存,因为出现错误
771
- if cache_key and cache_key in response_cache_manager.cache:
772
- log('info', f"因API响应错误,删除缓存: {cache_key[:8]}...",
773
- extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
774
- del response_cache_manager.cache[cache_key]
775
-
776
- # 创建错误响应而不是返回None
777
- return create_error_response(chat_request.model, f"请求处理错误: {error_msg}")
778
-
779
- # 非流式请求处理函数
780
- async def process_nonstream_request(
781
- chat_request: ChatCompletionRequest,
782
- http_request: Request,
783
- request_type: str,
784
- contents,
785
- system_instruction,
786
- current_api_key: str,
787
- cache_key: str = None,
788
- client_ip: str = None
789
- ):
790
- """处理非流式API请求"""
791
- gemini_client = GeminiClient(current_api_key)
792
-
793
- # 创建任务
794
- gemini_task = asyncio.create_task(
795
- run_gemini_completion(
796
- gemini_client,
797
- chat_request,
798
- contents,
799
- system_instruction,
800
- request_type,
801
- current_api_key
802
- )
803
- )
804
-
805
- disconnect_task = asyncio.create_task(
806
- check_client_disconnect(
807
- http_request,
808
- current_api_key,
809
- request_type,
810
- chat_request.model
811
- )
812
- )
813
-
814
- try:
815
- # 先等待看是否API任务先完成,或者客户端先断开连接
816
- done, pending = await asyncio.wait(
817
- [gemini_task, disconnect_task],
818
- return_when=asyncio.FIRST_COMPLETED
819
- )
820
-
821
- if disconnect_task in done:
822
- # 客户端已断开连接,但我们仍继续完成API请求以便缓存结果
823
- return await handle_client_disconnect(
824
- gemini_task,
825
- chat_request,
826
- request_type,
827
- current_api_key,
828
- cache_key,
829
- client_ip
830
- )
831
- else:
832
- # API任务先完成,取消断开检测任务
833
- disconnect_task.cancel()
834
-
835
- # 获取响应内容
836
- response_content = await gemini_task
837
-
838
- # 检查缓存是否已经存在,如果存在则不再创建新缓存
839
- cached_response, cache_hit = response_cache_manager.get(cache_key)
840
- if cache_hit:
841
- log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
842
- extra={'cache_operation': 'use-existing', 'request_type': request_type})
843
-
844
- # 安全删除缓存
845
- if cache_key in response_cache_manager.cache:
846
- del response_cache_manager.cache[cache_key]
847
- log('info', f"缓存使用后已删除: {cache_key[:8]}...",
848
- extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
849
-
850
- return cached_response
851
-
852
- # 创建响应
853
- from app.utils.response import create_response
854
- response = create_response(chat_request, response_content)
855
-
856
- # 缓存响应
857
- cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=current_api_key)
858
-
859
- # 立即删除缓存,确保只能使用一次
860
- if cache_key and cache_key in response_cache_manager.cache:
861
- del response_cache_manager.cache[cache_key]
862
- log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
863
- extra={'cache_operation': 'store-and-remove', 'request_type': request_type})
864
-
865
- # 返回响应
866
- return response
867
-
868
- except asyncio.CancelledError:
869
- extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message':"请求被取消"}
870
- log('info', "请求取消", extra=extra_log)
871
-
872
- # 在请求被取消时先检查缓存中是否已有结果
873
- cached_response, cache_hit = response_cache_manager.get(cache_key)
874
- if cache_hit:
875
- log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
876
- extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})
877
-
878
- # 安全删除缓存
879
- if cache_key in response_cache_manager.cache:
880
- del response_cache_manager.cache[cache_key]
881
- log('info', f"缓存使用后已删除: {cache_key[:8]}...",
882
- extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
883
-
884
- return cached_response
885
-
886
- # 尝试完成正在进行的API请求
887
- if not gemini_task.done():
888
- log('info', "请求取消但API请求尚未完成,继续等待...",
889
- extra={'key': current_api_key[:8], 'request_type': request_type})
890
-
891
- # 使用shield确保任务不会被取消
892
- response_content = await asyncio.shield(gemini_task)
893
-
894
- # 创建响应
895
- from app.utils.response import create_response
896
- response = create_response(chat_request, response_content)
897
-
898
- # 不缓存这个响应,直接返回
899
- return response
900
- else:
901
- # 任务已完成,获取结果
902
- response_content = gemini_task.result()
903
-
904
- # 创建响应
905
- from app.utils.response import create_response
906
- response = create_response(chat_request, response_content)
907
-
908
- # 不缓存这个响应,直接返回
909
- return response
910
-
911
- except HTTPException as e:
912
- if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
913
- extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
914
- 'status_code': 408, 'error_message': '客户端连接中断'}
915
- log('error', "客户端连接中断,终止后续重试", extra=extra_log)
916
- raise
917
- else:
918
- raise
 
1
  from fastapi import APIRouter, HTTPException, Request, Depends, status
2
  from fastapi.responses import JSONResponse, StreamingResponse
3
  from app.models import ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, ModelList
4
+ from app.services import GeminiClient
5
  from app.utils import (
 
 
 
 
 
 
6
  generate_cache_key,
7
  cache_response,
8
  create_chat_response,
9
+ create_error_response
 
 
10
  )
 
 
 
 
 
 
11
  from app.config.settings import (
12
  api_call_stats,
13
  BLOCKED_MODELS
14
  )
15
+ import asyncio
16
+ import time
17
+ import logging
18
+
19
+ # 导入拆分后的模块
20
+ from .auth import verify_password
21
+ from .logging_utils import log
22
+ from .request_handlers import process_request
23
 
24
  # 创建路由器
25
  router = APIRouter()
 
68
  MAX_REQUESTS_PER_MINUTE = _max_requests_per_minute
69
  MAX_REQUESTS_PER_DAY_PER_IP = _max_requests_per_day_per_ip
70
 
71
+ # 自定义密码验证依赖
72
+ async def custom_verify_password(request: Request):
73
+ await verify_password(request, PASSWORD)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
 
75
  # API路由
76
  @router.get("/v1/models", response_model=ModelList)
 
80
  return ModelList(data=[{"id": model, "object": "model", "created": 1678888888, "owned_by": "organization-owner"} for model in filtered_models])
81
 
82
  @router.post("/v1/chat/completions", response_model=ChatCompletionResponse)
83
+ async def chat_completions(request: ChatCompletionRequest, http_request: Request, _: None = Depends(custom_verify_password)):
84
  # 获取客户端IP
85
  client_ip = http_request.client.host if http_request.client else "unknown"
86
 
87
  # 流式请求直接处理,不使用缓存
88
  if request.stream:
89
+ return await process_request(
90
+ request,
91
+ http_request,
92
+ "stream",
93
+ key_manager,
94
+ response_cache_manager,
95
+ active_requests_manager,
96
+ safety_settings,
97
+ safety_settings_g2,
98
+ api_call_stats,
99
+ FAKE_STREAMING,
100
+ FAKE_STREAMING_INTERVAL,
101
+ MAX_REQUESTS_PER_MINUTE,
102
+ MAX_REQUESTS_PER_DAY_PER_IP
103
+ )
104
 
105
  # 生成完整缓存键 - 用于精确匹配
106
  cache_key = generate_cache_key(request)
 
183
 
184
  # 创建请求处理任务
185
  process_task = asyncio.create_task(
186
+ process_request(
187
+ request,
188
+ http_request,
189
+ "non-stream",
190
+ key_manager,
191
+ response_cache_manager,
192
+ active_requests_manager,
193
+ safety_settings,
194
+ safety_settings_g2,
195
+ api_call_stats,
196
+ FAKE_STREAMING,
197
+ FAKE_STREAMING_INTERVAL,
198
+ MAX_REQUESTS_PER_MINUTE,
199
+ MAX_REQUESTS_PER_DAY_PER_IP,
200
+ cache_key,
201
+ client_ip
202
+ )
203
  )
204
 
205
  # 将任务添加到活跃请求池
 
221
  return cached_response
222
 
223
  # 重新抛出异常
224
+ raise
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/api/stream_handlers.py ADDED
@@ -0,0 +1,284 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import json
3
+ import time
4
+ import random
5
+ from fastapi import Request
6
+ from fastapi.responses import StreamingResponse
7
+ from app.models import ChatCompletionRequest
8
+ from app.services import GeminiClient
9
+ from app.utils import handle_gemini_error, update_api_call_stats
10
+ from .logging_utils import log
11
+
12
+ # 流式请求处理函数
13
+ async def process_stream_request(
14
+ chat_request: ChatCompletionRequest,
15
+ http_request: Request,
16
+ contents,
17
+ system_instruction,
18
+ current_api_key: str,
19
+ key_manager,
20
+ safety_settings,
21
+ safety_settings_g2,
22
+ api_call_stats,
23
+ FAKE_STREAMING,
24
+ FAKE_STREAMING_INTERVAL
25
+ ) -> StreamingResponse:
26
+ """处理流式API请求"""
27
+
28
+ # 创建一个直接流式响应的生成器函数
29
+ async def stream_response_generator():
30
+ # 如果启用了假流式模式,使用随机遍历API密钥的方式
31
+ if FAKE_STREAMING:
32
+ # 创建一个队列用于在任务之间传递数据
33
+ queue = asyncio.Queue()
34
+ keep_alive_task = None
35
+ api_request_task = None
36
+
37
+ try:
38
+ # 创建一个保持连接的任务,持续发送换行符
39
+ async def keep_alive_sender():
40
+ try:
41
+ # 创建一个Gemini客户端用于发送保持连接的换行符
42
+ keep_alive_client = GeminiClient(current_api_key)
43
+
44
+ # 启动保持连接的生成器
45
+ keep_alive_generator = keep_alive_client.stream_chat(
46
+ chat_request,
47
+ contents,
48
+ safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
49
+ system_instruction
50
+ )
51
+
52
+ # 持续发送换行符直到被取消
53
+ async for line in keep_alive_generator:
54
+ if line == "\n":
55
+ # 将换行符格式化为SSE格式
56
+ formatted_chunk = {
57
+ "id": "chatcmpl-keepalive",
58
+ "object": "chat.completion.chunk",
59
+ "created": int(time.time()),
60
+ "model": chat_request.model,
61
+ "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
62
+ }
63
+ # 将格式化的换行符放入队列
64
+ await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
65
+ except asyncio.CancelledError:
66
+ # log('info', "保持连接任务被取消",
67
+ # extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
68
+ raise
69
+ except Exception as e:
70
+ log('error', f"保持连接任务出错: {str(e)}",
71
+ extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
72
+ # 将错误放入队列
73
+ await queue.put(None)
74
+ raise
75
+
76
+ # 创建一个任务来随机遍历API密钥并请求内容
77
+ async def api_request_handler():
78
+ success = False
79
+ try:
80
+ # 重置已尝试的密钥
81
+ key_manager.reset_tried_keys_for_request()
82
+
83
+ # 获取可用的API密钥
84
+ available_keys = key_manager.api_keys.copy()
85
+ random.shuffle(available_keys) # 随机打乱密钥顺序
86
+
87
+ # 遍历所有API密钥尝试获取响应
88
+ for attempt, api_key in enumerate(available_keys, 1):
89
+ try:
90
+ log('info', f"假流式模式: 尝试API密钥 {api_key[:8]}... ({attempt}/{len(available_keys)})",
91
+ extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
92
+
93
+ # 创建一个新的客户端使用当前API密钥
94
+ non_stream_client = GeminiClient(api_key)
95
+
96
+ # 使用非流式方式请求内容
97
+ response_content = await asyncio.to_thread(
98
+ non_stream_client.complete_chat,
99
+ chat_request,
100
+ contents,
101
+ safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
102
+ system_instruction
103
+ )
104
+
105
+ # 检查响应是否有效
106
+ if response_content and response_content.text:
107
+ log('info', f"假流式模式: API密钥 {api_key[:8]}... 成功获取响应",
108
+ extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
109
+
110
+ # 将完整响应分割成小块,模拟流式返回
111
+ full_text = response_content.text
112
+ chunk_size = max(len(full_text) // 10, 1) # 至少分成10块,每块至少1个字符
113
+
114
+ for i in range(0, len(full_text), chunk_size):
115
+ chunk = full_text[i:i+chunk_size]
116
+ formatted_chunk = {
117
+ "id": "chatcmpl-someid",
118
+ "object": "chat.completion.chunk",
119
+ "created": int(time.time()),
120
+ "model": chat_request.model,
121
+ "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
122
+ }
123
+ # 将格式化的内容块放入队列
124
+ await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
125
+
126
+ success = True
127
+ # 更新API调用统计
128
+ update_api_call_stats(api_call_stats, api_key)
129
+ break # 成功获取响应,退出循环
130
+ else:
131
+ log('warning', f"假流式模式: API密钥 {api_key[:8]}... 返回空响应",
132
+ extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
133
+ except Exception as e:
134
+ error_detail = handle_gemini_error(e, api_key, key_manager)
135
+ log('error', f"假流式模式: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
136
+ extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
137
+ # 继续尝试下一个API密钥
138
+
139
+ # 如果所有API密钥都尝试失败
140
+ if not success:
141
+ error_msg = "所有API密钥均请求失败,请稍后重试"
142
+ log('error', error_msg,
143
+ extra={'key': 'ALL', 'request_type': 'fake-stream', 'model': chat_request.model})
144
+
145
+ # 添加错误信息到队列
146
+ error_json = {
147
+ "id": "chatcmpl-error",
148
+ "object": "chat.completion.chunk",
149
+ "created": int(time.time()),
150
+ "model": chat_request.model,
151
+ "choices": [{"delta": {"content": f"\n\n[错误: {error_msg}]"}, "index": 0, "finish_reason": "error"}]
152
+ }
153
+ await queue.put(f"data: {json.dumps(error_json)}\n\n")
154
+
155
+ # 添加完成标记到队列
156
+ await queue.put("data: [DONE]\n\n")
157
+ # 添加None表示队列结束
158
+ await queue.put(None)
159
+
160
+ except asyncio.CancelledError:
161
+ log('info', "API请求任务被取消",
162
+ extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
163
+ # 添加None表示队列结束
164
+ await queue.put(None)
165
+ raise
166
+ except Exception as e:
167
+ log('error', f"API请求任务出错: {str(e)}",
168
+ extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
169
+ # 添加错误信息到队列
170
+ error_json = {
171
+ "id": "chatcmpl-error",
172
+ "object": "chat.completion.chunk",
173
+ "created": int(time.time()),
174
+ "model": chat_request.model,
175
+ "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
176
+ }
177
+ await queue.put(f"data: {json.dumps(error_json)}\n\n")
178
+ await queue.put("data: [DONE]\n\n")
179
+ # 添加None表示队列结束
180
+ await queue.put(None)
181
+ raise
182
+
183
+ # 启动保持连接的任务
184
+ keep_alive_task = asyncio.create_task(keep_alive_sender())
185
+ # 启动API请求任务
186
+ api_request_task = asyncio.create_task(api_request_handler())
187
+
188
+ # 从队列中获取数据并发送给客户端
189
+ while True:
190
+ chunk = await queue.get()
191
+ if chunk is None: # None表示队列结束
192
+ break
193
+ yield chunk
194
+
195
+ # 如果API请求任务已完成,取消保持连接任务
196
+ if api_request_task.done() and not keep_alive_task.done():
197
+ keep_alive_task.cancel()
198
+
199
+ except asyncio.CancelledError:
200
+ log('info', "流式响应生成器被取消",
201
+ extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
202
+ # 取消所有任务
203
+ if keep_alive_task and not keep_alive_task.done():
204
+ keep_alive_task.cancel()
205
+ if api_request_task and not api_request_task.done():
206
+ api_request_task.cancel()
207
+ except Exception as e:
208
+ log('error', f"流式响应生成器出错: {str(e)}",
209
+ extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
210
+ # 取消所有任务
211
+ if keep_alive_task and not keep_alive_task.done():
212
+ keep_alive_task.cancel()
213
+ if api_request_task and not api_request_task.done():
214
+ api_request_task.cancel()
215
+ # 发送错误信息给客户端
216
+ error_json = {
217
+ "id": "chatcmpl-error",
218
+ "object": "chat.completion.chunk",
219
+ "created": int(time.time()),
220
+ "model": chat_request.model,
221
+ "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
222
+ }
223
+ yield f"data: {json.dumps(error_json)}\n\n"
224
+ yield "data: [DONE]\n\n"
225
+ finally:
226
+ # 确保所有任务都被取消
227
+ if keep_alive_task and not keep_alive_task.done():
228
+ keep_alive_task.cancel()
229
+ if api_request_task and not api_request_task.done():
230
+ api_request_task.cancel()
231
+ else:
232
+ # 原始流式请求处理逻辑
233
+ gemini_client = GeminiClient(current_api_key)
234
+ success = False
235
+
236
+ try:
237
+ # 直接迭代生成器并发送响应块
238
+ async for chunk in gemini_client.stream_chat(
239
+ chat_request,
240
+ contents,
241
+ safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
242
+ system_instruction
243
+ ):
244
+ # 空字符串跳过
245
+ if not chunk:
246
+ continue
247
+
248
+ formatted_chunk = {
249
+ "id": "chatcmpl-someid",
250
+ "object": "chat.completion.chunk",
251
+ "created": int(time.time()),
252
+ "model": chat_request.model,
253
+ "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
254
+ }
255
+ success = True # 只要有一个chunk成功,就标记为成功
256
+ yield f"data: {json.dumps(formatted_chunk)}\n\n"
257
+
258
+ # 如果成功获取到响应,更新API调用统计
259
+ if success:
260
+ update_api_call_stats(api_call_stats, current_api_key)
261
+
262
+ yield "data: [DONE]\n\n"
263
+
264
+ except asyncio.CancelledError:
265
+ extra_log_cancel = {'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model, 'error_message': '客户端已断开连接'}
266
+ log('info', "客户端连接已中断", extra=extra_log_cancel)
267
+ except Exception as e:
268
+ error_detail = handle_gemini_error(e, current_api_key, key_manager)
269
+ log('error', f"流式请求失败: {error_detail}",
270
+ extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
271
+ # 发送错误信息给客户端
272
+ error_json = {
273
+ "id": "chatcmpl-error",
274
+ "object": "chat.completion.chunk",
275
+ "created": int(time.time()),
276
+ "model": chat_request.model,
277
+ "choices": [{"delta": {"content": f"\n\n[错误: {error_detail}]"}, "index": 0, "finish_reason": "error"}]
278
+ }
279
+ yield f"data: {json.dumps(error_json)}\n\n"
280
+ yield "data: [DONE]\n\n"
281
+ # 重新抛出异常,这样process_request可以捕获它
282
+ raise e
283
+
284
+ return StreamingResponse(stream_response_generator(), media_type="text/event-stream")
app/config/safety.py CHANGED
@@ -1,49 +1,49 @@
1
- # 安全设置配置
2
-
3
- # Gemini 1.0 安全设置
4
- SAFETY_SETTINGS = [
5
- {
6
- "category": "HARM_CATEGORY_HARASSMENT",
7
- "threshold": "BLOCK_NONE"
8
- },
9
- {
10
- "category": "HARM_CATEGORY_HATE_SPEECH",
11
- "threshold": "BLOCK_NONE"
12
- },
13
- {
14
- "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
15
- "threshold": "BLOCK_NONE"
16
- },
17
- {
18
- "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
19
- "threshold": "BLOCK_NONE"
20
- },
21
- {
22
- "category": 'HARM_CATEGORY_CIVIC_INTEGRITY',
23
- "threshold": 'BLOCK_NONE'
24
- }
25
- ]
26
-
27
- # Gemini 2.0 安全设置
28
- SAFETY_SETTINGS_G2 = [
29
- {
30
- "category": "HARM_CATEGORY_HARASSMENT",
31
- "threshold": "BLOCK_NONE"
32
- },
33
- {
34
- "category": "HARM_CATEGORY_HATE_SPEECH",
35
- "threshold": "BLOCK_NONE"
36
- },
37
- {
38
- "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
39
- "threshold": "BLOCK_NONE"
40
- },
41
- {
42
- "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
43
- "threshold": "BLOCK_NONE"
44
- },
45
- {
46
- "category": 'HARM_CATEGORY_CIVIC_INTEGRITY',
47
- "threshold": 'BLOCK_NONE'
48
- }
49
  ]
 
1
+ # 安全设置配置
2
+
3
+ # Gemini 1.0 安全设置
4
+ SAFETY_SETTINGS = [
5
+ {
6
+ "category": "HARM_CATEGORY_HARASSMENT",
7
+ "threshold": "BLOCK_NONE"
8
+ },
9
+ {
10
+ "category": "HARM_CATEGORY_HATE_SPEECH",
11
+ "threshold": "BLOCK_NONE"
12
+ },
13
+ {
14
+ "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
15
+ "threshold": "BLOCK_NONE"
16
+ },
17
+ {
18
+ "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
19
+ "threshold": "BLOCK_NONE"
20
+ },
21
+ {
22
+ "category": 'HARM_CATEGORY_CIVIC_INTEGRITY',
23
+ "threshold": 'BLOCK_NONE'
24
+ }
25
+ ]
26
+
27
+ # Gemini 2.0 安全设置
28
+ SAFETY_SETTINGS_G2 = [
29
+ {
30
+ "category": "HARM_CATEGORY_HARASSMENT",
31
+ "threshold": "OFF"
32
+ },
33
+ {
34
+ "category": "HARM_CATEGORY_HATE_SPEECH",
35
+ "threshold": "OFF"
36
+ },
37
+ {
38
+ "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
39
+ "threshold": "OFF"
40
+ },
41
+ {
42
+ "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
43
+ "threshold": "OFF"
44
+ },
45
+ {
46
+ "category": 'HARM_CATEGORY_CIVIC_INTEGRITY',
47
+ "threshold": 'OFF'
48
+ }
49
  ]
app/config/settings.py CHANGED
@@ -8,6 +8,10 @@ BASE_DIR = pathlib.Path(__file__).parent.parent
8
 
9
  # 流式响应配置
10
  FAKE_STREAMING = os.environ.get("FAKE_STREAMING", "true").lower() in ["true", "1", "yes"]
 
 
 
 
11
  # 假流式请求的空内容返回间隔(秒)
12
  FAKE_STREAMING_INTERVAL = float(os.environ.get("FAKE_STREAMING_INTERVAL", "1"))
13
 
@@ -67,4 +71,4 @@ DEFAULT_BLOCKED_MODELS = []
67
  # 环境变量格式应为逗号分隔的模型名称字符串
68
  BLOCKED_MODELS = os.environ.get("BLOCKED_MODELS", ",".join(DEFAULT_BLOCKED_MODELS))
69
  # 将字符串转换为列表
70
- BLOCKED_MODELS = [model.strip() for model in BLOCKED_MODELS.split(",") if model.strip()]
 
8
 
9
  # 流式响应配置
10
  FAKE_STREAMING = os.environ.get("FAKE_STREAMING", "true").lower() in ["true", "1", "yes"]
11
+
12
+ #随机字符串
13
+ RANDOM_STRING = os.environ.get("RANDOM_STRING", "true").lower() in ["true", "1", "yes"]
14
+ RANDOM_STRING_LENGTH = int(os.environ.get("RANDOM_STRING_LENGTH", "25"))
15
  # 假流式请求的空内容返回间隔(秒)
16
  FAKE_STREAMING_INTERVAL = float(os.environ.get("FAKE_STREAMING_INTERVAL", "1"))
17
 
 
71
  # 环境变量格式应为逗号分隔的模型名称字符串
72
  BLOCKED_MODELS = os.environ.get("BLOCKED_MODELS", ",".join(DEFAULT_BLOCKED_MODELS))
73
  # 将字符串转换为列表
74
+ BLOCKED_MODELS = [model.strip() for model in BLOCKED_MODELS.split(",") if model.strip()]
app/main.py CHANGED
@@ -1,292 +1,292 @@
1
- from fastapi import FastAPI, HTTPException, Request, status
2
- from fastapi.responses import JSONResponse, HTMLResponse
3
- from fastapi.staticfiles import StaticFiles
4
- from fastapi.templating import Jinja2Templates
5
- from app.models import ErrorResponse
6
- from app.services import GeminiClient
7
- from app.utils import (
8
- APIKeyManager,
9
- test_api_key,
10
- format_log_message,
11
- log_manager,
12
- ResponseCacheManager,
13
- ActiveRequestsManager,
14
- clean_expired_stats,
15
- update_api_call_stats,
16
- check_version,
17
- schedule_cache_cleanup,
18
- handle_exception,
19
- log
20
- )
21
- from app.api import router, init_router, dashboard_router, init_dashboard_router
22
- from app.config.settings import (
23
- FAKE_STREAMING,
24
- FAKE_STREAMING_INTERVAL,
25
- PASSWORD,
26
- MAX_REQUESTS_PER_MINUTE,
27
- MAX_REQUESTS_PER_DAY_PER_IP,
28
- RETRY_DELAY,
29
- MAX_RETRY_DELAY,
30
- CACHE_EXPIRY_TIME,
31
- MAX_CACHE_ENTRIES,
32
- REMOVE_CACHE_AFTER_USE,
33
- REQUEST_HISTORY_EXPIRY_TIME,
34
- ENABLE_RECONNECT_DETECTION,
35
- api_call_stats,
36
- client_request_history,
37
- local_version,
38
- remote_version,
39
- has_update,
40
- API_KEY_DAILY_LIMIT
41
- )
42
- from app.config.safety import SAFETY_SETTINGS, SAFETY_SETTINGS_G2
43
- import os
44
- import json
45
- import asyncio
46
- import time
47
- import logging
48
- from datetime import datetime, timedelta
49
- import sys
50
- import pathlib
51
-
52
- # 设置模板目录
53
- BASE_DIR = pathlib.Path(__file__).parent
54
- templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
55
-
56
- app = FastAPI()
57
-
58
- # --------------- 全局实例 ---------------
59
-
60
- # 初始化API密钥管理器
61
- key_manager = APIKeyManager()
62
- current_api_key = key_manager.get_available_key()
63
-
64
- # 创建全局缓存字典,将作为缓存管理器的内部存储
65
- response_cache = {}
66
-
67
- # 初始化缓存管理器,使用全局字典作为存储
68
- response_cache_manager = ResponseCacheManager(
69
- expiry_time=CACHE_EXPIRY_TIME,
70
- max_entries=MAX_CACHE_ENTRIES,
71
- remove_after_use=REMOVE_CACHE_AFTER_USE,
72
- cache_dict=response_cache
73
- )
74
-
75
- # 活跃请求池 - 将作为活跃请求管理器的内部存储
76
- active_requests_pool = {}
77
-
78
- # 初始化活跃请求管理器
79
- active_requests_manager = ActiveRequestsManager(requests_pool=active_requests_pool)
80
-
81
- # --------------- 工具函数 ---------------
82
-
83
- def switch_api_key():
84
- global current_api_key
85
- key = key_manager.get_available_key() # get_available_key 会处理栈的逻辑
86
- if key:
87
- current_api_key = key
88
- log('info', f"API key 替换为 → {current_api_key[:8]}...", extra={'key': current_api_key[:8], 'request_type': 'switch_key'})
89
- else:
90
- log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})
91
-
92
- async def check_keys():
93
- available_keys = []
94
- for key in key_manager.api_keys:
95
- is_valid = await test_api_key(key)
96
- status_msg = "有效" if is_valid else "无效"
97
- log('info', f"API Key {key[:10]}... {status_msg}.")
98
- if is_valid:
99
- available_keys.append(key)
100
- if not available_keys:
101
- log('error', "没有可用的 API 密钥!", extra={'key': 'N/A', 'request_type': 'startup', 'status_code': 'N/A'})
102
- return available_keys
103
-
104
- # 设置全局异常处理
105
- sys.excepthook = handle_exception
106
-
107
- # --------------- 事件处理 ---------------
108
-
109
- @app.on_event("startup")
110
- async def startup_event():
111
- log('info', "Starting Gemini API proxy...")
112
-
113
- # 启动缓存清理定时任务
114
- schedule_cache_cleanup(response_cache_manager, active_requests_manager)
115
-
116
- # 检查版本
117
- await check_version()
118
-
119
- available_keys = await check_keys()
120
- if available_keys:
121
- key_manager.api_keys = available_keys
122
- key_manager._reset_key_stack() # 启动时也确保创建随机栈
123
- key_manager.show_all_keys()
124
- log('info', f"可用 API 密钥数量:{len(key_manager.api_keys)}")
125
- log('info', f"最大重试次数设置为:{len(key_manager.api_keys)}")
126
- if key_manager.api_keys:
127
- all_models = await GeminiClient.list_available_models(key_manager.api_keys[0])
128
- GeminiClient.AVAILABLE_MODELS = [model.replace(
129
- "models/", "") for model in all_models]
130
- log('info', "Available models loaded.")
131
-
132
- # 初始化路由器
133
- init_router(
134
- key_manager,
135
- response_cache_manager,
136
- active_requests_manager,
137
- SAFETY_SETTINGS,
138
- SAFETY_SETTINGS_G2,
139
- current_api_key,
140
- FAKE_STREAMING,
141
- FAKE_STREAMING_INTERVAL,
142
- PASSWORD,
143
- MAX_REQUESTS_PER_MINUTE,
144
- MAX_REQUESTS_PER_DAY_PER_IP
145
- )
146
-
147
- # 初始化仪表盘路由器
148
- init_dashboard_router(
149
- key_manager,
150
- response_cache_manager,
151
- active_requests_manager
152
- )
153
-
154
- # --------------- 异常处理 ---------------
155
-
156
- @app.exception_handler(Exception)
157
- async def global_exception_handler(request: Request, exc: Exception):
158
- from app.utils import translate_error
159
- error_message = translate_error(str(exc))
160
- extra_log_unhandled_exception = {'status_code': 500, 'error_message': error_message}
161
- log('error', f"Unhandled exception: {error_message}", extra=extra_log_unhandled_exception)
162
- return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ErrorResponse(message=str(exc), type="internal_error").dict())
163
-
164
- # --------------- 路由 ---------------
165
-
166
- # 包含API路由
167
- app.include_router(router)
168
- app.include_router(dashboard_router)
169
-
170
- @app.get("/", response_class=HTMLResponse)
171
- async def root(request: Request):
172
- # 先清理过期数据,确保统计数据是最新的
173
- clean_expired_stats(api_call_stats)
174
- response_cache_manager.clean_expired() # 使用管理器清理缓存
175
- active_requests_manager.clean_completed() # 使用管理器清理活跃请求
176
- # 获取当前统计数据
177
- now = datetime.now()
178
-
179
- # 计算过去24小时的调用总数
180
- last_24h_calls = sum(api_call_stats['last_24h']['total'].values())
181
-
182
- # 计算过去一小时内的调用总数
183
- one_hour_ago = now - timedelta(hours=1)
184
- hourly_calls = 0
185
- for hour_key, count in api_call_stats['hourly']['total'].items():
186
- try:
187
- hour_time = datetime.strptime(hour_key, '%Y-%m-%d %H:00')
188
- if hour_time >= one_hour_ago:
189
- hourly_calls += count
190
- except ValueError:
191
- continue
192
-
193
- # 计算过去一分钟内的调用总数
194
- one_minute_ago = now - timedelta(minutes=1)
195
- minute_calls = 0
196
- for minute_key, count in api_call_stats['minute']['total'].items():
197
- try:
198
- minute_time = datetime.strptime(minute_key, '%Y-%m-%d %H:%M')
199
- if minute_time >= one_minute_ago:
200
- minute_calls += count
201
- except ValueError:
202
- continue
203
-
204
- # 获取最近的日志
205
- recent_logs = log_manager.get_recent_logs(50) # 获取最近50条日志
206
-
207
- # 获取缓存统计
208
- total_cache = len(response_cache_manager.cache)
209
- valid_cache = sum(1 for _, data in response_cache_manager.cache.items()
210
- if time.time() < data.get('expiry_time', 0))
211
- cache_by_model = {}
212
-
213
- # 分析缓存数据
214
- for _, cache_data in response_cache_manager.cache.items():
215
- if time.time() < cache_data.get('expiry_time', 0):
216
- # 按模型统计缓存
217
- model = cache_data.get('response', {}).model
218
- if model:
219
- if model in cache_by_model:
220
- cache_by_model[model] += 1
221
- else:
222
- cache_by_model[model] = 1
223
-
224
- # 获取请求历史统计
225
- history_count = len(client_request_history)
226
-
227
- # 获取活跃请求统计
228
- active_count = len(active_requests_manager.active_requests)
229
- active_done = sum(1 for task in active_requests_manager.active_requests.values() if task.done())
230
- active_pending = active_count - active_done
231
-
232
- # 获取API密钥使用统计
233
- api_key_stats = []
234
- for api_key in key_manager.api_keys:
235
- # 获取API密钥前8位作为标识
236
- api_key_id = api_key[:8]
237
-
238
- # 计算24小时内的调用次数
239
- calls_24h = 0
240
- if 'by_endpoint' in api_call_stats['last_24h'] and api_key in api_call_stats['last_24h']['by_endpoint']:
241
- calls_24h = sum(api_call_stats['last_24h']['by_endpoint'][api_key].values())
242
-
243
- # 计算使用百分比
244
- usage_percent = (calls_24h / API_KEY_DAILY_LIMIT) * 100 if API_KEY_DAILY_LIMIT > 0 else 0
245
-
246
- # 添加到结果列表
247
- api_key_stats.append({
248
- 'api_key': api_key_id,
249
- 'calls_24h': calls_24h,
250
- 'limit': API_KEY_DAILY_LIMIT,
251
- 'usage_percent': round(usage_percent, 2)
252
- })
253
-
254
- # 按使用百分比降序排序
255
- api_key_stats.sort(key=lambda x: x['usage_percent'], reverse=True)
256
-
257
- # 准备模板上下文
258
- context = {
259
- "key_count": len(key_manager.api_keys),
260
- "model_count": len(GeminiClient.AVAILABLE_MODELS),
261
- "retry_count": len(key_manager.api_keys),
262
- "last_24h_calls": last_24h_calls,
263
- "hourly_calls": hourly_calls,
264
- "minute_calls": minute_calls,
265
- "max_requests_per_minute": MAX_REQUESTS_PER_MINUTE,
266
- "max_requests_per_day_per_ip": MAX_REQUESTS_PER_DAY_PER_IP,
267
- "current_time": datetime.now().strftime('%H:%M:%S'),
268
- "logs": recent_logs,
269
- # 添加版本信息
270
- "local_version": local_version,
271
- "remote_version": remote_version,
272
- "has_update": has_update,
273
- # 添加缓存信息
274
- "cache_entries": total_cache,
275
- "valid_cache": valid_cache,
276
- "expired_cache": total_cache - valid_cache,
277
- "cache_expiry_time": CACHE_EXPIRY_TIME,
278
- "max_cache_entries": MAX_CACHE_ENTRIES,
279
- "cache_by_model": cache_by_model,
280
- "request_history_count": history_count,
281
- "enable_reconnect_detection": ENABLE_RECONNECT_DETECTION,
282
- "remove_cache_after_use": REMOVE_CACHE_AFTER_USE,
283
- # 添加活跃请求池信息
284
- "active_count": active_count,
285
- "active_done": active_done,
286
- "active_pending": active_pending,
287
- # 添加API密钥统计
288
- "api_key_stats": api_key_stats,
289
- }
290
-
291
- # 使用Jinja2模板引擎正确渲染HTML
292
  return templates.TemplateResponse("index.html", {"request": request, **context})
 
1
+ from fastapi import FastAPI, HTTPException, Request, status
2
+ from fastapi.responses import JSONResponse, HTMLResponse
3
+ from fastapi.staticfiles import StaticFiles
4
+ from fastapi.templating import Jinja2Templates
5
+ from app.models import ErrorResponse
6
+ from app.services import GeminiClient
7
+ from app.utils import (
8
+ APIKeyManager,
9
+ test_api_key,
10
+ format_log_message,
11
+ log_manager,
12
+ ResponseCacheManager,
13
+ ActiveRequestsManager,
14
+ clean_expired_stats,
15
+ update_api_call_stats,
16
+ check_version,
17
+ schedule_cache_cleanup,
18
+ handle_exception,
19
+ log
20
+ )
21
+ from app.api import router, init_router, dashboard_router, init_dashboard_router
22
+ from app.config.settings import (
23
+ FAKE_STREAMING,
24
+ FAKE_STREAMING_INTERVAL,
25
+ PASSWORD,
26
+ MAX_REQUESTS_PER_MINUTE,
27
+ MAX_REQUESTS_PER_DAY_PER_IP,
28
+ RETRY_DELAY,
29
+ MAX_RETRY_DELAY,
30
+ CACHE_EXPIRY_TIME,
31
+ MAX_CACHE_ENTRIES,
32
+ REMOVE_CACHE_AFTER_USE,
33
+ REQUEST_HISTORY_EXPIRY_TIME,
34
+ ENABLE_RECONNECT_DETECTION,
35
+ api_call_stats,
36
+ client_request_history,
37
+ local_version,
38
+ remote_version,
39
+ has_update,
40
+ API_KEY_DAILY_LIMIT
41
+ )
42
+ from app.config.safety import SAFETY_SETTINGS, SAFETY_SETTINGS_G2
43
+ import os
44
+ import json
45
+ import asyncio
46
+ import time
47
+ import logging
48
+ from datetime import datetime, timedelta
49
+ import sys
50
+ import pathlib
51
+
52
+ # 设置模板目录
53
+ BASE_DIR = pathlib.Path(__file__).parent
54
+ templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
55
+
56
+ app = FastAPI()
57
+
58
+ # --------------- 全局实例 ---------------
59
+
60
+ # 初始化API密钥管理器
61
+ key_manager = APIKeyManager()
62
+ current_api_key = key_manager.get_available_key()
63
+
64
+ # 创建全局缓存字典,将作为缓存管理器的内部存储
65
+ response_cache = {}
66
+
67
+ # 初始化缓存管理器,使用全局字典作为存储
68
+ response_cache_manager = ResponseCacheManager(
69
+ expiry_time=CACHE_EXPIRY_TIME,
70
+ max_entries=MAX_CACHE_ENTRIES,
71
+ remove_after_use=REMOVE_CACHE_AFTER_USE,
72
+ cache_dict=response_cache
73
+ )
74
+
75
+ # 活跃请求池 - 将作为活跃请求管理器的内部存储
76
+ active_requests_pool = {}
77
+
78
+ # 初始化活跃请求管理器
79
+ active_requests_manager = ActiveRequestsManager(requests_pool=active_requests_pool)
80
+
81
+ # --------------- 工具函数 ---------------
82
+
83
+ def switch_api_key():
84
+ global current_api_key
85
+ key = key_manager.get_available_key() # get_available_key 会处理栈的逻辑
86
+ if key:
87
+ current_api_key = key
88
+ log('info', f"API key 替换为 → {current_api_key[:8]}...", extra={'key': current_api_key[:8], 'request_type': 'switch_key'})
89
+ else:
90
+ log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})
91
+
92
+ async def check_keys():
93
+ available_keys = []
94
+ for key in key_manager.api_keys:
95
+ is_valid = await test_api_key(key)
96
+ status_msg = "有效" if is_valid else "无效"
97
+ log('info', f"API Key {key[:10]}... {status_msg}.")
98
+ if is_valid:
99
+ available_keys.append(key)
100
+ if not available_keys:
101
+ log('error', "没有可用的 API 密钥!", extra={'key': 'N/A', 'request_type': 'startup', 'status_code': 'N/A'})
102
+ return available_keys
103
+
104
+ # 设置全局异常处理
105
+ sys.excepthook = handle_exception
106
+
107
+ # --------------- 事件处理 ---------------
108
+
109
+ @app.on_event("startup")
110
+ async def startup_event():
111
+ log('info', "Starting Gemini API proxy...")
112
+
113
+ # 启动缓存清理定时任务
114
+ schedule_cache_cleanup(response_cache_manager, active_requests_manager)
115
+
116
+ # 检查版本
117
+ await check_version()
118
+
119
+ available_keys = await check_keys()
120
+ if available_keys:
121
+ key_manager.api_keys = available_keys
122
+ key_manager._reset_key_stack() # 启动时也确保创建随机栈
123
+ key_manager.show_all_keys()
124
+ log('info', f"可用 API 密钥数量:{len(key_manager.api_keys)}")
125
+ log('info', f"最大重试次数设置为:{len(key_manager.api_keys)}")
126
+ if key_manager.api_keys:
127
+ all_models = await GeminiClient.list_available_models(key_manager.api_keys[0])
128
+ GeminiClient.AVAILABLE_MODELS = [model.replace(
129
+ "models/", "") for model in all_models]
130
+ log('info', "Available models loaded.")
131
+
132
+ # 初始化路由器
133
+ init_router(
134
+ key_manager,
135
+ response_cache_manager,
136
+ active_requests_manager,
137
+ SAFETY_SETTINGS,
138
+ SAFETY_SETTINGS_G2,
139
+ current_api_key,
140
+ FAKE_STREAMING,
141
+ FAKE_STREAMING_INTERVAL,
142
+ PASSWORD,
143
+ MAX_REQUESTS_PER_MINUTE,
144
+ MAX_REQUESTS_PER_DAY_PER_IP
145
+ )
146
+
147
+ # 初始化仪表盘路由器
148
+ init_dashboard_router(
149
+ key_manager,
150
+ response_cache_manager,
151
+ active_requests_manager
152
+ )
153
+
154
+ # --------------- 异常处理 ---------------
155
+
156
+ @app.exception_handler(Exception)
157
+ async def global_exception_handler(request: Request, exc: Exception):
158
+ from app.utils import translate_error
159
+ error_message = translate_error(str(exc))
160
+ extra_log_unhandled_exception = {'status_code': 500, 'error_message': error_message}
161
+ log('error', f"Unhandled exception: {error_message}", extra=extra_log_unhandled_exception)
162
+ return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ErrorResponse(message=str(exc), type="internal_error").dict())
163
+
164
+ # --------------- 路由 ---------------
165
+
166
+ # 包含API路由
167
+ app.include_router(router)
168
+ app.include_router(dashboard_router)
169
+
170
+ @app.get("/", response_class=HTMLResponse)
171
+ async def root(request: Request):
172
+ # 先清理过期数据,确保统计数据是最新的
173
+ clean_expired_stats(api_call_stats)
174
+ response_cache_manager.clean_expired() # 使用管理器清理缓存
175
+ active_requests_manager.clean_completed() # 使用管理器清理活跃请求
176
+ # 获取当前统计数据
177
+ now = datetime.now()
178
+
179
+ # 计算过去24小时的调用总数
180
+ last_24h_calls = sum(api_call_stats['last_24h']['total'].values())
181
+
182
+ # 计算过去一小时内的调用总数
183
+ one_hour_ago = now - timedelta(hours=1)
184
+ hourly_calls = 0
185
+ for hour_key, count in api_call_stats['hourly']['total'].items():
186
+ try:
187
+ hour_time = datetime.strptime(hour_key, '%Y-%m-%d %H:00')
188
+ if hour_time >= one_hour_ago:
189
+ hourly_calls += count
190
+ except ValueError:
191
+ continue
192
+
193
+ # 计算过去一分钟内的调用总数
194
+ one_minute_ago = now - timedelta(minutes=1)
195
+ minute_calls = 0
196
+ for minute_key, count in api_call_stats['minute']['total'].items():
197
+ try:
198
+ minute_time = datetime.strptime(minute_key, '%Y-%m-%d %H:%M')
199
+ if minute_time >= one_minute_ago:
200
+ minute_calls += count
201
+ except ValueError:
202
+ continue
203
+
204
+ # 获取最近的日志
205
+ recent_logs = log_manager.get_recent_logs(50) # 获取最近50条日志
206
+
207
+ # 获取缓存统计
208
+ total_cache = len(response_cache_manager.cache)
209
+ valid_cache = sum(1 for _, data in response_cache_manager.cache.items()
210
+ if time.time() < data.get('expiry_time', 0))
211
+ cache_by_model = {}
212
+
213
+ # 分析缓存数据
214
+ for _, cache_data in response_cache_manager.cache.items():
215
+ if time.time() < cache_data.get('expiry_time', 0):
216
+ # 按模型统计缓存
217
+ model = cache_data.get('response', {}).model
218
+ if model:
219
+ if model in cache_by_model:
220
+ cache_by_model[model] += 1
221
+ else:
222
+ cache_by_model[model] = 1
223
+
224
+ # 获取请求历史统计
225
+ history_count = len(client_request_history)
226
+
227
+ # 获取活跃请求统计
228
+ active_count = len(active_requests_manager.active_requests)
229
+ active_done = sum(1 for task in active_requests_manager.active_requests.values() if task.done())
230
+ active_pending = active_count - active_done
231
+
232
+ # 获取API密钥使用统计
233
+ api_key_stats = []
234
+ for api_key in key_manager.api_keys:
235
+ # 获取API密钥前8位作为标识
236
+ api_key_id = api_key[:8]
237
+
238
+ # 计算24小时内的调用次数
239
+ calls_24h = 0
240
+ if 'by_endpoint' in api_call_stats['last_24h'] and api_key in api_call_stats['last_24h']['by_endpoint']:
241
+ calls_24h = sum(api_call_stats['last_24h']['by_endpoint'][api_key].values())
242
+
243
+ # 计算使用百分比
244
+ usage_percent = (calls_24h / API_KEY_DAILY_LIMIT) * 100 if API_KEY_DAILY_LIMIT > 0 else 0
245
+
246
+ # 添加到结果列表
247
+ api_key_stats.append({
248
+ 'api_key': api_key_id,
249
+ 'calls_24h': calls_24h,
250
+ 'limit': API_KEY_DAILY_LIMIT,
251
+ 'usage_percent': round(usage_percent, 2)
252
+ })
253
+
254
+ # 按使用百分比降序排序
255
+ api_key_stats.sort(key=lambda x: x['usage_percent'], reverse=True)
256
+
257
+ # 准备模板上下文
258
+ context = {
259
+ "key_count": len(key_manager.api_keys),
260
+ "model_count": len(GeminiClient.AVAILABLE_MODELS),
261
+ "retry_count": len(key_manager.api_keys),
262
+ "last_24h_calls": last_24h_calls,
263
+ "hourly_calls": hourly_calls,
264
+ "minute_calls": minute_calls,
265
+ "max_requests_per_minute": MAX_REQUESTS_PER_MINUTE,
266
+ "max_requests_per_day_per_ip": MAX_REQUESTS_PER_DAY_PER_IP,
267
+ "current_time": datetime.now().strftime('%H:%M:%S'),
268
+ "logs": recent_logs,
269
+ # 添加版本信息
270
+ "local_version": local_version,
271
+ "remote_version": remote_version,
272
+ "has_update": has_update,
273
+ # 添加缓存信息
274
+ "cache_entries": total_cache,
275
+ "valid_cache": valid_cache,
276
+ "expired_cache": total_cache - valid_cache,
277
+ "cache_expiry_time": CACHE_EXPIRY_TIME,
278
+ "max_cache_entries": MAX_CACHE_ENTRIES,
279
+ "cache_by_model": cache_by_model,
280
+ "request_history_count": history_count,
281
+ "enable_reconnect_detection": ENABLE_RECONNECT_DETECTION,
282
+ "remove_cache_after_use": REMOVE_CACHE_AFTER_USE,
283
+ # 添加活跃请求池信息
284
+ "active_count": active_count,
285
+ "active_done": active_done,
286
+ "active_pending": active_pending,
287
+ # 添加API密钥统计
288
+ "api_key_stats": api_key_stats,
289
+ }
290
+
291
+ # 使用Jinja2模板引擎正确渲染HTML
292
  return templates.TemplateResponse("index.html", {"request": request, **context})
app/services/gemini.py CHANGED
@@ -1,346 +1,361 @@
1
- import requests
2
- import json
3
- import os
4
- import asyncio
5
- import time
6
- from app.models import ChatCompletionRequest, Message
7
- from dataclasses import dataclass
8
- from typing import Optional, Dict, Any, List
9
- import httpx
10
- import logging
11
- from app.utils import format_log_message
12
-
13
- logger = logging.getLogger('my_logger')
14
-
15
- # 是否启用假流式请求 默认启用
16
- FAKE_STREAMING = os.environ.get("FAKE_STREAMING", "true").lower() in ["true", "1", "yes"]
17
- # 假流式请求的空内容返回间隔(秒)
18
- FAKE_STREAMING_INTERVAL = float(os.environ.get("FAKE_STREAMING_INTERVAL", "1"))
19
-
20
- @dataclass
21
- class GeneratedText:
22
- text: str
23
- finish_reason: Optional[str] = None
24
-
25
-
26
- class ResponseWrapper:
27
- def __init__(self, data: Dict[Any, Any]): # 正确的初始化方法名
28
- self._data = data
29
- self._text = self._extract_text()
30
- self._finish_reason = self._extract_finish_reason()
31
- self._prompt_token_count = self._extract_prompt_token_count()
32
- self._candidates_token_count = self._extract_candidates_token_count()
33
- self._total_token_count = self._extract_total_token_count()
34
- self._thoughts = self._extract_thoughts()
35
- self._json_dumps = json.dumps(self._data, indent=4, ensure_ascii=False)
36
-
37
- def _extract_thoughts(self) -> Optional[str]:
38
- try:
39
- for part in self._data['candidates'][0]['content']['parts']:
40
- if 'thought' in part:
41
- return part['text']
42
- return ""
43
- except (KeyError, IndexError):
44
- return ""
45
-
46
- def _extract_text(self) -> str:
47
- try:
48
- for part in self._data['candidates'][0]['content']['parts']:
49
- if 'thought' not in part:
50
- return part['text']
51
- return ""
52
- except (KeyError, IndexError):
53
- return ""
54
-
55
- def _extract_finish_reason(self) -> Optional[str]:
56
- try:
57
- return self._data['candidates'][0].get('finishReason')
58
- except (KeyError, IndexError):
59
- return None
60
-
61
- def _extract_prompt_token_count(self) -> Optional[int]:
62
- try:
63
- return self._data['usageMetadata'].get('promptTokenCount')
64
- except (KeyError):
65
- return None
66
-
67
- def _extract_candidates_token_count(self) -> Optional[int]:
68
- try:
69
- return self._data['usageMetadata'].get('candidatesTokenCount')
70
- except (KeyError):
71
- return None
72
-
73
- def _extract_total_token_count(self) -> Optional[int]:
74
- try:
75
- return self._data['usageMetadata'].get('totalTokenCount')
76
- except (KeyError):
77
- return None
78
-
79
- @property
80
- def text(self) -> str:
81
- return self._text
82
-
83
- @property
84
- def finish_reason(self) -> Optional[str]:
85
- return self._finish_reason
86
-
87
- @property
88
- def prompt_token_count(self) -> Optional[int]:
89
- return self._prompt_token_count
90
-
91
- @property
92
- def candidates_token_count(self) -> Optional[int]:
93
- return self._candidates_token_count
94
-
95
- @property
96
- def total_token_count(self) -> Optional[int]:
97
- return self._total_token_count
98
-
99
- @property
100
- def thoughts(self) -> Optional[str]:
101
- return self._thoughts
102
-
103
- @property
104
- def json_dumps(self) -> str:
105
- return self._json_dumps
106
-
107
-
108
- class GeminiClient:
109
-
110
- AVAILABLE_MODELS = []
111
- EXTRA_MODELS = os.environ.get("EXTRA_MODELS", "").split(",")
112
-
113
- def __init__(self, api_key: str):
114
- self.api_key = api_key
115
-
116
- async def stream_chat(self, request: ChatCompletionRequest, contents, safety_settings, system_instruction):
117
- extra_log = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'N/A'}
118
- log_msg = format_log_message('INFO', "流式请求开始", extra=extra_log)
119
- logger.info(log_msg)
120
-
121
- # 检查是否启用假流式请求
122
- if FAKE_STREAMING:
123
- log_msg = format_log_message('INFO', "使用假流式请求模式(发送换行符保持连接)", extra=extra_log)
124
- logger.info(log_msg)
125
-
126
- try:
127
- # 这个方法不再直接使用self.api_key,而是由main.py提供API密钥列表和管理
128
- # 在这里,我们只负责持续发送换行符,直到main.py那边获取到响应
129
-
130
- # 持续发送换行符,直到外部取消此生成器
131
- start_time = time.time()
132
- while True:
133
- # 发送换行符作为保活消息
134
- yield "\n"
135
- # 等待一段时间
136
- await asyncio.sleep(FAKE_STREAMING_INTERVAL)
137
-
138
- # 如果等待时间过长(超过300秒),防止无限等待
139
- if time.time() - start_time > 300:
140
- log_msg = format_log_message('WARNING', "假流式请求等待时间过长,强制结束", extra=extra_log)
141
- logger.warning(log_msg)
142
- # 抛出超时异常,让外部处理
143
- error_msg = "假流式请求等待时间过长,所有API密钥均已尝试"
144
- extra_log_timeout = {'key': self.api_key[:8], 'request_type': 'fake-stream', 'model': request.model, 'status_code': 'TIMEOUT', 'error_message': error_msg}
145
- log_msg = format_log_message('ERROR', error_msg, extra=extra_log_timeout)
146
- logger.error(log_msg)
147
- raise TimeoutError(error_msg)
148
-
149
- except Exception as e:
150
- if not isinstance(e, asyncio.CancelledError): # 忽略取消异常的日志记录
151
- error_msg = f"假流式处理期间发生错误: {str(e)}"
152
- extra_log_error = {'key': self.api_key[:8], 'request_type': 'fake-stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
153
- log_msg = format_log_message('ERROR', error_msg, extra=extra_log_error)
154
- logger.error(log_msg)
155
- raise e
156
- finally:
157
- log_msg = format_log_message('INFO', "假流式请求结束", extra=extra_log)
158
- logger.info(log_msg)
159
- else:
160
- # 原始流式请求处理逻辑
161
- api_version = "v1beta" # 统一使用 v1beta
162
- url = f"https://generativelanguage.googleapis.com/{api_version}/models/{request.model}:streamGenerateContent?key={self.api_key}&alt=sse"
163
- headers = {
164
- "Content-Type": "application/json",
165
- }
166
- data = {
167
- "contents": contents,
168
- "generationConfig": {
169
- "temperature": request.temperature,
170
- "maxOutputTokens": request.max_tokens,
171
- },
172
- "safetySettings": safety_settings,
173
- }
174
- if system_instruction:
175
- data["system_instruction"] = system_instruction
176
-
177
- async with httpx.AsyncClient() as client:
178
- async with client.stream("POST", url, headers=headers, json=data, timeout=600) as response:
179
- buffer = b""
180
- try:
181
- async for line in response.aiter_lines():
182
- if not line.strip():
183
- continue
184
- if line.startswith("data: "):
185
- line = line[len("data: "):]
186
- buffer += line.encode('utf-8')
187
- try:
188
- data = json.loads(buffer.decode('utf-8'))
189
- buffer = b""
190
- if 'candidates' in data and data['candidates']:
191
- candidate = data['candidates'][0]
192
- if 'content' in candidate:
193
- content = candidate['content']
194
- if 'parts' in content and content['parts']:
195
- parts = content['parts']
196
- text = ""
197
- for part in parts:
198
- if 'text' in part:
199
- text += part['text']
200
- if text:
201
- yield text
202
-
203
- if candidate.get("finishReason") and candidate.get("finishReason") != "STOP":
204
- error_msg = f"模型的响应被截断: {candidate.get('finishReason')}"
205
- extra_log_error = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
206
- log_msg = format_log_message('WARNING', error_msg, extra=extra_log_error)
207
- logger.warning(log_msg)
208
- raise ValueError(error_msg)
209
-
210
- if 'safetyRatings' in candidate:
211
- for rating in candidate['safetyRatings']:
212
- if rating['probability'] == 'HIGH':
213
- error_msg = f"模型的响应被截断: {rating['category']}"
214
- extra_log_safety = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
215
- log_msg = format_log_message('WARNING', error_msg, extra=extra_log_safety)
216
- logger.warning(log_msg)
217
- raise ValueError(error_msg)
218
- except json.JSONDecodeError:
219
- continue
220
- except Exception as e:
221
- error_msg = f"流式处理期间发生错误: {str(e)}"
222
- extra_log_stream_error = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
223
- log_msg = format_log_message('ERROR', error_msg, extra=extra_log_stream_error)
224
- logger.error(log_msg)
225
- raise e
226
- except Exception as e:
227
- raise e
228
- finally:
229
- log_msg = format_log_message('INFO', "流式请求结束", extra=extra_log)
230
- logger.info(log_msg)
231
-
232
- def complete_chat(self, request: ChatCompletionRequest, contents, safety_settings, system_instruction):
233
- extra_log = {'key': self.api_key[:8], 'request_type': 'non-stream', 'model': request.model, 'status_code': 'N/A'}
234
- log_msg = format_log_message('INFO', "非流式请求开始", extra=extra_log)
235
- logger.info(log_msg)
236
-
237
- api_version = "v1beta" # 统一使用 v1beta
238
- url = f"https://generativelanguage.googleapis.com/{api_version}/models/{request.model}:generateContent?key={self.api_key}"
239
- headers = {
240
- "Content-Type": "application/json",
241
- }
242
- data = {
243
- "contents": contents,
244
- "generationConfig": {
245
- "temperature": request.temperature,
246
- "maxOutputTokens": request.max_tokens,
247
- },
248
- "safetySettings": safety_settings,
249
- }
250
- if system_instruction:
251
- data["system_instruction"] = system_instruction
252
-
253
- try:
254
- response = requests.post(url, headers=headers, json=data)
255
- response.raise_for_status()
256
-
257
- log_msg = format_log_message('INFO', "非流式请求成功完成", extra=extra_log)
258
- logger.info(log_msg)
259
-
260
- return ResponseWrapper(response.json())
261
- except Exception as e:
262
- raise
263
-
264
- def convert_messages(self, messages, use_system_prompt=False):
265
- gemini_history = []
266
- errors = []
267
- system_instruction_text = ""
268
- is_system_phase = use_system_prompt
269
- for i, message in enumerate(messages):
270
- role = message.role
271
- content = message.content
272
-
273
- if isinstance(content, str):
274
- if is_system_phase and role == 'system':
275
- if system_instruction_text:
276
- system_instruction_text += "\n" + content
277
- else:
278
- system_instruction_text = content
279
- else:
280
- is_system_phase = False
281
-
282
- if role in ['user', 'system']:
283
- role_to_use = 'user'
284
- elif role == 'assistant':
285
- role_to_use = 'model'
286
- else:
287
- errors.append(f"Invalid role: {role}")
288
- continue
289
-
290
- if gemini_history and gemini_history[-1]['role'] == role_to_use:
291
- gemini_history[-1]['parts'].append({"text": content})
292
- else:
293
- gemini_history.append(
294
- {"role": role_to_use, "parts": [{"text": content}]})
295
- elif isinstance(content, list):
296
- parts = []
297
- for item in content:
298
- if item.get('type') == 'text':
299
- parts.append({"text": item.get('text')})
300
- elif item.get('type') == 'image_url':
301
- image_data = item.get('image_url', {}).get('url', '')
302
- if image_data.startswith('data:image/'):
303
- try:
304
- mime_type, base64_data = image_data.split(';')[0].split(':')[1], image_data.split(',')[1]
305
- parts.append({
306
- "inline_data": {
307
- "mime_type": mime_type,
308
- "data": base64_data
309
- }
310
- })
311
- except (IndexError, ValueError):
312
- errors.append(
313
- f"Invalid data URI for image: {image_data}")
314
- else:
315
- errors.append(
316
- f"Invalid image URL format for item: {item}")
317
-
318
- if parts:
319
- if role in ['user', 'system']:
320
- role_to_use = 'user'
321
- elif role == 'assistant':
322
- role_to_use = 'model'
323
- else:
324
- errors.append(f"Invalid role: {role}")
325
- continue
326
- if gemini_history and gemini_history[-1]['role'] == role_to_use:
327
- gemini_history[-1]['parts'].extend(parts)
328
- else:
329
- gemini_history.append(
330
- {"role": role_to_use, "parts": parts})
331
- if errors:
332
- return errors
333
- else:
334
- return gemini_history, {"parts": [{"text": system_instruction_text}]}
335
-
336
    @staticmethod
    async def list_available_models(api_key) -> list:
        """Fetch the model names available to *api_key* from the Gemini API.

        Extends the result with the EXTRA_MODELS configured on the class.
        Raises httpx.HTTPStatusError for non-2xx responses.
        """
        url = "https://generativelanguage.googleapis.com/v1beta/models?key={}".format(
            api_key)
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            response.raise_for_status()
            data = response.json()
            models = [model["name"] for model in data.get("models", [])]
            models.extend(GeminiClient.EXTRA_MODELS)
            return models
 
1
+ import requests
2
+ import json
3
+ import os
4
+ import asyncio
5
+ import time
6
+ from app.models import ChatCompletionRequest, Message
7
+ from dataclasses import dataclass
8
+ from typing import Optional, Dict, Any, List
9
+ import httpx
10
+ import logging
11
+ import secrets
12
+ import string
13
+ from app.utils import format_log_message
14
+ from app.config.settings import (
15
+ RANDOM_STRING,
16
+ RANDOM_STRING_LENGTH
17
+ )
18
+
19
def generate_secure_random_string(length):
    """Return a cryptographically secure random alphanumeric string.

    Built from ``secrets.choice`` over ASCII letters and digits, so the
    result is suitable for unpredictable tokens/identifiers.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
23
+
24
logger = logging.getLogger('my_logger')

# Whether fake-streaming mode is enabled (default: enabled). In this mode the
# stream endpoint only emits keep-alive newlines while the real request is
# performed elsewhere.
FAKE_STREAMING = os.environ.get("FAKE_STREAMING", "true").lower() in ["true", "1", "yes"]
# Interval (seconds) between empty keep-alive chunks in fake-streaming mode.
FAKE_STREAMING_INTERVAL = float(os.environ.get("FAKE_STREAMING_INTERVAL", "1"))
30
+
31
@dataclass
class GeneratedText:
    """A chunk of model-generated text with an optional finish reason."""
    # The generated text content.
    text: str
    # Why generation stopped (e.g. "STOP"), if known.
    finish_reason: Optional[str] = None
35
+
36
+
37
class ResponseWrapper:
    """Typed accessor over a raw Gemini ``generateContent`` JSON response.

    All fields are extracted eagerly at construction time and exposed as
    read-only properties. Missing or partial payloads degrade to ``""`` /
    ``None`` instead of raising.
    """

    def __init__(self, data: Dict[Any, Any]):
        self._data = data
        self._text = self._extract_text()
        self._finish_reason = self._extract_finish_reason()
        self._prompt_token_count = self._extract_prompt_token_count()
        self._candidates_token_count = self._extract_candidates_token_count()
        self._total_token_count = self._extract_total_token_count()
        self._thoughts = self._extract_thoughts()
        # Pretty-printed raw payload, handy for logging/debugging.
        self._json_dumps = json.dumps(self._data, indent=4, ensure_ascii=False)

    def _extract_thoughts(self) -> Optional[str]:
        """Concatenate the text of every "thought" part of the first candidate.

        Fix: the previous version returned only the first thought part,
        silently dropping any subsequent ones.
        """
        try:
            return "".join(
                part.get('text', '')
                for part in self._data['candidates'][0]['content']['parts']
                if 'thought' in part
            )
        except (KeyError, IndexError):
            return ""

    def _extract_text(self) -> str:
        """Concatenate the text of every non-"thought" part of the first candidate.

        Fix: the previous version returned only the first text part, dropping
        the rest of a multi-part response. Parts without a 'text' key (e.g.
        function calls) contribute nothing instead of aborting extraction.
        """
        try:
            return "".join(
                part.get('text', '')
                for part in self._data['candidates'][0]['content']['parts']
                if 'thought' not in part
            )
        except (KeyError, IndexError):
            return ""

    def _extract_finish_reason(self) -> Optional[str]:
        """Return candidates[0].finishReason, or None when absent."""
        try:
            return self._data['candidates'][0].get('finishReason')
        except (KeyError, IndexError):
            return None

    def _extract_prompt_token_count(self) -> Optional[int]:
        """Return usageMetadata.promptTokenCount, or None when absent."""
        try:
            return self._data['usageMetadata'].get('promptTokenCount')
        except KeyError:
            return None

    def _extract_candidates_token_count(self) -> Optional[int]:
        """Return usageMetadata.candidatesTokenCount, or None when absent."""
        try:
            return self._data['usageMetadata'].get('candidatesTokenCount')
        except KeyError:
            return None

    def _extract_total_token_count(self) -> Optional[int]:
        """Return usageMetadata.totalTokenCount, or None when absent."""
        try:
            return self._data['usageMetadata'].get('totalTokenCount')
        except KeyError:
            return None

    @property
    def text(self) -> str:
        return self._text

    @property
    def finish_reason(self) -> Optional[str]:
        return self._finish_reason

    @property
    def prompt_token_count(self) -> Optional[int]:
        return self._prompt_token_count

    @property
    def candidates_token_count(self) -> Optional[int]:
        return self._candidates_token_count

    @property
    def total_token_count(self) -> Optional[int]:
        return self._total_token_count

    @property
    def thoughts(self) -> Optional[str]:
        return self._thoughts

    @property
    def json_dumps(self) -> str:
        return self._json_dumps
117
+
118
+
119
class GeminiClient:
    """Thin async client for the Google Gemini ``generateContent`` API.

    One instance wraps a single API key; key rotation and retry across keys
    are managed by the caller (main.py).
    """

    AVAILABLE_MODELS = []
    # Extra model names injected via the EXTRA_MODELS env var (comma separated).
    # Fix: blank entries are filtered out so an unset/empty variable no longer
    # contributes an "" model name to list_available_models().
    EXTRA_MODELS = [m.strip() for m in os.environ.get("EXTRA_MODELS", "").split(",") if m.strip()]

    def __init__(self, api_key: str):
        self.api_key = api_key

    async def stream_chat(self, request: ChatCompletionRequest, contents, safety_settings, system_instruction):
        """Yield response text chunks for a streaming chat request.

        When FAKE_STREAMING is enabled, this generator only emits newline
        keep-alive chunks while the real (non-stream) request is performed by
        the caller; otherwise it proxies Gemini's SSE stream and yields the
        text of each parsed chunk.

        Raises:
            TimeoutError: fake-streaming keep-alive exceeded 300 seconds.
            ValueError: the model response was truncated or safety-blocked.
        """
        extra_log = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'N/A'}
        log_msg = format_log_message('INFO', "流式请求开始", extra=extra_log)
        logger.info(log_msg)

        if FAKE_STREAMING:
            log_msg = format_log_message('INFO', "使用假流式请求模式(发送换行符保持连接)", extra=extra_log)
            logger.info(log_msg)
            try:
                # The real API call is issued elsewhere (main.py owns the key
                # pool); here we only keep the HTTP connection alive with
                # newline chunks until the caller cancels this generator.
                start_time = time.time()
                while True:
                    yield "\n"
                    await asyncio.sleep(FAKE_STREAMING_INTERVAL)

                    # Guard against waiting forever (hard cap at 300s).
                    if time.time() - start_time > 300:
                        log_msg = format_log_message('WARNING', "假流式请求等待时间过长,强制结束", extra=extra_log)
                        logger.warning(log_msg)
                        error_msg = "假流式请求等待时间过长,所有API密钥均已尝试"
                        extra_log_timeout = {'key': self.api_key[:8], 'request_type': 'fake-stream', 'model': request.model, 'status_code': 'TIMEOUT', 'error_message': error_msg}
                        log_msg = format_log_message('ERROR', error_msg, extra=extra_log_timeout)
                        logger.error(log_msg)
                        raise TimeoutError(error_msg)
            except Exception as e:
                # CancelledError is the normal way the caller stops us; don't log it.
                if not isinstance(e, asyncio.CancelledError):
                    error_msg = f"假流式处理期间发生错误: {str(e)}"
                    extra_log_error = {'key': self.api_key[:8], 'request_type': 'fake-stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
                    log_msg = format_log_message('ERROR', error_msg, extra=extra_log_error)
                    logger.error(log_msg)
                raise e
            finally:
                log_msg = format_log_message('INFO', "假流式请求结束", extra=extra_log)
                logger.info(log_msg)
        else:
            # Real SSE streaming against the Gemini endpoint.
            # "think" models are only served by the v1alpha API surface.
            api_version = "v1alpha" if "think" in request.model else "v1beta"
            url = f"https://generativelanguage.googleapis.com/{api_version}/models/{request.model}:streamGenerateContent?key={self.api_key}&alt=sse"
            headers = {
                "Content-Type": "application/json",
            }
            data = {
                "contents": contents,
                "generationConfig": {
                    "temperature": request.temperature,
                    "maxOutputTokens": request.max_tokens,
                },
                "safetySettings": safety_settings,
            }
            if system_instruction:
                data["system_instruction"] = system_instruction

            try:
                async with httpx.AsyncClient() as client:
                    async with client.stream("POST", url, headers=headers, json=data, timeout=600) as response:
                        buffer = b""
                        try:
                            async for line in response.aiter_lines():
                                if not line.strip():
                                    continue
                                if line.startswith("data: "):
                                    line = line[len("data: "):]
                                buffer += line.encode('utf-8')
                                try:
                                    # Renamed from `data` to avoid shadowing the
                                    # request payload built above.
                                    chunk = json.loads(buffer.decode('utf-8'))
                                except json.JSONDecodeError:
                                    # Incomplete JSON: keep buffering until it parses.
                                    continue
                                buffer = b""
                                if 'candidates' in chunk and chunk['candidates']:
                                    candidate = chunk['candidates'][0]
                                    if 'content' in candidate:
                                        content = candidate['content']
                                        if 'parts' in content and content['parts']:
                                            text = ""
                                            for part in content['parts']:
                                                if 'text' in part:
                                                    text += part['text']
                                            if text:
                                                yield text

                                    # Any finish reason other than STOP means the
                                    # response was cut off — surface it as an error.
                                    if candidate.get("finishReason") and candidate.get("finishReason") != "STOP":
                                        error_msg = f"模型的响应被截断: {candidate.get('finishReason')}"
                                        extra_log_error = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
                                        log_msg = format_log_message('WARNING', error_msg, extra=extra_log_error)
                                        logger.warning(log_msg)
                                        raise ValueError(error_msg)

                                    if 'safetyRatings' in candidate:
                                        for rating in candidate['safetyRatings']:
                                            if rating['probability'] == 'HIGH':
                                                error_msg = f"模型的响应被截断: {rating['category']}"
                                                extra_log_safety = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
                                                log_msg = format_log_message('WARNING', error_msg, extra=extra_log_safety)
                                                logger.warning(log_msg)
                                                raise ValueError(error_msg)
                        except Exception as e:
                            error_msg = f"流式处理期间发生错误: {str(e)}"
                            extra_log_stream_error = {'key': self.api_key[:8], 'request_type': 'stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
                            log_msg = format_log_message('ERROR', error_msg, extra=extra_log_stream_error)
                            logger.error(log_msg)
                            raise e
            finally:
                # Always mark the end of the stream, success or failure.
                log_msg = format_log_message('INFO', "流式请求结束", extra=extra_log)
                logger.info(log_msg)

    def complete_chat(self, request: ChatCompletionRequest, contents, safety_settings, system_instruction):
        """Perform a blocking (non-streaming) ``generateContent`` call.

        Returns:
            ResponseWrapper around the parsed JSON response.

        Raises:
            requests.HTTPError / requests.RequestException on HTTP failure,
            requests.Timeout after 600s.
        """
        extra_log = {'key': self.api_key[:8], 'request_type': 'non-stream', 'model': request.model, 'status_code': 'N/A'}
        log_msg = format_log_message('INFO', "非流式请求开始", extra=extra_log)
        logger.info(log_msg)

        # "think" models are only served by the v1alpha API surface.
        api_version = "v1alpha" if "think" in request.model else "v1beta"
        url = f"https://generativelanguage.googleapis.com/{api_version}/models/{request.model}:generateContent?key={self.api_key}"
        headers = {
            "Content-Type": "application/json",
        }
        data = {
            "contents": contents,
            "generationConfig": {
                "temperature": request.temperature,
                "maxOutputTokens": request.max_tokens,
            },
            "safetySettings": safety_settings,
        }
        if system_instruction:
            data["system_instruction"] = system_instruction

        try:
            # Fix: the original call had no timeout and could hang forever;
            # 600s matches the streaming path's timeout.
            response = requests.post(url, headers=headers, json=data, timeout=600)
            response.raise_for_status()

            log_msg = format_log_message('INFO', "非流式请求成功完成", extra=extra_log)
            logger.info(log_msg)

            return ResponseWrapper(response.json())
        except Exception as e:
            # Fix: log the failure before propagating instead of a silent re-raise.
            error_msg = f"非流式请求失败: {str(e)}"
            extra_log_error = {'key': self.api_key[:8], 'request_type': 'non-stream', 'model': request.model, 'status_code': 'ERROR', 'error_message': error_msg}
            log_msg = format_log_message('ERROR', error_msg, extra=extra_log_error)
            logger.error(log_msg)
            raise

    def convert_messages(self, messages, use_system_prompt=False):
        """Convert OpenAI-style chat messages into Gemini ``contents``.

        Args:
            messages: iterable of objects with ``.role`` and ``.content``
                (str or a list of text/image_url items).
            use_system_prompt: when True, leading 'system' messages are folded
                into the system instruction instead of the history.

        Returns:
            (gemini_history, system_instruction) on success, or a list of
            error strings if any message was invalid.
        """
        gemini_history = []
        errors = []
        system_instruction_text = ""
        # While in the "system phase", consecutive leading system messages are
        # merged into the system instruction; the first non-system message ends it.
        is_system_phase = use_system_prompt
        for message in messages:
            role = message.role
            content = message.content
            if isinstance(content, str):
                if is_system_phase and role == 'system':
                    if system_instruction_text:
                        system_instruction_text += "\n" + content
                    else:
                        system_instruction_text = content
                else:
                    is_system_phase = False

                    if role in ['user', 'system']:
                        role_to_use = 'user'
                    elif role == 'assistant':
                        role_to_use = 'model'
                    else:
                        errors.append(f"Invalid role: {role}")
                        continue

                    # Gemini expects alternating roles; merge consecutive
                    # same-role turns into one entry's parts.
                    if gemini_history and gemini_history[-1]['role'] == role_to_use:
                        gemini_history[-1]['parts'].append({"text": content})
                    else:
                        gemini_history.append(
                            {"role": role_to_use, "parts": [{"text": content}]})
            elif isinstance(content, list):
                parts = []
                for item in content:
                    if item.get('type') == 'text':
                        parts.append({"text": item.get('text')})
                    elif item.get('type') == 'image_url':
                        image_data = item.get('image_url', {}).get('url', '')
                        if image_data.startswith('data:image/'):
                            # data URI shape: "data:<mime>;base64,<payload>"
                            try:
                                mime_type, base64_data = image_data.split(';')[0].split(':')[1], image_data.split(',')[1]
                                parts.append({
                                    "inline_data": {
                                        "mime_type": mime_type,
                                        "data": base64_data
                                    }
                                })
                            except (IndexError, ValueError):
                                errors.append(
                                    f"Invalid data URI for image: {image_data}")
                        else:
                            errors.append(
                                f"Invalid image URL format for item: {item}")

                if parts:
                    if role in ['user', 'system']:
                        role_to_use = 'user'
                    elif role == 'assistant':
                        role_to_use = 'model'
                    else:
                        errors.append(f"Invalid role: {role}")
                        continue
                    if gemini_history and gemini_history[-1]['role'] == role_to_use:
                        gemini_history[-1]['parts'].extend(parts)
                    else:
                        gemini_history.append(
                            {"role": role_to_use, "parts": parts})
        if errors:
            return errors
        if RANDOM_STRING:
            # Inject random decoy messages near both ends of the history
            # (anti-fingerprinting / prompt disguise feature).
            gemini_history.insert(1, {'role': 'user', 'parts': [{'text': generate_secure_random_string(RANDOM_STRING_LENGTH)}]})
            gemini_history.insert(len(gemini_history) - 1, {'role': 'user', 'parts': [{'text': generate_secure_random_string(RANDOM_STRING_LENGTH)}]})
            log_msg = format_log_message('INFO', "伪装消息成功")
            logger.info(log_msg)
        return gemini_history, {"parts": [{"text": system_instruction_text}]}

    @staticmethod
    async def list_available_models(api_key) -> list:
        """Fetch the model names available to *api_key*, plus EXTRA_MODELS.

        Raises httpx.HTTPStatusError for non-2xx responses.
        """
        url = "https://generativelanguage.googleapis.com/v1beta/models?key={}".format(
            api_key)
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            response.raise_for_status()
            data = response.json()
            models = [model["name"] for model in data.get("models", [])]
            models.extend(GeminiClient.EXTRA_MODELS)
            return models