| """EyeWiki crawler for medical article scraping using crawl4ai.""" |
|
|
| import asyncio |
| import json |
| import re |
| from collections import deque |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Dict, Optional, Set |
| from urllib.parse import urljoin, urlparse, parse_qs |
| from urllib.robotparser import RobotFileParser |
|
|
| import aiohttp |
| from bs4 import BeautifulSoup |
| from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode |
| from rich.console import Console |
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn |
|
|
| from config.settings import settings |
|
|
|
|
class EyeWikiCrawler:
    """
    Asynchronous crawler for EyeWiki medical articles.

    Features:
    - Asynchronous crawling with crawl4ai
    - Respects robots.txt
    - Polite crawling with configurable delays
    - Markdown content extraction
    - Checkpointing for resume capability
    - Progress tracking with rich console
    """

    def __init__(
        self,
        base_url: str = "https://eyewiki.org",
        output_dir: Optional[Path] = None,
        checkpoint_file: Optional[Path] = None,
        delay: float = 1.5,
        timeout: int = 30,
    ):
        """
        Initialize the EyeWiki crawler.

        Args:
            base_url: Base URL for EyeWiki
            output_dir: Directory to save scraped articles
            checkpoint_file: Path to checkpoint file
            delay: Delay between requests in seconds
            timeout: Request timeout in seconds
        """
        self.base_url = base_url
        self.domain = urlparse(base_url).netloc
        self.output_dir = output_dir or Path(settings.data_raw_path)
        self.checkpoint_file = checkpoint_file or (self.output_dir / "crawler_checkpoint.json")
        self.delay = delay
        self.timeout = timeout

        # Ensure the output directory exists
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Crawl state
        self.visited_urls: Set[str] = set()
        self.to_crawl: deque = deque()
        self.failed_urls: Dict[str, str] = {}
        self.articles_saved: int = 0

        # Rich console for status output
        self.console = Console()

        # robots.txt parser
        self.robot_parser = RobotFileParser()
        self.robot_parser.set_url(urljoin(base_url, "/robots.txt"))

        # URL patterns that are not medical articles (MediaWiki special pages,
        # edit/history/diff views, talk pages, files, templates, etc.)
        self.skip_patterns = [
            r"/index\.php\?title=.*&action=",
            r"/index\.php\?title=.*&diff=",
            r"/index\.php\?title=.*&oldid=",
            r"/index\.php\?title=.*&direction=",
            r"/index\.php\?title=Special:",
            r"/Special:",
            r"/User:",
            r"/User_talk:",
            r"/Talk:",
            r"/File:",
            r"/Template:",
            r"/Help:",
            r"/MediaWiki:",
            r"#",
        ]

    def _is_valid_article_url(self, url: str) -> bool:
        """
        Check if URL is a valid medical article.

        Args:
            url: URL to check

        Returns:
            True if valid article URL
        """
        parsed = urlparse(url)

        # Only follow links on the EyeWiki domain
        if self.domain not in parsed.netloc:
            return False

        # Skip non-article pages (special pages, edit/history views, etc.)
        for pattern in self.skip_patterns:
            if re.search(pattern, url):
                return False

        path = parsed.path.strip("/")

        # MediaWiki-style URLs such as /index.php?title=Article_Name
        if parsed.query and "title=" in parsed.query:
            return True

        # Pretty URLs such as /Article_Name or /wiki/Article_Name
        # (skip script paths like w/ and paths with file extensions)
        if path and not path.startswith("w/") and "." not in path:
            if path[0].isupper() or path.startswith("wiki/"):
                return True

        return False

    def _normalize_url(self, url: str) -> str:
        """
        Normalize URL for consistent comparison.

        Args:
            url: URL to normalize

        Returns:
            Normalized URL
        """
        # Drop fragment identifiers
        url = url.split("#")[0]
        # Drop trailing slashes
        url = url.rstrip("/")
        return url

    def _can_fetch(self, url: str) -> bool:
        """
        Check if URL can be fetched according to robots.txt.

        Args:
            url: URL to check

        Returns:
            True if allowed to fetch
        """
        try:
            return self.robot_parser.can_fetch("*", url)
        except Exception as e:
            self.console.print(f"[yellow]Warning: Could not check robots.txt: {e}[/yellow]")
            return True

    def _extract_links(self, html: str, current_url: str) -> Set[str]:
        """
        Extract valid article links from HTML.

        Args:
            html: HTML content
            current_url: Current page URL for resolving relative links

        Returns:
            Set of valid article URLs
        """
        soup = BeautifulSoup(html, "html.parser")
        links = set()

        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            absolute_url = urljoin(current_url, href)
            normalized_url = self._normalize_url(absolute_url)

            if self._is_valid_article_url(normalized_url):
                links.add(normalized_url)

        return links

    def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict:
        """
        Extract metadata from article page.

        Args:
            soup: BeautifulSoup object
            url: Article URL

        Returns:
            Dictionary of metadata
        """
        metadata = {
            "url": url,
            "title": "",
            "last_updated": None,
            "categories": [],
            "scraped_at": datetime.utcnow().isoformat(),
        }

        # Article title (MediaWiki renders it in the firstHeading h1)
        title_tag = soup.find("h1", {"id": "firstHeading"}) or soup.find("h1")
        if title_tag:
            metadata["title"] = title_tag.get_text(strip=True)

        # Category links
        category_links = soup.find_all("a", href=re.compile(r"/Category:"))
        metadata["categories"] = [link.get_text(strip=True) for link in category_links]

        # Last-modified footer note
        last_modified = soup.find("li", {"id": "footer-info-lastmod"})
        if last_modified:
            metadata["last_updated"] = last_modified.get_text(strip=True)

        return metadata

    def save_article(self, content: Dict, filepath: Path) -> None:
        """
        Save article content and metadata to files.

        Args:
            content: Dictionary with 'markdown' and 'metadata' keys
            filepath: Base filepath (without extension)
        """
        # Markdown content
        md_file = filepath.with_suffix(".md")
        with open(md_file, "w", encoding="utf-8") as f:
            f.write(content["markdown"])

        # Metadata sidecar
        json_file = filepath.with_suffix(".json")
        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(content["metadata"], f, indent=2, ensure_ascii=False)

        self.articles_saved += 1
        self.console.print(f"[green]✓[/green] Saved: {content['metadata'].get('title', 'Untitled')}")

    def load_checkpoint(self) -> bool:
        """
        Load checkpoint data to resume crawling.

        Returns:
            True if checkpoint was loaded successfully
        """
        if not self.checkpoint_file.exists():
            return False

        try:
            with open(self.checkpoint_file, "r") as f:
                data = json.load(f)

            self.visited_urls = set(data.get("visited_urls", []))
            self.to_crawl = deque(data.get("to_crawl", []))
            self.failed_urls = data.get("failed_urls", {})
            self.articles_saved = data.get("articles_saved", 0)

            self.console.print(f"[blue]Loaded checkpoint:[/blue] {len(self.visited_urls)} visited, "
                               f"{len(self.to_crawl)} queued, {self.articles_saved} saved")
            return True
        except Exception as e:
            self.console.print(f"[red]Error loading checkpoint: {e}[/red]")
            return False

    def save_checkpoint(self) -> None:
        """Save current crawl state to checkpoint file."""
        data = {
            "visited_urls": list(self.visited_urls),
            "to_crawl": list(self.to_crawl),
            "failed_urls": self.failed_urls,
            "articles_saved": self.articles_saved,
            "last_checkpoint": datetime.utcnow().isoformat(),
        }

        try:
            with open(self.checkpoint_file, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            self.console.print(f"[red]Error saving checkpoint: {e}[/red]")

    async def crawl_single_page(self, url: str) -> Optional[Dict]:
        """
        Crawl a single page and extract content.

        Args:
            url: URL to crawl

        Returns:
            Dictionary with markdown content and metadata, or None if failed
        """
        if not self._can_fetch(url):
            self.console.print(f"[yellow]Blocked by robots.txt:[/yellow] {url}")
            return None

        try:
            # Headless browser configuration
            browser_config = BrowserConfig(
                headless=True,
                verbose=False,
            )

            # Per-run crawler configuration (page_timeout is in milliseconds)
            crawler_config = CrawlerRunConfig(
                cache_mode=CacheMode.BYPASS,
                page_timeout=self.timeout * 1000,
                wait_for="body",
            )

            async with AsyncWebCrawler(config=browser_config) as crawler:
                result = await crawler.arun(
                    url=url,
                    config=crawler_config,
                )

                if not result.success:
                    self.console.print(f"[red]Failed to crawl:[/red] {url}")
                    return None

                # Extract metadata from the raw HTML
                soup = BeautifulSoup(result.html, "html.parser")
                metadata = self._extract_metadata(soup, url)

                # crawl4ai already provides a markdown rendering of the page
                markdown = result.markdown

                return {
                    "markdown": markdown,
                    "metadata": metadata,
                    "html": result.html,
                    "links": self._extract_links(result.html, url),
                }

        except Exception as e:
            self.console.print(f"[red]Error crawling {url}:[/red] {e}")
            self.failed_urls[url] = str(e)
            return None

    async def crawl(
        self,
        max_pages: Optional[int] = None,
        depth: int = 2,
        start_urls: Optional[list] = None,
    ) -> None:
        """
        Crawl EyeWiki starting from the main page.

        Args:
            max_pages: Maximum number of pages to crawl (None for unlimited)
            depth: Maximum depth to crawl
            start_urls: Optional list of starting URLs (defaults to base_url)
        """
        # Resume from a previous run if a checkpoint exists
        checkpoint_loaded = self.load_checkpoint()

        # Load robots.txt so _can_fetch() can enforce it
        try:
            self.robot_parser.read()
            self.console.print("[green]✓[/green] Loaded robots.txt")
        except Exception as e:
            self.console.print(f"[yellow]Warning: Could not load robots.txt: {e}[/yellow]")

        # Seed the queue unless we are resuming from a checkpoint
        if not checkpoint_loaded:
            if start_urls:
                self.to_crawl.extend([(url, 0) for url in start_urls])
            else:
                self.to_crawl.append((self.base_url, 0))

        self.console.print("\n[bold cyan]Starting EyeWiki Crawl[/bold cyan]")
        self.console.print(f"Max pages: {max_pages or 'unlimited'}")
        self.console.print(f"Max depth: {depth}")
        self.console.print(f"Delay: {self.delay}s\n")

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            console=self.console,
        ) as progress:

            task = progress.add_task(
                "[cyan]Crawling...",
                total=max_pages if max_pages else 100,
            )

            try:
                while self.to_crawl:
                    # Stop once the page budget is exhausted
                    if max_pages and self.articles_saved >= max_pages:
                        self.console.print(f"\n[yellow]Reached max_pages limit: {max_pages}[/yellow]")
                        break

                    current_url, current_depth = self.to_crawl.popleft()

                    # Skip URLs we have already processed
                    if current_url in self.visited_urls:
                        continue

                    # Respect the depth limit
                    if current_depth > depth:
                        continue

                    self.visited_urls.add(current_url)

                    progress.update(
                        task,
                        completed=self.articles_saved,
                        description=f"[cyan]Crawling ({self.articles_saved} saved, {len(self.to_crawl)} queued): {current_url[:60]}...",
                    )

                    result = await self.crawl_single_page(current_url)

                    if result:
                        parsed = urlparse(current_url)

                        # Derive a filename from the article title (query-style
                        # URLs) or from the last path segments (pretty URLs)
                        if parsed.query:
                            query_params = parse_qs(parsed.query)
                            if 'title' in query_params:
                                filename = query_params['title'][0]
                            else:
                                filename = parsed.query
                        else:
                            path_parts = parsed.path.strip("/").split("/")
                            filename = "_".join(path_parts[-2:]) if len(path_parts) > 1 else path_parts[-1]

                        # Sanitize the filename and cap its length
                        filename = re.sub(r"[^\w\s-]", "_", filename)
                        filename = re.sub(r"[-\s]+", "_", filename)
                        filename = filename[:200]

                        filepath = self.output_dir / filename
                        self.save_article(result, filepath)

                        # Queue newly discovered article links
                        for link in result["links"]:
                            if link not in self.visited_urls:
                                self.to_crawl.append((link, current_depth + 1))

                    # Polite delay between requests
                    await asyncio.sleep(self.delay)

                    # Periodic checkpoint so an interrupted crawl can resume
                    if self.articles_saved % 10 == 0:
                        self.save_checkpoint()

            except KeyboardInterrupt:
                self.console.print("\n[yellow]Crawl interrupted by user[/yellow]")
            except Exception as e:
                self.console.print(f"\n[red]Error during crawl: {e}[/red]")
            finally:
                # Always persist state, even on interruption or failure
                self.save_checkpoint()

        # Final summary
        self.console.print("\n[bold cyan]Crawl Summary[/bold cyan]")
        self.console.print(f"Articles saved: {self.articles_saved}")
        self.console.print(f"URLs visited: {len(self.visited_urls)}")
        self.console.print(f"URLs failed: {len(self.failed_urls)}")
        self.console.print(f"URLs remaining: {len(self.to_crawl)}")

        if self.failed_urls:
            self.console.print("\n[yellow]Failed URLs:[/yellow]")
            for url, error in list(self.failed_urls.items())[:10]:
                self.console.print(f"  - {url}: {error}")
            if len(self.failed_urls) > 10:
                self.console.print(f"  ... and {len(self.failed_urls) - 10} more")
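

# ---------------------------------------------------------------------------
# Minimal usage sketch (assumption: the module is run directly rather than
# through a project CLI; the max_pages/depth values below are illustrative,
# not project defaults).
# ---------------------------------------------------------------------------
async def _demo_crawl() -> None:
    # Crawl a small sample: stop after 25 saved articles, follow links up to
    # two levels deep from the EyeWiki main page, with a 1.5 s polite delay.
    crawler = EyeWikiCrawler(delay=1.5)
    await crawler.crawl(max_pages=25, depth=2)


if __name__ == "__main__":
    asyncio.run(_demo_crawl())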