Admin-Desk2 / app /utils /retry.py
Fred808's picture
Upload 82 files
20adca1 verified
import asyncio
import functools
from typing import Type, Tuple, Optional, TypeVar, Callable, Any
from ..utils.logger import logger
T = TypeVar('T')
def with_retry(
max_retries: int = 3,
delay: float = 1.0,
backoff_factor: float = 2.0,
exceptions: Tuple[Type[Exception], ...] = (Exception,)
) -> Callable:
"""
Decorator that implements retry logic with exponential backoff
Args:
max_retries (int): Maximum number of retry attempts
delay (float): Initial delay between retries in seconds
backoff_factor (float): Multiplier for delay after each retry
exceptions (tuple): Tuple of exceptions to catch and retry on
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
last_exception = None
current_delay = delay
# Initial attempt plus retries
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
# Don't sleep on the last attempt
if attempt < max_retries:
logger.warning(
f"Attempt {attempt + 1} failed for {func.__name__}: {str(e)}. "
f"Retrying in {current_delay} seconds..."
)
await asyncio.sleep(current_delay)
current_delay *= backoff_factor
else:
logger.error(
f"All {max_retries + 1} attempts failed for {func.__name__}: {str(e)}"
)
raise last_exception
return wrapper
return decorator
def retry_with_backoff(
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 30.0,
backoff_factor: float = 2.0,
exceptions: Optional[Tuple[Type[Exception], ...]] = None
) -> Callable:
"""
More advanced retry decorator with capped exponential backoff and jitter
Args:
max_retries (int): Maximum number of retry attempts
initial_delay (float): Initial delay between retries in seconds
max_delay (float): Maximum delay between retries in seconds
backoff_factor (float): Multiplier for delay after each retry
exceptions (tuple): Tuple of exceptions to catch and retry on
"""
if exceptions is None:
exceptions = (Exception,)
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
retry_count = 0
current_delay = initial_delay
operation_name = func.__name__
while True:
try:
return await func(*args, **kwargs)
except exceptions as e:
retry_count += 1
if retry_count > max_retries:
logger.error(
f"Operation {operation_name} failed after {max_retries} retries: {str(e)}"
)
raise
# Add jitter to prevent thundering herd
jitter = (asyncio.get_event_loop().time() * 1000) % 1.0
sleep_time = min(current_delay + jitter, max_delay)
logger.warning(
f"Operation {operation_name} failed (attempt {retry_count}/{max_retries}): "
f"{str(e)}. Retrying in {sleep_time:.2f} seconds..."
)
await asyncio.sleep(sleep_time)
current_delay = min(current_delay * backoff_factor, max_delay)
return wrapper
return decorator
def circuit_breaker(
failure_threshold: int = 5,
reset_timeout: float = 60.0
) -> Callable:
"""
Circuit breaker decorator to prevent repeated calls to failing services
Args:
failure_threshold (int): Number of failures before opening circuit
reset_timeout (float): Time in seconds before attempting to close circuit
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
# State for the circuit breaker
state = {
'failures': 0,
'last_failure_time': 0,
'is_open': False
}
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
current_time = asyncio.get_event_loop().time()
# Check if circuit is open
if state['is_open']:
if current_time - state['last_failure_time'] > reset_timeout:
# Try to close the circuit
state['is_open'] = False
state['failures'] = 0
else:
raise Exception(
f"Circuit breaker is open for {func.__name__}. "
f"Try again in {reset_timeout - (current_time - state['last_failure_time']):.1f} seconds"
)
try:
result = await func(*args, **kwargs)
# Success - reset failure count
state['failures'] = 0
return result
except Exception as e:
# Record failure
state['failures'] += 1
state['last_failure_time'] = current_time
# Check if we need to open the circuit
if state['failures'] >= failure_threshold:
state['is_open'] = True
logger.error(
f"Circuit breaker opened for {func.__name__} after {failure_threshold} failures"
)
raise
return wrapper
return decorator