# clearwave-api / denoiser.py
# (Hugging Face Hub page residue from the original upload:
#  "testingfaces's picture / Upload 6 files / 9716505 verified")
"""
Department 1 β€” Professional Audio Enhancer (v2 β€” HF Spaces Optimised)
=======================================================================
βœ… Background noise removal β†’ DeepFilterNet (primary; see _remove_background_noise)
β†’ Gentle stationary-only noisereduce fallback
βœ… Filler word removal β†’ Whisper confidence-gated word-level timestamps
βœ… Stutter removal β†’ Phonetic-similarity aware repeat detection
βœ… Long silence removal β†’ Adaptive VAD threshold (percentile-based, env-aware)
βœ… Breath sound reduction β†’ Spectral gating (noisereduce non-stationary)
βœ… Mouth sound reduction β†’ Amplitude z-score transient suppression
βœ… Room tone fill β†’ Seamless crossfade splice (no edit seams/clicks)
βœ… Audio normalization β†’ pyloudnorm -18 LUFS
βœ… High quality output β†’ 48000Hz pipeline, MP3 export (libmp3lame VBR q2; 24-bit WAV fallback)
UPGRADES v2:
[NOISE] DeepFilterNet as primary (SepFormer was removed β€” it is a speech
separation model, not a denoiser, and reconstructed voice artificially)
[NOISE] Stationary-only noisereduce fallback at prop_decrease=0.5
to remove steady noise without aggressive single-pass artifacts
[FILLER] Whisper avg_logprob + no_speech_prob confidence gating β€”
low-confidence words are not blindly cut anymore
[FILLER] Min-duration guard: skips cuts shorter than 80ms (avoids micro-glitches)
[STUTTER] Phonetic normalisation (jellyfish/editdistance) catches near-repeats
e.g. "the" / "tha", "and" / "an" β€” not just exact matches
[SILENCE] Adaptive threshold: uses 15th-percentile RMS of the recording
instead of fixed 0.008 β€” works in noisy rooms and quiet studios alike
[SPLICE] Crossfade blending on ALL cuts (fillers, stutters, silences) β€”
smooth 20ms equal-power fade eliminates click/seam artifacts
[PERF] Model caching β€” DeepFilterNet loaded once per instance, reused across calls
[PERF] Silero VAD pre-scan scaffolding (module-level cache slots present;
not yet wired into this pipeline)
[ROBUST] Every stage returns original audio on failure (already true, kept)
[ROBUST] ffmpeg stderr captured and logged on non-zero exit
"""
import os
import re
import time
import subprocess
import numpy as np
import soundfile as sf
import logging
logger = logging.getLogger(__name__)
# Working sample rate for the whole pipeline.
TARGET_SR = 48000 # 48kHz matches DeepFilterNet native SR (Rust available via Docker)
# Integrated-loudness target (LUFS) for normalisation β€” podcast-style level.
TARGET_LOUDNESS = -18.0
# Minimum duration of a detected cut to actually apply it (avoids micro-glitches)
MIN_CUT_SEC = 0.08
# Whisper confidence gate: only cut a word if its log-probability is above this.
# Whisper avg_logprob is in range (-inf, 0]; -0.3 β‰ˆ "fairly confident".
FILLER_MIN_LOGPROB = -0.5 # below this β†’ too uncertain to cut
FILLER_MAX_NO_SPEECH = 0.4 # above this β†’ Whisper thinks it's non-speech anyway
# Filler words (English + Telugu + Hindi).
# NOTE: multi-word entries ("you know", "i mean") can only match in
# clean_transcript_fillers β€” Whisper word segments arrive one word at a time.
FILLER_WORDS = {
    "um", "umm", "ummm", "uh", "uhh", "uhhh",
    "hmm", "hm", "hmmm",
    "er", "err", "errr",
    "eh", "ahh", "ah",
    "like", "basically", "literally",
    "you know", "i mean", "so",
    "right", "okay", "ok",
    # Telugu
    "ante", "ane", "mane", "arey", "enti",
    # Hindi
    "matlab", "yani", "bas", "acha",
}
# ---------------------------------------------------------------------------
# Module-level model cache (survives across Denoiser() instances on same Space)
# ---------------------------------------------------------------------------
# NOTE(review): these Silero slots are never assigned anywhere in this module β€”
# presumably reserved for the planned VAD pre-scan; confirm before removing.
_SILERO_MODEL = None # Silero VAD
_SILERO_UTILS = None
class Denoiser:
    """Professional audio enhancement pipeline.

    Stages (see `process` for order): background noise removal, mouth-sound
    and breath reduction, filler/stutter removal from Whisper word
    timestamps, long-silence shortening, loudness normalisation, MP3 export.
    Every stage is best-effort: on failure it returns its input unchanged.
    """

    def __init__(self):
        # Quietest window of the current recording; captured by
        # _capture_room_tone() at the start of process() and used by
        # _fill_with_room_tone() to fill edit gaps seamlessly.
        self._room_tone = None
        print("[Denoiser] βœ… Professional Audio Enhancer v2 ready (HF Spaces mode)")
