| | import type { MessageUpdate, TextStreamUpdate } from "$lib/types/MessageUpdate"; |
| |
|
/**
 * Options for one request to the conversation endpoint, as consumed by
 * `fetchMessageUpdates`: `base` prefixes the request URL and every other field
 * is serialized into the JSON request body.
 */
type MessageUpdateRequestOptions = {
	/** URL prefix placed in front of `/conversation/<conversationId>`. */
	base: string;
	/** Sent in the request body as `is_retry`. */
	isRetry: boolean;
	/** Sent in the request body as `is_continue`. */
	isContinue: boolean;
	/** Sent in the request body as `web_search`. */
	webSearch: boolean;
	/** Sent in the request body as `inputs`. */
	inputs?: string;
	/** Sent in the request body as `id`. */
	messageId?: string;
	/** Sent in the request body as `files`. */
	files?: string[];
};
/**
 * POSTs a message request to `${opts.base}/conversation/${conversationId}` and
 * returns the streamed response as an async generator of message updates.
 *
 * The raw response stream is parsed into updates, stream tokens are regrouped
 * into whole words, and the result is paced by `smoothAsyncIterator`.
 *
 * @param conversationId - Identifier placed in the request URL.
 * @param opts - Request options; every field except `base` is sent in the JSON body.
 * @param abortSignal - Aborting this signal aborts the request and the response stream.
 * @returns An async generator yielding the message updates of the response.
 * @throws Error when the response status is not OK (using the `message` of the
 *   JSON error body when it has one, otherwise the HTTP status), or when the
 *   response has no body.
 */
export async function fetchMessageUpdates(
	conversationId: string,
	opts: MessageUpdateRequestOptions,
	abortSignal: AbortSignal
): Promise<AsyncGenerator<MessageUpdate>> {
	// An internal controller is used so that the stream reader can also abort
	// the request on its own (e.g. once the stream has been fully read).
	const abortController = new AbortController();
	if (abortSignal.aborted) {
		// An "abort" listener never fires for a signal that is already aborted.
		abortController.abort();
	} else {
		abortSignal.addEventListener("abort", () => abortController.abort(), { once: true });
	}

	const response = await fetch(`${opts.base}/conversation/${conversationId}`, {
		method: "POST",
		headers: { "Content-Type": "application/json" },
		body: JSON.stringify({
			inputs: opts.inputs,
			id: opts.messageId,
			is_retry: opts.isRetry,
			is_continue: opts.isContinue,
			web_search: opts.webSearch,
			files: opts.files,
		}),
		signal: abortController.signal,
	});

	if (!response.ok) {
		// Default to the HTTP status; prefer the server-provided message when the
		// error body is JSON and actually carries a non-empty `message` string.
		let errorMessage = `Request failed with status code ${response.status}: ${response.statusText}`;
		try {
			const payload: unknown = await response.json();
			if (typeof payload === "object" && payload !== null && "message" in payload) {
				const { message } = payload as { message?: unknown };
				if (typeof message === "string" && message.length > 0) errorMessage = message;
			}
		} catch {
			// The error body was not valid JSON: keep the status-based message.
		}
		throw Error(errorMessage);
	}
	if (!response.body) {
		throw Error("Body not defined");
	}
	return smoothAsyncIterator(
		streamMessageUpdatesToFullWords(endpointStreamToIterator(response, abortController))
	);
}
| |
|
/**
 * Reads the response body as text and yields every message update parsed from
 * it by `parseMessageUpdates`.
 *
 * Text that does not yet form a complete update is carried over and prepended
 * to the next chunk. The controller is aborted whenever iteration ends (stream
 * finished, error thrown, or the consumer stopped early), and aborting the
 * controller cancels the reader.
 *
 * @param response - Fetch response whose body is the update stream.
 * @param abortController - Controller tied to the request; used to stop reading.
 * @throws Error when the response has no body; read errors of the stream are rethrown.
 */
