Spaces:
No application file
No application file
| import asyncio | |
| import base64 | |
| import json | |
| import logging | |
| import os | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from io import BytesIO | |
| from typing import Dict, List, Optional, Any | |
| from contextlib import asynccontextmanager | |
| import gradio as gr | |
| import uvicorn | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from fastapi.responses import FileResponse, HTMLResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pathlib import Path | |
| from pydantic import BaseModel | |
| from playwright.async_api import async_playwright, Browser, BrowserContext, Page | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.chrome.service import Service | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import TimeoutException, WebDriverException | |
| from bs4 import BeautifulSoup | |
| from PIL import Image | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global browser instances
# Maps session_id -> {'playwright_context', 'playwright_page', 'selenium_driver',
# 'created_at', 'config'}; populated by launch_browser, reaped by
# close_browser_session / cleanup_old_sessions.
browser_instances: Dict[str, Dict] = {}
# Shared Playwright driver handle; started in lifespan startup, stopped on shutdown.
playwright_instance = None
# Single shared headless Chromium; per-session isolation is done via contexts.
browser_pool = None
# Pydantic models
class BrowserLaunchRequest(BaseModel):
    """Request body for launching a new browser session."""
    headless: bool = True  # informational only: the shared pool and Selenium are always headless
    width: int = 1920      # viewport / window width in pixels
    height: int = 1080     # viewport / window height in pixels
    user_agent: Optional[str] = None  # custom UA applied to both Playwright and Selenium
class NavigateRequest(BaseModel):
    """Request body for navigating an existing session to a URL."""
    session_id: str
    url: str
    wait_until: str = "networkidle"  # Playwright load state passed to page.goto()
class ScreenshotRequest(BaseModel):
    """Request body for capturing a screenshot of a session's page."""
    session_id: str
    full_page: bool = False           # capture the full scrollable page (ignored when selector is set)
    selector: Optional[str] = None    # if set, screenshot only the first matching element
class ElementActionRequest(BaseModel):
    """Request body for performing an action on a page element."""
    session_id: str
    selector: str                # CSS selector; the first match is acted on
    action: str  # click, type, scroll, hover
    value: Optional[str] = None  # text to fill when action == "type"
class ScrapeRequest(BaseModel):
    """Request body for extracting content from the current page."""
    session_id: str
    selectors: Optional[List[str]] = None  # CSS selectors whose text content should be extracted
    extract_all: bool = False              # if True, extract common elements (title, headings, links, ...) instead
class AIExtractionRequest(BaseModel):
    """Request body for AI-driven extraction.

    NOTE(review): not referenced by any endpoint visible in this file —
    confirm whether the corresponding route was removed or lives elsewhere.
    """
    session_id: str
    prompt: str
    target_elements: Optional[List[str]] = None
# Global lifespan manager
@asynccontextmanager  # FastAPI's `lifespan=` requires an async context manager, not a bare async generator
async def lifespan(app: FastAPI):
    """Start the shared Playwright Chromium on startup; tear everything down on shutdown.

    A single browser process (`browser_pool`) is shared by all sessions;
    per-session isolation is provided by browser contexts (see launch_browser).
    Also runs the periodic stale-session reaper, since a custom lifespan
    supersedes `on_event("startup")` handlers.
    """
    global playwright_instance, browser_pool
    # Startup
    playwright_instance = await async_playwright().start()
    browser_pool = await playwright_instance.chromium.launch(
        headless=True,
        args=['--no-sandbox', '--disable-dev-shm-usage', '--disable-gpu']
    )
    logger.info("Browser pool initialized")

    async def _cleanup_loop():
        # Reap sessions older than one hour, every 5 minutes.
        while True:
            cleanup_old_sessions()
            await asyncio.sleep(300)

    cleanup_task = asyncio.create_task(_cleanup_loop())

    yield

    # Shutdown: stop the reaper, close any live sessions, then the browser itself.
    cleanup_task.cancel()
    for session_id in list(browser_instances):
        await close_browser_session(session_id)
    if browser_pool:
        await browser_pool.close()
    if playwright_instance:
        await playwright_instance.stop()
    logger.info("Browser instances cleaned up")
