""" Patchright Scraper Module This module provides a robust web scraping implementation using Patchright (undetected Playwright fork). It supports advanced features like stealth mode, human simulation, CAPTCHA handling, and cloudflare bypassing. """ from typing import Any, Dict, List, Optional from patchright.async_api import async_playwright, Browser, BrowserContext, Page from .base_scraper import BaseScraper import asyncio import random import logging from urllib.parse import urlparse, parse_qs, urlencode, urlunparse import platform import subprocess import os import tempfile # import aioconsole from ..utils.error_handler import ErrorMessages def _get_browser_channel(): """Get browser channel based on platform. Chrome not available on ARM64 Linux.""" system = platform.system().lower() machine = platform.machine().lower() # In Docker/Linux, we often prefer the installed chromium if system == "linux": # Check if chrome actually exists if os.path.exists("/usr/bin/google-chrome") or os.path.exists("/opt/google/chrome/chrome"): return "chrome" return None # Fallback to chromium return "chrome" # Use Chrome for better stealth on macOS/Windows class ScraperConfig: """Configuration class for the Patchright scraper.""" def __init__( self, use_stealth: bool = True, simulate_human: bool = False, use_custom_headers: bool = False, # Disabled by default - creates detection signatures hide_webdriver: bool = True, bypass_cloudflare: bool = True, headless: bool = True, debug: bool = False, timeout: int = 30000, wait_for: str = 'domcontentloaded', use_current_browser: bool = False, use_persistent_context: bool = False, # Use persistent context for max stealth max_retries: int = 3, delay_after_load: int = 2, max_concurrent_pages: int = 5, locale: str | None = None, # e.g., 'en-US' - matches browser locale timezone_id: str | None = None, # e.g., 'America/New_York' - matches browser timezone ): self.use_stealth = use_stealth self.simulate_human = simulate_human self.use_custom_headers = use_custom_headers self.hide_webdriver = hide_webdriver self.bypass_cloudflare = bypass_cloudflare self.headless = headless self.debug = debug self.timeout = timeout self.wait_for = wait_for self.use_current_browser = use_current_browser self.use_persistent_context = use_persistent_context self.max_retries = max_retries self.delay_after_load = delay_after_load self.max_concurrent_pages = max_concurrent_pages self.locale = locale self.timezone_id = timezone_id class PlaywrightScraper(BaseScraper): """ Advanced web scraper implementation using Patchright (undetected Playwright fork). Features: - Undetected browser automation (bypasses Cloudflare, Akamai, etc.) - Browser instance reuse for better performance - Concurrent page scraping with configurable concurrency - Persistent context support for maximum stealth - CAPTCHA handling - Automatic stealth patches via Patchright """ def __init__(self, config: ScraperConfig | None = None): config = config or ScraperConfig() self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG if config.debug else logging.INFO) self.config = config self.chrome_process = None self.temp_user_data_dir = None # Browser pooling self._playwright = None self._browser: Browser | None = None self._persistent_context: BrowserContext | None = None self._browser_lock = asyncio.Lock() async def _get_browser(self, proxy: str | None = None, handle_captcha: bool = False) -> Browser: """Get or create a pooled browser instance with thread-safe locking.""" async with self._browser_lock: if self._browser is None or not self._browser.is_connected(): if self._playwright is None: self._playwright = await async_playwright().start() if self.config.use_current_browser: self._browser = await self.launch_and_connect_to_chrome(self._playwright) elif self.config.use_persistent_context: # Persistent context for maximum stealth - returns context, not browser self._persistent_context = await self.launch_persistent_context( self._playwright, proxy, handle_captcha ) self._browser = self._persistent_context.browser else: self._browser = await self.launch_browser(self._playwright, proxy, handle_captcha) return self._browser async def launch_persistent_context( self, playwright, proxy: Optional[str] = None, handle_captcha: bool = False ) -> BrowserContext: """ Launch a persistent browser context for maximum stealth. Patchright recommends persistent context with real Chrome for best undetectability. This uses a real Chrome profile which appears more human-like. Args: playwright: Patchright instance proxy: Optional proxy server handle_captcha: Whether CAPTCHA handling is enabled Returns: BrowserContext: Persistent browser context """ if self.temp_user_data_dir is None: self.temp_user_data_dir = tempfile.mkdtemp(prefix="patchright_profile_") context_options = { 'user_data_dir': self.temp_user_data_dir, 'channel': _get_browser_channel(), 'headless': self.config.headless and not handle_captcha, 'no_viewport': True, # Removes viewport fingerprint 'proxy': {'server': proxy} if proxy else None, 'args': ['--no-sandbox', '--disable-setuid-sandbox'], 'ignore_https_errors': True, 'locale': self.config.locale, 'timezone_id': self.config.timezone_id, } try: context = await playwright.chromium.launch_persistent_context( **{k: v for k, v in context_options.items() if v is not None} ) # Note: Init script disabled - Patchright handles stealth automatically # if self.config.use_stealth: # await self._add_stealth_init_script(context) return context except Exception as e: raise Exception( f"Failed to launch persistent context: {str(e)}\n\n" "Make sure Chrome is installed or run: patchright install chrome" ) async def fetch_content( self, url: str, proxy: str | None = None, pages: str | None = None, url_pattern: str | None = None, handle_captcha: bool = False ) -> list[str]: """ Fetch content from a given URL using Playwright with browser pooling. Args: url: The URL to fetch content from proxy: Proxy server to use for the request pages: Page numbers to scrape (e.g., "1-5" or "1,3,5") url_pattern: Pattern for constructing multi-page URLs handle_captcha: Whether to pause for CAPTCHA solving Returns: List of content strings from scraped pages """ browser = await self._get_browser(proxy, handle_captcha) context = None try: if handle_captcha: # For CAPTCHA mode: create context, handle CAPTCHA, then scrape pages context = await self.create_context(browser, proxy) page = await context.new_page() if self.config.use_stealth: await self.apply_stealth_settings(page) await self.set_browser_features(page) await self.handle_captcha(page, url) # After CAPTCHA is solved, get content from the current page await asyncio.sleep(self.config.delay_after_load) first_page_content = await page.content() # Check if we need to scrape multiple pages if pages: page_numbers = self.parse_page_numbers(pages) if not url_pattern: url_pattern = self.detect_url_pattern(url) contents = [first_page_content] # First page already scraped # Scrape remaining pages (skip first one since we already have it) for page_num in page_numbers[1:]: page_url = self.apply_url_pattern(url, url_pattern, page_num) if url_pattern else url self.logger.info(f"Scraping page {page_num}: {page_url}") await page.goto(page_url, wait_until=self.config.wait_for, timeout=self.config.timeout) await asyncio.sleep(self.config.delay_after_load) content = await page.content() contents.append(content) else: contents = [first_page_content] else: # Normal mode: use scrape_multiple_pages contents = await self.scrape_multiple_pages(browser, url, pages, url_pattern, proxy) except Exception as e: import traceback error_details = f"{type(e).__name__}: {str(e)}" self.logger.error(f"Error during scraping: {error_details}") self.logger.error(traceback.format_exc()) contents = [f"Error: {error_details}"] finally: if context: await context.close() # Close browser after CAPTCHA mode to clean up the visible window if handle_captcha and self._browser and self._browser.is_connected(): await self._browser.close() self._browser = None return contents async def handle_captcha(self, page: Page, url: str): """ Handle CAPTCHA solving by pausing execution and waiting for user input. This method navigates to the URL and waits for the user to solve any CAPTCHAs manually before continuing with the scraping process. Args: page (Page): Playwright page object url (str): URL to navigate to for CAPTCHA solving """ self.logger.info("Waiting for user to solve CAPTCHA...") try: await page.goto(url, wait_until=self.config.wait_for, timeout=self.config.timeout) print("Please solve the CAPTCHA in the browser window.") print("Once solved, press Enter in this console to continue...") # await aioconsole.ainput() # For now, just sleep to allow manual interaction if aioconsole is missing await asyncio.sleep(60) # Use 'load' instead of 'networkidle' - modern sites never reach networkidle # due to constant analytics/tracking requests await page.wait_for_load_state('load', timeout=5000) self.logger.info("CAPTCHA handling completed.") except Exception as e: # Handle browser closure or timeout gracefully if "closed" in str(e).lower(): self.logger.warning("Browser was closed during CAPTCHA handling") raise self.logger.warning(f"CAPTCHA wait completed with: {e}") async def launch_and_connect_to_chrome(self, playwright): """ Launch a new Chrome instance with remote debugging enabled and connect to it. This method creates a temporary user data directory and launches Chrome with remote debugging on port 9222, then connects to it via Playwright. Args: playwright: Playwright instance Returns: Browser: Connected browser instance Raises: Exception: If unable to connect to Chrome after 30 seconds """ if self.chrome_process is None: self.temp_user_data_dir = tempfile.mkdtemp(prefix="chrome_debug_profile_") chrome_executable = self.get_chrome_executable() command = [ chrome_executable, f"--user-data-dir={self.temp_user_data_dir}", "--remote-debugging-port=9222", "--no-first-run", "--no-default-browser-check" ] self.chrome_process = subprocess.Popen(command) self.logger.info("Launched Chrome with remote debugging.") for _ in range(30): try: browser = await playwright.chromium.connect_over_cdp("http://localhost:9222") self.logger.info("Successfully connected to Chrome.") return browser except Exception as e: self.logger.debug(f"Connection attempt failed: {str(e)}") await asyncio.sleep(1) raise Exception("Failed to connect to Chrome after 30 seconds") def get_chrome_executable(self): """ Get the path to Chrome executable based on the operating system. Returns: str: Path to Chrome executable Raises: NotImplementedError: If the operating system is not supported """ system = platform.system() if system == "Darwin": # macOS return "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" elif system == "Linux": return "google-chrome" elif system == "Windows": return "chrome" else: raise NotImplementedError(f"Unsupported operating system: {system}") async def close(self) -> None: """ Clean up resources including browser, Chrome process, and temp directories. This should be called when done using the scraper, or use it as an async context manager. """ import shutil # Close persistent context if used if self._persistent_context: await self._persistent_context.close() self._persistent_context = None self.logger.info("Persistent context closed.") # Close pooled browser if self._browser and self._browser.is_connected(): await self._browser.close() self._browser = None self.logger.info("Browser closed.") # Stop playwright if self._playwright: await self._playwright.stop() self._playwright = None # Terminate Chrome process if we started one if self.chrome_process: self.chrome_process.terminate() self.chrome_process.wait() self.chrome_process = None self.logger.info("Chrome process terminated.") # Remove temp directory if self.temp_user_data_dir: shutil.rmtree(self.temp_user_data_dir, ignore_errors=True) self.logger.info(f"Temporary user data directory removed: {self.temp_user_data_dir}") self.temp_user_data_dir = None async def __aenter__(self): """Async context manager entry.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit with cleanup.""" await self.close() return False async def connect_to_current_browser(self, playwright): """ Connect to an existing browser instance with remote debugging enabled. This method launches a browser with remote debugging and attempts to connect to it via Playwright. Args: playwright: Playwright instance Returns: Browser: Connected browser instance Raises: NotImplementedError: If the operating system is not supported Exception: If unable to connect to the browser after 30 seconds """ system = platform.system() if system == "Darwin": subprocess.Popen(["open", "-a", "Google Chrome", "--args", "--remote-debugging-port=9222"]) elif system == "Linux": subprocess.Popen(["google-chrome", "--remote-debugging-port=9222"]) elif system == "Windows": subprocess.Popen(["start", "chrome", "--remote-debugging-port=9222"], shell=True) else: raise NotImplementedError(f"Connecting to current browser is not implemented for {system}") self.logger.info("Waiting for browser to start...") for _ in range(30): try: browser = await playwright.chromium.connect_over_cdp("http://localhost:9222") self.logger.info("Successfully connected to the browser.") return browser except Exception as e: self.logger.debug(f"Connection attempt failed: {str(e)}") await asyncio.sleep(1) raise Exception("Failed to connect to the current browser after 30 seconds") async def launch_browser(self, playwright, proxy: Optional[str] = None, handle_captcha: bool = False) -> Browser: """ Launch a new browser instance with specified configuration. Args: playwright: Patchright instance proxy (Optional[str]): Proxy server to use handle_captcha (bool): Whether CAPTCHA handling is enabled Returns: Browser: Launched browser instance """ try: channel = _get_browser_channel() launch_options = { 'headless': self.config.headless and not handle_captcha, 'args': ['--no-sandbox', '--disable-setuid-sandbox', '--disable-infobars', '--window-position=0,0', '--ignore-certifcate-errors', '--ignore-certifcate-errors-spki-list'], 'proxy': {'server': proxy} if proxy else None } if channel: launch_options['channel'] = channel return await playwright.chromium.launch(**launch_options) except EOFError: raise Exception( "Patchright browsers are not installed.\n\n" "Please run: patchright install chromium\n\n" "Or for better stealth: patchright install chrome" ) except Exception as e: raise Exception(f"Failed to launch browser: {str(e)}") async def create_context(self, browser: Browser, proxy: Optional[str] = None) -> BrowserContext: """ Create a new browser context with specified settings. Note: Patchright recommends NOT setting custom user_agent or viewport as these create detection signatures. Let the browser use defaults. Args: browser (Browser): Browser instance proxy (Optional[str]): Proxy server to use Returns: BrowserContext: Created browser context """ context_options = { 'proxy': {'server': proxy} if proxy else None, 'java_script_enabled': True, 'ignore_https_errors': True, 'locale': self.config.locale, 'timezone_id': self.config.timezone_id, } # Don't set viewport or user_agent - Patchright handles stealth better without them context = await browser.new_context(**{k: v for k, v in context_options.items() if v is not None}) # Note: Init script disabled - Patchright handles stealth automatically # If needed, uncomment: # if self.config.use_stealth: # await self._add_stealth_init_script(context) return context async def _add_stealth_init_script(self, context: BrowserContext): """ Add init script to context for additional stealth before page scripts run. This runs before any page JavaScript executes, patching detection vectors that Patchright might not cover. Uses the Playwright add_init_script API. Args: context: Browser context to add init script to """ stealth_script = ''' () => { // Patch chrome.runtime to avoid detection if (!window.chrome) { window.chrome = {}; } if (!window.chrome.runtime) { window.chrome.runtime = {}; } // Patch plugins array to look like real browser Object.defineProperty(navigator, 'plugins', { get: () => { const plugins = [ { name: 'Chrome PDF Plugin', filename: 'internal-pdf-viewer' }, { name: 'Chrome PDF Viewer', filename: 'mhjfbmdgcfjbbpaeojofohoefgiehjai' }, { name: 'Native Client', filename: 'internal-nacl-plugin' } ]; plugins.item = (i) => plugins[i] || null; plugins.namedItem = (name) => plugins.find(p => p.name === name) || null; plugins.refresh = () => {}; return plugins; } }); // Patch languages to look normal Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); // Ensure webdriver is not set (Patchright handles this but extra safety) Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); // Patch connection info if (navigator.connection) { Object.defineProperty(navigator.connection, 'rtt', { get: () => 50 }); } } ''' await context.add_init_script(stealth_script) async def apply_stealth_settings(self, page: Page): """ Apply additional stealth settings to avoid bot detection. Note: Patchright already handles most stealth features automatically: - navigator.webdriver is already undefined - Automation flags are already removed - Runtime.enable leak is already patched This method only applies minimal additional patches that don't interfere. Args: page (Page): Patchright page object """ # Patchright handles most stealth automatically via isolated contexts # Only apply minimal non-conflicting patches await page.evaluate(''' () => { // Patch permissions query for notifications const originalQuery = window.navigator.permissions.query; if (originalQuery) { window.navigator.permissions.query = (parameters) => ( parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); } } ''') async def set_browser_features(self, page: Page): """ Set browser features like custom headers if enabled in configuration. Args: page (Page): Playwright page object """ if self.config.use_custom_headers: await page.set_extra_http_headers({ 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Referer': 'https://www.google.com/', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'Upgrade-Insecure-Requests': '1' }) async def scrape_multiple_pages( self, browser: Browser, base_url: str, pages: str | None = None, url_pattern: str | None = None, proxy: str | None = None ) -> list[str]: """ Scrape content from single or multiple pages with concurrent execution. Uses asyncio.gather with a semaphore for controlled concurrency. Supports both regular browser contexts and persistent contexts. Args: browser: Browser instance for creating contexts base_url: Base URL to scrape pages: Page numbers to scrape url_pattern: Pattern for constructing multi-page URLs proxy: Optional proxy for context creation Returns: List of content strings from scraped pages """ if not url_pattern: url_pattern = self.detect_url_pattern(base_url) # Use persistent context if available, otherwise create new contexts use_persistent = self._persistent_context is not None if not url_pattern and not pages: # Single page scraping self.logger.info(f"Scraping single page: {base_url}") if use_persistent: page = await self._persistent_context.new_page() try: if self.config.use_stealth: await self.apply_stealth_settings(page) await self.set_browser_features(page) content = await self.navigate_and_get_content(page, base_url) return [content] finally: await page.close() else: context = await self.create_context(browser, proxy) try: page = await context.new_page() if self.config.use_stealth: await self.apply_stealth_settings(page) await self.set_browser_features(page) content = await self.navigate_and_get_content(page, base_url) return [content] finally: await context.close() # Multiple page scraping with concurrency page_numbers = self.parse_page_numbers(pages) if pages else [1] urls = [ self.apply_url_pattern(base_url, url_pattern, page_num) if url_pattern else base_url for page_num in page_numbers ] semaphore = asyncio.Semaphore(self.config.max_concurrent_pages) async def scrape_with_context(url: str, page_num: int) -> str: async with semaphore: self.logger.info(f"Scraping page {page_num}: {url}") if use_persistent: page = await self._persistent_context.new_page() try: if self.config.use_stealth: await self.apply_stealth_settings(page) await self.set_browser_features(page) content = await self.navigate_and_get_content(page, url) await asyncio.sleep(random.uniform(0.5, 1.5)) return content finally: await page.close() else: context = await self.create_context(browser, proxy) try: page = await context.new_page() if self.config.use_stealth: await self.apply_stealth_settings(page) await self.set_browser_features(page) content = await self.navigate_and_get_content(page, url) await asyncio.sleep(random.uniform(0.5, 1.5)) return content finally: await context.close() # Execute concurrently and maintain order tasks = [ scrape_with_context(url, page_num) for page_num, url in zip(page_numbers, urls) ] return await asyncio.gather(*tasks) async def navigate_and_get_content(self, page: Page, url: str) -> str: """ Navigate to a URL and extract its content. Args: page (Page): Playwright page object url (str): URL to navigate to Returns: str: Page content or error message """ try: self.logger.info(f"Navigating to {url}") await page.goto(url, wait_until=self.config.wait_for, timeout=self.config.timeout) self.logger.info(f"Successfully loaded {url}") await asyncio.sleep(self.config.delay_after_load) self.logger.info("Extracting page content") content = await page.content() self.logger.info(f"Successfully extracted content (length: {len(content)})") return content except asyncio.TimeoutError: self.logger.error(f"Timeout loading {url}") return f"Error: {ErrorMessages.TIMEOUT_ERROR}" except Exception as e: self.logger.error(f"Error navigating to {url}: {str(e)}") error_details = str(e) if len(str(e)) < 200 else str(e)[:200] + "..." return f"Error: {ErrorMessages.SCRAPING_FAILED}\n\nDetails: {error_details}" async def bypass_cloudflare(self, page: Page, url: str) -> str: """ Attempt to bypass Cloudflare protection. This method reloads the page multiple times and simulates human behavior to try to bypass Cloudflare's bot detection. Args: page (Page): Playwright page object url (str): URL to bypass Cloudflare for Returns: str: Page content after bypass attempt """ max_retries = 3 for _ in range(max_retries): await page.reload(wait_until=self.config.wait_for, timeout=self.config.timeout) if self.config.simulate_human: await self.simulate_human_behavior(page) else: await asyncio.sleep(2) content = await page.content() if "Cloudflare" not in content or "ray ID" not in content.lower(): self.logger.info("Successfully bypassed Cloudflare") return content self.logger.info("Cloudflare still detected, retrying...") self.logger.warning("Failed to bypass Cloudflare after multiple attempts") return content async def simulate_human_behavior(self, page: Page): """ Simulate human-like browsing behavior. This method simulates human behavior like scrolling, mouse movements, and hovering over elements to make automation less detectable. Args: page (Page): Playwright page object """ # Scrolling behavior await page.evaluate('window.scrollBy(0, window.innerHeight / 2)') await asyncio.sleep(random.uniform(0.5, 1)) # Mouse movement behavior for _ in range(2): x = random.randint(100, 500) y = random.randint(100, 500) await page.mouse.move(x, y) await asyncio.sleep(random.uniform(0.1, 0.3)) # Hover over a random element (without clicking) elements = await page.query_selector_all('a, button, input, select') if elements: random_element = random.choice(elements) await random_element.hover() await asyncio.sleep(random.uniform(0.3, 0.7)) def detect_url_pattern(self, url: str) -> Optional[str]: """ Detect URL pagination pattern from a given URL. This method analyzes the URL to identify common pagination patterns in query parameters or path segments. Args: url (str): URL to analyze for pagination patterns Returns: Optional[str]: Detected pattern or None if no pattern found """ parsed_url = urlparse(url) query = parse_qs(parsed_url.query) for param, value in query.items(): if value and value[0].isdigit(): return f"{param}={{{param}}}" path_parts = parsed_url.path.split('/') for i, part in enumerate(path_parts): if part.isdigit(): path_parts[i] = "{page}" return '/'.join(path_parts) return None def apply_url_pattern(self, base_url: str, pattern: str, page_num: int) -> str: """ Apply a URL pattern to generate a paginated URL. Args: base_url (str): Base URL to apply pattern to pattern (str): Pattern to apply page_num (int): Page number to insert into pattern Returns: str: Generated URL with page number applied """ parsed_url = urlparse(base_url) if '=' in pattern: query = parse_qs(parsed_url.query) param, value = pattern.split('=') query[param] = [value.format(**{param: page_num})] return urlunparse(parsed_url._replace(query=urlencode(query, doseq=True))) elif '{page}' in pattern: return urlunparse(parsed_url._replace(path=pattern.format(page=page_num))) else: return base_url def parse_page_numbers(self, pages: Optional[str]) -> List[int]: """ Parse page number specification into a list of integers. This method parses page specifications like "1-5" or "1,3,5" into a sorted list of unique page numbers. Args: pages (Optional[str]): Page specification string Returns: List[int]: Sorted list of unique page numbers """ if not pages: return [1] page_numbers = [] for part in pages.split(','): if '-' in part: start, end = map(int, part.split('-')) page_numbers.extend(range(start, end + 1)) else: page_numbers.append(int(part)) return sorted(set(page_numbers)) async def extract(self, content: str) -> Dict[str, Any]: """ Extract structured data from content. For the Playwright scraper, this method simply returns the raw content since Playwright is primarily used for fetching content rather than extracting structured data. Args: content (str): Raw content to extract data from Returns: Dict[str, Any]: Dictionary containing the raw content """ return {"raw_content": content}