async function* endpointStreamToIterator(
	response: Response,
	abortController: AbortController
): AsyncGenerator<MessageUpdate> {
	const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader();
	if (!reader) throw Error("Response for endpoint had no body");

	// Abort once the stream has closed. A rejection of `closed` means the stream
	// errored; that error is reported through `reader.read()` below, so it is
	// ignored here to avoid an unhandled promise rejection.
	reader.closed.then(
		() => abortController.abort(),
		() => undefined
	);

	// Cancel the reader when the request is aborted. Cancelling a stream that
	// has already errored rejects, which is of no interest here.
	const cancelReader = () => {
		reader.cancel().catch(() => undefined);
	};
	if (abortController.signal.aborted) {
		// The "abort" event has already been dispatched and will not fire again.
		cancelReader();
	} else {
		abortController.signal.addEventListener("abort", cancelReader, { once: true });
	}

	// Text left over from the previous chunk that did not parse yet.
	let prevChunk = "";
	try {
		while (!abortController.signal.aborted) {
			const { done, value } = await reader.read();
			if (done) break;
			if (!value) continue;

			const { messageUpdates, remainingText } = parseMessageUpdates(prevChunk + value);
			prevChunk = remainingText;
			for (const messageUpdate of messageUpdates) yield messageUpdate;
		}
	} finally {
		// Tear the request down however iteration ended, including when the
		// consumer stops iterating early or a read error is thrown.
		abortController.abort();
	}
}
| |
|
/**
 * Parses newline-delimited JSON text into message updates.
 *
 * Only the text after the last newline can be an incomplete update, so it is
 * the only part ever returned as `remainingText`. Every earlier line is a
 * complete line: blank ones are skipped, and ones that are not valid JSON are
 * dropped without discarding the valid lines that follow them.
 *
 * @param value - Text received so far (leftover of earlier chunks + new chunk).
 * @returns The updates parsed from `value` and the trailing text that did not
 *   parse yet (empty when everything was consumed).
 */
