XHS / xhs_utils /rate_limiter.py
Trae Bot
Upload Spider_XHS project
c481f8a
import time
import threading
class TokenBucket:
    """Blocking token bucket whose state is guarded by a lock.

    Tokens refill continuously at ``rate`` tokens per second (measured with
    ``time.monotonic``) up to ``capacity``. The bucket starts full.
    """

    def __init__(self, rate: float, capacity: float):
        """Create a full bucket.

        Args:
            rate: Refill speed in tokens per second.
            capacity: Maximum number of tokens the bucket may hold.
        """
        self.rate = float(rate)
        self.capacity = float(capacity)
        self._tokens = float(capacity)
        self._updated_at = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self, tokens: float = 1.0):
        """Block until ``tokens`` tokens can be taken, then take them.

        A request larger than the current capacity is granted as soon as the
        bucket is full; the excess is recorded as a negative balance so later
        callers wait for it to be repaid. Without this, such a request could
        never be satisfied and the call would spin forever.

        Args:
            tokens: Number of tokens to consume. Zero or negative requests
                return immediately without touching the bucket.
        """
        tokens = float(tokens)
        if tokens <= 0:
            return
        while True:
            with self._lock:
                now = time.monotonic()
                elapsed = now - self._updated_at
                if elapsed > 0:
                    self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
                    self._updated_at = now
                # The balance can never exceed capacity, so an oversized
                # request only has to wait for a full bucket.
                needed = min(tokens, self.capacity)
                if self._tokens >= needed:
                    self._tokens -= tokens
                    return
                if self.rate <= 0:
                    # No refill is happening; poll in case another thread
                    # raises the rate via set_rate_capacity().
                    wait_s = 0.2
                else:
                    wait_s = (needed - self._tokens) / self.rate
                if wait_s <= 0:
                    wait_s = 0.01
            # Sleep outside the lock so other threads can use the bucket.
            time.sleep(wait_s)

    def set_rate_capacity(self, rate: float, capacity: float):
        """Replace the refill rate and capacity.

        The current balance is clamped down to the new capacity; it is never
        increased by this call.

        Args:
            rate: New refill speed in tokens per second.
            capacity: New maximum number of tokens.
        """
        with self._lock:
            self.rate = float(rate)
            self.capacity = float(capacity)
            self._tokens = min(self._tokens, self.capacity)
class DomainRateLimiter:
def __init__(self, rules: dict[str, tuple[float, float]] | None = None):
self._rules = rules or {}
self._buckets: dict[str, TokenBucket] = {}
self._penalties: dict[str, tuple[float, float]] = {}
self._lock = threading.Lock()
def acquire(self, domain: str, tokens: float = 1.0):
if not domain:
return
with self._lock:
now = time.monotonic()
if domain in self._penalties:
until, factor = self._penalties[domain]
if now >= until:
del self._penalties[domain]
if domain not in self._buckets:
rate, cap = self._rules.get(domain, (2.0, 2.0))
self._buckets[domain] = TokenBucket(rate=rate, capacity=cap)
bucket = self._buckets[domain]
if domain in self._penalties:
_, factor = self._penalties[domain]
base_rate, base_cap = self._rules.get(domain, (2.0, 2.0))
bucket.set_rate_capacity(rate=max(0.1, base_rate * factor), capacity=max(0.5, base_cap * factor))
else:
base_rate, base_cap = self._rules.get(domain, (2.0, 2.0))
bucket.set_rate_capacity(rate=base_rate, capacity=base_cap)
bucket.acquire(tokens=tokens)
def penalize(self, domain: str, factor: float = 0.25, ttl_s: float = 60.0):
if not domain:
return
factor = max(0.05, min(1.0, float(factor)))
ttl_s = max(1.0, float(ttl_s))
with self._lock:
until = time.monotonic() + ttl_s
self._penalties[domain] = (until, factor)
def get_default_rate_limiter():
    """Build a ``DomainRateLimiter`` preloaded with the built-in host rules.

    Each rule is ``(rate, capacity)``: the refill rate in tokens per second
    and the maximum number of tokens the host's bucket can hold.

    Returns:
        A new ``DomainRateLimiter`` configured with the rules below.
    """
    return DomainRateLimiter(
        rules={
            "edith.xiaohongshu.com": (2.0, 2.0),
            "www.xiaohongshu.com": (1.5, 2.0),
            "creator.xiaohongshu.com": (1.5, 2.0),
            "ros-upload.xiaohongshu.com": (0.8, 1.0),
            "pgy.xiaohongshu.com": (1.0, 1.0),
        }
    )