pocket-tts-web / PCMPlayerWorklet.js
KevinAHM's picture
Initial commit
156aa01
import { EventEmitter, CustomEvent } from './EventEmitter.js';
/**
* PCMPlayerWorklet - Drop-in replacement for PCMPlayer using AudioWorklet
* Uses dynamic buffer management with backpressure for smooth playback
*/
export class PCMPlayerWorklet extends EventEmitter {
  /**
   * Create a player that streams PCM chunks into an AudioWorklet ring buffer.
   *
   * Loading the worklet module is asynchronous; `initPromise` settles once the
   * node is wired up. Audio passed to playAudio() before then is queued.
   *
   * @param {AudioContext} audioContext - Context to render into.
   * @param {object} [options]
   * @param {number} [options.minBufferBeforePlaybackMs=300] - Audio to accumulate
   *   before playback starts. A stream that ends before reaching this amount
   *   starts playing once notifyStreamEnded() is called.
   */
  constructor(audioContext, options = {}) {
    super();
    this.audioContext = audioContext;
    this.options = options;
    this.workletNode = null;
    this.isInitialized = false;
    this.playbackTime = 0; // For API compatibility

    // Audio graph: the worklet node feeds gainNode (connected in initialize()),
    // which goes to the destination and is tapped by the analyser.
    this.gainNode = this.audioContext.createGain();
    this.gainNode.connect(this.audioContext.destination);
    this.analyser = this.audioContext.createAnalyser();
    this.gainNode.connect(this.analyser);
    // Volume chosen through the `volume` setter; reset() restores it after fading out.
    this.targetVolume = this.gainNode.gain.value;
    this.resetTimer = null;

    // Main-thread backpressure state
    this.pendingChunks = [];          // chunks waiting for worklet capacity
    this.pendingStreamEnd = false;    // send 'stream-ended' once pendingChunks drains
    this.initPendingQueue = null;     // audio received before initialize() finished
    this.availableCapacity = 0;
    this.isWorkletReady = false;
    this.hasReceivedInitialCapacity = false;

    // Metrics
    this.metrics = {
      chunksPlayed: 0, // chunks accepted by playAudio (queued, not necessarily rendered yet)
      underruns: 0,    // starvation episodes reported by the worklet
      bufferLevel: 0,  // samples buffered in the worklet at its last capacity report
      samplesPlayed: 0
    };

    // Initialize worklet
    this.initPromise = this.initialize();
  }

  /**
   * Load the processor module, create the worklet node and hook up its messages.
   * Resolves immediately if already initialized.
   * @returns {Promise<void>}
   * @throws Rethrows any error from loading the module or creating the node (after logging it).
   */
  async initialize() {
    if (this.isInitialized) return;
    try {
      const { sampleRate } = this.audioContext;
      // `??` so an explicit 0 ("start on the first chunk") is honoured instead of becoming 300.
      const minBufferMs = this.options.minBufferBeforePlaybackMs ?? 300;
      const minBufferSamples = Math.floor(minBufferMs * sampleRate / 1000);

      await this.loadProcessorModule();

      // Configuration travels via processorOptions rather than being baked into the
      // module source, so several players can share one registered processor.
      this.workletNode = new AudioWorkletNode(this.audioContext, 'pcm-processor', {
        outputChannelCount: [1], // the processor renders a single channel
        processorOptions: {
          // 60 seconds of buffer so normal backpressure never overflows it.
          bufferSize: Math.ceil(sampleRate * 60),
          minBufferSamples,
          // Ask the main thread to keep ~2x the pre-roll queued for stability.
          targetBufferSamples: minBufferSamples * 2,
          // Report buffer state every 256 rendered samples (~5ms at 48kHz).
          reportIntervalSamples: 256
        }
      });
      this.workletNode.connect(this.gainNode);
      this.workletNode.port.onmessage = (e) => this.handleWorkletMessage(e.data);

      this.isInitialized = true;
      this.isWorkletReady = true;
    } catch (error) {
      console.error('Failed to initialize PCMPlayerWorklet:', error);
      throw error;
    }
  }

