import consola from "consola"
import { events } from "fetch-event-stream"
import { createHash } from "node:crypto"
import { WebSocket } from "undici"

import type { SubagentMarker } from "~/lib/subagent"

import {
  copilotBaseUrl,
  copilotHeaders,
  copilotWebSocketHeaders,
  prepareForCompact,
  prepareInteractionHeaders,
} from "~/lib/api-config"
import { COMPACT_REQUEST, type CompactType } from "~/lib/compact"
import {
  logCopilotQuotaSnapshots,
  logCopilotRateLimits,
  type CopilotQuotaSnapshot,
} from "~/lib/copilot-rate-limit"
import { HTTPError } from "~/lib/error"
import { getProxyEnvDispatcher } from "~/lib/proxy"
import { state } from "~/lib/state"

// ---------------------------------------------------------------------------
// Request payload types
// ---------------------------------------------------------------------------

/** Request body sent to the `/responses` endpoint. */
export interface ResponsesPayload {
  model: string
  instructions?: string | null
  input?: string | Array<ResponseInputItem>
  tools?: Array<Tool> | null
  tool_choice?: ToolChoiceOptions | ToolChoiceFunction
  temperature?: number | null
  top_p?: number | null
  max_output_tokens?: number | null
  metadata?: Metadata | null
  stream?: boolean | null
  safety_identifier?: string | null
  prompt_cache_key?: string | null
  prompt_cache_retention?: "in_memory" | "24h" | null
  parallel_tool_calls?: boolean | null
  store?: boolean | null
  reasoning?: Reasoning | null
  context_management?: Array<ResponseContextManagementItem> | null
  include?: Array<ResponseIncludable>
  service_tier?: string | null // NOTE: Unsupported by GitHub Copilot
  [key: string]: unknown
}

export type ToolChoiceOptions = "none" | "auto" | "required"

export type ToolSearchExecution = "client" | "server"

export interface ToolChoiceFunction {
  name: string
  type: "function"
}

export type Tool =
  | FunctionTool
  | ToolSearchTool
  | NamespaceTool
  | Record<string, unknown>

export interface FunctionTool {
  name: string
  parameters: { [key: string]: unknown } | null
  strict: boolean | null
  type: "function"
  description?: string | null
  defer_loading?: boolean | null
}

export interface ToolSearchTool {
  type: "tool_search"
  execution?: ToolSearchExecution | null
  description?: string | null
  parameters?: { [key: string]: unknown } | null
}

export interface NamespaceTool {
  type: "namespace"
  name: string
  description?: string | null
  // NOTE(review): element type assumed to be FunctionTool — confirm.
  tools: Array<FunctionTool>
}

export type ResponseIncludable =
  | "file_search_call.results"
  | "message.input_image.image_url"
  | "computer_call_output.output.image_url"
  | "reasoning.encrypted_content"
  | "code_interpreter_call.outputs"

export interface Reasoning {
  effort?: "none" | "minimal" | "low" | "medium" | "high" | "xhigh" | null
  summary?: "auto" | "concise" | "detailed" | null
}

export interface ResponseContextManagementCompactionItem {
  type: "compaction"
  compact_threshold: number
}

export type ResponseContextManagementItem =
  ResponseContextManagementCompactionItem

// ---------------------------------------------------------------------------
// Input item types
// ---------------------------------------------------------------------------

export interface ResponseInputMessage {
  type?: "message"
  role: "user" | "assistant" | "system" | "developer"
  content?: string | Array<ResponseInputContent>
  status?: string
  phase?: "commentary" | "final_answer"
}

export interface ResponseFunctionToolCallItem {
  type: "function_call"
  call_id: string
  name: string
  arguments: string
  status?: "in_progress" | "completed" | "incomplete"
  namespace?: string | null
}

export interface ResponseFunctionCallOutputItem {
  type: "function_call_output"
  call_id: string
  // NOTE(review): array element type assumed to be ResponseInputContent — confirm.
  output: string | Array<ResponseInputContent>
  status?: "in_progress" | "completed" | "incomplete"
}

export interface ResponseToolSearchCallItem {
  type: "tool_search_call"
  call_id: string
  arguments: Record<string, unknown> | string
  execution?: ToolSearchExecution | null
  status?: "in_progress" | "completed" | "incomplete"
}

export interface ResponseToolSearchOutputItem {
  type: "tool_search_output"
  call_id: string
  // NOTE(review): element type assumed to be Tool — confirm.
  tools: Array<Tool>
  execution?: ToolSearchExecution | null
  status?: "in_progress" | "completed" | "incomplete"
}

export interface ResponseInputReasoning {
  id?: string
  type: "reasoning"
  summary: Array<{
    type: "summary_text"
    text: string
  }>
  encrypted_content: string
}

