"""Remote-controlled headless browser served over FastAPI.

A single Playwright Chromium instance is shared by the whole process.  The
``/ws`` websocket streams JPEG screenshots of the active tab to the client and
accepts JSON commands (navigation, input, tab management, JS evaluation and
script storage).  A few plain HTTP endpoints expose the frontend, a health
check and the saved scripts.
"""

import asyncio
import json
import os
import base64
import sys
from pathlib import Path
from urllib.parse import quote_plus

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, Form
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
from playwright.async_api import async_playwright, Page

app = FastAPI()
PORT = int(os.environ.get("PORT", 7860))
BASE_DIR = Path(os.getcwd())
SCRIPTS_DIR = BASE_DIR / "scripts"
RESULTS_DIR = BASE_DIR / "results"
SCRIPTS_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)

# Upper bound on buffered console/network entries per page, so the buffers
# cannot grow without limit while no websocket client is draining them.
MAX_LOG_ENTRIES = 500

# Global browser state
pw = None
browser = None
context = None
pages = []
active_tab = 0
console_logs = {}
network_logs = {}


# =========================
# HELPER FUNCTIONS
# =========================
def resolve_url(q: str) -> str:
    """Turn address-bar input into a navigable URL.

    * empty input            -> ``https://example.com``
    * ``http://``/``https://`` -> returned unchanged
    * looks like a host name (contains a dot, no spaces, not ``localhost…``)
      -> prefixed with ``https://``
    * anything else          -> Google search for the text

    The search text is percent-encoded with ``quote_plus`` so characters such
    as ``&``, ``#`` or ``+`` cannot corrupt the query string.
    """
    q = q.strip()
    if not q:
        return "https://example.com"
    if q.startswith(("http://", "https://")):
        return q
    if "." in q and " " not in q and not q.startswith("localhost"):
        return "https://" + q
    return f"https://www.google.com/search?q={quote_plus(q)}"


def _safe_script_name(name):
    """Reduce *name* to ``[alnum . _ -]`` characters.

    Returns ``None`` when nothing usable remains (empty, ``.``, ``..``, …).
    Because path separators are dropped, the result can never escape
    ``SCRIPTS_DIR``.
    """
    cleaned = "".join(c for c in (name or "") if c.isalnum() or c in "._-")
    if not cleaned.strip("."):
        return None
    return cleaned


def _append_log(store: dict, pid: int, entry: dict) -> None:
    """Append *entry* to the page's buffer in *store*, keeping it bounded."""
    bucket = store.get(pid)
    if bucket is None:
        # The page was closed and its buffers were dropped; ignore late events.
        return
    bucket.append(entry)
    overflow = len(bucket) - MAX_LOG_ENTRIES
    if overflow > 0:
        del bucket[:overflow]


async def setup_page(page: Page, page_id: int):
    """Attach console and network listeners to a page.

    Log buffers are keyed by ``id(page)``.  ``page_id`` is accepted for
    interface compatibility but is not used.
    """
    pid = id(page)
    console_logs[pid] = []
    network_logs[pid] = []

    def handle_console(msg):
        _append_log(console_logs, pid, {
            "type": msg.type,
            "text": msg.text,
            "args": [str(a) for a in msg.args[:5]],
        })

    def handle_request(req):
        _append_log(network_logs, pid, {
            "type": "request",
            "url": req.url,
            "method": req.method,
            "resourceType": req.resource_type,
        })

    def handle_response(res):
        _append_log(network_logs, pid, {
            "type": "response",
            "url": res.url,
            "status": res.status,
            "ok": res.ok,
        })

    page.on("console", handle_console)
    page.on("request", handle_request)
    page.on("response", handle_response)
    await page.route("**/*", lambda route: route.continue_())


async def send_tabs(ws: WebSocket):
    """Send current tab list to client"""
    tabs = []
    for p in pages:
        try:
            if not p.is_closed():
                tabs.append({"url": p.url, "title": await p.title()})
            else:
                tabs.append({"url": "about:blank", "title": "Closed"})
        except Exception:
            # Querying a page that is mid-navigation or crashing can fail;
            # report a placeholder instead of breaking the tab list.
            tabs.append({"url": "about:blank", "title": "Error"})
    await ws.send_json({"type": "tabs", "tabs": tabs, "active": active_tab})


def list_scripts():
    """List saved JS files"""
    return [f.name for f in SCRIPTS_DIR.glob("*.js") if f.is_file()]


# =========================
# BROWSER LIFECYCLE
# =========================
async def start_browser():
    """Launch headless Chromium, create the shared context and first page."""
    global pw, browser, context, pages
    print("🚀 Starting Playwright browser...")
    pw = await async_playwright().start()
    browser = await pw.chromium.launch(
        headless=True,
        args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu"],
    )
    context = await browser.new_context(
        viewport={"width": 1280, "height": 800},
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    )
    page = await context.new_page()
    await setup_page(page, 0)
    try:
        await page.goto("https://example.com")
    except Exception as e:
        # A failed first navigation (e.g. no network) must not prevent the
        # server from starting; the blank page is still usable.
        print(f"⚠️ Initial navigation failed: {e}")
    pages.append(page)
    print("✅ Browser ready with 1 page")


