File size: 1,004 Bytes
8ec3495 4466c5e 68a01ab 4466c5e 68a01ab 4466c5e 68a01ab 4466c5e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | # Simple in-memory sliding-window rate limiter — good enough for a single Streamlit server.
import time
from collections import deque
class RateLimiter:
# Each instance tracks call timestamps for one caller/key.
def __init__(self, max_calls: int, period_seconds: float):
self.max_calls = max_calls
self.period = period_seconds
self.calls = deque()
def allow(self) -> bool:
now = time.time()
# Drop timestamps outside the active window.
while self.calls and self.calls[0] <= now - self.period:
self.calls.popleft()
if len(self.calls) < self.max_calls:
self.calls.append(now)
return True
return False
def time_until_next(self) -> float:
# Return wait time before another call is allowed (seconds).
now = time.time()
if len(self.calls) < self.max_calls:
return 0.0
oldest = self.calls[0]
return max(0.0, (oldest + self.period) - now) |