AutoWS / crawler /rate_limit.py
Roman190928's picture
Upload AutoWS app files without plan/readme
f55f92e verified
from __future__ import annotations
import asyncio
import time
from typing import Awaitable, Callable
class RequestRateLimiter:
    """Pace requests with a global interval and a per-domain interval.

    ``acquire(domain)`` returns once the caller may issue a request: at least
    ``global_interval_seconds`` after the previous release of any request and
    at least ``per_domain_interval_seconds`` after the previous release for
    the same normalized domain. An interval of zero disables that limit.

    The clock and sleep callables are injectable so the pacing logic can be
    driven with a fake clock.

    NOTE: one lock and one timestamp are stored per distinct domain and are
    never evicted, so memory grows with the number of domains seen.
    """

    def __init__(
        self,
        global_interval_seconds: float,
        per_domain_interval_seconds: float,
        *,
        clock: Callable[[], float] | None = None,
        sleep: Callable[[float], Awaitable[None]] | None = None,
    ) -> None:
        """Create a limiter.

        Args:
            global_interval_seconds: Minimum spacing between any two
                releases. Negative values are clamped to ``0.0``.
            per_domain_interval_seconds: Minimum spacing between two releases
                for the same domain. Negative values are clamped to ``0.0``.
            clock: Returns the current time in seconds; defaults to
                ``time.monotonic``.
            sleep: Awaitable sleep function; defaults to ``asyncio.sleep``.
        """
        self.global_interval_seconds = max(0.0, float(global_interval_seconds))
        self.per_domain_interval_seconds = max(0.0, float(per_domain_interval_seconds))
        self._clock = clock or time.monotonic
        self._sleep = sleep or asyncio.sleep
        # Serializes the global slot; the timestamp is the last global release.
        self._global_lock = asyncio.Lock()
        self._global_last: float | None = None
        # Guards creation of per-domain locks.
        self._domain_guard = asyncio.Lock()
        self._domain_locks: dict[str, asyncio.Lock] = {}
        self._domain_last: dict[str, float] = {}

    async def acquire(self, domain: str) -> None:
        """Wait until a request to ``domain`` satisfies both intervals.

        The domain is lower-cased and stripped of leading/trailing dots
        before it is used as the per-domain key.
        """
        normalized = domain.lower().strip(".")
        # The global slot is taken *inside* the domain slot so that both
        # timestamps describe the moment the request is really released.
        # Taking the global slot first and then sleeping for the domain would
        # leave a stale global timestamp and let the next request through
        # sooner than ``global_interval_seconds`` after this one.
        await self._acquire_domain(normalized, hold_for_global=True)

    async def _pause_until_due(self, last: float | None, interval: float) -> None:
        """Sleep for whatever part of ``interval`` has not elapsed since ``last``."""
        now = self._clock()
        if last is None:
            return
        remaining = interval - (now - last)
        if remaining > 0:
            await self._sleep(remaining)

    async def _acquire_global(self) -> None:
        """Wait for the global interval and record the release time."""
        if self.global_interval_seconds <= 0:
            return
        async with self._global_lock:
            await self._pause_until_due(self._global_last, self.global_interval_seconds)
            self._global_last = self._clock()

    async def _acquire_domain(self, domain: str, *, hold_for_global: bool = False) -> None:
        """Wait for the per-domain interval and record the release time.

        Args:
            domain: Already-normalized domain key. An empty key skips the
                per-domain limit.
            hold_for_global: When true, the global slot is also acquired,
                while the domain lock is still held, before the domain
                timestamp is written. The default keeps this a domain-only
                step.
        """
        if not domain or self.per_domain_interval_seconds <= 0:
            if hold_for_global:
                await self._acquire_global()
            return
        lock = await self._get_domain_lock(domain)
        async with lock:
            await self._pause_until_due(
                self._domain_last.get(domain), self.per_domain_interval_seconds
            )
            if hold_for_global:
                # Lock order is always domain -> global, so this nesting
                # cannot deadlock.
                await self._acquire_global()
            self._domain_last[domain] = self._clock()

    async def _get_domain_lock(self, domain: str) -> asyncio.Lock:
        """Return the lock for ``domain``, creating it on first use."""
        async with self._domain_guard:
            lock = self._domain_locks.get(domain)
            if lock is None:
                lock = asyncio.Lock()
                self._domain_locks[domain] = lock
            return lock