Spaces:
Running
Running
| import { EventEmitter, CustomEvent } from './EventEmitter.js'; | |
| /** | |
| * PCMPlayerWorklet - Drop-in replacement for PCMPlayer using AudioWorklet | |
| * Uses dynamic buffer management with backpressure for smooth playback | |
| */ | |
export class PCMPlayerWorklet extends EventEmitter {
  /**
   * Drop-in replacement for PCMPlayer built on an AudioWorklet ring buffer
   * with explicit backpressure (the worklet reports capacity; the main
   * thread only sends chunks that fit).
   *
   * @param {AudioContext} audioContext - Caller-owned audio context.
   * @param {Object} [options] - Configuration.
   * @param {number} [options.minBufferBeforePlaybackMs=300] - Audio (ms) that
   *   must be buffered in the worklet before playback auto-starts.
   */
  constructor(audioContext, options = {}) {
    super();
    this.audioContext = audioContext;
    this.options = options;
    this.workletNode = null;
    this.isInitialized = false;
    this.playbackTime = 0; // Kept for API compatibility with PCMPlayer

    // Audio graph: worklet -> gain -> destination, and gain -> analyser
    // (analyser is a side tap used by getAnalyserData/getTimeDomainData).
    this.gainNode = this.audioContext.createGain();
    this.gainNode.connect(this.audioContext.destination);
    this.analyser = this.audioContext.createAnalyser();
    this.gainNode.connect(this.analyser);

    // Backpressure state: chunks wait here until the worklet reports capacity.
    this.pendingChunks = [];
    this.availableCapacity = 0;
    this.isWorkletReady = false;
    this.hasReceivedInitialCapacity = false;
    this.pendingStreamEnd = false;

    // Volume the user last asked for; reset() restores this after its fade
    // (previously reset() hard-coded gain back to 1, clobbering the setting).
    this.targetVolume = this.gainNode.gain.value;

    // Metrics exposed via getPlaybackStatus()
    this.metrics = {
      chunksPlayed: 0,
      underruns: 0,
      bufferLevel: 0,
      samplesPlayed: 0
    };

    // Worklet setup is async; playAudio() queues data until this resolves.
    this.initPromise = this.initialize();
  }

