/**
* Transient-onset detector — clicks (antenna hits) and short beeps.
*
* Ported from marionette/tests/audio_analysis.py `detect_transient_onsets`:
* highpass -> short-term energy (5 ms windows) -> spectral flux
* (positive energy differences) -> peak pick with min separation.
*
* Two entry points sharing the same DSP core:
* - detectTransientOnsets(samples, sr, opts): pure, offline. Used by unit
* tests (synthetic audio) and the visualizer (on recorded buffers).
* - StreamingOnsetDetector: live mic. The highpass is done by a Web Audio
* BiquadFilterNode upstream; this class consumes filtered blocks and
* emits onsets in real time with an adaptive (noise-floor) threshold.
*
* Onset times are in MILLISECONDS to match wire.js / timing.js.
*/
// ──────── Pure DSP ────────────────────────────────────────────────
/**
 * Run one 2nd-order high-pass biquad (RBJ cookbook coefficients) over a
 * buffer. With the default q of 1/sqrt(2) the section is Butterworth;
 * applying it twice gives a ~4th-order roll-off (the intent of the Python
 * `butter(4, ...)` this was ported from).
 *
 * @param {ArrayLike<number>} samples input signal
 * @param {number} sr sample rate (Hz)
 * @param {number} fc cutoff frequency (Hz)
 * @param {number} [q=Math.SQRT1_2] quality factor of the section
 * @returns {Float64Array} filtered signal, same length as `samples`
 */
export function biquadHighpassOnce(samples, sr, fc, q = Math.SQRT1_2) {
  const omega = (2 * Math.PI * fc) / sr;
  const cosOmega = Math.cos(omega);
  const alpha = Math.sin(omega) / (2 * q);

  // Every coefficient is normalized by a0 so the recurrence needs no divide.
  const a0 = 1 + alpha;
  const outerTap = (1 + cosOmega) / 2; // b0 and b2 are identical for a high-pass
  const ff0 = outerTap / a0;
  const ff1 = -(1 + cosOmega) / a0;
  const ff2 = outerTap / a0;
  const fb1 = (-2 * cosOmega) / a0;
  const fb2 = (1 - alpha) / a0;

  const count = samples.length;
  const filtered = new Float64Array(count);

  // Direct form I state: the two previous inputs and the two previous outputs.
  let inPrev1 = 0;
  let inPrev2 = 0;
  let outPrev1 = 0;
  let outPrev2 = 0;

  for (let n = 0; n < count; n++) {
    const input = samples[n];
    const output =
      ff0 * input + ff1 * inPrev1 + ff2 * inPrev2 - fb1 * outPrev1 - fb2 * outPrev2;

    inPrev2 = inPrev1;
    inPrev1 = input;
    outPrev2 = outPrev1;
    outPrev1 = output;

    filtered[n] = output;
  }

  return filtered;
}
/**
 * High-pass a buffer by running two identical biquad sections back to back
 * at the same cutoff (default q on both stages).
 *
 * @param {ArrayLike<number>} samples input signal
 * @param {number} sr sample rate (Hz)
 * @param {number} fc cutoff frequency (Hz)
 * @returns {Float64Array} filtered signal, same length as `samples`
 */
export function highpass(samples, sr, fc) {
  const firstStage = biquadHighpassOnce(samples, sr, fc);
  const secondStage = biquadHighpassOnce(firstStage, sr, fc);
  return secondStage;
}
/**
 * Sum-of-squares energy over consecutive, non-overlapping windows, reduced
 * to positive flux: flux[k] = max(0, energy[k + 1] - energy[k]).
 * Trailing samples that do not fill a whole window are ignored.
 *
 * @param {ArrayLike<number>} filtered signal to analyse
 * @param {number} sr sample rate (Hz)
 * @param {number} [winMs=5] window length in milliseconds
 * @returns {{flux:Float64Array, winSamples:number}} flux (one entry fewer
 *   than the number of whole windows) and the window length in samples
 */
export function energyFlux(filtered, sr, winMs = 5) {
  const winSamples = Math.max(1, Math.round((winMs / 1000) * sr));
  const windowCount = Math.floor(filtered.length / winSamples);
  const flux = new Float64Array(Math.max(0, windowCount - 1));

  let previousEnergy = 0;
  for (let w = 0; w < windowCount; w++) {
    const start = w * winSamples;
    const stop = start + winSamples;

    let windowEnergy = 0;
    for (let k = start; k < stop; k++) {
      windowEnergy += filtered[k] * filtered[k];
    }

    // The first window has no predecessor, so flux starts at the second one.
    if (w > 0) {
      flux[w - 1] = Math.max(0, windowEnergy - previousEnergy);
    }
    previousEnergy = windowEnergy;
  }

  return { flux, winSamples };
}
/**
 * Offline transient detection over a complete buffer.
 *
 * Pipeline: high-pass -> windowed energy -> positive flux -> local-maximum
 * picking above a fraction of the global flux peak, with a minimum spacing
 * between picks.
 *
 * @param {ArrayLike<number>} samples raw audio
 * @param {number} sr sample rate (Hz)
 * @param {object} [opts]
 * @param {number} [opts.highpassHz=2000] high-pass cutoff (Hz)
 * @param {number} [opts.winMs=5] energy window (ms)
 * @param {number} [opts.thresholdRatio=0.1] threshold as a fraction of the largest flux value
 * @param {number} [opts.minSepMs=150] minimum spacing between onsets (ms)
 * @returns {number[]} onset times in ms (flux index times window length)
 */
export function detectTransientOnsets(samples, sr, opts = {}) {
  const { highpassHz = 2000, winMs = 5, thresholdRatio = 0.1, minSepMs = 150 } = opts;

  // At least two full windows are required to form any flux value.
  const minimumLength = sr * (winMs / 1000) * 2;
  if (!samples || samples.length < minimumLength) {
    return [];
  }

  const { flux, winSamples } = energyFlux(highpass(samples, sr, highpassHz), sr, winMs);
  if (flux.length < 2) {
    return [];
  }

  let maxFlux = 0;
  for (const value of flux) {
    if (value > maxFlux) maxFlux = value;
  }
  // Effectively silent after filtering: nothing to report.
  if (maxFlux < 1e-12) {
    return [];
  }

  const threshold = maxFlux * thresholdRatio;
  const minGapWindows = Math.max(1, Math.round((minSepMs / 1000) * sr / winSamples));
  const windowToMs = (windowIndex) => ((windowIndex * winSamples) / sr) * 1000;

  // Local-maxima pick with a minimum distance (in the spirit of scipy find_peaks).
  const onsets = [];
  let lastPick = -Infinity;
  const lastInterior = flux.length - 1;

  for (let w = 1; w < lastInterior; w++) {
    const here = flux[w];
    const rejected = here < threshold || here < flux[w - 1] || here < flux[w + 1];
    if (rejected) continue;

    const tooClose = w - lastPick < minGapWindows;
    if (!tooClose) {
      onsets.push(windowToMs(w));
      lastPick = w;
      continue;
    }

    // Inside the refractory window: the stronger of the two peaks wins.
    if (onsets.length > 0 && here > flux[lastPick]) {
      onsets[onsets.length - 1] = windowToMs(w);
      lastPick = w;
    }
  }

  return onsets;
}
// ──────── Streaming (live mic) ─────────────────────────────────────
/**
 * Real-time onset detector driven by blocks of already-filtered audio.
 *
 * A live stream has no known global maximum, so the threshold follows a
 * slowly-tracked noise floor instead. An onset is reported when the flux
 * rises well above that floor, is a sufficient fraction of the recent peak,
 * and the refractory interval since the last onset has passed.
 */
