File size: 13,990 Bytes
e82bac2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# -*- coding: utf-8 -*-
"""
API 请求处理中的错误处理辅助函数。
"""
import httpx     # 导入 HTTP 客户端库,用于处理 HTTP 异常
import json      # 导入 JSON 库,用于解析错误响应体
import logging   # 导入日志库
from typing import Optional, Dict, Any, Tuple # 导入类型提示

# 导入 APIKeyManager 类,用于标记 Key 状态
from app.core.keys.manager import APIKeyManager # (新路径)

# 获取日志记录器实例
logger = logging.getLogger("my_logger")

# --- 错误处理辅助函数 ---

def _format_api_error(http_err: httpx.HTTPStatusError) -> Tuple[str, int]:
    """
    (内部辅助函数) 根据 httpx.HTTPStatusError 格式化 API 错误消息和状态码。
    尝试从响应体中解析更具体的错误信息。

    Args:
        http_err (httpx.HTTPStatusError): httpx 抛出的 HTTP 状态错误异常。

    Returns:
        Tuple[str, int]: 包含错误消息字符串和 HTTP 状态码的元组。
    """
    status_code = http_err.response.status_code # 获取 HTTP 状态码
    error_message = f"API 请求失败,状态码: {status_code}。" # 默认错误消息

    try:
        # 尝试解析响应体为 JSON
        error_detail = http_err.response.json()
        # 尝试提取 Google API 标准错误格式中的 message 字段
        if isinstance(error_detail, dict) and "error" in error_detail and "message" in error_detail["error"]:
            error_message = f"API 错误 (状态码 {status_code}): {error_detail['error']['message']}"
        else:
            # 如果不是标准格式,使用原始响应体作为错误信息的一部分
            error_message = f"API 请求失败,状态码: {status_code}。响应体: {http_err.response.text[:200]}" # 限制响应体长度
    except json.JSONDecodeError:
        # 如果响应体不是有效的 JSON,使用原始响应体文本
        error_message = f"API 请求失败,状态码: {status_code}。响应体: {http_err.response.text[:200]}" # 限制响应体长度
    except Exception as e:
        # 捕获解析过程中其他可能的错误
        logger.error(f"解析 API 错误响应体时发生意外错误: {e}", exc_info=True)
        error_message = f"API 请求失败,状态码: {status_code},且无法解析错误详情。"

    return error_message, status_code

def _handle_429_daily_quota(http_error: httpx.HTTPStatusError, api_key: Optional[str], key_manager: APIKeyManager) -> bool:
    """
    (内部辅助函数) 处理 HTTP 429 错误,专门检查是否为每日配额耗尽。
    如果是每日配额耗尽,则调用 Key 管理器标记该 Key。

    Args:
        http_error (httpx.HTTPStatusError): 429 错误异常对象。
        api_key (Optional[str]): 当前使用的 API Key。
        key_manager (APIKeyManager): Key 管理器实例。

    Returns:
        bool: 如果确定是每日配额耗尽错误,返回 True;否则返回 False。
    """
    if not api_key: # 如果没有提供 api_key,无法标记,直接返回 False
        return False

    try:
        # 尝试解析 429 错误的 JSON 响应体
        error_detail = http_error.response.json()
        is_daily_quota_error = False # 初始化标志
        # 检查 Google API 标准错误结构中是否包含指示配额失败的详情
        if error_detail and "error" in error_detail and "details" in error_detail["error"]:
            for detail in error_detail["error"]["details"]:
                # 检查详情类型是否为 QuotaFailure
                if detail.get("@type") == "type.googleapis.com/google.rpc.QuotaFailure":
                    # 检查 quotaId 是否包含 "PerDay" 字样来判断是否为每日配额
                    quota_id = detail.get("quotaId", "")
                    if "PerDay" in quota_id:
                        is_daily_quota_error = True # 确认是每日配额错误
                        break # 找到后即可退出循环

        if is_daily_quota_error: # 如果确认是每日配额错误
            # 调用 Key 管理器的方法标记该 Key 当天已耗尽
            key_manager.mark_key_daily_exhausted(api_key)
            logger.warning(f"Key {api_key[:8]}... 因每日配额耗尽被标记为当天不可用。") # 记录警告日志
            return True # 返回 True 表示已处理每日配额错误

    except json.JSONDecodeError: # 处理 JSON 解析失败的情况
        logger.error(f"无法解析 429 错误的 JSON 响应体 (Key: {api_key[:8]}...)") # 记录错误
    except Exception as parse_e: # 处理解析过程中其他可能的异常
        logger.error(f"解析 429 错误详情时发生意外异常 (Key: {api_key[:8]}...): {parse_e}") # 记录错误

    return False # 如果不是每日配额错误或处理失败,返回 False

