File size: 5,648 Bytes
535332e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
import asyncio, json, logging
from typing import Optional, Set
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from playwright.async_api import async_playwright, Browser, BrowserContext, CDPSession, Page
from playwright_stealth import stealth_async

# Module logger (named "browser"); the root logger is configured at INFO with
# a timestamp / level / message format.
log = logging.getLogger("browser")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# ASGI application; routes and the websocket endpoint are registered on it below.
app = FastAPI()

# Shared handles, all populated by init_browser(). The whole process drives a
# single page, so every connected client sees and controls the same one.
_pw       = None                            # handle returned by async_playwright().start()
browser:  Optional[Browser]        = None   # launched Chromium instance
context:  Optional[BrowserContext] = None   # browser context that owns `page`
page:     Optional[Page]           = None   # the one shared page
cdp:      Optional[CDPSession]     = None   # CDP session used for the screencast
clients:  Set[WebSocket]           = set()  # sockets added in ws_handler, pruned in broadcast

# Size in pixels used for the browser window, the context viewport, and the
# maximum screencast frame dimensions.
VIEWPORT_W = 1280
VIEWPORT_H = 800

async def broadcast(msg: dict):
    """Send ``msg`` as JSON text to every connected WebSocket client.

    Clients whose send fails are removed from the module-level ``clients``
    set so later broadcasts skip them.

    Args:
        msg: JSON-serialisable payload; it is encoded once and the same text
            is sent to every client.
    """
    if not clients:
        return
    text = json.dumps(msg)
    dead = set()
    # Iterate over a snapshot: every ``await`` yields to the event loop, where
    # another coroutine may add or discard a client. Iterating the live set
    # would then raise "RuntimeError: Set changed size during iteration".
    for ws in tuple(clients):
        try:
            await ws.send_text(text)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation and KeyboardInterrupt still propagate.
            dead.add(ws)
    clients.difference_update(dead)

async def push_nav():
    """Broadcast the shared page's current URL and title as a ``nav`` message.

    Does nothing while no page exists. The push is best-effort: a failure to
    read the title or to broadcast is logged at debug level and otherwise
    ignored, so callers never see an exception from here.
    """
    if not page:
        return
    try:
        title = await page.title()
        await broadcast({"type": "nav", "url": page.url, "title": title})
    except Exception as exc:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # still propagates; log instead of discarding the failure silently.
        log.debug("push_nav skipped: %s", exc)

async def init_browser():
    """Launch headless Chromium, open the shared page and start the screencast.

    Populates the module-level ``_pw``, ``browser``, ``context``, ``page`` and
    ``cdp`` handles, applies the stealth patches, registers the navigation
    and screencast-frame listeners, and finally attempts to load the start
    page.

    A failure of that initial navigation is logged and does not abort
    start-up; errors from launching the browser or creating the context,
    page or CDP session propagate to the caller.
    """
    global _pw, browser, context, page, cdp

    _pw     = await async_playwright().start()
    browser = await _pw.chromium.launch(
        headless=True,
        args=[
            "--no-sandbox", "--disable-dev-shm-usage",
            "--disable-setuid-sandbox", "--disable-gpu",
            "--no-first-run", "--no-default-browser-check",
            "--disable-blink-features=AutomationControlled",  # stealth
            "--disable-background-timer-throttling",
            "--disable-renderer-backgrounding",
            f"--window-size={VIEWPORT_W},{VIEWPORT_H}",
        ]
    )
    context = await browser.new_context(
        viewport={"width": VIEWPORT_W, "height": VIEWPORT_H},
        user_agent=(
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        ),
        # General context options. NOTE(review): ignore_https_errors disables
        # certificate validation for every site - confirm this is intended.
        java_script_enabled=True,
        accept_downloads=True,
        ignore_https_errors=True,
    )
    page = await context.new_page()

    # ── Apply stealth patches ──────────────────────────────────────
    await stealth_async(page)

    # Navigation tracking: only main-frame navigations update the clients.
    async def on_nav(frame):
        if frame == page.main_frame:
            await push_nav()
    page.on("framenavigated", on_nav)

    # CDP screencast
    cdp = await context.new_cdp_session(page)

    async def on_frame(params):
        # Forward the frame payload to every client.
        try:
            await broadcast({"type": "frame", "data": params["data"]})
        except Exception as exc:
            log.warning("Frame broadcast failed: %s", exc)
        # Acknowledge the frame even when the broadcast failed: an
        # unacknowledged frame would stall the screencast.
        try:
            await cdp.send("Page.screencastFrameAck", {"sessionId": params["sessionId"]})
        except Exception as exc:
            # Best-effort; the session may already be closing.
            log.debug("Screencast ack failed: %s", exc)

    cdp.on("Page.screencastFrame", on_frame)
    await cdp.send("Page.startScreencast", {
        "format": "jpeg", "quality": 80,
        "maxWidth": VIEWPORT_W, "maxHeight": VIEWPORT_H,
        "everyNthFrame": 1,
    })

    # The start page is a convenience only: if it cannot be loaded, keep the
    # server up so clients can still navigate elsewhere.
    try:
        await page.goto("https://www.google.com")
    except Exception as exc:
        log.warning("Initial navigation failed: %s", exc)
    log.info("✅ Stealth browser ready")

