Spaces:
Runtime error
Runtime error
| """ | |
| Audio Post-Processing Module | |
| ============================ | |
| Handles audio post-processing, optimization, and quality enhancement. | |
| Implements cross-fading, noise reduction, and dynamic range optimization. | |
| Optimized for Hugging Face Spaces deployment. | |
| """ | |
| import logging | |
| import time | |
| from typing import Tuple, List, Optional | |
| import numpy as np | |
| import scipy.signal | |
| from scipy.ndimage import gaussian_filter1d | |
logger = logging.getLogger(__name__)


class AudioProcessor:
    """Advanced audio post-processor for TTS output optimization.

    Provides crossfaded concatenation of audio chunks, a relative-level
    noise gate, peak normalization, optional dynamic range compression,
    silence padding and basic signal statistics.  Processing methods return
    16-bit PCM (``np.int16``) arrays unless stated otherwise.
    """

    def __init__(self,
                 crossfade_duration: float = 0.1,
                 sample_rate: int = 16000,
                 apply_noise_gate: bool = True,
                 normalize_audio: bool = True):
        """
        Initialize audio processor.

        Args:
            crossfade_duration: Duration of crossfade between chunks in seconds
            sample_rate: Audio sample rate
            apply_noise_gate: Whether to apply noise gating
            normalize_audio: Whether to normalize audio levels

        Raises:
            ValueError: If ``sample_rate`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError(f"sample_rate must be positive, got {sample_rate}")

        self.crossfade_duration = crossfade_duration
        self.sample_rate = sample_rate
        self.apply_noise_gate = apply_noise_gate
        self.normalize_audio = normalize_audio

        # Crossfade length in samples; a negative duration means "no crossfade".
        self.crossfade_samples = max(0, int(crossfade_duration * sample_rate))

        logger.info("AudioProcessor initialized with %ss crossfade", crossfade_duration)

    def _create_crossfade_window(self, length: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Create crossfade windows for smooth transitions.

        The two windows are complementary raised-cosine ramps: ``fade_out``
        falls from ~1 to ~0, ``fade_in`` rises from ~0 to ~1, and they sum to
        exactly 1 at every sample so a steady signal keeps its level across
        the overlap.

        Args:
            length: Length of crossfade in samples

        Returns:
            Tuple of (fade_out_window, fade_in_window)
        """
        if length <= 0:
            return np.zeros(0, dtype=np.float64), np.zeros(0, dtype=np.float64)

        # Sample the ramp at bin centres so neither window hits exactly 0 or 1
        # and the pair stays symmetric around the middle of the overlap.
        phase = (np.arange(length, dtype=np.float64) + 0.5) / length
        fade_in = 0.5 - 0.5 * np.cos(np.pi * phase)
        fade_out = 1.0 - fade_in
        return fade_out, fade_in

    def crossfade_audio_segments(self, audio_segments: List[np.ndarray]) -> np.ndarray:
        """
        Crossfade multiple audio segments for smooth concatenation.

        Args:
            audio_segments: List of audio arrays to concatenate

        Returns:
            Smoothly concatenated audio array.  An empty list yields an empty
            int16 array; a single segment is returned unchanged; otherwise
            the result is int16.
        """
        if not audio_segments:
            return np.array([], dtype=np.int16)

        if len(audio_segments) == 1:
            return audio_segments[0]

        logger.debug("Crossfading %d audio segments", len(audio_segments))

        # astype() copies, so the caller's arrays are never modified.
        result = audio_segments[0].astype(np.float32)

        for segment in audio_segments[1:]:
            current_segment = segment.astype(np.float32)

            # The overlap may use at most half of either side.
            fade_length = min(
                self.crossfade_samples,
                len(result) // 2,
                len(current_segment) // 2,
            )

            if fade_length > 0:
                fade_out, fade_in = self._create_crossfade_window(fade_length)

                # Outgoing tail fades down while the incoming head fades up.
                overlap = (result[-fade_length:] * fade_out
                           + current_segment[:fade_length] * fade_in)

                result = np.concatenate([
                    result[:-fade_length],
                    overlap.astype(np.float32),
                    current_segment[fade_length:],
                ])
            else:
                # No crossfade possible, simple concatenation
                result = np.concatenate([result, current_segment])

        # Round and clip before the integer cast: a bare astype() truncates
        # toward zero and is undefined for out-of-range floats.
        return np.rint(np.clip(result, -32768, 32767)).astype(np.int16)

    def _apply_noise_gate(self, audio: np.ndarray, threshold_db: float = -40.0) -> np.ndarray:
        """
        Apply noise gate to reduce background noise.

        A 10 ms sliding RMS envelope is compared with its own maximum;
        samples whose envelope is more than ``|threshold_db|`` dB below the
        loudest window are attenuated.

        Args:
            audio: Input audio array
            threshold_db: Gate threshold in dB relative to the peak RMS level

        Returns:
            Noise-gated audio as int16, same length as the input
        """
        # 10 ms analysis window, at least one sample.
        window_size = max(1, int(0.01 * self.sample_rate))

        if len(audio) < window_size:
            # For very short audio, return as-is
            return audio.astype(np.int16)

        samples = audio.astype(np.float64)

        # Pad so that a 'valid' convolution yields exactly len(audio) values,
        # each centred on the corresponding input sample.
        pad_left = window_size // 2
        pad_right = window_size - 1 - pad_left
        padded = np.pad(samples, (pad_left, pad_right), mode='reflect')

        mean_square = np.convolve(padded ** 2,
                                  np.ones(window_size) / window_size,
                                  mode='valid')
        rms = np.sqrt(np.maximum(mean_square, 0.0))

        peak_rms = rms.max()
        if peak_rms <= 0:
            # Pure silence: nothing to gate and nothing to divide by.
            return audio.astype(np.int16)

        threshold_linear = 10 ** (threshold_db / 20)
        gate_mask = (rms / peak_rms) > threshold_linear

        # Smooth the gate mask to avoid clicks
        gate_mask = gaussian_filter1d(gate_mask.astype(float), sigma=2)

        # The smoothed mask stays within [0, 1], so the product cannot exceed
        # the int16 range; rounding keeps fully-open regions bit-exact.
        return np.rint(samples * gate_mask).astype(np.int16)

    def _normalize_audio(self, audio: np.ndarray, target_peak: float = 0.95) -> np.ndarray:
        """
        Normalize audio to target peak level.

        Args:
            audio: Input audio array
            target_peak: Target peak level (0.0 to 1.0) as a fraction of
                int16 full scale (32767)

        Returns:
            Normalized int16 audio, or the input unchanged when it is empty
            or completely silent
        """
        if audio.size == 0:
            return audio

        audio_float = audio.astype(np.float32)
        current_peak = np.max(np.abs(audio_float))

        if current_peak > 0:
            scale_factor = (target_peak * 32767) / current_peak
            # Clip to prevent overflow when target_peak > 1.0 is requested.
            normalized = np.clip(audio_float * scale_factor, -32767, 32767)
            return normalized.astype(np.int16)

        return audio

    def _apply_dynamic_range_compression(self, audio: np.ndarray,
                                         ratio: float = 4.0,
                                         threshold_db: float = -12.0) -> np.ndarray:
        """
        Apply dynamic range compression to even out volume levels.

        For every dB the smoothed envelope exceeds ``threshold_db`` the
        output rises by only ``1 / ratio`` dB, so a larger ratio compresses
        harder and ``ratio == 1`` leaves the signal unchanged.

        Args:
            audio: Input audio array (int16 scale)
            ratio: Compression ratio (input dB : output dB above threshold)
            threshold_db: Compression threshold in dB relative to full scale

        Returns:
            Compressed audio as int16

        Raises:
            ValueError: If ``ratio`` is not positive.
        """
        if ratio <= 0:
            raise ValueError(f"ratio must be positive, got {ratio}")

        audio_float = audio.astype(np.float32) / 32767.0

        # Smoothed amplitude envelope (~1 ms Gaussian, at least one sample).
        smoothing_sigma = max(1, int(0.001 * self.sample_rate))
        envelope = gaussian_filter1d(np.abs(audio_float), sigma=smoothing_sigma)

        # Convert to dB
        envelope_db = 20 * np.log10(np.maximum(envelope, 1e-10))

        # Gain reduction is the part of the overshoot the compressor removes.
        gain_reduction = np.zeros_like(envelope_db)
        over_threshold = envelope_db > threshold_db
        gain_reduction[over_threshold] = (
            (envelope_db[over_threshold] - threshold_db) * (1.0 - 1.0 / ratio)
        )

        # Convert back to linear
        gain_linear = 10 ** (-gain_reduction / 20)

        compressed = audio_float * gain_linear
        return np.clip(compressed * 32767, -32768, 32767).astype(np.int16)

    def process_audio(self, audio: np.ndarray,
                      apply_compression: bool = False,
                      compression_ratio: float = 3.0) -> np.ndarray:
        """
        Apply full audio processing pipeline.

        Stages run in order: noise gate (if enabled on the instance),
        compression (if requested), normalization (if enabled on the
        instance).  Any exception raised by a stage is logged and the
        original audio is returned instead.

        Args:
            audio: Input audio array
            apply_compression: Whether to apply dynamic range compression
            compression_ratio: Compression ratio if compression is applied

        Returns:
            Processed audio
        """
        start_time = time.time()

        if len(audio) == 0:
            return audio

        processed_audio = audio.copy()

        try:
            # Apply noise gate
            if self.apply_noise_gate:
                processed_audio = self._apply_noise_gate(processed_audio)

            # Apply compression if requested
            if apply_compression:
                processed_audio = self._apply_dynamic_range_compression(
                    processed_audio, ratio=compression_ratio
                )

            # Normalize audio
            if self.normalize_audio:
                processed_audio = self._normalize_audio(processed_audio)

            processing_time = time.time() - start_time
            logger.debug("Audio processed in %.3fs", processing_time)

            return processed_audio

        except Exception as e:
            # Deliberate best-effort: never lose the caller's audio.
            logger.error("Audio processing failed: %s", e)
            return audio  # Return original audio on failure

    def process_and_concatenate(self, audio_segments: List[np.ndarray],
                                apply_processing: bool = True) -> np.ndarray:
        """
        Process and concatenate multiple audio segments.

        Args:
            audio_segments: List of audio arrays
            apply_processing: Whether to apply full processing pipeline

        Returns:
            Processed and concatenated audio
        """
        if not audio_segments:
            return np.array([], dtype=np.int16)

        # First, crossfade the segments
        concatenated = self.crossfade_audio_segments(audio_segments)

        # Then apply processing if requested
        if apply_processing:
            concatenated = self.process_audio(concatenated)

        return concatenated

    def add_silence(self, audio: np.ndarray,
                    start_silence: float = 0.1,
                    end_silence: float = 0.1) -> np.ndarray:
        """
        Add silence padding to audio.

        Args:
            audio: Input audio array
            start_silence: Silence duration at start in seconds
            end_silence: Silence duration at end in seconds

        Returns:
            Audio with added silence (same dtype as the input).  Negative
            durations are treated as zero.
        """
        start_samples = max(0, int(start_silence * self.sample_rate))
        end_samples = max(0, int(end_silence * self.sample_rate))

        start_pad = np.zeros(start_samples, dtype=audio.dtype)
        end_pad = np.zeros(end_samples, dtype=audio.dtype)

        return np.concatenate([start_pad, audio, end_pad])

    def get_audio_stats(self, audio: np.ndarray) -> dict:
        """
        Get audio statistics for quality analysis.

        Args:
            audio: Audio array to analyze

        Returns:
            Dictionary of audio statistics holding plain Python numbers, or
            ``{"error": "Empty audio"}`` for empty input.
            ``dynamic_range_db`` is the peak-to-RMS ratio in dB and is
            reported as 0.0 for completely silent audio.
        """
        if len(audio) == 0:
            return {"error": "Empty audio"}

        samples = audio.astype(np.float64)

        peak = float(np.max(np.abs(samples)))
        rms = float(np.sqrt(np.mean(samples ** 2)))

        # log10(0) is -inf; a silent signal has no meaningful peak/RMS ratio.
        if peak > 0:
            dynamic_range_db = float(20 * np.log10(peak / (rms + 1e-10)))
        else:
            dynamic_range_db = 0.0

        return {
            "duration_seconds": len(audio) / self.sample_rate,
            "sample_count": len(audio),
            "peak_amplitude": peak,
            "rms_level": rms,
            "dynamic_range_db": dynamic_range_db,
            # Diff of a boolean array marks every sign change between neighbours.
            "zero_crossings": int(np.sum(np.diff(np.signbit(samples)))),
            "dc_offset": float(np.mean(samples)),
        }