# knowledge-net/backend/scraper.py
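"""Search-and-scrape module built on crawl4ai.

Searches Google (with a DuckDuckGo fallback) for a query, crawls the result
pages, and extracts markdown text, large images, YouTube links, and external
links from each page.
"""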
import asyncio
import json
import logging
from typing import Any, Dict, List
from urllib.parse import quote_plus, urljoin

from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode
class CrawlForAIScraper:
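    """Async wrapper around crawl4ai's AsyncWebCrawler: search for a query, then scrape the results."""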
def __init__(self) -> None:
self.logger = logging.getLogger(__name__)
self.base_browser = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=1920,
viewport_height=1080,
accept_downloads=False,
verbose=False,
)
self.crawler = AsyncWebCrawler(config=self.base_browser)
self._is_started = False
    async def start(self):
        if not self._is_started:
            await self.crawler.start()
            # Give the browser a moment to settle without blocking the event loop
            await asyncio.sleep(1)
            self._is_started = True
async def close(self):
if self._is_started:
await self.crawler.close()
self._is_started = False
    async def search_and_scrape(self, query: str, num_sites: int = 10) -> List[Dict[str, Any]]:
        await self.start()
        self.logger.info(f"Querying: {query}")
        # Perform a search to get a list of webpages
        search_results = await self._search(query)
        # Scrape an initial batch, with a small buffer for failures
        scraped_data: List[Dict[str, Any]] = []
        self.logger.info(f"Scraping {num_sites} sites...")
        next_idx = num_sites + 2
        data = await self._scrape_pages(search_results[:next_idx], num_sites)
        scraped_data.extend(data)
        # Scrape further results when some pages failed
        for _ in range(3):
            if len(scraped_data) >= num_sites or next_idx >= len(search_results):
                break
            missing = num_sites - len(scraped_data)
            batch = search_results[next_idx : next_idx + missing + 2]
            next_idx += len(batch)
            data = await self._scrape_pages(batch, missing)
            scraped_data.extend(data)
        self.logger.info(f"Completed scraping {len(scraped_data)} sites")
        return scraped_data[:num_sites]
async def _search(self, query: str) -> List[str]:
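        """Return result URLs for `query` from Google, falling back to DuckDuckGo when empty."""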
try:
encoded_query = quote_plus(query)
search_uri = f"https://www.google.com/search?q={encoded_query}"
result = await self.crawler.arun(
url=search_uri,
screenshot=False,
cache_mode=CacheMode.BYPASS,
delay_before_return_html=2,
scan_full_page=True,
)
soup = BeautifulSoup(result.html, "html.parser")
search_results = []
            for link in list(soup.select("div > span > a"))[2:]:
                url = (link.get("href") or "").replace(" ", "").replace("\n", "").strip()
                # Skip empty hrefs, Google help pages, and internal search links
                if not url or "support.google.com" in url or url.startswith("/search?q="):
                    continue
                if not url.startswith(("http://", "https://")):
                    url = "https://" + url
                search_results.append(url)
            if not search_results:
                self.logger.warning("No Google search results found.")
                self.logger.info("Performing DuckDuckGo search as fallback...")
                for _ in range(3):
                    search_results = await self._duckduckgo_search(query)
                    if search_results:
                        break
            if not search_results:
                raise Exception("No results found")
            self.logger.info(f"Found {len(search_results)} results")
            return search_results
except Exception as e:
self.logger.error(f"Google search error: {str(e)}", exc_info=True)
raise
async def _duckduckgo_search(self, query: str) -> List[str]:
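        """Return result URLs for `query` from DuckDuckGo's HTML endpoint."""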
self.logger.info("Performing DuckDuckGo search...")
try:
encoded_query = quote_plus(query)
search_uri = f"https://html.duckduckgo.com/html/?q={encoded_query}"
result = await self.crawler.arun(
url=search_uri,
screenshot=False,
cache_mode=CacheMode.BYPASS,
delay_before_return_html=2,
scan_full_page=True,
)
soup = BeautifulSoup(result.html, "html.parser")
search_results = []
# DuckDuckGo search results are in elements with class 'result__url'
            for anchor in soup.select(".result__url"):
                url = (anchor.get("href") or "").replace(" ", "").replace("\n", "").strip()
                if not url:
                    continue
                if not url.startswith(("http://", "https://")):
                    url = "https://" + url
                search_results.append(url)
self.logger.info(f"Found {len(search_results)} URLs")
return search_results
        except Exception as e:
            self.logger.error(f"DuckDuckGo search error: {str(e)}")
            return []
    async def _scrape_pages(self, urls: List[str], max_sites: int) -> List[Dict[str, Any]]:
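        """Crawl `urls` concurrently and return up to `max_sites` dicts of page text and media."""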
await self.start()
try:
# Run the crawler on a URL
results = await self.crawler.arun_many(
urls=urls,
screenshot=False,
cache_mode=CacheMode.BYPASS,
scan_full_page=True,
semaphore_count=4,
wait_for_images=True,
scroll_delay=0.1,
delay_before_return_html=2,
exclude_external_images=True,
page_timeout=25000,
)
scraped_sites = []
for result in results:
if result.success:
soup = BeautifulSoup(result.html, "html.parser")
# Combine images
extracted_images = self._extract_images(soup, result.url)
media_images = []
for img in result.media["images"]:
if img["width"] is None or (isinstance(img["width"], (int, float)) and img["width"] > 300):
                            # Resolve srcset-style values ("a.jpg 640w, b.jpg 1280w"):
                            # keep the last (typically largest) candidate URL
                            src = img["src"]
                            if " " in src and "w," in src:
                                candidates = [c.strip() for c in src.split(",") if c.strip()]
                                media_images.append(candidates[-1].split(" ")[0])
                            else:
                                media_images.append(src)
all_images = list(set(extracted_images + media_images))
# Combine videos
all_videos = self._extract_videos(soup)
media_videos = [v["src"] for v in result.media["videos"] if v["src"]]
all_videos = list(set(all_videos + media_videos))
data = {
"url": result.url,
"text": result.markdown,
"images": all_images,
"videos": all_videos,
"links": self._extract_links(result.links["external"]),
}
scraped_sites.append(data)
self.logger.info(f" - {result.url[:80]}...")
return scraped_sites[:max_sites]
        except Exception as e:
            self.logger.error(f"Scraping error for {urls}: {str(e)}")
            return []
def _extract_images(self, soup: BeautifulSoup, url: str) -> List[str]:
# Extract images with width and height greater than 300 pixels
images = []
for img in soup.find_all("img"):
if "src" in img.attrs:
src = img["src"]
if not "width" or "height" not in img.attrs:
continue
if "width" in img.attrs and img.get("width").lower() == "auto":
images.append((src, 999, 0))
# Remove units from width and height: get start of the entity till the first non-digit character
width = "".join([i for i in img.get("width", "0") if i.isdigit() or i == "."])
height = "".join([i for i in img.get("height", "0") if i.isdigit() or i == "."])
if width == "" or height == "":
continue
width, height = float(width), float(height)
if width > 300 and height > 300 and "pixel" not in src and "icon" not in src:
images.append((src, width, height))
        # Sort by area, largest first
        images.sort(key=lambda im: im[1] * im[2], reverse=True)
        images = [im[0] for im in images]
        # Resolve relative URLs against the page URL
        images = [img if img.startswith("http") else urljoin(url, img) for img in images]
return images
def _extract_videos(self, soup: BeautifulSoup) -> List[str]:
# Extract videos from iframes and video tags
videos = []
nodes = list(soup.find_all("iframe")) + list(soup.find_all("video")) + list(soup.find_all("a"))
        for node in nodes:
            src = node.get("src", "") or ""
            href = node.get("href", "") or ""
            # Skip auth, blob, and redirect URLs that are not playable videos
            if any(keyword in src or keyword in href for keyword in ["accounts.google.com", "blob:", "youtube.com/redirect"]):
                continue
            if "www.youtube.com/watch?v" in src:
                videos.append(src)
            elif "www.youtube.com/watch?v" in href:
                videos.append(href)
return videos
def _extract_links(self, links: list) -> List[str]:
# Filter out unwanted links
filtered_links = []
        for link in links:
            url = link.get("href") or ""
            if url.startswith(("http://", "https://")) and not any(
                keyword in url
                for keyword in ["support.google.com", "google.com", "accounts.google.com", "youtube.com", "blob:", "mailto:", "javascript:"]
            ):
                filtered_links.append(url)
return filtered_links
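
# Minimal usage sketch (assumes an already-running event loop, e.g. inside an
# async web handler; the query string and num_sites value are only illustrative):
#
#     scraper = CrawlForAIScraper()
#     try:
#         results = await scraper.search_and_scrape("open source 3d software", num_sites=5)
#     finally:
#         await scraper.close()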
if __name__ == "__main__":
# Testing the scraper
import sys
    urls = [
        "https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview",
        "https://docs.crawl4ai.com/advanced/multi-url-crawling/",
        "https://github.com/SesameAILabs/csm",
    ]
if len(sys.argv) > 1:
urls = sys.argv[1:]
    async def main():
        scraper = CrawlForAIScraper()
        await scraper.start()
        try:
            if len(sys.argv) > 1:
                # URLs were passed on the command line: scrape them directly
                data = await scraper._scrape_pages(urls, len(urls))
            else:
                data = await scraper.search_and_scrape("blender.org")
        finally:
            await scraper.close()
        with open("output.log.json", "w") as f:
            json.dump(data, f, indent=2)
        print(json.dumps(data, indent=2))

    asyncio.run(main())