File size: 2,164 Bytes
0157ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa9c0b0
 
 
 
 
 
 
 
 
 
 
0157ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""Shared strict sliding-window rate limiting primitives."""

from __future__ import annotations

import asyncio
import time
from collections import deque


class StrictSlidingWindowLimiter:
    """Strict sliding window limiter.

    Guarantees: at most ``rate_limit`` acquisitions in any interval of length
    ``rate_window`` (seconds).

    Implemented as an async context manager so call sites can do::

        async with limiter:
            ...

    A slot is claimed when the block is entered and only frees up once it is
    older than ``rate_window``; leaving the block does not release it.
    """

    def __init__(self, rate_limit: int, rate_window: float) -> None:
        """Create a limiter allowing ``rate_limit`` acquisitions per window.

        Args:
            rate_limit: Maximum acquisitions per window. Truncated with
                ``int()``; the truncated value must be at least 1.
            rate_window: Window length in seconds; must be greater than 0.

        Raises:
            ValueError: If ``rate_limit`` is not positive, truncates to 0, or
                ``rate_window`` is not positive (including NaN).
        """
        if rate_limit <= 0:
            raise ValueError("rate_limit must be > 0")
        # Written as ``not ... > 0`` so NaN (which compares False against
        # everything) is rejected instead of silently disabling expiry.
        if not rate_window > 0:
            raise ValueError("rate_window must be > 0")

        limit = int(rate_limit)
        # A fractional value such as 0.5 passes the check above but truncates
        # to 0, which would make acquire() index into an empty deque.
        if limit < 1:
            raise ValueError("rate_limit must be >= 1 after truncation to int")

        self._rate_limit = limit
        self._rate_window = float(rate_window)
        # Monotonic timestamps of the acquisitions still inside the window,
        # oldest first.
        self._times: deque[float] = deque()
        self._lock = asyncio.Lock()

    def _try_claim(self, now: float) -> bool:
        """Drop expired timestamps and claim a slot at ``now`` if one is free.

        Returns:
            True if ``now`` was recorded as a new acquisition, False if the
            window is already full.
        """
        cutoff = now - self._rate_window
        times = self._times
        while times and times[0] <= cutoff:
            times.popleft()
        if len(times) < self._rate_limit:
            times.append(now)
            return True
        return False

    async def acquire(self) -> None:
        """Wait until the window has a free slot, then claim it."""
        while True:
            # Fast path: try without the lock (common case - room in window).
            # Nothing in _try_claim awaits, so the check and the append are
            # not interleaved with other tasks on the same event loop.
            if self._try_claim(time.monotonic()):
                return

            # Slow path: re-check under the lock, then work out how long
            # until the oldest recorded acquisition leaves the window.
            async with self._lock:
                now = time.monotonic()
                if self._try_claim(now):
                    return
                # The window is full and rate_limit >= 1, so the deque is
                # guaranteed to be non-empty here.
                oldest = self._times[0]
                wait_time = max(0.0, (oldest + self._rate_window) - now)

            # wait_time is clamped to >= 0; a zero delay still yields to the
            # event loop once before retrying.
            await asyncio.sleep(wait_time)

    async def __aenter__(self) -> StrictSlidingWindowLimiter:
        """Acquire a slot on entry and return the limiter itself."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        """Do nothing on exit; return False so exceptions propagate."""
        return False