File size: 40,355 Bytes
e82bac2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 | # -*- coding: utf-8 -*-
"""
主请求处理程序,包含 process_request 和 _attempt_api_call 逻辑。
此模块负责协调处理来自 API 端点的请求,包括:
- 解析请求数据
- 加载和处理上下文
- 选择合适的 API Key
- 调用 Gemini API (流式或非流式)
- 处理 API 响应和错误
- 更新速率限制和 Token 计数
- 创建缓存条目
- 保存上下文
"""
import asyncio # 异步 IO 库
import json # JSON 处理库
import logging # 日志库
import time # 时间库
import pytz # 时区库
import uuid # UUID 生成库
from datetime import datetime # 日期时间库
import random # 随机数库 (可能在 Key 选择策略中使用)
import hashlib # 哈希库 (可能在缓存中使用)
from typing import Literal, List, Tuple, Dict, Any, Optional, Union # 类型提示
from fastapi import HTTPException, Request, status, Depends # FastAPI 相关组件
from fastapi.responses import StreamingResponse # 流式响应
from collections import Counter, defaultdict # 集合库
import httpx # HTTP 客户端库
from sqlalchemy.orm import Session # SQLAlchemy 同步会话 (可能在某些依赖中使用)
from sqlalchemy.ext.asyncio import AsyncSession # SQLAlchemy 异步会话
from aiosqlite import Connection # aiosqlite 连接类型 (可能在某些依赖中使用)
# 导入模型定义
from app.api.models import ChatCompletionRequest, ChatCompletionResponse, ResponseMessage # API 请求/响应模型
# 导入核心模块的类和函数
from app.core.services.gemini import GeminiClient # Gemini API 客户端
from app.core.utils.response_wrapper import ResponseWrapper # 响应包装器
from app.core.context import store as context_store # 上下文存储模块
from app.core.database import utils as db_utils # 数据库工具模块
from app.core.context.converter import convert_messages # 消息格式转换器
from app.core.keys.manager import APIKeyManager # Key 管理器
from app.core.cache.manager import CacheManager # 缓存管理器
from app.core.processing.error_handler import _handle_api_call_exception # 统一的 API 调用异常处理器
from app.core.utils.request_helpers import get_client_ip, get_current_timestamps # 请求辅助函数 (移除 protect_from_abuse)
from app.core.security.rate_limit import protect_from_abuse # 从新路径导入
# 导入依赖注入函数
from app.core.dependencies import get_db_session, get_key_manager, get_cache_manager, get_http_client # FastAPI 依赖项
# 导入处理工具函数
from app.core.processing.utils import ( # 从 utils 模块导入
estimate_token_count, truncate_context, check_rate_limits_and_update_counts,
update_token_counts, save_context_after_success
)
# 导入流处理函数
from app.core.processing.stream_handler import generate_stream_response # 从 stream_handler 导入流生成器
# 导入配置
from app import config # 应用配置
from app.config import ( # 导入具体配置项
DISABLE_SAFETY_FILTERING, # 是否禁用安全过滤
MAX_REQUESTS_PER_MINUTE, # 每分钟最大请求数 (可能用于 IP 限制)
ENABLE_NATIVE_CACHING, # 是否启用原生缓存
ENABLE_STICKY_SESSION, # 是否启用粘性会话 (Key 选择)
STREAM_SAVE_REPLY, # 是否在流式响应中保存回复 (可能已废弃)
MAX_REQUESTS_PER_DAY_PER_IP, # 每个 IP 每日最大请求数 (可能用于 IP 限制)
safety_settings, # 默认安全设置
safety_settings_g2 # Gemini 2.0 Flash 的安全设置
)
# 导入跟踪相关
from app.core.tracking import ( # 导入用于跟踪和限制的数据结构及锁
usage_data, usage_lock, RPM_WINDOW_SECONDS, TPM_WINDOW_SECONDS, # Key 使用数据和锁,时间窗口常量
ip_daily_input_token_counts, ip_input_token_counts_lock, # IP 每日 Token 计数和锁
increment_cache_hit_count, increment_cache_miss_count, add_tokens_saved, # 缓存统计函数
track_cache_hit, track_cache_miss # 缓存命中/未命中跟踪函数
)
# 导入日志格式化函数 (如果需要自定义格式)
# from app.handlers.log_config import format_log_message
logger = logging.getLogger('my_logger') # 获取日志记录器实例
# --- 核心 API 调用尝试逻辑 ---
async def _attempt_api_call(
chat_request: ChatCompletionRequest, # 聊天请求对象
contents: List[Dict[str, Any]], # 经过处理和可能截断后,要发送给 Gemini API 的内容列表
system_instruction: Optional[str], # 系统指令文本
current_api_key: str, # 本次尝试选定的 API Key
http_client: httpx.AsyncClient, # 共享的 HTTP 客户端实例
key_manager: APIKeyManager, # Key 管理器实例,用于错误处理时标记 Key
model_name: str, # 请求的模型名称
limits: Optional[Dict[str, Any]], # 该模型的速率限制配置
client_ip: str, # 客户端 IP 地址,用于日志和可能的关联
today_date_str_pt: str, # 当前太平洋时区日期字符串,用于 Token 计数
enable_native_caching: bool, # 是否启用原生缓存
cache_manager_instance: CacheManager, # 缓存管理器实例
request_id: Optional[str] = None, # 请求的唯一 ID,用于日志跟踪
# --- 缓存相关参数 ---
cached_content_id_to_use: Optional[str] = None, # 如果缓存命中,传递缓存内容的 ID 给 Gemini API
content_to_cache_on_success: Optional[Dict[str, Any]] = None, # 如果缓存未命中,成功后需要缓存的原始内容
user_id: Optional[str] = None, # 用户 ID,用于缓存创建和用户关联
db: AsyncSession = None # 数据库会话 (异步)
) -> Tuple[Optional[Union[StreamingResponse, ChatCompletionResponse]], Optional[Dict[str, Any]], bool]:
"""
尝试使用给定的 API Key 和内容进行一次对 Gemini API 的调用。
此函数封装了单次 API 调用的逻辑,包括流式和非流式处理、
原生缓存的创建(如果需要且调用成功)、以及基本的成功/失败判断。
Args:
(参数说明见上方的类型提示)
Returns:
Tuple[Optional[Union[StreamingResponse, ChatCompletionResponse]], Optional[Dict[str, Any]], bool]:
一个元组包含:
- 第一个元素:如果调用成功,返回 FastAPI 的响应对象 (StreamingResponse 或 ChatCompletionResponse);否则为 None。
- 第二个元素:如果调用失败,返回包含错误信息的字典;否则为 None。
- 第三个元素:一个布尔值,指示调用失败后是否需要外部重试循环尝试使用其他 Key (True 表示需要重试,False 表示不需要)。
"""
response = None # 初始化响应对象
error_info = None # 初始化错误信息字典
needs_retry = False # 初始化重试标志,默认为 False
try:
# --- 确定安全设置 ---
# 根据全局配置或模型名称选择合适的安全设置
# TODO: 确认 'gemini-2.0-flash-exp' 的检查逻辑是否仍然需要或准确
current_safety_settings = safety_settings_g2 if config.DISABLE_SAFETY_FILTERING or 'gemini-2.0-flash-exp' in chat_request.model else safety_settings
# --- 初始化 Gemini 客户端 ---
# 每次尝试都创建一个新的客户端实例可能不是最高效的,但可以确保使用正确的 Key
# 考虑是否可以在外部创建并传递,或者优化 GeminiClient 使其能切换 Key
gemini_client_instance = GeminiClient(current_api_key, http_client) # 使用当前 Key 和共享的 http_client 初始化
# --- 判断是否为流式请求 ---
is_stream = chat_request.stream # 从请求对象中获取 stream 标志
if is_stream:
# --- 处理流式请求 ---
response_id = f"chatcmpl-{int(time.time() * 1000)}" # 为本次流式响应生成一个唯一的 ID
# 调用位于 stream_handler.py 中的异步生成器函数
response = StreamingResponse(generate_stream_response(
# 传递所有需要的参数给流生成器
gemini_client_instance=gemini_client_instance,
chat_request=chat_request,
contents=contents, # 传递已处理和截断的内容
safety_settings=current_safety_settings,
system_instruction=system_instruction,
cached_content_id=cached_content_id_to_use, # 传递缓存 ID (如果命中)
response_id=response_id, # 传递生成的响应 ID
enable_native_caching=enable_native_caching,
cache_manager_instance=cache_manager_instance,
content_to_cache_on_success=content_to_cache_on_success, # 传递待缓存内容 (如果未命中)
db_for_cache=db, # 传递数据库会话
user_id_for_mapping=user_id, # 传递用户 ID
key_manager=key_manager, # 传递 Key 管理器
selected_key=current_api_key, # 传递当前使用的 Key
model_name=model_name, # 传递模型名称
limits=limits, # 传递模型限制
client_ip=client_ip, # 传递客户端 IP
today_date_str_pt=today_date_str_pt, # 传递日期字符串
), media_type="text/event-stream") # 设置响应媒体类型为 SSE
logger.info(f"流式响应已启动 (Key: {current_api_key[:8]}, ID: {response_id})") # 记录流启动日志
# 流式请求一旦成功启动(StreamingResponse 对象创建成功),就认为本次尝试成功,不需要重试。
# 流内部的错误由 generate_stream_response 自行处理并发送给客户端。
return response, None, False # 返回响应对象,无错误信息,不需重试
else:
# --- 处理非流式请求 ---
# 调用 Gemini 客户端的非流式聊天方法
# 注意:缓存命中/未命中的跟踪已移至 process_request 函数的缓存查找逻辑部分
response_obj = await gemini_client_instance.complete_chat(
request=chat_request, # 传递原始请求
contents=contents, # 传递处理和截断后的内容
safety_settings=current_safety_settings, # 传递安全设置
system_instruction=system_instruction, # 传递系统指令
cached_content_id=cached_content_id_to_use # 传递缓存 ID (如果命中)
)
# 假设 complete_chat 返回的是 ResponseWrapper 实例或兼容的对象
if isinstance(response_obj, ResponseWrapper):
# 将 Gemini 的响应格式化为 OpenAI 的 ChatCompletionResponse 格式
response = ChatCompletionResponse(
id=f"chatcmpl-{int(time.time() * 1000)}", # 生成响应 ID
object="chat.completion", # 固定值
created=int(time.time()), # 创建时间戳
model=chat_request.model, # 使用请求中的模型名称
choices=[{ # choices 列表
"index": 0, # 索引
"message": ResponseMessage( # 消息体
role="assistant", # 角色为助手
content=response_obj.text, # 模型回复文本
tool_calls=response_obj.tool_calls # 模型返回的工具调用 (如果支持)
),
"finish_reason": response_obj.finish_reason # 完成原因
}],
usage={ # 使用量信息
"prompt_tokens": response_obj.prompt_token_count or 0, # 输入 Token 数
"completion_tokens": response_obj.candidates_token_count or 0, # 输出 Token 数
"total_tokens": response_obj.total_token_count or 0 # 总 Token 数
}
)
else:
# 如果 complete_chat 返回了非预期的类型,记录错误并抛出异常
logger.error(f"complete_chat 返回了意外的类型: {type(response_obj)}") # 记录错误日志
raise TypeError("API 调用返回了非预期的响应类型") # 抛出类型错误
# --- 非流式请求成功后的处理 ---
# 1. 更新 Key 的最后使用时间戳
with usage_lock: # 使用锁保证线程安全
# 确保 usage_data 中存在对应的 Key 和模型条目
key_usage = usage_data.setdefault(current_api_key, defaultdict(lambda: defaultdict(int)))[model_name]
key_usage['last_used_timestamp'] = time.time() # 更新时间戳
logger.debug(f"非流式请求成功,更新 Key {current_api_key[:8]}... ({model_name}) 的 last_used_timestamp") # 记录日志
# 2. 更新 Token 计数
if response.usage: # 确保响应中包含 usage 信息
prompt_tokens = response.usage.prompt_tokens # 获取输入 Token 数
# 调用位于 utils.py 中的函数更新计数 (注意:该函数目前是占位符)
await update_token_counts(current_api_key, model_name, limits, prompt_tokens, client_ip, today_date_str_pt)
logger.debug(f"非流式请求成功,更新 Key {current_api_key[:8]}... ({model_name}) 的 Token 计数 (占位符)。") # 记录日志
else:
# 如果响应中没有 usage 信息,记录警告
logger.warning(f"非流式响应成功但未找到 usage metadata (Key: {current_api_key[:8]}...). Token counts not updated.") # 记录警告
# 3. 创建缓存 (如果启用了原生缓存、是缓存未命中且调用成功)
if enable_native_caching and content_to_cache_on_success:
logger.debug(f"非流式请求成功且是缓存未命中,尝试创建新缓存 (Key: {current_api_key[:8]}...)") # 记录日志
try:
# 确保数据库会话和用户 ID 有效
if db and user_id is not None:
# 获取当前 Key 在数据库中的 ID
api_key_id = await key_manager.get_key_id(current_api_key)
if api_key_id is not None: # 确保成功获取到 ID
# 调用缓存管理器的 create_cache 方法
new_cache_id = await cache_manager_instance.create_cache(
db=db, # 传递数据库会话
user_id=user_id, # 传递用户 ID
api_key_id=api_key_id, # 传递 Key 的数据库 ID
content=content_to_cache_on_success, # 传递要缓存的原始内容
ttl=3600 # 设置缓存有效期(例如 1 小时)
)
if new_cache_id: # 如果成功创建缓存
logger.info(f"新缓存创建成功: {new_cache_id} (Key: {current_api_key[:8]}...)") # 记录成功日志
# TODO: 实现 Key 与缓存的关联更新逻辑 (在 key_manager 中)
else: # 如果创建失败
logger.warning(f"创建新缓存失败 (Key: {current_api_key[:8]}...)") # 记录失败警告
else: # 如果无法获取 Key ID
logger.warning(f"无法获取 Key {current_api_key[:8]}... 的 ID,跳过缓存创建。") # 记录警告
else: # 如果数据库会话或用户 ID 无效
logger.warning(f"db session ({'有效' if db else '无效'}) 或 user_id ({user_id if user_id is not None else '无效'}) 无效,跳过缓存创建。") # 记录警告
except Exception as cache_create_err:
# 捕获并记录缓存创建过程中可能发生的异常
logger.error(f"创建缓存时发生异常 (Key: {current_api_key[:8]}...): {cache_create_err}", exc_info=True) # 记录错误
# 4. 传统上下文保存逻辑已移至 process_request 函数末尾
# 非流式请求成功,返回响应对象,无错误信息,不需重试
return response, None, False
except Exception as api_exc:
# --- 处理 API 调用过程中发生的任何异常 ---
# 使用统一的异常处理函数 _handle_api_call_exception (位于 error_handler.py)
# 该函数会格式化错误信息、记录日志、可能标记 Key 状态,并返回是否需要重试
error_info, needs_retry_from_exception = await _handle_api_call_exception(
exc=api_exc,
current_api_key=current_api_key,
key_manager=key_manager,
is_stream=is_stream,
request_id=request_id
)
# 返回 None 表示没有成功响应,同时返回错误信息和重试标志
return None, error_info, needs_retry_from_exception
# --- 主请求处理函数 ---
async def process_request(
chat_request: ChatCompletionRequest, # 经过 Pydantic 验证的请求体对象
http_request: Request, # FastAPI 的原始请求对象
request_type: Literal['stream', 'non-stream'], # 请求类型 ('stream' 或 'non-stream')
auth_data: Dict[str, Any], # 经过认证中间件处理后的数据,包含 'key' 和 'config'
# --- 依赖注入 ---
key_manager: APIKeyManager = Depends(get_key_manager), # 注入 Key 管理器实例
http_client: httpx.AsyncClient = Depends(get_http_client), # 注入共享的 HTTP 客户端实例
cache_manager_instance: CacheManager = Depends(get_cache_manager), # 注入缓存管理器实例
db: AsyncSession = Depends(get_db_session) # 注入异步数据库会话
):
"""
处理来自 API 端点的聊天补全请求的核心逻辑。
负责:上下文加载、消息转换、缓存查找、Key 选择、API 调用尝试与重试、
结果处理、Token 计数更新、上下文保存等。
Args:
chat_request (ChatCompletionRequest): 请求体数据。
http_request (Request): FastAPI 请求对象。
request_type (Literal['stream', 'non-stream']): 请求类型。
auth_data (Dict[str, Any]): 认证数据,包含 'key' (代理 Key) 和 'config' (Key 特定配置)。
key_manager (APIKeyManager): 依赖注入的 Key 管理器。
http_client (httpx.AsyncClient): 依赖注入的 HTTP 客户端。
cache_manager_instance (CacheManager): 依赖注入的缓存管理器。
db (AsyncSession): 依赖注入的数据库会话。
Returns:
Union[StreamingResponse, ChatCompletionResponse]: 成功时返回 FastAPI 响应对象。
Raises:
HTTPException: 在处理过程中发生错误时抛出,例如无可用 Key、API 错误、内部错误等。
"""
# --- 初始化和信息提取 ---
proxy_key = auth_data.get("key") # 获取用于认证的代理 Key (可能是 user_id 或实际 Key)
key_config = auth_data.get("config", {}) # 获取该 Key 的特定配置
model_name = chat_request.model # 获取请求的模型名称
client_ip = get_client_ip(http_request) # 获取客户端 IP 地址
_, today_date_str_pt = get_current_timestamps() # 获取太平洋时区的当前日期字符串
request_id = f"req_{uuid.uuid4().hex[:8]}" # 为本次请求生成一个唯一的内部 ID,用于日志跟踪
logger.info(f"开始处理请求 {request_id} (类型: {request_type}, 模型: {model_name}, Key: {proxy_key[:8]}...)") # 记录请求开始日志
# --- 初始 IP 速率限制检查 ---
# 在处理 Key 之前,先检查来源 IP 是否触发了滥用限制
try:
await protect_from_abuse( # 改为异步调用
http_request,
config.MAX_REQUESTS_PER_MINUTE,
config.MAX_REQUESTS_PER_DAY_PER_IP
)
logger.debug(f"请求 {request_id}: IP {client_ip} 通过滥用检查。")
except HTTPException as ip_limit_exc:
# 如果 IP 达到限制,直接抛出异常,终止请求处理
logger.warning(f"请求 {request_id}: IP {client_ip} 未通过滥用检查: {ip_limit_exc.detail}")
raise ip_limit_exc
# --- 模型名称规范化和验证 ---
normalized_model_name = model_name.lower() # 规范化为小写
# 从已加载的配置中获取支持的模型列表
# config.MODEL_LIMITS 是在应用启动时从 model_limits.json 加载的
supported_models_keys = config.MODEL_LIMITS.keys()
original_model_name_for_error = model_name # 保存原始模型名称用于可能的错误消息
if normalized_model_name not in supported_models_keys:
# 如果小写名称不在支持的键中,再尝试不区分大小写的查找
# 这可以处理用户输入 "Gemini-Pro" 而配置文件是 "gemini-pro" 的情况
found_case_insensitive = False
for m_key in supported_models_keys:
if m_key.lower() == normalized_model_name:
logger.info(f"请求 {request_id}: 模型名称 '{original_model_name_for_error}' 通过大小写不敏感匹配规范化为 '{m_key}'。")
model_name = m_key # 使用配置文件中的规范名称
normalized_model_name = m_key # 更新 normalized_model_name 以确保后续逻辑一致
found_case_insensitive = True
break
if not found_case_insensitive:
logger.error(f"请求 {request_id}: 不支持的模型名称 '{original_model_name_for_error}' (规范化尝试: '{normalized_model_name}')。支持的模型: {list(supported_models_keys)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的模型: '{original_model_name_for_error}'. 支持的模型包括: {', '.join(supported_models_keys)}."
)
else:
# 如果小写名称直接在支持的键中,确保 model_name 使用的是该键 (处理原始大小写不同的情况)
if model_name != normalized_model_name:
logger.info(f"请求 {request_id}: 模型名称 '{original_model_name_for_error}' 规范化为 '{normalized_model_name}'。")
model_name = normalized_model_name
else:
logger.info(f"请求 {request_id}: 模型名称 '{model_name}' 有效且大小写规范。")
# --- 获取模型限制 ---
# 此时 model_name 应该是 config.MODEL_LIMITS 中的一个有效键
limits = config.MODEL_LIMITS.get(model_name)
if not limits:
# 此处理论上不应发生,因为 model_name 已经过验证和规范化
logger.critical(f"请求 {request_id}: 严重错误!模型 '{model_name}' 通过了名称校验,但在 MODEL_LIMITS 中未找到其限制配置。原始请求模型: '{original_model_name_for_error}'")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"模型 '{model_name}' 的内部配置错误。请联系管理员。")
# --- 初始 IP 速率限制检查 (可选) ---
# 如果需要基于 IP 进行全局速率限制,可以在这里调用 protect_from_abuse
# protect_from_abuse(http_request, config.MAX_REQUESTS_PER_MINUTE, config.MAX_REQUESTS_PER_DAY_PER_IP)
# --- 确定上下文和缓存策略 ---
enable_native_caching = config.ENABLE_NATIVE_CACHING # 获取全局原生缓存配置
# Key 特定配置优先于全局配置来决定是否启用传统上下文
enable_context = key_config.get('enable_context_completion', config.ENABLE_CONTEXT_COMPLETION)
if enable_native_caching: # 如果启用了原生缓存
enable_context = False # 则强制禁用传统上下文补全,避免冲突和冗余
logger.info(f"请求 {request_id}: 原生缓存已启用,传统上下文补全已禁用。") # 记录信息
# --- 加载传统上下文 (如果启用) ---
initial_contents = [] # 初始化上下文内容列表
if enable_context and chat_request.user_id: # 仅在启用传统上下文且请求中包含 user_id 时加载
try:
# 调用 context_store 加载上下文
initial_contents = await context_store.load_context(chat_request.user_id) or [] # 加载并确保是列表
logger.debug(f"请求 {request_id}: 传统上下文已启用 (用户: {chat_request.user_id}), 加载了 {len(initial_contents)} 条历史。") # 记录加载日志
except Exception as context_load_err:
# 记录加载上下文失败的错误,但不中断请求处理
logger.error(f"请求 {request_id}: 加载上下文失败 (用户: {chat_request.user_id}): {context_load_err}", exc_info=True)
initial_contents = [] # 加载失败时重置为空列表
elif enable_context and not chat_request.user_id:
# 如果启用了上下文但请求中没有 user_id,记录警告
logger.warning(f"请求 {request_id}: 传统上下文已启用但未提供用户 ID,跳过上下文加载。")
else:
# 如果未启用传统上下文,记录调试信息
logger.debug(f"请求 {request_id}: 传统上下文已禁用或未提供用户 ID,跳过上下文加载。")
# --- 转换用户消息格式 ---
try:
# 调用 convert_messages 将 OpenAI 格式的消息列表转换为 Gemini 格式
conversion_result = convert_messages(chat_request.messages)
# convert_messages 在失败时可能返回错误详情列表,需要检查
if isinstance(conversion_result, list): # 如果返回的是列表,说明包含错误信息
error_detail = "; ".join(conversion_result) # 将错误信息拼接成字符串
logger.error(f"请求 {request_id}: 转换用户消息失败: {error_detail}") # 记录错误日志
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"消息格式错误: {error_detail}") # 返回 400 错误
# 转换成功,解包结果
gemini_contents, system_instruction_dict = conversion_result # 获取转换后的内容和系统指令字典
# 从系统指令字典中提取文本内容
system_instruction_text = system_instruction_dict.get("parts", [{}])[0].get("text") if system_instruction_dict else None
except Exception as e:
# 捕获转换过程中可能发生的其他异常
logger.error(f"请求 {request_id}: 转换消息时发生意外错误: {e}", exc_info=True) # 记录错误日志
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="处理消息时出错。") # 返回 400 错误
# --- 原生缓存查找逻辑 ---
cached_content_id_to_use = None # 初始化要使用的缓存 ID 为 None
content_to_cache_on_success = None # 初始化成功后要缓存的内容为 None
if enable_native_caching and chat_request.user_id: # 仅在启用原生缓存且有 user_id 时执行
try:
# 调用缓存管理器的 find_cache 方法查找缓存
cached_content_id_to_use = await cache_manager_instance.find_cache(
db=db, # 传递数据库会话
user_id=chat_request.user_id, # 传递用户 ID
messages=chat_request.messages # 传递原始 OpenAI 格式消息用于哈希计算
)
if cached_content_id_to_use: # 如果找到了缓存 (缓存命中)
logger.info(f"请求 {request_id}: 缓存命中 (用户: {chat_request.user_id}, 缓存 ID: {cached_content_id_to_use})") # 记录命中日志
# 调用跟踪函数记录缓存命中,并估算节省的 Token 数
track_cache_hit(request_id, cached_content_id_to_use, await estimate_token_count(initial_contents + gemini_contents)) # 注意 estimate_token_count 现在是异步的
else: # 如果未找到缓存 (缓存未命中)
# 准备在 API 调用成功后用于创建缓存的内容 (包含原始消息和模型)
content_to_cache_on_success = {"messages": [msg.model_dump() for msg in chat_request.messages], "model": chat_request.model}
logger.debug(f"请求 {request_id}: 缓存未命中 (用户: {chat_request.user_id}), 将在成功后创建缓存。") # 记录未命中日志
# 调用跟踪函数记录缓存未命中,使用内部哈希计算方法获取哈希值
track_cache_miss(request_id, cache_manager_instance._calculate_hash(content_to_cache_on_success))
except Exception as cache_find_err:
# 记录查找缓存时发生的异常,但不中断请求
logger.error(f"请求 {request_id}: 查找缓存时发生异常 (用户: {chat_request.user_id}): {cache_find_err}", exc_info=True)
elif enable_native_caching and not chat_request.user_id:
# 如果启用了缓存但没有 user_id,记录警告
logger.warning(f"请求 {request_id}: 原生缓存已启用但未提供 user_id,无法进行缓存查找或创建。")
# --- Key 选择与 API 调用重试循环 ---
max_attempts = key_manager.get_active_keys_count() + 1 # 最大尝试次数设置为活跃 Key 数量 + 1,确保每个 Key 至少有机会尝试一次
attempt_count = 0 # 初始化尝试计数器
last_error_info = None # 存储最后一次尝试的错误信息
selected_key = None # 存储当前循环选定的 Key
# 重置 KeyManager 中为本次请求记录的已尝试 Key 集合
key_manager.tried_keys_for_request.clear()
logger.debug(f"请求 {request_id}: 重置已尝试 Key 列表。") # 记录日志
# 开始循环尝试调用 API
while attempt_count < max_attempts:
attempt_count += 1 # 增加尝试次数
logger.info(f"请求 {request_id}: 尝试 API 调用 (尝试 {attempt_count}/{max_attempts})") # 记录尝试日志
# --- 选择最佳 API Key ---
# 合并初始上下文和当前请求内容,用于估算 Token 数以辅助 Key 选择
merged_contents_for_estimation = initial_contents + gemini_contents
# 估算输入 Token 数 (注意:estimate_token_count 现在是异步的)
estimated_input_tokens = await estimate_token_count(merged_contents_for_estimation)
logger.debug(f"请求 {request_id}: 估算本次请求输入 Token 数: {estimated_input_tokens}") # 记录估算结果
# 调用 Key 管理器的 select_best_key 方法选择 Key
# 此方法会考虑 Key 的状态、限制、负载、粘性会话、缓存关联等因素
selected_key, available_input_tokens = await key_manager.select_best_key(
model_name=model_name, # 传递模型名称
model_limits=limits, # 传递模型限制
estimated_input_tokens=estimated_input_tokens, # 传递估算的 Token 数
user_id=chat_request.user_id, # 传递用户 ID (用于粘性会话和关联)
enable_sticky_session=config.ENABLE_STICKY_SESSION, # 传递粘性会话配置
request_id=request_id, # 传递请求 ID 用于日志
cached_content_id=cached_content_id_to_use, # 传递缓存 ID (用于缓存关联选择)
db=db # 传递数据库会话
)
if selected_key: # 如果成功选定了一个 Key
logger.info(f"请求 {request_id}: 第 {attempt_count} 次尝试,选定 Key: {selected_key[:8]}...") # 记录选定的 Key (部分隐藏)
# --- 合并内容并进行动态截断 ---
# 准备发送给 API 的完整内容(包含历史和当前消息)
merged_contents_for_api = initial_contents + gemini_contents
# 使用 select_best_key 返回的该 Key 当前可用的输入 Token 数作为动态截断限制
dynamic_limit_for_truncation = available_input_tokens
logger.debug(f"请求 {request_id}: 选定 Key {selected_key[:8]}... 剩余可用输入 Token (用于动态截断): {available_input_tokens}") # 记录可用 Token
# 调用 truncate_context 进行动态截断 (注意:truncate_context 现在是异步的)
truncated_contents_for_api, context_over_limit_after_truncation = await truncate_context(
contents=merged_contents_for_api, # 传递合并后的内容
model_name=model_name, # 传递模型名称
dynamic_max_tokens_limit=dynamic_limit_for_truncation # 传递动态限制
)
# 检查截断后是否仍然超限
if context_over_limit_after_truncation:
# 如果截断后仍然超限,记录错误,标记此 Key 为已尝试,并跳过此 Key 进行下一次循环
logger.error(f"请求 {request_id}: 动态截断后上下文仍然超限 ({await estimate_token_count(truncated_contents_for_api)} tokens)。跳过此 Key。") # 记录错误
key_manager.record_selection_reason(selected_key, "Context Over Limit After Dynamic Truncation", request_id) # 记录跳过原因
key_manager.tried_keys_for_request.add(selected_key) # 添加到已尝试集合
continue # 继续下一次循环
# --- 尝试调用 API ---
# 调用内部函数 _attempt_api_call 执行实际的 API 请求
response, error_info, needs_retry = await _attempt_api_call(
chat_request=chat_request, # 传递请求对象
contents=truncated_contents_for_api, # 传递截断后的内容
system_instruction=system_instruction_text, # 传递提取的系统指令文本
current_api_key=selected_key, # 传递选定的 Key
http_client=http_client, # 传递 HTTP 客户端
key_manager=key_manager, # 传递 Key 管理器
model_name=model_name, # 传递模型名称
limits=limits, # 传递模型限制
client_ip=client_ip, # 传递客户端 IP
today_date_str_pt=today_date_str_pt, # 传递日期字符串
enable_native_caching=enable_native_caching, # 传递缓存启用标志
cache_manager_instance=cache_manager_instance, # 传递缓存管理器
request_id=request_id, # 传递请求 ID
cached_content_id_to_use=cached_content_id_to_use, # 传递缓存 ID
content_to_cache_on_success=content_to_cache_on_success, # 传递待缓存内容
user_id=chat_request.user_id, # 传递用户 ID
db=db # 传递数据库会话
)
# --- 处理 API 调用结果 ---
if response: # 如果成功获取到响应对象
logger.info(f"请求 {request_id}: API 调用成功 (Key: {selected_key[:8]}..., 尝试 {attempt_count})") # 记录成功日志
# 更新用户与 Key 的关联(仅在数据库模式下且有用户 ID 时)
if chat_request.user_id and config.KEY_STORAGE_MODE == 'database':
try:
await key_manager.update_user_key_association(db, chat_request.user_id, selected_key) # 更新关联
logger.debug(f"请求 {request_id}: 更新用户 {chat_request.user_id} 与 Key {selected_key[:8]}... 的关联。") # 记录日志
except Exception as assoc_err:
logger.error(f"请求 {request_id}: 更新用户 Key 关联失败: {assoc_err}", exc_info=True) # 记录错误
# --- 非流式请求的上下文保存 ---
# 仅在非流式、未启用原生缓存、启用了传统上下文且有用户 ID 时执行
if request_type == 'non-stream' and not enable_native_caching and enable_context and chat_request.user_id:
if isinstance(response, ChatCompletionResponse): # 确保响应类型正确
# 提取模型回复内容
model_reply_content = response.choices[0].message.content if response.choices and response.choices[0].message else ""
if model_reply_content: # 确保回复内容不为空
# 调用位于 utils.py 的函数保存上下文
await save_context_after_success(
proxy_key=chat_request.user_id, # 使用 user_id 作为上下文的 Key
contents_to_send=merged_contents_for_api, # 传递合并后的原始内容(包含历史)
model_reply_content=model_reply_content, # 传递模型回复
model_name=model_name, # 传递模型名称
enable_context=True, # 确认启用上下文
final_tool_calls=response.choices[0].message.tool_calls if response.choices and response.choices[0].message else None # 传递工具调用信息
)
else:
# 如果回复内容为空,记录警告
logger.warning(f"请求 {request_id}: 非流式响应成功但回复内容为空,跳过上下文保存。")
else:
# 如果响应类型不匹配,记录警告
logger.warning(f"请求 {request_id}: 非流式响应类型异常 ({type(response)}),跳过上下文保存。")
# --- 结束上下文保存 ---
return response # 返回成功的响应对象,结束处理流程
elif needs_retry: # 如果 API 调用失败但指示需要重试 (例如 5xx 错误, Key 配额耗尽)
logger.warning(f"请求 {request_id}: API 调用失败,需要重试 (Key: {selected_key[:8]}..., 尝试 {attempt_count}). 错误: {error_info.get('message', '未知错误')}") # 记录重试日志
last_error_info = error_info # 保存错误信息
key_manager.tried_keys_for_request.add(selected_key) # 将此 Key 加入已尝试集合
continue # 继续下一次循环,尝试选择其他 Key
else: # 如果 API 调用失败且不需要重试 (例如 4xx 客户端错误, 无效 Key)
logger.error(f"请求 {request_id}: API 调用失败,无需重试 (Key: {selected_key[:8]}..., 尝试 {attempt_count}). 错误: {error_info.get('message', '未知错误')}") # 记录失败日志
last_error_info = error_info # 保存错误信息
# 注意:这里不需要将 Key 加入 tried_keys_for_request,因为 Key 本身可能有问题,不应再试
break # 退出重试循环,将向客户端返回错误
else: # 如果 key_manager.select_best_key 未能选择到可用的 Key
logger.warning(f"请求 {request_id}: 第 {attempt_count} 次尝试未找到可用 Key。") # 记录警告
# 设置默认错误信息,表明所有 Key 都不可用或已尝试过
last_error_info = {"message": "所有可用 API Key 均尝试失败或达到限制。", "type": "key_error", "code": status.HTTP_503_SERVICE_UNAVAILABLE}
# 可以选择在这里稍微等待一下,给 Key 状态恢复或限制重置留出时间
await asyncio.sleep(0.5) # 短暂等待 0.5 秒
# --- 循环结束仍未成功 ---
# 如果循环正常结束(尝试次数耗尽)或者因为无需重试的错误而 break,执行以下逻辑
logger.error(f"请求 {request_id}: 所有 API 调用尝试均失败。") # 记录最终失败日志
# 构造最终的错误详情和状态码
error_detail = last_error_info.get("message", "所有尝试均失败,无法处理请求。") if last_error_info else "所有尝试均失败,无法处理请求。"
status_code = last_error_info.get("code", status.HTTP_503_SERVICE_UNAVAILABLE) if last_error_info else status.HTTP_503_SERVICE_UNAVAILABLE
# 抛出 HTTPException,将错误返回给客户端
raise HTTPException(status_code=status_code, detail=error_detail)
|