# ══════════════════════════════════════════════════════════════════
# MAIN ENTRY POINT
# ══════════════════════════════════════════════════════════════════
def process(self, audio_path: str, out_dir: str,
            remove_fillers: bool = True,
            remove_silences: bool = True,
            remove_breaths: bool = True,
            remove_mouth_sounds: bool = True,
            remove_stutters: bool = True,
            word_segments: list = None,
            original_filename: str = None) -> dict:
    """
    Full professional pipeline.

    Stage order: WAV decode β†’ room-tone capture β†’ background noise β†’
    mouth sounds β†’ breaths β†’ fillers β†’ stutters β†’ long silences β†’
    loudness normalisation β†’ MP3 export (WAV fallback on ffmpeg error).

    word_segments: list of dicts from Whisper word-level timestamps.
    Each dict: {
    'word': str,
    'start': float, # seconds
    'end': float, # seconds
    'avg_logprob': float, # optional β€” Whisper segment-level confidence
    'no_speech_prob':float, # optional β€” Whisper no-speech probability
    }
    NOTE(review): the timestamps are applied AFTER stages 2-4 β€” assumed
    safe because those stages look samplewise/length-preserving; confirm
    DeepFilterNet and noisereduce keep sample count unchanged.

    original_filename: basis for the "<base>_cleared.mp3" output name;
    falls back to audio_path's basename.

    Returns: {'audio_path': str, 'stats': dict}
    """
    t0 = time.time()
    stats = {}
    print("[Denoiser] β–Ά Starting professional enhancement pipeline v2...")
    # ── 0. Convert to standard WAV ───────────────────────────────
    wav_in = os.path.join(out_dir, "stage0_input.wav")
    self._to_wav(audio_path, wav_in, TARGET_SR)
    # Re-read sr from the decoded file: the _to_wav fallback path may
    # keep the source rate instead of TARGET_SR.
    audio, sr = sf.read(wav_in, always_2d=True)
    n_ch = audio.shape[1]
    duration = len(audio) / sr
    print(f"[Denoiser] Input: {sr}Hz, {n_ch}ch, {duration:.1f}s")
    # Work in mono float32
    mono = audio.mean(axis=1).astype(np.float32)
    # ── 1. Capture room tone BEFORE any denoising ────────────────
    # (tone captured after denoising would be unnaturally clean and
    # stand out when spliced into edit gaps)
    self._room_tone = self._capture_room_tone(mono, sr)
    # ── 2. Background Noise Removal ──────────────────────────────
    mono, noise_method = self._remove_background_noise(mono, sr)
    stats['noise_method'] = noise_method
    # ── 3. Mouth Sound Reduction (clicks/pops) ───────────────────
    if remove_mouth_sounds:
        mono, n_clicks = self._reduce_mouth_sounds(mono, sr)
        stats['mouth_sounds_removed'] = n_clicks
    # ── 4. Breath Reduction ──────────────────────────────────────
    if remove_breaths:
        mono = self._reduce_breaths(mono, sr)
        stats['breaths_reduced'] = True
    # ── 5. Filler Word Removal ───────────────────────────────────
    stats['fillers_removed'] = 0
    if remove_fillers and word_segments:
        mono, n_fillers = self._remove_fillers(mono, sr, word_segments)
        stats['fillers_removed'] = n_fillers
    # ── 6. Stutter Removal ───────────────────────────────────────
    stats['stutters_removed'] = 0
    if remove_stutters and word_segments:
        mono, n_stutters = self._remove_stutters(mono, sr, word_segments)
        stats['stutters_removed'] = n_stutters
    # ── 7. Long Silence Removal ───────────────────────────────────
    stats['silences_removed_sec'] = 0.0
    if remove_silences:
        mono, sil_sec = self._remove_long_silences(mono, sr)
        stats['silences_removed_sec'] = round(sil_sec, 2)
    # ── 8. Normalize Loudness ─────────────────────────────────────
    mono = self._normalise(mono, sr)
    # ── 9. Restore stereo / save as MP3 ──────────────────────────
    # Stereo is rebuilt as dual-mono β€” original channel separation is lost.
    out_audio = np.stack([mono, mono], axis=1) if n_ch == 2 else mono
    # Build output filename: strip original extension, append _cleared.mp3
    # e.g. "output.wav" β†’ "output_cleared.mp3"
    if original_filename:
        base = os.path.splitext(os.path.basename(original_filename))[0]
    else:
        base = os.path.splitext(os.path.basename(audio_path))[0]
    out_name = f"{base}_cleared.mp3"
    # Write a temporary WAV first (soundfile can't encode MP3),
    # then convert to MP3 via ffmpeg (already in the Dockerfile).
    tmp_wav = os.path.join(out_dir, "denoised_tmp.wav")
    out_path = os.path.join(out_dir, out_name)
    sf.write(tmp_wav, out_audio, sr, format="WAV", subtype="PCM_24")
    result = subprocess.run([
        "ffmpeg", "-y", "-i", tmp_wav,
        "-codec:a", "libmp3lame",
        "-qscale:a", "2", # VBR quality 2 β‰ˆ 190 kbps β€” transparent quality
        "-ar", str(sr),
        out_path
    ], capture_output=True)
    if result.returncode != 0:
        stderr = result.stderr.decode(errors="replace")
        logger.warning(f"MP3 export failed, falling back to WAV: {stderr[-300:]}")
        out_path = tmp_wav # graceful fallback β€” still return something
    else:
        try:
            os.remove(tmp_wav) # clean up temp WAV
        except OSError:
            pass
    stats['processing_sec'] = round(time.time() - t0, 2)
    print(f"[Denoiser] βœ… Done in {stats['processing_sec']}s | {stats}")
    return {'audio_path': out_path, 'stats': stats}