export interface ResponseInputCompaction {
  id: string
  type: "compaction"
  encrypted_content: string
}

export type ResponseInputItem =
  | ResponseInputMessage
  | ResponseFunctionToolCallItem
  | ResponseFunctionCallOutputItem
  | ResponseToolSearchCallItem
  | ResponseToolSearchOutputItem
  | ResponseInputReasoning
  | ResponseInputCompaction
  | Record<string, unknown>

export type ResponseInputContent =
  | ResponseInputText
  | ResponseInputImage
  | ResponseInputFile
  | Record<string, unknown>

export interface ResponseInputText {
  type: "input_text" | "output_text"
  text: string
}

export interface ResponseInputImage {
  type: "input_image"
  image_url?: string | null
  file_id?: string | null
  detail: "low" | "high" | "auto"
}

export interface ResponseInputFile {
  type: "input_file"
  file_data?: string | null
  file_id?: string | null
  filename?: string | null
}

// ---------------------------------------------------------------------------
// Result types
// ---------------------------------------------------------------------------

/** Response object returned by the endpoint and carried by terminal stream events. */
export interface ResponsesResult {
  id: string
  object: "response"
  created_at: number
  model: string
  output: Array<ResponseOutputItem>
  output_text: string
  status: string
  usage?: ResponseUsage | null
  error: ResponseError | null
  incomplete_details: IncompleteDetails | null
  instructions: string | null
  metadata: Metadata | null
  parallel_tool_calls: boolean
  temperature: number | null
  tool_choice: unknown
  tools: Array<Tool>
  top_p: number | null
}

export type Metadata = { [key: string]: string }

export interface IncompleteDetails {
  reason?: "max_output_tokens" | "content_filter"
}

export interface ResponseError {
  message: string
}

export type ResponseOutputItem =
  | ResponseOutputMessage
  | ResponseOutputReasoning
  | ResponseOutputFunctionCall
  | ResponseOutputToolSearchCall
  | ResponseOutputToolSearchOutput
  | ResponseOutputCompaction

export interface ResponseOutputMessage {
  id: string
  type: "message"
  role: "assistant"
  status: "completed" | "in_progress" | "incomplete"
  content?: Array<ResponseOutputContentBlock>
}

export interface ResponseOutputReasoning {
  id: string
  type: "reasoning"
  summary?: Array<ResponseReasoningBlock>
  encrypted_content?: string
  status?: "completed" | "in_progress" | "incomplete"
}

export interface ResponseReasoningBlock {
  type: string
  text?: string
}

export interface ResponseOutputFunctionCall {
  id?: string
  type: "function_call"
  call_id: string
  name: string
  arguments: string
  status?: "in_progress" | "completed" | "incomplete"
  namespace?: string | null
}

export interface ResponseOutputToolSearchCall {
  id?: string
  type: "tool_search_call"
  call_id: string
  arguments: Record<string, unknown> | string
  execution?: ToolSearchExecution | null
  status?: "in_progress" | "completed" | "incomplete"
}

export interface ResponseOutputToolSearchOutput {
  id?: string
  type: "tool_search_output"
  call_id: string
  // NOTE(review): element type assumed to be Tool — confirm.
  tools: Array<Tool>
  execution?: ToolSearchExecution | null
  status?: "in_progress" | "completed" | "incomplete"
}

export interface ResponseOutputCompaction {
  id: string
  type: "compaction"
  encrypted_content: string
}

export type ResponseOutputContentBlock =
  | ResponseOutputText
  | ResponseOutputRefusal
  | Record<string, unknown>

export interface ResponseOutputText {
  type: "output_text"
  text: string
  // NOTE(review): annotation element type is not visible here; kept opaque — confirm.
  annotations: Array<unknown>
}

export interface ResponseOutputRefusal {
  type: "refusal"
  refusal: string
}

export interface ResponseUsage {
  input_tokens: number
  output_tokens?: number
  total_tokens: number
  input_tokens_details?: {
    cached_tokens: number
  }
  output_tokens_details?: {
    reasoning_tokens: number
  }
}

// ---------------------------------------------------------------------------
// Stream event types
// ---------------------------------------------------------------------------

export type ResponseStreamEvent =
  | ResponseCompletedEvent
  | ResponseIncompleteEvent
  | ResponseCreatedEvent
  | ResponseErrorEvent
  | ResponseFunctionCallArgumentsDeltaEvent
  | ResponseFunctionCallArgumentsDoneEvent
  | ResponseFailedEvent
  | ResponseOutputItemAddedEvent
  | ResponseOutputItemDoneEvent
  | ResponseReasoningSummaryTextDeltaEvent
  | ResponseReasoningSummaryTextDoneEvent
  | ResponseTextDeltaEvent
  | ResponseTextDoneEvent

