| import type { MessageUpdate, TextStreamUpdate } from "$lib/types/MessageUpdate"; |
|
|
/**
 * Options accepted by `fetchMessageUpdates`.
 *
 * Apart from `base` (which only builds the request URL), every field is
 * serialized into the JSON body of the POST request.
 */
type MessageUpdateRequestOptions = {
	/** URL prefix placed in front of `/conversation/<conversationId>`. */
	base: string;

	/** Sent as `is_retry`. */
	isRetry: boolean;
	/** Sent as `is_continue`. */
	isContinue: boolean;
	/** Sent as `web_search`. */
	webSearch: boolean;

	/** Sent as `inputs`; omitted from the body when undefined. */
	inputs?: string;
	/** Sent as `id`; omitted from the body when undefined. */
	messageId?: string;
	/** Sent as `files`; omitted from the body when undefined. */
	files?: string[];
};
/**
 * POSTs a message to `${opts.base}/conversation/${conversationId}` and returns
 * the streamed response as an async generator of message updates.
 *
 * The raw response stream is decoded into updates, stream tokens are merged
 * into whole words, and the result is paced by `smoothAsyncIterator`.
 *
 * @param conversationId - Identifier appended to the conversation endpoint URL.
 * @param opts - Request options; see `MessageUpdateRequestOptions`.
 * @param abortSignal - Aborting this signal cancels the request and the stream.
 * @returns An async generator yielding the updates sent by the endpoint.
 * @throws Error when the response status is not OK (using the `message` field
 *   of the JSON error body when it is a non-empty string, otherwise a generic
 *   status message), or when the response has no body.
 */
export async function fetchMessageUpdates(
	conversationId: string,
	opts: MessageUpdateRequestOptions,
	abortSignal: AbortSignal
): Promise<AsyncGenerator<MessageUpdate>> {
	// A private controller lets the stream reader abort the request on its own
	// while still following the caller's signal.
	const abortController = new AbortController();
	if (abortSignal.aborted) {
		// The "abort" event has already fired and will not fire again, so a
		// listener would never run: forward the aborted state right away.
		abortController.abort();
	} else {
		abortSignal.addEventListener("abort", () => abortController.abort(), { once: true });
	}

	const response = await fetch(`${opts.base}/conversation/${conversationId}`, {
		method: "POST",
		headers: { "Content-Type": "application/json" },
		body: JSON.stringify({
			inputs: opts.inputs,
			id: opts.messageId,
			is_retry: opts.isRetry,
			is_continue: opts.isContinue,
			web_search: opts.webSearch,
			files: opts.files,
		}),
		signal: abortController.signal,
	});

	if (!response.ok) {
		let errorMessage = `Request failed with status code ${response.status}: ${response.statusText}`;
		try {
			const payload: unknown = await response.json();
			if (typeof payload === "object" && payload !== null && "message" in payload) {
				const message = (payload as { message?: unknown }).message;
				// Only trust a real, non-empty string; anything else would produce
				// an empty or "[object Object]" error message.
				if (typeof message === "string" && message.length > 0) errorMessage = message;
			}
		} catch {
			// The error body was not valid JSON: keep the generic status message.
		}
		throw new Error(errorMessage);
	}
	if (!response.body) {
		throw new Error("Body not defined");
	}

	return smoothAsyncIterator(
		streamMessageUpdatesToFullWords(endpointStreamToIterator(response, abortController))
	);
}
|
|
/**
 * Turns the body of a streaming response into an async generator of message
 * updates.
 *
 * The body is decoded as text and handed to `parseMessageUpdates`; whatever
 * text that function reports as unparsed is prepended to the next chunk.
 *
 * @param response - Response whose body carries the update stream.
 * @param abortController - Aborting it cancels the reader and ends iteration;
 *   it is also aborted by this generator once the stream closes, errors, or the
 *   generator finishes for any reason (including an early `return()`).
 * @throws Error when the response has no body; read errors are rethrown.
 */
