"""Browser Automation API.

A small FastAPI service that manages headless Chrome sessions through
Selenium.  Each session is identified by a UUID and kept in the
module-level ``browser_sessions`` registry until it is closed or the
application shuts down.

NOTE(review): the Selenium calls used below are synchronous and are made
directly inside ``async def`` handlers, so a slow page load blocks the event
loop for its duration.  This is left unchanged because it also serializes
access to each driver; move the calls to a worker thread only together with
per-session locking.
"""

import os
import json
import base64
import asyncio
import uuid
import logging
import io
from typing import Optional, Dict, Any, List
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from PIL import Image

logger = logging.getLogger(__name__)

# Store active browser sessions
browser_sessions: Dict[str, webdriver.Chrome] = {}

# Maps the public ``selector_type`` values to Selenium locator strategies.
_LOCATORS = {
    "css": By.CSS_SELECTOR,
    "xpath": By.XPATH,
    "id": By.ID,
    "class": By.CLASS_NAME,
    "name": By.NAME,
}

# Tags and attributes reported by the page-info endpoint.
_INTERACTIVE_TAGS = ("button", "input", "a", "select", "textarea")
_REPORTED_ATTRIBUTES = ("id", "class", "name", "type", "href", "onclick")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: quit every open browser on shutdown."""
    # Startup: nothing to initialise.
    yield
    # Shutdown - close all browser sessions.  Iterate over a snapshot so the
    # registry can be cleared safely afterwards.
    for driver in list(browser_sessions.values()):
        try:
            driver.quit()
        except Exception:
            # Best effort: one broken driver must not prevent the others
            # from being closed.
            logger.warning("Failed to quit a browser session", exc_info=True)
    browser_sessions.clear()


app = FastAPI(
    title="Browser Automation API",
    description="A browser automation API similar to browser-use",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS middleware
# NOTE(review): any origin is allowed, with credentials; tighten this before
# exposing the service beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Pydantic models
class SessionResponse(BaseModel):
    """Response returned when a session is created."""

    session_id: str
    status: str


class NavigateRequest(BaseModel):
    """Request body for the navigate endpoint."""

    url: str


class ClickRequest(BaseModel):
    """Request body for the click endpoint."""

    selector: str
    selector_type: str = "css"  # css, xpath, id, class, name


class TypeRequest(BaseModel):
    """Request body for the type endpoint."""

    selector: str
    text: str
    selector_type: str = "css"
    clear_first: bool = True


class ActionResponse(BaseModel):
    """Generic success/failure envelope used by the action endpoints."""

    success: bool
    message: str
    data: Optional[Dict[str, Any]] = None


class ScreenshotResponse(BaseModel):
    """Response of the screenshot endpoint."""

    success: bool
    screenshot: str  # base64 encoded image
    message: str


class ElementInfo(BaseModel):
    """Summary of one element found on the page."""

    tag: str
    text: str
    attributes: Dict[str, str]


class PageInfo(BaseModel):
    """Title, URL and a sample of interactive elements of the current page."""

    title: str
    url: str
    elements: List[ElementInfo]


def create_chrome_driver() -> webdriver.Chrome:
    """Create a Chrome WebDriver instance with appropriate options.

    Three ways of locating ChromeDriver are tried in order; the first one
    that succeeds wins.

    Returns:
        A headless Chrome driver with a 30 s page-load timeout and a 10 s
        implicit wait.

    Raises:
        Exception: if every launch method fails.  The message lists the
            error of each attempt and the last error is chained as the cause.
    """
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument("--disable-extensions")
    chrome_options.add_argument("--disable-plugins")
    chrome_options.add_argument("--disable-images")
    # No fixed --remote-debugging-port: this service runs several browsers at
    # once, and pinning them all to one port (previously 9222) makes
    # concurrent sessions collide.  Without the flag ChromeDriver chooses a
    # free port for each browser.
    # NOTE(review): the two flags below relax browser security checks.
    chrome_options.add_argument("--disable-web-security")
    chrome_options.add_argument("--allow-running-insecure-content")

    # Try multiple methods to get the right ChromeDriver
    launchers = (
        # Method 1: Use webdriver-manager to auto-download matching version
        lambda: webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_options,
        ),
        # Method 2: Use system ChromeDriver if available
        lambda: webdriver.Chrome(options=chrome_options),
        # Method 3: Try with explicit path
        lambda: webdriver.Chrome(
            service=Service("/usr/local/bin/chromedriver"),
            options=chrome_options,
        ),
    )

    failures: List[Exception] = []
    for launch in launchers:
        try:
            driver = launch()
            break
        except Exception as exc:
            failures.append(exc)
    else:
        # Every method failed: report all reasons in attempt order.
        reasons = ", ".join(str(exc) for exc in failures)
        raise Exception(
            f"Failed to create Chrome driver. Tried multiple methods: {reasons}"
        ) from failures[-1]

    driver.set_page_load_timeout(30)
    driver.implicitly_wait(10)
    return driver


def get_element(driver: webdriver.Chrome, selector: str, selector_type: str = "css"):
    """Get element by selector.

    Args:
        driver: Browser to search in.
        selector: Selector expression, interpreted according to
            ``selector_type``.
        selector_type: One of ``css``, ``xpath``, ``id``, ``class``, ``name``.

    Returns:
        The first matching element.

    Raises:
        ValueError: if ``selector_type`` is not supported.
        HTTPException: 404 if no matching element is found.
    """
    locator = _LOCATORS.get(selector_type)
    if locator is None:
        raise ValueError(f"Unsupported selector type: {selector_type}")
    try:
        return driver.find_element(locator, selector)
    except (NoSuchElementException, TimeoutException) as e:
        raise HTTPException(
            status_code=404, detail=f"Element not found: {str(e)}"
        ) from e


def _get_driver(session_id: str) -> webdriver.Chrome:
    """Return the driver registered for ``session_id`` or raise HTTP 404."""
    driver = browser_sessions.get(session_id)
    if driver is None:
        raise HTTPException(status_code=404, detail="Session not found")
    return driver


@app.get("/")
async def root():
    """Health-check endpoint."""
    return {"message": "Browser Automation API is running"}


@app.post("/session/create", response_model=SessionResponse)
async def create_session():
    """Create a new browser session.

    Raises:
        HTTPException: 500 if the browser could not be started.
    """
    try:
        session_id = str(uuid.uuid4())
        driver = create_chrome_driver()
        browser_sessions[session_id] = driver
        return SessionResponse(session_id=session_id, status="created")
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to create session: {str(e)}"
        )


@app.delete("/session/{session_id}")
async def close_session(session_id: str):
    """Close a browser session.

    The session is removed from the registry before the browser is asked to
    quit, so a failing ``quit()`` can no longer leave a dead driver behind.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    driver = browser_sessions.pop(session_id, None)
    if driver is None:
        raise HTTPException(status_code=404, detail="Session not found")
    try:
        driver.quit()
    except Exception as e:
        return {"message": f"Session closed with warning: {str(e)}"}
    return {"message": "Session closed successfully"}


