/**
 * Keet v3.0 - Token Stream Transcriber
 *
 * Uses LCSPTFAMerger for overlapping window transcription with
 * token-level merging (NeMo LCS + PTFA enhancements).
 */
import type { TranscriptionResult, TranscriptionWord } from './types';
import { ModelManager } from './ModelManager';
import { LCSPTFAMerger } from 'parakeet.js';

/** Tuning knobs for the windowed streaming transcription pipeline. */
export interface TokenStreamConfig {
  /** Window duration in seconds (default 5.0) */
  windowDuration?: number;
  /** Overlap duration in seconds (default 1.5) */
  overlapDuration?: number;
  /** Sample rate (default 16000) */
  sampleRate?: number;
  /** Enable debug logging */
  debug?: boolean;
  /** Min sequence anchor length for LCS (default 3) */
  sequenceAnchorLength?: number;
  /** Time tolerance for frame alignment (default 0.15s) */
  timeTolerance?: number;
  /** Decoder frame stride (default 1). Set to 2 to halve decoder steps at cost of coarser timestamps */
  frameStride?: number;
  /** Request log probabilities per token for merger arbitration (default false).
   * Adds per-frame softmax cost. Disable unless using logProb-based merging. */
  returnLogProbs?: boolean;
}

/** Observer hooks invoked during chunk processing. All are optional. */
export interface TokenStreamCallbacks {
  /** Fired after each merge with the stable (confirmed) transcript text and tokens. */
  onConfirmedUpdate?: (text: string, tokens: any[]) => void;
  /** Fired after each merge with the tentative (pending) transcript text and tokens. */
  onPendingUpdate?: (text: string, tokens: any[]) => void;
  /** Diagnostics from the LCS merge step: match length and anchor validation. */
  onMergeInfo?: (info: { lcsLength: number; anchorValid: boolean; anchorTokens?: string[] }) => void;
  /** Fired when chunk processing throws; note the error is rethrown to the caller as well. */
  onError?: (error: Error) => void;
}

/** Snapshot of the transcript state returned by each processing call. */
export interface TokenStreamResult {
  /** Confirmed (stable) transcript text */
  confirmedText: string;
  /** Pending (tentative) transcript text */
  pendingText: string;
  /** Full transcript (confirmed + pending) */
  fullText: string;
  /** LCS match length from last merge */
  lcsLength: number;
  /** Whether anchor was validated */
  anchorValid: boolean;
  /** Total chunks processed */
  chunkCount: number;
  /** Tokens used for anchor match (optional) */
  anchorTokens?: string[];
}