export interface ResponseCompletedEvent {
  copilot_quota_snapshots?: Record<string, CopilotQuotaSnapshot>
  response: ResponsesResult
  sequence_number: number
  type: "response.completed"
}

export interface ResponseIncompleteEvent {
  response: ResponsesResult
  sequence_number: number
  type: "response.incomplete"
}

export interface ResponseCreatedEvent {
  response: ResponsesResult
  sequence_number: number
  type: "response.created"
}

export interface ResponseErrorEvent {
  code: string | null
  message: string
  param: string | null
  sequence_number: number
  type: "error"
}

export interface ResponseFunctionCallArgumentsDeltaEvent {
  delta: string
  item_id: string
  output_index: number
  sequence_number: number
  type: "response.function_call_arguments.delta"
}

export interface ResponseFunctionCallArgumentsDoneEvent {
  arguments: string
  item_id: string
  name: string
  output_index: number
  sequence_number: number
  type: "response.function_call_arguments.done"
}

export interface ResponseFailedEvent {
  response: ResponsesResult
  sequence_number: number
  type: "response.failed"
}

export interface ResponseOutputItemAddedEvent {
  item: ResponseOutputItem
  output_index: number
  sequence_number: number
  type: "response.output_item.added"
}

export interface ResponseOutputItemDoneEvent {
  item: ResponseOutputItem
  output_index: number
  sequence_number: number
  type: "response.output_item.done"
}

export interface ResponseReasoningSummaryTextDeltaEvent {
  delta: string
  item_id: string
  output_index: number
  sequence_number: number
  summary_index: number
  type: "response.reasoning_summary_text.delta"
}

export interface ResponseReasoningSummaryTextDoneEvent {
  item_id: string
  output_index: number
  sequence_number: number
  summary_index: number
  text: string
  type: "response.reasoning_summary_text.done"
}

export interface ResponseTextDeltaEvent {
  content_index: number
  delta: string
  item_id: string
  output_index: number
  sequence_number: number
  type: "response.output_text.delta"
}

export interface ResponseTextDoneEvent {
  content_index: number
  item_id: string
  output_index: number
  sequence_number: number
  text: string
  type: "response.output_text.done"
}

/** Stream of server-sent-event style chunks, as produced by `events()`. */
export type ResponsesStream = ReturnType<typeof events>
export type CreateResponsesReturn = ResponsesResult | ResponsesStream
export type ResponsesTransport = "http" | "websocket"

interface ResponsesRequestOptions {
  vision: boolean
  initiator: "agent" | "user"
  subagentMarker?: SubagentMarker | null
  requestId: string
  sessionId?: string
  compactType?: CompactType
  transport?: ResponsesTransport
}

/** Delay before an unused pooled websocket is closed. */
const RESPONSES_WEBSOCKET_IDLE_TIMEOUT_MS = 60_000

// ---------------------------------------------------------------------------
// Entry point
// ---------------------------------------------------------------------------

/**
 * Sends a Responses request to Copilot over HTTP or a pooled websocket.
 *
 * Requests whose `compactType` equals `COMPACT_REQUEST` always use HTTP,
 * regardless of the requested transport.
 *
 * Side effect: sets `payload.service_tier` to `undefined` on the caller's
 * object, because the field is not supported by GitHub Copilot.
 *
 * @param payload - Request body; `payload.stream` selects streaming.
 * @returns A chunk stream when `payload.stream` is truthy, otherwise the
 *   final `ResponsesResult`.
 * @throws Error when `state.copilotToken` is not set.
 * @throws HTTPError when the HTTP transport gets a non-OK response.
 */
export const createResponses = async (
  payload: ResponsesPayload,
  {
    vision,
    initiator,
    subagentMarker,
    requestId,
    sessionId,
    compactType,
    transport = "http",
  }: ResponsesRequestOptions,
): Promise<CreateResponsesReturn> => {
  if (!state.copilotToken) throw new Error("Copilot token not found")

  const headers: Record<string, string> = {
    ...copilotHeaders(state, requestId, vision),
    "x-initiator": initiator,
  }

  prepareInteractionHeaders(sessionId, Boolean(subagentMarker), headers)

  prepareForCompact(headers, compactType)

  // service_tier is not supported by github copilot
  payload.service_tier = undefined

  consola.log(`<-- model: ${payload.model}`)

  const effectiveTransport =
    compactType === COMPACT_REQUEST ? "http" : transport

  if (effectiveTransport === "websocket") {
    const websocketRequest = prepareResponsesWebSocketRequest(
      payload,
      headers,
      {
        requestId,
        subagentMarker,
      },
    )
    const stream = createPooledResponsesWebSocketStream(websocketRequest)

    if (payload.stream) {
      return stream
    }

    // Non-streaming callers still get a single result: drain the websocket
    // stream until its terminal event.
    return await consumeResponsesWebSocketStream(stream)
  }

  return await createHttpResponses(payload, headers)
}