async function* endpointStreamToIterator(
	response: Response,
	abortController: AbortController
): AsyncGenerator<MessageUpdate> {
	const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader();
	if (!reader) throw new Error("Response for endpoint had no body");

	const abort = () => abortController.abort();
	const cancelReader = () => {
		// cancel() rejects on an errored stream; that error already surfaces
		// through read(), so it must not become an unhandled rejection.
		reader.cancel().catch(() => undefined);
	};

	// Abort the request when the stream closes. The same handler is used for
	// rejection so an errored stream does not leave an unhandled rejection.
	reader.closed.then(abort, abort);

	// Stop reading as soon as the request is aborted.
	abortController.signal.addEventListener("abort", cancelReader, { once: true });

	let prevChunk = "";
	try {
		while (!abortController.signal.aborted) {
			const { done, value } = await reader.read();
			if (done) break;
			if (!value) continue;

			const { messageUpdates, remainingText } = parseMessageUpdates(prevChunk + value);
			prevChunk = remainingText;
			for (const messageUpdate of messageUpdates) yield messageUpdate;
		}
	} finally {
		// Runs on normal completion, on errors and when the consumer stops early.
		abortController.signal.removeEventListener("abort", cancelReader);
		abort();
		// Explicit cancel covers the case where the controller was aborted before
		// the listener above was attached; cancelling twice is harmless.
		cancelReader();
	}
}
|
|
/**
 * Parses newline-delimited JSON into message updates.
 *
 * Every segment followed by a newline is a complete line: it is parsed and
 * kept when it is a JSON object, and skipped when it is blank, malformed or
 * not an object. Only the final segment (no newline after it) may be an
 * incomplete message; if it does not parse to an object it is returned as
 * `remainingText` so the caller can prepend it to the next chunk.
 *
 * @param value - Raw text, possibly ending in the middle of a message.
 * @returns The parsed updates in input order and the unparsed trailing text
 *   (empty when everything was consumed).
 */
function parseMessageUpdates(value: string): {
	messageUpdates: MessageUpdate[];
	remainingText: string;
} {
	const parseLine = (line: string): MessageUpdate | undefined => {
		try {
			const parsed: unknown = JSON.parse(line);
			// Primitives and `null` are valid JSON but cannot be updates.
			return typeof parsed === "object" && parsed !== null ? (parsed as MessageUpdate) : undefined;
		} catch {
			return undefined;
		}
	};

	const completeLines = value.split("\n");
	// split() always returns at least one element, so the fallback is only for the type checker.
	const trailing = completeLines.pop() ?? "";

	const messageUpdates: MessageUpdate[] = [];
	for (const line of completeLines) {
		// A bad line must not discard the valid lines that follow it.
		const update = parseLine(line);
		if (update !== undefined) messageUpdates.push(update);
	}

	if (trailing.trim() === "") return { messageUpdates, remainingText: "" };

	const lastUpdate = parseLine(trailing);
	if (lastUpdate === undefined) {
		// Presumably cut off mid-message: let the caller retry with more data.
		return { messageUpdates, remainingText: trailing };
	}
	messageUpdates.push(lastUpdate);
	return { messageUpdates, remainingText: "" };
}
|
|
| |
| |
| |
| |
| |
| |
/**
 * Merges consecutive "stream" updates so that tokens are emitted as whole
 * words instead of word fragments.
 *
 * Two adjacent tokens are merged when the first ends with, and the second
 * begins with, a word character (letters, digits, accented letters in the
 * À-ž range, apostrophe or backtick). At most five tokens are merged into one
 * emitted update. The most recent token is always held back until the next
 * update arrives, because it may still be the start of a longer word.
 *
 * Updates of any other type are passed through unchanged. Tokens still
 * buffered when such an update arrives are flushed first, so updates are
 * always yielded in the order they were received.
 *
 * @param iterator - Source of message updates.
 * @returns The same updates, with stream tokens merged into larger pieces.
 */