/**
 * TokenStreamTranscriber - Windowed streaming transcription with LCS+PTFA merging.
* * This class implements the v3 streaming architecture: * - Short overlapping windows (5-7s) * - Token-level merging using LCSPTFAMerger * - Frame-aligned verification * - Vignetting and logProb arbitration */ export class TokenStreamTranscriber { private _modelManager: ModelManager; private _merger: any = null; // LCSPTFAMerger instance private _config: Required; private _callbacks: TokenStreamCallbacks; // State private _currentTimestamp: number = 0; private _chunkCount: number = 0; private _isProcessing: boolean = false; private _pendingChunk: { audio: Float32Array; startTime?: number } | null = null; private _tokenizer: any = null; constructor( modelManager: ModelManager, config: TokenStreamConfig = {}, callbacks: TokenStreamCallbacks = {} ) { this._modelManager = modelManager; this._config = { windowDuration: config.windowDuration ?? 5.0, overlapDuration: config.overlapDuration ?? 1.5, sampleRate: config.sampleRate ?? 16000, debug: config.debug ?? false, sequenceAnchorLength: config.sequenceAnchorLength ?? 3, timeTolerance: config.timeTolerance ?? 0.15, frameStride: config.frameStride ?? 1, returnLogProbs: config.returnLogProbs ?? false, }; this._callbacks = callbacks; } /** * Initialize the transcriber with LCSPTFAMerger. * Must be called after model is loaded. */ async initialize(): Promise { const model = this._modelManager.getModel(); if (!model) { throw new Error('Model not loaded. Call ModelManager.loadModel() first.'); } // Get tokenizer from model this._tokenizer = model.tokenizer; if (!LCSPTFAMerger) { throw new Error('LCSPTFAMerger not found in parakeet.js. Make sure you have the latest version.'); } this._merger = new LCSPTFAMerger({ frameTimeStride: model.getFrameTimeStride?.() ?? 0.08, timeTolerance: this._config.timeTolerance, sequenceAnchorLength: this._config.sequenceAnchorLength, }); if (this._config.debug) { console.log('[TokenStreamTranscriber] Initialized with LCSPTFAMerger'); } } /** * Update configuration. 
* Note: Changing window/overlap requires re-connecting to AudioEngine (or reset). */ updateConfig(config: TokenStreamConfig): void { this._config = { ...this._config, ...config, // Ensure sensible defaults if partial config provided windowDuration: config.windowDuration ?? this._config.windowDuration, overlapDuration: config.overlapDuration ?? this._config.overlapDuration, }; if (this._config.debug) { console.log('[TokenStreamTranscriber] Config updated:', this._config); } } /** * Process an audio chunk through the windowed streaming pipeline. * * @param audio - Float32Array of audio samples (16kHz mono PCM) * @param startTime - The absolute start time of this chunk in seconds * @returns TokenStreamResult with confirmed/pending text */ async processChunk(audio: Float32Array, startTime?: number): Promise { if (!this._merger) { throw new Error('TokenStreamTranscriber not initialized. Call initialize() first.'); } if (this._isProcessing) { // Queue the latest chunk instead of dropping it entirely. // Only the most recent pending chunk is kept (older ones are stale). this._pendingChunk = { audio, startTime }; if (this._config.debug) { console.log('[TokenStreamTranscriber] Queued chunk (processing busy)'); } return this._getEmptyResult(); } this._isProcessing = true; try { const result = await this._processChunkInternal(audio, startTime); // After finishing, check if a newer chunk was queued while we were busy if (this._pendingChunk) { const pending = this._pendingChunk; this._pendingChunk = null; // Process the queued chunk immediately (non-blocking schedule) // We don't await here to avoid blocking the caller, but we do // need to keep _isProcessing true until it completes. 
this._isProcessing = false; // Release lock temporarily // Use queueMicrotask to process next chunk without starving the event loop queueMicrotask(() => { this.processChunk(pending.audio, pending.startTime); }); } return result; } catch (error) { console.error('[TokenStreamTranscriber] Process chunk error:', error); this._callbacks.onError?.(error as Error); throw error; } finally { this._isProcessing = false; } } /** * Internal processing logic extracted for queue support. */ private async _processChunkInternal(audio: Float32Array, startTime?: number): Promise { const model = this._modelManager.getModel(); if (!model) { throw new Error('Model not available'); } // Use provided startTime or fall back to internal tracking const chunkStartTime = startTime !== undefined ? startTime : this._currentTimestamp; // Calculate actual overlap based on previous chunk end let actualOverlap = 0; if (this._chunkCount > 0) { // If we use explicit startTime, we can calculate overlap precisely: // overlap = previousWindowEnd - currentWindowStart const previousWindowEnd = this._currentTimestamp + (audio.length / this._config.sampleRate); actualOverlap = Math.max(0, previousWindowEnd - chunkStartTime); } // Transcribe with all metadata needed for merging. // Use incremental cache to skip re-decoding the overlap prefix. // On subsequent chunks, the decoder resumes from cached state at // the boundary of the overlap, saving ~80% of decode frames. // // prefixSamples enables incremental mel caching (JS preprocessor): // the overlap portion's mel features are reused from the previous call, // saving ~60-70% of preprocessing time for typical 70% overlap. const overlapSeconds = this._chunkCount > 0 ? 
actualOverlap : 0; const overlapSamples = Math.floor(overlapSeconds * this._config.sampleRate); const result = await model.transcribe(audio, this._config.sampleRate, { returnTimestamps: true, returnTokenIds: true, returnFrameIndices: true, returnLogProbs: this._config.returnLogProbs, timeOffset: chunkStartTime, frameStride: this._config.frameStride, prefixSamples: overlapSamples > 0 ? overlapSamples : 0, incremental: overlapSeconds > 0 ? { cacheKey: 'streaming', prefixSeconds: overlapSeconds, } : undefined, }); // Merge using LCSPTFAMerger // We use the provided overlap or calculated one const mergeResult = this._merger!.processChunk( result, chunkStartTime, this._chunkCount > 0 ? (startTime !== undefined ? actualOverlap : this._config.overlapDuration) : 0 ); // Update state for next chunk this._currentTimestamp = chunkStartTime; this._chunkCount++; // Get formatted text from the merger (handles SentencePiece spaces correctly) const texts = this._merger!.getText(this._tokenizer); const confirmedText = texts.confirmed; const pendingText = texts.pending; const fullText = texts.full; // Notify callbacks this._callbacks.onConfirmedUpdate?.(confirmedText, mergeResult.confirmed); this._callbacks.onPendingUpdate?.(pendingText, mergeResult.pending); this._callbacks.onMergeInfo?.({ lcsLength: mergeResult.lcsLength, anchorValid: mergeResult.anchorValid, anchorTokens: mergeResult.anchorTokens }); // Always log preprocessing info from model metrics const m = result.metrics; if (m) { console.log(`[TokenStreamTranscriber] Chunk #${this._chunkCount + 1}: preprocessor=${m.preprocessor_backend || 'unknown'}, preprocess=${m.preprocess_ms}ms, encode=${m.encode_ms}ms, decode=${m.decode_ms}ms, total=${m.total_ms}ms${m.mel_cache ? 
`, mel_cache: ${m.mel_cache.cached_frames} cached / ${m.mel_cache.new_frames} new` : ''}`); } if (this._config.debug) { console.log(`[TokenStreamTranscriber] Chunk ${this._chunkCount}: start=${chunkStartTime.toFixed(2)}s, overlap=${actualOverlap.toFixed(2)}s, LCS=${mergeResult.lcsLength}, anchor=${mergeResult.anchorValid}`); } return { confirmedText, pendingText, fullText, lcsLength: mergeResult.lcsLength, anchorValid: mergeResult.anchorValid, chunkCount: this._chunkCount, anchorTokens: mergeResult.anchorTokens, }; } /** * Process a chunk using pre-computed mel features from the mel worker. * Bypasses the preprocessor entirely — encoder + decoder only. * * @param features - Normalized mel features [melBins, T] from mel worker * @param T - Number of time frames * @param melBins - Number of mel frequency bins * @param startTime - Start time of this audio window in seconds * @param overlapSeconds - Overlap with previous window in seconds (for decoder cache) */ async processChunkWithFeatures( features: Float32Array, T: number, melBins: number, startTime?: number, overlapSeconds?: number, ): Promise { if (!this._merger) { throw new Error('TokenStreamTranscriber not initialized. 
Call initialize() first.'); } if (this._isProcessing) { if (this._config.debug) { console.log('[TokenStreamTranscriber] Queued features chunk (processing busy)'); } return this._getEmptyResult(); } this._isProcessing = true; try { const result = await this._processChunkWithFeaturesInternal( features, T, melBins, startTime, overlapSeconds, ); return result; } catch (error) { console.error('[TokenStreamTranscriber] Process features chunk error:', error); this._callbacks.onError?.(error as Error); throw error; } finally { this._isProcessing = false; } } private async _processChunkWithFeaturesInternal( features: Float32Array, T: number, melBins: number, startTime?: number, overlapSeconds?: number, ): Promise { const model = this._modelManager.getModel(); if (!model) { throw new Error('Model not available'); } const chunkStartTime = startTime !== undefined ? startTime : this._currentTimestamp; // Calculate overlap for decoder cache let actualOverlap = 0; if (this._chunkCount > 0 && overlapSeconds !== undefined) { actualOverlap = overlapSeconds; } else if (this._chunkCount > 0) { // Estimate from config actualOverlap = this._config.overlapDuration; } const overlapSec = this._chunkCount > 0 ? actualOverlap : 0; // Call transcribe with pre-computed features (bypasses preprocessor) const result = await model.transcribe(null, this._config.sampleRate, { returnTimestamps: true, returnTokenIds: true, returnFrameIndices: true, returnLogProbs: this._config.returnLogProbs, timeOffset: chunkStartTime, frameStride: this._config.frameStride, precomputedFeatures: { features, T, melBins }, incremental: overlapSec > 0 ? { cacheKey: 'streaming', prefixSeconds: overlapSec, } : undefined, }); // Merge using LCSPTFAMerger (same as audio path) const mergeResult = this._merger!.processChunk( result, chunkStartTime, this._chunkCount > 0 ? 
actualOverlap : 0 ); // Update state for next chunk this._currentTimestamp = chunkStartTime; this._chunkCount++; // Get formatted text const texts = this._merger!.getText(this._tokenizer); const confirmedText = texts.confirmed; const pendingText = texts.pending; const fullText = texts.full; // Notify callbacks this._callbacks.onConfirmedUpdate?.(confirmedText, mergeResult.confirmed); this._callbacks.onPendingUpdate?.(pendingText, mergeResult.pending); this._callbacks.onMergeInfo?.({ lcsLength: mergeResult.lcsLength, anchorValid: mergeResult.anchorValid, anchorTokens: mergeResult.anchorTokens }); // Always log preprocessing info from model metrics const m = result.metrics; if (m) { console.log(`[TokenStreamTranscriber] Features chunk #${this._chunkCount + 1}: preprocessor=${m.preprocessor_backend || 'unknown'}, preprocess=${m.preprocess_ms}ms, encode=${m.encode_ms}ms, decode=${m.decode_ms}ms, total=${m.total_ms}ms, T=${T}`); } if (this._config.debug) { console.log(`[TokenStreamTranscriber] Features chunk ${this._chunkCount}: start=${chunkStartTime.toFixed(2)}s, overlap=${actualOverlap.toFixed(2)}s, T=${T}, LCS=${mergeResult.lcsLength}`); } return { confirmedText, pendingText, fullText, lcsLength: mergeResult.lcsLength, anchorValid: mergeResult.anchorValid, chunkCount: this._chunkCount, anchorTokens: mergeResult.anchorTokens, }; } /** * Finalize and get complete transcript. * Promotes all pending tokens to confirmed. */ finalize(): TokenStreamResult { if (!this._merger) { return this._getEmptyResult(); } const texts = this._merger.getText(this._tokenizer); return { confirmedText: texts.full, // All text is now confirmed pendingText: '', fullText: texts.full, lcsLength: 0, anchorValid: true, chunkCount: this._chunkCount, }; } /** * Reset for a new transcription session. 
*/ reset(): void { if (this._merger) { this._merger.reset(); } // Reset mel cache in the model's JS preprocessor (if available) const model = this._modelManager.getModel(); if (model?.resetMelCache) { model.resetMelCache(); } this._currentTimestamp = 0; this._chunkCount = 0; this._isProcessing = false; this._pendingChunk = null; } /** * Get current state for debugging. */ getState(): any { return { chunkCount: this._chunkCount, currentTimestamp: this._currentTimestamp, isProcessing: this._isProcessing, mergerState: this._merger?.getState() || null, }; } /** * Check if currently processing. */ isProcessing(): boolean { return this._isProcessing; } private _getEmptyResult(): TokenStreamResult { return { confirmedText: '', pendingText: '', fullText: '', lcsLength: 0, anchorValid: false, chunkCount: this._chunkCount, }; } /** * Connect to AudioEngine for automatic window-based streaming. * * Usage: * ``` * const unsubscribe = transcriber.connectToAudioEngine(audioEngine); * // ... later * unsubscribe(); * ``` * * @param audioEngine - AudioEngine instance with onWindowChunk support * @returns Unsubscribe function */ connectToAudioEngine(audioEngine: any): () => void { if (!audioEngine.onWindowChunk) { throw new Error('AudioEngine does not support onWindowChunk. Update to v3.0.'); } return audioEngine.onWindowChunk( this._config.windowDuration, this._config.overlapDuration, this._config.windowDuration - this._config.overlapDuration, // Default trigger interval async (audio: Float32Array, startTime: number) => { try { await this.processChunk(audio, startTime); } catch (e) { console.error('[TokenStreamTranscriber] Window processing error:', e); } } ); } /** * Get configuration. */ getConfig(): Required { return { ...this._config }; } }