claude-code-proxy / core /rate_limit.py
Yash030's picture
NIM speed optimization — adaptive rate limiting and increased throughput
aa9c0b0
"""Shared strict sliding-window rate limiting primitives."""
from __future__ import annotations
import asyncio
import time
from collections import deque
class StrictSlidingWindowLimiter:
"""Strict sliding window limiter.
Guarantees: at most ``rate_limit`` acquisitions in any interval of length
``rate_window`` (seconds).
Implemented as an async context manager so call sites can do::
async with limiter:
...
"""
def __init__(self, rate_limit: int, rate_window: float) -> None:
if rate_limit <= 0:
raise ValueError("rate_limit must be > 0")
if rate_window <= 0:
raise ValueError("rate_window must be > 0")
self._rate_limit = int(rate_limit)
self._rate_window = float(rate_window)
self._times: deque[float] = deque()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
while True:
now = time.monotonic()
cutoff = now - self._rate_window
# Fast path: try without lock (common case - room in window)
while self._times and self._times[0] <= cutoff:
self._times.popleft()
if len(self._times) < self._rate_limit:
self._times.append(now)
return
# Slow path: need to wait for a slot, use lock for atomicity
async with self._lock:
now = time.monotonic()
cutoff = now - self._rate_window
while self._times and self._times[0] <= cutoff:
self._times.popleft()
if len(self._times) < self._rate_limit:
self._times.append(now)
return
oldest = self._times[0]
wait_time = max(0.0, (oldest + self._rate_window) - now)
if wait_time > 0:
await asyncio.sleep(wait_time)
else:
await asyncio.sleep(0)
async def __aenter__(self) -> StrictSlidingWindowLimiter:
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb) -> bool:
return False