import asyncio
import base64
import json
import os
import sys
from pathlib import Path
from urllib.parse import quote_plus

from fastapi import FastAPI, Form, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
from playwright.async_api import Page, async_playwright
# Application object and filesystem layout, all resolved once at import time.
app = FastAPI()

# Listening port: read from the PORT environment variable, 7860 when unset.
PORT = int(os.getenv("PORT", 7860))

# Saved scripts and results live under the process's current working directory.
BASE_DIR = Path.cwd()
SCRIPTS_DIR = BASE_DIR / "scripts"
RESULTS_DIR = BASE_DIR / "results"
for _directory in (SCRIPTS_DIR, RESULTS_DIR):
    # exist_ok: restarting the server with the folders already present is fine.
    _directory.mkdir(exist_ok=True)
# Global browser state (module-level, shared by every handler in this file)
pw = None  # Playwright driver handle; assigned in start_browser()
browser = None  # launched Chromium instance; assigned in start_browser()
context = None  # browser context that new pages (tabs) are opened from
pages = []  # open pages in tab order; a tab's index is its position here
active_tab = 0  # index into `pages` of the tab being streamed / controlled
console_logs = {}  # id(page) -> list of captured console-message dicts
network_logs = {}  # id(page) -> list of captured request/response dicts
# =========================
# HELPER FUNCTIONS
# =========================
def resolve_url(q: str) -> str:
    """Turn free-form address-bar input into a navigable URL.

    Rules, applied in order:
      1. blank input -> ``https://example.com``
      2. input already starting with ``http://`` or ``https://`` -> unchanged
      3. a single token that contains a dot and does not start with
         ``localhost`` -> treated as a bare host and prefixed with ``https://``
      4. anything else -> a Google search for the text

    Args:
        q: Raw text typed by the user.

    Returns:
        The URL to navigate to.
    """
    q = q.strip()
    if not q:
        return "https://example.com"
    if q.startswith(("http://", "https://")):
        return q
    if "." in q and " " not in q and not q.startswith("localhost"):
        return "https://" + q
    # Percent-encode the whole query. Replacing only spaces let characters
    # such as '&', '#', '+' or '%' leak into the URL and truncate or alter
    # the search terms.
    return f"https://www.google.com/search?q={quote_plus(q)}"
async def setup_page(page: Page, page_id: int):
    """Attach console and network listeners to a page.

    Creates empty buffers for the page in ``console_logs`` and
    ``network_logs`` (both keyed by ``id(page)``) and registers handlers that
    append one dict per console message, request and response.

    Each buffer is capped so a tab that nobody drains cannot grow without
    bound, and both buffers are dropped when the page emits ``close``.

    Args:
        page: The Playwright page to instrument.
        page_id: Tab index supplied by callers. Currently unused; kept so
            existing call sites keep working.
    """
    max_entries = 1000  # per-page cap for each buffer; oldest entries go first
    pid = id(page)
    console_logs[pid] = []
    network_logs[pid] = []

    def record(store, entry):
        """Append ``entry`` to this page's buffer in ``store``, trimming overflow."""
        bucket = store.get(pid)
        if bucket is None:
            # Buffer already dropped (page closed); do not resurrect it.
            return
        bucket.append(entry)
        if len(bucket) > max_entries:
            del bucket[:-max_entries]

    def handle_console(msg):
        record(console_logs, {
            "type": msg.type,
            "text": msg.text,
            # Only the first five arguments are kept, stringified.
            "args": [str(a) for a in msg.args[:5]],
        })

    def handle_request(req):
        record(network_logs, {
            "type": "request",
            "url": req.url,
            "method": req.method,
            "resourceType": req.resource_type,
        })

    def handle_response(res):
        record(network_logs, {
            "type": "response",
            "url": res.url,
            "status": res.status,
            "ok": res.ok,
        })

    def handle_close(*_args):
        # Release the buffers so closed tabs do not accumulate in the dicts.
        console_logs.pop(pid, None)
        network_logs.pop(pid, None)

    page.on("console", handle_console)
    page.on("request", handle_request)
    page.on("response", handle_response)
    page.on("close", handle_close)
    # Pass-through route: every request is continued unmodified.
    # NOTE(review): this looks like a no-op hook; confirm it is still needed.
    await page.route("**/*", lambda route: route.continue_())
