/**
 * Transient-onset detector — clicks (antenna hits) and short beeps.
 *
 * Ported from marionette/tests/audio_analysis.py `detect_transient_onsets`:
 *   highpass -> short-term energy (5 ms windows) -> spectral flux
 *   (positive energy differences) -> peak pick with min separation.
 *
 * Two entry points sharing the same DSP core:
 *   - detectTransientOnsets(samples, sr, opts): pure, offline. Used by unit
 *     tests (synthetic audio) and the visualizer (on recorded buffers).
 *   - StreamingOnsetDetector: live mic. The highpass is done by a Web Audio
 *     BiquadFilterNode upstream; this class consumes filtered blocks and
 *     emits onsets in real time with an adaptive (noise-floor) threshold.
 *
 * Onset times are in MILLISECONDS to match wire.js / timing.js.
 */

// ──────── Pure DSP ────────────────────────────────────────────────

/**
 * Single 2nd-order Butterworth high-pass (RBJ cookbook). Cascade twice for
 * a ~4th-order roll-off (matches the Python `butter(4, ...)` intent).
 *
 * @param {ArrayLike<number>} samples input signal
 * @param {number} sr sample rate (Hz)
 * @param {number} fc cutoff frequency (Hz)
 * @param {number} [q] filter Q; defaults to 1/sqrt(2)
 * @returns {Float64Array} filtered signal, same length as `samples`
 */
export function biquadHighpassOnce(samples, sr, fc, q = Math.SQRT1_2) {
  const w0 = (2 * Math.PI * fc) / sr;
  const cos = Math.cos(w0);
  const sin = Math.sin(w0);
  const alpha = sin / (2 * q);

  const b0 = (1 + cos) / 2;
  const b1 = -(1 + cos);
  const b2 = (1 + cos) / 2;
  const a0 = 1 + alpha;
  const a1 = -2 * cos;
  const a2 = 1 - alpha;

  // Normalise every coefficient by a0 once, outside the sample loop.
  const nb0 = b0 / a0;
  const nb1 = b1 / a0;
  const nb2 = b2 / a0;
  const na1 = a1 / a0;
  const na2 = a2 / a0;

  const out = new Float64Array(samples.length);
  // Direct-form I state: the two previous inputs and the two previous outputs.
  let x1 = 0;
  let x2 = 0;
  let y1 = 0;
  let y2 = 0;
  for (let i = 0; i < samples.length; i++) {
    const x0 = samples[i];
    const y0 = nb0 * x0 + nb1 * x1 + nb2 * x2 - na1 * y1 - na2 * y2;
    x2 = x1;
    x1 = x0;
    y2 = y1;
    y1 = y0;
    out[i] = y0;
  }
  return out;
}

/**
 * High-pass `samples` by running the 2nd-order section twice in series.
 *
 * @param {ArrayLike<number>} samples input signal
 * @param {number} sr sample rate (Hz)
 * @param {number} fc cutoff frequency (Hz)
 * @returns {Float64Array} filtered signal, same length as `samples`
 */
export function highpass(samples, sr, fc) {
  return biquadHighpassOnce(biquadHighpassOnce(samples, sr, fc), sr, fc);
}

/**
 * Short-term energy per non-overlapping window, then positive flux.
 *
 * `flux[k]` is `max(0, energy[k + 1] - energy[k])`, so it has one entry fewer
 * than there are complete windows. Trailing samples that do not fill a whole
 * window are ignored.
 *
 * @param {ArrayLike<number>} filtered already high-passed signal
 * @param {number} sr sample rate (Hz)
 * @param {number} [winMs] window length (ms)
 * @returns {{flux:Float64Array, winSamples:number}}
 */
export function energyFlux(filtered, sr, winMs = 5) {
  const winSamples = Math.max(1, Math.round((winMs / 1000) * sr));
  const nWin = Math.floor(filtered.length / winSamples);

  const energy = new Float64Array(nWin);
  for (let w = 0; w < nWin; w++) {
    let e = 0;
    const base = w * winSamples;
    for (let j = 0; j < winSamples; j++) {
      const v = filtered[base + j];
      e += v * v;
    }
    energy[w] = e;
  }

  const flux = new Float64Array(Math.max(0, nWin - 1));
  for (let i = 1; i < nWin; i++) {
    flux[i - 1] = Math.max(0, energy[i] - energy[i - 1]);
  }
  return { flux, winSamples };
}

/**
 * Offline transient detection on a full buffer.
 *
 * @param {ArrayLike<number>} samples raw (unfiltered) signal
 * @param {number} sr sample rate (Hz)
 * @param {object} [opts]
 * @param {number} [opts.highpassHz] high-pass cutoff (Hz), default 2000
 * @param {number} [opts.winMs] energy window (ms), default 5
 * @param {number} [opts.thresholdRatio] flux must reach this fraction of the
 *                                       buffer's maximum flux, default 0.1
 * @param {number} [opts.minSepMs] min gap between onsets (ms), default 150
 * @returns {number[]} onset times in ms.
 */
