Spaces:
Sleeping
Sleeping
File size: 7,694 Bytes
91e586d e9fc350 91e586d e9fc350 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
"""Audio concatenation utility for combining multiple audio chunks into a single audio file."""
import numpy as np
from typing import List, Tuple, Optional
import gradio as gr
class AudioConcatenator:
"""Handles concatenation of multiple audio chunks."""
def __init__(self, silence_duration: float = 0.5, fade_duration: float = 0.1):
"""
Initialize the audio concatenator.
Args:
silence_duration: Duration of silence between chunks (seconds)
fade_duration: Duration of fade in/out effects (seconds)
"""
self.silence_duration = silence_duration
self.fade_duration = fade_duration
def concatenate_audio_chunks(
self,
audio_chunks: List[Tuple[int, np.ndarray]],
progress_callback: Optional[callable] = None
) -> Tuple[int, np.ndarray]:
"""
Concatenate multiple audio chunks into a single audio file.
Args:
audio_chunks: List of (sample_rate, audio_data) tuples
progress_callback: Optional callback for progress updates
Returns:
Tuple of (sample_rate, concatenated_audio_data)
"""
if not audio_chunks:
raise gr.Error("No audio chunks to concatenate")
if len(audio_chunks) == 1:
return audio_chunks[0]
if progress_callback:
progress_callback(0.1, desc="Preparing audio concatenation...")
# Verify all chunks have the same sample rate
sample_rates = [chunk[0] for chunk in audio_chunks]
if len(set(sample_rates)) > 1:
raise gr.Error(f"Inconsistent sample rates found: {set(sample_rates)}. All chunks must have the same sample rate.")
sample_rate = sample_rates[0]
if progress_callback:
progress_callback(0.2, desc="Normalizing audio chunks...")
# Normalize and prepare audio data
normalized_chunks = []
for i, (_, audio_data) in enumerate(audio_chunks):
# Ensure audio data is in the correct format
if audio_data.ndim == 1:
normalized_audio = audio_data
elif audio_data.ndim == 2:
# Convert stereo to mono by averaging channels
normalized_audio = np.mean(audio_data, axis=1)
else:
raise gr.Error(f"Unsupported audio format in chunk {i + 1}: {audio_data.shape}")
# Normalize audio levels
normalized_audio = self._normalize_audio(normalized_audio)
# Apply fade effects
normalized_audio = self._apply_fade_effects(normalized_audio, sample_rate)
normalized_chunks.append(normalized_audio)
if progress_callback:
progress = 0.2 + (0.5 * (i + 1) / len(audio_chunks))
progress_callback(progress, desc=f"Processed chunk {i + 1}/{len(audio_chunks)}")
if progress_callback:
progress_callback(0.7, desc="Creating silence segments...")
# Create silence segments
silence_samples = int(self.silence_duration * sample_rate)
silence = np.zeros(silence_samples, dtype=np.float32)
if progress_callback:
progress_callback(0.8, desc="Concatenating audio segments...")
# Concatenate all chunks with silence in between
concatenated_segments = []
for i, chunk in enumerate(normalized_chunks):
concatenated_segments.append(chunk)
# Add silence between chunks (but not after the last chunk)
if i < len(normalized_chunks) - 1:
concatenated_segments.append(silence)
if progress_callback:
progress = 0.8 + (0.15 * (i + 1) / len(normalized_chunks))
progress_callback(progress, desc=f"Concatenated {i + 1}/{len(normalized_chunks)} chunks")
# Combine all segments
final_audio = np.concatenate(concatenated_segments)
if progress_callback:
progress_callback(0.95, desc="Finalizing audio...")
# Final normalization and cleanup
final_audio = self._normalize_audio(final_audio)
final_audio = self._remove_clicks_and_pops(final_audio)
if progress_callback:
progress_callback(1.0, desc="Audio concatenation complete!")
return sample_rate, final_audio
def _normalize_audio(self, audio_data: np.ndarray) -> np.ndarray:
"""Normalize audio to prevent clipping."""
# Find the maximum absolute value
max_val = np.max(np.abs(audio_data))
if max_val == 0:
return audio_data
# Normalize to 95% of maximum to leave some headroom
normalized = audio_data * (0.95 / max_val)
return normalized.astype(np.float32)
def _apply_fade_effects(self, audio_data: np.ndarray, sample_rate: int) -> np.ndarray:
"""Apply fade in and fade out effects to reduce pops and clicks."""
fade_samples = int(self.fade_duration * sample_rate)
if len(audio_data) < 2 * fade_samples:
# If audio is too short for fade effects, return as-is
return audio_data
audio_with_fades = audio_data.copy()
# Apply fade in
fade_in = np.linspace(0, 1, fade_samples)
audio_with_fades[:fade_samples] *= fade_in
# Apply fade out
fade_out = np.linspace(1, 0, fade_samples)
audio_with_fades[-fade_samples:] *= fade_out
return audio_with_fades
def _remove_clicks_and_pops(self, audio_data: np.ndarray) -> np.ndarray:
"""Apply basic filtering to remove clicks and pops."""
try:
# Simple high-pass filter to remove DC offset and low-frequency artifacts
from scipy import signal
# Design a high-pass filter (removes frequencies below 80 Hz)
# This helps remove some pops and clicks while preserving speech
sos = signal.butter(2, 80, btype='highpass', fs=22050, output='sos')
filtered_audio = signal.sosfilt(sos, audio_data)
return filtered_audio.astype(np.float32)
except ImportError:
# If scipy is not available, return audio as-is
return audio_data.astype(np.float32)
def get_concatenation_info(self, audio_chunks: List[Tuple[int, np.ndarray]]) -> dict:
"""Get information about the concatenation process."""
if not audio_chunks:
return {}
total_duration = 0
total_silence_duration = 0
chunk_durations = []
sample_rate = audio_chunks[0][0]
for _, audio_data in audio_chunks:
duration = len(audio_data) / sample_rate
chunk_durations.append(duration)
total_duration += duration
# Add silence duration (between chunks)
if len(audio_chunks) > 1:
total_silence_duration = (len(audio_chunks) - 1) * self.silence_duration
total_duration += total_silence_duration
return {
"num_chunks": len(audio_chunks),
"total_duration": total_duration,
"total_silence_duration": total_silence_duration,
"chunk_durations": chunk_durations,
"average_chunk_duration": np.mean(chunk_durations),
"sample_rate": sample_rate
}
|