"""Audio concatenation utility for combining multiple audio chunks into a single audio file.""" import numpy as np from typing import List, Tuple, Optional import gradio as gr class AudioConcatenator: """Handles concatenation of multiple audio chunks.""" def __init__(self, silence_duration: float = 0.5, fade_duration: float = 0.1): """ Initialize the audio concatenator. Args: silence_duration: Duration of silence between chunks (seconds) fade_duration: Duration of fade in/out effects (seconds) """ self.silence_duration = silence_duration self.fade_duration = fade_duration def concatenate_audio_chunks( self, audio_chunks: List[Tuple[int, np.ndarray]], progress_callback: Optional[callable] = None ) -> Tuple[int, np.ndarray]: """ Concatenate multiple audio chunks into a single audio file. Args: audio_chunks: List of (sample_rate, audio_data) tuples progress_callback: Optional callback for progress updates Returns: Tuple of (sample_rate, concatenated_audio_data) """ if not audio_chunks: raise gr.Error("No audio chunks to concatenate") if len(audio_chunks) == 1: return audio_chunks[0] if progress_callback: progress_callback(0.1, desc="Preparing audio concatenation...") # Verify all chunks have the same sample rate sample_rates = [chunk[0] for chunk in audio_chunks] if len(set(sample_rates)) > 1: raise gr.Error(f"Inconsistent sample rates found: {set(sample_rates)}. All chunks must have the same sample rate.") sample_rate = sample_rates[0] if progress_callback: progress_callback(0.2, desc="Normalizing audio chunks...") # Normalize and prepare audio data normalized_chunks = [] for i, (_, audio_data) in enumerate(audio_chunks): # Ensure audio data is in the correct format if audio_data.ndim == 1: normalized_audio = audio_data elif audio_data.ndim == 2: # Convert stereo to mono by averaging channels normalized_audio = np.mean(audio_data, axis=1) else: raise gr.Error(f"Unsupported audio format in chunk {i + 1}: {audio_data.shape}") # Normalize audio levels normalized_audio = self._normalize_audio(normalized_audio) # Apply fade effects normalized_audio = self._apply_fade_effects(normalized_audio, sample_rate) normalized_chunks.append(normalized_audio) if progress_callback: progress = 0.2 + (0.5 * (i + 1) / len(audio_chunks)) progress_callback(progress, desc=f"Processed chunk {i + 1}/{len(audio_chunks)}") if progress_callback: progress_callback(0.7, desc="Creating silence segments...") # Create silence segments silence_samples = int(self.silence_duration * sample_rate) silence = np.zeros(silence_samples, dtype=np.float32) if progress_callback: progress_callback(0.8, desc="Concatenating audio segments...") # Concatenate all chunks with silence in between concatenated_segments = [] for i, chunk in enumerate(normalized_chunks): concatenated_segments.append(chunk) # Add silence between chunks (but not after the last chunk) if i < len(normalized_chunks) - 1: concatenated_segments.append(silence) if progress_callback: progress = 0.8 + (0.15 * (i + 1) / len(normalized_chunks)) progress_callback(progress, desc=f"Concatenated {i + 1}/{len(normalized_chunks)} chunks") # Combine all segments final_audio = np.concatenate(concatenated_segments) if progress_callback: progress_callback(0.95, desc="Finalizing audio...") # Final normalization and cleanup final_audio = self._normalize_audio(final_audio) final_audio = self._remove_clicks_and_pops(final_audio) if progress_callback: progress_callback(1.0, desc="Audio concatenation complete!") return sample_rate, final_audio def _normalize_audio(self, audio_data: 
np.ndarray) -> np.ndarray: """Normalize audio to prevent clipping.""" # Find the maximum absolute value max_val = np.max(np.abs(audio_data)) if max_val == 0: return audio_data # Normalize to 95% of maximum to leave some headroom normalized = audio_data * (0.95 / max_val) return normalized.astype(np.float32) def _apply_fade_effects(self, audio_data: np.ndarray, sample_rate: int) -> np.ndarray: """Apply fade in and fade out effects to reduce pops and clicks.""" fade_samples = int(self.fade_duration * sample_rate) if len(audio_data) < 2 * fade_samples: # If audio is too short for fade effects, return as-is return audio_data audio_with_fades = audio_data.copy() # Apply fade in fade_in = np.linspace(0, 1, fade_samples) audio_with_fades[:fade_samples] *= fade_in # Apply fade out fade_out = np.linspace(1, 0, fade_samples) audio_with_fades[-fade_samples:] *= fade_out return audio_with_fades def _remove_clicks_and_pops(self, audio_data: np.ndarray) -> np.ndarray: """Apply basic filtering to remove clicks and pops.""" try: # Simple high-pass filter to remove DC offset and low-frequency artifacts from scipy import signal # Design a high-pass filter (removes frequencies below 80 Hz) # This helps remove some pops and clicks while preserving speech sos = signal.butter(2, 80, btype='highpass', fs=22050, output='sos') filtered_audio = signal.sosfilt(sos, audio_data) return filtered_audio.astype(np.float32) except ImportError: # If scipy is not available, return audio as-is return audio_data.astype(np.float32) def get_concatenation_info(self, audio_chunks: List[Tuple[int, np.ndarray]]) -> dict: """Get information about the concatenation process.""" if not audio_chunks: return {} total_duration = 0 total_silence_duration = 0 chunk_durations = [] sample_rate = audio_chunks[0][0] for _, audio_data in audio_chunks: duration = len(audio_data) / sample_rate chunk_durations.append(duration) total_duration += duration # Add silence duration (between chunks) if len(audio_chunks) > 1: total_silence_duration = (len(audio_chunks) - 1) * self.silence_duration total_duration += total_silence_duration return { "num_chunks": len(audio_chunks), "total_duration": total_duration, "total_silence_duration": total_silence_duration, "chunk_durations": chunk_durations, "average_chunk_duration": np.mean(chunk_durations), "sample_rate": sample_rate }
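

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): builds two short synthetic chunks
# and concatenates them. The 440/660 Hz tones, the 22050 Hz sample rate, and
# the print_progress helper are assumptions for this demo, not part of the
# class above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_rate = 22050
    t = np.linspace(0, 1.0, sample_rate, endpoint=False)

    # Two one-second sine tones stand in for real audio chunks
    chunk_a = (sample_rate, 0.5 * np.sin(2 * np.pi * 440 * t).astype(np.float32))
    chunk_b = (sample_rate, 0.5 * np.sin(2 * np.pi * 660 * t).astype(np.float32))

    def print_progress(fraction: float, desc: str = "") -> None:
        # Simple stand-in for a Gradio progress callback
        print(f"[{fraction:.0%}] {desc}")

    concatenator = AudioConcatenator(silence_duration=0.5, fade_duration=0.1)
    info = concatenator.get_concatenation_info([chunk_a, chunk_b])
    print(f"Expected duration: {info['total_duration']:.2f} s over {info['num_chunks']} chunks")

    sr, combined = concatenator.concatenate_audio_chunks([chunk_a, chunk_b], print_progress)
    print(f"Concatenated audio: {len(combined) / sr:.2f} s at {sr} Hz")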