File size: 5,255 Bytes
d07b5c0
 
 
 
 
 
 
 
 
ea2be52
 
 
 
 
d07b5c0
 
 
 
ea2be52
 
 
 
 
 
 
 
 
 
 
 
d07b5c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea2be52
d07b5c0
 
 
 
 
 
ea2be52
 
 
 
 
 
 
 
 
d07b5c0
 
 
 
 
 
ea2be52
d07b5c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
 * WebSocket transport for the Codex Responses API.
 *
 * Opens a WebSocket to the backend, sends a `response.create` message,
 * and wraps incoming JSON messages into an SSE-formatted ReadableStream.
 * This lets parseStream() and all downstream consumers work identically
 * regardless of whether HTTP SSE or WebSocket was used.
 *
 * Used when `previous_response_id` is present — HTTP SSE does not support it.
 *
 * The `ws` package is loaded lazily via dynamic import to avoid
 * "Dynamic require of 'events' is not supported" errors when the
 * backend is bundled as ESM for Electron (esbuild cannot convert
 * ws's CJS require chain to ESM statics).
 */

import type { CodexInputItem } from "./codex-api.js";

/** Memoized default export of `ws`; stays undefined until the first getWS() call. */
let _WS: typeof import("ws").default | undefined;

/**
 * Resolve the default export of the `ws` package.
 *
 * The package is pulled in through a dynamic `import()` the first time this
 * runs; the result is stored in `_WS` and handed back on later calls without
 * importing again.
 *
 * @returns The default export of the `ws` module.
 */
async function getWS(): Promise<typeof import("ws").default> {
  if (_WS) {
    return _WS;
  }
  const { default: loaded } = await import("ws");
  _WS = loaded;
  return loaded;
}

/** Shape of the optional `reasoning` field of a `response.create` message. */
type WsReasoningOptions = { effort?: string; summary?: string };

/** Shape of the optional `tool_choice` field: a plain string or a `{ type, name }` object. */
type WsToolChoice = string | { type: string; name: string };

/** Shape of `text.format` on a `response.create` message. */
type WsTextFormat = {
  type: "text" | "json_object" | "json_schema";
  name?: string;
  schema?: Record<string, unknown>;
  strict?: boolean;
};

/**
 * Flat `response.create` message, in the WebSocket format the Codex backend
 * expects. Only `type`, `model`, `instructions` and `input` are required.
 */
export interface WsCreateRequest {
  type: "response.create";
  model: string;
  instructions: string;
  input: CodexInputItem[];
  previous_response_id?: string;
  reasoning?: WsReasoningOptions;
  tools?: unknown[];
  tool_choice?: WsToolChoice;
  text?: { format: WsTextFormat };
  // NOTE: `store` and `stream` are intentionally omitted.
  // The backend defaults to storing via WebSocket and always streams.
}

/**
 * Open a WebSocket to the Codex backend, send `response.create`,
 * and return a Response whose body is an SSE-formatted ReadableStream.
 *
 * The SSE format matches what parseStream() expects:
 *   event: <type>\ndata: <json>\n\n
 *
 * Messages that cannot be parsed as JSON are forwarded as a bare
 * `data: <raw>\n\n` frame. The stream is closed (and the socket with it)
 * after a `response.completed`, `response.failed` or `error` message, or
 * when the socket closes. A socket error after the connection is open
 * errors the stream instead.
 *
 * @param wsUrl - WebSocket endpoint to connect to.
 * @param headers - Headers passed to the `ws` constructor options.
 * @param request - Message serialized with JSON.stringify and sent once the socket opens.
 * @param signal - Optional abort signal. Aborting closes the socket; if the
 *   connection is not open yet the returned promise rejects.
 * @param proxyUrl - When truthy, an `HttpsProxyAgent` for this URL is used as the socket agent.
 * @returns A Response (status 200, `content-type: text/event-stream`) resolved as soon as the socket opens.
 * @throws Rejects with an Error when aborted before/during connect, with the
 *   socket error when it fails before opening, or when the socket closes
 *   before it ever opened.
 */
export async function createWebSocketResponse(
  wsUrl: string,
  headers: Record<string, string>,
  request: WsCreateRequest,
  signal?: AbortSignal,
  proxyUrl?: string | null,
): Promise<Response> {
  const WS = await getWS();

  // Lazy-import proxy agent only when needed
  const wsOpts: ConstructorParameters<typeof WS>[2] = { headers };
  if (proxyUrl) {
    const { HttpsProxyAgent } = await import("https-proxy-agent");
    wsOpts.agent = new HttpsProxyAgent(proxyUrl);
  }

  return new Promise<Response>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new Error("Aborted before WebSocket connect"));
      return;
    }

    const ws = new WS(wsUrl, wsOpts);
    const encoder = new TextEncoder();
    let controller: ReadableStreamDefaultController<Uint8Array> | null = null;
    let streamClosed = false;
    let connected = false;

    function closeStream() {
      if (!streamClosed && controller) {
        streamClosed = true;
        try { controller.close(); } catch { /* already closed */ }
      }
    }

    function errorStream(err: Error) {
      if (!streamClosed && controller) {
        streamClosed = true;
        try { controller.error(err); } catch { /* already closed */ }
      }
    }

    /**
     * Push one SSE frame into the stream. If the stream no longer accepts
     * data (e.g. the consumer cancelled it), stop forwarding and close the
     * socket instead of letting the exception escape the event handler.
     */
    function enqueueFrame(frame: string) {
      if (streamClosed || !controller) return;
      try {
        controller.enqueue(encoder.encode(frame));
      } catch {
        streamClosed = true;
        ws.close(1000, "stream closed");
      }
    }

    // Abort signal handling
    const onAbort = () => {
      ws.close(1000, "aborted");
      if (!connected) {
        reject(new Error("Aborted during WebSocket connect"));
      }
    };
    signal?.addEventListener("abort", onAbort, { once: true });

    const stream = new ReadableStream<Uint8Array>({
      start(c) {
        controller = c;
      },
      cancel() {
        // Mark closed first so messages still in flight are dropped rather
        // than enqueued into a cancelled stream (which would throw).
        streamClosed = true;
        ws.close(1000, "stream cancelled");
      },
    });

    ws.on("open", () => {
      connected = true;
      ws.send(JSON.stringify(request));

      // Return the Response immediately — events will flow into the stream
      const responseHeaders = new Headers({ "content-type": "text/event-stream" });
      resolve(new Response(stream, { status: 200, headers: responseHeaders }));
    });

    ws.on("message", (data: Buffer | string) => {
      if (streamClosed) return;
      const raw = typeof data === "string" ? data : data.toString("utf-8");

      // Only the parsing is inside the try: an enqueue failure must not be
      // mistaken for a non-JSON message.
      let frame: string;
      let terminal = false;
      try {
        const msg = JSON.parse(raw) as Record<string, unknown>;
        const type = (msg.type as string) ?? "unknown";

        // Re-encode as SSE: event: <type>\ndata: <full json>\n\n
        frame = `event: ${type}\ndata: ${raw}\n\n`;
        terminal = type === "response.completed" || type === "response.failed" || type === "error";
      } catch {
        // Non-JSON message — emit as raw data
        frame = `data: ${raw}\n\n`;
      }

      enqueueFrame(frame);

      // Close stream after response.completed, response.failed, or error
      if (terminal) {
        // Let the SSE chunk flush, then close
        queueMicrotask(() => {
          closeStream();
          ws.close(1000);
        });
      }
    });

    ws.on("error", (err: Error) => {
      signal?.removeEventListener("abort", onAbort);
      if (!connected) {
        reject(err);
      } else {
        errorStream(err);
      }
    });

    ws.on("close", () => {
      signal?.removeEventListener("abort", onAbort);
      if (!connected) {
        // Never opened: make sure the caller is not left waiting forever.
        // This is a no-op when the promise was already rejected by the
        // abort or error path.
        reject(new Error("WebSocket closed before connection was established"));
      }
      closeStream();
    });
  });
}