# Source path: Prj2/app/browser.py
# Repository page residue: uploaded by iitmbs24f ("Upload 12 files"),
# commit afbbccf (verified).
"""
Playwright browser helper for loading and interacting with quiz pages.
"""
import asyncio
import logging
import sys
from asyncio.subprocess import PIPE
from typing import Optional, Dict, Any, List
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
import time
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class BrowserHelper:
    """Helper class for managing Playwright browser sessions.

    A helper owns at most one Playwright driver, one Chromium browser, one
    browser context and one page. ``start()`` creates them, ``close()``
    releases them and resets the helper so it can be started again.
    """

    # Upper bound, in seconds, applied to ``wait_time`` in ``load_page``.
    _MAX_SETTLE_SECONDS = 2

    # (attribute, method) pairs in teardown order: innermost resource first.
    _TEARDOWN_STEPS = (
        ("page", "close"),
        ("context", "close"),
        ("browser", "close"),
        ("playwright", "stop"),
    )

    # Collects the trimmed content of every non-blank text node under <body>.
    _ALL_TEXT_JS = """
        () => {
            const walker = document.createTreeWalker(
                document.body,
                NodeFilter.SHOW_TEXT,
                null,
                false
            );
            let text = [];
            let node;
            while (node = walker.nextNode()) {
                if (node.textContent.trim()) {
                    text.push(node.textContent.trim());
                }
            }
            return text.join('\\n');
        }
    """

    # Returns [{text, href}, ...] for every anchor that has an href.
    _LINKS_JS = """
        () => {
            const links = Array.from(document.querySelectorAll('a[href]'));
            return links.map(a => ({text: a.textContent.trim(), href: a.href}));
        }
    """

    # Returns [{alt, src}, ...] for every image that has a src.
    _IMAGES_JS = """
        () => {
            const images = Array.from(document.querySelectorAll('img[src]'));
            return images.map(img => ({alt: img.alt, src: img.src}));
        }
    """

    def __init__(self):
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        self.playwright = None
        # Set once a browser installation has been triggered, so that the
        # install-and-retry path in start() runs at most one time.
        self._install_attempted = False

    async def start(self, headless: bool = True) -> None:
        """
        Start Playwright browser.

        If the helper still references resources from an earlier start, they
        are released first so the previous browser process is not leaked.
        When the launch fails because the Chromium bundle is missing, the
        bundle is installed once and the start is retried.

        Args:
            headless: Run in headless mode

        Raises:
            Exception: Whatever Playwright raised, re-raised after cleanup.
            RuntimeError: If the one-off browser installation fails.
        """
        if any(getattr(self, attr) for attr, _ in self._TEARDOWN_STEPS):
            await self._release_resources()

        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(
                headless=headless,
                args=[
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-accelerated-2d-canvas',
                    '--disable-gpu'
                ]
            )
            self.context = await self.browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
            )
            self.page = await self.context.new_page()
            logger.info("Browser started successfully")
        except Exception as e:
            await self._cleanup_partial_start()
            if self._should_install_browsers(e):
                logger.warning("Playwright browsers missing. Installing Chromium bundle...")
                await self._install_browsers()
                return await self.start(headless=headless)
            logger.error("Error starting browser: %s", e)
            raise

    def _should_install_browsers(self, error: Exception) -> bool:
        """Determine if the error indicates missing Playwright browsers.

        Returns True at most once per helper: a positive match records that
        an installation was attempted, and every later call returns False.
        """
        if self._install_attempted:
            return False
        message = str(error).lower()
        indicators = (
            "executable doesn't exist",
            "run the following command to download new browsers",
            "playwright install",
        )
        needs_install = any(token in message for token in indicators)
        if needs_install:
            self._install_attempted = True
        return needs_install

    async def _install_browsers(self) -> None:
        """Install the required Playwright Chromium browser once.

        Runs ``python -m playwright install chromium`` with the current
        interpreter and forwards the captured output to the logger.

        Raises:
            RuntimeError: If the installer exits with a non-zero status.
        """
        cmd = [sys.executable, "-m", "playwright", "install", "chromium"]
        logger.info("Running: %s", " ".join(cmd))
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=PIPE,
            stderr=PIPE
        )
        stdout, stderr = await process.communicate()
        if stdout:
            logger.debug(stdout.decode(errors="ignore").strip())
        if stderr:
            logger.warning(stderr.decode(errors="ignore").strip())
        if process.returncode != 0:
            raise RuntimeError(f"Failed to install Playwright browsers (exit code {process.returncode})")
        logger.info("Playwright Chromium installed successfully")

    async def _release_resources(self) -> List[Exception]:
        """Close page, context, browser and driver, resetting each reference.

        Every resource is attempted even when an earlier one fails, so one
        broken handle cannot leak the rest.

        Returns:
            The exceptions raised while closing, in teardown order.
        """
        failures: List[Exception] = []
        for attr, closer in self._TEARDOWN_STEPS:
            resource = getattr(self, attr)
            # Drop the reference up front so the helper never keeps pointing
            # at a resource that was closed or failed to close.
            setattr(self, attr, None)
            if not resource:
                continue
            try:
                await getattr(resource, closer)()
            except Exception as exc:
                failures.append(exc)
        return failures

    async def _cleanup_partial_start(self) -> None:
        """Cleanup resources if browser start fails midway."""
        # Best effort: the start error is what gets reported to the caller,
        # so failures during this cleanup are deliberately discarded.
        await self._release_resources()

    async def _extract(self, script: str, label: str, fallback: Any) -> Any:
        """Evaluate an extraction script on the page, or return ``fallback``.

        Args:
            script: JavaScript function source to evaluate
            label: Name of the extracted data, used in the warning message
            fallback: Value returned when evaluation raises
        """
        try:
            return await self.page.evaluate(script)
        except Exception as e:
            logger.warning("Error extracting %s: %s", label, e)
            return fallback

    async def load_page(self, url: str, wait_time: int = 2, timeout: int = 15000) -> Dict[str, Any]:
        """
        Load a page and extract all content.

        The browser is started on demand if no page is open.

        Args:
            url: URL to load
            wait_time: Seconds to wait after the load event for dynamic
                content; values above 2 are capped at 2
            timeout: Page load timeout in milliseconds (default: 15s)

        Returns:
            Dictionary with the keys ``url``, ``title``, ``text``, ``html``,
            ``screenshot``, ``all_text``, ``links`` and ``images``. The last
            three fall back to the body text / empty lists when their
            extraction script fails.

        Raises:
            Exception: Any navigation or extraction error, after logging it.
        """
        if not self.page:
            await self.start()
        try:
            logger.info("Loading page: %s (timeout: %sms, wait: %ss)", url, timeout, wait_time)
            # Use 'load' instead of 'networkidle' for faster loading
            await self.page.goto(url, wait_until='load', timeout=timeout)
            # Wait for dynamic content (reduced wait time)
            await asyncio.sleep(min(wait_time, self._MAX_SETTLE_SECONDS))

            content = {
                'url': url,
                'title': await self.page.title(),
                'text': await self.page.inner_text('body'),
                'html': await self.page.content(),
                'screenshot': await self.page.screenshot(full_page=True),
            }
            content['all_text'] = await self._extract(self._ALL_TEXT_JS, "all text", content['text'])
            content['links'] = await self._extract(self._LINKS_JS, "links", [])
            content['images'] = await self._extract(self._IMAGES_JS, "images", [])

            logger.info("Page loaded successfully: %s", content['title'])
            return content
        except Exception as e:
            logger.error("Error loading page %s: %s", url, e)
            raise

    async def click_element(self, selector: str) -> bool:
        """
        Click an element on the page.

        Args:
            selector: CSS selector

        Returns:
            True if successful, False if the click raised (the error is logged)
        """
        try:
            await self.page.click(selector)
            # Fixed one-second pause after the click before returning.
            await asyncio.sleep(1)
            return True
        except Exception as e:
            logger.error("Error clicking element %s: %s", selector, e)
            return False

    async def fill_input(self, selector: str, value: str) -> bool:
        """
        Fill an input field.

        Args:
            selector: CSS selector
            value: Value to fill

        Returns:
            True if successful, False if filling raised (the error is logged)
        """
        try:
            await self.page.fill(selector, value)
            return True
        except Exception as e:
            logger.error("Error filling input %s: %s", selector, e)
            return False

    async def wait_for_element(self, selector: str, timeout: int = 10000) -> bool:
        """
        Wait for an element to appear.

        Args:
            selector: CSS selector
            timeout: Timeout in milliseconds

        Returns:
            True if element found, False if waiting raised (logged as warning)
        """
        try:
            await self.page.wait_for_selector(selector, timeout=timeout)
            return True
        except Exception as e:
            logger.warning("Element %s not found: %s", selector, e)
            return False

    async def evaluate_script(self, script: str) -> Any:
        """
        Execute JavaScript on the page.

        Args:
            script: JavaScript code to execute

        Returns:
            Result of script execution, or None if evaluation raised. A
            script that itself returns null is indistinguishable from that.
        """
        try:
            return await self.page.evaluate(script)
        except Exception as e:
            logger.error("Error evaluating script: %s", e)
            return None

    async def close(self) -> None:
        """Close browser and cleanup.

        Every resource is closed independently and all references are reset,
        so a failure on one handle neither leaks the others nor leaves the
        helper pointing at closed objects. Errors are logged, not raised.
        """
        failures = await self._release_resources()
        for exc in failures:
            logger.error("Error closing browser: %s", exc)
        if not failures:
            logger.info("Browser closed")
# Global browser instance shared by get_browser() and cleanup_browser().
# It is None until get_browser() creates a helper, and None again after
# cleanup_browser() has closed it.
_browser: Optional[BrowserHelper] = None
async def get_browser() -> BrowserHelper:
    """
    Get or create a browser instance.

    The shared instance is only published after its browser has started, so
    a failed start is retried on the next call instead of leaving a
    never-started helper cached.

    Returns:
        BrowserHelper instance

    Raises:
        Exception: Whatever BrowserHelper.start() raised.
    """
    global _browser
    if _browser is None:
        helper = BrowserHelper()
        await helper.start()
        if _browser is None:
            _browser = helper
        else:
            # Another coroutine published an instance while this one was
            # awaiting start(); keep theirs and release the duplicate.
            await helper.close()
    return _browser
async def cleanup_browser() -> None:
    """Close the shared browser instance, if any, and forget it."""
    global _browser
    if not _browser:
        # Nothing was ever created, or it has already been cleaned up.
        return
    await _browser.close()
    _browser = None