import { afterEach, beforeEach, expect, mock, test } from "bun:test" import type { ResponsesResult } from "../src/services/copilot/create-responses" type ListenerEvent = { data?: string error?: unknown message?: string } type Listener = (event: ListenerEvent) => void const originalClearTimeout = globalThis.clearTimeout const originalSetTimeout = globalThis.setTimeout class MockWebSocket { static readonly CONNECTING = 0 static readonly OPEN = 1 static readonly CLOSING = 2 static readonly CLOSED = 3 static autoComplete = true static closeAfterComplete = false static failOpen = false static failOpenEvent: ListenerEvent | null = null static instances: Array = [] readonly sent: Array = [] readonly init: { dispatcher?: unknown; headers?: Record } readonly url: string readyState = MockWebSocket.CONNECTING private readonly listeners = new Map>() constructor( url: string, init: { dispatcher?: unknown; headers?: Record }, ) { this.init = init this.url = url MockWebSocket.instances.push(this) originalSetTimeout(() => { if (MockWebSocket.failOpen) { this.readyState = MockWebSocket.CLOSED this.emit("error", MockWebSocket.failOpenEvent ?? {}) return } this.readyState = MockWebSocket.OPEN this.emit("open", {}) }, 0) } addEventListener(event: string, listener: Listener): void { const listeners = this.listeners.get(event) ?? new Set() listeners.add(listener) this.listeners.set(event, listeners) } removeEventListener(event: string, listener: Listener): void { this.listeners.get(event)?.delete(listener) } send(data: string): void { this.sent.push(data) if (MockWebSocket.autoComplete) { originalSetTimeout(() => { this.completeLatestResponse() }, 0) } } close(): void { if (this.readyState === MockWebSocket.CLOSED) { return } this.readyState = MockWebSocket.CLOSED this.emit("close", {}) } emitError(payload: ListenerEvent): void { this.emit("error", payload) } completeLatestResponse(): void { const latestSent = this.sent.at(-1) if (!latestSent) { throw new Error("No websocket request to complete") } const parsed = JSON.parse(latestSent) as { model: string } this.emit("message", { data: JSON.stringify({ response: createResponsesResult( parsed.model, `resp-${this.sent.length}`, ), sequence_number: 1, type: "response.completed", }), }) if (MockWebSocket.closeAfterComplete) { originalSetTimeout(() => { this.close() }, 0) } } private emit(event: string, payload: ListenerEvent): void { for (const listener of this.listeners.get(event) ?? []) { listener(payload) } } } class MockAgent { close(): Promise { return Promise.resolve() } destroy(): void {} } class MockProxyAgent extends MockAgent { readonly proxyUrl: string constructor(proxyUrl: string) { super() this.proxyUrl = proxyUrl } } const setGlobalDispatcherMock = mock((_dispatcher: unknown) => {}) await mock.module("undici", () => ({ Agent: MockAgent, ProxyAgent: MockProxyAgent, setGlobalDispatcher: setGlobalDispatcherMock, WebSocket: MockWebSocket, })) const { state } = await import("../src/lib/state") const { getProxyEnvDispatcher, initProxyFromEnv } = await import( "../src/lib/proxy" ) const { createResponses } = await import( "../src/services/copilot/create-responses" ) const originalState = { accountType: state.accountType, copilotApiUrl: state.copilotApiUrl, copilotToken: state.copilotToken, vsCodeDeviceId: state.vsCodeDeviceId, vsCodeVersion: state.vsCodeVersion, } const createResponsesResult = ( model: string, id = "resp-test", ): ResponsesResult => ({ created_at: 0, error: null, id, incomplete_details: null, instructions: null, metadata: null, model, object: "response", output: [], output_text: "", parallel_tool_calls: false, status: "completed", temperature: null, tool_choice: "auto", tools: [], top_p: null, usage: null, }) beforeEach(() => { MockWebSocket.autoComplete = true MockWebSocket.closeAfterComplete = false MockWebSocket.failOpen = false MockWebSocket.failOpenEvent = null MockWebSocket.instances = [] state.accountType = "individual" state.copilotApiUrl = "https://api.githubcopilot.com" state.copilotToken = "test-token" state.vsCodeDeviceId = "device-1" state.vsCodeVersion = "1.120.0" }) afterEach(() => { MockWebSocket.autoComplete = true MockWebSocket.closeAfterComplete = false MockWebSocket.failOpen = false MockWebSocket.failOpenEvent = null for (const websocket of MockWebSocket.instances) { websocket.close() } state.accountType = originalState.accountType state.copilotApiUrl = originalState.copilotApiUrl state.copilotToken = originalState.copilotToken state.vsCodeDeviceId = originalState.vsCodeDeviceId state.vsCodeVersion = originalState.vsCodeVersion ;( globalThis as unknown as { clearTimeout: typeof clearTimeout } ).clearTimeout = originalClearTimeout ;(globalThis as unknown as { setTimeout: typeof setTimeout }).setTimeout = originalSetTimeout }) test("Responses websocket pool reuses the same connection for matching pool keys", async () => { await collectResponsesStream("request-1") await collectResponsesStream("request-1") expect(MockWebSocket.instances).toHaveLength(1) expect(MockWebSocket.instances[0]?.sent).toHaveLength(2) }) test("Responses websocket open failure includes the underlying reason", async () => { MockWebSocket.failOpen = true MockWebSocket.failOpenEvent = { error: new Error("tls handshake failed"), } let thrown: unknown = null try { await collectResponsesStream("request-1") } catch (error) { thrown = error } expect(thrown).toBeInstanceOf(Error) expect((thrown as Error).message).toBe( "Failed to create responses websocket: tls handshake failed", ) }) test("Responses websocket pool separates different request IDs", async () => { await collectResponsesStream("request-1") await collectResponsesStream("request-2") expect(MockWebSocket.instances).toHaveLength(2) expect(MockWebSocket.instances[0]?.sent).toHaveLength(1) expect(MockWebSocket.instances[1]?.sent).toHaveLength(1) }) test("Responses websocket does not open until the stream is consumed", async () => { MockWebSocket.autoComplete = false const response = await createResponses( { input: "hello", model: "gpt-test", stream: true, }, { initiator: "user", requestId: "request-1", transport: "websocket", vision: false, }, ) expect(MockWebSocket.instances).toHaveLength(0) const iterator = (response as AsyncIterable)[Symbol.asyncIterator]() const firstChunk = iterator.next() await waitFor(() => MockWebSocket.instances[0]?.sent.length === 1) expect(MockWebSocket.instances).toHaveLength(1) expect(MockWebSocket.instances[0]?.sent).toHaveLength(1) MockWebSocket.instances[0]?.completeLatestResponse() await firstChunk await iterator.next() }) test("Responses websocket delayed concurrent streams still use dedicated connections", async () => { MockWebSocket.autoComplete = false const firstResponse = await createResponses( { input: "hello", model: "gpt-test", stream: true, }, { initiator: "user", requestId: "request-1", transport: "websocket", vision: false, }, ) const secondResponse = await createResponses( { input: "hello", model: "gpt-test", stream: true, }, { initiator: "user", requestId: "request-1", transport: "websocket", vision: false, }, ) expect(MockWebSocket.instances).toHaveLength(0) const firstIterator = (firstResponse as AsyncIterable)[ Symbol.asyncIterator ]() const secondIterator = (secondResponse as AsyncIterable)[ Symbol.asyncIterator ]() const firstChunk = firstIterator.next() await waitFor(() => MockWebSocket.instances[0]?.sent.length === 1) const secondChunk = secondIterator.next() await waitFor( () => MockWebSocket.instances.length === 2 && MockWebSocket.instances[1]?.sent.length === 1, ) expect(MockWebSocket.instances[0]?.sent).toHaveLength(1) expect(MockWebSocket.instances[1]?.sent).toHaveLength(1) MockWebSocket.instances[1]?.completeLatestResponse() await secondChunk await secondIterator.next() MockWebSocket.instances[0]?.completeLatestResponse() await firstChunk await firstIterator.next() }) test("Responses websocket concurrent request bypasses the pool without closing the previous websocket", async () => { MockWebSocket.autoComplete = false const firstResponse = await createResponses( { input: "hello", model: "gpt-test", stream: true, }, { initiator: "user", requestId: "request-1", transport: "websocket", vision: false, }, ) const firstIterator = (firstResponse as AsyncIterable)[ Symbol.asyncIterator ]() const firstChunk = firstIterator.next() await waitFor(() => MockWebSocket.instances[0]?.sent.length === 1) const secondPromise = collectResponsesStream("request-1") await waitFor( () => MockWebSocket.instances.length === 2 && MockWebSocket.instances[1]?.sent.length === 1, ) expect(MockWebSocket.instances[0]?.readyState).toBe(MockWebSocket.OPEN) expect(MockWebSocket.instances[0]?.sent).toHaveLength(1) expect(MockWebSocket.instances[1]?.sent).toHaveLength(1) MockWebSocket.instances[1]?.completeLatestResponse() await secondPromise expect(MockWebSocket.instances[0]?.readyState).toBe(MockWebSocket.OPEN) MockWebSocket.instances[0]?.completeLatestResponse() await firstChunk await firstIterator.next() }) test("Responses websocket multiple concurrent requests each use a dedicated connection", async () => { MockWebSocket.autoComplete = false const firstResponse = await createResponses( { input: "hello", model: "gpt-test", stream: true, }, { initiator: "user", requestId: "request-1", transport: "websocket", vision: false, }, ) const firstIterator = (firstResponse as AsyncIterable)[ Symbol.asyncIterator ]() const firstChunk = firstIterator.next() await waitFor(() => MockWebSocket.instances[0]?.sent.length === 1) const secondPromise = collectResponsesStream("request-1") const thirdPromise = collectResponsesStream("request-1") await waitFor( () => MockWebSocket.instances.length === 3 && MockWebSocket.instances[1]?.sent.length === 1, ) expect(MockWebSocket.instances[2]?.sent).toHaveLength(1) MockWebSocket.instances[1]?.completeLatestResponse() await secondPromise expect(MockWebSocket.instances[0]?.sent).toHaveLength(1) expect(MockWebSocket.instances[1]?.sent).toHaveLength(1) expect(MockWebSocket.instances[2]?.sent).toHaveLength(1) MockWebSocket.instances[2]?.completeLatestResponse() await thirdPromise MockWebSocket.instances[0]?.completeLatestResponse() await firstChunk await firstIterator.next() }) test("Responses websocket sequential request reuses the pooled connection after concurrent work completes", async () => { MockWebSocket.autoComplete = false const firstResponse = await createResponses( { input: "hello", model: "gpt-test", stream: true, }, { initiator: "user", requestId: "request-1", transport: "websocket", vision: false, }, ) const firstIterator = (firstResponse as AsyncIterable)[ Symbol.asyncIterator ]() const firstChunk = firstIterator.next() await waitFor(() => MockWebSocket.instances[0]?.sent.length === 1) const secondPromise = collectResponsesStream("request-1") await waitFor( () => MockWebSocket.instances.length === 2 && MockWebSocket.instances[1]?.sent.length === 1, ) MockWebSocket.instances[1]?.completeLatestResponse() await secondPromise MockWebSocket.instances[0]?.completeLatestResponse() await firstChunk await firstIterator.next() const thirdPromise = collectResponsesStream("request-1") await waitFor(() => MockWebSocket.instances[0]?.sent.length === 2) expect(MockWebSocket.instances).toHaveLength(2) expect(MockWebSocket.instances[1]?.sent).toHaveLength(1) MockWebSocket.instances[0]?.completeLatestResponse() await thirdPromise }) test("Responses websocket stream failure includes the underlying reason", async () => { MockWebSocket.autoComplete = false const streamPromise = collectResponsesStream("request-1") await waitFor(() => MockWebSocket.instances[0]?.sent.length === 1) MockWebSocket.instances[0]?.emitError({ error: new Error("socket hang up"), }) let thrown: unknown = null try { await streamPromise } catch (error) { thrown = error } expect(thrown).toBeInstanceOf(Error) expect((thrown as Error).message).toBe( "Responses websocket stream error: socket hang up", ) }) test("Responses websocket uses the proxy-env dispatcher when initialized", async () => { const originalHttpProxy = process.env.HTTP_PROXY process.env.HTTP_PROXY = "http://127.0.0.1:8080" try { initProxyFromEnv() const dispatcher = getProxyEnvDispatcher() await collectResponsesStream("proxy-request") expect(dispatcher).toBeDefined() expect(MockWebSocket.instances[0]?.init.dispatcher).toBe(dispatcher) } finally { if (originalHttpProxy === undefined) { delete process.env.HTTP_PROXY } else { process.env.HTTP_PROXY = originalHttpProxy } } }) const collectResponsesStream = async (requestId: string): Promise => { const response = await createResponses( { input: "hello", model: "gpt-test", stream: true, }, { initiator: "user", requestId, transport: "websocket", vision: false, }, ) for await (const _chunk of response as AsyncIterable) { // consume stream } } const waitFor = async (predicate: () => boolean): Promise => { for (let attempt = 0; attempt < 20; attempt += 1) { if (predicate()) { return } await new Promise((resolve) => { originalSetTimeout(resolve, 0) }) } throw new Error("Timed out waiting for condition") }