Spaces:
Running
Running
| """ | |
| BarVox Audio Processing API - Audio Preprocessing Functions | |
| """ | |
| import logging | |
| import numpy as np | |
| import torch | |
| import torchaudio.transforms as T | |
| from scipy.signal import butter, filtfilt | |
| from typing import Optional | |
| logger = logging.getLogger(__name__) | |
def butter_highpass(cutoff, fs, order=5):
    """Design a digital Butterworth high-pass filter.

    Args:
        cutoff: Cutoff frequency, in the same units as ``fs``.
        fs: Sampling rate of the signal the filter will be applied to.
        order: Order of the Butterworth filter.

    Returns:
        Tuple ``(b, a)`` of numerator and denominator coefficients as
        produced by ``scipy.signal.butter``.
    """
    # scipy takes the critical frequency as a fraction of Nyquist (fs / 2).
    nyquist = 0.5 * fs
    return butter(order, cutoff / nyquist, btype='high', analog=False)
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a digital Butterworth band-pass filter.

    Args:
        lowcut: Lower band edge, in the same units as ``fs``.
        highcut: Upper band edge, in the same units as ``fs``.
        fs: Sampling rate of the signal the filter will be applied to.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        Tuple ``(b, a)`` of numerator and denominator coefficients.
    """
    # Both band edges are expressed as fractions of Nyquist (fs / 2).
    nyquist = 0.5 * fs
    band_edges = [lowcut / nyquist, highcut / nyquist]
    return butter(order, band_edges, btype='band')
def highpass_filter(data, cutoff=30, fs=16000, order=2):
    """Apply a forward-backward Butterworth high-pass filter.

    Args:
        data: Samples to filter (array-like); filtering runs along the
            last axis.
        cutoff: High-pass cutoff frequency, in the same units as ``fs``.
        fs: Sampling rate of ``data``.
        order: Order of the Butterworth filter.

    Returns:
        A new array with the filtered samples. Empty input yields an empty
        array instead of raising.
    """
    samples = np.asarray(data)
    n_samples = samples.shape[-1] if samples.ndim else 0
    if n_samples == 0:
        # filtfilt cannot process an empty signal; there is nothing to filter.
        return samples.copy()

    b, a = butter_highpass(cutoff, fs, order=order)

    # filtfilt pads each end with 3 * max(len(a), len(b)) samples by default
    # and raises ValueError when the signal is not longer than that. Clamp
    # the padding so very short clips are still filtered; longer signals
    # keep the default padding and therefore the original output.
    padlen = min(3 * max(len(a), len(b)), n_samples - 1)

    # NOTE(review): the copy presumably guarantees a fresh, contiguous
    # array for downstream consumers - confirm before removing.
    return filtfilt(b, a, samples, padlen=padlen).copy()
def rms_normalize(waveform_np, target_dbfs=-20.0):
    """Scale a waveform so that its RMS level equals ``target_dbfs``.

    Args:
        waveform_np: Audio samples (array-like). Integer PCM input is
            accepted; the RMS is accumulated in float64 so squaring cannot
            overflow the sample dtype.
        target_dbfs: Desired RMS level in dBFS, converted to a linear
            amplitude as ``10 ** (target_dbfs / 20)``.

    Returns:
        The scaled waveform. Empty or all-zero input is returned unchanged
        (the very same object), since no gain can be derived from it.
    """
    samples = np.asarray(waveform_np)
    if samples.size == 0:
        return waveform_np

    # Square in float64: squaring e.g. int16 samples in their own dtype
    # wraps around and yields a meaningless RMS.
    rms = float(np.sqrt(np.mean(np.square(samples.astype(np.float64)))))
    if rms == 0:
        return waveform_np

    # A plain Python float keeps float32 input as float32 after scaling.
    gain = (10 ** (target_dbfs / 20)) / rms
    return samples * gain
def resample_to_16khz_mono(waveform: torch.Tensor, orig_sample_rate: int) -> torch.Tensor:
    """Down-mix a waveform to one channel and resample it to 16 kHz.

    Args:
        waveform: Audio tensor. Multi-channel input is averaged over
            dimension 0, so a 2-D tensor is presumably laid out as
            ``(channels, samples)`` - verify against the caller. A 1-D
            tensor is treated as a single mono channel.
        orig_sample_rate: Sampling rate of ``waveform`` in Hz.

    Returns:
        Tensor with a leading channel dimension of size 1, sampled at
        16 kHz. No resampling is done when the input is already 16 kHz.
    """
    if waveform.dim() == 1:
        # Without this, shape[0] is the sample count and the averaging
        # below would collapse the entire signal into a single value.
        waveform = waveform.unsqueeze(0)
    elif waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    if orig_sample_rate != 16000:
        resampler = T.Resample(orig_freq=orig_sample_rate, new_freq=16000)
        waveform = resampler(waveform)
    return waveform
def apply_noise_reduction(waveform_np, sample_rate=16000):
    """Remove the DC offset and low-frequency content from a waveform.

    Args:
        waveform_np: Audio samples as a NumPy array.
        sample_rate: Sampling rate of ``waveform_np`` in Hz.

    Returns:
        The mean-centred signal after a 2nd-order 30 Hz high-pass filter.
    """
    # Subtract the mean first so the filter starts from a centred signal.
    centered = waveform_np - np.mean(waveform_np)
    return highpass_filter(centered, cutoff=30, fs=sample_rate, order=2)
def apply_normalization(waveform_np, target_dbfs=-20.0):
    """Normalize a waveform's RMS level by delegating to ``rms_normalize``.

    Args:
        waveform_np: Audio samples as a NumPy array.
        target_dbfs: Target RMS level in dBFS.

    Returns:
        Whatever ``rms_normalize`` returns for the same arguments.
    """
    normalized = rms_normalize(waveform_np, target_dbfs=target_dbfs)
    return normalized
def apply_dynamic_compression(waveform_np, threshold_ratio=0.5, ratio=3.0):
    """Apply static dynamic-range compression with capped make-up gain.

    Samples whose magnitude exceeds ``rms * threshold_ratio`` have the
    excess above that threshold divided by ``ratio``. The result is then
    scaled back toward the original peak, with the make-up gain limited
    to 2.0.

    Args:
        waveform_np: Audio samples (array-like).
        threshold_ratio: Threshold expressed as a fraction of the signal RMS.
        ratio: Compression ratio applied to the part above the threshold.

    Returns:
        A new array with the compressed samples. Empty input yields an
        empty array instead of raising.
    """
    samples = np.asarray(waveform_np)
    if samples.size == 0:
        # np.max raises on an empty array; nothing to compress.
        return samples.copy()
    if not np.issubdtype(samples.dtype, np.floating):
        # Writing fractional values into an integer copy would truncate them.
        samples = samples.astype(np.float64)

    magnitude = np.abs(samples)
    rms = np.sqrt(np.mean(np.square(magnitude)))
    threshold = rms * threshold_ratio

    compressed = np.copy(samples)
    over = magnitude > threshold
    compressed[over] = np.sign(samples[over]) * (
        threshold + (magnitude[over] - threshold) / ratio
    )

    # Make-up gain: restore the original peak, but never amplify by more
    # than 2x.
    compressed_peak = np.max(np.abs(compressed))
    gain = np.max(magnitude) / compressed_peak if compressed_peak > 0 else 1.0
    compressed = compressed * min(gain, 2.0)

    logger.info(f"Applied dynamic compression (threshold_ratio={threshold_ratio}, ratio={ratio})")
    return compressed
def apply_transient_enhancement(waveform_np, sample_rate=16000, attack_boost=1.5):
    """Boost samples where the short-term RMS envelope is rising.

    A sliding RMS envelope over roughly 10 ms is computed, samples where
    the envelope increases by more than half a standard deviation of its
    first difference are marked as transients, and a Gaussian-smoothed
    gain of ``attack_boost`` is applied there. The output is rescaled to
    the peak level of the input.

    Args:
        waveform_np: Audio samples; assumed to be one-dimensional -
            TODO confirm with callers.
        sample_rate: Sampling rate in Hz, used to size the analysis window.
        attack_boost: Gain applied to detected transient samples before
            smoothing.

    Returns:
        A new array with the enhanced samples, in the input's floating
        dtype (float64 for non-float input). Empty input yields an empty
        array instead of raising.
    """
    from scipy.ndimage import gaussian_filter1d

    samples = np.asarray(waveform_np)
    n_samples = len(samples)
    if n_samples == 0:
        return samples.copy()
    if not np.issubdtype(samples.dtype, np.floating):
        # Integer buffers would truncate both the envelope and the boost.
        samples = samples.astype(np.float64)

    # At least two samples so the half-window and the smoothing sigma stay
    # positive at very low sample rates.
    window_size = max(2, int(sample_rate * 0.010))
    half_window = window_size // 2

    # Sliding RMS over [i - half_window, i + half_window), clipped to the
    # signal, computed from prefix sums of the squared samples rather than
    # a Python loop over every sample.
    power_prefix = np.concatenate(
        ([0.0], np.cumsum(np.square(samples.astype(np.float64))))
    )
    centers = np.arange(n_samples)
    starts = np.maximum(0, centers - half_window)
    ends = np.minimum(n_samples, centers + half_window)
    window_power = (power_prefix[ends] - power_prefix[starts]) / (ends - starts)
    # Prefix-sum differences can dip marginally below zero from rounding.
    envelope = np.sqrt(np.maximum(window_power, 0.0))

    envelope_diff = np.diff(envelope, prepend=envelope[0])
    transient_mask = envelope_diff > (np.std(envelope_diff) * 0.5)

    boost = np.ones(n_samples, dtype=np.float64)
    boost[transient_mask] = attack_boost
    # Smooth the gain curve so the boost fades in and out.
    boost_smooth = gaussian_filter1d(boost, sigma=window_size / 4)

    enhanced = samples * boost_smooth
    max_val = np.max(np.abs(enhanced))
    if max_val > 0:
        # Bring the peak back to the input's peak level.
        enhanced = enhanced / max_val * np.max(np.abs(samples))
    enhanced = enhanced.astype(samples.dtype, copy=False)

    logger.info(f"Applied transient enhancement (boost={attack_boost})")
    return enhanced
def apply_high_frequency_boost(waveform_np, sample_rate=16000, boost_db=6.0):
    """Boost the 2-6 kHz band of a waveform by ``boost_db`` decibels.

    The band is isolated with a 4th-order Butterworth band-pass applied
    forward and backward, then mixed back into the input scaled by
    ``10 ** (boost_db / 20) - 1``. If the result peaks above 1.0 it is
    scaled down so that its peak is exactly 1.0.

    Args:
        waveform_np: Audio samples (array-like); filtered along the last axis.
        sample_rate: Sampling rate in Hz.
        boost_db: Gain for the band, in dB.

    Returns:
        A new array with the equalized samples. Empty input, or a sample
        rate too low to contain the band, yields an unmodified copy
        instead of raising.
    """
    samples = np.asarray(waveform_np)
    if samples.size == 0:
        return samples.copy()

    lowcut, highcut = 2000, 6000
    nyquist = 0.5 * sample_rate
    # butter() rejects band edges at or above Nyquist, so keep the upper
    # edge just below it for sample rates of 12 kHz and under.
    usable_highcut = min(highcut, 0.99 * nyquist)
    if usable_highcut <= lowcut:
        logger.warning(
            f"Skipping high-frequency boost: sample_rate={sample_rate} Hz cannot represent the 2-6 kHz band"
        )
        return samples.copy()
    if usable_highcut < highcut:
        logger.warning(
            f"High-frequency boost band limited to {usable_highcut:.0f} Hz (sample_rate={sample_rate} Hz)"
        )

    b, a = butter_bandpass(lowcut, usable_highcut, sample_rate, order=4)
    # filtfilt's default edge padding needs a signal longer than
    # 3 * max(len(a), len(b)) samples; clamp it for very short clips.
    padlen = min(3 * max(len(a), len(b)), samples.shape[-1] - 1)
    presence_band = filtfilt(b, a, samples, padlen=padlen)

    boost_factor = 10 ** (boost_db / 20.0)
    # Add only the extra gain: the band is already present once in the input.
    boosted = samples + presence_band * (boost_factor - 1.0)

    max_val = np.max(np.abs(boosted))
    if max_val > 1.0:
        boosted = boosted / max_val

    logger.info(f"Applied high-frequency boost (+{boost_db} dB in 2-6 kHz)")
    return boosted
def apply_silero_vad(
    waveform: torch.Tensor,
    sample_rate: int = 16000,
    threshold: float = 0.35,
    min_speech_duration_ms: int = 60,
    min_silence_duration_ms: int = 300,
    padding_before_ms: int = 180,
    padding_after_ms: int = 900,
    max_speech_duration_s: float = 0,
    chunk_selection: str = 'longest'
) -> Optional[torch.Tensor]:
    """Trim a waveform to one speech segment detected by Silero VAD.

    The model and its utilities are obtained from
    ``model_loader.get_models()``. Detection runs with ``speech_pad_ms=0``;
    padding is added here, after one segment has been selected.

    Args:
        waveform: Audio as a tensor (squeezed before use) or an array
            that is used as-is.
        sample_rate: Sampling rate of ``waveform`` in Hz.
        threshold: Speech threshold forwarded to the detector.
        min_speech_duration_ms: Forwarded to the detector.
        min_silence_duration_ms: Forwarded to the detector.
        padding_before_ms: Audio kept before the selected segment.
        padding_after_ms: Audio kept after the selected segment.
        max_speech_duration_s: Maximum segment length; values <= 0 mean
            no limit.
        chunk_selection: ``'first'`` or ``'last'`` to pick that detected
            segment; any other value picks the longest one.

    Returns:
        A float tensor holding the padded segment, or ``None`` when no
        speech was detected or any error occurred (the error is logged).
    """
    try:
        from model_loader import get_models

        models = get_models()
        model = models['silero_vad']
        # Only the first utility of the bundle is needed here.
        get_speech_timestamps, *_ = models['silero_utils']

        if isinstance(waveform, torch.Tensor):
            # detach() so tensors that track gradients can be converted too.
            waveform_np = waveform.detach().squeeze().cpu().numpy()
        else:
            waveform_np = waveform

        n_samples = len(waveform_np)
        total_duration_ms = (n_samples / sample_rate) * 1000

        speech_timestamps = get_speech_timestamps(
            waveform_np,
            model,
            sampling_rate=sample_rate,
            threshold=threshold,
            min_speech_duration_ms=min_speech_duration_ms,
            min_silence_duration_ms=min_silence_duration_ms,
            speech_pad_ms=0,
            max_speech_duration_s=max_speech_duration_s if max_speech_duration_s > 0 else float('inf'),
            return_seconds=False
        )

        if not speech_timestamps:
            logger.warning("No speech detected by Silero VAD")
            return None

        if chunk_selection == 'first':
            selected_chunk = speech_timestamps[0]
        elif chunk_selection == 'last':
            selected_chunk = speech_timestamps[-1]
        else:  # 'longest' (default)
            selected_chunk = max(speech_timestamps, key=lambda x: x['end'] - x['start'])

        vad_start_ms = (selected_chunk['start'] / sample_rate) * 1000
        vad_end_ms = (selected_chunk['end'] / sample_rate) * 1000

        # Widen the segment by the requested padding, clipped to the signal.
        pad_before_samples = int((padding_before_ms / 1000) * sample_rate)
        pad_after_samples = int((padding_after_ms / 1000) * sample_rate)
        start_idx = max(0, selected_chunk['start'] - pad_before_samples)
        end_idx = min(n_samples, selected_chunk['end'] + pad_after_samples)

        # Slice first, then copy: only the kept segment is duplicated.
        trimmed_waveform_np = waveform_np[start_idx:end_idx].copy()
        trimmed_tensor = torch.from_numpy(trimmed_waveform_np).float()

        final_start_ms = (start_idx / sample_rate) * 1000
        final_end_ms = (end_idx / sample_rate) * 1000
        final_duration_ms = final_end_ms - final_start_ms
        logger.info(f"VAD: Original={total_duration_ms:.0f}ms | Speech=[{vad_start_ms:.0f}-{vad_end_ms:.0f}]ms | Final=[{final_start_ms:.0f}-{final_end_ms:.0f}]ms ({final_duration_ms:.0f}ms) | Selection={chunk_selection}")
        return trimmed_tensor
    except Exception as e:
        # Best-effort boundary: callers get None on failure; the traceback
        # is logged so the cause is not lost.
        logger.exception(f"Error in Silero VAD: {e}")
        return None