File size: 5,648 Bytes
535332e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | #!/usr/bin/env python3
import asyncio, json, logging
from typing import Optional, Set
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from playwright.async_api import async_playwright, Browser, BrowserContext, CDPSession, Page
from playwright_stealth import stealth_async
log = logging.getLogger("browser")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
app = FastAPI()
_pw = None
browser: Optional[Browser] = None
context: Optional[BrowserContext] = None
page: Optional[Page] = None
cdp: Optional[CDPSession] = None
clients: Set[WebSocket] = set()
VIEWPORT_W = 1280
VIEWPORT_H = 800
async def broadcast(msg: dict):
if not clients: return
text = json.dumps(msg)
dead = set()
for ws in clients:
try: await ws.send_text(text)
except: dead.add(ws)
clients.difference_update(dead)
async def push_nav():
if page:
try:
title = await page.title()
await broadcast({"type": "nav", "url": page.url, "title": title})
except: pass
async def init_browser():
global _pw, browser, context, page, cdp
_pw = await async_playwright().start()
browser = await _pw.chromium.launch(
headless=True,
args=[
"--no-sandbox", "--disable-dev-shm-usage",
"--disable-setuid-sandbox", "--disable-gpu",
"--no-first-run", "--no-default-browser-check",
"--disable-blink-features=AutomationControlled", # stealth
"--disable-background-timer-throttling",
"--disable-renderer-backgrounding",
f"--window-size={VIEWPORT_W},{VIEWPORT_H}",
]
)
context = await browser.new_context(
viewport={"width": VIEWPORT_W, "height": VIEWPORT_H},
user_agent=(
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
),
# Stealth context flags
java_script_enabled=True,
accept_downloads=True,
ignore_https_errors=True,
)
page = await context.new_page()
# ββ Apply stealth patches ββββββββββββββββββββββββββββββββββββββ
await stealth_async(page)
# Navigation tracking
async def on_nav(frame):
if frame == page.main_frame:
await push_nav()
page.on("framenavigated", on_nav)
# CDP screencast
cdp = await context.new_cdp_session(page)
async def on_frame(params):
await broadcast({"type": "frame", "data": params["data"]})
try:
await cdp.send("Page.screencastFrameAck", {"sessionId": params["sessionId"]})
except: pass
cdp.on("Page.screencastFrame", on_frame)
await cdp.send("Page.startScreencast", {
"format": "jpeg", "quality": 80,
"maxWidth": VIEWPORT_W, "maxHeight": VIEWPORT_H,
"everyNthFrame": 1,
})
await page.goto("https://www.google.com")
log.info("β
Stealth browser ready")
@app.on_event("startup")
async def startup(): await init_browser()
@app.websocket("/ws")
async def ws_handler(websocket: WebSocket):
await websocket.accept()
clients.add(websocket)
await push_nav()
try:
while True:
ev = json.loads(await websocket.receive_text())
t = ev.get("type")
if not page: continue
if t == "navigate":
url = ev["url"].strip()
if not url.startswith(("http://", "https://")):
url = ("https://" + url) if ("." in url and " " not in url) \
else f"https://www.google.com/search?q={url}"
await page.goto(url, wait_until="domcontentloaded")
elif t == "back": await page.go_back()
elif t == "forward": await page.go_forward()
elif t == "reload": await page.reload(wait_until="domcontentloaded")
elif t == "click": await page.mouse.click(ev["x"], ev["y"])
elif t == "dblclick":await page.mouse.dblclick(ev["x"], ev["y"])
elif t == "mousemove":await page.mouse.move(ev["x"], ev["y"])
elif t == "mousedown":await page.mouse.down()
elif t == "mouseup": await page.mouse.up()
elif t == "wheel": await page.mouse.wheel(ev.get("dx",0), ev.get("dy",0))
elif t == "keydown":
k = ev["key"]
if ev.get("ctrl"): await page.keyboard.down("Control")
if ev.get("shift"): await page.keyboard.down("Shift")
if ev.get("alt"): await page.keyboard.down("Alt")
await page.keyboard.down(k)
await page.keyboard.up(k)
if ev.get("ctrl"): await page.keyboard.up("Control")
if ev.get("shift"): await page.keyboard.up("Shift")
if ev.get("alt"): await page.keyboard.up("Alt")
elif t == "type": await page.keyboard.type(ev["text"])
except WebSocketDisconnect:
clients.discard(websocket)
except Exception as e:
log.error(f"WS error: {e}")
clients.discard(websocket)
@app.get("/")
async def index():
with open("/app/static/browser.html") as f:
return HTMLResponse(f.read())
app.mount("/static", StaticFiles(directory="/app/static"), name="static")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8888, log_level="warning") |