# ══════════════════════════════════════════════════════════════════
# ROOM TONE CAPTURE
# ══════════════════════════════════════════════════════════════════
def _capture_room_tone(self, audio: np.ndarray, sr: int,
sample_sec: float = 0.5) -> np.ndarray:
"""Find the quietest 0.5s window in the recording β€” that's the room tone."""
try:
frame = int(sr * sample_sec)
if len(audio) < frame * 2:
fallback_len = min(int(sr * 0.1), len(audio))
print("[Denoiser] Short audio β€” using first 100ms as room tone")
return audio[:fallback_len].copy().astype(np.float32)
best_rms = float('inf')
best_start = 0
step = sr # 1-second steps
for i in range(0, len(audio) - frame, step):
rms = float(np.sqrt(np.mean(audio[i:i + frame] ** 2)))
if rms < best_rms:
best_rms, best_start = rms, i
room = audio[best_start: best_start + frame].copy()
print(f"[Denoiser] Room tone captured: RMS={best_rms:.5f}")
return room
except Exception as e:
logger.warning(f"Room tone capture failed: {e}")
return np.zeros(int(sr * sample_sec), dtype=np.float32)
def _fill_with_room_tone(self, length: int) -> np.ndarray:
"""Tile room tone to fill a gap of `length` samples."""
if self._room_tone is None or len(self._room_tone) == 0:
return np.zeros(length, dtype=np.float32)
reps = length // len(self._room_tone) + 1
tiled = np.tile(self._room_tone, reps)[:length]
fade = min(int(0.01 * len(tiled)), 64)
if fade > 0:
tiled[:fade] *= np.linspace(0, 1, fade)
tiled[-fade:] *= np.linspace(1, 0, fade)
return tiled.astype(np.float32)
# ══════════════════════════════════════════════════════════════════
# CROSSFADE SPLICE ← NEW
# Replaces abrupt room-tone insertion with smooth equal-power blend.
# ══════════════════════════════════════════════════════════════════
def _crossfade_join(self, a: np.ndarray, b: np.ndarray,
                    fade_ms: float = 20.0, sr: int = TARGET_SR) -> np.ndarray:
    """
    Join `a` and `b` with an equal-power crossfade (cosΒ²+sinΒ²=1), so edit
    points carry no click/seam artifacts. When either side is shorter
    than the fade, fall back to plain concatenation.
    """
    n = min(int(sr * fade_ms / 1000), len(a), len(b))
    if n < 2:
        return np.concatenate([a, b])
    theta = np.linspace(0, np.pi / 2, n)
    tail = a[-n:] * np.cos(theta)   # a fades out...
    head = b[:n] * np.sin(theta)    # ...while b fades in
    return np.concatenate([a[:-n], tail + head, b[n:]])
