Spaces:
Sleeping
Sleeping
| """Retry utilities with exponential backoff for handling transient failures.""" | |
| import logging | |
| import random | |
| import time | |
| from functools import wraps | |
| from typing import Any, Callable, Optional, Tuple, Type | |
| from utils.errors import RateLimitError | |
| logger = logging.getLogger(__name__) | |
| def exponential_backoff( | |
| max_retries: int = 3, | |
| base_delay: float = 1.0, | |
| max_delay: float = 60.0, | |
| exponential_base: float = 2.0, | |
| jitter: bool = True, | |
| retry_on: Tuple[Type[Exception], ...] = (Exception,), | |
| skip_on: Tuple[Type[Exception], ...] = (), | |
| ) -> Callable: | |
| """ | |
| Decorator for retrying a function with exponential backoff. | |
| Args: | |
| max_retries: Maximum number of retry attempts (default: 3) | |
| base_delay: Initial delay in seconds (default: 1.0) | |
| max_delay: Maximum delay in seconds (default: 60.0) | |
| exponential_base: Base for exponential calculation (default: 2.0) | |
| jitter: Add random jitter to prevent thundering herd (default: True) | |
| retry_on: Tuple of exceptions to retry on (default: all exceptions) | |
| skip_on: Tuple of exceptions to NOT retry (default: none) | |
| Returns: | |
| Decorated function with retry logic | |
| Example: | |
| @exponential_backoff(max_retries=5, base_delay=2.0) | |
| def fetch_data(ticker: str) -> pd.DataFrame: | |
| return api.get_data(ticker) | |
| # Will retry up to 5 times with delays: 2s, 4s, 8s, 16s, 32s | |
| """ | |
| def decorator(func: Callable) -> Callable: | |
| def wrapper(*args, **kwargs) -> Any: | |
| last_exception = None | |
| for attempt in range(max_retries + 1): | |
| try: | |
| return func(*args, **kwargs) | |
| except skip_on as e: | |
| # Don't retry these exceptions | |
| logger.warning( | |
| f"{func.__name__}: Skipping retry for {type(e).__name__}: {str(e)}" | |
| ) | |
| raise | |
| except retry_on as e: | |
| last_exception = e | |
| # If this was the last attempt, raise the exception | |
| if attempt >= max_retries: | |
| logger.error( | |
| f"{func.__name__}: All {max_retries} retries exhausted. " | |
| f"Last error: {type(e).__name__}: {str(e)}" | |
| ) | |
| raise | |
| # Calculate delay with exponential backoff | |
| delay = min(base_delay * (exponential_base**attempt), max_delay) | |
| # Add jitter to prevent thundering herd | |
| if jitter: | |
| delay = delay * (0.5 + random.random()) | |
| logger.info( | |
| f"{func.__name__}: Attempt {attempt + 1}/{max_retries} failed " | |
| f"with {type(e).__name__}: {str(e)}. " | |
| f"Retrying in {delay:.2f}s..." | |
| ) | |
| time.sleep(delay) | |
| # Should not reach here, but raise the last exception if we do | |
| if last_exception: | |
| raise last_exception | |
| return wrapper | |
| return decorator | |
| def retry_with_backoff( | |
| func: Callable, | |
| *args, | |
| max_retries: int = 3, | |
| base_delay: float = 1.0, | |
| max_delay: float = 60.0, | |
| exponential_base: float = 2.0, | |
| jitter: bool = True, | |
| retry_on: Tuple[Type[Exception], ...] = (Exception,), | |
| skip_on: Tuple[Type[Exception], ...] = (), | |
| **kwargs, | |
| ) -> Any: | |
| """ | |
| Retry a function call with exponential backoff (non-decorator version). | |
| This is useful when you want to retry a function without decorating it. | |
| Args: | |
| func: Function to retry | |
| *args: Positional arguments for the function | |
| max_retries: Maximum number of retry attempts | |
| base_delay: Initial delay in seconds | |
| max_delay: Maximum delay in seconds | |
| exponential_base: Base for exponential calculation | |
| jitter: Add random jitter | |
| retry_on: Tuple of exceptions to retry on | |
| skip_on: Tuple of exceptions to NOT retry | |
| **kwargs: Keyword arguments for the function | |
| Returns: | |
| Result of the function call | |
| Example: | |
| result = retry_with_backoff( | |
| api.get_data, | |
| "AAPL", | |
| max_retries=5, | |
| base_delay=2.0, | |
| retry_on=(ConnectionError, TimeoutError) | |
| ) | |
| """ | |
| last_exception = None | |
| for attempt in range(max_retries + 1): | |
| try: | |
| return func(*args, **kwargs) | |
| except skip_on as e: | |
| logger.warning( | |
| f"{func.__name__}: Skipping retry for {type(e).__name__}: {str(e)}" | |
| ) | |
| raise | |
| except retry_on as e: | |
| last_exception = e | |
| if attempt >= max_retries: | |
| logger.error( | |
| f"{func.__name__}: All {max_retries} retries exhausted. " | |
| f"Last error: {type(e).__name__}: {str(e)}" | |
| ) | |
| raise | |
| delay = min(base_delay * (exponential_base**attempt), max_delay) | |
| if jitter: | |
| delay = delay * (0.5 + random.random()) | |
| logger.info( | |
| f"{func.__name__}: Attempt {attempt + 1}/{max_retries} failed " | |
| f"with {type(e).__name__}: {str(e)}. " | |
| f"Retrying in {delay:.2f}s..." | |
| ) | |
| time.sleep(delay) | |
| if last_exception: | |
| raise last_exception | |
| def retry_on_rate_limit( | |
| max_retries: int = 3, | |
| base_delay: float = 60.0, | |
| max_delay: float = 300.0, | |
| ) -> Callable: | |
| """ | |
| Specialized decorator for retrying on rate limit errors. | |
| This uses longer delays appropriate for API rate limits. | |
| Args: | |
| max_retries: Maximum number of retry attempts (default: 3) | |
| base_delay: Initial delay in seconds (default: 60s) | |
| max_delay: Maximum delay in seconds (default: 300s) | |
| Returns: | |
| Decorated function with rate limit retry logic | |
| Example: | |
| @retry_on_rate_limit(max_retries=5, base_delay=120.0) | |
| def fetch_data(ticker: str) -> pd.DataFrame: | |
| return api.get_data(ticker) | |
| """ | |
| return exponential_backoff( | |
| max_retries=max_retries, | |
| base_delay=base_delay, | |
| max_delay=max_delay, | |
| exponential_base=2.0, | |
| jitter=True, | |
| retry_on=(RateLimitError,), | |
| skip_on=(), # Will still raise other exceptions immediately | |
| ) | |
| class RetryStrategy: | |
| """ | |
| Configurable retry strategy for fine-grained control. | |
| Example: | |
| strategy = RetryStrategy( | |
| max_retries=5, | |
| base_delay=2.0, | |
| max_delay=30.0, | |
| retry_on=(ConnectionError, TimeoutError), | |
| skip_on=(ValueError, KeyError) | |
| ) | |
| result = strategy.execute(api.get_data, "AAPL") | |
| """ | |
| def __init__( | |
| self, | |
| max_retries: int = 3, | |
| base_delay: float = 1.0, | |
| max_delay: float = 60.0, | |
| exponential_base: float = 2.0, | |
| jitter: bool = True, | |
| retry_on: Tuple[Type[Exception], ...] = (Exception,), | |
| skip_on: Tuple[Type[Exception], ...] = (), | |
| ): | |
| """Initialize retry strategy with configuration.""" | |
| self.max_retries = max_retries | |
| self.base_delay = base_delay | |
| self.max_delay = max_delay | |
| self.exponential_base = exponential_base | |
| self.jitter = jitter | |
| self.retry_on = retry_on | |
| self.skip_on = skip_on | |
| def execute(self, func: Callable, *args, **kwargs) -> Any: | |
| """ | |
| Execute a function with this retry strategy. | |
| Args: | |
| func: Function to execute | |
| *args: Positional arguments | |
| **kwargs: Keyword arguments | |
| Returns: | |
| Result of the function call | |
| """ | |
| return retry_with_backoff( | |
| func, | |
| *args, | |
| max_retries=self.max_retries, | |
| base_delay=self.base_delay, | |
| max_delay=self.max_delay, | |
| exponential_base=self.exponential_base, | |
| jitter=self.jitter, | |
| retry_on=self.retry_on, | |
| skip_on=self.skip_on, | |
| **kwargs, | |
| ) | |
| def decorator(self) -> Callable: | |
| """ | |
| Get this strategy as a decorator. | |
| Returns: | |
| Decorator function | |
| Example: | |
| strategy = RetryStrategy(max_retries=5) | |
| @strategy.decorator() | |
| def fetch_data(ticker: str): | |
| return api.get_data(ticker) | |
| """ | |
| return exponential_backoff( | |
| max_retries=self.max_retries, | |
| base_delay=self.base_delay, | |
| max_delay=self.max_delay, | |
| exponential_base=self.exponential_base, | |
| jitter=self.jitter, | |
| retry_on=self.retry_on, | |
| skip_on=self.skip_on, | |
| ) | |
| # Predefined strategies for common use cases | |
| # Fast retry for transient network issues | |
| FAST_RETRY = RetryStrategy( | |
| max_retries=3, | |
| base_delay=0.5, | |
| max_delay=5.0, | |
| retry_on=(ConnectionError, TimeoutError), | |
| ) | |
| # Standard retry for API calls | |
| STANDARD_RETRY = RetryStrategy( | |
| max_retries=3, | |
| base_delay=1.0, | |
| max_delay=30.0, | |
| retry_on=(ConnectionError, TimeoutError, Exception), | |
| skip_on=(ValueError, KeyError, TypeError), # Don't retry programming errors | |
| ) | |
| # Aggressive retry for rate limits | |
| RATE_LIMIT_RETRY = RetryStrategy( | |
| max_retries=5, | |
| base_delay=60.0, | |
| max_delay=300.0, | |
| retry_on=(RateLimitError,), | |
| ) | |
| # Conservative retry for critical operations | |
| CONSERVATIVE_RETRY = RetryStrategy( | |
| max_retries=5, | |
| base_delay=2.0, | |
| max_delay=60.0, | |
| exponential_base=2.5, | |
| retry_on=(Exception,), | |
| skip_on=(ValueError, KeyError, TypeError), | |
| ) | |