""" Department 1 — Professional Audio Enhancer (v2 — HF Spaces Optimised) ======================================================================= ✅ Background noise removal → SepFormer (HF/speechbrain, no Rust needed) → Two-pass noisereduce (stationary + non-stat) fallback ✅ Filler word removal → Whisper confidence-gated word-level timestamps ✅ Stutter removal → Phonetic-similarity aware repeat detection ✅ Long silence removal → Adaptive VAD threshold (percentile-based, env-aware) ✅ Breath sound reduction → Spectral gating (noisereduce non-stationary) ✅ Mouth sound reduction → Amplitude z-score transient suppression ✅ Room tone fill → Seamless crossfade splice (no edit seams/clicks) ✅ Audio normalization → pyloudnorm -18 LUFS ✅ CD quality output → 44100Hz PCM_24 (HF Spaces compatible) UPGRADES v2: [NOISE] SepFormer (speechbrain) as primary — no Rust, works on HF Spaces [NOISE] Two-pass noisereduce fallback: stationary first, then non-stationary to catch residual noise without aggressive single-pass artifacts [FILLER] Whisper avg_logprob + no_speech_prob confidence gating — low-confidence words are not blindly cut anymore [FILLER] Min-duration guard: skips cuts shorter than 80ms (avoids micro-glitches) [STUTTER] Phonetic normalisation (jellyfish/editdistance) catches near-repeats e.g. 
"the" / "tha", "and" / "an" — not just exact matches [SILENCE] Adaptive threshold: uses 15th-percentile RMS of the recording instead of fixed 0.008 — works in noisy rooms and quiet studios alike [SPLICE] Crossfade blending on ALL cuts (fillers, stutters, silences) — smooth 20ms equal-power fade eliminates click/seam artifacts [PERF] Model singleton caching — SepFormer loaded once, reused across calls [PERF] VAD pre-scan with Silero (if available) to skip non-speech segments before heavy processing [ROBUST] Every stage returns original audio on failure (already true, kept) [ROBUST] ffmpeg stderr captured and logged on non-zero exit """ import os import re import time import subprocess import numpy as np import soundfile as sf import logging logger = logging.getLogger(__name__) TARGET_SR = 48000 # 48kHz matches DeepFilterNet native SR (Rust available via Docker) TARGET_LOUDNESS = -18.0 # Minimum duration of a detected cut to actually apply it (avoids micro-glitches) MIN_CUT_SEC = 0.08 # Whisper confidence gate: only cut a word if its log-probability is above this. # Whisper avg_logprob is in range (-inf, 0]; -0.3 ≈ "fairly confident". 
FILLER_MIN_LOGPROB = -0.5    # below this → too uncertain to cut
FILLER_MAX_NO_SPEECH = 0.4   # above this → Whisper thinks it's non-speech anyway

# Filler words (English + Telugu + Hindi)
FILLER_WORDS = {
    "um", "umm", "ummm", "uh", "uhh", "uhhh", "hmm", "hm", "hmmm",
    "er", "err", "errr", "eh", "ahh", "ah",
    "like", "basically", "literally", "you know", "i mean",
    "so", "right", "okay", "ok",
    # Telugu
    "ante", "ane", "mane", "arey", "enti",
    # Hindi
    "matlab", "yani", "bas", "acha",
}

# ---------------------------------------------------------------------------
# Module-level model cache (survives across Denoiser() instances on same Space)
# ---------------------------------------------------------------------------
_SILERO_MODEL = None   # Silero VAD model (lazy-loaded, currently optional)
_SILERO_UTILS = None   # Silero helper functions that ship with the model


