File size: 5,311 Bytes
4e98fb0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# 文件路径: app/utils/retry.py
"""
LLM 调用重试机制

使用 tenacity 库实现智能重试策略:
- 指数退避 (Exponential Backoff)
- 可重试异常识别
- 最大重试次数限制
- 详细日志记录
"""

import logging
from typing import Callable, Type, Tuple, Any
from functools import wraps

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
    after_log,
    RetryError,
)

# Module logger, registered under the fixed name "llm_retry".
# NOTE(review): the INFO threshold set below filters out DEBUG records sent to
# this logger, so anything logged through it at logging.DEBUG is not emitted
# unless the level is lowered elsewhere.
logger = logging.getLogger("llm_retry")
logger.setLevel(logging.INFO)


# ============================================================================
# 可重试的异常类型定义
# ============================================================================

# Transient network-level failures from the standard library: always retryable.
RETRYABLE_EXCEPTIONS: Tuple[Type[Exception], ...] = (
    ConnectionError,
    TimeoutError,
)

# Each optional SDK below contributes its own transient error types.
# A failed import means the SDK is not installed (or does not expose these
# names); in that case its exception types are simply left out.

# OpenAI SDK
try:
    from openai import (
        APIConnectionError,
        APITimeoutError,
        RateLimitError,
        InternalServerError,
    )
except ImportError:
    pass
else:
    RETRYABLE_EXCEPTIONS += (
        APIConnectionError,
        APITimeoutError,
        RateLimitError,
        InternalServerError,
    )

# Anthropic SDK (aliased so the names do not collide with the OpenAI ones)
try:
    from anthropic import (
        APIConnectionError as AnthropicConnectionError,
        APITimeoutError as AnthropicTimeoutError,
        RateLimitError as AnthropicRateLimitError,
        InternalServerError as AnthropicServerError,
    )
except ImportError:
    pass
else:
    RETRYABLE_EXCEPTIONS += (
        AnthropicConnectionError,
        AnthropicTimeoutError,
        AnthropicRateLimitError,
        AnthropicServerError,
    )

# httpx transport errors: connection failures and connect/read timeouts
try:
    import httpx
except ImportError:
    pass
else:
    RETRYABLE_EXCEPTIONS += (
        httpx.ConnectError,
        httpx.ReadTimeout,
        httpx.ConnectTimeout,
    )


# ============================================================================
# 重试配置
# ============================================================================

class RetryConfig:
    """Default settings for the retry policy, exposed as class-level constants."""

    # Limit on the number of attempts
    MAX_ATTEMPTS: int = 3

    # Lower bound on the wait between attempts, in seconds
    MIN_WAIT_SECONDS: float = 1.0

    # Upper bound on the wait between attempts, in seconds
    MAX_WAIT_SECONDS: float = 30.0

    # Multiplier applied by the exponential backoff
    EXPONENTIAL_MULTIPLIER: float = 2.0


# ============================================================================
# 重试装饰器
# ============================================================================

def create_retry_decorator(
    max_attempts: int = RetryConfig.MAX_ATTEMPTS,
    min_wait: float = RetryConfig.MIN_WAIT_SECONDS,
    max_wait: float = RetryConfig.MAX_WAIT_SECONDS,
):
    """
    Build a tenacity retry decorator for LLM calls.

    Args:
        max_attempts: attempt limit passed to ``stop_after_attempt``
            (tenacity counts attempts, so the first call is included)
        min_wait: lower bound of the backoff wait (seconds)
        max_wait: upper bound of the backoff wait (seconds)

    Returns:
        The decorator produced by ``tenacity.retry``
    """
    # Exponential backoff bounded by min_wait / max_wait; the multiplier is
    # read from RetryConfig each time a decorator is built.
    backoff = wait_exponential(
        multiplier=RetryConfig.EXPONENTIAL_MULTIPLIER,
        min=min_wait,
        max=max_wait,
    )

    policy = {
        # Retry only when the raised exception is one of RETRYABLE_EXCEPTIONS
        "retry": retry_if_exception_type(RETRYABLE_EXCEPTIONS),
        # Give up once the attempt limit is reached
        "stop": stop_after_attempt(max_attempts),
        "wait": backoff,
        # Log at WARNING before sleeping ahead of the next attempt
        "before_sleep": before_sleep_log(logger, logging.WARNING),
        # Log at DEBUG through the `after` hook
        "after": after_log(logger, logging.DEBUG),
        # Surface the last underlying exception rather than a RetryError
        "reraise": True,
    }
    return retry(**policy)


# Default retry decorator instance, built with create_retry_decorator()'s
# default arguments.
llm_retry = create_retry_decorator()


def with_retry(func: Callable) -> Callable:
    """
    Decorator that runs an async function under the default ``llm_retry`` policy.

    Every call of the decorated function wraps ``func(*args, **kwargs)`` in a
    fresh zero-argument coroutine function and executes it through
    ``llm_retry``.

    Usage:
        @with_retry
        async def call_llm(...):
            ...

    Args:
        func: callable returning an awaitable (typically an ``async def``)

    Returns:
        An async wrapper that carries ``func``'s metadata (name, docstring, ...)
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # The retried unit closes over this call's arguments. It is given
        # func's metadata so that the retry log hooks, which report the name
        # of the function handed to the retry decorator, identify the real
        # function instead of this internal closure.
        @llm_retry
        @wraps(func)
        async def _inner():
            return await func(*args, **kwargs)
        return await _inner()
    return wrapper


# ============================================================================
# 便捷函数
# ============================================================================

async def retry_async(
    coro_func: Callable,
    *args,
    max_attempts: int = RetryConfig.MAX_ATTEMPTS,
    **kwargs
) -> Any:
    """
    Await ``coro_func(*args, **kwargs)`` with retries.

    Usage:
        result = await retry_async(
            client.chat.completions.create,
            model="gpt-4",
            messages=[...]
        )

    Args:
        coro_func: callable returning an awaitable
        *args: positional arguments forwarded to ``coro_func``
        max_attempts: attempt limit passed to ``create_retry_decorator``;
            keyword-only and consumed here, so it is never forwarded
        **kwargs: keyword arguments forwarded to ``coro_func``

    Returns:
        Whatever the awaited call returns.

    Raises:
        The last exception raised by ``coro_func`` when retries are exhausted
        or the exception is not retryable (the decorator is built with
        ``reraise=True``).
    """
    decorator = create_retry_decorator(max_attempts=max_attempts)

    # Copy coro_func's metadata onto the retried closure so that the retry
    # log hooks, which report the name of the function handed to the retry
    # decorator, identify the real callable rather than `_call`.
    @decorator
    @wraps(coro_func)
    async def _call():
        return await coro_func(*args, **kwargs)

    return await _call()


def is_retryable_error(error: Exception) -> bool:
    """Return True when ``error`` is an instance of any type in RETRYABLE_EXCEPTIONS."""
    matches_retryable_type = isinstance(error, RETRYABLE_EXCEPTIONS)
    return matches_retryable_type


def log_retry_info(attempt: int, max_attempts: int, error: Exception, wait_time: float):
    """
    Emit a WARNING on the module logger describing a failed attempt.

    Args:
        attempt: number of the attempt that failed
        max_attempts: attempt limit shown next to ``attempt``
        error: exception raised by the attempt; its type name and text are logged
        wait_time: seconds until the next attempt, shown with one decimal place
    """
    message = (
        f"🔄 LLM 调用失败 (尝试 {attempt}/{max_attempts}): {type(error).__name__}: {error}. "
        f"等待 {wait_time:.1f}s 后重试..."
    )
    logger.warning(message)