/**
 * POSTs the payload to `${copilotBaseUrl(state)}/responses`.
 *
 * @returns An event stream when `payload.stream` is truthy, otherwise the
 *   parsed JSON body.
 * @throws HTTPError when the response status is not OK.
 */
const createHttpResponses = async (
  payload: ResponsesPayload,
  headers: Record<string, string>,
): Promise<CreateResponsesReturn> => {
  const response = await fetch(`${copilotBaseUrl(state)}/responses`, {
    method: "POST",
    headers,
    body: JSON.stringify(payload),
  })

  logCopilotRateLimits(response.headers)

  if (!response.ok) {
    consola.error("Failed to create responses", response)
    throw new HTTPError("Failed to create responses", response)
  }

  if (payload.stream) {
    return events(response)
  }

  return (await response.json()) as ResponsesResult
}

// ---------------------------------------------------------------------------
// WebSocket request preparation
// ---------------------------------------------------------------------------

/** Instance type of the undici `WebSocket` class used throughout this module. */
type ResponsesWebSocket = InstanceType<typeof WebSocket>

type ResponsesWebSocketPayload = ResponsesPayload & {
  type: "response.create"
  initiator: "agent" | "user"
}

interface ResponsesWebSocketRequest {
  headers: Record<string, string>
  poolKey: string
  payload: ResponsesWebSocketPayload
}

type ResponsesWebSocketErrorEvent = Parameters<
  NonNullable<ResponsesWebSocket["onerror"]>
>[0]

/**
 * Builds everything needed to issue a request over the websocket transport:
 * handshake headers, the pool key, and the `response.create` message.
 */
export const prepareResponsesWebSocketRequest = (
  payload: ResponsesPayload,
  preparedHeaders: Record<string, string>,
  options: {
    requestId: string
    subagentMarker?: SubagentMarker | null
  },
): ResponsesWebSocketRequest => {
  const initiator = getResponsesWebSocketInitiator(preparedHeaders)

  return {
    headers: copilotWebSocketHeaders(preparedHeaders),
    poolKey: buildResponsesWebSocketPoolKey(payload, options),
    payload: buildResponsesWebSocketPayload(payload, initiator),
  }
}

/**
 * Derives the pool key for a request.
 *
 * The key joins, with `|`, the URI-encoded parts: a 16-hex-character SHA-256
 * prefix of the Copilot token (or `"missing-token"`), the model, the request
 * id, and either `"main"` or the subagent's
 * `session_id:agent_id:agent_type`.
 */
export const buildResponsesWebSocketPoolKey = (
  payload: ResponsesPayload,
  {
    requestId,
    subagentMarker,
  }: {
    requestId: string
    subagentMarker?: SubagentMarker | null
  },
): string => {
  // Only a digest prefix is used so the raw token never appears in the key.
  const tokenFingerprint =
    state.copilotToken ?
      createHash("sha256").update(state.copilotToken).digest("hex").slice(0, 16)
    : "missing-token"

  const subagentKey =
    subagentMarker ?
      [
        subagentMarker.session_id,
        subagentMarker.agent_id,
        subagentMarker.agent_type,
      ].join(":")
    : "main"

  return [tokenFingerprint, payload.model, requestId, subagentKey]
    .map(encodePoolKeyPart)
    .join("|")
}

/**
 * Reads the `x-initiator` header (case-insensitive name and value).
 *
 * @returns `"agent"` only when the header value is `agent`; `"user"` for any
 *   other value or when the header is absent.
 */
export const getResponsesWebSocketInitiator = (
  preparedHeaders: Record<string, string>,
): "agent" | "user" => {
  const initiator = getHeaderValue(preparedHeaders, "x-initiator")
  return initiator?.toLowerCase() === "agent" ? "agent" : "user"
}

/** Starts a websocket-backed stream for the prepared request. */
const createPooledResponsesWebSocketStream = (
  request: ResponsesWebSocketRequest,
): ResponsesStream => runResponsesWebSocketRequest(request)

/**
 * Converts an HTTP payload into the websocket `response.create` message.
 *
 * The input object is copied, not mutated. `stream`, `background` and
 * `service_tier` are removed from the copy.
 */
export const buildResponsesWebSocketPayload = (
  payload: ResponsesPayload,
  initiator: "agent" | "user",
): ResponsesWebSocketPayload => {
  const websocketPayload: ResponsesWebSocketPayload = {
    ...payload,
    type: "response.create",
    initiator,
  }

  delete websocketPayload.stream
  delete websocketPayload["background"]
  delete websocketPayload.service_tier

  return websocketPayload
}

