import asyncio
import functools
import random
from typing import Type, Tuple, Optional, TypeVar, Callable, Any

from ..utils.logger import logger

T = TypeVar('T')


class CircuitBreakerOpenError(Exception):
    """Raised by ``circuit_breaker`` when a call is rejected because the circuit is open.

    Subclasses ``Exception`` so callers that already catch ``Exception`` keep working,
    while new callers can tell a rejected call apart from a failure of the wrapped
    function itself.
    """


def with_retry(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
) -> Callable:
    """
    Decorator that implements retry logic with exponential backoff

    The wrapped coroutine function is awaited up to ``max_retries + 1`` times
    (one initial attempt plus the retries). Between attempts the wrapper sleeps
    for the current delay, which is then multiplied by ``backoff_factor``.
    Exceptions not listed in ``exceptions`` propagate immediately.

    Args:
        max_retries (int): Maximum number of retry attempts. Negative values are
            treated as 0, i.e. the function is still attempted exactly once.
        delay (float): Initial delay between retries in seconds
        backoff_factor (float): Multiplier for delay after each retry
        exceptions (tuple): Tuple of exceptions to catch and retry on

    Returns:
        A decorator for coroutine functions. The decorated function returns the
        wrapped function's result, or re-raises the exception from the final
        failed attempt.
    """
    # Clamp so that a negative max_retries still performs one attempt instead of
    # skipping the call entirely and having nothing meaningful to raise.
    total_attempts = max(max_retries, 0) + 1

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            current_delay = delay

            for attempt in range(1, total_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == total_attempts:
                        # Out of attempts: log and re-raise the original exception
                        # with its traceback intact.
                        logger.error(
                            f"All {total_attempts} attempts failed for {func.__name__}: {str(e)}"
                        )
                        raise

                    logger.warning(
                        f"Attempt {attempt} failed for {func.__name__}: {str(e)}. "
                        f"Retrying in {current_delay} seconds..."
                    )
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff_factor

        return wrapper

    return decorator


def retry_with_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    backoff_factor: float = 2.0,
    exceptions: Optional[Tuple[Type[Exception], ...]] = None
) -> Callable:
    """
    More advanced retry decorator with capped exponential backoff and jitter

    Each sleep is ``min(current_delay + jitter, max_delay)`` where ``jitter`` is a
    random value in ``[0, 1)`` seconds. After each sleep the base delay is
    multiplied by ``backoff_factor`` and capped at ``max_delay``.

    Args:
        max_retries (int): Maximum number of retry attempts
        initial_delay (float): Initial delay between retries in seconds
        max_delay (float): Maximum delay between retries in seconds
        backoff_factor (float): Multiplier for delay after each retry
        exceptions (tuple): Tuple of exceptions to catch and retry on.
            Defaults to ``(Exception,)`` when ``None``.

    Returns:
        A decorator for coroutine functions. The decorated function returns the
        wrapped function's result, or re-raises the last exception once more than
        ``max_retries`` failures have occurred.
    """
    retry_on: Tuple[Type[Exception], ...] = (Exception,) if exceptions is None else exceptions

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            retry_count = 0
            current_delay = initial_delay
            operation_name = func.__name__

            while True:
                try:
                    return await func(*args, **kwargs)
                except retry_on as e:
                    retry_count += 1

                    if retry_count > max_retries:
                        logger.error(
                            f"Operation {operation_name} failed after {max_retries} retries: {str(e)}"
                        )
                        raise

                    # Add jitter to prevent thundering herd. A real random source is
                    # used because clock-derived jitter depends on the clock's
                    # resolution and is not guaranteed to vary between callers.
                    jitter = random.random()
                    sleep_time = min(current_delay + jitter, max_delay)

                    logger.warning(
                        f"Operation {operation_name} failed (attempt {retry_count}/{max_retries}): "
                        f"{str(e)}. Retrying in {sleep_time:.2f} seconds..."
                    )
                    await asyncio.sleep(sleep_time)
                    current_delay = min(current_delay * backoff_factor, max_delay)

        return wrapper

    return decorator


def circuit_breaker(
    failure_threshold: int = 5,
    reset_timeout: float = 60.0
) -> Callable:
    """
    Circuit breaker decorator to prevent repeated calls to failing services

    State is kept per decorated function and shared by all calls to it. After
    ``failure_threshold`` consecutive failures the circuit opens and calls are
    rejected without invoking the wrapped function. Once more than
    ``reset_timeout`` seconds have passed since the last recorded failure, the
    circuit is closed again and the failure count starts from zero.

    Args:
        failure_threshold (int): Number of failures before opening circuit
        reset_timeout (float): Time in seconds before attempting to close circuit

    Returns:
        A decorator for coroutine functions.

    Raises:
        CircuitBreakerOpenError: From the decorated function when the circuit is
            open. Exceptions raised by the wrapped function are re-raised as-is.
    """
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # State for the circuit breaker (closed over by every call to wrapper)
        state = {
            'failures': 0,
            'last_failure_time': 0.0,
            'is_open': False
        }

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            loop = asyncio.get_running_loop()

            # Check if circuit is open
            if state['is_open']:
                elapsed = loop.time() - state['last_failure_time']
                if elapsed > reset_timeout:
                    # Try to close the circuit
                    state['is_open'] = False
                    state['failures'] = 0
                else:
                    raise CircuitBreakerOpenError(
                        f"Circuit breaker is open for {func.__name__}. "
                        f"Try again in {reset_timeout - elapsed:.1f} seconds"
                    )

            try:
                result = await func(*args, **kwargs)
            except Exception:
                # Record failure. The timestamp is taken now, not before the call,
                # so a slow failing call does not shorten the open window.
                state['failures'] += 1
                state['last_failure_time'] = loop.time()

                # Check if we need to open the circuit
                if state['failures'] >= failure_threshold:
                    state['is_open'] = True
                    logger.error(
                        f"Circuit breaker opened for {func.__name__} after {failure_threshold} failures"
                    )
                raise

            # Success - reset failure count
            state['failures'] = 0
            return result

        return wrapper

    return decorator