#!/usr/bin/env python3
"""
Manus-style Browser Server
CDP screencast → WebSocket → Chrome-skin frontend
"""
import asyncio, json, logging, os
from typing import Optional, Set
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from playwright.async_api import async_playwright, Browser, BrowserContext, CDPSession, Page
# Module-wide logger; basicConfig installs the root handler at INFO level with
# a "timestamp level message" format.
log = logging.getLogger("browser")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# The ASGI application; the routes and the /static mount below attach to it.
app = FastAPI()
# ── Global state ──────────────────────────────────────────────────
# Browser handles are assigned by init_browser() and stay None until it runs.
_pw = None  # object returned by async_playwright().start()
browser: Optional[Browser] = None  # the launched headless Chromium
context: Optional[BrowserContext] = None  # the browser context owning `page`
page: Optional[Page] = None  # the single page every client views and drives
cdp: Optional[CDPSession] = None  # CDP session that delivers screencast frames
clients: Set[WebSocket] = set()  # WebSockets currently connected to /ws
# Viewport size in pixels; also used as the window size and the screencast's
# maxWidth/maxHeight.
VIEWPORT_W = 1280
VIEWPORT_H = 800
# ── Broadcast helpers ─────────────────────────────────────────────
async def broadcast(msg: dict):
    """Send ``msg`` as JSON text to every connected client.

    The message is serialised once and sent to each WebSocket in
    ``clients``. A socket whose ``send_text`` raises is removed from
    ``clients``; the remaining sockets still receive the message.

    Args:
        msg: JSON-serialisable payload.
    """
    if not clients:
        return
    text = json.dumps(msg)
    dead = set()
    # Iterate over a snapshot: every ``await`` yields to the event loop, and
    # ws_handler may add or discard clients in the meantime, which would make
    # iteration over the live set raise "Set changed size during iteration".
    for ws in tuple(clients):
        try:
            await ws.send_text(text)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation (a BaseException) is not swallowed here.
            dead.add(ws)
    clients.difference_update(dead)
async def push_nav():
    """Broadcast the current page URL and title to every client.

    Sends a ``{"type": "nav", "url": ..., "title": ...}`` message. Does
    nothing when no page has been created yet. This is best-effort: a
    failure is logged at debug level and never propagates to the caller.
    """
    if not page:
        return
    try:
        title = await page.title()
        await broadcast({"type": "nav", "url": page.url, "title": title})
    except Exception as exc:
        # Deliberately non-fatal (presumably the title lookup can fail while
        # the page is mid-navigation — TODO confirm). ``Exception`` instead of
        # a bare ``except`` lets task cancellation through.
        log.debug("push_nav skipped: %s", exc)
# ── Browser init ──────────────────────────────────────────────────
async def init_browser():
    """Launch headless Chromium and start streaming it to the clients.

    Populates the module globals ``_pw``, ``browser``, ``context``, ``page``
    and ``cdp``, then:

    * registers a ``framenavigated`` listener that pushes the new URL and
      title to the clients whenever the main frame navigates,
    * starts a CDP screencast (JPEG, quality 80, capped at the viewport
      size) whose frames are broadcast as ``{"type": "frame", "data": ...}``,
    * loads https://www.google.com as the initial page.

    Raises:
        Whatever Playwright raises if the launch or the initial navigation
        fails; nothing is caught at this level.
    """
    global _pw, browser, context, page, cdp

    _pw = await async_playwright().start()
    browser = await _pw.chromium.launch(
        headless=True,
        args=[
            "--no-sandbox", "--disable-dev-shm-usage",
            "--disable-setuid-sandbox", "--disable-gpu",
            "--no-first-run", "--no-default-browser-check",
            "--disable-background-timer-throttling",
            "--disable-renderer-backgrounding",
            f"--window-size={VIEWPORT_W},{VIEWPORT_H}",
        ],
    )
    context = await browser.new_context(
        viewport={"width": VIEWPORT_W, "height": VIEWPORT_H},
        user_agent=(
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        ),
    )
    page = await context.new_page()

    # Navigation events → update URL bar for all clients
    async def on_nav(frame):
        # Only the main frame changes the address bar.
        if frame == page.main_frame:
            await push_nav()

    page.on("framenavigated", on_nav)

    # CDP screencast
    cdp = await context.new_cdp_session(page)

    async def on_frame(params):
        await broadcast({"type": "frame", "data": params["data"]})
        try:
            await cdp.send(
                "Page.screencastFrameAck",
                {"sessionId": params["sessionId"]},
            )
        except Exception as exc:
            # A failed ack is non-fatal; ``Exception`` (not a bare
            # ``except``) keeps task cancellation from being swallowed.
            log.debug("screencast ack failed: %s", exc)

    cdp.on("Page.screencastFrame", on_frame)
    await cdp.send("Page.startScreencast", {
        "format": "jpeg", "quality": 80,
        "maxWidth": VIEWPORT_W, "maxHeight": VIEWPORT_H,
        "everyNthFrame": 1,
    })

    await page.goto("https://www.google.com")
    log.info("✅ Browser ready")