  /**
   * Compiles the AudioWorklet processor from an inline blob, creates the
   * node, and wires up port messaging. Idempotent.
   * @throws Re-throws any failure from audioWorklet.addModule / node creation.
   */
  async initialize() {
    if (this.isInitialized) return;
    try {
      const sampleRate = this.audioContext.sampleRate;
      const minBufferMs = this.options.minBufferBeforePlaybackMs || 300;
      const minBufferSamples = Math.floor(minBufferMs * sampleRate / 1000);
      // Ring buffer sized for 60s of audio so backpressure, not capacity,
      // is the limiting factor.
      const bufferSizeSamples = sampleRate * 60;

      const processorCode = `
        class PCMProcessor extends AudioWorkletProcessor {
          constructor() {
            super();
            // Mono ring buffer; readPos/writePos are sample indices.
            this.bufferSize = ${bufferSizeSamples};
            this.ringBuffer = new Float32Array(this.bufferSize);
            this.readPos = 0;
            this.writePos = 0;
            this.isPlaying = false;
            // Playback starts once minBufferSamples are queued; the main
            // thread is asked to keep roughly targetBufferSamples buffered.
            this.minBufferSamples = ${minBufferSamples};
            this.targetBufferSamples = ${minBufferSamples * 2};
            this.streamEnded = false;
            this.playbackCompleteReported = false;
            // Capacity reports are throttled to every reportInterval process() calls.
            this.frameCount = 0;
            this.reportInterval = 256;
            this.port.onmessage = (e) => {
              switch (e.data.type) {
                case 'audio':
                  this.addAudio(e.data.data);
                  break;
                case 'reset':
                  this.reset();
                  break;
                case 'stream-ended':
                  this.streamEnded = true;
                  break;
              }
            };
            // Tell the main thread how much it may send initially.
            this.sendCapacityUpdate();
          }

          // Append samples to the ring buffer and auto-start playback once
          // the minimum pre-buffer is reached.
          addAudio(float32Data) {
            const samples = float32Data.length;
            const available = this.getAvailableSpace();
            if (samples > available) {
              // Should not happen with correct backpressure; drop the oldest
              // data so the stream can recover instead of corrupting state.
              console.error('Buffer overflow - bug in backpressure. Samples:', samples, 'Available:', available, 'Buffered:', this.getBufferedSamples());
              const overflow = samples - available;
              this.readPos = (this.readPos + overflow) % this.bufferSize;
            }
            if (this.writePos + samples <= this.bufferSize) {
              this.ringBuffer.set(float32Data, this.writePos);
              this.writePos = (this.writePos + samples) % this.bufferSize;
            } else {
              // Wrap-around write in two parts.
              const firstPart = this.bufferSize - this.writePos;
              this.ringBuffer.set(float32Data.slice(0, firstPart), this.writePos);
              this.ringBuffer.set(float32Data.slice(firstPart), 0);
              this.writePos = samples - firstPart;
            }
            const buffered = this.getBufferedSamples();
            if (!this.isPlaying && buffered >= this.minBufferSamples) {
              this.isPlaying = true;
              // FIX: re-arm completion reporting for this stream. Previously
              // the flag stayed true after the first stream finished, so no
              // later stream ever emitted 'playback-complete'.
              this.playbackCompleteReported = false;
              this.port.postMessage({
                type: 'playback-started',
                buffered: buffered,
                audioTime: currentTime
              });
            }
            this.sendCapacityUpdate();
          }

          // Free space, clamped to >= 0, with a 128-sample margin so writePos
          // never catches readPos exactly (which would read as "empty").
          getAvailableSpace() {
            return Math.max(0, this.bufferSize - this.getBufferedSamples() - 128);
          }

          // Samples currently queued between readPos and writePos.
          getBufferedSamples() {
            if (this.writePos >= this.readPos) {
              return this.writePos - this.readPos;
            }
            return this.bufferSize - this.readPos + this.writePos;
          }

          // Tell the main thread how full we are and how much more we want.
          sendCapacityUpdate() {
            const buffered = this.getBufferedSamples();
            const capacity = this.getAvailableSpace();
            let requestSamples = 0;
            if (buffered < this.targetBufferSamples) {
              requestSamples = Math.min(capacity, this.targetBufferSamples - buffered);
            }
            this.port.postMessage({
              type: 'capacity',
              buffered: buffered,
              capacity: capacity,
              requestSamples: requestSamples,
              isPlaying: this.isPlaying
            });
          }

          // Copy 'count' samples from the ring buffer into out[0..count),
          // advancing readPos (handles wrap-around).
          readSamples(out, count) {
            const contiguous = this.bufferSize - this.readPos;
            if (count <= contiguous) {
              for (let i = 0; i < count; i++) {
                out[i] = this.ringBuffer[this.readPos + i];
              }
              this.readPos = (this.readPos + count) % this.bufferSize;
            } else {
              for (let i = 0; i < contiguous; i++) {
                out[i] = this.ringBuffer[this.readPos + i];
              }
              const remainder = count - contiguous;
              for (let i = 0; i < remainder; i++) {
                out[contiguous + i] = this.ringBuffer[i];
              }
              this.readPos = remainder;
            }
          }

          process(inputs, outputs, parameters) {
            const output = outputs[0];
            if (!output || !output[0]) return true;
            const outputChannel = output[0];
            const numSamples = outputChannel.length;
            // Periodic stats/capacity heartbeat to the main thread.
            if (++this.frameCount % this.reportInterval === 0) {
              this.sendCapacityUpdate();
            }
            if (!this.isPlaying) {
              outputChannel.fill(0);
              return true;
            }
            const buffered = this.getBufferedSamples();
            if (buffered >= numSamples) {
              // Normal playback.
              this.readSamples(outputChannel, numSamples);
              return true;
            }
            // Underrun: emit whatever we have, pad the rest with silence.
            if (buffered > 0) {
              this.readSamples(outputChannel, buffered);
            }
            outputChannel.fill(0, buffered);
            if (this.streamEnded && buffered === 0) {
              // Ran dry AFTER the producer said the stream is over: that is
              // normal completion, not an underrun.
              if (!this.playbackCompleteReported) {
                this.port.postMessage({ type: 'playback-complete' });
                this.playbackCompleteReported = true;
              }
              this.isPlaying = false;
              this.streamEnded = false;
            } else {
              // Genuine underrun - ask the main thread for data urgently.
              this.port.postMessage({
                type: 'underrun',
                buffered: buffered,
                needed: numSamples
              });
              this.sendCapacityUpdate();
            }
            return true;
          }

          // Discard all buffered audio and return to the idle state.
          reset() {
            this.readPos = 0;
            this.writePos = 0;
            this.ringBuffer.fill(0);
            this.isPlaying = false;
            this.streamEnded = false;
            this.playbackCompleteReported = false;
            this.sendCapacityUpdate();
          }
        }
        registerProcessor('pcm-processor', PCMProcessor);
      `;

      // Load the processor from a blob URL; revoke once the module is compiled.
      const blob = new Blob([processorCode], { type: 'application/javascript' });
      const workletUrl = URL.createObjectURL(blob);
      await this.audioContext.audioWorklet.addModule(workletUrl);
      URL.revokeObjectURL(workletUrl);

      this.workletNode = new AudioWorkletNode(this.audioContext, 'pcm-processor');
      this.workletNode.connect(this.gainNode);

      this.workletNode.port.onmessage = (e) => {
        switch (e.data.type) {
          case 'capacity':
            this.handleCapacityUpdate(e.data);
            break;
          case 'underrun':
            this.metrics.underruns++;
            console.warn(`[MAIN THREAD] ⚠️ UNDERRUN #${this.metrics.underruns} detected! buffered=${e.data.buffered} samples, needed=${e.data.needed} samples`);
            // Push more data immediately if any is queued.
            this.processPendingChunks();
            break;
          case 'playback-started':
            console.log(`[MAIN THREAD] Received playback-started at performance.now=${performance.now().toFixed(2)}ms, audioContext.currentTime=${this.audioContext.currentTime.toFixed(3)}s, worklet reported audioTime=${e.data.audioTime}s`);
            this.emit('firstPlayback', {
              startTime: this.audioContext.currentTime,
              bufferedSamples: e.data.buffered
            });
            break;
          case 'playback-complete':
            this.emit('audioEnded', {
              endTime: this.audioContext.currentTime
            });
            break;
        }
      };

      this.isInitialized = true;
      this.isWorkletReady = true;
    } catch (error) {
      console.error('Failed to initialize PCMPlayerWorklet:', error);
      throw error;
    }
  }