/**
 * Maps an HTTP(S) base URL to the websocket `/responses` URL: trailing
 * slashes are trimmed, `https:` becomes `wss:` and `http:` becomes `ws:`.
 * Other protocols are left untouched.
 */
export const buildResponsesWebSocketUrl = (baseUrl: string): string => {
  const url = new URL(`${baseUrl.replace(/\/+$/u, "")}/responses`)

  if (url.protocol === "https:") {
    url.protocol = "wss:"
  } else if (url.protocol === "http:") {
    url.protocol = "ws:"
  }

  return url.toString()
}

// ---------------------------------------------------------------------------
// WebSocket pool
// ---------------------------------------------------------------------------

/** Reusable entries, keyed by pool key. */
const responsesWebSocketPool = new Map<string, ResponsesWebSocketEntry>()
/** Number of in-flight requests per pool key (pooled and non-pooled). */
const responsesWebSocketActiveRequests = new Map<string, number>()

interface ResponsesWebSocketEntry {
  /** Set once the entry has been removed; it must not be used afterwards. */
  closed: boolean
  idleTimer: ReturnType<typeof setTimeout> | null
  requestCount: number
  websocketPromise: Promise<ResponsesWebSocket>
}

interface ResponsesWebSocketRequestTarget {
  entry: ResponsesWebSocketEntry
  /** Whether `entry` is registered in `responsesWebSocketPool`. */
  pooled: boolean
}

/**
 * Runs one request on a websocket and yields each incoming message as a
 * stream chunk, ending after the first terminal chunk.
 *
 * The socket is kept for reuse only when a terminal chunk was observed. If
 * the consumer abandons the stream early, or the stream fails, the socket is
 * discarded so that leftover frames of the unfinished response cannot be read
 * by a later request sharing the same pool key.
 *
 * @throws Error when the socket is unavailable, errors, or closes before a
 *   terminal chunk arrives.
 */
const runResponsesWebSocketRequest = async function* (
  request: ResponsesWebSocketRequest,
): ResponsesStream {
  const { entry, pooled } = getResponsesWebSocketRequestTarget(request)
  const release = acquireResponsesWebSocketEntry(request.poolKey, entry, pooled)
  let sawTerminalChunk = false

  try {
    const websocket = await getReadyResponsesWebSocket(
      request.poolKey,
      entry,
      pooled,
    )

    websocket.send(JSON.stringify(request.payload))

    for await (const data of createWebSocketMessageStream(websocket)) {
      const chunk = createResponsesWebSocketStreamChunk(data)

      // Record completion BEFORE yielding: a consumer that stops right after
      // receiving the terminal chunk resumes this generator via `return()`,
      // which skips any code placed after the `yield`.
      if (isTerminalResponsesStreamChunk(chunk)) {
        sawTerminalChunk = true
      }

      yield chunk

      if (sawTerminalChunk) {
        return
      }
    }

    throw new Error("Responses websocket ended without a terminal response")
  } catch (error) {
    removeResponsesWebSocketPoolEntry(request.poolKey, entry)
    throw toError(error)
  } finally {
    // Early consumer exit reaches here without passing through `catch`.
    if (!sawTerminalChunk) {
      removeResponsesWebSocketPoolEntry(request.poolKey, entry)
    }
    release()
  }
}

/**
 * Chooses the socket entry for a request.
 *
 * - If another request with the same pool key is in flight, a fresh entry is
 *   created and NOT registered in the pool.
 * - Otherwise a live pooled entry is reused (its idle timer is cancelled).
 * - Otherwise a new entry is created and registered in the pool.
 */
const getResponsesWebSocketRequestTarget = (
  request: ResponsesWebSocketRequest,
): ResponsesWebSocketRequestTarget => {
  if (getResponsesWebSocketActiveRequestCount(request.poolKey) > 0) {
    return {
      entry: createResponsesWebSocketEntry(request),
      pooled: false,
    }
  }

  const existing = responsesWebSocketPool.get(request.poolKey)
  if (existing && !existing.closed) {
    clearResponsesWebSocketIdleTimer(existing)
    return {
      entry: existing,
      pooled: true,
    }
  }

  const entry = createResponsesWebSocketEntry(request)
  responsesWebSocketPool.set(request.poolKey, entry)
  return {
    entry,
    pooled: true,
  }
}

/**
 * Creates an entry and starts opening its websocket. The entry removes itself
 * when the socket closes, errors, or fails to open.
 */
