# AutoWS — crawler/robots.py
# (file uploaded via the Hugging Face web UI by Roman190928, commit f55f92e)
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser
import aiohttp
@dataclass
class _RobotsCacheEntry:
    """One cached robots.txt lookup result for a single origin."""

    # Parsed robots.txt rules; None records a failed fetch so the origin
    # is not re-downloaded until the entry expires.
    parser: RobotFileParser | None
    # time.monotonic() deadline after which this entry is considered stale.
    expires_at: float
class RobotsPolicy:
    """Async robots.txt gatekeeper with a per-origin, TTL-bounded cache.

    Downloads and parses ``robots.txt`` at most once per origin
    (scheme://host[:port]) per TTL window and answers ``can_fetch``
    queries against the cached parser.
    """

    def __init__(
        self,
        session: aiohttp.ClientSession,
        user_agent: str,
        *,
        cache_ttl_seconds: float = 3600.0,
        fail_closed: bool = True,
        max_bytes: int = 300_000,
    ) -> None:
        """
        Args:
            session: Shared aiohttp session used to fetch robots.txt files.
            user_agent: Agent name matched against robots.txt rules.
            cache_ttl_seconds: Lifetime of a cached parse result
                (clamped to at least one second).
            fail_closed: When the robots.txt state is unknown (network or
                5xx server error), True denies the fetch, False permits it.
            max_bytes: Cap on the robots.txt body size that will be parsed.
        """
        self.session = session
        self.user_agent = user_agent
        self.cache_ttl_seconds = max(1.0, float(cache_ttl_seconds))
        self.fail_closed = bool(fail_closed)
        self.max_bytes = int(max_bytes)
        self._cache: dict[str, _RobotsCacheEntry] = {}
        self._cache_lock = asyncio.Lock()
        # One lock per origin so concurrent requests for the same robots.txt
        # collapse into a single download.
        self._origin_locks: dict[str, asyncio.Lock] = {}

    async def can_fetch(self, url: str) -> bool:
        """Return True if *url* may be crawled under its origin's robots.txt."""
        parts = urlsplit(url)
        host = (parts.hostname or "").lower().strip(".")
        scheme = parts.scheme.lower()
        if scheme not in {"http", "https"} or not host:
            return False
        try:
            port = parts.port  # raises ValueError for a malformed port
        except ValueError:
            return False
        # BUGFIX: robots.txt is scoped per scheme+host+port (RFC 9309); the
        # old code dropped the port, so e.g. host:8080 shared a cache entry
        # with host:80 and robots.txt was always fetched from the portless URL.
        if port == (80 if scheme == "http" else 443):
            port = None  # normalize an explicit default port away
        origin = f"{scheme}://{host}" if port is None else f"{scheme}://{host}:{port}"
        parser = await self._get_parser(origin)
        if parser is None:
            # Unknown robots state (network/server error): apply the policy.
            return not self.fail_closed
        return parser.can_fetch(self.user_agent, url)

    async def _get_parser(self, origin: str) -> RobotFileParser | None:
        """Return the cached parser for *origin*, downloading if stale or absent."""
        now = time.monotonic()
        async with self._cache_lock:
            cached = self._cache.get(origin)
            if cached and cached.expires_at > now:
                return cached.parser
        # No await between the get and the set, so this is race-free on a
        # single event loop.
        lock = self._origin_locks.get(origin)
        if lock is None:
            lock = asyncio.Lock()
            self._origin_locks[origin] = lock
        async with lock:
            # Re-check: another task may have refreshed the entry while we
            # were waiting for the origin lock.
            now = time.monotonic()
            async with self._cache_lock:
                cached = self._cache.get(origin)
                if cached and cached.expires_at > now:
                    return cached.parser
            parser = await self._download_and_parse(origin)
            async with self._cache_lock:
                # Failures (parser is None) are cached too, so a broken
                # origin is retried at most once per TTL.
                self._cache[origin] = _RobotsCacheEntry(
                    parser=parser,
                    expires_at=time.monotonic() + self.cache_ttl_seconds,
                )
            return parser

    async def _download_and_parse(self, origin: str) -> RobotFileParser | None:
        """Fetch and parse ``{origin}/robots.txt``.

        Returns:
            A parser with the fetched rules; an allow-all parser when the
            file is absent (4xx other than 401/403); a disallow-all parser
            for 401/403; or None when the state is unknown (5xx or a
            network error), which ``can_fetch`` resolves via ``fail_closed``.
        """
        robots_url = f"{origin}/robots.txt"
        try:
            async with self.session.get(robots_url, allow_redirects=True) as response:
                status = response.status
                if status in (401, 403):
                    # Access denied: treat as fully disallowed, matching
                    # urllib.robotparser.RobotFileParser.read().
                    return self._fixed_parser(robots_url, allow=False)
                if 400 <= status < 500:
                    # BUGFIX: a missing robots.txt (e.g. 404) means crawling
                    # is unrestricted (RFC 9309). The old code returned None
                    # for every status >= 400, so with fail_closed=True any
                    # site without a robots.txt was blocked entirely.
                    return self._fixed_parser(robots_url, allow=True)
                if status >= 500:
                    return None  # server error: robots state unknown
                # Read one byte past the cap so oversized files are detected,
                # then truncate to the cap before parsing.
                raw = await response.content.read(self.max_bytes + 1)
                if len(raw) > self.max_bytes:
                    raw = raw[: self.max_bytes]
                try:
                    text = raw.decode(response.charset or "utf-8", errors="ignore")
                except LookupError:
                    # Unknown charset label in Content-Type: fall back to UTF-8
                    # instead of discarding an otherwise good response.
                    text = raw.decode("utf-8", errors="ignore")
        except Exception:
            # DNS/connect/timeout/protocol failure: unknown state.
            return None
        parser = RobotFileParser()
        parser.set_url(robots_url)
        parser.parse(text.splitlines())
        return parser

    @staticmethod
    def _fixed_parser(robots_url: str, *, allow: bool) -> RobotFileParser:
        """Build a parser that unconditionally allows or disallows everything."""
        parser = RobotFileParser()
        parser.set_url(robots_url)
        if allow:
            parser.allow_all = True
        else:
            parser.disallow_all = True
        return parser