async def send_tabs(ws: WebSocket):
    """Send current tab list to client.

    Emits one JSON message ``{"type": "tabs", "tabs": [...], "active": n}``
    where each tab is ``{"url", "title"}``. Closed pages are reported as
    ``about:blank`` / ``Closed``; pages whose title lookup fails are reported
    as ``about:blank`` / ``Error``.

    Args:
        ws: The websocket to send the message on.
    """
    tabs = []
    # Iterate a snapshot: `await p.title()` yields to the event loop, during
    # which another handler may add or remove tabs.
    for p in list(pages):
        try:
            if p.is_closed():
                tabs.append({"url": "about:blank", "title": "Closed"})
            else:
                tabs.append({"url": p.url, "title": await p.title()})
        except Exception:
            # Deliberately not a bare `except`: asyncio.CancelledError must
            # propagate so the task awaiting here can actually be cancelled.
            tabs.append({"url": "about:blank", "title": "Error"})
    await ws.send_json({"type": "tabs", "tabs": tabs, "active": active_tab})
def list_scripts():
    """Return the names of the regular ``*.js`` files stored in SCRIPTS_DIR."""
    names = []
    for entry in SCRIPTS_DIR.glob("*.js"):
        if not entry.is_file():
            # Skip directories (or other non-files) that happen to match.
            continue
        names.append(entry.name)
    return names
# =========================
# BROWSER LIFECYCLE
# =========================
async def start_browser():
    """Launch headless Chromium and open the first tab.

    Populates the module-level ``pw``, ``browser`` and ``context`` handles,
    instruments the first page with ``setup_page`` and registers it in
    ``pages``. The initial navigation to https://example.com is best-effort:
    a failure is logged and the tab stays usable, instead of aborting
    application startup with an untracked page left open.
    """
    global pw, browser, context
    print("🚀 Starting Playwright browser...")
    pw = await async_playwright().start()
    browser = await pw.chromium.launch(
        headless=True,
        args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu"],
    )
    context = await browser.new_context(
        viewport={"width": 1280, "height": 800},
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    )
    page = await context.new_page()
    await setup_page(page, 0)
    # Register the tab before navigating so it is tracked even if the first
    # page load fails (offline host, DNS failure, timeout).
    pages.append(page)
    try:
        await page.goto("https://example.com")
    except Exception as e:
        print(f"⚠️ Initial navigation failed: {e}")
    print("✅ Browser ready with 1 page")
@app.on_event("startup")
async def startup():
    """Application startup hook: bring up the shared browser."""
    await start_browser()
@app.on_event("shutdown")
async def shutdown():
    """Application shutdown hook: close the browser, then stop Playwright.

    Either handle may still be ``None`` if startup never completed, so each
    is checked before use.
    """
    print("🛑 Shutting down browser...")
    try:
        if browser:
            await browser.close()
    finally:
        # Stop the Playwright driver even when closing the browser raised;
        # otherwise the driver would be left running.
        if pw:
            await pw.stop()
