File size: 13,990 Bytes
e82bac2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 | # -*- coding: utf-8 -*-
"""
API 请求处理中的错误处理辅助函数。
"""
import httpx # 导入 HTTP 客户端库,用于处理 HTTP 异常
import json # 导入 JSON 库,用于解析错误响应体
import logging # 导入日志库
from typing import Optional, Dict, Any, Tuple # 导入类型提示
# 导入 APIKeyManager 类,用于标记 Key 状态
from app.core.keys.manager import APIKeyManager # (新路径)
# 获取日志记录器实例
logger = logging.getLogger("my_logger")
# --- 错误处理辅助函数 ---
def _format_api_error(http_err: httpx.HTTPStatusError) -> Tuple[str, int]:
"""
(内部辅助函数) 根据 httpx.HTTPStatusError 格式化 API 错误消息和状态码。
尝试从响应体中解析更具体的错误信息。
Args:
http_err (httpx.HTTPStatusError): httpx 抛出的 HTTP 状态错误异常。
Returns:
Tuple[str, int]: 包含错误消息字符串和 HTTP 状态码的元组。
"""
status_code = http_err.response.status_code # 获取 HTTP 状态码
error_message = f"API 请求失败,状态码: {status_code}。" # 默认错误消息
try:
# 尝试解析响应体为 JSON
error_detail = http_err.response.json()
# 尝试提取 Google API 标准错误格式中的 message 字段
if isinstance(error_detail, dict) and "error" in error_detail and "message" in error_detail["error"]:
error_message = f"API 错误 (状态码 {status_code}): {error_detail['error']['message']}"
else:
# 如果不是标准格式,使用原始响应体作为错误信息的一部分
error_message = f"API 请求失败,状态码: {status_code}。响应体: {http_err.response.text[:200]}" # 限制响应体长度
except json.JSONDecodeError:
# 如果响应体不是有效的 JSON,使用原始响应体文本
error_message = f"API 请求失败,状态码: {status_code}。响应体: {http_err.response.text[:200]}" # 限制响应体长度
except Exception as e:
# 捕获解析过程中其他可能的错误
logger.error(f"解析 API 错误响应体时发生意外错误: {e}", exc_info=True)
error_message = f"API 请求失败,状态码: {status_code},且无法解析错误详情。"
return error_message, status_code
def _handle_429_daily_quota(http_error: httpx.HTTPStatusError, api_key: Optional[str], key_manager: APIKeyManager) -> bool:
"""
(内部辅助函数) 处理 HTTP 429 错误,专门检查是否为每日配额耗尽。
如果是每日配额耗尽,则调用 Key 管理器标记该 Key。
Args:
http_error (httpx.HTTPStatusError): 429 错误异常对象。
api_key (Optional[str]): 当前使用的 API Key。
key_manager (APIKeyManager): Key 管理器实例。
Returns:
bool: 如果确定是每日配额耗尽错误,返回 True;否则返回 False。
"""
if not api_key: # 如果没有提供 api_key,无法标记,直接返回 False
return False
try:
# 尝试解析 429 错误的 JSON 响应体
error_detail = http_error.response.json()
is_daily_quota_error = False # 初始化标志
# 检查 Google API 标准错误结构中是否包含指示配额失败的详情
if error_detail and "error" in error_detail and "details" in error_detail["error"]:
for detail in error_detail["error"]["details"]:
# 检查详情类型是否为 QuotaFailure
if detail.get("@type") == "type.googleapis.com/google.rpc.QuotaFailure":
# 检查 quotaId 是否包含 "PerDay" 字样来判断是否为每日配额
quota_id = detail.get("quotaId", "")
if "PerDay" in quota_id:
is_daily_quota_error = True # 确认是每日配额错误
break # 找到后即可退出循环
if is_daily_quota_error: # 如果确认是每日配额错误
# 调用 Key 管理器的方法标记该 Key 当天已耗尽
key_manager.mark_key_daily_exhausted(api_key)
logger.warning(f"Key {api_key[:8]}... 因每日配额耗尽被标记为当天不可用。") # 记录警告日志
return True # 返回 True 表示已处理每日配额错误
except json.JSONDecodeError: # 处理 JSON 解析失败的情况
logger.error(f"无法解析 429 错误的 JSON 响应体 (Key: {api_key[:8]}...)") # 记录错误
except Exception as parse_e: # 处理解析过程中其他可能的异常
logger.error(f"解析 429 错误详情时发生意外异常 (Key: {api_key[:8]}...): {parse_e}") # 记录错误
return False # 如果不是每日配额错误或处理失败,返回 False
async def _handle_http_error_in_attempt(
http_err: httpx.HTTPStatusError,
current_api_key: Optional[str],
key_manager: APIKeyManager,
is_stream: bool, # 标记是否为流式请求 (用于日志)
request_id: Optional[str] = None # 请求 ID (用于日志)
) -> Tuple[Dict[str, Any], bool]:
"""
(内部辅助函数) 处理在单次 API 调用尝试 (`_attempt_api_call`) 中发生的 `httpx.HTTPStatusError`。
根据 HTTP 状态码判断错误类型,格式化错误信息,并决定是否需要重试(通常意味着尝试其他 Key)。
对于特定错误(如 429 每日配额、400 Key 无效、401/403),会调用 Key 管理器标记 Key 状态。
Args:
http_err (httpx.HTTPStatusError): 捕获到的 HTTP 状态错误异常。
current_api_key (Optional[str]): 当前尝试使用的 API Key。
key_manager (APIKeyManager): Key 管理器实例。
is_stream (bool): 当前请求是否为流式请求。
request_id (Optional[str]): 当前请求的 ID。
Returns:
Tuple[Dict[str, Any], bool]:
- 第一个元素 (Dict): 包含格式化错误信息的字典 ('message', 'type', 'code')。
- 第二个元素 (bool): 指示是否需要重试 (True 表示需要,False 表示不需要)。
"""
# 格式化错误消息和状态码
error_message, status_code = _format_api_error(http_err)
# 记录包含状态码和 Key 前缀的错误日志
logger.error(f"API HTTP 错误 ({'流式' if is_stream else '非流式'}, Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {status_code} - {error_message}", exc_info=False) # exc_info=False 避免重复记录堆栈
# 初始化错误信息字典和重试标志
error_info: Dict[str, Any] = {
"message": error_message,
"type": "api_error", # 默认错误类型
"code": status_code
}
needs_retry = False # 默认不需要重试
# --- 根据 HTTP 状态码判断错误类型和是否需要重试 ---
if status_code in [500, 503]: # 服务器内部错误或服务不可用
error_info["type"] = "server_error" if status_code == 500 else "service_unavailable_error"
needs_retry = True # 这些通常是临时性问题,需要重试 (尝试其他 Key 或稍后重试)
logger.warning(f"HTTP 状态码 {status_code} 表示服务器临时错误,标记需要重试。")
# 标记 Key 临时不可用
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type=f"HTTP {status_code}")
elif status_code == 429: # 请求过多 (速率限制或配额)
error_info["type"] = "rate_limit_error"
# 检查是否为每日配额耗尽
if _handle_429_daily_quota(http_err, current_api_key, key_manager):
needs_retry = True # 如果是每日配额耗尽,需要重试(尝试其他 Key)
error_info["message"] = f"Key {current_api_key[:8] if current_api_key else 'N/A'} 每日配额耗尽,尝试其他 Key。"
else:
# 如果是普通的速率限制 (RPM/TPM),通常不需要立即重试当前请求(因为是针对当前 Key 的限制)
# Key 选择逻辑应该在下次选择时避开此 Key 一段时间
needs_retry = False # 不需要外部重试循环立即尝试其他 Key
logger.warning(f"HTTP 状态码 429 (非每日配额) 表示当前 Key 速率限制,标记无需重试。")
# 仍然标记 Key 临时不可用,以便 Key 选择器避开
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Rate Limit (429)")
elif status_code in [401, 403]: # 未授权或禁止访问
error_info["type"] = "authentication_error" if status_code == 401 else "permission_error"
needs_retry = False # 认证/权限问题通常是永久性的,不需要重试
logger.warning(f"HTTP 状态码 {status_code} 表示认证/权限问题,标记无需重试。")
# 标记 Key 临时不可用(可能需要较长时间,例如 5 分钟)
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=300, issue_type=f"Auth/Permission ({status_code})")
elif status_code == 400: # 错误请求
error_info["type"] = "invalid_request_error"
needs_retry = False # 通常是请求本身的问题,不需要重试
# 检查是否为明确的 Key 无效错误
if "API key not valid" in error_message:
logger.warning(f"HTTP 状态码 400 (明确 Key 无效) 表示 Key 无效,标记无需重试。")
# 标记 Key 临时不可用(可能需要较长时间)
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=300, issue_type="Invalid API Key (400)")
else:
logger.warning(f"HTTP 状态码 400 (非 Key 无效) 表示请求体问题,标记无需重试。")
else: # 其他未明确处理的 4xx 或其他错误
needs_retry = False # 默认不重试
logger.warning(f"HTTP 状态码 {status_code} 表示其他错误,标记无需重试。")
return error_info, needs_retry
async def _handle_api_call_exception(
exc: Exception,
current_api_key: Optional[str],
key_manager: APIKeyManager,
is_stream: bool,
request_id: Optional[str] = None
) -> Tuple[Dict[str, Any], bool]:
"""
(内部辅助函数) 统一处理在 `_attempt_api_call` 中捕获到的各类异常。
根据异常类型调用不同的处理逻辑,格式化错误信息,并决定是否需要重试。
Args:
exc (Exception): 捕获到的异常对象。
current_api_key (Optional[str]): 当前尝试使用的 API Key。
key_manager (APIKeyManager): Key 管理器实例。
is_stream (bool): 当前请求是否为流式请求。
request_id (Optional[str]): 当前请求的 ID。
Returns:
Tuple[Dict[str, Any], bool]:
- 第一个元素 (Dict): 包含格式化错误信息的字典 ('message', 'type', 'code')。
- 第二个元素 (bool): 指示是否需要重试 (True 表示需要,False 表示不需要)。
"""
needs_retry = False # 初始化重试标志
error_info: Dict[str, Any] = { # 初始化默认错误信息
"message": f"API 调用中发生意外异常: {exc}",
"type": "internal_error",
"code": 500 # 默认为 500 内部错误
}
if isinstance(exc, httpx.HTTPStatusError): # --- 处理 HTTP 状态错误 ---
# 调用专门处理 HTTP 错误的函数
error_info, needs_retry = await _handle_http_error_in_attempt(
exc, current_api_key, key_manager, is_stream, request_id
)
elif isinstance(exc, httpx.TimeoutException): # --- 处理请求超时 ---
error_message = f"请求超时 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
logger.error(error_message) # 记录错误
error_info["message"] = "请求 Gemini API 超时,请稍后重试。"
error_info["type"] = "timeout_error"
error_info["code"] = 504 # Gateway Timeout
needs_retry = True # 超时通常是临时问题,需要重试
# 标记 Key 临时不可用
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Timeout")
elif isinstance(exc, httpx.RequestError): # --- 处理其他网络请求错误 ---
error_message = f"网络连接错误 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
logger.error(error_message) # 记录错误
error_info["message"] = "连接 Gemini API 时发生网络错误。"
error_info["type"] = "connection_error"
error_info["code"] = 503 # Service Unavailable
needs_retry = True # 网络问题通常是临时的,需要重试
# 标记 Key 临时不可用
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Connection Error")
else: # --- 处理其他所有未预料到的异常 ---
error_message = f"API 调用中发生未知内部错误 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
logger.error(error_message, exc_info=True) # 记录包含堆栈的错误日志
error_info["message"] = "处理请求时发生意外的内部错误。"
error_info["type"] = "internal_error"
error_info["code"] = 500
needs_retry = False # 未知内部错误通常不建议自动重试
return error_info, needs_retry
# handle_gemini_error 函数的功能已被 _handle_api_call_exception 覆盖和细化,可以考虑移除
# def handle_gemini_error(e: Exception, api_key: Optional[str], key_manager: APIKeyManager) -> str: ...
|