Spaces:
Paused
Paused
| import asyncio | |
| import functools | |
| from typing import Type, Tuple, Optional, TypeVar, Callable, Any | |
| from ..utils.logger import logger | |
| T = TypeVar('T') | |
| def with_retry( | |
| max_retries: int = 3, | |
| delay: float = 1.0, | |
| backoff_factor: float = 2.0, | |
| exceptions: Tuple[Type[Exception], ...] = (Exception,) | |
| ) -> Callable: | |
| """ | |
| Decorator that implements retry logic with exponential backoff | |
| Args: | |
| max_retries (int): Maximum number of retry attempts | |
| delay (float): Initial delay between retries in seconds | |
| backoff_factor (float): Multiplier for delay after each retry | |
| exceptions (tuple): Tuple of exceptions to catch and retry on | |
| """ | |
| def decorator(func: Callable[..., Any]) -> Callable[..., Any]: | |
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| last_exception = None | |
| current_delay = delay | |
| # Initial attempt plus retries | |
| for attempt in range(max_retries + 1): | |
| try: | |
| return await func(*args, **kwargs) | |
| except exceptions as e: | |
| last_exception = e | |
| # Don't sleep on the last attempt | |
| if attempt < max_retries: | |
| logger.warning( | |
| f"Attempt {attempt + 1} failed for {func.__name__}: {str(e)}. " | |
| f"Retrying in {current_delay} seconds..." | |
| ) | |
| await asyncio.sleep(current_delay) | |
| current_delay *= backoff_factor | |
| else: | |
| logger.error( | |
| f"All {max_retries + 1} attempts failed for {func.__name__}: {str(e)}" | |
| ) | |
| raise last_exception | |
| return wrapper | |
| return decorator | |
| def retry_with_backoff( | |
| max_retries: int = 3, | |
| initial_delay: float = 1.0, | |
| max_delay: float = 30.0, | |
| backoff_factor: float = 2.0, | |
| exceptions: Optional[Tuple[Type[Exception], ...]] = None | |
| ) -> Callable: | |
| """ | |
| More advanced retry decorator with capped exponential backoff and jitter | |
| Args: | |
| max_retries (int): Maximum number of retry attempts | |
| initial_delay (float): Initial delay between retries in seconds | |
| max_delay (float): Maximum delay between retries in seconds | |
| backoff_factor (float): Multiplier for delay after each retry | |
| exceptions (tuple): Tuple of exceptions to catch and retry on | |
| """ | |
| if exceptions is None: | |
| exceptions = (Exception,) | |
| def decorator(func: Callable[..., Any]) -> Callable[..., Any]: | |
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| retry_count = 0 | |
| current_delay = initial_delay | |
| operation_name = func.__name__ | |
| while True: | |
| try: | |
| return await func(*args, **kwargs) | |
| except exceptions as e: | |
| retry_count += 1 | |
| if retry_count > max_retries: | |
| logger.error( | |
| f"Operation {operation_name} failed after {max_retries} retries: {str(e)}" | |
| ) | |
| raise | |
| # Add jitter to prevent thundering herd | |
| jitter = (asyncio.get_event_loop().time() * 1000) % 1.0 | |
| sleep_time = min(current_delay + jitter, max_delay) | |
| logger.warning( | |
| f"Operation {operation_name} failed (attempt {retry_count}/{max_retries}): " | |
| f"{str(e)}. Retrying in {sleep_time:.2f} seconds..." | |
| ) | |
| await asyncio.sleep(sleep_time) | |
| current_delay = min(current_delay * backoff_factor, max_delay) | |
| return wrapper | |
| return decorator | |
| def circuit_breaker( | |
| failure_threshold: int = 5, | |
| reset_timeout: float = 60.0 | |
| ) -> Callable: | |
| """ | |
| Circuit breaker decorator to prevent repeated calls to failing services | |
| Args: | |
| failure_threshold (int): Number of failures before opening circuit | |
| reset_timeout (float): Time in seconds before attempting to close circuit | |
| """ | |
| def decorator(func: Callable[..., Any]) -> Callable[..., Any]: | |
| # State for the circuit breaker | |
| state = { | |
| 'failures': 0, | |
| 'last_failure_time': 0, | |
| 'is_open': False | |
| } | |
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| current_time = asyncio.get_event_loop().time() | |
| # Check if circuit is open | |
| if state['is_open']: | |
| if current_time - state['last_failure_time'] > reset_timeout: | |
| # Try to close the circuit | |
| state['is_open'] = False | |
| state['failures'] = 0 | |
| else: | |
| raise Exception( | |
| f"Circuit breaker is open for {func.__name__}. " | |
| f"Try again in {reset_timeout - (current_time - state['last_failure_time']):.1f} seconds" | |
| ) | |
| try: | |
| result = await func(*args, **kwargs) | |
| # Success - reset failure count | |
| state['failures'] = 0 | |
| return result | |
| except Exception as e: | |
| # Record failure | |
| state['failures'] += 1 | |
| state['last_failure_time'] = current_time | |
| # Check if we need to open the circuit | |
| if state['failures'] >= failure_threshold: | |
| state['is_open'] = True | |
| logger.error( | |
| f"Circuit breaker opened for {func.__name__} after {failure_threshold} failures" | |
| ) | |
| raise | |
| return wrapper | |
| return decorator |