"""Audio analytics for usage logging.

Computed synchronously at log-write time, after the pipeline has already
yielded its final result to the caller. Inputs (waveform + VAD speech
intervals) are already in scope, so wall-time impact on the UI is invisible.

Granularities:
  - whole     : full post-resample waveform
  - speech    : concat of VAD speech intervals (actual recitation)
  - nonspeech : concat of VAD gaps (room tone / noise floor / hum)

SNR is 20*log10(rms_speech / rms_nonspeech). If either region is empty
(e.g. pure-speech clip with no VAD gaps), SNR is null.
"""

from __future__ import annotations

import time

import numpy as np

try:
    from scipy import signal as _sig

    _HAVE_SCIPY = True
except Exception:
    # NOTE(review): broad catch looks deliberate (a broken scipy install
    # degrades to "no PSD metrics" instead of failing the import) -- confirm.
    _HAVE_SCIPY = False

# Floor used to keep log10() and divisions away from zero.
_EPS = 1e-12

# Output key -> frequencies (Hz) probed for hum: fundamental + 2 harmonics.
_HUM_TARGETS = {
    "hum_50hz_db": (50.0, 100.0, 150.0),
    "hum_60hz_db": (60.0, 120.0, 180.0),
}


def _db(x: float) -> float:
    """Return 20*log10(x), with x floored at _EPS so the result is finite."""
    return float(20.0 * np.log10(max(x, _EPS)))


def _rms(a: np.ndarray) -> float:
    """Return the root-mean-square of a 1-D array (0.0 when empty)."""
    if a.size == 0:
        return 0.0
    return float(np.sqrt(np.dot(a, a) / a.size))


def _concat_intervals(
    audio: np.ndarray, sr: int, intervals_s, invert: bool = False
) -> np.ndarray:
    """Concatenate audio from (start_s, end_s) intervals.

    invert=True returns the complement (gaps between the given intervals,
    plus head/tail if the intervals don't cover the full clip).

    Intervals are clipped to the clip bounds, sorted, and merged when they
    overlap, so each sample appears at most once in the output and the
    inverted output never contains a sample covered by any interval.

    With no usable intervals the result is the whole clip when inverting,
    otherwise an empty array of the same dtype.
    """
    empty = np.asarray([], dtype=audio.dtype)
    if intervals_s is None or len(intervals_s) == 0:
        return audio if invert else empty

    n = audio.size
    spans = []
    for s, e in intervals_s:
        ss = max(0, int(float(s) * sr))
        ee = min(n, int(float(e) * sr))
        if ee > ss:
            spans.append((ss, ee))
    if not spans:
        return audio if invert else empty

    # Sort (VAD output already sorted, but be defensive), then merge
    # overlapping spans. Without the merge, a nested interval would pull the
    # gap cursor backwards and leak speech samples into the complement.
    spans.sort()
    merged = [spans[0]]
    for ss, ee in spans[1:]:
        last_s, last_e = merged[-1]
        if ss <= last_e:
            if ee > last_e:
                merged[-1] = (last_s, ee)
        else:
            merged.append((ss, ee))

    if invert:
        gaps = []
        cur = 0
        for ss, ee in merged:
            if ss > cur:
                gaps.append((cur, ss))
            cur = ee
        if cur < n:
            gaps.append((cur, n))
        merged = gaps

    if not merged:
        return empty
    return np.concatenate([audio[ss:ee] for ss, ee in merged])


def _psd(a: np.ndarray, sr: int):
    """Welch PSD. Returns (freqs, pxx) or (empty, empty) if region too short.

    Empty arrays are also returned when scipy is unavailable.
    """
    if not _HAVE_SCIPY or a.size < 512:
        return np.asarray([]), np.asarray([])
    # Segment length: 4096, or the largest power of two that fits the region.
    nperseg = 4096 if a.size >= 4096 else 1 << (a.size.bit_length() - 1)
    f, pxx = _sig.welch(a, fs=sr, nperseg=nperseg)
    return f, pxx


def _spectral_centroid_hz(f, pxx) -> float:
    """Return the PSD-weighted mean frequency (0.0 if no PSD or no energy)."""
    if f.size == 0:
        return 0.0
    s = pxx.sum()
    if s <= 0:
        return 0.0
    return float((f * pxx).sum() / s)