  /**
   * Register the 'pcm-processor' AudioWorkletProcessor on this context from an
   * in-memory module. The blob URL is revoked even if addModule() rejects.
   * @returns {Promise<void>}
   */
  async loadProcessorModule() {
    const processorCode = `
class PCMProcessor extends AudioWorkletProcessor {
  constructor(options) {
    super();
    const config = options.processorOptions;
    // Ring buffer of samples awaiting playback; only [readPos, writePos) holds live data.
    this.bufferSize = config.bufferSize;
    this.ringBuffer = new Float32Array(this.bufferSize);
    this.readPos = 0;
    this.writePos = 0;
    this.isPlaying = false;
    // Pre-roll needed before playback starts, and the level the main thread is asked to keep.
    this.minBufferSamples = config.minBufferSamples;
    this.targetBufferSamples = config.targetBufferSamples;
    // Stream state
    this.streamEnded = false;
    this.playbackCompleteReported = false;
    // True while starved: underruns are reported once per episode, not once per render quantum.
    this.isStarved = false;
    // Periodic capacity reports are counted in rendered samples, not process() calls.
    this.reportIntervalSamples = config.reportIntervalSamples;
    this.samplesSinceReport = 0;
    this.port.onmessage = (e) => {
      switch (e.data.type) {
        case 'audio':
          this.addAudio(e.data.data);
          break;
        case 'reset':
          this.reset();
          break;
        case 'stream-ended':
          this.handleStreamEnded();
          break;
      }
    };
    // Send initial capacity
    this.sendCapacityUpdate();
  }

  addAudio(input) {
    // Typed arrays are written through zero-copy views; anything else is converted once.
    let data = ArrayBuffer.isView(input) ? input : Float32Array.from(input);
    // A chunk larger than the whole ring can never fit; keep only its newest samples.
    const maxWritable = this.bufferSize - 128;
    if (data.length > maxWritable) {
      data = data.subarray(data.length - maxWritable);
    }
    const samples = data.length;
    const available = this.getAvailableSpace();
    if (samples > available) {
      // This shouldn't happen with proper backpressure
      console.error('Buffer overflow - bug in backpressure. Samples:', samples, 'Available:', available, 'Buffered:', this.getBufferedSamples());
      // Drop oldest data to recover
      this.readPos = (this.readPos + samples - available) % this.bufferSize;
    }
    // Write to ring buffer, wrapping at the end if needed
    const firstPart = Math.min(samples, this.bufferSize - this.writePos);
    this.ringBuffer.set(data.subarray(0, firstPart), this.writePos);
    if (firstPart < samples) {
      this.ringBuffer.set(data.subarray(firstPart), 0);
    }
    this.writePos = (this.writePos + samples) % this.bufferSize;
    // Auto-start when we have enough buffered
    const buffered = this.getBufferedSamples();
    if (!this.isPlaying && buffered >= this.minBufferSamples) {
      this.startPlayback(buffered);
    }
    // Report capacity after adding
    this.sendCapacityUpdate();
  }

  startPlayback(buffered) {
    this.isPlaying = true;
    this.isStarved = false;
    // A new stream begins here, so its completion must be reported again.
    this.playbackCompleteReported = false;
    this.samplesSinceReport = 0;
    // Notify that playback has started
    this.port.postMessage({
      type: 'playback-started',
      buffered: buffered,
      audioTime: currentTime
    });
  }

  handleStreamEnded() {
    if (this.isPlaying) {
      this.streamEnded = true;
      return;
    }
    const buffered = this.getBufferedSamples();
    if (buffered > 0) {
      // The stream ended before reaching the pre-roll (a short clip): play what we have.
      this.streamEnded = true;
      this.startPlayback(buffered);
    }
    // With nothing queued or playing there is nothing to finish; latching streamEnded
    // here would make the next stream report completion at its first underrun.
  }

  getAvailableSpace() {
    return this.bufferSize - this.getBufferedSamples() - 128; // Leave small safety margin
  }

  getBufferedSamples() {
    return (this.writePos - this.readPos + this.bufferSize) % this.bufferSize;
  }

  sendCapacityUpdate() {
    const buffered = this.getBufferedSamples();
    const capacity = this.getAvailableSpace();
    // If buffer is low, request more; if it's at or above target, request nothing
    let requestSamples = 0;
    if (buffered < this.targetBufferSamples) {
      requestSamples = Math.min(capacity, this.targetBufferSamples - buffered);
    }
    this.port.postMessage({
      type: 'capacity',
      buffered: buffered,
      capacity: capacity,
      requestSamples: requestSamples,
      isPlaying: this.isPlaying
    });
  }

  readInto(target, count) {
    // Allocation-free copy out of the ring buffer, advancing readPos.
    let pos = this.readPos;
    for (let i = 0; i < count; i++) {
      target[i] = this.ringBuffer[pos];
      pos += 1;
      if (pos === this.bufferSize) pos = 0;
    }
    this.readPos = pos;
  }

  process(inputs, outputs) {
    const output = outputs[0];
    if (!output || !output[0]) return true;
    const outputChannel = output[0];
    const numSamples = outputChannel.length;
    if (!this.isPlaying) {
      // Idle: the buffer only changes via addAudio/reset, which report on their own.
      outputChannel.fill(0);
      return true;
    }
    // Report stats periodically while the buffer is draining
    this.samplesSinceReport += numSamples;
    if (this.samplesSinceReport >= this.reportIntervalSamples) {
      this.samplesSinceReport = 0;
      this.sendCapacityUpdate();
    }
    const buffered = this.getBufferedSamples();
    const samplesRead = Math.min(buffered, numSamples);
    this.readInto(outputChannel, samplesRead);
    if (samplesRead === numSamples) {
      this.isStarved = false;
      return true;
    }
    // Ran short: fill remaining with silence
    outputChannel.fill(0, samplesRead);
    if (this.streamEnded) {
      // Everything queued for this stream has now been rendered.
      if (!this.playbackCompleteReported) {
        this.port.postMessage({
          type: 'playback-complete'
        });
        this.playbackCompleteReported = true;
      }
      this.isPlaying = false;
      this.streamEnded = false;
    } else if (!this.isStarved) {
      // Request more data urgently (once per starvation episode)
      this.isStarved = true;
      this.port.postMessage({
        type: 'underrun',
        buffered: buffered,
        needed: numSamples
      });
      this.sendCapacityUpdate();
    }
    return true;
  }

  reset() {
    // Stale samples need not be zeroed: only [readPos, writePos) is ever read, and
    // clearing the large ring buffer inside the render callback risks a glitch.
    this.readPos = 0;
    this.writePos = 0;
    this.isPlaying = false;
    this.streamEnded = false;
    this.playbackCompleteReported = false;
    this.isStarved = false;
    this.samplesSinceReport = 0;
    this.sendCapacityUpdate();
  }
}

try {
  registerProcessor('pcm-processor', PCMProcessor);
} catch (error) {
  // Another player already registered this processor on the same AudioContext.
  // All configuration arrives via processorOptions, so that registration is equivalent.
  if (!error || error.name !== 'NotSupportedError') throw error;
}
`;
    const blob = new Blob([processorCode], { type: 'application/javascript' });
    const workletUrl = URL.createObjectURL(blob);
    try {
      await this.audioContext.audioWorklet.addModule(workletUrl);
    } finally {
      URL.revokeObjectURL(workletUrl);
    }
  }

  /**
   * Dispatch a message posted by the worklet processor.
   * @param {{type: string}} data - Message payload; extra fields depend on `type`.
   */
  handleWorkletMessage(data) {
    switch (data.type) {
      case 'capacity':
        this.handleCapacityUpdate(data);
        break;
      case 'underrun':
        this.metrics.underruns++;
        console.warn(`[MAIN THREAD] ⚠️ UNDERRUN #${this.metrics.underruns} detected! buffered=${data.buffered} samples, needed=${data.needed} samples`);
        // Try to send more data immediately
        this.processPendingChunks();
        break;
      case 'playback-started':
        console.log(`[MAIN THREAD] Received playback-started at performance.now=${performance.now().toFixed(2)}ms, audioContext.currentTime=${this.audioContext.currentTime.toFixed(3)}s, worklet reported audioTime=${data.audioTime}s`);
        this.emit('firstPlayback', {
          startTime: this.audioContext.currentTime,
          bufferedSamples: data.buffered
        });
        break;
      case 'playback-complete':
        this.emit('audioEnded', {
          endTime: this.audioContext.currentTime
        });
        break;
    }
  }

  /**
   * Record the worklet's reported capacity and forward queued audio when allowed.
   * @param {{capacity: number, buffered: number, requestSamples: number}} data
   */
  handleCapacityUpdate(data) {
    this.availableCapacity = data.capacity;
    this.metrics.bufferLevel = data.buffered;
    const isFirstUpdate = !this.hasReceivedInitialCapacity;
    this.hasReceivedInitialCapacity = true;
    // Chunks queued before the first report, or explicitly requested by the worklet, can go now.
    if ((isFirstUpdate || data.requestSamples > 0) && this.pendingChunks.length > 0) {
      this.processPendingChunks();
    }
  }

  /**
   * Send at most one queued chunk (or a prefix of it) to the worklet, then wait for
   * the next capacity report. Sends the deferred 'stream-ended' once the queue drains.
   */
  processPendingChunks() {
    if (!this.isWorkletReady || this.pendingChunks.length === 0) {
      return;
    }
    // Don't send if we don't know capacity yet
    if (this.availableCapacity <= 0) {
      return;
    }
    // Sending ONE chunk per capacity update avoids racing the worklet's own accounting.
    const chunk = this.pendingChunks[0];
    if (chunk.length <= this.availableCapacity) {
      this.pendingChunks.shift();
      this.workletNode.port.postMessage({
        type: 'audio',
        data: chunk
      });
      // Set capacity to 0 to prevent sending more until we get an update
      this.availableCapacity = 0;
    } else if (this.availableCapacity > 4096) {
      // Send partial chunk only if we have significant space
      const partial = chunk.slice(0, this.availableCapacity);
      console.log(`Sending partial: ${partial.length} samples from ${chunk.length} (capacity: ${this.availableCapacity})`);
      this.pendingChunks[0] = chunk.slice(this.availableCapacity);
      this.workletNode.port.postMessage({
        type: 'audio',
        data: partial
      });
      this.availableCapacity = 0;
    } else {
      // Not enough space, wait for next capacity update
      console.log(`Not sending - chunk ${chunk.length} samples, capacity ${this.availableCapacity}`);
    }
    // If all chunks sent and stream ended, notify worklet
    if (this.pendingChunks.length === 0 && this.pendingStreamEnd) {
      this.workletNode.port.postMessage({ type: 'stream-ended' });
      this.pendingStreamEnd = false;
    }
  }

  /**
   * Queue PCM audio for playback.
   *
   * Before initialization completes the data is held and replayed afterwards.
   * Audio arriving while the AudioContext is not 'running' is dropped.
   * @param {Float32Array|Int16Array} data - Samples for a single channel; an
   *   Int16Array is scaled to floats by dividing by 32768.
   */
  playAudio(data) {
    if (!this.isInitialized) {
      this.queueUntilInitialized(data);
      return;
    }
    if (this.audioContext.state !== 'running') {
      return;
    }
    const float32Array = data instanceof Int16Array
      ? this.pcm16ToFloat32(data)
      : data;
    this.pendingChunks.push(float32Array);
    // Only send now if the worklet has reported space; otherwise its next capacity update will.
    if (this.hasReceivedInitialCapacity && this.availableCapacity > 0) {
      this.processPendingChunks();
    }
    this.metrics.chunksPlayed++;
    // Update playback time for compatibility
    const duration = float32Array.length / this.audioContext.sampleRate;
    this.playbackTime = this.audioContext.currentTime + duration;
    // Emit events for compatibility
    this.emit('audioStarted', {
      startTime: this.audioContext.currentTime,
      duration: duration,
      samples: float32Array.length
    });
  }

  /**
   * Hold audio that arrives before initialize() resolves and replay it in order
   * afterwards. One initPromise callback drains the whole queue.
   * @param {Float32Array|Int16Array} data
   */
  queueUntilInitialized(data) {
    if (!this.initPendingQueue) {
      this.initPendingQueue = [];
      this.initPromise
        .then(() => {
          const queue = this.initPendingQueue;
          this.initPendingQueue = null;
          for (const queuedData of queue) {
            this.playAudio(queuedData);
          }
        })
        .catch((error) => {
          this.initPendingQueue = null;
          console.error('Dropping audio queued before PCMPlayerWorklet initialization:', error);
        });
    }
    this.initPendingQueue.push(data);
  }

