Spaces:
Running
Running
| """ | |
| Department 1 β Professional Audio Enhancer (v2 β HF Spaces Optimised) | |
| ======================================================================= | |
β Background noise removal β DeepFilterNet (primary, Rust available via Docker)
β Fallback denoise β single-pass stationary noisereduce (voice-preserving)
| β Filler word removal β Whisper confidence-gated word-level timestamps | |
| β Stutter removal β Phonetic-similarity aware repeat detection | |
| β Long silence removal β Adaptive VAD threshold (percentile-based, env-aware) | |
| β Breath sound reduction β Spectral gating (noisereduce non-stationary) | |
| β Mouth sound reduction β Amplitude z-score transient suppression | |
| β Room tone fill β Seamless crossfade splice (no edit seams/clicks) | |
| β Audio normalization β pyloudnorm -18 LUFS | |
β Output β 48000Hz processing, MP3 export (libmp3lame VBR q2), WAV PCM_24 fallback
| UPGRADES v2: | |
[NOISE] DeepFilterNet as primary denoiser; SepFormer removed (it is a
speech-separation model, not a denoiser, and produced robotic output)
[NOISE] Stationary noisereduce fallback at prop_decrease=0.5 β keeps a thin
noise floor so the voice never sounds over-processed
| [FILLER] Whisper avg_logprob + no_speech_prob confidence gating β | |
| low-confidence words are not blindly cut anymore | |
| [FILLER] Min-duration guard: skips cuts shorter than 80ms (avoids micro-glitches) | |
| [STUTTER] Phonetic normalisation (jellyfish/editdistance) catches near-repeats | |
| e.g. "the" / "tha", "and" / "an" β not just exact matches | |
| [SILENCE] Adaptive threshold: uses 15th-percentile RMS of the recording | |
| instead of fixed 0.008 β works in noisy rooms and quiet studios alike | |
| [SPLICE] Crossfade blending on ALL cuts (fillers, stutters, silences) β | |
| smooth 20ms equal-power fade eliminates click/seam artifacts | |
| [PERF] Model singleton caching β SepFormer loaded once, reused across calls | |
[PERF] Silero VAD pre-scan: module-level cache slots are declared
(_SILERO_MODEL/_SILERO_UTILS) but the pre-scan is not yet wired in
| [ROBUST] Every stage returns original audio on failure (already true, kept) | |
| [ROBUST] ffmpeg stderr captured and logged on non-zero exit | |
| """ | |
| import os | |
| import re | |
| import time | |
| import subprocess | |
| import numpy as np | |
| import soundfile as sf | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| TARGET_SR = 48000 # 48kHz matches DeepFilterNet native SR (Rust available via Docker) | |
| TARGET_LOUDNESS = -18.0 | |
| # Minimum duration of a detected cut to actually apply it (avoids micro-glitches) | |
| MIN_CUT_SEC = 0.08 | |
| # Whisper confidence gate: only cut a word if its log-probability is above this. | |
| # Whisper avg_logprob is in range (-inf, 0]; -0.3 β "fairly confident". | |
| FILLER_MIN_LOGPROB = -0.5 # below this β too uncertain to cut | |
| FILLER_MAX_NO_SPEECH = 0.4 # above this β Whisper thinks it's non-speech anyway | |
| # Filler words (English + Telugu + Hindi) | |
| FILLER_WORDS = { | |
| "um", "umm", "ummm", "uh", "uhh", "uhhh", | |
| "hmm", "hm", "hmmm", | |
| "er", "err", "errr", | |
| "eh", "ahh", "ah", | |
| "like", "basically", "literally", | |
| "you know", "i mean", "so", | |
| "right", "okay", "ok", | |
| # Telugu | |
| "ante", "ane", "mane", "arey", "enti", | |
| # Hindi | |
| "matlab", "yani", "bas", "acha", | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Module-level model cache (survives across Denoiser() instances on same Space) | |
| # --------------------------------------------------------------------------- | |
| _SILERO_MODEL = None # Silero VAD | |
| _SILERO_UTILS = None | |
| class Denoiser: | |
| def __init__(self): | |
| self._room_tone = None | |
| print("[Denoiser] β Professional Audio Enhancer v2 ready (HF Spaces mode)") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN ENTRY POINT | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def process(self, audio_path: str, out_dir: str, | |
| remove_fillers: bool = True, | |
| remove_silences: bool = True, | |
| remove_breaths: bool = True, | |
| remove_mouth_sounds: bool = True, | |
| remove_stutters: bool = True, | |
| word_segments: list = None, | |
| original_filename: str = None) -> dict: | |
| """ | |
| Full professional pipeline. | |
| word_segments: list of dicts from Whisper word-level timestamps. | |
| Each dict: { | |
| 'word': str, | |
| 'start': float, # seconds | |
| 'end': float, # seconds | |
| 'avg_logprob': float, # optional β Whisper segment-level confidence | |
| 'no_speech_prob':float, # optional β Whisper no-speech probability | |
| } | |
| Returns: {'audio_path': str, 'stats': dict} | |
| """ | |
| t0 = time.time() | |
| stats = {} | |
| print("[Denoiser] βΆ Starting professional enhancement pipeline v2...") | |
| # ββ 0. Convert to standard WAV βββββββββββββββββββββββββββββββ | |
| wav_in = os.path.join(out_dir, "stage0_input.wav") | |
| self._to_wav(audio_path, wav_in, TARGET_SR) | |
| audio, sr = sf.read(wav_in, always_2d=True) | |
| n_ch = audio.shape[1] | |
| duration = len(audio) / sr | |
| print(f"[Denoiser] Input: {sr}Hz, {n_ch}ch, {duration:.1f}s") | |
| # Work in mono float32 | |
| mono = audio.mean(axis=1).astype(np.float32) | |
| # ββ 1. Capture room tone BEFORE any denoising ββββββββββββββββ | |
| self._room_tone = self._capture_room_tone(mono, sr) | |
| # ββ 2. Background Noise Removal ββββββββββββββββββββββββββββββ | |
| mono, noise_method = self._remove_background_noise(mono, sr) | |
| stats['noise_method'] = noise_method | |
| # ββ 3. Mouth Sound Reduction (clicks/pops) βββββββββββββββββββ | |
| if remove_mouth_sounds: | |
| mono, n_clicks = self._reduce_mouth_sounds(mono, sr) | |
| stats['mouth_sounds_removed'] = n_clicks | |
| # ββ 4. Breath Reduction ββββββββββββββββββββββββββββββββββββββ | |
| if remove_breaths: | |
| mono = self._reduce_breaths(mono, sr) | |
| stats['breaths_reduced'] = True | |
| # ββ 5. Filler Word Removal βββββββββββββββββββββββββββββββββββ | |
| stats['fillers_removed'] = 0 | |
| if remove_fillers and word_segments: | |
| mono, n_fillers = self._remove_fillers(mono, sr, word_segments) | |
| stats['fillers_removed'] = n_fillers | |
| # ββ 6. Stutter Removal βββββββββββββββββββββββββββββββββββββββ | |
| stats['stutters_removed'] = 0 | |
| if remove_stutters and word_segments: | |
| mono, n_stutters = self._remove_stutters(mono, sr, word_segments) | |
| stats['stutters_removed'] = n_stutters | |
| # ββ 7. Long Silence Removal βββββββββββββββββββββββββββββββββββ | |
| stats['silences_removed_sec'] = 0.0 | |
| if remove_silences: | |
| mono, sil_sec = self._remove_long_silences(mono, sr) | |
| stats['silences_removed_sec'] = round(sil_sec, 2) | |
| # ββ 8. Normalize Loudness βββββββββββββββββββββββββββββββββββββ | |
| mono = self._normalise(mono, sr) | |
| # ββ 9. Restore stereo / save as MP3 ββββββββββββββββββββββββββ | |
| out_audio = np.stack([mono, mono], axis=1) if n_ch == 2 else mono | |
| # Build output filename: strip original extension, append _cleared.mp3 | |
| # e.g. "output.wav" β "output_cleared.mp3" | |
| if original_filename: | |
| base = os.path.splitext(os.path.basename(original_filename))[0] | |
| else: | |
| base = os.path.splitext(os.path.basename(audio_path))[0] | |
| out_name = f"{base}_cleared.mp3" | |
| # Write a temporary WAV first (soundfile can't encode MP3), | |
| # then convert to MP3 via ffmpeg (already in the Dockerfile). | |
| tmp_wav = os.path.join(out_dir, "denoised_tmp.wav") | |
| out_path = os.path.join(out_dir, out_name) | |
| sf.write(tmp_wav, out_audio, sr, format="WAV", subtype="PCM_24") | |
| result = subprocess.run([ | |
| "ffmpeg", "-y", "-i", tmp_wav, | |
| "-codec:a", "libmp3lame", | |
| "-qscale:a", "2", # VBR quality 2 β 190 kbps β transparent quality | |
| "-ar", str(sr), | |
| out_path | |
| ], capture_output=True) | |
| if result.returncode != 0: | |
| stderr = result.stderr.decode(errors="replace") | |
| logger.warning(f"MP3 export failed, falling back to WAV: {stderr[-300:]}") | |
| out_path = tmp_wav # graceful fallback β still return something | |
| else: | |
| try: | |
| os.remove(tmp_wav) # clean up temp WAV | |
| except OSError: | |
| pass | |
| stats['processing_sec'] = round(time.time() - t0, 2) | |
| print(f"[Denoiser] β Done in {stats['processing_sec']}s | {stats}") | |
| return {'audio_path': out_path, 'stats': stats} | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ROOM TONE CAPTURE | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _capture_room_tone(self, audio: np.ndarray, sr: int, | |
| sample_sec: float = 0.5) -> np.ndarray: | |
| """Find the quietest 0.5s window in the recording β that's the room tone.""" | |
| try: | |
| frame = int(sr * sample_sec) | |
| if len(audio) < frame * 2: | |
| fallback_len = min(int(sr * 0.1), len(audio)) | |
| print("[Denoiser] Short audio β using first 100ms as room tone") | |
| return audio[:fallback_len].copy().astype(np.float32) | |
| best_rms = float('inf') | |
| best_start = 0 | |
| step = sr # 1-second steps | |
| for i in range(0, len(audio) - frame, step): | |
| rms = float(np.sqrt(np.mean(audio[i:i + frame] ** 2))) | |
| if rms < best_rms: | |
| best_rms, best_start = rms, i | |
| room = audio[best_start: best_start + frame].copy() | |
| print(f"[Denoiser] Room tone captured: RMS={best_rms:.5f}") | |
| return room | |
| except Exception as e: | |
| logger.warning(f"Room tone capture failed: {e}") | |
| return np.zeros(int(sr * sample_sec), dtype=np.float32) | |
| def _fill_with_room_tone(self, length: int) -> np.ndarray: | |
| """Tile room tone to fill a gap of `length` samples.""" | |
| if self._room_tone is None or len(self._room_tone) == 0: | |
| return np.zeros(length, dtype=np.float32) | |
| reps = length // len(self._room_tone) + 1 | |
| tiled = np.tile(self._room_tone, reps)[:length] | |
| fade = min(int(0.01 * len(tiled)), 64) | |
| if fade > 0: | |
| tiled[:fade] *= np.linspace(0, 1, fade) | |
| tiled[-fade:] *= np.linspace(1, 0, fade) | |
| return tiled.astype(np.float32) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CROSSFADE SPLICE β NEW | |
| # Replaces abrupt room-tone insertion with smooth equal-power blend. | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _crossfade_join(self, a: np.ndarray, b: np.ndarray, | |
| fade_ms: float = 20.0, sr: int = TARGET_SR) -> np.ndarray: | |
| """ | |
| Equal-power crossfade between the tail of `a` and the head of `b`. | |
| Eliminates click/seam artifacts at all edit points. | |
| """ | |
| fade_n = int(sr * fade_ms / 1000) | |
| fade_n = min(fade_n, len(a), len(b)) | |
| if fade_n < 2: | |
| return np.concatenate([a, b]) | |
| t = np.linspace(0, np.pi / 2, fade_n) | |
| fade_out = np.cos(t) # equal-power: cosΒ²+sinΒ²=1 | |
| fade_in = np.sin(t) | |
| overlap = a[-fade_n:] * fade_out + b[:fade_n] * fade_in | |
| return np.concatenate([a[:-fade_n], overlap, b[fade_n:]]) | |
| def _build_with_crossfade(self, audio: np.ndarray, cuts: list, | |
| sr: int, fill_tone: bool = True) -> np.ndarray: | |
| """ | |
| Build output from a list of (start_sec, end_sec) cuts, | |
| filling gaps with room tone and crossfading every join. | |
| cuts: sorted list of (start_sec, end_sec) to REMOVE. | |
| """ | |
| segments = [] | |
| prev = 0.0 | |
| for start, end in sorted(cuts, key=lambda x: x[0]): | |
| # Guard: skip cuts shorter than minimum | |
| if (end - start) < MIN_CUT_SEC: | |
| continue | |
| keep_sta = int(prev * sr) | |
| keep_end = int(start * sr) | |
| if keep_sta < keep_end: | |
| segments.append(audio[keep_sta:keep_end]) | |
| gap_len = int((end - start) * sr) | |
| if fill_tone and gap_len > 0: | |
| segments.append(self._fill_with_room_tone(gap_len)) | |
| prev = end | |
| remain = int(prev * sr) | |
| if remain < len(audio): | |
| segments.append(audio[remain:]) | |
| if not segments: | |
| return audio | |
| # Crossfade every adjacent pair | |
| result = segments[0] | |
| for seg in segments[1:]: | |
| result = self._crossfade_join(result, seg, fade_ms=20.0, sr=sr) | |
| return result.astype(np.float32) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # BACKGROUND NOISE REMOVAL | |
| # Chain: DeepFilterNet β two-pass noisereduce β passthrough | |
| # | |
| # SepFormer REMOVED β it is a speech separation model, not a denoiser. | |
| # It reconstructs voice artificially β robotic output. | |
| # | |
| # Two-pass noisereduce is the safe CPU fallback: | |
| # Pass 1 (stationary) β removes steady hum/hiss/fan noise | |
| # Pass 2 (non-stationary) β catches residual at low prop_decrease | |
| # so original voice character is preserved | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_background_noise(self, audio, sr): | |
| # ββ Primary: DeepFilterNet (SOTA, Rust available via Docker) βββββ | |
| try: | |
| result = self._deepfilter(audio, sr) | |
| print("[Denoiser] β DeepFilterNet noise removal done") | |
| return result, "DeepFilterNet" | |
| except Exception as e: | |
| logger.warning(f"[Denoiser] DeepFilterNet unavailable ({e})") | |
| # ββ Fallback: Single-pass noisereduce, stationary only ββββββββββββ | |
| # PHILOSOPHY: do as little as possible to the signal. | |
| # - stationary=True β only targets steady/consistent noise (fan, | |
| # hum, AC, room hiss). Leaves transient | |
| # speech harmonics completely untouched. | |
| # - prop_decrease=0.5 β reduces noise by ~50%, not 100%. | |
| # Keeps a thin noise floor so the voice | |
| # never sounds "hollow" or over-processed. | |
| # - No second pass, no non-stationary processing β those modes | |
| # touch voice frequencies and cause the robotic effect. | |
| try: | |
| import noisereduce as nr | |
| cleaned = nr.reduce_noise( | |
| y=audio, sr=sr, | |
| stationary=True, | |
| prop_decrease=0.50, | |
| ).astype(np.float32) | |
| print("[Denoiser] β noisereduce done (voice-preserving, stationary only)") | |
| return cleaned, "noisereduce_stationary" | |
| except Exception as e: | |
| logger.warning(f"noisereduce failed: {e}") | |
| return audio, "none" | |
| def _deepfilter(self, audio: np.ndarray, sr: int) -> np.ndarray: | |
| """DeepFilterNet enhancement (local only β requires Rust compiler).""" | |
| from df.enhance import enhance, init_df | |
| import torch | |
| # Lazy-load, module-level cache not needed (rarely reached on HF Spaces) | |
| if not hasattr(self, '_df_model') or self._df_model is None: | |
| self._df_model, self._df_state, _ = init_df() | |
| df_sr = self._df_state.sr() | |
| a = self._resample(audio, sr, df_sr) if sr != df_sr else audio | |
| t = torch.from_numpy(a).unsqueeze(0) | |
| out = enhance(self._df_model, self._df_state, t) | |
| res = out.squeeze().numpy().astype(np.float32) | |
| return self._resample(res, df_sr, sr) if df_sr != sr else res | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FILLER WORD REMOVAL β UPGRADED (confidence-gated + crossfade) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_fillers(self, audio: np.ndarray, sr: int, segments: list): | |
| """ | |
| Cuts filler words using Whisper word-level timestamps. | |
| UPGRADE: Confidence gating β words are only cut if: | |
| 1. avg_logprob β₯ FILLER_MIN_LOGPROB (Whisper was confident) | |
| 2. no_speech_prob β€ FILLER_MAX_NO_SPEECH (audio is actually speech) | |
| 3. Duration β₯ MIN_CUT_SEC (not a micro-glitch timestamp artefact) | |
| Falls back gracefully when confidence fields are absent (older Whisper). | |
| Gaps filled with room tone + crossfade for seamless edits. | |
| """ | |
| try: | |
| cuts = [] | |
| for seg in segments: | |
| word = seg.get('word', '').strip().lower() | |
| word = re.sub(r'[^a-z\s]', '', word).strip() | |
| if word not in FILLER_WORDS: | |
| continue | |
| start = seg.get('start', 0.0) | |
| end = seg.get('end', 0.0) | |
| # Duration guard | |
| if (end - start) < MIN_CUT_SEC: | |
| continue | |
| # Confidence gate (optional fields β skip gate if absent) | |
| avg_logprob = seg.get('avg_logprob', None) | |
| no_speech_prob = seg.get('no_speech_prob', None) | |
| if avg_logprob is not None and avg_logprob < FILLER_MIN_LOGPROB: | |
| logger.debug(f"[Denoiser] Filler '{word}' skipped: " | |
| f"low confidence ({avg_logprob:.2f})") | |
| continue | |
| if no_speech_prob is not None and no_speech_prob > FILLER_MAX_NO_SPEECH: | |
| logger.debug(f"[Denoiser] Filler '{word}' skipped: " | |
| f"no_speech_prob={no_speech_prob:.2f}") | |
| continue | |
| cuts.append((start, end)) | |
| if not cuts: | |
| return audio, 0 | |
| out = self._build_with_crossfade(audio, cuts, sr, fill_tone=True) | |
| print(f"[Denoiser] β Removed {len(cuts)} filler words") | |
| return out, len(cuts) | |
| except Exception as e: | |
| logger.warning(f"Filler removal failed: {e}") | |
| return audio, 0 | |
| def clean_transcript_fillers(self, transcript: str) -> str: | |
| """Remove filler words from transcript TEXT to match cleaned audio.""" | |
| words = transcript.split() | |
| result = [] | |
| i = 0 | |
| while i < len(words): | |
| w = re.sub(r'[^a-z\s]', '', words[i].lower()).strip() | |
| if i + 1 < len(words): | |
| two = w + " " + re.sub(r'[^a-z\s]', '', words[i+1].lower()).strip() | |
| if two in FILLER_WORDS: | |
| i += 2 | |
| continue | |
| if w in FILLER_WORDS: | |
| i += 1 | |
| continue | |
| result.append(words[i]) | |
| i += 1 | |
| return " ".join(result) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STUTTER REMOVAL β UPGRADED (phonetic similarity + crossfade) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_stutters(self, audio: np.ndarray, sr: int, segments: list): | |
| """ | |
| UPGRADE: Phonetic near-match detection in addition to exact repeats. | |
| e.g. "the" / "tha", "and" / "an", "I" / "I" all caught. | |
| Uses jellyfish.jaro_winkler_similarity if available; | |
| falls back to plain edit-distance ratio, then exact match only. | |
| Confidence gating applied here too (same thresholds as filler removal). | |
| Crossfade used on all splices. | |
| """ | |
| try: | |
| if len(segments) < 2: | |
| return audio, 0 | |
| # Choose similarity function | |
| sim_fn = self._word_similarity_fn() | |
| cuts = [] | |
| stutters_found = 0 | |
| i = 0 | |
| while i < len(segments): | |
| seg_i = segments[i] | |
| word = re.sub(r'[^a-z]', '', seg_i.get('word', '').lower()) | |
| if not word: | |
| i += 1 | |
| continue | |
| # Confidence gate on the anchor word | |
| if not self._passes_confidence_gate(seg_i): | |
| i += 1 | |
| continue | |
| # Look ahead for consecutive near-matches | |
| j = i + 1 | |
| while j < len(segments): | |
| seg_j = segments[j] | |
| next_word = re.sub(r'[^a-z]', '', seg_j.get('word', '').lower()) | |
| if not next_word: | |
| j += 1 | |
| continue | |
| similarity = sim_fn(word, next_word) | |
| if similarity >= 0.88: # β₯88% similar = stutter | |
| cuts.append((seg_i['start'], seg_i['end'])) | |
| stutters_found += 1 | |
| i = j | |
| j += 1 | |
| else: | |
| break | |
| i += 1 | |
| if not cuts: | |
| return audio, 0 | |
| out = self._build_with_crossfade(audio, cuts, sr, fill_tone=True) | |
| print(f"[Denoiser] β Removed {stutters_found} stutters") | |
| return out, stutters_found | |
| except Exception as e: | |
| logger.warning(f"Stutter removal failed: {e}") | |
| return audio, 0 | |
| def _word_similarity_fn(): | |
| """Return best available string-similarity function.""" | |
| try: | |
| import jellyfish | |
| return jellyfish.jaro_winkler_similarity | |
| except ImportError: | |
| pass | |
| try: | |
| import editdistance | |
| def _ed_ratio(a, b): | |
| if not a and not b: | |
| return 1.0 | |
| dist = editdistance.eval(a, b) | |
| return 1.0 - dist / max(len(a), len(b)) | |
| return _ed_ratio | |
| except ImportError: | |
| pass | |
| # Plain exact match as last resort | |
| return lambda a, b: 1.0 if a == b else 0.0 | |
| def _passes_confidence_gate(seg: dict) -> bool: | |
| """Return True if Whisper confidence is acceptable (or fields absent).""" | |
| avg_logprob = seg.get('avg_logprob', None) | |
| no_speech_prob = seg.get('no_speech_prob', None) | |
| if avg_logprob is not None and avg_logprob < FILLER_MIN_LOGPROB: | |
| return False | |
| if no_speech_prob is not None and no_speech_prob > FILLER_MAX_NO_SPEECH: | |
| return False | |
| return True | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # BREATH REDUCTION | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _reduce_breaths(self, audio: np.ndarray, sr: int) -> np.ndarray: | |
| """Non-stationary spectral gating β catches short broadband breath bursts.""" | |
| try: | |
| import noisereduce as nr | |
| cleaned = nr.reduce_noise( | |
| y=audio, sr=sr, | |
| stationary=False, | |
| prop_decrease=0.60, | |
| freq_mask_smooth_hz=400, | |
| time_mask_smooth_ms=40, | |
| ).astype(np.float32) | |
| print("[Denoiser] β Breath reduction done") | |
| return cleaned | |
| except Exception as e: | |
| logger.warning(f"Breath reduction failed: {e}") | |
| return audio | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MOUTH SOUND REDUCTION | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _reduce_mouth_sounds(self, audio: np.ndarray, sr: int): | |
| """ | |
| Suppress very short, very high-amplitude transients (clicks/pops). | |
| Threshold at 6.0 std to avoid removing real consonants (p, b, t). | |
| """ | |
| try: | |
| result = audio.copy() | |
| win = int(sr * 0.003) # 3ms window | |
| hop = win // 2 | |
| rms_arr = np.array([ | |
| float(np.sqrt(np.mean(audio[i:i+win]**2))) | |
| for i in range(0, len(audio) - win, hop) | |
| ]) | |
| if len(rms_arr) == 0: | |
| return audio, 0 | |
| threshold = float(np.mean(rms_arr)) + 6.0 * float(np.std(rms_arr)) | |
| n_removed = 0 | |
| for idx, rms in enumerate(rms_arr): | |
| if rms > threshold: | |
| start = idx * hop | |
| end = min(start + win, len(result)) | |
| result[start:end] *= np.linspace(1, 0, end - start) | |
| n_removed += 1 | |
| if n_removed: | |
| print(f"[Denoiser] β Suppressed {n_removed} mouth sound transients") | |
| return result.astype(np.float32), n_removed | |
| except Exception as e: | |
| logger.warning(f"Mouth sound reduction failed: {e}") | |
| return audio, 0 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LONG SILENCE REMOVAL β UPGRADED (adaptive threshold) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_long_silences(self, audio: np.ndarray, sr: int, | |
| max_silence_sec: float = 1.5, | |
| keep_pause_sec: float = 0.4) -> tuple: | |
| """ | |
| UPGRADE: Adaptive silence threshold. | |
| Old code used a hardcoded RMS=0.008 β worked in quiet studios only. | |
| New: threshold = 15th-percentile of per-frame RMS values. | |
| This self-calibrates to the recording's actual noise floor, | |
| so it works equally well in noisy rooms and near-silent studios. | |
| Silences replaced with room tone + crossfade. | |
| """ | |
| try: | |
| frame_len = int(sr * 0.02) # 20ms frames | |
| # ββ Compute per-frame RMS βββββββββββββββββββββββββββββββββ | |
| n_frames = (len(audio) - frame_len) // frame_len | |
| rms_frames = np.array([ | |
| float(np.sqrt(np.mean(audio[i*frame_len:(i+1)*frame_len]**2))) | |
| for i in range(n_frames) | |
| ]) | |
| if len(rms_frames) == 0: | |
| return audio, 0.0 | |
| # ββ Adaptive threshold: 15th percentile of RMS βββββββββββ | |
| threshold = float(np.percentile(rms_frames, 15)) | |
| # Clamp: never go below 0.001 (avoids mis-classifying very quiet speech) | |
| threshold = max(threshold, 0.001) | |
| print(f"[Denoiser] Adaptive silence threshold: RMS={threshold:.5f}") | |
| max_sil_frames = int(max_silence_sec / 0.02) | |
| keep_frames = int(keep_pause_sec / 0.02) | |
| kept = [] | |
| silence_count = 0 | |
| total_removed = 0 | |
| in_long_sil = False | |
| for i in range(n_frames): | |
| frame = audio[i*frame_len:(i+1)*frame_len] | |
| rms = rms_frames[i] | |
| if rms < threshold: | |
| silence_count += 1 | |
| if silence_count <= max_sil_frames: | |
| kept.append(frame) | |
| else: | |
| total_removed += frame_len | |
| in_long_sil = True | |
| else: | |
| if in_long_sil: | |
| pad = self._fill_with_room_tone(keep_frames * frame_len) | |
| kept.append(pad) | |
| in_long_sil = False | |
| silence_count = 0 | |
| kept.append(frame) | |
| # Tail of audio | |
| tail_start = n_frames * frame_len | |
| if tail_start < len(audio): | |
| kept.append(audio[tail_start:]) | |
| if not kept: | |
| return audio, 0.0 | |
| # Crossfade each frame join for smooth output | |
| result = kept[0] | |
| for seg in kept[1:]: | |
| result = self._crossfade_join(result, seg, fade_ms=5.0, sr=sr) | |
| removed_sec = total_removed / sr | |
| if removed_sec > 0: | |
| print(f"[Denoiser] β Removed {removed_sec:.1f}s of long silences") | |
| return result.astype(np.float32), removed_sec | |
| except Exception as e: | |
| logger.warning(f"Silence removal failed: {e}") | |
| return audio, 0.0 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # NORMALIZATION | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _normalise(self, audio: np.ndarray, sr: int) -> np.ndarray: | |
| try: | |
| import pyloudnorm as pyln | |
| meter = pyln.Meter(sr) | |
| loudness = meter.integrated_loudness(audio) | |
| if np.isfinite(loudness) and loudness < 0: | |
| audio = pyln.normalize.loudness(audio, loudness, TARGET_LOUDNESS) | |
| print(f"[Denoiser] β Normalized: {loudness:.1f} β {TARGET_LOUDNESS} LUFS") | |
| except Exception: | |
| rms = np.sqrt(np.mean(audio**2)) | |
| if rms > 1e-9: | |
| target_rms = 10 ** (TARGET_LOUDNESS / 20.0) | |
| audio = audio * (target_rms / rms) | |
| return np.clip(audio, -1.0, 1.0).astype(np.float32) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # HELPERS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _to_wav(self, src: str, dst: str, target_sr: int): | |
| result = subprocess.run([ | |
| "ffmpeg", "-y", "-i", src, | |
| "-acodec", "pcm_s24le", "-ar", str(target_sr), dst | |
| ], capture_output=True) | |
| if result.returncode != 0: | |
| stderr = result.stderr.decode(errors='replace') | |
| logger.warning(f"ffmpeg non-zero exit: {stderr[-400:]}") | |
| # Fallback: soundfile passthrough | |
| data, sr = sf.read(src, always_2d=True) | |
| sf.write(dst, data, sr, format="WAV", subtype="PCM_24") | |
| def _resample(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray: | |
| if orig_sr == target_sr: | |
| return audio | |
| try: | |
| import librosa | |
| return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr) | |
| except Exception: | |
| length = int(len(audio) * target_sr / orig_sr) | |
| return np.interp( | |
| np.linspace(0, len(audio), length), | |
| np.arange(len(audio)), audio | |
| ).astype(np.float32) |