Medium-MCP / src /resilience.py
Nikhil Pravin Pise
feat: comprehensive migration - merge Scraper + MCP Server
ae588db
import asyncio
import time
import logging
from typing import Callable, Any, TypeVar
# Generic type variable (not referenced by the definitions visible in this file).
T = TypeVar("T")
# Module-level logger used for rate-limit and circuit-breaker messages.
logger = logging.getLogger("Resilience")
class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""
class RateLimiter:
    """
    Token bucket rate limiter for controlling request frequency.

    Uses the token bucket algorithm to allow bursts up to the bucket size
    while maintaining the average rate over time. Tokens are refilled lazily
    on each ``acquire`` call based on the time elapsed since the last update.
    """

    def __init__(self, requests_per_minute: int = 60):
        """
        Initialize rate limiter.

        Args:
            requests_per_minute: Maximum requests allowed per minute. Also
                used as the bucket capacity (maximum burst size).

        Raises:
            ValueError: If ``requests_per_minute`` is not positive (a zero
                rate would otherwise cause a division by zero when waiting).
        """
        if requests_per_minute <= 0:
            raise ValueError(
                f"requests_per_minute must be positive, got {requests_per_minute}"
            )
        self.rate = requests_per_minute / 60.0  # Tokens added per second
        self.bucket_size = requests_per_minute  # Max capacity
        self.tokens = float(self.bucket_size)  # Current tokens (starts full)
        # Monotonic clock: elapsed-time math must not be affected by
        # wall-clock adjustments (NTP, DST, manual changes).
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> None:
        """
        Acquire tokens, waiting if necessary.

        The internal lock is held for the whole call, including any sleep,
        so concurrent callers are served one at a time.

        Args:
            tokens: Number of tokens to acquire (default: 1)
        """
        async with self._lock:
            # Refill tokens based on elapsed time, capped at bucket capacity.
            now = time.monotonic()
            elapsed = now - self.last_update
            self.tokens = min(
                self.bucket_size,
                self.tokens + elapsed * self.rate,
            )
            self.last_update = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return

            # Not enough tokens: sleep exactly long enough for the deficit
            # to be refilled, then consume everything that accumulated.
            sleep_time = (tokens - self.tokens) / self.rate
            logger.debug(
                "Rate limit: waiting %.2fs for %s token(s)", sleep_time, tokens
            )
            await asyncio.sleep(sleep_time)
            self.tokens = 0.0
            self.last_update = time.monotonic()

    def reset(self) -> None:
        """
        Reset the rate limiter to full capacity.

        This is a synchronous method with no ``await`` points, so the two
        assignments cannot be interleaved with another coroutine on the same
        event loop; no lock is needed (and ``asyncio.Lock`` cannot be used
        with a plain ``with`` statement). Note that a coroutine currently
        sleeping inside ``acquire`` will overwrite these values when it wakes.
        """
        self.tokens = float(self.bucket_size)
        self.last_update = time.monotonic()
class ResilienceManager:
    """
    Implements Circuit Breaker, Exponential Backoff, and Rate Limiting patterns.

    Circuit breaker states (stored as strings in ``self.state``):
        * ``"CLOSED"``    - calls are allowed; failures are counted.
        * ``"OPEN"``      - calls are rejected with ``CircuitBreakerOpen``
                            until ``recovery_timeout`` seconds have passed
                            since the last recorded failure.
        * ``"HALF-OPEN"`` - calls are allowed again after the cool-down; a
                            success closes the circuit.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 300,
        requests_per_minute: int = 60,
        enable_rate_limiting: bool = True,
        max_retries: int = 3,
        base_delay: float = 1.0,
    ):
        """
        Initialize resilience manager.

        Args:
            failure_threshold: Number of failures before circuit opens
            recovery_timeout: Seconds to wait before trying again
            requests_per_minute: Max requests per minute (for rate limiting)
            enable_rate_limiting: Whether to enable rate limiting
            max_retries: Total number of attempts made by
                ``execute_with_retry`` before giving up (default: 3)
            base_delay: Seconds to sleep after the first failed attempt; the
                delay doubles after each subsequent failure (default: 1.0)

        Raises:
            ValueError: If ``max_retries`` is less than 1 or ``base_delay``
                is negative.
        """
        if max_retries < 1:
            raise ValueError(f"max_retries must be at least 1, got {max_retries}")
        if base_delay < 0:
            raise ValueError(f"base_delay must be non-negative, got {base_delay}")

        # Circuit breaker
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        # Timestamp from time.monotonic(); only meaningful once a failure
        # has been recorded.
        self.last_failure_time = 0.0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF-OPEN

        # Retry / backoff
        self.max_retries = max_retries
        self.base_delay = base_delay

        # Rate limiting
        self.enable_rate_limiting = enable_rate_limiting
        self.rate_limiter = (
            RateLimiter(requests_per_minute) if enable_rate_limiting else None
        )

    def _check_circuit(self) -> None:
        """
        Gate a call on the circuit breaker state.

        If the circuit is OPEN and the cool-down has elapsed, move to
        HALF-OPEN and let the call through; otherwise reject it.

        Raises:
            CircuitBreakerOpen: If the circuit is OPEN and still cooling down.
        """
        if self.state != "OPEN":
            return

        # Monotonic clock so wall-clock adjustments cannot shorten or extend
        # the cool-down window.
        since_failure = time.monotonic() - self.last_failure_time
        if since_failure > self.recovery_timeout:
            logger.info("Circuit Breaker: Cooling down period over. Switching to HALF-OPEN.")
            self.state = "HALF-OPEN"
            return

        remaining = int(self.recovery_timeout - since_failure)
        raise CircuitBreakerOpen(f"Circuit Breaker is OPEN. Cooling down for {remaining}s")

    def record_success(self) -> None:
        """Record a successful call: reset the failure count and close a HALF-OPEN circuit."""
        if self.state == "HALF-OPEN":
            logger.info("Circuit Breaker: Request successful. Closing circuit.")
            self.state = "CLOSED"
            self.failure_count = 0
        elif self.state == "CLOSED":
            self.failure_count = 0

    def record_failure(self) -> None:
        """Record a failed call and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        logger.warning(
            "Circuit Breaker: Failure recorded (%s/%s)",
            self.failure_count,
            self.failure_threshold,
        )
        if self.failure_count >= self.failure_threshold:
            logger.error("Circuit Breaker: Threshold reached. Opening circuit.")
            self.state = "OPEN"

    async def execute_with_retry(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Executes a function with exponential backoff, circuit breaker, and rate limiting.

        The circuit breaker is checked and one rate-limit token is acquired
        once per call, before the first attempt; retries do not acquire
        additional tokens. A circuit-breaker failure is recorded only when
        every attempt has failed.

        Args:
            func: Async function to execute
            *args: Positional arguments for func
            **kwargs: Keyword arguments for func

        Returns:
            Result from func

        Raises:
            CircuitBreakerOpen: If circuit breaker is open
            Exception: If all retries fail (the last attempt's exception is
                re-raised with its original traceback)
        """
        self._check_circuit()

        # Apply rate limiting before attempts
        if self.enable_rate_limiting and self.rate_limiter is not None:
            await self.rate_limiter.acquire()

        delay = self.base_delay
        for attempt in range(1, self.max_retries + 1):
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                logger.warning(
                    "Attempt %s/%s failed: %s", attempt, self.max_retries, e
                )
                if attempt == self.max_retries:
                    self.record_failure()
                    raise  # bare raise keeps the original traceback intact
                await asyncio.sleep(delay)
                delay *= 2  # Exponential Backoff
            else:
                self.record_success()
                return result