def _spectral_rolloff_hz(f, pxx, pct: float = 0.85) -> float:
    """Return the frequency below which `pct` of the total PSD energy lies."""
    if f.size == 0:
        return 0.0
    cum = np.cumsum(pxx)
    total = cum[-1]
    if total <= 0:
        return 0.0
    idx = int(np.searchsorted(cum, pct * total))
    idx = min(idx, f.size - 1)
    return float(f[idx])


def _spectral_flatness(pxx) -> float:
    """Wiener entropy: geometric_mean / arithmetic_mean of PSD bins.

    1.0 = white noise (flat spectrum). 0.0 = pure tone. Low flatness on
    non-speech regions hints at hum or tonal interference.
    """
    if pxx.size == 0:
        return 0.0
    p = np.maximum(pxx, _EPS)
    gm = float(np.exp(np.log(p).mean()))
    am = float(p.mean())
    return gm / am if am > 0 else 0.0


def _bandwidth_hz(f, pxx, drop_db: float = 60.0) -> float:
    """Highest freq bin where PSD is within `drop_db` of peak.

    Proxy for recording bandwidth — 8kHz band-limited audio (phone, lossy)
    returns ~3-4kHz; full-band studio returns near Nyquist.
    """
    if f.size == 0:
        return 0.0
    pdb = 10.0 * np.log10(np.maximum(pxx, _EPS))
    threshold = pdb.max() - drop_db
    above = np.where(pdb >= threshold)[0]
    if above.size == 0:
        return 0.0
    return float(f[above[-1]])


def _hum_db(f, pxx, targets: tuple, tol_hz: float = 3.0) -> float:
    """Peak PSD across target freqs ± tol_hz, in dB/Hz.

    Returns -120.0 when no PSD is available; that equals the floor produced
    when no bin exceeds _EPS.
    """
    if f.size == 0:
        return -120.0
    best = _EPS
    for t in targets:
        mask = (f >= t - tol_hz) & (f <= t + tol_hz)
        if mask.any():
            peak = float(pxx[mask].max())
            if peak > best:
                best = peak
    # float() so the value is a plain Python float, as annotated.
    return round(float(10.0 * np.log10(max(best, _EPS))), 2)


def _whole_block(audio: np.ndarray, sr: int) -> dict:
    """Return level, percentile, crest, bandwidth and duration stats."""
    rms = _rms(audio)
    abs_a = np.abs(audio)
    if audio.size:
        peak = float(abs_a.max())
        dc = float(audio.mean())
        # One percentile call for both quantiles instead of two full passes.
        p01, p99 = (float(v) for v in np.percentile(abs_a, [1, 99]))
    else:
        peak = dc = p01 = p99 = 0.0
    crest = (peak / rms) if rms > _EPS else 0.0
    f, pxx = _psd(audio, sr)
    return {
        "rms": round(rms, 5),
        "rms_db": round(_db(rms), 2),
        "peak": round(peak, 5),
        "peak_db": round(_db(peak), 2),
        "dc_offset": round(dc, 6),
        "p99": round(p99, 5),
        "p01": round(p01, 5),
        "crest": round(crest, 3),
        "dyn_range_db": round(_db(p99) - _db(max(p01, _EPS)), 2),
        "bandwidth_hz": round(_bandwidth_hz(f, pxx), 1),
        "duration_s": round(audio.size / sr, 3),
    }


def _speech_block(speech: np.ndarray, sr: int) -> dict:
    """Return level and spectral-shape stats for the speech concat."""
    rms = _rms(speech)
    f, pxx = _psd(speech, sr)
    return {
        "rms": round(rms, 5),
        "rms_db": round(_db(rms), 2),
        "spectral_centroid_hz": round(_spectral_centroid_hz(f, pxx), 1),
        "spectral_rolloff_hz": round(_spectral_rolloff_hz(f, pxx), 1),
        "bandwidth_hz": round(_bandwidth_hz(f, pxx), 1),
        "duration_s": round(speech.size / sr, 3),
    }


def _nonspeech_block(nonspeech: np.ndarray, sr: int) -> dict:
    """Return level, flatness, centroid and hum stats for the gap concat."""
    rms = _rms(nonspeech)
    f, pxx = _psd(nonspeech, sr)
    out = {
        "rms": round(rms, 5),
        "rms_db": round(_db(rms), 2),
        "spectral_flatness": round(_spectral_flatness(pxx), 4),
        "spectral_centroid_hz": round(_spectral_centroid_hz(f, pxx), 1),
        "duration_s": round(nonspeech.size / sr, 3),
    }
    for key, targets in _HUM_TARGETS.items():
        out[key] = _hum_db(f, pxx, targets)
    return out


