""" Hybrid Browser Automation Tool - Client Examples Demonstrates API usage, SSE streaming, and integration patterns """ import requests import json import asyncio import aiohttp import base64 from typing import Dict, Any, AsyncGenerator from PIL import Image from io import BytesIO # ============================================================================ # Configuration # ============================================================================ API_BASE = "http://localhost:8000" GRADIO_BASE = "http://localhost:7860" # ============================================================================ # REST API Client # ============================================================================ class HybridBrowserClient: """Client for the hybrid browser automation tool""" def __init__(self, api_base: str = API_BASE): self.api_base = api_base self.session_id = None def health_check(self) -> Dict: """Check API health""" response = requests.get(f"{self.api_base}/health") return response.json() def create_session(self) -> str: """Create a new browser session""" response = requests.post(f"{self.api_base}/sessions/create") data = response.json() self.session_id = data["session_id"] print(f"āœ… Session created: {self.session_id}") return self.session_id def navigate(self, url: str, wait_time: int = 3) -> Dict: """Navigate to a URL""" if not self.session_id: self.create_session() data = {"url": url, "wait_time": wait_time} response = requests.post( f"{self.api_base}/sessions/{self.session_id}/navigate", json=data ) result = response.json() if "error" in result: print(f"āŒ Navigation error: {result['error']}") return result print(f"āœ… Navigated to: {result['url']}") print(f"šŸ“„ Page title: {result['title']}") return result def extract(self, selector: str, attribute: str = "text") -> Dict: """Extract data from page elements""" if not self.session_id: raise ValueError("No active session. Call create_session() first.") data = {"selector": selector, "attribute": attribute} response = requests.post( f"{self.api_base}/sessions/{self.session_id}/extract", json=data ) result = response.json() if "error" in result: print(f"āŒ Extraction error: {result['error']}") return result print(f"āœ… Extracted {result['count']} items from selector: {selector}") return result def click(self, selector: str) -> Dict: """Click an element""" if not self.session_id: raise ValueError("No active session. Call create_session() first.") data = {"selector": selector} response = requests.post( f"{self.api_base}/sessions/{self.session_id}/click", json=data ) result = response.json() if "error" in result: print(f"āŒ Click error: {result['error']}") return result print(f"āœ… Clicked: {selector}") return result def type_text(self, selector: str, text: str) -> Dict: """Type text into an element""" if not self.session_id: raise ValueError("No active session. Call create_session() first.") data = {"selector": selector, "text": text} response = requests.post( f"{self.api_base}/sessions/{self.session_id}/type", json=data ) result = response.json() if "error" in result: print(f"āŒ Typing error: {result['error']}") return result print(f"āœ… Typed '{text}' into: {selector}") return result def screenshot(self, save_path: str = None) -> str: """Take a screenshot""" if not self.session_id: raise ValueError("No active session. Call create_session() first.") response = requests.get(f"{self.api_base}/sessions/{self.session_id}/screenshot") result = response.json() if "error" in result: print(f"āŒ Screenshot error: {result['error']}") return None screenshot_b64 = result["screenshot"] if save_path: # Save to file image_data = base64.b64decode(screenshot_b64) with open(save_path, "wb") as f: f.write(image_data) print(f"āœ… Screenshot saved to: {save_path}") print("āœ… Screenshot captured") return screenshot_b64 def execute_task(self, actions: list) -> Dict: """Execute a multi-step task""" data = {"actions": actions} response = requests.post(f"{self.api_base}/tasks/execute", json=data) result = response.json() if "error" in result: print(f"āŒ Task error: {result['error']}") return result print(f"āœ… Task started: {result['task_id']}") return result def get_task_status(self, task_id: str) -> Dict: """Get task execution status""" response = requests.get(f"{self.api_base}/tasks/{task_id}/status") return response.json() def list_sessions(self) -> Dict: """List all active sessions""" response = requests.get(f"{self.api_base}/sessions") return response.json() def close_session(self): """Close current session""" if self.session_id: response = requests.delete(f"{self.api_base}/sessions/{self.session_id}") result = response.json() print(f"āœ… Session closed: {self.session_id}") self.session_id = None return result # ============================================================================ # SSE Client for Real-time Streaming # ============================================================================ class SSEClient: """Client for Server-Sent Events streaming""" def __init__(self, api_base: str = API_BASE): self.api_base = api_base async def stream_session_events(self, session_id: str) -> AsyncGenerator[Dict, None]: """Stream browser session events in real-time""" url = f"{self.api_base}/stream/{session_id}" async with aiohttp.ClientSession() as session: async with session.get(url) as response: async for line in response.content: line = line.decode('utf-8').strip() if line.startswith('data: '): data = json.loads(line[6:]) yield data if data.get('status') == 'closed': break async def stream_task_progress(self, task_id: str) -> AsyncGenerator[Dict, None]: """Stream task execution progress""" url = f"{self.api_base}/tasks/{task_id}/stream" async with aiohttp.ClientSession() as session: async with session.get(url) as response: async for line in response.content: line = line.decode('utf-8').strip() if line.startswith('data: '): data = json.loads(line[6:]) yield data if data.get('status') in ['completed', 'error']: break # ============================================================================ # MCP Client # ============================================================================ class MCPClient: """Client for Model Context Protocol integration""" def __init__(self, api_base: str = API_BASE): self.api_base = api_base def get_tools(self) -> Dict: """Get available MCP tools""" response = requests.get(f"{self.api_base}/mcp/tools") return response.json() def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict: """Execute an MCP tool""" response = requests.post( f"{self.api_base}/mcp/execute", params={"tool_name": tool_name}, json=arguments ) return response.json() # ============================================================================ # Example Usage Patterns # ============================================================================ def example_1_basic_workflow(): """Example 1: Basic browser automation workflow""" print("\\n" + "="*60) print("Example 1: Basic Browser Automation Workflow") print("="*60) client = HybridBrowserClient() try: # Create session client.create_session() # Navigate to a website client.navigate("https://news.ycombinator.com") # Extract article titles result = client.extract(".titleline > a", "text") if "data" in result: print("\\nšŸ“° Top 5 articles:") for i, title in enumerate(result["data"][:5], 1): print(f" {i}. {title}") # Take screenshot screenshot = client.screenshot("example_screenshot.png") if screenshot: print(f"šŸ“ø Screenshot saved (size: {len(screenshot)} chars)") except Exception as e: print(f"āŒ Error: {e}") finally: client.close_session() def example_2_interactive_session(): """Example 2: Interactive browser session with form interaction""" print("\\n" + "="*60) print("Example 2: Interactive Form Interaction") print("="*60) client = HybridBrowserClient() try: client.create_session() # Navigate to search engine client.navigate("https://www.google.com") # Wait for page to load import time time.sleep(2) # Type in search box client.type_text('textarea[name="q"]', "browser automation selenium") # Click search button client.click('input[name="btnK"]') print("āœ… Search completed successfully!") # Extract search results time.sleep(2) results = client.extract(".g", "innerHTML") if "data" in results: print(f"\\nšŸ” Found {len(results['data'])} search results") except Exception as e: print(f"āŒ Error: {e}") finally: client.close_session() async def example_3_sse_monitoring(): """Example 3: Real-time monitoring with SSE""" print("\\n" + "="*60) print("Example 3: Real-time SSE Monitoring") print("="*60) client = HybridBrowserClient() sse_client = SSEClient() try: # Create session session_id = client.create_session() print("\\nšŸŽÆ Starting navigation...") # Start navigation in background import asyncio asyncio.create_task( asyncio.to_thread(client.navigate, "https://example.com") ) # Monitor session events print("\\nšŸ“” Monitoring session events:") event_count = 0 async for event in sse_client.stream_session_events(session_id): event_count += 1 print(f" Event {event_count}: {event.get('url', 'N/A')} | {event.get('title', 'N/A')}") if event_count >= 5: # Stop after 5 events break except Exception as e: print(f"āŒ Error: {e}") finally: client.close_session() async def example_4_task_with_progress(): """Example 4: Task execution with progress monitoring""" print("\\n" + "="*60) print("Example 4: Task Execution with Progress Tracking") print("="*60) client = HybridBrowserClient() sse_client = SSEClient() # Define multi-step task actions = [ {"type": "navigate", "url": "https://example.com"}, {"type": "wait", "seconds": 1}, {"type": "navigate", "url": "https://httpbin.org/html"}, {"type": "wait", "seconds": 1}, {"type": "click", "selector": "a"} ] try: # Start task result = client.execute_task(actions) task_id = result["task_id"] print(f"\\nšŸš€ Task started: {task_id}") # Monitor progress via SSE print("\\nšŸ“Š Task progress:") async for status in sse_client.stream_task_progress(task_id): progress = status.get('progress', 0) current_step = status.get('current_step', 0) total_steps = status.get('total_steps', 0) task_status = status.get('status', 'unknown') print(f" Step {current_step}/{total_steps} - {progress:.1f}% | Status: {task_status}") if task_status in ['completed', 'error']: if task_status == 'completed': print("āœ… Task completed successfully!") else: print(f"āŒ Task failed: {status.get('error', 'Unknown error')}") break except Exception as e: print(f"āŒ Error: {e}") def example_5_mcp_integration(): """Example 5: Using MCP tools for AI agent integration""" print("\\n" + "="*60) print("Example 5: MCP Tools Integration") print("="*60) mcp_client = MCPClient() client = HybridBrowserClient() try: # Get available tools tools = mcp_client.get_tools() print(f"\\nšŸ”§ Available MCP tools: {len(tools['tools'])}") for tool in tools['tools']: print(f" • {tool['name']}: {tool['description']}") # Create session for MCP operations session_id = client.create_session() # Use MCP tool to navigate print("\\n🌐 Using MCP to navigate...") result = mcp_client.execute_tool( "browser_navigate", {"session_id": session_id, "url": "https://example.com"} ) print(f" Result: {result['content'][0]['text']}") # Use MCP tool to extract data print("\\nšŸ” Using MCP to extract data...") result = mcp_client.execute_tool( "browser_extract", {"session_id": session_id, "selector": "p", "attribute": "text"} ) print(f" Extracted: {len(json.loads(result['content'][0]['text'])['data'])} paragraphs") # Use MCP tool to take screenshot print("\\nšŸ“ø Using MCP to take screenshot...") result = mcp_client.execute_tool( "browser_screenshot", {"session_id": session_id} ) if result['content'][0]['type'] == 'image': print(" Screenshot captured via MCP!") except Exception as e: print(f"āŒ Error: {e}") finally: client.close_session() def example_6_data_extraction(): """Example 6: Advanced data extraction and processing""" print("\\n" + "="*60) print("Example 6: Advanced Data Extraction") print("="*60) client = HybridBrowserClient() try: client.create_session() client.navigate("https://quotes.toscrape.com") # Extract quotes quotes = client.extract(".quote", "innerHTML") if "data" in quotes: print(f"\\nšŸ’¬ Extracted {len(quotes['data'])} quotes") # Process each quote for i, quote_html in enumerate(quotes["data"][:3], 1): print(f"\\n Quote {i}:") print(f" Raw HTML: {quote_html[:100]}...") # Extract author names authors = client.extract(".author", "text") if "data" in authors: print(f"\\nšŸ‘¤ Authors: {', '.join(authors['data'][:5])}") # Extract tags tags = client.extract(".tag", "text") if "data" in tags: print(f"\\nšŸ·ļø Top tags: {', '.join(tags['data'][:10])}") except Exception as e: print(f"āŒ Error: {e}") finally: client.close_session() # ============================================================================ # Integration Examples # ============================================================================ def example_7_webscraping_pipeline(): """Example 7: Complete web scraping pipeline""" print("\\n" + "="*60) print("Example 7: Web Scraping Pipeline") print("="*60) client = HybridBrowserClient() try: client.create_session() # Target website target_url = "https://books.toscrape.com" client.navigate(target_url) print(f"\\nšŸ“š Scraping: {target_url}") # Extract book information books = client.extract(".product_pod", "innerHTML") if "data" in books: print(f"\\nšŸ“– Found {len(books['data'])} books") # Process book data processed_books = [] for book_html in books["data"][:5]: # Process first 5 books # This would need more sophisticated parsing in a real scenario processed_books.append({ "html": book_html, "size": len(book_html) }) print(f" Processed {len(processed_books)} books") for i, book in enumerate(processed_books, 1): print(f" Book {i}: {book['size']} characters") # Save screenshot client.screenshot("books_scraped.png") except Exception as e: print(f"āŒ Error: {e}") finally: client.close_session() def example_8_monitoring_automation(): """Example 8: Website monitoring automation""" print("\\n" + "="*60) print("Example 8: Website Monitoring") print("="*60) client = HybridBrowserClient() try: client.create_session() # Monitor a website for changes test_url = "https://httpbin.org/get" client.navigate(test_url) # Extract response data response_data = client.extract("pre", "text") if "data" in response_data: print(f"\\nšŸ“Š Response data extracted:") print(f" Size: {len(response_data['data'][0])} characters") # Parse JSON if possible try: json_data = json.loads(response_data["data"][0]) print(f" URL: {json_data.get('url', 'N/A')}") print(f" User-Agent: {json_data.get('headers', {}).get('User-Agent', 'N/A')}") except: print(" Could not parse JSON") # Take comparison screenshot client.screenshot("monitoring_baseline.png") except Exception as e: print(f"āŒ Error: {e}") finally: client.close_session() # ============================================================================ # Main Runner # ============================================================================ def main(): """Run all examples""" print("\\n" + "="*70) print("🌐 HYBRID BROWSER AUTOMATION TOOL - CLIENT EXAMPLES") print("="*70) print("\\nThis demonstrates the full capabilities of the hybrid tool:") print(" • FastAPI backend with REST API") print(" • Gradio frontend for UI") print(" • SSE streaming for real-time updates") print(" • MCP integration for AI agents") print(" • Session management and task execution") # Check if API is running try: client = HybridBrowserClient() health = client.health_check() print(f"\\nāœ… API Health Check: {health['status']}") print(f" Active Sessions: {health['active_sessions']}") print(f" Active Tasks: {health['active_tasks']}") except: print("\\nāŒ API not available. Please start the hybrid tool first:") print(" python hybrid_browser_tool.py") return # Run synchronous examples print("\\n" + "="*70) print("šŸ”„ RUNNING SYNCHRONOUS EXAMPLES") print("="*70) try: example_1_basic_workflow() example_2_interactive_session() example_5_mcp_integration() example_6_data_extraction() example_7_webscraping_pipeline() example_8_monitoring_automation() except Exception as e: print(f"\\nāŒ Synchronous example error: {e}") # Run asynchronous examples print("\\n" + "="*70) print("⚔ RUNNING ASYNCHRONOUS EXAMPLES") print("="*70) try: asyncio.run(example_3_sse_monitoring()) asyncio.run(example_4_task_with_progress()) except Exception as e: print(f"\\nāŒ Asynchronous example error: {e}") print("\\n" + "="*70) print("šŸŽ‰ ALL EXAMPLES COMPLETED!") print("="*70) print("\\nNext steps:") print(" 1. Explore the Gradio UI at: http://localhost:7860") print(" 2. Check API documentation at: http://localhost:8000/docs") print(" 3. Integrate with your own applications using the REST API") print(" 4. Use MCP tools for AI agent integration") print("="*70) if __name__ == "__main__": main()