"""CDP WebSocket proxy — expose browser CDP over a single port. Handles WebSocket upgrade requests to ``/cdp`` and proxies CDP messages between the external client and an internal Chromium CDP endpoint. """ from __future__ import annotations import asyncio import base64 import hashlib import logging import os import struct from collections.abc import Callable, Coroutine from typing import Any from veilrender.auth import verify_token from veilrender._vendor.httpserver import Request logger = logging.getLogger(__name__) # WebSocket opcodes _OP_TEXT = 0x1 _OP_BINARY = 0x2 _OP_CLOSE = 0x8 _OP_PING = 0x9 _OP_PONG = 0xA _WS_MAGIC = b"258EAFA5-E914-47DA-95CA-5B0D11CF9245" # Maximum allowed WebSocket frame payload size (16 MB). # Frames exceeding this limit are rejected to prevent OOM from # malicious headers claiming excessively large payloads. MAX_FRAME_SIZE = 16 * 1024 * 1024 def _accept_key(key: str) -> str: """Compute Sec-WebSocket-Accept from Sec-WebSocket-Key.""" digest = hashlib.sha1(key.encode() + _WS_MAGIC).digest() return base64.b64encode(digest).decode() async def _read_ws_frame( reader: asyncio.StreamReader, ) -> tuple[bool, int, bytes] | None: """Read one WebSocket frame. Returns (fin, opcode, payload) or None on EOF.""" try: head = await reader.readexactly(2) except (asyncio.IncompleteReadError, ConnectionError): return None fin = bool(head[0] & 0x80) opcode = head[0] & 0x0F masked = bool(head[1] & 0x80) length = head[1] & 0x7F if length == 126: raw = await reader.readexactly(2) length = struct.unpack("!H", raw)[0] elif length == 127: raw = await reader.readexactly(8) length = struct.unpack("!Q", raw)[0] if length > MAX_FRAME_SIZE: logger.warning("WebSocket frame too large (%d bytes), dropping", length) return None if masked: mask = await reader.readexactly(4) data = bytearray(await reader.readexactly(length)) for i in range(length): data[i] ^= mask[i % 4] return fin, opcode, bytes(data) else: data = await reader.readexactly(length) return fin, opcode, data def _make_ws_frame( opcode: int, payload: bytes, *, fin: bool = True, mask: bool = False ) -> bytes: """Build a WebSocket frame.""" frame = bytearray() frame.append((0x80 if fin else 0x00) | opcode) length = len(payload) if length < 126: frame.append((0x80 if mask else 0) | length) elif length < 65536: frame.append((0x80 if mask else 0) | 126) frame.extend(struct.pack("!H", length)) else: frame.append((0x80 if mask else 0) | 127) frame.extend(struct.pack("!Q", length)) if mask: mask_key = os.urandom(4) frame.extend(mask_key) masked = bytearray(payload) for i in range(length): masked[i] ^= mask_key[i % 4] frame.extend(masked) else: frame.extend(payload) return bytes(frame) async def _proxy_ws( src_reader: asyncio.StreamReader, dst_writer: asyncio.StreamWriter, *, src_masked: bool, dst_mask: bool, label: str, ) -> None: """Forward WebSocket frames from src to dst until close/EOF.""" while True: result = await _read_ws_frame(src_reader) if result is None: break fin, opcode, payload = result if opcode == _OP_CLOSE: frame = _make_ws_frame(_OP_CLOSE, payload, fin=True, mask=dst_mask) dst_writer.write(frame) await dst_writer.drain() break elif opcode == _OP_PING: # Forward pings as-is to the other side frame = _make_ws_frame(_OP_PING, payload, fin=True, mask=dst_mask) dst_writer.write(frame) await dst_writer.drain() else: frame = _make_ws_frame(opcode, payload, fin=fin, mask=dst_mask) dst_writer.write(frame) await dst_writer.drain() def is_websocket_upgrade(method: str, headers: dict[str, str], path: str) -> bool: """Check if this HTTP request is a WebSocket upgrade to /cdp.""" if not path.startswith("/cdp"): return False upgrade = headers.get("upgrade", "").lower() connection = headers.get("connection", "").lower() return method == "GET" and "websocket" in upgrade and "upgrade" in connection async def handle_cdp_upgrade( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, headers: dict[str, str], path: str, query_string: str, get_cdp_url: Callable[[], Coroutine[Any, Any, str | None]], ) -> None: """Handle a WebSocket upgrade for CDP proxying. Args: reader: Client's stream reader. writer: Client's stream writer. headers: Parsed HTTP headers from the upgrade request. path: Request path. query_string: Raw query string. get_cdp_url: Async callable that returns the internal CDP WebSocket URL, or None if no browser is available. """ # Auth check — build a minimal Request object for verify_token try: req = Request( method="GET", path=path, query_string=query_string, headers=headers, body=b"", client_addr=writer.get_extra_info("peername") or ("0.0.0.0", 0), ) verify_token(req) except Exception: writer.write(b"HTTP/1.1 401 Unauthorized\r\n\r\n") await writer.drain() writer.close() return # Get WebSocket accept key ws_key = headers.get("sec-websocket-key", "") if not ws_key: writer.write(b"HTTP/1.1 400 Bad Request\r\n\r\nMissing Sec-WebSocket-Key\r\n") await writer.drain() writer.close() return # Get internal CDP URL cdp_url = await get_cdp_url() if cdp_url is None: writer.write( b"HTTP/1.1 503 Service Unavailable\r\n\r\nNo browser available\r\n" ) await writer.drain() writer.close() return # Connect to internal CDP # Parse ws://host:port/path from cdp_url from urllib.parse import urlparse parsed = urlparse(cdp_url) cdp_host = parsed.hostname or "127.0.0.1" cdp_port = parsed.port or 9222 cdp_path = parsed.path or "/" try: cdp_reader, cdp_writer = await asyncio.open_connection(cdp_host, cdp_port) except Exception as exc: logger.error("Failed to connect to CDP at %s: %s", cdp_url, exc) writer.write(b"HTTP/1.1 502 Bad Gateway\r\n\r\n") await writer.drain() writer.close() return # WebSocket handshake with internal CDP cdp_ws_key = base64.b64encode(os.urandom(16)).decode() ws_protocol = headers.get("sec-websocket-protocol", "") handshake = ( f"GET {cdp_path} HTTP/1.1\r\n" f"Host: {cdp_host}:{cdp_port}\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Version: 13\r\n" f"Sec-WebSocket-Key: {cdp_ws_key}\r\n" ) if ws_protocol: handshake += f"Sec-WebSocket-Protocol: {ws_protocol}\r\n" handshake += "\r\n" cdp_writer.write(handshake.encode()) await cdp_writer.drain() # Read CDP handshake response cdp_response = b"" while b"\r\n\r\n" not in cdp_response: chunk = await cdp_reader.read(4096) if not chunk: logger.error("CDP handshake failed: connection closed") writer.write(b"HTTP/1.1 502 Bad Gateway\r\n\r\n") await writer.drain() writer.close() cdp_writer.close() return cdp_response += chunk if b"101" not in cdp_response.split(b"\r\n")[0]: logger.error("CDP handshake failed: %s", cdp_response[:200]) writer.write(b"HTTP/1.1 502 Bad Gateway\r\n\r\n") await writer.drain() writer.close() cdp_writer.close() return # Complete WebSocket handshake with the external client accept = _accept_key(ws_key) response = ( f"HTTP/1.1 101 Switching Protocols\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Accept: {accept}\r\n" ) if ws_protocol: response += f"Sec-WebSocket-Protocol: {ws_protocol}\r\n" response += "\r\n" writer.write(response.encode()) await writer.drain() logger.info("CDP proxy session started") # Bidirectional proxy — when one direction ends, cancel the other t1 = asyncio.create_task( _proxy_ws( reader, cdp_writer, src_masked=True, dst_mask=True, label="client→cdp", ) ) t2 = asyncio.create_task( _proxy_ws( cdp_reader, writer, src_masked=False, dst_mask=False, label="cdp→client", ) ) try: done, pending = await asyncio.wait( {t1, t2}, return_when=asyncio.FIRST_COMPLETED ) for t in pending: t.cancel() # Await cancelled tasks to suppress warnings for t in pending: try: await t except asyncio.CancelledError: pass except Exception as exc: logger.debug("CDP proxy ended: %s", exc) finally: logger.info("CDP proxy session ended") cdp_writer.close() writer.close()