@app.on_event("startup")
async def startup():
    """Application start-up hook: initialise the shared browser."""
    await init_browser()

def _normalize_url(raw: str) -> str:
    """Turn address-bar input into a URL: keep it, prefix https://, or search."""
    from urllib.parse import quote_plus  # function-scope: keeps this block self-contained

    url = raw.strip()
    if url.startswith(("http://", "https://")):
        return url
    if "." in url and " " not in url:
        # Looks like a bare host name or path.
        return "https://" + url
    # Anything else becomes a search; encode it so characters such as
    # '&', '#' or spaces cannot break or truncate the query string.
    return f"https://www.google.com/search?q={quote_plus(url)}"


async def _dispatch_event(ev: dict) -> None:
    """Apply one decoded client event to the shared page; unknown types are ignored."""
    t = ev.get("type")

    if t == "navigate":
        await page.goto(_normalize_url(ev["url"]), wait_until="domcontentloaded")
    elif t == "back":
        await page.go_back()
    elif t == "forward":
        await page.go_forward()
    elif t == "reload":
        await page.reload(wait_until="domcontentloaded")
    elif t == "click":
        await page.mouse.click(ev["x"], ev["y"])
    elif t == "dblclick":
        await page.mouse.dblclick(ev["x"], ev["y"])
    elif t == "mousemove":
        await page.mouse.move(ev["x"], ev["y"])
    elif t == "mousedown":
        await page.mouse.down()
    elif t == "mouseup":
        await page.mouse.up()
    elif t == "wheel":
        await page.mouse.wheel(ev.get("dx", 0), ev.get("dy", 0))
    elif t == "keydown":
        key = ev["key"]
        wanted = [
            name
            for flag, name in (("ctrl", "Control"), ("shift", "Shift"), ("alt", "Alt"))
            if ev.get(flag)
        ]
        held = []
        try:
            for name in wanted:
                await page.keyboard.down(name)
                held.append(name)
            await page.keyboard.down(key)
            await page.keyboard.up(key)
        finally:
            # Release modifiers even if the key press raised (e.g. an unknown
            # key name); otherwise they would stay held for all later input.
            for name in held:
                await page.keyboard.up(name)
    elif t == "type":
        await page.keyboard.type(ev["text"])


@app.websocket("/ws")
async def ws_handler(websocket: WebSocket):
    """Serve one control client over WebSocket.

    Registers the socket in ``clients`` (so it receives broadcasts), sends it
    the current navigation state, then applies each received JSON event to
    the shared page until the client disconnects. Events received while no
    page exists are dropped.

    A failure while handling a single event (malformed JSON, missing field,
    navigation error, ...) is logged and the connection stays open; only a
    disconnect or a socket-level error ends the loop. The socket is always
    removed from ``clients`` on exit.

    Args:
        websocket: The incoming connection; it is accepted here.
    """
    await websocket.accept()
    clients.add(websocket)
    try:
        await push_nav()
        while True:
            raw = await websocket.receive_text()
            if not page:
                continue
            try:
                await _dispatch_event(json.loads(raw))
            except Exception as e:
                # Per-event isolation: one bad event must not drop the client.
                log.error("WS event failed: %s", e)
    except WebSocketDisconnect:
        pass
    except Exception as e:
        log.error(f"WS error: {e}")
    finally:
        clients.discard(websocket)

@app.get("/")
async def index():
    """Serve the browser UI page from ``/app/static/browser.html``.

    Returns:
        An ``HTMLResponse`` containing the file's contents.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
    """
    # Pin the encoding: without it open() uses the locale default, which can
    # be ASCII in minimal containers and would fail on non-ASCII markup.
    with open("/app/static/browser.html", encoding="utf-8") as f:
        return HTMLResponse(f.read())

# Expose the files under /app/static at the /static URL prefix. This runs at
# import time. NOTE(review): StaticFiles presumably raises if the directory is
# missing - confirm the path exists in every deployment.
app.mount("/static", StaticFiles(directory="/app/static"), name="static")

if __name__ == "__main__":
    # Script entry point: serve on all interfaces, port 8888, with uvicorn's
    # own log level set to "warning".
    uvicorn.run(app, host="0.0.0.0", port=8888, log_level="warning")