async def _handle_http_error_in_attempt(
    http_err: httpx.HTTPStatusError,
    current_api_key: Optional[str],
    key_manager: APIKeyManager,
    is_stream: bool, # 标记是否为流式请求 (用于日志)
    request_id: Optional[str] = None # 请求 ID (用于日志)
) -> Tuple[Dict[str, Any], bool]:
    """
    (内部辅助函数) 处理在单次 API 调用尝试 (`_attempt_api_call`) 中发生的 `httpx.HTTPStatusError`。
    根据 HTTP 状态码判断错误类型,格式化错误信息,并决定是否需要重试(通常意味着尝试其他 Key)。
    对于特定错误(如 429 每日配额、400 Key 无效、401/403),会调用 Key 管理器标记 Key 状态。

    Args:
        http_err (httpx.HTTPStatusError): 捕获到的 HTTP 状态错误异常。
        current_api_key (Optional[str]): 当前尝试使用的 API Key。
        key_manager (APIKeyManager): Key 管理器实例。
        is_stream (bool): 当前请求是否为流式请求。
        request_id (Optional[str]): 当前请求的 ID。

    Returns:
        Tuple[Dict[str, Any], bool]:
            - 第一个元素 (Dict): 包含格式化错误信息的字典 ('message', 'type', 'code')。
            - 第二个元素 (bool): 指示是否需要重试 (True 表示需要,False 表示不需要)。
    """
    # 格式化错误消息和状态码
    error_message, status_code = _format_api_error(http_err)
    # 记录包含状态码和 Key 前缀的错误日志
    logger.error(f"API HTTP 错误 ({'流式' if is_stream else '非流式'}, Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {status_code} - {error_message}", exc_info=False) # exc_info=False 避免重复记录堆栈

    # 初始化错误信息字典和重试标志
    error_info: Dict[str, Any] = {
        "message": error_message,
        "type": "api_error", # 默认错误类型
        "code": status_code
    }
    needs_retry = False # 默认不需要重试

    # --- 根据 HTTP 状态码判断错误类型和是否需要重试 ---
    if status_code in [500, 503]: # 服务器内部错误或服务不可用
        error_info["type"] = "server_error" if status_code == 500 else "service_unavailable_error"
        needs_retry = True # 这些通常是临时性问题,需要重试 (尝试其他 Key 或稍后重试)
        logger.warning(f"HTTP 状态码 {status_code} 表示服务器临时错误,标记需要重试。")
        # 标记 Key 临时不可用
        if current_api_key:
            key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type=f"HTTP {status_code}")

    elif status_code == 429: # 请求过多 (速率限制或配额)
        error_info["type"] = "rate_limit_error"
        # 检查是否为每日配额耗尽
        if _handle_429_daily_quota(http_err, current_api_key, key_manager):
            needs_retry = True # 如果是每日配额耗尽,需要重试(尝试其他 Key)
            error_info["message"] = f"Key {current_api_key[:8] if current_api_key else 'N/A'} 每日配额耗尽,尝试其他 Key。"
        else:
             # 如果是普通的速率限制 (RPM/TPM),通常不需要立即重试当前请求(因为是针对当前 Key 的限制)
             # Key 选择逻辑应该在下次选择时避开此 Key 一段时间
             needs_retry = False # 不需要外部重试循环立即尝试其他 Key
             logger.warning(f"HTTP 状态码 429 (非每日配额) 表示当前 Key 速率限制,标记无需重试。")
             # 仍然标记 Key 临时不可用,以便 Key 选择器避开
             if current_api_key:
                 key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Rate Limit (429)")

    elif status_code in [401, 403]: # 未授权或禁止访问
        error_info["type"] = "authentication_error" if status_code == 401 else "permission_error"
        needs_retry = False # 认证/权限问题通常是永久性的,不需要重试
        logger.warning(f"HTTP 状态码 {status_code} 表示认证/权限问题,标记无需重试。")
        # 标记 Key 临时不可用(可能需要较长时间,例如 5 分钟)
        if current_api_key:
            key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=300, issue_type=f"Auth/Permission ({status_code})")

    elif status_code == 400: # 错误请求
        error_info["type"] = "invalid_request_error"
        needs_retry = False # 通常是请求本身的问题,不需要重试
        # 检查是否为明确的 Key 无效错误
        if "API key not valid" in error_message:
             logger.warning(f"HTTP 状态码 400 (明确 Key 无效) 表示 Key 无效,标记无需重试。")
             # 标记 Key 临时不可用(可能需要较长时间)
             if current_api_key:
                 key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=300, issue_type="Invalid API Key (400)")
        else:
             logger.warning(f"HTTP 状态码 400 (非 Key 无效) 表示请求体问题,标记无需重试。")

    else: # 其他未明确处理的 4xx 或其他错误
        needs_retry = False # 默认不重试
        logger.warning(f"HTTP 状态码 {status_code} 表示其他错误,标记无需重试。")

    return error_info, needs_retry

