Spaces:
Sleeping
Sleeping
File size: 6,871 Bytes
91e586d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
"""Parallel audio processing for generating multiple audio chunks concurrently."""
import asyncio
import concurrent.futures
from typing import List, Tuple, Optional, Callable
import numpy as np
import gradio as gr
class ParallelAudioProcessor:
    """Handles parallel processing of multiple audio chunks."""

    def __init__(self, max_workers: int = 4):
        """
        Initialize the parallel processor.

        Args:
            max_workers: Maximum number of concurrent workers for audio generation.
        """
        self.max_workers = max_workers

    def process_chunks_parallel(
        self,
        text_chunks: List[str],
        audio_generator_func: Callable,
        progress_callback: Optional[Callable] = None
    ) -> List[Tuple[int, np.ndarray]]:
        """
        Process multiple text chunks in parallel to generate audio.

        Args:
            text_chunks: List of text chunks to process.
            audio_generator_func: Function to generate audio from text; invoked as
                ``audio_generator_func(text, None, progress=callback)``.
            progress_callback: Optional callback for progress updates, invoked as
                ``progress_callback(fraction, desc=...)``.

        Returns:
            List of (sample_rate, audio_data) tuples, one per chunk, in the
            original chunk order.

        Raises:
            gr.Error: If any chunk fails to process.
        """
        if not text_chunks:
            return []

        total_chunks = len(text_chunks)
        completed_chunks = 0
        # Pre-sized so results can be stored as futures complete (out of
        # order) while the returned list keeps the original chunk order.
        results: List[Optional[Tuple[int, np.ndarray]]] = [None] * total_chunks

        def update_progress(desc: str = "") -> None:
            """Forward intermediate progress based on chunks completed so far."""
            if progress_callback:
                progress = completed_chunks / total_chunks
                progress_callback(
                    progress,
                    desc=f"Processing chunk {completed_chunks + 1}/{total_chunks}{': ' + desc if desc else ''}"
                )

        def process_single_chunk(chunk_index: int, text_chunk: str) -> Tuple[int, Tuple[int, np.ndarray]]:
            """Process a single chunk and return (chunk_index, audio_result)."""
            try:
                # Per-chunk progress callback handed to the generator.
                def chunk_progress(progress: float, desc: str = "") -> None:
                    update_progress(f"Chunk {chunk_index + 1}: {desc}")

                audio_result = audio_generator_func(text_chunk, None, progress=chunk_progress)
                return chunk_index, audio_result
            except Exception as e:
                # Chain the original exception so the real traceback survives
                # the trip through the future (the bare Exception previously
                # raised here discarded it).
                raise RuntimeError(f"Error processing chunk {chunk_index + 1}: {str(e)}") from e

        # Fan the chunks out to a thread pool; audio generation is presumably
        # dominated by native inference / I/O that releases the GIL, so
        # threads overlap — TODO confirm against the generator implementation.
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_index = {
                executor.submit(process_single_chunk, i, chunk): i
                for i, chunk in enumerate(text_chunks)
            }

            # Collect results as they complete; store by index to restore order.
            for future in concurrent.futures.as_completed(future_to_index):
                chunk_index = future_to_index[future]
                try:
                    index, audio_result = future.result()
                    results[index] = audio_result
                    completed_chunks += 1
                    if progress_callback:
                        progress_callback(
                            completed_chunks / total_chunks,
                            desc=f"Completed {completed_chunks}/{total_chunks} audio chunks"
                        )
                except Exception as e:
                    raise gr.Error(f"Failed to process chunk {chunk_index + 1}: {str(e)}")

        # Filter out any None results (shouldn't happen, but just in case)
        valid_results = [result for result in results if result is not None]
        if len(valid_results) != total_chunks:
            raise gr.Error(f"Only {len(valid_results)} out of {total_chunks} chunks processed successfully")

        return valid_results

    async def process_chunks_async(
        self,
        text_chunks: List[str],
        audio_generator_func: Callable,
        progress_callback: Optional[Callable] = None
    ) -> List[Tuple[int, np.ndarray]]:
        """
        Async version of parallel chunk processing.

        Args:
            text_chunks: List of text chunks to process.
            audio_generator_func: Function to generate audio from text; invoked as
                ``audio_generator_func(text, None, progress=callback)``.
            progress_callback: Optional callback for progress updates.

        Returns:
            List of (sample_rate, audio_data) tuples, one per chunk, in the
            original chunk order.

        Raises:
            gr.Error: If any chunk fails to process.
        """
        if not text_chunks:
            return []

        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine; get_event_loop() is deprecated here since 3.10.
        loop = asyncio.get_running_loop()
        total = len(text_chunks)

        async def process_chunk_async(chunk_index: int, text_chunk: str):
            """Generate audio for one chunk in the default executor."""
            def chunk_progress(progress: float, desc: str = "") -> None:
                if progress_callback:
                    progress_callback(
                        (chunk_index + progress) / total,
                        desc=f"Chunk {chunk_index + 1}: {desc}"
                    )

            # Run the (blocking) audio generation in a thread pool.
            audio_result = await loop.run_in_executor(
                None,
                lambda: audio_generator_func(text_chunk, None, progress=chunk_progress)
            )
            return chunk_index, audio_result

        tasks = [
            process_chunk_async(i, chunk)
            for i, chunk in enumerate(text_chunks)
        ]

        try:
            results = await asyncio.gather(*tasks)
            # gather() already preserves submission order; sort defensively
            # by chunk index to guarantee it.
            results.sort(key=lambda item: item[0])
            return [audio for _, audio in results]
        except Exception as e:
            raise gr.Error(f"Error in async processing: {str(e)}")

    def estimate_processing_time(self, text_chunks: List[str], avg_time_per_char: float = 0.1) -> float:
        """
        Estimate total processing time for all chunks.

        Args:
            text_chunks: List of text chunks.
            avg_time_per_char: Average processing time per character (seconds).

        Returns:
            Estimated processing time in seconds (ideal parallel speedup).
        """
        total_chars = sum(len(chunk) for chunk in text_chunks)
        sequential_time = total_chars * avg_time_per_char
        # Ideal speedup: the work is divided across at most
        # min(n_chunks, max_workers) concurrent workers, so the wall-clock
        # estimate is sequential_time / effective_workers. (The previous
        # formula multiplied by min(n, workers)/n, which yields no speedup
        # whenever n <= workers and the wrong scaling otherwise.)
        effective_workers = min(len(text_chunks), self.max_workers) or 1
        return sequential_time / effective_workers
|