Spaces:
Paused
Paused
File size: 6,150 Bytes
20adca1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | import asyncio
import functools
from typing import Type, Tuple, Optional, TypeVar, Callable, Any
from ..utils.logger import logger
# Generic type variable; not referenced by any definition visible in this part of the file.
T = TypeVar('T')
def with_retry(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
) -> Callable:
    """
    Decorator that implements retry logic with exponential backoff

    The decorated callable is awaited, so it must be a coroutine function.
    It is attempted once and then retried up to ``max_retries`` more times
    whenever it raises one of ``exceptions``; any other exception propagates
    immediately without a retry.

    Args:
        max_retries (int): Maximum number of retry attempts. Negative values
            are treated as 0 (a single attempt, no retries).
        delay (float): Initial delay between retries in seconds
        backoff_factor (float): Multiplier for delay after each retry
        exceptions (tuple): Tuple of exceptions to catch and retry on

    Returns:
        A decorator that wraps a coroutine function with the retry loop.

    Raises:
        The exception raised by the final attempt once all attempts failed.
    """
    # A negative budget would otherwise skip the loop entirely, never call the
    # wrapped function and end in ``raise None``; always make the first attempt.
    retry_budget = max(max_retries, 0)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            current_delay = delay
            # Initial attempt plus retries; every iteration returns or raises
            # on the last pass, so the loop never falls through.
            for attempt in range(retry_budget + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt >= retry_budget:
                        # Out of attempts: report and re-raise the original
                        # exception with its traceback intact.
                        logger.error(
                            f"All {retry_budget + 1} attempts failed for {func.__name__}: {str(e)}"
                        )
                        raise
                    logger.warning(
                        f"Attempt {attempt + 1} failed for {func.__name__}: {str(e)}. "
                        f"Retrying in {current_delay} seconds..."
                    )
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff_factor
        return wrapper
    return decorator
def retry_with_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    backoff_factor: float = 2.0,
    exceptions: Optional[Tuple[Type[Exception], ...]] = None
) -> Callable:
    """
    More advanced retry decorator with capped exponential backoff and jitter

    The decorated callable is awaited, so it must be a coroutine function.
    Exceptions not listed in ``exceptions`` propagate immediately.

    Args:
        max_retries (int): Maximum number of retry attempts
        initial_delay (float): Initial delay between retries in seconds
        max_delay (float): Maximum delay between retries in seconds
        backoff_factor (float): Multiplier for delay after each retry
        exceptions (tuple): Tuple of exceptions to catch and retry on;
            defaults to ``(Exception,)`` when None

    Returns:
        A decorator that wraps a coroutine function with the retry loop.

    Raises:
        The exception raised by the wrapped function once more than
        ``max_retries`` retryable failures have occurred.
    """
    # Function-scope import keeps this block self-contained.
    import random

    if exceptions is None:
        exceptions = (Exception,)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            retry_count = 0
            current_delay = initial_delay
            operation_name = func.__name__
            while True:
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    retry_count += 1
                    if retry_count > max_retries:
                        logger.error(
                            f"Operation {operation_name} failed after {max_retries} retries: {str(e)}"
                        )
                        raise
                    # Add jitter to prevent thundering herd. Use a real random
                    # draw in [0, 1): deriving it from the clock's fractional
                    # milliseconds yields identical values on coarse clocks.
                    jitter = random.random()
                    # NOTE: the cap is applied after the jitter, so once the
                    # delay reaches max_delay every sleep is exactly max_delay.
                    sleep_time = min(current_delay + jitter, max_delay)
                    logger.warning(
                        f"Operation {operation_name} failed (attempt {retry_count}/{max_retries}): "
                        f"{str(e)}. Retrying in {sleep_time:.2f} seconds..."
                    )
                    await asyncio.sleep(sleep_time)
                    current_delay = min(current_delay * backoff_factor, max_delay)
        return wrapper
    return decorator
def circuit_breaker(
    failure_threshold: int = 5,
    reset_timeout: float = 60.0
) -> Callable:
    """
    Circuit breaker decorator to prevent repeated calls to failing services

    The decorated callable is awaited, so it must be a coroutine function.
    Breaker state is kept per decorated function and shared by all of its
    callers. A successful call resets the failure count. Once the count
    reaches ``failure_threshold`` the circuit opens and calls are rejected
    without invoking the function until ``reset_timeout`` seconds have passed
    since the most recent failure; the circuit then closes and the failure
    count starts again from zero.

    Args:
        failure_threshold (int): Number of failures before opening circuit
        reset_timeout (float): Time in seconds before attempting to close circuit

    Returns:
        A decorator that wraps a coroutine function with the circuit breaker.

    Raises:
        Exception: When the circuit is open and the reset timeout has not
            elapsed. Exceptions from the wrapped function are re-raised as-is.
    """
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # State for the circuit breaker, captured by the wrapper closure
        state = {
            'failures': 0,
            'last_failure_time': 0.0,
            'is_open': False
        }

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            loop = asyncio.get_running_loop()

            # Check if circuit is open
            if state['is_open']:
                elapsed = loop.time() - state['last_failure_time']
                if elapsed > reset_timeout:
                    # Try to close the circuit
                    state['is_open'] = False
                    state['failures'] = 0
                else:
                    raise Exception(
                        f"Circuit breaker is open for {func.__name__}. "
                        f"Try again in {reset_timeout - elapsed:.1f} seconds"
                    )

            try:
                result = await func(*args, **kwargs)
            except Exception:
                # Record failure. Take the timestamp now rather than reusing
                # one from before the call: a slow failing call would
                # otherwise back-date the failure and shorten (or skip) the
                # open period.
                state['failures'] += 1
                state['last_failure_time'] = loop.time()
                # Check if we need to open the circuit
                if state['failures'] >= failure_threshold:
                    state['is_open'] = True
                    logger.error(
                        f"Circuit breaker opened for {func.__name__} after {failure_threshold} failures"
                    )
                raise

            # Success - reset failure count
            state['failures'] = 0
            return result
        return wrapper
    return decorator