# NOTE(review): removed a scraped web-page banner ("Spaces: / Sleeping / Sleeping")
# that was accidentally captured into this file — it is not part of the module.
| """Audio generation functionality.""" | |
| import gradio as gr | |
| from typing import Tuple, Optional | |
| import numpy as np | |
| from .text_chunker import TextChunker | |
| from .parallel_processor import ParallelAudioProcessor | |
| from .audio_concatenator import AudioConcatenator | |
class AudioProcessor:
    """Handles audio generation operations with parallel processing and chunking.

    Long texts are split by TextChunker, synthesized chunk-by-chunk (optionally
    in parallel via ParallelAudioProcessor), and stitched back together with
    AudioConcatenator.
    """

    def __init__(self,
                 max_chunk_size: int = 800,
                 max_workers: int = 4,
                 silence_duration: float = 0.5,
                 enable_parallel: bool = True):
        """
        Initialize the audio processor.

        Args:
            max_chunk_size: Maximum characters per chunk
            max_workers: Maximum parallel workers
            silence_duration: Silence between chunks (seconds)
            enable_parallel: Whether to use parallel processing
        """
        self.text_chunker = TextChunker(max_chunk_size=max_chunk_size)
        self.parallel_processor = ParallelAudioProcessor(max_workers=max_workers)
        self.audio_concatenator = AudioConcatenator(silence_duration=silence_duration)
        self.enable_parallel = enable_parallel

    @staticmethod
    def _map_progress(progress, start: float, span: float):
        """Build a callback mapping sub-progress [0, 1] onto [start, start + span].

        The returned wrapper is safe to call when ``progress`` is None (no-op),
        so callers don't need their own guard.
        """
        def wrapper(p, desc=""):
            if progress:
                progress(start + p * span, desc)
        return wrapper

    def generate_audio(self, explanation_text: str, progress=None) -> Tuple[Tuple[int, np.ndarray], dict]:
        """
        Generate TTS audio for explanations with chunking and parallel processing.

        Args:
            explanation_text: The text to convert to audio
            progress: Optional progress callback accepting (fraction, desc=...)

        Returns:
            Tuple of (audio_result, update_dict) where audio_result is (sample_rate, audio_data)

        Raises:
            gr.Error: If no text is provided or audio generation fails.
        """
        if not explanation_text or explanation_text.strip() == "":
            raise gr.Error("No explanations available to convert to audio. Please generate explanations first.")
        try:
            # Lazy import (once) so TTS dependencies load only when audio is requested.
            from .generate_tts_audio import generate_tts_audio

            clean_text = explanation_text.strip()
            if progress:
                progress(0.05, desc="Analyzing text for chunking...")

            # Step 1: Chunk the text
            text_chunks = self.text_chunker.chunk_text(clean_text)
            if progress:
                progress(0.1, desc=f"Split text into {len(text_chunks)} chunks")

            # Fast path: one small chunk needs neither parallelism nor concatenation.
            if len(text_chunks) == 1 and len(text_chunks[0]) <= 1000:
                if progress:
                    progress(0.2, desc="Processing single chunk...")
                audio_result = generate_tts_audio(text_chunks[0], None, progress=progress)
                if progress:
                    progress(1.0, desc="Audio generation complete!")
                return audio_result, gr.update(visible=True)

            # Step 2: Process chunks in parallel (or sequentially if disabled)
            if self.enable_parallel and len(text_chunks) > 1:
                if progress:
                    progress(0.15, desc="Starting parallel audio processing...")
                # Chunk synthesis occupies the 15-80% band of overall progress.
                audio_chunks = self.parallel_processor.process_chunks_parallel(
                    text_chunks,
                    generate_tts_audio,
                    progress_callback=self._map_progress(progress, 0.15, 0.65)
                )
            else:
                # Sequential processing for single chunk or when parallel is disabled
                if progress:
                    progress(0.15, desc="Processing chunks sequentially...")
                audio_chunks = []
                for i, chunk in enumerate(text_chunks):
                    if progress:
                        chunk_progress = 0.15 + (0.65 * i / len(text_chunks))
                        progress(chunk_progress, desc=f"Processing chunk {i + 1}/{len(text_chunks)}")
                    audio_chunks.append(generate_tts_audio(chunk, None))

            # Step 3: Concatenate audio chunks (80-100% band).
            if progress:
                progress(0.8, desc="Concatenating audio chunks...")
            final_audio = self.audio_concatenator.concatenate_audio_chunks(
                audio_chunks,
                progress_callback=self._map_progress(progress, 0.8, 0.2)
            )
            if progress:
                progress(1.0, desc=f"Generated audio from {len(text_chunks)} chunks!")
            return final_audio, gr.update(visible=True)
        except gr.Error:
            # Already a user-facing error (e.g. from generate_tts_audio) — don't double-wrap.
            raise
        except Exception as e:
            raise gr.Error(f"Error generating audio: {str(e)}") from e

    def generate_audio_legacy(self, explanation_text: str) -> Tuple[Tuple[int, np.ndarray], dict]:
        """
        Legacy audio generation method (for backward compatibility).

        Instead of chunking, truncates the text to roughly 1000 characters
        (preferring a sentence boundary) and synthesizes it in a single pass.

        Raises:
            gr.Error: If no text is provided or audio generation fails.
        """
        if not explanation_text or explanation_text.strip() == "":
            raise gr.Error("No explanations available to convert to audio. Please generate explanations first.")
        try:
            from .generate_tts_audio import generate_tts_audio
            clean_text = explanation_text.strip()
            # Use the original truncation logic for legacy mode: cut at 950
            # chars, then back up to the last complete sentence when possible.
            if len(clean_text) > 1000:
                sentences = clean_text[:950].split('.')
                if len(sentences) > 1:
                    clean_text = '.'.join(sentences[:-1]) + '.'
                else:
                    clean_text = clean_text[:950]
                clean_text += " [Text has been truncated for audio generation]"
            audio_result = generate_tts_audio(clean_text, None)
            return audio_result, gr.update(visible=True)
        except gr.Error:
            raise
        except Exception as e:
            raise gr.Error(f"Error generating audio: {str(e)}") from e

    def get_processing_info(self, text: str) -> dict:
        """Describe how ``text`` would be processed, without generating audio.

        Returns a dict with the processing mode, per-chunk statistics and a
        time estimate, or ``{"error": ...}`` when no text is given.
        """
        if not text or not text.strip():
            return {"error": "No text provided"}
        chunks = self.text_chunker.chunk_text(text.strip())
        chunk_info = self.text_chunker.get_chunk_info(chunks)
        estimated_time = self.parallel_processor.estimate_processing_time(chunks)
        return {
            "processing_mode": "parallel" if self.enable_parallel and len(chunks) > 1 else "sequential",
            "chunk_info": chunk_info,
            "estimated_time_seconds": estimated_time,
            "estimated_time_readable": (
                f"{estimated_time:.1f} seconds" if estimated_time < 60
                else f"{estimated_time / 60:.1f} minutes"
            )
        }