""" Playwright browser helper for loading and interacting with quiz pages. """ import asyncio import logging import sys from asyncio.subprocess import PIPE from typing import Optional, Dict, Any, List from playwright.async_api import async_playwright, Browser, Page, BrowserContext import time logger = logging.getLogger(__name__) class BrowserHelper: """Helper class for managing Playwright browser sessions.""" def __init__(self): self.browser: Optional[Browser] = None self.context: Optional[BrowserContext] = None self.page: Optional[Page] = None self.playwright = None self._install_attempted = False async def start(self, headless: bool = True) -> None: """ Start Playwright browser. Args: headless: Run in headless mode """ try: self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch( headless=headless, args=[ '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-accelerated-2d-canvas', '--disable-gpu' ] ) self.context = await self.browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' ) self.page = await self.context.new_page() logger.info("Browser started successfully") except Exception as e: await self._cleanup_partial_start() if self._should_install_browsers(e): logger.warning("Playwright browsers missing. Installing Chromium bundle...") await self._install_browsers() return await self.start(headless=headless) logger.error(f"Error starting browser: {e}") raise def _should_install_browsers(self, error: Exception) -> bool: """Determine if the error indicates missing Playwright browsers.""" if self._install_attempted: return False message = str(error).lower() indicators = [ "executable doesn't exist", "run the following command to download new browsers", "playwright install" ] needs_install = any(token in message for token in indicators) if needs_install: self._install_attempted = True return needs_install async def _install_browsers(self) -> None: """Install the required Playwright Chromium browser once.""" cmd = [sys.executable, "-m", "playwright", "install", "chromium"] logger.info("Running: %s", " ".join(cmd)) process = await asyncio.create_subprocess_exec( *cmd, stdout=PIPE, stderr=PIPE ) stdout, stderr = await process.communicate() if stdout: logger.debug(stdout.decode(errors="ignore").strip()) if stderr: logger.warning(stderr.decode(errors="ignore").strip()) if process.returncode != 0: raise RuntimeError(f"Failed to install Playwright browsers (exit code {process.returncode})") logger.info("Playwright Chromium installed successfully") async def _cleanup_partial_start(self) -> None: """Cleanup resources if browser start fails midway.""" try: if self.page: await self.page.close() except Exception: pass finally: self.page = None try: if self.context: await self.context.close() except Exception: pass finally: self.context = None try: if self.browser: await self.browser.close() except Exception: pass finally: self.browser = None try: if self.playwright: await self.playwright.stop() except Exception: pass finally: self.playwright = None async def load_page(self, url: str, wait_time: int = 2, timeout: int = 15000) -> Dict[str, Any]: """ Load a page and extract all content. Args: url: URL to load wait_time: Seconds to wait for page to load (default: 2, optimized for speed) timeout: Page load timeout in milliseconds (default: 15s) Returns: Dictionary with page content """ if not self.page: await self.start() try: logger.info(f"Loading page: {url} (timeout: {timeout}ms, wait: {wait_time}s)") # Use 'load' instead of 'networkidle' for faster loading await self.page.goto(url, wait_until='load', timeout=timeout) # Wait for dynamic content (reduced wait time) await asyncio.sleep(min(wait_time, 2)) # Extract page content content = { 'url': url, 'title': await self.page.title(), 'text': await self.page.inner_text('body'), 'html': await self.page.content(), 'screenshot': await self.page.screenshot(full_page=True), } # Try to extract all visible text elements try: content['all_text'] = await self.page.evaluate(""" () => { const walker = document.createTreeWalker( document.body, NodeFilter.SHOW_TEXT, null, false ); let text = []; let node; while (node = walker.nextNode()) { if (node.textContent.trim()) { text.push(node.textContent.trim()); } } return text.join('\\n'); } """) except Exception as e: logger.warning(f"Error extracting all text: {e}") content['all_text'] = content['text'] # Extract links try: content['links'] = await self.page.evaluate(""" () => { const links = Array.from(document.querySelectorAll('a[href]')); return links.map(a => ({text: a.textContent.trim(), href: a.href})); } """) except Exception as e: logger.warning(f"Error extracting links: {e}") content['links'] = [] # Extract images try: content['images'] = await self.page.evaluate(""" () => { const images = Array.from(document.querySelectorAll('img[src]')); return images.map(img => ({alt: img.alt, src: img.src})); } """) except Exception as e: logger.warning(f"Error extracting images: {e}") content['images'] = [] logger.info(f"Page loaded successfully: {content['title']}") return content except Exception as e: logger.error(f"Error loading page {url}: {e}") raise async def click_element(self, selector: str) -> bool: """ Click an element on the page. Args: selector: CSS selector Returns: True if successful """ try: await self.page.click(selector) await asyncio.sleep(1) return True except Exception as e: logger.error(f"Error clicking element {selector}: {e}") return False async def fill_input(self, selector: str, value: str) -> bool: """ Fill an input field. Args: selector: CSS selector value: Value to fill Returns: True if successful """ try: await self.page.fill(selector, value) return True except Exception as e: logger.error(f"Error filling input {selector}: {e}") return False async def wait_for_element(self, selector: str, timeout: int = 10000) -> bool: """ Wait for an element to appear. Args: selector: CSS selector timeout: Timeout in milliseconds Returns: True if element found """ try: await self.page.wait_for_selector(selector, timeout=timeout) return True except Exception as e: logger.warning(f"Element {selector} not found: {e}") return False async def evaluate_script(self, script: str) -> Any: """ Execute JavaScript on the page. Args: script: JavaScript code to execute Returns: Result of script execution """ try: return await self.page.evaluate(script) except Exception as e: logger.error(f"Error evaluating script: {e}") return None async def close(self) -> None: """Close browser and cleanup.""" try: if self.page: await self.page.close() if self.context: await self.context.close() if self.browser: await self.browser.close() if self.playwright: await self.playwright.stop() logger.info("Browser closed") except Exception as e: logger.error(f"Error closing browser: {e}") # Global browser instance _browser: Optional[BrowserHelper] = None async def get_browser() -> BrowserHelper: """ Get or create a browser instance. Returns: BrowserHelper instance """ global _browser if _browser is None: _browser = BrowserHelper() await _browser.start() return _browser async def cleanup_browser() -> None: """Cleanup browser instance.""" global _browser if _browser: await _browser.close() _browser = None