|
|
"""
|
|
|
Playwright browser helper for loading and interacting with quiz pages.
|
|
|
"""
|
|
|
import asyncio
|
|
|
import logging
|
|
|
from typing import Optional, Dict, Any, List
|
|
|
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
|
|
|
import time
|
|
|
|
|
|
# Module-level logger, named after this module; used by every log call below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class BrowserHelper:
    """Helper class for managing a single Playwright Chromium session.

    The helper owns at most one Playwright driver, browser, context and page
    at a time. ``start()`` creates them, ``load_page()`` navigates and scrapes
    the current page, the small action methods operate on that page, and
    ``close()`` releases everything so the helper can be started again.
    """

    # Chromium command-line flags passed to ``launch()``.
    # NOTE(review): presumably chosen for containerised/CI hosts - confirm.
    _LAUNCH_ARGS = (
        '--no-sandbox',
        '--disable-setuid-sandbox',
        '--disable-dev-shm-usage',
        '--disable-accelerated-2d-canvas',
        '--disable-gpu',
    )

    # Settings applied to the browser context created by ``start()``.
    _VIEWPORT = {'width': 1920, 'height': 1080}
    _USER_AGENT = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    )

    # Collects every non-blank text node under <body>, one per line.
    _ALL_TEXT_JS = """
        () => {
            const walker = document.createTreeWalker(
                document.body,
                NodeFilter.SHOW_TEXT,
                null,
                false
            );
            let text = [];
            let node;
            while (node = walker.nextNode()) {
                if (node.textContent.trim()) {
                    text.push(node.textContent.trim());
                }
            }
            return text.join('\\n');
        }
    """

    # Returns [{text, href}, ...] for every anchor that has an href.
    _LINKS_JS = """
        () => {
            const links = Array.from(document.querySelectorAll('a[href]'));
            return links.map(a => ({text: a.textContent.trim(), href: a.href}));
        }
    """

    # Returns [{alt, src}, ...] for every image that has a src.
    _IMAGES_JS = """
        () => {
            const images = Array.from(document.querySelectorAll('img[src]'));
            return images.map(img => ({alt: img.alt, src: img.src}));
        }
    """

    def __init__(self):
        """Create an idle helper; nothing is launched until ``start()``."""
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        # Playwright driver handle returned by ``async_playwright().start()``.
        self.playwright: Optional[Any] = None

    async def start(self, headless: bool = True) -> None:
        """
        Start Playwright, launch Chromium and open a fresh page.

        If the helper already holds a session it is closed first, and if any
        launch step fails everything started so far is released before the
        error is re-raised, so no browser process is left behind.

        Args:
            headless: Run in headless mode

        Raises:
            Exception: Whatever Playwright raised while starting; it is
                logged and re-raised unchanged.
        """
        # Overwriting live handles would orphan the previous browser process.
        if any(
            handle is not None
            for handle in (self.page, self.context, self.browser, self.playwright)
        ):
            await self.close()

        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(
                headless=headless,
                args=list(self._LAUNCH_ARGS),
            )
            self.context = await self.browser.new_context(
                viewport=dict(self._VIEWPORT),
                user_agent=self._USER_AGENT,
            )
            self.page = await self.context.new_page()
            logger.info("Browser started successfully")
        except Exception as e:
            logger.error(f"Error starting browser: {e}")
            # Release whatever was started before the failing step.
            await self._release()
            raise

    async def load_page(
        self,
        url: str,
        wait_time: int = 5,
        timeout: int = 30000,
    ) -> Dict[str, Any]:
        """
        Load a page and extract all content.

        The browser is started on demand (with default settings) when the
        helper has no page yet, including after ``close()``.

        Args:
            url: URL to load
            wait_time: Seconds to sleep after navigation before scraping
            timeout: Page load timeout in milliseconds

        Returns:
            Dictionary with the keys ``url``, ``title``, ``text`` (inner text
            of ``<body>``), ``html``, ``screenshot`` (full-page screenshot as
            returned by ``page.screenshot``), ``all_text`` (non-blank text
            nodes joined by newlines; falls back to ``text``), ``links``
            (list of ``{text, href}``; falls back to ``[]``) and ``images``
            (list of ``{alt, src}``; falls back to ``[]``).

        Raises:
            Exception: Any error from starting the browser, navigating or
                reading the core page content; logged and re-raised.
        """
        if self.page is None:
            await self.start()
        page = self.page

        try:
            logger.info(f"Loading page: {url}")
            await page.goto(url, wait_until='networkidle', timeout=timeout)

            # Fixed extra delay after navigation, before anything is read.
            await asyncio.sleep(wait_time)

            content = {
                'url': url,
                'title': await page.title(),
                'text': await page.inner_text('body'),
                'html': await page.content(),
                'screenshot': await page.screenshot(full_page=True),
            }

            # The remaining extractions are best-effort: a failure is logged
            # and replaced by a fallback instead of failing the whole load.
            content['all_text'] = await self._extract(
                self._ALL_TEXT_JS, "all text", content['text']
            )
            content['links'] = await self._extract(self._LINKS_JS, "links", [])
            content['images'] = await self._extract(self._IMAGES_JS, "images", [])

            logger.info(f"Page loaded successfully: {content['title']}")
            return content
        except Exception as e:
            logger.error(f"Error loading page {url}: {e}")
            raise

    async def _extract(self, script: str, label: str, fallback: Any) -> Any:
        """Evaluate ``script`` on the page; log and return ``fallback`` on error."""
        try:
            return await self.page.evaluate(script)
        except Exception as e:
            logger.warning(f"Error extracting {label}: {e}")
            return fallback

    def _require_page(self) -> Any:
        """Return the current page, or raise a clear error if none is open."""
        if self.page is None:
            raise RuntimeError("Browser not started; call start() first")
        return self.page

    async def click_element(self, selector: str, settle_time: float = 1) -> bool:
        """
        Click an element on the page.

        Args:
            selector: CSS selector
            settle_time: Seconds to sleep after the click (default 1)

        Returns:
            True if successful, False if the click failed or no page is open
            (the error is logged, not raised)
        """
        try:
            await self._require_page().click(selector)
            await asyncio.sleep(settle_time)
            return True
        except Exception as e:
            logger.error(f"Error clicking element {selector}: {e}")
            return False

    async def fill_input(self, selector: str, value: str) -> bool:
        """
        Fill an input field.

        Args:
            selector: CSS selector
            value: Value to fill

        Returns:
            True if successful, False if filling failed or no page is open
            (the error is logged, not raised)
        """
        try:
            await self._require_page().fill(selector, value)
            return True
        except Exception as e:
            logger.error(f"Error filling input {selector}: {e}")
            return False

    async def wait_for_element(self, selector: str, timeout: int = 10000) -> bool:
        """
        Wait for an element to appear.

        Args:
            selector: CSS selector
            timeout: Timeout in milliseconds

        Returns:
            True if element found, False on timeout, error or when no page is
            open (logged as a warning, not raised)
        """
        try:
            await self._require_page().wait_for_selector(selector, timeout=timeout)
            return True
        except Exception as e:
            logger.warning(f"Element {selector} not found: {e}")
            return False

    async def evaluate_script(self, script: str) -> Any:
        """
        Execute JavaScript on the page.

        Args:
            script: JavaScript code to execute

        Returns:
            Result of script execution, or None if evaluation failed or no
            page is open (the error is logged, not raised). A script that
            itself evaluates to null is indistinguishable from a failure.
        """
        try:
            return await self._require_page().evaluate(script)
        except Exception as e:
            logger.error(f"Error evaluating script: {e}")
            return None

    async def _release(self) -> bool:
        """Shut down every live resource, page first; return True if none failed."""
        shutdown_steps = (
            ('page', 'close'),
            ('context', 'close'),
            ('browser', 'close'),
            ('playwright', 'stop'),
        )
        all_clean = True
        for attr, method in shutdown_steps:
            resource = getattr(self, attr)
            if resource is None:
                continue
            # Drop the reference first so a failed shutdown never leaves a
            # dead handle that later calls would mistake for a live one.
            setattr(self, attr, None)
            try:
                await getattr(resource, method)()
            except Exception as e:
                # Keep going: one failing step must not leak the others.
                all_clean = False
                logger.error(f"Error closing browser: {e}")
        return all_clean

    async def close(self) -> None:
        """Close browser and cleanup.

        Never raises: each resource is shut down independently and failures
        are logged. Afterwards all handles are ``None``, so the helper can be
        started again.
        """
        if await self._release():
            logger.info("Browser closed")