def _build_with_crossfade(self, audio: np.ndarray, cuts: list,
                          sr: int, fill_tone: bool = True) -> np.ndarray:
    """
    Build output from a list of (start_sec, end_sec) cuts, filling each
    removed span with room tone and crossfading every join.

    cuts: (start_sec, end_sec) spans to REMOVE, any order.

    Robustness fixes vs v2:
      * overlapping or duplicate cuts are clamped against the end of the
        previous cut β€” previously a duplicated span skipped its (empty)
        keep-segment but still appended a second room-tone fill, which
        lengthened the timeline and drifted every later edit;
      * cuts shorter than MIN_CUT_SEC are skipped (micro-glitch guard,
        unchanged from v2).
    """
    segments = []
    prev = 0.0
    for start, end in sorted(cuts, key=lambda x: x[0]):
        # Clamp against material already processed: a cut starting inside
        # (or before) the previous cut only removes its new portion.
        start = max(start, prev)
        if end <= start:
            continue  # fully contained in an earlier cut β€” nothing to do
        # Guard: skip cuts shorter than minimum
        if (end - start) < MIN_CUT_SEC:
            continue
        keep_sta = int(prev * sr)
        keep_end = int(start * sr)
        if keep_sta < keep_end:
            segments.append(audio[keep_sta:keep_end])
        gap_len = int((end - start) * sr)
        if fill_tone and gap_len > 0:
            segments.append(self._fill_with_room_tone(gap_len))
        prev = end
    remain = int(prev * sr)
    if remain < len(audio):
        segments.append(audio[remain:])
    if not segments:
        return audio
    # Crossfade every adjacent pair so no join is audible.
    result = segments[0]
    for seg in segments[1:]:
        result = self._crossfade_join(result, seg, fade_ms=20.0, sr=sr)
    return result.astype(np.float32)
# ══════════════════════════════════════════════════════════════════
# BACKGROUND NOISE REMOVAL
# Chain: DeepFilterNet β†’ two-pass noisereduce β†’ passthrough
#
# SepFormer REMOVED β€” it is a speech separation model, not a denoiser.
# It reconstructs voice artificially β†’ robotic output.
#
# Two-pass noisereduce is the safe CPU fallback:
# Pass 1 (stationary) β€” removes steady hum/hiss/fan noise
# Pass 2 (non-stationary) β€” catches residual at low prop_decrease
# so original voice character is preserved
# ══════════════════════════════════════════════════════════════════
def _remove_background_noise(self, audio, sr):
    """
    Denoise chain: DeepFilterNet (primary) β†’ stationary noisereduce
    (fallback) β†’ untouched passthrough. Each stage that fails hands the
    signal to the next, so this never hard-fails.

    The fallback is deliberately gentle: stationary-only estimation at
    prop_decrease=0.5 removes steady hum/hiss/fan noise at ~50% strength,
    keeping a thin noise floor so the voice never sounds hollow β€”
    non-stationary or aggressive settings touch voice frequencies and
    cause the "robotic" artifact.

    Returns (audio, method_name) where method_name is one of
    "DeepFilterNet", "noisereduce_stationary", or "none".
    """
    # β”€β”€ Primary: DeepFilterNet (SOTA, Rust available via Docker) ─────
    try:
        cleaned = self._deepfilter(audio, sr)
        print("[Denoiser] βœ… DeepFilterNet noise removal done")
        return cleaned, "DeepFilterNet"
    except Exception as e:
        logger.warning(f"[Denoiser] DeepFilterNet unavailable ({e})")
    # β”€β”€ Fallback: single-pass, stationary-only noisereduce ──────────
    try:
        import noisereduce as nr
        denoised = nr.reduce_noise(
            y=audio, sr=sr,
            stationary=True,     # steady noise only β€” speech transients untouched
            prop_decrease=0.50,  # partial reduction keeps a natural noise floor
        ).astype(np.float32)
        print("[Denoiser] βœ… noisereduce done (voice-preserving, stationary only)")
        return denoised, "noisereduce_stationary"
    except Exception as e:
        logger.warning(f"noisereduce failed: {e}")
    return audio, "none"
