import { describe, expect, it } from "vitest"; import { MessageUpdateStatus, MessageUpdateType, type MessageUpdate, } from "$lib/types/MessageUpdate"; import { applyStreamingMode, resolveStreamingMode, smoothStreamUpdates } from "./messageUpdates"; async function* fromArray(values: T[]): AsyncGenerator { for (const value of values) { yield value; } } async function collect(iter: AsyncGenerator) { const updates: MessageUpdate[] = []; for await (const update of iter) { updates.push(update); } return updates; } const streamText = (updates: MessageUpdate[]) => updates .filter((u) => u.type === MessageUpdateType.Stream) .map((u) => u.token) .join(""); describe("smoothStreamUpdates", () => { it("merges partial words and preserves final text", async () => { const source: MessageUpdate[] = [ { type: MessageUpdateType.Stream, token: "Hel" }, { type: MessageUpdateType.Stream, token: "lo " }, { type: MessageUpdateType.Stream, token: "wor" }, { type: MessageUpdateType.Stream, token: "ld!" }, { type: MessageUpdateType.Status, status: MessageUpdateStatus.Finished }, ]; const updates = await collect( smoothStreamUpdates(fromArray(source), { minDelayMs: 0, maxDelayMs: 0, _internal: { detectChunk: (buffer) => /\S+\s+/.exec(buffer)?.[0] ?? null }, }) ); const streamedChunks = updates.filter((u) => u.type === MessageUpdateType.Stream); expect(streamedChunks.map((u) => u.token)).toEqual(["Hello ", "world!"]); expect(streamText(updates)).toBe("Hello world!"); }); it("flushes buffered stream text before non-stream updates", async () => { const source: MessageUpdate[] = [ { type: MessageUpdateType.Stream, token: "hello" }, { type: MessageUpdateType.Stream, token: " world" }, { type: MessageUpdateType.Title, title: "done" }, ]; const updates = await collect( smoothStreamUpdates(fromArray(source), { minDelayMs: 0, maxDelayMs: 0 }) ); expect(updates[0]).toMatchObject({ type: MessageUpdateType.Stream }); expect(updates[1]).toMatchObject({ type: MessageUpdateType.Stream }); expect(updates[2]).toEqual({ type: MessageUpdateType.Title, title: "done" }); expect(streamText(updates)).toBe("hello world"); }); it("spreads burst tokens over time", async () => { const bigToken = "word ".repeat(40); // 200 chars, 40 words const source: MessageUpdate[] = [{ type: MessageUpdateType.Stream, token: bigToken }]; let nowMs = 0; const emitTimes: number[] = []; const iter = smoothStreamUpdates(fromArray(source), { minDelayMs: 5, maxDelayMs: 80, minRateCharsPerMs: 0.3, _internal: { now: () => nowMs, sleep: async (ms: number) => { nowMs += ms; }, detectChunk: (buffer) => /\S+\s+/.exec(buffer)?.[0] ?? null, }, }); for await (const update of iter) { if (update.type === MessageUpdateType.Stream) { emitTimes.push(nowMs); } } // Should have multiple emissions expect(emitTimes.length).toBeGreaterThan(5); // Gap between first and last emission should be significant (not instant dump) const totalSpread = (emitTimes.at(-1) ?? 0) - (emitTimes[0] ?? 0); expect(totalSpread).toBeGreaterThan(100); }); it("keeps delays within configured bounds", async () => { const source: MessageUpdate[] = [ { type: MessageUpdateType.Stream, token: "one two three four five six seven eight nine ten ", }, ]; const delays: number[] = []; let nowMs = 0; await collect( smoothStreamUpdates(fromArray(source), { minDelayMs: 5, maxDelayMs: 80, minRateCharsPerMs: 0.3, _internal: { now: () => nowMs, sleep: async (ms: number) => { delays.push(ms); nowMs += ms; }, detectChunk: (buffer) => /\S+\s+/.exec(buffer)?.[0] ?? null, }, }) ); expect(delays.length).toBeGreaterThan(2); expect(delays.every((d) => d >= 5 && d <= 80)).toBe(true); // First delay should be >= later delays (rate floor dominates initially) expect(delays[0]).toBeGreaterThanOrEqual(delays.at(-1) ?? 0); }); it("handles CJK text correctly", async () => { const source: MessageUpdate[] = [{ type: MessageUpdateType.Stream, token: "你好,世界!" }]; const updates = await collect( smoothStreamUpdates(fromArray(source), { minDelayMs: 0, maxDelayMs: 0 }) ); expect(streamText(updates)).toBe("你好,世界!"); }); it("propagates source errors to consumer", async () => { async function* failingSource(): AsyncGenerator { yield { type: MessageUpdateType.Stream, token: "hello " }; throw new Error("source failed"); } await expect( collect(smoothStreamUpdates(failingSource(), { minDelayMs: 0, maxDelayMs: 0 })) ).rejects.toThrow("source failed"); }); it("propagates source errors even when no full chunk was emitted yet", async () => { async function* failingSource(): AsyncGenerator { yield { type: MessageUpdateType.Stream, token: "hel" }; throw new Error("source failed"); } await expect( collect( smoothStreamUpdates(failingSource(), { minDelayMs: 0, maxDelayMs: 0, _internal: { detectChunk: (buffer) => /\S+\s+/.exec(buffer)?.[0] ?? null }, }) ) ).rejects.toThrow("source failed"); }); it("drains queued stream chunks before throwing source errors", async () => { async function* failingSource(): AsyncGenerator { yield { type: MessageUpdateType.Stream, token: "a " }; yield { type: MessageUpdateType.Stream, token: "b " }; yield { type: MessageUpdateType.Stream, token: "c " }; throw new Error("source failed"); } const seen: MessageUpdate[] = []; let seenError: Error | null = null; try { for await (const update of smoothStreamUpdates(failingSource(), { minDelayMs: 0, maxDelayMs: 0, _internal: { detectChunk: (buffer) => /\S+\s+/.exec(buffer)?.[0] ?? null }, })) { seen.push(update); } } catch (error) { seenError = error as Error; } expect(streamText(seen)).toBe("a b c "); expect(seenError?.message).toBe("source failed"); }); it("caps burst tail latency with backlog acceleration", async () => { const source: MessageUpdate[] = [ { type: MessageUpdateType.Stream, token: "word ".repeat(500) }, ]; let nowMs = 0; await collect( smoothStreamUpdates(fromArray(source), { minDelayMs: 5, maxDelayMs: 80, minRateCharsPerMs: 0.3, maxBufferedMs: 400, _internal: { now: () => nowMs, sleep: async (ms: number) => { nowMs += ms; }, detectChunk: (buffer) => /\S+\s+/.exec(buffer)?.[0] ?? null, }, }) ); expect(nowMs).toBeLessThan(1500); }); it("skips empty tokens gracefully", async () => { const source: MessageUpdate[] = [ { type: MessageUpdateType.Stream, token: "" }, { type: MessageUpdateType.Stream, token: "hello " }, { type: MessageUpdateType.Stream, token: "" }, { type: MessageUpdateType.Stream, token: "world!" }, { type: MessageUpdateType.Status, status: MessageUpdateStatus.Finished }, ]; const updates = await collect( smoothStreamUpdates(fromArray(source), { minDelayMs: 0, maxDelayMs: 0 }) ); expect(streamText(updates)).toBe("hello world!"); }); }); describe("applyStreamingMode", () => { it("keeps stream unchanged for raw mode", async () => { const source: MessageUpdate[] = [ { type: MessageUpdateType.Stream, token: "Hello" }, { type: MessageUpdateType.Status, status: MessageUpdateStatus.Finished }, ]; const raw = await collect(applyStreamingMode(fromArray(source), "raw")); expect(raw).toEqual(source); }); }); describe("resolveStreamingMode", () => { it("returns explicit streamingMode when set", () => { expect(resolveStreamingMode({ streamingMode: "raw" })).toBe("raw"); expect(resolveStreamingMode({ streamingMode: "smooth" })).toBe("smooth"); }); it("defaults to smooth when unset", () => { expect(resolveStreamingMode({})).toBe("smooth"); }); it("maps unsupported legacy values to smooth", () => { expect(resolveStreamingMode({ streamingMode: "final" })).toBe("smooth"); }); });