const createResponsesWebSocketEntry = (
  request: ResponsesWebSocketRequest,
): ResponsesWebSocketEntry => {
  const entry: ResponsesWebSocketEntry = {
    closed: false,
    idleTimer: null,
    requestCount: 0,
    websocketPromise: openResponsesWebSocket({
      headers: request.headers,
      url: buildResponsesWebSocketUrl(copilotBaseUrl(state)),
    }),
  }

  const dropEntry = (): void => {
    removeResponsesWebSocketPoolEntry(request.poolKey, entry)
  }

  entry.websocketPromise
    .then((websocket) => {
      websocket.addEventListener("close", dropEntry)
      websocket.addEventListener("error", dropEntry)
    })
    .catch(dropEntry)

  return entry
}

/**
 * Marks the entry as in use by one more request.
 *
 * @returns A release function that is safe to call more than once. When the
 *   last user releases a live entry, a pooled entry gets an idle-close timer
 *   and a non-pooled entry is removed immediately.
 */
const acquireResponsesWebSocketEntry = (
  poolKey: string,
  entry: ResponsesWebSocketEntry,
  pooled: boolean,
): (() => void) => {
  clearResponsesWebSocketIdleTimer(entry)
  incrementResponsesWebSocketActiveRequestCount(poolKey)
  entry.requestCount += 1

  let released = false

  return () => {
    if (released) {
      return
    }

    released = true
    entry.requestCount -= 1
    decrementResponsesWebSocketActiveRequestCount(poolKey)

    if (entry.closed || entry.requestCount > 0) {
      return
    }

    if (pooled && responsesWebSocketPool.get(poolKey) === entry) {
      scheduleResponsesWebSocketIdleClose(poolKey, entry)
      return
    }

    removeResponsesWebSocketPoolEntry(poolKey, entry)
  }
}

/**
 * Waits for the entry's socket to open and verifies it is still usable.
 *
 * @throws Error when the entry was closed, was replaced in the pool, or the
 *   socket is not in the OPEN state.
 */
const getReadyResponsesWebSocket = async (
  poolKey: string,
  entry: ResponsesWebSocketEntry,
  pooled: boolean,
): Promise<ResponsesWebSocket> => {
  if (entry.closed) {
    throw new Error(
      "Responses websocket became unavailable before the request started",
    )
  }

  const websocket = await entry.websocketPromise

  // State may have changed while awaiting the connection.
  if (
    entry.closed
    || (pooled && responsesWebSocketPool.get(poolKey) !== entry)
  ) {
    throw new Error(
      "Responses websocket became unavailable before the request started",
    )
  }

  if (websocket.readyState !== WebSocket.OPEN) {
    removeResponsesWebSocketPoolEntry(poolKey, entry)
    throw new Error(
      "Responses websocket became unavailable before the request started",
    )
  }

  return websocket
}

/** (Re)starts the idle timer that removes the entry after the idle timeout. */
const scheduleResponsesWebSocketIdleClose = (
  poolKey: string,
  entry: ResponsesWebSocketEntry,
): void => {
  clearResponsesWebSocketIdleTimer(entry)
  entry.idleTimer = setTimeout(() => {
    removeResponsesWebSocketPoolEntry(poolKey, entry)
  }, RESPONSES_WEBSOCKET_IDLE_TIMEOUT_MS)
  unrefTimer(entry.idleTimer)
}

/** Cancels the entry's idle timer, if one is pending. */
const clearResponsesWebSocketIdleTimer = (
  entry: ResponsesWebSocketEntry,
): void => {
  if (entry.idleTimer) {
    clearTimeout(entry.idleTimer)
    entry.idleTimer = null
  }
}

/** Number of in-flight requests for the pool key (0 when none are tracked). */
const getResponsesWebSocketActiveRequestCount = (poolKey: string): number =>
  responsesWebSocketActiveRequests.get(poolKey) ?? 0

const incrementResponsesWebSocketActiveRequestCount = (
  poolKey: string,
): void => {
  responsesWebSocketActiveRequests.set(
    poolKey,
    getResponsesWebSocketActiveRequestCount(poolKey) + 1,
  )
}

/** Decrements the counter and drops the map key once it reaches zero. */
const decrementResponsesWebSocketActiveRequestCount = (
  poolKey: string,
): void => {
  const nextCount = getResponsesWebSocketActiveRequestCount(poolKey) - 1
  if (nextCount <= 0) {
    responsesWebSocketActiveRequests.delete(poolKey)
    return
  }

  responsesWebSocketActiveRequests.set(poolKey, nextCount)
}

/**
 * Unregisters the entry (only if it is the one currently stored under the
 * key), marks it closed, cancels its idle timer and closes its socket.
 * Calling it again for an already closed entry only repeats the unregister
 * check.
 */
