| | """ |
| | Browser Automation Tool - Web scraping, screenshots, and form automation |
| | |
| | Uses Playwright for headless browser control. |
| | """ |
| |
|
import base64
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | try: |
| | from playwright.async_api import async_playwright, Page, Browser |
| | PLAYWRIGHT_AVAILABLE = True |
| | except ImportError: |
| | PLAYWRIGHT_AVAILABLE = False |
| | logger.warning("Playwright not installed. Browser automation will be unavailable.") |
| |
|
| |
|
| | @dataclass |
| | class BrowserResult: |
| | """Result of browser automation""" |
| | url: str |
| | success: bool |
| | screenshot: Optional[bytes] = None |
| | html: Optional[str] = None |
| | extracted_data: Optional[Dict] = None |
| | error: Optional[str] = None |
| | final_url: Optional[str] = None |
| |
|
| |
|
| | class BrowserAutomation: |
| | """ |
| | Headless browser automation for: |
| | - Web scraping |
| | - Form filling and submission |
| | - Screenshot capture |
| | - PDF generation |
| | - Testing web applications |
| | """ |
| | |
| | def __init__(self): |
| | self.playwright = None |
| | self.browser = None |
| | self._initialized = False |
| | |
| | async def _ensure_initialized(self): |
| | """Ensure playwright is initialized""" |
| | if not PLAYWRIGHT_AVAILABLE: |
| | raise RuntimeError("Playwright is not installed. Install with: pip install playwright") |
| | |
| | if not self._initialized: |
| | self.playwright = await async_playwright().start() |
| | self.browser = await self.playwright.chromium.launch(headless=True) |
| | self._initialized = True |
| | |
| | async def close(self): |
| | """Close browser and cleanup""" |
| | if self.browser: |
| | await self.browser.close() |
| | if self.playwright: |
| | await self.playwright.stop() |
| | self._initialized = False |
| | |
| | async def navigate_and_screenshot( |
| | self, |
| | url: str, |
| | wait_for: Optional[str] = None, |
| | viewport: Optional[Dict[str, int]] = None, |
| | full_page: bool = False |
| | ) -> BrowserResult: |
| | """Navigate to URL and take screenshot.""" |
| | |
| | try: |
| | await self._ensure_initialized() |
| | |
| | page = await self.browser.new_page( |
| | viewport=viewport or {"width": 1280, "height": 720} |
| | ) |
| | |
| | try: |
| | |
| | response = await page.goto(url, wait_until="networkidle") |
| | |
| | if wait_for: |
| | await page.wait_for_selector(wait_for, timeout=10000) |
| | |
| | |
| | screenshot = await page.screenshot( |
| | type="png", |
| | full_page=full_page |
| | ) |
| | |
| | |
| | html_content = await page.content() |
| | |
| | return BrowserResult( |
| | url=url, |
| | success=response.status < 400 if response else True, |
| | screenshot=screenshot, |
| | html=html_content, |
| | final_url=page.url |
| | ) |
| | |
| | finally: |
| | await page.close() |
| | |
| | except Exception as e: |
| | logger.error(f"Browser screenshot error: {e}") |
| | return BrowserResult( |
| | url=url, |
| | success=False, |
| | error=str(e) |
| | ) |
| | |
| | async def fill_and_submit_form( |
| | self, |
| | url: str, |
| | form_data: Dict[str, str], |
| | submit_button: Optional[str] = None, |
| | wait_for_navigation: bool = True |
| | ) -> BrowserResult: |
| | """Fill form and submit it.""" |
| | |
| | try: |
| | await self._ensure_initialized() |
| | |
| | page = await self.browser.new_page() |
| | |
| | try: |
| | |
| | await page.goto(url, wait_until="networkidle") |
| | |
| | |
| | for field_name, value in form_data.items(): |
| | |
| | selectors = [ |
| | f'input[name="{field_name}"]', |
| | f'textarea[name="{field_name}"]', |
| | f'#{field_name}', |
| | f'[data-testid="{field_name}"]', |
| | ] |
| | |
| | filled = False |
| | for selector in selectors: |
| | try: |
| | await page.fill(selector, value, timeout=2000) |
| | filled = True |
| | break |
| | except: |
| | continue |
| | |
| | if not filled: |
| | logger.warning(f"Could not find field: {field_name}") |
| | |
| | |
| | if submit_button: |
| | await page.click(submit_button) |
| | else: |
| | |
| | submit_selectors = [ |
| | 'button[type="submit"]', |
| | 'input[type="submit"]', |
| | 'button:has-text("Submit")', |
| | 'button:has-text("Send")', |
| | ] |
| | |
| | for selector in submit_selectors: |
| | try: |
| | await page.click(selector, timeout=2000) |
| | break |
| | except: |
| | continue |
| | |
| | if wait_for_navigation: |
| | await page.wait_for_load_state("networkidle") |
| | |
| | |
| | screenshot = await page.screenshot(type="png") |
| | final_url = page.url |
| | html_content = await page.content() |
| | |
| | return BrowserResult( |
| | url=url, |
| | success=True, |
| | screenshot=screenshot, |
| | html=html_content, |
| | final_url=final_url |
| | ) |
| | |
| | finally: |
| | await page.close() |
| | |
| | except Exception as e: |
| | logger.error(f"Form submission error: {e}") |
| | return BrowserResult( |
| | url=url, |
| | success=False, |
| | error=str(e) |
| | ) |
| | |
| | async def scrape_data( |
| | self, |
| | url: str, |
| | extract_script: str, |
| | wait_for: Optional[str] = None |
| | ) -> BrowserResult: |
| | """Execute custom JavaScript to extract data.""" |
| | |
| | try: |
| | await self._ensure_initialized() |
| | |
| | page = await self.browser.new_page() |
| | |
| | try: |
| | |
| | await page.goto(url, wait_until="networkidle") |
| | |
| | if wait_for: |
| | await page.wait_for_selector(wait_for, timeout=10000) |
| | |
| | |
| | data = await page.evaluate(extract_script) |
| | |
| | return BrowserResult( |
| | url=url, |
| | success=True, |
| | extracted_data=data |
| | ) |
| | |
| | finally: |
| | await page.close() |
| | |
| | except Exception as e: |
| | logger.error(f"Data scraping error: {e}") |
| | return BrowserResult( |
| | url=url, |
| | success=False, |
| | error=str(e) |
| | ) |
| | |
| | async def generate_pdf( |
| | self, |
| | url: str, |
| | output_path: Optional[str] = None |
| | ) -> BrowserResult: |
| | """Generate PDF from webpage.""" |
| | |
| | try: |
| | await self._ensure_initialized() |
| | |
| | page = await self.browser.new_page() |
| | |
| | try: |
| | |
| | await page.goto(url, wait_until="networkidle") |
| | |
| | |
| | pdf_bytes = await page.pdf( |
| | format="A4", |
| | print_background=True |
| | ) |
| | |
| | return BrowserResult( |
| | url=url, |
| | success=True, |
| | screenshot=pdf_bytes |
| | ) |
| | |
| | finally: |
| | await page.close() |
| | |
| | except Exception as e: |
| | logger.error(f"PDF generation error: {e}") |
| | return BrowserResult( |
| | url=url, |
| | success=False, |
| | error=str(e) |
| | ) |
| |
|
| |
|
| | |
| | BROWSER_SCREENSHOT_TOOL_SPEC = { |
| | "name": "browser_screenshot", |
| | "description": ( |
| | "Take a screenshot of a webpage. " |
| | "Use this to: capture visual state of websites, debug UI issues, " |
| | "create documentation screenshots, verify page rendering." |
| | ), |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "url": { |
| | "type": "string", |
| | "description": "URL of the webpage to screenshot", |
| | }, |
| | "wait_for": { |
| | "type": "string", |
| | "description": "CSS selector to wait for before taking screenshot (optional)", |
| | }, |
| | "viewport_width": { |
| | "type": "number", |
| | "description": "Viewport width in pixels (default: 1280)", |
| | }, |
| | "viewport_height": { |
| | "type": "number", |
| | "description": "Viewport height in pixels (default: 720)", |
| | }, |
| | "full_page": { |
| | "type": "boolean", |
| | "description": "Capture full page or just viewport (default: false)", |
| | }, |
| | }, |
| | "required": ["url"], |
| | }, |
| | } |
| |
|
| | BROWSER_SCRAPE_TOOL_SPEC = { |
| | "name": "browser_scrape", |
| | "description": ( |
| | "Extract data from a webpage using JavaScript. " |
| | "Use this to: scrape structured data, extract text content, " |
| | "get prices, collect article titles, extract table data." |
| | ), |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "url": { |
| | "type": "string", |
| | "description": "URL of the webpage to scrape", |
| | }, |
| | "extract_script": { |
| | "type": "string", |
| | "description": "JavaScript code to extract data (e.g., 'return document.title')", |
| | }, |
| | "wait_for": { |
| | "type": "string", |
| | "description": "CSS selector to wait for before scraping (optional)", |
| | }, |
| | }, |
| | "required": ["url", "extract_script"], |
| | }, |
| | } |
| |
|
| | BROWSER_FORM_TOOL_SPEC = { |
| | "name": "browser_fill_form", |
| | "description": ( |
| | "Fill and submit a form on a webpage. " |
| | "Use this to: test contact forms, submit search queries, " |
| | "automate login (with caution), test form validation." |
| | ), |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "url": { |
| | "type": "string", |
| | "description": "URL of the page containing the form", |
| | }, |
| | "form_data": { |
| | "type": "object", |
| | "description": "Form field names and values to fill", |
| | }, |
| | "submit_button": { |
| | "type": "string", |
| | "description": "CSS selector for submit button (optional, auto-detected if not provided)", |
| | }, |
| | }, |
| | "required": ["url", "form_data"], |
| | }, |
| | } |
| |
|
| |
|
| | |
| | async def browser_screenshot_handler(arguments: Dict[str, Any]) -> tuple[str, bool]: |
| | """Handler for browser screenshot.""" |
| | try: |
| | if not PLAYWRIGHT_AVAILABLE: |
| | return "β Playwright is not installed. Install with: pip install playwright", False |
| | |
| | url = arguments.get("url", "").strip() |
| | wait_for = arguments.get("wait_for") |
| | viewport_width = arguments.get("viewport_width", 1280) |
| | viewport_height = arguments.get("viewport_height", 720) |
| | full_page = arguments.get("full_page", False) |
| | |
| | if not url: |
| | return "Error: URL is required", False |
| | |
| | logger.info(f"Taking screenshot of: {url}") |
| | |
| | browser = BrowserAutomation() |
| | |
| | try: |
| | result = await browser.navigate_and_screenshot( |
| | url=url, |
| | wait_for=wait_for, |
| | viewport={"width": viewport_width, "height": viewport_height}, |
| | full_page=full_page |
| | ) |
| | |
| | if not result.success: |
| | return f"β Screenshot failed: {result.error}", False |
| | |
| | |
| | screenshot_b64 = base64.b64encode(result.screenshot).decode() if result.screenshot else None |
| | |
| | output_lines = [ |
| | f"β
Screenshot captured successfully", |
| | f"π URL: {result.url}", |
| | ] |
| | |
| | if result.final_url and result.final_url != result.url: |
| | output_lines.append(f"π Final URL: {result.final_url}") |
| | |
| | if screenshot_b64: |
| | output_lines.append(f"πΈ Screenshot: data:image/png;base64,{screenshot_b64[:100]}...") |
| | output_lines.append(f"π Size: {len(result.screenshot)} bytes") |
| | |
| | return "\n".join(output_lines), True, { |
| | "id": f"screenshot_{hash(url) % 10000}", |
| | "name": "screenshot.png", |
| | "type": "image", |
| | "content": screenshot_b64, |
| | "size": len(result.screenshot) if result.screenshot else 0, |
| | } |
| | |
| | finally: |
| | await browser.close() |
| | |
| | except Exception as e: |
| | logger.error(f"Browser screenshot error: {e}") |
| | return f"β Error taking screenshot: {str(e)}", False |
| |
|
| |
|
| | async def browser_scrape_handler(arguments: Dict[str, Any]) -> tuple[str, bool]: |
| | """Handler for browser scraping.""" |
| | try: |
| | if not PLAYWRIGHT_AVAILABLE: |
| | return "β Playwright is not installed. Install with: pip install playwright", False |
| | |
| | url = arguments.get("url", "").strip() |
| | extract_script = arguments.get("extract_script", "").strip() |
| | wait_for = arguments.get("wait_for") |
| | |
| | if not url: |
| | return "Error: URL is required", False |
| | |
| | if not extract_script: |
| | return "Error: Extract script is required", False |
| | |
| | logger.info(f"Scraping data from: {url}") |
| | |
| | browser = BrowserAutomation() |
| | |
| | try: |
| | result = await browser.scrape_data( |
| | url=url, |
| | extract_script=extract_script, |
| | wait_for=wait_for |
| | ) |
| | |
| | if not result.success: |
| | return f"β Scraping failed: {result.error}", False |
| | |
| | |
| | data_str = str(result.extracted_data) |
| | if len(data_str) > 2000: |
| | data_str = data_str[:2000] + "... [truncated]" |
| | |
| | output_lines = [ |
| | f"β
Data scraped successfully", |
| | f"π URL: {result.url}", |
| | f"", |
| | f"π Extracted Data:", |
| | data_str, |
| | ] |
| | |
| | return "\n".join(output_lines), True |
| | |
| | finally: |
| | await browser.close() |
| | |
| | except Exception as e: |
| | logger.error(f"Browser scraping error: {e}") |
| | return f"β Error scraping data: {str(e)}", False |
| |
|
| |
|
| | async def browser_form_handler(arguments: Dict[str, Any]) -> tuple[str, bool]: |
| | """Handler for form filling.""" |
| | try: |
| | if not PLAYWRIGHT_AVAILABLE: |
| | return "β Playwright is not installed. Install with: pip install playwright", False |
| | |
| | url = arguments.get("url", "").strip() |
| | form_data = arguments.get("form_data", {}) |
| | submit_button = arguments.get("submit_button") |
| | |
| | if not url: |
| | return "Error: URL is required", False |
| | |
| | if not form_data: |
| | return "Error: Form data is required", False |
| | |
| | logger.info(f"Filling form on: {url}") |
| | |
| | browser = BrowserAutomation() |
| | |
| | try: |
| | result = await browser.fill_and_submit_form( |
| | url=url, |
| | form_data=form_data, |
| | submit_button=submit_button |
| | ) |
| | |
| | if not result.success: |
| | return f"β Form submission failed: {result.error}", False |
| | |
| | |
| | screenshot_b64 = base64.b64encode(result.screenshot).decode() if result.screenshot else None |
| | |
| | output_lines = [ |
| | f"β
Form submitted successfully", |
| | f"π Original URL: {result.url}", |
| | f"π Final URL: {result.final_url}", |
| | ] |
| | |
| | if screenshot_b64: |
| | output_lines.append(f"πΈ Result screenshot captured") |
| | |
| | return "\n".join(output_lines), True, { |
| | "id": f"form_result_{hash(url) % 10000}", |
| | "name": "form_result.png", |
| | "type": "image", |
| | "content": screenshot_b64, |
| | "size": len(result.screenshot) if result.screenshot else 0, |
| | } |
| | |
| | finally: |
| | await browser.close() |
| | |
| | except Exception as e: |
| | logger.error(f"Form submission error: {e}") |
| | return f"β Error submitting form: {str(e)}", False |
| |
|