import threading
import time
from collections import defaultdict, deque
from typing import Callable, Dict, Optional


class RateLimiter:
    """Thread-safe sliding-window rate limiter keyed by client identifier.

    Each key keeps a deque of the timestamps of its accepted requests.
    A request is accepted while fewer than ``max_requests`` timestamps
    fall inside the trailing ``window_size`` seconds.
    """

    def __init__(
        self,
        max_requests: int = 100,
        window_size: int = 60,
        *,
        clock: Optional[Callable[[], float]] = None,
    ):
        """
        Initialize rate limiter.

        Args:
            max_requests: Maximum number of requests allowed per window
            window_size: Time window in seconds
            clock: Zero-argument callable returning the current time in
                seconds. Defaults to ``time.time``; injectable so that
                behavior can be verified deterministically.
        """
        self.max_requests = max_requests
        self.window_size = window_size
        # Maps key -> timestamps of accepted requests, oldest first.
        self.requests: Dict[str, deque] = defaultdict(deque)
        self._clock = clock if clock is not None else time.time
        # Re-entrant so public methods may call each other while holding it.
        self._lock = threading.RLock()

    def _prune(self, key: str, now: float) -> Optional[deque]:
        """Evict expired timestamps for *key*; caller must hold the lock.

        Returns the deque of still-active timestamps, or ``None`` when the
        key has no active requests (in which case it is removed from the
        mapping so idle keys do not accumulate).
        """
        # .get() avoids the defaultdict side effect of creating an entry.
        timestamps = self.requests.get(key)
        if timestamps is None:
            return None

        # ">=" keeps expiry consistent with get_reset_time(): a request
        # stops counting exactly at ``timestamp + window_size``.
        while timestamps and now - timestamps[0] >= self.window_size:
            timestamps.popleft()

        if not timestamps:
            del self.requests[key]
            return None
        return timestamps

    def is_allowed(self, key: str) -> bool:
        """
        Check if a request is allowed for the given key.

        An allowed request is recorded against the key; a rejected one
        is not.

        Args:
            key: Identifier for the client/user

        Returns:
            True if request is allowed, False otherwise
        """
        with self._lock:
            now = self._clock()
            timestamps = self._prune(key, now)
            used = len(timestamps) if timestamps is not None else 0

            if used >= self.max_requests:
                return False

            # Only create the entry once there is something to store.
            self.requests[key].append(now)
            return True

    def get_reset_time(self, key: str) -> float:
        """
        Get the time when the rate limit will reset for the given key.

        Args:
            key: Identifier for the client/user

        Returns:
            Timestamp (on the limiter's clock, Unix time by default) at
            which the oldest active request leaves the window. If the key
            has no active requests, the current time is returned.
        """
        with self._lock:
            now = self._clock()
            # Prune first so an already-expired entry never yields a
            # reset time in the past.
            timestamps = self._prune(key, now)
            if timestamps is None:
                return now
            return timestamps[0] + self.window_size

    def cleanup(self) -> int:
        """Drop every key whose requests have all expired.

        Keys are otherwise only pruned when they are queried again, so a
        long-running process serving many one-off clients can call this
        periodically to bound memory use.

        Returns:
            Number of keys removed.
        """
        with self._lock:
            now = self._clock()
            removed = 0
            # Iterate over a snapshot: _prune() deletes keys as it goes.
            for key in list(self.requests):
                if self._prune(key, now) is None:
                    removed += 1
            return removed


# Global rate limiter instance
event_publisher_rate_limiter = RateLimiter(max_requests=1000, window_size=60)  # 1000 events per minute