"""
Documentation Crawler Module

Handles recursive crawling, rate limiting, and HTML parsing.
"""
import asyncio
import json
import logging
import re
import time
import xml.etree.ElementTree as ET
from collections import deque
from typing import Any, Deque, Dict, List, Optional, Set, Tuple
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
from urllib.robotparser import RobotFileParser

import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# crawl4ai is optional: when it is missing the names are set to None and the
# functions below either raise or fall back to DocumentationCrawler.
try:
    from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
except ImportError:
    AsyncWebCrawler = None
    BrowserConfig = None
    CrawlerRunConfig = None
    CacheMode = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def _async_crawl4ai_urls(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Fetch the given URLs with Crawl4AI and return their extracted content.

    Args:
        urls: URLs to fetch.
        max_pages: Upper bound on how many of ``urls`` are fetched; extra
            entries are dropped (a warning is logged). Negative values are
            treated as zero.

    Returns:
        One ``{"url": ..., "content": ...}`` dict per successfully fetched
        page that produced non-empty content.

    Raises:
        RuntimeError: If crawl4ai is not installed.
    """
    if AsyncWebCrawler is None:
        raise RuntimeError("crawl4ai is not installed")

    requested = list(urls)
    targets = requested[:max(max_pages, 0)]
    if len(targets) < len(requested):
        logger.warning(
            f"Crawl4AI URL list truncated from {len(requested)} to {len(targets)} (max_pages={max_pages})"
        )
    if not targets:
        # Nothing to fetch, so do not pay for a browser launch.
        return []

    browser_config = BrowserConfig(headless=True)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS if CacheMode else None,
        wait_until="domcontentloaded",
        max_retries=2,
        max_scroll_steps=10,
        verbose=False,
        only_text=True,
        page_timeout=30000,
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(urls=targets, config=run_config)
        # arun_many may hand back an async iterator, a list, or another
        # iterable; materialize it while the crawler is still open.
        if hasattr(results, "__aiter__"):
            results = [result async for result in results]
        elif not isinstance(results, list):
            results = list(results)

    documents = []
    for result in results:
        if not getattr(result, "success", False):
            logger.warning(
                f"Crawl4AI failed for {getattr(result, 'url', '')}: {getattr(result, 'error_message', 'no error message')}"
            )
            continue
        # Use the first non-empty representation, in this order of preference.
        content = (
            getattr(result, "extracted_content", None)
            or getattr(result, "markdown", None)
            or getattr(result, "cleaned_html", None)
            or ""
        )
        if not content:
            continue
        documents.append({"url": getattr(result, "url", ""), "content": content})
    return documents


async def async_crawl_urls_with_crawl4ai(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Public async wrapper around :func:`_async_crawl4ai_urls`."""
    return await _async_crawl4ai_urls(urls, max_pages)


def _fallback_crawl(base_url: str, max_pages: int = 100) -> List[Dict[str, str]]:
    """Crawl ``base_url`` with DocumentationCrawler (max depth 3).

    Raises:
        RuntimeError: If the crawl produced no documents.
    """
    crawler = DocumentationCrawler(base_url=base_url, max_depth=3, max_pages=max_pages)
    documents = crawler.crawl()
    if not documents:
        raise RuntimeError("Fallback DocumentationCrawler returned no documents")
    return documents


async def async_crawl_and_persist(
    base_url: str,
    output_path: str = "./crawler_docs.json",
    max_pages: int = 100,
) -> List[Dict[str, str]]:
    """Crawl ``base_url`` and write the documents to ``output_path`` as JSON.

    Crawl4AI is tried first when it is installed; if it raises or returns
    nothing, the requests-based DocumentationCrawler is run in a worker
    thread instead.

    Args:
        base_url: Start URL.
        output_path: File the JSON list of documents is written to.
        max_pages: Page budget passed to whichever crawler runs.

    Returns:
        The list of ``{"url", "content"}`` documents that was written.

    Raises:
        RuntimeError: If the fallback crawler also returns no documents.
    """
    if AsyncWebCrawler is None:
        documents = await asyncio.to_thread(_fallback_crawl, base_url, max_pages)
    else:
        try:
            documents = await _async_crawl4ai_urls([base_url], max_pages=max_pages)
            if not documents:
                raise RuntimeError("Crawl4AI returned no documents")
        except Exception as e:
            # Deliberately broad: any Crawl4AI failure triggers the fallback.
            logger.warning(f"Crawl4AI failed, falling back to DocumentationCrawler: {e}")
            documents = await asyncio.to_thread(_fallback_crawl, base_url, max_pages)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(documents, f, indent=2, ensure_ascii=False)
    return documents


def crawl_and_persist(
    base_url: str,
    output_path: str = "./crawler_docs.json",
    max_pages: int = 100,
) -> List[Dict[str, str]]:
    """Synchronous entry point for :func:`async_crawl_and_persist`.

    Uses ``asyncio.run``, so it must not be called from a running event loop.
    """
    return asyncio.run(async_crawl_and_persist(base_url, output_path=output_path, max_pages=max_pages))


def crawl_urls_with_crawl4ai(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Synchronously fetch ``urls`` with Crawl4AI.

    Raises:
        RuntimeError: If crawl4ai is not installed, or if the crawl fails
            (the original exception is chained as the cause).
    """
    if AsyncWebCrawler is None:
        raise RuntimeError("crawl4ai is not installed")
    try:
        return asyncio.run(_async_crawl4ai_urls(urls, max_pages))
    except Exception as e:
        raise RuntimeError(f"Crawl4AI URL crawl failed: {e}") from e


class DocumentationCrawler:
    """
    Recursively crawls documentation websites with politeness and rate limiting.

    The crawl is breadth-first, restricted to the host of ``base_url``,
    optionally seeded from the site's sitemap, and optionally filtered by
    robots.txt. Note that the constructor performs network I/O (robots.txt
    and sitemap download) when those options are enabled.
    """

    # Path suffixes that are never fetched (binary or static assets).
    _SKIPPED_EXTENSIONS: Tuple[str, ...] = (
        '.pdf', '.zip', '.exe', '.jpg', '.jpeg', '.png', '.gif',
        '.css', '.js', '.svg', '.ico', '.woff', '.woff2',
    )
    # Query parameters dropped during normalization (matched case-insensitively
    # and exactly), in addition to every key starting with "utm_".
    _TRACKING_PARAMS = frozenset({'fbclid', 'gclid', 'mc_cid', 'mc_eid', 'ref'})

    def __init__(self, base_url: str, max_depth: int = 3, delay: float = 0.5,
                 timeout: int = 10, max_pages: int = 100,
                 respect_robots_txt: bool = True, use_sitemap: bool = True):
        """Configure the crawler and load robots.txt / sitemap data.

        Args:
            base_url: Start URL; its host bounds the crawl.
            max_depth: Maximum link depth followed from the seed URLs.
            delay: Minimum pause in seconds before each request.
            timeout: Per-request timeout in seconds.
            max_pages: Maximum number of URLs visited (failures included).
            respect_robots_txt: Load robots.txt and honor its rules.
            use_sitemap: Seed the crawl with URLs found in the sitemap.
        """
        self.base_url = base_url
        self.max_depth = max_depth
        self.delay = delay
        self.timeout = timeout
        self.max_pages = max_pages
        self.respect_robots_txt = respect_robots_txt
        self.use_sitemap = use_sitemap
        self.visited_urls: Set[str] = set()
        self.failed_urls: Set[str] = set()
        self.robots_parser: Optional[RobotFileParser] = None
        self.crawl_delay = delay
        self.sitemap_urls: List[str] = []
        self.session = self._setup_session()
        # Derive the domain from the *normalized* base URL so it is compared
        # like-for-like in _should_crawl_url (normalization strips default
        # ports). Fall back to the raw URL when it cannot be normalized.
        self.domain = urlparse(self._normalize_url(base_url) or base_url).netloc.lower()
        self._load_robots_rules()
        if self.use_sitemap:
            self.sitemap_urls = self._load_sitemap_urls()

    def _setup_session(self) -> requests.Session:
        """Build a session that retries 429/5xx responses and sends a browser User-Agent."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        return session

    def _load_robots_rules(self) -> None:
        """Load robots.txt and adopt its crawl delay when it exceeds ``delay``.

        On any failure the parser is left as ``None``, which makes
        ``_is_allowed_by_robots`` allow everything.
        """
        if not self.respect_robots_txt:
            return
        robots_url = urljoin(self.base_url, "/robots.txt")
        parser = RobotFileParser()
        parser.set_url(robots_url)
        try:
            # NOTE(review): read() downloads through urllib, so it uses
            # neither this session's User-Agent nor self.timeout.
            parser.read()
            self.robots_parser = parser
            delay = parser.crawl_delay(self.session.headers.get('User-Agent', '*'))
            if delay is not None:
                self.crawl_delay = max(self.delay, delay)
                logger.info(f"Using crawl delay {self.crawl_delay} from robots.txt")
        except Exception as exc:
            logger.warning(f"Could not load robots.txt from {robots_url}: {exc}")
            self.robots_parser = None

    def _load_sitemap_urls(self) -> List[str]:
        """Collect normalized page URLs from the site's sitemap(s).

        Sitemap locations come from ``Sitemap:`` lines in robots.txt, with
        ``/sitemap.xml`` as the fallback. Discovery is best-effort: every
        failure is logged at debug level and skipped.

        Returns:
            De-duplicated URLs in discovery order (empty when sitemaps are
            disabled or nothing could be loaded).
        """
        if not self.use_sitemap:
            return []

        sitemap_urls: List[str] = []
        robots_url = urljoin(self.base_url, "/robots.txt")
        try:
            response = self.session.get(robots_url, timeout=self.timeout)
            response.raise_for_status()
            for line in response.text.splitlines():
                if line.strip().lower().startswith("sitemap:"):
                    sitemap_url = line.split(":", 1)[1].strip()
                    if sitemap_url:
                        sitemap_urls.append(sitemap_url)
        except Exception as exc:
            logger.debug(f"Could not read sitemap hints from {robots_url}: {exc}")

        if not sitemap_urls:
            sitemap_urls.append(urljoin(self.base_url, "/sitemap.xml"))

        discovered: List[str] = []
        for sitemap_url in sitemap_urls:
            try:
                response = self.session.get(sitemap_url, timeout=self.timeout)
                response.raise_for_status()
                root = ET.fromstring(response.content)
                # "{*}" matches <loc> regardless of the XML namespace.
                for elem in root.findall('.//{*}loc'):
                    normalized = self._normalize_url(elem.text or "")
                    if normalized:
                        discovered.append(normalized)
            except Exception as exc:
                logger.debug(f"Could not load sitemap {sitemap_url}: {exc}")
                continue

        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(dict.fromkeys(discovered))

    @classmethod
    def _is_tracking_param(cls, key: str) -> bool:
        """Return True for query keys that only carry tracking information."""
        lowered = key.lower()
        return lowered.startswith('utm_') or lowered in cls._TRACKING_PARAMS

    def _normalize_url(self, url: str) -> str:
        """Return a canonical form of ``url``, or ``""`` if it is unusable.

        Canonicalization lowercases scheme and host, strips default ports,
        collapses repeated slashes, removes a trailing slash (except for the
        root path), drops tracking query parameters, sorts the remaining
        ones, and discards the fragment.

        Returns:
            The normalized URL; ``""`` for non-http(s) or malformed input.
        """
        try:
            parsed = urlparse(url.strip())
        except ValueError:
            # e.g. unbalanced IPv6 brackets in the host part
            return ""
        if parsed.scheme not in ("http", "https"):
            return ""

        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()
        if netloc.endswith(":80") and scheme == "http":
            netloc = netloc[:-3]
        elif netloc.endswith(":443") and scheme == "https":
            netloc = netloc[:-4]

        path = parsed.path or "/"
        path = re.sub(r"/+", "/", path)
        if path != "/" and path.endswith("/"):
            path = path.rstrip("/")

        query_pairs = parse_qsl(parsed.query, keep_blank_values=True)
        filtered_pairs = sorted(
            (k, v) for k, v in query_pairs if not self._is_tracking_param(k)
        )
        query = urlencode(filtered_pairs, doseq=True)

        return urlunparse((scheme, netloc, path, "", query, ""))

    def _is_allowed_by_robots(self, url: str) -> bool:
        """Return whether robots.txt permits fetching ``url`` (True if no rules are loaded)."""
        if not self.robots_parser:
            return True
        return self.robots_parser.can_fetch(self.session.headers.get('User-Agent', '*'), url)

    def _should_crawl_url(self, url: str) -> bool:
        """Decide whether ``url`` is an unvisited, allowed, same-host page.

        A URL is rejected when it cannot be normalized, is on another host,
        was already visited, is disallowed by robots.txt, or points at a
        static/binary asset.
        """
        normalized = self._normalize_url(url)
        if not normalized:
            return False

        parsed = urlparse(normalized)
        if parsed.netloc != self.domain:
            return False
        if normalized in self.visited_urls:
            return False
        if self.respect_robots_txt and not self._is_allowed_by_robots(normalized):
            logger.info(f"Skipping {normalized} due to robots.txt")
            return False
        if parsed.path.lower().endswith(self._SKIPPED_EXTENSIONS):
            return False
        return True

    def _extract_text(self, html: str) -> str:
        """Return the readable text of a page, one non-empty line per row.

        Boilerplate elements (scripts, navigation, footers, forms, ...) are
        removed first; the text is then taken from ``<article>``, else
        ``<main>``, else ``<body>``, else the whole document.
        """
        soup = BeautifulSoup(html, 'html.parser')
        for element in soup(['script', 'style', 'nav', 'footer', 'noscript', 'header',
                             'aside', 'form', 'iframe', 'svg', 'canvas']):
            element.decompose()

        content = soup.find('article') or soup.find('main') or soup.body or soup
        text = content.get_text(separator='\n', strip=True)
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        return '\n'.join(lines)

    def _extract_links(self, html: str, current_url: str) -> List[str]:
        """Return absolute, fragment-free URLs for every usable ``<a href>``.

        ``mailto:`` and ``javascript:`` links, empty hrefs, and hrefs that
        cannot be parsed as URLs are skipped.
        """
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href'].strip()
            if not href or href.startswith('mailto:') or href.startswith('javascript:'):
                continue
            try:
                absolute_url = urljoin(current_url, href)
            except ValueError:
                # A single malformed href must not abort the whole page.
                continue
            absolute_url = absolute_url.split('#')[0].strip()
            if not absolute_url:
                continue
            links.append(absolute_url)
        return links

    def crawl(self) -> List[dict]:
        """Run the breadth-first crawl and return the collected documents.

        The queue is seeded with the base URL and the sitemap URLs (depth 0).
        Crawling stops when the queue is empty or ``max_pages`` URLs have
        been visited. Failed URLs are recorded in ``failed_urls`` and count
        toward the page budget.

        Returns:
            One ``{'url': ..., 'content': ...}`` dict per page that yielded
            non-empty text.
        """
        to_crawl: Deque[Tuple[str, int]] = deque()
        # Every URL ever queued, so a page linked from many places is queued
        # once instead of once per referring page.
        queued: Set[str] = set()

        def enqueue(url: str, depth: int) -> None:
            if url not in queued:
                queued.add(url)
                to_crawl.append((url, depth))

        normalized_base = self._normalize_url(self.base_url)
        if normalized_base:
            enqueue(normalized_base, 0)
        for sitemap_url in self.sitemap_urls:
            if self._should_crawl_url(sitemap_url):
                enqueue(self._normalize_url(sitemap_url), 0)

        documents = []
        logger.info(f"Starting crawl of {self.base_url}")

        while to_crawl and len(self.visited_urls) < self.max_pages:
            current_url, depth = to_crawl.popleft()
            if depth > self.max_depth:
                continue
            if not self._should_crawl_url(current_url):
                continue
            normalized_current = self._normalize_url(current_url)
            if not normalized_current:
                continue

            # Mark as visited before fetching so failures are not retried.
            self.visited_urls.add(normalized_current)
            try:
                logger.info(f"Crawling [{len(self.visited_urls)}/{self.max_pages}] {normalized_current} (depth: {depth})")
                time.sleep(self.crawl_delay)
                response = self.session.get(normalized_current, timeout=self.timeout)
                response.raise_for_status()

                text_content = self._extract_text(response.text)
                if text_content:
                    documents.append({
                        'url': normalized_current,
                        'content': text_content
                    })

                if depth < self.max_depth:
                    for link in self._extract_links(response.text, normalized_current):
                        if self._should_crawl_url(link):
                            normalized_link = self._normalize_url(link)
                            if normalized_link:
                                enqueue(normalized_link, depth + 1)
            except Exception as e:
                logger.error(f"Failed to crawl {normalized_current}: {str(e)}")
                self.failed_urls.add(normalized_current)

        logger.info(f"Crawl complete. Fetched {len(documents)} pages, {len(self.failed_urls)} failed.")
        return documents