Spaces:
Sleeping
Sleeping
| """Audio processing utilities for the TTS API.""" | |
| import re | |
| from typing import List | |
class AudioConcatenator:
    """Server-side audio concatenation with GPU acceleration."""

    def __init__(self, silence_duration: float = 0.5, fade_duration: float = 0.1):
        """
        Initialize the audio concatenator.

        Args:
            silence_duration: Duration of silence between chunks (seconds)
            fade_duration: Duration of fade in/out effects (seconds)
        """
        self.silence_duration = silence_duration
        self.fade_duration = fade_duration

    def concatenate_audio_chunks(self, audio_chunks: List, sample_rate: int):
        """
        Concatenate multiple audio chunks into a single audio file.

        Each chunk is coerced to a mono 1-D float32 array, level-normalized,
        faded in/out, and joined with `silence_duration` seconds of silence
        between consecutive chunks.

        Args:
            audio_chunks: List of audio arrays (numpy arrays, torch tensors,
                or tuples whose first element is the audio, as many TTS
                models return)
            sample_rate: Sample rate for the audio (Hz)

        Returns:
            Concatenated 1-D float32 audio array

        Raises:
            ValueError: If no chunks are supplied, or none survive processing.
        """
        if not audio_chunks:
            raise ValueError("No audio chunks to concatenate")

        if len(audio_chunks) == 1:
            # Single chunk: return it directly (unwrapping the tuple form),
            # bypassing the normalization/fade pipeline.
            audio = audio_chunks[0]
            if isinstance(audio, tuple):
                return audio[0]  # Extract audio data from tuple
            return audio

        import numpy as np

        # Normalize and prepare audio data
        normalized_chunks = []
        for i, audio_data in enumerate(audio_chunks):
            print(f"Processing chunk {i}: type={type(audio_data)}")
            prepared = self._prepare_chunk(audio_data, i, sample_rate)
            if prepared is None:
                # Empty chunk — skip it rather than inserting zero-length audio.
                continue
            normalized_chunks.append(prepared)

        if not normalized_chunks:
            raise ValueError("No valid audio chunks after processing")

        print(f"Successfully processed {len(normalized_chunks)} chunks")

        # Create silence segments
        silence_samples = int(self.silence_duration * sample_rate)
        silence = np.zeros(silence_samples, dtype=np.float32)
        print(f"Adding {silence_samples} silence samples ({self.silence_duration}s) between chunks")

        # Concatenate all chunks with silence in between
        concatenated_segments = []
        for i, chunk in enumerate(normalized_chunks):
            concatenated_segments.append(chunk)
            print(f"Added chunk {i}: {len(chunk)} samples")
            # Add silence between chunks (but not after the last chunk)
            if i < len(normalized_chunks) - 1:
                concatenated_segments.append(silence)
                print(f"Added silence: {len(silence)} samples")

        final_audio = np.concatenate(concatenated_segments)
        print(f"Final concatenated audio: {len(final_audio)} samples ({len(final_audio)/sample_rate:.2f}s)")

        # Final normalization and cleanup. Pass the real sample rate so the
        # high-pass cutoff is correct (it was previously hard-coded to 22050 Hz).
        final_audio = self._normalize_audio(final_audio)
        final_audio = self._remove_clicks_and_pops(final_audio, sample_rate)
        return final_audio

    def _prepare_chunk(self, audio_data, index: int, sample_rate: int):
        """Coerce one raw chunk to a normalized, faded 1-D float32 array.

        Returns None for empty chunks so the caller can skip them.
        """
        import numpy as np

        # Handle tuple format (common from TTS models)
        if isinstance(audio_data, tuple):
            audio_data = audio_data[0]  # Extract audio array from tuple
            print(f" Extracted from tuple: type={type(audio_data)}")

        # Convert torch tensor to numpy if needed
        if hasattr(audio_data, 'cpu'):  # It's a torch tensor
            audio_data = audio_data.cpu().numpy()
            print(f" Converted from torch: shape={audio_data.shape}")

        # Convert to numpy array if needed
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data)

        print(f" Final shape before processing: {audio_data.shape}")

        # Handle different audio shapes
        if audio_data.ndim == 1:
            # Already 1D, perfect
            mono = audio_data
        elif audio_data.ndim == 2:
            # 2D audio — could be (channels, samples) or (samples, channels).
            # Assume the smaller axis is the channel axis and take channel 0.
            if audio_data.shape[0] < audio_data.shape[1]:
                mono = audio_data[0, :]
                print(f" Used first channel from (C, L) format: {mono.shape}")
            else:
                mono = audio_data[:, 0]
                print(f" Used first channel from (L, C) format: {mono.shape}")
        else:
            # Flatten higher dimensional arrays
            mono = audio_data.flatten()
            print(f" Flattened {audio_data.ndim}D array: {mono.shape}")

        # Ensure we have valid audio data
        if len(mono) == 0:
            print(f" Warning: Empty audio chunk {index}")
            return None

        print(f" Chunk {index} final length: {len(mono)} samples ({len(mono)/sample_rate:.2f}s)")

        # Normalize audio levels, then apply fade effects
        mono = self._normalize_audio(mono)
        return self._apply_fade_effects(mono, sample_rate)

    def _normalize_audio(self, audio_data):
        """Peak-normalize audio to 95% of full scale to prevent clipping."""
        import numpy as np

        # Convert to numpy array if it's not already
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data)
        # Ensure it's a 1D array
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()

        # Find the maximum absolute value
        max_val = np.max(np.abs(audio_data))
        if max_val == 0:
            # Silent audio: nothing to scale.
            return audio_data
        # Normalize to 95% of maximum to leave some headroom
        normalized = audio_data * (0.95 / max_val)
        return normalized.astype(np.float32)

    def _apply_fade_effects(self, audio_data, sample_rate: int):
        """Apply linear fade in/out ramps to reduce pops and clicks."""
        import numpy as np

        fade_samples = int(self.fade_duration * sample_rate)
        # Guard fade_samples <= 0 as well: audio[-0:] is the WHOLE array, so a
        # zero-length fade ramp would raise a broadcast error on `*=`.
        if fade_samples <= 0 or len(audio_data) < 2 * fade_samples:
            # Audio too short for fade effects (or fades disabled): return as-is.
            return audio_data

        audio_with_fades = audio_data.copy()
        # Apply fade in
        audio_with_fades[:fade_samples] *= np.linspace(0, 1, fade_samples)
        # Apply fade out
        audio_with_fades[-fade_samples:] *= np.linspace(1, 0, fade_samples)
        return audio_with_fades

    def _remove_clicks_and_pops(self, audio_data, sample_rate: int = 22050):
        """Apply basic high-pass filtering to remove clicks and pops.

        Args:
            audio_data: 1-D audio array.
            sample_rate: Actual sample rate in Hz. Defaults to 22050 for
                backward compatibility with the previous hard-coded value.

        Returns:
            Filtered float32 audio, or the input unchanged when scipy is
            unavailable or the signal cannot be filtered safely.
        """
        try:
            # Simple high-pass filter to remove DC offset and low-frequency artifacts
            from scipy import signal
            import numpy as np

            # Design a high-pass filter (removes frequencies below 80 Hz).
            # This helps remove some pops and clicks while preserving speech.
            nyquist = sample_rate / 2
            low = 80 / nyquist
            if low >= 1:
                # Sample rate too low for an 80 Hz high-pass; skip filtering.
                return audio_data
            b, a = signal.butter(4, low, btype='high')

            # filtfilt requires the signal to exceed its padding length
            # (3 * max(len(a), len(b)) by default); skip very short audio
            # instead of raising.
            if len(audio_data) <= 3 * max(len(a), len(b)):
                return audio_data

            filtered_audio = signal.filtfilt(b, a, audio_data)
            return filtered_audio.astype(np.float32)
        except ImportError:
            # If scipy is not available, return original audio
            return audio_data