Spaces:
No application file
No application file
| import os | |
| import json | |
| import base64 | |
| import asyncio | |
| import uuid | |
| from typing import Optional, Dict, Any, List | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import uvicorn | |
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.common.keys import Keys | |
| from selenium.webdriver.common.action_chains import ActionChains | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.common.exceptions import TimeoutException, NoSuchElementException | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| from selenium.webdriver.chrome.service import Service | |
| from PIL import Image | |
| import io | |
# Registry of live browser sessions: session id -> the Chrome driver serving it.
browser_sessions: Dict[str, webdriver.Chrome] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook that quits every open browser on shutdown.

    Nothing happens at startup. Once the application stops (normally or
    because an exception was thrown in at the ``yield``), every driver still
    held in ``browser_sessions`` is quit on a best-effort basis and the
    registry is emptied.

    Args:
        app: The application being started; not used here.
    """
    try:
        # Startup: nothing to prepare
        yield
    finally:
        # Shutdown: iterate over a snapshot so the registry is free to change
        for driver in list(browser_sessions.values()):
            try:
                driver.quit()
            except Exception:
                # Best effort: one browser failing to quit must not keep the
                # remaining ones from being closed.
                pass
        browser_sessions.clear()
# The ASGI application; ``lifespan`` is registered as its lifespan handler.
app = FastAPI(
    title="Browser Automation API",
    description="A browser automation API similar to browser-use",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS: accept every origin, method and header, with credentials allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive combination - confirm it is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# Pydantic models
class SessionResponse(BaseModel):
    """Response body describing a browser session."""

    # Identifier under which the session is stored in the registry
    session_id: str
    # Short state label, e.g. "created"
    status: str
class NavigateRequest(BaseModel):
    """Request body for navigating a session's browser."""

    # Address the browser should load
    url: str
class ClickRequest(BaseModel):
    """Request body identifying the element to click."""

    # Locator expression, interpreted according to ``selector_type``
    selector: str
    # One of: css, xpath, id, class, name
    selector_type: str = "css"
class TypeRequest(BaseModel):
    """Request body for typing text into an element."""

    # Locator expression, interpreted according to ``selector_type``
    selector: str
    # Characters to send to the element
    text: str
    # Locator strategy; defaults to a CSS selector
    selector_type: str = "css"
    # When true, the element is cleared before the text is sent
    clear_first: bool = True
class ActionResponse(BaseModel):
    """Generic outcome of a browser action."""

    # Whether the action completed without raising
    success: bool
    # Human-readable description of the outcome or of the failure
    message: str
    # Optional action-specific payload
    data: Optional[Dict[str, Any]] = None
class ScreenshotResponse(BaseModel):
    """Outcome of a screenshot request."""

    # Whether the screenshot was captured
    success: bool
    # Base64 encoded image; empty when the capture failed
    screenshot: str
    # Human-readable description of the outcome or of the failure
    message: str
class ElementInfo(BaseModel):
    """Summary of a single element found on a page."""

    # Tag name of the element
    tag: str
    # Text of the element
    text: str
    # Attribute name -> attribute value
    attributes: Dict[str, str]
class PageInfo(BaseModel):
    """Summary of the page currently loaded in a session."""

    # Document title
    title: str
    # Current address of the page
    url: str
    # Elements discovered on the page
    elements: List[ElementInfo]
def create_chrome_driver() -> webdriver.Chrome:
    """Create a headless Chrome WebDriver configured for this service.

    Three ways of obtaining ChromeDriver are tried in order: a binary
    installed through webdriver-manager, Selenium's default resolution, and
    the fixed path ``/usr/local/bin/chromedriver``. The first one that works
    is used.

    Returns:
        A driver with a 30 second page-load timeout and a 10 second
        implicit wait.

    Raises:
        Exception: If all three attempts fail; the message lists every
            underlying error and the last one is chained as the cause.
    """
    chrome_options = Options()
    # "--remote-debugging-port=9222" is deliberately NOT passed: this service
    # keeps several browsers alive at once and a fixed port would be shared by
    # all of them. ChromeDriver arranges its own connection to each browser.
    # NOTE(review): "--disable-web-security" and
    # "--allow-running-insecure-content" relax browser protections for every
    # page visited - confirm they are really required.
    for flag in (
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        "--disable-extensions",
        "--disable-plugins",
        "--disable-images",
        "--disable-web-security",
        "--allow-running-insecure-content",
    ):
        chrome_options.add_argument(flag)

    # Candidate ways of building the driver, most preferred first
    strategies = (
        # Method 1: webdriver-manager downloads a matching ChromeDriver
        lambda: webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_options,
        ),
        # Method 2: whatever ChromeDriver Selenium finds by itself
        lambda: webdriver.Chrome(options=chrome_options),
        # Method 3: explicit path
        lambda: webdriver.Chrome(
            service=Service("/usr/local/bin/chromedriver"),
            options=chrome_options,
        ),
    )

    driver = None
    errors = []
    for build in strategies:
        try:
            driver = build()
            break
        except Exception as exc:
            errors.append(exc)

    if driver is None:
        details = ", ".join(str(err) for err in errors)
        raise Exception(
            f"Failed to create Chrome driver. Tried multiple methods: {details}"
        ) from errors[-1]

    try:
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)
    except Exception:
        # Do not leave an orphaned browser behind if configuration fails
        driver.quit()
        raise
    return driver
