Spaces:
No application file
| import asyncio | |
| import base64 | |
| import json | |
| import logging | |
| import os | |
| import time | |
| import uuid | |
| import pathlib | |
| import datetime as dt | |
| from io import BytesIO | |
| from typing import Dict, List, Optional, Any | |
| from contextlib import asynccontextmanager | |
| from pathlib import Path | |
| import uvicorn | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from fastapi.responses import FileResponse, HTMLResponse | |
| from pydantic import BaseModel | |
| from playwright.async_api import async_playwright, Browser, BrowserContext, Page | |
| # from selenium import webdriver | |
| # from selenium.webdriver.chrome.options import Options | |
| from bs4 import BeautifulSoup | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
# Global browser instances
# Maps session_id -> {"playwright_browser", "playwright_context",
# "playwright_page", "created_at" (epoch seconds), "config", "video_dir"}
# as populated by launch().
browser_instances: Dict[str, Dict] = {}
playwright_instance = None  # set by lifespan(): the running Playwright driver
browser_pool = None  # set by lifespan(): the shared headless Chromium Browser
# Pydantic models
class BrowserLaunchRequest(BaseModel):
    # Request body for launch().
    headless: bool = True  # stored in the session config; launch() does not read it (the shared pool is always headless)
    width: int = 1920  # viewport width in px
    height: int = 1080  # viewport height in px
    user_agent: Optional[str] = None  # custom UA string; Playwright default when None
class NavigateRequest(BaseModel):
    # Request body for navigate().
    session_id: str  # id returned by launch()
    url: str  # absolute URL to load
    wait_until: str = "networkidle"  # Playwright wait_until: load|domcontentloaded|networkidle|commit
class ScreenshotRequest(BaseModel):
    # Request body for screenshot().
    session_id: str  # id returned by launch()
    full_page: bool = False  # capture the full scrollable page instead of the viewport
    selector: Optional[str] = None  # when set, capture only the first matching element
class ElementActionRequest(BaseModel):
    # Request body for element_action().
    session_id: str  # id returned by launch()
    selector: str  # CSS selector; the first match is used
    action: str  # click, type, scroll, hover, textContent
    value: Optional[str] = None  # text to fill for the "type" action; ignored otherwise
class ScrapeRequest(BaseModel):
    # Request body for scrape_content().
    session_id: str  # id returned by launch()
    selectors: Optional[List[str]] = None  # CSS selectors to extract text for
    extract_all: bool = False  # when True, extract common elements and ignore `selectors`
class AIExtractionRequest(BaseModel):
    # NOTE(review): no endpoint in this chunk consumes this model — confirm
    # it is used elsewhere or remove it.
    session_id: str  # id returned by launch()
    prompt: str  # natural-language extraction instruction
    target_elements: Optional[List[str]] = None  # optional selectors to focus extraction on
