/**
* WebSocket transport for the Codex Responses API.
*
* Opens a WebSocket to the backend, sends a `response.create` message,
* and wraps incoming JSON messages into an SSE-formatted ReadableStream.
* This lets parseStream() and all downstream consumers work identically
* regardless of whether HTTP SSE or WebSocket was used.
*
* Used when `previous_response_id` is present — HTTP SSE does not support it.
*
* The `ws` package is loaded lazily via dynamic import to avoid
* "Dynamic require of 'events' is not supported" errors when the
* backend is bundled as ESM for Electron (esbuild cannot convert
* ws's CJS require chain to ESM statics).
*/
import type { CodexInputItem } from "./codex-api.js";
/** Memoized `ws` default export; stays `undefined` until getWS() has run once. */
let _WS: typeof import("ws").default | undefined;

/**
 * Resolve the default export of the `ws` package, importing it on first use.
 *
 * The dynamic import is performed only while the module-level cache is
 * empty; afterwards the cached value is returned directly.
 *
 * @returns The default export of `ws` (the WebSocket constructor).
 */
async function getWS(): Promise<typeof import("ws").default> {
  // Fast path: the package was already imported by an earlier call.
  if (_WS) {
    return _WS;
  }
  const { default: loaded } = await import("ws");
  _WS = loaded;
  return loaded;
}
/**
 * Shape of the single `response.create` message sent over the WebSocket.
 *
 * All fields sit at the top level of the message (flat format), which is
 * what the Codex backend expects.
 *
 * `store` and `stream` are deliberately not part of this shape: over
 * WebSocket the backend stores by default and always streams.
 */
export interface WsCreateRequest {
  /** Message discriminator; always the literal `"response.create"`. */
  type: "response.create";
  /** Model identifier. */
  model: string;
  /** Instruction text sent with the request. */
  instructions: string;
  /** Input items for the request. */
  input: CodexInputItem[];
  /** Identifier of an earlier response; presumably chains this turn onto it — confirm against backend docs. */
  previous_response_id?: string;
  /** Optional reasoning settings. */
  reasoning?: {
    effort?: string;
    summary?: string;
  };
  /** Tool definitions; left opaque (`unknown`) at this layer. */
  tools?: unknown[];
  /** Either a plain string or an object naming one specific tool. */
  tool_choice?:
    | string
    | {
        type: string;
        name: string;
      };
  /** Output text format selection. */
  text?: {
    format: {
      type:
        | "text"
        | "json_object"
        | "json_schema";
      name?: string;
      schema?: Record<string, unknown>;
      strict?: boolean;
    };
  };
}
/**
 * Convert one raw WebSocket text payload into an SSE frame.
 *
 * - A JSON payload becomes `event: <type>\ndata: <raw>\n\n`, where `<type>` is
 *   the payload's `type` field, or `"unknown"` when that field is absent/null.
 * - A payload that is not JSON (or is the bare JSON value `null`) becomes a
 *   data-only frame with no `event:` line.
 *
 * A payload containing line breaks is emitted as one `data:` line per source
 * line, so the frame stays well-formed; single-line payloads are unchanged.
 *
 * @param raw The message text exactly as received.
 * @returns The SSE frame and whether the message is one of the terminal
 *   types (`response.completed`, `response.failed`, `error`).
 */