class Denoiser:
    """Professional audio-enhancement pipeline (denoise, de-filler, de-stutter,
    silence trimming, breath/mouth-sound reduction, loudness normalisation).

    Every stage is wrapped in try/except and returns its input unchanged on
    failure, so a broken optional dependency never kills the pipeline.
    """

    def __init__(self):
        # Quietest 0.5s window of the current recording; used to fill cut gaps.
        self._room_tone = None
        print("[Denoiser] ✅ Professional Audio Enhancer v2 ready (HF Spaces mode)")

    # ══════════════════════════════════════════════════════════════════
    # MAIN ENTRY POINT
    # ══════════════════════════════════════════════════════════════════
    def process(self, audio_path: str, out_dir: str,
                remove_fillers: bool = True,
                remove_silences: bool = True,
                remove_breaths: bool = True,
                remove_mouth_sounds: bool = True,
                remove_stutters: bool = True,
                word_segments: list = None,
                original_filename: str = None) -> dict:
        """
        Full professional pipeline.

        word_segments: list of dicts from Whisper word-level timestamps.
            Each dict:
              {
                'word':           str,
                'start':          float,  # seconds
                'end':            float,  # seconds
                'avg_logprob':    float,  # optional — Whisper segment-level confidence
                'no_speech_prob': float,  # optional — Whisper no-speech probability
              }

        Returns: {'audio_path': str, 'stats': dict}
        """
        t0 = time.time()
        stats = {}
        print("[Denoiser] ▶ Starting professional enhancement pipeline v2...")

        # ── 0. Convert to standard WAV ───────────────────────────────
        wav_in = os.path.join(out_dir, "stage0_input.wav")
        self._to_wav(audio_path, wav_in, TARGET_SR)
        audio, sr = sf.read(wav_in, always_2d=True)
        n_ch = audio.shape[1]
        duration = len(audio) / sr
        print(f"[Denoiser] Input: {sr}Hz, {n_ch}ch, {duration:.1f}s")

        # Work in mono float32
        mono = audio.mean(axis=1).astype(np.float32)

        # ── 1. Capture room tone BEFORE any denoising ────────────────
        self._room_tone = self._capture_room_tone(mono, sr)

        # ── 2. Background Noise Removal ──────────────────────────────
        mono, noise_method = self._remove_background_noise(mono, sr)
        stats['noise_method'] = noise_method

        # ── 3. Mouth Sound Reduction (clicks/pops) ───────────────────
        if remove_mouth_sounds:
            mono, n_clicks = self._reduce_mouth_sounds(mono, sr)
            stats['mouth_sounds_removed'] = n_clicks

        # ── 4. Breath Reduction ──────────────────────────────────────
        if remove_breaths:
            mono = self._reduce_breaths(mono, sr)
            stats['breaths_reduced'] = True

        # ── 5. Filler Word Removal ───────────────────────────────────
        stats['fillers_removed'] = 0
        if remove_fillers and word_segments:
            mono, n_fillers = self._remove_fillers(mono, sr, word_segments)
            stats['fillers_removed'] = n_fillers

        # ── 6. Stutter Removal ───────────────────────────────────────
        stats['stutters_removed'] = 0
        if remove_stutters and word_segments:
            mono, n_stutters = self._remove_stutters(mono, sr, word_segments)
            stats['stutters_removed'] = n_stutters

        # ── 7. Long Silence Removal ──────────────────────────────────
        stats['silences_removed_sec'] = 0.0
        if remove_silences:
            mono, sil_sec = self._remove_long_silences(mono, sr)
            stats['silences_removed_sec'] = round(sil_sec, 2)

        # ── 8. Normalize Loudness ────────────────────────────────────
        mono = self._normalise(mono, sr)

        # ── 9. Restore stereo / save as MP3 ──────────────────────────
        out_audio = np.stack([mono, mono], axis=1) if n_ch == 2 else mono

        # Build output filename: strip original extension, append _cleared.mp3
        # e.g. "output.wav" → "output_cleared.mp3"
        if original_filename:
            base = os.path.splitext(os.path.basename(original_filename))[0]
        else:
            base = os.path.splitext(os.path.basename(audio_path))[0]
        out_name = f"{base}_cleared.mp3"

        # Write a temporary WAV first (soundfile can't encode MP3),
        # then convert to MP3 via ffmpeg (already in the Dockerfile).
        tmp_wav = os.path.join(out_dir, "denoised_tmp.wav")
        out_path = os.path.join(out_dir, out_name)
        sf.write(tmp_wav, out_audio, sr, format="WAV", subtype="PCM_24")
        result = subprocess.run([
            "ffmpeg", "-y", "-i", tmp_wav,
            "-codec:a", "libmp3lame",
            "-qscale:a", "2",   # VBR quality 2 ≈ 190 kbps — transparent quality
            "-ar", str(sr),
            out_path
        ], capture_output=True)
        if result.returncode != 0:
            stderr = result.stderr.decode(errors="replace")
            logger.warning(f"MP3 export failed, falling back to WAV: {stderr[-300:]}")
            out_path = tmp_wav   # graceful fallback — still return something
        else:
            try:
                os.remove(tmp_wav)   # clean up temp WAV
            except OSError:
                pass

        stats['processing_sec'] = round(time.time() - t0, 2)
        print(f"[Denoiser] ✅ Done in {stats['processing_sec']}s | {stats}")
        return {'audio_path': out_path, 'stats': stats}

    # ══════════════════════════════════════════════════════════════════
    # ROOM TONE CAPTURE
    # ══════════════════════════════════════════════════════════════════
    def _capture_room_tone(self, audio: np.ndarray, sr: int,
                           sample_sec: float = 0.5) -> np.ndarray:
        """Find the quietest 0.5s window in the recording — that's the room tone."""
        try:
            frame = int(sr * sample_sec)
            if len(audio) < frame * 2:
                # Too short to scan — best-effort grab of the opening 100ms.
                fallback_len = min(int(sr * 0.1), len(audio))
                print("[Denoiser] Short audio — using first 100ms as room tone")
                return audio[:fallback_len].copy().astype(np.float32)
            best_rms = float('inf')
            best_start = 0
            step = sr   # 1-second steps — coarse scan is enough here
            for i in range(0, len(audio) - frame, step):
                rms = float(np.sqrt(np.mean(audio[i:i + frame] ** 2)))
                if rms < best_rms:
                    best_rms, best_start = rms, i
            room = audio[best_start: best_start + frame].copy()
            print(f"[Denoiser] Room tone captured: RMS={best_rms:.5f}")
            return room
        except Exception as e:
            logger.warning(f"Room tone capture failed: {e}")
            return np.zeros(int(sr * sample_sec), dtype=np.float32)

    def _fill_with_room_tone(self, length: int) -> np.ndarray:
        """Tile room tone to fill a gap of `length` samples (zeros if none)."""
        if self._room_tone is None or len(self._room_tone) == 0:
            return np.zeros(length, dtype=np.float32)
        reps = length // len(self._room_tone) + 1
        tiled = np.tile(self._room_tone, reps)[:length]
        # Short fade in/out so the tiled tone never starts/ends abruptly.
        fade = min(int(0.01 * len(tiled)), 64)
        if fade > 0:
            tiled[:fade] *= np.linspace(0, 1, fade)
            tiled[-fade:] *= np.linspace(1, 0, fade)
        return tiled.astype(np.float32)

    # ══════════════════════════════════════════════════════════════════
    # CROSSFADE SPLICE
    # Replaces abrupt room-tone insertion with smooth equal-power blend.
    # ══════════════════════════════════════════════════════════════════
    def _crossfade_join(self, a: np.ndarray, b: np.ndarray,
                        fade_ms: float = 20.0, sr: int = TARGET_SR) -> np.ndarray:
        """
        Equal-power crossfade between the tail of `a` and the head of `b`.
        Eliminates click/seam artifacts at all edit points.

        NOTE: the result is len(a)+len(b)-fade_n samples — every join
        consumes `fade_ms` of material, so callers must only crossfade at
        real edit seams, never between contiguous audio.
        """
        fade_n = int(sr * fade_ms / 1000)
        fade_n = min(fade_n, len(a), len(b))
        if fade_n < 2:
            return np.concatenate([a, b])
        t = np.linspace(0, np.pi / 2, fade_n)
        fade_out = np.cos(t)   # equal-power: cos²+sin²=1
        fade_in = np.sin(t)
        overlap = a[-fade_n:] * fade_out + b[:fade_n] * fade_in
        return np.concatenate([a[:-fade_n], overlap, b[fade_n:]])

    def _build_with_crossfade(self, audio: np.ndarray, cuts: list, sr: int,
                              fill_tone: bool = True) -> np.ndarray:
        """
        Build output from a list of (start_sec, end_sec) cuts, filling gaps
        with room tone and crossfading every join.

        cuts: list of (start_sec, end_sec) regions to REMOVE (any order;
              overlapping/duplicate regions are clamped so nothing is
              removed or tone-filled twice).
        """
        segments = []
        prev = 0.0
        for start, end in sorted(cuts, key=lambda x: x[0]):
            # Clamp cuts that overlap an already-processed region
            # (duplicates would otherwise insert the room-tone gap twice).
            start = max(start, prev)
            # Guard: skip cuts shorter than minimum
            if (end - start) < MIN_CUT_SEC:
                continue
            keep_sta = int(prev * sr)
            keep_end = int(start * sr)
            if keep_sta < keep_end:
                segments.append(audio[keep_sta:keep_end])
            gap_len = int((end - start) * sr)
            if fill_tone and gap_len > 0:
                segments.append(self._fill_with_room_tone(gap_len))
            prev = end
        remain = int(prev * sr)
        if remain < len(audio):
            segments.append(audio[remain:])
        if not segments:
            return audio
        # Crossfade every adjacent pair (each pair is a true edit seam here).
        result = segments[0]
        for seg in segments[1:]:
            result = self._crossfade_join(result, seg, fade_ms=20.0, sr=sr)
        return result.astype(np.float32)

    # ══════════════════════════════════════════════════════════════════
    # BACKGROUND NOISE REMOVAL
    # Chain: DeepFilterNet → single-pass stationary noisereduce → passthrough
    #
    # SepFormer REMOVED — it is a speech separation model, not a denoiser.
    # It reconstructs voice artificially → robotic output.
    # ══════════════════════════════════════════════════════════════════
    def _remove_background_noise(self, audio, sr):
        """Return (cleaned_audio, method_name). Never raises — worst case is
        ('none', passthrough)."""
        # ── Primary: DeepFilterNet (SOTA, Rust available via Docker) ─────
        try:
            result = self._deepfilter(audio, sr)
            print("[Denoiser] ✅ DeepFilterNet noise removal done")
            return result, "DeepFilterNet"
        except Exception as e:
            logger.warning(f"[Denoiser] DeepFilterNet unavailable ({e})")

        # ── Fallback: Single-pass noisereduce, stationary only ───────────
        # PHILOSOPHY: do as little as possible to the signal.
        # - stationary=True  → only targets steady/consistent noise (fan,
        #                      hum, AC, room hiss). Leaves transient
        #                      speech harmonics completely untouched.
        # - prop_decrease=0.5 → reduces noise by ~50%, not 100%.
        #                      Keeps a thin noise floor so the voice
        #                      never sounds "hollow" or over-processed.
        # - No second pass, no non-stationary processing — those modes
        #   touch voice frequencies and cause the robotic effect.
        try:
            import noisereduce as nr
            cleaned = nr.reduce_noise(
                y=audio,
                sr=sr,
                stationary=True,
                prop_decrease=0.50,
            ).astype(np.float32)
            print("[Denoiser] ✅ noisereduce done (voice-preserving, stationary only)")
            return cleaned, "noisereduce_stationary"
        except Exception as e:
            logger.warning(f"noisereduce failed: {e}")
            return audio, "none"

    def _deepfilter(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """DeepFilterNet enhancement (local only — requires Rust compiler).

        Resamples to the model's native rate, enhances, resamples back.
        Model/state cached on the instance after first use.
        """
        from df.enhance import enhance, init_df
        import torch
        # Lazy-load, module-level cache not needed (rarely reached on HF Spaces)
        if not hasattr(self, '_df_model') or self._df_model is None:
            self._df_model, self._df_state, _ = init_df()
        df_sr = self._df_state.sr()
        a = self._resample(audio, sr, df_sr) if sr != df_sr else audio
        t = torch.from_numpy(a).unsqueeze(0)
        out = enhance(self._df_model, self._df_state, t)
        res = out.squeeze().numpy().astype(np.float32)
        return self._resample(res, df_sr, sr) if df_sr != sr else res

    # ══════════════════════════════════════════════════════════════════
    # FILLER WORD REMOVAL (confidence-gated + crossfade)
    # ══════════════════════════════════════════════════════════════════
    def _remove_fillers(self, audio: np.ndarray, sr: int, segments: list):
        """
        Cuts filler words using Whisper word-level timestamps.

        Confidence gating — a word is only cut if:
          1. avg_logprob   ≥ FILLER_MIN_LOGPROB  (Whisper was confident)
          2. no_speech_prob ≤ FILLER_MAX_NO_SPEECH (audio is actually speech)
          3. Duration      ≥ MIN_CUT_SEC (not a micro-glitch timestamp artefact)
        Falls back gracefully when confidence fields are absent (older Whisper).

        Gaps filled with room tone + crossfade for seamless edits.
        Returns (audio, number_of_fillers_removed).
        """
        try:
            cuts = []
            for seg in segments:
                word = seg.get('word', '').strip().lower()
                word = re.sub(r'[^a-z\s]', '', word).strip()
                if word not in FILLER_WORDS:
                    continue
                start = seg.get('start', 0.0)
                end = seg.get('end', 0.0)
                # Duration guard
                if (end - start) < MIN_CUT_SEC:
                    continue
                # Confidence gate (optional fields — skip gate if absent)
                avg_logprob = seg.get('avg_logprob', None)
                no_speech_prob = seg.get('no_speech_prob', None)
                if avg_logprob is not None and avg_logprob < FILLER_MIN_LOGPROB:
                    logger.debug(f"[Denoiser] Filler '{word}' skipped: "
                                 f"low confidence ({avg_logprob:.2f})")
                    continue
                if no_speech_prob is not None and no_speech_prob > FILLER_MAX_NO_SPEECH:
                    logger.debug(f"[Denoiser] Filler '{word}' skipped: "
                                 f"no_speech_prob={no_speech_prob:.2f}")
                    continue
                cuts.append((start, end))

            if not cuts:
                return audio, 0
            out = self._build_with_crossfade(audio, cuts, sr, fill_tone=True)
            print(f"[Denoiser] ✅ Removed {len(cuts)} filler words")
            return out, len(cuts)
        except Exception as e:
            logger.warning(f"Filler removal failed: {e}")
            return audio, 0

    def clean_transcript_fillers(self, transcript: str) -> str:
        """Remove filler words from transcript TEXT to match cleaned audio.

        Checks two-word fillers ("you know", "i mean") before single words.
        """
        words = transcript.split()
        result = []
        i = 0
        while i < len(words):
            w = re.sub(r'[^a-z\s]', '', words[i].lower()).strip()
            if i + 1 < len(words):
                two = w + " " + re.sub(r'[^a-z\s]', '', words[i + 1].lower()).strip()
                if two in FILLER_WORDS:
                    i += 2
                    continue
            if w in FILLER_WORDS:
                i += 1
                continue
            result.append(words[i])
            i += 1
        return " ".join(result)

    # ══════════════════════════════════════════════════════════════════
    # STUTTER REMOVAL (phonetic similarity + crossfade)
    # ══════════════════════════════════════════════════════════════════
    def _remove_stutters(self, audio: np.ndarray, sr: int, segments: list):
        """
        Remove stuttered word repeats ("the the", "tha the", "I I I").

        Phonetic near-match detection in addition to exact repeats:
        jellyfish.jaro_winkler_similarity if available, then plain
        edit-distance ratio, then exact match only. ≥88% similarity to
        the anchor word counts as a stutter. The LAST occurrence in a
        repeat chain is kept; every earlier one is cut and the gap filled
        with room tone + crossfade.

        Confidence gating applied to the anchor word (same thresholds as
        filler removal).

        FIX: in a chain of 3+ repeats the old code re-appended the first
        anchor's (start, end) for every match — duplicate cuts caused a
        doubled room-tone fill, and the middle repeats were never cut.
        Each earlier occurrence is now cut exactly once.
        """
        try:
            if len(segments) < 2:
                return audio, 0

            # Choose similarity function
            sim_fn = self._word_similarity_fn()

            cuts = []
            stutters_found = 0
            i = 0
            while i < len(segments):
                seg_i = segments[i]
                word = re.sub(r'[^a-z]', '', seg_i.get('word', '').lower())
                if not word:
                    i += 1
                    continue
                # Confidence gate on the anchor word
                if not self._passes_confidence_gate(seg_i):
                    i += 1
                    continue

                last_kept = i   # index of the latest occurrence we plan to keep
                j = i + 1
                while j < len(segments):
                    seg_j = segments[j]
                    next_word = re.sub(r'[^a-z]', '', seg_j.get('word', '').lower())
                    if not next_word:
                        j += 1
                        continue
                    if sim_fn(word, next_word) >= 0.88:   # ≥88% similar = stutter
                        # Cut the previous occurrence, keep this later one.
                        prev_seg = segments[last_kept]
                        cuts.append((prev_seg['start'], prev_seg['end']))
                        stutters_found += 1
                        last_kept = j
                        j += 1
                    else:
                        break
                i = last_kept + 1   # resume after the kept occurrence

            if not cuts:
                return audio, 0
            out = self._build_with_crossfade(audio, cuts, sr, fill_tone=True)
            print(f"[Denoiser] ✅ Removed {stutters_found} stutters")
            return out, stutters_found
        except Exception as e:
            logger.warning(f"Stutter removal failed: {e}")
            return audio, 0

    @staticmethod
    def _word_similarity_fn():
        """Return best available string-similarity function (1.0 = identical)."""
        try:
            import jellyfish
            return jellyfish.jaro_winkler_similarity
        except ImportError:
            pass
        try:
            import editdistance

            def _ed_ratio(a, b):
                if not a and not b:
                    return 1.0
                dist = editdistance.eval(a, b)
                return 1.0 - dist / max(len(a), len(b))
            return _ed_ratio
        except ImportError:
            pass
        # Plain exact match as last resort
        return lambda a, b: 1.0 if a == b else 0.0

    @staticmethod
    def _passes_confidence_gate(seg: dict) -> bool:
        """Return True if Whisper confidence is acceptable (or fields absent)."""
        avg_logprob = seg.get('avg_logprob', None)
        no_speech_prob = seg.get('no_speech_prob', None)
        if avg_logprob is not None and avg_logprob < FILLER_MIN_LOGPROB:
            return False
        if no_speech_prob is not None and no_speech_prob > FILLER_MAX_NO_SPEECH:
            return False
        return True

    # ══════════════════════════════════════════════════════════════════
    # BREATH REDUCTION
    # ══════════════════════════════════════════════════════════════════
    def _reduce_breaths(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Non-stationary spectral gating — catches short broadband breath bursts."""
        try:
            import noisereduce as nr
            cleaned = nr.reduce_noise(
                y=audio,
                sr=sr,
                stationary=False,
                prop_decrease=0.60,
                freq_mask_smooth_hz=400,
                time_mask_smooth_ms=40,
            ).astype(np.float32)
            print("[Denoiser] ✅ Breath reduction done")
            return cleaned
        except Exception as e:
            logger.warning(f"Breath reduction failed: {e}")
            return audio

    # ══════════════════════════════════════════════════════════════════
    # MOUTH SOUND REDUCTION
    # ══════════════════════════════════════════════════════════════════
    def _reduce_mouth_sounds(self, audio: np.ndarray, sr: int):
        """
        Suppress very short, very high-amplitude transients (clicks/pops).
        Threshold at 6.0 std to avoid removing real consonants (p, b, t).

        Returns (audio, number_of_windows_suppressed).
        """
        try:
            result = audio.copy()
            win = int(sr * 0.003)   # 3ms window
            hop = win // 2
            rms_arr = np.array([
                float(np.sqrt(np.mean(audio[i:i + win] ** 2)))
                for i in range(0, len(audio) - win, hop)
            ])
            if len(rms_arr) == 0:
                return audio, 0
            threshold = float(np.mean(rms_arr)) + 6.0 * float(np.std(rms_arr))
            n_removed = 0
            for idx, rms in enumerate(rms_arr):
                if rms > threshold:
                    start = idx * hop
                    end = min(start + win, len(result))
                    # Fade the transient to silence rather than hard-zeroing.
                    result[start:end] *= np.linspace(1, 0, end - start)
                    n_removed += 1
            if n_removed:
                print(f"[Denoiser] ✅ Suppressed {n_removed} mouth sound transients")
            return result.astype(np.float32), n_removed
        except Exception as e:
            logger.warning(f"Mouth sound reduction failed: {e}")
            return audio, 0

    # ══════════════════════════════════════════════════════════════════
    # LONG SILENCE REMOVAL (adaptive threshold)
    # ══════════════════════════════════════════════════════════════════
    def _remove_long_silences(self, audio: np.ndarray, sr: int,
                              max_silence_sec: float = 1.5,
                              keep_pause_sec: float = 0.4) -> tuple:
        """
        Shorten silences longer than `max_silence_sec`, replacing the excess
        with a `keep_pause_sec` room-tone pad.

        Adaptive silence threshold: 15th percentile of per-frame RMS
        (clamped to ≥0.001 so very quiet speech is never mis-classified).
        This self-calibrates to the recording's actual noise floor, so it
        works equally well in noisy rooms and near-silent studios.

        FIX: contiguous kept frames are concatenated as-is; crossfades are
        applied only at real edit seams. (Previously EVERY 20ms frame join
        was crossfaded — each join consumes 5ms, so long recordings were
        audibly shortened and smeared even with zero silences removed.)

        Returns (audio, seconds_of_silence_removed).
        """
        try:
            frame_len = int(sr * 0.02)   # 20ms frames

            # ── Compute per-frame RMS ─────────────────────────────────
            n_frames = (len(audio) - frame_len) // frame_len
            rms_frames = np.array([
                float(np.sqrt(np.mean(audio[i * frame_len:(i + 1) * frame_len] ** 2)))
                for i in range(n_frames)
            ])
            if len(rms_frames) == 0:
                return audio, 0.0

            # ── Adaptive threshold: 15th percentile of RMS ────────────
            threshold = float(np.percentile(rms_frames, 15))
            # Clamp: never go below 0.001 (avoids mis-classifying very quiet speech)
            threshold = max(threshold, 0.001)
            print(f"[Denoiser] Adaptive silence threshold: RMS={threshold:.5f}")

            max_sil_frames = int(max_silence_sec / 0.02)
            keep_frames = int(keep_pause_sec / 0.02)

            pieces = []        # alternating kept-audio spans and room-tone pads
            run = []           # current run of CONTIGUOUS kept frames
            silence_count = 0
            total_removed = 0
            in_long_sil = False

            def _flush_run():
                # Close the current contiguous run into one piece.
                if run:
                    pieces.append(np.concatenate(run))
                    run.clear()

            for i in range(n_frames):
                frame = audio[i * frame_len:(i + 1) * frame_len]
                if rms_frames[i] < threshold:
                    silence_count += 1
                    if silence_count <= max_sil_frames:
                        run.append(frame)
                    else:
                        _flush_run()           # an edit seam starts here
                        total_removed += frame_len
                        in_long_sil = True
                else:
                    if in_long_sil:
                        # Replace the dropped excess with a short room-tone pad.
                        pieces.append(self._fill_with_room_tone(keep_frames * frame_len))
                        in_long_sil = False
                    silence_count = 0
                    run.append(frame)

            # Tail of audio (the sub-frame remainder)
            tail_start = n_frames * frame_len
            if tail_start < len(audio):
                run.append(audio[tail_start:])
            _flush_run()

            if not pieces:
                return audio, 0.0

            # Crossfade only at the real edit seams between pieces.
            result = pieces[0]
            for seg in pieces[1:]:
                result = self._crossfade_join(result, seg, fade_ms=5.0, sr=sr)

            removed_sec = total_removed / sr
            if removed_sec > 0:
                print(f"[Denoiser] ✅ Removed {removed_sec:.1f}s of long silences")
            return result.astype(np.float32), removed_sec
        except Exception as e:
            logger.warning(f"Silence removal failed: {e}")
            return audio, 0.0

    # ══════════════════════════════════════════════════════════════════
    # NORMALIZATION
    # ══════════════════════════════════════════════════════════════════
    def _normalise(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Normalise to TARGET_LOUDNESS LUFS via pyloudnorm; fall back to a
        plain RMS gain if pyloudnorm is unavailable or the clip is too short.
        Output is clipped to [-1, 1] float32."""
        try:
            import pyloudnorm as pyln
            meter = pyln.Meter(sr)
            loudness = meter.integrated_loudness(audio)
            if np.isfinite(loudness) and loudness < 0:
                audio = pyln.normalize.loudness(audio, loudness, TARGET_LOUDNESS)
                print(f"[Denoiser] ✅ Normalized: {loudness:.1f} → {TARGET_LOUDNESS} LUFS")
        except Exception:
            # Crude fallback: scale RMS to the dBFS equivalent of the target.
            rms = np.sqrt(np.mean(audio ** 2))
            if rms > 1e-9:
                target_rms = 10 ** (TARGET_LOUDNESS / 20.0)
                audio = audio * (target_rms / rms)
        return np.clip(audio, -1.0, 1.0).astype(np.float32)

    # ══════════════════════════════════════════════════════════════════
    # HELPERS
    # ══════════════════════════════════════════════════════════════════
    def _to_wav(self, src: str, dst: str, target_sr: int):
        """Transcode `src` to 24-bit PCM WAV at `target_sr` via ffmpeg;
        on failure, copy through soundfile at the original sample rate."""
        result = subprocess.run([
            "ffmpeg", "-y", "-i", src,
            "-acodec", "pcm_s24le",
            "-ar", str(target_sr),
            dst
        ], capture_output=True)
        if result.returncode != 0:
            stderr = result.stderr.decode(errors='replace')
            logger.warning(f"ffmpeg non-zero exit: {stderr[-400:]}")
            # Fallback: soundfile passthrough
            data, sr = sf.read(src, always_2d=True)
            sf.write(dst, data, sr, format="WAV", subtype="PCM_24")

    def _resample(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
        """Resample via librosa; fall back to linear interpolation."""
        if orig_sr == target_sr:
            return audio
        try:
            import librosa
            return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
        except Exception:
            length = int(len(audio) * target_sr / orig_sr)
            return np.interp(
                np.linspace(0, len(audio), length),
                np.arange(len(audio)),
                audio
            ).astype(np.float32)