# clearwave-ai / denoiser.py
# Provenance: Hugging Face Space commit "Update denoiser.py" (0e3930a, verified)
"""
Department 1 β€” Professional Audio Enhancer
Matches CleanVoice feature-for-feature using FREE local models:
βœ… Background noise removal β†’ DeepFilterNet (SOTA free model) β†’ noisereduce fallback
βœ… Filler word removal β†’ Word-level timestamps + room tone fill
βœ… Stutter removal β†’ Repeated-phrase detection + cut (fixed: catches triple+ repeats)
βœ… Long silence removal β†’ Energy-based VAD (keeps natural pauses)
βœ… Breath sound reduction β†’ Spectral gating (noisereduce non-stationary)
βœ… Mouth sound reduction β†’ Amplitude zscore transient suppression (tuned threshold)
βœ… Room tone fill β†’ Captures room noise, fills cuts naturally
βœ… Audio normalization β†’ pyloudnorm -18 LUFS
βœ… CD quality output β†’ 48000Hz PCM_24 (matches DeepFilterNet native SR)
FIXES APPLIED:
- TARGET_SR set to 48000 to match DeepFilterNet natively (no double resampling)
- Mouth sound threshold raised 4.5β†’6.0 std (was removing real consonants p/b/t)
- Duplicate _remove_background_noise fixed (was silently overwriting first def)
- TARGET_SR set to 48000 β€” matches DeepFilterNet natively
- Wiener filter added as Priority 2 fallback (artifact-free)
- noisereduce kept as gentle last resort only
- Room tone fallback: uses first 100ms if audio too short
- Stutter detection fixed: now catches triple+ repeats (I I I was β†’ I was)
- Filler removal: also returns cleaned transcript text
- Normalise RMS fallback formula corrected
"""
import os
import re
import time
import subprocess
import tempfile
import numpy as np
import soundfile as sf
import logging
logger = logging.getLogger(__name__)
# NOTE: 44100 used on HF Spaces (DeepFilterNet not available β€” no Rust compiler)
# Locally with DeepFilterNet installed, change this to 48000 for best quality
TARGET_SR = 48000 # matches DeepFilterNet native SR
TARGET_LOUDNESS = -18.0
# Filler words (English + Telugu + Hindi)
FILLER_WORDS = {
"um", "umm", "ummm", "uh", "uhh", "uhhh",
"hmm", "hm", "hmm", "hmmm",
"er", "err", "errr",
"eh", "ahh", "ah",
"like", "basically", "literally",
"you know", "i mean", "so",
"right", "okay", "ok",
# Telugu
"ante", "ane", "mane", "arey", "enti",
# Hindi
"matlab", "yani", "bas", "acha",
}
class Denoiser:
    """
    Professional audio-enhancement pipeline for spoken audio.

    Stages (see ``process``): background-noise removal (DeepFilterNet →
    Wiener → noisereduce), mouth-click suppression, breath reduction,
    filler-word and stutter removal driven by word-level timestamps,
    long-silence trimming, and loudness normalisation to
    ``TARGET_LOUDNESS`` LUFS.  Every cut is patched with captured room
    tone so edits sound natural rather than leaving digital silence.
    """

    def __init__(self):
        # DeepFilterNet is expensive to initialise — load lazily on the
        # first _deepfilter() call instead of at construction.
        self._df_model = None
        self._df_state = None
        self._df_loaded = False
        self._room_tone = None  # quietest slice of the input; fills edit gaps
        print("[Denoiser] ✅ Professional Audio Enhancer ready")

    # ══════════════════════════════════════════════════════════════════
    # MAIN ENTRY POINT
    # ══════════════════════════════════════════════════════════════════
    def process(self, audio_path: str, out_dir: str,
                remove_fillers: bool = True,
                remove_silences: bool = True,
                remove_breaths: bool = True,
                remove_mouth_sounds: bool = True,
                remove_stutters: bool = True,
                word_segments: list = None) -> dict:
        """
        Run the full enhancement pipeline on one file.

        Args:
            audio_path: input audio in any ffmpeg-readable format.
            out_dir: directory for intermediate and final WAV files.
            remove_*: per-stage toggles.
            word_segments: list of {'word': str, 'start': float,
                'end': float} dicts from Whisper word-level timestamps;
                filler and stutter removal are skipped without it.

        Returns:
            {'audio_path': path of denoised.wav, 'stats': per-stage stats}
        """
        t0 = time.time()
        stats = {}
        print("[Denoiser] ▶ Starting professional enhancement pipeline...")
        # ── 0. Convert to standard WAV ───────────────────────────────
        wav_in = os.path.join(out_dir, "stage0_input.wav")
        self._to_wav(audio_path, wav_in, TARGET_SR)
        audio, sr = sf.read(wav_in, always_2d=True)
        n_ch = audio.shape[1]
        duration = len(audio) / sr
        print(f"[Denoiser] Input: {sr}Hz, {n_ch}ch, {duration:.1f}s")
        # All DSP operates on a mono float32 mixdown.
        mono = audio.mean(axis=1).astype(np.float32)
        # ── 1. Capture room tone BEFORE denoising ────────────────────
        # (denoising would strip the very noise floor we want to sample)
        self._room_tone = self._capture_room_tone(mono, sr)
        # ── 2. Background noise removal ──────────────────────────────
        mono, noise_method = self._remove_background_noise(mono, sr)
        stats['noise_method'] = noise_method
        # ── 3. Mouth sound reduction (clicks/pops) ───────────────────
        if remove_mouth_sounds:
            mono, n_clicks = self._reduce_mouth_sounds(mono, sr)
            stats['mouth_sounds_removed'] = n_clicks
        # ── 4. Breath reduction ──────────────────────────────────────
        if remove_breaths:
            mono = self._reduce_breaths(mono, sr)
            stats['breaths_reduced'] = True
        # ── 5. Filler word removal (needs word-level timestamps) ─────
        stats['fillers_removed'] = 0
        if remove_fillers and word_segments:
            mono, n_fillers = self._remove_fillers(mono, sr, word_segments)
            stats['fillers_removed'] = n_fillers
        # ── 6. Stutter removal (needs word-level timestamps) ─────────
        stats['stutters_removed'] = 0
        if remove_stutters and word_segments:
            mono, n_stutters = self._remove_stutters(mono, sr, word_segments)
            stats['stutters_removed'] = n_stutters
        # ── 7. Long silence removal ──────────────────────────────────
        stats['silences_removed_sec'] = 0.0
        if remove_silences:
            mono, sil_sec = self._remove_long_silences(mono, sr)
            stats['silences_removed_sec'] = round(sil_sec, 2)
        # ── 8. Loudness normalisation ────────────────────────────────
        mono = self._normalise(mono, sr)
        # ── 9. Restore channel count and save as 24-bit PCM ──────────
        out_audio = np.stack([mono, mono], axis=1) if n_ch == 2 else mono
        out_path = os.path.join(out_dir, "denoised.wav")
        sf.write(out_path, out_audio, sr, subtype="PCM_24")
        stats['processing_sec'] = round(time.time() - t0, 2)
        print(f"[Denoiser] ✅ Done in {stats['processing_sec']}s | {stats}")
        return {'audio_path': out_path, 'stats': stats}

    # ══════════════════════════════════════════════════════════════════
    # ROOM TONE CAPTURE
    # ══════════════════════════════════════════════════════════════════
    def _capture_room_tone(self, audio: np.ndarray, sr: int,
                           sample_sec: float = 0.5) -> np.ndarray:
        """
        Return the quietest ``sample_sec`` slice of ``audio`` = room tone.

        Scans in 1-second strides and keeps the window with lowest RMS.
        Falls back to the first 100 ms when the audio is too short to
        scan, and to digital silence if everything fails.
        """
        try:
            frame = int(sr * sample_sec)
            # Robust fallback for clips shorter than two windows.
            if len(audio) < frame * 2:
                fallback_len = min(int(sr * 0.1), len(audio))  # first 100ms
                print("[Denoiser] Short audio — using first 100ms as room tone")
                return audio[:fallback_len].copy().astype(np.float32)
            best_rms = float('inf')
            best_start = 0
            step = sr  # 1-second stride keeps the scan cheap
            for i in range(0, len(audio) - frame, step):
                chunk = audio[i:i + frame]
                rms = float(np.sqrt(np.mean(chunk ** 2)))
                if rms < best_rms:
                    best_rms = rms
                    best_start = i
            room = audio[best_start: best_start + frame].copy()
            print(f"[Denoiser] Room tone captured: RMS={best_rms:.5f}")
            return room
        except Exception as e:
            logger.warning(f"Room tone capture failed: {e}")
            return np.zeros(int(sr * sample_sec), dtype=np.float32)

    def _fill_with_room_tone(self, length: int) -> np.ndarray:
        """Tile the captured room tone to fill a gap of ``length`` samples."""
        if self._room_tone is None or len(self._room_tone) == 0:
            return np.zeros(length, dtype=np.float32)
        reps = length // len(self._room_tone) + 1
        tiled = np.tile(self._room_tone, reps)[:length]
        # Short linear fades at both ends avoid clicks at splice points.
        fade = min(int(0.01 * len(tiled)), 64)
        if fade > 0:
            tiled[:fade] *= np.linspace(0, 1, fade)
            tiled[-fade:] *= np.linspace(1, 0, fade)
        return tiled.astype(np.float32)

    # ══════════════════════════════════════════════════════════════════
    # BACKGROUND NOISE REMOVAL
    # ══════════════════════════════════════════════════════════════════
    def _remove_background_noise(self, audio, sr):
        """
        Denoise ``audio`` with the best available backend.

        Tries, in order: DeepFilterNet (SOTA quality), a scipy Wiener
        filter (artifact-free, no compilation), then a single gentle
        noisereduce pass.  Returns (cleaned_audio, backend_name); on
        total failure returns the input unchanged with name "none".
        """
        # ── Priority 1: DeepFilterNet (SOTA — best quality) ──────────
        try:
            result = self._deepfilter(audio, sr)
            print("[Denoiser] ✅ DeepFilterNet noise removal done")
            return result, "DeepFilterNet"
        except Exception as e:
            logger.warning(f"[Denoiser] DeepFilterNet unavailable ({e})")
        # ── Priority 2: Wiener filter (scipy — no compilation needed) ─
        try:
            result = self._rnnoise(audio, sr)
            print("[Denoiser] ✅ Wiener filter noise removal done")
            return result, "Wiener filter"
        except Exception as e:
            logger.warning(f"[Denoiser] Wiener filter failed ({e})")
        # ── Last resort: noisereduce with mild settings ───────────────
        # Keep prop_decrease LOW (0.50-0.60) to avoid "musical noise";
        # stacking aggressive passes makes those artifacts WORSE.
        try:
            import noisereduce as nr
            cleaned = nr.reduce_noise(
                y=audio, sr=sr,
                stationary=False,             # handles both noise types
                prop_decrease=0.55,           # gentle — avoids buzzing
                freq_mask_smooth_hz=1000,     # heavy smoothing = fewer artifacts
                time_mask_smooth_ms=100,      # heavy smoothing = fewer artifacts
                n_std_thresh_stationary=2.0,  # higher = less aggressive
            ).astype(np.float32)
            print("[Denoiser] ✅ noisereduce (gentle, artifact-free) done")
            return cleaned, "noisereduce"
        except Exception as e:
            logger.warning(f"noisereduce failed: {e}")
            return audio, "none"

    def _rnnoise(self, audio, sr):
        """
        Frame-wise Wiener filter via scipy — no compilation needed, so it
        works on HF Spaces.  Cleaner than noisereduce for voice.

        FIX: the old loop stopped at ``len(audio) - frame_size`` and only
        patched a ``len % frame_size`` tail, so when the length was an
        exact multiple of the frame size the final 20 ms were left as
        zeros.  Now every sample is covered exactly once.
        """
        from scipy.signal import wiener
        frame_size = int(sr * 0.02)  # 20 ms frames suit the local estimator
        result = np.empty_like(audio)
        for i in range(0, len(audio), frame_size):
            chunk = audio[i:i + frame_size]
            if len(chunk) >= 7:  # wiener needs at least `mysize` samples
                result[i:i + len(chunk)] = wiener(chunk, mysize=7)
            else:
                result[i:i + len(chunk)] = chunk  # pass tiny tail through
        return result.astype(np.float32)

    def _deepfilter(self, audio, sr):
        """
        Enhance with DeepFilterNet, lazily loading the model once and
        resampling to/from its native sample rate when needed.
        (TARGET_SR == 48 kHz matches it, so resampling is usually a no-op.)
        """
        from df.enhance import enhance, init_df  # FIX: single import, was duplicated
        import torch
        if not self._df_loaded:
            self._df_model, self._df_state, _ = init_df()
            self._df_loaded = True
        df_sr = self._df_state.sr()
        a = self._resample(audio, sr, df_sr) if sr != df_sr else audio
        t = torch.from_numpy(a).unsqueeze(0)  # add batch dimension
        out = enhance(self._df_model, self._df_state, t)
        res = out.squeeze().numpy().astype(np.float32)
        return self._resample(res, df_sr, sr) if df_sr != sr else res

    # ══════════════════════════════════════════════════════════════════
    # FILLER WORD REMOVAL + ROOM TONE FILL
    # ══════════════════════════════════════════════════════════════════
    def _remove_fillers(self, audio, sr, segments):
        """
        Cut filler words out of the waveform using word-level timestamps,
        patching each gap with room tone so the edit is inaudible.

        NOTE: each segment carries a single word, so multi-word entries
        in FILLER_WORDS ("you know") only fire transcript-side in
        clean_transcript_fillers(), never here.

        Returns (audio, number_of_fillers_cut).
        """
        try:
            cuts = []
            for seg in segments:
                word = seg.get('word', '').strip().lower()
                word = re.sub(r'[^a-z\s]', '', word).strip()
                if word in FILLER_WORDS:
                    cuts.append((seg['start'], seg['end'], word))
            if not cuts:
                return audio, 0
            result = []
            prev = 0.0
            for start, end, word in sorted(cuts, key=lambda x: x[0]):
                keep_end = int(start * sr)
                keep_sta = int(prev * sr)
                if keep_sta < keep_end:
                    result.append(audio[keep_sta:keep_end])
                gap_len = int((end - start) * sr)
                if gap_len > 0:
                    # Replace the filler with equal-length room tone so
                    # overall timing is preserved.
                    result.append(self._fill_with_room_tone(gap_len))
                prev = end
            remain_start = int(prev * sr)
            if remain_start < len(audio):
                result.append(audio[remain_start:])
            out = np.concatenate(result) if result else audio
            print(f"[Denoiser] ✅ Removed {len(cuts)} filler words: {[c[2] for c in cuts[:5]]}")
            return out.astype(np.float32), len(cuts)
        except Exception as e:
            logger.warning(f"Filler removal failed: {e}")
            return audio, 0

    def clean_transcript_fillers(self, transcript: str) -> str:
        """
        Remove filler words from the transcript TEXT so the displayed
        text matches the cleaned audio.  Two-word fillers ("you know",
        "i mean") are checked before single words.
        """
        words = transcript.split()
        result = []
        i = 0
        while i < len(words):
            word = re.sub(r'[^a-z\s]', '', words[i].lower()).strip()
            # Check two-word fillers first ("you know", "i mean").
            if i + 1 < len(words):
                two = word + " " + re.sub(r'[^a-z\s]', '', words[i+1].lower()).strip()
                if two in FILLER_WORDS:
                    i += 2
                    continue
            if word in FILLER_WORDS:
                i += 1
                continue
            result.append(words[i])
            i += 1
        return " ".join(result)

    # ══════════════════════════════════════════════════════════════════
    # STUTTER REMOVAL
    # ══════════════════════════════════════════════════════════════════
    def _remove_stutters(self, audio, sr, segments):
        """
        Remove stutters (consecutive repeats of the same word), keeping
        only the LAST occurrence: "I I I was" → "I was".

        The scan marks every earlier copy in a run of repeats as a cut
        and slides forward past the whole run, so triple+ repeats are
        handled in one pass.  Cuts are filled with room tone.

        Returns (audio, number_of_repeats_cut).
        """
        try:
            if len(segments) < 2:
                return audio, 0
            cuts = []
            stutters_found = 0
            i = 0
            while i < len(segments):
                word = re.sub(r'[^a-z]', '', segments[i].get('word', '').strip().lower())
                if not word:
                    i += 1
                    continue
                # Look ahead for ALL consecutive repeats, not just one.
                j = i + 1
                while j < len(segments):
                    next_word = re.sub(r'[^a-z]', '', segments[j].get('word', '').strip().lower())
                    if next_word == word:
                        # Cut the earlier copy; keep advancing through the run.
                        cuts.append((segments[i]['start'], segments[i]['end']))
                        stutters_found += 1
                        i = j  # slide i forward to the current repeat
                        j += 1
                    else:
                        break  # run of repeats ended
                i += 1
            if not cuts:
                return audio, 0
            # Rebuild the waveform, replacing each cut with room tone.
            result = []
            prev = 0.0
            for start, end in sorted(cuts, key=lambda x: x[0]):
                keep_sta = int(prev * sr)
                keep_end = int(start * sr)
                if keep_sta < keep_end:
                    result.append(audio[keep_sta:keep_end])
                gap_len = int((end - start) * sr)
                if gap_len > 0:
                    result.append(self._fill_with_room_tone(gap_len))
                prev = end
            remain = int(prev * sr)
            if remain < len(audio):
                result.append(audio[remain:])
            out = np.concatenate(result) if result else audio
            print(f"[Denoiser] ✅ Removed {stutters_found} stutters")
            return out.astype(np.float32), stutters_found
        except Exception as e:
            logger.warning(f"Stutter removal failed: {e}")
            return audio, 0

    # ══════════════════════════════════════════════════════════════════
    # BREATH REDUCTION
    # ══════════════════════════════════════════════════════════════════
    def _reduce_breaths(self, audio, sr):
        """
        Attenuate breath sounds — short broadband bursts between speech.

        Non-stationary spectral gating (noisereduce) catches them well;
        returns the input unchanged if noisereduce is unavailable.
        """
        try:
            import noisereduce as nr
            cleaned = nr.reduce_noise(
                y=audio, sr=sr,
                stationary=False,
                prop_decrease=0.90,       # aggressive — breaths, not speech
                freq_mask_smooth_hz=400,
                time_mask_smooth_ms=40,
                n_std_thresh_stationary=1.0,
            ).astype(np.float32)
            print("[Denoiser] ✅ Breath reduction done")
            return cleaned
        except Exception as e:
            logger.warning(f"Breath reduction failed: {e}")
            return audio

    # ══════════════════════════════════════════════════════════════════
    # MOUTH SOUND REDUCTION
    # ══════════════════════════════════════════════════════════════════
    def _reduce_mouth_sounds(self, audio, sr):
        """
        Suppress mouth clicks/pops — very short, unusually loud transients.

        Scans 3 ms windows at 50% overlap and fades out any window whose
        RMS exceeds mean + 6.0 std.  The 6.0 threshold (raised from 4.5)
        deliberately spares plosive consonants (p/b/t), which have
        similar but smaller transient energy.

        Returns (audio, number_of_windows_suppressed).
        """
        try:
            result = audio.copy()
            win = int(sr * 0.003)  # 3 ms analysis window
            hop = win // 2         # 50% overlap
            rms_arr = []
            for i in range(0, len(audio) - win, hop):
                rms_arr.append(float(np.sqrt(np.mean(audio[i:i+win] ** 2))))
            if not rms_arr:
                return audio, 0
            rms_arr = np.array(rms_arr)
            mean_rms = float(np.mean(rms_arr))
            std_rms = float(np.std(rms_arr))
            threshold = mean_rms + 6.0 * std_rms
            n_removed = 0
            for idx, rms in enumerate(rms_arr):
                if rms > threshold:
                    start = idx * hop
                    end = min(start + win, len(result))
                    # Linear fade-out avoids introducing a new hard click.
                    result[start:end] *= np.linspace(1, 0, end - start)
                    n_removed += 1
            if n_removed:
                print(f"[Denoiser] ✅ Suppressed {n_removed} mouth sound transients")
            return result.astype(np.float32), n_removed
        except Exception as e:
            logger.warning(f"Mouth sound reduction failed: {e}")
            return audio, 0

    # ══════════════════════════════════════════════════════════════════
    # LONG SILENCE REMOVAL
    # ══════════════════════════════════════════════════════════════════
    def _remove_long_silences(self, audio, sr,
                              max_silence_sec=1.5,
                              keep_pause_sec=0.4):
        """
        Shorten silences longer than ``max_silence_sec``, re-inserting a
        ``keep_pause_sec`` room-tone pause for natural pacing.

        Returns (audio, seconds_removed).
        FIX: the loop now covers the whole signal — the old range()
        stopped one frame early and silently dropped up to 20 ms of tail.
        """
        try:
            frame_len = int(sr * 0.02)  # 20 ms analysis frames
            max_sil_frames = int(max_silence_sec / 0.02)
            keep_frames = int(keep_pause_sec / 0.02)
            threshold = 0.008  # RMS silence floor
            kept = []
            silence_count = 0
            total_removed = 0
            in_long_sil = False
            for i in range(0, len(audio), frame_len):
                frame = audio[i:i + frame_len]  # final frame may be short
                rms = float(np.sqrt(np.mean(frame ** 2)))
                if rms < threshold:
                    silence_count += 1
                    if silence_count <= max_sil_frames:
                        kept.append(frame)  # keep a natural-length pause
                    else:
                        total_removed += len(frame)
                        in_long_sil = True
                else:
                    if in_long_sil:
                        # Re-insert a short room-tone pause after the cut.
                        pad = self._fill_with_room_tone(keep_frames * frame_len)
                        kept.append(pad)
                        in_long_sil = False
                    silence_count = 0
                    kept.append(frame)
            result = np.concatenate(kept) if kept else audio
            removed_sec = total_removed / sr
            if removed_sec > 0:
                print(f"[Denoiser] ✅ Removed {removed_sec:.1f}s of long silences")
            return result.astype(np.float32), removed_sec
        except Exception as e:
            logger.warning(f"Silence removal failed: {e}")
            return audio, 0.0

    # ══════════════════════════════════════════════════════════════════
    # NORMALIZATION
    # ══════════════════════════════════════════════════════════════════
    def _normalise(self, audio, sr):
        """
        Normalise to TARGET_LOUDNESS LUFS with pyloudnorm; fall back to a
        simple RMS match when pyloudnorm is unavailable or fails.
        Output is clipped to [-1, 1] float32.
        """
        try:
            import pyloudnorm as pyln
            meter = pyln.Meter(sr)
            loudness = meter.integrated_loudness(audio)
            if np.isfinite(loudness) and loudness < 0:
                audio = pyln.normalize.loudness(audio, loudness, TARGET_LOUDNESS)
                print(f"[Denoiser] ✅ Normalized: {loudness:.1f} → {TARGET_LOUDNESS} LUFS")
        except Exception:
            # RMS fallback: scale so the RMS equals the linear amplitude
            # corresponding to TARGET_LOUDNESS dBFS.
            rms = np.sqrt(np.mean(audio ** 2))
            if rms > 1e-9:
                target_rms = 10 ** (TARGET_LOUDNESS / 20.0)  # ≈ 0.126
                audio = audio * (target_rms / rms)
        return np.clip(audio, -1.0, 1.0).astype(np.float32)

    # ══════════════════════════════════════════════════════════════════
    # HELPERS
    # ══════════════════════════════════════════════════════════════════
    def _to_wav(self, src, dst, target_sr):
        """
        Convert ``src`` to 24-bit PCM WAV at ``target_sr`` via ffmpeg;
        fall back to soundfile if ffmpeg fails.
        FIX: the fallback now resamples to target_sr instead of silently
        keeping the source rate (which broke the function's contract).
        """
        result = subprocess.run([
            "ffmpeg", "-y", "-i", src,
            "-acodec", "pcm_s24le", "-ar", str(target_sr), dst
        ], capture_output=True)
        if result.returncode != 0:
            data, sr = sf.read(src, always_2d=True)
            if sr != target_sr:
                data = np.stack(
                    [self._resample(data[:, c], sr, target_sr)
                     for c in range(data.shape[1])], axis=1)
            sf.write(dst, data, target_sr, subtype="PCM_24")

    def _resample(self, audio, orig_sr, target_sr):
        """Resample 1-D audio with librosa; linear-interp fallback."""
        try:
            import librosa
            return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
        except Exception:
            length = int(len(audio) * target_sr / orig_sr)
            return np.interp(
                np.linspace(0, len(audio), length),
                np.arange(len(audio)), audio
            ).astype(np.float32)