"""Audio generation functionality."""
import gradio as gr
from typing import Tuple, Optional
import numpy as np
from .text_chunker import TextChunker
from .parallel_processor import ParallelAudioProcessor
from .audio_concatenator import AudioConcatenator
class AudioProcessor:
    """Handles audio generation operations with parallel processing and chunking."""

    def __init__(self,
                 max_chunk_size: int = 800,
                 max_workers: int = 4,
                 silence_duration: float = 0.5,
                 enable_parallel: bool = True):
        """
        Initialize the audio processor.

        Args:
            max_chunk_size: Maximum characters per chunk
            max_workers: Maximum parallel workers
            silence_duration: Silence between chunks (seconds)
            enable_parallel: Whether to use parallel processing
        """
        self.text_chunker = TextChunker(max_chunk_size=max_chunk_size)
        self.parallel_processor = ParallelAudioProcessor(max_workers=max_workers)
        self.audio_concatenator = AudioConcatenator(silence_duration=silence_duration)
        self.enable_parallel = enable_parallel

    @staticmethod
    def _report(progress, fraction: float, desc: str) -> None:
        """Forward a progress update when a progress callback was supplied."""
        if progress:
            progress(fraction, desc=desc)

    def generate_audio(self, explanation_text: str, progress=None) -> Tuple[Tuple[int, np.ndarray], dict]:
        """
        Generate TTS audio for explanations with chunking and parallel processing.

        Args:
            explanation_text: The text to convert to audio
            progress: Optional progress callback accepting (fraction, desc=...)

        Returns:
            Tuple of (audio_result, update_dict) where audio_result is (sample_rate, audio_data)

        Raises:
            gr.Error: If no text is provided or audio generation fails.
        """
        if not explanation_text or not explanation_text.strip():
            raise gr.Error("No explanations available to convert to audio. Please generate explanations first.")
        try:
            # Imported lazily (once per call, instead of in every branch) so
            # module import stays cheap; kept inside the try so an import
            # failure is surfaced as a gr.Error like any other failure.
            from .generate_tts_audio import generate_tts_audio

            clean_text = explanation_text.strip()
            self._report(progress, 0.05, "Analyzing text for chunking...")

            # Step 1: Chunk the text.
            text_chunks = self.text_chunker.chunk_text(clean_text)
            self._report(progress, 0.1, f"Split text into {len(text_chunks)} chunks")

            # Fast path: a single small chunk needs no chunked pipeline.
            if len(text_chunks) == 1 and len(text_chunks[0]) <= 1000:
                self._report(progress, 0.2, "Processing single chunk...")
                audio_result = generate_tts_audio(text_chunks[0], None, progress=progress)
                self._report(progress, 1.0, "Audio generation complete!")
                return audio_result, gr.update(visible=True)

            # Step 2: Synthesize each chunk (parallel when enabled).
            if self.enable_parallel and len(text_chunks) > 1:
                audio_chunks = self._synthesize_parallel(text_chunks, generate_tts_audio, progress)
            else:
                audio_chunks = self._synthesize_sequential(text_chunks, generate_tts_audio, progress)

            # Step 3: Join the per-chunk audio with inter-chunk silence.
            final_audio = self._concatenate(audio_chunks, progress)
            self._report(progress, 1.0, f"Generated audio from {len(text_chunks)} chunks!")
            return final_audio, gr.update(visible=True)
        except gr.Error:
            # Preserve user-facing errors raised by inner steps instead of
            # re-wrapping them with a generic "Error generating audio" prefix.
            raise
        except Exception as e:
            # Chain the original exception so the real cause stays debuggable.
            raise gr.Error(f"Error generating audio: {str(e)}") from e

    def _synthesize_parallel(self, text_chunks, tts_fn, progress):
        """Synthesize chunks concurrently; maps worker progress onto 15-80%."""
        self._report(progress, 0.15, "Starting parallel audio processing...")

        def progress_wrapper(p, desc=""):
            if progress:
                # Map per-chunk progress (0..1) into the 15%-80% band of the
                # overall bar; 80%-100% is reserved for concatenation.
                progress(0.15 + (p * 0.65), desc)

        return self.parallel_processor.process_chunks_parallel(
            text_chunks,
            tts_fn,
            progress_callback=progress_wrapper
        )

    def _synthesize_sequential(self, text_chunks, tts_fn, progress):
        """Synthesize chunks one at a time (single chunk or parallelism disabled)."""
        self._report(progress, 0.15, "Processing chunks sequentially...")
        audio_chunks = []
        total = len(text_chunks)
        for i, chunk in enumerate(text_chunks):
            self._report(progress, 0.15 + (0.65 * i / total),
                         f"Processing chunk {i + 1}/{total}")
            audio_chunks.append(tts_fn(chunk, None))
        return audio_chunks

    def _concatenate(self, audio_chunks, progress):
        """Concatenate synthesized chunks; maps its progress onto 80-100%."""
        self._report(progress, 0.8, "Concatenating audio chunks...")

        def concat_progress_wrapper(p, desc=""):
            if progress:
                # Map concatenation progress (0..1) into the final 80%-100%.
                progress(0.8 + (p * 0.2), desc)

        return self.audio_concatenator.concatenate_audio_chunks(
            audio_chunks,
            progress_callback=concat_progress_wrapper
        )

    def generate_audio_legacy(self, explanation_text: str) -> Tuple[Tuple[int, np.ndarray], dict]:
        """
        Legacy audio generation method (for backward compatibility).

        Truncates long input to ~950 characters (on a sentence boundary when
        possible) instead of chunking, then synthesizes in a single pass.

        Raises:
            gr.Error: If no text is provided or audio generation fails.
        """
        if not explanation_text or not explanation_text.strip():
            raise gr.Error("No explanations available to convert to audio. Please generate explanations first.")
        try:
            from .generate_tts_audio import generate_tts_audio

            clean_text = explanation_text.strip()
            # Original truncation logic: keep whole sentences within 950 chars.
            if len(clean_text) > 1000:
                sentences = clean_text[:950].split('.')
                if len(sentences) > 1:
                    # Drop the (likely cut-off) final fragment.
                    clean_text = '.'.join(sentences[:-1]) + '.'
                else:
                    # No sentence boundary found — hard cut.
                    clean_text = clean_text[:950]
                clean_text += " [Text has been truncated for audio generation]"
            audio_result = generate_tts_audio(clean_text, None)
            return audio_result, gr.update(visible=True)
        except gr.Error:
            raise
        except Exception as e:
            raise gr.Error(f"Error generating audio: {str(e)}") from e

    def get_processing_info(self, text: str) -> dict:
        """
        Get information about how the text would be processed.

        Returns:
            Dict with processing mode, chunk statistics, and estimated
            processing time; {"error": ...} for empty input.
        """
        if not text or not text.strip():
            return {"error": "No text provided"}
        chunks = self.text_chunker.chunk_text(text.strip())
        estimated_time = self.parallel_processor.estimate_processing_time(chunks)
        if estimated_time < 60:
            readable = f"{estimated_time:.1f} seconds"
        else:
            readable = f"{estimated_time / 60:.1f} minutes"
        return {
            "processing_mode": "parallel" if self.enable_parallel and len(chunks) > 1 else "sequential",
            "chunk_info": self.text_chunker.get_chunk_info(chunks),
            "estimated_time_seconds": estimated_time,
            "estimated_time_readable": readable
        }