def _deepfilter(self, audio: np.ndarray, sr: int) -> np.ndarray:
    """DeepFilterNet enhancement (local only β€” requires Rust compiler).

    Raises whatever the `df`/`torch` imports or model calls raise when
    unavailable β€” the caller (_remove_background_noise) catches everything
    and falls back. Runs at the model's native sample rate and resamples
    the result back to the caller's rate.
    """
    from df.enhance import enhance, init_df
    import torch
    # Lazy-load, module-level cache not needed (rarely reached on HF Spaces)
    if not hasattr(self, '_df_model') or self._df_model is None:
        self._df_model, self._df_state, _ = init_df()
    # NOTE(review): assumes the df state exposes .sr() for its native rate β€”
    # confirm against the installed DeepFilterNet version.
    df_sr = self._df_state.sr()
    a = self._resample(audio, sr, df_sr) if sr != df_sr else audio
    t = torch.from_numpy(a).unsqueeze(0)  # (1, samples) β€” mono batch
    out = enhance(self._df_model, self._df_state, t)
    res = out.squeeze().numpy().astype(np.float32)
    return self._resample(res, df_sr, sr) if df_sr != sr else res
# ══════════════════════════════════════════════════════════════════
# FILLER WORD REMOVAL ← UPGRADED (confidence-gated + crossfade)
# ══════════════════════════════════════════════════════════════════
def _remove_fillers(self, audio: np.ndarray, sr: int, segments: list):
    """
    Cut filler words using Whisper word-level timestamps.

    A word is cut only when ALL of these hold:
      1. its normalised text is in FILLER_WORDS;
      2. its duration is at least MIN_CUT_SEC (micro-glitch guard);
      3. avg_logprob β‰₯ FILLER_MIN_LOGPROB, when present (Whisper confident);
      4. no_speech_prob ≀ FILLER_MAX_NO_SPEECH, when present (really speech).
    The confidence fields are optional β€” older Whisper output skips gates
    3-4. Gaps are filled with room tone and crossfaded for seamless edits.

    Returns (audio, cuts_made); on any error the input is returned unchanged.
    """
    try:
        cuts = []
        for seg in segments:
            text = seg.get('word', '').strip().lower()
            text = re.sub(r'[^a-z\s]', '', text).strip()
            if text not in FILLER_WORDS:
                continue
            t_sta = seg.get('start', 0.0)
            t_end = seg.get('end', 0.0)
            if (t_end - t_sta) < MIN_CUT_SEC:
                continue  # timestamp artefact β€” too short to cut cleanly
            conf = seg.get('avg_logprob', None)
            nsp = seg.get('no_speech_prob', None)
            if conf is not None and conf < FILLER_MIN_LOGPROB:
                logger.debug(f"[Denoiser] Filler '{text}' skipped: "
                             f"low confidence ({conf:.2f})")
                continue
            if nsp is not None and nsp > FILLER_MAX_NO_SPEECH:
                logger.debug(f"[Denoiser] Filler '{text}' skipped: "
                             f"no_speech_prob={nsp:.2f}")
                continue
            cuts.append((t_sta, t_end))
        if not cuts:
            return audio, 0
        out = self._build_with_crossfade(audio, cuts, sr, fill_tone=True)
        print(f"[Denoiser] βœ… Removed {len(cuts)} filler words")
        return out, len(cuts)
    except Exception as e:
        logger.warning(f"Filler removal failed: {e}")
        return audio, 0
