# water3/agent/tools/browser_tool.py
"""
Browser Automation Tool - Web scraping, screenshots, and form automation
Uses Playwright for headless browser control.
"""
import base64
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Try to import playwright, but don't fail if not installed: the module stays
# importable and the tool handlers report a helpful error at call time.
try:
    from playwright.async_api import async_playwright, Page, Browser
    PLAYWRIGHT_AVAILABLE = True  # checked by BrowserAutomation and the handlers below
except ImportError:
    PLAYWRIGHT_AVAILABLE = False
    logger.warning("Playwright not installed. Browser automation will be unavailable.")
@dataclass
class BrowserResult:
    """Result of a single browser automation operation."""
    url: str  # the URL that was requested
    success: bool  # True when the operation completed without error
    screenshot: Optional[bytes] = None  # raw PNG bytes (PDF bytes for generate_pdf)
    html: Optional[str] = None  # page HTML after load, when captured
    extracted_data: Optional[Dict] = None  # value returned by a scrape script
    error: Optional[str] = None  # error message when success is False
    final_url: Optional[str] = None  # URL after redirects, when captured
class BrowserAutomation:
    """
    Headless browser automation for:
    - Web scraping
    - Form filling and submission
    - Screenshot capture
    - PDF generation
    - Testing web applications

    A single headless Chromium instance is launched lazily on first use and
    shared by all operations; call close() to release it.
    """

    def __init__(self):
        # Created lazily by _ensure_initialized() on first use.
        self.playwright = None
        self.browser = None
        self._initialized = False

    async def _ensure_initialized(self):
        """Start Playwright and launch headless Chromium if not already running.

        Raises:
            RuntimeError: If the playwright package is not installed.
        """
        if not PLAYWRIGHT_AVAILABLE:
            raise RuntimeError("Playwright is not installed. Install with: pip install playwright")
        if not self._initialized:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self._initialized = True

    async def close(self):
        """Close the browser and stop Playwright, releasing all resources.

        Clears the internal handles so calling close() twice is a no-op and a
        later operation can re-initialize the instance cleanly.
        """
        if self.browser:
            await self.browser.close()
            self.browser = None  # drop stale handle; prevents double-close
        if self.playwright:
            await self.playwright.stop()
            self.playwright = None
        self._initialized = False

    async def navigate_and_screenshot(
        self,
        url: str,
        wait_for: Optional[str] = None,
        viewport: Optional[Dict[str, int]] = None,
        full_page: bool = False
    ) -> BrowserResult:
        """Navigate to *url* and capture a PNG screenshot.

        Args:
            url: Page to load.
            wait_for: Optional CSS selector to wait for before capturing.
            viewport: Optional ``{"width": ..., "height": ...}``; defaults to 1280x720.
            full_page: Capture the whole scrollable page instead of the viewport.

        Returns:
            BrowserResult with screenshot bytes, page HTML and the final URL
            (after redirects), or success=False with an error message.
        """
        try:
            await self._ensure_initialized()
            page = await self.browser.new_page(
                viewport=viewport or {"width": 1280, "height": 720}
            )
            try:
                response = await page.goto(url, wait_until="networkidle")
                if wait_for:
                    await page.wait_for_selector(wait_for, timeout=10000)
                screenshot = await page.screenshot(
                    type="png",
                    full_page=full_page
                )
                html_content = await page.content()
                return BrowserResult(
                    url=url,
                    # goto() can return None (e.g. same-document navigation);
                    # treat that as success.
                    success=response.status < 400 if response else True,
                    screenshot=screenshot,
                    html=html_content,
                    final_url=page.url
                )
            finally:
                await page.close()
        except Exception as e:
            logger.error(f"Browser screenshot error: {e}")
            return BrowserResult(
                url=url,
                success=False,
                error=str(e)
            )

    async def fill_and_submit_form(
        self,
        url: str,
        form_data: Dict[str, str],
        submit_button: Optional[str] = None,
        wait_for_navigation: bool = True
    ) -> BrowserResult:
        """Fill a form on *url* and submit it.

        Args:
            url: Page containing the form.
            form_data: Mapping of field name -> value to type into the form.
            submit_button: Optional CSS selector for the submit control; when
                omitted, a few common submit selectors are probed in order.
            wait_for_navigation: Wait for the post-submit page to settle.

        Returns:
            BrowserResult with a screenshot and HTML of the resulting page.
            Missing fields/buttons are logged and skipped rather than failing.
        """
        try:
            await self._ensure_initialized()
            page = await self.browser.new_page()
            try:
                await page.goto(url, wait_until="networkidle")
                # Fill each field, trying a few common selector shapes.
                for field_name, value in form_data.items():
                    selectors = [
                        f'input[name="{field_name}"]',
                        f'textarea[name="{field_name}"]',
                        f'#{field_name}',
                        f'[data-testid="{field_name}"]',
                    ]
                    filled = False
                    for selector in selectors:
                        try:
                            await page.fill(selector, value, timeout=2000)
                            filled = True
                            break
                        # Narrowed from a bare `except:` so task cancellation
                        # is not swallowed; any selector miss falls through.
                        except Exception:
                            continue
                    if not filled:
                        logger.warning(f"Could not find field: {field_name}")
                # Submit the form.
                if submit_button:
                    await page.click(submit_button)
                else:
                    # No explicit selector given: probe common submit controls.
                    submit_selectors = [
                        'button[type="submit"]',
                        'input[type="submit"]',
                        'button:has-text("Submit")',
                        'button:has-text("Send")',
                    ]
                    submitted = False
                    for selector in submit_selectors:
                        try:
                            await page.click(selector, timeout=2000)
                            submitted = True
                            break
                        except Exception:
                            continue
                    if not submitted:
                        # Previously this fell through silently; surface it.
                        logger.warning("No submit button found; form may not have been submitted")
                if wait_for_navigation:
                    await page.wait_for_load_state("networkidle")
                # Capture the resulting page state.
                screenshot = await page.screenshot(type="png")
                final_url = page.url
                html_content = await page.content()
                return BrowserResult(
                    url=url,
                    success=True,
                    screenshot=screenshot,
                    html=html_content,
                    final_url=final_url
                )
            finally:
                await page.close()
        except Exception as e:
            logger.error(f"Form submission error: {e}")
            return BrowserResult(
                url=url,
                success=False,
                error=str(e)
            )

    async def scrape_data(
        self,
        url: str,
        extract_script: str,
        wait_for: Optional[str] = None
    ) -> BrowserResult:
        """Execute custom JavaScript on *url* and return its result.

        Args:
            url: Page to load.
            extract_script: JavaScript evaluated in the page context; its
                return value becomes ``extracted_data``.
            wait_for: Optional CSS selector to wait for before evaluating.

        Returns:
            BrowserResult with extracted_data set, or success=False on error.
        """
        try:
            await self._ensure_initialized()
            page = await self.browser.new_page()
            try:
                await page.goto(url, wait_until="networkidle")
                if wait_for:
                    await page.wait_for_selector(wait_for, timeout=10000)
                data = await page.evaluate(extract_script)
                return BrowserResult(
                    url=url,
                    success=True,
                    extracted_data=data
                )
            finally:
                await page.close()
        except Exception as e:
            logger.error(f"Data scraping error: {e}")
            return BrowserResult(
                url=url,
                success=False,
                error=str(e)
            )

    async def generate_pdf(
        self,
        url: str,
        output_path: Optional[str] = None
    ) -> BrowserResult:
        """Render *url* as an A4 PDF.

        Args:
            url: Page to render.
            output_path: Currently unused; kept for interface compatibility.

        Returns:
            BrowserResult whose ``screenshot`` field carries the PDF bytes
            (the field is reused for binary payloads).
        """
        try:
            await self._ensure_initialized()
            page = await self.browser.new_page()
            try:
                await page.goto(url, wait_until="networkidle")
                pdf_bytes = await page.pdf(
                    format="A4",
                    print_background=True
                )
                return BrowserResult(
                    url=url,
                    success=True,
                    screenshot=pdf_bytes  # Reuse field for PDF
                )
            finally:
                await page.close()
        except Exception as e:
            logger.error(f"PDF generation error: {e}")
            return BrowserResult(
                url=url,
                success=False,
                error=str(e)
            )
