# NOTE: removed non-Python page residue ("Spaces:" / "Running" / "Running" —
# Hugging Face Spaces UI header captured by the extraction, not source code).
"""
Department 1 — Professional Audio Enhancer

Matches CleanVoice feature-for-feature using FREE local models:
- Background noise removal — DeepFilterNet (SOTA free model), noisereduce fallback
- Filler word removal — word-level timestamps + room tone fill
- Stutter removal — repeated-word detection + cut (catches triple+ repeats)
- Long silence removal — energy-based VAD (keeps natural pauses)
- Breath sound reduction — spectral gating (noisereduce non-stationary)
- Mouth sound reduction — amplitude z-score transient suppression (tuned threshold)
- Room tone fill — captures room noise, fills cuts naturally
- Audio normalization — pyloudnorm -18 LUFS
- CD quality output — 48000 Hz PCM_24 (matches DeepFilterNet native SR)

FIXES APPLIED:
- TARGET_SR set to 48000 to match DeepFilterNet natively (no double resampling)
- Mouth sound threshold raised 4.5 -> 6.0 std (was removing real consonants p/b/t)
- Duplicate _remove_background_noise fixed (was silently overwriting first def)
- Wiener filter added as Priority 2 fallback (artifact-free)
- noisereduce kept as gentle last resort only
- Room tone fallback: uses first 100ms if audio too short
- Stutter detection fixed: now catches triple+ repeats (I I I was -> I was)
- Filler removal: also returns cleaned transcript text
- Normalise RMS fallback formula corrected
"""
| import os | |
| import re | |
| import time | |
| import subprocess | |
| import tempfile | |
| import numpy as np | |
| import soundfile as sf | |
| import logging | |
| logger = logging.getLogger(__name__) | |
# DeepFilterNet's native sample rate is 48 kHz; keeping TARGET_SR equal to it
# avoids a double resample. On hosts without DeepFilterNet (e.g. HF Spaces,
# which lacks a Rust compiler) the Wiener/noisereduce fallbacks work at any rate.
TARGET_SR = 48000  # Hz
TARGET_LOUDNESS = -18.0  # LUFS target for loudness normalization

