from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser

import aiohttp


@dataclass
class _RobotsCacheEntry:
    # Cached robots.txt result for one origin; parser is None when the file
    # could not be fetched or parsed.
    parser: RobotFileParser | None
    expires_at: float


class RobotsPolicy:
    """Per-origin robots.txt fetcher and cache with a fail-closed default."""

    def __init__(
        self,
        session: aiohttp.ClientSession,
        user_agent: str,
        *,
        cache_ttl_seconds: float = 3600.0,
        fail_closed: bool = True,
        max_bytes: int = 300_000,
    ) -> None:
        self.session = session
        self.user_agent = user_agent
        self.cache_ttl_seconds = max(1.0, float(cache_ttl_seconds))
        self.fail_closed = bool(fail_closed)
        self.max_bytes = int(max_bytes)
        self._cache: dict[str, _RobotsCacheEntry] = {}
        self._cache_lock = asyncio.Lock()
        # One lock per origin so concurrent callers never download the same
        # robots.txt more than once.
        self._origin_locks: dict[str, asyncio.Lock] = {}

    async def can_fetch(self, url: str) -> bool:
        parts = urlsplit(url)
        host = (parts.hostname or "").lower().strip(".")
        scheme = parts.scheme.lower()
        if scheme not in {"http", "https"} or not host:
            return False
        # Cache key is scheme + host; an explicit port is not part of the key.
        origin = f"{scheme}://{host}"
        parser = await self._get_parser(origin)
        if parser is None:
            # robots.txt unavailable: allow only when configured to fail open.
            return not self.fail_closed
        return parser.can_fetch(self.user_agent, url)

    async def _get_parser(self, origin: str) -> RobotFileParser | None:
        now = time.monotonic()
        async with self._cache_lock:
            cached = self._cache.get(origin)
            if cached and cached.expires_at > now:
                return cached.parser
            lock = self._origin_locks.get(origin)
            if lock is None:
                lock = asyncio.Lock()
                self._origin_locks[origin] = lock
        async with lock:
            # Re-check under the per-origin lock: another task may have
            # refreshed this origin while we were waiting.
            now = time.monotonic()
            async with self._cache_lock:
                cached = self._cache.get(origin)
                if cached and cached.expires_at > now:
                    return cached.parser
            parser = await self._download_and_parse(origin)
            async with self._cache_lock:
                self._cache[origin] = _RobotsCacheEntry(
                    parser=parser,
                    expires_at=time.monotonic() + self.cache_ttl_seconds,
                )
            return parser

    async def _download_and_parse(self, origin: str) -> RobotFileParser | None:
        robots_url = f"{origin}/robots.txt"
        try:
            async with self.session.get(robots_url, allow_redirects=True) as response:
                if response.status >= 400:
                    return None
                # Read at most max_bytes (plus one byte to detect truncation);
                # decode errors are ignored, and any other failure is treated
                # as "robots.txt unavailable" by the except clause below.
                raw = await response.content.read(self.max_bytes + 1)
                if len(raw) > self.max_bytes:
                    raw = raw[: self.max_bytes]
                charset = response.charset or "utf-8"
                text = raw.decode(charset, errors="ignore")
        except Exception:
            return None
        parser = RobotFileParser()
        parser.set_url(robots_url)
        parser.parse(text.splitlines())
        return parser
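

# A minimal usage sketch, added for illustration: it assumes the caller owns the
# aiohttp.ClientSession lifecycle; the user-agent string, timeout, and URL below
# are placeholders, not values taken from this module.
async def _example_usage() -> None:
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=10.0)
    ) as session:
        policy = RobotsPolicy(session, user_agent="ExampleCrawler/1.0")
        if await policy.can_fetch("https://example.com/some/page"):
            ...  # the URL is allowed by the origin's robots.txt policy


if __name__ == "__main__":
    asyncio.run(_example_usage())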