# Tool specs for LLM.
# JSON-schema style descriptions advertised to the model; each spec pairs
# with the handler of the same name defined below.
BROWSER_SCREENSHOT_TOOL_SPEC = {
    "name": "browser_screenshot",
    "description": (
        "Take a screenshot of a webpage. "
        "Use this to: capture visual state of websites, debug UI issues, "
        "create documentation screenshots, verify page rendering."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "URL of the webpage to screenshot",
            },
            "wait_for": {
                "type": "string",
                "description": "CSS selector to wait for before taking screenshot (optional)",
            },
            "viewport_width": {
                "type": "number",
                "description": "Viewport width in pixels (default: 1280)",
            },
            "viewport_height": {
                "type": "number",
                "description": "Viewport height in pixels (default: 720)",
            },
            "full_page": {
                "type": "boolean",
                "description": "Capture full page or just viewport (default: false)",
            },
        },
        "required": ["url"],
    },
}

# Spec for browser_scrape_handler: runs caller-supplied JavaScript in the page.
BROWSER_SCRAPE_TOOL_SPEC = {
    "name": "browser_scrape",
    "description": (
        "Extract data from a webpage using JavaScript. "
        "Use this to: scrape structured data, extract text content, "
        "get prices, collect article titles, extract table data."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "URL of the webpage to scrape",
            },
            "extract_script": {
                "type": "string",
                "description": "JavaScript code to extract data (e.g., 'return document.title')",
            },
            "wait_for": {
                "type": "string",
                "description": "CSS selector to wait for before scraping (optional)",
            },
        },
        "required": ["url", "extract_script"],
    },
}