def get_element(driver: webdriver.Chrome, selector: str, selector_type: str = "css"):
    """Locate a single element on the current page.

    Args:
        driver: Browser to search in.
        selector: Locator expression.
        selector_type: How ``selector`` is interpreted: ``css``, ``xpath``,
            ``id``, ``class`` or ``name``.

    Returns:
        The element returned by ``driver.find_element``.

    Raises:
        ValueError: If ``selector_type`` is not one of the supported names.
        HTTPException: With status 404 when the lookup raises
            ``NoSuchElementException`` or ``TimeoutException``.
    """
    strategy_by_type = {
        "css": By.CSS_SELECTOR,
        "xpath": By.XPATH,
        "id": By.ID,
        "class": By.CLASS_NAME,
        "name": By.NAME,
    }
    if selector_type not in strategy_by_type:
        raise ValueError(f"Unsupported selector type: {selector_type}")

    try:
        return driver.find_element(strategy_by_type[selector_type], selector)
    except (NoSuchElementException, TimeoutException) as exc:
        raise HTTPException(status_code=404, detail=f"Element not found: {str(exc)}")
async def root():
    """Return a fixed message stating that the API is running."""
    status_message = "Browser Automation API is running"
    return {"message": status_message}
async def create_session():
    """Start a new browser and register it under a fresh session id.

    Returns:
        A ``SessionResponse`` carrying the new id and the status ``created``.

    Raises:
        HTTPException: With status 500 if anything goes wrong, including a
            failure to start the browser.
    """
    try:
        new_id = str(uuid.uuid4())
        browser_sessions[new_id] = create_chrome_driver()
        return SessionResponse(session_id=new_id, status="created")
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to create session: {str(exc)}",
        )
async def close_session(session_id: str):
    """Close a browser session and remove it from the registry.

    The session is removed from ``browser_sessions`` before its browser is
    quit, so a driver whose ``quit()`` fails does not stay registered after
    the caller has been told that it was closed.

    Args:
        session_id: Identifier of the session to close.

    Returns:
        A dict with a ``message`` key: a success text, or a warning text that
        includes the error raised while quitting the browser.

    Raises:
        HTTPException: With status 404 if the session id is unknown.
    """
    driver = browser_sessions.pop(session_id, None)
    if driver is None:
        raise HTTPException(status_code=404, detail="Session not found")

    try:
        driver.quit()
    except Exception as e:
        # The session is already forgotten; only report the problem
        return {"message": f"Session closed with warning: {str(e)}"}
    return {"message": "Session closed successfully"}
async def navigate(session_id: str, request: NavigateRequest):
    """Load ``request.url`` in the session's browser.

    After the page is requested, up to 10 seconds are spent waiting for
    ``document.readyState`` to become ``complete``.

    Args:
        session_id: Identifier of the session to drive.
        request: Carries the URL to open.

    Returns:
        An ``ActionResponse``. On success ``data`` holds ``current_url`` and
        ``title``; on any error ``success`` is false and ``message`` contains
        the error text.

    Raises:
        HTTPException: With status 404 if the session id is unknown.
    """
    if session_id not in browser_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    browser = browser_sessions[session_id]

    def _document_loaded(d):
        return d.execute_script("return document.readyState") == "complete"

    try:
        browser.get(request.url)
        WebDriverWait(browser, 10).until(_document_loaded)
        return ActionResponse(
            success=True,
            message=f"Successfully navigated to {request.url}",
            data={"current_url": browser.current_url, "title": browser.title},
        )
    except Exception as exc:
        failure_text = f"Navigation failed: {str(exc)}"
        return ActionResponse(success=False, message=failure_text)
async def click_element(session_id: str, request: ClickRequest):
    """Scroll an element into view, wait until it is clickable and click it.

    Args:
        session_id: Identifier of the session to drive.
        request: Selector and selector type of the element to click.

    Returns:
        An ``ActionResponse``. Every error raised while locating or clicking
        the element is reported as ``success=False`` with the error text in
        ``message``.

    Raises:
        HTTPException: With status 404 if the session id is unknown.
    """
    if session_id not in browser_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    browser = browser_sessions[session_id]

    try:
        target = get_element(browser, request.selector, request.selector_type)
        # Bring the element into the viewport first
        browser.execute_script("arguments[0].scrollIntoView(true);", target)
        # Give the element up to 10 seconds to become clickable
        clickable = EC.element_to_be_clickable(target)
        WebDriverWait(browser, 10).until(clickable)
        target.click()
        success_text = f"Successfully clicked element: {request.selector}"
        return ActionResponse(success=True, message=success_text)
    except Exception as exc:
        return ActionResponse(success=False, message=f"Click failed: {str(exc)}")