# Filler words to cut from audio/transcript (English + Telugu + Hindi).
# NOTE: multi-word entries ("you know", "i mean") can only match in
# clean_transcript_fillers(); Whisper word segments arrive one word at a time.
FILLER_WORDS = {
    "um", "umm", "ummm", "uh", "uhh", "uhhh",
    "hmm", "hm", "hmmm",  # FIX: removed duplicate "hmm" literal
    "er", "err", "errr",
    "eh", "ahh", "ah",
    "like", "basically", "literally",
    "you know", "i mean", "so",
    "right", "okay", "ok",
    # Telugu
    "ante", "ane", "mane", "arey", "enti",
    # Hindi
    "matlab", "yani", "bas", "acha",
}
| class Denoiser: | |
| def __init__(self): | |
| self._df_model = None | |
| self._df_state = None | |
| self._df_loaded = False | |
| self._room_tone = None # captured room noise sample | |
| print("[Denoiser] β Professional Audio Enhancer ready") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN ENTRY POINT | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
    def process(self, audio_path: str, out_dir: str,
                remove_fillers: bool = True,
                remove_silences: bool = True,
                remove_breaths: bool = True,
                remove_mouth_sounds: bool = True,
                remove_stutters: bool = True,
                word_segments: list = None) -> dict:
        """
        Run the full enhancement pipeline on one input file.

        Stages, in order: decode to WAV, room-tone capture, background noise
        removal, mouth-sound suppression, breath reduction, filler-word
        removal, stutter removal, long-silence removal, loudness
        normalization, 24-bit WAV export.

        Args:
            audio_path: input media file (anything ffmpeg can decode).
            out_dir: directory for the intermediate and final WAV files.
            remove_fillers / remove_silences / remove_breaths /
            remove_mouth_sounds / remove_stutters: per-stage toggles.
            word_segments: list of {'word': str, 'start': float, 'end': float}
                from Whisper word-level timestamps; filler and stutter
                removal are skipped when this is None or empty.

        Returns:
            {'audio_path': path of the denoised 24-bit WAV,
             'stats': dict of per-stage counters and total processing time}.
        """
        t0 = time.time()
        stats = {}
        print("[Denoiser] βΆ Starting professional enhancement pipeline...")
        # 0. Decode/resample to a standard WAV at TARGET_SR.
        wav_in = os.path.join(out_dir, "stage0_input.wav")
        self._to_wav(audio_path, wav_in, TARGET_SR)
        audio, sr = sf.read(wav_in, always_2d=True)
        n_ch = audio.shape[1]
        duration = len(audio) / sr
        print(f"[Denoiser] Input: {sr}Hz, {n_ch}ch, {duration:.1f}s")
        # All processing happens on a mono float32 mixdown.
        mono = audio.mean(axis=1).astype(np.float32)
        # 1. Capture room tone BEFORE denoising (denoising would erase it).
        self._room_tone = self._capture_room_tone(mono, sr)
        # 2. Background noise removal (DeepFilterNet / Wiener / noisereduce).
        mono, noise_method = self._remove_background_noise(mono, sr)
        stats['noise_method'] = noise_method
        # 3. Mouth-sound (click/pop) suppression.
        if remove_mouth_sounds:
            mono, n_clicks = self._reduce_mouth_sounds(mono, sr)
            stats['mouth_sounds_removed'] = n_clicks
        # 4. Breath reduction via spectral gating.
        if remove_breaths:
            mono = self._reduce_breaths(mono, sr)
            stats['breaths_reduced'] = True
        # 5. Filler-word removal (requires word-level timestamps).
        stats['fillers_removed'] = 0
        if remove_fillers and word_segments:
            mono, n_fillers = self._remove_fillers(mono, sr, word_segments)
            stats['fillers_removed'] = n_fillers
        # 6. Stutter removal (requires word-level timestamps).
        stats['stutters_removed'] = 0
        if remove_stutters and word_segments:
            mono, n_stutters = self._remove_stutters(mono, sr, word_segments)
            stats['stutters_removed'] = n_stutters
        # 7. Long-silence removal.
        stats['silences_removed_sec'] = 0.0
        if remove_silences:
            mono, sil_sec = self._remove_long_silences(mono, sr)
            stats['silences_removed_sec'] = round(sil_sec, 2)
        # 8. Loudness normalization to TARGET_LOUDNESS.
        mono = self._normalise(mono, sr)
        # 9. Save; a stereo input gets the mono result duplicated to 2 channels
        #    (true stereo separation is not preserved by this pipeline).
        out_audio = np.stack([mono, mono], axis=1) if n_ch == 2 else mono
        out_path = os.path.join(out_dir, "denoised.wav")
        sf.write(out_path, out_audio, sr, subtype="PCM_24")
        stats['processing_sec'] = round(time.time() - t0, 2)
        print(f"[Denoiser] β Done in {stats['processing_sec']}s | {stats}")
        return {'audio_path': out_path, 'stats': stats}
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ROOM TONE CAPTURE | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _capture_room_tone(self, audio: np.ndarray, sr: int, | |
| sample_sec: float = 0.5) -> np.ndarray: | |
| """ | |
| Find the quietest 0.5s section of audio = room tone. | |
| FIX: Falls back to first 100ms if audio is too short. | |
| """ | |
| try: | |
| frame = int(sr * sample_sec) | |
| # FIX: Robust fallback for short audio | |
| if len(audio) < frame * 2: | |
| fallback_len = min(int(sr * 0.1), len(audio)) # first 100ms | |
| print("[Denoiser] Short audio β using first 100ms as room tone") | |
| return audio[:fallback_len].copy().astype(np.float32) | |
| best_rms = float('inf') | |
| best_start = 0 | |
| step = sr | |
| for i in range(0, len(audio) - frame, step): | |
| chunk = audio[i:i + frame] | |
| rms = float(np.sqrt(np.mean(chunk ** 2))) | |
| if rms < best_rms: | |
| best_rms = rms | |
| best_start = i | |
| room = audio[best_start: best_start + frame].copy() | |
| print(f"[Denoiser] Room tone captured: RMS={best_rms:.5f}") | |
| return room | |
| except Exception as e: | |
| logger.warning(f"Room tone capture failed: {e}") | |
| return np.zeros(int(sr * sample_sec), dtype=np.float32) | |
| def _fill_with_room_tone(self, length: int) -> np.ndarray: | |
| """Tile room tone to fill a gap of `length` samples.""" | |
| if self._room_tone is None or len(self._room_tone) == 0: | |
| return np.zeros(length, dtype=np.float32) | |
| reps = length // len(self._room_tone) + 1 | |
| tiled = np.tile(self._room_tone, reps)[:length] | |
| # Fade in/out to avoid clicks | |
| fade = min(int(0.01 * len(tiled)), 64) | |
| if fade > 0: | |
| tiled[:fade] *= np.linspace(0, 1, fade) | |
| tiled[-fade:] *= np.linspace(1, 0, fade) | |
| return tiled.astype(np.float32) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # BACKGROUND NOISE REMOVAL | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
    def _remove_background_noise(self, audio, sr):
        """
        Remove steady background noise, trying backends in quality order:
        DeepFilterNet, then a scipy Wiener filter, then gentle noisereduce.

        Returns (cleaned_audio, method_name); if every backend fails the
        input is returned untouched with method "none".
        """
        # Priority 1: DeepFilterNet (neural, best quality; needs df + torch).
        try:
            result = self._deepfilter(audio, sr)
            print("[Denoiser] β DeepFilterNet noise removal done")
            return result, "DeepFilterNet"
        except Exception as e:
            logger.warning(f"[Denoiser] DeepFilterNet unavailable ({e})")
        # Priority 2: frame-wise Wiener filter (scipy only, no compilation).
        # NOTE(review): despite its name, _rnnoise() implements a Wiener
        # filter, not RNNoise.
        try:
            result = self._rnnoise(audio, sr)
            print("[Denoiser] β Wiener filter noise removal done")
            return result, "Wiener filter"
        except Exception as e:
            logger.warning(f"[Denoiser] Wiener filter failed ({e})")
        # Priority 3 (last resort): noisereduce with mild settings.
        # IMPORTANT: keep prop_decrease LOW (0.50-0.60) to avoid "musical
        # noise" artifacts; aggressive multi-pass gating makes them worse.
        try:
            import noisereduce as nr
            # Single gentle pass to avoid musical-noise artifacts.
            cleaned = nr.reduce_noise(
                y=audio, sr=sr,
                stationary=False,  # non-stationary handles both noise types
                prop_decrease=0.55,  # gentle -> avoids buzzing artifacts
                freq_mask_smooth_hz=1000,  # heavy smoothing = less musical noise
                time_mask_smooth_ms=100,  # heavy smoothing = less musical noise
                n_std_thresh_stationary=2.0,  # higher = less aggressive
            ).astype(np.float32)
            print("[Denoiser] β noisereduce (gentle, artifact-free) done")
            return cleaned, "noisereduce"
        except Exception as e:
            logger.warning(f"noisereduce failed: {e}")
            return audio, "none"
