/** * WebSocket transport for the Codex Responses API. * * Opens a WebSocket to the backend, sends a `response.create` message, * and wraps incoming JSON messages into an SSE-formatted ReadableStream. * This lets parseStream() and all downstream consumers work identically * regardless of whether HTTP SSE or WebSocket was used. * * Used when `previous_response_id` is present — HTTP SSE does not support it. * * The `ws` package is loaded lazily via dynamic import to avoid * "Dynamic require of 'events' is not supported" errors when the * backend is bundled as ESM for Electron (esbuild cannot convert * ws's CJS require chain to ESM statics). */ import type { CodexInputItem } from "./codex-api.js"; /** Cached ws module — loaded once on first use. */ let _WS: typeof import("ws").default | undefined; /** Lazily load the `ws` package. */ async function getWS(): Promise { if (!_WS) { const mod = await import("ws"); _WS = mod.default; } return _WS; } /** Flat WebSocket message format expected by the Codex backend. */ export interface WsCreateRequest { type: "response.create"; model: string; instructions: string; input: CodexInputItem[]; previous_response_id?: string; reasoning?: { effort?: string; summary?: string }; tools?: unknown[]; tool_choice?: string | { type: string; name: string }; text?: { format: { type: "text" | "json_object" | "json_schema"; name?: string; schema?: Record; strict?: boolean; }; }; // NOTE: `store` and `stream` are intentionally omitted. // The backend defaults to storing via WebSocket and always streams. } /** * Open a WebSocket to the Codex backend, send `response.create`, * and return a Response whose body is an SSE-formatted ReadableStream. * * The SSE format matches what parseStream() expects: * event: \ndata: \n\n */ export async function createWebSocketResponse( wsUrl: string, headers: Record, request: WsCreateRequest, signal?: AbortSignal, proxyUrl?: string | null, ): Promise { const WS = await getWS(); // Lazy-import proxy agent only when needed const wsOpts: ConstructorParameters[2] = { headers }; if (proxyUrl) { const { HttpsProxyAgent } = await import("https-proxy-agent"); wsOpts.agent = new HttpsProxyAgent(proxyUrl); } return new Promise((resolve, reject) => { if (signal?.aborted) { reject(new Error("Aborted before WebSocket connect")); return; } const ws = new WS(wsUrl, wsOpts); const encoder = new TextEncoder(); let controller: ReadableStreamDefaultController | null = null; let streamClosed = false; let connected = false; function closeStream() { if (!streamClosed && controller) { streamClosed = true; try { controller.close(); } catch { /* already closed */ } } } function errorStream(err: Error) { if (!streamClosed && controller) { streamClosed = true; try { controller.error(err); } catch { /* already closed */ } } } // Abort signal handling const onAbort = () => { ws.close(1000, "aborted"); if (!connected) { reject(new Error("Aborted during WebSocket connect")); } }; signal?.addEventListener("abort", onAbort, { once: true }); const stream = new ReadableStream({ start(c) { controller = c; }, cancel() { ws.close(1000, "stream cancelled"); }, }); ws.on("open", () => { connected = true; ws.send(JSON.stringify(request)); // Return the Response immediately — events will flow into the stream const responseHeaders = new Headers({ "content-type": "text/event-stream" }); resolve(new Response(stream, { status: 200, headers: responseHeaders })); }); ws.on("message", (data: Buffer | string) => { if (streamClosed) return; const raw = typeof data === "string" ? data : data.toString("utf-8"); try { const msg = JSON.parse(raw) as Record; const type = (msg.type as string) ?? "unknown"; // Re-encode as SSE: event: \ndata: \n\n const sse = `event: ${type}\ndata: ${raw}\n\n`; controller!.enqueue(encoder.encode(sse)); // Close stream after response.completed, response.failed, or error if (type === "response.completed" || type === "response.failed" || type === "error") { // Let the SSE chunk flush, then close queueMicrotask(() => { closeStream(); ws.close(1000); }); } } catch { // Non-JSON message — emit as raw data const sse = `data: ${raw}\n\n`; controller!.enqueue(encoder.encode(sse)); } }); ws.on("error", (err: Error) => { signal?.removeEventListener("abort", onAbort); if (!connected) { reject(err); } else { errorStream(err); } }); ws.on("close", (_code: number, _reason: Buffer) => { signal?.removeEventListener("abort", onAbort); closeStream(); }); }); }