  /**
   * Handles a 'capacity' report from the worklet: records buffer level and
   * drains the pending queue when the worklet is asking for more data.
   */
  handleCapacityUpdate(data) {
    this.availableCapacity = data.capacity;
    this.metrics.bufferLevel = data.buffered;
    if (!this.hasReceivedInitialCapacity) {
      this.hasReceivedInitialCapacity = true;
      // Flush chunks that arrived before we knew the worklet's capacity.
      if (this.pendingChunks.length > 0) {
        this.processPendingChunks();
      }
    }
    if (data.requestSamples > 0 && this.pendingChunks.length > 0) {
      this.processPendingChunks();
    }
  }

  /**
   * Sends at most ONE (possibly partial) chunk per capacity update. Capacity
   * is zeroed after each send so we never outrun the worklet's own report —
   * this avoids races where several sends are based on one stale capacity.
   */
  processPendingChunks() {
    if (!this.isWorkletReady || this.pendingChunks.length === 0) {
      return;
    }
    // Capacity unknown or exhausted: wait for the next worklet report.
    if (this.availableCapacity <= 0) {
      return;
    }
    const chunk = this.pendingChunks[0];
    if (chunk.length <= this.availableCapacity) {
      // Whole chunk fits.
      this.pendingChunks.shift();
      this.workletNode.port.postMessage({
        type: 'audio',
        data: chunk
      });
      this.availableCapacity = 0;
    } else if (this.availableCapacity > 4096) {
      // Send a partial chunk only when there is significant space; keep the
      // remainder at the head of the queue.
      const partial = chunk.slice(0, this.availableCapacity);
      console.log(`Sending partial: ${partial.length} samples from ${chunk.length} (capacity: ${this.availableCapacity})`);
      this.pendingChunks[0] = chunk.slice(this.availableCapacity);
      this.workletNode.port.postMessage({
        type: 'audio',
        data: partial
      });
      this.availableCapacity = 0;
    } else {
      console.log(`Not sending - chunk ${chunk.length} samples, capacity ${this.availableCapacity}`);
    }
    // Everything delivered and the producer already signalled end-of-stream:
    // forward that signal now.
    if (this.pendingChunks.length === 0 && this.pendingStreamEnd) {
      this.workletNode.port.postMessage({ type: 'stream-ended' });
      this.pendingStreamEnd = false;
    }
  }

  /**
   * Queues PCM data for playback. Accepts Int16Array (converted) or
   * Float32Array. Data arriving before initialization completes is buffered
   * and replayed once the worklet is ready. Data is dropped (by design) when
   * the AudioContext is not running.
   *
   * Note: 'audioStarted' is emitted once per chunk purely for PCMPlayer API
   * compatibility; actual playback start is signalled via 'firstPlayback'.
   *
   * @param {Int16Array|Float32Array} data - Mono PCM samples.
   */
  playAudio(data) {
    if (!this.isInitialized) {
      // Buffer until initialize() resolves, then replay in arrival order.
      if (!this.initPendingQueue) {
        this.initPendingQueue = [];
        this.initPromise.then(() => {
          const queue = this.initPendingQueue;
          this.initPendingQueue = null;
          for (const queuedData of queue) {
            this.playAudio(queuedData);
          }
        });
      }
      this.initPendingQueue.push(data);
      return;
    }
    if (this.audioContext.state !== 'running') {
      return;
    }
    const float32Array = data instanceof Int16Array
      ? this.pcm16ToFloat32(data)
      : data;
    this.pendingChunks.push(float32Array);
    // Only push immediately if the worklet has told us it has room;
    // otherwise its next capacity report will drain the queue.
    if (this.hasReceivedInitialCapacity && this.availableCapacity > 0) {
      this.processPendingChunks();
    }
    this.metrics.chunksPlayed++;
    const duration = float32Array.length / this.audioContext.sampleRate;
    this.playbackTime = this.audioContext.currentTime + duration;
    this.emit('audioStarted', {
      startTime: this.audioContext.currentTime,
      duration: duration,
      samples: float32Array.length
    });
  }

