# pdf_explainer/src/processors/audio_processor.py
# feat: Update audio processing to support parallel chunking and enhance
# text chunking logic (commit 91e586d)
"""Audio generation functionality."""
import gradio as gr
from typing import Tuple, Optional
import numpy as np
from .text_chunker import TextChunker
from .parallel_processor import ParallelAudioProcessor
from .audio_concatenator import AudioConcatenator
class AudioProcessor:
    """Handles audio generation operations with parallel processing and chunking."""

    def __init__(self,
                 max_chunk_size: int = 800,
                 max_workers: int = 4,
                 silence_duration: float = 0.5,
                 enable_parallel: bool = True):
        """
        Initialize the audio processor.

        Args:
            max_chunk_size: Maximum characters per chunk
            max_workers: Maximum parallel workers
            silence_duration: Silence between chunks (seconds)
            enable_parallel: Whether to use parallel processing
        """
        self.text_chunker = TextChunker(max_chunk_size=max_chunk_size)
        self.parallel_processor = ParallelAudioProcessor(max_workers=max_workers)
        self.audio_concatenator = AudioConcatenator(silence_duration=silence_duration)
        self.enable_parallel = enable_parallel

    def generate_audio(self, explanation_text: str, progress=None) -> Tuple[Tuple[int, np.ndarray], dict]:
        """
        Generate TTS audio for explanations with chunking and parallel processing.

        Args:
            explanation_text: The text to convert to audio
            progress: Optional progress callback, called as progress(fraction, desc=...)

        Returns:
            Tuple of (audio_result, update_dict) where audio_result is
            (sample_rate, audio_data) and update_dict is a gr.update making
            the audio component visible.

        Raises:
            gr.Error: If no text is available or audio generation fails.
        """
        if not explanation_text or explanation_text.strip() == "":
            raise gr.Error("No explanations available to convert to audio. Please generate explanations first.")
        try:
            # Lazy import so the TTS model is only loaded when audio is
            # actually requested; imported once for all branches below.
            from .generate_tts_audio import generate_tts_audio

            clean_text = explanation_text.strip()
            if progress:
                progress(0.05, desc="Analyzing text for chunking...")

            # Step 1: Chunk the text
            text_chunks = self.text_chunker.chunk_text(clean_text)
            if progress:
                progress(0.1, desc=f"Split text into {len(text_chunks)} chunks")

            # Fast path: a single chunk small enough for one TTS call skips
            # the parallel/concatenation machinery entirely.
            if len(text_chunks) == 1 and len(text_chunks[0]) <= 1000:
                if progress:
                    progress(0.2, desc="Processing single chunk...")
                audio_result = generate_tts_audio(text_chunks[0], None, progress=progress)
                if progress:
                    progress(1.0, desc="Audio generation complete!")
                return audio_result, gr.update(visible=True)

            # Step 2: Process chunks in parallel (or sequentially if disabled)
            if self.enable_parallel and len(text_chunks) > 1:
                if progress:
                    progress(0.15, desc="Starting parallel audio processing...")

                def progress_wrapper(p, desc=""):
                    if progress:
                        # Map parallel progress to 15-80% of total progress
                        progress(0.15 + (p * 0.65), desc)

                audio_chunks = self.parallel_processor.process_chunks_parallel(
                    text_chunks,
                    generate_tts_audio,
                    progress_callback=progress_wrapper
                )
            else:
                # Sequential processing when parallel is disabled.
                if progress:
                    progress(0.15, desc="Processing chunks sequentially...")
                audio_chunks = []
                for i, chunk in enumerate(text_chunks):
                    if progress:
                        chunk_progress = 0.15 + (0.65 * i / len(text_chunks))
                        progress(chunk_progress, desc=f"Processing chunk {i + 1}/{len(text_chunks)}")
                    audio_chunks.append(generate_tts_audio(chunk, None))

            # Step 3: Concatenate audio chunks into a single stream
            if progress:
                progress(0.8, desc="Concatenating audio chunks...")

            def concat_progress_wrapper(p, desc=""):
                if progress:
                    # Map concatenation progress to 80-100% of total progress
                    progress(0.8 + (p * 0.2), desc)

            final_audio = self.audio_concatenator.concatenate_audio_chunks(
                audio_chunks,
                progress_callback=concat_progress_wrapper
            )
            if progress:
                progress(1.0, desc=f"Generated audio from {len(text_chunks)} chunks!")
            return final_audio, gr.update(visible=True)
        except gr.Error:
            # User-facing errors raised deeper in the pipeline are already
            # formatted — re-raise instead of double-wrapping the message.
            raise
        except Exception as e:
            raise gr.Error(f"Error generating audio: {str(e)}")

    def generate_audio_legacy(self, explanation_text: str) -> Tuple[Tuple[int, np.ndarray], dict]:
        """
        Legacy audio generation method (for backward compatibility).

        Instead of chunking, text over 1000 characters is truncated to at
        most 950 characters (cut at the last full sentence when possible)
        and a truncation notice is appended.

        Args:
            explanation_text: The text to convert to audio

        Returns:
            Tuple of (audio_result, update_dict) where audio_result is
            (sample_rate, audio_data).

        Raises:
            gr.Error: If no text is available or audio generation fails.
        """
        if not explanation_text or explanation_text.strip() == "":
            raise gr.Error("No explanations available to convert to audio. Please generate explanations first.")
        try:
            from .generate_tts_audio import generate_tts_audio
            clean_text = explanation_text.strip()
            # Use the original truncation logic for legacy mode
            if len(clean_text) > 1000:
                sentences = clean_text[:950].split('.')
                if len(sentences) > 1:
                    # Drop the trailing partial sentence left by the slice.
                    clean_text = '.'.join(sentences[:-1]) + '.'
                else:
                    clean_text = clean_text[:950]
                clean_text += " [Text has been truncated for audio generation]"
            audio_result = generate_tts_audio(clean_text, None)
            return audio_result, gr.update(visible=True)
        except gr.Error:
            # Already a user-facing error — avoid nesting the message.
            raise
        except Exception as e:
            raise gr.Error(f"Error generating audio: {str(e)}")

    def get_processing_info(self, text: str) -> dict:
        """Get information about how the text would be processed.

        Args:
            text: Candidate input text.

        Returns:
            Dict with processing_mode ("parallel"/"sequential"), chunk_info,
            and estimated processing time (seconds and human-readable), or
            {"error": ...} when no text is provided.
        """
        if not text or not text.strip():
            return {"error": "No text provided"}
        chunks = self.text_chunker.chunk_text(text.strip())
        chunk_info = self.text_chunker.get_chunk_info(chunks)
        estimated_time = self.parallel_processor.estimate_processing_time(chunks)
        return {
            "processing_mode": "parallel" if self.enable_parallel and len(chunks) > 1 else "sequential",
            "chunk_info": chunk_info,
            "estimated_time_seconds": estimated_time,
            "estimated_time_readable": f"{estimated_time:.1f} seconds" if estimated_time < 60 else f"{estimated_time/60:.1f} minutes"
        }