# Spec for browser_form_handler: fills named fields and submits the form.
BROWSER_FORM_TOOL_SPEC = {
    "name": "browser_fill_form",
    "description": (
        "Fill and submit a form on a webpage. "
        "Use this to: test contact forms, submit search queries, "
        "automate login (with caution), test form validation."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "URL of the page containing the form",
            },
            "form_data": {
                "type": "object",
                "description": "Form field names and values to fill",
            },
            "submit_button": {
                "type": "string",
                "description": "CSS selector for submit button (optional, auto-detected if not provided)",
            },
        },
        "required": ["url", "form_data"],
    },
}
# Tool handlers
async def browser_screenshot_handler(arguments: Dict[str, Any]) -> tuple:
    """Handler for the browser_screenshot tool.

    Args:
        arguments: Tool arguments. Requires "url"; accepts "wait_for",
            "viewport_width", "viewport_height" and "full_page".

    Returns:
        ``(message, ok)`` on failure, or ``(message, ok, attachment)`` on
        success, where the attachment dict carries the base64-encoded PNG.
        (The original ``tuple[str, bool]`` annotation was wrong for the
        success path, which returns three elements.)
    """
    try:
        # Validate input before the environment check so callers get the
        # most specific error first; `or ""` also tolerates an explicit None.
        url = (arguments.get("url") or "").strip()
        if not url:
            return "Error: URL is required", False
        if not PLAYWRIGHT_AVAILABLE:
            return "❌ Playwright is not installed. Install with: pip install playwright", False
        wait_for = arguments.get("wait_for")
        viewport_width = arguments.get("viewport_width", 1280)
        viewport_height = arguments.get("viewport_height", 720)
        full_page = arguments.get("full_page", False)
        logger.info(f"Taking screenshot of: {url}")
        browser = BrowserAutomation()
        try:
            result = await browser.navigate_and_screenshot(
                url=url,
                wait_for=wait_for,
                viewport={"width": viewport_width, "height": viewport_height},
                full_page=full_page
            )
            if not result.success:
                return f"❌ Screenshot failed: {result.error}", False
            # Encode screenshot as base64 for transport to the caller.
            screenshot_b64 = base64.b64encode(result.screenshot).decode() if result.screenshot else None
            output_lines = [
                "βœ… Screenshot captured successfully",
                f"🌐 URL: {result.url}",
            ]
            if result.final_url and result.final_url != result.url:
                output_lines.append(f"πŸ”„ Final URL: {result.final_url}")
            if screenshot_b64:
                output_lines.append(f"πŸ“Έ Screenshot: data:image/png;base64,{screenshot_b64[:100]}...")
                output_lines.append(f"πŸ“Š Size: {len(result.screenshot)} bytes")
            # Success path returns a third element: an attachment record.
            # NOTE(review): hash(url) is salted per process (PYTHONHASHSEED),
            # so the id is not stable across runs — confirm nothing persists it.
            return "\n".join(output_lines), True, {
                "id": f"screenshot_{hash(url) % 10000}",
                "name": "screenshot.png",
                "type": "image",
                "content": screenshot_b64,
                "size": len(result.screenshot) if result.screenshot else 0,
            }
        finally:
            await browser.close()
    except Exception as e:
        logger.error(f"Browser screenshot error: {e}")
        return f"❌ Error taking screenshot: {str(e)}", False
