# pdf_explainer/src/processors/audio_concatenator.py
# Revert "feat: implement AudioConcatenator package with audio processing utilities"
# (commit e9fc350)
"""Audio concatenation utility for combining multiple audio chunks into a single audio file."""
from typing import Callable, List, Optional, Tuple

import gradio as gr
import numpy as np
class AudioConcatenator:
    """Combines multiple audio chunks into one track with silence gaps and fades.

    Chunks are mono-mixed, peak-normalized, faded in/out to avoid clicks,
    joined with a configurable silence gap, then lightly high-pass filtered.
    """

    def __init__(self, silence_duration: float = 0.5, fade_duration: float = 0.1):
        """
        Initialize the audio concatenator.

        Args:
            silence_duration: Duration of silence between chunks (seconds).
            fade_duration: Duration of fade in/out effects (seconds).
        """
        self.silence_duration = silence_duration
        self.fade_duration = fade_duration

    def concatenate_audio_chunks(
        self,
        audio_chunks: List[Tuple[int, np.ndarray]],
        progress_callback: Optional[Callable] = None
    ) -> Tuple[int, np.ndarray]:
        """
        Concatenate multiple audio chunks into a single audio stream.

        Args:
            audio_chunks: List of (sample_rate, audio_data) tuples. All chunks
                must share the same sample rate. Stereo (2-D) chunks are
                mixed down to mono.
            progress_callback: Optional callable invoked as
                ``progress_callback(fraction, desc=...)`` (gradio Progress style).

        Returns:
            Tuple of (sample_rate, concatenated_audio_data) where the audio is
            a 1-D float32 array.

        Raises:
            gr.Error: If no chunks are given, sample rates differ, or a chunk
                has an unsupported (>2-D) shape.
        """
        if not audio_chunks:
            raise gr.Error("No audio chunks to concatenate")
        if len(audio_chunks) == 1:
            # Single chunk: nothing to join, return it untouched.
            return audio_chunks[0]

        if progress_callback:
            progress_callback(0.1, desc="Preparing audio concatenation...")

        # All chunks must agree on the sample rate; mixing rates would require
        # resampling, which this utility does not perform.
        sample_rates = [chunk[0] for chunk in audio_chunks]
        if len(set(sample_rates)) > 1:
            raise gr.Error(f"Inconsistent sample rates found: {set(sample_rates)}. All chunks must have the same sample rate.")
        sample_rate = sample_rates[0]

        if progress_callback:
            progress_callback(0.2, desc="Normalizing audio chunks...")

        # Normalize, mono-mix, and fade each chunk independently.
        normalized_chunks = []
        for i, (_, audio_data) in enumerate(audio_chunks):
            if audio_data.ndim == 1:
                normalized_audio = audio_data
            elif audio_data.ndim == 2:
                # Convert stereo (or multi-channel) to mono by averaging channels.
                normalized_audio = np.mean(audio_data, axis=1)
            else:
                raise gr.Error(f"Unsupported audio format in chunk {i + 1}: {audio_data.shape}")

            normalized_audio = self._normalize_audio(normalized_audio)
            normalized_audio = self._apply_fade_effects(normalized_audio, sample_rate)
            normalized_chunks.append(normalized_audio)

            if progress_callback:
                progress = 0.2 + (0.5 * (i + 1) / len(audio_chunks))
                progress_callback(progress, desc=f"Processed chunk {i + 1}/{len(audio_chunks)}")

        if progress_callback:
            progress_callback(0.7, desc="Creating silence segments...")

        # One shared silence buffer reused between every pair of chunks.
        silence_samples = int(self.silence_duration * sample_rate)
        silence = np.zeros(silence_samples, dtype=np.float32)

        if progress_callback:
            progress_callback(0.8, desc="Concatenating audio segments...")

        # Interleave chunks with silence; no trailing silence after the last chunk.
        concatenated_segments = []
        for i, chunk in enumerate(normalized_chunks):
            concatenated_segments.append(chunk)
            if i < len(normalized_chunks) - 1:
                concatenated_segments.append(silence)
            if progress_callback:
                progress = 0.8 + (0.15 * (i + 1) / len(normalized_chunks))
                progress_callback(progress, desc=f"Concatenated {i + 1}/{len(normalized_chunks)} chunks")

        final_audio = np.concatenate(concatenated_segments)

        if progress_callback:
            progress_callback(0.95, desc="Finalizing audio...")

        # Re-normalize the joined signal and filter out low-frequency artifacts.
        final_audio = self._normalize_audio(final_audio)
        # BUGFIX: previously the filter was always designed for fs=22050
        # regardless of the actual sample rate; pass the real rate through.
        final_audio = self._remove_clicks_and_pops(final_audio, sample_rate)

        if progress_callback:
            progress_callback(1.0, desc="Audio concatenation complete!")
        return sample_rate, final_audio

    def _normalize_audio(self, audio_data: np.ndarray) -> np.ndarray:
        """Peak-normalize audio to 95% full scale; returns float32.

        A silent (all-zero) input is returned unchanged to avoid division by zero.
        """
        max_val = np.max(np.abs(audio_data))
        if max_val == 0:
            return audio_data
        # Scale peak to 0.95 to leave headroom against clipping.
        normalized = audio_data * (0.95 / max_val)
        return normalized.astype(np.float32)

    def _apply_fade_effects(self, audio_data: np.ndarray, sample_rate: int) -> np.ndarray:
        """Apply linear fade-in and fade-out ramps to reduce pops and clicks.

        Audio shorter than two fade windows is returned unchanged.
        """
        fade_samples = int(self.fade_duration * sample_rate)
        if fade_samples <= 0 or len(audio_data) < 2 * fade_samples:
            return audio_data

        # Work on a float copy: in-place ``*=`` with a float ramp would raise
        # a casting error for integer-dtype input.
        audio_with_fades = audio_data.astype(np.float32, copy=True)
        ramp = np.linspace(0, 1, fade_samples)
        audio_with_fades[:fade_samples] *= ramp           # fade in
        audio_with_fades[-fade_samples:] *= ramp[::-1]    # fade out
        return audio_with_fades

    def _remove_clicks_and_pops(self, audio_data: np.ndarray, sample_rate: int = 22050) -> np.ndarray:
        """High-pass filter (80 Hz cutoff) to remove DC offset and LF artifacts.

        Args:
            audio_data: 1-D audio signal.
            sample_rate: Actual sample rate of ``audio_data``. Defaults to
                22050 for backward compatibility with the old call signature.

        Returns:
            Filtered float32 audio; the unfiltered signal if scipy is missing
            or the sample rate is too low for an 80 Hz cutoff.
        """
        # An 80 Hz cutoff requires Nyquist > 80 Hz, i.e. sample_rate > 160.
        if sample_rate <= 160:
            return audio_data.astype(np.float32)
        try:
            from scipy import signal

            # Second-order Butterworth high-pass: removes rumble/DC while
            # preserving the speech band.
            sos = signal.butter(2, 80, btype='highpass', fs=sample_rate, output='sos')
            filtered_audio = signal.sosfilt(sos, audio_data)
            return filtered_audio.astype(np.float32)
        except ImportError:
            # scipy unavailable: best-effort, return audio unfiltered.
            return audio_data.astype(np.float32)

    def get_concatenation_info(self, audio_chunks: List[Tuple[int, np.ndarray]]) -> dict:
        """Summarize the planned concatenation without performing it.

        Returns a dict with chunk count, per-chunk and total durations
        (including inserted silence), and the first chunk's sample rate.
        Returns an empty dict for an empty input.
        """
        if not audio_chunks:
            return {}

        total_duration = 0
        total_silence_duration = 0
        chunk_durations = []
        sample_rate = audio_chunks[0][0]

        # BUGFIX: use each chunk's own sample rate for its duration instead of
        # assuming the first chunk's rate applies to all.
        for rate, audio_data in audio_chunks:
            duration = len(audio_data) / rate
            chunk_durations.append(duration)
            total_duration += duration

        # Silence is inserted only between chunks (n - 1 gaps).
        if len(audio_chunks) > 1:
            total_silence_duration = (len(audio_chunks) - 1) * self.silence_duration
            total_duration += total_silence_duration

        return {
            "num_chunks": len(audio_chunks),
            "total_duration": total_duration,
            "total_silence_duration": total_silence_duration,
            "chunk_durations": chunk_durations,
            "average_chunk_duration": np.mean(chunk_durations),
            "sample_rate": sample_rate
        }