export class StreamingOnsetDetector {
  /**
   * @param {object} o
   * @param {number} o.sampleRate
   * @param {(timeMs:number, strength:number)=>void} o.onOnset
   * @param {number} [o.winMs]           energy window (ms), default 5
   * @param {number} [o.refractoryMs]    min gap between onsets, default 180
   * @param {number} [o.thresholdFactor] flux must exceed floor*factor, default 4.0
   * @param {number} [o.absMin]          absolute floor on flux energy, default 1e-5
   * @param {number} [o.peakRatio]       flux must also reach peak*ratio (default
   *                                     0.12), where peak is a decaying max of
   *                                     recent flux. Mirrors the offline
   *                                     detector's global-peak threshold so
   *                                     mid-level strays between loud clicks
   *                                     are rejected even when the noise floor
   *                                     is near zero. Calibrated on real
   *                                     robot-clap audio (2026-06-01).
   * @param {number} [o.peakDecayTau]    time constant (s) for peak decay, default 4.0
   */
  constructor(o) {
    this.sr = o.sampleRate;
    this.onOnset = o.onOnset;
    this.winMs = o.winMs ?? 5;
    this.refractoryMs = o.refractoryMs ?? 180;
    this.thresholdFactor = o.thresholdFactor ?? 4.0;
    this.absMin = o.absMin ?? 1e-5;
    this.peakRatio = o.peakRatio ?? 0.12;

    const windowSeconds = this.winMs / 1000;
    this.winSamples = Math.max(1, Math.round(windowSeconds * this.sr));

    this._acc = 0; // sum of squares for the window being filled
    this._accN = 0; // samples accumulated into that window so far
    this._winIndex = 0; // completed windows since construction
    this._prevEnergy = null; // energy of the previous window (null until one exists)
    this._floor = this.absMin; // EMA noise floor of the flux
    this._peak = 0; // decaying maximum of the flux (loudness reference)
    this._lastOnsetMs = -Infinity;

    // Per-window EMA coefficient for the floor (~300 ms time constant).
    const floorTauSeconds = 0.3;
    this._alpha = Math.exp(-windowSeconds / floorTauSeconds);

    // The peak decays slowly so detection adapts if the level drops over time.
    const peakTauSeconds = o.peakDecayTau ?? 4.0;
    this._peakDecay = Math.exp(-windowSeconds / peakTauSeconds);
  }

  /**
   * Feed one block of already-highpassed samples. Partial windows carry over
   * between calls.
   *
   * @param {ArrayLike<number>} block filtered samples
   * @param {number} baseTimeMs time of the block's first sample (ms)
   */
  process(block, baseTimeMs) {
    for (let idx = 0; idx < block.length; idx++) {
      const sample = block[idx];
      this._acc += sample * sample;
      this._accN += 1;

      const windowFull = this._accN >= this.winSamples;
      if (!windowFull) continue;

      // A window is stamped with the time just after its last sample.
      const windowEndMs = baseTimeMs + ((idx + 1) / this.sr) * 1000;
      this._onWindow(this._acc, windowEndMs);

      this._acc = 0;
      this._accN = 0;
      this._winIndex += 1;
    }
  }

  /**
   * Handle one completed energy window: update the peak tracker, then either
   * report an onset or fold the flux into the noise floor.
   *
   * @param {number} energy sum of squares over the window
   * @param {number} timeMs timestamp assigned to the window (ms)
   */
  _onWindow(energy, timeMs) {
    // The very first window only seeds the energy history.
    if (this._prevEnergy === null) {
      this._prevEnergy = energy;
      return;
    }

    const flux = Math.max(0, energy - this._prevEnergy);

    this._peak *= this._peakDecay;
    if (flux > this._peak) {
      this._peak = flux;
    }

    // Gate 1: clear of the adaptive noise floor (rejects rumble).
    const trigger = Math.max(this.absMin, this._floor * this.thresholdFactor);
    const aboveFloor = flux > trigger;
    // Gate 2: a fraction of recent peak loudness (rejects mid-level strays
    // between real clicks).
    const loudEnough = flux >= this._peak * this.peakRatio;
    // Gate 3: refractory interval since the previous onset.
    const rested = timeMs - this._lastOnsetMs >= this.refractoryMs;

    if (aboveFloor && loudEnough && rested) {
      this._lastOnsetMs = timeMs;
      this.onOnset?.(timeMs, flux);
    } else {
      // Every window that does not fire feeds the floor EMA.
      this._floor = this._alpha * this._floor + (1 - this._alpha) * flux;
    }

    this._prevEnergy = energy;
  }
}
|