morse-code / lib /detector.js
RemiFabre
tune: robust live detector + calibrated robot clap from hardware test
fa08634
Raw
History Blame Contribute Delete
8.64 kB
/**
* Transient-onset detector — clicks (antenna hits) and short beeps.
*
* Ported from marionette/tests/audio_analysis.py `detect_transient_onsets`:
* highpass -> short-term energy (5 ms windows) -> spectral flux
* (positive energy differences) -> peak pick with min separation.
*
* Two entry points sharing the same DSP core:
* - detectTransientOnsets(samples, sr, opts): pure, offline. Used by unit
* tests (synthetic audio) and the visualizer (on recorded buffers).
* - StreamingOnsetDetector: live mic. The highpass is done by a Web Audio
* BiquadFilterNode upstream; this class consumes filtered blocks and
* emits onsets in real time with an adaptive (noise-floor) threshold.
*
* Onset times are in MILLISECONDS to match wire.js / timing.js.
*/
// ──────── Pure DSP ────────────────────────────────────────────────
/**
 * One pass of a 2nd-order high-pass biquad using the RBJ cookbook
 * coefficients. With the default Q of 1/sqrt(2) this is a Butterworth
 * section; running it twice gives a ~4th-order roll-off (the intent of the
 * Python `butter(4, ...)` this was ported from).
 *
 * The filter state starts at zero and the input is not modified.
 *
 * @param {ArrayLike<number>} samples input signal
 * @param {number} sr sample rate in Hz
 * @param {number} fc cutoff frequency in Hz
 * @param {number} [q] quality factor (defaults to Math.SQRT1_2)
 * @returns {Float64Array} filtered signal, same length as `samples`
 */
export function biquadHighpassOnce(samples, sr, fc, q = Math.SQRT1_2) {
  const omega = (2 * Math.PI * fc) / sr;
  const cosOmega = Math.cos(omega);
  const alpha = Math.sin(omega) / (2 * q);

  // Raw cookbook coefficients; the outer feed-forward taps (b0, b2) are equal.
  const norm = 1 + alpha; // a0
  const edgeTap = (1 + cosOmega) / 2; // b0 and b2
  const midTap = -(1 + cosOmega); // b1

  // Normalise everything by a0 so the recurrence needs no division.
  const ffEdge = edgeTap / norm;
  const ffMid = midTap / norm;
  const fb1 = (-2 * cosOmega) / norm;
  const fb2 = (1 - alpha) / norm;

  const count = samples.length;
  const filtered = new Float64Array(count);

  // Direct-form I delay line: two past inputs and two past outputs.
  let inPrev = 0;
  let inPrev2 = 0;
  let outPrev = 0;
  let outPrev2 = 0;
  for (let n = 0; n < count; n++) {
    const input = samples[n];
    const output =
      ffEdge * input + ffMid * inPrev + ffEdge * inPrev2 - fb1 * outPrev - fb2 * outPrev2;
    filtered[n] = output;
    inPrev2 = inPrev;
    inPrev = input;
    outPrev2 = outPrev;
    outPrev = output;
  }
  return filtered;
}
/**
 * High-pass filter built from two cascaded passes of `biquadHighpassOnce`
 * at the same cutoff (default Q on both passes).
 *
 * @param {ArrayLike<number>} samples input signal
 * @param {number} sr sample rate in Hz
 * @param {number} fc cutoff frequency in Hz
 * @returns {Float64Array} filtered signal
 */
export function highpass(samples, sr, fc) {
  const firstPass = biquadHighpassOnce(samples, sr, fc);
  const secondPass = biquadHighpassOnce(firstPass, sr, fc);
  return secondPass;
}
/**
 * Splits the signal into non-overlapping windows, sums the squared samples
 * of each window, and returns the positive part of the window-to-window
 * energy change ("flux"). `flux[k]` compares window `k + 1` with window `k`,
 * so it has one entry fewer than there are complete windows. Trailing
 * samples that do not fill a whole window are ignored.
 *
 * @param {ArrayLike<number>} filtered signal to analyse
 * @param {number} sr sample rate in Hz
 * @param {number} [winMs] window length in milliseconds (default 5)
 * @returns {{flux:Float64Array, winSamples:number}}
 */
