|
|
""" |
|
|
Hybrid Browser Automation Tool - Client Examples |
|
|
Demonstrates API usage, SSE streaming, and integration patterns |
|
|
""" |
|
|
|
|
|
import requests |
|
|
import json |
|
|
import asyncio |
|
|
import aiohttp |
|
|
import base64 |
|
|
from typing import Dict, Any, AsyncGenerator |
|
|
from PIL import Image |
|
|
from io import BytesIO |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
API_BASE = "http://localhost:8000" |
|
|
GRADIO_BASE = "http://localhost:7860" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HybridBrowserClient: |
|
|
"""Client for the hybrid browser automation tool""" |
|
|
|
|
|
def __init__(self, api_base: str = API_BASE): |
|
|
self.api_base = api_base |
|
|
self.session_id = None |
|
|
|
|
|
def health_check(self) -> Dict: |
|
|
"""Check API health""" |
|
|
response = requests.get(f"{self.api_base}/health") |
|
|
return response.json() |
|
|
|
|
|
def create_session(self) -> str: |
|
|
"""Create a new browser session""" |
|
|
response = requests.post(f"{self.api_base}/sessions/create") |
|
|
data = response.json() |
|
|
self.session_id = data["session_id"] |
|
|
print(f"β
Session created: {self.session_id}") |
|
|
return self.session_id |
|
|
|
|
|
def navigate(self, url: str, wait_time: int = 3) -> Dict: |
|
|
"""Navigate to a URL""" |
|
|
if not self.session_id: |
|
|
self.create_session() |
|
|
|
|
|
data = {"url": url, "wait_time": wait_time} |
|
|
response = requests.post( |
|
|
f"{self.api_base}/sessions/{self.session_id}/navigate", |
|
|
json=data |
|
|
) |
|
|
result = response.json() |
|
|
|
|
|
if "error" in result: |
|
|
print(f"β Navigation error: {result['error']}") |
|
|
return result |
|
|
|
|
|
print(f"β
Navigated to: {result['url']}") |
|
|
print(f"π Page title: {result['title']}") |
|
|
return result |
|
|
|
|
|
def extract(self, selector: str, attribute: str = "text") -> Dict: |
|
|
"""Extract data from page elements""" |
|
|
if not self.session_id: |
|
|
raise ValueError("No active session. Call create_session() first.") |
|
|
|
|
|
data = {"selector": selector, "attribute": attribute} |
|
|
response = requests.post( |
|
|
f"{self.api_base}/sessions/{self.session_id}/extract", |
|
|
json=data |
|
|
) |
|
|
result = response.json() |
|
|
|
|
|
if "error" in result: |
|
|
print(f"β Extraction error: {result['error']}") |
|
|
return result |
|
|
|
|
|
print(f"β
Extracted {result['count']} items from selector: {selector}") |
|
|
return result |
|
|
|
|
|
def click(self, selector: str) -> Dict: |
|
|
"""Click an element""" |
|
|
if not self.session_id: |
|
|
raise ValueError("No active session. Call create_session() first.") |
|
|
|
|
|
data = {"selector": selector} |
|
|
response = requests.post( |
|
|
f"{self.api_base}/sessions/{self.session_id}/click", |
|
|
json=data |
|
|
) |
|
|
result = response.json() |
|
|
|
|
|
if "error" in result: |
|
|
print(f"β Click error: {result['error']}") |
|
|
return result |
|
|
|
|
|
print(f"β
Clicked: {selector}") |
|
|
return result |
|
|
|
|
|
def type_text(self, selector: str, text: str) -> Dict: |
|
|
"""Type text into an element""" |
|
|
if not self.session_id: |
|
|
raise ValueError("No active session. Call create_session() first.") |
|
|
|
|
|
data = {"selector": selector, "text": text} |
|
|
response = requests.post( |
|
|
f"{self.api_base}/sessions/{self.session_id}/type", |
|
|
json=data |
|
|
) |
|
|
result = response.json() |
|
|
|
|
|
if "error" in result: |
|
|
print(f"β Typing error: {result['error']}") |
|
|
return result |
|
|
|
|
|
print(f"β
Typed '{text}' into: {selector}") |
|
|
return result |
|
|
|
|
|
def screenshot(self, save_path: str = None) -> str: |
|
|
"""Take a screenshot""" |
|
|
if not self.session_id: |
|
|
raise ValueError("No active session. Call create_session() first.") |
|
|
|
|
|
response = requests.get(f"{self.api_base}/sessions/{self.session_id}/screenshot") |
|
|
result = response.json() |
|
|
|
|
|
if "error" in result: |
|
|
print(f"β Screenshot error: {result['error']}") |
|
|
return None |
|
|
|
|
|
screenshot_b64 = result["screenshot"] |
|
|
|
|
|
if save_path: |
|
|
|
|
|
image_data = base64.b64decode(screenshot_b64) |
|
|
with open(save_path, "wb") as f: |
|
|
f.write(image_data) |
|
|
print(f"β
Screenshot saved to: {save_path}") |
|
|
|
|
|
print("β
Screenshot captured") |
|
|
return screenshot_b64 |
|
|
|
|
|
def execute_task(self, actions: list) -> Dict: |
|
|
"""Execute a multi-step task""" |
|
|
data = {"actions": actions} |
|
|
response = requests.post(f"{self.api_base}/tasks/execute", json=data) |
|
|
result = response.json() |
|
|
|
|
|
if "error" in result: |
|
|
print(f"β Task error: {result['error']}") |
|
|
return result |
|
|
|
|
|
print(f"β
Task started: {result['task_id']}") |
|
|
return result |
|
|
|
|
|
def get_task_status(self, task_id: str) -> Dict: |
|
|
"""Get task execution status""" |
|
|
response = requests.get(f"{self.api_base}/tasks/{task_id}/status") |
|
|
return response.json() |
|
|
|
|
|
def list_sessions(self) -> Dict: |
|
|
"""List all active sessions""" |
|
|
response = requests.get(f"{self.api_base}/sessions") |
|
|
return response.json() |
|
|
|
|
|
def close_session(self): |
|
|
"""Close current session""" |
|
|
if self.session_id: |
|
|
response = requests.delete(f"{self.api_base}/sessions/{self.session_id}") |
|
|
result = response.json() |
|
|
print(f"β
Session closed: {self.session_id}") |
|
|
self.session_id = None |
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SSEClient: |
|
|
"""Client for Server-Sent Events streaming""" |
|
|
|
|
|
def __init__(self, api_base: str = API_BASE): |
|
|
self.api_base = api_base |
|
|
|
|
|
async def stream_session_events(self, session_id: str) -> AsyncGenerator[Dict, None]: |
|
|
"""Stream browser session events in real-time""" |
|
|
url = f"{self.api_base}/stream/{session_id}" |
|
|
|
|
|
async with aiohttp.ClientSession() as session: |
|
|
async with session.get(url) as response: |
|
|
async for line in response.content: |
|
|
line = line.decode('utf-8').strip() |
|
|
|
|
|
if line.startswith('data: '): |
|
|
data = json.loads(line[6:]) |
|
|
yield data |
|
|
|
|
|
if data.get('status') == 'closed': |
|
|
break |
|
|
|
|
|
async def stream_task_progress(self, task_id: str) -> AsyncGenerator[Dict, None]: |
|
|
"""Stream task execution progress""" |
|
|
url = f"{self.api_base}/tasks/{task_id}/stream" |
|
|
|
|
|
async with aiohttp.ClientSession() as session: |
|
|
async with session.get(url) as response: |
|
|
async for line in response.content: |
|
|
line = line.decode('utf-8').strip() |
|
|
|
|
|
if line.startswith('data: '): |
|
|
data = json.loads(line[6:]) |
|
|
yield data |
|
|
|
|
|
if data.get('status') in ['completed', 'error']: |
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MCPClient: |
|
|
"""Client for Model Context Protocol integration""" |
|
|
|
|
|
def __init__(self, api_base: str = API_BASE): |
|
|
self.api_base = api_base |
|
|
|
|
|
def get_tools(self) -> Dict: |
|
|
"""Get available MCP tools""" |
|
|
response = requests.get(f"{self.api_base}/mcp/tools") |
|
|
return response.json() |
|
|
|
|
|
def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict: |
|
|
"""Execute an MCP tool""" |
|
|
response = requests.post( |
|
|
f"{self.api_base}/mcp/execute", |
|
|
params={"tool_name": tool_name}, |
|
|
json=arguments |
|
|
) |
|
|
return response.json() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def example_1_basic_workflow(): |
|
|
"""Example 1: Basic browser automation workflow""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 1: Basic Browser Automation Workflow") |
|
|
print("="*60) |
|
|
|
|
|
client = HybridBrowserClient() |
|
|
|
|
|
try: |
|
|
|
|
|
client.create_session() |
|
|
|
|
|
|
|
|
client.navigate("https://news.ycombinator.com") |
|
|
|
|
|
|
|
|
result = client.extract(".titleline > a", "text") |
|
|
if "data" in result: |
|
|
print("\\nπ° Top 5 articles:") |
|
|
for i, title in enumerate(result["data"][:5], 1): |
|
|
print(f" {i}. {title}") |
|
|
|
|
|
|
|
|
screenshot = client.screenshot("example_screenshot.png") |
|
|
if screenshot: |
|
|
print(f"πΈ Screenshot saved (size: {len(screenshot)} chars)") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
finally: |
|
|
client.close_session() |
|
|
|
|
|
def example_2_interactive_session(): |
|
|
"""Example 2: Interactive browser session with form interaction""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 2: Interactive Form Interaction") |
|
|
print("="*60) |
|
|
|
|
|
client = HybridBrowserClient() |
|
|
|
|
|
try: |
|
|
client.create_session() |
|
|
|
|
|
|
|
|
client.navigate("https://www.google.com") |
|
|
|
|
|
|
|
|
import time |
|
|
time.sleep(2) |
|
|
|
|
|
|
|
|
client.type_text('textarea[name="q"]', "browser automation selenium") |
|
|
|
|
|
|
|
|
client.click('input[name="btnK"]') |
|
|
|
|
|
print("β
Search completed successfully!") |
|
|
|
|
|
|
|
|
time.sleep(2) |
|
|
results = client.extract(".g", "innerHTML") |
|
|
if "data" in results: |
|
|
print(f"\\nπ Found {len(results['data'])} search results") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
finally: |
|
|
client.close_session() |
|
|
|
|
|
async def example_3_sse_monitoring(): |
|
|
"""Example 3: Real-time monitoring with SSE""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 3: Real-time SSE Monitoring") |
|
|
print("="*60) |
|
|
|
|
|
client = HybridBrowserClient() |
|
|
sse_client = SSEClient() |
|
|
|
|
|
try: |
|
|
|
|
|
session_id = client.create_session() |
|
|
|
|
|
print("\\nπ― Starting navigation...") |
|
|
|
|
|
import asyncio |
|
|
asyncio.create_task( |
|
|
asyncio.to_thread(client.navigate, "https://example.com") |
|
|
) |
|
|
|
|
|
|
|
|
print("\\nπ‘ Monitoring session events:") |
|
|
event_count = 0 |
|
|
async for event in sse_client.stream_session_events(session_id): |
|
|
event_count += 1 |
|
|
print(f" Event {event_count}: {event.get('url', 'N/A')} | {event.get('title', 'N/A')}") |
|
|
|
|
|
if event_count >= 5: |
|
|
break |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
finally: |
|
|
client.close_session() |
|
|
|
|
|
async def example_4_task_with_progress(): |
|
|
"""Example 4: Task execution with progress monitoring""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 4: Task Execution with Progress Tracking") |
|
|
print("="*60) |
|
|
|
|
|
client = HybridBrowserClient() |
|
|
sse_client = SSEClient() |
|
|
|
|
|
|
|
|
actions = [ |
|
|
{"type": "navigate", "url": "https://example.com"}, |
|
|
{"type": "wait", "seconds": 1}, |
|
|
{"type": "navigate", "url": "https://httpbin.org/html"}, |
|
|
{"type": "wait", "seconds": 1}, |
|
|
{"type": "click", "selector": "a"} |
|
|
] |
|
|
|
|
|
try: |
|
|
|
|
|
result = client.execute_task(actions) |
|
|
task_id = result["task_id"] |
|
|
print(f"\\nπ Task started: {task_id}") |
|
|
|
|
|
|
|
|
print("\\nπ Task progress:") |
|
|
async for status in sse_client.stream_task_progress(task_id): |
|
|
progress = status.get('progress', 0) |
|
|
current_step = status.get('current_step', 0) |
|
|
total_steps = status.get('total_steps', 0) |
|
|
task_status = status.get('status', 'unknown') |
|
|
|
|
|
print(f" Step {current_step}/{total_steps} - {progress:.1f}% | Status: {task_status}") |
|
|
|
|
|
if task_status in ['completed', 'error']: |
|
|
if task_status == 'completed': |
|
|
print("β
Task completed successfully!") |
|
|
else: |
|
|
print(f"β Task failed: {status.get('error', 'Unknown error')}") |
|
|
break |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
|
|
|
def example_5_mcp_integration(): |
|
|
"""Example 5: Using MCP tools for AI agent integration""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 5: MCP Tools Integration") |
|
|
print("="*60) |
|
|
|
|
|
mcp_client = MCPClient() |
|
|
client = HybridBrowserClient() |
|
|
|
|
|
try: |
|
|
|
|
|
tools = mcp_client.get_tools() |
|
|
print(f"\\nπ§ Available MCP tools: {len(tools['tools'])}") |
|
|
for tool in tools['tools']: |
|
|
print(f" β’ {tool['name']}: {tool['description']}") |
|
|
|
|
|
|
|
|
session_id = client.create_session() |
|
|
|
|
|
|
|
|
print("\\nπ Using MCP to navigate...") |
|
|
result = mcp_client.execute_tool( |
|
|
"browser_navigate", |
|
|
{"session_id": session_id, "url": "https://example.com"} |
|
|
) |
|
|
print(f" Result: {result['content'][0]['text']}") |
|
|
|
|
|
|
|
|
print("\\nπ Using MCP to extract data...") |
|
|
result = mcp_client.execute_tool( |
|
|
"browser_extract", |
|
|
{"session_id": session_id, "selector": "p", "attribute": "text"} |
|
|
) |
|
|
print(f" Extracted: {len(json.loads(result['content'][0]['text'])['data'])} paragraphs") |
|
|
|
|
|
|
|
|
print("\\nπΈ Using MCP to take screenshot...") |
|
|
result = mcp_client.execute_tool( |
|
|
"browser_screenshot", |
|
|
{"session_id": session_id} |
|
|
) |
|
|
if result['content'][0]['type'] == 'image': |
|
|
print(" Screenshot captured via MCP!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
finally: |
|
|
client.close_session() |
|
|
|
|
|
def example_6_data_extraction(): |
|
|
"""Example 6: Advanced data extraction and processing""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 6: Advanced Data Extraction") |
|
|
print("="*60) |
|
|
|
|
|
client = HybridBrowserClient() |
|
|
|
|
|
try: |
|
|
client.create_session() |
|
|
client.navigate("https://quotes.toscrape.com") |
|
|
|
|
|
|
|
|
quotes = client.extract(".quote", "innerHTML") |
|
|
if "data" in quotes: |
|
|
print(f"\\n㪠Extracted {len(quotes['data'])} quotes") |
|
|
|
|
|
|
|
|
for i, quote_html in enumerate(quotes["data"][:3], 1): |
|
|
print(f"\\n Quote {i}:") |
|
|
print(f" Raw HTML: {quote_html[:100]}...") |
|
|
|
|
|
|
|
|
authors = client.extract(".author", "text") |
|
|
if "data" in authors: |
|
|
print(f"\\nπ€ Authors: {', '.join(authors['data'][:5])}") |
|
|
|
|
|
|
|
|
tags = client.extract(".tag", "text") |
|
|
if "data" in tags: |
|
|
print(f"\\nπ·οΈ Top tags: {', '.join(tags['data'][:10])}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
finally: |
|
|
client.close_session() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def example_7_webscraping_pipeline(): |
|
|
"""Example 7: Complete web scraping pipeline""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 7: Web Scraping Pipeline") |
|
|
print("="*60) |
|
|
|
|
|
client = HybridBrowserClient() |
|
|
|
|
|
try: |
|
|
client.create_session() |
|
|
|
|
|
|
|
|
target_url = "https://books.toscrape.com" |
|
|
client.navigate(target_url) |
|
|
|
|
|
print(f"\\nπ Scraping: {target_url}") |
|
|
|
|
|
|
|
|
books = client.extract(".product_pod", "innerHTML") |
|
|
if "data" in books: |
|
|
print(f"\\nπ Found {len(books['data'])} books") |
|
|
|
|
|
|
|
|
processed_books = [] |
|
|
for book_html in books["data"][:5]: |
|
|
|
|
|
processed_books.append({ |
|
|
"html": book_html, |
|
|
"size": len(book_html) |
|
|
}) |
|
|
|
|
|
print(f" Processed {len(processed_books)} books") |
|
|
for i, book in enumerate(processed_books, 1): |
|
|
print(f" Book {i}: {book['size']} characters") |
|
|
|
|
|
|
|
|
client.screenshot("books_scraped.png") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
finally: |
|
|
client.close_session() |
|
|
|
|
|
def example_8_monitoring_automation(): |
|
|
"""Example 8: Website monitoring automation""" |
|
|
print("\\n" + "="*60) |
|
|
print("Example 8: Website Monitoring") |
|
|
print("="*60) |
|
|
|
|
|
client = HybridBrowserClient() |
|
|
|
|
|
try: |
|
|
client.create_session() |
|
|
|
|
|
|
|
|
test_url = "https://httpbin.org/get" |
|
|
client.navigate(test_url) |
|
|
|
|
|
|
|
|
response_data = client.extract("pre", "text") |
|
|
if "data" in response_data: |
|
|
print(f"\\nπ Response data extracted:") |
|
|
print(f" Size: {len(response_data['data'][0])} characters") |
|
|
|
|
|
|
|
|
try: |
|
|
json_data = json.loads(response_data["data"][0]) |
|
|
print(f" URL: {json_data.get('url', 'N/A')}") |
|
|
print(f" User-Agent: {json_data.get('headers', {}).get('User-Agent', 'N/A')}") |
|
|
except: |
|
|
print(" Could not parse JSON") |
|
|
|
|
|
|
|
|
client.screenshot("monitoring_baseline.png") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
finally: |
|
|
client.close_session() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
"""Run all examples""" |
|
|
print("\\n" + "="*70) |
|
|
print("π HYBRID BROWSER AUTOMATION TOOL - CLIENT EXAMPLES") |
|
|
print("="*70) |
|
|
print("\\nThis demonstrates the full capabilities of the hybrid tool:") |
|
|
print(" β’ FastAPI backend with REST API") |
|
|
print(" β’ Gradio frontend for UI") |
|
|
print(" β’ SSE streaming for real-time updates") |
|
|
print(" β’ MCP integration for AI agents") |
|
|
print(" β’ Session management and task execution") |
|
|
|
|
|
|
|
|
try: |
|
|
client = HybridBrowserClient() |
|
|
health = client.health_check() |
|
|
print(f"\\nβ
API Health Check: {health['status']}") |
|
|
print(f" Active Sessions: {health['active_sessions']}") |
|
|
print(f" Active Tasks: {health['active_tasks']}") |
|
|
except: |
|
|
print("\\nβ API not available. Please start the hybrid tool first:") |
|
|
print(" python hybrid_browser_tool.py") |
|
|
return |
|
|
|
|
|
|
|
|
print("\\n" + "="*70) |
|
|
print("π RUNNING SYNCHRONOUS EXAMPLES") |
|
|
print("="*70) |
|
|
|
|
|
try: |
|
|
example_1_basic_workflow() |
|
|
example_2_interactive_session() |
|
|
example_5_mcp_integration() |
|
|
example_6_data_extraction() |
|
|
example_7_webscraping_pipeline() |
|
|
example_8_monitoring_automation() |
|
|
except Exception as e: |
|
|
print(f"\\nβ Synchronous example error: {e}") |
|
|
|
|
|
|
|
|
print("\\n" + "="*70) |
|
|
print("β‘ RUNNING ASYNCHRONOUS EXAMPLES") |
|
|
print("="*70) |
|
|
|
|
|
try: |
|
|
asyncio.run(example_3_sse_monitoring()) |
|
|
asyncio.run(example_4_task_with_progress()) |
|
|
except Exception as e: |
|
|
print(f"\\nβ Asynchronous example error: {e}") |
|
|
|
|
|
print("\\n" + "="*70) |
|
|
print("π ALL EXAMPLES COMPLETED!") |
|
|
print("="*70) |
|
|
print("\\nNext steps:") |
|
|
print(" 1. Explore the Gradio UI at: http://localhost:7860") |
|
|
print(" 2. Check API documentation at: http://localhost:8000/docs") |
|
|
print(" 3. Integrate with your own applications using the REST API") |
|
|
print(" 4. Use MCP tools for AI agent integration") |
|
|
print("="*70) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |