// keet-streaming/src/lib/transcription/TokenStreamTranscriber.ts
// Commit c783252 — "fix: stabilize hf space worker model loading" (ysdede)
/**
* Keet v3.0 - Token Stream Transcriber
*
* Uses LCSPTFAMerger for overlapping window transcription with
* token-level merging (NeMo LCS + PTFA enhancements).
*/
import type {
TranscriptionResult,
TranscriptionWord
} from './types';
import { ModelManager } from './ModelManager';
import { LCSPTFAMerger } from 'parakeet.js';
/**
 * Configuration for {@link TokenStreamTranscriber}.
 *
 * All fields are optional; unspecified values fall back to the defaults
 * applied in the transcriber constructor (noted per field below).
 */
export interface TokenStreamConfig {
/** Window duration in seconds (default 5.0) */
windowDuration?: number;
/** Overlap duration in seconds (default 1.5) */
overlapDuration?: number;
/** Sample rate in Hz (default 16000) */
sampleRate?: number;
/** Enable debug logging to the console (default false) */
debug?: boolean;
/** Min sequence anchor length for LCS (default 3) */
sequenceAnchorLength?: number;
/** Time tolerance for frame alignment (default 0.15s) */
timeTolerance?: number;
/** Decoder frame stride (default 1). Set to 2 to halve decoder steps at cost of coarser timestamps */
frameStride?: number;
/** Request log probabilities per token for merger arbitration (default false).
 * Adds per-frame softmax cost. Disable unless using logProb-based merging. */
returnLogProbs?: boolean;
}
/**
 * Optional event callbacks fired by {@link TokenStreamTranscriber} as
 * chunks are processed and merged. All callbacks are invoked synchronously
 * from within chunk processing.
 */
export interface TokenStreamCallbacks {
/** Fired after each merge with the confirmed (stable) text and its tokens. */
onConfirmedUpdate?: (text: string, tokens: any[]) => void;
/** Fired after each merge with the pending (tentative) text and its tokens. */
onPendingUpdate?: (text: string, tokens: any[]) => void;
/** Fired after each merge with diagnostics from the LCS+PTFA merger. */
onMergeInfo?: (info: { lcsLength: number; anchorValid: boolean; anchorTokens?: string[] }) => void;
/** Fired when chunk processing throws; the error is also rethrown to the caller. */
onError?: (error: Error) => void;
}
/**
 * Result of processing one audio/feature chunk (or of finalization).
 * Returned by processChunk / processChunkWithFeatures / finalize.
 */
export interface TokenStreamResult {
/** Confirmed (stable) transcript text */
confirmedText: string;
/** Pending (tentative) transcript text */
pendingText: string;
/** Full transcript (confirmed + pending) */
fullText: string;
/** LCS match length from last merge */
lcsLength: number;
/** Whether anchor was validated */
anchorValid: boolean;
/** Total chunks processed */
chunkCount: number;
/** Tokens used for anchor match (optional) */
anchorTokens?: string[];
}
/**
* TokenStreamTranscriber - Windowed streaming transcription with LCS+PTFA merging.
*
* This class implements the v3 streaming architecture:
* - Short overlapping windows (5-7s)
* - Token-level merging using LCSPTFAMerger
* - Frame-aligned verification
* - Vignetting and logProb arbitration
*/
export class TokenStreamTranscriber {
  private _modelManager: ModelManager;
  private _merger: any = null; // LCSPTFAMerger instance (parakeet.js publishes no types)
  private _config: Required<TokenStreamConfig>;
  private _callbacks: TokenStreamCallbacks;
  // --- Streaming state ---
  // Start time (seconds) of the most recently processed window.
  private _currentTimestamp: number = 0;
  // End time (seconds) of the most recently processed audio window.
  // Used to compute the true overlap with the next window (fix: the original
  // derived "previous window end" from the CURRENT chunk's length, which is
  // only correct when every window has identical duration).
  private _lastWindowEnd: number = 0;
  private _chunkCount: number = 0;
  private _isProcessing: boolean = false;
  // Latest chunk that arrived while busy; older queued chunks are replaced.
  private _pendingChunk: { audio: Float32Array; startTime?: number } | null = null;
  private _tokenizer: any = null;

  constructor(
    modelManager: ModelManager,
    config: TokenStreamConfig = {},
    callbacks: TokenStreamCallbacks = {}
  ) {
    this._modelManager = modelManager;
    this._config = {
      windowDuration: config.windowDuration ?? 5.0,
      overlapDuration: config.overlapDuration ?? 1.5,
      sampleRate: config.sampleRate ?? 16000,
      debug: config.debug ?? false,
      sequenceAnchorLength: config.sequenceAnchorLength ?? 3,
      timeTolerance: config.timeTolerance ?? 0.15,
      frameStride: config.frameStride ?? 1,
      returnLogProbs: config.returnLogProbs ?? false,
    };
    this._callbacks = callbacks;
  }

  /**
   * Initialize the transcriber with LCSPTFAMerger.
   * Must be called after model is loaded.
   *
   * @throws Error if the model is not loaded or LCSPTFAMerger is unavailable.
   */
  async initialize(): Promise<void> {
    const model = this._modelManager.getModel();
    if (!model) {
      throw new Error('Model not loaded. Call ModelManager.loadModel() first.');
    }
    // Get tokenizer from model
    this._tokenizer = model.tokenizer;
    if (!LCSPTFAMerger) {
      throw new Error('LCSPTFAMerger not found in parakeet.js. Make sure you have the latest version.');
    }
    this._merger = new LCSPTFAMerger({
      // 0.08s fallback matches a typical encoder frame stride — TODO confirm for this model
      frameTimeStride: model.getFrameTimeStride?.() ?? 0.08,
      timeTolerance: this._config.timeTolerance,
      sequenceAnchorLength: this._config.sequenceAnchorLength,
    });
    if (this._config.debug) {
      console.log('[TokenStreamTranscriber] Initialized with LCSPTFAMerger');
    }
  }

  /**
   * Update configuration.
   * Note: Changing window/overlap requires re-connecting to AudioEngine (or reset).
   *
   * @param config - Partial config; unspecified fields keep their current values.
   */
  updateConfig(config: TokenStreamConfig): void {
    this._config = {
      ...this._config,
      ...config,
      // Guard against callers passing explicit `undefined` for these fields,
      // which the spread above would otherwise copy over the current values.
      windowDuration: config.windowDuration ?? this._config.windowDuration,
      overlapDuration: config.overlapDuration ?? this._config.overlapDuration,
    };
    if (this._config.debug) {
      console.log('[TokenStreamTranscriber] Config updated:', this._config);
    }
  }

  /**
   * Process an audio chunk through the windowed streaming pipeline.
   *
   * If a chunk is already being processed, the new chunk is queued (only the
   * most recent queued chunk is kept) and an empty result is returned.
   *
   * @param audio - Float32Array of audio samples (16kHz mono PCM)
   * @param startTime - The absolute start time of this chunk in seconds
   * @returns TokenStreamResult with confirmed/pending text
   * @throws Error if not initialized, or if processing fails (also reported via onError).
   */
  async processChunk(audio: Float32Array, startTime?: number): Promise<TokenStreamResult> {
    if (!this._merger) {
      throw new Error('TokenStreamTranscriber not initialized. Call initialize() first.');
    }
    if (this._isProcessing) {
      // Queue the latest chunk instead of dropping it entirely.
      // Only the most recent pending chunk is kept (older ones are stale).
      this._pendingChunk = { audio, startTime };
      if (this._config.debug) {
        console.log('[TokenStreamTranscriber] Queued chunk (processing busy)');
      }
      return this._getEmptyResult();
    }
    this._isProcessing = true;
    try {
      const result = await this._processChunkInternal(audio, startTime);
      // After finishing, check if a newer chunk was queued while we were busy
      if (this._pendingChunk) {
        const pending = this._pendingChunk;
        this._pendingChunk = null;
        // Release the lock so the scheduled run below can acquire it.
        this._isProcessing = false;
        // Use queueMicrotask to process the next chunk without blocking the caller.
        queueMicrotask(() => {
          // Fix: the detached call must not leak an unhandled rejection.
          // processChunk already reports failures via onError before
          // rethrowing, so the rejection is swallowed here.
          void this.processChunk(pending.audio, pending.startTime).catch(() => {
            /* already reported via onError in processChunk's catch */
          });
        });
      }
      return result;
    } catch (error) {
      console.error('[TokenStreamTranscriber] Process chunk error:', error);
      this._callbacks.onError?.(error as Error);
      throw error;
    } finally {
      this._isProcessing = false;
    }
  }

  /**
   * Internal processing logic extracted for queue support.
   * Transcribes one window, merges it into the running transcript, and
   * advances the streaming state (_currentTimestamp, _lastWindowEnd, _chunkCount).
   */
  private async _processChunkInternal(audio: Float32Array, startTime?: number): Promise<TokenStreamResult> {
    const model = this._modelManager.getModel();
    if (!model) {
      throw new Error('Model not available');
    }
    // Use provided startTime or fall back to internal tracking.
    // NOTE(review): the fallback reuses the previous window's START time, so
    // windows never advance unless callers supply startTime — verify callers.
    const chunkStartTime = startTime !== undefined ? startTime : this._currentTimestamp;
    // Calculate actual overlap based on previous chunk end:
    //   overlap = previousWindowEnd - currentWindowStart
    // Fix: the previous window's end is tracked in _lastWindowEnd; the
    // original derived it from the CURRENT chunk's length, which only
    // coincides when all windows have the same duration.
    let actualOverlap = 0;
    if (this._chunkCount > 0) {
      actualOverlap = Math.max(0, this._lastWindowEnd - chunkStartTime);
    }
    // Transcribe with all metadata needed for merging.
    // Use incremental cache to skip re-decoding the overlap prefix.
    // On subsequent chunks, the decoder resumes from cached state at
    // the boundary of the overlap, saving ~80% of decode frames.
    //
    // prefixSamples enables incremental mel caching (JS preprocessor):
    // the overlap portion's mel features are reused from the previous call,
    // saving ~60-70% of preprocessing time for typical 70% overlap.
    const overlapSeconds = this._chunkCount > 0 ? actualOverlap : 0;
    const overlapSamples = Math.floor(overlapSeconds * this._config.sampleRate);
    const result = await model.transcribe(audio, this._config.sampleRate, {
      returnTimestamps: true,
      returnTokenIds: true,
      returnFrameIndices: true,
      returnLogProbs: this._config.returnLogProbs,
      timeOffset: chunkStartTime,
      frameStride: this._config.frameStride,
      prefixSamples: overlapSamples, // always >= 0; 0 disables mel caching
      incremental: overlapSeconds > 0 ? {
        cacheKey: 'streaming',
        prefixSeconds: overlapSeconds,
      } : undefined,
    });
    // Merge using LCSPTFAMerger. Use the precisely-computed overlap when the
    // caller supplied startTime; otherwise fall back to the configured overlap.
    const mergeResult = this._merger!.processChunk(
      result,
      chunkStartTime,
      this._chunkCount > 0 ? (startTime !== undefined ? actualOverlap : this._config.overlapDuration) : 0
    );
    // Update state for next chunk
    this._currentTimestamp = chunkStartTime;
    this._lastWindowEnd = chunkStartTime + audio.length / this._config.sampleRate;
    this._chunkCount++;
    // Get formatted text from the merger (handles SentencePiece spaces correctly)
    const texts = this._merger!.getText(this._tokenizer);
    const confirmedText = texts.confirmed;
    const pendingText = texts.pending;
    const fullText = texts.full;
    // Notify callbacks
    this._callbacks.onConfirmedUpdate?.(confirmedText, mergeResult.confirmed);
    this._callbacks.onPendingUpdate?.(pendingText, mergeResult.pending);
    this._callbacks.onMergeInfo?.({
      lcsLength: mergeResult.lcsLength,
      anchorValid: mergeResult.anchorValid,
      anchorTokens: mergeResult.anchorTokens
    });
    // Always log preprocessing info from model metrics.
    // Fix: _chunkCount was already incremented above, so it IS the 1-based
    // chunk number; the original logged _chunkCount + 1 (off by one).
    const m = result.metrics;
    if (m) {
      console.log(`[TokenStreamTranscriber] Chunk #${this._chunkCount}: preprocessor=${m.preprocessor_backend || 'unknown'}, preprocess=${m.preprocess_ms}ms, encode=${m.encode_ms}ms, decode=${m.decode_ms}ms, total=${m.total_ms}ms${m.mel_cache ? `, mel_cache: ${m.mel_cache.cached_frames} cached / ${m.mel_cache.new_frames} new` : ''}`);
    }
    if (this._config.debug) {
      console.log(`[TokenStreamTranscriber] Chunk ${this._chunkCount}: start=${chunkStartTime.toFixed(2)}s, overlap=${actualOverlap.toFixed(2)}s, LCS=${mergeResult.lcsLength}, anchor=${mergeResult.anchorValid}`);
    }
    return {
      confirmedText,
      pendingText,
      fullText,
      lcsLength: mergeResult.lcsLength,
      anchorValid: mergeResult.anchorValid,
      chunkCount: this._chunkCount,
      anchorTokens: mergeResult.anchorTokens,
    };
  }

  /**
   * Process a chunk using pre-computed mel features from the mel worker.
   * Bypasses the preprocessor entirely — encoder + decoder only.
   *
   * Unlike processChunk, a chunk arriving while busy is DROPPED (not queued);
   * an empty result is returned in that case.
   *
   * @param features - Normalized mel features [melBins, T] from mel worker
   * @param T - Number of time frames
   * @param melBins - Number of mel frequency bins
   * @param startTime - Start time of this audio window in seconds
   * @param overlapSeconds - Overlap with previous window in seconds (for decoder cache)
   */
  async processChunkWithFeatures(
    features: Float32Array,
    T: number,
    melBins: number,
    startTime?: number,
    overlapSeconds?: number,
  ): Promise<TokenStreamResult> {
    if (!this._merger) {
      throw new Error('TokenStreamTranscriber not initialized. Call initialize() first.');
    }
    if (this._isProcessing) {
      if (this._config.debug) {
        console.log('[TokenStreamTranscriber] Queued features chunk (processing busy)');
      }
      return this._getEmptyResult();
    }
    this._isProcessing = true;
    try {
      const result = await this._processChunkWithFeaturesInternal(
        features, T, melBins, startTime, overlapSeconds,
      );
      return result;
    } catch (error) {
      console.error('[TokenStreamTranscriber] Process features chunk error:', error);
      this._callbacks.onError?.(error as Error);
      throw error;
    } finally {
      this._isProcessing = false;
    }
  }

  /**
   * Internal features-path processing: encoder + decoder on pre-computed mel
   * features, followed by the same merge/callback flow as the audio path.
   */
  private async _processChunkWithFeaturesInternal(
    features: Float32Array,
    T: number,
    melBins: number,
    startTime?: number,
    overlapSeconds?: number,
  ): Promise<TokenStreamResult> {
    const model = this._modelManager.getModel();
    if (!model) {
      throw new Error('Model not available');
    }
    const chunkStartTime = startTime !== undefined ? startTime : this._currentTimestamp;
    // Calculate overlap for decoder cache: prefer the caller-supplied value,
    // else estimate from the configured overlap duration.
    let actualOverlap = 0;
    if (this._chunkCount > 0 && overlapSeconds !== undefined) {
      actualOverlap = overlapSeconds;
    } else if (this._chunkCount > 0) {
      actualOverlap = this._config.overlapDuration;
    }
    const overlapSec = this._chunkCount > 0 ? actualOverlap : 0;
    // Call transcribe with pre-computed features (bypasses preprocessor)
    const result = await model.transcribe(null, this._config.sampleRate, {
      returnTimestamps: true,
      returnTokenIds: true,
      returnFrameIndices: true,
      returnLogProbs: this._config.returnLogProbs,
      timeOffset: chunkStartTime,
      frameStride: this._config.frameStride,
      precomputedFeatures: { features, T, melBins },
      incremental: overlapSec > 0 ? {
        cacheKey: 'streaming',
        prefixSeconds: overlapSec,
      } : undefined,
    });
    // Merge using LCSPTFAMerger (same as audio path)
    const mergeResult = this._merger!.processChunk(
      result,
      chunkStartTime,
      this._chunkCount > 0 ? actualOverlap : 0
    );
    // Update state for next chunk
    this._currentTimestamp = chunkStartTime;
    this._chunkCount++;
    // Get formatted text
    const texts = this._merger!.getText(this._tokenizer);
    const confirmedText = texts.confirmed;
    const pendingText = texts.pending;
    const fullText = texts.full;
    // Notify callbacks
    this._callbacks.onConfirmedUpdate?.(confirmedText, mergeResult.confirmed);
    this._callbacks.onPendingUpdate?.(pendingText, mergeResult.pending);
    this._callbacks.onMergeInfo?.({
      lcsLength: mergeResult.lcsLength,
      anchorValid: mergeResult.anchorValid,
      anchorTokens: mergeResult.anchorTokens
    });
    // Always log preprocessing info from model metrics.
    // Fix: same off-by-one as the audio path — _chunkCount is already 1-based here.
    const m = result.metrics;
    if (m) {
      console.log(`[TokenStreamTranscriber] Features chunk #${this._chunkCount}: preprocessor=${m.preprocessor_backend || 'unknown'}, preprocess=${m.preprocess_ms}ms, encode=${m.encode_ms}ms, decode=${m.decode_ms}ms, total=${m.total_ms}ms, T=${T}`);
    }
    if (this._config.debug) {
      console.log(`[TokenStreamTranscriber] Features chunk ${this._chunkCount}: start=${chunkStartTime.toFixed(2)}s, overlap=${actualOverlap.toFixed(2)}s, T=${T}, LCS=${mergeResult.lcsLength}`);
    }
    return {
      confirmedText,
      pendingText,
      fullText,
      lcsLength: mergeResult.lcsLength,
      anchorValid: mergeResult.anchorValid,
      chunkCount: this._chunkCount,
      anchorTokens: mergeResult.anchorTokens,
    };
  }

  /**
   * Finalize and get complete transcript, reporting all text as confirmed.
   *
   * NOTE(review): this does not mutate the merger's state — internally,
   * pending tokens remain pending. Call reset() to start a new session.
   */
  finalize(): TokenStreamResult {
    if (!this._merger) {
      return this._getEmptyResult();
    }
    const texts = this._merger.getText(this._tokenizer);
    return {
      confirmedText: texts.full, // All text is now confirmed
      pendingText: '',
      fullText: texts.full,
      lcsLength: 0,
      anchorValid: true,
      chunkCount: this._chunkCount,
    };
  }

  /**
   * Reset for a new transcription session.
   * Clears merger state, the model's mel cache (if available), and all
   * internal streaming counters/queues.
   */
  reset(): void {
    if (this._merger) {
      this._merger.reset();
    }
    // Reset mel cache in the model's JS preprocessor (if available)
    const model = this._modelManager.getModel();
    if (model?.resetMelCache) {
      model.resetMelCache();
    }
    this._currentTimestamp = 0;
    this._lastWindowEnd = 0;
    this._chunkCount = 0;
    this._isProcessing = false;
    this._pendingChunk = null;
  }

  /**
   * Get current state for debugging.
   */
  getState(): any {
    return {
      chunkCount: this._chunkCount,
      currentTimestamp: this._currentTimestamp,
      isProcessing: this._isProcessing,
      mergerState: this._merger?.getState() || null,
    };
  }

  /**
   * Check if currently processing.
   */
  isProcessing(): boolean {
    return this._isProcessing;
  }

  /** Empty result used when a chunk is queued/dropped or there is no merger. */
  private _getEmptyResult(): TokenStreamResult {
    return {
      confirmedText: '',
      pendingText: '',
      fullText: '',
      lcsLength: 0,
      anchorValid: false,
      chunkCount: this._chunkCount,
    };
  }

  /**
   * Connect to AudioEngine for automatic window-based streaming.
   *
   * Usage:
   * ```
   * const unsubscribe = transcriber.connectToAudioEngine(audioEngine);
   * // ... later
   * unsubscribe();
   * ```
   *
   * @param audioEngine - AudioEngine instance with onWindowChunk support
   * @returns Unsubscribe function
   * @throws Error if the AudioEngine lacks onWindowChunk support.
   */
  connectToAudioEngine(audioEngine: any): () => void {
    if (!audioEngine.onWindowChunk) {
      throw new Error('AudioEngine does not support onWindowChunk. Update to v3.0.');
    }
    return audioEngine.onWindowChunk(
      this._config.windowDuration,
      this._config.overlapDuration,
      this._config.windowDuration - this._config.overlapDuration, // Default trigger interval
      async (audio: Float32Array, startTime: number) => {
        try {
          await this.processChunk(audio, startTime);
        } catch (e) {
          console.error('[TokenStreamTranscriber] Window processing error:', e);
        }
      }
    );
  }

  /**
   * Get configuration (defensive copy).
   */
  getConfig(): Required<TokenStreamConfig> {
    return { ...this._config };
  }
}