| import { Transform, TransformOptions } from "stream"; |
| import { logger } from "../../../../logger"; |
| import { APIFormat } from "../../../../shared/key-management"; |
| import { assertNever } from "../../../../shared/utils"; |
| import { |
| anthropicChatToOpenAI, |
| anthropicChatToAnthropicV2, |
| anthropicV1ToOpenAI, |
| AnthropicV2StreamEvent, |
| anthropicV2ToOpenAI, |
| googleAIToOpenAI, |
| OpenAIChatCompletionStreamEvent, |
| openAITextToOpenAIChat, |
| mistralAIToOpenAI, |
| mistralTextToMistralChat, |
| passthroughToOpenAI, |
| StreamingCompletionTransformer, |
| MistralChatCompletionEvent, |
| } from "./index"; |
|
|
| type SSEMessageTransformerOptions = TransformOptions & { |
| requestedModel: string; |
| requestId: string; |
| inputFormat: APIFormat; |
| inputApiVersion?: string; |
| outputFormat?: APIFormat; |
| logger: typeof logger; |
| }; |
|
|
| |
| |
| |
| |
| export class SSEMessageTransformer extends Transform { |
| private lastPosition: number; |
| private transformState: any; |
| private msgCount: number; |
| private readonly inputFormat: APIFormat; |
| private readonly transformFn: StreamingCompletionTransformer< |
| |
| | OpenAIChatCompletionStreamEvent |
| | AnthropicV2StreamEvent |
| | MistralChatCompletionEvent |
| >; |
| private readonly log; |
| private readonly fallbackId: string; |
| private readonly fallbackModel: string; |
|
|
| constructor(options: SSEMessageTransformerOptions) { |
| super({ ...options, readableObjectMode: true }); |
| this.log = options.logger?.child({ module: "sse-transformer" }); |
| this.lastPosition = 0; |
| this.msgCount = 0; |
| this.transformFn = getTransformer( |
| options.inputFormat, |
| options.inputApiVersion, |
| options.outputFormat |
| ); |
| this.inputFormat = options.inputFormat; |
| this.fallbackId = options.requestId; |
| this.fallbackModel = options.requestedModel; |
| this.log.debug( |
| { |
| fn: this.transformFn.name, |
| format: options.inputFormat, |
| version: options.inputApiVersion, |
| }, |
| "Selected SSE transformer" |
| ); |
| } |
|
|
| _transform(chunk: Buffer, _encoding: BufferEncoding, callback: Function) { |
| try { |
| const originalMessage = chunk.toString(); |
| const { |
| event: transformedMessage, |
| position: newPosition, |
| state, |
| } = this.transformFn({ |
| data: originalMessage, |
| lastPosition: this.lastPosition, |
| index: this.msgCount++, |
| fallbackId: this.fallbackId, |
| fallbackModel: this.fallbackModel, |
| state: this.transformState, |
| }); |
| this.lastPosition = newPosition; |
| this.transformState = state; |
|
|
| |
| |
| |
| |
| if (this.inputFormat === "openai" && this.msgCount <= 1) { |
| if (originalMessage.includes("prompt_filter_results")) { |
| this.log.debug("Dropping Azure OpenAI content moderation SSE event"); |
| return callback(); |
| } |
| } |
|
|
| this.emit("originalMessage", originalMessage); |
|
|
| |
| if (!transformedMessage) return callback(); |
|
|
| if (this.msgCount === 1 && eventIsOpenAIEvent(transformedMessage)) { |
| |
| this.push(createInitialMessage(transformedMessage)); |
| } |
| this.push(transformedMessage); |
| callback(); |
| } catch (err) { |
| err.lastEvent = chunk?.toString(); |
| this.log.error(err, "Error transforming SSE message"); |
| callback(err); |
| } |
| } |
| } |
|
|
| function eventIsOpenAIEvent( |
| event: any |
| ): event is OpenAIChatCompletionStreamEvent { |
| return event?.object === "chat.completion.chunk"; |
| } |
|
|
| function getTransformer( |
| responseApi: APIFormat, |
| version?: string, |
| |
| |
| |
| requestApi: APIFormat = "openai" |
| ): StreamingCompletionTransformer< |
| | OpenAIChatCompletionStreamEvent |
| | AnthropicV2StreamEvent |
| | MistralChatCompletionEvent |
| > { |
| switch (responseApi) { |
| case "openai": |
| return passthroughToOpenAI; |
| case "openai-text": |
| return openAITextToOpenAIChat; |
| case "anthropic-text": |
| return version === "2023-01-01" |
| ? anthropicV1ToOpenAI |
| : anthropicV2ToOpenAI; |
| case "anthropic-chat": |
| return requestApi === "anthropic-text" |
| ? anthropicChatToAnthropicV2 |
| : anthropicChatToOpenAI; |
| case "google-ai": |
| return googleAIToOpenAI; |
| case "mistral-ai": |
| return mistralAIToOpenAI; |
| case "mistral-text": |
| return requestApi === "mistral-ai" |
| ? mistralTextToMistralChat |
| : mistralAIToOpenAI; |
| case "openai-image": |
| throw new Error(`SSE transformation not supported for ${responseApi}`); |
| case "openai-responses": |
| return passthroughToOpenAI; |
| default: |
| assertNever(responseApi); |
| } |
| } |
|
|
| |
| |
| |
| |
| |
| |
| function createInitialMessage( |
| event: OpenAIChatCompletionStreamEvent |
| ): OpenAIChatCompletionStreamEvent { |
| return { |
| ...event, |
| choices: event.choices.map((choice) => ({ |
| ...choice, |
| delta: { role: "assistant", content: "" }, |
| })), |
| }; |
| } |
|
|