  /**
   * Signals that no more audio will arrive for the current stream. Deferred
   * while chunks are still queued, or while the worklet is not yet created
   * (previously the signal was silently lost in the pre-init case).
   */
  notifyStreamEnded() {
    if (this.pendingChunks.length > 0 || !this.workletNode) {
      this.pendingStreamEnd = true;
    } else {
      this.workletNode.port.postMessage({ type: 'stream-ended' });
    }
  }

  /**
   * Converts signed 16-bit PCM to float samples in [-1, 1).
   * @param {Int16Array} pcm16
   * @returns {Float32Array}
   */
  pcm16ToFloat32(pcm16) {
    const float32 = new Float32Array(pcm16.length);
    for (let i = 0; i < pcm16.length; i++) {
      float32[i] = pcm16[i] / 32768;
    }
    return float32;
  }

  /**
   * Discards all queued/buffered audio. Applies a 50ms fade-out to avoid
   * clicks, then restores the user's chosen volume (FIX: used to force the
   * gain back to 1, overwriting any volume the caller had set).
   */
  reset() {
    this.playbackTime = 0;
    this.pendingChunks = [];
    this.pendingStreamEnd = false;
    this.availableCapacity = 0;
    if (this.workletNode) {
      this.workletNode.port.postMessage({ type: 'reset' });
    }
    if (this.gainNode) {
      const now = this.audioContext.currentTime;
      const restoreTo = this.targetVolume ?? 1;
      this.gainNode.gain.cancelScheduledValues(now);
      this.gainNode.gain.setValueAtTime(this.gainNode.gain.value, now);
      this.gainNode.gain.linearRampToValueAtTime(0, now + 0.05);
      setTimeout(() => {
        this.gainNode.gain.value = restoreTo;
      }, 100);
    }
  }

  /** Alias kept for PCMPlayer API compatibility. */
  stopAllSources() {
    this.reset();
  }

  /** Resumes a suspended AudioContext (needed after user-gesture policies). */
  async resume() {
    if (this.audioContext.state === 'suspended') {
      await this.audioContext.resume();
    }
  }

  /** Current gain value in [0, 1]. */
  get volume() {
    return this.gainNode.gain.value;
  }

  /** Sets gain, clamped to [0, 1], and emits 'volumeChange'. */
  set volume(value) {
    const clampedValue = Math.max(0, Math.min(1, value));
    this.targetVolume = clampedValue; // remembered so reset() restores it
    this.gainNode.gain.value = clampedValue;
    this.emit('volumeChange', { volume: clampedValue });
  }

  /** Volume as a percentage in [0, 100]. */
  get volumePercentage() {
    return this.volume * 100;
  }

  set volumePercentage(percentage) {
    this.volume = percentage / 100;
  }

  /** @returns {Uint8Array} Byte frequency data from the analyser tap. */
  getAnalyserData() {
    const bufferLength = this.analyser.frequencyBinCount;
    const dataArray = new Uint8Array(bufferLength);
    this.analyser.getByteFrequencyData(dataArray);
    return dataArray;
  }

  /** @returns {Uint8Array} Byte time-domain data from the analyser tap. */
  getTimeDomainData() {
    const bufferLength = this.analyser.frequencyBinCount;
    const dataArray = new Uint8Array(bufferLength);
    this.analyser.getByteTimeDomainData(dataArray);
    return dataArray;
  }

  /**
   * Snapshot of playback/buffer state for diagnostics.
   * @returns {{currentTime:number, scheduledTime:number, bufferedDuration:number,
   *            state:string, worklet:Object}}
   */
  getPlaybackStatus() {
    const bufferMs = this.metrics.bufferLevel
      ? (this.metrics.bufferLevel / this.audioContext.sampleRate) * 1000
      : 0;
    return {
      currentTime: this.audioContext.currentTime,
      scheduledTime: this.playbackTime,
      bufferedDuration: bufferMs / 1000,
      state: this.audioContext.state,
      worklet: {
        bufferLevelSamples: this.metrics.bufferLevel,
        bufferLevelMs: bufferMs,
        underruns: this.metrics.underruns,
        chunksPlayed: this.metrics.chunksPlayed,
        pendingChunks: this.pendingChunks.length
      }
    };
  }
}