from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser

import aiohttp


@dataclass
class _RobotsCacheEntry:
    """Cached result of one robots.txt lookup for a single origin."""

    # Parsed rules, or None when the lookup failed and the fail policy applies.
    parser: RobotFileParser | None
    # time.monotonic() value after which the entry is considered stale.
    expires_at: float


class RobotsPolicy:
    """Answer "may this URL be fetched?" using per-origin cached robots.txt rules.

    Args:
        session: HTTP session used to download ``/robots.txt``.
        user_agent: Agent name matched against the robots.txt groups.
        cache_ttl_seconds: Lifetime of a cached lookup (clamped to >= 1 second).
            Failed lookups are cached for the same duration.
        fail_closed: When a robots.txt lookup fails (network error, 5xx,
            401/403/429), deny the fetch if True and allow it if False.
        max_bytes: Upper bound on the number of robots.txt bytes that are read
            and parsed; anything beyond it is ignored.
    """

    # Ports left out of the origin so "https://h" and "https://h:443" share
    # one cache entry and one robots.txt download.
    _DEFAULT_PORTS = {"http": 80, "https": 443}

    # Statuses in the 4xx range that are still treated as a failed lookup
    # (access denied / rate limited) rather than "no robots.txt published".
    _FAILURE_4XX = frozenset({401, 403, 429})

    def __init__(
        self,
        session: aiohttp.ClientSession,
        user_agent: str,
        *,
        cache_ttl_seconds: float = 3600.0,
        fail_closed: bool = True,
        max_bytes: int = 300_000,
    ) -> None:
        self.session = session
        self.user_agent = user_agent
        self.cache_ttl_seconds = max(1.0, float(cache_ttl_seconds))
        self.fail_closed = bool(fail_closed)
        self.max_bytes = int(max_bytes)
        self._cache: dict[str, _RobotsCacheEntry] = {}
        self._cache_lock = asyncio.Lock()
        # One lock per origin so concurrent callers trigger a single download.
        # NOTE: neither this dict nor the cache is ever pruned.
        self._origin_locks: dict[str, asyncio.Lock] = {}

    async def can_fetch(self, url: str) -> bool:
        """Return True when *url* may be fetched by ``self.user_agent``.

        URLs that are not http(s), have no host, or cannot be parsed are
        always refused. When the robots.txt lookup failed, the answer is
        ``not self.fail_closed``.
        """
        origin = self._origin_of(url)
        if origin is None:
            return False

        parser = await self._get_parser(origin)
        if parser is None:
            return not self.fail_closed
        return parser.can_fetch(self.user_agent, url)

    @classmethod
    def _origin_of(cls, url: str) -> str | None:
        """Return ``scheme://host[:port]`` for *url*, or None if unsupported."""
        try:
            parts = urlsplit(url)
            port = parts.port  # raises ValueError for a malformed port
        except ValueError:
            return None

        scheme = parts.scheme.lower()
        host = (parts.hostname or "").lower().strip(".")
        if scheme not in cls._DEFAULT_PORTS or not host:
            return None

        # ``hostname`` strips the brackets of an IPv6 literal; restore them so
        # the robots.txt URL built from the origin is valid.
        if ":" in host:
            host = f"[{host}]"

        # robots.txt applies per scheme + host + port, so a non-default port
        # must be part of both the cache key and the download URL.
        if port is not None and port != cls._DEFAULT_PORTS[scheme]:
            return f"{scheme}://{host}:{port}"
        return f"{scheme}://{host}"

    async def _get_parser(self, origin: str) -> RobotFileParser | None:
        """Return the cached parser for *origin*, downloading it if stale.

        Returns None when the most recent lookup for the origin failed.
        """
        now = time.monotonic()
        async with self._cache_lock:
            cached = self._cache.get(origin)
            if cached is not None and cached.expires_at > now:
                return cached.parser
            lock = self._origin_locks.get(origin)
            if lock is None:
                lock = asyncio.Lock()
                self._origin_locks[origin] = lock

        async with lock:
            # Another task may have refreshed the entry while this one was
            # waiting for the origin lock, so look again before downloading.
            now = time.monotonic()
            async with self._cache_lock:
                cached = self._cache.get(origin)
                if cached is not None and cached.expires_at > now:
                    return cached.parser

            parser = await self._download_and_parse(origin)

            async with self._cache_lock:
                self._cache[origin] = _RobotsCacheEntry(
                    parser=parser,
                    expires_at=time.monotonic() + self.cache_ttl_seconds,
                )
            return parser

    async def _download_and_parse(self, origin: str) -> RobotFileParser | None:
        """Fetch and parse ``{origin}/robots.txt``.

        Returns:
            A parser holding the published rules; a parser with no rules
            (everything allowed) when the server answers with a 4xx status
            other than 401/403/429; or None when the lookup failed (request
            error, 401/403/429, or any 5xx status).
        """
        robots_url = f"{origin}/robots.txt"
        try:
            async with self.session.get(robots_url, allow_redirects=True) as response:
                status = response.status
                if status >= 500 or status in self._FAILURE_4XX:
                    return None
                if status >= 400:
                    # No robots.txt is published: nothing is restricted.
                    text = ""
                else:
                    raw = await self._read_capped(response.content)
                    text = self._decode(raw, response.charset)
        except Exception:
            # Deliberate best-effort boundary: any request/read error is
            # reported as "lookup failed" and left to the fail policy. On
            # Python 3.8+ asyncio.CancelledError is not an Exception subclass,
            # so task cancellation still propagates.
            return None

        parser = RobotFileParser()
        parser.set_url(robots_url)
        parser.parse(text.splitlines())
        return parser

    async def _read_capped(self, stream) -> bytes:
        """Read from *stream* until EOF or ``self.max_bytes`` bytes.

        A single ``read(n)`` may return fewer than ``n`` bytes before EOF, so
        the stream is drained in a loop to avoid parsing a partial file.
        """
        remaining = max(0, self.max_bytes)
        chunks: list[bytes] = []
        while remaining > 0:
            chunk = await stream.read(remaining)
            if not chunk:  # EOF
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    @staticmethod
    def _decode(raw: bytes, charset: str | None) -> str:
        """Decode *raw* with *charset*, falling back to UTF-8 if it is unknown."""
        try:
            text = raw.decode(charset or "utf-8", errors="ignore")
        except LookupError:
            # The server declared a codec Python does not know.
            text = raw.decode("utf-8", errors="ignore")
        # A leading BOM would make the first directive unrecognisable to the
        # parser and silently drop the rules of the first group.
        if text.startswith("\ufeff"):
            text = text[1:]
        return text