| def _rnnoise(self, audio, sr): | |
| """ | |
| Wiener filter via scipy β no compilation needed, works on HF Spaces. | |
| Much cleaner than noisereduce for voice β no musical artifacts. | |
| """ | |
| from scipy.signal import wiener | |
| # Wiener filter works best on short frames | |
| frame_size = int(sr * 0.02) # 20ms frames | |
| result = np.zeros_like(audio) | |
| for i in range(0, len(audio) - frame_size, frame_size): | |
| frame = audio[i:i + frame_size] | |
| result[i:i + frame_size] = wiener(frame, mysize=7) | |
| # Handle last chunk | |
| remainder = len(audio) % frame_size | |
| if remainder: | |
| result[-remainder:] = wiener(audio[-remainder:], mysize=7) | |
| return result.astype(np.float32) | |
    def _deepfilter(self, audio, sr):
        """
        Denoise `audio` with DeepFilterNet; the model is loaded once on
        first call and cached on the instance.

        The input is resampled to the model's native rate when it differs
        from `sr` (TARGET_SR is 48 kHz to match it, so this is usually a
        no-op) and the result is resampled back to `sr`.

        Raises: ImportError (or any df/torch error) when DeepFilterNet is
        not installed — the caller treats that as "backend unavailable".
        """
        if not self._df_loaded:
            from df.enhance import enhance, init_df
            self._df_model, self._df_state, _ = init_df()
            self._df_loaded = True
        # NOTE(review): redundant re-import — needed only because the name
        # `enhance` from the branch above is function-local on later calls.
        from df.enhance import enhance
        import torch
        df_sr = self._df_state.sr()
        # TARGET_SR matches DeepFilterNet's native 48 kHz, so these two
        # resamples are skipped in the common case.
        a = self._resample(audio, sr, df_sr) if sr != df_sr else audio
        t = torch.from_numpy(a).unsqueeze(0)
        out = enhance(self._df_model, self._df_state, t)
        res = out.squeeze().numpy().astype(np.float32)
        return self._resample(res, df_sr, sr) if df_sr != sr else res
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FILLER WORD REMOVAL + ROOM TONE FILL | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_fillers(self, audio, sr, segments): | |
| """ | |
| Cut filler words using word-level timestamps. | |
| Fills gaps with room tone for natural sound. | |
| """ | |
| try: | |
| cuts = [] | |
| for seg in segments: | |
| word = seg.get('word', '').strip().lower() | |
| word = re.sub(r'[^a-z\s]', '', word).strip() | |
| if word in FILLER_WORDS: | |
| cuts.append((seg['start'], seg['end'], word)) | |
| if not cuts: | |
| return audio, 0 | |
| result = [] | |
| prev = 0.0 | |
| for start, end, word in sorted(cuts, key=lambda x: x[0]): | |
| keep_end = int(start * sr) | |
| keep_sta = int(prev * sr) | |
| if keep_sta < keep_end: | |
| result.append(audio[keep_sta:keep_end]) | |
| gap_len = int((end - start) * sr) | |
| if gap_len > 0: | |
| result.append(self._fill_with_room_tone(gap_len)) | |
| prev = end | |
| remain_start = int(prev * sr) | |
| if remain_start < len(audio): | |
| result.append(audio[remain_start:]) | |
| out = np.concatenate(result) if result else audio | |
| print(f"[Denoiser] β Removed {len(cuts)} filler words: {[c[2] for c in cuts[:5]]}") | |
| return out.astype(np.float32), len(cuts) | |
| except Exception as e: | |
| logger.warning(f"Filler removal failed: {e}") | |
| return audio, 0 | |
| def clean_transcript_fillers(self, transcript: str) -> str: | |
| """ | |
| FIX (NEW): Also remove filler words from the transcript TEXT, | |
| so the displayed text matches the cleaned audio. | |
| """ | |
| words = transcript.split() | |
| result = [] | |
| i = 0 | |
| while i < len(words): | |
| word = re.sub(r'[^a-z\s]', '', words[i].lower()).strip() | |
| # Check two-word fillers first ("you know", "i mean") | |
| if i + 1 < len(words): | |
| two = word + " " + re.sub(r'[^a-z\s]', '', words[i+1].lower()).strip() | |
| if two in FILLER_WORDS: | |
| i += 2 | |
| continue | |
| if word in FILLER_WORDS: | |
| i += 1 | |
| continue | |
| result.append(words[i]) | |
| i += 1 | |
| return " ".join(result) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STUTTER REMOVAL β FIXED | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
    def _remove_stutters(self, audio, sr, segments):
        """
        Remove stuttered word repetitions ("I I I was" -> "I was") using
        word-level timestamps.

        For each word, every immediately-following identical word (compared
        lowercase, letters only) marks the EARLIER copy for removal, so
        only the last copy of a run survives — this is what makes triple+
        repeats work. Cut spans are replaced with room tone.

        Returns (audio, repeats_removed); on failure the input is returned
        unchanged with a count of 0.
        """
        try:
            if len(segments) < 2:
                return audio, 0
            cuts = []
            stutters_found = 0
            i = 0
            while i < len(segments):
                # Normalize: lowercase, letters only ("I," -> "i").
                word = re.sub(r'[^a-z]', '', segments[i].get('word', '').strip().lower())
                if not word:
                    i += 1
                    continue
                # Look ahead for ALL consecutive repeats, not just one.
                j = i + 1
                while j < len(segments):
                    next_word = re.sub(r'[^a-z]', '', segments[j].get('word', '').strip().lower())
                    if next_word == word:
                        # Cut the earlier copy; slide i onto the newer one so
                        # the final copy of the run is the one that survives.
                        cuts.append((segments[i]['start'], segments[i]['end']))
                        stutters_found += 1
                        i = j
                        j += 1
                    else:
                        break  # run of repeats ended
                i += 1
            if not cuts:
                return audio, 0
            # Splice: keep audio between cuts, fill each cut with room tone.
            result = []
            prev = 0.0
            for start, end in sorted(cuts, key=lambda x: x[0]):
                keep_sta = int(prev * sr)
                keep_end = int(start * sr)
                if keep_sta < keep_end:
                    result.append(audio[keep_sta:keep_end])
                gap_len = int((end - start) * sr)
                if gap_len > 0:
                    result.append(self._fill_with_room_tone(gap_len))
                prev = end
            remain = int(prev * sr)
            if remain < len(audio):
                result.append(audio[remain:])
            out = np.concatenate(result) if result else audio
            print(f"[Denoiser] β Removed {stutters_found} stutters")
            return out.astype(np.float32), stutters_found
        except Exception as e:
            logger.warning(f"Stutter removal failed: {e}")
            return audio, 0
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # BREATH REDUCTION | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _reduce_breaths(self, audio, sr): | |
| """ | |
| Breaths = short broadband bursts between speech. | |
| Non-stationary spectral gating catches them well. | |
| """ | |
| try: | |
| import noisereduce as nr | |
| cleaned = nr.reduce_noise( | |
| y=audio, sr=sr, | |
| stationary=False, | |
| prop_decrease=0.90, # increased from 0.60 | |
| freq_mask_smooth_hz=400, | |
| time_mask_smooth_ms=40, | |
| n_std_thresh_stationary=1.0, | |
| ).astype(np.float32) | |
| print("[Denoiser] β Breath reduction done") | |
| return cleaned | |
| except Exception as e: | |
| logger.warning(f"Breath reduction failed: {e}") | |
| return audio | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MOUTH SOUND REDUCTION β FIXED THRESHOLD | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _reduce_mouth_sounds(self, audio, sr): | |
| """ | |
| Mouth clicks/pops = very short, very high amplitude transients. | |
| FIX: Threshold raised from 4.5β6.0 std to avoid removing | |
| real consonants like p, b, t which have similar transient energy. | |
| """ | |
| try: | |
| result = audio.copy() | |
| win = int(sr * 0.003) # 3ms window | |
| hop = win // 2 | |
| rms_arr = [] | |
| for i in range(0, len(audio) - win, hop): | |
| rms_arr.append(float(np.sqrt(np.mean(audio[i:i+win]**2)))) | |
| if not rms_arr: | |
| return audio, 0 | |
| rms_arr = np.array(rms_arr) | |
| mean_rms = float(np.mean(rms_arr)) | |
| std_rms = float(np.std(rms_arr)) | |
| # FIX: was 4.5 β too sensitive, removed real speech consonants | |
| threshold = mean_rms + 6.0 * std_rms | |
| n_removed = 0 | |
| for idx, rms in enumerate(rms_arr): | |
| if rms > threshold: | |
| start = idx * hop | |
| end = min(start + win, len(result)) | |
| fade = np.linspace(1, 0, end - start) | |
| result[start:end] *= fade | |
| n_removed += 1 | |
| if n_removed: | |
| print(f"[Denoiser] β Suppressed {n_removed} mouth sound transients") | |
| return result.astype(np.float32), n_removed | |
| except Exception as e: | |
| logger.warning(f"Mouth sound reduction failed: {e}") | |
| return audio, 0 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LONG SILENCE REMOVAL | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _remove_long_silences(self, audio, sr, | |
| max_silence_sec=1.5, | |
| keep_pause_sec=0.4): | |
| """ | |
| Shorten silences longer than max_silence_sec. | |
| Keeps keep_pause_sec worth of silence for natural pacing. | |
| """ | |
| try: | |
| frame_len = int(sr * 0.02) | |
| max_sil_frames = int(max_silence_sec / 0.02) | |
| keep_frames = int(keep_pause_sec / 0.02) | |
| threshold = 0.008 | |
| kept = [] | |
| silence_count = 0 | |
| total_removed = 0 | |
| in_long_sil = False | |
| for i in range(0, len(audio) - frame_len, frame_len): | |
| frame = audio[i:i + frame_len] | |
| rms = float(np.sqrt(np.mean(frame**2))) | |
| if rms < threshold: | |
| silence_count += 1 | |
| if silence_count <= max_sil_frames: | |
| kept.append(frame) | |
| else: | |
| total_removed += frame_len | |
| in_long_sil = True | |
| else: | |
| if in_long_sil: | |
| pad = self._fill_with_room_tone(keep_frames * frame_len) | |
| kept.append(pad) | |
| in_long_sil = False | |
| silence_count = 0 | |
| kept.append(frame) | |
| result = np.concatenate(kept) if kept else audio | |
| removed_sec = total_removed / sr | |
| if removed_sec > 0: | |
| print(f"[Denoiser] β Removed {removed_sec:.1f}s of long silences") | |
| return result.astype(np.float32), removed_sec | |
| except Exception as e: | |
| logger.warning(f"Silence removal failed: {e}") | |
| return audio, 0.0 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # NORMALIZATION β FIXED RMS FALLBACK | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _normalise(self, audio, sr): | |
| try: | |
| import pyloudnorm as pyln | |
| meter = pyln.Meter(sr) | |
| loudness = meter.integrated_loudness(audio) | |
| if np.isfinite(loudness) and loudness < 0: | |
| audio = pyln.normalize.loudness(audio, loudness, TARGET_LOUDNESS) | |
| print(f"[Denoiser] β Normalized: {loudness:.1f} β {TARGET_LOUDNESS} LUFS") | |
| except Exception: | |
| # FIX: Corrected RMS fallback formula | |
| # Old: audio * (10 ** (TARGET_LOUDNESS / 20.0) / rms) β wrong | |
| # New: scale so RMS matches target linear amplitude | |
| rms = np.sqrt(np.mean(audio**2)) | |
| if rms > 1e-9: | |
| target_rms = 10 ** (TARGET_LOUDNESS / 20.0) # β 0.126 | |
| audio = audio * (target_rms / rms) # correct ratio | |
| return np.clip(audio, -1.0, 1.0).astype(np.float32) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # HELPERS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
    def _to_wav(self, src, dst, target_sr):
        """
        Decode `src` to a 24-bit PCM WAV at `target_sr` via ffmpeg.

        If ffmpeg fails (missing binary, undecodable input), falls back to
        soundfile — which copies the data at its ORIGINAL sample rate.
        Callers re-read the actual rate from the written file, so the
        fallback's lack of resampling is tolerated downstream.
        """
        result = subprocess.run([
            "ffmpeg", "-y", "-i", src,
            "-acodec", "pcm_s24le", "-ar", str(target_sr), dst
        ], capture_output=True)
        if result.returncode != 0:
            # Fallback: no resampling here; dst keeps the source's rate.
            data, sr = sf.read(src, always_2d=True)
            sf.write(dst, data, sr, subtype="PCM_24")
| def _resample(self, audio, orig_sr, target_sr): | |
| try: | |
| import librosa | |
| return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr) | |
| except Exception: | |
| length = int(len(audio) * target_sr / orig_sr) | |
| return np.interp( | |
| np.linspace(0, len(audio), length), | |
| np.arange(len(audio)), audio | |
| ).astype(np.float32) |