const removeResponsesWebSocketPoolEntry = (
  poolKey: string,
  entry: ResponsesWebSocketEntry,
): void => {
  if (responsesWebSocketPool.get(poolKey) === entry) {
    responsesWebSocketPool.delete(poolKey)
  }

  if (entry.closed) {
    return
  }

  entry.closed = true
  clearResponsesWebSocketIdleTimer(entry)
  // The open attempt may already have failed; that rejection is handled where
  // the entry was created, so it is deliberately ignored here.
  entry.websocketPromise.then(closeResponsesWebSocket).catch(() => {})
}

/** Calls `timer.unref()` when the timer handle is an object that provides it. */
const unrefTimer = (timer: ReturnType<typeof setTimeout>): void => {
  if (
    typeof timer === "object"
    && "unref" in timer
    && typeof timer.unref === "function"
  ) {
    timer.unref()
  }
}

/**
 * Builds an Error for a websocket failure. When the event carries an `error`
 * or a non-empty `message`, it is appended to `message` and attached as
 * `cause`.
 */
const createResponsesWebSocketError = (
  message: string,
  event?: Pick<ResponsesWebSocketErrorEvent, "error" | "message">,
): Error => {
  const reason = event?.error ?? event?.message
  if (reason === undefined || reason === "") {
    return new Error(message)
  }

  const cause = toError(reason)
  return new Error(`${message}: ${cause.message}`, { cause })
}

/**
 * Opens a websocket, using the proxy dispatcher when one is configured.
 *
 * @returns The socket once its `open` event fires.
 * @throws Error when an `error` event fires before `open`.
 */
const openResponsesWebSocket = async ({
  headers,
  url,
}: {
  headers: Record<string, string>
  url: string
}): Promise<ResponsesWebSocket> =>
  await new Promise<ResponsesWebSocket>((resolve, reject) => {
    const dispatcher = getProxyEnvDispatcher()
    const init = dispatcher ? { dispatcher, headers } : { headers }
    const websocket = new WebSocket(url, init)

    const cleanup = () => {
      websocket.removeEventListener("open", onOpen)
      websocket.removeEventListener("error", onError)
    }

    const onOpen = () => {
      cleanup()
      resolve(websocket)
    }

    const onError = (event: ResponsesWebSocketErrorEvent) => {
      cleanup()
      reject(
        createResponsesWebSocketError(
          "Failed to create responses websocket",
          event,
        ),
      )
    }

    websocket.addEventListener("open", onOpen)
    websocket.addEventListener("error", onError)
  })

// ---------------------------------------------------------------------------
// WebSocket message stream
// ---------------------------------------------------------------------------

/**
 * Exposes incoming websocket messages as an async iterable of strings.
 *
 * Messages already queued are delivered before a recorded error is thrown or
 * a close ends the iteration. Listeners are detached when iteration stops for
 * any reason.
 */
const createWebSocketMessageStream = async function* (
  websocket: ResponsesWebSocket,
): AsyncIterable<string> {
  const queue: Array<Promise<string>> = []
  let closed = false
  let error: Error | null = null
  let notify: (() => void) | null = null

  // Resumes the loop below if it is currently waiting for an event.
  const wake = () => {
    notify?.()
    notify = null
  }

  const onMessage = (event: { data: unknown }) => {
    queue.push(normalizeWebSocketMessageData(event.data))
    wake()
  }

  const onClose = () => {
    closed = true
    wake()
  }

  const onError = (event: ResponsesWebSocketErrorEvent) => {
    error = createResponsesWebSocketError(
      "Responses websocket stream error",
      event,
    )
    wake()
  }

  websocket.addEventListener("message", onMessage)
  websocket.addEventListener("close", onClose)
  websocket.addEventListener("error", onError)

  try {
    while (true) {
      const item = queue.shift()
      if (item) {
        yield await item
        continue
      }

      if (error) {
        throw toError(error)
      }

      if (closed) {
        break
      }

      await new Promise<void>((resolve) => {
        notify = resolve
      })
    }
  } finally {
    websocket.removeEventListener("message", onMessage)
    websocket.removeEventListener("close", onClose)
    websocket.removeEventListener("error", onError)
  }
}

/**
 * Converts websocket message data to a string: strings pass through,
 * `ArrayBuffer`s and typed-array views are decoded with `TextDecoder`,
 * objects exposing a `text()` method are read through it, and anything else
 * goes through `String()`.
 */
const normalizeWebSocketMessageData = async (
  data: unknown,
): Promise<string> => {
  if (typeof data === "string") {
    return data
  }

  if (data instanceof ArrayBuffer) {
    return new TextDecoder().decode(data)
  }

  if (ArrayBuffer.isView(data)) {
    const view = data
    // Decode only the view's window, not the whole underlying buffer.
    return new TextDecoder().decode(
      new Uint8Array(
        view.buffer as ArrayBuffer,
        view.byteOffset,
        view.byteLength,
      ),
    )
  }

  if (isTextReadable(data)) {
    return await data.text()
  }

  return String(data)
}

