# NOTE(review): removed non-code export residue here ("Spaces:" / "Sleeping" —
# presumably Hugging Face Spaces status text captured by the page export); the
# bare lines are not valid Python and were never part of the module.
"""
Stable Preprocessing Pipeline - v6.6.1

SINGLE source of truth for:
  - Canonical SR
  - Onset window lengths
  - Transient/tail slicing

Key principle: Same audio → Same representation (deterministic)
"""
import os
import io

import numpy as np
import librosa
import soundfile as sf
# Optional high-quality resampler; fall back to librosa when missing.
try:
    import soxr
    SOXR_AVAILABLE = True
except ImportError:
    SOXR_AVAILABLE = False
    print("[preprocessing] Warning: soxr not available, using librosa (less deterministic)")

# =========================
# CANONICAL SETTINGS
# =========================
CANONICAL_SR = 48000

# Window around onset (ms)
ONSET_PRE_MS = 15
ONSET_POST_MS = 735  # total window = 750ms

# View presets (all in ms, relative to START OF WINDOW)
# NOTE: start of window is ONSET_PRE_MS before onset.
VIEW_PRESETS = {
    # Optimized for short one-shots / CNN-style event models
    "hits": {
        "TRANS_END_MS": 85,    # first 85ms of window (15ms pre + 70ms post)
        "TAIL_START_MS": 30,   # skip earliest transient region
        "TAIL_END_MS": 650,    # capture body/decay
    },
    # Sometimes helps transformer encoders on micro-clips (requires reindex to compare)
    "transformer": {
        "TRANS_END_MS": 140,   # longer transient context
        "TAIL_START_MS": 40,
        "TAIL_END_MS": 700,
    },
}

# Resolve the active preset from the environment, defaulting to "hits"
# for any unknown/unset value.
_requested_preset = os.getenv("SCOUT_VIEW_PRESET", "hits").strip().lower()
DEFAULT_VIEW_PRESET = _requested_preset if _requested_preset in VIEW_PRESETS else "hits"

# Export these so scout.py can't drift
_active_preset = VIEW_PRESETS[DEFAULT_VIEW_PRESET]
TRANS_END_MS = _active_preset["TRANS_END_MS"]
TAIL_START_MS = _active_preset["TAIL_START_MS"]
TAIL_END_MS = _active_preset["TAIL_END_MS"]
def canonicalize_audio(audio: np.ndarray, sr: int):
    """
    Deterministic audio canonicalization.

    Steps (in order):
      1) Downmix to mono (channel average)
      2) Resample to CANONICAL_SR (soxr HQ when available, else librosa)
      3) Peak normalize to ±1
      4) Remove DC offset

    Returns (float32 mono signal, CANONICAL_SR).
    """
    # Mono downmix; assumes (frames, channels) layout — the soundfile default.
    if audio.ndim > 1:
        audio = audio.mean(axis=1)
    audio = audio.astype(np.float32, copy=False)

    if sr != CANONICAL_SR:
        if SOXR_AVAILABLE:
            audio = soxr.resample(audio, sr, CANONICAL_SR, quality="HQ")
        else:
            audio = librosa.resample(
                audio, orig_sr=sr, target_sr=CANONICAL_SR, res_type="kaiser_best"
            )

    if audio.size:
        # Peak normalize, guarding near-silent clips against divide-by-tiny.
        peak = float(np.max(np.abs(audio)))
        if peak > 1e-6:
            audio = audio / peak
        # DC removal happens AFTER normalization (matches original pipeline order).
        audio = audio - float(np.mean(audio))

    return audio.astype(np.float32, copy=False), CANONICAL_SR
def detect_primary_onset_stable(audio: np.ndarray, sr: int) -> int:
    """
    Deterministic onset detection with small zero-crossing refinement.

    Picks the strongest onset-strength peak, converts it to a sample
    index, then snaps to the nearest zero crossing within ±100 samples.
    Falls back to 100 ms into the clip when no peak is found.
    """
    hop = 256
    strength = librosa.onset.onset_strength(
        y=audio,
        sr=sr,
        hop_length=hop,
        aggregate=np.median,  # median aggregation for robustness
        center=False,
    )
    candidate_frames = librosa.util.peak_pick(
        strength,
        pre_max=3,
        post_max=3,
        pre_avg=3,
        post_avg=5,
        delta=0.05,
        wait=10,
    )
    if len(candidate_frames) == 0:
        # No detectable peak: assume the onset sits 100 ms in.
        return int(0.1 * sr)

    best_frame = int(candidate_frames[int(np.argmax(strength[candidate_frames]))])
    onset = int(librosa.frames_to_samples(best_frame, hop_length=hop))

    # Zero-crossing refinement: snap onset to the crossing nearest the
    # detected position for phase-stable slicing.
    radius = 100
    lo = max(0, onset - radius)
    hi = min(len(audio), onset + radius)
    if hi > lo + 2:
        segment = audio[lo:hi]
        crossings = np.where(np.diff(np.sign(segment)))[0]
        if crossings.size:
            center = radius if onset >= radius else onset - lo
            onset = lo + int(crossings[int(np.argmin(np.abs(crossings - center)))])
    return int(onset)