def compute_audio_analytics(
    audio: np.ndarray, sr: int, speech_intervals_s
) -> dict:
    """Compute whole/speech/nonspeech + SNR analytics.

    Expected runtime ~100-800ms depending on audio length and scipy FFT
    cache warmth. Caller is responsible for choosing when to run (after
    user-visible response has been delivered).

    Returns {} on empty input. Speech/nonspeech blocks may be {} if the
    respective concatenated region is empty (e.g. pure silence, or VAD
    intervals cover the full clip).

    `snr_db` and `noise_floor_rms` are derived from the unrounded RMS values,
    so a quiet noise floor is not quantized to 0.0 by the 5-decimal rounding
    applied to the per-block "rms" fields. `snr_db` is None when either
    region is empty or has RMS <= _EPS; `noise_floor_rms` is None when the
    non-speech region is empty.
    """
    # perf_counter is monotonic; wall-clock adjustments cannot skew compute_ms.
    t0 = time.perf_counter()
    if audio is None or getattr(audio, "size", 0) == 0:
        return {}

    # Ensure float32 for numerical ops
    if audio.dtype != np.float32:
        audio = audio.astype(np.float32, copy=False)

    whole = _whole_block(audio, sr)
    speech = _concat_intervals(audio, sr, speech_intervals_s, invert=False)
    nonspeech = _concat_intervals(audio, sr, speech_intervals_s, invert=True)

    speech_b = _speech_block(speech, sr) if speech.size else {}
    nonspeech_b = _nonspeech_block(nonspeech, sr) if nonspeech.size else {}

    # Full-precision RMS for the ratio (one extra dot product per region).
    rs = _rms(speech) if speech.size else None
    rn = _rms(nonspeech) if nonspeech.size else None

    snr_db = None
    if rs is not None and rn is not None and rs > _EPS and rn > _EPS:
        snr_db = round(20.0 * float(np.log10(rs / rn)), 2)

    return {
        "whole": whole,
        "speech": speech_b,
        "nonspeech": nonspeech_b,
        "snr_db": snr_db,
        "noise_floor_rms": rn,  # convenience for per-segment SNR
        "compute_ms": round((time.perf_counter() - t0) * 1000, 1),
    }


def compute_noise_floor_rms(
    audio: np.ndarray, sr: int, speech_intervals_s
) -> float | None:
    """Fast path: RMS of the concatenated non-speech region only.

    Cheap (~10-50ms even on 48-min clips — one concat + one dot product).
    Used sync at log-write time so `segments[*].audio_stats.snr_db` can be
    populated on the response path while the full `audio_analytics` dict
    (with the expensive Welch PSDs) is computed post-yield in a bg thread.

    Returns None if the non-speech concat is empty or audio invalid.
    """
    if audio is None or getattr(audio, "size", 0) == 0:
        return None
    if audio.dtype != np.float32:
        audio = audio.astype(np.float32, copy=False)
    nonspeech = _concat_intervals(audio, sr, speech_intervals_s, invert=True)
    if nonspeech.size == 0:
        return None
    return _rms(nonspeech)


def segment_audio_stats(
    audio: np.ndarray,
    sr: int,
    start_s: float,
    end_s: float,
    noise_floor_rms: float | None,
) -> dict:
    """Per-segment {rms, peak, snr_db}. Cheap — one slice + one dot.

    SNR is relative to the clip-level noise floor (non-speech concat RMS).
    Null if noise floor unavailable or zero.

    Missing/empty audio or an empty [start_s, end_s) window yields zeroed
    stats with a null SNR. Non-float input is converted to float32 for the
    slice only, so integer PCM cannot overflow in the RMS dot product.
    """
    zero = {"rms": 0.0, "peak": 0.0, "snr_db": None}
    if audio is None or getattr(audio, "size", 0) == 0:
        return zero

    n = audio.size
    ss = max(0, int(float(start_s) * sr))
    ee = min(n, int(float(end_s) * sr))
    if ee <= ss:
        return zero

    slc = audio[ss:ee]
    if slc.dtype.kind != "f":
        slc = slc.astype(np.float32)

    rms = _rms(slc)
    peak = float(np.abs(slc).max()) if slc.size else 0.0
    snr = None
    if noise_floor_rms and noise_floor_rms > _EPS and rms > _EPS:
        snr = round(20.0 * float(np.log10(rms / noise_floor_rms)), 2)
    return {
        "rms": round(rms, 5),
        "peak": round(peak, 5),
        "snr_db": snr,
    }