# Source: linkedin_profile_scoring/app/utils/rate_limiter.py
# Last commit: 57fda34 "init project code" (Kaito117)
import asyncio
import time
from collections import deque
class RateLimiter:
"""Token bucket rate limiter"""
def __init__(self, max_requests: int = 60, time_window: int = 60):
self.max_requests = max_requests
self.time_window = time_window
self.requests = deque()
self._lock = asyncio.Lock()
async def acquire(self):
"""Acquire permission to make a request"""
async with self._lock:
now = time.time()
# Remove old requests outside time window
while self.requests and self.requests[0] <= now - self.time_window:
self.requests.popleft()
# Check if we can make a request
if len(self.requests) < self.max_requests:
self.requests.append(now)
return
# Calculate wait time
oldest_request = self.requests[0]
wait_time = oldest_request + self.time_window - now
if wait_time > 0:
await asyncio.sleep(wait_time)
# Retry after waiting
await self.acquire()
else:
self.requests.append(now)