function formatSseFrame(raw: string): { frame: string; terminal: boolean } {
  const dataLines = raw
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`)
    .join("\n");

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    parsed = null;
  }
  if (parsed === null) {
    return { frame: `${dataLines}\n\n`, terminal: false };
  }

  // Primitives and arrays have no `type` property and fall back to "unknown".
  const eventType = (parsed as { type?: unknown }).type ?? "unknown";
  const terminal =
    eventType === "response.completed" ||
    eventType === "response.failed" ||
    eventType === "error";
  return { frame: `event: ${String(eventType)}\n${dataLines}\n\n`, terminal };
}

/**
 * Open a WebSocket to the Codex backend, send `response.create`,
 * and return a Response whose body is an SSE-formatted ReadableStream.
 *
 * The SSE format matches what parseStream() expects:
 * event: <type>\ndata: <json>\n\n
 *
 * The returned promise resolves as soon as the socket opens and the request
 * has been sent; events then flow into the Response body. The body is closed
 * after a `response.completed`, `response.failed` or `error` message, or when
 * the socket closes, and is errored if the socket reports an error after
 * opening.
 *
 * @param wsUrl WebSocket endpoint to connect to.
 * @param headers Headers passed to the WebSocket handshake.
 * @param request The `response.create` message, sent as JSON once connected.
 * @param signal Optional abort signal. Aborting closes the socket; if the
 *   connection has not opened yet, the returned promise is rejected.
 * @param proxyUrl Optional proxy URL; when truthy, the connection is made
 *   through an `HttpsProxyAgent` (imported lazily).
 * @returns A 200 Response with `content-type: text/event-stream`.
 * @throws Error if `signal` is already aborted, if the connection fails, or
 *   if the socket closes before it ever opened.
 */
export async function createWebSocketResponse(
  wsUrl: string,
  headers: Record<string, string>,
  request: WsCreateRequest,
  signal?: AbortSignal,
  proxyUrl?: string | null,
): Promise<Response> {
  // Fail fast: do not load `ws` or the proxy agent for a call that is
  // already cancelled.
  if (signal?.aborted) {
    throw new Error("Aborted before WebSocket connect");
  }

  const WS = await getWS();
  // Lazy-import proxy agent only when needed
  const wsOpts: ConstructorParameters<typeof WS>[2] = { headers };
  if (proxyUrl) {
    const { HttpsProxyAgent } = await import("https-proxy-agent");
    wsOpts.agent = new HttpsProxyAgent(proxyUrl);
  }

  return new Promise<Response>((resolve, reject) => {
    // The signal may have fired while the lazy imports were in flight.
    if (signal?.aborted) {
      reject(new Error("Aborted before WebSocket connect"));
      return;
    }

    const ws = new WS(wsUrl, wsOpts);
    const encoder = new TextEncoder();
    let controller: ReadableStreamDefaultController<Uint8Array> | null = null;
    let streamClosed = false;
    let connected = false;

    function closeStream(): void {
      if (!streamClosed && controller) {
        streamClosed = true;
        try { controller.close(); } catch { /* already closed */ }
      }
    }

    function errorStream(err: Error): void {
      if (!streamClosed && controller) {
        streamClosed = true;
        try { controller.error(err); } catch { /* already closed */ }
      }
    }

    /** Enqueue one SSE frame; never throws into the socket's event loop. */
    function pushFrame(frame: string): void {
      if (streamClosed || !controller) return;
      try {
        controller.enqueue(encoder.encode(frame));
      } catch {
        // The stream no longer accepts data (e.g. the consumer cancelled it).
        streamClosed = true;
      }
    }

    // Abort signal handling
    const onAbort = () => {
      ws.close(1000, "aborted");
      if (!connected) {
        reject(new Error("Aborted during WebSocket connect"));
      }
      // NOTE(review): after connect, an abort ends the body via the normal
      // close path rather than erroring it — confirm that is what consumers want.
    };
    signal?.addEventListener("abort", onAbort, { once: true });

    const stream = new ReadableStream<Uint8Array>({
      start(c) {
        controller = c;
      },
      cancel() {
        // Mark closed first so late messages are dropped instead of being
        // enqueued on a cancelled stream.
        streamClosed = true;
        ws.close(1000, "stream cancelled");
      },
    });

    ws.on("open", () => {
      connected = true;
      ws.send(JSON.stringify(request));
      // Return the Response immediately — events will flow into the stream
      const responseHeaders = new Headers({ "content-type": "text/event-stream" });
      resolve(new Response(stream, { status: 200, headers: responseHeaders }));
    });

    ws.on("message", (data: Buffer | string) => {
      if (streamClosed) return;
      const raw = typeof data === "string" ? data : data.toString("utf-8");
      const { frame, terminal } = formatSseFrame(raw);
      pushFrame(frame);
      if (terminal) {
        // Let the SSE chunk flush, then close
        queueMicrotask(() => {
          closeStream();
          ws.close(1000);
        });
      }
    });

    ws.on("error", (err: Error) => {
      signal?.removeEventListener("abort", onAbort);
      if (!connected) {
        reject(err);
      } else {
        errorStream(err);
      }
    });

    ws.on("close", (code: number, _reason: Buffer) => {
      signal?.removeEventListener("abort", onAbort);
      if (!connected) {
        // Without this, a close that arrives before "open" and without an
        // "error" event would leave the promise pending forever. It is a
        // no-op when the promise was already rejected.
        reject(new Error(`WebSocket closed before open (code ${code})`));
      }
      closeStream();
    });
  });
}
|