async def browser_scrape_handler(arguments: Dict[str, Any]) -> tuple[str, bool]:
    """Handler for the browser_scrape tool.

    Args:
        arguments: Tool arguments. Requires "url" and "extract_script";
            accepts "wait_for".

    Returns:
        ``(message, ok)`` tuple; on success the message embeds the extracted
        data (repr form), truncated to 2000 characters.
    """
    try:
        # Validate input before the environment check so callers get the
        # most specific error first; `or ""` also tolerates an explicit None.
        url = (arguments.get("url") or "").strip()
        extract_script = (arguments.get("extract_script") or "").strip()
        if not url:
            return "Error: URL is required", False
        if not extract_script:
            return "Error: Extract script is required", False
        if not PLAYWRIGHT_AVAILABLE:
            return "❌ Playwright is not installed. Install with: pip install playwright", False
        wait_for = arguments.get("wait_for")
        logger.info(f"Scraping data from: {url}")
        browser = BrowserAutomation()
        try:
            result = await browser.scrape_data(
                url=url,
                extract_script=extract_script,
                wait_for=wait_for
            )
            if not result.success:
                return f"❌ Scraping failed: {result.error}", False
            # Cap the payload so one scrape can't flood the conversation.
            data_str = str(result.extracted_data)
            if len(data_str) > 2000:
                data_str = data_str[:2000] + "... [truncated]"
            output_lines = [
                "βœ… Data scraped successfully",
                f"🌐 URL: {result.url}",
                "",
                "πŸ“Š Extracted Data:",
                data_str,
            ]
            return "\n".join(output_lines), True
        finally:
            await browser.close()
    except Exception as e:
        logger.error(f"Browser scraping error: {e}")
        return f"❌ Error scraping data: {str(e)}", False
async def browser_form_handler(arguments: Dict[str, Any]) -> tuple:
    """Handler for the browser_fill_form tool.

    Args:
        arguments: Tool arguments. Requires "url" and "form_data";
            accepts "submit_button".

    Returns:
        ``(message, ok)`` on failure, or ``(message, ok, attachment)`` on
        success, where the attachment dict carries the base64-encoded result
        screenshot. (The original ``tuple[str, bool]`` annotation was wrong
        for the success path, which returns three elements.)
    """
    try:
        # Validate input before the environment check so callers get the
        # most specific error first; `or ""` also tolerates an explicit None.
        url = (arguments.get("url") or "").strip()
        form_data = arguments.get("form_data", {})
        if not url:
            return "Error: URL is required", False
        if not form_data:
            return "Error: Form data is required", False
        if not PLAYWRIGHT_AVAILABLE:
            return "❌ Playwright is not installed. Install with: pip install playwright", False
        submit_button = arguments.get("submit_button")
        logger.info(f"Filling form on: {url}")
        browser = BrowserAutomation()
        try:
            result = await browser.fill_and_submit_form(
                url=url,
                form_data=form_data,
                submit_button=submit_button
            )
            if not result.success:
                return f"❌ Form submission failed: {result.error}", False
            # Encode the post-submit screenshot as base64 for transport.
            screenshot_b64 = base64.b64encode(result.screenshot).decode() if result.screenshot else None
            output_lines = [
                "βœ… Form submitted successfully",
                f"🌐 Original URL: {result.url}",
                f"πŸ”„ Final URL: {result.final_url}",
            ]
            if screenshot_b64:
                output_lines.append("πŸ“Έ Result screenshot captured")
            # NOTE(review): hash(url) is salted per process (PYTHONHASHSEED),
            # so the id is not stable across runs — confirm nothing persists it.
            return "\n".join(output_lines), True, {
                "id": f"form_result_{hash(url) % 10000}",
                "name": "form_result.png",
                "type": "image",
                "content": screenshot_b64,
                "size": len(result.screenshot) if result.screenshot else 0,
            }
        finally:
            await browser.close()
    except Exception as e:
        logger.error(f"Form submission error: {e}")
        return f"❌ Error submitting form: {str(e)}", False