"""Web crawler module using Firecrawl API for collecting RAG data. Supports multiple crawl modes: - single: Scrape only the given URL - links: Scrape URL + all links found on that page - search: Use map API to find URLs by topic, then scrape - domain: Use crawl API to crawl entire domain """ import json import os import time from datetime import datetime from pathlib import Path from urllib.parse import unquote, urlparse from dotenv import load_dotenv from firecrawl import FirecrawlApp from tqdm import tqdm from src.config import DATA_CRAWLED_DIR from src.utils.common import remove_diacritics load_dotenv() DELAY_SECONDS = 6 class WebCrawler: """Web crawler with multiple crawl modes.""" def __init__(self, api_key: str | None = None): self.api_key = api_key or os.getenv("FIRECRAWL_API_KEY", "") if not self.api_key: raise ValueError("FIRECRAWL_API_KEY required") self.app = FirecrawlApp(api_key=self.api_key) self._last_request = 0.0 def _wait_rate_limit(self): """Wait to respect rate limits.""" elapsed = time.time() - self._last_request if elapsed < DELAY_SECONDS: time.sleep(DELAY_SECONDS - elapsed) self._last_request = time.time() def _scrape_page(self, url: str, get_links: bool = False) -> dict | None: """Scrape a single page.""" self._wait_rate_limit() formats = ["markdown", "summary"] if get_links: formats.append("links") try: result = self.app.scrape(url, formats=formats, only_main_content=True) if hasattr(result, 'markdown') and result.markdown: meta = result.metadata if hasattr(result, 'metadata') else None return { "url": url, "title": getattr(meta, 'title', '') if meta else "", "content": result.markdown, "summary": getattr(result, 'summary', '') or "", "description": getattr(meta, 'description', '') if meta else "", "keywords": getattr(meta, 'keywords', '') if meta else "", "links": result.links if get_links and hasattr(result, 'links') else [], } except Exception as e: print(f"[Crawler] Error scraping {url}: {e}") return None def crawl_single(self, url: str) -> list[dict]: """Mode: single - Crawl only the given URL.""" print(f"[Crawler] Mode: single | URL: {url}") result = self._scrape_page(url) if result: result.pop("links", None) return [result] return [] def crawl_links(self, url: str, max_pages: int = 10, topic: str = "") -> list[dict]: """Mode: links - Crawl URL + links containing topic keywords.""" if not topic: raise ValueError("Topic required for links mode. Use comma to separate keywords.") keywords = [k.strip() for k in topic.split(",") if k.strip()] keywords_normalized = [remove_diacritics(k).replace("_", " ").replace("-", " ") for k in keywords] print(f"[Crawler] Mode: links | URL: {url}") print(f"[Crawler] Keywords: {keywords}") print(f"[Crawler] Scraping main page...") documents = [] main_result = self._scrape_page(url, get_links=True) if not main_result: return [] links = main_result.pop("links", []) documents.append(main_result) print(f"[Crawler] Found {len(links)} total links") if not links or max_pages <= 1: return documents urls_to_scrape = [] source_domain = urlparse(url).netloc skip_patterns = [ '.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp', '.pdf', '.doc', '.docx', '.xls', '.xlsx', '/Special:', '/Đặc_biệt:', '/Tập_tin:', '/File:', '/Thể_loại:', '/Category:', '/Help:', '/Wikipedia:', 'action=edit', 'action=history', 'oldid=', 'diff=', '/w/index.php', '/wiki/index.php', '#cite', '#ref', '#mw-', ] for link in links: if hasattr(link, 'url'): link_url = link.url elif isinstance(link, str): link_url = link elif isinstance(link, dict): link_url = link.get('url', '') else: continue if not link_url: continue link_domain = urlparse(link_url).netloc if link_domain and link_domain != source_domain: continue decoded_url = unquote(link_url) if any(p.lower() in decoded_url.lower() for p in skip_patterns): continue normalized_url = remove_diacritics(decoded_url).replace("_", " ").replace("-", " ") for i, kw_norm in enumerate(keywords_normalized): if kw_norm in normalized_url: print(f" Match: '{keywords[i]}' in {decoded_url}") urls_to_scrape.append(link_url) break print(f"[Crawler] Matched {len(urls_to_scrape)} links") if not urls_to_scrape: return documents scrape_count = min(len(urls_to_scrape), max_pages - 1) print(f"[Crawler] Scraping {scrape_count} pages...") for page_url in tqdm(urls_to_scrape[:scrape_count], desc="Scraping"): if page_url == url: continue result = self._scrape_page(page_url) if result: result.pop("links", None) documents.append(result) return documents def crawl_search(self, url: str, topic: str, max_pages: int = 10) -> list[dict]: """Mode: search - Use map API to find URLs by topic, then scrape.""" print(f"[Crawler] Mode: search | URL: {url} | Topic: {topic}") documents = [] try: map_result = self.app.map(url=url, search=topic, limit=max_pages) urls_to_scrape = [] if hasattr(map_result, 'links'): for link in map_result.links: if hasattr(link, 'url'): urls_to_scrape.append(link.url) elif isinstance(link, str): urls_to_scrape.append(link) elif isinstance(map_result, list): for link in map_result: if isinstance(link, str): urls_to_scrape.append(link) except Exception as e: print(f"[Crawler] Error: Map API failed - {e}") return [] print(f"[Crawler] Found {len(urls_to_scrape)} URLs") for page_url in tqdm(urls_to_scrape, desc="Scraping"): result = self._scrape_page(page_url) if result: result.pop("links", None) documents.append(result) return documents def crawl_domain(self, url: str, max_pages: int = 10, topic: str | None = None) -> list[dict]: """Mode: domain - Use crawl API to crawl domain.""" print(f"[Crawler] Mode: domain | URL: {url}") documents = [] crawl_options = { "limit": max_pages, "scrape_options": { "formats": ["markdown"], "only_main_content": True, } } if topic: crawl_options["include_paths"] = [f"*{topic}*"] try: result = self.app.crawl(url=url, **crawl_options) pages = [] if hasattr(result, 'data'): pages = result.data or [] elif isinstance(result, dict) and 'data' in result: pages = result['data'] or [] for page in pages: if hasattr(page, 'markdown') and page.markdown: meta = page.metadata if hasattr(page, 'metadata') else None documents.append({ "url": getattr(meta, 'sourceURL', '') if meta else "", "title": getattr(meta, 'title', '') if meta else "", "content": page.markdown, "description": getattr(meta, 'description', '') if meta else "", "keywords": getattr(meta, 'keywords', '') if meta else "", }) except Exception as e: print(f"[Crawler] Error: Crawl API failed - {e}") print(f"[Crawler] Crawled {len(documents)} pages") return documents def crawl_website( url: str, mode: str = "links", topic: str | None = None, max_pages: int = 10, api_key: str | None = None, ) -> dict: """Main crawl function with mode selection. Args: url: URL to crawl mode: Crawl mode - 'single', 'links', 'search', 'domain' topic: Optional topic filter (required for 'search' mode) max_pages: Maximum pages to crawl api_key: Firecrawl API key """ crawler = WebCrawler(api_key) if mode == "single": documents = crawler.crawl_single(url) elif mode == "links": documents = crawler.crawl_links(url, max_pages, topic) elif mode == "search": if not topic: raise ValueError("Topic required for search mode") documents = crawler.crawl_search(url, topic, max_pages) elif mode == "domain": documents = crawler.crawl_domain(url, max_pages, topic) else: raise ValueError(f"Unknown mode: {mode}. Use: single, links, search, domain") print(f"[Crawler] Total: {len(documents)} documents") return { "source": url, "domain": urlparse(url).netloc, "mode": mode, "topic": topic, "crawled_at": datetime.now().isoformat(), "total_pages": len(documents), "documents": documents, } def save_crawled_data(data: dict, output_dir: str | Path | None = None, filename: str | None = None) -> Path: """Save crawled data to JSON.""" if output_dir is None: output_path = DATA_CRAWLED_DIR else: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) if not filename: domain = data.get("domain", "unknown").replace(".", "_") filename = f"{domain}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" path = output_path / filename with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"[Crawler] Saved: {path}") return path