@app.post("/session/{session_id}/navigate", response_model=ActionResponse)
async def navigate(session_id: str, request: NavigateRequest):
    """Navigate to a URL.

    Waits up to 10 s for ``document.readyState`` to become ``complete``.
    Failures are reported as ``success=False`` rather than as an HTTP error.

    NOTE(review): the URL is passed to the browser unchecked; restrict the
    allowed schemes if callers are not trusted.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    driver = _get_driver(session_id)
    try:
        driver.get(request.url)
        WebDriverWait(driver, 10).until(
            lambda d: d.execute_script("return document.readyState") == "complete"
        )
        return ActionResponse(
            success=True,
            message=f"Successfully navigated to {request.url}",
            data={"current_url": driver.current_url, "title": driver.title},
        )
    except Exception as e:
        return ActionResponse(success=False, message=f"Navigation failed: {str(e)}")


@app.post("/session/{session_id}/click", response_model=ActionResponse)
async def click_element(session_id: str, request: ClickRequest):
    """Click an element.

    Any failure, including the 404 raised by ``get_element`` for a missing
    element, is caught by the broad handler and reported as
    ``success=False``.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    driver = _get_driver(session_id)
    try:
        element = get_element(driver, request.selector, request.selector_type)
        # Scroll to element if needed
        driver.execute_script("arguments[0].scrollIntoView(true);", element)
        # Wait for element to be clickable
        WebDriverWait(driver, 10).until(EC.element_to_be_clickable(element))
        element.click()
        return ActionResponse(
            success=True,
            message=f"Successfully clicked element: {request.selector}",
        )
    except Exception as e:
        return ActionResponse(success=False, message=f"Click failed: {str(e)}")