# =========================
# FRONTEND HTML
# =========================
# Body returned verbatim by the "/" route.
# NOTE(review): only a title line is present here; the markup/script that
# would talk to the /ws endpoint is not visible in this file — confirm.
HTML = """
🕷️ HF Scraping Browser
"""
# =========================
# WEBSOCKET HANDLER
# =========================
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    """Remote-control channel for the shared browser.

    After accepting the socket, a background task streams JPEG screenshots
    of the active tab as binary frames, while this coroutine reads JSON
    commands of the form ``{"t": <command>, ...}`` and replies with JSON.

    Commands: ``init``, ``goto`` (``q``), ``back``, ``forward``, ``reload``,
    ``click`` (``x``, ``y``), ``key`` (``k``), ``new-tab``, ``switch-tab``
    (``index``), ``close-tab`` (``index``), ``execute-js`` (``code``),
    ``save-file`` (``name``, ``content``), ``load-file`` (``name``),
    ``list-files``, ``get-html``, ``clear-logs``.

    After most commands, the last few buffered console and network entries
    of the page the command targeted are forwarded and the buffers are reset.

    Args:
        ws: The incoming websocket connection.
    """
    global active_tab
    await ws.accept()
    print(f"✅ WS accepted from {ws.client}")
    await send_tabs(ws)

    async def stream():
        """Push screenshots of the active tab until the task is cancelled."""
        # This coroutine rebinds active_tab, so it needs its own `global`
        # declaration; the one in the enclosing function does not apply here.
        # Without it every read raised UnboundLocalError and no frame was
        # ever sent.
        global active_tab
        while True:
            try:
                if not pages or active_tab >= len(pages):
                    await asyncio.sleep(0.1)
                    continue
                page = pages[active_tab]
                if page.is_closed():
                    active_tab = max(0, active_tab - 1)
                    await send_tabs(ws)
                    # Back off: if tab 0 itself is closed this branch repeats,
                    # and without a pause it would flood the client.
                    await asyncio.sleep(0.1)
                    continue
                img = await asyncio.wait_for(
                    page.screenshot(type="jpeg", quality=65, scale="css"),
                    timeout=5.0,
                )
                await ws.send_bytes(img)
                await asyncio.sleep(0.033)
            except asyncio.TimeoutError:
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"❌ Stream error: {e}")
                await asyncio.sleep(0.2)

    stream_task = asyncio.create_task(stream())
    try:
        while True:
            raw = await ws.receive_text()
            # A malformed frame gets an error reply instead of tearing down
            # the whole connection.
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                await ws.send_json({"type": "error", "message": "Invalid JSON"})
                continue
            if not isinstance(msg, dict):
                await ws.send_json({"type": "error", "message": "Invalid message"})
                continue
            t = msg.get("t")
            if not pages:
                await ws.send_json({"type": "error", "message": "No pages"})
                continue
            page = pages[active_tab] if active_tab < len(pages) else pages[0]
            print(f"📨 {t} | tab={active_tab}")
            if t == "init":
                await send_tabs(ws)
            elif t == "goto":
                q = str(msg.get("q", "")).strip()
                if not q:
                    await ws.send_json({"type": "status", "text": "⚠️ Empty URL"})
                    continue
                url = resolve_url(q)
                print(f"🌐 Going to: {url}")
                try:
                    await page.goto(url, wait_until="domcontentloaded", timeout=30000)
                    title = await page.title()
                    await ws.send_json({"type": "status", "text": f"✅ {title[:40]}"})
                    await send_tabs(ws)
                except Exception as e:
                    await ws.send_json({"type": "error", "message": f"Nav failed: {str(e)[:80]}"})
            elif t == "back":
                try:
                    await page.go_back(timeout=15000)
                except Exception as e:
                    await ws.send_json({"type": "error", "message": f"Back: {e}"})
            elif t == "forward":
                try:
                    await page.go_forward(timeout=15000)
                except Exception as e:
                    await ws.send_json({"type": "error", "message": f"Forward: {e}"})
            elif t == "reload":
                try:
                    await page.reload(wait_until="domcontentloaded", timeout=30000)
                except Exception as e:
                    await ws.send_json({"type": "error", "message": f"Reload: {e}"})
            elif t == "click":
                # Best-effort input: a failed click is logged, not reported
                # to the client and not fatal to the connection.
                try:
                    await page.mouse.click(msg["x"], msg["y"], delay=10)
                except Exception as e:
                    print(f"⚠️ Click failed: {e}")
            elif t == "key":
                try:
                    await page.keyboard.press(msg["k"])
                except Exception as e:
                    print(f"⚠️ Key failed: {e}")
            elif t == "new-tab":
                try:
                    np = await context.new_page()
                    await setup_page(np, len(pages))
                    pages.append(np)
                    active_tab = len(pages) - 1
                    await send_tabs(ws)
                except Exception as e:
                    await ws.send_json({"type": "error", "message": f"Tab: {e}"})
            elif t == "switch-tab":
                i = msg.get("index", 0)
                if isinstance(i, int) and 0 <= i < len(pages):
                    active_tab = i
                    try:
                        await pages[active_tab].bring_to_front()
                    except Exception as e:
                        await ws.send_json({"type": "error", "message": f"Switch: {e}"})
                    await send_tabs(ws)
            elif t == "close-tab":
                i = msg.get("index", 0)
                # The last remaining tab is never closed.
                if isinstance(i, int) and len(pages) > 1 and 0 <= i < len(pages):
                    doomed = pages[i]
                    await doomed.close()
                    del pages[i]
                    # Drop the closed page's log buffers.
                    console_logs.pop(id(doomed), None)
                    network_logs.pop(id(doomed), None)
                    # Removing a tab left of the active one shifts every later
                    # index down by one; follow it so the same page stays
                    # active. Also step back if the active tab was the last.
                    if i < active_tab or active_tab >= len(pages):
                        active_tab = max(0, active_tab - 1)
                    await send_tabs(ws)
            elif t == "execute-js":
                # SECURITY: `code` is arbitrary JavaScript supplied by the
                # client and is run in the page by design; this endpoint must
                # only be exposed to trusted users.
                code = msg.get("code", "")
                try:
                    result = await page.evaluate(f"(async()=>{{try{{{code}}}catch(e){{return{{__err__:e.message}}}}}})()")
                    if isinstance(result, dict) and result.get("__err__"):
                        await ws.send_json({"type": "js-result", "data": {"error": result["__err__"]}})
                    else:
                        await ws.send_json({"type": "js-result", "data": result})
                except Exception as e:
                    await ws.send_json({"type": "js-result", "data": {"error": str(e)}})
            elif t == "save-file":
                name = "".join(
                    c for c in str(msg.get("name", "script.js"))
                    if c.isalnum() or c in "._-"
                )
                if not name.strip("."):
                    # "", "." and ".." would resolve to a directory, not a file.
                    await ws.send_json({"type": "error", "message": "Invalid file name"})
                else:
                    try:
                        (SCRIPTS_DIR / name).write_text(msg.get("content", ""))
                    except (OSError, TypeError) as e:
                        await ws.send_json({"type": "error", "message": f"Save: {e}"})
                    else:
                        await ws.send_json({"type": "file-list", "files": list_scripts()})
            elif t == "load-file":
                name = str(msg.get("name", ""))
                path = SCRIPTS_DIR / name
                # Accept bare file names only: anything with a directory part
                # (relative "../" or absolute) could escape SCRIPTS_DIR.
                is_bare_name = bool(name) and Path(name).name == name
                if is_bare_name and path.is_file() and path.suffix == ".js":
                    await ws.send_json({"type": "js-result", "data": path.read_text()})
                else:
                    await ws.send_json({"type": "error", "message": "Not found"})
            elif t == "list-files":
                await ws.send_json({"type": "file-list", "files": list_scripts()})
            elif t == "get-html":
                try:
                    html = await page.content()
                    b64 = base64.b64encode(html.encode('utf-8', errors='ignore')).decode()
                    await ws.send_json({"type": "html", "data": b64})
                except Exception as e:
                    await ws.send_json({"type": "error", "message": f"HTML: {e}"})
            elif t == "clear-logs":
                pid = id(page)
                console_logs[pid] = []
                network_logs[pid] = []
            # Forward logs: only the newest 3 console / 5 network entries are
            # sent; the buffers are then reset, discarding anything older.
            pid = id(page)
            if console_logs.get(pid):
                for log in console_logs[pid][-3:]:
                    await ws.send_json({"type": "console", "data": log})
                console_logs[pid] = []
            if network_logs.get(pid):
                for log in network_logs[pid][-5:]:
                    await ws.send_json({"type": "network", "data": log})
                network_logs[pid] = []
    except WebSocketDisconnect:
        print("🔌 Client disconnected")
    except Exception as e:
        print(f"💥 WS error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Stop the screenshot stream and wait for it to unwind; the
        # CancelledError it raises is the expected outcome.
        stream_task.cancel()
        try:
            await stream_task
        except asyncio.CancelledError:
            pass
# =========================
# HTTP ENDPOINTS
# =========================
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve the frontend: the module-level HTML string, as an HTML response."""
    page_markup = HTML
    return page_markup
@app.get("/health")
async def health():
    """Report liveness plus tab statistics.

    Returns a dict with the fixed status ``"ok"``, the number of tracked
    tabs, the active tab index, and how many tracked pages are not closed.
    """
    open_pages = [tab for tab in pages if not tab.is_closed()]
    report = {"status": "ok"}
    report["tabs"] = len(pages)
    report["active"] = active_tab
    report["alive"] = len(open_pages)
    return report
@app.get("/scripts/{filename}")
async def get_script(filename: str):
    """Return a saved script from SCRIPTS_DIR as JavaScript text.

    Args:
        filename: Name of the script; must be a bare ``*.js`` file name.

    Returns:
        The file contents with media type ``application/javascript``, or a
        404 JSON body ``{"error": "Not found"}`` when the name is not a bare
        file name, is not a ``.js`` file, or no such regular file exists.
    """
    path = SCRIPTS_DIR / filename
    # Refuse anything carrying a directory component so the lookup cannot
    # leave SCRIPTS_DIR, whatever separator the platform accepts.
    is_bare_name = Path(filename).name == filename
    # is_file() rather than exists(): a directory named "x.js" would pass
    # exists() and then make read_text() raise.
    if is_bare_name and path.is_file() and path.suffix == ".js":
        return PlainTextResponse(path.read_text(), media_type="application/javascript")
    return JSONResponse({"error": "Not found"}, status_code=404)
@app.post("/upload-script")
async def upload_script(file: UploadFile = None, name: str = Form(None)):
    """Store an uploaded file in SCRIPTS_DIR as ``<name>.js``.

    Args:
        file: The uploaded file; a missing file yields a 400 response.
        name: Optional target name; the upload's own file name is used when
            this is empty.

    Returns:
        ``{"status": "saved", "file": <stored name>}`` on success, or a 400
        JSON body ``{"error": "No file"}``.
    """
    if not file:
        return JSONResponse({"error": "No file"}, status_code=400)
    # file.filename may be None for some clients; fall back to an empty name.
    requested = name or file.filename or ""
    # Strip a single trailing ".js" only. Removing every occurrence mangled
    # names such as "data.json" into "dataon".
    stem = requested[:-3] if requested.endswith(".js") else requested
    safe_stem = "".join(c for c in stem if c.isalnum() or c in "._-")
    if not safe_stem.strip("."):
        # Nothing usable survived sanitising; avoid writing a file named ".js".
        safe_stem = "script"
    safe = safe_stem + ".js"
    content = await file.read()
    (SCRIPTS_DIR / safe).write_bytes(content)
    return {"status": "saved", "file": safe}
# =========================
# RUN
# =========================
if __name__ == "__main__":
    # Imported lazily: uvicorn is only needed when this file is run directly.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=PORT, log_level="info")