Spaces:
Running
Running
File size: 2,164 Bytes
0157ac7 aa9c0b0 0157ac7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | """Shared strict sliding-window rate limiting primitives."""
from __future__ import annotations
import asyncio
import time
from collections import deque
class StrictSlidingWindowLimiter:
"""Strict sliding window limiter.
Guarantees: at most ``rate_limit`` acquisitions in any interval of length
``rate_window`` (seconds).
Implemented as an async context manager so call sites can do::
async with limiter:
...
"""
def __init__(self, rate_limit: int, rate_window: float) -> None:
if rate_limit <= 0:
raise ValueError("rate_limit must be > 0")
if rate_window <= 0:
raise ValueError("rate_window must be > 0")
self._rate_limit = int(rate_limit)
self._rate_window = float(rate_window)
self._times: deque[float] = deque()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
while True:
now = time.monotonic()
cutoff = now - self._rate_window
# Fast path: try without lock (common case - room in window)
while self._times and self._times[0] <= cutoff:
self._times.popleft()
if len(self._times) < self._rate_limit:
self._times.append(now)
return
# Slow path: need to wait for a slot, use lock for atomicity
async with self._lock:
now = time.monotonic()
cutoff = now - self._rate_window
while self._times and self._times[0] <= cutoff:
self._times.popleft()
if len(self._times) < self._rate_limit:
self._times.append(now)
return
oldest = self._times[0]
wait_time = max(0.0, (oldest + self._rate_window) - now)
if wait_time > 0:
await asyncio.sleep(wait_time)
else:
await asyncio.sleep(0)
async def __aenter__(self) -> StrictSlidingWindowLimiter:
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb) -> bool:
return False
|