"""Shared strict sliding-window rate limiting primitives.""" from __future__ import annotations import asyncio import time from collections import deque class StrictSlidingWindowLimiter: """Strict sliding window limiter. Guarantees: at most ``rate_limit`` acquisitions in any interval of length ``rate_window`` (seconds). Implemented as an async context manager so call sites can do:: async with limiter: ... """ def __init__(self, rate_limit: int, rate_window: float) -> None: if rate_limit <= 0: raise ValueError("rate_limit must be > 0") if rate_window <= 0: raise ValueError("rate_window must be > 0") self._rate_limit = int(rate_limit) self._rate_window = float(rate_window) self._times: deque[float] = deque() self._lock = asyncio.Lock() async def acquire(self) -> None: while True: now = time.monotonic() cutoff = now - self._rate_window # Fast path: try without lock (common case - room in window) while self._times and self._times[0] <= cutoff: self._times.popleft() if len(self._times) < self._rate_limit: self._times.append(now) return # Slow path: need to wait for a slot, use lock for atomicity async with self._lock: now = time.monotonic() cutoff = now - self._rate_window while self._times and self._times[0] <= cutoff: self._times.popleft() if len(self._times) < self._rate_limit: self._times.append(now) return oldest = self._times[0] wait_time = max(0.0, (oldest + self._rate_window) - now) if wait_time > 0: await asyncio.sleep(wait_time) else: await asyncio.sleep(0) async def __aenter__(self) -> StrictSlidingWindowLimiter: await self.acquire() return self async def __aexit__(self, exc_type, exc, tb) -> bool: return False