kiroproxy / kiro_proxy /core /retry.py
KiroProxy User
chore: repo cleanup and maintenance
0edbd7b
"""请求重试机制"""
import asyncio
from typing import Callable, Any, Optional, Set
from functools import wraps
# 可重试的状态码
RETRYABLE_STATUS_CODES: Set[int] = {
408, # Request Timeout
500, # Internal Server Error
502, # Bad Gateway
503, # Service Unavailable
504, # Gateway Timeout
}
# 不可重试的状态码(直接返回错误)
NON_RETRYABLE_STATUS_CODES: Set[int] = {
400, # Bad Request
401, # Unauthorized
403, # Forbidden
404, # Not Found
422, # Unprocessable Entity
}
def is_retryable_error(status_code: Optional[int], error: Optional[Exception] = None) -> bool:
"""判断是否为可重试的错误"""
# 网络错误可重试
if error:
error_name = type(error).__name__.lower()
if any(kw in error_name for kw in ['timeout', 'connect', 'network', 'reset']):
return True
# 特定状态码可重试
if status_code and status_code in RETRYABLE_STATUS_CODES:
return True
return False
def is_non_retryable_error(status_code: Optional[int]) -> bool:
"""判断是否为不可重试的错误"""
return status_code in NON_RETRYABLE_STATUS_CODES if status_code else False
async def retry_async(
func: Callable,
max_retries: int = 2,
base_delay: float = 0.5,
max_delay: float = 5.0,
on_retry: Optional[Callable[[int, Exception], None]] = None
) -> Any:
"""
异步重试装饰器
Args:
func: 要执行的异步函数
max_retries: 最大重试次数
base_delay: 基础延迟(秒)
max_delay: 最大延迟(秒)
on_retry: 重试时的回调函数
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return await func()
except Exception as e:
last_error = e
# 检查是否可重试
status_code = getattr(e, 'status_code', None)
if is_non_retryable_error(status_code):
raise
if attempt < max_retries and is_retryable_error(status_code, e):
# 指数退避
delay = min(base_delay * (2 ** attempt), max_delay)
if on_retry:
on_retry(attempt + 1, e)
else:
print(f"[Retry] 第 {attempt + 1} 次重试,延迟 {delay:.1f}s,错误: {type(e).__name__}")
await asyncio.sleep(delay)
else:
raise
raise last_error
class RetryableRequest:
"""可重试的请求上下文"""
def __init__(self, max_retries: int = 2, base_delay: float = 0.5):
self.max_retries = max_retries
self.base_delay = base_delay
self.attempt = 0
self.last_error = None
def should_retry(self, status_code: Optional[int] = None, error: Optional[Exception] = None) -> bool:
"""判断是否应该重试"""
self.attempt += 1
self.last_error = error
if self.attempt > self.max_retries:
return False
if is_non_retryable_error(status_code):
return False
return is_retryable_error(status_code, error)
async def wait(self):
"""等待重试延迟"""
delay = min(self.base_delay * (2 ** (self.attempt - 1)), 5.0)
print(f"[Retry] 第 {self.attempt} 次重试,延迟 {delay:.1f}s")
await asyncio.sleep(delay)