  /**
   * Signal that no more audio will follow for the current stream. The worklet
   * emits 'audioEnded' once everything already queued has been rendered.
   */
  notifyStreamEnded() {
    if (!this.isInitialized) {
      // Audio queued before initialization is replayed by an earlier initPromise
      // callback, so re-running afterwards keeps the end marker behind that audio.
      this.initPromise.then(
        () => this.notifyStreamEnded(),
        () => { /* initialize() already logged the failure */ }
      );
      return;
    }
    if (this.pendingChunks.length > 0) {
      // Still have chunks to send; processPendingChunks sends the marker after the last one.
      this.pendingStreamEnd = true;
    } else if (this.workletNode) {
      this.workletNode.port.postMessage({ type: 'stream-ended' });
    }
  }

  /**
   * Convert signed 16-bit PCM to floats by dividing each sample by 32768.
   * @param {Int16Array} pcm16
   * @returns {Float32Array}
   */
  pcm16ToFloat32(pcm16) {
    const float32 = new Float32Array(pcm16.length);
    for (let i = 0; i < pcm16.length; i++) {
      float32[i] = pcm16[i] / 32768;
    }
    return float32;
  }

  /**
   * Drop all queued audio (including audio waiting for initialization), clear the
   * worklet buffer, and briefly ramp the gain down before restoring the user's volume.
   */
  reset() {
    this.playbackTime = 0;
    this.pendingChunks = [];
    this.pendingStreamEnd = false;
    this.availableCapacity = 0;
    if (this.initPendingQueue) {
      this.initPendingQueue.length = 0;
    }
    if (this.workletNode) {
      this.workletNode.port.postMessage({ type: 'reset' });
    }
    // Quick fade out to soften the cut. NOTE(review): the worklet drops its buffer as
    // soon as the reset message arrives, so this mainly smooths what is still in flight.
    if (this.gainNode) {
      const gain = this.gainNode.gain;
      const now = this.audioContext.currentTime;
      gain.cancelScheduledValues(now);
      gain.setValueAtTime(gain.value, now);
      gain.linearRampToValueAtTime(0, now + 0.05);
      clearTimeout(this.resetTimer);
      this.resetTimer = setTimeout(() => {
        this.resetTimer = null;
        // Restore the volume chosen via the `volume` setter rather than a hard-coded 1.
        gain.value = this.targetVolume;
      }, 100);
    }
  }

  stopAllSources() {
    this.reset();
  }

  /** Resume the AudioContext if it is suspended (e.g. awaiting a user gesture). */
  async resume() {
    if (this.audioContext.state === 'suspended') {
      await this.audioContext.resume();
    }
  }

  /** Current gain value (may be mid-fade right after reset()). */
  get volume() {
    return this.gainNode.gain.value;
  }

  /** Set the gain, clamped to [0, 1], and emit 'volumeChange'. */
  set volume(value) {
    const clampedValue = Math.max(0, Math.min(1, value));
    this.targetVolume = clampedValue;
    this.gainNode.gain.value = clampedValue;
    this.emit('volumeChange', { volume: clampedValue });
  }

  get volumePercentage() {
    return this.volume * 100;
  }

  set volumePercentage(percentage) {
    this.volume = percentage / 100;
  }

  /** @returns {Uint8Array} Byte frequency data from the analyser (frequencyBinCount bins). */
  getAnalyserData() {
    const dataArray = new Uint8Array(this.analyser.frequencyBinCount);
    this.analyser.getByteFrequencyData(dataArray);
    return dataArray;
  }

  /** @returns {Uint8Array} Byte time-domain data, sized to frequencyBinCount. */
  getTimeDomainData() {
    const dataArray = new Uint8Array(this.analyser.frequencyBinCount);
    this.analyser.getByteTimeDomainData(dataArray);
    return dataArray;
  }

  /**
   * Snapshot of playback state. Buffer figures come from the worklet's most
   * recent capacity report.
   */
  getPlaybackStatus() {
    const bufferMs = this.metrics.bufferLevel
      ? (this.metrics.bufferLevel / this.audioContext.sampleRate) * 1000
      : 0;
    return {
      currentTime: this.audioContext.currentTime,
      scheduledTime: this.playbackTime,
      bufferedDuration: bufferMs / 1000,
      state: this.audioContext.state,
      worklet: {
        bufferLevelSamples: this.metrics.bufferLevel,
        bufferLevelMs: bufferMs,
        underruns: this.metrics.underruns,
        chunksPlayed: this.metrics.chunksPlayed,
        pendingChunks: this.pendingChunks.length
      }
    };
  }
}