@app.on_event("startup")
async def startup():
    """Startup hook: bring up the shared browser before serving requests."""
    await init_browser()
# ── WebSocket endpoint ────────────────────────────────────────────
# (event flag, Playwright key name) pairs for the modifiers a keydown may carry.
_MODIFIERS = (("ctrl", "Control"), ("shift", "Shift"), ("alt", "Alt"))


def _normalize_url(raw: str) -> str:
    """Turn address-bar input into a URL: pass through, add https://, or search."""
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote_plus

    url = raw.strip()
    if url.startswith(("http://", "https://")):
        return url
    if "." in url and " " not in url:
        return "https://" + url
    # Encode the query so spaces, "&", "#" and "+" survive as search text
    # instead of being parsed as URL syntax.
    return f"https://www.google.com/search?q={quote_plus(url)}"


async def _press_key(ev: dict) -> None:
    """Press and release ``ev["key"]`` while holding the requested modifiers."""
    key = ev["key"]
    held = []
    try:
        for flag, name in _MODIFIERS:
            if ev.get(flag):
                await page.keyboard.down(name)
                held.append(name)
        await page.keyboard.down(key)
        await page.keyboard.up(key)
    finally:
        # Always release what was pressed; otherwise a failing key press
        # would leave the modifiers held down on the shared page.
        for name in held:
            await page.keyboard.up(name)


async def _handle_event(ev: dict) -> None:
    """Apply one decoded client event to the shared page."""
    kind = ev.get("type")
    if kind == "navigate":
        await page.goto(_normalize_url(ev["url"]), wait_until="domcontentloaded")
    elif kind == "back":
        await page.go_back()
    elif kind == "forward":
        await page.go_forward()
    elif kind == "reload":
        await page.reload(wait_until="domcontentloaded")
    elif kind == "click":
        await page.mouse.click(ev["x"], ev["y"])
    elif kind == "dblclick":
        await page.mouse.dblclick(ev["x"], ev["y"])
    elif kind == "mousemove":
        await page.mouse.move(ev["x"], ev["y"])
    elif kind == "mousedown":
        await page.mouse.down()
    elif kind == "mouseup":
        await page.mouse.up()
    elif kind == "wheel":
        await page.mouse.wheel(ev.get("dx", 0), ev.get("dy", 0))
    elif kind == "keydown":
        await _press_key(ev)
    elif kind == "type":
        await page.keyboard.type(ev["text"])


@app.websocket("/ws")
async def ws_handler(websocket: WebSocket):
    """Serve one client: register it, then replay its input on the page.

    Each text message is a JSON object whose ``type`` selects the action:
    ``navigate`` (``url``), ``back``, ``forward``, ``reload``, ``click`` /
    ``dblclick`` / ``mousemove`` (``x``, ``y``), ``mousedown``, ``mouseup``,
    ``wheel`` (``dx``, ``dy``), ``keydown`` (``key`` plus optional ``ctrl`` /
    ``shift`` / ``alt``) and ``type`` (``text``). Unknown types are ignored,
    and events are dropped while no page exists.

    A message that is malformed or whose action fails is logged and skipped,
    so the connection stays open. The client is removed from ``clients``
    however the handler exits.
    """
    await websocket.accept()
    clients.add(websocket)
    try:
        await push_nav()  # Send current URL to new client
        while True:
            raw = await websocket.receive_text()
            try:
                ev = json.loads(raw)
                if page:
                    await _handle_event(ev)
            except Exception as exc:
                # One bad event (invalid JSON, missing field, navigation
                # error or timeout) must not end the client's session.
                log.warning("WS event failed: %s", exc)
    except WebSocketDisconnect:
        pass
    except Exception as e:
        log.error(f"WS error: {e}")
    finally:
        # ``finally`` also covers task cancellation, which neither
        # ``except`` clause above catches.
        clients.discard(websocket)
@app.get("/")
async def index():
    """Serve the browser frontend page.

    Returns:
        An ``HTMLResponse`` holding the contents of
        ``/app/static/browser.html``, read afresh on every request.
    """
    # Decode explicitly instead of relying on the locale-dependent default.
    # NOTE(review): assumes the HTML file is UTF-8 encoded — confirm.
    with open("/app/static/browser.html", encoding="utf-8") as f:
        return HTMLResponse(f.read())
# Expose the files in /app/static under the /static URL prefix.
app.mount("/static", StaticFiles(directory="/app/static"), name="static")
if __name__ == "__main__":
    # Run directly as a script: listen on all interfaces, port 7860, with
    # uvicorn's own logging limited to warnings and above.
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="warning")