def clean_transcript_fillers(self, transcript: str) -> str:
    """Remove filler words from transcript TEXT to match cleaned audio.

    Two-word fillers ("you know", "i mean") are checked before single
    words so the pair is dropped as a unit. Surviving tokens keep their
    original casing and punctuation.
    """
    tokens = transcript.split()

    def norm(token):
        return re.sub(r'[^a-z\s]', '', token.lower()).strip()

    kept = []
    i = 0
    n = len(tokens)
    while i < n:
        first = norm(tokens[i])
        # Two-word filler? Consume both tokens.
        if i + 1 < n and (first + " " + norm(tokens[i + 1])) in FILLER_WORDS:
            i += 2
            continue
        # Single-word filler? Drop it.
        if first in FILLER_WORDS:
            i += 1
            continue
        kept.append(tokens[i])
        i += 1
    return " ".join(kept)
# ══════════════════════════════════════════════════════════════════
# STUTTER REMOVAL ← UPGRADED (phonetic similarity + crossfade)
# ══════════════════════════════════════════════════════════════════
def _remove_stutters(self, audio: np.ndarray, sr: int, segments: list):
    """
    Remove stuttered word repeats ("the the", "the tha the"), keeping the
    LAST occurrence of each repeat chain.

    Detection: consecutive words with phonetic similarity β‰₯ 0.88
    (jellyfish Jaro-Winkler β†’ edit-distance ratio β†’ exact match, in order
    of availability). Whisper confidence gating is applied to the chain's
    anchor word, and every splice is crossfaded + room-tone filled.

    BUG FIX vs v2: the old inner loop always compared against β€” and
    re-cut β€” the FIRST word of a chain ("seg_i" never advanced), so
    "the the the" appended the same span twice (the duplicate injected
    room tone twice downstream, lengthening the timeline) while the
    middle repeat was never cut at all. The chain is now walked pairwise
    and each earlier occurrence is cut exactly once.

    Returns (audio, stutters_removed); input unchanged on error.
    """
    try:
        if len(segments) < 2:
            return audio, 0
        # Choose similarity function
        sim_fn = self._word_similarity_fn()
        cuts = []
        stutters_found = 0
        i = 0
        while i < len(segments):
            seg_i = segments[i]
            word = re.sub(r'[^a-z]', '', seg_i.get('word', '').lower())
            if not word:
                i += 1
                continue
            # Confidence gate on the anchor word
            if not self._passes_confidence_gate(seg_i):
                i += 1
                continue
            # Walk the repeat chain pairwise: each matched predecessor is
            # cut once, and the comparison anchor advances with the chain
            # so drifting repeats ("the tha thu") are still caught.
            prev_seg, prev_word = seg_i, word
            j = i + 1
            while j < len(segments):
                seg_j = segments[j]
                next_word = re.sub(r'[^a-z]', '', seg_j.get('word', '').lower())
                if not next_word:
                    j += 1
                    continue
                if sim_fn(prev_word, next_word) >= 0.88:  # β‰₯88% similar = stutter
                    cuts.append((prev_seg['start'], prev_seg['end']))
                    stutters_found += 1
                    prev_seg, prev_word = seg_j, next_word
                    i = j
                    j += 1
                else:
                    break
            i += 1
        if not cuts:
            return audio, 0
        out = self._build_with_crossfade(audio, cuts, sr, fill_tone=True)
        print(f"[Denoiser] βœ… Removed {stutters_found} stutters")
        return out, stutters_found
    except Exception as e:
        logger.warning(f"Stutter removal failed: {e}")
        return audio, 0
