gemini / app /utils /error_handling.py
drriver's picture
Upload 28 files
60016b2 verified
import requests
import logging
import asyncio
from fastapi import HTTPException, status
from app.utils.logging import format_log_message
logger = logging.getLogger("my_logger")
def handle_gemini_error(error, current_api_key, key_manager) -> str:
if isinstance(error, requests.exceptions.HTTPError):
status_code = error.response.status_code
if status_code == 400:
try:
error_data = error.response.json()
if 'error' in error_data:
if error_data['error'].get('code') == "invalid_argument":
error_message = "无效的 API 密钥"
extra_log_invalid_key = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('ERROR', f"{current_api_key[:8]} ... {current_api_key[-3:]} → 无效,可能已过期或被删除", extra=extra_log_invalid_key)
logger.error(log_msg)
# key_manager.blacklist_key(current_api_key)
return error_message
error_message = error_data['error'].get(
'message', 'Bad Request')
extra_log_400 = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('WARNING', f"400 错误请求: {error_message}", extra=extra_log_400)
logger.warning(log_msg)
return f"400 错误请求: {error_message}"
except ValueError:
error_message = "400 错误请求:响应不是有效的JSON格式"
extra_log_400_json = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('WARNING', error_message, extra=extra_log_400_json)
logger.warning(log_msg)
return error_message
elif status_code == 429:
error_message = "API 密钥配额已用尽或其他原因"
extra_log_429 = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('WARNING', f"{current_api_key[:8]} ... {current_api_key[-3:]} → 429 官方资源耗尽或其他原因", extra=extra_log_429)
logger.warning(log_msg)
# key_manager.blacklist_key(current_api_key)
return error_message
elif status_code == 403:
error_message = "权限被拒绝"
extra_log_403 = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('ERROR', f"{current_api_key[:8]} ... {current_api_key[-3:]} → 403 权限被拒绝", extra=extra_log_403)
logger.error(log_msg)
# key_manager.blacklist_key(current_api_key)
return error_message
elif status_code == 500:
error_message = "服务器内部错误"
extra_log_500 = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('WARNING', f"{current_api_key[:8]} ... {current_api_key[-3:]} → 500 服务器内部错误", extra=extra_log_500)
logger.warning(log_msg)
return "Gemini API 内部错误"
elif status_code == 503:
error_message = "服务不可用"
extra_log_503 = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('WARNING', f"{current_api_key[:8]} ... {current_api_key[-3:]} → 503 服务不可用", extra=extra_log_503)
logger.warning(log_msg)
return "Gemini API 服务不可用"
else:
error_message = f"未知错误: {status_code}"
extra_log_other = {'key': current_api_key[:8], 'status_code': status_code, 'error_message': error_message}
log_msg = format_log_message('WARNING', f"{current_api_key[:8]} ... {current_api_key[-3:]}{status_code} 未知错误", extra=extra_log_other)
logger.warning(log_msg)
return f"未知错误/模型不可用: {status_code}"
elif isinstance(error, requests.exceptions.ConnectionError):
error_message = "连接错误"
log_msg = format_log_message('WARNING', error_message, extra={'error_message': error_message})
logger.warning(log_msg)
return error_message
elif isinstance(error, requests.exceptions.Timeout):
error_message = "请求超时"
log_msg = format_log_message('WARNING', error_message, extra={'error_message': error_message})
logger.warning(log_msg)
return error_message
else:
error_message = f"发生未知错误: {error}"
log_msg = format_log_message('ERROR', error_message, extra={'error_message': error_message})
logger.error(log_msg)
return error_message
def translate_error(message: str) -> str:
if "quota exceeded" in message.lower():
return "API 密钥配额已用尽"
if "invalid argument" in message.lower():
return "无效参数"
if "internal server error" in message.lower():
return "服务器内部错误"
if "service unavailable" in message.lower():
return "服务不可用"
return message
async def handle_api_error(e, api_key, key_manager, request_type, model, retry_count=0):
"""统一处理API错误,对500和503错误实现自动重试机制"""
error_detail = handle_gemini_error(e, api_key, key_manager)
# 处理500和503服务器错误
if isinstance(e, requests.exceptions.HTTPError) and ('500' in str(e) or '503' in str(e)):
status_code = '500' if '500' in str(e) else '503'
# 最多重试3次
if retry_count < 3:
wait_time = min(1 * (2 ** retry_count), 16) # RETRY_DELAY=1, MAX_RETRY_DELAY=16
log_msg = format_log_message('WARNING', f"Gemini服务器错误({status_code}),等待{wait_time}秒后重试 ({retry_count+1}/3)",
extra={'key': api_key[:8], 'request_type': request_type, 'model': model, 'status_code': int(status_code)})
logger.warning(log_msg)
# 等待后返回重试信号
await asyncio.sleep(wait_time)
return {'should_retry': True, 'error': error_detail, 'remove_cache': False}
# 重试次数用尽,直接返回错误状态码
log_msg = format_log_message('ERROR', f"服务器错误({status_code})重试{retry_count}次后仍然失败",
extra={'key': api_key[:8], 'request_type': request_type, 'model': model, 'status_code': int(status_code)})
logger.error(log_msg)
# 不建议切换密钥,直接抛出HTTP异常
raise HTTPException(status_code=int(status_code),
detail=f"Gemini API 服务器错误({status_code}),请稍后重试")
# 对于其他错误,返回切换密钥的信号
log_msg = format_log_message('ERROR', f"API错误: {error_detail}",
extra={'key': api_key[:8], 'request_type': request_type, 'model': model, 'error_message': error_detail})
logger.error(log_msg)
return {'should_retry': False, 'should_switch_key': True, 'error': error_detail, 'remove_cache': True}