Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| import time | |
| from typing import Awaitable, Callable | |
| class RequestRateLimiter: | |
| def __init__( | |
| self, | |
| global_interval_seconds: float, | |
| per_domain_interval_seconds: float, | |
| *, | |
| clock: Callable[[], float] | None = None, | |
| sleep: Callable[[float], Awaitable[None]] | None = None, | |
| ) -> None: | |
| self.global_interval_seconds = max(0.0, float(global_interval_seconds)) | |
| self.per_domain_interval_seconds = max(0.0, float(per_domain_interval_seconds)) | |
| self._clock = clock or time.monotonic | |
| self._sleep = sleep or asyncio.sleep | |
| self._global_lock = asyncio.Lock() | |
| self._global_last: float | None = None | |
| self._domain_guard = asyncio.Lock() | |
| self._domain_locks: dict[str, asyncio.Lock] = {} | |
| self._domain_last: dict[str, float] = {} | |
| async def acquire(self, domain: str) -> None: | |
| normalized = domain.lower().strip(".") | |
| await self._acquire_global() | |
| await self._acquire_domain(normalized) | |
| async def _acquire_global(self) -> None: | |
| if self.global_interval_seconds <= 0: | |
| return | |
| async with self._global_lock: | |
| now = self._clock() | |
| if self._global_last is not None: | |
| wait = self.global_interval_seconds - (now - self._global_last) | |
| if wait > 0: | |
| await self._sleep(wait) | |
| self._global_last = self._clock() | |
| async def _acquire_domain(self, domain: str) -> None: | |
| if not domain or self.per_domain_interval_seconds <= 0: | |
| return | |
| lock = await self._get_domain_lock(domain) | |
| async with lock: | |
| now = self._clock() | |
| last = self._domain_last.get(domain) | |
| if last is not None: | |
| wait = self.per_domain_interval_seconds - (now - last) | |
| if wait > 0: | |
| await self._sleep(wait) | |
| self._domain_last[domain] = self._clock() | |
| async def _get_domain_lock(self, domain: str) -> asyncio.Lock: | |
| async with self._domain_guard: | |
| lock = self._domain_locks.get(domain) | |
| if lock is None: | |
| lock = asyncio.Lock() | |
| self._domain_locks[domain] = lock | |
| return lock | |