export function energyFlux(filtered, sr, winMs = 5) {
  const winSamples = Math.max(1, Math.round((winMs / 1000) * sr));
  const windowCount = Math.floor(filtered.length / winSamples);
  const flux = new Float64Array(Math.max(0, windowCount - 1));

  // Single pass: keep only the previous window's energy around.
  let previousEnergy = 0;
  for (let win = 0, start = 0; win < windowCount; win++, start += winSamples) {
    const stop = start + winSamples;
    let windowEnergy = 0;
    for (let k = start; k < stop; k++) {
      const sample = filtered[k];
      windowEnergy += sample * sample;
    }
    if (win > 0) {
      // Only rises in energy count; drops are clamped to zero.
      flux[win - 1] = Math.max(0, windowEnergy - previousEnergy);
    }
    previousEnergy = windowEnergy;
  }

  return { flux, winSamples };
}
/**
 * Offline transient detection over a complete buffer:
 * high-pass -> windowed energy flux -> thresholded local-maximum picking
 * with a minimum separation between reported onsets.
 *
 * Returns an empty list when the buffer is missing or shorter than two
 * windows, when fewer than two flux values are available, or when the
 * largest flux value is below 1e-12.
 *
 * @param {ArrayLike<number>} samples raw audio samples
 * @param {number} sr sample rate in Hz
 * @param {object} [opts]
 * @param {number} [opts.highpassHz] high-pass cutoff in Hz (default 2000)
 * @param {number} [opts.winMs] energy window in ms (default 5)
 * @param {number} [opts.thresholdRatio] threshold as a fraction of the
 *   largest flux value in the buffer (default 0.1)
 * @param {number} [opts.minSepMs] minimum spacing between onsets in ms
 *   (default 150)
 * @returns {number[]} onset times in ms.
 */
export function detectTransientOnsets(samples, sr, opts = {}) {
  const { highpassHz = 2000, winMs = 5, thresholdRatio = 0.1, minSepMs = 150 } = opts;

  const minSamples = sr * (winMs / 1000) * 2;
  if (!samples || samples.length < minSamples) {
    return [];
  }

  const { flux, winSamples } = energyFlux(highpass(samples, sr, highpassHz), sr, winMs);
  if (flux.length < 2) {
    return [];
  }

  // The threshold is relative to the strongest flux value in the buffer.
  const peak = flux.reduce((best, value) => (value > best ? value : best), 0);
  if (peak < 1e-12) {
    return [];
  }
  const threshold = peak * thresholdRatio;
  const minDistWin = Math.max(1, Math.round((minSepMs / 1000) * sr / winSamples));

  // Converts a flux index into a time in milliseconds.
  const toMs = (win) => ((win * winSamples) / sr) * 1000;

  // Local-maxima pick with a minimum distance (mirrors scipy find_peaks);
  // the first and last flux bins are never candidates.
  const onsets = [];
  let acceptedWin = -Infinity;
  for (let win = 1; win < flux.length - 1; win++) {
    const value = flux[win];
    const belowThreshold = value < threshold;
    const notLocalMax = value < flux[win - 1] || value < flux[win + 1];
    if (belowThreshold || notLocalMax) {
      continue;
    }

    const insideRefractory = win - acceptedWin < minDistWin;
    if (!insideRefractory) {
      onsets.push(toMs(win));
      acceptedWin = win;
      continue;
    }

    // Too close to the previous onset: keep whichever of the two is stronger.
    if (onsets.length > 0 && value > flux[acceptedWin]) {
      onsets[onsets.length - 1] = toMs(win);
      acceptedWin = win;
    }
  }
  return onsets;
}
// ──────── Streaming (live mic) ─────────────────────────────────────
/**
 * Real-time onset detector that consumes blocks of already-filtered audio.
 *
 * A global maximum cannot be known live, so the threshold follows a slowly
 * tracked noise floor instead. An onset is reported when the energy flux
 * rises well above that floor, is a sufficient fraction of the recent peak,
 * and the refractory period since the previous onset has passed.
 */