# Initialize FastAPI app
# Startup/shutdown of the shared browser pool is handled by `lifespan` above.
app = FastAPI(
    title="Web Scraping API Service",
    description="Headless browser automation with Playwright and Selenium",
    version="1.0.0",
    lifespan=lifespan
)
# ---------- serve the single-page UI at "/" ----------
BASE_DIR = Path(__file__).resolve().parent
UI_FILE = BASE_DIR / "browser_automation_ui.html"


@app.get("/", include_in_schema=False)  # route restored: the function was defined but never registered
async def root_ui():
    """Return the front-end SPA (single HTML file shipped next to this module)."""
    return FileResponse(UI_FILE)

# (Optional) if you later add JS/CSS images, create a /static mount
# app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
# ------------------------------------------------------
# Allow any origin to call the API (the UI may be hosted elsewhere).
# NOTE(review): browsers reject the combination of allow_origins=["*"] with
# allow_credentials=True per the CORS spec — confirm whether credentials are
# actually needed, and drop one of the two if not.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Utility functions
def get_chrome_options():
    """Build the headless-Chrome option set used for every Selenium session."""
    opts = Options()
    for flag in (
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--window-size=1920,1080',
    ):
        opts.add_argument(flag)
    return opts
def cleanup_old_sessions():
    """Schedule the closure of every session created more than one hour ago."""
    cutoff = time.time() - 3600  # sessions older than 1 hour are expired
    expired = [
        sid
        for sid, data in browser_instances.items()
        if data.get('created_at', 0) < cutoff
    ]
    # Closing is async, so hand each teardown to the running event loop.
    for sid in expired:
        asyncio.create_task(close_browser_session(sid))
async def close_browser_session(session_id: str):
    """Close both drivers of one session and drop it from the registry.

    Unknown session ids are ignored. Driver errors are logged, never raised,
    so the registry entry is always removed.
    """
    if session_id not in browser_instances:
        return
    session = browser_instances[session_id]

    # Playwright side: page first, then its parent context.
    if 'playwright_page' in session:
        try:
            await session['playwright_page'].close()
            await session['playwright_context'].close()
        except Exception as e:
            logger.error(f"Error closing Playwright session {session_id}: {e}")

    # Selenium side.
    if 'selenium_driver' in session:
        try:
            session['selenium_driver'].quit()
        except Exception as e:
            logger.error(f"Error closing Selenium session {session_id}: {e}")

    del browser_instances[session_id]
    logger.info(f"Closed browser session: {session_id}")
