Spaces:
Runtime error
Runtime error
"""
Parallel processing module for DittoTalkingHead.

Implements concurrent audio and image preprocessing.
"""
| import asyncio | |
| import concurrent.futures | |
| from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
| import time | |
| from typing import Tuple, Dict, Any, Optional, Callable | |
| import numpy as np | |
| from pathlib import Path | |
| import threading | |
| import queue | |
| import torch | |
| from functools import partial | |
class ParallelProcessor:
    """
    Parallel processing for audio and image preprocessing.

    Audio and image preprocessing for one request run concurrently on a
    shared thread pool (``preprocess_parallel_sync`` /
    ``preprocess_parallel_async``). Model inference can optionally be
    overlapped on two CUDA streams (``process_gpu_parallel``).
    """

    def __init__(
        self,
        num_threads: int = 4,
        num_processes: int = 2,
        use_cuda_streams: bool = True
    ):
        """
        Initialize parallel processor.

        Args:
            num_threads: Number of threads for I/O operations.
            num_processes: Number of processes for CPU-intensive tasks. The
                pool is created here but not used by this class's own
                preprocessing methods.
            use_cuda_streams: Use CUDA streams for GPU operations. Forced
                off when CUDA is not available.
        """
        self.num_threads = num_threads
        self.num_processes = num_processes
        self.use_cuda_streams = use_cuda_streams and torch.cuda.is_available()

        # Thread pool for I/O operations
        self.thread_executor = ThreadPoolExecutor(max_workers=num_threads)
        # Process pool for CPU-intensive operations
        self.process_executor = ProcessPoolExecutor(max_workers=num_processes)

        # One stream per modality (audio, image), used by process_gpu_parallel
        if self.use_cuda_streams:
            self.cuda_streams = [torch.cuda.Stream() for _ in range(2)]
        else:
            self.cuda_streams = None

        print(f"✅ ParallelProcessor initialized: {num_threads} threads, {num_processes} processes")
        if self.use_cuda_streams:
            print("✅ CUDA streams enabled for GPU parallelism")

    def preprocess_audio_parallel(self, audio_path: str) -> Dict[str, Any]:
        """
        Load an audio file resampled to 16 kHz and extract spectral features.

        Despite the name, the work runs sequentially on the calling thread.
        Concurrency comes from running this method alongside image
        preprocessing (see ``preprocess_parallel_sync``).

        Args:
            audio_path: Path to audio file.

        Returns:
            Dict with ``'audio'`` (samples), ``'sample_rate'``, ``'features'``
            (keys ``'mfcc'``, ``'spectral_centroid'``, ``'spectral_rolloff'``)
            and ``'duration'`` in seconds.
        """
        import librosa

        audio, sr = librosa.load(audio_path, sr=16000)
        features = {
            'mfcc': librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13),
            'spectral_centroid': librosa.feature.spectral_centroid(y=audio, sr=sr),
            'spectral_rolloff': librosa.feature.spectral_rolloff(y=audio, sr=sr),
        }
        return {
            'audio': audio,
            'sample_rate': sr,
            'features': features,
            'duration': len(audio) / sr
        }

    def preprocess_image_parallel(self, image_path: str, target_size: int = 320) -> Dict[str, Any]:
        """
        Load an image, convert it to RGB and resize it to a square.

        The work runs on the calling thread. It is deliberately NOT submitted
        to ``self.thread_executor``, because this method is itself run on
        that pool by ``preprocess_parallel_sync``/``preprocess_parallel_async``.
        Blocking on a nested task in the same bounded pool deadlocks once
        every worker is busy, which always happens with ``num_threads=1``.

        Args:
            image_path: Path to image file.
            target_size: Side length in pixels of the square output.

        Returns:
            Dict with ``'image'`` (uint8 array of shape
            ``(target_size, target_size, 3)``), ``'shape'`` and
            ``'landmarks'``.
        """
        from PIL import Image

        def load_and_resize():
            # convert() returns a new, fully loaded image, so the source file
            # can be closed right away instead of leaking its handle.
            with Image.open(image_path) as src:
                img = src.convert('RGB')
            img = img.resize((target_size, target_size), Image.Resampling.LANCZOS)
            return np.array(img)

        def extract_face_landmarks(img_array):
            # Placeholder: no detection is performed; always reports a face
            # with no landmarks. In production, use MediaPipe or similar.
            return {
                'has_face': True,
                'landmarks': None
            }

        img_array = load_and_resize()
        landmarks = extract_face_landmarks(img_array)
        return {
            'image': img_array,
            'shape': img_array.shape,
            'landmarks': landmarks
        }

    async def preprocess_parallel_async(
        self,
        audio_path: str,
        image_path: str,
        target_size: int = 320
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Asynchronously preprocess audio and image in parallel.

        Both jobs run on ``self.thread_executor``, and the coroutine awaits
        both results.

        Args:
            audio_path: Path to audio file.
            image_path: Path to image file.
            target_size: Target image resolution.

        Returns:
            Tuple of (audio_data, image_data).
        """
        # Inside a coroutine the running loop is the one to schedule on;
        # get_event_loop() here is deprecated in favour of get_running_loop().
        loop = asyncio.get_running_loop()
        audio_task = loop.run_in_executor(
            self.thread_executor,
            self.preprocess_audio_parallel,
            audio_path
        )
        image_task = loop.run_in_executor(
            self.thread_executor,
            partial(self.preprocess_image_parallel, target_size=target_size),
            image_path
        )
        audio_data, image_data = await asyncio.gather(audio_task, image_task)
        return audio_data, image_data

    def preprocess_parallel_sync(
        self,
        audio_path: str,
        image_path: str,
        target_size: int = 320
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Preprocess audio and image in parallel, blocking until both finish.

        Args:
            audio_path: Path to audio file.
            image_path: Path to image file.
            target_size: Target image resolution.

        Returns:
            Tuple of (audio_data, image_data).
        """
        audio_future = self.thread_executor.submit(
            self.preprocess_audio_parallel,
            audio_path
        )
        image_future = self.thread_executor.submit(
            self.preprocess_image_parallel,
            image_path,
            target_size
        )
        audio_data = audio_future.result()
        image_data = image_future.result()
        return audio_data, image_data

    def process_gpu_parallel(
        self,
        audio_tensor: torch.Tensor,
        image_tensor: torch.Tensor,
        model_audio: torch.nn.Module,
        model_image: torch.nn.Module
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process audio and image through their models, overlapping on CUDA streams.

        Without CUDA streams, the two models run one after the other.

        Args:
            audio_tensor: Audio tensor.
            image_tensor: Image tensor.
            model_audio: Audio processing model.
            model_image: Image processing model.

        Returns:
            Tuple of (model_audio(audio_tensor), model_image(image_tensor)).
        """
        if not self.use_cuda_streams:
            # Fallback to sequential processing
            audio_out = model_audio(audio_tensor)
            image_out = model_image(image_tensor)
            return audio_out, image_out

        audio_stream, image_stream = self.cuda_streams
        # Side streams do not implicitly wait for work already queued on the
        # current stream (e.g. the kernels/copies that produced the inputs).
        # Make them wait explicitly so they never read unfinished inputs.
        producer = torch.cuda.current_stream()
        audio_stream.wait_stream(producer)
        image_stream.wait_stream(producer)

        with torch.cuda.stream(audio_stream):
            audio_out = model_audio(audio_tensor)
        with torch.cuda.stream(image_stream):
            image_out = model_image(image_tensor)

        # Block the host until all device work (both streams included) is
        # done, so the outputs are safe to use from any stream.
        torch.cuda.synchronize()
        return audio_out, image_out

    def shutdown(self):
        """Shutdown executors, waiting for pending work to finish."""
        self.thread_executor.shutdown(wait=True)
        self.process_executor.shutdown(wait=True)
        print("✅ ParallelProcessor shutdown complete")
class PipelineProcessor:
    """
    Pipeline-based processing for continuous operations.

    Each stage runs in its own worker thread. Stages are chained, in the
    insertion order of ``stages``, by bounded queues. ``None`` is the
    shutdown signal (poison pill) that travels from the input queue to the
    output queue. A stage function that returns ``None`` therefore shuts
    down every stage after it.
    """

    # Private marker forwarded in place of a result when a stage raises. It
    # must differ from the ``None`` poison pill: downstream workers pass it
    # through instead of exiting, so later items still flow.
    _ERROR = object()

    # Seconds a worker blocks on a queue before re-checking ``stop_event``.
    _POLL_INTERVAL = 0.1

    def __init__(self, stages: Dict[str, Callable], buffer_size: int = 10):
        """
        Initialize pipeline processor.

        Args:
            stages: Dictionary of stage_name -> processing_function, applied
                in insertion order.
            buffer_size: Size of queues between stages.

        Raises:
            ValueError: If ``stages`` is empty. With no stage, nothing would
                ever consume the input queue and ``process()`` would hang.
        """
        if not stages:
            raise ValueError("stages must contain at least one stage")
        self.stages = stages
        self.buffer_size = buffer_size

        # Queues between consecutive stages, keyed "<upstream>_to_<downstream>"
        stage_names = list(stages)
        self.queues = {}
        for upstream, downstream in zip(stage_names, stage_names[1:]):
            self.queues[f"{upstream}_to_{downstream}"] = queue.Queue(maxsize=buffer_size)

        # Input and output queues
        self.input_queue = queue.Queue(maxsize=buffer_size)
        self.output_queue = queue.Queue(maxsize=buffer_size)

        # Worker threads
        self.workers = []
        self.stop_event = threading.Event()

    def _put(self, q: queue.Queue, item: Any) -> bool:
        """
        Put ``item`` on ``q``, giving up once ``stop_event`` is set.

        A plain blocking ``put`` hangs forever on a full queue whose consumer
        has already exited, which would make ``stop()``'s join hang.

        Returns:
            True if the item was enqueued, False if abandoned due to stop.
        """
        while True:
            try:
                q.put(item, timeout=self._POLL_INTERVAL)
                return True
            except queue.Full:
                if self.stop_event.is_set():
                    return False

    def _worker(self, stage_name: str, process_func: Callable, input_q: queue.Queue, output_q: queue.Queue):
        """Worker thread for a pipeline stage."""
        while not self.stop_event.is_set():
            try:
                # Timeout so the loop can notice stop_event
                item = input_q.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue

            if item is None:  # Poison pill: forward it and exit
                self._put(output_q, None)
                break
            if item is self._ERROR:
                # An upstream stage failed on this item; pass the marker on.
                self._put(output_q, self._ERROR)
                continue

            try:
                result = process_func(item)
            except Exception as e:
                print(f"Error in stage {stage_name}: {e}")
                result = self._ERROR
            self._put(output_q, result)

    def start(self):
        """Start one daemon worker thread per stage."""
        stage_names = list(self.stages)
        last = len(stage_names) - 1

        for i, (stage_name, process_func) in enumerate(self.stages.items()):
            if i == 0:
                input_q = self.input_queue
            else:
                input_q = self.queues[f"{stage_names[i - 1]}_to_{stage_name}"]

            if i == last:
                output_q = self.output_queue
            else:
                output_q = self.queues[f"{stage_name}_to_{stage_names[i + 1]}"]

            # Daemon threads: otherwise a forgotten stop() keeps the workers
            # looping and blocks interpreter exit indefinitely.
            worker = threading.Thread(
                target=self._worker,
                args=(stage_name, process_func, input_q, output_q),
                daemon=True
            )
            worker.start()
            self.workers.append(worker)

        print(f"✅ Pipeline started with {len(self.workers)} stages")

    def process(self, item: Any) -> Any:
        """
        Process an item through the pipeline and return the last stage's result.

        Returns ``None`` if any stage raised for this item; the failing
        worker prints the error. Passing ``None`` as ``item`` sends the
        poison pill and shuts the pipeline down.
        """
        self.input_queue.put(item)
        result = self.output_queue.get()
        return None if result is self._ERROR else result

    def stop(self):
        """Stop pipeline processing and wait for all workers to exit."""
        self.stop_event.set()
        # Best-effort poison pill. Workers already exit on stop_event, and a
        # blocking put() here could hang forever if the input queue is full.
        try:
            self.input_queue.put_nowait(None)
        except queue.Full:
            pass

        for worker in self.workers:
            worker.join()
        print("✅ Pipeline stopped")
def benchmark_parallel_processing(
    audio_path: str = "example/audio.wav",
    image_path: str = "example/image.png",
):
    """
    Benchmark parallel vs sequential preprocessing of one audio/image pair.

    Both preprocessing methods are run once, untimed, before measuring. The
    first call pays one-off costs (librosa and PIL are imported inside those
    methods, plus any other first-call initialisation). Without the warm-up
    these costs would land only in the sequential measurement and inflate the
    reported speedup.

    Args:
        audio_path: Audio file to preprocess (defaults to the bundled example).
        image_path: Image file to preprocess (defaults to the bundled example).

    Returns:
        Dict with 'sequential_time' and 'parallel_time' (seconds) and
        'speedup' (inf if the parallel run measured as zero).
    """
    print("\n=== Parallel Processing Benchmark ===")

    processor = ParallelProcessor(num_threads=4)
    try:
        # Warm-up (untimed)
        processor.preprocess_audio_parallel(audio_path)
        processor.preprocess_image_parallel(image_path)

        # Sequential processing
        start_seq = time.perf_counter()
        processor.preprocess_audio_parallel(audio_path)
        processor.preprocess_image_parallel(image_path)
        time_seq = time.perf_counter() - start_seq

        # Parallel processing
        start_par = time.perf_counter()
        processor.preprocess_parallel_sync(audio_path, image_path)
        time_par = time.perf_counter() - start_par

        speedup = time_seq / time_par if time_par > 0 else float('inf')

        print(f"Sequential processing: {time_seq:.3f}s")
        print(f"Parallel processing: {time_par:.3f}s")
        print(f"Speedup: {speedup:.2f}x")
    finally:
        # Release the executors even if a file is missing or decoding fails
        processor.shutdown()

    return {
        'sequential_time': time_seq,
        'parallel_time': time_par,
        'speedup': speedup
    }