| |
| """ |
| Manus-style Browser Server |
| CDP screencast β WebSocket β Chrome-skin frontend |
| """ |
| import asyncio, json, logging, os |
| from typing import Optional, Set |
| import uvicorn |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect |
| from fastapi.responses import HTMLResponse |
| from fastapi.staticfiles import StaticFiles |
| from playwright.async_api import async_playwright, Browser, BrowserContext, CDPSession, Page |
|
|
| log = logging.getLogger("browser") |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") |
|
|
| app = FastAPI() |
|
|
| |
| _pw = None |
| browser: Optional[Browser] = None |
| context: Optional[BrowserContext] = None |
| page: Optional[Page] = None |
| cdp: Optional[CDPSession] = None |
| clients: Set[WebSocket] = set() |
|
|
| VIEWPORT_W = 1280 |
| VIEWPORT_H = 800 |
|
|
| |
| async def broadcast(msg: dict): |
| if not clients: return |
| text = json.dumps(msg) |
| dead = set() |
| for ws in clients: |
| try: await ws.send_text(text) |
| except: dead.add(ws) |
| clients.difference_update(dead) |
|
|
| async def push_nav(): |
| if page: |
| try: |
| title = await page.title() |
| await broadcast({"type": "nav", "url": page.url, "title": title}) |
| except: pass |
|
|
| |
| async def init_browser(): |
| global _pw, browser, context, page, cdp |
| _pw = await async_playwright().start() |
| browser = await _pw.chromium.launch( |
| headless=True, |
| args=[ |
| "--no-sandbox", "--disable-dev-shm-usage", |
| "--disable-setuid-sandbox", "--disable-gpu", |
| "--no-first-run", "--no-default-browser-check", |
| "--disable-background-timer-throttling", |
| "--disable-renderer-backgrounding", |
| f"--window-size={VIEWPORT_W},{VIEWPORT_H}", |
| ] |
| ) |
| context = await browser.new_context( |
| viewport={"width": VIEWPORT_W, "height": VIEWPORT_H}, |
| user_agent=( |
| "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " |
| "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" |
| ) |
| ) |
| page = await context.new_page() |
|
|
| |
| async def on_nav(frame): |
| if frame == page.main_frame: |
| await push_nav() |
| page.on("framenavigated", on_nav) |
|
|
| |
| cdp = await context.new_cdp_session(page) |
|
|
| async def on_frame(params): |
| await broadcast({"type": "frame", "data": params["data"]}) |
| try: |
| await cdp.send("Page.screencastFrameAck", {"sessionId": params["sessionId"]}) |
| except: pass |
|
|
| cdp.on("Page.screencastFrame", on_frame) |
| await cdp.send("Page.startScreencast", { |
| "format": "jpeg", "quality": 80, |
| "maxWidth": VIEWPORT_W, "maxHeight": VIEWPORT_H, |
| "everyNthFrame": 1, |
| }) |
|
|
| await page.goto("https://www.google.com") |
| log.info("β
Browser ready") |
|
|
| @app.on_event("startup") |
| async def startup(): await init_browser() |
|
|
| |
| @app.websocket("/ws") |
| async def ws_handler(websocket: WebSocket): |
| await websocket.accept() |
| clients.add(websocket) |
| await push_nav() |
| try: |
| while True: |
| ev = json.loads(await websocket.receive_text()) |
| t = ev.get("type") |
| if not page: continue |
|
|
| if t == "navigate": |
| url = ev["url"].strip() |
| if not url.startswith(("http://", "https://")): |
| url = ("https://" + url) if ("." in url and " " not in url) \ |
| else f"https://www.google.com/search?q={url}" |
| await page.goto(url, wait_until="domcontentloaded") |
|
|
| elif t == "back": await page.go_back() |
| elif t == "forward": await page.go_forward() |
| elif t == "reload": await page.reload(wait_until="domcontentloaded") |
|
|
| elif t == "click": |
| await page.mouse.click(ev["x"], ev["y"]) |
| elif t == "dblclick": |
| await page.mouse.dblclick(ev["x"], ev["y"]) |
| elif t == "mousemove": |
| await page.mouse.move(ev["x"], ev["y"]) |
| elif t == "mousedown": |
| await page.mouse.down() |
| elif t == "mouseup": |
| await page.mouse.up() |
| elif t == "wheel": |
| await page.mouse.wheel(ev.get("dx", 0), ev.get("dy", 0)) |
| elif t == "keydown": |
| key = ev["key"] |
| if ev.get("ctrl"): await page.keyboard.down("Control") |
| if ev.get("shift"): await page.keyboard.down("Shift") |
| if ev.get("alt"): await page.keyboard.down("Alt") |
| await page.keyboard.down(key) |
| await page.keyboard.up(key) |
| if ev.get("ctrl"): await page.keyboard.up("Control") |
| if ev.get("shift"): await page.keyboard.up("Shift") |
| if ev.get("alt"): await page.keyboard.up("Alt") |
| elif t == "type": |
| await page.keyboard.type(ev["text"]) |
|
|
| except WebSocketDisconnect: |
| clients.discard(websocket) |
| except Exception as e: |
| log.error(f"WS error: {e}") |
| clients.discard(websocket) |
|
|
| @app.get("/") |
| async def index(): |
| with open("/app/static/browser.html") as f: |
| return HTMLResponse(f.read()) |
|
|
| app.mount("/static", StaticFiles(directory="/app/static"), name="static") |
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="0.0.0.0", port=7860, log_level="warning") |