Spaces:
Paused
Paused
| """ | |
| Web URL crawler and content extractor for RAG indexing. | |
| Fetches a URL and linked pages up to a configurable depth. | |
| Also downloads and processes any PDF files linked on the pages. | |
| """ | |
| import re | |
| import logging | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Set, Tuple | |
| from urllib.parse import urljoin, urlparse | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from utils.document_processor import chunk_text, process_document_chunked | |
| logger = logging.getLogger(__name__) | |
| MAX_PAGES = 50 | |
| REQUEST_TIMEOUT = 15 | |
| HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; MultimodalRAG/1.0)"} | |
| def _is_same_domain(url: str, base_url: str) -> bool: | |
| """Return True if url shares the same host (or is a subdomain) as base_url.""" | |
| try: | |
| base_host = urlparse(base_url).netloc.lower() | |
| url_host = urlparse(url).netloc.lower() | |
| return url_host == base_host or url_host.endswith("." + base_host) | |
| except Exception: | |
| return False | |
| def _fetch_html(url: str) -> Tuple[str | None, str]: | |
| """GET a URL and return (text, content_type). Returns (None, '') on failure.""" | |
| try: | |
| resp = requests.get(url, timeout=REQUEST_TIMEOUT, headers=HEADERS, allow_redirects=True) | |
| resp.raise_for_status() | |
| ct = resp.headers.get("content-type", "").lower() | |
| return resp.text, ct | |
| except Exception as e: | |
| logger.warning(f"Failed to fetch {url}: {e}") | |
| return None, "" | |
| def _fetch_binary(url: str) -> bytes | None: | |
| """Download raw bytes (for PDFs). Returns None on failure.""" | |
| try: | |
| resp = requests.get(url, timeout=REQUEST_TIMEOUT, headers=HEADERS, allow_redirects=True) | |
| resp.raise_for_status() | |
| return resp.content | |
| except Exception as e: | |
| logger.warning(f"Failed to download {url}: {e}") | |
| return None | |
| def _extract_text_and_links( | |
| html: str, base_url: str | |
| ) -> Tuple[str, List[str], List[str]]: | |
| """ | |
| Parse HTML and return: | |
| clean_text — visible page text with boilerplate removed | |
| html_links — absolute http/https links to other HTML pages | |
| pdf_links — absolute http/https links whose path ends in .pdf | |
| """ | |
| soup = BeautifulSoup(html, "lxml") | |
| for tag in soup(["script", "style", "nav", "footer", "header", "aside", "noscript"]): | |
| tag.decompose() | |
| text = soup.get_text(separator="\n", strip=True) | |
| text = re.sub(r"\n{3,}", "\n\n", text).strip() | |
| html_links: List[str] = [] | |
| pdf_links: List[str] = [] | |
| seen: Set[str] = set() | |
| for a in soup.find_all("a", href=True): | |
| href = a["href"].strip() | |
| absolute = urljoin(base_url, href) | |
| parsed = urlparse(absolute) | |
| if parsed.scheme not in ("http", "https"): | |
| continue | |
| # Normalise: drop fragment | |
| clean = parsed._replace(fragment="").geturl() | |
| if clean in seen: | |
| continue | |
| seen.add(clean) | |
| path_lower = parsed.path.lower() | |
| if path_lower.endswith(".pdf") or ".pdf?" in path_lower: | |
| pdf_links.append(clean) | |
| else: | |
| html_links.append(clean) | |
| return text, html_links, pdf_links | |
| def _process_pdf_url(pdf_url: str, start_url: str) -> List[Dict[str, Any]]: | |
| """ | |
| Download a PDF from pdf_url, process it with the existing PDF extractor, | |
| fix up source metadata to point at the PDF URL, and return the chunks. | |
| """ | |
| logger.info(f"Downloading PDF: {pdf_url}") | |
| data = _fetch_binary(pdf_url) | |
| if not data: | |
| return [] | |
| with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp: | |
| tmp.write(data) | |
| tmp_path = tmp.name | |
| try: | |
| chunks = process_document_chunked(tmp_path) | |
| tmp_name = Path(tmp_path).name | |
| for chunk in chunks: | |
| # Replace the temp filename with the real PDF URL in chunk text | |
| chunk["text"] = chunk["text"].replace(tmp_name, pdf_url) | |
| chunk["metadata"]["source"] = start_url | |
| chunk["metadata"]["page_url"] = pdf_url | |
| chunk["metadata"]["pdf_source"] = pdf_url | |
| return chunks | |
| except Exception as e: | |
| logger.warning(f"Failed to process PDF {pdf_url}: {e}") | |
| return [] | |
| finally: | |
| try: | |
| os.unlink(tmp_path) | |
| except OSError: | |
| pass | |
| def crawl_url( | |
| start_url: str, | |
| max_depth: int = 2, | |
| max_pages: int = MAX_PAGES, | |
| same_domain_only: bool = True, | |
| ) -> Tuple[List[Dict[str, Any]], List[str]]: | |
| """ | |
| BFS-crawl start_url up to max_depth link levels. | |
| - HTML pages are scraped for text. | |
| - PDF files linked from any crawled page are downloaded and indexed. | |
| - All chunks share source=start_url so they can be removed as a unit. | |
| Returns: | |
| chunks — list of {text, metadata} dicts for VectorStoreManager | |
| crawled_urls — list of all URLs successfully processed | |
| """ | |
| visited_html: Set[str] = set() | |
| visited_pdf: Set[str] = set() | |
| crawled_urls: List[str] = [] | |
| all_chunks: List[Dict[str, Any]] = [] | |
| # BFS queue: (url, depth) | |
| queue: List[Tuple[str, int]] = [(start_url, 0)] | |
| while queue and len(visited_html) < max_pages: | |
| url, depth = queue.pop(0) | |
| if url in visited_html: | |
| continue | |
| if same_domain_only and depth > 0 and not _is_same_domain(url, start_url): | |
| continue | |
| visited_html.add(url) | |
| logger.info(f"Crawling depth={depth}: {url}") | |
| content, ct = _fetch_html(url) | |
| if not content or ("text/html" not in ct and "text/plain" not in ct): | |
| continue | |
| text, html_links, pdf_links = _extract_text_and_links(content, url) | |
| if text: | |
| crawled_urls.append(url) | |
| sub_texts = chunk_text(text) | |
| for i, sub in enumerate(sub_texts): | |
| all_chunks.append({ | |
| "text": f"[Source: {url}]\n{sub}", | |
| "metadata": { | |
| "source": start_url, | |
| "page_url": url, | |
| "type": "web", | |
| "depth": depth, | |
| "chunk_index": i, | |
| }, | |
| }) | |
| # Process PDFs found on this page (no depth restriction for PDFs) | |
| for pdf_url in pdf_links: | |
| if pdf_url not in visited_pdf: | |
| if same_domain_only and not _is_same_domain(pdf_url, start_url): | |
| continue | |
| visited_pdf.add(pdf_url) | |
| pdf_chunks = _process_pdf_url(pdf_url, start_url) | |
| if pdf_chunks: | |
| all_chunks.extend(pdf_chunks) | |
| crawled_urls.append(pdf_url) | |
| # Enqueue child HTML pages if depth limit not yet reached | |
| if depth < max_depth: | |
| for link in html_links: | |
| if link not in visited_html: | |
| queue.append((link, depth + 1)) | |
| logger.info( | |
| f"Crawl complete: {len(crawled_urls)} pages/files, " | |
| f"{len(all_chunks)} chunks from {start_url}" | |
| ) | |
| return all_chunks, crawled_urls | |