# File path: app/utils/retry.py
"""
LLM 调用重试机制
使用 tenacity 库实现智能重试策略:
- 指数退避 (Exponential Backoff)
- 可重试异常识别
- 最大重试次数限制
- 详细日志记录
"""
import logging
from typing import Callable, Type, Tuple, Any
from functools import wraps
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
after_log,
RetryError,
)
# Logger configuration: dedicated named logger for retry events so callers
# can tune its verbosity independently of the root logger.
logger = logging.getLogger("llm_retry")
logger.setLevel(logging.INFO)
# ============================================================================
# Retryable exception type definitions
# ============================================================================
# Network / transient errors - these should be retried.
# NOTE: this tuple is extended below by each optional SDK's exception types.
RETRYABLE_EXCEPTIONS: Tuple[Type[Exception], ...] = (
    ConnectionError,
    TimeoutError,
)
# Try to import the exception types from each supported SDK.
# Every SDK is optional: if a package is not installed, the ImportError is
# swallowed and RETRYABLE_EXCEPTIONS simply stays unchanged.
try:
    # OpenAI SDK: connection, timeout, rate-limit and 5xx server errors
    # are all transient and worth retrying.
    from openai import (
        APIConnectionError,
        APITimeoutError,
        RateLimitError,
        InternalServerError,
    )
    RETRYABLE_EXCEPTIONS = RETRYABLE_EXCEPTIONS + (
        APIConnectionError,
        APITimeoutError,
        RateLimitError,
        InternalServerError,
    )
except ImportError:
    pass
try:
    # Anthropic SDK: same categories as OpenAI, aliased to avoid clashing
    # with the OpenAI names imported above.
    from anthropic import (
        APIConnectionError as AnthropicConnectionError,
        APITimeoutError as AnthropicTimeoutError,
        RateLimitError as AnthropicRateLimitError,
        InternalServerError as AnthropicServerError,
    )
    RETRYABLE_EXCEPTIONS = RETRYABLE_EXCEPTIONS + (
        AnthropicConnectionError,
        AnthropicTimeoutError,
        AnthropicRateLimitError,
        AnthropicServerError,
    )
except ImportError:
    pass
try:
    # httpx transport-level errors (used under the hood by both SDKs).
    import httpx
    RETRYABLE_EXCEPTIONS = RETRYABLE_EXCEPTIONS + (
        httpx.ConnectError,
        httpx.ReadTimeout,
        httpx.ConnectTimeout,
    )
except ImportError:
    pass
# ============================================================================
# Retry configuration
# ============================================================================
class RetryConfig:
    """Default tuning knobs for the LLM retry policy."""
    MAX_ATTEMPTS: int = 3                 # maximum number of attempts
    MIN_WAIT_SECONDS: float = 1.0         # minimum backoff wait (seconds)
    MAX_WAIT_SECONDS: float = 30.0        # maximum backoff wait (seconds)
    EXPONENTIAL_MULTIPLIER: float = 2.0   # exponential backoff multiplier
# ============================================================================
# Retry decorator
# ============================================================================
def create_retry_decorator(
    max_attempts: int = RetryConfig.MAX_ATTEMPTS,
    min_wait: float = RetryConfig.MIN_WAIT_SECONDS,
    max_wait: float = RetryConfig.MAX_WAIT_SECONDS,
    multiplier: float = RetryConfig.EXPONENTIAL_MULTIPLIER,
):
    """
    Build a tenacity retry decorator for LLM calls.

    The returned decorator retries only on RETRYABLE_EXCEPTIONS, waits with
    exponential backoff between attempts, logs a warning before each sleep,
    and re-raises the last exception once the attempt budget is exhausted.

    Args:
        max_attempts: Maximum number of attempts, including the first call.
        min_wait: Minimum wait between attempts (seconds).
        max_wait: Cap on the wait between attempts (seconds).
        multiplier: Exponential backoff multiplier. New keyword-only-by-
            position parameter with a default equal to the previously
            hard-coded RetryConfig.EXPONENTIAL_MULTIPLIER, so existing
            callers are unaffected.

    Returns:
        A configured tenacity ``retry`` decorator.
    """
    return retry(
        # Retry condition: only exceptions known to be transient.
        retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
        # Stop condition: give up after max_attempts total attempts.
        stop=stop_after_attempt(max_attempts),
        # Wait strategy: exponential backoff bounded by [min_wait, max_wait].
        wait=wait_exponential(
            multiplier=multiplier,
            min=min_wait,
            max=max_wait,
        ),
        # Log a WARNING before sleeping between attempts.
        before_sleep=before_sleep_log(logger, logging.WARNING),
        # Log each attempt's outcome at DEBUG level.
        after=after_log(logger, logging.DEBUG),
        # Re-raise the final exception instead of wrapping it in RetryError.
        reraise=True,
    )


# Default retry decorator instance shared by the helpers below.
llm_retry = create_retry_decorator()
def with_retry(func: Callable) -> Callable:
    """
    Decorator that wraps an async function with the default LLM retry policy.

    Usage:
        @with_retry
        async def call_llm(...):
            ...
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # Decorate a fresh zero-argument coroutine on every invocation so
        # the retry machinery runs with per-call state.
        async def attempt():
            return await func(*args, **kwargs)

        retrying_attempt = llm_retry(attempt)
        return await retrying_attempt()

    return wrapper
# ============================================================================
# Convenience helpers
# ============================================================================
async def retry_async(
    coro_func: Callable,
    *args,
    max_attempts: int = RetryConfig.MAX_ATTEMPTS,
    **kwargs
) -> Any:
    """
    Await ``coro_func(*args, **kwargs)`` under the retry policy.

    Usage:
        result = await retry_async(
            client.chat.completions.create,
            model="gpt-4",
            messages=[...]
        )
    """
    # Bind the call into a zero-argument coroutine, then apply a decorator
    # configured with the requested attempt budget.
    async def invoke():
        return await coro_func(*args, **kwargs)

    retrying = create_retry_decorator(max_attempts=max_attempts)
    return await retrying(invoke)()
def is_retryable_error(error: Exception) -> bool:
    """Return True if *error* is one of the transient exception types in
    RETRYABLE_EXCEPTIONS and is therefore worth retrying."""
    return isinstance(error, RETRYABLE_EXCEPTIONS)
def log_retry_info(attempt: int, max_attempts: int, error: Exception, wait_time: float) -> None:
    """
    Log a standard warning line describing a failed LLM attempt and the
    upcoming backoff wait.

    Args:
        attempt: 1-based index of the attempt that just failed.
        max_attempts: Total number of attempts allowed.
        error: The exception raised by the failed attempt.
        wait_time: Seconds to wait before the next attempt.
    """
    # Lazy %-style arguments: the message is only formatted when the
    # WARNING level is actually enabled (logging best practice). The
    # rendered text is byte-identical to the previous f-string version.
    logger.warning(
        "🔄 LLM 调用失败 (尝试 %s/%s): %s: %s. 等待 %.1fs 后重试...",
        attempt,
        max_attempts,
        type(error).__name__,
        error,
        wait_time,
    )