@staticmethod
def _word_similarity_fn():
"""Return best available string-similarity function."""
try:
import jellyfish
return jellyfish.jaro_winkler_similarity
except ImportError:
pass
try:
import editdistance
def _ed_ratio(a, b):
if not a and not b:
return 1.0
dist = editdistance.eval(a, b)
return 1.0 - dist / max(len(a), len(b))
return _ed_ratio
except ImportError:
pass
# Plain exact match as last resort
return lambda a, b: 1.0 if a == b else 0.0
@staticmethod
def _passes_confidence_gate(seg: dict) -> bool:
    """True when Whisper confidence allows cutting this word.

    The gate fails only when a confidence field is present AND out of
    range; segments without the optional fields always pass (older
    Whisper output has neither).
    """
    lp = seg.get('avg_logprob', None)
    ns = seg.get('no_speech_prob', None)
    too_uncertain = lp is not None and lp < FILLER_MIN_LOGPROB
    likely_not_speech = ns is not None and ns > FILLER_MAX_NO_SPEECH
    return not (too_uncertain or likely_not_speech)
# ══════════════════════════════════════════════════════════════════
# BREATH REDUCTION
# ══════════════════════════════════════════════════════════════════
def _reduce_breaths(self, audio: np.ndarray, sr: int) -> np.ndarray:
    """Attenuate breath bursts via non-stationary spectral gating.

    Breaths are short broadband events, so noisereduce's non-stationary
    mode with moderate frequency/time smoothing catches them; the input
    is returned unchanged if noisereduce is unavailable or fails.
    """
    try:
        import noisereduce as nr
        gated = nr.reduce_noise(
            y=audio, sr=sr,
            stationary=False,       # track the noise profile over time
            prop_decrease=0.60,     # partial reduction β€” keeps natural feel
            freq_mask_smooth_hz=400,
            time_mask_smooth_ms=40,
        ).astype(np.float32)
        print("[Denoiser] βœ… Breath reduction done")
        return gated
    except Exception as e:
        logger.warning(f"Breath reduction failed: {e}")
        return audio