@app.post("/session/{session_id}/type", response_model=ActionResponse)
async def type_text(session_id: str, request: TypeRequest):
    """Type text into an element.

    Any failure, including the 404 raised by ``get_element`` for a missing
    element, is caught by the broad handler and reported as
    ``success=False``.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    driver = _get_driver(session_id)
    try:
        element = get_element(driver, request.selector, request.selector_type)
        # Scroll to element
        driver.execute_script("arguments[0].scrollIntoView(true);", element)
        # Clear field if requested
        if request.clear_first:
            element.clear()
        # Type text
        element.send_keys(request.text)
        return ActionResponse(
            success=True,
            message=f"Successfully typed text into element: {request.selector}",
        )
    except Exception as e:
        return ActionResponse(success=False, message=f"Type failed: {str(e)}")


@app.get("/session/{session_id}/screenshot", response_model=ScreenshotResponse)
async def take_screenshot(session_id: str):
    """Take a screenshot of the current page.

    Returns the PNG as a base64 string; on failure ``screenshot`` is empty
    and ``success`` is ``False``.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    driver = _get_driver(session_id)
    try:
        png_bytes = driver.get_screenshot_as_png()
        return ScreenshotResponse(
            success=True,
            screenshot=base64.b64encode(png_bytes).decode(),
            message="Screenshot taken successfully",
        )
    except Exception as e:
        return ScreenshotResponse(
            success=False,
            screenshot="",
            message=f"Screenshot failed: {str(e)}",
        )


def _describe_element(elem) -> ElementInfo:
    """Build an ``ElementInfo``, querying each attribute exactly once."""
    attributes: Dict[str, str] = {}
    for attr in _REPORTED_ATTRIBUTES:
        # Each get_attribute call is a round trip to the browser.
        value = elem.get_attribute(attr)
        if value:
            attributes[attr] = value
    text = elem.text or ""
    return ElementInfo(
        tag=elem.tag_name,
        text=text[:100],  # Limit text length
        attributes=attributes,
    )


@app.get("/session/{session_id}/page-info", response_model=PageInfo)
async def get_page_info(session_id: str):
    """Get information about the current page.

    Reports the title, the URL and up to 10 elements of each interactive
    tag type.  Elements that cannot be inspected are skipped.

    Raises:
        HTTPException: 404 if the session does not exist, 500 if the page
            could not be inspected.
    """
    driver = _get_driver(session_id)
    try:
        # Get basic page info
        title = driver.title
        url = driver.current_url

        # Get interactive elements
        elements = []
        for tag in _INTERACTIVE_TAGS:
            # Limit to first 10 of each type
            for elem in driver.find_elements(By.TAG_NAME, tag)[:10]:
                try:
                    elements.append(_describe_element(elem))
                except Exception:
                    # Skip elements that cannot be inspected (for instance
                    # ones that went stale since they were located).
                    continue

        return PageInfo(title=title, url=url, elements=elements)
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to get page info: {str(e)}"
        )


@app.get("/session/{session_id}/execute-js")
async def execute_javascript(session_id: str, script: str):
    """Execute JavaScript on the page.

    NOTE(review): this runs a caller-supplied script, taken from the query
    string, in the browser without any restriction.  Only expose it to
    trusted callers.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    driver = _get_driver(session_id)
    try:
        result = driver.execute_script(script)
        return ActionResponse(
            success=True,
            message="JavaScript executed successfully",
            data={"result": result},
        )
    except Exception as e:
        return ActionResponse(
            success=False, message=f"JavaScript execution failed: {str(e)}"
        )


@app.get("/sessions")
async def list_sessions():
    """List all active sessions."""
    return {
        "active_sessions": list(browser_sessions.keys()),
        "total_sessions": len(browser_sessions),
    }


if __name__ == "__main__":
    # Pass the app object rather than the "app:app" import string so the
    # script starts regardless of the file name and the module is not
    # imported a second time.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
    )