export class StreamingOnsetDetector {
  /**
   * @param {object} o
   * @param {number} o.sampleRate sample rate in Hz
   * @param {(timeMs:number, strength:number)=>void} o.onOnset called with
   *   the onset time (ms) and the flux value that triggered it
   * @param {number} [o.winMs] energy window in ms (default 5)
   * @param {number} [o.refractoryMs] minimum gap between onsets in ms
   *   (default 180)
   * @param {number} [o.thresholdFactor] flux must exceed floor*factor
   *   (default 4.0)
   * @param {number} [o.absMin] absolute lower bound on the flux trigger and
   *   initial value of the noise floor (default 1e-5)
   * @param {number} [o.peakRatio] flux must also reach peak*ratio, where
   *   peak is a decaying maximum of recent flux (default 0.12). Mirrors the
   *   offline detector's global-peak threshold so mid-level strays between
   *   loud clicks are rejected even when the noise floor is near zero.
   *   Calibrated on real robot-clap audio (2026-06-01).
   * @param {number} [o.peakDecayTau] time constant in seconds for the peak
   *   decay (default 4.0)
   */
  constructor(o) {
    // Tunables; null/undefined options fall back to the defaults.
    this.sr = o.sampleRate;
    this.onOnset = o.onOnset;
    this.winMs = o.winMs ?? 5;
    this.refractoryMs = o.refractoryMs ?? 180;
    this.thresholdFactor = o.thresholdFactor ?? 4.0;
    this.absMin = o.absMin ?? 1e-5;
    this.peakRatio = o.peakRatio ?? 0.12;

    const winSeconds = this.winMs / 1000;
    this.winSamples = Math.max(1, Math.round(winSeconds * this.sr));

    // Running state.
    Object.assign(this, {
      _acc: 0, // energy accumulated for the window in progress
      _accN: 0, // samples accumulated for the window in progress
      _winIndex: 0, // completed windows since construction
      _prevEnergy: null, // energy of the last completed window
      _floor: this.absMin, // EMA noise floor of the flux
      _peak: 0, // decaying maximum of the flux (loudness reference)
      _lastOnsetMs: -Infinity,
    });

    // Per-window EMA coefficient (~300 ms time constant on the flux floor).
    const floorTau = 0.3;
    this._alpha = Math.exp(-winSeconds / floorTau);
    // The peak decays slowly so detection adapts if the level drops over time.
    this._peakDecay = Math.exp(-winSeconds / (o.peakDecayTau ?? 4.0));
  }

  /**
   * Feed one block of already-highpassed samples. Windows may span block
   * boundaries; each completed window is stamped with the time of its last
   * sample's end.
   *
   * @param {ArrayLike<number>} block filtered samples
   * @param {number} baseTimeMs time of the first sample of the block, in ms
   */
  process(block, baseTimeMs) {
    for (let idx = 0; idx < block.length; idx++) {
      const sample = block[idx];
      this._acc += sample * sample;
      this._accN += 1;
      if (!(this._accN >= this.winSamples)) {
        continue; // window still filling
      }

      const windowEndMs = baseTimeMs + ((idx + 1) / this.sr) * 1000;
      this._onWindow(this._acc, windowEndMs);

      // Start a fresh window.
      this._acc = 0;
      this._accN = 0;
      this._winIndex += 1;
    }
  }

  /**
   * Handle one completed energy window. The first window only seeds the
   * previous-energy reference; later windows are compared against it.
   */
  _onWindow(energy, timeMs) {
    if (this._prevEnergy !== null) {
      this._judgeFlux(Math.max(0, energy - this._prevEnergy), timeMs);
    }
    this._prevEnergy = energy;
  }

  /** Update the peak tracker, then either report an onset or adapt the floor. */
  _judgeFlux(flux, timeMs) {
    const decayedPeak = this._peak * this._peakDecay;
    this._peak = flux > decayedPeak ? flux : decayedPeak;

    // Two gates: above the adaptive noise floor (rejects rumble) AND at
    // least a fraction of the recent peak (rejects mid-level strays between
    // real clicks). The refractory check comes last.
    const floorGate = Math.max(this.absMin, this._floor * this.thresholdFactor);
    const peakGate = this._peak * this.peakRatio;
    const isOnset =
      flux > floorGate &&
      flux >= peakGate &&
      timeMs - this._lastOnsetMs >= this.refractoryMs;

    if (!isOnset) {
      // Every window that does not fire feeds the floor estimate.
      this._floor = this._alpha * this._floor + (1 - this._alpha) * flux;
      return;
    }

    this._lastOnsetMs = timeMs;
    this.onOnset?.(timeMs, flux);
  }
}