# API Endpoints
@app.get("/health")  # route restored — TODO(review): confirm the path matches what the frontend calls
async def health_check():
    """Liveness probe: report service status and current server time."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}
@app.post("/browser/launch")  # route restored — TODO(review): confirm the path matches the frontend
async def launch_browser(request: BrowserLaunchRequest):
    """Launch a new session: one Playwright context plus one Selenium driver.

    Returns the generated session_id used by all other endpoints.
    Raises HTTP 500 if either driver fails to start.
    """
    session_id = str(uuid.uuid4())
    context = None
    try:
        # Playwright: a fresh isolated context in the shared Chromium pool.
        context = await browser_pool.new_context(
            viewport={'width': request.width, 'height': request.height},
            user_agent=request.user_agent
        )
        page = await context.new_page()

        # Selenium: a dedicated headless Chrome process.
        chrome_options = get_chrome_options()
        if request.user_agent:
            chrome_options.add_argument(f'--user-agent={request.user_agent}')
        selenium_driver = webdriver.Chrome(options=chrome_options)
        selenium_driver.set_window_size(request.width, request.height)

        # Store session
        browser_instances[session_id] = {
            'playwright_context': context,
            'playwright_page': page,
            'selenium_driver': selenium_driver,
            'created_at': time.time(),
            'config': request.dict()
        }
        logger.info(f"Launched browser session: {session_id}")
        return {"session_id": session_id, "status": "launched"}
    except Exception as e:
        # Don't leak the Playwright context when the Selenium half fails.
        if context is not None:
            try:
                await context.close()
            except Exception:
                pass
        logger.error(f"Error launching browser: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/browser/navigate")  # route restored — TODO(review): confirm the path matches the frontend
async def navigate_to_url(request: NavigateRequest):
    """Navigate both drivers of a session to the given URL.

    Raises HTTP 404 for unknown sessions, HTTP 500 on navigation failure.
    """
    if request.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    session = browser_instances[request.session_id]
    try:
        # Playwright honours the requested load state; Selenium's get()
        # blocks until its own page-load condition on top of that.
        await session['playwright_page'].goto(request.url, wait_until=request.wait_until)
        session['selenium_driver'].get(request.url)
        return {"status": "navigated", "url": request.url}
    except Exception as e:
        logger.error(f"Error navigating to {request.url}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/browser/screenshot")  # route restored — TODO(review): confirm the path matches the frontend
async def take_screenshot(request: ScreenshotRequest):
    """Capture a PNG screenshot (whole page, or one element) as base64.

    Raises HTTP 404 for unknown sessions, HTTP 500 on capture failure.
    """
    if request.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    session = browser_instances[request.session_id]
    try:
        page = session['playwright_page']
        if request.selector:
            # BUG FIX: Locator.first is a plain property, not a coroutine —
            # the original `await page.locator(...).first` raised a TypeError.
            element = page.locator(request.selector).first
            screenshot_bytes = await element.screenshot()
        else:
            # Full page screenshot with Playwright
            screenshot_bytes = await page.screenshot(
                full_page=request.full_page
            )
        # Convert to base64 so the bytes travel inside a JSON response.
        screenshot_b64 = base64.b64encode(screenshot_bytes).decode()
        return {
            "screenshot": screenshot_b64,
            "format": "png",
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error taking screenshot: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/browser/action")  # route restored — TODO(review): confirm the path matches the frontend
async def perform_element_action(request: ElementActionRequest):
    """Perform click / type / scroll / hover on the first element matching the selector.

    Raises HTTP 404 for unknown sessions, HTTP 400 for an unknown action,
    HTTP 500 when the action itself fails.
    """
    if request.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    session = browser_instances[request.session_id]
    # BUG FIX: validate before the try block — previously the 400
    # HTTPException was raised inside it, swallowed by `except Exception`,
    # and re-surfaced to the client as a misleading 500.
    if request.action not in ("click", "type", "scroll", "hover"):
        raise HTTPException(status_code=400, detail="Invalid action")
    try:
        page = session['playwright_page']
        element = page.locator(request.selector).first
        if request.action == "click":
            await element.click()
        elif request.action == "type":
            await element.fill(request.value or "")
        elif request.action == "scroll":
            await element.scroll_into_view_if_needed()
        else:  # hover — only remaining valid action
            await element.hover()
        return {"status": "completed", "action": request.action}
    except Exception as e:
        logger.error(f"Error performing action {request.action}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/browser/elements/{session_id}")  # route restored — TODO(review): confirm the path matches the frontend
async def inspect_page_elements(session_id: str):
    """List the interactive elements (links, buttons, inputs, ...) on the page.

    Raises HTTP 404 for unknown sessions, HTTP 500 on parse failure.
    """
    if session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    session = browser_instances[session_id]
    try:
        page = session['playwright_page']
        # Parse the rendered DOM (after JS) rather than the raw HTML source.
        content = await page.content()
        soup = BeautifulSoup(content, 'html.parser')
        interactive_selectors = [
            'a', 'button', 'input', 'select', 'textarea',
            '[onclick]', '[href]', '[role="button"]'
        ]
        elements = []
        for selector in interactive_selectors:
            for i, elem in enumerate(soup.select(selector)):
                # NOTE(review): :nth-of-type counts same-tag siblings, not the
                # i-th match of `selector` document-wide, so this synthesized
                # selector may not round-trip to the same element — confirm
                # how consumers use it before relying on it.
                elements.append({
                    'tag': elem.name,
                    'selector': f"{selector}:nth-of-type({i+1})",
                    'text': elem.get_text(strip=True)[:100],
                    'attributes': dict(elem.attrs),
                    'type': elem.get('type', 'N/A')
                })
        return {"elements": elements, "total_count": len(elements)}
    except Exception as e:
        logger.error(f"Error inspecting elements: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/browser/scrape")  # route restored — TODO(review): confirm the path matches the frontend
async def scrape_content(request: ScrapeRequest):
    """Scrape content from the session's current page.

    With extract_all=True, returns common elements (title, headings,
    paragraphs, links, images, forms); otherwise returns the text of each
    requested CSS selector. Raises HTTP 404 for unknown sessions, HTTP 500
    on failure.
    """
    if request.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    session = browser_instances[request.session_id]
    try:
        page = session['playwright_page']
        content = await page.content()
        soup = BeautifulSoup(content, 'html.parser')
        scraped_data = {}
        if request.extract_all:
            # Extract common elements
            scraped_data = {
                'title': soup.title.string if soup.title else None,
                'headings': [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])],
                'paragraphs': [p.get_text(strip=True) for p in soup.find_all('p')],
                'links': [{'text': a.get_text(strip=True), 'href': a.get('href')} for a in soup.find_all('a', href=True)],
                'images': [{'src': img.get('src'), 'alt': img.get('alt')} for img in soup.find_all('img')],
                'forms': [{'action': form.get('action'), 'method': form.get('method')} for form in soup.find_all('form')]
            }
        elif request.selectors:
            # Extract the visible text of each requested selector.
            for selector in request.selectors:
                elements = soup.select(selector)
                scraped_data[selector] = [elem.get_text(strip=True) for elem in elements]
        return {"data": scraped_data, "timestamp": datetime.now().isoformat()}
    except Exception as e:
        logger.error(f"Error scraping content: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/browser/{session_id}")  # route restored — TODO(review): confirm method/path matches the frontend
async def close_browser(session_id: str):
    """Close one browser session and release its drivers.

    Raises HTTP 404 if the session does not exist.
    """
    if session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    await close_browser_session(session_id)
    return {"status": "closed", "session_id": session_id}
@app.get("/browser/sessions")  # route restored — TODO(review): confirm the path matches the frontend
async def list_sessions():
    """List all active browser sessions with creation time and launch config."""
    sessions = [
        {
            'session_id': session_id,
            'created_at': datetime.fromtimestamp(session_data['created_at']).isoformat(),
            'config': session_data['config']
        }
        for session_id, session_data in browser_instances.items()
    ]
    return {"sessions": sessions, "total_count": len(sessions)}
# Background task to cleanup old sessions
# NOTE(review): this function is never registered (no @app.on_event("startup")
# decorator is visible), so the cleanup loop never runs as written. FastAPI
# also ignores on_event handlers when a custom `lifespan` is supplied —
# presumably this loop belongs inside `lifespan`; confirm intent.
async def startup_event():
    async def cleanup_task():
        while True:
            cleanup_old_sessions()
            await asyncio.sleep(300)  # Clean up every 5 minutes
    asyncio.create_task(cleanup_task())
if __name__ == "__main__":
    # Serve the ASGI app directly; port 7860 is the Hugging Face Spaces convention.
    # NOTE(review): the import string "app:app" assumes this module is saved as
    # app.py — confirm the filename matches.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
        workers=1
    )