Spaces:
Sleeping
Sleeping
| import asyncio | |
| import time | |
| import logging | |
| from typing import Callable, Any, TypeVar | |
# Module-wide logger. It uses a fixed name (not the module path) so that
# applications can tune this component's verbosity by that name.
logger: logging.Logger = logging.getLogger(name="Resilience")
# Generic type variable for type-parameterized signatures.
T = TypeVar("T")
class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit breaker is OPEN and still cooling down."""
class RateLimiter:
    """
    Token bucket rate limiter for controlling request frequency.

    Uses the token bucket algorithm to allow bursts up to the bucket size
    while maintaining the configured average rate over time.
    """

    def __init__(self, requests_per_minute: int = 60):
        """
        Initialize rate limiter.

        Args:
            requests_per_minute: Maximum requests allowed per minute. This is
                also the bucket capacity, so up to this many requests may be
                served in a burst.

        Raises:
            ValueError: If requests_per_minute is not positive.
        """
        if requests_per_minute <= 0:
            # A zero rate would divide by zero in acquire(). A negative rate
            # would yield negative sleep times and silently disable limiting.
            raise ValueError(
                f"requests_per_minute must be positive, got {requests_per_minute}"
            )
        self.rate = requests_per_minute / 60.0  # Tokens added per second
        self.bucket_size = requests_per_minute  # Max capacity
        self.tokens = float(self.bucket_size)  # Current tokens
        # Monotonic clock: unaffected by wall-clock adjustments (NTP, manual
        # changes) that would otherwise grant or revoke tokens spuriously.
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        """Add tokens for the time elapsed since the last update, capped at capacity."""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(float(self.bucket_size), self.tokens + elapsed * self.rate)
        self.last_update = now

    async def acquire(self, tokens: int = 1) -> None:
        """
        Acquire tokens, waiting if necessary.

        The internal lock is held while waiting, so concurrent callers are
        served one at a time. A request larger than the bucket size is still
        granted after waiting. The bucket is then left in debt, so later
        callers wait longer and the average rate is honored.

        Args:
            tokens: Number of tokens to acquire (default: 1)
        """
        async with self._lock:
            self._refill()
            if self.tokens < tokens:
                sleep_time = (tokens - self.tokens) / self.rate
                logger.debug(
                    "Rate limit: waiting %.2fs for %s token(s)", sleep_time, tokens
                )
                await asyncio.sleep(sleep_time)
                # Re-measure instead of assuming the bucket is exactly full
                # enough: the sleep may overshoot, and those extra tokens
                # were legitimately earned.
                self._refill()
            self.tokens -= tokens

    def reset(self) -> None:
        """Reset the rate limiter to full capacity."""
        # No await occurs between these assignments, so no other coroutine on
        # the event loop can observe a half-reset state; no lock is needed.
        # (asyncio.Lock only supports `async with`, so it cannot be used here.)
        self.tokens = float(self.bucket_size)
        self.last_update = time.monotonic()
class ResilienceManager:
    """
    Implements Circuit Breaker, Exponential Backoff, and Rate Limiting patterns.

    Circuit states (stored as strings in ``self.state``):
        CLOSED    -- calls pass through. Each call that exhausts all of its
                     attempts counts as one failure; a success resets the count.
        OPEN      -- calls are rejected with CircuitBreakerOpen until more than
                     ``recovery_timeout`` seconds have passed since the last
                     recorded failure.
        HALF-OPEN -- calls are allowed through again. A success closes the
                     circuit; a failure re-opens it.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 300,
        requests_per_minute: int = 60,
        enable_rate_limiting: bool = True,
        *,
        max_attempts: int = 3,
        base_delay: float = 1.0,
    ):
        """
        Initialize resilience manager.

        Args:
            failure_threshold: Number of failures before circuit opens
            recovery_timeout: Seconds to wait before trying again
            requests_per_minute: Max requests per minute (for rate limiting)
            enable_rate_limiting: Whether to enable rate limiting
            max_attempts: Total attempts per execute_with_retry() call,
                including the first one (default: 3)
            base_delay: Seconds to wait after the first failed attempt; the
                wait doubles after each further failure (default: 1.0)

        Raises:
            ValueError: If max_attempts < 1 or base_delay < 0.
        """
        if max_attempts < 1:
            # With zero attempts the retry loop would never run and the call
            # would silently return None.
            raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")
        if base_delay < 0:
            raise ValueError(f"base_delay must be non-negative, got {base_delay}")
        # Circuit breaker
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF-OPEN
        # Retry / exponential backoff
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        # Rate limiting
        self.enable_rate_limiting = enable_rate_limiting
        self.rate_limiter = RateLimiter(requests_per_minute) if enable_rate_limiting else None

    def _check_circuit(self):
        """
        Reject the call if the circuit is OPEN and still cooling down.

        An OPEN circuit whose recovery timeout has elapsed is moved to HALF-OPEN
        and the call is allowed through.

        Raises:
            CircuitBreakerOpen: If the circuit is OPEN and the timeout has not elapsed.
        """
        if self.state == "OPEN":
            elapsed = time.time() - self.last_failure_time
            if elapsed > self.recovery_timeout:
                logger.info("Circuit Breaker: Cooling down period over. Switching to HALF-OPEN.")
                self.state = "HALF-OPEN"
            else:
                remaining = int(self.recovery_timeout - elapsed)
                raise CircuitBreakerOpen(f"Circuit Breaker is OPEN. Cooling down for {remaining}s")

    def record_success(self):
        """Record a successful call: close a HALF-OPEN circuit and clear the failure count."""
        if self.state == "HALF-OPEN":
            logger.info("Circuit Breaker: Request successful. Closing circuit.")
            self.state = "CLOSED"
            self.failure_count = 0
        elif self.state == "CLOSED":
            self.failure_count = 0

    def record_failure(self):
        """Record a failed call and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        logger.warning(
            "Circuit Breaker: Failure recorded (%s/%s)",
            self.failure_count,
            self.failure_threshold,
        )
        # In HALF-OPEN the count was never reset, so it is still at or above the
        # threshold and any failure re-opens the circuit immediately.
        if self.failure_count >= self.failure_threshold:
            logger.error("Circuit Breaker: Threshold reached. Opening circuit.")
            self.state = "OPEN"

    async def execute_with_retry(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Executes a function with exponential backoff, circuit breaker, and rate limiting.

        Args:
            func: Async function to execute
            *args: Positional arguments for func
            **kwargs: Keyword arguments for func

        Returns:
            Result from func

        Raises:
            CircuitBreakerOpen: If circuit breaker is open
            Exception: The exception from the final attempt if all attempts fail
        """
        self._check_circuit()
        delay = self.base_delay
        for attempt in range(1, self.max_attempts + 1):
            # Every attempt, retries included, is a real call to the protected
            # service, so each one must draw from the rate limiter.
            if self.enable_rate_limiting and self.rate_limiter:
                await self.rate_limiter.acquire()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                logger.warning("Attempt %d/%d failed: %s", attempt, self.max_attempts, e)
                if attempt == self.max_attempts:
                    # Only an exhausted call counts towards the circuit breaker.
                    self.record_failure()
                    raise
                await asyncio.sleep(delay)
                delay *= 2  # Exponential Backoff
            else:
                self.record_success()
                return result