Spaces:
Sleeping
Sleeping
| """ | |
| Documentation Crawler Module | |
| Handles recursive crawling, rate limiting, and HTML parsing. | |
| """ | |
| import time | |
| import json | |
| import logging | |
| import asyncio | |
| import re | |
| import xml.etree.ElementTree as ET | |
| from urllib.parse import urljoin, urlparse, urlunparse, parse_qsl, urlencode | |
| from urllib.robotparser import RobotFileParser | |
| from typing import Set, List, Optional, Dict, Any | |
| from bs4 import BeautifulSoup | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| try: | |
| from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode | |
| except ImportError: | |
| AsyncWebCrawler = None | |
| BrowserConfig = None | |
| CrawlerRunConfig = None | |
| CacheMode = None | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
async def _async_crawl4ai_urls(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Fetch ``urls`` with Crawl4AI and return one document per usable page.

    Args:
        urls: URLs to fetch. Duplicates are dropped, keeping the first
            occurrence, so the same page is not fetched twice.
        max_pages: Upper bound on the number of URLs that are fetched.
            Values below zero are treated as zero.

    Returns:
        A list of ``{"url": ..., "content": ...}`` dicts, one per result that
        reported success and carried non-empty content.

    Raises:
        RuntimeError: If crawl4ai could not be imported.
    """
    if AsyncWebCrawler is None:
        raise RuntimeError("crawl4ai is not installed")

    # Honour the page budget: previously ``max_pages`` was accepted but never
    # applied, so every URL passed in was fetched.
    targets = list(dict.fromkeys(urls))[: max(max_pages, 0)]
    if not targets:
        return []

    browser_config = BrowserConfig(headless=True)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS if CacheMode else None,
        wait_until="domcontentloaded",
        max_retries=2,
        max_scroll_steps=10,
        verbose=False,
        only_text=True,
        page_timeout=30000,
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(urls=targets, config=run_config)
        # arun_many may hand back an async iterator, a list, or another
        # iterable; drain it while the crawler is still open.
        if hasattr(results, "__aiter__"):
            results = [result async for result in results]
        elif not isinstance(results, list):
            results = list(results)

    documents: List[Dict[str, str]] = []
    for result in results:
        if not getattr(result, "success", False):
            logger.warning(
                "Crawl4AI failed for %s: %s",
                getattr(result, "url", "<unknown>"),
                getattr(result, "error_message", "no error message"),
            )
            continue

        content = ""
        for field in ("extracted_content", "markdown", "cleaned_html"):
            value = getattr(result, field, None)
            # NOTE(review): some crawl4ai versions appear to expose `markdown`
            # as an object carrying `raw_markdown` rather than a plain str;
            # unwrap it when present so the stored content is real text.
            value = getattr(value, "raw_markdown", value)
            if value:
                # Coerce to str so the document stays JSON-serialisable.
                content = str(value)
                break
        if not content:
            continue

        documents.append({"url": getattr(result, "url", ""), "content": content})
    return documents
async def async_crawl_urls_with_crawl4ai(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Public coroutine entry point that delegates to ``_async_crawl4ai_urls``.

    Both arguments are forwarded unchanged and the delegate's result is
    returned as-is; any exception it raises propagates to the caller.
    """
    documents = await _async_crawl4ai_urls(urls, max_pages)
    return documents
def _fallback_crawl(base_url: str, max_pages: int = 100) -> List[Dict[str, str]]:
    """Crawl ``base_url`` with the requests-based ``DocumentationCrawler``.

    The crawler is created with a fixed ``max_depth`` of 3 and the given
    ``max_pages``.

    Raises:
        RuntimeError: If the crawl produced no documents at all.
    """
    documents = DocumentationCrawler(
        base_url=base_url,
        max_depth=3,
        max_pages=max_pages,
    ).crawl()
    if documents:
        return documents
    raise RuntimeError("Fallback DocumentationCrawler returned no documents")
async def async_crawl_and_persist(base_url: str, output_path: str = "./crawler_docs.json", max_pages: int = 100) -> List[Dict[str, str]]:
    """Crawl ``base_url``, write the documents to ``output_path`` as JSON, and return them.

    Crawl4AI is tried first when it is importable. If it raises or yields no
    documents, the blocking ``_fallback_crawl`` runs in a worker thread
    instead. Errors from the fallback propagate, in which case nothing is
    written.
    """
    documents: List[Dict[str, str]] = []

    if AsyncWebCrawler is not None:
        try:
            documents = await _async_crawl4ai_urls([base_url], max_pages=max_pages)
            if not documents:
                raise RuntimeError("Crawl4AI returned no documents")
        except Exception as e:
            logger.info(f"Crawl4AI failed, falling back to DocumentationCrawler: {e}")
            documents = []

    # An empty list here means Crawl4AI was unavailable or failed; the
    # fallback itself raises rather than returning an empty result.
    if not documents:
        documents = await asyncio.to_thread(_fallback_crawl, base_url, max_pages)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(documents, f, indent=2, ensure_ascii=False)
    return documents
def crawl_and_persist(base_url: str, output_path: str = "./crawler_docs.json", max_pages: int = 100) -> List[Dict[str, str]]:
    """Synchronous wrapper that runs ``async_crawl_and_persist`` via ``asyncio.run``.

    Because it uses ``asyncio.run``, it cannot be called from code that is
    already running inside an event loop.
    """
    coroutine = async_crawl_and_persist(base_url, output_path=output_path, max_pages=max_pages)
    return asyncio.run(coroutine)
def crawl_urls_with_crawl4ai(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Synchronously crawl ``urls`` with Crawl4AI.

    Raises:
        RuntimeError: If crawl4ai is not installed, or if the crawl itself
            fails (the original exception is chained as the cause).
    """
    if AsyncWebCrawler is None:
        raise RuntimeError("crawl4ai is not installed")

    try:
        documents = asyncio.run(_async_crawl4ai_urls(urls, max_pages))
    except Exception as e:
        raise RuntimeError(f"Crawl4AI URL crawl failed: {e}") from e
    else:
        return documents
class DocumentationCrawler:
    """
    Recursively crawls documentation websites with politeness and rate limiting.

    The crawl is breadth-first from ``base_url`` (plus any sitemap URLs),
    stays on the base URL's host, optionally obeys robots.txt, and stops once
    ``max_pages`` URLs have been attempted or ``max_depth`` is exceeded.
    Note that constructing an instance already performs network requests
    (robots.txt and sitemap loading).
    """

    # Query parameters that are dropped during normalization. Matched against
    # the WHOLE key so that e.g. "reference" or "refresh" are not mistaken
    # for the "ref" tracking parameter.
    _TRACKING_PARAM_RE = re.compile(
        r'(?:utm_.*|fbclid|gclid|mc_cid|mc_eid|ref)', re.IGNORECASE
    )

    # Path suffixes that are never fetched.
    _SKIPPED_EXTENSIONS = (
        '.pdf', '.zip', '.exe', '.jpg', '.jpeg', '.png', '.gif',
        '.css', '.js', '.svg', '.ico', '.woff', '.woff2',
    )

    # Upper bound on sitemap files fetched while expanding sitemap indexes.
    _MAX_SITEMAPS = 25

    def __init__(self,
                 base_url: str,
                 max_depth: int = 3,
                 delay: float = 0.5,
                 timeout: int = 10,
                 max_pages: int = 100,
                 respect_robots_txt: bool = True,
                 use_sitemap: bool = True):
        """Configure the crawler and eagerly load robots.txt and sitemap data.

        Args:
            base_url: Start URL; its host defines the crawl boundary.
            max_depth: Maximum link distance from a seed URL.
            delay: Minimum pause in seconds before each page request.
            timeout: Per-request timeout in seconds for session requests.
            max_pages: Maximum number of URLs to attempt.
            respect_robots_txt: Whether to load and obey robots.txt.
            use_sitemap: Whether to seed the crawl from sitemap URLs.
        """
        self.base_url = base_url
        self.max_depth = max_depth
        self.delay = delay
        self.timeout = timeout
        self.max_pages = max_pages
        self.respect_robots_txt = respect_robots_txt
        self.use_sitemap = use_sitemap
        self.visited_urls: Set[str] = set()
        self.failed_urls: Set[str] = set()
        self.robots_parser: Optional[RobotFileParser] = None
        self.crawl_delay = delay
        self.sitemap_urls: List[str] = []
        self.session = self._setup_session()
        self.domain = self._base_domain(base_url)
        self._load_robots_rules()
        if self.use_sitemap:
            self.sitemap_urls = self._load_sitemap_urls()

    def _base_domain(self, url: str) -> str:
        """Return the host of ``url`` in the same form ``_normalize_url`` emits.

        Candidate URLs are compared after normalization (lower-cased, default
        port removed), so the reference host must be derived the same way;
        otherwise a base URL such as ``https://host:443/`` matches nothing.
        """
        normalized = self._normalize_url(url)
        return urlparse(normalized or url).netloc.lower()

    def _setup_session(self) -> "requests.Session":
        """Build a requests session with retry/backoff and a browser User-Agent."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        return session

    def _user_agent(self) -> str:
        """Return the session's User-Agent, or ``'*'`` when none is set."""
        return self.session.headers.get('User-Agent', '*')

    def _load_robots_rules(self) -> None:
        """Load robots.txt and adopt its crawl delay when it exceeds ``delay``.

        On any failure the parser is left unset, which makes
        ``_is_allowed_by_robots`` allow every URL.
        """
        if not self.respect_robots_txt:
            return
        robots_url = urljoin(self.base_url, "/robots.txt")
        parser = RobotFileParser()
        parser.set_url(robots_url)
        try:
            # NOTE(review): RobotFileParser.read() fetches via urllib, not via
            # self.session, so neither the session's User-Agent nor
            # self.timeout applies to this request.
            parser.read()
            self.robots_parser = parser
            delay = parser.crawl_delay(self._user_agent())
            if delay is not None:
                self.crawl_delay = max(self.delay, delay)
                logger.info("Using crawl delay %s from robots.txt", self.crawl_delay)
        except Exception as exc:
            logger.warning("Could not load robots.txt from %s: %s", robots_url, exc)
            self.robots_parser = None

    def _sitemaps_from_robots(self) -> List[str]:
        """Return the ``Sitemap:`` URLs declared in robots.txt (best effort)."""
        robots_url = urljoin(self.base_url, "/robots.txt")
        declared: List[str] = []
        try:
            response = self.session.get(robots_url, timeout=self.timeout)
            response.raise_for_status()
            for line in response.text.splitlines():
                if line.strip().lower().startswith("sitemap:"):
                    sitemap_url = line.split(":", 1)[1].strip()
                    if sitemap_url:
                        # urljoin leaves absolute URLs untouched and resolves
                        # the occasional relative one.
                        declared.append(urljoin(robots_url, sitemap_url))
        except Exception as exc:
            # Sitemap discovery is optional; record the reason and move on.
            logger.debug("Could not read sitemaps from %s: %s", robots_url, exc)
        return declared

    def _load_sitemap_urls(self) -> List[str]:
        """Collect normalized page URLs from the site's sitemap(s).

        Sitemaps named in robots.txt are used, falling back to
        ``/sitemap.xml``. Sitemap index files are expanded (up to
        ``_MAX_SITEMAPS`` files in total) instead of having their child
        sitemap URLs treated as pages. Unreadable sitemaps are skipped.

        Returns:
            De-duplicated URLs in discovery order; empty when sitemaps are
            disabled or none could be read.
        """
        if not self.use_sitemap:
            return []

        pending = self._sitemaps_from_robots()
        if not pending:
            pending.append(urljoin(self.base_url, "/sitemap.xml"))

        fetched: Set[str] = set()
        discovered: List[str] = []
        index = 0
        while index < len(pending) and len(fetched) < self._MAX_SITEMAPS:
            sitemap_url = pending[index]
            index += 1
            if sitemap_url in fetched:
                continue
            fetched.add(sitemap_url)
            try:
                response = self.session.get(sitemap_url, timeout=self.timeout)
                response.raise_for_status()
                root = ET.fromstring(response.content)
            except Exception as exc:
                logger.debug("Skipping sitemap %s: %s", sitemap_url, exc)
                continue

            # Strip the XML namespace to get the bare root tag name.
            is_index = root.tag.rsplit('}', 1)[-1].lower() == "sitemapindex"
            for elem in root.findall('.//{*}loc'):
                loc = (elem.text or "").strip()
                if not loc:
                    continue
                if is_index:
                    # Entries of an index are further sitemaps, not pages.
                    pending.append(loc)
                    continue
                normalized = self._normalize_url(loc)
                if normalized:
                    discovered.append(normalized)
        return list(dict.fromkeys(discovered))

    def _normalize_url(self, url: str) -> str:
        """Return a canonical form of ``url``, or ``""`` if it is unusable.

        Canonicalization: surrounding whitespace removed, scheme/host
        lower-cased, default port dropped, repeated slashes collapsed,
        trailing slash removed (except for the root path), tracking query
        parameters removed, remaining parameters sorted, fragment dropped.
        Non-http(s) and malformed URLs yield ``""`` instead of raising.
        """
        try:
            parsed = urlparse((url or "").strip())
        except ValueError:
            # e.g. a malformed IPv6 literal; treat as not crawlable.
            return ""
        if parsed.scheme not in ("http", "https"):
            return ""
        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()
        if netloc.endswith(":80") and scheme == "http":
            netloc = netloc[:-3]
        elif netloc.endswith(":443") and scheme == "https":
            netloc = netloc[:-4]
        path = parsed.path or "/"
        path = re.sub(r"/+", "/", path)
        if path != "/" and path.endswith("/"):
            path = path.rstrip("/")
        query_pairs = parse_qsl(parsed.query, keep_blank_values=True)
        filtered_pairs = sorted(
            (k, v)
            for k, v in query_pairs
            if not self._TRACKING_PARAM_RE.fullmatch(k)
        )
        query = urlencode(filtered_pairs, doseq=True)
        return urlunparse((scheme, netloc, path, "", query, ""))

    def _is_allowed_by_robots(self, url: str) -> bool:
        """Return True when robots.txt permits ``url`` or no rules were loaded."""
        if not self.robots_parser:
            return True
        return self.robots_parser.can_fetch(self._user_agent(), url)

    def _should_crawl_url(self, url: str) -> bool:
        """Decide whether ``url`` is an on-site, unvisited, permitted page."""
        normalized = self._normalize_url(url)
        if not normalized:
            return False
        parsed = urlparse(normalized)
        if parsed.netloc != self.domain:
            return False
        if normalized in self.visited_urls:
            return False
        if self.respect_robots_txt and not self._is_allowed_by_robots(normalized):
            logger.info("Skipping %s due to robots.txt", normalized)
            return False
        if parsed.path.lower().endswith(self._SKIPPED_EXTENSIONS):
            return False
        return True

    def _extract_text(self, html: str) -> str:
        """Return the page's readable text, one non-empty line per row.

        Script, style and page-chrome elements are removed first; the text is
        then taken from the first ``<article>``, else ``<main>``, else
        ``<body>``, else the whole document.
        """
        soup = BeautifulSoup(html, 'html.parser')
        for element in soup(['script', 'style', 'nav', 'footer', 'noscript', 'header', 'aside', 'form', 'iframe', 'svg', 'canvas']):
            element.decompose()
        content = soup.find('article') or soup.find('main') or soup.body or soup
        text = content.get_text(separator='\n', strip=True)
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        return '\n'.join(lines)

    def _extract_links(self, html: str, current_url: str) -> List[str]:
        """Return absolute, fragment-free URLs for the anchors in ``html``.

        Relative hrefs are resolved against ``current_url``. ``mailto:`` and
        ``javascript:`` links and hrefs that cannot be resolved are skipped.
        """
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for anchor in soup.find_all('a', href=True):
            href = anchor['href'].strip()
            if not href or href.startswith(('mailto:', 'javascript:')):
                continue
            try:
                absolute_url = urljoin(current_url, href)
            except ValueError:
                # A single malformed href must not abort the whole page.
                continue
            absolute_url = absolute_url.split('#')[0].strip()
            if absolute_url:
                links.append(absolute_url)
        return links

    def crawl(self) -> List[dict]:
        """Run the breadth-first crawl.

        Returns:
            ``{'url': ..., 'content': ...}`` dicts for every fetched page
            that yielded text. URLs whose fetch or parsing failed are
            recorded in ``failed_urls``; every attempted URL is recorded in
            ``visited_urls``.
        """
        from collections import deque

        queue = deque()
        queued: Set[str] = set()

        def enqueue(url: str, depth: int) -> None:
            # The queue is ordered by non-decreasing depth, so the first time
            # a URL is queued is also its shallowest; later copies are noise.
            if url and url not in queued:
                queued.add(url)
                queue.append((url, depth))

        enqueue(self._normalize_url(self.base_url), 0)
        for sitemap_url in self.sitemap_urls:
            if self._should_crawl_url(sitemap_url):
                enqueue(self._normalize_url(sitemap_url), 0)

        documents = []
        logger.info("Starting crawl of %s", self.base_url)
        while queue and len(self.visited_urls) < self.max_pages:
            current_url, depth = queue.popleft()
            if depth > self.max_depth:
                continue
            if not self._should_crawl_url(current_url):
                continue
            normalized_current = self._normalize_url(current_url)
            if not normalized_current:
                continue
            self.visited_urls.add(normalized_current)
            try:
                logger.info(
                    "Crawling [%s/%s] %s (depth: %s)",
                    len(self.visited_urls), self.max_pages, normalized_current, depth,
                )
                time.sleep(self.crawl_delay)
                response = self.session.get(normalized_current, timeout=self.timeout)
                response.raise_for_status()
                html = response.text
                text_content = self._extract_text(html)
                links: List[str] = []
                if depth < self.max_depth:
                    # Resolve relative links against the URL actually served:
                    # normalization strips trailing slashes, so "page.html"
                    # on "/docs/" would otherwise resolve to "/page.html".
                    page_url = getattr(response, "url", None) or normalized_current
                    links = self._extract_links(html, page_url)
            except Exception as e:
                logger.error("Failed to crawl %s: %s", normalized_current, e)
                self.failed_urls.add(normalized_current)
                continue

            # Record results only after all parsing succeeded, so a page is
            # never listed as both a document and a failure.
            if text_content:
                documents.append({
                    'url': normalized_current,
                    'content': text_content
                })
            for link in links:
                if self._should_crawl_url(link):
                    enqueue(self._normalize_url(link), depth + 1)
        logger.info(
            "Crawl complete. Fetched %s pages, %s failed.",
            len(documents), len(self.failed_urls),
        )
        return documents