Spaces:
Sleeping
Sleeping
| """Audio concatenation utility for combining multiple audio chunks into a single audio file.""" | |
| import numpy as np | |
| from typing import List, Tuple, Optional | |
| import gradio as gr | |
| class AudioConcatenator: | |
| """Handles concatenation of multiple audio chunks.""" | |
| def __init__(self, silence_duration: float = 0.5, fade_duration: float = 0.1): | |
| """ | |
| Initialize the audio concatenator. | |
| Args: | |
| silence_duration: Duration of silence between chunks (seconds) | |
| fade_duration: Duration of fade in/out effects (seconds) | |
| """ | |
| self.silence_duration = silence_duration | |
| self.fade_duration = fade_duration | |
| def concatenate_audio_chunks( | |
| self, | |
| audio_chunks: List[Tuple[int, np.ndarray]], | |
| progress_callback: Optional[callable] = None | |
| ) -> Tuple[int, np.ndarray]: | |
| """ | |
| Concatenate multiple audio chunks into a single audio file. | |
| Args: | |
| audio_chunks: List of (sample_rate, audio_data) tuples | |
| progress_callback: Optional callback for progress updates | |
| Returns: | |
| Tuple of (sample_rate, concatenated_audio_data) | |
| """ | |
| if not audio_chunks: | |
| raise gr.Error("No audio chunks to concatenate") | |
| if len(audio_chunks) == 1: | |
| return audio_chunks[0] | |
| if progress_callback: | |
| progress_callback(0.1, desc="Preparing audio concatenation...") | |
| # Verify all chunks have the same sample rate | |
| sample_rates = [chunk[0] for chunk in audio_chunks] | |
| if len(set(sample_rates)) > 1: | |
| raise gr.Error(f"Inconsistent sample rates found: {set(sample_rates)}. All chunks must have the same sample rate.") | |
| sample_rate = sample_rates[0] | |
| if progress_callback: | |
| progress_callback(0.2, desc="Normalizing audio chunks...") | |
| # Normalize and prepare audio data | |
| normalized_chunks = [] | |
| for i, (_, audio_data) in enumerate(audio_chunks): | |
| # Ensure audio data is in the correct format | |
| if audio_data.ndim == 1: | |
| normalized_audio = audio_data | |
| elif audio_data.ndim == 2: | |
| # Convert stereo to mono by averaging channels | |
| normalized_audio = np.mean(audio_data, axis=1) | |
| else: | |
| raise gr.Error(f"Unsupported audio format in chunk {i + 1}: {audio_data.shape}") | |
| # Normalize audio levels | |
| normalized_audio = self._normalize_audio(normalized_audio) | |
| # Apply fade effects | |
| normalized_audio = self._apply_fade_effects(normalized_audio, sample_rate) | |
| normalized_chunks.append(normalized_audio) | |
| if progress_callback: | |
| progress = 0.2 + (0.5 * (i + 1) / len(audio_chunks)) | |
| progress_callback(progress, desc=f"Processed chunk {i + 1}/{len(audio_chunks)}") | |
| if progress_callback: | |
| progress_callback(0.7, desc="Creating silence segments...") | |
| # Create silence segments | |
| silence_samples = int(self.silence_duration * sample_rate) | |
| silence = np.zeros(silence_samples, dtype=np.float32) | |
| if progress_callback: | |
| progress_callback(0.8, desc="Concatenating audio segments...") | |
| # Concatenate all chunks with silence in between | |
| concatenated_segments = [] | |
| for i, chunk in enumerate(normalized_chunks): | |
| concatenated_segments.append(chunk) | |
| # Add silence between chunks (but not after the last chunk) | |
| if i < len(normalized_chunks) - 1: | |
| concatenated_segments.append(silence) | |
| if progress_callback: | |
| progress = 0.8 + (0.15 * (i + 1) / len(normalized_chunks)) | |
| progress_callback(progress, desc=f"Concatenated {i + 1}/{len(normalized_chunks)} chunks") | |
| # Combine all segments | |
| final_audio = np.concatenate(concatenated_segments) | |
| if progress_callback: | |
| progress_callback(0.95, desc="Finalizing audio...") | |
| # Final normalization and cleanup | |
| final_audio = self._normalize_audio(final_audio) | |
| final_audio = self._remove_clicks_and_pops(final_audio) | |
| if progress_callback: | |
| progress_callback(1.0, desc="Audio concatenation complete!") | |
| return sample_rate, final_audio | |
| def _normalize_audio(self, audio_data: np.ndarray) -> np.ndarray: | |
| """Normalize audio to prevent clipping.""" | |
| # Find the maximum absolute value | |
| max_val = np.max(np.abs(audio_data)) | |
| if max_val == 0: | |
| return audio_data | |
| # Normalize to 95% of maximum to leave some headroom | |
| normalized = audio_data * (0.95 / max_val) | |
| return normalized.astype(np.float32) | |
| def _apply_fade_effects(self, audio_data: np.ndarray, sample_rate: int) -> np.ndarray: | |
| """Apply fade in and fade out effects to reduce pops and clicks.""" | |
| fade_samples = int(self.fade_duration * sample_rate) | |
| if len(audio_data) < 2 * fade_samples: | |
| # If audio is too short for fade effects, return as-is | |
| return audio_data | |
| audio_with_fades = audio_data.copy() | |
| # Apply fade in | |
| fade_in = np.linspace(0, 1, fade_samples) | |
| audio_with_fades[:fade_samples] *= fade_in | |
| # Apply fade out | |
| fade_out = np.linspace(1, 0, fade_samples) | |
| audio_with_fades[-fade_samples:] *= fade_out | |
| return audio_with_fades | |
| def _remove_clicks_and_pops(self, audio_data: np.ndarray) -> np.ndarray: | |
| """Apply basic filtering to remove clicks and pops.""" | |
| try: | |
| # Simple high-pass filter to remove DC offset and low-frequency artifacts | |
| from scipy import signal | |
| # Design a high-pass filter (removes frequencies below 80 Hz) | |
| # This helps remove some pops and clicks while preserving speech | |
| sos = signal.butter(2, 80, btype='highpass', fs=22050, output='sos') | |
| filtered_audio = signal.sosfilt(sos, audio_data) | |
| return filtered_audio.astype(np.float32) | |
| except ImportError: | |
| # If scipy is not available, return audio as-is | |
| return audio_data.astype(np.float32) | |
| def get_concatenation_info(self, audio_chunks: List[Tuple[int, np.ndarray]]) -> dict: | |
| """Get information about the concatenation process.""" | |
| if not audio_chunks: | |
| return {} | |
| total_duration = 0 | |
| total_silence_duration = 0 | |
| chunk_durations = [] | |
| sample_rate = audio_chunks[0][0] | |
| for _, audio_data in audio_chunks: | |
| duration = len(audio_data) / sample_rate | |
| chunk_durations.append(duration) | |
| total_duration += duration | |
| # Add silence duration (between chunks) | |
| if len(audio_chunks) > 1: | |
| total_silence_duration = (len(audio_chunks) - 1) * self.silence_duration | |
| total_duration += total_silence_duration | |
| return { | |
| "num_chunks": len(audio_chunks), | |
| "total_duration": total_duration, | |
| "total_silence_duration": total_silence_duration, | |
| "chunk_durations": chunk_durations, | |
| "average_chunk_duration": np.mean(chunk_durations), | |
| "sample_rate": sample_rate | |
| } | |