"""Audio generation functionality.""" import gradio as gr from typing import Tuple, Optional import numpy as np from .text_chunker import TextChunker from .parallel_processor import ParallelAudioProcessor from .audio_concatenator import AudioConcatenator class AudioProcessor: """Handles audio generation operations with parallel processing and chunking.""" def __init__(self, max_chunk_size: int = 800, max_workers: int = 4, silence_duration: float = 0.5, enable_parallel: bool = True): """ Initialize the audio processor. Args: max_chunk_size: Maximum characters per chunk max_workers: Maximum parallel workers silence_duration: Silence between chunks (seconds) enable_parallel: Whether to use parallel processing """ self.text_chunker = TextChunker(max_chunk_size=max_chunk_size) self.parallel_processor = ParallelAudioProcessor(max_workers=max_workers) self.audio_concatenator = AudioConcatenator(silence_duration=silence_duration) self.enable_parallel = enable_parallel def generate_audio(self, explanation_text: str, progress=None) -> Tuple[Tuple[int, np.ndarray], dict]: """ Generate TTS audio for explanations with chunking and parallel processing. Args: explanation_text: The text to convert to audio progress: Optional progress callback Returns: Tuple of (audio_result, update_dict) where audio_result is (sample_rate, audio_data) """ if not explanation_text or explanation_text.strip() == "": raise gr.Error("No explanations available to convert to audio. Please generate explanations first.") try: clean_text = explanation_text.strip() if progress: progress(0.05, desc="Analyzing text for chunking...") # Step 1: Chunk the text text_chunks = self.text_chunker.chunk_text(clean_text) chunk_info = self.text_chunker.get_chunk_info(text_chunks) if progress: progress(0.1, desc=f"Split text into {len(text_chunks)} chunks") # If only one chunk and it's small enough, use simple processing if len(text_chunks) == 1 and len(text_chunks[0]) <= 1000: if progress: progress(0.2, desc="Processing single chunk...") from .generate_tts_audio import generate_tts_audio audio_result = generate_tts_audio(text_chunks[0], None, progress=progress) if progress: progress(1.0, desc="Audio generation complete!") return audio_result, gr.update(visible=True) # Step 2: Process chunks in parallel (or sequentially if disabled) if self.enable_parallel and len(text_chunks) > 1: if progress: progress(0.15, desc="Starting parallel audio processing...") # Import the audio generation function from .generate_tts_audio import generate_tts_audio # Process chunks in parallel def progress_wrapper(p, desc=""): if progress: # Map parallel progress to 15-80% of total progress mapped_progress = 0.15 + (p * 0.65) progress(mapped_progress, desc) audio_chunks = self.parallel_processor.process_chunks_parallel( text_chunks, generate_tts_audio, progress_callback=progress_wrapper ) else: # Sequential processing for single chunk or when parallel is disabled if progress: progress(0.15, desc="Processing chunks sequentially...") from .generate_tts_audio import generate_tts_audio audio_chunks = [] for i, chunk in enumerate(text_chunks): if progress: chunk_progress = 0.15 + (0.65 * i / len(text_chunks)) progress(chunk_progress, desc=f"Processing chunk {i + 1}/{len(text_chunks)}") audio_result = generate_tts_audio(chunk, None) audio_chunks.append(audio_result) # Step 3: Concatenate audio chunks if progress: progress(0.8, desc="Concatenating audio chunks...") def concat_progress_wrapper(p, desc=""): if progress: # Map concatenation progress to 80-100% of total progress mapped_progress = 0.8 + (p * 0.2) progress(mapped_progress, desc) final_audio = self.audio_concatenator.concatenate_audio_chunks( audio_chunks, progress_callback=concat_progress_wrapper ) if progress: progress(1.0, desc=f"Generated audio from {len(text_chunks)} chunks!") return final_audio, gr.update(visible=True) except Exception as e: raise gr.Error(f"Error generating audio: {str(e)}") def generate_audio_legacy(self, explanation_text: str) -> Tuple[Tuple[int, np.ndarray], dict]: """ Legacy audio generation method (for backward compatibility). """ if not explanation_text or explanation_text.strip() == "": raise gr.Error("No explanations available to convert to audio. Please generate explanations first.") try: from .generate_tts_audio import generate_tts_audio clean_text = explanation_text.strip() # Use the original truncation logic for legacy mode if len(clean_text) > 1000: sentences = clean_text[:950].split('.') if len(sentences) > 1: clean_text = '.'.join(sentences[:-1]) + '.' else: clean_text = clean_text[:950] clean_text += " [Text has been truncated for audio generation]" audio_result = generate_tts_audio(clean_text, None) return audio_result, gr.update(visible=True) except Exception as e: raise gr.Error(f"Error generating audio: {str(e)}") def get_processing_info(self, text: str) -> dict: """Get information about how the text would be processed.""" if not text or not text.strip(): return {"error": "No text provided"} chunks = self.text_chunker.chunk_text(text.strip()) chunk_info = self.text_chunker.get_chunk_info(chunks) estimated_time = self.parallel_processor.estimate_processing_time(chunks) return { "processing_mode": "parallel" if self.enable_parallel and len(chunks) > 1 else "sequential", "chunk_info": chunk_info, "estimated_time_seconds": estimated_time, "estimated_time_readable": f"{estimated_time:.1f} seconds" if estimated_time < 60 else f"{estimated_time/60:.1f} minutes" }