Spaces:
No application file
No application file
| import type { DirectorState } from '@/lib/types/chat'; | |
| /** | |
| * StreamBuffer β unified presentation pacing layer. | |
| * | |
| * Sits between data sources (SSE stream / PlaybackEngine) and React state. | |
| * Events are pushed into an ordered queue; a fixed-rate tick loop reveals | |
| * text character-by-character and fires typed callbacks so both the Chat | |
| * area and the Roundtable bubble consume identically-paced content. | |
| * | |
| * Key invariants: | |
| * - ONE source of pacing (this tick loop) β no double typewriter. | |
| * - pause() is O(1) instant β tick returns immediately. | |
| * - Actions fire only when the tick cursor reaches them (after preceding text). | |
| * - Roundtable sees only the current speech segment (resets on action / agent switch). | |
| */ | |
| // βββ Buffer Item Types βββββββββββββββββββββββββββββββββββββββββββββββ | |
| export interface AgentStartItem { | |
| kind: 'agent_start'; | |
| messageId: string; | |
| agentId: string; | |
| agentName: string; | |
| avatar?: string; | |
| color?: string; | |
| } | |
| export interface AgentEndItem { | |
| kind: 'agent_end'; | |
| messageId: string; | |
| agentId: string; | |
| } | |
| export interface TextItem { | |
| kind: 'text'; | |
| messageId: string; | |
| agentId: string; | |
| /** Unique ID for this text part β distinguishes multiple text items within one message (e.g. lecture). */ | |
| partId: string; | |
| /** Growable β SSE deltas append here. */ | |
| text: string; | |
| /** When true, no more text will be appended. Tick can advance past once fully revealed. */ | |
| sealed: boolean; | |
| } | |
| export interface ActionItem { | |
| kind: 'action'; | |
| messageId: string; | |
| actionId: string; | |
| actionName: string; | |
| params: Record<string, unknown>; | |
| agentId: string; | |
| } | |
| export interface ThinkingItem { | |
| kind: 'thinking'; | |
| stage: string; | |
| agentId?: string; | |
| } | |
| export interface CueUserItem { | |
| kind: 'cue_user'; | |
| fromAgentId?: string; | |
| prompt?: string; | |
| } | |
| export interface DoneItem { | |
| kind: 'done'; | |
| totalActions: number; | |
| totalAgents: number; | |
| agentHadContent?: boolean; | |
| directorState?: DirectorState; | |
| } | |
| export interface ErrorItem { | |
| kind: 'error'; | |
| message: string; | |
| } | |
| export type BufferItem = | |
| | AgentStartItem | |
| | AgentEndItem | |
| | TextItem | |
| | ActionItem | |
| | ThinkingItem | |
| | CueUserItem | |
| | DoneItem | |
| | ErrorItem; | |
| // βββ Callbacks βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| export interface StreamBufferCallbacks { | |
| onAgentStart(data: AgentStartItem): void; | |
| onAgentEnd(data: AgentEndItem): void; | |
| /** | |
| * Fired each tick while a text item is being revealed. | |
| * @param messageId β which message to update | |
| * @param partId β unique ID for this text part (stable across ticks) | |
| * @param revealedText β text visible so far (slice of full text) | |
| * @param isComplete β true when this text item is fully revealed AND sealed | |
| */ | |
| onTextReveal(messageId: string, partId: string, revealedText: string, isComplete: boolean): void; | |
| /** Fired when tick reaches an action item. Callers should execute the effect + add badge. */ | |
| onActionReady(messageId: string, data: ActionItem): void; | |
| /** | |
| * Unified speech feed for the Roundtable bubble. | |
| * Reports only the CURRENT segment text (resets on action / agent switch). | |
| * Called with (null, null) when buffer completes or is disposed. | |
| */ | |
| onLiveSpeech(text: string | null, agentId: string | null): void; | |
| /** | |
| * Speech progress ratio for the Roundtable bubble auto-scroll. | |
| * Fired each tick during text reveal: ratio = charCursor / totalTextLength. | |
| * Called with null when buffer completes or is disposed. | |
| */ | |
| onSpeechProgress(ratio: number | null): void; | |
| onThinking(data: { stage: string; agentId?: string } | null): void; | |
| onCueUser(fromAgentId?: string, prompt?: string): void; | |
| onDone(data: { | |
| totalActions: number; | |
| totalAgents: number; | |
| agentHadContent?: boolean; | |
| directorState?: DirectorState; | |
| }): void; | |
| onError(message: string): void; | |
| } | |
| // βββ Options βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| export interface StreamBufferOptions { | |
| /** Milliseconds between ticks. Default: 30 */ | |
| tickMs?: number; | |
| /** Characters revealed per tick. Default: 1 (β33 chars/s) */ | |
| charsPerTick?: number; | |
| /** | |
| * Fixed delay (ms) after a text segment is fully revealed before advancing | |
| * to the next item. Gives the reader a breathing pause after each speech | |
| * block. Default: 0 (no delay). | |
| */ | |
| postTextDelayMs?: number; | |
| /** | |
| * Delay (ms) after firing an action callback before advancing to the next | |
| * item. Gives action animations time to play out. Default: 0. | |
| */ | |
| actionDelayMs?: number; | |
| } | |
| // βββ StreamBuffer Class ββββββββββββββββββββββββββββββββββββββββββββββ | |
| export class StreamBuffer { | |
| // Queue | |
| private items: BufferItem[] = []; | |
| private readIndex = 0; | |
| private charCursor = 0; | |
| // Roundtable segment tracking | |
| private currentSegmentText = ''; | |
| private currentAgentId: string | null = null; | |
| // Control | |
| private _paused = false; | |
| private _disposed = false; | |
| private timer: ReturnType<typeof setInterval> | null = null; | |
| // Dwell / delay counters (in ticks) | |
| private _dwellTicksRemaining = 0; | |
| // Config | |
| private readonly tickMs: number; | |
| private readonly charsPerTick: number; | |
| private readonly postTextDelayTicks: number; | |
| private readonly actionDelayTicks: number; | |
| private readonly cb: StreamBufferCallbacks; | |
| private partCounter = 0; | |
| private _drainResolve: (() => void) | null = null; | |
| private _drainReject: ((err: Error) => void) | null = null; | |
| constructor(callbacks: StreamBufferCallbacks, options?: StreamBufferOptions) { | |
| this.cb = callbacks; | |
| this.tickMs = options?.tickMs ?? 30; | |
| this.charsPerTick = options?.charsPerTick ?? 1; | |
| this.postTextDelayTicks = Math.ceil((options?.postTextDelayMs ?? 0) / this.tickMs); | |
| this.actionDelayTicks = Math.ceil((options?.actionDelayMs ?? 0) / this.tickMs); | |
| } | |
| // βββ Push Methods ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| pushAgentStart(data: Omit<AgentStartItem, 'kind'>): void { | |
| if (this._disposed) return; | |
| this.sealLastText(); | |
| this.items.push({ kind: 'agent_start', ...data }); | |
| } | |
| pushAgentEnd(data: Omit<AgentEndItem, 'kind'>): void { | |
| if (this._disposed) return; | |
| this.sealLastText(); | |
| this.items.push({ kind: 'agent_end', ...data }); | |
| } | |
| /** | |
| * Append text for a message. | |
| * If the last queue item is an unsealed text item for the same messageId, | |
| * the delta is appended in-place. Otherwise a new text item is created. | |
| */ | |
| pushText(messageId: string, delta: string, agentId?: string): void { | |
| if (this._disposed) return; | |
| const last = this.items[this.items.length - 1]; | |
| if (last && last.kind === 'text' && last.messageId === messageId && !last.sealed) { | |
| last.text += delta; | |
| } else { | |
| this.items.push({ | |
| kind: 'text', | |
| messageId, | |
| agentId: agentId ?? this.currentAgentId ?? '', | |
| partId: `p${this.partCounter++}`, | |
| text: delta, | |
| sealed: false, | |
| }); | |
| } | |
| } | |
| /** Mark the current (last) text item as complete β no more appends expected. */ | |
| sealText(messageId: string): void { | |
| if (this._disposed) return; | |
| for (let i = this.items.length - 1; i >= 0; i--) { | |
| const item = this.items[i]; | |
| if (item.kind === 'text' && item.messageId === messageId && !item.sealed) { | |
| item.sealed = true; | |
| break; | |
| } | |
| } | |
| } | |
| pushAction(data: Omit<ActionItem, 'kind'>): void { | |
| if (this._disposed) return; | |
| this.sealLastText(); | |
| this.items.push({ kind: 'action', ...data }); | |
| } | |
| pushThinking(data: { stage: string; agentId?: string }): void { | |
| if (this._disposed) return; | |
| this.items.push({ kind: 'thinking', ...data }); | |
| } | |
| pushCueUser(data: { fromAgentId?: string; prompt?: string }): void { | |
| if (this._disposed) return; | |
| this.items.push({ kind: 'cue_user', ...data }); | |
| } | |
| pushDone(data: { | |
| totalActions: number; | |
| totalAgents: number; | |
| agentHadContent?: boolean; | |
| directorState?: DirectorState; | |
| }): void { | |
| if (this._disposed) return; | |
| this.sealLastText(); | |
| this.items.push({ kind: 'done', ...data }); | |
| } | |
| pushError(message: string): void { | |
| if (this._disposed) return; | |
| this.items.push({ kind: 'error', message }); | |
| } | |
| // βββ Control βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| /** Start the tick loop. Idempotent β calling twice is safe. */ | |
| start(): void { | |
| if (this._disposed || this.timer) return; | |
| this.timer = setInterval(() => this.tick(), this.tickMs); | |
| } | |
| /** Instantly pause β tick becomes a no-op. */ | |
| pause(): void { | |
| this._paused = true; | |
| } | |
| /** Resume from exactly where we left off. */ | |
| resume(): void { | |
| this._paused = false; | |
| } | |
| /** | |
| * Returns a Promise that resolves when the buffer has processed all items | |
| * including the final `done` item. Rejects if the buffer is disposed/shutdown | |
| * before draining completes. | |
| */ | |
| waitUntilDrained(): Promise<void> { | |
| if (this._disposed) { | |
| return Promise.reject(new Error('Buffer already disposed')); | |
| } | |
| return new Promise<void>((resolve, reject) => { | |
| this._drainResolve = resolve; | |
| this._drainReject = reject; | |
| }); | |
| } | |
| get paused(): boolean { | |
| return this._paused; | |
| } | |
| get disposed(): boolean { | |
| return this._disposed; | |
| } | |
| /** | |
| * Flush: instantly reveal everything remaining. | |
| * Used when restoring persisted sessions or force-completing. | |
| */ | |
| flush(): void { | |
| if (this._disposed) return; | |
| while (this.readIndex < this.items.length) { | |
| const item = this.items[this.readIndex]; | |
| switch (item.kind) { | |
| case 'text': | |
| this.cb.onTextReveal(item.messageId, item.partId, item.text, true); | |
| this.currentSegmentText = item.text; | |
| this.cb.onLiveSpeech(this.currentSegmentText, this.currentAgentId); | |
| this.cb.onSpeechProgress(1); | |
| break; | |
| case 'action': | |
| this.currentSegmentText = ''; | |
| this.cb.onActionReady(item.messageId, item); | |
| this.cb.onLiveSpeech(null, this.currentAgentId); | |
| break; | |
| case 'agent_start': | |
| this.currentAgentId = item.agentId; | |
| this.currentSegmentText = ''; | |
| this.cb.onThinking(null); // Agent selected β clear thinking indicator | |
| this.cb.onAgentStart(item); | |
| this.cb.onLiveSpeech(null, item.agentId); | |
| break; | |
| case 'agent_end': | |
| this.cb.onAgentEnd(item); | |
| break; | |
| case 'thinking': | |
| this.cb.onThinking(item); | |
| break; | |
| case 'cue_user': | |
| this.cb.onCueUser(item.fromAgentId, item.prompt); | |
| break; | |
| case 'done': | |
| this.cb.onLiveSpeech(null, null); | |
| this.cb.onSpeechProgress(null); | |
| this.cb.onThinking(null); | |
| this.cb.onDone(item); | |
| // Resolve drain promise | |
| this._drainResolve?.(); | |
| this._drainResolve = null; | |
| this._drainReject = null; | |
| break; | |
| case 'error': | |
| this.cb.onError(item.message); | |
| break; | |
| } | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| } | |
| } | |
| /** Stop tick loop, release resources. No more callbacks after this. */ | |
| dispose(): void { | |
| if (this._disposed) return; | |
| this._disposed = true; | |
| if (this.timer) { | |
| clearInterval(this.timer); | |
| this.timer = null; | |
| } | |
| // Reject waiting drain promise | |
| this._drainReject?.(new Error('Buffer disposed')); | |
| this._drainResolve = null; | |
| this._drainReject = null; | |
| // Final cleanup signal | |
| this.cb.onLiveSpeech(null, null); | |
| this.cb.onSpeechProgress(null); | |
| } | |
| /** | |
| * Stop the tick timer and mark disposed WITHOUT firing final onLiveSpeech. | |
| * Used when replacing a buffer (e.g. resume after soft-pause) to avoid | |
| * the dispose callback clearing roundtable state via a stale microtask. | |
| */ | |
| shutdown(): void { | |
| if (this._disposed) return; | |
| this._disposed = true; | |
| if (this.timer) { | |
| clearInterval(this.timer); | |
| this.timer = null; | |
| } | |
| // Reject waiting drain promise | |
| this._drainReject?.(new Error('Buffer shutdown')); | |
| this._drainResolve = null; | |
| this._drainReject = null; | |
| } | |
| // βββ Internals βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| /** Seal the last text item in the queue (if any). */ | |
| private sealLastText(): void { | |
| for (let i = this.items.length - 1; i >= 0; i--) { | |
| const item = this.items[i]; | |
| if (item.kind === 'text' && !item.sealed) { | |
| item.sealed = true; | |
| break; | |
| } | |
| // Stop searching once we hit a non-text item | |
| if (item.kind !== 'text') break; | |
| } | |
| } | |
| private tick(): void { | |
| if (this._paused || this._disposed) return; | |
| // Honour dwell / action-delay countdown before advancing | |
| if (this._dwellTicksRemaining > 0) { | |
| this._dwellTicksRemaining--; | |
| return; | |
| } | |
| const item = this.items[this.readIndex]; | |
| if (!item) return; // Queue empty or caught up β wait | |
| switch (item.kind) { | |
| case 'text': { | |
| // Advance character cursor | |
| this.charCursor = Math.min(this.charCursor + this.charsPerTick, item.text.length); | |
| const revealed = item.text.slice(0, this.charCursor); | |
| const fullyRevealed = this.charCursor >= item.text.length; | |
| const isComplete = fullyRevealed && item.sealed; | |
| // Update chat area | |
| this.cb.onTextReveal(item.messageId, item.partId, revealed, isComplete); | |
| // Update roundtable (current segment only). | |
| // Use this.currentAgentId (set when tick processes agent_start) rather than | |
| // item.agentId β push-time race means item.agentId can carry a stale value | |
| // from the previous agent when SSE pushes outpace the tick loop. | |
| this.currentSegmentText = revealed; | |
| this.cb.onLiveSpeech(this.currentSegmentText, this.currentAgentId); | |
| this.cb.onSpeechProgress(item.text.length > 0 ? this.charCursor / item.text.length : 1); | |
| // Advance to next item if fully revealed and sealed | |
| if (isComplete) { | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| // Fixed pause after text finishes β gives the reader a breathing gap | |
| // before the next action or agent turn fires. | |
| if (this.postTextDelayTicks > 0) { | |
| this._dwellTicksRemaining = this.postTextDelayTicks; | |
| return; // next tick will count down, then advanceNonText | |
| } | |
| // Process any immediately-advanceable items in the same tick | |
| // (e.g. action badges right after text) | |
| this.advanceNonText(); | |
| } | |
| // If fullyRevealed but !sealed: wait for more SSE deltas | |
| break; | |
| } | |
| // Non-text items are processed immediately | |
| case 'agent_start': | |
| this.currentAgentId = item.agentId; | |
| this.currentSegmentText = ''; | |
| this.cb.onThinking(null); // Agent selected β clear thinking indicator | |
| this.cb.onAgentStart(item); | |
| this.cb.onLiveSpeech(null, item.agentId); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| this.advanceNonText(); | |
| break; | |
| case 'agent_end': | |
| this.cb.onAgentEnd(item); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| this.advanceNonText(); | |
| break; | |
| case 'action': | |
| this.currentSegmentText = ''; | |
| this.cb.onActionReady(item.messageId, item); | |
| this.cb.onLiveSpeech(null, this.currentAgentId); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| // Delay after action so animations have time to play out | |
| if (this.actionDelayTicks > 0) { | |
| this._dwellTicksRemaining = this.actionDelayTicks; | |
| return; | |
| } | |
| this.advanceNonText(); | |
| break; | |
| case 'thinking': | |
| this.cb.onThinking(item); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| this.advanceNonText(); | |
| break; | |
| case 'cue_user': | |
| this.cb.onCueUser(item.fromAgentId, item.prompt); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| this.advanceNonText(); | |
| break; | |
| case 'done': | |
| this.cb.onLiveSpeech(null, null); | |
| this.cb.onSpeechProgress(null); | |
| this.cb.onThinking(null); | |
| this.cb.onDone(item); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| // Stop the timer β nothing more to process | |
| if (this.timer) { | |
| clearInterval(this.timer); | |
| this.timer = null; | |
| } | |
| // Resolve drain promise | |
| this._drainResolve?.(); | |
| this._drainResolve = null; | |
| this._drainReject = null; | |
| break; | |
| case 'error': | |
| this.cb.onError(item.message); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| this.advanceNonText(); | |
| break; | |
| } | |
| } | |
| /** | |
| * After processing a non-text item, keep advancing through consecutive | |
| * non-text items in the same tick. Stop when we hit a text item or | |
| * the end of the queue β the next tick will handle the text item | |
| * (so we don't skip the character-by-character reveal). | |
| * | |
| * Also stops when an action triggers a delay so its animation can play. | |
| */ | |
| private advanceNonText(): void { | |
| while (this.readIndex < this.items.length) { | |
| const next = this.items[this.readIndex]; | |
| if (next.kind === 'text') break; // Let the next tick handle text | |
| switch (next.kind) { | |
| case 'agent_start': | |
| this.currentAgentId = next.agentId; | |
| this.currentSegmentText = ''; | |
| this.cb.onThinking(null); // Agent selected β clear thinking indicator | |
| this.cb.onAgentStart(next); | |
| this.cb.onLiveSpeech(null, next.agentId); | |
| break; | |
| case 'agent_end': | |
| this.cb.onAgentEnd(next); | |
| break; | |
| case 'action': | |
| this.currentSegmentText = ''; | |
| this.cb.onActionReady(next.messageId, next); | |
| this.cb.onLiveSpeech(null, this.currentAgentId); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| // Pause after action to let animation play | |
| if (this.actionDelayTicks > 0) { | |
| this._dwellTicksRemaining = this.actionDelayTicks; | |
| return; // resume on next tick after countdown | |
| } | |
| continue; // no delay β keep advancing | |
| case 'thinking': | |
| this.cb.onThinking(next); | |
| break; | |
| case 'cue_user': | |
| this.cb.onCueUser(next.fromAgentId, next.prompt); | |
| break; | |
| case 'done': | |
| this.cb.onLiveSpeech(null, null); | |
| this.cb.onSpeechProgress(null); | |
| this.cb.onThinking(null); | |
| this.cb.onDone(next); | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| if (this.timer) { | |
| clearInterval(this.timer); | |
| this.timer = null; | |
| } | |
| // Resolve drain promise | |
| this._drainResolve?.(); | |
| this._drainResolve = null; | |
| this._drainReject = null; | |
| return; // done β stop advancing | |
| case 'error': | |
| this.cb.onError(next.message); | |
| break; | |
| } | |
| this.readIndex++; | |
| this.charCursor = 0; | |
| } | |
| } | |
| } | |