GAP / app /core /processing /error_handler.py
misonL's picture
Upload 52 files
e82bac2 verified
Raw
History Blame Contribute Delete
14 kB
# -*- coding: utf-8 -*-
"""
API 请求处理中的错误处理辅助函数。
"""
import httpx # 导入 HTTP 客户端库,用于处理 HTTP 异常
import json # 导入 JSON 库,用于解析错误响应体
import logging # 导入日志库
from typing import Optional, Dict, Any, Tuple # 导入类型提示
# 导入 APIKeyManager 类,用于标记 Key 状态
from app.core.keys.manager import APIKeyManager # (新路径)
# 获取日志记录器实例
logger = logging.getLogger("my_logger")
# --- 错误处理辅助函数 ---
def _format_api_error(http_err: httpx.HTTPStatusError) -> Tuple[str, int]:
"""
(内部辅助函数) 根据 httpx.HTTPStatusError 格式化 API 错误消息和状态码。
尝试从响应体中解析更具体的错误信息。
Args:
http_err (httpx.HTTPStatusError): httpx 抛出的 HTTP 状态错误异常。
Returns:
Tuple[str, int]: 包含错误消息字符串和 HTTP 状态码的元组。
"""
status_code = http_err.response.status_code # 获取 HTTP 状态码
error_message = f"API 请求失败,状态码: {status_code}。" # 默认错误消息
try:
# 尝试解析响应体为 JSON
error_detail = http_err.response.json()
# 尝试提取 Google API 标准错误格式中的 message 字段
if isinstance(error_detail, dict) and "error" in error_detail and "message" in error_detail["error"]:
error_message = f"API 错误 (状态码 {status_code}): {error_detail['error']['message']}"
else:
# 如果不是标准格式,使用原始响应体作为错误信息的一部分
error_message = f"API 请求失败,状态码: {status_code}。响应体: {http_err.response.text[:200]}" # 限制响应体长度
except json.JSONDecodeError:
# 如果响应体不是有效的 JSON,使用原始响应体文本
error_message = f"API 请求失败,状态码: {status_code}。响应体: {http_err.response.text[:200]}" # 限制响应体长度
except Exception as e:
# 捕获解析过程中其他可能的错误
logger.error(f"解析 API 错误响应体时发生意外错误: {e}", exc_info=True)
error_message = f"API 请求失败,状态码: {status_code},且无法解析错误详情。"
return error_message, status_code
def _handle_429_daily_quota(http_error: httpx.HTTPStatusError, api_key: Optional[str], key_manager: APIKeyManager) -> bool:
"""
(内部辅助函数) 处理 HTTP 429 错误,专门检查是否为每日配额耗尽。
如果是每日配额耗尽,则调用 Key 管理器标记该 Key。
Args:
http_error (httpx.HTTPStatusError): 429 错误异常对象。
api_key (Optional[str]): 当前使用的 API Key。
key_manager (APIKeyManager): Key 管理器实例。
Returns:
bool: 如果确定是每日配额耗尽错误,返回 True;否则返回 False。
"""
if not api_key: # 如果没有提供 api_key,无法标记,直接返回 False
return False
try:
# 尝试解析 429 错误的 JSON 响应体
error_detail = http_error.response.json()
is_daily_quota_error = False # 初始化标志
# 检查 Google API 标准错误结构中是否包含指示配额失败的详情
if error_detail and "error" in error_detail and "details" in error_detail["error"]:
for detail in error_detail["error"]["details"]:
# 检查详情类型是否为 QuotaFailure
if detail.get("@type") == "type.googleapis.com/google.rpc.QuotaFailure":
# 检查 quotaId 是否包含 "PerDay" 字样来判断是否为每日配额
quota_id = detail.get("quotaId", "")
if "PerDay" in quota_id:
is_daily_quota_error = True # 确认是每日配额错误
break # 找到后即可退出循环
if is_daily_quota_error: # 如果确认是每日配额错误
# 调用 Key 管理器的方法标记该 Key 当天已耗尽
key_manager.mark_key_daily_exhausted(api_key)
logger.warning(f"Key {api_key[:8]}... 因每日配额耗尽被标记为当天不可用。") # 记录警告日志
return True # 返回 True 表示已处理每日配额错误
except json.JSONDecodeError: # 处理 JSON 解析失败的情况
logger.error(f"无法解析 429 错误的 JSON 响应体 (Key: {api_key[:8]}...)") # 记录错误
except Exception as parse_e: # 处理解析过程中其他可能的异常
logger.error(f"解析 429 错误详情时发生意外异常 (Key: {api_key[:8]}...): {parse_e}") # 记录错误
return False # 如果不是每日配额错误或处理失败,返回 False
async def _handle_http_error_in_attempt(
http_err: httpx.HTTPStatusError,
current_api_key: Optional[str],
key_manager: APIKeyManager,
is_stream: bool, # 标记是否为流式请求 (用于日志)
request_id: Optional[str] = None # 请求 ID (用于日志)
) -> Tuple[Dict[str, Any], bool]:
"""
(内部辅助函数) 处理在单次 API 调用尝试 (`_attempt_api_call`) 中发生的 `httpx.HTTPStatusError`。
根据 HTTP 状态码判断错误类型,格式化错误信息,并决定是否需要重试(通常意味着尝试其他 Key)。
对于特定错误(如 429 每日配额、400 Key 无效、401/403),会调用 Key 管理器标记 Key 状态。
Args:
http_err (httpx.HTTPStatusError): 捕获到的 HTTP 状态错误异常。
current_api_key (Optional[str]): 当前尝试使用的 API Key。
key_manager (APIKeyManager): Key 管理器实例。
is_stream (bool): 当前请求是否为流式请求。
request_id (Optional[str]): 当前请求的 ID。
Returns:
Tuple[Dict[str, Any], bool]:
- 第一个元素 (Dict): 包含格式化错误信息的字典 ('message', 'type', 'code')。
- 第二个元素 (bool): 指示是否需要重试 (True 表示需要,False 表示不需要)。
"""
# 格式化错误消息和状态码
error_message, status_code = _format_api_error(http_err)
# 记录包含状态码和 Key 前缀的错误日志
logger.error(f"API HTTP 错误 ({'流式' if is_stream else '非流式'}, Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {status_code} - {error_message}", exc_info=False) # exc_info=False 避免重复记录堆栈
# 初始化错误信息字典和重试标志
error_info: Dict[str, Any] = {
"message": error_message,
"type": "api_error", # 默认错误类型
"code": status_code
}
needs_retry = False # 默认不需要重试
# --- 根据 HTTP 状态码判断错误类型和是否需要重试 ---
if status_code in [500, 503]: # 服务器内部错误或服务不可用
error_info["type"] = "server_error" if status_code == 500 else "service_unavailable_error"
needs_retry = True # 这些通常是临时性问题,需要重试 (尝试其他 Key 或稍后重试)
logger.warning(f"HTTP 状态码 {status_code} 表示服务器临时错误,标记需要重试。")
# 标记 Key 临时不可用
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type=f"HTTP {status_code}")
elif status_code == 429: # 请求过多 (速率限制或配额)
error_info["type"] = "rate_limit_error"
# 检查是否为每日配额耗尽
if _handle_429_daily_quota(http_err, current_api_key, key_manager):
needs_retry = True # 如果是每日配额耗尽,需要重试(尝试其他 Key)
error_info["message"] = f"Key {current_api_key[:8] if current_api_key else 'N/A'} 每日配额耗尽,尝试其他 Key。"
else:
# 如果是普通的速率限制 (RPM/TPM),通常不需要立即重试当前请求(因为是针对当前 Key 的限制)
# Key 选择逻辑应该在下次选择时避开此 Key 一段时间
needs_retry = False # 不需要外部重试循环立即尝试其他 Key
logger.warning(f"HTTP 状态码 429 (非每日配额) 表示当前 Key 速率限制,标记无需重试。")
# 仍然标记 Key 临时不可用,以便 Key 选择器避开
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Rate Limit (429)")
elif status_code in [401, 403]: # 未授权或禁止访问
error_info["type"] = "authentication_error" if status_code == 401 else "permission_error"
needs_retry = False # 认证/权限问题通常是永久性的,不需要重试
logger.warning(f"HTTP 状态码 {status_code} 表示认证/权限问题,标记无需重试。")
# 标记 Key 临时不可用(可能需要较长时间,例如 5 分钟)
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=300, issue_type=f"Auth/Permission ({status_code})")
elif status_code == 400: # 错误请求
error_info["type"] = "invalid_request_error"
needs_retry = False # 通常是请求本身的问题,不需要重试
# 检查是否为明确的 Key 无效错误
if "API key not valid" in error_message:
logger.warning(f"HTTP 状态码 400 (明确 Key 无效) 表示 Key 无效,标记无需重试。")
# 标记 Key 临时不可用(可能需要较长时间)
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=300, issue_type="Invalid API Key (400)")
else:
logger.warning(f"HTTP 状态码 400 (非 Key 无效) 表示请求体问题,标记无需重试。")
else: # 其他未明确处理的 4xx 或其他错误
needs_retry = False # 默认不重试
logger.warning(f"HTTP 状态码 {status_code} 表示其他错误,标记无需重试。")
return error_info, needs_retry
async def _handle_api_call_exception(
exc: Exception,
current_api_key: Optional[str],
key_manager: APIKeyManager,
is_stream: bool,
request_id: Optional[str] = None
) -> Tuple[Dict[str, Any], bool]:
"""
(内部辅助函数) 统一处理在 `_attempt_api_call` 中捕获到的各类异常。
根据异常类型调用不同的处理逻辑,格式化错误信息,并决定是否需要重试。
Args:
exc (Exception): 捕获到的异常对象。
current_api_key (Optional[str]): 当前尝试使用的 API Key。
key_manager (APIKeyManager): Key 管理器实例。
is_stream (bool): 当前请求是否为流式请求。
request_id (Optional[str]): 当前请求的 ID。
Returns:
Tuple[Dict[str, Any], bool]:
- 第一个元素 (Dict): 包含格式化错误信息的字典 ('message', 'type', 'code')。
- 第二个元素 (bool): 指示是否需要重试 (True 表示需要,False 表示不需要)。
"""
needs_retry = False # 初始化重试标志
error_info: Dict[str, Any] = { # 初始化默认错误信息
"message": f"API 调用中发生意外异常: {exc}",
"type": "internal_error",
"code": 500 # 默认为 500 内部错误
}
if isinstance(exc, httpx.HTTPStatusError): # --- 处理 HTTP 状态错误 ---
# 调用专门处理 HTTP 错误的函数
error_info, needs_retry = await _handle_http_error_in_attempt(
exc, current_api_key, key_manager, is_stream, request_id
)
elif isinstance(exc, httpx.TimeoutException): # --- 处理请求超时 ---
error_message = f"请求超时 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
logger.error(error_message) # 记录错误
error_info["message"] = "请求 Gemini API 超时,请稍后重试。"
error_info["type"] = "timeout_error"
error_info["code"] = 504 # Gateway Timeout
needs_retry = True # 超时通常是临时问题,需要重试
# 标记 Key 临时不可用
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Timeout")
elif isinstance(exc, httpx.RequestError): # --- 处理其他网络请求错误 ---
error_message = f"网络连接错误 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
logger.error(error_message) # 记录错误
error_info["message"] = "连接 Gemini API 时发生网络错误。"
error_info["type"] = "connection_error"
error_info["code"] = 503 # Service Unavailable
needs_retry = True # 网络问题通常是临时的,需要重试
# 标记 Key 临时不可用
if current_api_key:
key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Connection Error")
else: # --- 处理其他所有未预料到的异常 ---
error_message = f"API 调用中发生未知内部错误 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
logger.error(error_message, exc_info=True) # 记录包含堆栈的错误日志
error_info["message"] = "处理请求时发生意外的内部错误。"
error_info["type"] = "internal_error"
error_info["code"] = 500
needs_retry = False # 未知内部错误通常不建议自动重试
return error_info, needs_retry
# handle_gemini_error 函数的功能已被 _handle_api_call_exception 覆盖和细化,可以考虑移除
# def handle_gemini_error(e: Exception, api_key: Optional[str], key_manager: APIKeyManager) -> str: ...