arena-learning / studyArena /lib /buffer /stream-buffer.ts
NitishStark's picture
Upload folder using huggingface_hub
c20f20c verified
import type { DirectorState } from '@/lib/types/chat';
/**
* StreamBuffer β€” unified presentation pacing layer.
*
* Sits between data sources (SSE stream / PlaybackEngine) and React state.
* Events are pushed into an ordered queue; a fixed-rate tick loop reveals
* text character-by-character and fires typed callbacks so both the Chat
* area and the Roundtable bubble consume identically-paced content.
*
* Key invariants:
* - ONE source of pacing (this tick loop) β€” no double typewriter.
* - pause() is O(1) instant β€” tick returns immediately.
* - Actions fire only when the tick cursor reaches them (after preceding text).
* - Roundtable sees only the current speech segment (resets on action / agent switch).
*/
// ─── Buffer Item Types ───────────────────────────────────────────────
export interface AgentStartItem {
kind: 'agent_start';
messageId: string;
agentId: string;
agentName: string;
avatar?: string;
color?: string;
}
export interface AgentEndItem {
kind: 'agent_end';
messageId: string;
agentId: string;
}
export interface TextItem {
kind: 'text';
messageId: string;
agentId: string;
/** Unique ID for this text part β€” distinguishes multiple text items within one message (e.g. lecture). */
partId: string;
/** Growable β€” SSE deltas append here. */
text: string;
/** When true, no more text will be appended. Tick can advance past once fully revealed. */
sealed: boolean;
}
export interface ActionItem {
kind: 'action';
messageId: string;
actionId: string;
actionName: string;
params: Record<string, unknown>;
agentId: string;
}
export interface ThinkingItem {
kind: 'thinking';
stage: string;
agentId?: string;
}
export interface CueUserItem {
kind: 'cue_user';
fromAgentId?: string;
prompt?: string;
}
export interface DoneItem {
kind: 'done';
totalActions: number;
totalAgents: number;
agentHadContent?: boolean;
directorState?: DirectorState;
}
export interface ErrorItem {
kind: 'error';
message: string;
}
export type BufferItem =
| AgentStartItem
| AgentEndItem
| TextItem
| ActionItem
| ThinkingItem
| CueUserItem
| DoneItem
| ErrorItem;
// ─── Callbacks ───────────────────────────────────────────────────────
export interface StreamBufferCallbacks {
onAgentStart(data: AgentStartItem): void;
onAgentEnd(data: AgentEndItem): void;
/**
* Fired each tick while a text item is being revealed.
* @param messageId β€” which message to update
* @param partId β€” unique ID for this text part (stable across ticks)
* @param revealedText β€” text visible so far (slice of full text)
* @param isComplete β€” true when this text item is fully revealed AND sealed
*/
onTextReveal(messageId: string, partId: string, revealedText: string, isComplete: boolean): void;
/** Fired when tick reaches an action item. Callers should execute the effect + add badge. */
onActionReady(messageId: string, data: ActionItem): void;
/**
* Unified speech feed for the Roundtable bubble.
* Reports only the CURRENT segment text (resets on action / agent switch).
* Called with (null, null) when buffer completes or is disposed.
*/
onLiveSpeech(text: string | null, agentId: string | null): void;
/**
* Speech progress ratio for the Roundtable bubble auto-scroll.
* Fired each tick during text reveal: ratio = charCursor / totalTextLength.
* Called with null when buffer completes or is disposed.
*/
onSpeechProgress(ratio: number | null): void;
onThinking(data: { stage: string; agentId?: string } | null): void;
onCueUser(fromAgentId?: string, prompt?: string): void;
onDone(data: {
totalActions: number;
totalAgents: number;
agentHadContent?: boolean;
directorState?: DirectorState;
}): void;
onError(message: string): void;
}
// ─── Options ─────────────────────────────────────────────────────────
export interface StreamBufferOptions {
/** Milliseconds between ticks. Default: 30 */
tickMs?: number;
/** Characters revealed per tick. Default: 1 (β‰ˆ33 chars/s) */
charsPerTick?: number;
/**
* Fixed delay (ms) after a text segment is fully revealed before advancing
* to the next item. Gives the reader a breathing pause after each speech
* block. Default: 0 (no delay).
*/
postTextDelayMs?: number;
/**
* Delay (ms) after firing an action callback before advancing to the next
* item. Gives action animations time to play out. Default: 0.
*/
actionDelayMs?: number;
}
// ─── StreamBuffer Class ──────────────────────────────────────────────
export class StreamBuffer {
// Queue
private items: BufferItem[] = [];
private readIndex = 0;
private charCursor = 0;
// Roundtable segment tracking
private currentSegmentText = '';
private currentAgentId: string | null = null;
// Control
private _paused = false;
private _disposed = false;
private timer: ReturnType<typeof setInterval> | null = null;
// Dwell / delay counters (in ticks)
private _dwellTicksRemaining = 0;
// Config
private readonly tickMs: number;
private readonly charsPerTick: number;
private readonly postTextDelayTicks: number;
private readonly actionDelayTicks: number;
private readonly cb: StreamBufferCallbacks;
private partCounter = 0;
private _drainResolve: (() => void) | null = null;
private _drainReject: ((err: Error) => void) | null = null;
constructor(callbacks: StreamBufferCallbacks, options?: StreamBufferOptions) {
this.cb = callbacks;
this.tickMs = options?.tickMs ?? 30;
this.charsPerTick = options?.charsPerTick ?? 1;
this.postTextDelayTicks = Math.ceil((options?.postTextDelayMs ?? 0) / this.tickMs);
this.actionDelayTicks = Math.ceil((options?.actionDelayMs ?? 0) / this.tickMs);
}
// ─── Push Methods ────────────────────────────────────────────────
pushAgentStart(data: Omit<AgentStartItem, 'kind'>): void {
if (this._disposed) return;
this.sealLastText();
this.items.push({ kind: 'agent_start', ...data });
}
pushAgentEnd(data: Omit<AgentEndItem, 'kind'>): void {
if (this._disposed) return;
this.sealLastText();
this.items.push({ kind: 'agent_end', ...data });
}
/**
* Append text for a message.
* If the last queue item is an unsealed text item for the same messageId,
* the delta is appended in-place. Otherwise a new text item is created.
*/
pushText(messageId: string, delta: string, agentId?: string): void {
if (this._disposed) return;
const last = this.items[this.items.length - 1];
if (last && last.kind === 'text' && last.messageId === messageId && !last.sealed) {
last.text += delta;
} else {
this.items.push({
kind: 'text',
messageId,
agentId: agentId ?? this.currentAgentId ?? '',
partId: `p${this.partCounter++}`,
text: delta,
sealed: false,
});
}
}
/** Mark the current (last) text item as complete β€” no more appends expected. */
sealText(messageId: string): void {
if (this._disposed) return;
for (let i = this.items.length - 1; i >= 0; i--) {
const item = this.items[i];
if (item.kind === 'text' && item.messageId === messageId && !item.sealed) {
item.sealed = true;
break;
}
}
}
pushAction(data: Omit<ActionItem, 'kind'>): void {
if (this._disposed) return;
this.sealLastText();
this.items.push({ kind: 'action', ...data });
}
pushThinking(data: { stage: string; agentId?: string }): void {
if (this._disposed) return;
this.items.push({ kind: 'thinking', ...data });
}
pushCueUser(data: { fromAgentId?: string; prompt?: string }): void {
if (this._disposed) return;
this.items.push({ kind: 'cue_user', ...data });
}
pushDone(data: {
totalActions: number;
totalAgents: number;
agentHadContent?: boolean;
directorState?: DirectorState;
}): void {
if (this._disposed) return;
this.sealLastText();
this.items.push({ kind: 'done', ...data });
}
pushError(message: string): void {
if (this._disposed) return;
this.items.push({ kind: 'error', message });
}
// ─── Control ─────────────────────────────────────────────────────
/** Start the tick loop. Idempotent β€” calling twice is safe. */
start(): void {
if (this._disposed || this.timer) return;
this.timer = setInterval(() => this.tick(), this.tickMs);
}
/** Instantly pause β€” tick becomes a no-op. */
pause(): void {
this._paused = true;
}
/** Resume from exactly where we left off. */
resume(): void {
this._paused = false;
}
/**
* Returns a Promise that resolves when the buffer has processed all items
* including the final `done` item. Rejects if the buffer is disposed/shutdown
* before draining completes.
*/
waitUntilDrained(): Promise<void> {
if (this._disposed) {
return Promise.reject(new Error('Buffer already disposed'));
}
return new Promise<void>((resolve, reject) => {
this._drainResolve = resolve;
this._drainReject = reject;
});
}
get paused(): boolean {
return this._paused;
}
get disposed(): boolean {
return this._disposed;
}
/**
* Flush: instantly reveal everything remaining.
* Used when restoring persisted sessions or force-completing.
*/
flush(): void {
if (this._disposed) return;
while (this.readIndex < this.items.length) {
const item = this.items[this.readIndex];
switch (item.kind) {
case 'text':
this.cb.onTextReveal(item.messageId, item.partId, item.text, true);
this.currentSegmentText = item.text;
this.cb.onLiveSpeech(this.currentSegmentText, this.currentAgentId);
this.cb.onSpeechProgress(1);
break;
case 'action':
this.currentSegmentText = '';
this.cb.onActionReady(item.messageId, item);
this.cb.onLiveSpeech(null, this.currentAgentId);
break;
case 'agent_start':
this.currentAgentId = item.agentId;
this.currentSegmentText = '';
this.cb.onThinking(null); // Agent selected β€” clear thinking indicator
this.cb.onAgentStart(item);
this.cb.onLiveSpeech(null, item.agentId);
break;
case 'agent_end':
this.cb.onAgentEnd(item);
break;
case 'thinking':
this.cb.onThinking(item);
break;
case 'cue_user':
this.cb.onCueUser(item.fromAgentId, item.prompt);
break;
case 'done':
this.cb.onLiveSpeech(null, null);
this.cb.onSpeechProgress(null);
this.cb.onThinking(null);
this.cb.onDone(item);
// Resolve drain promise
this._drainResolve?.();
this._drainResolve = null;
this._drainReject = null;
break;
case 'error':
this.cb.onError(item.message);
break;
}
this.readIndex++;
this.charCursor = 0;
}
}
/** Stop tick loop, release resources. No more callbacks after this. */
dispose(): void {
if (this._disposed) return;
this._disposed = true;
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
// Reject waiting drain promise
this._drainReject?.(new Error('Buffer disposed'));
this._drainResolve = null;
this._drainReject = null;
// Final cleanup signal
this.cb.onLiveSpeech(null, null);
this.cb.onSpeechProgress(null);
}
/**
* Stop the tick timer and mark disposed WITHOUT firing final onLiveSpeech.
* Used when replacing a buffer (e.g. resume after soft-pause) to avoid
* the dispose callback clearing roundtable state via a stale microtask.
*/
shutdown(): void {
if (this._disposed) return;
this._disposed = true;
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
// Reject waiting drain promise
this._drainReject?.(new Error('Buffer shutdown'));
this._drainResolve = null;
this._drainReject = null;
}
// ─── Internals ───────────────────────────────────────────────────
/** Seal the last text item in the queue (if any). */
private sealLastText(): void {
for (let i = this.items.length - 1; i >= 0; i--) {
const item = this.items[i];
if (item.kind === 'text' && !item.sealed) {
item.sealed = true;
break;
}
// Stop searching once we hit a non-text item
if (item.kind !== 'text') break;
}
}
private tick(): void {
if (this._paused || this._disposed) return;
// Honour dwell / action-delay countdown before advancing
if (this._dwellTicksRemaining > 0) {
this._dwellTicksRemaining--;
return;
}
const item = this.items[this.readIndex];
if (!item) return; // Queue empty or caught up β€” wait
switch (item.kind) {
case 'text': {
// Advance character cursor
this.charCursor = Math.min(this.charCursor + this.charsPerTick, item.text.length);
const revealed = item.text.slice(0, this.charCursor);
const fullyRevealed = this.charCursor >= item.text.length;
const isComplete = fullyRevealed && item.sealed;
// Update chat area
this.cb.onTextReveal(item.messageId, item.partId, revealed, isComplete);
// Update roundtable (current segment only).
// Use this.currentAgentId (set when tick processes agent_start) rather than
// item.agentId β€” push-time race means item.agentId can carry a stale value
// from the previous agent when SSE pushes outpace the tick loop.
this.currentSegmentText = revealed;
this.cb.onLiveSpeech(this.currentSegmentText, this.currentAgentId);
this.cb.onSpeechProgress(item.text.length > 0 ? this.charCursor / item.text.length : 1);
// Advance to next item if fully revealed and sealed
if (isComplete) {
this.readIndex++;
this.charCursor = 0;
// Fixed pause after text finishes β€” gives the reader a breathing gap
// before the next action or agent turn fires.
if (this.postTextDelayTicks > 0) {
this._dwellTicksRemaining = this.postTextDelayTicks;
return; // next tick will count down, then advanceNonText
}
// Process any immediately-advanceable items in the same tick
// (e.g. action badges right after text)
this.advanceNonText();
}
// If fullyRevealed but !sealed: wait for more SSE deltas
break;
}
// Non-text items are processed immediately
case 'agent_start':
this.currentAgentId = item.agentId;
this.currentSegmentText = '';
this.cb.onThinking(null); // Agent selected β€” clear thinking indicator
this.cb.onAgentStart(item);
this.cb.onLiveSpeech(null, item.agentId);
this.readIndex++;
this.charCursor = 0;
this.advanceNonText();
break;
case 'agent_end':
this.cb.onAgentEnd(item);
this.readIndex++;
this.charCursor = 0;
this.advanceNonText();
break;
case 'action':
this.currentSegmentText = '';
this.cb.onActionReady(item.messageId, item);
this.cb.onLiveSpeech(null, this.currentAgentId);
this.readIndex++;
this.charCursor = 0;
// Delay after action so animations have time to play out
if (this.actionDelayTicks > 0) {
this._dwellTicksRemaining = this.actionDelayTicks;
return;
}
this.advanceNonText();
break;
case 'thinking':
this.cb.onThinking(item);
this.readIndex++;
this.charCursor = 0;
this.advanceNonText();
break;
case 'cue_user':
this.cb.onCueUser(item.fromAgentId, item.prompt);
this.readIndex++;
this.charCursor = 0;
this.advanceNonText();
break;
case 'done':
this.cb.onLiveSpeech(null, null);
this.cb.onSpeechProgress(null);
this.cb.onThinking(null);
this.cb.onDone(item);
this.readIndex++;
this.charCursor = 0;
// Stop the timer β€” nothing more to process
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
// Resolve drain promise
this._drainResolve?.();
this._drainResolve = null;
this._drainReject = null;
break;
case 'error':
this.cb.onError(item.message);
this.readIndex++;
this.charCursor = 0;
this.advanceNonText();
break;
}
}
/**
* After processing a non-text item, keep advancing through consecutive
* non-text items in the same tick. Stop when we hit a text item or
* the end of the queue β€” the next tick will handle the text item
* (so we don't skip the character-by-character reveal).
*
* Also stops when an action triggers a delay so its animation can play.
*/
private advanceNonText(): void {
while (this.readIndex < this.items.length) {
const next = this.items[this.readIndex];
if (next.kind === 'text') break; // Let the next tick handle text
switch (next.kind) {
case 'agent_start':
this.currentAgentId = next.agentId;
this.currentSegmentText = '';
this.cb.onThinking(null); // Agent selected β€” clear thinking indicator
this.cb.onAgentStart(next);
this.cb.onLiveSpeech(null, next.agentId);
break;
case 'agent_end':
this.cb.onAgentEnd(next);
break;
case 'action':
this.currentSegmentText = '';
this.cb.onActionReady(next.messageId, next);
this.cb.onLiveSpeech(null, this.currentAgentId);
this.readIndex++;
this.charCursor = 0;
// Pause after action to let animation play
if (this.actionDelayTicks > 0) {
this._dwellTicksRemaining = this.actionDelayTicks;
return; // resume on next tick after countdown
}
continue; // no delay β€” keep advancing
case 'thinking':
this.cb.onThinking(next);
break;
case 'cue_user':
this.cb.onCueUser(next.fromAgentId, next.prompt);
break;
case 'done':
this.cb.onLiveSpeech(null, null);
this.cb.onSpeechProgress(null);
this.cb.onThinking(null);
this.cb.onDone(next);
this.readIndex++;
this.charCursor = 0;
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
// Resolve drain promise
this._drainResolve?.();
this._drainResolve = null;
this._drainReject = null;
return; // done β€” stop advancing
case 'error':
this.cb.onError(next.message);
break;
}
this.readIndex++;
this.charCursor = 0;
}
}
}