@app.on_event("startup")
async def startup():
    """FastAPI startup hook: bring up the browser."""
    await start_browser()


@app.on_event("shutdown")
async def shutdown():
    """FastAPI shutdown hook: close the browser and stop Playwright."""
    print("🛑 Shutting down browser...")
    if browser:
        await browser.close()
    if pw:
        await pw.stop()


# =========================
# FRONTEND HTML
# =========================
# NOTE(review): the markup of this page appears to have been stripped in the
# reviewed copy of the file — confirm against the deployed frontend.
HTML = """ 🕷️ HF Scraping Browser
Connecting...
"""


# =========================
# WEBSOCKET HANDLER
# =========================
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    """Serve one browser-control session.

    A background task pushes JPEG screenshots of the active tab as binary
    frames.  The foreground loop reads JSON commands (``{"t": <command>, …}``)
    and answers with JSON messages.  After every handled command the most
    recent console/network log entries of the active page are forwarded and
    the buffers are cleared.
    """
    global active_tab
    await ws.accept()
    print(f"✅ WS accepted from {ws.client}")
    await send_tabs(ws)

    async def stream():
        # ``global`` must be declared here as well: the declaration in the
        # enclosing function does not apply to this nested function, and
        # without it the assignment below makes ``active_tab`` a local that
        # is read before assignment (UnboundLocalError on every iteration).
        global active_tab
        while True:
            try:
                if not pages or active_tab >= len(pages):
                    await asyncio.sleep(0.1)
                    continue
                page = pages[active_tab]
                if page.is_closed():
                    active_tab = max(0, active_tab - 1)
                    await send_tabs(ws)
                    # Avoid spinning when the only remaining page is closed.
                    await asyncio.sleep(0.1)
                    continue
                img = await asyncio.wait_for(
                    page.screenshot(type="jpeg", quality=65, scale="css"),
                    timeout=5.0,
                )
                await ws.send_bytes(img)
                await asyncio.sleep(0.033)
            except asyncio.TimeoutError:
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"❌ Stream error: {e}")
                await asyncio.sleep(0.2)

    stream_task = asyncio.create_task(stream())

    try:
        while True:
            raw = await ws.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                msg = None
            if not isinstance(msg, dict):
                # A malformed frame should not tear down the whole session.
                await ws.send_json({"type": "error", "message": "Invalid message"})
                continue

            t = msg.get("t")
            if not pages:
                await ws.send_json({"type":"error","message":"No pages"})
                continue
            page = pages[active_tab] if active_tab < len(pages) else pages[0]
            print(f"📨 {t} | tab={active_tab}")

            if t == "init":
                await send_tabs(ws)

            elif t == "goto":
                q = str(msg.get("q", "")).strip()
                if not q:
                    await ws.send_json({"type":"status","text":"⚠️ Empty URL"})
                    continue
                url = resolve_url(q)
                print(f"🌐 Going to: {url}")
                try:
                    await page.goto(url, wait_until="domcontentloaded", timeout=30000)
                    title = await page.title()
                    await ws.send_json({"type":"status","text":f"✅ {title[:40]}"})
                    await send_tabs(ws)
                except Exception as e:
                    await ws.send_json({"type":"error","message":f"Nav failed: {str(e)[:80]}"})

            elif t == "back":
                try:
                    await page.go_back(timeout=15000)
                except Exception as e:
                    await ws.send_json({"type":"error","message":f"Back: {e}"})

            elif t == "forward":
                try:
                    await page.go_forward(timeout=15000)
                except Exception as e:
                    await ws.send_json({"type":"error","message":f"Forward: {e}"})

            elif t == "reload":
                try:
                    await page.reload(wait_until="domcontentloaded", timeout=30000)
                except Exception as e:
                    await ws.send_json({"type":"error","message":f"Reload: {e}"})

            elif t == "click":
                # Best effort: a failed input event is logged, not reported.
                try:
                    await page.mouse.click(msg["x"], msg["y"], delay=10)
                except Exception as e:
                    print(f"⚠️ Click failed: {e}")

            elif t == "key":
                try:
                    await page.keyboard.press(msg["k"])
                except Exception as e:
                    print(f"⚠️ Key failed: {e}")

            elif t == "new-tab":
                try:
                    np = await context.new_page()
                    await setup_page(np, len(pages))
                    pages.append(np)
                    active_tab = len(pages) - 1
                    await send_tabs(ws)
                except Exception as e:
                    await ws.send_json({"type":"error","message":f"Tab: {e}"})

            elif t == "switch-tab":
                i = msg.get("index", 0)
                if isinstance(i, int) and 0 <= i < len(pages):
                    try:
                        active_tab = i
                        await pages[active_tab].bring_to_front()
                        await send_tabs(ws)
                    except Exception as e:
                        await ws.send_json({"type":"error","message":f"Tab: {e}"})

            elif t == "close-tab":
                i = msg.get("index", 0)
                if isinstance(i, int) and len(pages) > 1 and 0 <= i < len(pages):
                    try:
                        closing = pages[i]
                        await closing.close()
                        del pages[i]
                        # Drop the closed page's buffers so they do not leak
                        # (and cannot be picked up by a reused ``id``).
                        console_logs.pop(id(closing), None)
                        network_logs.pop(id(closing), None)
                        # Closing a tab before the active one shifts every
                        # later index down; follow the shift so the same page
                        # stays active.
                        if i < active_tab or active_tab >= len(pages):
                            active_tab = max(0, active_tab - 1)
                        await send_tabs(ws)
                    except Exception as e:
                        await ws.send_json({"type":"error","message":f"Tab: {e}"})

            elif t == "execute-js":
                # NOTE: runs client-supplied JavaScript in the page by design;
                # only expose this endpoint to trusted clients.
                code = msg.get("code","")
                try:
                    result = await page.evaluate(f"(async()=>{{try{{{code}}}catch(e){{return{{__err__:e.message}}}}}})()")
                    if isinstance(result,dict) and result.get("__err__"):
                        await ws.send_json({"type":"js-result","data":{"error":result["__err__"]}})
                    else:
                        await ws.send_json({"type":"js-result","data":result})
                except Exception as e:
                    await ws.send_json({"type":"js-result","data":{"error":str(e)}})

            elif t == "save-file":
                name = _safe_script_name(msg.get("name", "script.js"))
                if name is None:
                    await ws.send_json({"type":"error","message":"Invalid file name"})
                else:
                    try:
                        (SCRIPTS_DIR / name).write_text(msg.get("content", ""), encoding="utf-8")
                        await ws.send_json({"type":"file-list","files":list_scripts()})
                    except (OSError, TypeError) as e:
                        await ws.send_json({"type":"error","message":f"Save: {e}"})

            elif t == "load-file":
                # Sanitise before joining: the raw name comes from the client
                # and could otherwise point outside SCRIPTS_DIR.
                name = _safe_script_name(msg.get("name", ""))
                path = SCRIPTS_DIR / name if name else None
                if path is not None and path.is_file() and path.suffix == ".js":
                    await ws.send_json({"type":"js-result","data":path.read_text(encoding="utf-8")})
                else:
                    await ws.send_json({"type":"error","message":"Not found"})

            elif t == "list-files":
                await ws.send_json({"type":"file-list","files":list_scripts()})

            elif t == "get-html":
                try:
                    html = await page.content()
                    b64 = base64.b64encode(html.encode('utf-8',errors='ignore')).decode()
                    await ws.send_json({"type":"html","data":b64})
                except Exception as e:
                    await ws.send_json({"type":"error","message":f"HTML: {e}"})

            elif t == "clear-logs":
                pid = id(page)
                console_logs[pid] = []
                network_logs[pid] = []

            # Forward logs: only the newest few entries are sent, then the
            # buffers are reset (older entries are intentionally dropped).
            pid = id(page)
            if console_logs.get(pid):
                for log in console_logs[pid][-3:]:
                    await ws.send_json({"type":"console","data":log})
                console_logs[pid] = []
            if network_logs.get(pid):
                for log in network_logs[pid][-5:]:
                    await ws.send_json({"type":"network","data":log})
                network_logs[pid] = []

    except WebSocketDisconnect:
        print("🔌 Client disconnected")
    except Exception as e:
        print(f"💥 WS error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        stream_task.cancel()
        try:
            await stream_task
        except (asyncio.CancelledError, Exception):
            # Expected: the task was just cancelled; nothing left to clean up.
            pass


# =========================
# HTTP ENDPOINTS
# =========================
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve the frontend page."""
    return HTML


@app.get("/health")
async def health():
    """Report tab counts and which tab is active."""
    return {
        "status": "ok",
        "tabs": len(pages),
        "active": active_tab,
        "alive": sum(1 for p in pages if not p.is_closed()),
    }


@app.get("/scripts/{filename}")
async def get_script(filename: str):
    """Return a saved ``.js`` script, or 404 when it does not exist."""
    safe = _safe_script_name(filename)
    if safe:
        path = SCRIPTS_DIR / safe
        if path.is_file() and path.suffix == ".js":
            return PlainTextResponse(path.read_text(encoding="utf-8"), media_type="application/javascript")
    return JSONResponse({"error":"Not found"}, status_code=404)


@app.post("/upload-script")
async def upload_script(file: UploadFile = None, name: str = Form(None)):
    """Store an uploaded script under ``SCRIPTS_DIR`` with a ``.js`` suffix.

    The target name is ``name`` when given, otherwise the upload's file name.
    Only a trailing ``.js`` is stripped before the suffix is re-added, so a
    name such as ``my.json`` is no longer mangled.
    """
    if not file:
        return JSONResponse({"error":"No file"}, status_code=400)
    requested = name or file.filename or ""
    if requested.endswith(".js"):
        requested = requested[:-3]
    stem = _safe_script_name(requested)
    if stem is None:
        return JSONResponse({"error":"Invalid file name"}, status_code=400)
    safe = stem + ".js"
    content = await file.read()
    (SCRIPTS_DIR / safe).write_bytes(content)
    return {"status":"saved","file":safe}


# =========================
# RUN
# =========================
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")