Spaces:
Paused
Paused
File size: 3,148 Bytes
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser
import aiohttp
@dataclass
class _RobotsCacheEntry:
    """Cached robots.txt lookup result for a single origin."""

    # Parsed robots.txt rules; None records a failed fetch so the failure
    # itself is cached (negative caching) until the entry expires.
    parser: RobotFileParser | None
    # time.monotonic() deadline; the entry is stale once this is in the past.
    expires_at: float
class RobotsPolicy:
    """Fetches, caches, and applies per-origin robots.txt rules.

    Results (including fetch failures) are cached per origin for
    ``cache_ttl_seconds``. When robots.txt cannot be determined (server
    error, network failure), ``fail_closed`` decides whether URLs are
    denied (True, default) or allowed (False).
    """

    def __init__(
        self,
        session: aiohttp.ClientSession,
        user_agent: str,
        *,
        cache_ttl_seconds: float = 3600.0,
        fail_closed: bool = True,
        max_bytes: int = 300_000,
    ) -> None:
        """
        Args:
            session: aiohttp session used to download robots.txt files.
            user_agent: Agent name matched against robots.txt rules.
            cache_ttl_seconds: Per-origin cache lifetime (floored at 1s).
            fail_closed: Deny URLs when robots.txt state is unknown.
            max_bytes: Maximum robots.txt size read; extra bytes dropped.
        """
        self.session = session
        self.user_agent = user_agent
        self.cache_ttl_seconds = max(1.0, float(cache_ttl_seconds))
        self.fail_closed = bool(fail_closed)
        self.max_bytes = int(max_bytes)
        self._cache: dict[str, _RobotsCacheEntry] = {}
        self._cache_lock = asyncio.Lock()
        # One lock per origin so concurrent can_fetch() calls for the same
        # host trigger a single robots.txt download, not a stampede.
        self._origin_locks: dict[str, asyncio.Lock] = {}

    async def can_fetch(self, url: str) -> bool:
        """Return True if robots policy permits fetching *url*.

        Non-http(s) URLs and URLs without a hostname are always denied.
        """
        parts = urlsplit(url)
        host = (parts.hostname or "").lower().strip(".")
        scheme = parts.scheme.lower()
        if scheme not in {"http", "https"} or not host:
            return False
        origin = f"{scheme}://{host}"
        parser = await self._get_parser(origin)
        if parser is None:
            # Robots state unknown: defer to the configured fail policy.
            return not self.fail_closed
        return parser.can_fetch(self.user_agent, url)

    async def _get_parser(self, origin: str) -> RobotFileParser | None:
        """Return the cached parser for *origin*, downloading if stale.

        Returns None when the robots.txt state could not be determined;
        that None is cached too, so failures are not retried until the
        TTL elapses.
        """
        now = time.monotonic()
        async with self._cache_lock:
            cached = self._cache.get(origin)
            if cached and cached.expires_at > now:
                return cached.parser
        # No await between get and set, so setdefault is race-free here.
        lock = self._origin_locks.setdefault(origin, asyncio.Lock())
        async with lock:
            # Another task may have populated the cache while we waited.
            now = time.monotonic()
            async with self._cache_lock:
                cached = self._cache.get(origin)
                if cached and cached.expires_at > now:
                    return cached.parser
            parser = await self._download_and_parse(origin)
            async with self._cache_lock:
                self._cache[origin] = _RobotsCacheEntry(
                    parser=parser,
                    expires_at=time.monotonic() + self.cache_ttl_seconds,
                )
            return parser

    async def _download_and_parse(self, origin: str) -> RobotFileParser | None:
        """Download and parse robots.txt for *origin*.

        Status handling follows RFC 9309 and urllib.robotparser:
          * 401/403  -> disallow everything (access to robots.txt denied);
          * other 4xx (e.g. 404, no robots.txt) -> allow everything;
          * 5xx or any network/decode error -> None ("unreachable";
            caller applies the fail_closed policy).
        """
        robots_url = f"{origin}/robots.txt"
        try:
            async with self.session.get(robots_url, allow_redirects=True) as response:
                status = response.status
                if status in (401, 403):
                    return self._static_parser(robots_url, disallow_all=True)
                if 400 <= status < 500:
                    # RFC 9309 §2.3.1.3: "unavailable" means unrestricted.
                    return self._static_parser(robots_url, disallow_all=False)
                if status >= 500:
                    return None
                # Read one extra byte to detect truncation, then clamp.
                raw = await response.content.read(self.max_bytes + 1)
                if len(raw) > self.max_bytes:
                    raw = raw[: self.max_bytes]
                charset = response.charset or "utf-8"
                text = raw.decode(charset, errors="ignore")
        except Exception:
            # Network/TLS/codec failure: state unknown, defer to caller.
            return None
        parser = RobotFileParser()
        parser.set_url(robots_url)
        parser.parse(text.splitlines())
        return parser

    @staticmethod
    def _static_parser(robots_url: str, *, disallow_all: bool) -> RobotFileParser:
        """Build a parser with a fixed allow-all or disallow-all policy."""
        parser = RobotFileParser()
        parser.set_url(robots_url)
        lines = ["User-agent: *", "Disallow: /"] if disallow_all else []
        parser.parse(lines)
        return parser
|