# ══════════════════════════════════════════════════════════════════
# MOUTH SOUND REDUCTION
# ══════════════════════════════════════════════════════════════════
def _reduce_mouth_sounds(self, audio: np.ndarray, sr: int):
"""
Suppress very short, very high-amplitude transients (clicks/pops).
Threshold at 6.0 std to avoid removing real consonants (p, b, t).
"""
try:
result = audio.copy()
win = int(sr * 0.003) # 3ms window
hop = win // 2
rms_arr = np.array([
float(np.sqrt(np.mean(audio[i:i+win]**2)))
for i in range(0, len(audio) - win, hop)
])
if len(rms_arr) == 0:
return audio, 0
threshold = float(np.mean(rms_arr)) + 6.0 * float(np.std(rms_arr))
n_removed = 0
for idx, rms in enumerate(rms_arr):
if rms > threshold:
start = idx * hop
end = min(start + win, len(result))
result[start:end] *= np.linspace(1, 0, end - start)
n_removed += 1
if n_removed:
print(f"[Denoiser] βœ… Suppressed {n_removed} mouth sound transients")
return result.astype(np.float32), n_removed
except Exception as e:
logger.warning(f"Mouth sound reduction failed: {e}")
return audio, 0
# ══════════════════════════════════════════════════════════════════
# LONG SILENCE REMOVAL ← UPGRADED (adaptive threshold)
# ══════════════════════════════════════════════════════════════════
def _remove_long_silences(self, audio: np.ndarray, sr: int,
max_silence_sec: float = 1.5,
keep_pause_sec: float = 0.4) -> tuple:
"""
UPGRADE: Adaptive silence threshold.
Old code used a hardcoded RMS=0.008 β€” worked in quiet studios only.
New: threshold = 15th-percentile of per-frame RMS values.
This self-calibrates to the recording's actual noise floor,
so it works equally well in noisy rooms and near-silent studios.
Silences replaced with room tone + crossfade.
"""
try:
frame_len = int(sr * 0.02) # 20ms frames
# ── Compute per-frame RMS ─────────────────────────────────
n_frames = (len(audio) - frame_len) // frame_len
rms_frames = np.array([
float(np.sqrt(np.mean(audio[i*frame_len:(i+1)*frame_len]**2)))
for i in range(n_frames)
])
if len(rms_frames) == 0:
return audio, 0.0
# ── Adaptive threshold: 15th percentile of RMS ───────────
threshold = float(np.percentile(rms_frames, 15))
# Clamp: never go below 0.001 (avoids mis-classifying very quiet speech)
threshold = max(threshold, 0.001)
print(f"[Denoiser] Adaptive silence threshold: RMS={threshold:.5f}")
max_sil_frames = int(max_silence_sec / 0.02)
keep_frames = int(keep_pause_sec / 0.02)
kept = []
silence_count = 0
total_removed = 0
in_long_sil = False
for i in range(n_frames):
frame = audio[i*frame_len:(i+1)*frame_len]
rms = rms_frames[i]
if rms < threshold:
silence_count += 1
if silence_count <= max_sil_frames:
kept.append(frame)
else:
total_removed += frame_len
in_long_sil = True
else:
if in_long_sil:
pad = self._fill_with_room_tone(keep_frames * frame_len)
kept.append(pad)
in_long_sil = False
silence_count = 0
kept.append(frame)
# Tail of audio
tail_start = n_frames * frame_len
if tail_start < len(audio):
kept.append(audio[tail_start:])
if not kept:
return audio, 0.0
# Crossfade each frame join for smooth output
result = kept[0]
for seg in kept[1:]:
result = self._crossfade_join(result, seg, fade_ms=5.0, sr=sr)
removed_sec = total_removed / sr
if removed_sec > 0:
print(f"[Denoiser] βœ… Removed {removed_sec:.1f}s of long silences")
return result.astype(np.float32), removed_sec
except Exception as e:
logger.warning(f"Silence removal failed: {e}")
return audio, 0.0
# ══════════════════════════════════════════════════════════════════
# NORMALIZATION
# ══════════════════════════════════════════════════════════════════
def _normalise(self, audio: np.ndarray, sr: int) -> np.ndarray:
    """Normalise to TARGET_LOUDNESS LUFS via pyloudnorm; on any failure
    fall back to a plain RMS gain match. Output is clipped to [-1, 1]
    and returned as float32."""
    try:
        import pyloudnorm as pyln
        meter = pyln.Meter(sr)
        lufs = meter.integrated_loudness(audio)
        # Guard against -inf (digital silence) and bogus β‰₯0 readings.
        if np.isfinite(lufs) and lufs < 0:
            audio = pyln.normalize.loudness(audio, lufs, TARGET_LOUDNESS)
            print(f"[Denoiser] βœ… Normalized: {lufs:.1f} β†’ {TARGET_LOUDNESS} LUFS")
    except Exception:
        # pyloudnorm unavailable β€” approximate the LUFS target as RMS dBFS.
        rms = np.sqrt(np.mean(audio**2))
        if rms > 1e-9:
            audio = audio * ((10 ** (TARGET_LOUDNESS / 20.0)) / rms)
    return np.clip(audio, -1.0, 1.0).astype(np.float32)
# ══════════════════════════════════════════════════════════════════
# HELPERS
# ══════════════════════════════════════════════════════════════════
def _to_wav(self, src: str, dst: str, target_sr: int):
    """Decode `src` to 24-bit PCM WAV at `target_sr` via ffmpeg.

    If ffmpeg exits non-zero, log its stderr and fall back to a
    soundfile passthrough β€” note the fallback keeps the file's ORIGINAL
    sample rate, so callers must re-read the actual rate from `dst`.
    """
    proc = subprocess.run([
        "ffmpeg", "-y", "-i", src,
        "-acodec", "pcm_s24le", "-ar", str(target_sr), dst
    ], capture_output=True)
    if proc.returncode == 0:
        return
    stderr = proc.stderr.decode(errors='replace')
    logger.warning(f"ffmpeg non-zero exit: {stderr[-400:]}")
    # Fallback: soundfile passthrough
    data, sr = sf.read(src, always_2d=True)
    sf.write(dst, data, sr, format="WAV", subtype="PCM_24")
def _resample(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
if orig_sr == target_sr:
return audio
try:
import librosa
return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
except Exception:
length = int(len(audio) * target_sr / orig_sr)
return np.interp(
np.linspace(0, len(audio), length),
np.arange(len(audio)), audio
).astype(np.float32)