import asyncio
import json
import logging
from typing import Any, Dict, List
from urllib.parse import quote_plus

import requests
from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode


class CrawlForAIScraper:
    def __init__(self) -> None:
        self.logger = logging.getLogger(__name__)
        self.session = requests.Session()
        self.base_browser = BrowserConfig(
            browser_type="chromium",
            headless=True,
            viewport_width=1920,
            viewport_height=1080,
            accept_downloads=False,
            verbose=False,
        )
        self.crawler = AsyncWebCrawler(config=self.base_browser)
        self._is_started = False

    async def start(self):
        if not self._is_started:
            await self.crawler.start()
            # Give the browser a moment to settle without blocking the event loop
            await asyncio.sleep(1)
            self._is_started = True

    async def close(self):
        if self._is_started:
            await self.crawler.close()
            self._is_started = False

    async def search_and_scrape(self, query: str, num_sites: int = 10) -> List[Dict[str, Any]]:
        await self.start()
        self.logger.info(f"Querying: {query}")

        # Perform a search to get a list of webpages
        search_results = await self._search(query)

        # Scrape each webpage
        scraped_data = []
        self.logger.info(f"Scraping {num_sites} sites...")
        data = await self._scrape_pages(search_results[: num_sites + 2], num_sites)
        scraped_data.extend(data)

        # Scrape further search results when some pages failed
        idx_last_page = min(num_sites + 2, len(search_results))
        for _ in range(3):
            if len(scraped_data) < num_sites and idx_last_page < len(search_results):
                remaining = num_sites - len(scraped_data)
                batch = search_results[idx_last_page : idx_last_page + remaining + 2]
                data = await self._scrape_pages(batch, remaining)
                scraped_data.extend(data)
                idx_last_page += len(batch)

        self.logger.info(f"Completed scraping {len(scraped_data)} sites")
        return scraped_data

    async def _search(self, query: str) -> List[str]:
        try:
            encoded_query = quote_plus(query)
            search_uri = f"https://www.google.com/search?q={encoded_query}"
            result = await self.crawler.arun(
                url=search_uri,
                screenshot=False,
                cache_mode=CacheMode.BYPASS,
                delay_before_return_html=2,
                scan_full_page=True,
            )
            soup = BeautifulSoup(result.html, "html.parser")

            search_results = []
            for link in list(soup.select("div > span > a"))[2:]:
                url = (link.get("href") or "").replace(" ", "").replace("\n", "").strip()
                if not url.startswith(("http://", "https://")):
                    url = "https://" + url
                if "support.google.com" in url or url.startswith("/search?q="):
                    continue
                search_results.append(url)

            # Fall back to DuckDuckGo (up to three attempts) when Google returned nothing
            for _ in range(3):
                if not search_results:
                    self.logger.warning("No search results found.")
                    self.logger.info("Performing DuckDuckGo search as fallback...")
                    search_results = await self._duckduckgo_search(query)

            if not search_results:
                raise Exception("No results found")

            self.logger.info(f"Found {len(search_results)} results")
            return search_results
        except Exception as e:
            self.logger.error(f"Google search error: {str(e)}", exc_info=True)
            raise

    async def _duckduckgo_search(self, query: str) -> List[str]:
        self.logger.info("Performing DuckDuckGo search...")
        try:
            encoded_query = quote_plus(query)
            search_uri = f"https://html.duckduckgo.com/html/?q={encoded_query}"
            # response = self.session.get(
            #     search_uri,
            #     headers={
            #         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            #     },
            #     timeout=10,
            # )
            # response.raise_for_status()
            result = await self.crawler.arun(
                url=search_uri,
                screenshot=False,
                cache_mode=CacheMode.BYPASS,
                delay_before_return_html=2,
                scan_full_page=True,
            )
            soup = BeautifulSoup(result.html, "html.parser")

            search_results = []
            # DuckDuckGo search results are in elements with class 'result__url'
            for link in soup.select(".result__url"):
                url = (link.get("href") or "").replace(" ", "").replace("\n", "")
                if not url:
                    continue
                if not url.startswith(("http://", "https://")):
                    url = "https://" + url
                search_results.append(url)

            self.logger.info(f"Found {len(search_results)} URLs")
            return search_results
        except requests.exceptions.RequestException as e:
            # Catch network errors specifically
            self.logger.error(f"DuckDuckGo search error: {str(e)}")
            return []
        except Exception as e:
            # Catch any other errors
            self.logger.error(f"DuckDuckGo search error: {str(e)}")
            return []
    async def _scrape_pages(self, urls: List[str], max_sites: int) -> List[Dict[str, Any]]:
        await self.start()
        try:
            # Run the crawler on all URLs concurrently
            results = await self.crawler.arun_many(
                urls=urls,
                screenshot=False,
                cache_mode=CacheMode.BYPASS,
                scan_full_page=True,
                semaphore_count=4,
                wait_for_images=True,
                scroll_delay=0.1,
                delay_before_return_html=2,
                exclude_external_images=True,
                page_timeout=25000,
            )

            scraped_sites = []
            for result in results:
                if not result.success:
                    continue
                soup = BeautifulSoup(result.html, "html.parser")

                # Combine images found in the HTML with those reported by the crawler
                extracted_images = self._extract_images(soup, result.url)
                media_images = []
                for img in result.media["images"]:
                    if img["width"] is None or (isinstance(img["width"], (int, float)) and img["width"] > 300):
                        src = img["src"]
                        if " " in src and "w," in src:
                            # src holds a srcset-style list; keep the last (largest) candidate URL
                            candidates = [c.strip() for c in src.split(",") if c.strip()]
                            if candidates:
                                last_url = candidates[-1].split(" ")[0]
                                media_images.append(last_url)
                        else:
                            media_images.append(src)
                all_images = list(set(extracted_images + media_images))

                # Combine videos
                all_videos = self._extract_videos(soup)
                media_videos = [v["src"] for v in result.media["videos"] if v["src"]]
                all_videos = list(set(all_videos + media_videos))

                data = {
                    "url": result.url,
                    "text": result.markdown,
                    "images": all_images,
                    "videos": all_videos,
                    "links": self._extract_links(result.links["external"]),
                }
                scraped_sites.append(data)
                self.logger.info(f" - {result.url[:80]}...")

            return scraped_sites[:max_sites]
        except Exception as e:
            self.logger.error(f"Scraping error for {urls}: {str(e)}")
            return []

    def _extract_images(self, soup: BeautifulSoup, url: str) -> List[str]:
        # Extract images with width and height greater than 300 pixels
        images = []
        for img in soup.find_all("img"):
            if "src" not in img.attrs:
                continue
            src = img["src"]
            if "width" not in img.attrs or "height" not in img.attrs:
                continue
            if img.get("width").lower() == "auto":
                images.append((src, 999, 0))
                continue
            # Remove units from width and height: keep only digits and decimal points
            width = "".join([i for i in img.get("width", "0") if i.isdigit() or i == "."])
            height = "".join([i for i in img.get("height", "0") if i.isdigit() or i == "."])
            if width == "" or height == "":
                continue
            width, height = float(width), float(height)
            if width > 300 and height > 300 and "pixel" not in src and "icon" not in src:
                images.append((src, width, height))

        # Largest images (by area) first
        images = sorted(images, key=lambda img: -1 * (img[1] * img[2]))
        images = [img[0] for img in images]

        # Add base URL to relative URLs
        base_url = "/".join(url.split("/")[:3])
        images = [img if img.startswith("http") else base_url + img for img in images]
        return images
"www.youtube.com/watch?v" in node.get("src", "") or "www.youtube.com/watch?v" in node.get("href", "") ): videos.append(node.get("src", "")) return videos def _extract_links(self, links: list) -> List[str]: # Filter out unwanted links filtered_links = [] for link in links: url = link.get("href") if url.startswith(("http://", "https://")) and not any( keyword in url for keyword in ["support.google.com", "google.com", "accounts.google.com", "youtube.com", "blob:", "mailto:", "javascript:"] ): filtered_links.append(link) return filtered_links if __name__ == "__main__": # Testing the scraper import sys urls = [ "https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview", "https://docs.crawl4ai.com/advanced/multi-url-crawling/", "https://github.com/SesameAILabs/csm", "https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview", "https://docs.crawl4ai.com/advanced/multi-url-crawling/", "https://github.com/SesameAILabs/csm", ] if len(sys.argv) > 1: urls = sys.argv[1:] async def main(): scraper = CrawlForAIScraper() await scraper.start() data = await scraper.search_and_scrape("blender.org") await scraper.close() with open("output.log.json", "w") as f: f.write(json.dumps(data, indent=2)) print(json.dumps(data, indent=2)) asyncio.run(main())