export function detectTransientOnsets(samples, sr, opts = {}) {
  const {
    highpassHz = 2000,
    winMs = 5,
    thresholdRatio = 0.1,
    minSepMs = 150,
  } = opts;

  // Need at least two windows' worth of samples to form any flux value.
  if (!samples || samples.length < sr * (winMs / 1000) * 2) return [];

  const filtered = highpass(samples, sr, highpassHz);
  const { flux, winSamples } = energyFlux(filtered, sr, winMs);
  if (flux.length < 2) return [];

  let peak = 0;
  for (let i = 0; i < flux.length; i++) {
    if (flux[i] > peak) peak = flux[i];
  }
  // Effectively silent buffer: nothing to detect.
  if (peak < 1e-12) return [];

  const threshold = peak * thresholdRatio;
  const minDistWin = Math.max(1, Math.round(((minSepMs / 1000) * sr) / winSamples));
  const windowToMs = (win) => ((win * winSamples) / sr) * 1000;

  // local-maxima peak pick with min distance (mirrors scipy find_peaks)
  const onsets = [];
  let lastPeakWin = -Infinity;
  for (let i = 1; i < flux.length - 1; i++) {
    if (flux[i] < threshold) continue;
    if (flux[i] < flux[i - 1] || flux[i] < flux[i + 1]) continue; // local max

    if (i - lastPeakWin < minDistWin) {
      // keep the stronger of the two within the refractory window
      if (onsets.length && flux[i] > flux[lastPeakWin]) {
        onsets[onsets.length - 1] = windowToMs(i);
        lastPeakWin = i;
      }
      continue;
    }

    onsets.push(windowToMs(i));
    lastPeakWin = i;
  }
  return onsets;
}

// ──────── Streaming (live mic) ─────────────────────────────────────

/**
 * Adaptive, real-time onset detector fed by filtered audio blocks.
 *
 * Threshold adapts to a slowly-tracked noise floor so it works without a
 * global maximum (impossible to know live). An onset fires when the flux
 * spikes well above the floor and the refractory window has elapsed.
 */
export class StreamingOnsetDetector {
  /**
   * @param {object} o
   * @param {number} o.sampleRate
   * @param {(timeMs:number, strength:number)=>void} o.onOnset
   * @param {number} [o.winMs]           energy window (ms)
   * @param {number} [o.refractoryMs]    min gap between onsets
   * @param {number} [o.thresholdFactor] flux must exceed floor*factor
   * @param {number} [o.absMin]          absolute floor on flux energy
   * @param {number} [o.peakRatio]       flux must also exceed peak*ratio,
   *                                     where peak is a decaying max of
   *                                     recent flux. Mirrors the offline
   *                                     detector's global-peak threshold so
   *                                     mid-level strays between loud clicks
   *                                     are rejected even when the noise
   *                                     floor is near zero. Calibrated on
   *                                     real robot-clap audio (2026-06-01).
   * @param {number} [o.peakDecayTau]    time constant (s) for peak decay
   */
  constructor(o) {
    this.sr = o.sampleRate;
    this.onOnset = o.onOnset;
    this.winMs = o.winMs ?? 5;
    this.refractoryMs = o.refractoryMs ?? 180;
    this.thresholdFactor = o.thresholdFactor ?? 4.0;
    this.absMin = o.absMin ?? 1e-5;
    this.peakRatio = o.peakRatio ?? 0.12;
    this.winSamples = Math.max(1, Math.round((this.winMs / 1000) * this.sr));

    this._acc = 0; // energy accumulator for current window
    this._accN = 0; // samples accumulated into the current window so far
    this._winIndex = 0; // running window counter since start
    this._prevEnergy = null; // null until the first complete window is seen
    this._floor = this.absMin; // EMA noise floor of flux
    this._peak = 0; // decaying max of flux (adaptive loudness reference)
    this._lastOnsetMs = -Infinity;

    // EMA coefficient (~300 ms time constant on the flux floor)
    const tau = 0.3;
    this._alpha = Math.exp(-(this.winMs / 1000) / tau);
    // Peak decays slowly so detection adapts if the level drops over time.
    this._peakDecay = Math.exp(-(this.winMs / 1000) / (o.peakDecayTau ?? 4.0));
  }

  /**
   * Feed one block of already-highpassed samples. baseTimeMs = block start.
   *
   * Windows may span block boundaries: the accumulator carries over between
   * calls. Each completed window is timestamped at the sample that closes it.
   *
   * @param {ArrayLike<number>} block filtered samples
   * @param {number} baseTimeMs time of the block's first sample (ms)
   */
  process(block, baseTimeMs) {
    for (let i = 0; i < block.length; i++) {
      const v = block[i];
      this._acc += v * v;
      this._accN += 1;
      if (this._accN >= this.winSamples) {
        const energy = this._acc;
        const winTimeMs = baseTimeMs + ((i + 1) / this.sr) * 1000;
        this._onWindow(energy, winTimeMs);
        this._acc = 0;
        this._accN = 0;
        this._winIndex += 1;
      }
    }
  }

  /**
   * Handle one completed energy window: update the decaying peak, decide
   * whether to fire an onset, otherwise fold the flux into the noise floor.
   *
   * @param {number} energy summed squared samples of the window
   * @param {number} timeMs timestamp assigned to the window (ms)
   */
  _onWindow(energy, timeMs) {
    // The very first window only seeds _prevEnergy; flux needs two windows.
    if (this._prevEnergy !== null) {
      const flux = Math.max(0, energy - this._prevEnergy);

      this._peak *= this._peakDecay;
      if (flux > this._peak) this._peak = flux;

      // Two gates: above the adaptive noise floor (rejects rumble) AND a
      // fraction of recent peak loudness (rejects mid-level strays
      // between real clicks).
      const trigger = Math.max(this.absMin, this._floor * this.thresholdFactor);
      const loudEnough = flux >= this._peak * this.peakRatio;

      if (
        flux > trigger &&
        loudEnough &&
        timeMs - this._lastOnsetMs >= this.refractoryMs
      ) {
        this._lastOnsetMs = timeMs;
        this.onOnset?.(timeMs, flux);
      } else {
        // Every window that does not fire an onset feeds the floor EMA,
        // including loud windows suppressed by the refractory gate.
        this._floor = this._alpha * this._floor + (1 - this._alpha) * flux;
      }
    }
    this._prevEnergy = energy;
  }
}