chat-ui / src /lib /utils /messageUpdates.ts
DreamyDetective's picture
added app details
ded72f6
import type { MessageFile } from "$lib/types/Message";
import {
type MessageUpdate,
type MessageToolUpdate,
type MessageToolCallUpdate,
type MessageToolResultUpdate,
type MessageToolErrorUpdate,
type MessageToolProgressUpdate,
MessageUpdateType,
MessageToolUpdateType,
} from "$lib/types/MessageUpdate";
import type { StreamingMode } from "$lib/types/Settings";
import type { KeyValuePair } from "$lib/types/Tool";
type MessageUpdateRequestOptions = {
base: string;
inputs?: string;
messageId?: string;
isRetry: boolean;
isContinue?: boolean;
files?: MessageFile[];
// Optional: pass selected MCP server names (client-side selection)
selectedMcpServerNames?: string[];
// Optional: pass selected MCP server configs (for custom client-defined servers)
selectedMcpServers?: Array<{ name: string; url: string; headers?: KeyValuePair[] }>;
streamingMode?: StreamingMode;
};
type ChunkDetector = (buffer: string) => string | null;
type SmoothStreamConfig = {
minDelayMs?: number;
maxDelayMs?: number;
minRateCharsPerMs?: number;
maxBufferedMs?: number;
_internal?: {
now?: () => number;
sleep?: (ms: number) => Promise<void>;
detectChunk?: ChunkDetector;
};
};
export async function fetchMessageUpdates(
conversationId: string,
opts: MessageUpdateRequestOptions,
abortSignal: AbortSignal
): Promise<AsyncGenerator<MessageUpdate>> {
const abortController = new AbortController();
abortSignal.addEventListener("abort", () => abortController.abort());
const form = new FormData();
const optsJSON = JSON.stringify({
inputs: opts.inputs,
id: opts.messageId,
is_retry: opts.isRetry,
is_continue: Boolean(opts.isContinue),
// Will be ignored server-side if unsupported
selectedMcpServerNames: opts.selectedMcpServerNames,
selectedMcpServers: opts.selectedMcpServers,
});
opts.files?.forEach((file) => {
const name = file.type + ";" + file.name;
form.append("files", new File([file.value], name, { type: file.mime }));
});
form.append("data", optsJSON);
const response = await fetch(`${opts.base}/conversation/${conversationId}`, {
method: "POST",
body: form,
signal: abortController.signal,
});
if (!response.ok) {
const errorMessage = await response
.json()
.then((obj) => obj.message)
.catch(() => `Request failed with status code ${response.status}: ${response.statusText}`);
throw Error(errorMessage);
}
if (!response.body) {
throw Error("Body not defined");
}
return applyStreamingMode(
endpointStreamToIterator(response, abortController),
opts.streamingMode ?? "smooth"
);
}
export function applyStreamingMode(
iterator: AsyncGenerator<MessageUpdate>,
streamingMode: StreamingMode
): AsyncGenerator<MessageUpdate> {
if (streamingMode === "smooth") {
return smoothStreamUpdates(iterator);
}
// "raw" keeps source stream intact.
return iterator;
}
export function resolveStreamingMode(s: { streamingMode?: unknown }): StreamingMode {
return s.streamingMode === "raw" || s.streamingMode === "smooth" ? s.streamingMode : "smooth";
}
async function* endpointStreamToIterator(
response: Response,
abortController: AbortController
): AsyncGenerator<MessageUpdate> {
const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader();
if (!reader) throw Error("Response for endpoint had no body");
// Handle any cases where we must abort
reader.closed.then(() => abortController.abort());
// Handle logic for aborting
abortController.signal.addEventListener("abort", () => reader.cancel());
// ex) If the last response is => {"type": "stream", "token":
// It should be => {"type": "stream", "token": "Hello"} = prev_input_chunk + "Hello"}
let prevChunk = "";
while (!abortController.signal.aborted) {
const { done, value } = await reader.read();
if (done) {
abortController.abort();
break;
}
if (!value) continue;
const { messageUpdates, remainingText } = parseMessageUpdates(prevChunk + value);
prevChunk = remainingText;
for (const messageUpdate of messageUpdates) yield messageUpdate;
}
}
function parseMessageUpdates(value: string): {
messageUpdates: MessageUpdate[];
remainingText: string;
} {
const inputs = value.split("\n");
const messageUpdates: MessageUpdate[] = [];
for (const input of inputs) {
try {
messageUpdates.push(JSON.parse(input) as MessageUpdate);
} catch (error) {
// in case of parsing error, we return what we were able to parse
if (error instanceof SyntaxError) {
return {
messageUpdates,
remainingText: inputs.at(-1) ?? "",
};
}
}
}
return { messageUpdates, remainingText: "" };
}
export async function* smoothStreamUpdates(
iterator: AsyncGenerator<MessageUpdate>,
{
minDelayMs = 5,
maxDelayMs = 80,
minRateCharsPerMs = 0.3,
maxBufferedMs = 400,
_internal: { now = () => performance.now(), sleep = defaultSleep, detectChunk } = {},
}: SmoothStreamConfig = {}
): AsyncGenerator<MessageUpdate> {
const chunkDetector = detectChunk ?? createWordChunkDetector();
const eventTarget = new EventTarget();
const outputQueue: Array<{ update: MessageUpdate }> = [];
let producerDone = false;
let producerError: unknown = null;
let pendingBuffer = "";
let queuedStreamChars = 0;
const enqueue = (update: MessageUpdate) => {
if (update.type === MessageUpdateType.Stream) {
queuedStreamChars += update.token.length;
}
outputQueue.push({ update });
eventTarget.dispatchEvent(new Event("next"));
};
const flushPendingBuffer = () => {
if (pendingBuffer.length === 0) return;
enqueue({ type: MessageUpdateType.Stream, token: pendingBuffer });
pendingBuffer = "";
};
const producer = (async () => {
for await (const messageUpdate of iterator) {
if (messageUpdate.type !== MessageUpdateType.Stream) {
flushPendingBuffer();
enqueue(messageUpdate);
continue;
}
if (!messageUpdate.token) continue;
pendingBuffer += messageUpdate.token;
let chunk: string | null;
while ((chunk = chunkDetector(pendingBuffer)) !== null) {
if (chunk.length === 0) break;
enqueue({ type: MessageUpdateType.Stream, token: chunk });
pendingBuffer = pendingBuffer.slice(chunk.length);
}
}
flushPendingBuffer();
})()
.catch((error) => {
producerError = error;
})
.finally(() => {
producerDone = true;
eventTarget.dispatchEvent(new Event("next"));
});
// Character-rate targeting consumer
let totalCharsEmitted = 0;
let firstEmitAt: number | null = null;
while (!producerDone || outputQueue.length > 0) {
if (outputQueue.length === 0) {
await waitForEvent(eventTarget, "next");
continue;
}
const next = outputQueue.shift();
if (!next) continue;
if (next.update.type === MessageUpdateType.Stream) {
const tokenLen = next.update.token.length;
queuedStreamChars = Math.max(0, queuedStreamChars - tokenLen);
totalCharsEmitted += tokenLen;
if (firstEmitAt === null) firstEmitAt = now();
const elapsedMs = now() - firstEmitAt;
const currentRate = elapsedMs > 0 ? totalCharsEmitted / elapsedMs : 0;
const backlogChars = tokenLen + queuedStreamChars;
const backlogRate = maxBufferedMs > 0 ? backlogChars / maxBufferedMs : 0;
const targetRate = Math.max(currentRate, minRateCharsPerMs, backlogRate);
const rawDelay = tokenLen / targetRate;
const underBacklogPressure = backlogRate > minRateCharsPerMs;
const effectiveMinDelayMs = underBacklogPressure ? 0 : minDelayMs;
const delayMs = Math.round(Math.max(effectiveMinDelayMs, Math.min(maxDelayMs, rawDelay)));
if (delayMs > 0) {
await sleep(delayMs);
}
}
yield next.update;
}
await producer;
if (producerError) throw producerError;
}
function createWordChunkDetector(): ChunkDetector {
if (typeof Intl !== "undefined" && typeof Intl.Segmenter === "function") {
const segmenter = new Intl.Segmenter(undefined, { granularity: "word" });
return (buffer: string): string | null => {
if (buffer.length === 0) return null;
let cursor = 0;
let boundary = 0;
let sawWordLike = false;
for (const part of segmenter.segment(buffer)) {
cursor += part.segment.length;
if (part.isWordLike) {
sawWordLike = true;
continue;
}
if (sawWordLike) {
boundary = cursor;
break;
}
}
return boundary > 0 ? buffer.slice(0, boundary) : null;
};
}
const wordWithTrailingBoundary = /\S+\s+/m;
return (buffer: string): string | null => {
const match = wordWithTrailingBoundary.exec(buffer);
if (!match) return null;
return buffer.slice(0, match.index) + match[0];
};
}
// Tool update type guards for UI rendering
export const isMessageToolUpdate = (update: MessageUpdate): update is MessageToolUpdate =>
update.type === MessageUpdateType.Tool;
export const isMessageToolCallUpdate = (update: MessageUpdate): update is MessageToolCallUpdate =>
isMessageToolUpdate(update) && update.subtype === MessageToolUpdateType.Call;
export const isMessageToolResultUpdate = (
update: MessageUpdate
): update is MessageToolResultUpdate =>
isMessageToolUpdate(update) && update.subtype === MessageToolUpdateType.Result;
export const isMessageToolErrorUpdate = (update: MessageUpdate): update is MessageToolErrorUpdate =>
isMessageToolUpdate(update) && update.subtype === MessageToolUpdateType.Error;
export const isMessageToolProgressUpdate = (
update: MessageUpdate
): update is MessageToolProgressUpdate =>
isMessageToolUpdate(update) && update.subtype === MessageToolUpdateType.Progress;
const defaultSleep = (ms: number): Promise<void> =>
new Promise((resolve) => setTimeout(resolve, ms));
const waitForEvent = (eventTarget: EventTarget, eventName: string) =>
new Promise<boolean>((resolve) =>
eventTarget.addEventListener(eventName, () => resolve(true), { once: true })
);