/** Type guard for objects that expose a `text()` method. */
const isTextReadable = (
  value: unknown,
): value is { text: () => Promise<string> } => {
  if (!value || typeof value !== "object" || !("text" in value)) {
    return false
  }

  return typeof (value as { text?: unknown }).text === "function"
}

/** Returns `value` unchanged when it is an Error, else wraps `String(value)`. */
const toError = (value: unknown): Error => {
  if (value instanceof Error) {
    return value
  }

  return new Error(String(value))
}

/** Looks up a header by case-insensitive name; the first match wins. */
const getHeaderValue = (
  headers: Record<string, string>,
  headerName: string,
): string | undefined => {
  const normalizedHeaderName = headerName.toLowerCase()
  const match = Object.entries(headers).find(
    ([key]) => key.toLowerCase() === normalizedHeaderName,
  )
  return match?.[1]
}

/** URI-encodes one pool key part so the `|` separator stays unambiguous. */
const encodePoolKeyPart = (value: string): string => encodeURIComponent(value)

// ---------------------------------------------------------------------------
// Stream chunk handling
// ---------------------------------------------------------------------------

/**
 * Wraps one websocket message as a stream chunk.
 *
 * JSON messages are re-serialised into `data`, with `event` taken from a
 * string `type` and `id` from a string `id`. `[DONE]` and messages that fail
 * to parse are passed through as `data` only. Quota snapshots on a
 * `response.completed` message are logged.
 */
const createResponsesWebSocketStreamChunk = (
  data: string,
): { data?: string; event?: string; id?: string } => {
  if (data === "[DONE]") {
    return { data }
  }

  try {
    const parsed = JSON.parse(data) as {
      copilot_quota_snapshots?: Record<string, CopilotQuotaSnapshot>
      id?: unknown
      type?: unknown
    }

    if (parsed.type === "response.completed") {
      logCopilotQuotaSnapshots(parsed.copilot_quota_snapshots)
    }

    return {
      data: JSON.stringify(parsed),
      event: typeof parsed.type === "string" ? parsed.type : undefined,
      id: typeof parsed.id === "string" ? parsed.id : undefined,
    }
  } catch {
    return { data }
  }
}

/**
 * Whether the chunk ends a response: its JSON `type` is
 * `response.completed`, `response.failed`, `response.incomplete` or `error`.
 * Empty, `[DONE]` and unparseable chunks are not terminal.
 */
const isTerminalResponsesStreamChunk = (chunk: { data?: string }): boolean => {
  if (!chunk.data || chunk.data === "[DONE]") {
    return false
  }

  try {
    const parsed = JSON.parse(chunk.data) as { type?: unknown }
    return (
      parsed.type === "response.completed"
      || parsed.type === "response.failed"
      || parsed.type === "response.incomplete"
      || parsed.type === "error"
    )
  } catch {
    return false
  }
}

/**
 * Parses chunk data as a stream event.
 *
 * @returns `null` when the data is not valid JSON or is not a JSON object.
 */
const parseResponseStreamEvent = (data: string): ResponseStreamEvent | null => {
  try {
    const parsed: unknown = JSON.parse(data)
    return typeof parsed === "object" && parsed !== null ?
        (parsed as ResponseStreamEvent)
      : null
  } catch {
    return null
  }
}

/**
 * Drains a websocket stream and returns the response carried by its terminal
 * event (`response.completed`, `response.failed` or `response.incomplete`).
 *
 * Chunks that are empty, `[DONE]`, or not a JSON object are skipped, matching
 * how the streaming path treats them.
 *
 * @throws Error with the event's message when an `error` event arrives.
 * @throws Error when the stream ends without a terminal event.
 */
const consumeResponsesWebSocketStream = async (
  stream: ResponsesStream,
): Promise<ResponsesResult> => {
  for await (const chunk of stream) {
    if (!chunk.data || chunk.data === "[DONE]") {
      continue
    }

    const event = parseResponseStreamEvent(chunk.data)
    if (!event) {
      continue
    }

    if (event.type === "error") {
      throw new Error(event.message)
    }

    if (
      event.type === "response.completed"
      || event.type === "response.failed"
      || event.type === "response.incomplete"
    ) {
      return event.response
    }
  }

  throw new Error("Responses websocket ended without a terminal response")
}

/** Closes the socket if it is still connecting or open. */
const closeResponsesWebSocket = (websocket: ResponsesWebSocket): void => {
  if (
    websocket.readyState === WebSocket.CONNECTING
    || websocket.readyState === WebSocket.OPEN
  ) {
    websocket.close()
  }
}