async function* streamMessageUpdatesToFullWords(
	iterator: AsyncGenerator<MessageUpdate>
): AsyncGenerator<MessageUpdate> {
	const maxCombinedTokens = 5;
	const endsWithWordChar = /[a-zA-Z0-9À-ž'`]+$/;
	const beginsWithWordChar = /^[a-zA-Z0-9À-ž'`]+/;

	const merge = (updates: TextStreamUpdate[]): TextStreamUpdate => ({
		type: "stream",
		token: updates.map((update) => update.token).join(""),
	});

	let bufferedStreamUpdates: TextStreamUpdate[] = [];

	for await (const messageUpdate of iterator) {
		if (messageUpdate.type !== "stream") {
			// Flush pending text first so this update cannot overtake tokens
			// that were received before it.
			if (bufferedStreamUpdates.length > 0) {
				yield merge(bufferedStreamUpdates);
				bufferedStreamUpdates = [];
			}
			yield messageUpdate;
			continue;
		}
		bufferedStreamUpdates.push(messageUpdate);

		let lastIndexEmitted = 0;
		for (let i = 1; i < bufferedStreamUpdates.length; i++) {
			const prevEndsWord = endsWithWordChar.test(bufferedStreamUpdates[i - 1].token);
			const currBeginsWord = beginsWithWordChar.test(bufferedStreamUpdates[i].token);
			const shouldCombine = prevEndsWord && currBeginsWord;
			const combinedTooMany = i - lastIndexEmitted >= maxCombinedTokens;
			if (shouldCombine && !combinedTooMany) continue;

			// A word boundary (or the size cap) was reached: emit everything before it.
			yield merge(bufferedStreamUpdates.slice(lastIndexEmitted, i));
			lastIndexEmitted = i;
		}
		bufferedStreamUpdates = bufferedStreamUpdates.slice(lastIndexEmitted);
	}

	// Whatever is left is a single run of mergeable tokens; emit it as one piece.
	if (bufferedStreamUpdates.length > 0) yield merge(bufferedStreamUpdates);
}
|
|
| |
| |
| |
| |
/**
 * Average time in milliseconds between consecutive arrival timestamps,
 * ignoring the duration of gaps longer than `anomalyThresholdMS`.
 * Returns 0 when fewer than two timestamps are available.
 */
function averageArrivalIntervalMS(timesMS: readonly number[], anomalyThresholdMS: number): number {
	if (timesMS.length < 2) return 0;
	let regularDurationMS = 0;
	for (let i = 1; i < timesMS.length; i++) {
		const gapMS = timesMS[i] - timesMS[i - 1];
		if (gapMS <= anomalyThresholdMS) regularDurationMS += gapMS;
	}
	return regularDurationMS / (timesMS.length - 1);
}

/**
 * Re-emits the values of `iterator` at a steadier pace.
 *
 * Values are pulled from `iterator` eagerly in the background and buffered.
 * They are then yielded one at a time, spaced by the average interval at
 * which the last 30 values arrived (gaps above 2 s are excluded from the
 * average, the interval is capped at 200 ms, and each wait lasts at least
 * 5 ms). Whenever a new value arrives the pacing is recomputed before the
 * next emission.
 *
 * If `iterator` throws, the values buffered so far are still yielded and the
 * error is then rethrown to the consumer. If the consumer stops early, the
 * background pulling stops and `iterator.return()` is requested.
 *
 * @param iterator - Source generator; it is consumed by this function.
 * @returns A generator yielding the same values in the same order.
 */
async function* smoothAsyncIterator<T>(iterator: AsyncGenerator<T>): AsyncGenerator<T> {
	const sampleSize = 30;
	const anomalyThresholdMS = 2000;
	const maxIntervalMS = 200;
	const minWaitMS = 5;

	const eventTarget = new EventTarget();
	const valuesBuffer: T[] = [];
	const valueTimesMS: number[] = [];
	const failures: unknown[] = [];
	const state = { done: false, stopped: false };

	// Pull from the source in the background, independently of the pacing loop.
	const pump = async (): Promise<void> => {
		try {
			while (!state.stopped) {
				const result = await iterator.next();
				if (result.done) break;
				valuesBuffer.push(result.value);
				valueTimesMS.push(performance.now());
				// Only the most recent arrivals are ever sampled.
				if (valueTimesMS.length > sampleSize) valueTimesMS.shift();
				eventTarget.dispatchEvent(new Event("next"));
			}
		} catch (error) {
			// Remember the failure; it is rethrown once the buffer is drained.
			failures.push(error);
		} finally {
			// Also set on failure, otherwise the pacing loop would never end.
			state.done = true;
			eventTarget.dispatchEvent(new Event("next"));
		}
	};
	void pump();

	// Reuse one pending listener across loop iterations instead of registering
	// a new one every time the sleep wins the race.
	let pendingNextEvent: Promise<boolean> | undefined;
	const nextEvent = (): Promise<boolean> => {
		if (pendingNextEvent === undefined) {
			pendingNextEvent = waitForEvent(eventTarget, "next").finally(() => {
				pendingNextEvent = undefined;
			});
		}
		return pendingNextEvent;
	};

	let timeOfLastEmitMS = performance.now();
	try {
		while (!state.done || valuesBuffer.length > 0) {
			const averageTimeMSBetweenValues = Math.min(
				maxIntervalMS,
				averageArrivalIntervalMS(valueTimesMS, anomalyThresholdMS)
			);
			const timeSinceLastEmitMS = performance.now() - timeOfLastEmitMS;

			// Wait until it is time to emit, or until something new arrives.
			const gotNext = await Promise.race([
				sleep(Math.max(minWaitMS, averageTimeMSBetweenValues - timeSinceLastEmitMS)).then(
					() => false
				),
				nextEvent(),
			]);

			// New data (or completion) changes the pacing: recompute before emitting.
			if (gotNext) continue;

			// Nothing to emit yet.
			if (valuesBuffer.length === 0) continue;

			timeOfLastEmitMS = performance.now();
			yield valuesBuffer.shift() as T;
		}
		if (failures.length > 0) throw failures[0];
	} finally {
		state.stopped = true;
		// Consumer stopped early: ask the source to finish instead of letting it run on.
		if (!state.done) void iterator.return(undefined).catch(() => undefined);
	}
}

/** Resolves after roughly `ms` milliseconds. */
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/** Resolves with `true` the next time `eventName` is dispatched on `eventTarget`. */
const waitForEvent = (eventTarget: EventTarget, eventName: string) =>
	new Promise<boolean>((resolve) =>
		eventTarget.addEventListener(eventName, () => resolve(true), { once: true })
	);
|
|