# Global lifespan manager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the shared Chromium instance on startup; tear it down on shutdown.

    BUG FIX: a FastAPI ``lifespan`` must be an async context manager factory.
    The original async generator was never wrapped with
    ``@asynccontextmanager`` (imported at the top of the file but unused),
    which modern Starlette/FastAPI rejects or only supports via a deprecated
    compatibility shim.
    """
    global playwright_instance, browser_pool
    # Startup: one headless Chromium process is shared by every session
    # (each session gets its own BrowserContext in launch()).
    playwright_instance = await async_playwright().start()
    browser_pool = await playwright_instance.chromium.launch(
        headless=True,
        args=['--no-sandbox', '--disable-dev-shm-usage', '--disable-gpu']
    )
    logger.info("Browser pool initialized")
    yield
    # Shutdown: close the browser before stopping the Playwright driver.
    if browser_pool:
        await browser_pool.close()
    if playwright_instance:
        await playwright_instance.stop()
    logger.info("Browser instances cleaned up")
# Initialize FastAPI app
# Startup/shutdown of the shared browser is driven by lifespan() above.
app = FastAPI(
    title="Web Scraping API Service",
    description="Headless browser automation with Playwright and Selenium",
    version="1.0.0",
    lifespan=lifespan
)
# ---------- serve the single-page UI at "/" ----------
BASE_DIR = Path(__file__).resolve().parent  # directory containing this file
UI_FILE = BASE_DIR / "browser_automation_ui.html"  # single-page front-end
VIDEO_DIR = BASE_DIR / "browser_videos"  # root for session video recordings
# NOTE(review): VIDEO_DIR is already a Path, so this wrapper is redundant.
VIDEO_ROOT = pathlib.Path(VIDEO_DIR)
VIDEO_ROOT.mkdir(exist_ok=True, parents=True)  # ensure the recording dir exists at import time
async def root_ui():
    """Return the front-end SPA."""
    # NOTE(review): no @app.get("/") decorator is visible in this chunk —
    # confirm the route is registered elsewhere, otherwise "/" serves nothing.
    return FileResponse(UI_FILE)
| # (Optional) if you later add JS/CSS images, create a /static mount | |
| # app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") | |
| # ------------------------------------------------------ | |
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# rejected by browsers per the CORS spec (credentialed requests need an
# explicit origin) — confirm whether credentials are actually required.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Utility functions
def _chrome_opts():
    """Build Chrome options for a headless 1920x1080 Selenium driver.

    NOTE(review): ``Options`` comes from the selenium import that is commented
    out at the top of this file, so calling this raises NameError as-is.
    Nothing in this chunk calls it — either restore the selenium import or
    delete this dead helper.
    """
    o = Options()
    o.add_argument("--headless")
    o.add_argument("--no-sandbox")
    o.add_argument("--disable-dev-shm-usage")
    o.add_argument("--disable-gpu")
    o.add_argument("--window-size=1920,1080")
    return o
def _purge_idle():
    """Clean up sessions older than 1 hour"""
    # Anything created before this instant has exceeded the 1-hour lifetime.
    cutoff = time.time() - 3600
    stale = [
        sid
        for sid, data in browser_instances.items()
        if data.get('created_at', 0) < cutoff
    ]
    # Closing is async; schedule it on the running loop rather than awaiting
    # here (this helper is called from a coroutine, so a loop exists).
    for sid in stale:
        asyncio.create_task(_close_session(sid))
async def _close_session(session_id: str):
    """Close one session's Playwright resources and drop it from the registry.

    Best-effort: page/context close failures are swallowed so a dead browser
    cannot keep a session pinned in memory.
    """
    # Atomically remove the session so concurrent callers cannot double-close.
    # (The original fetched the dict twice into `sess` and `info` and had an
    # unreachable not-found branch that fell through to a KeyError.)
    sess = browser_instances.pop(session_id, None)
    if sess is None:
        logger.error("unknown session, no video saved")
        return
    # Playwright: close the page, then its context. The shared browser_pool
    # instance is deliberately left running for other sessions.
    try:
        await sess['playwright_page'].close()
        await sess['playwright_context'].close()
    except Exception:
        pass
    # Grab every .webm path that appeared in the session's video dir
    vids = list(map(str, sess["video_dir"].glob("*.webm")))
    # BUG FIX: the original passed extra positional args after the "%s"
    # format ("videos: ", vids), which makes the logging module raise a
    # formatting error and drop the message.
    logger.info("Closed session %s, videos: %s", session_id, vids)
async def _startup():
    """Spawn the background task that periodically purges idle sessions.

    NOTE(review): nothing in this chunk registers this coroutine with the
    app (no @app.on_event / lifespan hook visible) — confirm it is wired up
    elsewhere, otherwise the purge loop never runs.
    """
    async def _cleaner() -> None:
        # Sweep for expired sessions every five minutes, forever.
        while True:
            _purge_idle()
            await asyncio.sleep(300)

    asyncio.create_task(_cleaner())
# API Endpoints
async def health():
    """Liveness probe: report service status and the current timestamp.

    BUG FIX: ``dt`` is the ``datetime`` module (``import datetime as dt``),
    so the original ``dt.now()`` raised AttributeError on every call; the
    class method is ``dt.datetime.now()``.

    NOTE(review): no route decorator is visible in this chunk — confirm the
    endpoint is registered (e.g. @app.get("/health")) elsewhere.
    """
    return {"status": "healthy", "timestamp": dt.datetime.now().isoformat()}
async def launch(request: BrowserLaunchRequest):
    """Launch a new browser session as a fresh context on the shared browser.

    Returns {"session_id": ..., "status": "launched"}; raises HTTP 500 if
    Playwright fails to create the context/page.

    Note: request.headless is recorded in the session config but not applied —
    the shared pool browser is always headless.
    """
    session_id = str(uuid.uuid4())
    # BUG FIX: record each session's video into its own sub-directory.
    # Previously every session shared VIDEO_ROOT, so _close_session's
    # "*.webm" glob collected other sessions' recordings as well.
    ts_dir = VIDEO_ROOT / session_id
    ts_dir.mkdir(parents=True, exist_ok=True)
    try:
        # Launch Playwright browser context with video recording enabled.
        ctx = await browser_pool.new_context(
            viewport={'width': request.width, 'height': request.height},
            user_agent=request.user_agent,
            record_video_dir=str(ts_dir),
            record_video_size={"width": 1280, "height": 720}
        )
        page = await ctx.new_page()
        # Store session state for the other endpoints to look up.
        browser_instances[session_id] = {
            "playwright_browser": browser_pool,
            "playwright_context": ctx,
            "playwright_page": page,
            "created_at": time.time(),
            "config": request.dict(),  # NOTE(review): pydantic-v1 API; use model_dump() if on v2
            "video_dir": ts_dir
        }
        logger.info("Launched session %s", session_id)
        return {"session_id": session_id, "status": "launched"}
    except Exception as e:
        logger.error(f"Error launching browser: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def navigate(req: NavigateRequest):
    """Navigate a session's page to the requested URL.

    404 for unknown sessions; 500 (with the Playwright error text) when the
    navigation itself fails.
    """
    if req.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    page = browser_instances[req.session_id]["playwright_page"]
    try:
        await page.goto(req.url, wait_until=req.wait_until)
    except Exception as e:
        logger.error(f"Error navigating to {req.url}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "navigated", "url": req.url}
async def screenshot(req: ScreenshotRequest):
    """Take a PNG screenshot of the session's page (or one element).

    When ``req.selector`` is set, captures the first matching element;
    otherwise captures the viewport (or whole page if ``full_page``).
    Returns the image base64-encoded.

    BUG FIX: ``dt`` is the ``datetime`` module, so the original ``dt.now()``
    raised AttributeError; the class method is ``dt.datetime.now()``.
    """
    if req.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    sess = browser_instances[req.session_id]
    try:
        if req.selector:
            el = sess["playwright_page"].locator(req.selector).first
            png = await el.screenshot()
        else:
            # Full page screenshot with Playwright
            png = await sess["playwright_page"].screenshot(full_page=req.full_page)
        # Convert to base64 so the bytes survive the JSON response.
        b64 = base64.b64encode(png).decode()
        return {"screenshot": b64, "format": "png", "timestamp": dt.datetime.now().isoformat()}
    except Exception as e:
        logger.error(f"Error taking screenshot: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def close(session_id: str):
    """Close a browser session and release its resources (404 if unknown)."""
    if session_id in browser_instances:
        await _close_session(session_id)
        return {"status": "closed", "session_id": session_id}
    raise HTTPException(status_code=404, detail="Session not found")
| # --------------------------------------------------------------------------- # | |
| # API – Element-level | |
| # --------------------------------------------------------------------------- # | |
async def element_action(req: ElementActionRequest):
    """Run one interaction (click/type/scroll/hover/textContent) against the
    first element matching ``req.selector`` in the session's page.

    404 for unknown sessions; unknown actions (and Playwright failures)
    surface as HTTP 500 with the error text.
    """
    if req.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    page = browser_instances[req.session_id]["playwright_page"]
    target = page.locator(req.selector).first
    try:
        # textContent is the only action that returns data, so handle it first.
        if req.action == "textContent":
            text = await target.text_content() or ""
            return {"status": "completed", "action": "textContent", "text": text}
        if req.action == "click":
            await target.click()
        elif req.action == "type":
            await target.fill(req.value or "")
        elif req.action == "scroll":
            await target.scroll_into_view_if_needed()
        elif req.action == "hover":
            await target.hover()
        else:
            # Caught below and reported as a 500, matching the original flow.
            raise ValueError("unknown action")
        return {"status": "completed", "action": req.action}
    except Exception as e:
        logger.error(f"Error performing action {req.action}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def inspect(session_id: str):
    """Get all interactive elements on the page"""
    if session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    page = browser_instances[session_id]["playwright_page"]
    # Parse a static snapshot of the live DOM with BeautifulSoup.
    soup = BeautifulSoup(await page.content(), "html.parser")
    # Tags/attributes that usually indicate something clickable or fillable.
    # NOTE(review): an element can match several of these (e.g. <a href> matches
    # both 'a' and '[href]') and will then appear in the output more than once.
    selectors = [
        'a','button','input','select','textarea',
        '[onclick]','[href]','[role="button"]'
    ]
    out = []
    try:
        for sel in selectors:
            for idx, elem in enumerate(soup.select(sel)):
                out.append({
                    "tag": elem.name,
                    # NOTE(review): this indexes by match order, but CSS
                    # :nth-of-type counts same-tag siblings — the generated
                    # selector may not resolve back to this element. Confirm
                    # before feeding these into element_action().
                    "selector": f"{sel}:nth-of-type({idx+1})",
                    "text": elem.get_text(strip=True)[:100],
                    "attributes": dict(elem.attrs),
                    "type": elem.get("type", "N/A")
                })
        return {"elements": out, "total_count": len(out)}
    except Exception as e:
        logger.error(f"Error inspecting elements: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def scrape_content(request: ScrapeRequest):
    """Scrape content from the session's current page.

    With ``extract_all`` set, returns a fixed bundle of common elements
    (title, headings, paragraphs, links, images, forms); otherwise returns
    the text of each element matching the given CSS ``selectors``.

    BUG FIX: ``dt`` is the ``datetime`` module, so the original ``dt.now()``
    raised AttributeError on every successful scrape; the class method is
    ``dt.datetime.now()``.
    """
    if request.session_id not in browser_instances:
        raise HTTPException(status_code=404, detail="Session not found")
    session = browser_instances[request.session_id]
    try:
        page = session['playwright_page']
        content = await page.content()
        soup = BeautifulSoup(content, 'html.parser')
        scraped_data = {}
        if request.extract_all:
            # Extract common elements
            scraped_data = {
                'title': soup.title.string if soup.title else None,
                'headings': [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])],
                'paragraphs': [p.get_text(strip=True) for p in soup.find_all('p')],
                'links': [{'text': a.get_text(strip=True), 'href': a.get('href')} for a in soup.find_all('a', href=True)],
                'images': [{'src': img.get('src'), 'alt': img.get('alt')} for img in soup.find_all('img')],
                'forms': [{'action': form.get('action'), 'method': form.get('method')} for form in soup.find_all('form')]
            }
        elif request.selectors:
            # Extract specific selectors
            for selector in request.selectors:
                elements = soup.select(selector)
                scraped_data[selector] = [elem.get_text(strip=True) for elem in elements]
        return {"data": scraped_data, "timestamp": dt.datetime.now().isoformat()}
    except Exception as e:
        logger.error(f"Error scraping content: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def list_sessions():
    """List all active browser sessions with creation time and launch config.

    BUG FIX: ``dt`` is the ``datetime`` module, so the original
    ``dt.fromtimestamp(...)`` raised AttributeError; the class method is
    ``dt.datetime.fromtimestamp``.
    """
    sessions = [
        {
            'session_id': session_id,
            'created_at': dt.datetime.fromtimestamp(session_data['created_at']).isoformat(),
            'config': session_data['config']
        }
        for session_id, session_data in browser_instances.items()
    ]
    return {"sessions": sessions, "total_count": len(sessions)}
if __name__ == "__main__":
    # Serve on all interfaces at port 7860 (presumably the Hugging Face
    # Spaces convention — confirm). "app:app" requires this file to be
    # named app.py.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)