async def _handle_api_call_exception(
    exc: Exception,
    current_api_key: Optional[str],
    key_manager: APIKeyManager,
    is_stream: bool,
    request_id: Optional[str] = None
) -> Tuple[Dict[str, Any], bool]:
    """
    (内部辅助函数) 统一处理在 `_attempt_api_call` 中捕获到的各类异常。
    根据异常类型调用不同的处理逻辑,格式化错误信息,并决定是否需要重试。

    Args:
        exc (Exception): 捕获到的异常对象。
        current_api_key (Optional[str]): 当前尝试使用的 API Key。
        key_manager (APIKeyManager): Key 管理器实例。
        is_stream (bool): 当前请求是否为流式请求。
        request_id (Optional[str]): 当前请求的 ID。

    Returns:
        Tuple[Dict[str, Any], bool]:
            - 第一个元素 (Dict): 包含格式化错误信息的字典 ('message', 'type', 'code')。
            - 第二个元素 (bool): 指示是否需要重试 (True 表示需要,False 表示不需要)。
    """
    needs_retry = False # 初始化重试标志
    error_info: Dict[str, Any] = { # 初始化默认错误信息
        "message": f"API 调用中发生意外异常: {exc}",
        "type": "internal_error",
        "code": 500 # 默认为 500 内部错误
    }

    if isinstance(exc, httpx.HTTPStatusError): # --- 处理 HTTP 状态错误 ---
        # 调用专门处理 HTTP 错误的函数
        error_info, needs_retry = await _handle_http_error_in_attempt(
            exc, current_api_key, key_manager, is_stream, request_id
        )
    elif isinstance(exc, httpx.TimeoutException): # --- 处理请求超时 ---
        error_message = f"请求超时 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
        logger.error(error_message) # 记录错误
        error_info["message"] = "请求 Gemini API 超时,请稍后重试。"
        error_info["type"] = "timeout_error"
        error_info["code"] = 504 # Gateway Timeout
        needs_retry = True # 超时通常是临时问题,需要重试
        # 标记 Key 临时不可用
        if current_api_key:
            key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Timeout")
    elif isinstance(exc, httpx.RequestError): # --- 处理其他网络请求错误 ---
        error_message = f"网络连接错误 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
        logger.error(error_message) # 记录错误
        error_info["message"] = "连接 Gemini API 时发生网络错误。"
        error_info["type"] = "connection_error"
        error_info["code"] = 503 # Service Unavailable
        needs_retry = True # 网络问题通常是临时的,需要重试
        # 标记 Key 临时不可用
        if current_api_key:
            key_manager.mark_key_temporarily_unavailable(current_api_key, duration_seconds=60, issue_type="Connection Error")
    else: # --- 处理其他所有未预料到的异常 ---
        error_message = f"API 调用中发生未知内部错误 (Key: {current_api_key[:8] if current_api_key else 'N/A'}, Request: {request_id}): {exc}"
        logger.error(error_message, exc_info=True) # 记录包含堆栈的错误日志
        error_info["message"] = "处理请求时发生意外的内部错误。"
        error_info["type"] = "internal_error"
        error_info["code"] = 500
        needs_retry = False # 未知内部错误通常不建议自动重试

    return error_info, needs_retry

# handle_gemini_error 函数的功能已被 _handle_api_call_exception 覆盖和细化,可以考虑移除
# def handle_gemini_error(e: Exception, api_key: Optional[str], key_manager: APIKeyManager) -> str: ...