|
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level BrowserHelper; created by get_browser() when it is None
# and closed/reset to None by cleanup_browser().
_browser: Optional[BrowserHelper] = None
|
|
|
|
|
|
|
|
|
async def get_browser() -> BrowserHelper:
    """
    Get or create the shared browser instance.

    The module-level helper is published only after it has started
    successfully, so a failed start is retried on the next call instead of
    leaving an unstarted helper behind.

    Returns:
        BrowserHelper instance

    Raises:
        Exception: Whatever ``BrowserHelper.start()`` raised; the shared
            instance stays unset in that case.
    """
    global _browser
    if _browser is None:
        candidate = BrowserHelper()
        await candidate.start()
        # Another coroutine may have published its own helper while this one
        # was awaiting start(); keep the first and discard the duplicate so
        # no browser process is leaked.
        if _browser is None:
            _browser = candidate
        else:
            await candidate.close()
    return _browser
|
|
|
|
|
|
|
|
|
async def cleanup_browser() -> None:
    """Cleanup the shared browser instance, if one exists.

    The module-level reference is cleared before the helper is closed, so
    the shared slot is reset even if closing is interrupted, and callers of
    ``get_browser()`` never receive a helper that is being torn down.
    """
    global _browser
    # Detach first, then close the detached helper.
    helper, _browser = _browser, None
    if helper is not None:
        await helper.close()
|
|
|
|
|
|
|