chintu4's picture
ok
7ef1828
Raw
History Blame Contribute Delete
13.4 kB
"""
Documentation Crawler Module
Handles recursive crawling, rate limiting, and HTML parsing.
"""
import time
import json
import logging
import asyncio
import re
import xml.etree.ElementTree as ET
from collections import deque
from urllib.parse import urljoin, urlparse, urlunparse, parse_qsl, urlencode
from urllib.robotparser import RobotFileParser
from typing import Set, List, Optional, Dict, Any

from bs4 import BeautifulSoup
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# crawl4ai is an optional dependency. When it cannot be imported, its names
# are bound to None so the functions below can detect that (they test
# `AsyncWebCrawler is None`) and either raise or fall back to the
# requests-based DocumentationCrawler.
try:
    from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
except ImportError:
    AsyncWebCrawler = None
    BrowserConfig = None
    CrawlerRunConfig = None
    CacheMode = None

# NOTE: basicConfig runs at import time and configures the root logger at INFO.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by every function and class in this file.
logger = logging.getLogger(__name__)
async def _async_crawl4ai_urls(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Fetch ``urls`` with Crawl4AI and return the extracted page contents.

    Args:
        urls: URLs to fetch in a single ``arun_many`` batch.
        max_pages: Upper bound on how many entries of ``urls`` are fetched.
            URLs beyond the budget are dropped from the end of the list; a
            value of zero or less fetches nothing.

    Returns:
        One ``{"url": ..., "content": ...}`` dict per page that was crawled
        successfully and produced non-empty content. Failed pages are logged
        and skipped.

    Raises:
        RuntimeError: If crawl4ai is not installed.
    """
    if AsyncWebCrawler is None:
        raise RuntimeError("crawl4ai is not installed")

    # Enforce the page budget up front; clamp so a negative budget cannot turn
    # into a "drop the last N" slice.
    targets = list(urls)[: max(max_pages, 0)]
    if not targets:
        # Nothing to fetch: avoid launching a headless browser for no work.
        return []

    browser_config = BrowserConfig(headless=True)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS if CacheMode else None,
        wait_until="domcontentloaded",
        max_retries=2,
        max_scroll_steps=10,
        verbose=False,
        only_text=True,
        page_timeout=30000,
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(urls=targets, config=run_config)
        # arun_many may hand back an async iterator instead of a list;
        # materialise either form so the loop below can treat them alike.
        if hasattr(results, "__aiter__"):
            results = [result async for result in results]
        elif not isinstance(results, list):
            results = list(results)

    documents: List[Dict[str, str]] = []
    for result in results:
        if not getattr(result, "success", False):
            logger.warning(
                "Crawl4AI failed for %s: %s",
                getattr(result, "url", "<unknown>"),
                getattr(result, "error_message", "no error message"),
            )
            continue

        # Use the first non-empty representation, most processed first.
        content = (
            getattr(result, "extracted_content", None)
            or getattr(result, "markdown", None)
            or getattr(result, "cleaned_html", None)
            or ""
        )
        if content:
            documents.append({"url": getattr(result, "url", ""), "content": content})

    return documents
async def async_crawl_urls_with_crawl4ai(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Public async entry point that delegates to ``_async_crawl4ai_urls``.

    Arguments are forwarded unchanged and the helper's result is returned
    as-is; any exception raised by the helper propagates to the caller.
    """
    documents = await _async_crawl4ai_urls(urls, max_pages)
    return documents
def _fallback_crawl(base_url: str, max_pages: int = 100) -> List[Dict[str, str]]:
    """Crawl ``base_url`` with the requests-based ``DocumentationCrawler``.

    The crawler is created with a fixed depth limit of 3 and the given page
    budget.

    Raises:
        RuntimeError: If the crawl produced no documents at all.
    """
    documents = DocumentationCrawler(
        base_url=base_url,
        max_depth=3,
        max_pages=max_pages,
    ).crawl()
    if documents:
        return documents
    raise RuntimeError("Fallback DocumentationCrawler returned no documents")
async def async_crawl_and_persist(base_url: str, output_path: str = "./crawler_docs.json", max_pages: int = 100) -> List[Dict[str, str]]:
    """Crawl ``base_url`` and write the resulting documents to a JSON file.

    Crawl4AI is tried first when it is installed. If it is missing, raises,
    or returns nothing, the blocking ``DocumentationCrawler`` fallback runs in
    a worker thread so the event loop is not blocked.

    Args:
        base_url: Page to start crawling from.
        output_path: Destination JSON file (UTF-8, overwritten if present).
        max_pages: Page budget passed to whichever crawler runs.

    Returns:
        The list of ``{"url": ..., "content": ...}`` documents that was
        written to ``output_path``.

    Raises:
        RuntimeError: If the fallback crawler also returns no documents.
        OSError: If ``output_path`` cannot be written.
    """
    documents: List[Dict[str, str]] = []

    if AsyncWebCrawler is not None:
        # Any Crawl4AI failure is recoverable here because a fallback exists,
        # so this boundary deliberately catches broadly and records the reason.
        try:
            documents = await _async_crawl4ai_urls([base_url], max_pages=max_pages)
            failure = "" if documents else "Crawl4AI returned no documents"
        except Exception as exc:
            documents = []
            failure = str(exc)
        if failure:
            logger.warning("Crawl4AI failed, falling back to DocumentationCrawler: %s", failure)

    if not documents:
        documents = await asyncio.to_thread(_fallback_crawl, base_url, max_pages)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(documents, f, indent=2, ensure_ascii=False)
    return documents
def crawl_and_persist(base_url: str, output_path: str = "./crawler_docs.json", max_pages: int = 100) -> List[Dict[str, str]]:
    """Synchronous wrapper around ``async_crawl_and_persist``.

    Runs the coroutine to completion with ``asyncio.run`` and returns its
    result, so it must not be called from inside a running event loop.
    """
    coroutine = async_crawl_and_persist(
        base_url,
        output_path=output_path,
        max_pages=max_pages,
    )
    return asyncio.run(coroutine)
def crawl_urls_with_crawl4ai(urls: List[str], max_pages: int = 100) -> List[Dict[str, str]]:
    """Synchronously crawl ``urls`` with Crawl4AI.

    Raises:
        RuntimeError: If crawl4ai is not installed, or (chained to the
            original error) if the crawl itself fails for any reason.
    """
    if AsyncWebCrawler is None:
        raise RuntimeError("crawl4ai is not installed")

    try:
        documents = asyncio.run(_async_crawl4ai_urls(urls, max_pages))
    except Exception as exc:
        raise RuntimeError(f"Crawl4AI URL crawl failed: {exc}") from exc
    else:
        return documents
class DocumentationCrawler:
    """
    Recursively crawls documentation websites with politeness and rate limiting.

    The crawl is breadth-first, restricted to the host of ``base_url``, and
    bounded by ``max_depth`` and ``max_pages``. robots.txt rules and sitemap
    URLs are loaded once, when the crawler is constructed.
    """

    # Query parameters that only carry tracking/attribution data. They are
    # matched on the whole key (case-insensitively) so that unrelated keys
    # such as "reference" or "refresh" are preserved.
    _TRACKING_PARAMS = frozenset({"fbclid", "gclid", "mc_cid", "mc_eid", "ref"})
    _TRACKING_PREFIXES = ("utm_",)

    # Paths ending in one of these are never fetched.
    _SKIPPED_EXTENSIONS = (
        '.pdf', '.zip', '.exe', '.jpg', '.jpeg', '.png', '.gif',
        '.css', '.js', '.svg', '.ico', '.woff', '.woff2',
    )

    # Upper bound on sitemap files fetched at construction time, so a large
    # or cyclic sitemap index cannot stall start-up.
    _MAX_SITEMAPS = 25

    def __init__(self,
                 base_url: str,
                 max_depth: int = 3,
                 delay: float = 0.5,
                 timeout: int = 10,
                 max_pages: int = 100,
                 respect_robots_txt: bool = True,
                 use_sitemap: bool = True):
        """Configure the crawler and load robots.txt / sitemap data.

        Args:
            base_url: Page the crawl starts from; its host bounds the crawl.
            max_depth: Maximum link distance from a seed URL.
            delay: Minimum pause in seconds before each request.
            timeout: Per-request timeout in seconds.
            max_pages: Maximum number of pages attempted (failures count).
            respect_robots_txt: Whether robots.txt rules are honoured.
            use_sitemap: Whether sitemap URLs are used as extra seeds.
        """
        self.base_url = base_url
        self.max_depth = max_depth
        self.delay = delay
        self.timeout = timeout
        self.max_pages = max_pages
        self.respect_robots_txt = respect_robots_txt
        self.use_sitemap = use_sitemap
        self.visited_urls: Set[str] = set()
        self.failed_urls: Set[str] = set()
        self.robots_parser: Optional[RobotFileParser] = None
        self.crawl_delay = delay
        self.sitemap_urls: List[str] = []
        self.session = self._setup_session()
        # Take the host from the normalized URL so an explicit default port
        # in base_url (":80" / ":443") still matches normalized links.
        normalized_base = self._normalize_url(base_url)
        self.domain = urlparse(normalized_base or base_url).netloc.lower()
        self._load_robots_rules()
        if self.use_sitemap:
            self.sitemap_urls = self._load_sitemap_urls()

    def _setup_session(self) -> "requests.Session":
        """Build a requests session with retries and a browser User-Agent."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        return session

    def _load_robots_rules(self) -> None:
        """Fetch and parse robots.txt, updating ``robots_parser``/``crawl_delay``.

        The file is fetched through the crawler's own session so the request
        uses the same User-Agent, retry policy and timeout as page requests.
        Status handling follows ``RobotFileParser.read()``: 401/403 disallow
        everything and any other 4xx allows everything. If the file cannot be
        fetched at all, robots rules are ignored and a warning is logged.
        """
        if not self.respect_robots_txt:
            return
        robots_url = urljoin(self.base_url, "/robots.txt")
        parser = RobotFileParser()
        parser.set_url(robots_url)
        try:
            response = self.session.get(robots_url, timeout=self.timeout)
            status = response.status_code
            if status in (401, 403):
                parser.parse(["User-agent: *", "Disallow: /"])
            elif 400 <= status < 500:
                # No usable robots.txt: an empty rule set allows everything.
                parser.parse([])
            else:
                response.raise_for_status()
                parser.parse(response.text.splitlines())
            self.robots_parser = parser
            delay = parser.crawl_delay(self.session.headers.get('User-Agent', '*'))
            if delay is not None:
                # Never go faster than the configured delay.
                self.crawl_delay = max(self.delay, delay)
                logger.info("Using crawl delay %s from robots.txt", self.crawl_delay)
        except Exception as exc:
            # Best effort: robots.txt problems must not prevent construction.
            logger.warning("Could not load robots.txt from %s: %s", robots_url, exc)
            self.robots_parser = None

    def _load_sitemap_urls(self) -> List[str]:
        """Return normalized page URLs listed in the site's sitemaps.

        Sitemap locations come from ``Sitemap:`` lines in robots.txt, falling
        back to ``/sitemap.xml``. Sitemap index files are followed (up to
        ``_MAX_SITEMAPS`` files in total) so their child sitemaps are read
        rather than queued as pages. All failures are logged at debug level
        and skipped; the result is de-duplicated in discovery order.
        """
        sitemap_urls: List[str] = []
        if not self.use_sitemap:
            return sitemap_urls

        robots_url = urljoin(self.base_url, "/robots.txt")
        try:
            response = self.session.get(robots_url, timeout=self.timeout)
            response.raise_for_status()
            for line in response.text.splitlines():
                if line.strip().lower().startswith("sitemap:"):
                    sitemap_url = line.split(":", 1)[1].strip()
                    if sitemap_url:
                        sitemap_urls.append(sitemap_url)
        except Exception as exc:
            logger.debug("Could not read sitemap entries from %s: %s", robots_url, exc)

        if not sitemap_urls:
            sitemap_urls.append(urljoin(self.base_url, "/sitemap.xml"))

        pending = deque(sitemap_urls)
        fetched: Set[str] = set()
        discovered: List[str] = []
        while pending and len(fetched) < self._MAX_SITEMAPS:
            sitemap_url = pending.popleft()
            if sitemap_url in fetched:
                continue
            fetched.add(sitemap_url)
            try:
                response = self.session.get(sitemap_url, timeout=self.timeout)
                response.raise_for_status()
                root = ET.fromstring(response.content)
            except Exception as exc:
                logger.debug("Skipping sitemap %s: %s", sitemap_url, exc)
                continue

            # Drop the XML namespace ("{ns}tag") before checking the root tag.
            is_index = root.tag.rsplit("}", 1)[-1].lower() == "sitemapindex"
            for elem in root.findall('.//{*}loc'):
                location = (elem.text or "").strip()
                if is_index:
                    # Entries of an index are further sitemaps, not pages.
                    if location.lower().startswith(("http://", "https://")):
                        pending.append(location)
                    continue
                normalized = self._normalize_url(location)
                if normalized:
                    discovered.append(normalized)

        return list(dict.fromkeys(discovered))

    @classmethod
    def _is_tracking_param(cls, key: str) -> bool:
        """Return True if query key ``key`` is a known tracking parameter."""
        lowered = key.lower()
        return lowered in cls._TRACKING_PARAMS or lowered.startswith(cls._TRACKING_PREFIXES)

    def _normalize_url(self, url: str) -> str:
        """Return a canonical form of ``url``, or ``""`` if it is not crawlable.

        Canonicalisation lower-cases scheme and host, removes default ports,
        collapses repeated slashes, strips a trailing slash (except for the
        root path), removes tracking parameters, sorts the remaining query
        parameters and drops the fragment. Non-HTTP(S), host-less and
        malformed URLs yield the empty string.
        """
        try:
            parsed = urlparse((url or "").strip())
        except ValueError:
            # e.g. an unbalanced "[" in the host part.
            return ""
        scheme = parsed.scheme.lower()
        if scheme not in ("http", "https"):
            return ""
        netloc = parsed.netloc.lower()
        if not netloc:
            return ""
        if netloc.endswith(":80") and scheme == "http":
            netloc = netloc[:-3]
        elif netloc.endswith(":443") and scheme == "https":
            netloc = netloc[:-4]

        path = re.sub(r"/+", "/", parsed.path or "/")
        if path != "/" and path.endswith("/"):
            path = path.rstrip("/")

        kept_pairs = sorted(
            (key, value)
            for key, value in parse_qsl(parsed.query, keep_blank_values=True)
            if not self._is_tracking_param(key)
        )
        query = urlencode(kept_pairs, doseq=True)
        return urlunparse((scheme, netloc, path, "", query, ""))

    def _is_allowed_by_robots(self, url: str) -> bool:
        """Return whether robots.txt permits fetching ``url``.

        Everything is allowed when no robots.txt rules were loaded.
        """
        if not self.robots_parser:
            return True
        return self.robots_parser.can_fetch(self.session.headers.get('User-Agent', '*'), url)

    def _should_crawl_url(self, url: str) -> bool:
        """Return True if ``url`` is an unvisited, allowed page on this site."""
        normalized = self._normalize_url(url)
        if not normalized:
            # Covers non-HTTP(S) schemes and malformed URLs.
            return False
        parsed = urlparse(normalized)
        if parsed.netloc != self.domain:
            return False
        if normalized in self.visited_urls:
            return False
        if self.respect_robots_txt and not self._is_allowed_by_robots(normalized):
            logger.info("Skipping %s due to robots.txt", normalized)
            return False
        if parsed.path.lower().endswith(self._SKIPPED_EXTENSIONS):
            return False
        return True

    def _extract_text(self, html: str) -> str:
        """Return the readable text of ``html``, one non-empty line per row.

        Script, style and page-chrome elements are removed first; the text is
        then taken from ``<article>``, else ``<main>``, else ``<body>``.
        """
        soup = BeautifulSoup(html, 'html.parser')
        for element in soup(['script', 'style', 'nav', 'footer', 'noscript', 'header', 'aside', 'form', 'iframe', 'svg', 'canvas']):
            element.decompose()
        content = soup.find('article') or soup.find('main') or soup.body or soup
        text = content.get_text(separator='\n', strip=True)
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        return '\n'.join(lines)

    def _extract_links(self, html: str, current_url: str) -> List[str]:
        """Return absolute, fragment-free link targets found in ``html``.

        Relative hrefs are resolved against ``current_url``. ``mailto:`` and
        ``javascript:`` links are skipped, as are hrefs that cannot be
        resolved, so one malformed link does not lose the rest of the page.
        """
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href'].strip()
            if not href or href.startswith(('mailto:', 'javascript:')):
                continue
            try:
                absolute_url = urljoin(current_url, href)
            except ValueError:
                continue
            absolute_url = absolute_url.split('#')[0].strip()
            if absolute_url:
                links.append(absolute_url)
        return links

    def crawl(self) -> List[dict]:
        """Crawl breadth-first from the base URL and sitemap seeds.

        Returns:
            A list of ``{'url': ..., 'content': ...}`` dicts, one per page
            that was fetched successfully and yielded text. URLs whose fetch
            or parsing failed are recorded in ``failed_urls``.
        """
        queue = deque()
        # Every URL already considered, so each one is checked and queued at
        # most once. Breadth-first order means the first sighting of a URL is
        # also its shallowest, so later sightings can be ignored safely.
        seen: Set[str] = set()

        def enqueue(url: str, depth: int) -> None:
            normalized = self._normalize_url(url)
            if not normalized or normalized in seen:
                return
            seen.add(normalized)
            if self._should_crawl_url(normalized):
                queue.append((normalized, depth))

        enqueue(self.base_url, 0)
        for sitemap_url in self.sitemap_urls:
            enqueue(sitemap_url, 0)

        documents = []
        logger.info("Starting crawl of %s", self.base_url)
        while queue and len(self.visited_urls) < self.max_pages:
            current_url, depth = queue.popleft()
            if depth > self.max_depth or current_url in self.visited_urls:
                continue
            # Mark as visited before fetching so failures are not retried.
            self.visited_urls.add(current_url)
            try:
                logger.info(
                    "Crawling [%s/%s] %s (depth: %s)",
                    len(self.visited_urls), self.max_pages, current_url, depth,
                )
                time.sleep(self.crawl_delay)
                response = self.session.get(current_url, timeout=self.timeout)
                response.raise_for_status()
                html = response.text
                text_content = self._extract_text(html)
                if text_content:
                    documents.append({
                        'url': current_url,
                        'content': text_content
                    })
                if depth < self.max_depth:
                    for link in self._extract_links(html, current_url):
                        enqueue(link, depth + 1)
            except Exception as e:
                # Per-page boundary: one bad page must not stop the crawl.
                logger.error("Failed to crawl %s: %s", current_url, e)
                self.failed_urls.add(current_url)
        logger.info(
            "Crawl complete. Fetched %s pages, %s failed.",
            len(documents), len(self.failed_urls),
        )
        return documents