function parseMessageUpdates(value: string): {
	messageUpdates: MessageUpdate[];
	remainingText: string;
} {
	const lines = value.split("\n");
	// `split` always returns at least one element; the last one is the text
	// after the final newline and may still be incomplete.
	const lastLine = lines.pop() ?? "";
	const messageUpdates: MessageUpdate[] = [];

	for (const line of lines) {
		if (line.trim() === "") continue;
		try {
			messageUpdates.push(JSON.parse(line) as MessageUpdate);
		} catch {
			// A newline-terminated line can never become valid by appending more
			// text, so it is skipped instead of blocking the lines after it.
		}
	}

	if (lastLine.trim() === "") return { messageUpdates, remainingText: "" };

	try {
		// The stream may end without a trailing newline, so a final segment that
		// already parses is emitted right away.
		messageUpdates.push(JSON.parse(lastLine) as MessageUpdate);
		return { messageUpdates, remainingText: "" };
	} catch {
		// Incomplete update: keep it so the next chunk can complete it.
		return { messageUpdates, remainingText: lastLine };
	}
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Regroups consecutive "stream" updates so that tokens are emitted as whole
 * words instead of word fragments.
 *
 * A stream token is held back while the following token continues the same
 * word (the previous token ends, and the next one begins, with a word
 * character), up to a limit of 5 tokens per emitted group. Updates of any
 * other type are passed through; tokens still held back at that point are
 * emitted first so that updates keep the order in which they were received.
 *
 * @param iterator - Source of message updates.
 * @returns The same updates with stream tokens merged into larger tokens.
 */
async function* streamMessageUpdatesToFullWords(
	iterator: AsyncGenerator<MessageUpdate>
): AsyncGenerator<MessageUpdate> {
	// Stream updates received but not emitted yet.
	let bufferedStreamUpdates: TextStreamUpdate[] = [];

	const endAlphanumeric = /[a-zA-Z0-9À-ž'`]+$/;
	const beginningAlphanumeric = /^[a-zA-Z0-9À-ž'`]+/;

	for await (const messageUpdate of iterator) {
		if (messageUpdate.type !== "stream") {
			// Flush held-back tokens first; otherwise this update would overtake
			// tokens that were received before it.
			for (const bufferedUpdate of bufferedStreamUpdates) yield bufferedUpdate;
			bufferedStreamUpdates = [];
			yield messageUpdate;
			continue;
		}
		bufferedStreamUpdates.push(messageUpdate);

		let lastIndexEmitted = 0;
		for (let i = 1; i < bufferedStreamUpdates.length; i++) {
			const prevEndsAlphanumeric = endAlphanumeric.test(bufferedStreamUpdates[i - 1].token);
			const currBeginsAlphanumeric = beginningAlphanumeric.test(bufferedStreamUpdates[i].token);
			const shouldCombine = prevEndsAlphanumeric && currBeginsAlphanumeric;
			const combinedTooMany = i - lastIndexEmitted >= 5;
			if (shouldCombine && !combinedTooMany) continue;

			// The word ended (or the group is full): emit the tokens before `i`
			// joined into a single stream update.
			yield {
				type: "stream",
				token: bufferedStreamUpdates
					.slice(lastIndexEmitted, i)
					.map((update) => update.token)
					.join(""),
			};
			lastIndexEmitted = i;
		}
		// Keep only the tokens that were not emitted yet.
		bufferedStreamUpdates = bufferedStreamUpdates.slice(lastIndexEmitted);
	}
	// The source ended: emit whatever is still held back.
	for (const messageUpdate of bufferedStreamUpdates) yield messageUpdate;
}
| |
|
| | |
| | |
| | |
| | |
/**
 * Re-emits the values of `iterator` at a smoothed pace.
 *
 * Values are pulled from `iterator` as fast as it produces them and buffered.
 * They are then emitted one at a time, spaced by the average interval between
 * the arrival times of the most recent values (at most 200 ms, at least 5 ms),
 * where arrival gaps longer than 2000 ms are left out of the average.
 *
 * If `iterator` throws, the values buffered so far are still emitted and the
 * error is then rethrown to the consumer.
 *
 * @param iterator - Source of values.
 * @returns A generator yielding the same values in the same order.
 */
async function* smoothAsyncIterator<T>(iterator: AsyncGenerator<T>): AsyncGenerator<T> {
	const eventTarget = new EventTarget();
	let done = false;
	// Holds the error thrown by `iterator`, if any (at most one entry).
	const sourceErrors: unknown[] = [];
	const valuesBuffer: T[] = [];
	// Arrival times of the most recent values, capped at `maxSamples` entries.
	const valueTimesMS: number[] = [];
	const maxSamples = 30;
	const anomalyThresholdMS = 2000;

	// Pull values in the background; a "next" event is dispatched after every
	// value and once more when the source finishes or fails.
	const pump = async () => {
		try {
			for (;;) {
				const obj = await iterator.next();
				if (obj.done) break;
				valuesBuffer.push(obj.value);
				valueTimesMS.push(performance.now());
				if (valueTimesMS.length > maxSamples) valueTimesMS.shift();
				eventTarget.dispatchEvent(new Event("next"));
			}
		} catch (error) {
			sourceErrors.push(error);
		} finally {
			// Also set on failure, otherwise the emit loop below never ends.
			done = true;
			eventTarget.dispatchEvent(new Event("next"));
		}
	};
	void pump();

	let timeOfLastEmitMS = performance.now();
	while (!done || valuesBuffer.length > 0) {
		if (valuesBuffer.length === 0) {
			// Nothing to emit: wait for the producer instead of polling.
			await waitForEvent(eventTarget, "next");
			continue;
		}

		// Total time between the sampled arrivals, and the share of it made of
		// abnormally long gaps.
		let totalTimeMSBetweenValues = 0;
		let anomalyDurationMS = 0;
		for (let i = 1; i < valueTimesMS.length; i++) {
			const gapMS = valueTimesMS[i] - valueTimesMS[i - 1];
			totalTimeMSBetweenValues += gapMS;
			if (gapMS > anomalyThresholdMS) anomalyDurationMS += gapMS;
		}
		const timeMSBetweenValues = totalTimeMSBetweenValues - anomalyDurationMS;

		// With fewer than two samples there is no interval to average; use 0 so
		// that only the 5 ms floor below applies (avoids a NaN delay).
		const intervalCount = valueTimesMS.length - 1;
		const averageTimeMSBetweenValues =
			intervalCount > 0 ? Math.min(200, timeMSBetweenValues / intervalCount) : 0;
		const timeSinceLastEmitMS = performance.now() - timeOfLastEmitMS;

		// Wait until it is time to emit, or until the producer reports something.
		const gotNext = await Promise.race([
			sleep(Math.max(5, averageTimeMSBetweenValues - timeSinceLastEmitMS)),
			waitForEvent(eventTarget, "next"),
		]);

		// A new value arrived (or the source ended): recompute the pace first.
		if (gotNext) continue;

		timeOfLastEmitMS = performance.now();
		// The buffer is non-empty here: it was checked above and only this loop
		// removes values from it.
		yield valuesBuffer.shift() as T;
	}

	if (sourceErrors.length > 0) throw sourceErrors[0];
}
| |
|
/** Returns a promise that resolves, without a value, after `ms` milliseconds. */
const sleep = (ms: number) => {
	return new Promise((wake) => {
		setTimeout(wake, ms);
	});
};
/**
 * Returns a promise that resolves to `true` the next time `eventName` is
 * dispatched on `eventTarget`. The listener is registered with `once`, so it
 * is removed after it has fired.
 */
const waitForEvent = (eventTarget: EventTarget, eventName: string) => {
	return new Promise<boolean>((resolve) => {
		const onEvent = () => resolve(true);
		eventTarget.addEventListener(eventName, onEvent, { once: true });
	});
};
| |
|