async def type_text(session_id: str, request: TypeRequest):
    """Send text to an element, optionally clearing it first.

    Args:
        session_id: Identifier of the session to drive.
        request: Selector of the target element, the text to send and whether
            the element is cleared beforehand.

    Returns:
        An ``ActionResponse``. Every error raised while locating the element
        or typing is reported as ``success=False`` with the error text in
        ``message``.

    Raises:
        HTTPException: With status 404 if the session id is unknown.
    """
    if session_id not in browser_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    browser = browser_sessions[session_id]

    try:
        field = get_element(browser, request.selector, request.selector_type)
        # Bring the element into the viewport first
        browser.execute_script("arguments[0].scrollIntoView(true);", field)
        if request.clear_first:
            # Drop whatever the element currently contains
            field.clear()
        field.send_keys(request.text)
        success_text = f"Successfully typed text into element: {request.selector}"
        return ActionResponse(success=True, message=success_text)
    except Exception as exc:
        return ActionResponse(success=False, message=f"Type failed: {str(exc)}")
async def take_screenshot(session_id: str):
    """Capture the session's current page as a base64 encoded PNG.

    Args:
        session_id: Identifier of the session to capture.

    Returns:
        A ``ScreenshotResponse``. If the capture fails, ``success`` is false,
        ``screenshot`` is empty and ``message`` contains the error text.

    Raises:
        HTTPException: With status 404 if the session id is unknown.
    """
    if session_id not in browser_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    browser = browser_sessions[session_id]

    try:
        png_bytes = browser.get_screenshot_as_png()
        encoded = base64.b64encode(png_bytes).decode()
        return ScreenshotResponse(
            success=True,
            screenshot=encoded,
            message="Screenshot taken successfully",
        )
    except Exception as exc:
        failure_text = f"Screenshot failed: {str(exc)}"
        return ScreenshotResponse(success=False, screenshot="", message=failure_text)
async def get_page_info(session_id: str):
    """Describe the current page: title, URL and its interactive elements.

    For each of the tags ``button``, ``input``, ``a``, ``select`` and
    ``textarea`` at most the first 10 elements are reported. Element text is
    cut to 100 characters and only non-empty values of ``id``, ``class``,
    ``name``, ``type``, ``href`` and ``onclick`` are included. Elements that
    raise while being inspected are skipped.

    Args:
        session_id: Identifier of the session to inspect.

    Returns:
        A ``PageInfo`` describing the page.

    Raises:
        HTTPException: With status 404 if the session id is unknown, or 500
            if the page cannot be inspected at all.
    """
    if session_id not in browser_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    driver = browser_sessions[session_id]

    interactive_tags = ("button", "input", "a", "select", "textarea")
    reported_attributes = ("id", "class", "name", "type", "href", "onclick")
    try:
        # Get basic page info
        title = driver.title
        url = driver.current_url

        # Get interactive elements
        # NOTE(review): if the driver has an implicit wait configured,
        # find_elements may block for that long on tags that are absent from
        # the page - confirm the resulting latency is acceptable.
        elements = []
        for tag in interactive_tags:
            # Limit to first 10 of each type
            for elem in driver.find_elements(By.TAG_NAME, tag)[:10]:
                try:
                    # Every property read is a round-trip to the browser, so
                    # fetch each value exactly once.
                    text = elem.text or ""
                    attributes = {}
                    for attr in reported_attributes:
                        value = elem.get_attribute(attr)
                        if value:
                            attributes[attr] = value
                    elements.append(
                        ElementInfo(
                            tag=elem.tag_name,
                            text=text[:100],  # Limit text length
                            attributes=attributes,
                        )
                    )
                except Exception:
                    # The element could not be inspected; skip it
                    continue
        return PageInfo(title=title, url=url, elements=elements)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get page info: {str(e)}")
async def execute_javascript(session_id: str, script: str):
    """Run a JavaScript snippet in the session's current page.

    The script is handed to the browser exactly as received.

    Args:
        session_id: Identifier of the session to drive.
        script: JavaScript source to execute.

    Returns:
        An ``ActionResponse``. On success ``data["result"]`` holds whatever
        the script returned; on error ``success`` is false and ``message``
        contains the error text.

    Raises:
        HTTPException: With status 404 if the session id is unknown.
    """
    if session_id not in browser_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    browser = browser_sessions[session_id]

    try:
        # NOTE(review): caller-supplied code runs verbatim inside the browser
        script_result = browser.execute_script(script)
        return ActionResponse(
            success=True,
            message="JavaScript executed successfully",
            data={"result": script_result},
        )
    except Exception as exc:
        failure_text = f"JavaScript execution failed: {str(exc)}"
        return ActionResponse(success=False, message=failure_text)
async def list_sessions():
    """Report the ids of all registered sessions and how many there are."""
    session_ids = list(browser_sessions)
    return {
        "active_sessions": session_ids,
        "total_sessions": len(session_ids),
    }
if __name__ == "__main__":
    # Script entry point: serve the application referenced by the import
    # string "app:app" on all interfaces, port 7860.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, log_level="info")