| def extract_canonical_window(audio: np.ndarray, sr: int, onset_sample: int) -> np.ndarray: | |
| """ | |
| Extract fixed-length window around onset. | |
| Always returns exactly (ONSET_PRE_MS + ONSET_POST_MS) ms length at sr. | |
| """ | |
| pre_samples = int(ONSET_PRE_MS * sr / 1000.0) | |
| post_samples = int(ONSET_POST_MS * sr / 1000.0) | |
| expected = pre_samples + post_samples | |
| start = max(0, onset_sample - pre_samples) | |
| end = min(len(audio), onset_sample + post_samples) | |
| w = audio[start:end].astype(np.float32, copy=False) | |
| if w.size < expected: | |
| w = np.pad(w, (0, expected - w.size), mode="constant") | |
| elif w.size > expected: | |
| w = w[:expected] | |
| return w.astype(np.float32, copy=False) | |
def preprocess_audio_stable(audio_bytes: bytes):
    """
    MASTER preprocessing for QUERY uploads (raw file bytes).

    Pipeline: decode -> canonicalize -> onset detection -> fixed window.

    Returns a dict with:
        "audio": the canonical fixed-length window (float32)
        "sr": canonical sample rate
        "onset_time": detected onset in seconds
        "onset_sample": detected onset as a sample index
    """
    raw, native_sr = sf.read(io.BytesIO(audio_bytes), dtype="float32", always_2d=False)
    canonical, sr = canonicalize_audio(raw, native_sr)
    onset = detect_primary_onset_stable(canonical, sr)
    return {
        "audio": extract_canonical_window(canonical, sr, onset),
        "sr": sr,
        "onset_time": onset / sr,
        "onset_sample": onset,
    }
def slice_views_stable(processed: dict, view_preset: str | None = None):
    """
    Build full/trans/tail views from the canonical window.

    view_preset:
        - None => uses DEFAULT_VIEW_PRESET (env SCOUT_VIEW_PRESET)
        - "hits" or "transformer"
    Unknown preset names silently fall back to "hits".
    """
    audio = processed["audio"]
    sr = processed["sr"]

    name = (view_preset or DEFAULT_VIEW_PRESET).strip().lower()
    cfg = VIEW_PRESETS.get(name, VIEW_PRESETS["hits"])

    def _ms_to_clamped_index(ms: float) -> int:
        # Convert ms to samples, clamped into [0, len(audio)].
        return max(0, min(int(ms * sr / 1000.0), audio.size))

    trans_stop = _ms_to_clamped_index(cfg["TRANS_END_MS"])
    tail_lo = _ms_to_clamped_index(cfg["TAIL_START_MS"])
    tail_hi = _ms_to_clamped_index(cfg["TAIL_END_MS"])

    return {
        "full": audio,
        "trans": audio[:trans_stop],
        "tail": audio[tail_lo:tail_hi],
    }
def verify_stability():
    """
    Self-test: run the full pipeline 5x on a synthetic decaying tone and
    confirm the outputs are bit-stable (max abs diff <= 1e-6).
    Returns True on success, False when any run diverges.
    """
    print("[preprocessing] Running stability test...")
    sr = 48000
    t = np.linspace(0, 1.0, int(sr * 1.0), endpoint=False)
    tone = (np.sin(2 * np.pi * 200 * t) * np.exp(-t * 5)).astype(np.float32)

    buf = io.BytesIO()
    sf.write(buf, tone, sr, format="WAV")
    payload = buf.getvalue()

    windows = [preprocess_audio_stable(payload)["audio"] for _ in range(5)]
    reference = windows[0]
    for candidate in windows[1:]:
        diff = float(np.max(np.abs(reference - candidate)))
        if diff > 1e-6:
            print(f"[preprocessing] ⚠️ Instability detected: {diff}")
            return False
    print("[preprocessing] ✓ Stability test passed")
    return True


if __name__ == "__main__":
    verify_stability()