Spaces:
Paused
Paused
| """CDP WebSocket proxy — expose browser CDP over a single port. | |
| Handles WebSocket upgrade requests to ``/cdp`` and proxies CDP messages | |
| between the external client and an internal Chromium CDP endpoint. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import hashlib | |
| import logging | |
| import os | |
| import struct | |
| from collections.abc import Callable, Coroutine | |
| from typing import Any | |
| from veilrender.auth import verify_token | |
| from veilrender._vendor.httpserver import Request | |
| logger = logging.getLogger(__name__) | |
# WebSocket frame opcodes (RFC 6455, section 5.2).
_OP_TEXT = 0x1
_OP_BINARY = 0x2
_OP_CLOSE = 0x8
_OP_PING = 0x9
_OP_PONG = 0xA

# Fixed GUID defined by RFC 6455 (section 1.3). It is appended to the client's
# Sec-WebSocket-Key before hashing to produce Sec-WebSocket-Accept; any other
# value yields an accept token that compliant clients reject.
_WS_MAGIC = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

# Maximum allowed WebSocket frame payload size (16 MB).
# Frames exceeding this limit are rejected to prevent OOM from
# malicious headers claiming excessively large payloads.
MAX_FRAME_SIZE = 16 * 1024 * 1024
def _accept_key(key: str) -> str:
    """Derive the ``Sec-WebSocket-Accept`` token for a client handshake key.

    The encoded key is followed by the module's handshake GUID
    (``_WS_MAGIC``), hashed with SHA-1, and the raw digest is returned as
    base64 text.
    """
    hasher = hashlib.sha1()
    hasher.update(key.encode())
    hasher.update(_WS_MAGIC)
    token = base64.b64encode(hasher.digest())
    return token.decode()
async def _read_ws_frame(
    reader: asyncio.StreamReader,
) -> tuple[bool, int, bytes] | None:
    """Read one WebSocket frame from *reader*.

    Args:
        reader: Stream positioned at the start of a frame.

    Returns:
        ``(fin, opcode, payload)`` with the payload already unmasked, or
        ``None`` when the stream ends (before or in the middle of a frame),
        the connection fails, or the frame declares a payload larger than
        ``MAX_FRAME_SIZE``.
    """
    try:
        head = await reader.readexactly(2)
        fin = bool(head[0] & 0x80)
        opcode = head[0] & 0x0F
        masked = bool(head[1] & 0x80)
        length = head[1] & 0x7F
        # 126 / 127 are escape values: the real length follows as a
        # 16-bit or 64-bit big-endian integer.
        if length == 126:
            (length,) = struct.unpack("!H", await reader.readexactly(2))
        elif length == 127:
            (length,) = struct.unpack("!Q", await reader.readexactly(8))
        # Check the declared size before reading so an oversized header
        # never causes a large allocation.
        if length > MAX_FRAME_SIZE:
            logger.warning("WebSocket frame too large (%d bytes), dropping", length)
            return None
        mask = await reader.readexactly(4) if masked else b""
        payload = await reader.readexactly(length)
    except (asyncio.IncompleteReadError, ConnectionError):
        # A truncated frame is treated the same as EOF at a frame boundary.
        return None

    if masked and length:
        # XOR the whole payload against the repeated 4-byte key in a single
        # big-integer operation instead of a per-byte Python loop, which
        # would stall the event loop on multi-megabyte frames.
        key_stream = (mask * (length // 4 + 1))[:length]
        payload = (
            int.from_bytes(payload, "big") ^ int.from_bytes(key_stream, "big")
        ).to_bytes(length, "big")
    return fin, opcode, payload
def _make_ws_frame(
    opcode: int, payload: bytes, *, fin: bool = True, mask: bool = False
) -> bytes:
    """Serialize a single WebSocket frame.

    Args:
        opcode: Frame opcode, placed in the low bits of the first byte.
        payload: Application data to carry.
        fin: Whether to set the FIN bit (final fragment of a message).
        mask: Whether to mask the payload with a fresh random 4-byte key.

    Returns:
        The encoded frame: two header bytes, an optional 16- or 64-bit
        extended length, the masking key when *mask* is true, then the
        (possibly masked) payload.
    """
    length = len(payload)
    mask_bit = 0x80 if mask else 0x00

    frame = bytearray()
    frame.append((0x80 if fin else 0x00) | opcode)
    if length < 126:
        frame.append(mask_bit | length)
    elif length < 65536:
        frame.append(mask_bit | 126)
        frame.extend(struct.pack("!H", length))
    else:
        frame.append(mask_bit | 127)
        frame.extend(struct.pack("!Q", length))

    if not mask:
        frame.extend(payload)
        return bytes(frame)

    mask_key = os.urandom(4)
    frame.extend(mask_key)
    if length:
        # XOR the whole payload against the repeated key as one big-integer
        # operation; a per-byte Python loop would stall the event loop on
        # multi-megabyte payloads.
        key_stream = (mask_key * (length // 4 + 1))[:length]
        frame.extend(
            (
                int.from_bytes(payload, "big") ^ int.from_bytes(key_stream, "big")
            ).to_bytes(length, "big")
        )
    return bytes(frame)
async def _proxy_ws(
    src_reader: asyncio.StreamReader,
    dst_writer: asyncio.StreamWriter,
    *,
    src_masked: bool,
    dst_mask: bool,
    label: str,
) -> None:
    """Forward WebSocket frames from *src_reader* to *dst_writer*.

    The loop ends when ``_read_ws_frame`` yields no frame, after a close
    frame has been relayed, or when either connection fails.

    Args:
        src_reader: Stream to read frames from.
        dst_writer: Stream to write the re-encoded frames to.
        src_masked: Whether the source side is expected to mask its frames.
            Not consulted here: ``_read_ws_frame`` unmasks according to each
            frame's own mask bit.
        dst_mask: Whether frames written to *dst_writer* are masked.
        label: Direction name used in log messages.
    """
    try:
        while True:
            result = await _read_ws_frame(src_reader)
            if result is None:
                break
            fin, opcode, payload = result
            # Close and ping frames are always relayed with FIN set; every
            # other frame keeps the FIN bit it arrived with.
            if opcode in (_OP_CLOSE, _OP_PING):
                fin = True
            dst_writer.write(_make_ws_frame(opcode, payload, fin=fin, mask=dst_mask))
            await dst_writer.drain()
            if opcode == _OP_CLOSE:
                break
    except (asyncio.IncompleteReadError, ConnectionError) as exc:
        # A peer that disappears mid-stream is a normal way for a session to
        # end; stop quietly instead of leaving an unretrieved task exception.
        logger.debug("CDP proxy %s stopped: %s", label, exc)
def is_websocket_upgrade(method: str, headers: dict[str, str], path: str) -> bool:
    """Check if this HTTP request is a WebSocket upgrade to ``/cdp``.

    Args:
        method: HTTP request method.
        headers: Request headers, looked up by lowercase name.
        path: Request path.

    Returns:
        True for a ``GET`` on ``/cdp`` (or a sub-path of it) whose
        ``Upgrade`` header mentions ``websocket`` and whose ``Connection``
        header mentions ``upgrade``, both compared case-insensitively.
    """
    if method != "GET":
        return False
    # Match on a path-segment boundary so unrelated routes that merely share
    # the prefix (e.g. "/cdpfoo") are not treated as CDP upgrades. "/cdp?" is
    # accepted in case the caller passes the path with its query attached.
    if path != "/cdp" and not path.startswith(("/cdp/", "/cdp?")):
        return False
    upgrade = headers.get("upgrade", "").lower()
    connection = headers.get("connection", "").lower()
    return "websocket" in upgrade and "upgrade" in connection
async def _reject_cdp_upgrade(writer: asyncio.StreamWriter, response: bytes) -> None:
    """Send a raw HTTP error *response* to the client and close its connection."""
    try:
        writer.write(response)
        await writer.drain()
    finally:
        writer.close()


async def handle_cdp_upgrade(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    headers: dict[str, str],
    path: str,
    query_string: str,
    get_cdp_url: Callable[[], Coroutine[Any, Any, str | None]],
) -> None:
    """Handle a WebSocket upgrade for CDP proxying.

    The request is authenticated, a WebSocket handshake is performed with the
    internal CDP endpoint, the client's own handshake is completed, and frames
    are then relayed in both directions until either side stops. On any
    failure before the relay starts the client receives a plain HTTP error
    (401, 400, 503 or 502) and the connection is closed.

    Args:
        reader: Client's stream reader.
        writer: Client's stream writer.
        headers: Parsed HTTP headers from the upgrade request, looked up by
            lowercase name.
        path: Request path.
        query_string: Raw query string.
        get_cdp_url: Async callable that returns the internal CDP WebSocket URL,
            or None if no browser is available.
    """
    # Auth check — build a minimal Request object for verify_token.
    # Any failure here is answered with 401 so the proxy fails closed.
    try:
        req = Request(
            method="GET",
            path=path,
            query_string=query_string,
            headers=headers,
            body=b"",
            client_addr=writer.get_extra_info("peername") or ("0.0.0.0", 0),
        )
        verify_token(req)
    except Exception:
        await _reject_cdp_upgrade(writer, b"HTTP/1.1 401 Unauthorized\r\n\r\n")
        return

    # The client's key is required to compute Sec-WebSocket-Accept later.
    ws_key = headers.get("sec-websocket-key", "")
    if not ws_key:
        await _reject_cdp_upgrade(
            writer, b"HTTP/1.1 400 Bad Request\r\n\r\nMissing Sec-WebSocket-Key\r\n"
        )
        return

    # Get internal CDP URL
    cdp_url = await get_cdp_url()
    if cdp_url is None:
        await _reject_cdp_upgrade(
            writer,
            b"HTTP/1.1 503 Service Unavailable\r\n\r\nNo browser available\r\n",
        )
        return

    # Parse ws://host:port/path from cdp_url
    from urllib.parse import urlparse

    parsed = urlparse(cdp_url)
    cdp_host = parsed.hostname or "127.0.0.1"
    cdp_port = parsed.port or 9222
    cdp_path = parsed.path or "/"
    if parsed.query:
        # Keep the query part so the request target matches the CDP URL.
        cdp_path = f"{cdp_path}?{parsed.query}"

    # Connect to internal CDP
    try:
        cdp_reader, cdp_writer = await asyncio.open_connection(cdp_host, cdp_port)
    except Exception as exc:
        logger.error("Failed to connect to CDP at %s: %s", cdp_url, exc)
        await _reject_cdp_upgrade(writer, b"HTTP/1.1 502 Bad Gateway\r\n\r\n")
        return

    # WebSocket handshake with internal CDP
    cdp_ws_key = base64.b64encode(os.urandom(16)).decode()
    ws_protocol = headers.get("sec-websocket-protocol", "")
    handshake = (
        f"GET {cdp_path} HTTP/1.1\r\n"
        f"Host: {cdp_host}:{cdp_port}\r\n"
        f"Upgrade: websocket\r\n"
        f"Connection: Upgrade\r\n"
        f"Sec-WebSocket-Version: 13\r\n"
        f"Sec-WebSocket-Key: {cdp_ws_key}\r\n"
    )
    if ws_protocol:
        handshake += f"Sec-WebSocket-Protocol: {ws_protocol}\r\n"
    handshake += "\r\n"

    try:
        cdp_writer.write(handshake.encode())
        await cdp_writer.drain()
        # readuntil() consumes exactly the response head. Anything CDP sends
        # right after it (the first frames) stays buffered for the proxy loop
        # instead of being discarded with the handshake.
        cdp_response = await cdp_reader.readuntil(b"\r\n\r\n")
    except (
        asyncio.IncompleteReadError,
        asyncio.LimitOverrunError,
        ConnectionError,
    ) as exc:
        if isinstance(exc, asyncio.IncompleteReadError):
            logger.error("CDP handshake failed: connection closed")
        else:
            logger.error("CDP handshake failed: %s", exc)
        cdp_writer.close()
        await _reject_cdp_upgrade(writer, b"HTTP/1.1 502 Bad Gateway\r\n\r\n")
        return

    # Require status code 101 in the status line itself rather than the
    # substring "101" appearing anywhere in it.
    status_parts = cdp_response.split(b"\r\n", 1)[0].split()
    if len(status_parts) < 2 or status_parts[1] != b"101":
        logger.error("CDP handshake failed: %s", cdp_response[:200])
        cdp_writer.close()
        await _reject_cdp_upgrade(writer, b"HTTP/1.1 502 Bad Gateway\r\n\r\n")
        return

    # Complete WebSocket handshake with the external client
    accept = _accept_key(ws_key)
    response = (
        f"HTTP/1.1 101 Switching Protocols\r\n"
        f"Upgrade: websocket\r\n"
        f"Connection: Upgrade\r\n"
        f"Sec-WebSocket-Accept: {accept}\r\n"
    )
    if ws_protocol:
        response += f"Sec-WebSocket-Protocol: {ws_protocol}\r\n"
    response += "\r\n"

    tasks: set[asyncio.Task[None]] = set()
    try:
        writer.write(response.encode())
        await writer.drain()
        logger.info("CDP proxy session started")

        # Bidirectional proxy — when one direction ends, stop the other.
        tasks = {
            asyncio.create_task(
                _proxy_ws(
                    reader,
                    cdp_writer,
                    src_masked=True,
                    dst_mask=True,
                    label="client→cdp",
                )
            ),
            asyncio.create_task(
                _proxy_ws(
                    cdp_reader,
                    writer,
                    src_masked=False,
                    dst_mask=False,
                    label="cdp→client",
                )
            ),
        }
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        try:
            if tasks:
                # Cancel whatever is still running (a no-op for finished
                # tasks). This also covers the handler itself being
                # cancelled while waiting.
                for task in tasks:
                    task.cancel()
                # Collect every outcome so no task exception is left
                # unretrieved.
                outcomes = await asyncio.gather(*tasks, return_exceptions=True)
                for outcome in outcomes:
                    if isinstance(outcome, Exception) and not isinstance(
                        outcome, asyncio.CancelledError
                    ):
                        logger.debug("CDP proxy ended: %s", outcome)
                logger.info("CDP proxy session ended")
        finally:
            # Both sockets are released on every exit path.
            cdp_writer.close()
            writer.close()