import time import threading class TokenBucket: def __init__(self, rate: float, capacity: float): self.rate = float(rate) self.capacity = float(capacity) self._tokens = float(capacity) self._updated_at = time.monotonic() self._lock = threading.Lock() def acquire(self, tokens: float = 1.0): tokens = float(tokens) while True: wait_s = 0.0 with self._lock: now = time.monotonic() elapsed = now - self._updated_at if elapsed > 0: self._tokens = min(self.capacity, self._tokens + elapsed * self.rate) self._updated_at = now if self._tokens >= tokens: self._tokens -= tokens return if self.rate <= 0: wait_s = 0.2 else: wait_s = (tokens - self._tokens) / self.rate if wait_s <= 0: wait_s = 0.01 time.sleep(wait_s) def set_rate_capacity(self, rate: float, capacity: float): with self._lock: self.rate = float(rate) self.capacity = float(capacity) self._tokens = min(self._tokens, self.capacity) class DomainRateLimiter: def __init__(self, rules: dict[str, tuple[float, float]] | None = None): self._rules = rules or {} self._buckets: dict[str, TokenBucket] = {} self._penalties: dict[str, tuple[float, float]] = {} self._lock = threading.Lock() def acquire(self, domain: str, tokens: float = 1.0): if not domain: return with self._lock: now = time.monotonic() if domain in self._penalties: until, factor = self._penalties[domain] if now >= until: del self._penalties[domain] if domain not in self._buckets: rate, cap = self._rules.get(domain, (2.0, 2.0)) self._buckets[domain] = TokenBucket(rate=rate, capacity=cap) bucket = self._buckets[domain] if domain in self._penalties: _, factor = self._penalties[domain] base_rate, base_cap = self._rules.get(domain, (2.0, 2.0)) bucket.set_rate_capacity(rate=max(0.1, base_rate * factor), capacity=max(0.5, base_cap * factor)) else: base_rate, base_cap = self._rules.get(domain, (2.0, 2.0)) bucket.set_rate_capacity(rate=base_rate, capacity=base_cap) bucket.acquire(tokens=tokens) def penalize(self, domain: str, factor: float = 0.25, ttl_s: float = 60.0): if not domain: return factor = max(0.05, min(1.0, float(factor))) ttl_s = max(1.0, float(ttl_s)) with self._lock: until = time.monotonic() + ttl_s self._penalties[domain] = (until, factor) def get_default_rate_limiter(): rules = { "edith.xiaohongshu.com": (2.0, 2.0), "www.xiaohongshu.com": (1.5, 2.0), "creator.xiaohongshu.com": (1.5, 2.0), "ros-upload.xiaohongshu.com": (0.8, 1.0), "pgy.xiaohongshu.com": (1.0, 1.0), } return DomainRateLimiter(rules=rules)